camel_processor/
stream_cache.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::stream_cache::StreamCacheConfig;
9use camel_api::{CamelError, Exchange};
10use tower::ServiceExt;
11
12#[derive(Clone)]
13pub struct StreamCacheService<P> {
14 inner: P,
15 config: StreamCacheConfig,
16}
17
18impl<P> StreamCacheService<P> {
19 pub fn new(inner: P, config: StreamCacheConfig) -> Self {
20 Self { inner, config }
21 }
22}
23
24impl<P> Service<Exchange> for StreamCacheService<P>
25where
26 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
27 P::Future: Send,
28{
29 type Response = Exchange;
30 type Error = CamelError;
31 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34 self.inner.poll_ready(cx)
35 }
36
37 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38 let threshold = self.config.threshold;
39 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
40 match body {
41 Body::Stream(_) => {
42 let inner = self.inner.clone();
43 Box::pin(async move {
44 let bytes = body.into_bytes(threshold).await?;
45 exchange.input.body = Body::Bytes(bytes);
46 inner.oneshot(exchange).await
47 })
48 }
49 _ => {
50 exchange.input.body = body;
51 let fut = self.inner.call(exchange);
52 Box::pin(fut)
53 }
54 }
55 }
56}
57
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_api::body::{StreamBody, StreamMetadata};
    use camel_api::{IdentityProcessor, Message};
    use futures::stream;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Builds a `Body::Stream` that yields `data` as one successful chunk.
    fn make_stream_body(data: Vec<u8>) -> Body {
        let chunks: Vec<Result<Bytes, CamelError>> = vec![Ok(Bytes::from(data))];
        let stream = stream::iter(chunks);
        Body::Stream(StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
            metadata: StreamMetadata::default(),
        })
    }

    /// Wraps a single-chunk stream body carrying `data` in a new exchange.
    fn stream_exchange(data: &[u8]) -> Exchange {
        Exchange::new(Message::new(make_stream_body(data.to_vec())))
    }

    #[tokio::test]
    async fn test_stream_cache_materializes_stream() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let result = svc.oneshot(stream_exchange(b"hello")).await.unwrap();
        match &result.input.body {
            Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello"),
            other => panic!("expected Bytes, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_text() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Text("unchanged".to_string())));
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(result.input.body, Body::Text("unchanged".to_string()));
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_bytes() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Bytes(Bytes::from_static(b"data"))));
        let result = svc.oneshot(exchange).await.unwrap();
        // Check the payload too, not only the variant.
        match &result.input.body {
            Body::Bytes(b) => assert_eq!(b.as_ref(), b"data"),
            other => panic!("expected Bytes, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_stream_cache_passes_through_json() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let exchange = Exchange::new(Message::new(Body::Json(serde_json::json!({"k": 1}))));
        let result = svc.oneshot(exchange).await.unwrap();
        // Check the document too, not only the variant.
        match &result.input.body {
            Body::Json(v) => assert_eq!(v, &serde_json::json!({"k": 1})),
            other => panic!("expected Json, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn test_stream_cache_exceeds_threshold() {
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::new(5));
        let result = svc.oneshot(stream_exchange(b"hello world")).await;
        assert!(matches!(result, Err(CamelError::StreamLimitExceeded(_))));
    }

    #[tokio::test]
    async fn test_stream_cache_preserves_headers() {
        use camel_api::Value;
        let svc = StreamCacheService::new(IdentityProcessor, StreamCacheConfig::default());
        let mut msg = Message::new(make_stream_body(b"data".to_vec()));
        msg.set_header("x-test", Value::String("kept".into()));
        let exchange = Exchange::new(msg);
        let result = svc.oneshot(exchange).await.unwrap();
        assert_eq!(
            result.input.header("x-test"),
            Some(&Value::String("kept".into()))
        );
        // The header must survive alongside the materialized body.
        assert!(matches!(result.input.body, Body::Bytes(_)));
    }

    #[tokio::test]
    async fn test_e2e_stream_cache_then_convert_body_to() {
        use crate::ConvertBodyTo;
        use camel_api::body_converter::BodyType;

        let inner = ConvertBodyTo::new(IdentityProcessor, BodyType::Text);
        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
        let result = svc.oneshot(stream_exchange(b"hello")).await.unwrap();
        assert_eq!(result.input.body, Body::Text("hello".to_string()));
    }

    #[tokio::test]
    async fn test_e2e_stream_cache_then_unmarshal_json() {
        use crate::UnmarshalService;
        use crate::data_format::builtin_data_format;

        let df = builtin_data_format("json").unwrap();
        let inner = UnmarshalService::new(IdentityProcessor, df);
        let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
        let result = svc
            .oneshot(stream_exchange(br#"{"key":"value"}"#))
            .await
            .unwrap();
        match &result.input.body {
            Body::Json(v) => assert_eq!(v["key"], "value"),
            other => panic!("expected Json, got {:?}", other),
        }
    }
}
165}