fluvio_spu_schema/server/
stream_fetch.rs

1//!
2//! # Continuous Fetch
3//!
4//! Stream records to client
5//!
6use std::fmt::Debug;
7use std::marker::PhantomData;
8
9use educe::Educe;
10use derive_builder::Builder;
11
12use fluvio_protocol::record::RawRecords;
13use fluvio_protocol::{Encoder, Decoder};
14use fluvio_protocol::api::Request;
15use fluvio_protocol::record::RecordSet;
16use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
17use fluvio_types::{PartitionId, defaults::FLUVIO_CLIENT_MAX_FETCH_BYTES};
18
19use crate::COMMON_VERSION;
20use crate::fetch::FetchablePartitionResponse;
21use crate::isolation::Isolation;
22
/// [`StreamFetchResponse`] whose partition payload type is `RecordSet<RawRecords>`.
pub type DefaultStreamFetchResponse = StreamFetchResponse<RecordSet<RawRecords>>;
/// [`StreamFetchRequest`] parameterized with `RecordSet<RawRecords>`; its
/// `Request::Response` is therefore [`DefaultStreamFetchResponse`].
pub type DefaultStreamFetchRequest = StreamFetchRequest<RecordSet<RawRecords>>;
25
26use super::SpuServerApiKey;
27#[allow(deprecated)]
28use super::smartmodule::SmartModuleInvocation;
29
// Stream fetch API version numbers, one per protocol feature.
//
// Several of these values coincide with the literal `min_version` /
// `max_version` gates on the fields of `StreamFetchRequest`; the struct uses
// bare integers rather than these constants, so the two must be kept in sync
// by hand.

// version for WASM_MODULE
// (11 is also the `min_version` of the legacy `wasm_module` request field)
pub const WASM_MODULE_API: i16 = 11;
pub const WASM_MODULE_V2_API: i16 = 12;

// version for aggregator SmartModule
pub const AGGREGATOR_API: i16 = 13;

// version for gzipped WASM payloads
pub const GZIP_WASM_API: i16 = 14;

// version for SmartModule array map
pub const ARRAY_MAP_WASM_API: i16 = 15;

// version for persistent SmartModule
// (16 is also the `min_version` of the legacy `smartmodule` and
// `derivedstream` request fields)
pub const SMART_MODULE_API: i16 = 16;

// NOTE(review): undocumented in the original; presumably the version for
// generic SmartModule invocations — confirm.
pub const GENERIC_SMARTMODULE_API: i16 = 17;
// 18 is the `min_version` of `StreamFetchRequest::smartmodules` and the
// `max_version` of the legacy `wasm_module` / `smartmodule` / `derivedstream`
// fields.
pub const CHAIN_SMARTMODULE_API: i16 = 18;

// NOTE(review): undocumented in the original; presumably the version for
// SmartModule lookback support — confirm.
pub const SMARTMODULE_LOOKBACK: i16 = 20;

// NOTE(review): undocumented in the original; presumably the version for
// age-based SmartModule lookback — confirm.
pub const SMARTMODULE_LOOKBACK_AGE: i16 = 21;

// NOTE(review): undocumented in the original; presumably the version for
// timestamp support in SmartModules — confirm.
pub const SMARTMODULE_TIMESTAMP: i16 = 22;

// 23 is the `min_version` of `StreamFetchRequest::consumer_id`.
pub const OFFSET_MANAGEMENT_API: i16 = 23;
56
/// Fetch records continuously.
///
/// The output will be sent back as a stream; the response type is
/// [`StreamFetchResponse`] (see the `Request` impl for this type).
///
/// The field order below is the order used by the derived `Encoder` /
/// `Decoder`, so fields must not be reordered.
///
/// Note that `#[derive(Default)]` and the builder disagree on some defaults:
/// the `#[builder(default = ...)]` expressions only apply when the value is
/// built through [`StreamFetchRequest::builder`].
// The `SmartModuleInvocation` import at the top of the file is wrapped in the
// same `allow(deprecated)`.
// NOTE(review): presumably that type is what triggers the lint here — confirm.
#[allow(deprecated)]
#[derive(Decoder, Encoder, Builder, Default, Educe)]
#[builder(setter(into))]
#[educe(Debug)]
pub struct StreamFetchRequest<R> {
    /// Topic name.
    pub topic: String,
    /// Partition of the topic (builder default: `0`).
    #[builder(default = "0")]
    pub partition: PartitionId,
    /// Fetch offset (builder default: `0`).
    // NOTE(review): presumably the offset at which streaming starts — confirm.
    #[builder(default = "0")]
    pub fetch_offset: i64,
    /// Maximum number of bytes (builder default:
    /// `FLUVIO_CLIENT_MAX_FETCH_BYTES`; `Default::default()` leaves it at `0`).
    #[builder(default = "FLUVIO_CLIENT_MAX_FETCH_BYTES")]
    pub max_bytes: i32,
    /// Isolation level (builder default: `Isolation::ReadUncommitted`).
    #[builder(default = "Isolation::ReadUncommitted")]
    pub isolation: Isolation,
    // these private fields will be removed
    //
    // Legacy payload: gated to API versions 11 through 18, has no builder
    // setter, and is left out of the `Debug` output.
    #[educe(Debug(ignore))]
    #[builder(setter(skip))]
    #[fluvio(min_version = 11, max_version = 18)]
    wasm_module: Vec<u8>,
    // Legacy single SmartModule invocation: gated to API versions 16 through
    // 18, no builder setter.
    #[builder(setter(skip))]
    #[fluvio(min_version = 16, max_version = 18)]
    smartmodule: Option<SmartModuleInvocation>,
    // Legacy derived-stream invocation: gated to API versions 16 through 18,
    // no builder setter.
    #[builder(setter(skip))]
    #[fluvio(min_version = 16, max_version = 18)]
    derivedstream: Option<DerivedStreamInvocation>,
    /// SmartModule invocations attached to this request; gated to API
    /// version 18 and later. The builder falls back to the type's `Default`.
    #[builder(default)]
    #[fluvio(min_version = 18)]
    pub smartmodules: Vec<SmartModuleInvocation>,
    /// Optional consumer identifier; gated to API version 23 and later. The
    /// builder falls back to the type's `Default`.
    #[builder(default)]
    #[fluvio(min_version = 23)]
    pub consumer_id: Option<String>,
    // Zero-sized marker that ties the request to its record type `R`; it has
    // no builder setter.
    #[builder(setter(skip))]
    data: PhantomData<R>,
}
93
94impl<R> StreamFetchRequest<R>
95where
96    R: Clone,
97{
98    pub fn builder() -> StreamFetchRequestBuilder<R> {
99        StreamFetchRequestBuilder::default()
100    }
101}
102
103impl<R> Request for StreamFetchRequest<R>
104where
105    R: Debug + Decoder + Encoder,
106{
107    const API_KEY: u16 = SpuServerApiKey::StreamFetch as u16;
108    const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
109    type Response = StreamFetchResponse<R>;
110}
111
/// Payload type of the legacy, crate-private `derivedstream` field of
/// [`StreamFetchRequest`].
///
/// Fields are encoded in declaration order by the derived `Encoder` /
/// `Decoder`, so they must not be reordered.
#[derive(Debug, Default, Clone, Encoder, Decoder)]
pub(crate) struct DerivedStreamInvocation {
    /// Stream name.
    // NOTE(review): presumably the name of the derived stream to invoke — confirm.
    pub stream: String,
    /// Extra parameters attached to the invocation.
    pub params: SmartModuleExtraParams,
}
117
/// Response type associated with [`StreamFetchRequest`] (its
/// `Request::Response`).
///
/// Fields are encoded in declaration order by the derived `Encoder` /
/// `Decoder`, so they must not be reordered.
#[derive(Encoder, Decoder, Default, Debug)]
pub struct StreamFetchResponse<R> {
    /// Topic name.
    pub topic: String,
    /// Stream identifier.
    // NOTE(review): presumably identifies which stream this response belongs to — confirm.
    pub stream_id: u32,
    /// Partition response carrying records of type `R`.
    pub partition: FetchablePartitionResponse<R>,
}
124
125#[cfg(feature = "file")]
126pub use file::*;
127
#[cfg(feature = "file")]
mod file {
    //! `FileRecordSet`-based flavours of the stream fetch types, compiled only
    //! when the `file` feature is enabled.

    use std::io::Error as IoError;

    use bytes::BytesMut;
    use tracing::trace;

    use fluvio_protocol::store::{FileWrite, StoreValue};
    use fluvio_protocol::Version;

    use crate::file::FileRecordSet;

    use super::*;

    /// Stream fetch request parameterized with [`FileRecordSet`].
    pub type FileStreamFetchRequest = StreamFetchRequest<FileRecordSet>;

    impl FileWrite for StreamFetchResponse<FileRecordSet> {
        /// Writes the response in field order: `topic` and `stream_id` are
        /// encoded into `src`, then the partition is handed to its own
        /// `file_encode` together with `data`.
        ///
        /// Any error from one of the three steps is returned as-is.
        fn file_encode(
            &self,
            src: &mut BytesMut,
            data: &mut Vec<StoreValue>,
            version: Version,
        ) -> Result<(), IoError> {
            trace!("file encoding FlvContinuousFetchResponse");
            trace!("topic {}", self.topic);

            self.topic.encode(src, version)?;
            self.stream_id.encode(src, version)?;

            // The partition step is the last one, so its result is the
            // result of the whole encode.
            self.partition.file_encode(src, data, version)
        }
    }
}
161
162#[cfg(test)]
163mod tests {
164
165    use fluvio_smartmodule::dataplane::smartmodule::Lookback;
166
167    use crate::server::smartmodule::{SmartModuleInvocationWasm, SmartModuleKind};
168
169    use super::*;
170
171    #[test]
172    fn test_encode_stream_fetch_request() {
173        let mut dest = Vec::new();
174        let value = DefaultStreamFetchRequest {
175            topic: "one".to_string(),
176            partition: 3,
177            smartmodules: vec![
178                (SmartModuleInvocation {
179                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
180                    kind: SmartModuleKind::Filter,
181                    ..Default::default()
182                }),
183            ],
184            ..Default::default()
185        };
186        value
187            .encode(&mut dest, CHAIN_SMARTMODULE_API)
188            .expect("should encode");
189        let expected = vec![
190            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
191            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
192            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
193            0x00, 0x00,
194        ];
195        assert_eq!(dest, expected);
196    }
197
198    #[test]
199    fn test_encode_stream_fetch_request_last_version() {
200        let mut dest = Vec::new();
201        let mut params = SmartModuleExtraParams::default();
202        params.set_lookback(Some(Lookback::last(1)));
203        let value = DefaultStreamFetchRequest {
204            topic: "one".to_string(),
205            partition: 3,
206            smartmodules: vec![
207                (SmartModuleInvocation {
208                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
209                    kind: SmartModuleKind::Filter,
210                    params,
211                }),
212            ],
213            ..Default::default()
214        };
215        value
216            .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION)
217            .expect("should encode");
218        let expected = vec![
219            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
220            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
221            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
222            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
223        ];
224        assert_eq!(dest, expected);
225    }
226
227    #[test]
228    fn test_encode_stream_fetch_request_prev_version() {
229        let mut dest = Vec::new();
230        let mut params = SmartModuleExtraParams::default();
231        params.set_lookback(Some(Lookback::last(1)));
232        let value = DefaultStreamFetchRequest {
233            topic: "one".to_string(),
234            partition: 3,
235            smartmodules: vec![
236                (SmartModuleInvocation {
237                    wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
238                    kind: SmartModuleKind::Filter,
239                    params,
240                }),
241            ],
242            ..Default::default()
243        };
244        value
245            .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION - 1)
246            .expect("should encode");
247        let expected = vec![
248            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
249            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
250            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
251            0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
252        ];
253        assert_eq!(dest, expected);
254    }
255
256    #[test]
257    fn test_decode_stream_fetch_request() {
258        let bytes = vec![
259            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
260            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
261            0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
262            0x00, 0x00,
263        ];
264        let mut value = DefaultStreamFetchRequest::default();
265        value
266            .decode(&mut std::io::Cursor::new(bytes), CHAIN_SMARTMODULE_API)
267            .unwrap();
268        assert_eq!(value.topic, "one");
269        assert_eq!(value.partition, 3);
270        let sm = match value.smartmodules.first() {
271            Some(wasm) => wasm,
272            _ => panic!("should have smartmodule payload"),
273        };
274        let wasm = match &sm.wasm {
275            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
276            #[allow(unreachable_patterns)]
277            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
278        };
279        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
280        assert!(matches!(sm.kind, SmartModuleKind::Filter));
281    }
282
283    #[test]
284    fn test_decode_stream_fetch_request_last_version() {
285        let bytes = vec![
286            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
287            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
288            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
289            0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
290        ];
291        let mut value = DefaultStreamFetchRequest::default();
292        value
293            .decode(
294                &mut std::io::Cursor::new(bytes),
295                DefaultStreamFetchRequest::MAX_API_VERSION,
296            )
297            .unwrap();
298        assert_eq!(value.topic, "one");
299        assert_eq!(value.partition, 3);
300        let sm = match value.smartmodules.first() {
301            Some(wasm) => wasm,
302            _ => panic!("should have smartmodule payload"),
303        };
304        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
305        let wasm = match &sm.wasm {
306            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
307            #[allow(unreachable_patterns)]
308            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
309        };
310        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
311        assert!(matches!(sm.kind, SmartModuleKind::Filter));
312    }
313
314    #[test]
315    fn test_decode_stream_fetch_request_prev_version() {
316        let bytes = vec![
317            0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
318            0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
319            0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
320            0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
321        ];
322        let mut value = DefaultStreamFetchRequest::default();
323        value
324            .decode(
325                &mut std::io::Cursor::new(bytes),
326                DefaultStreamFetchRequest::MAX_API_VERSION - 1,
327            )
328            .unwrap();
329        assert_eq!(value.topic, "one");
330        assert_eq!(value.partition, 3);
331        let sm = match value.smartmodules.first() {
332            Some(wasm) => wasm,
333            _ => panic!("should have smartmodule payload"),
334        };
335        assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
336        let wasm = match &sm.wasm {
337            SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
338            #[allow(unreachable_patterns)]
339            _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
340        };
341        assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
342        assert!(matches!(sm.kind, SmartModuleKind::Filter));
343    }
344
345    #[test]
346    fn test_zip_unzip_works() {
347        const ORIG_LEN: usize = 1024;
348        let orig = vec![0x01; ORIG_LEN];
349        let compressed = SmartModuleInvocationWasm::adhoc_from_bytes(orig.as_slice())
350            .expect("compression failed");
351        assert!(
352            matches!(&compressed, SmartModuleInvocationWasm::AdHoc(ref x) if x.len() < ORIG_LEN)
353        );
354        let uncompressed = compressed.into_raw().expect("decompression failed");
355        assert_eq!(orig, uncompressed);
356    }
357}