fluvio_dataplane_protocol/
smartmodule.rs

1pub use encoding::{
2    SmartModuleRuntimeError, SmartModuleInternalError, SmartModuleKind, SmartModuleInput,
3    SmartModuleAggregateInput, SmartModuleOutput, SmartModuleExtraParams,
4    SmartModuleAggregateOutput,
5};
6
7mod encoding {
8    use std::fmt::{self, Display};
9    use crate::Offset;
10    use crate::record::{Record, RecordData};
11    use fluvio_protocol::{Encoder, Decoder};
12    use std::collections::BTreeMap;
13
14    #[derive(Debug, Default, Clone, Encoder, Decoder)]
15    pub struct SmartModuleExtraParams {
16        inner: BTreeMap<String, String>,
17    }
18
19    impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
20        fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
21            SmartModuleExtraParams { inner }
22        }
23    }
24
25    impl SmartModuleExtraParams {
26        pub fn get(&self, key: &str) -> Option<&String> {
27            self.inner.get(key)
28        }
29    }
30
31    /// Common data that gets passed as input to every SmartModule WASM module
32    #[derive(Debug, Default, Clone, Encoder, Decoder)]
33    pub struct SmartModuleInput {
34        /// The base offset of this batch of records
35        pub base_offset: Offset,
36        /// The records for the SmartModule to process
37        pub record_data: Vec<u8>,
38        pub params: SmartModuleExtraParams,
39        #[fluvio(min_version = 16)]
40        pub join_record: Vec<u8>,
41    }
42    impl std::convert::TryFrom<Vec<Record>> for SmartModuleInput {
43        type Error = std::io::Error;
44        fn try_from(records: Vec<Record>) -> Result<Self, Self::Error> {
45            let mut record_data = Vec::new();
46            records.encode(&mut record_data, 0)?;
47            Ok(SmartModuleInput {
48                record_data,
49                ..Default::default()
50            })
51        }
52    }
53
54    impl Display for SmartModuleInput {
55        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
56            write!(
57                f,
58                "SmartModuleInput {{ base_offset: {:?}, record_data: {:?}, join_data: {:#?} }}",
59                self.base_offset,
60                self.record_data.len(),
61                self.join_record.len()
62            )
63        }
64    }
65
66    /// A type to pass input to an Aggregate SmartModule WASM module
67    #[derive(Debug, Default, Clone, Encoder, Decoder)]
68    pub struct SmartModuleAggregateInput {
69        /// The base input required by all SmartModules
70        pub base: SmartModuleInput,
71        /// The current value of the Aggregate's accumulator
72        pub accumulator: Vec<u8>,
73    }
74    /// A type used to return processed records and/or an error from a SmartModule
75    #[derive(Debug, Default, Encoder, Decoder)]
76    pub struct SmartModuleOutput {
77        /// The successfully processed output Records
78        pub successes: Vec<Record>,
79        /// Any runtime error if one was encountered
80        pub error: Option<SmartModuleRuntimeError>,
81    }
82
83    /// A type used to return processed records and/or an error from an Aggregate SmartModule
84    #[derive(Debug, Default, Encoder, Decoder)]
85    pub struct SmartModuleAggregateOutput {
86        /// The base output required by all SmartModules
87        pub base: SmartModuleOutput,
88        #[fluvio(min_version = 16)]
89        pub accumulator: Vec<u8>,
90    }
91
92    /// Indicates an internal error from within a SmartModule.
93    //
94    // The presence of one of these errors most likely indicates a logic bug.
95    // This error type is `#[repr(i32)]` because these errors are returned
96    // as the raw return type of a SmartModule WASM function, i.e. the return
97    // type in `extern "C" fn filter(ptr, len) -> i32`. Positive return values
98    // indicate the numbers of records, and negative values indicate various
99    // types of errors.
100    //
101    // THEREFORE, THE DISCRIMINANTS FOR ALL VARIANTS ON THIS TYPE MUST BE NEGATIVE
102    #[repr(i32)]
103    #[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
104    #[non_exhaustive]
105    #[fluvio(encode_discriminant)]
106    pub enum SmartModuleInternalError {
107        #[error("encountered unknown error during SmartModule processing")]
108        UnknownError = -1,
109        #[error("failed to decode SmartModule base input")]
110        DecodingBaseInput = -11,
111        #[error("failed to decode SmartModule record input")]
112        DecodingRecords = -22,
113        #[error("failed to encode SmartModule output")]
114        EncodingOutput = -33,
115        #[error("failed to parse SmartModule extra params")]
116        ParsingExtraParams = -44,
117        #[error("undefined right record in Join SmartModule")]
118        UndefinedRightRecord = -55,
119        #[error("Init params are not found")]
120        InitParamsNotFound = -60,
121    }
122
123    impl Default for SmartModuleInternalError {
124        fn default() -> Self {
125            Self::UnknownError
126        }
127    }
128
129    /// A type used to capture and serialize errors from within a SmartModule
130    #[derive(thiserror::Error, Debug, Default, Clone, PartialEq, Encoder, Decoder)]
131    pub struct SmartModuleRuntimeError {
132        /// Error hint: meant for users, not for code
133        pub hint: String,
134        /// The offset of the Record that had a runtime error
135        pub offset: Offset,
136        /// The type of SmartModule that had a runtime error
137        pub kind: SmartModuleKind,
138        /// The Record key that caused this error
139        pub record_key: Option<RecordData>,
140        /// The Record value that caused this error
141        pub record_value: RecordData,
142    }
143
144    impl SmartModuleRuntimeError {
145        pub fn new(
146            record: &Record,
147            base_offset: Offset,
148            kind: SmartModuleKind,
149            error: eyre::Error,
150        ) -> Self {
151            let hint = format!("{:?}", error);
152            let offset = base_offset + record.preamble.offset_delta();
153            let record_key = record.key.clone();
154            let record_value = record.value.clone();
155            Self {
156                hint,
157                offset,
158                kind,
159                record_key,
160                record_value,
161            }
162        }
163    }
164
165    impl fmt::Display for SmartModuleRuntimeError {
166        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167            let key = self
168                .record_key
169                .as_ref()
170                .map(display_record_data)
171                .unwrap_or_else(|| "NULL".to_string());
172            let value = display_record_data(&self.record_value);
173            write!(
174                f,
175                "{}\n\n\
176                SmartModule Info: \n    \
177                Type: {}\n    \
178                Offset: {}\n    \
179                Key: {}\n    \
180                Value: {}",
181                self.hint, self.kind, self.offset, key, value,
182            )
183        }
184    }
185
186    fn display_record_data(record: &RecordData) -> String {
187        match std::str::from_utf8(record.as_ref()) {
188            Ok(s) => s.to_string(),
189            _ => format!("Binary: {} bytes", record.as_ref().len()),
190        }
191    }
192
193    #[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
194    pub enum SmartModuleKind {
195        Filter,
196        Map,
197        #[fluvio(min_version = 15)]
198        ArrayMap,
199        #[fluvio(min_version = 13)]
200        Aggregate,
201        #[fluvio(min_version = 16)]
202        FilterMap,
203        #[fluvio(min_version = 16)]
204        Join,
205        #[fluvio(min_version = 17)]
206        Generic,
207    }
208
209    impl Default for SmartModuleKind {
210        fn default() -> Self {
211            Self::Filter
212        }
213    }
214
215    impl fmt::Display for SmartModuleKind {
216        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217            // Use Debug for Display to print variant name
218            fmt::Debug::fmt(self, f)
219        }
220    }
221}