Skip to main content

camel_processor/
stream_codec.rs

1use bytes::Bytes;
2use camel_api::{
3    Body, CamelError, Exchange, StreamMetadata, StreamSplitConfig, StreamSplitFormat,
4    fragment_exchange,
5};
6use futures::Stream;
7use std::pin::Pin;
8
9pub mod chunks;
10pub mod lines;
11pub mod ndjson;
12
// String keys named after stream-split attributes: origin, source content
// type, offset and batch size. None of them is read or written in this file.
// NOTE(review): presumably used as exchange header/property names by the codec
// submodules — confirm against `chunks`, `lines` and `ndjson`.
pub const CAMEL_STREAM_ORIGIN: &str = "CamelStreamOrigin";
pub const CAMEL_STREAM_SOURCE_CONTENT_TYPE: &str = "CamelStreamSourceContentType";
pub const CAMEL_STREAM_OFFSET: &str = "CamelStreamOffset";
pub const CAMEL_STREAM_BATCH_SIZE: &str = "CamelStreamBatchSize";
17
/// Everything a [`StreamSplitCodec`] receives when asked to split a stream.
///
/// The value is passed to `StreamSplitCodec::split` by value, so the codec
/// takes ownership of all three fields.
pub struct StreamSplitInput {
    /// Exchange associated with the stream being split.
    /// NOTE(review): presumably the template that fragments are derived from —
    /// confirm against the codec implementations.
    pub parent: Exchange,
    /// Pinned, boxed, `Send` stream whose items are either a `Bytes` chunk or
    /// a `CamelError`.
    pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, CamelError>> + Send>>,
    /// Metadata describing the stream (same type whose `content_type` is
    /// inspected by `resolve_format`).
    pub metadata: StreamMetadata,
}
23
/// Strategy for turning a byte stream into a stream of exchanges.
///
/// Implementors must be `Send + Sync`.
pub trait StreamSplitCodec: Send + Sync {
    /// Consumes `input` and `config` and returns a pinned, boxed, `Send`
    /// stream whose items are either a produced [`Exchange`] or a
    /// [`CamelError`].
    ///
    /// NOTE(review): how `config` shapes the split, and whether the returned
    /// stream continues after yielding an error, is decided by each
    /// implementor and is not visible from this declaration.
    fn split(
        &self,
        input: StreamSplitInput,
        config: StreamSplitConfig,
    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>;
}
31
32pub fn resolve_format(
33    format: &StreamSplitFormat,
34    metadata: &StreamMetadata,
35) -> Result<StreamSplitFormat, CamelError> {
36    match format {
37        StreamSplitFormat::Auto => {
38            let ct = metadata
39                .content_type
40                .as_deref()
41                .unwrap_or("")
42                .to_lowercase();
43            let ct = ct.split(';').next().unwrap_or("").trim();
44            match ct {
45                "application/x-ndjson" => Ok(StreamSplitFormat::Ndjson),
46                "text/plain" => Ok(StreamSplitFormat::Lines),
47                "application/octet-stream" => Ok(StreamSplitFormat::Chunks),
48                "application/zip" | "application/x-zip-compressed" => Err(CamelError::Config(
49                    "stream split format=Auto: ZIP archives require explicit stream.format: zip"
50                        .into(),
51                )),
52                "" => Err(CamelError::Config(
53                    "stream split format=Auto but stream has no content_type".into(),
54                )),
55                other => Err(CamelError::Config(format!(
56                    "stream split format=Auto: unknown content type '{}'",
57                    other
58                ))),
59            }
60        }
61        other => Ok(other.clone()),
62    }
63}
64
/// Archive formats handled as a materialized archive rather than through an
/// incremental [`StreamSplitCodec`].
///
/// The enum is field-less, so it is cheap to copy and compare; the derives
/// let callers use `==` / `assert_eq!` and pass it by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveSplitKind {
    /// ZIP archive.
    Zip,
}
69
70pub enum ResolvedStreamSplit {
71    Incremental(Box<dyn StreamSplitCodec>),
72    MaterializedArchive(ArchiveSplitKind),
73}
74
75pub fn resolve_incremental_codec(
76    format: &StreamSplitFormat,
77) -> Result<Box<dyn StreamSplitCodec>, CamelError> {
78    match format {
79        StreamSplitFormat::Ndjson => Ok(Box::new(ndjson::NdjsonCodec)),
80        StreamSplitFormat::Lines => Ok(Box::new(lines::LinesCodec)),
81        StreamSplitFormat::Chunks => Ok(Box::new(chunks::ChunksCodec)),
82        StreamSplitFormat::Zip => Err(CamelError::Config(
83            "Zip is a materialized archive format, not an incremental codec".into(),
84        )),
85        StreamSplitFormat::Auto => Err(CamelError::Config(
86            "resolve_incremental_codec requires a resolved format, not Auto".into(),
87        )),
88    }
89}
90
91pub fn resolve_split(
92    format: &StreamSplitFormat,
93    metadata: &StreamMetadata,
94) -> Result<ResolvedStreamSplit, CamelError> {
95    let resolved = resolve_format(format, metadata)?;
96    match resolved {
97        StreamSplitFormat::Zip => Ok(ResolvedStreamSplit::MaterializedArchive(
98            ArchiveSplitKind::Zip,
99        )),
100        _ => Ok(ResolvedStreamSplit::Incremental(resolve_incremental_codec(
101            &resolved,
102        )?)),
103    }
104}
105
106pub fn fragment_stream_exchange(parent: &Exchange, body: Body) -> Exchange {
107    let mut ex = fragment_exchange(parent, body);
108    ex.input.headers.remove("Content-Length");
109    ex.input.headers.remove("Content-Type");
110    ex
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::StreamMetadata;

    /// Metadata with every field left at its default value.
    fn default_metadata() -> StreamMetadata {
        Default::default()
    }

    /// An explicit NDJSON format resolves to an incremental codec.
    #[test]
    fn test_resolve_split_ndjson_is_incremental() {
        let metadata = default_metadata();
        let resolved = resolve_split(&StreamSplitFormat::Ndjson, &metadata);
        assert!(matches!(resolved, Ok(ResolvedStreamSplit::Incremental(_))));
    }

    /// An explicit ZIP format resolves to a materialized archive.
    #[test]
    fn test_resolve_split_zip_is_materialized() {
        let metadata = default_metadata();
        let resolved = resolve_split(&StreamSplitFormat::Zip, &metadata);
        assert!(matches!(
            resolved,
            Ok(ResolvedStreamSplit::MaterializedArchive(
                ArchiveSplitKind::Zip
            ))
        ));
    }

    /// NDJSON has an incremental codec.
    #[test]
    fn test_resolve_incremental_codec_returns_codec_for_ndjson() {
        assert!(resolve_incremental_codec(&StreamSplitFormat::Ndjson).is_ok());
    }

    /// ZIP has no incremental codec.
    #[test]
    fn test_resolve_incremental_codec_rejects_zip() {
        assert!(resolve_incremental_codec(&StreamSplitFormat::Zip).is_err());
    }

    /// `Auto` with a ZIP content type fails with a hint to set the format.
    #[test]
    fn test_auto_application_zip_suggests_explicit_format() {
        let meta = StreamMetadata {
            content_type: Some(String::from("application/zip")),
            ..StreamMetadata::default()
        };
        // `ResolvedStreamSplit` is not `Debug`, so `unwrap_err` is unavailable.
        let msg = if let Err(e) = resolve_split(&StreamSplitFormat::Auto, &meta) {
            e.to_string()
        } else {
            panic!("expected Err for Auto + application/zip")
        };
        assert!(
            msg.contains("format: zip"),
            "expected suggestion in error, got: {msg}"
        );
    }
}