fluvio_spu_schema/server/
smartmodule.rs1#![allow(deprecated)]
2
3use std::io::Read;
4use std::io;
5use std::fmt::{Debug, self};
6
7use flate2::{
8 Compression,
9 bufread::{GzEncoder, GzDecoder},
10};
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
14
15#[derive(Debug, Default, Clone, Encoder, Decoder)]
20pub struct SmartModuleInvocation {
21 pub wasm: SmartModuleInvocationWasm,
22 pub kind: SmartModuleKind,
23 pub params: SmartModuleExtraParams,
24}
25
26#[derive(Clone, Encoder, Decoder)]
27pub enum SmartModuleInvocationWasm {
28 #[fluvio(tag = 0)]
30 Predefined(String),
31 #[fluvio(tag = 1)]
33 AdHoc(Vec<u8>),
34}
35
36impl SmartModuleInvocationWasm {
37 pub fn adhoc_from_bytes(bytes: &[u8]) -> io::Result<Self> {
38 Ok(Self::AdHoc(zip(bytes)?))
39 }
40
41 pub fn into_raw(self) -> io::Result<Vec<u8>> {
43 match self {
44 Self::AdHoc(gzipped) => Ok(unzip(gzipped.as_ref())?),
45 _ => Err(io::Error::new(
46 io::ErrorKind::InvalidData,
47 "unable to represent as raw data",
48 )),
49 }
50 }
51}
52
53impl Default for SmartModuleInvocationWasm {
54 fn default() -> Self {
55 Self::AdHoc(Vec::new())
56 }
57}
58
59impl Debug for SmartModuleInvocationWasm {
60 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61 match self {
62 Self::Predefined(module) => write!(f, "Predefined{module}"),
63 Self::AdHoc(bytes) => f
64 .debug_tuple("Adhoc")
65 .field(&format!("{} bytes", bytes.len()))
66 .finish(),
67 }
68 }
69}
70
71#[derive(Debug, Clone, Encoder, Decoder, Default)]
73pub enum SmartModuleKind {
74 #[default]
75 #[fluvio(tag = 0)]
76 Filter,
77 #[fluvio(tag = 1)]
78 Map,
79 #[fluvio(tag = 2)]
80 #[fluvio(min_version = ARRAY_MAP_WASM_API)]
81 ArrayMap,
82 #[fluvio(tag = 3)]
83 Aggregate { accumulator: Vec<u8> },
84 #[fluvio(tag = 4)]
85 #[fluvio(min_version = ARRAY_MAP_WASM_API)]
86 FilterMap,
87 #[fluvio(tag = 5)]
88 #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
89 Join(String),
90 #[fluvio(tag = 6)]
91 #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
92 JoinStream {
93 topic: String,
94 derivedstream: String,
95 },
96 #[fluvio(tag = 7)]
97 #[fluvio(min_version = GENERIC_SMARTMODULE_API)]
98 Generic(SmartModuleContextData),
99}
100
101impl std::fmt::Display for SmartModuleKind {
102 fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
103 let name = match self {
104 SmartModuleKind::Filter => "filter",
105 SmartModuleKind::Map => "map",
106 SmartModuleKind::ArrayMap => "array_map",
107 SmartModuleKind::Aggregate { .. } => "aggregate",
108 SmartModuleKind::FilterMap => "filter_map",
109 SmartModuleKind::Join(..) => "join",
110 SmartModuleKind::JoinStream { .. } => "join_stream",
111 SmartModuleKind::Generic(..) => "smartmodule",
112 };
113 out.write_str(name)
114 }
115}
116
117#[derive(Debug, Clone, Encoder, Decoder, Default)]
118pub enum SmartModuleContextData {
119 #[default]
120 #[fluvio(tag = 0)]
121 None,
122 #[fluvio(tag = 1)]
123 Aggregate { accumulator: Vec<u8> },
124 #[fluvio(tag = 2)]
125 Join(String),
126 #[fluvio(tag = 3)]
127 JoinStream {
128 topic: String,
129 derivedstream: String,
130 },
131}
132
133fn zip(raw: &[u8]) -> io::Result<Vec<u8>> {
134 let mut encoder = GzEncoder::new(raw, Compression::default());
135 let mut buffer = Vec::with_capacity(raw.len());
136 encoder.read_to_end(&mut buffer)?;
137 Ok(buffer)
138}
139
140fn unzip(compressed: &[u8]) -> io::Result<Vec<u8>> {
141 let mut decoder = GzDecoder::new(compressed);
142 let mut buffer = Vec::with_capacity(compressed.len());
143 decoder.read_to_end(&mut buffer)?;
144 Ok(buffer)
145}
146
147#[cfg(test)]
148mod tests {
149
150 use super::*;
151
152 #[test]
153 fn test_encode_smartmodulekind() {
154 let mut dest = Vec::new();
155 let value: SmartModuleKind = SmartModuleKind::Filter;
156 value.encode(&mut dest, 0).expect("should encode");
157 assert_eq!(dest.len(), 1);
158 assert_eq!(dest[0], 0x00);
159 }
160
161 #[test]
162 fn test_decode_smartmodulekind() {
163 let bytes = vec![0x01];
164 let mut value: SmartModuleKind = Default::default();
165 value
166 .decode(&mut io::Cursor::new(bytes), 0)
167 .expect("should decode");
168 assert!(matches!(value, SmartModuleKind::Map));
169 }
170
171 #[test]
172 fn test_gzip_smartmoduleinvocationwasm() {
173 let bytes = vec![0xde, 0xad, 0xbe, 0xef];
174 let value: SmartModuleInvocationWasm =
175 SmartModuleInvocationWasm::adhoc_from_bytes(&bytes).expect("should encode");
176 if let SmartModuleInvocationWasm::AdHoc(compressed_bytes) = value {
177 let decompressed_bytes = unzip(&compressed_bytes).expect("should decompress");
178 assert_eq!(decompressed_bytes, bytes);
179 } else {
180 panic!("not adhoc")
181 }
182 }
183}