1use std::io::Cursor;
2use std::time::Duration;
3use std::{collections::BTreeMap, fmt::Display};
4use std::fmt;
5use fluvio_protocol::{Decoder, Encoder, Version};
6use fluvio_protocol::record::{Offset, Record};
7use fluvio_protocol::types::Timestamp;
8
9use crate::SmartModuleRecord;
10
11pub const SMARTMODULE_TIMESTAMPS_VERSION: Version = 22;
15
16#[derive(Debug, Default, Clone, Encoder, Decoder)]
17pub struct SmartModuleExtraParams {
18 inner: BTreeMap<String, String>,
19 #[fluvio(min_version = 20)]
20 lookback: Option<Lookback>,
21}
22
23impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
24 fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
25 SmartModuleExtraParams {
26 inner,
27 ..Default::default()
28 }
29 }
30}
31
32impl SmartModuleExtraParams {
33 pub fn new(params: BTreeMap<String, String>, lookback: Option<Lookback>) -> Self {
34 Self {
35 inner: params,
36 lookback,
37 }
38 }
39
40 pub fn get(&self, key: &str) -> Option<&String> {
41 self.inner.get(key)
42 }
43
44 pub fn insert(&mut self, key: String, value: String) {
45 self.inner.insert(key, value);
46 }
47
48 pub fn lookback(&self) -> Option<&Lookback> {
49 self.lookback.as_ref()
50 }
51
52 pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
53 self.lookback = lookback;
54 }
55}
56
57#[derive(Debug, Default, Clone, Encoder, Decoder, PartialEq, Eq)]
58pub struct Lookback {
59 pub last: u64,
60 #[fluvio(min_version = 21)]
61 pub age: Option<Duration>,
62}
63
64impl Lookback {
65 pub fn last(last: u64) -> Self {
66 Self {
67 last,
68 ..Default::default()
69 }
70 }
71
72 pub fn age(age: Duration, last: Option<u64>) -> Self {
73 Self {
74 last: last.unwrap_or_default(),
75 age: Some(age),
76 }
77 }
78}
79
80#[derive(Debug, Default, Clone, Encoder, Decoder)]
82pub struct SmartModuleInput {
83 base_offset: Offset,
85 raw_bytes: Vec<u8>,
87 #[deprecated]
89 #[fluvio(max_version = 22)]
90 params: SmartModuleExtraParams,
91 #[fluvio(min_version = 16, max_version = 22)]
92 join_record: Vec<u8>,
93 #[fluvio(min_version = 22)]
95 base_timestamp: Timestamp,
96}
97
98impl SmartModuleInput {
99 pub fn new(raw_bytes: Vec<u8>, base_offset: Offset, base_timestamp: Timestamp) -> Self {
100 Self {
101 base_offset,
102 raw_bytes,
103 base_timestamp,
104 ..Default::default()
105 }
106 }
107
108 pub fn base_offset(&self) -> Offset {
109 self.base_offset
110 }
111
112 pub fn set_base_offset(&mut self, base_offset: Offset) {
113 self.base_offset = base_offset;
114 }
115
116 pub fn base_timestamp(&self) -> Timestamp {
117 self.base_timestamp
118 }
119
120 pub fn set_base_timestamp(&mut self, base_timestamp: Timestamp) {
121 self.base_timestamp = base_timestamp;
122 }
123
124 pub fn raw_bytes(&self) -> &[u8] {
125 &self.raw_bytes
126 }
127
128 pub fn into_raw_bytes(self) -> Vec<u8> {
129 self.raw_bytes
130 }
131
132 pub fn parts(self) -> (Vec<u8>, Vec<u8>) {
133 (self.raw_bytes, self.join_record)
134 }
135
136 #[deprecated = "use SmartModuleRecord instead. Read more here: https://www.fluvio.io/smartmodules/smdk/smartmodulerecord/."]
140 pub fn try_into_records(mut self, version: Version) -> Result<Vec<Record>, std::io::Error> {
141 Decoder::decode_from(&mut Cursor::new(&mut self.raw_bytes), version)
142 }
143
144 pub fn try_into_smartmodule_records(
148 self,
149 version: Version,
150 ) -> Result<Vec<SmartModuleRecord>, std::io::Error> {
151 let base_offset = self.base_offset();
152 let base_timestamp = self.base_timestamp();
153 let records_input = self.into_raw_bytes();
154 let mut records: Vec<Record> = vec![];
155
156 Decoder::decode(&mut records, &mut Cursor::new(records_input), version)?;
157
158 let records = records
159 .into_iter()
160 .map(|inner_record| SmartModuleRecord {
161 inner_record,
162 base_offset,
163 base_timestamp,
164 })
165 .collect();
166
167 Ok(records)
168 }
169
170 pub fn try_from_records(
173 records: Vec<Record>,
174 version: Version,
175 ) -> Result<Self, std::io::Error> {
176 let mut raw_bytes = Vec::new();
177
178 records.encode(&mut raw_bytes, version)?;
179
180 Ok(SmartModuleInput {
181 raw_bytes,
182 ..Default::default()
183 })
184 }
185}
186
187impl Display for SmartModuleInput {
188 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
189 write!(
190 f,
191 "SmartModuleInput {{ base_offset: {:?}, base_timestamp: {:?}, record_data: {:?}, join_data: {:#?} }}",
192 self.base_offset,
193 self.base_timestamp,
194 self.raw_bytes.len(),
195 self.join_record.len()
196 )
197 }
198}
199
200#[derive(Debug, Default, Clone, Encoder, Decoder)]
202pub struct SmartModuleAggregateInput {
203 pub base: SmartModuleInput,
205 pub accumulator: Vec<u8>,
207}
208
209#[derive(Debug, Default, Clone, Encoder, Decoder)]
211pub struct SmartModuleInitInput {
212 pub params: SmartModuleExtraParams,
213}
214
215#[cfg(test)]
216mod tests {
217 use super::*;
218
219 #[test]
220 fn test_record_to_sm_input_and_back() {
221 let records = vec![
223 Record::new("apple"),
224 Record::new("fruit"),
225 Record::new("banana"),
226 ];
227
228 #[allow(deprecated)]
230 let sm_input: SmartModuleInput =
231 SmartModuleInput::try_from_records(records, SMARTMODULE_TIMESTAMPS_VERSION)
232 .expect("records to input conversion failed");
233
234 #[allow(deprecated)]
235 let records_decoded: Vec<Record> = sm_input
236 .try_into_records(SMARTMODULE_TIMESTAMPS_VERSION)
237 .expect("input to records conversion failed");
238
239 assert_eq!(records_decoded[0].value.as_ref(), b"apple");
241 assert_eq!(records_decoded[1].value.as_ref(), b"fruit");
242 assert_eq!(records_decoded[2].value.as_ref(), b"banana");
243 }
244
245 #[test]
246 fn sets_the_provided_value_as_timestamp() {
247 let mut sm_input = SmartModuleInput::new(vec![0, 1, 2, 3], 0, 0);
248
249 assert_eq!(sm_input.base_timestamp, 0);
250 assert_eq!(sm_input.base_timestamp(), 0);
251
252 sm_input.set_base_timestamp(1234);
253
254 assert_eq!(sm_input.base_timestamp, 1234);
255 assert_eq!(sm_input.base_timestamp(), 1234);
256 }
257}