Skip to main content

camel_api/
processor.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::{Arc, Mutex};
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use crate::error::CamelError;
9use crate::exchange::Exchange;
10
11/// A Processor is a Tower Service that transforms an Exchange.
12///
13/// Any type implementing `Service<Exchange, Response = Exchange, Error = CamelError>`
14/// that is also `Clone + Send + Sync + 'static` automatically implements `Processor`.
15pub trait Processor:
16    Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
17{
18}
19
20// Blanket implementation: anything satisfying the bounds is a Processor.
21impl<P> Processor for P where
22    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
23{
24}
25
/// The no-op processor: whatever exchange goes in comes back out unchanged.
#[derive(Clone, Debug)]
pub struct IdentityProcessor;
29
30impl Service<Exchange> for IdentityProcessor {
31    type Response = Exchange;
32    type Error = CamelError;
33    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
34
35    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36        Poll::Ready(Ok(()))
37    }
38
39    fn call(&mut self, exchange: Exchange) -> Self::Future {
40        Box::pin(async move { Ok(exchange) })
41    }
42}
43
44/// A type-erased, cloneable processor. This is the main runtime representation
45/// of a processor pipeline — a composed chain of Tower Services erased to a
46/// single boxed type.
47pub type BoxProcessor = tower::util::BoxCloneService<Exchange, Exchange, CamelError>;
48
49/// Thread-safe wrapper for [`BoxProcessor`].
50///
51/// `BoxProcessor` (`BoxCloneService`) is `Send` but not `Sync` because the
52/// inner `Box<dyn CloneServiceInner>` lacks a `Sync` bound. This wrapper
53/// stores the processor behind `Arc<Mutex<...>>`, providing safe `Send+Sync`
54/// access. The Mutex is only held briefly during `clone()` — each caller
55/// gets an independent `BoxProcessor` copy.
56pub struct SyncBoxProcessor(Arc<Mutex<BoxProcessor>>);
57
58impl SyncBoxProcessor {
59    pub fn new(processor: BoxProcessor) -> Self {
60        SyncBoxProcessor(Arc::new(Mutex::new(processor)))
61    }
62
63    pub fn clone_inner(&self) -> BoxProcessor {
64        self.0.lock().unwrap_or_else(|e| e.into_inner()).clone()
65    }
66}
67
68impl Clone for SyncBoxProcessor {
69    fn clone(&self) -> Self {
70        SyncBoxProcessor(self.0.clone())
71    }
72}
73
74/// Extension trait for [`BoxProcessor`] providing ergonomic constructors.
75///
76/// Since `BoxProcessor` is a type alias for Tower's `BoxCloneService`, we cannot
77/// add inherent methods to it. This trait fills that gap.
78///
79/// # Example
80///
81/// ```ignore
82/// use camel_api::{BoxProcessor, BoxProcessorExt};
83///
84/// let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
85/// ```
86pub trait BoxProcessorExt {
87    /// Create a [`BoxProcessor`] from an async closure.
88    ///
89    /// This is a convenience shorthand for `BoxProcessor::new(ProcessorFn::new(f))`.
90    fn from_fn<F, Fut>(f: F) -> BoxProcessor
91    where
92        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
93        Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static;
94}
95
96impl BoxProcessorExt for BoxProcessor {
97    fn from_fn<F, Fut>(f: F) -> BoxProcessor
98    where
99        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
100        Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
101    {
102        BoxProcessor::new(ProcessorFn::new(f))
103    }
104}
105
/// Adapter that turns an `Fn(Exchange) -> Future<Result<Exchange>>` closure
/// into a Tower Service.
///
/// This is what lets user-supplied async closures (via `.process()`) take
/// part in the Tower pipeline.
pub struct ProcessorFn<F> {
    // Held in an `Arc` so the closure itself never has to be `Clone`.
    f: Arc<F>,
}
112
113// Manual Clone impl: Arc<F> is always Clone, regardless of F.
114impl<F> Clone for ProcessorFn<F> {
115    fn clone(&self) -> Self {
116        Self {
117            f: Arc::clone(&self.f),
118        }
119    }
120}
121
122impl<F> ProcessorFn<F> {
123    pub fn new(f: F) -> Self {
124        Self { f: Arc::new(f) }
125    }
126}
127
128impl<F, Fut> Service<Exchange> for ProcessorFn<F>
129where
130    F: Fn(Exchange) -> Fut + Send + Sync + 'static,
131    Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
132{
133    type Response = Exchange;
134    type Error = CamelError;
135    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
136
137    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
138        Poll::Ready(Ok(()))
139    }
140
141    fn call(&mut self, exchange: Exchange) -> Self::Future {
142        let f = Arc::clone(&self.f);
143        Box::pin(async move { f(exchange).await })
144    }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Message;
    use tower::ServiceExt;

    // ----- IdentityProcessor -----

    /// The message body survives a trip through the identity processor.
    #[tokio::test]
    async fn test_identity_processor_passes_through() {
        let sent = Exchange::new(Message::new("hello"));

        let received = IdentityProcessor.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("hello"));
    }

    /// A header set before the call is still present afterwards.
    #[tokio::test]
    async fn test_identity_processor_preserves_headers() {
        let mut sent = Exchange::new(Message::default());
        sent.input
            .set_header("key", serde_json::Value::String("value".into()));

        let received = IdentityProcessor.oneshot(sent).await.unwrap();

        let expected = serde_json::Value::String("value".into());
        assert_eq!(received.input.header("key"), Some(&expected));
    }

    /// An exchange property set before the call is still present afterwards.
    #[tokio::test]
    async fn test_identity_processor_preserves_properties() {
        let mut sent = Exchange::new(Message::default());
        sent.set_property("prop", serde_json::Value::Bool(true));

        let received = IdentityProcessor.oneshot(sent).await.unwrap();

        let expected = serde_json::Value::Bool(true);
        assert_eq!(received.property("prop"), Some(&expected));
    }

    // ----- ProcessorFn -----

    /// A closure wrapped in `ProcessorFn` can rewrite the message body.
    #[tokio::test]
    async fn test_processor_fn_transforms_exchange() {
        let rewrite = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("transformed".into());
            Ok(ex)
        });
        let sent = Exchange::new(Message::new("original"));

        let received = rewrite.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("transformed"));
    }

    /// An error returned by the closure is surfaced to the caller.
    #[tokio::test]
    async fn test_processor_fn_can_return_error() {
        let failing = ProcessorFn::new(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional error".into()))
        });
        let sent = Exchange::new(Message::default());

        let outcome: Result<Exchange, CamelError> = failing.oneshot(sent).await;

        assert!(outcome.is_err());
    }

    /// A clone of a `ProcessorFn` is itself a working processor.
    #[tokio::test]
    async fn test_processor_fn_is_cloneable() {
        let source = ProcessorFn::new(|ex: Exchange| async move { Ok(ex) });
        let duplicate = source.clone();
        let sent = Exchange::new(Message::new("test"));

        let received = duplicate.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("test"));
    }

    // ----- BoxProcessor -----

    /// The identity processor still passes exchanges through once boxed.
    #[tokio::test]
    async fn test_box_processor_from_identity() {
        let boxed: BoxProcessor = tower::util::BoxCloneService::new(IdentityProcessor);
        let sent = Exchange::new(Message::new("boxed"));

        let received = boxed.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("boxed"));
    }

    /// A `ProcessorFn` keeps its behaviour once boxed.
    #[tokio::test]
    async fn test_box_processor_from_processor_fn() {
        let rewrite = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("via_box".into());
            Ok(ex)
        });
        let boxed: BoxProcessor = tower::util::BoxCloneService::new(rewrite);
        let sent = Exchange::new(Message::new("original"));

        let received = boxed.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("via_box"));
    }

    /// `BoxProcessor::from_fn` builds a processor that runs the closure.
    #[tokio::test]
    async fn test_box_processor_ext_from_fn() {
        let boxed = BoxProcessor::from_fn(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("via_from_fn".into());
            Ok(ex)
        });
        let sent = Exchange::new(Message::new("original"));

        let received = boxed.oneshot(sent).await.unwrap();

        assert_eq!(received.input.body.as_text(), Some("via_from_fn"));
    }
}