Skip to main content

camel_processor/
stream_cache.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::stream_cache::StreamCacheConfig;
9use camel_api::{CamelError, Exchange};
10use tower::ServiceExt;
11
12#[derive(Clone)]
13pub struct StreamCacheService<P> {
14    inner: P,
15    config: StreamCacheConfig,
16}
17
18impl<P> StreamCacheService<P> {
19    pub fn new(inner: P, config: StreamCacheConfig) -> Self {
20        Self { inner, config }
21    }
22}
23
24impl<P> Service<Exchange> for StreamCacheService<P>
25where
26    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
27    P::Future: Send,
28{
29    type Response = Exchange;
30    type Error = CamelError;
31    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34        self.inner.poll_ready(cx)
35    }
36
37    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38        let threshold = self.config.threshold;
39        let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
40        match body {
41            Body::Stream(_) => {
42                let inner = self.inner.clone();
43                Box::pin(async move {
44                    let bytes = body.into_bytes(threshold).await?;
45                    exchange.input.body = Body::Bytes(bytes);
46                    inner.oneshot(exchange).await
47                })
48            }
49            _ => {
50                exchange.input.body = body;
51                let fut = self.inner.call(exchange);
52                Box::pin(fut)
53            }
54        }
55    }
56}
57
58#[cfg(test)]
59mod tests {
60    use super::*;
61    use bytes::Bytes;
62    use camel_api::body::{StreamBody, StreamMetadata};
63    use camel_api::{IdentityProcessor, Message};
64    use futures::stream;
65    use std::sync::Arc;
66    use tokio::sync::Mutex;
67    use tower::ServiceExt;
68
69    fn make_stream_body(data: Vec<u8>) -> Body {
70        let chunks: Vec<Result<Bytes, CamelError>> =
71            vec![Ok(Bytes::from(data))];
72        let stream = stream::iter(chunks);
73        Body::Stream(StreamBody {
74            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
75            metadata: StreamMetadata::default(),
76        })
77    }
78
79    #[tokio::test]
80    async fn test_stream_cache_materializes_stream() {
81        let svc = StreamCacheService::new(
82            IdentityProcessor,
83            StreamCacheConfig::default(),
84        );
85        let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
86        let result = svc.oneshot(exchange).await.unwrap();
87        assert!(matches!(result.input.body, Body::Bytes(_)));
88        match &result.input.body {
89            Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello"),
90            _ => panic!("expected Bytes"),
91        }
92    }
93
94    #[tokio::test]
95    async fn test_stream_cache_passes_through_text() {
96        let svc = StreamCacheService::new(
97            IdentityProcessor,
98            StreamCacheConfig::default(),
99        );
100        let exchange = Exchange::new(Message::new(Body::Text("unchanged".to_string())));
101        let result = svc.oneshot(exchange).await.unwrap();
102        assert_eq!(result.input.body, Body::Text("unchanged".to_string()));
103    }
104
105    #[tokio::test]
106    async fn test_stream_cache_passes_through_bytes() {
107        let svc = StreamCacheService::new(
108            IdentityProcessor,
109            StreamCacheConfig::default(),
110        );
111        let exchange = Exchange::new(Message::new(Body::Bytes(Bytes::from_static(b"data"))));
112        let result = svc.oneshot(exchange).await.unwrap();
113        assert!(matches!(result.input.body, Body::Bytes(_)));
114    }
115
116    #[tokio::test]
117    async fn test_stream_cache_passes_through_json() {
118        let svc = StreamCacheService::new(
119            IdentityProcessor,
120            StreamCacheConfig::default(),
121        );
122        let exchange = Exchange::new(Message::new(Body::Json(serde_json::json!({"k": 1}))));
123        let result = svc.oneshot(exchange).await.unwrap();
124        assert!(matches!(result.input.body, Body::Json(_)));
125    }
126
127    #[tokio::test]
128    async fn test_stream_cache_exceeds_threshold() {
129        let svc = StreamCacheService::new(
130            IdentityProcessor,
131            StreamCacheConfig::new(5),
132        );
133        let exchange = Exchange::new(Message::new(make_stream_body(b"hello world".to_vec())));
134        let result = svc.oneshot(exchange).await;
135        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(_))));
136    }
137
138    #[tokio::test]
139    async fn test_stream_cache_preserves_headers() {
140        use camel_api::Value;
141        let svc = StreamCacheService::new(
142            IdentityProcessor,
143            StreamCacheConfig::default(),
144        );
145        let mut msg = Message::new(make_stream_body(b"data".to_vec()));
146        msg.set_header("x-test", Value::String("kept".into()));
147        let exchange = Exchange::new(msg);
148        let result = svc.oneshot(exchange).await.unwrap();
149        assert_eq!(
150            result.input.header("x-test"),
151            Some(&Value::String("kept".into()))
152        );
153    }
154
155    #[tokio::test]
156    async fn test_e2e_stream_cache_then_convert_body_to() {
157        use crate::ConvertBodyTo;
158        use camel_api::body_converter::BodyType;
159
160        let inner = ConvertBodyTo::new(IdentityProcessor, BodyType::Text);
161        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
162        let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
163        let result = svc.oneshot(exchange).await.unwrap();
164        assert_eq!(result.input.body, Body::Text("hello".to_string()));
165    }
166
167    #[tokio::test]
168    async fn test_e2e_stream_cache_then_unmarshal_json() {
169        use crate::UnmarshalService;
170        use crate::data_format::builtin_data_format;
171
172        let df = builtin_data_format("json").unwrap();
173        let inner = UnmarshalService::new(IdentityProcessor, df);
174        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
175        let exchange = Exchange::new(Message::new(make_stream_body(br#"{"key":"value"}"#.to_vec())));
176        let result = svc.oneshot(exchange).await.unwrap();
177        match &result.input.body {
178            Body::Json(v) => assert_eq!(v["key"], "value"),
179            other => panic!("expected Json, got {:?}", other),
180        }
181    }
182}