// camel_processor/stream_codec/ndjson.rs

1use crate::stream_codec::{
2    CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3    CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::BytesMut;
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// Stream-split codec for newline-delimited JSON (NDJSON) payloads.
///
/// The codec itself holds no state; the buffer, record counter and pending batch used
/// while splitting live inside the stream returned by `split`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonCodec;

12impl StreamSplitCodec for NdjsonCodec {
13    fn split(
14        &self,
15        input: StreamSplitInput,
16        config: StreamSplitConfig,
17    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18        let StreamSplitInput {
19            parent,
20            stream,
21            metadata,
22        } = input;
23        let origin = metadata.origin;
24        let content_type = metadata.content_type;
25
26        Box::pin(async_stream::try_stream! {
27            let mut buffer = BytesMut::new();
28            let mut offset = 0u64;
29            let mut batch: Vec<serde_json::Value> = Vec::new();
30            let mut stream = stream;
31
32            // Macro to flush the current batch as a single exchange
33            macro_rules! flush_batch {
34                () => {
35                    if !batch.is_empty() {
36                        let values = std::mem::take(&mut batch);
37                        let batch_offset = offset - (values.len() as u64);
38                        let body = if values.len() == 1 {
39                            Body::Json(values.into_iter().next().unwrap()) // allow-unwrap: len==1 guaranteed by enclosing if
40                        } else {
41                            Body::Json(serde_json::Value::Array(values))
42                        };
43                        let mut ex = fragment_stream_exchange(&parent, body);
44                        ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
45                        if let Some(ref ct) = content_type {
46                            ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
47                        }
48                        if config.include_origin {
49                            if let Some(ref o) = origin {
50                                ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
51                            }
52                        }
53                        if batch_offset != offset {
54                            ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
55                        }
56                        yield ex;
57                    }
58                };
59            }
60
61            while let Some(chunk) = stream.next().await {
62                let chunk = chunk?;
63                buffer.extend_from_slice(&chunk);
64
65                // Process all complete lines in the buffer
66                loop {
67                    // Check for line-too-long before finding a newline
68                    if buffer.len() > config.max_record_bytes {
69                        // Check if there's actually a newline in the buffer
70                        if !buffer.contains(&b'\n') {
71                            flush_batch!();
72                            Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
73                        }
74                    }
75
76                    let newline_pos = match buffer.iter().position(|&b| b == b'\n') {
77                        Some(pos) => pos,
78                        None => break, // need more data
79                    };
80
81                    let line_bytes = buffer.split_to(newline_pos + 1); // include '\n'
82                    let line = &line_bytes[..newline_pos]; // exclude '\n'
83
84                    // Skip empty/blank lines
85                    let trimmed = match std::str::from_utf8(line) {
86                        Ok(s) => s.trim(),
87                        Err(_) => {
88                            flush_batch!();
89                            Err(CamelError::TypeConversionFailed(
90                                "NDJSON line is not valid UTF-8".into(),
91                            ))?;
92                            unreachable!();
93                        }
94                    };
95                    if trimmed.is_empty() {
96                        continue;
97                    }
98
99                    // Check max_record_bytes on the line (without newline)
100                    if line.len() > config.max_record_bytes {
101                        flush_batch!();
102                        Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
103                    }
104
105                    // Parse as JSON
106                    let value: serde_json::Value = match serde_json::from_str(trimmed) {
107                        Ok(v) => v,
108                        Err(e) => {
109                            flush_batch!();
110                            Err(CamelError::TypeConversionFailed(format!(
111                                "NDJSON parse error: {}",
112                                e
113                            )))?;
114                            unreachable!();
115                        }
116                    };
117
118                    batch.push(value);
119                    offset += 1;
120
121                    // Flush if batch is full
122                    if batch.len() >= config.batch_size {
123                        flush_batch!();
124                    }
125                }
126            }
127
128            // Handle remaining data after stream ends (last line without trailing newline)
129            if !buffer.is_empty() {
130                let line = std::mem::take(&mut buffer);
131                let trimmed = match std::str::from_utf8(&line) {
132                    Ok(s) => s.trim(),
133                    Err(_) => {
134                        flush_batch!();
135                        Err(CamelError::TypeConversionFailed(
136                            "NDJSON line is not valid UTF-8".into(),
137                        ))?;
138                        unreachable!();
139                    }
140                };
141
142                if !trimmed.is_empty() {
143                    if line.len() > config.max_record_bytes {
144                        flush_batch!();
145                        Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
146                    }
147
148                    let value: serde_json::Value = match serde_json::from_str(trimmed) {
149                        Ok(v) => v,
150                        Err(e) => {
151                            flush_batch!();
152                            Err(CamelError::TypeConversionFailed(format!(
153                                "NDJSON parse error: {}",
154                                e
155                            )))?;
156                            unreachable!();
157                        }
158                    };
159
160                    batch.push(value);
161                    offset += 1;
162
163                    if batch.len() >= config.batch_size {
164                        flush_batch!();
165                    }
166                }
167            }
168
169            // Final flush at end of stream
170            flush_batch!();
171        })
172    }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::{Message, StreamMetadata};
    use futures::stream;

    /// Builds a split input whose byte stream yields each of `chunks` verbatim as one item.
    fn make_chunked_input(chunks: Vec<String>) -> StreamSplitInput {
        let data = chunks
            .into_iter()
            .map(|c| Ok(Bytes::from(c)))
            .collect::<Vec<_>>();
        let stream = Box::pin(stream::iter(data));
        let parent = Exchange::new(Message::new(Body::Empty));
        StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/x-ndjson".into()),
                size_hint: None,
                origin: Some("test://stream".into()),
            },
        }
    }

    /// Builds a split input that yields each of `lines` as its own `\n`-terminated chunk.
    fn make_stream_input(lines: Vec<&str>) -> StreamSplitInput {
        make_chunked_input(lines.iter().map(|l| format!("{}\n", l)).collect())
    }

    /// Returns the JSON body of a fragment, panicking if it is an error or not JSON.
    fn json_body(frag: &Result<Exchange, CamelError>) -> &serde_json::Value {
        match &frag.as_ref().expect("should be ok").input.body {
            Body::Json(v) => v,
            _ => panic!("expected a JSON body"),
        }
    }

    #[tokio::test]
    async fn test_ndjson_splits_three_rows() {
        let input = make_stream_input(vec![
            r#"{"id":1,"name":"a"}"#,
            r#"{"id":2,"name":"b"}"#,
            r#"{"id":3,"name":"c"}"#,
        ]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 3);
        for frag in &fragments {
            let ex = frag.as_ref().expect("should be ok");
            assert!(matches!(ex.input.body, Body::Json(_)));
        }
    }

    #[tokio::test]
    async fn test_ndjson_empty_stream() {
        let input = make_stream_input(vec![]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let count = codec.split(input, config).count().await;
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_ndjson_exceeds_max_record_bytes() {
        let long_line = format!("{{\"id\":{}}}", "x".repeat(2000));
        let input = make_stream_input(vec![&long_line]);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(100))));
    }

    #[tokio::test]
    async fn test_ndjson_unterminated_record_exceeds_max_record_bytes() {
        // A partial record that is already over the limit must fail without waiting for '\n'.
        let input = make_chunked_input(vec!["x".repeat(20)]);
        let config = StreamSplitConfig {
            max_record_bytes: 10,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(10))));
    }

    #[tokio::test]
    async fn test_ndjson_sets_stream_properties() {
        let input = make_stream_input(vec![r#"{"id":1}"#]);
        let config = StreamSplitConfig {
            include_origin: true,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert_eq!(
            ex.property(CAMEL_STREAM_ORIGIN),
            Some(&Value::String("test://stream".into()))
        );
    }

    #[tokio::test]
    async fn test_ndjson_invalid_json_returns_error() {
        let input = make_stream_input(vec!["not-json"]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(matches!(result, Err(CamelError::TypeConversionFailed(_))));
    }

    #[tokio::test]
    async fn test_ndjson_flushes_pending_batch_before_error() {
        let input = make_stream_input(vec![r#"{"id":1}"#, "not-json", r#"{"id":2}"#]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        // The accepted record is delivered, then the error ends the stream.
        assert_eq!(fragments.len(), 2);
        assert_eq!(json_body(&fragments[0]), &serde_json::json!({"id": 1}));
        assert!(fragments[1].is_err());
    }

    #[tokio::test]
    async fn test_ndjson_empty_lines_skipped() {
        let input = make_stream_input(vec![r#"{"id":1}"#, "", r#"{"id":2}"#]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let count = codec.split(input, config).count().await;
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn test_ndjson_last_line_without_trailing_newline() {
        let input = make_chunked_input(vec!["{\"id\":1}\n{\"id\":2}".to_string()]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        assert_eq!(json_body(&fragments[1]), &serde_json::json!({"id": 2}));
        assert_eq!(
            fragments[1].as_ref().expect("ok").property(CAMEL_STREAM_OFFSET),
            Some(&Value::from(1i64))
        );
    }

    #[tokio::test]
    async fn test_ndjson_record_split_across_chunks() {
        let input = make_chunked_input(vec![
            "{\"id\":".to_string(),
            "1}\n{\"i".to_string(),
            "d\":2}\n".to_string(),
        ]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        assert_eq!(json_body(&fragments[0]), &serde_json::json!({"id": 1}));
        assert_eq!(json_body(&fragments[1]), &serde_json::json!({"id": 2}));
    }

    #[tokio::test]
    async fn test_ndjson_crlf_line_endings() {
        let input = make_chunked_input(vec!["{\"id\":1}\r\n{\"id\":2}\r\n".to_string()]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        assert_eq!(json_body(&fragments[0]), &serde_json::json!({"id": 1}));
        assert_eq!(json_body(&fragments[1]), &serde_json::json!({"id": 2}));
    }

    #[tokio::test]
    async fn test_ndjson_headers_excluded() {
        let mut parent = Exchange::new(Message::new(Body::Empty));
        parent.input.headers.insert(
            "Content-Type".into(),
            Value::String("application/x-ndjson".into()),
        );
        parent
            .input
            .headers
            .insert("Content-Length".into(), Value::String("42".into()));
        parent
            .input
            .headers
            .insert("X-Custom".into(), Value::String("kept".into()));
        let data = vec![Ok(Bytes::from("{\"id\":1}\n"))];
        let stream = Box::pin(stream::iter(data));
        let input = StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/x-ndjson".into()),
                size_hint: None,
                origin: None,
            },
        };
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.input.headers.get("Content-Type").is_none(),
            "Content-Type should be excluded"
        );
        assert!(
            ex.input.headers.get("Content-Length").is_none(),
            "Content-Length should be excluded"
        );
        assert_eq!(
            ex.input.headers.get("X-Custom"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_ndjson_sets_offset_property() {
        let input = make_stream_input(vec![r#"{"id":1}"#, r#"{"id":2}"#, r#"{"id":3}"#]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 3);
        for (i, frag) in fragments.iter().enumerate() {
            let ex = frag.as_ref().expect("should be ok");
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[tokio::test]
    async fn test_ndjson_batch_size_two() {
        let input = make_stream_input(vec![
            r#"{"id":1}"#,
            r#"{"id":2}"#,
            r#"{"id":3}"#,
            r#"{"id":4}"#,
        ]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        for frag in &fragments {
            let ex = frag.as_ref().expect("should be ok");
            assert!(
                matches!(&ex.input.body, Body::Json(serde_json::Value::Array(arr)) if arr.len() == 2)
            );
        }
    }

    #[tokio::test]
    async fn test_ndjson_batch_offset_and_size_properties() {
        let input = make_stream_input(vec![r#"{"id":1}"#, r#"{"id":2}"#, r#"{"id":3}"#]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);

        let first = fragments[0].as_ref().expect("ok");
        assert_eq!(first.property(CAMEL_STREAM_OFFSET), Some(&Value::from(0i64)));
        assert_eq!(first.property(CAMEL_STREAM_BATCH_SIZE), Some(&Value::from(2i64)));

        let second = fragments[1].as_ref().expect("ok");
        assert_eq!(second.property(CAMEL_STREAM_OFFSET), Some(&Value::from(2i64)));
        assert_eq!(second.property(CAMEL_STREAM_BATCH_SIZE), Some(&Value::from(1i64)));
    }

    #[tokio::test]
    async fn test_ndjson_include_origin_false() {
        let input = make_stream_input(vec![r#"{"id":1}"#]);
        let config = StreamSplitConfig {
            include_origin: false,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
            "Origin should not be set when include_origin=false"
        );
    }
}