camel_processor/stream_codec/
lines.rs1use crate::stream_codec::{
2 CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
3 CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
4};
5use bytes::BytesMut;
6use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
7use futures::{Stream, StreamExt};
8use std::pin::Pin;
9
/// Stateless stream-split codec that breaks a byte stream into text lines.
///
/// The type carries no data; all behavior lives in its
/// `StreamSplitCodec` implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct LinesCodec;
11
12impl StreamSplitCodec for LinesCodec {
13 fn split(
14 &self,
15 input: StreamSplitInput,
16 config: StreamSplitConfig,
17 ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
18 let StreamSplitInput {
19 parent,
20 stream,
21 metadata,
22 } = input;
23 let origin = metadata.origin;
24 let content_type = metadata.content_type;
25
26 Box::pin(async_stream::try_stream! {
27 let mut buffer = BytesMut::new();
28 let mut offset = 0u64;
29 let mut batch: Vec<String> = Vec::new();
30 let mut stream = stream;
31
32 macro_rules! flush_batch {
34 () => {
35 if !batch.is_empty() {
36 let lines = std::mem::take(&mut batch);
37 let batch_offset = offset - (lines.len() as u64);
38 let body = if lines.len() == 1 {
39 Body::Text(lines.into_iter().next().unwrap()) } else {
41 Body::Text(lines.join("\n"))
42 };
43 let mut ex = fragment_stream_exchange(&parent, body);
44 ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
45 if let Some(ref ct) = content_type {
46 ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
47 }
48 if config.include_origin {
49 if let Some(ref o) = origin {
50 ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
51 }
52 }
53 if batch_offset != offset {
54 ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
55 }
56 yield ex;
57 }
58 };
59 }
60
61 while let Some(chunk) = stream.next().await {
62 let chunk = chunk?;
63 buffer.extend_from_slice(&chunk);
64
65 loop {
67 if buffer.len() > config.max_record_bytes {
69 if !buffer.contains(&b'\n') {
71 flush_batch!();
72 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
73 }
74 }
75
76 let newline_pos = match buffer.iter().position(|&b| b == b'\n') {
77 Some(pos) => pos,
78 None => break, };
80
81 let line_bytes = buffer.split_to(newline_pos + 1); let line = &line_bytes[..newline_pos]; if line.len() > config.max_record_bytes {
86 flush_batch!();
87 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
88 }
89
90 let trimmed = match std::str::from_utf8(line) {
92 Ok(s) => s.trim(),
93 Err(_) => {
94 flush_batch!();
95 Err(CamelError::TypeConversionFailed(
96 "Line is not valid UTF-8".into(),
97 ))?;
98 unreachable!();
99 }
100 };
101
102 if trimmed.is_empty() {
104 continue;
105 }
106
107 batch.push(trimmed.to_string());
108 offset += 1;
109
110 if batch.len() >= config.batch_size {
112 flush_batch!();
113 }
114 }
115 }
116
117 if !buffer.is_empty() {
119 let line = std::mem::take(&mut buffer);
120 let trimmed = match std::str::from_utf8(&line) {
121 Ok(s) => s.trim(),
122 Err(_) => {
123 flush_batch!();
124 Err(CamelError::TypeConversionFailed(
125 "Line is not valid UTF-8".into(),
126 ))?;
127 unreachable!();
128 }
129 };
130
131 if !trimmed.is_empty() {
132 if line.len() > config.max_record_bytes {
133 flush_batch!();
134 Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
135 }
136
137 batch.push(trimmed.to_string());
138 offset += 1;
139
140 if batch.len() >= config.batch_size {
141 flush_batch!();
142 }
143 }
144 }
145
146 flush_batch!();
148 })
149 }
150}
151
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::{Message, StreamMetadata};
    use futures::stream;

    /// Returns a parent exchange with an empty body and no headers set by the test.
    fn empty_parent() -> Exchange {
        Exchange::new(Message::new(Body::Empty))
    }

    /// Builds a split input whose stream yields `chunks` one by one, with a
    /// `text/plain` content type and the given optional origin.
    fn make_input(
        parent: Exchange,
        chunks: Vec<Vec<u8>>,
        origin: Option<&str>,
    ) -> StreamSplitInput {
        let data = chunks
            .into_iter()
            .map(|chunk| Ok(Bytes::from(chunk)))
            .collect::<Vec<_>>();
        let stream = Box::pin(stream::iter(data));
        StreamSplitInput {
            parent,
            stream,
            metadata: StreamMetadata {
                content_type: Some("text/plain".into()),
                size_hint: None,
                origin: origin.map(Into::into),
            },
        }
    }

    /// Builds an input where every element of `lines` arrives as its own
    /// newline-terminated chunk, with origin `test://lines`.
    fn make_lines_input(lines: Vec<&str>) -> StreamSplitInput {
        let chunks = lines
            .iter()
            .map(|l| format!("{}\n", l).into_bytes())
            .collect();
        make_input(empty_parent(), chunks, Some("test://lines"))
    }

    /// Runs the codec to completion and unwraps every fragment.
    async fn collect_ok(input: StreamSplitInput, config: StreamSplitConfig) -> Vec<Exchange> {
        LinesCodec
            .split(input, config)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .map(|r| r.expect("fragment should be ok"))
            .collect()
    }

    /// Returns the text body of a fragment, panicking on any other body kind.
    fn body_text(ex: &Exchange) -> &str {
        match &ex.input.body {
            Body::Text(s) => s.as_str(),
            _ => panic!("expected a text body"),
        }
    }

    #[tokio::test]
    async fn test_lines_splits_three_lines() {
        let input = make_lines_input(vec!["hello", "world", "foo"]);
        let fragments = collect_ok(input, StreamSplitConfig::default()).await;
        let bodies: Vec<&str> = fragments.iter().map(body_text).collect();
        assert_eq!(bodies, vec!["hello", "world", "foo"]);
    }

    #[tokio::test]
    async fn test_lines_empty_stream() {
        let input = make_lines_input(vec![]);
        let count = LinesCodec
            .split(input, StreamSplitConfig::default())
            .count()
            .await;
        assert_eq!(count, 0);
    }

    #[tokio::test]
    async fn test_lines_empty_lines_skipped() {
        let input = make_lines_input(vec!["a", "", "b"]);
        let fragments = collect_ok(input, StreamSplitConfig::default()).await;
        let bodies: Vec<&str> = fragments.iter().map(body_text).collect();
        assert_eq!(bodies, vec!["a", "b"]);
    }

    #[tokio::test]
    async fn test_lines_trailing_newline_no_extra_fragment() {
        let input = make_input(empty_parent(), vec![b"hello\nworld\n".to_vec()], None);
        let count = LinesCodec
            .split(input, StreamSplitConfig::default())
            .count()
            .await;
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn test_lines_final_line_without_newline() {
        let input = make_input(empty_parent(), vec![b"hello\nworld".to_vec()], None);
        let fragments = collect_ok(input, StreamSplitConfig::default()).await;
        let bodies: Vec<&str> = fragments.iter().map(body_text).collect();
        assert_eq!(bodies, vec!["hello", "world"]);
    }

    #[tokio::test]
    async fn test_lines_line_split_across_chunks() {
        let chunks = vec![b"hel".to_vec(), b"lo\nwor".to_vec(), b"ld\n".to_vec()];
        let input = make_input(empty_parent(), chunks, None);
        let fragments = collect_ok(input, StreamSplitConfig::default()).await;
        let bodies: Vec<&str> = fragments.iter().map(body_text).collect();
        assert_eq!(bodies, vec!["hello", "world"]);
    }

    #[tokio::test]
    async fn test_lines_exceeds_max_record_bytes() {
        let long_line = "x".repeat(2000);
        let input = make_lines_input(vec![&long_line]);
        let config = StreamSplitConfig {
            max_record_bytes: 100,
            ..Default::default()
        };
        let result = LinesCodec.split(input, config).next().await.unwrap();
        assert!(
            matches!(result, Err(CamelError::StreamLimitExceeded(_))),
            "oversized line should fail with StreamLimitExceeded"
        );
    }

    #[tokio::test]
    async fn test_lines_sets_stream_properties() {
        let input = make_lines_input(vec!["hello"]);
        let config = StreamSplitConfig {
            include_origin: true,
            ..Default::default()
        };
        let ex = LinesCodec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert_eq!(
            ex.property(CAMEL_STREAM_ORIGIN),
            Some(&Value::String("test://lines".into()))
        );
    }

    #[tokio::test]
    async fn test_lines_sets_offset_property() {
        let input = make_lines_input(vec!["alpha", "beta", "gamma"]);
        let fragments = collect_ok(input, StreamSplitConfig::default()).await;
        assert_eq!(fragments.len(), 3);
        for (i, ex) in fragments.iter().enumerate() {
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from(i as i64))
            );
        }
    }

    #[tokio::test]
    async fn test_lines_headers_excluded() {
        let mut parent = empty_parent();
        parent
            .input
            .headers
            .insert("Content-Type".into(), Value::String("text/plain".into()));
        parent
            .input
            .headers
            .insert("Content-Length".into(), Value::String("42".into()));
        parent
            .input
            .headers
            .insert("X-Custom".into(), Value::String("kept".into()));
        let input = make_input(parent, vec![b"hello\n".to_vec()], None);
        let ex = LinesCodec
            .split(input, StreamSplitConfig::default())
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.input.headers.get("Content-Type").is_none(),
            "Content-Type should be excluded"
        );
        assert!(
            ex.input.headers.get("Content-Length").is_none(),
            "Content-Length should be excluded"
        );
        assert_eq!(
            ex.input.headers.get("X-Custom"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_lines_include_origin_false() {
        let input = make_lines_input(vec!["hello"]);
        let config = StreamSplitConfig {
            include_origin: false,
            ..Default::default()
        };
        let ex = LinesCodec
            .split(input, config)
            .next()
            .await
            .unwrap()
            .expect("ok");
        assert!(
            ex.property(CAMEL_STREAM_ORIGIN).is_none(),
            "Origin should not be set when include_origin=false"
        );
    }

    #[tokio::test]
    async fn test_lines_batch_size_two() {
        let input = make_lines_input(vec!["a", "b", "c", "d"]);
        let config = StreamSplitConfig {
            batch_size: 2,
            ..Default::default()
        };
        let fragments = collect_ok(input, config).await;
        assert_eq!(fragments.len(), 2);

        // Lines of one batch are joined with '\n'.
        let bodies: Vec<&str> = fragments.iter().map(body_text).collect();
        assert_eq!(bodies, vec!["a\nb", "c\nd"]);

        // Each batch reports the index of its first line and its line count.
        for (i, ex) in fragments.iter().enumerate() {
            assert_eq!(
                ex.property(CAMEL_STREAM_OFFSET),
                Some(&Value::from((i * 2) as i64))
            );
            assert_eq!(
                ex.property(CAMEL_STREAM_BATCH_SIZE),
                Some(&Value::from(2i64))
            );
        }
    }

    #[tokio::test]
    async fn test_lines_non_utf8_returns_error() {
        let input = make_input(empty_parent(), vec![b"\xff\xfe\n".to_vec()], None);
        let result = LinesCodec
            .split(input, StreamSplitConfig::default())
            .next()
            .await
            .unwrap();
        assert!(
            matches!(result, Err(CamelError::TypeConversionFailed(_))),
            "invalid UTF-8 should fail with TypeConversionFailed"
        );
    }
}