camel-processor 0.8.1

Message processors for rust-camel
Documentation
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use tower::Service;

use camel_api::body::Body;
use camel_api::stream_cache::StreamCacheConfig;
use camel_api::{CamelError, Exchange};
use tower::ServiceExt;

#[derive(Clone)]
pub struct StreamCacheService<P> {
    inner: P,
    config: StreamCacheConfig,
}

impl<P> StreamCacheService<P> {
    pub fn new(inner: P, config: StreamCacheConfig) -> Self {
        Self { inner, config }
    }
}

impl<P> Service<Exchange> for StreamCacheService<P>
where
    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
    P::Future: Send,
{
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
        let threshold = self.config.threshold;
        let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
        match body {
            Body::Stream(_) => {
                let inner = self.inner.clone();
                Box::pin(async move {
                    let bytes = body.into_bytes(threshold).await?;
                    exchange.input.body = Body::Bytes(bytes);
                    inner.oneshot(exchange).await
                })
            }
            _ => {
                exchange.input.body = body;
                let fut = self.inner.call(exchange);
                Box::pin(fut)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::body::{StreamBody, StreamMetadata};
    use camel_api::{IdentityProcessor, Message};
    use futures::stream;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    fn make_stream_body(data: Vec<u8>) -> Body {
        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from(data))];
        let stream = stream::iter(chunks);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
            metadata: StreamMetadata::default(),
        })
    }

    #[tokio::test]
    async fn test_stream_cache_materializes_stream() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
        let result = svc.oneshot(exchange).await.unwrap();
        assert!(matches!(result.input.body, Body::Bytes(_)));
        match &result.input.body {
            Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello"),
            _ => panic!("expected Bytes"),
        }
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_text() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Text("unchanged".to_string())));
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body, Body::Text("unchanged".to_string()));
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_bytes() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Bytes(Bytes::from_static(b"data"))));
        let result = svc.oneshot(exchange).await.unwrap();
        assert!(matches!(result.input.body, Body::Bytes(_)));
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_json() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Json(serde_json::json!({"k": 1}))));
        let result = svc.oneshot(exchange).await.unwrap();
        assert!(matches!(result.input.body, Body::Json(_)));
    }

    #[tokio::test]
    async fn test_stream_cache_exceeds_threshold() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::new(5));
        let exchange = Exchange::new(Message::new(make_stream_body(b"hello world".to_vec())));
        let result = svc.oneshot(exchange).await;
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(_))));
    }

    #[tokio::test]
    async fn test_stream_cache_preserves_headers() {
        use camel_api::Value;
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let mut msg = Message::new(make_stream_body(b"data".to_vec()));
        msg.set_header("x-test", Value::String("kept".into()));
        let exchange = Exchange::new(msg);
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(
            result.input.header("x-test"),
            Some(&Value::String("kept".into()))
        );
    }

    #[tokio::test]
    async fn test_e2e_stream_cache_then_convert_body_to() {
        use crate::ConvertBodyTo;
        use camel_api::body_converter::BodyType;

        let inner = ConvertBodyTo::new(IdentityProcessor, BodyType::Text);
        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body, Body::Text("hello".to_string()));
    }

    #[tokio::test]
    async fn test_e2e_stream_cache_then_unmarshal_json() {
        use crate::UnmarshalService;
        use crate::data_format::builtin_data_format;

        let df = builtin_data_format("json").unwrap();
        let inner = UnmarshalService::new(IdentityProcessor, df);
        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(make_stream_body(
            br#"{"key":"value"}"#.to_vec(),
        )));
        let result = svc.oneshot(exchange).await.unwrap();
        match &result.input.body {
            Body::Json(v) => assert_eq!(v["key"], "value"),
            other => panic!("expected Json, got {:?}", other),
        }
    }
}