camel_processor/stream_codec/
chunks.rs1use crate::stream_codec::{
2 CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3 CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::{Bytes, BytesMut};
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// Stream-split codec that cuts a byte stream into fixed-size chunks.
///
/// The codec itself holds no state: everything a split needs lives inside the
/// stream returned by [`StreamSplitCodec::split`], so the value is freely
/// copyable and default-constructible.
#[derive(Debug, Clone, Copy, Default)]
pub struct ChunksCodec;
11
12impl StreamSplitCodec for ChunksCodec {
13 fn split(
14 &self,
15 input: StreamSplitInput,
16 config: StreamSplitConfig,
17 ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18 let StreamSplitInput {
19 parent,
20 stream,
21 metadata,
22 } = input;
23 let origin = metadata.origin;
24 let content_type = metadata.content_type;
25 let chunk_size = config.chunk_size.unwrap_or(8192);
26
27 Box::pin(async_stream::try_stream! {
28 let mut buffer = BytesMut::new();
29 let mut offset = 0u64;
30 let mut batch: Vec<Bytes> = Vec::new();
31 let mut stream = stream;
32
33 macro_rules! flush_batch {
35 () => {
36 if !batch.is_empty() {
37 let chunks = std::mem::take(&mut batch);
38 let batch_offset = offset - (chunks.len() as u64);
39 let body = if chunks.len() == 1 {
40 Body::Bytes(chunks.into_iter().next().unwrap()) } else {
42 let mut combined = BytesMut::new();
43 for c in &chunks {
44 combined.extend_from_slice(c);
45 }
46 Body::Bytes(combined.freeze())
47 };
48 let mut ex = fragment_stream_exchange(&parent, body);
49 ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
50 if let Some(ref ct) = content_type {
51 ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
52 }
53 if config.include_origin {
54 if let Some(ref o) = origin {
55 ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
56 }
57 }
58 if batch_offset != offset {
59 ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
60 }
61 yield ex;
62 }
63 };
64 }
65
66 while let Some(chunk) = stream.next().await {
67 let chunk = chunk?;
68 buffer.extend_from_slice(&chunk);
69
70 while buffer.len() >= chunk_size {
72 let slice = buffer.split_to(chunk_size);
73 batch.push(slice.freeze());
74 offset += 1;
75
76 if batch.len() >= config.batch_size {
78 flush_batch!();
79 }
80 }
81 }
82
83 if !buffer.is_empty() {
85 if buffer.len() > config.max_record_bytes {
86 flush_batch!();
87 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
88 }
89 let remaining = std::mem::take(&mut buffer);
90 batch.push(remaining.freeze());
91 offset += 1;
92
93 if batch.len() >= config.batch_size {
94 flush_batch!();
95 }
96 }
97
98 flush_batch!();
100 })
101 }
102}
103
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::{Message, StreamMetadata};
    use futures::stream;

    /// Builds a split input whose stream yields each element of `data` as one
    /// `Ok` piece, with an octet-stream content type and a `test://chunks` origin.
    fn make_chunks_input(data: Vec<&[u8]>) -> StreamSplitInput {
        let pieces = data
            .into_iter()
            .map(|d| Ok(Bytes::from(d.to_vec())))
            .collect::<Vec<_>>();
        let stream = Box::pin(stream::iter(pieces));
        let parent = Exchange::new(Message::new(Body::Empty));
        StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/octet-stream".into()),
                size_hint: None,
                origin: Some("test://chunks".into()),
            },
        }
    }

    /// A `Chunks` config with the given chunk size and defaults everywhere else.
    fn chunks_config(chunk_size: usize) -> StreamSplitConfig {
        StreamSplitConfig {
            format: camel_api::StreamSplitFormat::Chunks,
            chunk_size: Some(chunk_size),
            ..Default::default()
        }
    }

    /// Runs the codec to completion and collects every emitted item.
    async fn split_all(
        input: StreamSplitInput,
        config: StreamSplitConfig,
    ) -> Vec<Result<Exchange, CamelError>> {
        ChunksCodec.split(input, config).collect::<Vec<_>>().await
    }

    /// Runs the codec and returns its first fragment, which must be `Ok`.
    async fn first_fragment(input: StreamSplitInput, config: StreamSplitConfig) -> Exchange {
        ChunksCodec
            .split(input, config)
            .next()
            .await
            .expect("codec should emit at least one fragment")
            .expect("first fragment should be Ok")
    }

    /// The `Body::Bytes` payload of `ex`, or `None` for any other body kind.
    fn body_bytes(ex: &Exchange) -> Option<&[u8]> {
        match &ex.input.body {
            Body::Bytes(b) => Some(&b[..]),
            _ => None,
        }
    }

    /// Unwraps every fragment and returns its byte body, in emission order.
    fn ok_bodies(fragments: &[Result<Exchange, CamelError>]) -> Vec<&[u8]> {
        fragments
            .iter()
            .map(|frag| {
                let ex = frag.as_ref().expect("fragment should be Ok");
                body_bytes(ex).expect("fragment body should be Bytes")
            })
            .collect()
    }

    #[tokio::test]
    async fn test_chunks_exact_split() {
        let fragments = split_all(make_chunks_input(vec![b"0123456789"]), chunks_config(5)).await;
        assert_eq!(ok_bodies(&fragments), vec![&b"01234"[..], &b"56789"[..]]);
    }

    #[tokio::test]
    async fn test_chunks_last_chunk_smaller() {
        let fragments = split_all(make_chunks_input(vec![b"01234"]), chunks_config(3)).await;
        assert_eq!(ok_bodies(&fragments), vec![&b"012"[..], &b"34"[..]]);
    }

    #[tokio::test]
    async fn test_chunks_empty_stream() {
        let fragments = split_all(make_chunks_input(vec![]), chunks_config(10)).await;
        assert!(fragments.is_empty());
    }

    #[tokio::test]
    async fn test_chunks_reassembles_across_input_boundaries() {
        let input = make_chunks_input(vec![&b"01"[..], &b"2345"[..], &b"6"[..]]);
        let fragments = split_all(input, chunks_config(3)).await;
        assert_eq!(
            ok_bodies(&fragments),
            vec![&b"012"[..], &b"345"[..], &b"6"[..]]
        );
        for (i, frag) in fragments.iter().enumerate() {
            let ex = frag.as_ref().expect("should be ok");
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[test]
    fn test_chunks_auto_resolved_to_chunks_codec() {
        let metadata = StreamMetadata {
            content_type: Some("application/octet-stream".into()),
            size_hint: None,
            origin: None,
        };
        let result =
            crate::stream_codec::resolve_format(&camel_api::StreamSplitFormat::Auto, &metadata);
        assert_eq!(result.unwrap(), camel_api::StreamSplitFormat::Chunks);
    }

    #[test]
    fn test_chunks_chunk_size_zero_rejected() {
        assert!(chunks_config(0).validate().is_err());
    }

    #[test]
    fn test_chunks_chunk_size_exceeds_max_record_bytes() {
        let config = StreamSplitConfig {
            max_record_bytes: 1000,
            ..chunks_config(2000)
        };
        let err = config.validate().unwrap_err();
        assert!(
            err.to_string()
                .contains("chunk_size must be <= max_record_bytes"),
            "chunk exceeding max_record_bytes should be rejected by validate()"
        );
    }

    #[tokio::test]
    async fn test_chunks_trailing_chunk_over_max_record_bytes_is_rejected() {
        // Deliberately unvalidated config: the stream is shorter than one full
        // chunk, so the whole input becomes a trailing chunk above the limit.
        let data = [b'x'; 1500];
        let config = StreamSplitConfig {
            max_record_bytes: 1000,
            ..chunks_config(2000)
        };
        let fragments = split_all(make_chunks_input(vec![&data[..]]), config).await;
        assert_eq!(fragments.len(), 1);
        assert!(matches!(
            &fragments[0],
            Err(CamelError::StreamLimitExceeded(1000))
        ));
    }

    #[tokio::test]
    async fn test_chunks_sets_stream_properties() {
        let config = StreamSplitConfig {
            include_origin: true,
            ..chunks_config(5)
        };
        let ex = first_fragment(make_chunks_input(vec![b"hello"]), config).await;
        assert_eq!(
            ex.property(CAMEL_STREAM_ORIGIN),
            Some(&Value::String("test://chunks".into()))
        );
        assert_eq!(
            ex.property(CAMEL_STREAM_SOURCE_CONTENT_TYPE),
            Some(&Value::String("application/octet-stream".into()))
        );
    }

    #[tokio::test]
    async fn test_chunks_include_origin_false() {
        let config = StreamSplitConfig {
            include_origin: false,
            ..chunks_config(5)
        };
        let ex = first_fragment(make_chunks_input(vec![b"hello"]), config).await;
        assert!(
            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
            "Origin should not be set when include_origin=false"
        );
    }

    #[tokio::test]
    async fn test_chunks_headers_excluded() {
        let mut parent = Exchange::new(Message::new(Body::Empty));
        parent.input.headers.insert(
            "Content-Type".into(),
            Value::String("application/octet-stream".into()),
        );
        parent
            .input
            .headers
            .insert("Content-Length".into(), Value::String("5".into()));
        parent
            .input
            .headers
            .insert("X-Custom".into(), Value::String("kept".into()));
        let data = vec![Ok(Bytes::from(b"hello" as &[u8]))];
        let stream = Box::pin(stream::iter(data));
        let input = StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/octet-stream".into()),
                size_hint: None,
                origin: None,
            },
        };
        let ex = first_fragment(input, chunks_config(5)).await;
        assert!(
            ex.input.headers.get("Content-Type").is_none(),
            "Content-Type should be excluded"
        );
        assert!(
            ex.input.headers.get("Content-Length").is_none(),
            "Content-Length should be excluded"
        );
        assert_eq!(
            ex.input.headers.get("X-Custom"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_chunks_sets_offset_property() {
        let fragments = split_all(make_chunks_input(vec![b"0123456789"]), chunks_config(5)).await;
        assert_eq!(fragments.len(), 2);
        for (i, frag) in fragments.iter().enumerate() {
            let ex = frag.as_ref().expect("should be ok");
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[tokio::test]
    async fn test_chunks_batch_size_two() {
        let config = StreamSplitConfig {
            batch_size: 2,
            ..chunks_config(3)
        };
        let fragments = split_all(make_chunks_input(vec![b"0123456789"]), config).await;
        // Chunks are "012", "345", "678", "9"; batched two at a time.
        assert_eq!(ok_bodies(&fragments), vec![&b"012345"[..], &b"6789"[..]]);

        let ex0 = fragments[0].as_ref().expect("should be ok");
        assert_eq!(ex0.property(CAMEL_STREAM_OFFSET), Some(&Value::from(0i64)));
        assert_eq!(
            ex0.property(CAMEL_STREAM_BATCH_SIZE),
            Some(&Value::from(2i64))
        );

        let ex1 = fragments[1].as_ref().expect("should be ok");
        assert_eq!(ex1.property(CAMEL_STREAM_OFFSET), Some(&Value::from(2i64)));
        assert_eq!(
            ex1.property(CAMEL_STREAM_BATCH_SIZE),
            Some(&Value::from(2i64))
        );
    }

    #[tokio::test]
    async fn test_chunks_partial_chunk_flushed_at_end() {
        let fragments = split_all(make_chunks_input(vec![b"abcde"]), chunks_config(3)).await;
        assert_eq!(ok_bodies(&fragments), vec![&b"abc"[..], &b"de"[..]]);
    }
}