camel-processor 0.16.0

Message processors for rust-camel
Documentation
use bytes::Bytes;
use camel_api::{
    Body, CamelError, Exchange, StreamMetadata, StreamSplitConfig, StreamSplitFormat,
    fragment_exchange,
};
use futures::Stream;
use std::pin::Pin;

pub mod chunks;
pub mod lines;
pub mod ndjson;

pub const CAMEL_STREAM_ORIGIN: &str = "CamelStreamOrigin";
pub const CAMEL_STREAM_SOURCE_CONTENT_TYPE: &str = "CamelStreamSourceContentType";
pub const CAMEL_STREAM_OFFSET: &str = "CamelStreamOffset";
pub const CAMEL_STREAM_BATCH_SIZE: &str = "CamelStreamBatchSize";

pub struct StreamSplitInput {
    pub parent: Exchange,
    pub stream: Pin<Box<dyn Stream<Item = Result<Bytes, CamelError>> + Send>>,
    pub metadata: StreamMetadata,
}

pub trait StreamSplitCodec: Send + Sync {
    fn split(
        &self,
        input: StreamSplitInput,
        config: StreamSplitConfig,
    ) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>;
}

pub fn resolve_format(
    format: &StreamSplitFormat,
    metadata: &StreamMetadata,
) -> Result<StreamSplitFormat, CamelError> {
    match format {
        StreamSplitFormat::Auto => {
            let ct = metadata
                .content_type
                .as_deref()
                .unwrap_or("")
                .to_lowercase();
            let ct = ct.split(';').next().unwrap_or("").trim();
            match ct {
                "application/x-ndjson" => Ok(StreamSplitFormat::Ndjson),
                "text/plain" => Ok(StreamSplitFormat::Lines),
                "application/octet-stream" => Ok(StreamSplitFormat::Chunks),
                "" => Err(CamelError::Config(
                    "stream split format=Auto but stream has no content_type".into(),
                )),
                other => Err(CamelError::Config(format!(
                    "stream split format=Auto: unknown content type '{}'",
                    other
                ))),
            }
        }
        other => Ok(other.clone()),
    }
}

pub fn resolve_codec(format: &StreamSplitFormat) -> Box<dyn StreamSplitCodec> {
    match format {
        StreamSplitFormat::Ndjson => Box::new(ndjson::NdjsonCodec),
        StreamSplitFormat::Lines => Box::new(lines::LinesCodec),
        StreamSplitFormat::Chunks => Box::new(chunks::ChunksCodec),
        StreamSplitFormat::Auto => unreachable!("resolve_format must be called first"),
    }
}

pub fn fragment_stream_exchange(parent: &Exchange, body: Body) -> Exchange {
    let mut ex = fragment_exchange(parent, body);
    ex.input.headers.remove("Content-Length");
    ex.input.headers.remove("Content-Type");
    ex
}