1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use crate::error::CamelError;
9use crate::exchange::Exchange;
10
11pub trait Processor:
16 Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
17{
18}
19
20impl<P> Processor for P where
22 P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
23{
24}
25
/// A stateless processor that returns every exchange it receives unchanged.
#[derive(Clone, Debug)]
pub struct IdentityProcessor;
29
30impl Service<Exchange> for IdentityProcessor {
31 type Response = Exchange;
32 type Error = CamelError;
33 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
34
35 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36 Poll::Ready(Ok(()))
37 }
38
39 fn call(&mut self, exchange: Exchange) -> Self::Future {
40 Box::pin(async move { Ok(exchange) })
41 }
42}
43
44pub type BoxProcessor = tower::util::BoxCloneService<Exchange, Exchange, CamelError>;
48
49pub struct SyncBoxProcessor(Arc<Mutex<BoxProcessor>>);
57
58impl SyncBoxProcessor {
59 pub fn new(processor: BoxProcessor) -> Self {
60 SyncBoxProcessor(Arc::new(Mutex::new(processor)))
61 }
62
63 pub fn clone_inner(&self) -> BoxProcessor {
64 self.0.lock().unwrap_or_else(|e| e.into_inner()).clone()
65 }
66}
67
68impl Clone for SyncBoxProcessor {
69 fn clone(&self) -> Self {
70 SyncBoxProcessor(self.0.clone())
71 }
72}
73
74pub trait BoxProcessorExt {
87 fn from_fn<F, Fut>(f: F) -> BoxProcessor
91 where
92 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
93 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static;
94}
95
96impl BoxProcessorExt for BoxProcessor {
97 fn from_fn<F, Fut>(f: F) -> BoxProcessor
98 where
99 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
100 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
101 {
102 BoxProcessor::new(ProcessorFn::new(f))
103 }
104}
105
/// Adapter that exposes a function or closure as a processor service.
///
/// The callable is stored in an [`Arc`], so clones of a `ProcessorFn` share it.
pub struct ProcessorFn<F> {
    /// The wrapped callable, shared between clones.
    f: Arc<F>,
}
112
113impl<F> Clone for ProcessorFn<F> {
115 fn clone(&self) -> Self {
116 Self {
117 f: Arc::clone(&self.f),
118 }
119 }
120}
121
122impl<F> ProcessorFn<F> {
123 pub fn new(f: F) -> Self {
124 Self { f: Arc::new(f) }
125 }
126}
127
128impl<F, Fut> Service<Exchange> for ProcessorFn<F>
129where
130 F: Fn(Exchange) -> Fut + Send + Sync + 'static,
131 Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
132{
133 type Response = Exchange;
134 type Error = CamelError;
135 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
136
137 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138 Poll::Ready(Ok(()))
139 }
140
141 fn call(&mut self, exchange: Exchange) -> Self::Future {
142 let f = Arc::clone(&self.f);
143 Box::pin(async move { f(exchange).await })
144 }
145}
146
#[cfg(test)]
mod tests {
    // Unit tests for `IdentityProcessor`, `ProcessorFn`, `BoxProcessor` and
    // `BoxProcessorExt::from_fn`. Each test drives the service once via
    // `tower::ServiceExt::oneshot`.
    use super::*;
    use crate::body::Body;
    use crate::message::Message;
    use serde_json::Value;
    use tower::ServiceExt;

    /// The identity processor must leave the message body untouched.
    #[tokio::test]
    async fn test_identity_processor_passes_through() {
        let exchange = Exchange::new(Message::new("hello"));

        let result = IdentityProcessor.oneshot(exchange).await.unwrap();

        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    /// Headers set on the input message must survive the identity processor.
    #[tokio::test]
    async fn test_identity_processor_preserves_headers() {
        let mut exchange = Exchange::new(Message::default());
        exchange
            .input
            .set_header("key", Value::String("value".into()));

        let result = IdentityProcessor.oneshot(exchange).await.unwrap();

        let expected = Value::String("value".into());
        assert_eq!(result.input.header("key"), Some(&expected));
    }

    /// Exchange-level properties must survive the identity processor.
    #[tokio::test]
    async fn test_identity_processor_preserves_properties() {
        let mut exchange = Exchange::new(Message::default());
        exchange.set_property("prop", Value::Bool(true));

        let result = IdentityProcessor.oneshot(exchange).await.unwrap();

        let expected = Value::Bool(true);
        assert_eq!(result.property("prop"), Some(&expected));
    }

    /// A `ProcessorFn` applies the wrapped closure to the exchange.
    #[tokio::test]
    async fn test_processor_fn_transforms_exchange() {
        let processor = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = Body::Text("transformed".into());
            Ok(ex)
        });

        let result = processor
            .oneshot(Exchange::new(Message::new("original")))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("transformed"));
    }

    /// An error returned by the closure is surfaced unchanged by the service.
    #[tokio::test]
    async fn test_processor_fn_can_return_error() {
        let processor = ProcessorFn::new(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional error".into()))
        });

        let outcome: Result<Exchange, CamelError> =
            processor.oneshot(Exchange::new(Message::default())).await;

        // Check the variant, not just `is_err()`, so an unrelated failure cannot
        // make this test pass by accident.
        assert!(
            matches!(outcome, Err(CamelError::ProcessorError(_))),
            "expected the closure's ProcessorError to be returned"
        );
    }

    /// Both a clone and the original `ProcessorFn` remain usable.
    #[tokio::test]
    async fn test_processor_fn_is_cloneable() {
        let processor = ProcessorFn::new(|ex: Exchange| async move { Ok(ex) });
        let cloned = processor.clone();

        let from_clone = cloned
            .oneshot(Exchange::new(Message::new("test")))
            .await
            .unwrap();
        assert_eq!(from_clone.input.body.as_text(), Some("test"));

        // Cloning must not invalidate the value it was cloned from.
        let from_original = processor
            .oneshot(Exchange::new(Message::new("test")))
            .await
            .unwrap();
        assert_eq!(from_original.input.body.as_text(), Some("test"));
    }

    /// `IdentityProcessor` can be type-erased into a `BoxProcessor`.
    #[tokio::test]
    async fn test_box_processor_from_identity() {
        let processor: BoxProcessor = tower::util::BoxCloneService::new(IdentityProcessor);

        let result = processor
            .oneshot(Exchange::new(Message::new("boxed")))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("boxed"));
    }

    /// A `ProcessorFn` can be type-erased into a `BoxProcessor`.
    #[tokio::test]
    async fn test_box_processor_from_processor_fn() {
        let inner = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = Body::Text("via_box".into());
            Ok(ex)
        });
        let processor: BoxProcessor = tower::util::BoxCloneService::new(inner);

        let result = processor
            .oneshot(Exchange::new(Message::new("original")))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("via_box"));
    }

    /// `BoxProcessor::from_fn` builds a working boxed processor from a closure.
    #[tokio::test]
    async fn test_box_processor_ext_from_fn() {
        let processor = BoxProcessor::from_fn(|mut ex: Exchange| async move {
            ex.input.body = Body::Text("via_from_fn".into());
            Ok(ex)
        });

        let result = processor
            .oneshot(Exchange::new(Message::new("original")))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("via_from_fn"));
    }
}