Skip to main content

camel_processor/
stream_cache.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::stream_cache::StreamCacheConfig;
9use camel_api::{CamelError, Exchange};
10use tower::ServiceExt;
11
12#[derive(Clone)]
13pub struct StreamCacheService<P> {
14    inner: P,
15    config: StreamCacheConfig,
16}
17
18impl<P> StreamCacheService<P> {
19    pub fn new(inner: P, config: StreamCacheConfig) -> Self {
20        Self { inner, config }
21    }
22}
23
24impl<P> Service<Exchange> for StreamCacheService<P>
25where
26    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
27    P::Future: Send,
28{
29    type Response = Exchange;
30    type Error = CamelError;
31    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34        self.inner.poll_ready(cx)
35    }
36
37    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38        let threshold = self.config.threshold;
39        let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
40        match body {
41            Body::Stream(_) => {
42                let inner = self.inner.clone();
43                Box::pin(async move {
44                    let bytes = body.into_bytes(threshold).await?;
45                    exchange.input.body = Body::Bytes(bytes);
46                    inner.oneshot(exchange).await
47                })
48            }
49            _ => {
50                exchange.input.body = body;
51                let fut = self.inner.call(exchange);
52                Box::pin(fut)
53            }
54        }
55    }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::body::{StreamBody, StreamMetadata};
    use camel_api::{IdentityProcessor, Message};
    use futures::stream;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Wraps `data` in a `Body::Stream` that yields it as one successful chunk.
    fn make_stream_body(data: Vec<u8>) -> Body {
        let only_chunk: Result<Bytes, CamelError> = Ok(Bytes::from(data));
        let source = stream::iter(vec![only_chunk]);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(source)))),
            metadata: StreamMetadata::default(),
        })
    }

    /// Builds an exchange whose input body streams `payload`.
    fn stream_exchange(payload: &[u8]) -> Exchange {
        Exchange::new(Message::new(make_stream_body(payload.to_vec())))
    }

    // A stream body must come out as `Body::Bytes` holding the same data.
    #[tokio::test]
    async fn test_stream_cache_materializes_stream() {
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let out = cache.oneshot(stream_exchange(b"hello")).await.unwrap();
        assert!(matches!(out.input.body, Body::Bytes(_)));
        if let Body::Bytes(payload) = &out.input.body {
            assert_eq!(payload.as_ref(), b"hello");
        } else {
            panic!("expected Bytes");
        }
    }

    // Text bodies are forwarded untouched.
    #[tokio::test]
    async fn test_stream_cache_passes_through_text() {
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let request = Exchange::new(Message::new(Body::Text("unchanged".to_string())));
        let out = cache.oneshot(request).await.unwrap();
        assert_eq!(out.input.body, Body::Text("unchanged".to_string()));
    }

    // Bodies that are already bytes stay bytes.
    #[tokio::test]
    async fn test_stream_cache_passes_through_bytes() {
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let request = Exchange::new(Message::new(Body::Bytes(Bytes::from_static(b"data"))));
        let out = cache.oneshot(request).await.unwrap();
        assert!(matches!(out.input.body, Body::Bytes(_)));
    }

    // JSON bodies are forwarded as JSON.
    #[tokio::test]
    async fn test_stream_cache_passes_through_json() {
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let request = Exchange::new(Message::new(Body::Json(serde_json::json!({"k": 1}))));
        let out = cache.oneshot(request).await.unwrap();
        assert!(matches!(out.input.body, Body::Json(_)));
    }

    // An 11-byte stream against a config built with `new(5)` must be rejected.
    #[tokio::test]
    async fn test_stream_cache_exceeds_threshold() {
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::new(5));
        let outcome = cache.oneshot(stream_exchange(b"hello world")).await;
        assert!(matches!(outcome, Err(CamelError::StreamLimitExceeded(_))));
    }

    // Materializing the body must not disturb message headers.
    #[tokio::test]
    async fn test_stream_cache_preserves_headers() {
        use camel_api::Value;
        let cache = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let mut message = Message::new(make_stream_body(b"data".to_vec()));
        message.set_header("x-test", Value::String("kept".into()));
        let out = cache.oneshot(Exchange::new(message)).await.unwrap();
        let expected = Value::String("kept".into());
        assert_eq!(out.input.header("x-test"), Some(&expected));
    }

    // Stream cache in front of a body-type conversion to text.
    #[tokio::test]
    async fn test_e2e_stream_cache_then_convert_body_to() {
        use crate::ConvertBodyTo;
        use camel_api::body_converter::BodyType;

        let to_text = ConvertBodyTo::new(IdentityProcessor, BodyType::Text);
        let cache = StreamCacheService::new(to_text, StreamCacheConfig::default());
        let out = cache.oneshot(stream_exchange(b"hello")).await.unwrap();
        assert_eq!(out.input.body, Body::Text("hello".to_string()));
    }

    // Stream cache in front of JSON unmarshalling.
    #[tokio::test]
    async fn test_e2e_stream_cache_then_unmarshal_json() {
        use crate::UnmarshalService;
        use crate::data_format::builtin_data_format;

        let json_format = builtin_data_format("json").unwrap();
        let unmarshal = UnmarshalService::new(IdentityProcessor, json_format);
        let cache = StreamCacheService::new(unmarshal, StreamCacheConfig::default());
        let out = cache
            .oneshot(stream_exchange(br#"{"key":"value"}"#))
            .await
            .unwrap();
        match &out.input.body {
            Body::Json(doc) => assert_eq!(doc["key"], "value"),
            unexpected => panic!("expected Json, got {:?}", unexpected),
        }
    }
}