fluvio_smartmodule/
input.rs

1use std::io::Cursor;
2use std::time::Duration;
3use std::{collections::BTreeMap, fmt::Display};
4use std::fmt;
5use fluvio_protocol::{Decoder, Encoder, Version};
6use fluvio_protocol::record::{Offset, Record};
7use fluvio_protocol::types::Timestamp;
8
9use crate::SmartModuleRecord;
10
/// SmartModule version with support for Lookback with Age and Timestamps
/// ("LTA" is the acronym for Lookback, Timestamps, and Age).
///
/// This version is used for encoding and decoding [`SmartModuleInput`].
pub const SMARTMODULE_TIMESTAMPS_VERSION: Version = 22;
15
/// Extra parameters for a SmartModule: a string key/value map plus an
/// optional [`Lookback`] configuration.
///
/// Field order and the `#[fluvio(...)]` attributes drive the derived
/// `Encoder`/`Decoder` implementations, so they must not be reordered.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleExtraParams {
    /// String key/value pairs, kept sorted by key (`BTreeMap`).
    inner: BTreeMap<String, String>,
    /// Optional lookback configuration; tagged `min_version = 20` for the
    /// derived encoder/decoder.
    #[fluvio(min_version = 20)]
    lookback: Option<Lookback>,
}
22
23impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
24    fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
25        SmartModuleExtraParams {
26            inner,
27            ..Default::default()
28        }
29    }
30}
31
32impl SmartModuleExtraParams {
33    pub fn new(params: BTreeMap<String, String>, lookback: Option<Lookback>) -> Self {
34        Self {
35            inner: params,
36            lookback,
37        }
38    }
39
40    pub fn get(&self, key: &str) -> Option<&String> {
41        self.inner.get(key)
42    }
43
44    pub fn insert(&mut self, key: String, value: String) {
45        self.inner.insert(key, value);
46    }
47
48    pub fn lookback(&self) -> Option<&Lookback> {
49        self.lookback.as_ref()
50    }
51
52    pub fn set_lookback(&mut self, lookback: Option<Lookback>) {
53        self.lookback = lookback;
54    }
55}
56
/// Lookback configuration, expressed as a count (`last`) and an optional
/// age (`age`).
///
/// NOTE(review): `last` is presumably the number of most recent records to
/// look back over and `age` the maximum record age — confirm against the
/// engine that consumes this type.
#[derive(Debug, Default, Clone, Encoder, Decoder, PartialEq, Eq)]
pub struct Lookback {
    /// Lookback count; defaults to `0`.
    pub last: u64,
    /// Optional lookback age; tagged `min_version = 21` for the derived
    /// encoder/decoder.
    #[fluvio(min_version = 21)]
    pub age: Option<Duration>,
}
63
64impl Lookback {
65    pub fn last(last: u64) -> Self {
66        Self {
67            last,
68            ..Default::default()
69        }
70    }
71
72    pub fn age(age: Duration, last: Option<u64>) -> Self {
73        Self {
74            last: last.unwrap_or_default(),
75            age: Some(age),
76        }
77    }
78}
79
/// Input handed to a SmartModule: a batch of encoded records together with
/// the batch's base offset and base timestamp.
///
/// Field order and the `#[fluvio(...)]` version attributes drive the derived
/// `Encoder`/`Decoder` implementations, so they must not be reordered.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInput {
    /// The base offset of this batch of records
    base_offset: Offset,
    /// Encoded version of the records (decoded as a `Vec<Record>` by the
    /// methods on this type)
    raw_bytes: Vec<u8>,
    /// This is deprecated; extra parameters should not be passed, and they
    /// will be removed in the future
    #[deprecated]
    #[fluvio(max_version = 22)]
    params: SmartModuleExtraParams,
    /// Raw join-record bytes, tagged for versions 16 through 22.
    /// NOTE(review): presumably the encoded record consumed by join-style
    /// SmartModules — confirm against callers.
    #[fluvio(min_version = 16, max_version = 22)]
    join_record: Vec<u8>,
    /// The base timestamp of this batch of records
    #[fluvio(min_version = 22)]
    base_timestamp: Timestamp,
}
97
98impl SmartModuleInput {
99    pub fn new(raw_bytes: Vec<u8>, base_offset: Offset, base_timestamp: Timestamp) -> Self {
100        Self {
101            base_offset,
102            raw_bytes,
103            base_timestamp,
104            ..Default::default()
105        }
106    }
107
108    pub fn base_offset(&self) -> Offset {
109        self.base_offset
110    }
111
112    pub fn set_base_offset(&mut self, base_offset: Offset) {
113        self.base_offset = base_offset;
114    }
115
116    pub fn base_timestamp(&self) -> Timestamp {
117        self.base_timestamp
118    }
119
120    pub fn set_base_timestamp(&mut self, base_timestamp: Timestamp) {
121        self.base_timestamp = base_timestamp;
122    }
123
124    pub fn raw_bytes(&self) -> &[u8] {
125        &self.raw_bytes
126    }
127
128    pub fn into_raw_bytes(self) -> Vec<u8> {
129        self.raw_bytes
130    }
131
132    pub fn parts(self) -> (Vec<u8>, Vec<u8>) {
133        (self.raw_bytes, self.join_record)
134    }
135
136    /// Creates an instance of [`Record`] from the raw bytes and ignoring the
137    /// base offset and timestamp. This method is used to keep backwards
138    /// compatibility with SmartModule engines previous to Version `21`.
139    #[deprecated = "use SmartModuleRecord instead. Read more here: https://www.fluvio.io/smartmodules/smdk/smartmodulerecord/."]
140    pub fn try_into_records(mut self, version: Version) -> Result<Vec<Record>, std::io::Error> {
141        Decoder::decode_from(&mut Cursor::new(&mut self.raw_bytes), version)
142    }
143
144    /// Attempts to map the internally encoded records into a vector of
145    /// [`SmartModuleRecord`] by decoding the raw bytes and filling up the base
146    /// offset and timestamp fields.
147    pub fn try_into_smartmodule_records(
148        self,
149        version: Version,
150    ) -> Result<Vec<SmartModuleRecord>, std::io::Error> {
151        let base_offset = self.base_offset();
152        let base_timestamp = self.base_timestamp();
153        let records_input = self.into_raw_bytes();
154        let mut records: Vec<Record> = vec![];
155
156        Decoder::decode(&mut records, &mut Cursor::new(records_input), version)?;
157
158        let records = records
159            .into_iter()
160            .map(|inner_record| SmartModuleRecord {
161                inner_record,
162                base_offset,
163                base_timestamp,
164            })
165            .collect();
166
167        Ok(records)
168    }
169
170    /// Attempts to map the [`Record`] vector and build a `SmartModuleInput`
171    /// instance from it.
172    pub fn try_from_records(
173        records: Vec<Record>,
174        version: Version,
175    ) -> Result<Self, std::io::Error> {
176        let mut raw_bytes = Vec::new();
177
178        records.encode(&mut raw_bytes, version)?;
179
180        Ok(SmartModuleInput {
181            raw_bytes,
182            ..Default::default()
183        })
184    }
185}
186
187impl Display for SmartModuleInput {
188    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
189        write!(
190            f,
191            "SmartModuleInput {{ base_offset: {:?}, base_timestamp: {:?}, record_data: {:?}, join_data: {:#?} }}",
192            self.base_offset,
193            self.base_timestamp,
194            self.raw_bytes.len(),
195            self.join_record.len()
196        )
197    }
198}
199
/// A type to pass input to an Aggregate SmartModule WASM module.
///
/// Combines the regular [`SmartModuleInput`] with the accumulator bytes.
/// Field order drives the derived `Encoder`/`Decoder` implementations.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleAggregateInput {
    /// The base input required by all SmartModules
    pub base: SmartModuleInput,
    /// The current value of the Aggregate's accumulator, as raw bytes
    pub accumulator: Vec<u8>,
}
208
/// Input to SmartModule Init
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInitInput {
    /// Extra parameters (key/value pairs and optional lookback) handed to
    /// the init step
    pub params: SmartModuleExtraParams,
}
214
#[cfg(test)]
mod tests {
    use super::*;

    /// Records encoded into a `SmartModuleInput` decode back to the same
    /// values, in the same order.
    #[test]
    fn test_record_to_sm_input_and_back() {
        // given
        let expected: [&[u8]; 3] = [b"apple", b"fruit", b"banana"];
        let records = vec![
            Record::new("apple"),
            Record::new("fruit"),
            Record::new("banana"),
        ];

        // when
        #[allow(deprecated)]
        let sm_input: SmartModuleInput =
            SmartModuleInput::try_from_records(records, SMARTMODULE_TIMESTAMPS_VERSION)
                .expect("records to input conversion failed");

        // `try_into_records` is deprecated but is exactly what this test covers.
        #[allow(deprecated)]
        let records_decoded: Vec<Record> = sm_input
            .try_into_records(SMARTMODULE_TIMESTAMPS_VERSION)
            .expect("input to records conversion failed");

        // then
        for (idx, want) in expected.iter().enumerate() {
            let actual: &[u8] = records_decoded[idx].value.as_ref();
            assert_eq!(actual, *want);
        }
    }

    /// The base timestamp passed to `new` is readable through both the field
    /// and the getter, and `set_base_timestamp` updates both.
    #[test]
    fn sets_the_provided_value_as_timestamp() {
        let mut input = SmartModuleInput::new(vec![0, 1, 2, 3], 0, 0);

        assert_eq!(input.base_timestamp, 0);
        assert_eq!(input.base_timestamp(), 0);

        input.set_base_timestamp(1234);

        assert_eq!(input.base_timestamp, 1234);
        assert_eq!(input.base_timestamp(), 1234);
    }
}