fluvio_controlplane_metadata/smartmodule/
spec.rs

1//!
2//! # SmartModule Spec
3//!
4use std::borrow::Cow;
5use std::io::Error as IoError;
6
7use bytes::BufMut;
8use tracing::debug;
9
10use fluvio_protocol::{ByteBuf, Encoder, Decoder, Version};
11
12use super::{SmartModuleMetadata, spec_v1::SmartModuleSpecV1};
13
14const V2_FORMAT: Version = 10;
15
16#[derive(Debug, Default, Clone, Eq, PartialEq)]
17#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
18pub struct SmartModuleSpec {
19    pub meta: Option<SmartModuleMetadata>,
20    #[cfg_attr(feature = "use_serde", serde(skip))]
21    pub summary: Option<SmartModuleWasmSummary>, // only passed from SC to CLI
22    pub wasm: SmartModuleWasm,
23}
24
25// custom encoding to handle prev version
26impl Encoder for SmartModuleSpec {
27    fn write_size(&self, version: Version) -> usize {
28        if version < V2_FORMAT {
29            //trace!("computing size for smartmodule spec v1");
30            // just used for computing size
31            let spec_v1 = SmartModuleSpecV1::default();
32            let mut size = 0;
33            size += spec_v1.input_kind.write_size(version);
34            size += spec_v1.output_kind.write_size(version);
35            size += spec_v1.source_code.write_size(version);
36            size += self.wasm.write_size(version);
37            size += spec_v1.parameters.write_size(version);
38            size
39        } else {
40            let mut size = 0;
41            size += self.meta.write_size(version);
42            size += self.summary.write_size(version);
43            size += self.wasm.write_size(version);
44            size
45        }
46    }
47
48    fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
49    where
50        T: BufMut,
51    {
52        if version < V2_FORMAT {
53            debug!("encoding for smartmodule spec v1");
54            let spec_v1 = SmartModuleSpecV1::default();
55            spec_v1.input_kind.encode(dest, version)?;
56            spec_v1.output_kind.encode(dest, version)?;
57            spec_v1.source_code.encode(dest, version)?;
58            self.wasm.encode(dest, version)?;
59            spec_v1.parameters.encode(dest, version)?;
60        } else {
61            self.meta.encode(dest, version)?;
62            self.summary.encode(dest, version)?;
63            self.wasm.encode(dest, version)?;
64        }
65        Ok(())
66    }
67}
68
69impl Decoder for SmartModuleSpec {
70    fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), IoError>
71    where
72        T: bytes::Buf,
73    {
74        if version < V2_FORMAT {
75            debug!("decoding for smartmodule spec v1");
76            let mut spec_v1 = SmartModuleSpecV1::default();
77            spec_v1.decode(src, version)?;
78            self.wasm = spec_v1.wasm;
79        } else {
80            self.meta.decode(src, version)?;
81            self.summary.decode(src, version)?;
82            self.wasm.decode(src, version)?;
83        }
84
85        Ok(())
86    }
87}
88
89impl SmartModuleSpec {
90    /// return fully qualified name given store key
91    pub fn fqdn<'a>(&self, store_id: &'a str) -> Cow<'a, str> {
92        if let Some(meta) = &self.meta {
93            meta.package.fqdn().into()
94        } else {
95            Cow::from(store_id)
96        }
97    }
98}
99
100#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
101pub struct SmartModuleWasmSummary {
102    pub wasm_length: u32,
103}
104
105#[derive(Clone, Default, Eq, PartialEq, Encoder, Decoder)]
106#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
107pub struct SmartModuleWasm {
108    pub format: SmartModuleWasmFormat,
109    #[cfg_attr(feature = "use_serde", serde(with = "base64"))]
110    pub payload: ByteBuf,
111}
112
113impl std::fmt::Debug for SmartModuleWasm {
114    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115        f.write_str(&format!(
116            "SmartModuleWasm {{ format: {:?}, payload: [REDACTED] }}",
117            self.format
118        ))
119    }
120}
121
122impl SmartModuleWasm {
123    /// Create SmartModule from compressed Gzip format
124    pub fn from_compressed_gzip(payload: Vec<u8>) -> Self {
125        SmartModuleWasm {
126            payload: ByteBuf::from(payload),
127            format: SmartModuleWasmFormat::Binary,
128        }
129    }
130}
131
#[cfg(feature = "smartmodule")]
impl SmartModuleWasm {
    /// Builds a `SmartModuleWasm` from uncompressed WASM bytes.
    ///
    /// The input is gzip-compressed with the default compression level and
    /// then handed to `from_compressed_gzip`.
    ///
    /// # Errors
    /// Returns any I/O error reported while reading from the gzip encoder.
    pub fn from_raw_wasm_bytes(raw_payload: &[u8]) -> std::io::Result<Self> {
        use std::io::Read;
        use flate2::{Compression, bufread::GzEncoder};

        // Reserve the raw size up front as a starting capacity for the output.
        let mut compressed = Vec::with_capacity(raw_payload.len());
        GzEncoder::new(raw_payload, Compression::default()).read_to_end(&mut compressed)?;

        Ok(Self::from_compressed_gzip(compressed))
    }

    /// Returns the payload after gzip decompression.
    ///
    /// # Errors
    /// Returns any I/O error reported while reading from the gzip decoder.
    pub fn as_raw_wasm(&self) -> Result<Vec<u8>, IoError> {
        use std::io::Read;
        use flate2::bufread::GzDecoder;

        // The compressed length is only a starting capacity; the vector grows as needed.
        let mut raw = Vec::with_capacity(self.payload.len());
        GzDecoder::new(&**self.payload).read_to_end(&mut raw)?;
        Ok(raw)
    }
}
156
157#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
158#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
159pub enum SmartModuleWasmFormat {
160    #[default]
161    #[cfg_attr(feature = "use_serde", serde(rename = "BINARY"))]
162    #[fluvio(tag = 0)]
163    Binary,
164    #[cfg_attr(feature = "use_serde", serde(rename = "TEXT"))]
165    #[fluvio(tag = 1)]
166    Text,
167}
168
/// Serde helpers that represent a `ByteBuf` as a standard-alphabet base64 string.
/// Used through `#[serde(with = "base64")]`.
#[cfg(feature = "use_serde")]
mod base64 {
    use std::ops::Deref;

    use serde::{Serialize, Deserialize};
    use serde::{Deserializer, Serializer};
    use base64::Engine;

    use fluvio_protocol::ByteBuf;

    /// Serializes the buffer as a base64-encoded string.
    pub fn serialize<S>(bytebuf: &ByteBuf, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let encoded = base64::engine::general_purpose::STANDARD.encode(bytebuf.deref());
        encoded.serialize(serializer)
    }

    /// Deserializes a base64-encoded string back into a buffer.
    ///
    /// Input that is not valid base64 is reported as a custom serde error.
    pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<ByteBuf, D::Error> {
        let text = String::deserialize(d)?;
        base64::engine::general_purpose::STANDARD
            .decode(text.as_bytes())
            .map(ByteBuf::from)
            .map_err(serde::de::Error::custom)
    }
}
197
#[cfg(test)]
mod tests {

    /// Compressing raw bytes and decompressing them again must give back the input.
    #[cfg(feature = "smartmodule")]
    #[test]
    fn test_wasm_zip_unzip() {
        use super::*;

        // given
        let raw = b"test wasm";

        // when
        let round_tripped = SmartModuleWasm::from_raw_wasm_bytes(raw)
            .expect("created wasm")
            .as_raw_wasm()
            .expect("unzipped wasm");

        // then
        assert_eq!(raw, round_tripped.as_slice());
    }
}
216}