fluvio_spu_schema/server/
stream_fetch.rs

1//!
2//! # Continuous Fetch
3//!
4//! Stream records to client
5//!
6use std::fmt::Debug;
7use std::marker::PhantomData;
8
9use educe::Educe;
10use derive_builder::Builder;
11
12use fluvio_protocol::record::RawRecords;
13use fluvio_protocol::{Encoder, Decoder};
14use fluvio_protocol::api::Request;
15use fluvio_protocol::record::RecordSet;
16use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
17use fluvio_types::{PartitionId, defaults::FLUVIO_CLIENT_MAX_FETCH_BYTES};
18
19use crate::COMMON_VERSION;
20use crate::fetch::FetchablePartitionResponse;
21use crate::isolation::Isolation;
22
23pub type DefaultStreamFetchResponse = StreamFetchResponse<RecordSet<RawRecords>>;
24pub type DefaultStreamFetchRequest = StreamFetchRequest<RecordSet<RawRecords>>;
25
26use super::SpuServerApiKey;
27#[allow(deprecated)]
28use super::smartmodule::SmartModuleInvocation;
29
30// version for WASM_MODULE
31pub const WASM_MODULE_API: i16 = 11;
32pub const WASM_MODULE_V2_API: i16 = 12;
33
34// version for aggregator SmartModule
35pub const AGGREGATOR_API: i16 = 13;
36
37// version for gzipped WASM payloads
38pub const GZIP_WASM_API: i16 = 14;
39
40// version for SmartModule array map
41pub const ARRAY_MAP_WASM_API: i16 = 15;
42
43// version for persistent SmartModule
44pub const SMART_MODULE_API: i16 = 16;
45
46pub const GENERIC_SMARTMODULE_API: i16 = 17;
47pub const CHAIN_SMARTMODULE_API: i16 = 18;
48
49pub const SMARTMODULE_LOOKBACK: i16 = 20;
50
51pub const SMARTMODULE_LOOKBACK_AGE: i16 = 21;
52
53pub const SMARTMODULE_TIMESTAMP: i16 = 22;
54
55pub const OFFSET_MANAGEMENT_API: i16 = 23;
56
57/// Fetch records continuously
58/// Output will be send back as stream
59#[allow(deprecated)]
60#[derive(Decoder, Encoder, Builder, Default, Educe)]
61#[builder(setter(into))]
62#[educe(Debug)]
63pub struct StreamFetchRequest<R> {
64    pub topic: String,
65    #[builder(default = "0")]
66    pub partition: PartitionId,
67    #[builder(default = "0")]
68    pub fetch_offset: i64,
69    #[builder(default = "FLUVIO_CLIENT_MAX_FETCH_BYTES")]
70    pub max_bytes: i32,
71    #[builder(default = "Isolation::ReadUncommitted")]
72    pub isolation: Isolation,
73    // these private fields will be removed
74    #[educe(Debug(ignore))]
75    #[builder(setter(skip))]
76    #[fluvio(min_version = 11, max_version = 18)]
77    wasm_module: Vec<u8>,
78    #[builder(setter(skip))]
79    #[fluvio(min_version = 16, max_version = 18)]
80    smartmodule: Option<SmartModuleInvocation>,
81    #[builder(setter(skip))]
82    #[fluvio(min_version = 16, max_version = 18)]
83    derivedstream: Option<DerivedStreamInvocation>,
84    #[builder(default)]
85    #[fluvio(min_version = 18)]
86    pub smartmodules: Vec<SmartModuleInvocation>,
87    #[builder(default)]
88    #[fluvio(min_version = 23)]
89    pub consumer_id: Option<String>,
90    #[builder(setter(skip))]
91    data: PhantomData<R>,
92}
93
94impl<R> StreamFetchRequest<R>
95where
96    R: Clone,
97{
98    pub fn builder() -> StreamFetchRequestBuilder<R> {
99        StreamFetchRequestBuilder::default()
100    }
101}
102
103impl<R> Request for StreamFetchRequest<R>
104where
105    R: Debug + Decoder + Encoder,
106{
107    const API_KEY: u16 = SpuServerApiKey::StreamFetch as u16;
108    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
109    type Response = StreamFetchResponse<R>;
110}
111
112#[derive(Debug, Default, Clone, Encoder, Decoder)]
113pub(crate) struct DerivedStreamInvocation {
114    pub stream: String,
115    pub params: SmartModuleExtraParams,
116}
117
118#[derive(Encoder, Decoder, Default, Debug)]
119pub struct StreamFetchResponse<R> {
120    pub topic: String,
121    pub stream_id: u32,
122    pub partition: FetchablePartitionResponse<R>,
123}
124
125#[cfg(feature = "file")]
126pub use file::*;
127
128#[cfg(feature = "file")]
129mod file {
130
131    use std::io::Error as IoError;
132
133    use tracing::trace;
134    use bytes::BytesMut;
135
136    use fluvio_protocol::Version;
137    use fluvio_protocol::store::{StoreValue, FileWrite};
138
139    use crate::file::FileRecordSet;
140
141    pub type FileStreamFetchRequest = StreamFetchRequest<FileRecordSet>;
142
143    use super::*;
144
145    impl FileWrite for StreamFetchResponse<FileRecordSet> {
146        fn file_encode(
147            &self,
148            src: &mut BytesMut,
149            data: &mut Vec<StoreValue>,
150            version: Version,
151        ) -> Result<(), IoError> {
152            trace!("file encoding FlvContinuousFetchResponse");
153            trace!("topic {}", self.topic);
154            self.topic.encode(src, version)?;
155            self.stream_id.encode(src, version)?;
156            self.partition.file_encode(src, data, version)?;
157            Ok(())
158        }
159    }
160}
161
162#[cfg(test)]
163mod tests {
164
165    use fluvio_smartmodule::dataplane::smartmodule::Lookback;
166
167    use crate::server::smartmodule::{SmartModuleInvocationWasm, SmartModuleKind};
168
169    use super::*;
170
171    #[test]
172    fn test_encode_stream_fetch_request() {
173        let mut dest = Vec::new();
174        let value = DefaultStreamFetchRequest {
175            topic: "one".to_string(),
176            partition: 3,
177            smartmodules: vec![
178                (SmartModuleInvocation {
179                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
180                    kind: SmartModuleKind::Filter,
181                    ..Default::default()
182                }),
183            ],
184            ..Default::default()
185        };
186        value
187            .encode(&mut dest, CHAIN_SMARTMODULE_API)
188            .expect("should encode");
189        let expected = vec![
190            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
191            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
192            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
193            0x00, 0x00,
194        ];
195        assert_eq!(dest, expected);
196    }
197
198    #[test]
199    fn test_encode_stream_fetch_request_last_version() {
200        let name = "test-adhoc";
201        let mut dest = Vec::new();
202        let mut params = SmartModuleExtraParams::default();
203        params.set_lookback(Some(Lookback::last(1)));
204        let value = DefaultStreamFetchRequest {
205            topic: "one".to_string(),
206            partition: 3,
207            smartmodules: vec![
208                (SmartModuleInvocation {
209                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
210                    kind: SmartModuleKind::Filter,
211                    params,
212                    name: Some(name.to_string()),
213                }),
214            ],
215            ..Default::default()
216        };
217        value
218            .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION)
219            .expect("should encode");
220        // DBG: keep this for debugging, pre name
221        // let expected = vec![
222        //     0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
223        //     0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
224        //     0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
225        //     0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
226        // ];
227        let expected = vec![
228            // including sm name
229            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
230            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
231            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
232            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x01, 0, 10, 116, 101, 115, 116, 45, 97, 100, 104,
233            111, 99, 0,
234        ];
235        assert_eq!(dest, expected);
236    }
237
238    #[test]
239    fn test_encode_stream_fetch_request_prev_version() {
240        let name = "test-adhoc";
241        let mut dest = Vec::new();
242        let mut params = SmartModuleExtraParams::default();
243        params.set_lookback(Some(Lookback::last(1)));
244        let value = DefaultStreamFetchRequest {
245            topic: "one".to_string(),
246            partition: 3,
247            smartmodules: vec![
248                (SmartModuleInvocation {
249                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
250                    kind: SmartModuleKind::Filter,
251                    params,
252                    name: Some(name.to_string()),
253                }),
254            ],
255            ..Default::default()
256        };
257        value
258            .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION - 1)
259            .expect("should encode");
260        let expected = vec![
261            // Pre sm name encoding
262            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
263            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
264            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
265            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
266        ];
267        // let expected = vec![// DBG: keep this for debugging, w/ name
268        //     0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
269        //     0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
270        //     0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
271        //     0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 1, 0, 10, 116, 101, 115, 116, 45, 97, 100, 104,
272        //     111, 99, 0,
273        // ];
274        assert_eq!(dest, expected);
275    }
276
277    #[test]
278    fn test_decode_stream_fetch_request() {
279        let bytes = vec![
280            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
281            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
282            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
283            0x00, 0x00, 0x00,
284        ];
285        let mut value = DefaultStreamFetchRequest::default();
286        value
287            .decode(&mut std::io::Cursor::new(bytes), CHAIN_SMARTMODULE_API)
288            .unwrap();
289        assert_eq!(value.topic, "one");
290        assert_eq!(value.partition, 3);
291        let sm = match value.smartmodules.first() {
292            Some(wasm) => wasm,
293            _ => panic!("should have smartmodule payload"),
294        };
295        let wasm = match &sm.wasm {
296            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
297            #[allow(unreachable_patterns)]
298            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
299        };
300        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
301        assert!(matches!(sm.kind, SmartModuleKind::Filter));
302    }
303
304    #[test]
305    fn test_decode_stream_fetch_request_last_version() {
306        let bytes = vec![
307            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
308            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
309            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
310            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
311        ];
312        let mut value = DefaultStreamFetchRequest::default();
313        value
314            .decode(
315                &mut std::io::Cursor::new(bytes),
316                DefaultStreamFetchRequest::MAX_API_VERSION,
317            )
318            .unwrap();
319        assert_eq!(value.topic, "one");
320        assert_eq!(value.partition, 3);
321        let sm = match value.smartmodules.first() {
322            Some(wasm) => wasm,
323            _ => panic!("should have smartmodule payload"),
324        };
325        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
326        let wasm = match &sm.wasm {
327            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
328            #[allow(unreachable_patterns)]
329            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
330        };
331        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
332        assert!(matches!(sm.kind, SmartModuleKind::Filter));
333    }
334
335    #[test]
336    fn test_decode_stream_fetch_request_prev_version() {
337        let bytes = vec![
338            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
339            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
340            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
341            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
342        ];
343        let mut value = DefaultStreamFetchRequest::default();
344        value
345            .decode(
346                &mut std::io::Cursor::new(bytes),
347                DefaultStreamFetchRequest::MAX_API_VERSION - 1,
348            )
349            .unwrap();
350        assert_eq!(value.topic, "one");
351        assert_eq!(value.partition, 3);
352        let sm = match value.smartmodules.first() {
353            Some(wasm) => wasm,
354            _ => panic!("should have smartmodule payload"),
355        };
356        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
357        let wasm = match &sm.wasm {
358            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
359            #[allow(unreachable_patterns)]
360            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
361        };
362        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
363        assert!(matches!(sm.kind, SmartModuleKind::Filter));
364    }
365
366    #[test]
367    fn test_zip_unzip_works() {
368        const ORIG_LEN: usize = 1024;
369        let orig = vec![0x01; ORIG_LEN];
370        let compressed = SmartModuleInvocationWasm::adhoc_from_bytes(orig.as_slice())
371            .expect("compression failed");
372        assert!(matches!(&compressed, SmartModuleInvocationWasm::AdHoc(x) if x.len() < ORIG_LEN));
373        let uncompressed = compressed.into_raw().expect("decompression failed");
374        assert_eq!(orig, uncompressed);
375    }
376}