Skip to main content

camel_processor/
stream_codec.rs

1use bytes::Bytes;
2use camel_api::{
3    Body, CamelError, Exchange, StreamMetadata, StreamSplitConfig, StreamSplitFormat,
4    fragment_exchange,
5};
6use futures::Stream;
7use std::pin::Pin;
8
9pub mod chunks;
10pub mod lines;
11pub mod ndjson;
12
13pub const CAMEL_STREAM_ORIGIN: &str = "CamelStreamOrigin";
14pub const CAMEL_STREAM_SOURCE_CONTENT_TYPE: &str = "CamelStreamSourceContentType";
15pub const CAMEL_STREAM_OFFSET: &str = "CamelStreamOffset";
16pub const CAMEL_STREAM_BATCH_SIZE: &str = "CamelStreamBatchSize";
17
18pub struct StreamSplitInput {
19    pub parent: Exchange,
20    pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, CamelError>> + Send>>,
21    pub metadata: StreamMetadata,
22}
23
24pub trait StreamSplitCodec: Send + Sync {
25    fn split(
26        &self,
27        input: StreamSplitInput,
28        config: StreamSplitConfig,
29    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>;
30}
31
32pub fn resolve_format(
33    format: &StreamSplitFormat,
34    metadata: &StreamMetadata,
35) -> Result<StreamSplitFormat, CamelError> {
36    match format {
37        StreamSplitFormat::Auto => {
38            let ct = metadata
39                .content_type
40                .as_deref()
41                .unwrap_or("")
42                .to_lowercase();
43            let ct = ct.split(';').next().unwrap_or("").trim();
44            match ct {
45                "application/x-ndjson" => Ok(StreamSplitFormat::Ndjson),
46                "text/plain" => Ok(StreamSplitFormat::Lines),
47                "application/octet-stream" => Ok(StreamSplitFormat::Chunks),
48                "" => Err(CamelError::Config(
49                    "stream split format=Auto but stream has no content_type".into(),
50                )),
51                other => Err(CamelError::Config(format!(
52                    "stream split format=Auto: unknown content type '{}'",
53                    other
54                ))),
55            }
56        }
57        other => Ok(other.clone()),
58    }
59}
60
61pub fn resolve_codec(format: &StreamSplitFormat) -> Box<dyn StreamSplitCodec> {
62    match format {
63        StreamSplitFormat::Ndjson => Box::new(ndjson::NdjsonCodec),
64        StreamSplitFormat::Lines => Box::new(lines::LinesCodec),
65        StreamSplitFormat::Chunks => Box::new(chunks::ChunksCodec),
66        StreamSplitFormat::Auto => unreachable!("resolve_format must be called first"),
67    }
68}
69
70pub fn fragment_stream_exchange(parent: &Exchange, body: Body) -> Exchange {
71    let mut ex = fragment_exchange(parent, body);
72    ex.input.headers.remove("Content-Length");
73    ex.input.headers.remove("Content-Type");
74    ex
75}