fluvio_dataplane_protocol/
smartmodule.rs1pub use encoding::{
2 SmartModuleRuntimeError, SmartModuleInternalError, SmartModuleKind, SmartModuleInput,
3 SmartModuleAggregateInput, SmartModuleOutput, SmartModuleExtraParams,
4 SmartModuleAggregateOutput,
5};
6
7mod encoding {
8 use std::fmt::{self, Display};
9 use crate::Offset;
10 use crate::record::{Record, RecordData};
11 use fluvio_protocol::{Encoder, Decoder};
12 use std::collections::BTreeMap;
13
/// String key/value parameters kept in an ordered `BTreeMap`.
///
/// The backing map is private. A value is built from an existing map via
/// `From<BTreeMap<String, String>>`, or empty via `Default`, and individual
/// entries are read with `get`.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleExtraParams {
    // Backing storage; only reachable through the impls in this module.
    inner: BTreeMap<String, String>,
}
18
19 impl From<BTreeMap<String, String>> for SmartModuleExtraParams {
20 fn from(inner: BTreeMap<String, String>) -> SmartModuleExtraParams {
21 SmartModuleExtraParams { inner }
22 }
23 }
24
25 impl SmartModuleExtraParams {
26 pub fn get(&self, key: &str) -> Option<&String> {
27 self.inner.get(key)
28 }
29 }
30
/// Input payload for a SmartModule invocation.
///
/// Field order is significant: the derived `Encoder`/`Decoder` impls are
/// generated from this declaration.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleInput {
    /// Base offset associated with the records in `record_data`.
    pub base_offset: Offset,
    /// Raw bytes of the records. `TryFrom<Vec<Record>>` fills this with the
    /// records encoded at version 0.
    pub record_data: Vec<u8>,
    /// Extra string parameters accompanying the input.
    pub params: SmartModuleExtraParams,
    /// Raw bytes of a join record. Gated by `min_version = 16`, so
    /// presumably absent from the wire format below that version
    /// (confirm against the `fluvio_protocol` derive documentation).
    #[fluvio(min_version = 16)]
    pub join_record: Vec<u8>,
}
42 impl std::convert::TryFrom<Vec<Record>> for SmartModuleInput {
43 type Error = std::io::Error;
44 fn try_from(records: Vec<Record>) -> Result<Self, Self::Error> {
45 let mut record_data = Vec::new();
46 records.encode(&mut record_data, 0)?;
47 Ok(SmartModuleInput {
48 record_data,
49 ..Default::default()
50 })
51 }
52 }
53
54 impl Display for SmartModuleInput {
55 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
56 write!(
57 f,
58 "SmartModuleInput {{ base_offset: {:?}, record_data: {:?}, join_data: {:#?} }}",
59 self.base_offset,
60 self.record_data.len(),
61 self.join_record.len()
62 )
63 }
64 }
65
/// Input for an aggregate SmartModule: the regular input plus an accumulator.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub struct SmartModuleAggregateInput {
    /// The regular SmartModule input (records, offset, params).
    pub base: SmartModuleInput,
    /// Accumulator value as raw bytes; its interpretation is not defined here.
    pub accumulator: Vec<u8>,
}
/// Result of running a SmartModule over a set of records.
#[derive(Debug, Default, Encoder, Decoder)]
pub struct SmartModuleOutput {
    /// Records produced successfully.
    pub successes: Vec<Record>,
    /// Runtime error reported by the SmartModule, if any; `None` by default.
    pub error: Option<SmartModuleRuntimeError>,
}
82
/// Output of an aggregate SmartModule: the regular output plus an accumulator.
#[derive(Debug, Default, Encoder, Decoder)]
pub struct SmartModuleAggregateOutput {
    /// The regular SmartModule output (successes and optional error).
    pub base: SmartModuleOutput,
    /// Accumulator value as raw bytes. Gated by `min_version = 16`, so
    /// presumably absent from the wire format below that version
    /// (confirm against the `fluvio_protocol` derive documentation).
    #[fluvio(min_version = 16)]
    pub accumulator: Vec<u8>,
}
91
/// Internal (non-user) failures that can occur while a SmartModule runs.
///
/// The enum is `#[repr(i32)]` and every variant carries an explicit negative
/// discriminant; `#[fluvio(encode_discriminant)]` indicates these numeric
/// values are what gets encoded, so they must stay stable. The type is
/// `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[repr(i32)]
#[derive(thiserror::Error, Debug, Clone, PartialEq, Encoder, Decoder)]
#[non_exhaustive]
#[fluvio(encode_discriminant)]
pub enum SmartModuleInternalError {
    // Catch-all; also the `Default` value of this enum.
    #[error("encountered unknown error during SmartModule processing")]
    UnknownError = -1,
    #[error("failed to decode SmartModule base input")]
    DecodingBaseInput = -11,
    #[error("failed to decode SmartModule record input")]
    DecodingRecords = -22,
    #[error("failed to encode SmartModule output")]
    EncodingOutput = -33,
    #[error("failed to parse SmartModule extra params")]
    ParsingExtraParams = -44,
    #[error("undefined right record in Join SmartModule")]
    UndefinedRightRecord = -55,
    #[error("Init params are not found")]
    InitParamsNotFound = -60,
}
122
123 impl Default for SmartModuleInternalError {
124 fn default() -> Self {
125 Self::UnknownError
126 }
127 }
128
/// Error raised by user SmartModule code while processing a specific record.
///
/// `thiserror::Error` is derived without an `#[error]` attribute; the
/// `Display` text comes from the hand-written `fmt::Display` impl in this
/// module.
#[derive(thiserror::Error, Debug, Default, Clone, PartialEq, Encoder, Decoder)]
pub struct SmartModuleRuntimeError {
    /// Diagnostic text; `new` fills it with the `Debug` rendering of the
    /// originating error.
    pub hint: String,
    /// Offset of the failing record; `new` computes it as the base offset
    /// plus the record's offset delta.
    pub offset: Offset,
    /// Kind of SmartModule that produced the error.
    pub kind: SmartModuleKind,
    /// Key of the failing record, if it had one.
    pub record_key: Option<RecordData>,
    /// Value of the failing record.
    pub record_value: RecordData,
}
143
144 impl SmartModuleRuntimeError {
145 pub fn new(
146 record: &Record,
147 base_offset: Offset,
148 kind: SmartModuleKind,
149 error: eyre::Error,
150 ) -> Self {
151 let hint = format!("{:?}", error);
152 let offset = base_offset + record.preamble.offset_delta();
153 let record_key = record.key.clone();
154 let record_value = record.value.clone();
155 Self {
156 hint,
157 offset,
158 kind,
159 record_key,
160 record_value,
161 }
162 }
163 }
164
165 impl fmt::Display for SmartModuleRuntimeError {
166 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
167 let key = self
168 .record_key
169 .as_ref()
170 .map(display_record_data)
171 .unwrap_or_else(|| "NULL".to_string());
172 let value = display_record_data(&self.record_value);
173 write!(
174 f,
175 "{}\n\n\
176 SmartModule Info: \n \
177 Type: {}\n \
178 Offset: {}\n \
179 Key: {}\n \
180 Value: {}",
181 self.hint, self.kind, self.offset, key, value,
182 )
183 }
184 }
185
186 fn display_record_data(record: &RecordData) -> String {
187 match std::str::from_utf8(record.as_ref()) {
188 Ok(s) => s.to_string(),
189 _ => format!("Binary: {} bytes", record.as_ref().len()),
190 }
191 }
192
/// The kinds of SmartModule known to this protocol.
///
/// Variant order is part of the declaration consumed by the derived
/// `Encoder`/`Decoder` impls, so new variants should be appended rather than
/// inserted. Variants carrying `#[fluvio(min_version = N)]` are gated on the
/// protocol version (exact semantics are defined by the `fluvio_protocol`
/// derive; confirm there).
#[derive(Debug, Clone, PartialEq, Encoder, Decoder)]
pub enum SmartModuleKind {
    // Also the `Default` value of this enum.
    Filter,
    Map,
    #[fluvio(min_version = 15)]
    ArrayMap,
    #[fluvio(min_version = 13)]
    Aggregate,
    #[fluvio(min_version = 16)]
    FilterMap,
    #[fluvio(min_version = 16)]
    Join,
    #[fluvio(min_version = 17)]
    Generic,
}
208
209 impl Default for SmartModuleKind {
210 fn default() -> Self {
211 Self::Filter
212 }
213 }
214
215 impl fmt::Display for SmartModuleKind {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 fmt::Debug::fmt(self, f)
219 }
220 }
221}