use crate::stream_codec::{
CAMEL_STREAM_BATCH_SIZE, CAMEL_STREAM_OFFSET, CAMEL_STREAM_ORIGIN,
CAMEL_STREAM_SOURCE_CONTENT_TYPE, StreamSplitCodec, StreamSplitInput, fragment_stream_exchange,
};
use bytes::BytesMut;
use camel_api::{Body, CamelError, Exchange, StreamSplitConfig, Value};
use futures::{Stream, StreamExt};
use std::pin::Pin;
pub struct LinesCodec;
impl StreamSplitCodec for LinesCodec {
fn split(
&self,
input: StreamSplitInput,
config: StreamSplitConfig,
) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>> {
let StreamSplitInput {
parent,
stream,
metadata,
} = input;
let origin = metadata.origin;
let content_type = metadata.content_type;
Box::pin(async_stream::try_stream! {
let mut buffer = BytesMut::new();
let mut offset = 0u64;
let mut batch: Vec<String> = Vec::new();
let mut stream = stream;
macro_rules! flush_batch {
() => {
if !batch.is_empty() {
let lines = std::mem::take(&mut batch);
let batch_offset = offset - (lines.len() as u64);
let body = if lines.len() == 1 {
Body::Text(lines.into_iter().next().unwrap()) } else {
Body::Text(lines.join("\n"))
};
let mut ex = fragment_stream_exchange(&parent, body);
ex.set_property(CAMEL_STREAM_OFFSET, Value::from(batch_offset as i64));
if let Some(ref ct) = content_type {
ex.set_property(CAMEL_STREAM_SOURCE_CONTENT_TYPE, Value::String(ct.clone()));
}
if config.include_origin {
if let Some(ref o) = origin {
ex.set_property(CAMEL_STREAM_ORIGIN, Value::String(o.clone()));
}
}
if batch_offset != offset {
ex.set_property(CAMEL_STREAM_BATCH_SIZE, Value::from((offset - batch_offset) as i64));
}
yield ex;
}
};
}
while let Some(chunk) = stream.next().await {
let chunk = chunk?;
buffer.extend_from_slice(&chunk);
loop {
if buffer.len() > config.max_record_bytes {
if !buffer.contains(&b'\n') {
flush_batch!();
Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
}
}
let newline_pos = match buffer.iter().position(|&b| b == b'\n') {
Some(pos) => pos,
None => break, };
let line_bytes = buffer.split_to(newline_pos + 1); let line = &line_bytes[..newline_pos];
if line.len() > config.max_record_bytes {
flush_batch!();
Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
}
let trimmed = match std::str::from_utf8(line) {
Ok(s) => s.trim(),
Err(_) => {
flush_batch!();
Err(CamelError::TypeConversionFailed(
"Line is not valid UTF-8".into(),
))?;
unreachable!();
}
};
if trimmed.is_empty() {
continue;
}
batch.push(trimmed.to_string());
offset += 1;
if batch.len() >= config.batch_size {
flush_batch!();
}
}
}
if !buffer.is_empty() {
let line = std::mem::take(&mut buffer);
let trimmed = match std::str::from_utf8(&line) {
Ok(s) => s.trim(),
Err(_) => {
flush_batch!();
Err(CamelError::TypeConversionFailed(
"Line is not valid UTF-8".into(),
))?;
unreachable!();
}
};
if !trimmed.is_empty() {
if line.len() > config.max_record_bytes {
flush_batch!();
Err(CamelError::StreamLimitExceeded(config.max_record_bytes))?;
}
batch.push(trimmed.to_string());
offset += 1;
if batch.len() >= config.batch_size {
flush_batch!();
}
}
}
flush_batch!();
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use camel_api::{Message, StreamMetadata};
use futures::stream;
fn make_lines_input(lines: Vec<&str>) -> StreamSplitInput {
let data = lines
.iter()
.map(|l| Ok(Bytes::from(format!("{}\n", l))))
.collect::<Vec<_>>();
let stream = Box::pin(stream::iter(data));
let parent = Exchange::new(Message::new(Body::Empty));
StreamSplitInput {
parent,
stream,
metadata: StreamMetadata {
content_type: Some("text/plain".into()),
size_hint: None,
origin: Some("test://lines".into()),
},
}
}
#[tokio::test]
async fn test_lines_splits_three_lines() {
let input = make_lines_input(vec!["hello", "world", "foo"]);
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
assert_eq!(fragments.len(), 3);
let bodies: Vec<_> = fragments.into_iter().map(|r| r.expect("ok")).collect();
assert!(matches!(&bodies[0].input.body, Body::Text(s) if s == "hello"));
assert!(matches!(&bodies[1].input.body, Body::Text(s) if s == "world"));
assert!(matches!(&bodies[2].input.body, Body::Text(s) if s == "foo"));
}
#[tokio::test]
async fn test_lines_empty_stream() {
let input = make_lines_input(vec![]);
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let count = codec.split(input, config).count().await;
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_lines_empty_lines_skipped() {
let input = make_lines_input(vec!["a", "", "b"]);
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let count = codec.split(input, config).count().await;
assert_eq!(count, 2);
}
#[tokio::test]
async fn test_lines_trailing_newline_no_extra_fragment() {
let data = vec![Ok(Bytes::from("hello\nworld\n"))];
let stream = Box::pin(stream::iter(data));
let parent = Exchange::new(Message::new(Body::Empty));
let input = StreamSplitInput {
parent,
stream,
metadata: StreamMetadata {
content_type: Some("text/plain".into()),
size_hint: None,
origin: None,
},
};
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let count = codec.split(input, config).count().await;
assert_eq!(count, 2);
}
#[tokio::test]
async fn test_lines_exceeds_max_record_bytes() {
let long_line = "x".repeat(2000);
let input = make_lines_input(vec![&long_line]);
let config = StreamSplitConfig {
max_record_bytes: 100,
..Default::default()
};
let codec = LinesCodec;
let result = codec.split(input, config).next().await.unwrap();
assert!(result.is_err());
}
#[tokio::test]
async fn test_lines_sets_stream_properties() {
let input = make_lines_input(vec!["hello"]);
let config = StreamSplitConfig {
include_origin: true,
..Default::default()
};
let codec = LinesCodec;
let ex = codec
.split(input, config)
.next()
.await
.unwrap()
.expect("ok");
assert_eq!(
ex.property(CAMEL_STREAM_ORIGIN),
Some(&Value::String("test://lines".into()))
);
}
#[tokio::test]
async fn test_lines_sets_offset_property() {
let input = make_lines_input(vec!["alpha", "beta", "gamma"]);
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
assert_eq!(fragments.len(), 3);
for (i, frag) in fragments.iter().enumerate() {
let ex = frag.as_ref().expect("should be ok");
assert_eq!(
ex.property(CAMEL_STREAM_OFFSET),
Some(&Value::from(i as i64))
);
}
}
#[tokio::test]
async fn test_lines_headers_excluded() {
let mut parent = Exchange::new(Message::new(Body::Empty));
parent
.input
.headers
.insert("Content-Type".into(), Value::String("text/plain".into()));
parent
.input
.headers
.insert("Content-Length".into(), Value::String("42".into()));
parent
.input
.headers
.insert("X-Custom".into(), Value::String("kept".into()));
let data = vec![Ok(Bytes::from("hello\n"))];
let stream = Box::pin(stream::iter(data));
let input = StreamSplitInput {
parent,
stream,
metadata: StreamMetadata {
content_type: Some("text/plain".into()),
size_hint: None,
origin: None,
},
};
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let ex = codec
.split(input, config)
.next()
.await
.unwrap()
.expect("ok");
assert!(
ex.input.headers.get("Content-Type").is_none(),
"Content-Type should be excluded"
);
assert!(
ex.input.headers.get("Content-Length").is_none(),
"Content-Length should be excluded"
);
assert_eq!(
ex.input.headers.get("X-Custom"),
Some(&Value::String("kept".into()))
);
}
#[tokio::test]
async fn test_lines_include_origin_false() {
let input = make_lines_input(vec!["hello"]);
let config = StreamSplitConfig {
include_origin: false,
..Default::default()
};
let codec = LinesCodec;
let ex = codec
.split(input, config)
.next()
.await
.unwrap()
.expect("ok");
assert!(
ex.property(CAMEL_STREAM_ORIGIN).is_none(),
"Origin should not be set when include_origin=false"
);
}
#[tokio::test]
async fn test_lines_batch_size_two() {
let input = make_lines_input(vec!["a", "b", "c", "d"]);
let config = StreamSplitConfig {
batch_size: 2,
..Default::default()
};
let codec = LinesCodec;
let fragments: Vec<_> = codec.split(input, config).collect::<Vec<_>>().await;
assert_eq!(fragments.len(), 2);
for frag in &fragments {
let ex = frag.as_ref().expect("should be ok");
assert!(
matches!(&ex.input.body, Body::Text(s) if s.len() > 0),
"batch should contain lines"
);
}
}
#[tokio::test]
async fn test_lines_non_utf8_returns_error() {
let data = vec![Ok(Bytes::from(b"\xff\xfe\n".as_slice()))];
let stream = Box::pin(stream::iter(data));
let parent = Exchange::new(Message::new(Body::Empty));
let input = StreamSplitInput {
parent,
stream,
metadata: StreamMetadata {
content_type: Some("text/plain".into()),
size_hint: None,
origin: None,
},
};
let config = StreamSplitConfig::default();
let codec = LinesCodec;
let result = codec.split(input, config).next().await.unwrap();
assert!(result.is_err());
}
}