camel_processor/
stream_cache.rs1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::body::Body;
8use camel_api::stream_cache::StreamCacheConfig;
9use camel_api::{CamelError, Exchange};
10use tower::ServiceExt;
11
12#[derive(Clone)]
13pub struct StreamCacheService<P> {
14 inner: P,
15 config: StreamCacheConfig,
16}
17
18impl<P> StreamCacheService<P> {
19 pub fn new(inner: P, config: StreamCacheConfig) -> Self {
20 Self { inner, config }
21 }
22}
23
24impl<P> Service<Exchange> for StreamCacheService<P>
25where
26 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + 'static,
27 P::Future: Send,
28{
29 type Response = Exchange;
30 type Error = CamelError;
31 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
32
33 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
34 self.inner.poll_ready(cx)
35 }
36
37 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
38 let threshold = self.config.threshold;
39 let body = std::mem::replace(&mut exchange.input.body, Body::Empty);
40 match body {
41 Body::Stream(_) => {
42 let inner = self.inner.clone();
43 Box::pin(async move {
44 let bytes = body.into_bytes(threshold).await?;
45 exchange.input.body = Body::Bytes(bytes);
46 inner.oneshot(exchange).await
47 })
48 }
49 _ => {
50 exchange.input.body = body;
51 let fut = self.inner.call(exchange);
52 Box::pin(fut)
53 }
54 }
55 }
56}
57
58#[cfg(test)]
59mod tests {
60 use super::*;
61 use bytes::Bytes;
62 use camel_api::body::{StreamBody, StreamMetadata};
63 use camel_api::{IdentityProcessor, Message};
64 use futures::stream;
65 use std::sync::Arc;
66 use tokio::sync::Mutex;
67 use tower::ServiceExt;
68
69 fn make_stream_body(data: Vec<u8>) -> Body {
70 let chunks: Vec<Result<Bytes, CamelError>> =
71 vec![Ok(Bytes::from(data))];
72 let stream = stream::iter(chunks);
73 Body::Stream(StreamBody {
74 stream: Arc::new(Mutex::new(Some(Box::pin(stream)))),
75 metadata: StreamMetadata::default(),
76 })
77 }
78
79 #[tokio::test]
80 async fn test_stream_cache_materializes_stream() {
81 let svc = StreamCacheService::new(
82 IdentityProcessor,
83 StreamCacheConfig::default(),
84 );
85 let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
86 let result = svc.oneshot(exchange).await.unwrap();
87 assert!(matches!(result.input.body, Body::Bytes(_)));
88 match &result.input.body {
89 Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello"),
90 _ => panic!("expected Bytes"),
91 }
92 }
93
94 #[tokio::test]
95 async fn test_stream_cache_passes_through_text() {
96 let svc = StreamCacheService::new(
97 IdentityProcessor,
98 StreamCacheConfig::default(),
99 );
100 let exchange = Exchange::new(Message::new(Body::Text("unchanged".to_string())));
101 let result = svc.oneshot(exchange).await.unwrap();
102 assert_eq!(result.input.body, Body::Text("unchanged".to_string()));
103 }
104
105 #[tokio::test]
106 async fn test_stream_cache_passes_through_bytes() {
107 let svc = StreamCacheService::new(
108 IdentityProcessor,
109 StreamCacheConfig::default(),
110 );
111 let exchange = Exchange::new(Message::new(Body::Bytes(Bytes::from_static(b"data"))));
112 let result = svc.oneshot(exchange).await.unwrap();
113 assert!(matches!(result.input.body, Body::Bytes(_)));
114 }
115
116 #[tokio::test]
117 async fn test_stream_cache_passes_through_json() {
118 let svc = StreamCacheService::new(
119 IdentityProcessor,
120 StreamCacheConfig::default(),
121 );
122 let exchange = Exchange::new(Message::new(Body::Json(serde_json::json!({"k": 1}))));
123 let result = svc.oneshot(exchange).await.unwrap();
124 assert!(matches!(result.input.body, Body::Json(_)));
125 }
126
127 #[tokio::test]
128 async fn test_stream_cache_exceeds_threshold() {
129 let svc = StreamCacheService::new(
130 IdentityProcessor,
131 StreamCacheConfig::new(5),
132 );
133 let exchange = Exchange::new(Message::new(make_stream_body(b"hello world".to_vec())));
134 let result = svc.oneshot(exchange).await;
135 assert!(matches!(result, Err(CamelError::StreamLimitExceeded(_))));
136 }
137
138 #[tokio::test]
139 async fn test_stream_cache_preserves_headers() {
140 use camel_api::Value;
141 let svc = StreamCacheService::new(
142 IdentityProcessor,
143 StreamCacheConfig::default(),
144 );
145 let mut msg = Message::new(make_stream_body(b"data".to_vec()));
146 msg.set_header("x-test", Value::String("kept".into()));
147 let exchange = Exchange::new(msg);
148 let result = svc.oneshot(exchange).await.unwrap();
149 assert_eq!(
150 result.input.header("x-test"),
151 Some(&Value::String("kept".into()))
152 );
153 }
154
155 #[tokio::test]
156 async fn test_e2e_stream_cache_then_convert_body_to() {
157 use crate::ConvertBodyTo;
158 use camel_api::body_converter::BodyType;
159
160 let inner = ConvertBodyTo::new(IdentityProcessor, BodyType::Text);
161 let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
162 let exchange = Exchange::new(Message::new(make_stream_body(b"hello".to_vec())));
163 let result = svc.oneshot(exchange).await.unwrap();
164 assert_eq!(result.input.body, Body::Text("hello".to_string()));
165 }
166
167 #[tokio::test]
168 async fn test_e2e_stream_cache_then_unmarshal_json() {
169 use crate::UnmarshalService;
170 use crate::data_format::builtin_data_format;
171
172 let df = builtin_data_format("json").unwrap();
173 let inner = UnmarshalService::new(IdentityProcessor, df);
174 let svc = StreamCacheService::new(inner, StreamCacheConfig::default());
175 let exchange = Exchange::new(Message::new(make_stream_body(br#"{"key":"value"}"#.to_vec())));
176 let result = svc.oneshot(exchange).await.unwrap();
177 match &result.input.body {
178 Body::Json(v) => assert_eq!(v["key"], "value"),
179 other => panic!("expected Json, got {:?}", other),
180 }
181 }
182}