camel_processor/
stream_codec.rs1use bytes::Bytes;
2use camel_api::{
3 Body, CamelError, Exchange, StreamMetadata, StreamSplitConfig, StreamSplitFormat,
4 fragment_exchange,
5};
6use futures::Stream;
7use std::pin::Pin;
8
9pub mod chunks;
10pub mod lines;
11pub mod ndjson;
12
// String keys shared by the stream-splitting code.
// NOTE(review): presumably header/property names attached to fragment exchanges by the
// codec modules — confirm against `chunks`, `lines` and `ndjson`.

/// Key `"CamelStreamOrigin"`.
pub const CAMEL_STREAM_ORIGIN: &str = "CamelStreamOrigin";

/// Key `"CamelStreamSourceContentType"`.
pub const CAMEL_STREAM_SOURCE_CONTENT_TYPE: &str = "CamelStreamSourceContentType";

/// Key `"CamelStreamOffset"`.
pub const CAMEL_STREAM_OFFSET: &str = "CamelStreamOffset";

/// Key `"CamelStreamBatchSize"`.
pub const CAMEL_STREAM_BATCH_SIZE: &str = "CamelStreamBatchSize";
17
18pub struct StreamSplitInput {
19 pub parent: Exchange,
20 pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, CamelError>> + Send>>,
21 pub metadata: StreamMetadata,
22}
23
24pub trait StreamSplitCodec: Send + Sync {
25 fn split(
26 &self,
27 input: StreamSplitInput,
28 config: StreamSplitConfig,
29 ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>;
30}
31
32pub fn resolve_format(
33 format: &StreamSplitFormat,
34 metadata: &StreamMetadata,
35) -> Result<StreamSplitFormat, CamelError> {
36 match format {
37 StreamSplitFormat::Auto => {
38 let ct = metadata
39 .content_type
40 .as_deref()
41 .unwrap_or("")
42 .to_lowercase();
43 let ct = ct.split(';').next().unwrap_or("").trim();
44 match ct {
45 "application/x-ndjson" => Ok(StreamSplitFormat::Ndjson),
46 "text/plain" => Ok(StreamSplitFormat::Lines),
47 "application/octet-stream" => Ok(StreamSplitFormat::Chunks),
48 "application/zip" | "application/x-zip-compressed" => Err(CamelError::Config(
49 "stream split format=Auto: ZIP archives require explicit stream.format: zip"
50 .into(),
51 )),
52 "" => Err(CamelError::Config(
53 "stream split format=Auto but stream has no content_type".into(),
54 )),
55 other => Err(CamelError::Config(format!(
56 "stream split format=Auto: unknown content type '{}'",
57 other
58 ))),
59 }
60 }
61 other => Ok(other.clone()),
62 }
63}
64
/// Kind of archive carried by [`ResolvedStreamSplit::MaterializedArchive`].
///
/// The enum is fieldless, so it derives the cheap common traits (`Clone`, `Copy`,
/// `PartialEq`, `Eq`) in addition to `Debug`; callers can compare and copy values
/// directly instead of pattern-matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ArchiveSplitKind {
    /// A ZIP archive.
    Zip,
}
69
70pub enum ResolvedStreamSplit {
71 Incremental(Box<dyn StreamSplitCodec>),
72 MaterializedArchive(ArchiveSplitKind),
73}
74
75pub fn resolve_incremental_codec(
76 format: &StreamSplitFormat,
77) -> Result<Box<dyn StreamSplitCodec>, CamelError> {
78 match format {
79 StreamSplitFormat::Ndjson => Ok(Box::new(ndjson::NdjsonCodec)),
80 StreamSplitFormat::Lines => Ok(Box::new(lines::LinesCodec)),
81 StreamSplitFormat::Chunks => Ok(Box::new(chunks::ChunksCodec)),
82 StreamSplitFormat::Zip => Err(CamelError::Config(
83 "Zip is a materialized archive format, not an incremental codec".into(),
84 )),
85 StreamSplitFormat::Auto => Err(CamelError::Config(
86 "resolve_incremental_codec requires a resolved format, not Auto".into(),
87 )),
88 }
89}
90
91pub fn resolve_split(
92 format: &StreamSplitFormat,
93 metadata: &StreamMetadata,
94) -> Result<ResolvedStreamSplit, CamelError> {
95 let resolved = resolve_format(format, metadata)?;
96 match resolved {
97 StreamSplitFormat::Zip => Ok(ResolvedStreamSplit::MaterializedArchive(
98 ArchiveSplitKind::Zip,
99 )),
100 _ => Ok(ResolvedStreamSplit::Incremental(resolve_incremental_codec(
101 &resolved,
102 )?)),
103 }
104}
105
106pub fn fragment_stream_exchange(parent: &Exchange, body: Body) -> Exchange {
107 let mut ex = fragment_exchange(parent, body);
108 ex.input.headers.remove("Content-Length");
109 ex.input.headers.remove("Content-Type");
110 ex
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::StreamMetadata;

    /// Metadata with every field at its default value.
    fn default_metadata() -> StreamMetadata {
        StreamMetadata::default()
    }

    /// An explicit `Ndjson` format resolves to an incremental codec.
    #[test]
    fn test_resolve_split_ndjson_is_incremental() {
        let outcome = resolve_split(&StreamSplitFormat::Ndjson, &default_metadata());
        assert!(matches!(outcome, Ok(ResolvedStreamSplit::Incremental(_))));
    }

    /// An explicit `Zip` format resolves to the materialized ZIP archive variant.
    #[test]
    fn test_resolve_split_zip_is_materialized() {
        let outcome = resolve_split(&StreamSplitFormat::Zip, &default_metadata());
        assert!(matches!(
            outcome,
            Ok(ResolvedStreamSplit::MaterializedArchive(
                ArchiveSplitKind::Zip
            ))
        ));
    }

    /// `Ndjson` has an incremental codec.
    #[test]
    fn test_resolve_incremental_codec_returns_codec_for_ndjson() {
        assert!(resolve_incremental_codec(&StreamSplitFormat::Ndjson).is_ok());
    }

    /// `Zip` is not an incremental codec and must be rejected.
    #[test]
    fn test_resolve_incremental_codec_rejects_zip() {
        assert!(resolve_incremental_codec(&StreamSplitFormat::Zip).is_err());
    }

    /// `Auto` with a ZIP content type fails, and the error tells the user which explicit
    /// format to configure.
    #[test]
    fn test_auto_application_zip_suggests_explicit_format() {
        let zip_metadata = StreamMetadata {
            content_type: Some(String::from("application/zip")),
            ..Default::default()
        };

        let msg = resolve_split(&StreamSplitFormat::Auto, &zip_metadata)
            .err()
            .map(|error| error.to_string())
            .expect("expected Err for Auto + application/zip");

        assert!(
            msg.contains("format: zip"),
            "expected suggestion in error, got: {msg}"
        );
    }
}
165}