camel_processor/stream_codec/
ndjson.rs1use crate::stream_codec::{
2 CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3 CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::BytesMut;
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// [`StreamSplitCodec`] for newline-delimited JSON (NDJSON / JSON Lines) input.
///
/// The codec has no fields: all per-stream state lives inside the stream
/// returned by [`StreamSplitCodec::split`], so one value can serve any number
/// of streams.
#[derive(Debug, Clone, Copy, Default)]
pub struct NdjsonCodec;
11
12impl StreamSplitCodec for NdjsonCodec {
13 fn split(
14 &self,
15 input: StreamSplitInput,
16 config: StreamSplitConfig,
17 ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18 let StreamSplitInput {
19 parent,
20 stream,
21 metadata,
22 } = input;
23 let origin = metadata.origin;
24 let content_type = metadata.content_type;
25
26 Box::pin(async_stream::try_stream! {
27 let mut buffer = BytesMut::new();
28 let mut offset = 0u64;
29 let mut batch: Vec<serde_json::Value> = Vec::new();
30 let mut stream = stream;
31
32 macro_rules! flush_batch {
34 () => {
35 if !batch.is_empty() {
36 let values = std::mem::take(&mut batch);
37 let batch_offset = offset - (values.len() as u64);
38 let body = if values.len() == 1 {
39 Body::Json(values.into_iter().next().unwrap()) } else {
41 Body::Json(serde_json::Value::Array(values))
42 };
43 let mut ex = fragment_stream_exchange(&parent, body);
44 ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
45 if let Some(ref ct) = content_type {
46 ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
47 }
48 if config.include_origin {
49 if let Some(ref o) = origin {
50 ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
51 }
52 }
53 if batch_offset != offset {
54 ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
55 }
56 yield ex;
57 }
58 };
59 }
60
61 while let Some(chunk) = stream.next().await {
62 let chunk = chunk?;
63 buffer.extend_from_slice(&chunk);
64
65 loop {
67 if buffer.len() > config.max_record_bytes {
69 if !buffer.contains(&b'\n') {
71 flush_batch!();
72 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
73 }
74 }
75
76 let newline_pos = match buffer.iter().position(|&b| b == b'\n') {
77 Some(pos) => pos,
78 None => break, };
80
81 let line_bytes = buffer.split_to(newline_pos + 1); let line = &line_bytes[..newline_pos]; let trimmed = match std::str::from_utf8(line) {
86 Ok(s) => s.trim(),
87 Err(_) => {
88 flush_batch!();
89 Err(CamelError::TypeConversionFailed(
90 "NDJSON line is not valid UTF-8".into(),
91 ))?;
92 unreachable!();
93 }
94 };
95 if trimmed.is_empty() {
96 continue;
97 }
98
99 if line.len() > config.max_record_bytes {
101 flush_batch!();
102 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
103 }
104
105 let value: serde_json::Value = match serde_json::from_str(trimmed) {
107 Ok(v) => v,
108 Err(e) => {
109 flush_batch!();
110 Err(CamelError::TypeConversionFailed(format!(
111 "NDJSON parse error: {}",
112 e
113 )))?;
114 unreachable!();
115 }
116 };
117
118 batch.push(value);
119 offset += 1;
120
121 if batch.len() >= config.batch_size {
123 flush_batch!();
124 }
125 }
126 }
127
128 if !buffer.is_empty() {
130 let line = std::mem::take(&mut buffer);
131 let trimmed = match std::str::from_utf8(&line) {
132 Ok(s) => s.trim(),
133 Err(_) => {
134 flush_batch!();
135 Err(CamelError::TypeConversionFailed(
136 "NDJSON line is not valid UTF-8".into(),
137 ))?;
138 unreachable!();
139 }
140 };
141
142 if !trimmed.is_empty() {
143 if line.len() > config.max_record_bytes {
144 flush_batch!();
145 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
146 }
147
148 let value: serde_json::Value = match serde_json::from_str(trimmed) {
149 Ok(v) => v,
150 Err(e) => {
151 flush_batch!();
152 Err(CamelError::TypeConversionFailed(format!(
153 "NDJSON parse error: {}",
154 e
155 )))?;
156 unreachable!();
157 }
158 };
159
160 batch.push(value);
161 offset += 1;
162
163 if batch.len() >= config.batch_size {
164 flush_batch!();
165 }
166 }
167 }
168
169 flush_batch!();
171 })
172 }
173}
174
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::{Message, StreamMetadata};
    use futures::stream;

    /// Builds an input whose byte stream yields `chunks` verbatim, so a test
    /// controls exactly where chunk boundaries fall.
    fn make_chunked_input(chunks: Vec<String>) -> StreamSplitInput {
        let data = chunks
            .into_iter()
            .map(|c| Ok(Bytes::from(c)))
            .collect::<Vec<_>>();
        let stream = Box::pin(stream::iter(data));
        let parent = Exchange::new(Message::new(Body::Empty));
        StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/x-ndjson".into()),
                size_hint: None,
                origin: Some("test://stream".into()),
            },
        }
    }

    /// Builds an input that yields one `\n`-terminated chunk per entry of `lines`.
    fn make_stream_input(lines: Vec<&str>) -> StreamSplitInput {
        make_chunked_input(lines.iter().map(|l| format!("{}\n", l)).collect())
    }

    /// Returns the JSON body of each fragment; panics on an error or a non-JSON body.
    fn json_bodies(fragments: &[Result<Exchange, CamelError>]) -> Vec<serde_json::Value> {
        fragments
            .iter()
            .map(|frag| match &frag.as_ref().expect("should be ok").input.body {
                Body::Json(v) => v.clone(),
                _ => panic!("expected a JSON body"),
            })
            .collect()
    }

    #[tokio::test]
    async fn test_ndjson_splits_three_rows() {
        let input = make_stream_input(vec![
            r#"{"id":1,"name":"a"}"#,
            r#"{"id":2,"name":"b"}"#,
            r#"{"id":3,"name":"c"}"#,
        ]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 3);
        for frag in &fragments {
            let ex = frag.as_ref().expect("should be ok");
            assert!(matches!(ex.input.body, Body::Json(_)));
        }
    }

    #[tokio::test]
    async fn test_ndjson_empty_stream() {
        let input = make_stream_input(vec![]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let count = codec.split(input, config).count().await;
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_ndjson_exceeds_max_record_bytes() {
        let long_line = format!("{{\"id\":{}}}", "x".repeat(2000));
        let input = make_stream_input(vec![&long_line]);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ndjson_unterminated_record_exceeding_limit_errors() {
        // No `\n` ever arrives: the limit must trip on the buffered partial record.
        let input = make_chunked_input(vec![format!("{{\"id\":\"{}\"", "x".repeat(200))]);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ndjson_sets_stream_properties() {
        let input = make_stream_input(vec![r#"{"id":1}"#]);
        let config = StreamSplitConfig {
            include_origin: true,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert_eq!(
            ex.property(CAMEL_STREAM_ORIGIN),
            Some(&Value::String("test://stream".into()))
        );
        assert_eq!(
            ex.property(CAMEL_STREAM_SOURCE_CONTENT_TYPE),
            Some(&Value::String("application/x-ndjson".into()))
        );
        assert_eq!(ex.property(CAMEL_STREAM_OFFSET), Some(&Value::from(0i64)));
        assert_eq!(ex.property(CAMEL_STREAM_BATCH_SIZE), Some(&Value::from(1i64)));
    }

    #[tokio::test]
    async fn test_ndjson_invalid_json_returns_error() {
        let input = make_stream_input(vec!["not-json"]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let result = codec.split(input, config).next().await.unwrap();
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_ndjson_flushes_pending_batch_before_parse_error() {
        let input = make_stream_input(vec![r#"{"id":1}"#, r#"{"id":2}"#, "not-json"]);
        let config = StreamSplitConfig {
            batch_size: 10,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        assert_eq!(
            json_bodies(&fragments[..1]),
            vec![serde_json::json!([{"id": 1}, {"id": 2}])]
        );
        let first = fragments[0].as_ref().expect("should be ok");
        assert_eq!(first.property(CAMEL_STREAM_OFFSET), Some(&Value::from(0i64)));
        assert_eq!(first.property(CAMEL_STREAM_BATCH_SIZE), Some(&Value::from(2i64)));
        assert!(fragments[1].is_err());
    }

    #[tokio::test]
    async fn test_ndjson_empty_lines_skipped() {
        let input = make_stream_input(vec![r#"{"id":1}"#, "", r#"{"id":2}"#]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let count = codec.split(input, config).count().await;
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn test_ndjson_final_line_without_newline() {
        let input = make_chunked_input(vec!["{\"id\":1}\n{\"id\":2}".to_string()]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(
            json_bodies(&fragments),
            vec![serde_json::json!({"id": 1}), serde_json::json!({"id": 2})]
        );
        let last = fragments[1].as_ref().expect("should be ok");
        assert_eq!(last.property(CAMEL_STREAM_OFFSET), Some(&Value::from(1i64)));
    }

    #[tokio::test]
    async fn test_ndjson_record_split_across_chunks() {
        let input = make_chunked_input(vec![
            "{\"id\"".to_string(),
            ":1}\n{\"id\":".to_string(),
            "2}\n".to_string(),
        ]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(
            json_bodies(&fragments),
            vec![serde_json::json!({"id": 1}), serde_json::json!({"id": 2})]
        );
    }

    #[tokio::test]
    async fn test_ndjson_headers_excluded() {
        let mut parent = Exchange::new(Message::new(Body::Empty));
        parent.input.headers.insert(
            "Content-Type".into(),
            Value::String("application/x-ndjson".into()),
        );
        parent
            .input
            .headers
            .insert("Content-Length".into(), Value::String("42".into()));
        parent
            .input
            .headers
            .insert("X-Custom".into(), Value::String("kept".into()));
        let data = vec![Ok(Bytes::from("{\"id\":1}\n"))];
        let stream = Box::pin(stream::iter(data));
        let input = StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("application/x-ndjson".into()),
                size_hint: None,
                origin: None,
            },
        };
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.input.headers.get("Content-Type").is_none(),
            "Content-Type should be excluded"
        );
        assert!(
            ex.input.headers.get("Content-Length").is_none(),
            "Content-Length should be excluded"
        );
        assert_eq!(
            ex.input.headers.get("X-Custom"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_ndjson_sets_offset_property() {
        let input = make_stream_input(vec![r#"{"id":1}"#, r#"{"id":2}"#, r#"{"id":3}"#]);
        let config = StreamSplitConfig::default();
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 3);
        for (i, frag) in fragments.iter().enumerate() {
            let ex = frag.as_ref().expect("should be ok");
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[tokio::test]
    async fn test_ndjson_batch_size_two() {
        let input = make_stream_input(vec![
            r#"{"id":1}"#,
            r#"{"id":2}"#,
            r#"{"id":3}"#,
            r#"{"id":4}"#,
        ]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
        assert_eq!(fragments.len(), 2);
        for (i, frag) in fragments.iter().enumerate() {
            let ex = frag.as_ref().expect("should be ok");
            assert!(
                matches!(&ex.input.body, Body::Json(serde_json::Value::Array(arr)) if arr.len() == 2)
            );
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from((i * 2) as i64))
            );
            assert_eq!(ex.property(CAMEL_STREAM_BATCH_SIZE), Some(&Value::from(2i64)));
        }
    }

    #[tokio::test]
    async fn test_ndjson_include_origin_false() {
        let input = make_stream_input(vec![r#"{"id":1}"#]);
        let config = StreamSplitConfig {
            include_origin: false,
            ..Default::default()
        };
        let codec = NdjsonCodec;
        let ex = codec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
            "Origin should not be set when include_origin=false"
        );
    }
}