Skip to main content

camel_processor/stream_codec/
chunks.rs

1use crate::stream_codec::{
2    CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3    CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::{Bytes, BytesMut};
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// Stream-split codec that cuts a byte stream into fixed-size chunks.
///
/// Stateless unit type; all behavior lives in its [`StreamSplitCodec`] implementation.
pub struct ChunksCodec;
11
12impl StreamSplitCodec for ChunksCodec {
13    fn split(
14        &self,
15        input: StreamSplitInput,
16        config: StreamSplitConfig,
17    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18        let StreamSplitInput {
19            parent,
20            stream,
21            metadata,
22        } = input;
23        let origin = metadata.origin;
24        let content_type = metadata.content_type;
25        let chunk_size = config.chunk_size.unwrap_or(8192);
26
27        Box::pin(async_stream::try_stream! {
28            let mut buffer = BytesMut::new();
29            let mut offset = 0u64;
30            let mut batch: Vec<Bytes> = Vec::new();
31            let mut stream = stream;
32
33            // Macro to flush the current batch as a single exchange
34            macro_rules! flush_batch {
35                () => {
36                    if !batch.is_empty() {
37                        let chunks = std::mem::take(&mut batch);
38                        let batch_offset = offset - (chunks.len() as u64);
39                        let body = if chunks.len() == 1 {
40                            Body::Bytes(chunks.into_iter().next().unwrap()) // allow-unwrap: len==1 guaranteed by enclosing if
41                        } else {
42                            let mut combined = BytesMut::new();
43                            for c in &chunks {
44                                combined.extend_from_slice(c);
45                            }
46                            Body::Bytes(combined.freeze())
47                        };
48                        let mut ex = fragment_stream_exchange(&parent, body);
49                        ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
50                        if let Some(ref ct) = content_type {
51                            ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
52                        }
53                        if config.include_origin {
54                            if let Some(ref o) = origin {
55                                ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
56                            }
57                        }
58                        if batch_offset != offset {
59                            ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
60                        }
61                        yield ex;
62                    }
63                };
64            }
65
66            while let Some(chunk) = stream.next().await {
67                let chunk = chunk?;
68                buffer.extend_from_slice(&chunk);
69
70                // Emit full chunks from the buffer
71                while buffer.len() >= chunk_size {
72                    let slice = buffer.split_to(chunk_size);
73                    batch.push(slice.freeze());
74                    offset += 1;
75
76                    // Flush if batch is full
77                    if batch.len() >= config.batch_size {
78                        flush_batch!();
79                    }
80                }
81            }
82
83            // Handle remaining data after stream ends (last partial chunk)
84            if !buffer.is_empty() {
85                if buffer.len() > config.max_record_bytes {
86                    flush_batch!();
87                    Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
88                }
89                let remaining = std::mem::take(&mut buffer);
90                batch.push(remaining.freeze());
91                offset += 1;
92
93                if batch.len() >= config.batch_size {
94                    flush_batch!();
95                }
96            }
97
98            // Final flush at end of stream
99            flush_batch!();
100        })
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use super::*;
107    use bytes::Bytes;
108    use camel_api::{Message, StreamMetadata};
109    use futures::stream;
110
111    fn make_chunks_input(data: Vec<&[u8]>) -> StreamSplitInput {
112        let chunks = data
113            .into_iter()
114            .map(|d| Ok(Bytes::from(d.to_vec())))
115            .collect::<Vec<_>>();
116        let stream = Box::pin(stream::iter(chunks));
117        let parent = Exchange::new(Message::new(Body::Empty));
118        StreamSplitInput {
119            parent,
120            stream,
121            metadata: StreamMetadata {
122                content_type: Some("application/octet-stream".into()),
123                size_hint: None,
124                origin: Some("test://chunks".into()),
125            },
126        }
127    }
128
129    #[tokio::test]
130    async fn test_chunks_exact_split() {
131        let input = make_chunks_input(vec![b"0123456789"]);
132        let config = StreamSplitConfig {
133            format: camel_api::StreamSplitFormat::Chunks,
134            chunk_size: Some(5),
135            ..Default::default()
136        };
137        let codec = ChunksCodec;
138        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
139        assert_eq!(fragments.len(), 2);
140        let f0 = fragments[0].as_ref().expect("ok");
141        assert!(matches!(&f0.input.body, Body::Bytes(b) if b.len() == 5));
142        assert!(matches!(&f0.input.body, Body::Bytes(b) if b == "01234"));
143        let f1 = fragments[1].as_ref().expect("ok");
144        assert!(matches!(&f1.input.body, Body::Bytes(b) if b == "56789"));
145    }
146
147    #[tokio::test]
148    async fn test_chunks_last_chunk_smaller() {
149        let input = make_chunks_input(vec![b"01234"]);
150        let config = StreamSplitConfig {
151            format: camel_api::StreamSplitFormat::Chunks,
152            chunk_size: Some(3),
153            ..Default::default()
154        };
155        let codec = ChunksCodec;
156        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
157        assert_eq!(fragments.len(), 2);
158        let f0 = fragments[0].as_ref().expect("ok");
159        let f1 = fragments[1].as_ref().expect("ok");
160        assert!(matches!(&f0.input.body, Body::Bytes(b) if b.len() == 3));
161        assert!(matches!(&f0.input.body, Body::Bytes(b) if b == "012"));
162        assert!(matches!(&f1.input.body, Body::Bytes(b) if b.len() == 2));
163        assert!(matches!(&f1.input.body, Body::Bytes(b) if b == "34"));
164    }
165
166    #[tokio::test]
167    async fn test_chunks_empty_stream() {
168        let input = make_chunks_input(vec![]);
169        let config = StreamSplitConfig {
170            format: camel_api::StreamSplitFormat::Chunks,
171            chunk_size: Some(10),
172            ..Default::default()
173        };
174        let codec = ChunksCodec;
175        let count = codec.split(input, config).count().await;
176        assert_eq!(count, 0);
177    }
178
179    #[tokio::test]
180    async fn test_chunks_auto_resolved_to_chunks_codec() {
181        let metadata = StreamMetadata {
182            content_type: Some("application/octet-stream".into()),
183            size_hint: None,
184            origin: None,
185        };
186        let result =
187            crate::stream_codec::resolve_format(&camel_api::StreamSplitFormat::Auto, &metadata);
188        assert_eq!(result.unwrap(), camel_api::StreamSplitFormat::Chunks);
189    }
190
191    #[tokio::test]
192    async fn test_chunks_chunk_size_zero_rejected() {
193        let config = StreamSplitConfig {
194            format: camel_api::StreamSplitFormat::Chunks,
195            chunk_size: Some(0),
196            ..Default::default()
197        };
198        assert!(config.validate().is_err());
199    }
200
201    #[test]
202    fn test_chunks_chunk_size_exceeds_max_record_bytes() {
203        let config = StreamSplitConfig {
204            format: camel_api::StreamSplitFormat::Chunks,
205            chunk_size: Some(2000),
206            max_record_bytes: 1000,
207            ..Default::default()
208        };
209        let err = config.validate().unwrap_err();
210        assert!(
211            err.to_string()
212                .contains("chunk_size must be <= max_record_bytes"),
213            "chunk exceeding max_record_bytes should be rejected by validate()"
214        );
215    }
216
217    #[tokio::test]
218    async fn test_chunks_sets_stream_properties() {
219        let input = make_chunks_input(vec![b"hello"]);
220        let config = StreamSplitConfig {
221            format: camel_api::StreamSplitFormat::Chunks,
222            chunk_size: Some(5),
223            include_origin: true,
224            ..Default::default()
225        };
226        let codec = ChunksCodec;
227        let ex = codec
228            .split(input, config)
229            .next()
230            .await
231            .unwrap()
232            .expect("ok");
233        assert_eq!(
234            ex.property(CAMEL_STREAM_ORIGIN),
235            Some(&Value::String("test://chunks".into()))
236        );
237    }
238
239    #[tokio::test]
240    async fn test_chunks_include_origin_false() {
241        let input = make_chunks_input(vec![b"hello"]);
242        let config = StreamSplitConfig {
243            format: camel_api::StreamSplitFormat::Chunks,
244            chunk_size: Some(5),
245            include_origin: false,
246            ..Default::default()
247        };
248        let codec = ChunksCodec;
249        let ex = codec
250            .split(input, config)
251            .next()
252            .await
253            .unwrap()
254            .expect("ok");
255        assert!(
256            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
257            "Origin should not be set when include_origin=false"
258        );
259    }
260
261    #[tokio::test]
262    async fn test_chunks_headers_excluded() {
263        let mut parent = Exchange::new(Message::new(Body::Empty));
264        parent.input.headers.insert(
265            "Content-Type".into(),
266            Value::String("application/octet-stream".into()),
267        );
268        parent
269            .input
270            .headers
271            .insert("Content-Length".into(), Value::String("5".into()));
272        parent
273            .input
274            .headers
275            .insert("X-Custom".into(), Value::String("kept".into()));
276        let data = vec![Ok(Bytes::from(b"hello" as &[u8]))];
277        let stream = Box::pin(stream::iter(data));
278        let input = StreamSplitInput {
279            parent,
280            stream,
281            metadata: StreamMetadata {
282                content_type: Some("application/octet-stream".into()),
283                size_hint: None,
284                origin: None,
285            },
286        };
287        let config = StreamSplitConfig {
288            format: camel_api::StreamSplitFormat::Chunks,
289            chunk_size: Some(5),
290            ..Default::default()
291        };
292        let codec = ChunksCodec;
293        let ex = codec
294            .split(input, config)
295            .next()
296            .await
297            .unwrap()
298            .expect("ok");
299        assert!(
300            ex.input.headers.get("Content-Type").is_none(),
301            "Content-Type should be excluded"
302        );
303        assert!(
304            ex.input.headers.get("Content-Length").is_none(),
305            "Content-Length should be excluded"
306        );
307        assert_eq!(
308            ex.input.headers.get("X-Custom"),
309            Some(&Value::String("kept".into()))
310        );
311    }
312
313    #[tokio::test]
314    async fn test_chunks_sets_offset_property() {
315        let input = make_chunks_input(vec![b"0123456789"]);
316        let config = StreamSplitConfig {
317            format: camel_api::StreamSplitFormat::Chunks,
318            chunk_size: Some(5),
319            ..Default::default()
320        };
321        let codec = ChunksCodec;
322        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
323        assert_eq!(fragments.len(), 2);
324        for (i, frag) in fragments.iter().enumerate() {
325            let ex = frag.as_ref().expect("should be ok");
326            assert_eq!(
327                ex.property(CAMEL_STREAM_OFFSET),
328                Some(&Value::from(i as i64))
329            );
330        }
331    }
332
333    #[tokio::test]
334    async fn test_chunks_batch_size_two() {
335        let input = make_chunks_input(vec![b"0123456789"]);
336        let config = StreamSplitConfig {
337            format: camel_api::StreamSplitFormat::Chunks,
338            chunk_size: Some(3),
339            batch_size: 2,
340            ..Default::default()
341        };
342        let codec = ChunksCodec;
343        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
344        // 10 bytes / 3 = 4 chunks (3+3+3+1), batched as 2+2 => 2 fragments
345        assert_eq!(fragments.len(), 2);
346
347        // First batch: chunks 0+1 -> "012" + "345" = "012345", offset=0, chunk count=2
348        let ex0 = fragments[0].as_ref().expect("should be ok");
349        assert!(matches!(&ex0.input.body, Body::Bytes(b) if b == "012345"));
350        assert_eq!(ex0.property(CAMEL_STREAM_OFFSET), Some(&Value::from(0i64)));
351        assert_eq!(
352            ex0.property(CAMEL_STREAM_BATCH_SIZE),
353            Some(&Value::from(2i64))
354        );
355
356        // Second batch: chunks 2+3 -> "678" + "9" = "6789", offset=2, chunk count=2
357        let ex1 = fragments[1].as_ref().expect("should be ok");
358        assert!(matches!(&ex1.input.body, Body::Bytes(b) if b == "6789"));
359        assert_eq!(ex1.property(CAMEL_STREAM_OFFSET), Some(&Value::from(2i64)));
360        assert_eq!(
361            ex1.property(CAMEL_STREAM_BATCH_SIZE),
362            Some(&Value::from(2i64))
363        );
364    }
365
366    #[tokio::test]
367    async fn test_chunks_partial_chunk_flushed_at_end() {
368        let input = make_chunks_input(vec![b"abcde"]);
369        let config = StreamSplitConfig {
370            format: camel_api::StreamSplitFormat::Chunks,
371            chunk_size: Some(3),
372            ..Default::default()
373        };
374        let codec = ChunksCodec;
375        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
376        assert_eq!(fragments.len(), 2);
377        assert!(matches!(
378            &fragments[0].as_ref().expect("ok").input.body,
379            Body::Bytes(b) if b == "abc"
380        ));
381        assert!(matches!(
382            &fragments[1].as_ref().expect("ok").input.body,
383            Body::Bytes(b) if b == "de"
384        ));
385    }
386}