Skip to main content

camel_processor/stream_codec/
lines.rs

1use crate::stream_codec::{
2    CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3    CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::BytesMut;
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// Stream-split codec that breaks a byte stream into newline-delimited text
/// records.
///
/// The codec carries no state of its own; all behaviour lives in its
/// [`StreamSplitCodec`] implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct LinesCodec;
11
12impl StreamSplitCodec for LinesCodec {
13    fn split(
14        &self,
15        input: StreamSplitInput,
16        config: StreamSplitConfig,
17    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18        let StreamSplitInput {
19            parent,
20            stream,
21            metadata,
22        } = input;
23        let origin = metadata.origin;
24        let content_type = metadata.content_type;
25
26        Box::pin(async_stream::try_stream! {
27            let mut buffer = BytesMut::new();
28            let mut offset = 0u64;
29            let mut batch: Vec<String> = Vec::new();
30            let mut stream = stream;
31
32            // Macro to flush the current batch as a single exchange
33            macro_rules! flush_batch {
34                () => {
35                    if !batch.is_empty() {
36                        let lines = std::mem::take(&mut batch);
37                        let batch_offset = offset - (lines.len() as u64);
38                        let body = if lines.len() == 1 {
39                            Body::Text(lines.into_iter().next().unwrap()) // allow-unwrap: len==1 guaranteed by enclosing if
40                        } else {
41                            Body::Text(lines.join("\n"))
42                        };
43                        let mut ex = fragment_stream_exchange(&parent, body);
44                        ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
45                        if let Some(ref ct) = content_type {
46                            ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
47                        }
48                        if config.include_origin {
49                            if let Some(ref o) = origin {
50                                ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
51                            }
52                        }
53                        if batch_offset != offset {
54                            ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
55                        }
56                        yield ex;
57                    }
58                };
59            }
60
61            while let Some(chunk) = stream.next().await {
62                let chunk = chunk?;
63                buffer.extend_from_slice(&chunk);
64
65                // Process all complete lines in the buffer
66                loop {
67                    // Check for line-too-long before finding a newline
68                    if buffer.len() > config.max_record_bytes {
69                        // Check if there's actually a newline in the buffer
70                        if !buffer.contains(&b'\n') {
71                            flush_batch!();
72                            Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
73                        }
74                    }
75
76                    let newline_pos = match buffer.iter().position(|&b| b == b'\n') {
77                        Some(pos) => pos,
78                        None => break, // need more data
79                    };
80
81                    let line_bytes = buffer.split_to(newline_pos + 1); // include '\n'
82                    let line = &line_bytes[..newline_pos]; // exclude '\n'
83
84                    // Check max_record_bytes on the line (without newline)
85                    if line.len() > config.max_record_bytes {
86                        flush_batch!();
87                        Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
88                    }
89
90                    // Convert to string
91                    let trimmed = match std::str::from_utf8(line) {
92                        Ok(s) => s.trim(),
93                        Err(_) => {
94                            flush_batch!();
95                            Err(CamelError::TypeConversionFailed(
96                                "Line is not valid UTF-8".into(),
97                            ))?;
98                            unreachable!();
99                        }
100                    };
101
102                    // Skip empty/blank lines
103                    if trimmed.is_empty() {
104                        continue;
105                    }
106
107                    batch.push(trimmed.to_string());
108                    offset += 1;
109
110                    // Flush if batch is full
111                    if batch.len() >= config.batch_size {
112                        flush_batch!();
113                    }
114                }
115            }
116
117            // Handle remaining data after stream ends (last line without trailing newline)
118            if !buffer.is_empty() {
119                let line = std::mem::take(&mut buffer);
120                let trimmed = match std::str::from_utf8(&line) {
121                    Ok(s) => s.trim(),
122                    Err(_) => {
123                        flush_batch!();
124                        Err(CamelError::TypeConversionFailed(
125                            "Line is not valid UTF-8".into(),
126                        ))?;
127                        unreachable!();
128                    }
129                };
130
131                if !trimmed.is_empty() {
132                    if line.len() > config.max_record_bytes {
133                        flush_batch!();
134                        Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
135                    }
136
137                    batch.push(trimmed.to_string());
138                    offset += 1;
139
140                    if batch.len() >= config.batch_size {
141                        flush_batch!();
142                    }
143                }
144            }
145
146            // Final flush at end of stream
147            flush_batch!();
148        })
149    }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::{Message, StreamMetadata};
    use futures::stream;

    /// Parent exchange with an empty body and no headers.
    fn empty_parent() -> Exchange {
        Exchange::new(Message::new(Body::Empty))
    }

    /// Builds a split input that delivers `chunks` one by one, tagged as
    /// `text/plain` with the given origin.
    fn input_from_chunks(
        parent: Exchange,
        chunks: Vec<Bytes>,
        origin: Option<&str>,
    ) -> StreamSplitInput {
        let data = chunks.into_iter().map(Ok).collect::<Vec<_>>();
        let stream = Box::pin(stream::iter(data));
        StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("text/plain".into()),
                size_hint: None,
                origin: origin.map(Into::into),
            },
        }
    }

    /// One chunk per line, each terminated by `\n`, with origin `test://lines`.
    fn make_lines_input(lines: Vec<&str>) -> StreamSplitInput {
        let chunks = lines
            .iter()
            .map(|l| Bytes::from(format!("{}\n", l)))
            .collect();
        input_from_chunks(empty_parent(), chunks, Some("test://lines"))
    }

    /// Raw chunks delivered verbatim, without an origin.
    fn make_raw_input(chunks: Vec<&'static str>) -> StreamSplitInput {
        let chunks = chunks.into_iter().map(Bytes::from).collect();
        input_from_chunks(empty_parent(), chunks, None)
    }

    /// Runs the codec to completion, panicking on the first error item.
    async fn split_ok(input: StreamSplitInput, config: StreamSplitConfig) -> Vec<Exchange> {
        LinesCodec
            .split(input, config)
            .map(|r| r.expect("fragment should be ok"))
            .collect::<Vec<_>>()
            .await
    }

    /// Text bodies of the given exchanges, in order.
    fn bodies(fragments: &[Exchange]) -> Vec<&str> {
        fragments
            .iter()
            .map(|ex| match &ex.input.body {
                Body::Text(s) => s.as_str(),
                _ => panic!("expected a text body"),
            })
            .collect()
    }

    #[tokio::test]
    async fn test_lines_splits_three_lines() {
        let input = make_lines_input(vec!["hello", "world", "foo"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["hello", "world", "foo"]);
    }

    #[tokio::test]
    async fn test_lines_empty_stream() {
        let input = make_lines_input(vec![]);
        let count = LinesCodec
            .split(input, StreamSplitConfig::default())
            .count()
            .await;
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_lines_empty_lines_skipped() {
        let input = make_lines_input(vec!["a", "", "b"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["a", "b"]);
    }

    #[tokio::test]
    async fn test_lines_trailing_newline_no_extra_fragment() {
        let input = make_raw_input(vec!["hello\nworld\n"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["hello", "world"]);
    }

    #[tokio::test]
    async fn test_lines_last_line_without_trailing_newline() {
        let input = make_raw_input(vec!["hello\nworld"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["hello", "world"]);
    }

    #[tokio::test]
    async fn test_lines_line_split_across_chunks() {
        let input = make_raw_input(vec!["hel", "lo\nwor", "ld\n"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["hello", "world"]);
    }

    #[tokio::test]
    async fn test_lines_crlf_is_trimmed() {
        let input = make_raw_input(vec!["a\r\nb\r\n"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(bodies(&fragments), vec!["a", "b"]);
    }

    #[tokio::test]
    async fn test_lines_exceeds_max_record_bytes() {
        let long_line = "x".repeat(2000);
        let input = make_lines_input(vec![&long_line]);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let result = LinesCodec.split(input, config).next().await.unwrap();
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(100))));
    }

    #[tokio::test]
    async fn test_lines_exceeds_max_record_bytes_without_newline() {
        // The record never terminates, so the limit must trip while buffering.
        let chunks = vec![Bytes::from("x".repeat(200))];
        let input = input_from_chunks(empty_parent(), chunks, None);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let result = LinesCodec.split(input, config).next().await.unwrap();
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(100))));
    }

    #[tokio::test]
    async fn test_lines_sets_stream_properties() {
        let input = make_lines_input(vec!["hello"]);
        let config = StreamSplitConfig {
            include_origin: true,
            ..Default::default()
        };
        let ex = LinesCodec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert_eq!(
            ex.property(CAMEL_STREAM_ORIGIN),
            Some(&Value::String("test://lines".into()))
        );
        assert_eq!(
            ex.property(CAMEL_STREAM_SOURCE_CONTENT_TYPE),
            Some(&Value::String("text/plain".into()))
        );
    }

    #[tokio::test]
    async fn test_lines_sets_offset_property() {
        let input = make_lines_input(vec!["alpha", "beta", "gamma"]);
        let fragments = split_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(fragments.len(), 3);
        for (i, ex) in fragments.iter().enumerate() {
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[tokio::test]
    async fn test_lines_headers_excluded() {
        let mut parent = empty_parent();
        parent
            .input
            .headers
            .insert("Content-Type".into(), Value::String("text/plain".into()));
        parent
            .input
            .headers
            .insert("Content-Length".into(), Value::String("42".into()));
        parent
            .input
            .headers
            .insert("X-Custom".into(), Value::String("kept".into()));
        let input = input_from_chunks(parent, vec![Bytes::from("hello\n")], None);
        let ex = LinesCodec
            .split(input, StreamSplitConfig::default())
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.input.headers.get("Content-Type").is_none(),
            "Content-Type should be excluded"
        );
        assert!(
            ex.input.headers.get("Content-Length").is_none(),
            "Content-Length should be excluded"
        );
        assert_eq!(
            ex.input.headers.get("X-Custom"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_lines_include_origin_false() {
        let input = make_lines_input(vec!["hello"]);
        let config = StreamSplitConfig {
            include_origin: false,
            ..Default::default()
        };
        let ex = LinesCodec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
            "Origin should not be set when include_origin=false"
        );
    }

    #[tokio::test]
    async fn test_lines_batch_size_two() {
        let input = make_lines_input(vec!["a", "b", "c", "d"]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let fragments = split_ok(input, config).await;
        assert_eq!(bodies(&fragments), vec!["a\nb", "c\nd"]);
        for (ex, expected_offset) in fragments.iter().zip([0i64, 2]) {
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(expected_offset))
            );
            assert_eq!(
                ex.property(CAMEL_STREAM_BATCH_SIZE),
                Some(&Value::from(2i64))
            );
        }
    }

    #[tokio::test]
    async fn test_lines_partial_batch_flushed_at_end() {
        let input = make_lines_input(vec!["a", "b", "c"]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let fragments = split_ok(input, config).await;
        assert_eq!(bodies(&fragments), vec!["a\nb", "c"]);
        assert_eq!(
            fragments[1].property(CAMEL_STREAM_OFFSET),
            Some(&Value::from(2i64))
        );
    }

    #[tokio::test]
    async fn test_lines_non_utf8_returns_error() {
        // Invalid UTF-8 bytes
        let chunks = vec![Bytes::from(b"\xff\xfe\n".as_slice())];
        let input = input_from_chunks(empty_parent(), chunks, None);
        let result = LinesCodec
            .split(input, StreamSplitConfig::default())
            .next()
            .await
            .unwrap();
        assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
    }
}