1use std::fmt::Debug;
7use std::marker::PhantomData;
8
9use educe::Educe;
10use derive_builder::Builder;
11
12use fluvio_protocol::record::RawRecords;
13use fluvio_protocol::{Encoder, Decoder};
14use fluvio_protocol::api::Request;
15use fluvio_protocol::record::RecordSet;
16use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
17use fluvio_types::{PartitionId, defaults::FLUVIO_CLIENT_MAX_FETCH_BYTES};
18
19use crate::COMMON_VERSION;
20use crate::fetch::FetchablePartitionResponse;
21use crate::isolation::Isolation;
22
/// [`StreamFetchResponse`] specialised to a record set of raw records.
pub type DefaultStreamFetchResponse = StreamFetchResponse<RecordSet<RawRecords>>;
/// [`StreamFetchRequest`] specialised to a record set of raw records.
pub type DefaultStreamFetchRequest = StreamFetchRequest<RecordSet<RawRecords>>;
25
26use super::SpuServerApiKey;
27#[allow(deprecated)]
28use super::smartmodule::SmartModuleInvocation;
29
// Stream-fetch protocol version markers.
//
// Several of these values line up with the `min_version` gates declared on
// `StreamFetchRequest` fields below; the gates themselves use numeric literals,
// so keep both in sync when changing either.

/// Version 11: matches the `min_version = 11` gate on `StreamFetchRequest::wasm_module`.
pub const WASM_MODULE_API: i16 = 11;
// NOTE(review): presumably a second revision of the WASM module payload — confirm.
pub const WASM_MODULE_V2_API: i16 = 12;

// NOTE(review): presumably the version that introduced aggregator support — confirm.
pub const AGGREGATOR_API: i16 = 13;

// NOTE(review): presumably the version that introduced gzip-compressed WASM payloads — confirm.
pub const GZIP_WASM_API: i16 = 14;

// NOTE(review): presumably the version that introduced array-map WASM modules — confirm.
pub const ARRAY_MAP_WASM_API: i16 = 15;

/// Version 16: matches the `min_version = 16` gate on the `smartmodule` and
/// `derivedstream` fields of `StreamFetchRequest`.
pub const SMART_MODULE_API: i16 = 16;

// NOTE(review): presumably the version that introduced generic SmartModules — confirm.
pub const GENERIC_SMARTMODULE_API: i16 = 17;
/// Version 18: matches the `min_version = 18` gate on `StreamFetchRequest::smartmodules`
/// and the `max_version = 18` bound on the single-module fields it supersedes.
pub const CHAIN_SMARTMODULE_API: i16 = 18;

// NOTE(review): presumably the version that introduced SmartModule lookback — confirm.
pub const SMARTMODULE_LOOKBACK: i16 = 20;

// NOTE(review): presumably the version that added an age bound to lookback — confirm.
pub const SMARTMODULE_LOOKBACK_AGE: i16 = 21;

// NOTE(review): presumably the version that added timestamps for SmartModules — confirm.
pub const SMARTMODULE_TIMESTAMP: i16 = 22;

/// Version 23: matches the `min_version = 23` gate on `StreamFetchRequest::consumer_id`.
pub const OFFSET_MANAGEMENT_API: i16 = 23;
56
/// Request sent to open a stream fetch on one partition of a topic.
///
/// The `Encoder`/`Decoder` derives serialise the fields in declaration order
/// (the unit tests in this file pin the resulting byte layout), so fields must
/// not be reordered. Fields annotated with `#[fluvio(min_version, max_version)]`
/// are only present on the wire for protocol versions inside that range; the
/// version-18 test fixture shows the `max_version` bound is inclusive.
///
/// `R` is the record-set type carried by the matching [`StreamFetchResponse`];
/// the request itself only holds it as a `PhantomData` marker.
#[allow(deprecated)]
#[derive(Decoder, Encoder, Builder, Default, Educe)]
#[builder(setter(into))]
#[educe(Debug)]
pub struct StreamFetchRequest<R> {
    /// Name of the topic to fetch from. Has no builder default.
    pub topic: String,
    /// Partition of `topic` to fetch from. The builder defaults it to `0`.
    #[builder(default = "0")]
    pub partition: PartitionId,
    /// Offset associated with the fetch (presumably the starting offset — confirm).
    /// The builder defaults it to `0`.
    #[builder(default = "0")]
    pub fetch_offset: i64,
    /// Byte limit for the fetch. The builder defaults it to
    /// `FLUVIO_CLIENT_MAX_FETCH_BYTES`.
    #[builder(default = "FLUVIO_CLIENT_MAX_FETCH_BYTES")]
    pub max_bytes: i32,
    /// Isolation level for the fetch. The builder defaults it to
    /// `Isolation::ReadUncommitted`.
    #[builder(default = "Isolation::ReadUncommitted")]
    pub isolation: Isolation,
    // Private raw WASM payload, only on the wire for versions 11..=18.
    // Omitted from `Debug` output and not settable through the builder.
    // NOTE(review): looks like a legacy field kept for wire compatibility — confirm.
    #[educe(Debug(ignore))]
    #[builder(setter(skip))]
    #[fluvio(min_version = 11, max_version = 18)]
    wasm_module: Vec<u8>,
    // Private single-SmartModule slot, only on the wire for versions 16..=18.
    // Not settable through the builder.
    #[builder(setter(skip))]
    #[fluvio(min_version = 16, max_version = 18)]
    smartmodule: Option<SmartModuleInvocation>,
    // Private derived-stream slot, only on the wire for versions 16..=18.
    // Not settable through the builder.
    #[builder(setter(skip))]
    #[fluvio(min_version = 16, max_version = 18)]
    derivedstream: Option<DerivedStreamInvocation>,
    /// SmartModule invocations attached to this fetch. On the wire from
    /// version 18 onwards; the builder defaults it to an empty list.
    #[builder(default)]
    #[fluvio(min_version = 18)]
    pub smartmodules: Vec<SmartModuleInvocation>,
    /// Optional consumer identifier. On the wire from version 23 onwards;
    /// the builder defaults it to `None`.
    #[builder(default)]
    #[fluvio(min_version = 23)]
    pub consumer_id: Option<String>,
    // Zero-sized marker tying the request to its record-set type `R`.
    #[builder(setter(skip))]
    data: PhantomData<R>,
}
93
94impl<R> StreamFetchRequest<R>
95where
96 R: Clone,
97{
98 pub fn builder() -> StreamFetchRequestBuilder<R> {
99 StreamFetchRequestBuilder::default()
100 }
101}
102
103impl<R> Request for StreamFetchRequest<R>
104where
105 R: Debug + Decoder + Encoder,
106{
107 const API_KEY: u16 = SpuServerApiKey::StreamFetch as u16;
108 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
109 type Response = StreamFetchResponse<R>;
110}
111
/// Crate-private payload type of the `derivedstream` field on
/// [`StreamFetchRequest`], which is only on the wire for versions 16..=18.
///
/// Fields are encoded in declaration order, so they must not be reordered.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub(crate) struct DerivedStreamInvocation {
    /// Stream identifier (presumably the derived stream's name — confirm).
    pub stream: String,
    /// Extra SmartModule parameters carried alongside the stream reference.
    pub params: SmartModuleExtraParams,
}
117
/// Response type paired with [`StreamFetchRequest`] through its `Request` impl.
///
/// Fields are encoded in declaration order (`topic`, `stream_id`, `partition`),
/// so they must not be reordered.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct StreamFetchResponse<R> {
    /// Name of the topic this response belongs to.
    pub topic: String,
    /// Numeric identifier of the stream (presumably assigned by the server — confirm).
    pub stream_id: u32,
    /// Partition-level fetch result carrying records of type `R`.
    pub partition: FetchablePartitionResponse<R>,
}
124
125#[cfg(feature = "file")]
126pub use file::*;
127
/// File-backed variants of the stream-fetch types, compiled only with the
/// `file` feature.
#[cfg(feature = "file")]
mod file {

    use std::io::Error as IoError;

    use bytes::BytesMut;
    use tracing::trace;

    use fluvio_protocol::Version;
    use fluvio_protocol::store::{FileWrite, StoreValue};

    use crate::file::FileRecordSet;

    use super::*;

    /// [`StreamFetchRequest`] whose response carries a [`FileRecordSet`].
    pub type FileStreamFetchRequest = StreamFetchRequest<FileRecordSet>;

    impl FileWrite for StreamFetchResponse<FileRecordSet> {
        /// Writes the response in field order: `topic` and `stream_id` are
        /// encoded straight into `src`, then the partition is handed to its own
        /// `file_encode`, which receives both `src` and `data`.
        ///
        /// The first failing step aborts the encode and its error is returned.
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding FlvContinuousFetchResponse");
            trace!("topic {}", self.topic);

            // Header fields go directly into the byte buffer.
            self.topic.encode(src, version)?;
            self.stream_id.encode(src, version)?;

            // The partition result is the final step, so its outcome is the
            // outcome of the whole encode.
            self.partition.file_encode(src, data, version)
        }
    }
}
161
/// Wire-format tests for [`StreamFetchRequest`].
///
/// Each fixture pins the exact bytes produced or accepted at a given protocol
/// version, so a change to field order or version gating shows up as a test
/// failure. Shared setup lives in the private helpers at the top of the module.
#[cfg(test)]
mod tests {

    use fluvio_smartmodule::dataplane::smartmodule::Lookback;

    use crate::server::smartmodule::{SmartModuleInvocationWasm, SmartModuleKind};

    use super::*;

    /// Ad-hoc WASM payload used by every encode and decode fixture.
    const ADHOC_WASM: [u8; 4] = [0xde, 0xad, 0xbe, 0xef];

    /// Builds the request used by the encode tests: topic `"one"`, partition 3,
    /// a single SmartModule invocation, everything else left at its default.
    fn request_with(invocation: SmartModuleInvocation) -> DefaultStreamFetchRequest {
        DefaultStreamFetchRequest {
            topic: "one".to_string(),
            partition: 3,
            smartmodules: vec![invocation],
            ..Default::default()
        }
    }

    /// Ad-hoc filter invocation named `"test-adhoc"` with `Lookback::last(1)`.
    fn named_lookback_invocation() -> SmartModuleInvocation {
        let name = "test-adhoc";
        let mut params = SmartModuleExtraParams::default();
        params.set_lookback(Some(Lookback::last(1)));
        SmartModuleInvocation {
            wasm: SmartModuleInvocationWasm::AdHoc(ADHOC_WASM.to_vec()),
            kind: SmartModuleKind::Filter,
            params,
            name: Some(name.to_string()),
        }
    }

    /// Encodes `value` at `version` and returns the bytes produced.
    fn encode_at(value: &DefaultStreamFetchRequest, version: i16) -> Vec<u8> {
        let mut dest = Vec::new();
        value.encode(&mut dest, version).expect("should encode");
        dest
    }

    /// Decodes `bytes` at `version` into a fresh request and checks the target
    /// shared by all decode fixtures (topic `"one"`, partition 3).
    fn decode_request(bytes: Vec<u8>, version: i16) -> DefaultStreamFetchRequest {
        let mut value = DefaultStreamFetchRequest::default();
        value
            .decode(&mut std::io::Cursor::new(bytes), version)
            .expect("should decode");
        assert_eq!(value.topic, "one");
        assert_eq!(value.partition, 3);
        value
    }

    /// Returns the first SmartModule invocation of `value`, panicking if there is none.
    fn first_smartmodule(value: &DefaultStreamFetchRequest) -> &SmartModuleInvocation {
        match value.smartmodules.first() {
            Some(sm) => sm,
            _ => panic!("should have smartmodule payload"),
        }
    }

    /// Asserts that `sm` is an ad-hoc filter carrying [`ADHOC_WASM`].
    fn assert_adhoc_filter(sm: &SmartModuleInvocation) {
        let wasm = match &sm.wasm {
            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
            // Kept from the original tests: the wildcard arm can be unreachable
            // depending on which variants the enum has.
            #[allow(unreachable_patterns)]
            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
        };
        assert_eq!(wasm, ADHOC_WASM.as_slice());
        assert!(matches!(sm.kind, SmartModuleKind::Filter));
    }

    #[test]
    fn test_encode_stream_fetch_request() {
        let value = request_with(SmartModuleInvocation {
            wasm: SmartModuleInvocationWasm::AdHoc(ADHOC_WASM.to_vec()),
            kind: SmartModuleKind::Filter,
            ..Default::default()
        });
        let dest = encode_at(&value, CHAIN_SMARTMODULE_API);
        // At version 18 the fields gated with `max_version = 18` are still
        // written (as empty values) ahead of the smartmodule chain.
        let expected = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
            0x00, 0x00,
        ];
        assert_eq!(dest, expected);
    }

    #[test]
    fn test_encode_stream_fetch_request_last_version() {
        let value = request_with(named_lookback_invocation());
        let dest = encode_at(&value, DefaultStreamFetchRequest::MAX_API_VERSION);
        let expected = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0, 10, 116, 101, 115, 116, 45, 97, 100, 104,
            111, 99, 0,
        ];
        assert_eq!(dest, expected);
    }

    #[test]
    fn test_encode_stream_fetch_request_prev_version() {
        let value = request_with(named_lookback_invocation());
        let dest = encode_at(&value, DefaultStreamFetchRequest::MAX_API_VERSION - 1);
        let expected = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
        ];
        assert_eq!(dest, expected);
    }

    #[test]
    fn test_decode_stream_fetch_request() {
        let bytes = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
            0x00, 0x00, 0x00,
        ];
        let value = decode_request(bytes, CHAIN_SMARTMODULE_API);
        let sm = first_smartmodule(&value);
        assert_adhoc_filter(sm);
    }

    #[test]
    fn test_decode_stream_fetch_request_last_version() {
        let bytes = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
        ];
        let value = decode_request(bytes, DefaultStreamFetchRequest::MAX_API_VERSION);
        let sm = first_smartmodule(&value);
        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
        assert_adhoc_filter(sm);
    }

    #[test]
    fn test_decode_stream_fetch_request_prev_version() {
        let bytes = vec![
            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
        ];
        let value = decode_request(bytes, DefaultStreamFetchRequest::MAX_API_VERSION - 1);
        let sm = first_smartmodule(&value);
        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
        assert_adhoc_filter(sm);
    }

    #[test]
    fn test_zip_unzip_works() {
        const ORIG_LEN: usize = 1024;
        let orig = vec![0x01; ORIG_LEN];
        let compressed = SmartModuleInvocationWasm::adhoc_from_bytes(orig.as_slice())
            .expect("compression failed");
        // A highly repetitive input must come out smaller than it went in.
        assert!(matches!(&compressed, SmartModuleInvocationWasm::AdHoc(x) if x.len() < ORIG_LEN));
        let uncompressed = compressed.into_raw().expect("decompression failed");
        assert_eq!(orig, uncompressed);
    }
}