fluvio_controlplane_metadata/smartmodule/
spec.rs1use std::borrow::Cow;
5use std::io::Error as IoError;
6
7use bytes::BufMut;
8use tracing::debug;
9
10use fluvio_protocol::{ByteBuf, Encoder, Decoder, Version};
11
12use super::{SmartModuleMetadata, spec_v1::SmartModuleSpecV1};
13
14const V2_FORMAT: Version = 10;
15
16#[derive(Debug, Default, Clone, Eq, PartialEq)]
17#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
18pub struct SmartModuleSpec {
19 pub meta: Option<SmartModuleMetadata>,
20 #[cfg_attr(feature = "use_serde", serde(skip))]
21 pub summary: Option<SmartModuleWasmSummary>, pub wasm: SmartModuleWasm,
23}
24
25impl Encoder for SmartModuleSpec {
27 fn write_size(&self, version: Version) -> usize {
28 if version < V2_FORMAT {
29 let spec_v1 = SmartModuleSpecV1::default();
32 let mut size = 0;
33 size += spec_v1.input_kind.write_size(version);
34 size += spec_v1.output_kind.write_size(version);
35 size += spec_v1.source_code.write_size(version);
36 size += self.wasm.write_size(version);
37 size += spec_v1.parameters.write_size(version);
38 size
39 } else {
40 let mut size = 0;
41 size += self.meta.write_size(version);
42 size += self.summary.write_size(version);
43 size += self.wasm.write_size(version);
44 size
45 }
46 }
47
48 fn encode<T>(&self, dest: &mut T, version: Version) -> Result<(), IoError>
49 where
50 T: BufMut,
51 {
52 if version < V2_FORMAT {
53 debug!("encoding for smartmodule spec v1");
54 let spec_v1 = SmartModuleSpecV1::default();
55 spec_v1.input_kind.encode(dest, version)?;
56 spec_v1.output_kind.encode(dest, version)?;
57 spec_v1.source_code.encode(dest, version)?;
58 self.wasm.encode(dest, version)?;
59 spec_v1.parameters.encode(dest, version)?;
60 } else {
61 self.meta.encode(dest, version)?;
62 self.summary.encode(dest, version)?;
63 self.wasm.encode(dest, version)?;
64 }
65 Ok(())
66 }
67}
68
69impl Decoder for SmartModuleSpec {
70 fn decode<T>(&mut self, src: &mut T, version: Version) -> Result<(), IoError>
71 where
72 T: bytes::Buf,
73 {
74 if version < V2_FORMAT {
75 debug!("decoding for smartmodule spec v1");
76 let mut spec_v1 = SmartModuleSpecV1::default();
77 spec_v1.decode(src, version)?;
78 self.wasm = spec_v1.wasm;
79 } else {
80 self.meta.decode(src, version)?;
81 self.summary.decode(src, version)?;
82 self.wasm.decode(src, version)?;
83 }
84
85 Ok(())
86 }
87}
88
89impl SmartModuleSpec {
90 pub fn fqdn<'a>(&self, store_id: &'a str) -> Cow<'a, str> {
92 if let Some(meta) = &self.meta {
93 meta.package.fqdn().into()
94 } else {
95 Cow::from(store_id)
96 }
97 }
98}
99
100#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
101pub struct SmartModuleWasmSummary {
102 pub wasm_length: u32,
103}
104
105#[derive(Clone, Default, Eq, PartialEq, Encoder, Decoder)]
106#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
107pub struct SmartModuleWasm {
108 pub format: SmartModuleWasmFormat,
109 #[cfg_attr(feature = "use_serde", serde(with = "base64"))]
110 pub payload: ByteBuf,
111}
112
113impl std::fmt::Debug for SmartModuleWasm {
114 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
115 f.write_str(&format!(
116 "SmartModuleWasm {{ format: {:?}, payload: [REDACTED] }}",
117 self.format
118 ))
119 }
120}
121
122impl SmartModuleWasm {
123 pub fn from_compressed_gzip(payload: Vec<u8>) -> Self {
125 SmartModuleWasm {
126 payload: ByteBuf::from(payload),
127 format: SmartModuleWasmFormat::Binary,
128 }
129 }
130}
131
#[cfg(feature = "smartmodule")]
impl SmartModuleWasm {
    /// Gzip-compresses `raw_payload` (default compression level) and wraps
    /// the result via `from_compressed_gzip`.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while reading from the gzip encoder.
    pub fn from_raw_wasm_bytes(raw_payload: &[u8]) -> std::io::Result<Self> {
        use std::io::Read;
        use flate2::{Compression, bufread::GzEncoder};

        // Capacity is only a starting hint; the vector grows if needed.
        let mut compressed = Vec::with_capacity(raw_payload.len());
        GzEncoder::new(raw_payload, Compression::default()).read_to_end(&mut compressed)?;

        Ok(Self::from_compressed_gzip(compressed))
    }

    /// Gunzips the stored payload and returns the decompressed bytes.
    ///
    /// The `format` field is not consulted.
    ///
    /// # Errors
    ///
    /// Returns any I/O error reported while reading from the gzip decoder.
    pub fn as_raw_wasm(&self) -> Result<Vec<u8>, IoError> {
        use std::io::Read;
        use flate2::bufread::GzDecoder;

        let compressed: &[u8] = &**self.payload;
        // Capacity is only a starting hint; the vector grows if needed.
        let mut raw = Vec::with_capacity(self.payload.len());
        GzDecoder::new(compressed).read_to_end(&mut raw)?;
        Ok(raw)
    }
}
156
157#[derive(Debug, Default, Clone, Eq, PartialEq, Encoder, Decoder)]
158#[cfg_attr(feature = "use_serde", derive(serde::Serialize, serde::Deserialize))]
159pub enum SmartModuleWasmFormat {
160 #[default]
161 #[cfg_attr(feature = "use_serde", serde(rename = "BINARY"))]
162 #[fluvio(tag = 0)]
163 Binary,
164 #[cfg_attr(feature = "use_serde", serde(rename = "TEXT"))]
165 #[fluvio(tag = 1)]
166 Text,
167}
168
/// Serde helpers that represent a `ByteBuf` as a standard-alphabet base64
/// string; intended for use with `#[serde(with = "base64")]`.
#[cfg(feature = "use_serde")]
mod base64 {
    use std::ops::Deref;

    use serde::{Deserialize, Deserializer, Serializer};
    use base64::Engine;

    use fluvio_protocol::ByteBuf;

    /// Serializes `bytebuf` as a base64-encoded string.
    pub fn serialize<S: Serializer>(bytebuf: &ByteBuf, serializer: S) -> Result<S::Ok, S::Error> {
        let encoded = base64::engine::general_purpose::STANDARD.encode(bytebuf.deref());
        serializer.serialize_str(&encoded)
    }

    /// Deserializes a base64-encoded string into a `ByteBuf`.
    ///
    /// Invalid base64 input is reported as a custom serde error.
    pub fn deserialize<'de, D>(d: D) -> Result<ByteBuf, D::Error>
    where
        D: Deserializer<'de>,
    {
        let text = String::deserialize(d)?;
        base64::engine::general_purpose::STANDARD
            .decode(text.as_bytes())
            .map(ByteBuf::from)
            .map_err(serde::de::Error::custom)
    }
}
197
#[cfg(test)]
mod tests {

    /// Compressing raw bytes and decompressing them again must yield the
    /// original input.
    #[cfg(feature = "smartmodule")]
    #[test]
    fn test_wasm_zip_unzip() {
        use super::*;

        let original = b"test wasm";

        // Round trip: raw -> gzip-wrapped module -> raw.
        let module = SmartModuleWasm::from_raw_wasm_bytes(original).expect("created wasm");
        let restored = module.as_raw_wasm().expect("unzipped wasm");

        assert_eq!(original, restored.as_slice());
    }
}