camel_processor/
stream_codec.rs1use bytes::Bytes;
2use camel_api::{
3 Body, CamelError, Exchange, StreamMetadata, StreamSplitConfig, StreamSplitFormat,
4 fragment_exchange,
5};
6use futures::Stream;
7use std::pin::Pin;
8
9pub mod chunks;
10pub mod lines;
11pub mod ndjson;
12
/// Key name `"CamelStreamOrigin"`.
///
/// NOTE(review): presumably identifies the stream a fragment was split from;
/// confirm against the codec modules that set it.
pub const CAMEL_STREAM_ORIGIN: &str = "CamelStreamOrigin";

/// Key name `"CamelStreamSourceContentType"`.
///
/// NOTE(review): presumably preserves the content type of the source stream
/// on fragments; confirm against the codec modules that set it.
pub const CAMEL_STREAM_SOURCE_CONTENT_TYPE: &str = "CamelStreamSourceContentType";

/// Key name `"CamelStreamOffset"`.
///
/// NOTE(review): presumably the position of a fragment within the source
/// stream; the unit (bytes vs. records) is not visible here, so confirm.
pub const CAMEL_STREAM_OFFSET: &str = "CamelStreamOffset";

/// Key name `"CamelStreamBatchSize"`.
///
/// NOTE(review): presumably the number of items carried by a fragment;
/// confirm against the codec modules that set it.
pub const CAMEL_STREAM_BATCH_SIZE: &str = "CamelStreamBatchSize";
17
18pub struct StreamSplitInput {
19 pub parent: Exchange,
20 pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, CamelError>> + Send>>,
21 pub metadata: StreamMetadata,
22}
23
24pub trait StreamSplitCodec: Send + Sync {
25 fn split(
26 &self,
27 input: StreamSplitInput,
28 config: StreamSplitConfig,
29 ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>;
30}
31
32pub fn resolve_format(
33 format: &StreamSplitFormat,
34 metadata: &StreamMetadata,
35) -> Result<StreamSplitFormat, CamelError> {
36 match format {
37 StreamSplitFormat::Auto => {
38 let ct = metadata
39 .content_type
40 .as_deref()
41 .unwrap_or("")
42 .to_lowercase();
43 let ct = ct.split(';').next().unwrap_or("").trim();
44 match ct {
45 "application/x-ndjson" => Ok(StreamSplitFormat::Ndjson),
46 "text/plain" => Ok(StreamSplitFormat::Lines),
47 "application/octet-stream" => Ok(StreamSplitFormat::Chunks),
48 "" => Err(CamelError::Config(
49 "stream split format=Auto but stream has no content_type".into(),
50 )),
51 other => Err(CamelError::Config(format!(
52 "stream split format=Auto: unknown content type '{}'",
53 other
54 ))),
55 }
56 }
57 other => Ok(other.clone()),
58 }
59}
60
61pub fn resolve_codec(format: &StreamSplitFormat) -> Box<dyn StreamSplitCodec> {
62 match format {
63 StreamSplitFormat::Ndjson => Box::new(ndjson::NdjsonCodec),
64 StreamSplitFormat::Lines => Box::new(lines::LinesCodec),
65 StreamSplitFormat::Chunks => Box::new(chunks::ChunksCodec),
66 StreamSplitFormat::Auto => unreachable!("resolve_format must be called first"),
67 }
68}
69
70pub fn fragment_stream_exchange(parent: &Exchange, body: Body) -> Exchange {
71 let mut ex = fragment_exchange(parent, body);
72 ex.input.headers.remove("Content-Length");
73 ex.input.headers.remove("Content-Type");
74 ex
75}