fluvio_spu_schema/server/
smartmodule.rs

1#![allow(deprecated)]
2
3use std::io::Read;
4use std::io;
5use std::fmt::{Debug, self};
6
7use flate2::{
8    Compression,
9    bufread::{GzEncoder, GzDecoder},
10};
11
12use fluvio_protocol::{Encoder, Decoder};
13use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
14
15/// The request payload when using a Consumer SmartModule.
16///
17/// This includes the WASM module name as well as the invocation being used.
18/// It also carries any data that is required for specific invocations of SmartModules.
19#[derive(Debug, Default, Clone, Encoder, Decoder)]
20pub struct SmartModuleInvocation {
21    pub wasm: SmartModuleInvocationWasm,
22    pub kind: SmartModuleKind,
23    pub params: SmartModuleExtraParams,
24}
25
26#[derive(Clone, Encoder, Decoder)]
27pub enum SmartModuleInvocationWasm {
28    /// Name of SmartModule
29    #[fluvio(tag = 0)]
30    Predefined(String),
31    /// Compressed WASM module payload using Gzip
32    #[fluvio(tag = 1)]
33    AdHoc(Vec<u8>),
34}
35
36impl SmartModuleInvocationWasm {
37    pub fn adhoc_from_bytes(bytes: &[u8]) -> io::Result<Self> {
38        Ok(Self::AdHoc(zip(bytes)?))
39    }
40
41    /// consume and get the raw bytes of the WASM module
42    pub fn into_raw(self) -> io::Result<Vec<u8>> {
43        match self {
44            Self::AdHoc(gzipped) => Ok(unzip(gzipped.as_ref())?),
45            _ => Err(io::Error::new(
46                io::ErrorKind::InvalidData,
47                "unable to represent as raw data",
48            )),
49        }
50    }
51}
52
53impl Default for SmartModuleInvocationWasm {
54    fn default() -> Self {
55        Self::AdHoc(Vec::new())
56    }
57}
58
59impl Debug for SmartModuleInvocationWasm {
60    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61        match self {
62            Self::Predefined(module) => write!(f, "Predefined{module}"),
63            Self::AdHoc(bytes) => f
64                .debug_tuple("Adhoc")
65                .field(&format!("{} bytes", bytes.len()))
66                .finish(),
67        }
68    }
69}
70
71/// Indicates the type of SmartModule as well as any special data required
72#[derive(Debug, Clone, Encoder, Decoder, Default)]
73pub enum SmartModuleKind {
74    #[default]
75    #[fluvio(tag = 0)]
76    Filter,
77    #[fluvio(tag = 1)]
78    Map,
79    #[fluvio(tag = 2)]
80    #[fluvio(min_version = ARRAY_MAP_WASM_API)]
81    ArrayMap,
82    #[fluvio(tag = 3)]
83    Aggregate { accumulator: Vec<u8> },
84    #[fluvio(tag = 4)]
85    #[fluvio(min_version = ARRAY_MAP_WASM_API)]
86    FilterMap,
87    #[fluvio(tag = 5)]
88    #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
89    Join(String),
90    #[fluvio(tag = 6)]
91    #[fluvio(min_version = SMART_MODULE_API, max_version = CHAIN_SMARTMODULE_API)]
92    JoinStream {
93        topic: String,
94        derivedstream: String,
95    },
96    #[fluvio(tag = 7)]
97    #[fluvio(min_version = GENERIC_SMARTMODULE_API)]
98    Generic(SmartModuleContextData),
99}
100
101impl std::fmt::Display for SmartModuleKind {
102    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
103        let name = match self {
104            SmartModuleKind::Filter => "filter",
105            SmartModuleKind::Map => "map",
106            SmartModuleKind::ArrayMap => "array_map",
107            SmartModuleKind::Aggregate { .. } => "aggregate",
108            SmartModuleKind::FilterMap => "filter_map",
109            SmartModuleKind::Join(..) => "join",
110            SmartModuleKind::JoinStream { .. } => "join_stream",
111            SmartModuleKind::Generic(..) => "smartmodule",
112        };
113        out.write_str(name)
114    }
115}
116
117#[derive(Debug, Clone, Encoder, Decoder, Default)]
118pub enum SmartModuleContextData {
119    #[default]
120    #[fluvio(tag = 0)]
121    None,
122    #[fluvio(tag = 1)]
123    Aggregate { accumulator: Vec<u8> },
124    #[fluvio(tag = 2)]
125    Join(String),
126    #[fluvio(tag = 3)]
127    JoinStream {
128        topic: String,
129        derivedstream: String,
130    },
131}
132
133fn zip(raw: &[u8]) -> io::Result<Vec<u8>> {
134    let mut encoder = GzEncoder::new(raw, Compression::default());
135    let mut buffer = Vec::with_capacity(raw.len());
136    encoder.read_to_end(&mut buffer)?;
137    Ok(buffer)
138}
139
140fn unzip(compressed: &[u8]) -> io::Result<Vec<u8>> {
141    let mut decoder = GzDecoder::new(compressed);
142    let mut buffer = Vec::with_capacity(compressed.len());
143    decoder.read_to_end(&mut buffer)?;
144    Ok(buffer)
145}
146
147#[cfg(test)]
148mod tests {
149
150    use super::*;
151
152    #[test]
153    fn test_encode_smartmodulekind() {
154        let mut dest = Vec::new();
155        let value: SmartModuleKind = SmartModuleKind::Filter;
156        value.encode(&mut dest, 0).expect("should encode");
157        assert_eq!(dest.len(), 1);
158        assert_eq!(dest[0], 0x00);
159    }
160
161    #[test]
162    fn test_decode_smartmodulekind() {
163        let bytes = vec![0x01];
164        let mut value: SmartModuleKind = Default::default();
165        value
166            .decode(&mut io::Cursor::new(bytes), 0)
167            .expect("should decode");
168        assert!(matches!(value, SmartModuleKind::Map));
169    }
170
171    #[test]
172    fn test_gzip_smartmoduleinvocationwasm() {
173        let bytes = vec![0xde, 0xad, 0xbe, 0xef];
174        let value: SmartModuleInvocationWasm =
175            SmartModuleInvocationWasm::adhoc_from_bytes(&bytes).expect("should encode");
176        if let SmartModuleInvocationWasm::AdHoc(compressed_bytes) = value {
177            let decompressed_bytes = unzip(&compressed_bytes).expect("should decompress");
178            assert_eq!(decompressed_bytes, bytes);
179        } else {
180            panic!("not adhoc")
181        }
182    }
183}