1use std::fmt::Debug;
7use std::marker::PhantomData;
8
9use educe::Educe;
10use derive_builder::Builder;
11
12use fluvio_protocol::record::RawRecords;
13use fluvio_protocol::{Encoder, Decoder};
14use fluvio_protocol::api::Request;
15use fluvio_protocol::record::RecordSet;
16use fluvio_smartmodule::dataplane::smartmodule::SmartModuleExtraParams;
17use fluvio_types::{PartitionId, defaults::FLUVIO_CLIENT_MAX_FETCH_BYTES};
18
19use crate::COMMON_VERSION;
20use crate::fetch::FetchablePartitionResponse;
21use crate::isolation::Isolation;
22
23pub type DefaultStreamFetchResponse = StreamFetchResponse<RecordSet<RawRecords>>;
24pub type DefaultStreamFetchRequest = StreamFetchRequest<RecordSet<RawRecords>>;
25
26use super::SpuServerApiKey;
27#[allow(deprecated)]
28use super::smartmodule::SmartModuleInvocation;
29
30pub const WASM_MODULE_API: i16 = 11;
32pub const WASM_MODULE_V2_API: i16 = 12;
33
34pub const AGGREGATOR_API: i16 = 13;
36
37pub const GZIP_WASM_API: i16 = 14;
39
40pub const ARRAY_MAP_WASM_API: i16 = 15;
42
43pub const SMART_MODULE_API: i16 = 16;
45
46pub const GENERIC_SMARTMODULE_API: i16 = 17;
47pub const CHAIN_SMARTMODULE_API: i16 = 18;
48
49pub const SMARTMODULE_LOOKBACK: i16 = 20;
50
51pub const SMARTMODULE_LOOKBACK_AGE: i16 = 21;
52
53pub const SMARTMODULE_TIMESTAMP: i16 = 22;
54
55pub const OFFSET_MANAGEMENT_API: i16 = 23;
56
57#[allow(deprecated)]
60#[derive(Decoder, Encoder, Builder, Default, Educe)]
61#[builder(setter(into))]
62#[educe(Debug)]
63pub struct StreamFetchRequest<R> {
64 pub topic: String,
65 #[builder(default = "0")]
66 pub partition: PartitionId,
67 #[builder(default = "0")]
68 pub fetch_offset: i64,
69 #[builder(default = "FLUVIO_CLIENT_MAX_FETCH_BYTES")]
70 pub max_bytes: i32,
71 #[builder(default = "Isolation::ReadUncommitted")]
72 pub isolation: Isolation,
73 #[educe(Debug(ignore))]
75 #[builder(setter(skip))]
76 #[fluvio(min_version = 11, max_version = 18)]
77 wasm_module: Vec<u8>,
78 #[builder(setter(skip))]
79 #[fluvio(min_version = 16, max_version = 18)]
80 smartmodule: Option<SmartModuleInvocation>,
81 #[builder(setter(skip))]
82 #[fluvio(min_version = 16, max_version = 18)]
83 derivedstream: Option<DerivedStreamInvocation>,
84 #[builder(default)]
85 #[fluvio(min_version = 18)]
86 pub smartmodules: Vec<SmartModuleInvocation>,
87 #[builder(default)]
88 #[fluvio(min_version = 23)]
89 pub consumer_id: Option<String>,
90 #[builder(setter(skip))]
91 data: PhantomData<R>,
92}
93
94impl<R> StreamFetchRequest<R>
95where
96 R: Clone,
97{
98 pub fn builder() -> StreamFetchRequestBuilder<R> {
99 StreamFetchRequestBuilder::default()
100 }
101}
102
103impl<R> Request for StreamFetchRequest<R>
104where
105 R: Debug + Decoder + Encoder,
106{
107 const API_KEY: u16 = SpuServerApiKey::StreamFetch as u16;
108 const DEFAULT_API_VERSION: i16 = COMMON_VERSION;
109 type Response = StreamFetchResponse<R>;
110}
111
112#[derive(Debug, Default, Clone, Encoder, Decoder)]
113pub(crate) struct DerivedStreamInvocation {
114 pub stream: String,
115 pub params: SmartModuleExtraParams,
116}
117
118#[derive(Encoder, Decoder, Default, Debug)]
119pub struct StreamFetchResponse<R> {
120 pub topic: String,
121 pub stream_id: u32,
122 pub partition: FetchablePartitionResponse<R>,
123}
124
125#[cfg(feature = "file")]
126pub use file::*;
127
128#[cfg(feature = "file")]
129mod file {
130
131 use std::io::Error as IoError;
132
133 use tracing::trace;
134 use bytes::BytesMut;
135
136 use fluvio_protocol::Version;
137 use fluvio_protocol::store::{StoreValue, FileWrite};
138
139 use crate::file::FileRecordSet;
140
141 pub type FileStreamFetchRequest = StreamFetchRequest<FileRecordSet>;
142
143 use super::*;
144
145 impl FileWrite for StreamFetchResponse<FileRecordSet> {
146 fn file_encode(
147 &self,
148 src: &mut BytesMut,
149 data: &mut Vec<StoreValue>,
150 version: Version,
151 ) -> Result<(), IoError> {
152 trace!("file encoding FlvContinuousFetchResponse");
153 trace!("topic {}", self.topic);
154 self.topic.encode(src, version)?;
155 self.stream_id.encode(src, version)?;
156 self.partition.file_encode(src, data, version)?;
157 Ok(())
158 }
159 }
160}
161
162#[cfg(test)]
163mod tests {
164
165 use fluvio_smartmodule::dataplane::smartmodule::Lookback;
166
167 use crate::server::smartmodule::{SmartModuleInvocationWasm, SmartModuleKind};
168
169 use super::*;
170
171 #[test]
172 fn test_encode_stream_fetch_request() {
173 let mut dest = Vec::new();
174 let value = DefaultStreamFetchRequest {
175 topic: "one".to_string(),
176 partition: 3,
177 smartmodules: vec![
178 (SmartModuleInvocation {
179 wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
180 kind: SmartModuleKind::Filter,
181 ..Default::default()
182 }),
183 ],
184 ..Default::default()
185 };
186 value
187 .encode(&mut dest, CHAIN_SMARTMODULE_API)
188 .expect("should encode");
189 let expected = vec![
190 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
191 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
192 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
193 0x00, 0x00,
194 ];
195 assert_eq!(dest, expected);
196 }
197
198 #[test]
199 fn test_encode_stream_fetch_request_last_version() {
200 let mut dest = Vec::new();
201 let mut params = SmartModuleExtraParams::default();
202 params.set_lookback(Some(Lookback::last(1)));
203 let value = DefaultStreamFetchRequest {
204 topic: "one".to_string(),
205 partition: 3,
206 smartmodules: vec![
207 (SmartModuleInvocation {
208 wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
209 kind: SmartModuleKind::Filter,
210 params,
211 }),
212 ],
213 ..Default::default()
214 };
215 value
216 .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION)
217 .expect("should encode");
218 let expected = vec![
219 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
220 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
221 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
222 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00,
223 ];
224 assert_eq!(dest, expected);
225 }
226
227 #[test]
228 fn test_encode_stream_fetch_request_prev_version() {
229 let mut dest = Vec::new();
230 let mut params = SmartModuleExtraParams::default();
231 params.set_lookback(Some(Lookback::last(1)));
232 let value = DefaultStreamFetchRequest {
233 topic: "one".to_string(),
234 partition: 3,
235 smartmodules: vec![
236 (SmartModuleInvocation {
237 wasm: SmartModuleInvocationWasm::AdHoc(vec![0xde, 0xad, 0xbe, 0xef]),
238 kind: SmartModuleKind::Filter,
239 params,
240 }),
241 ],
242 ..Default::default()
243 };
244 value
245 .encode(&mut dest, DefaultStreamFetchRequest::MAX_API_VERSION - 1)
246 .expect("should encode");
247 let expected = vec![
248 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
249 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
250 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
251 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
252 ];
253 assert_eq!(dest, expected);
254 }
255
256 #[test]
257 fn test_decode_stream_fetch_request() {
258 let bytes = vec![
259 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
260 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
261 0x00, 0x00, 0x00, 0x01, 0x01, 0x00, 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00,
262 0x00, 0x00,
263 ];
264 let mut value = DefaultStreamFetchRequest::default();
265 value
266 .decode(&mut std::io::Cursor::new(bytes), CHAIN_SMARTMODULE_API)
267 .unwrap();
268 assert_eq!(value.topic, "one");
269 assert_eq!(value.partition, 3);
270 let sm = match value.smartmodules.first() {
271 Some(wasm) => wasm,
272 _ => panic!("should have smartmodule payload"),
273 };
274 let wasm = match &sm.wasm {
275 SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
276 #[allow(unreachable_patterns)]
277 _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
278 };
279 assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
280 assert!(matches!(sm.kind, SmartModuleKind::Filter));
281 }
282
283 #[test]
284 fn test_decode_stream_fetch_request_last_version() {
285 let bytes = vec![
286 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
287 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
288 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
289 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
290 ];
291 let mut value = DefaultStreamFetchRequest::default();
292 value
293 .decode(
294 &mut std::io::Cursor::new(bytes),
295 DefaultStreamFetchRequest::MAX_API_VERSION,
296 )
297 .unwrap();
298 assert_eq!(value.topic, "one");
299 assert_eq!(value.partition, 3);
300 let sm = match value.smartmodules.first() {
301 Some(wasm) => wasm,
302 _ => panic!("should have smartmodule payload"),
303 };
304 assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
305 let wasm = match &sm.wasm {
306 SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
307 #[allow(unreachable_patterns)]
308 _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
309 };
310 assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
311 assert!(matches!(sm.kind, SmartModuleKind::Filter));
312 }
313
314 #[test]
315 fn test_decode_stream_fetch_request_prev_version() {
316 let bytes = vec![
317 0x00, 0x03, 0x6f, 0x6e, 0x65, 0x00, 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00,
318 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x00,
319 0x00, 0x00, 0x04, 0xde, 0xad, 0xbe, 0xef, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00,
320 0x00, 0x00, 0x00, 0x00, 0x01, 0x00,
321 ];
322 let mut value = DefaultStreamFetchRequest::default();
323 value
324 .decode(
325 &mut std::io::Cursor::new(bytes),
326 DefaultStreamFetchRequest::MAX_API_VERSION - 1,
327 )
328 .unwrap();
329 assert_eq!(value.topic, "one");
330 assert_eq!(value.partition, 3);
331 let sm = match value.smartmodules.first() {
332 Some(wasm) => wasm,
333 _ => panic!("should have smartmodule payload"),
334 };
335 assert_eq!(sm.params.lookback(), Some(&Lookback::last(1)));
336 let wasm = match &sm.wasm {
337 SmartModuleInvocationWasm::AdHoc(wasm) => wasm.as_slice(),
338 #[allow(unreachable_patterns)]
339 _ => panic!("should be SmartModuleInvocationWasm::AdHoc"),
340 };
341 assert_eq!(wasm, vec![0xde, 0xad, 0xbe, 0xef]);
342 assert!(matches!(sm.kind, SmartModuleKind::Filter));
343 }
344
345 #[test]
346 fn test_zip_unzip_works() {
347 const ORIG_LEN: usize = 1024;
348 let orig = vec![0x01; ORIG_LEN];
349 let compressed = SmartModuleInvocationWasm::adhoc_from_bytes(orig.as_slice())
350 .expect("compression failed");
351 assert!(
352 matches!(&compressed, SmartModuleInvocationWasm::AdHoc(ref x) if x.len() < ORIG_LEN)
353 );
354 let uncompressed = compressed.into_raw().expect("decompression failed");
355 assert_eq!(orig, uncompressed);
356 }
357}