Skip to main content

camel_api/
processor.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use crate::error::CamelError;
9use crate::exchange::Exchange;
10
11/// A Processor is a Tower Service that transforms an Exchange.
12///
13/// Any type implementing `Service<Exchange, Response = Exchange, Error = CamelError>`
14/// that is also `Clone + Send + Sync + 'static` automatically implements `Processor`.
15pub trait Processor:
16    Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
17{
18}
19
20// Blanket implementation: anything satisfying the bounds is a Processor.
21impl<P> Processor for P where
22    P: Service<Exchange, Response = Exchange, Error = CamelError> + Clone + Send + Sync + 'static
23{
24}
25
/// An identity processor that passes the exchange through unchanged.
///
/// The type is a stateless unit struct, so it also derives `Copy`, `Default`,
/// `PartialEq` and `Eq`: it can be created with `Default::default()`, copied
/// freely, and compared in tests.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct IdentityProcessor;
29
30impl Service<Exchange> for IdentityProcessor {
31    type Response = Exchange;
32    type Error = CamelError;
33    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
34
35    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
36        Poll::Ready(Ok(()))
37    }
38
39    fn call(&mut self, exchange: Exchange) -> Self::Future {
40        Box::pin(async move { Ok(exchange) })
41    }
42}
43
44/// A type-erased, cloneable processor. This is the main runtime representation
45/// of a processor pipeline — a composed chain of Tower Services erased to a
46/// single boxed type.
47pub type BoxProcessor = tower::util::BoxCloneService<Exchange, Exchange, CamelError>;
48
49/// Extension trait for [`BoxProcessor`] providing ergonomic constructors.
50///
51/// Since `BoxProcessor` is a type alias for Tower's `BoxCloneService`, we cannot
52/// add inherent methods to it. This trait fills that gap.
53///
54/// # Example
55///
56/// ```ignore
57/// use camel_api::{BoxProcessor, BoxProcessorExt};
58///
59/// let processor = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
60/// ```
61pub trait BoxProcessorExt {
62    /// Create a [`BoxProcessor`] from an async closure.
63    ///
64    /// This is a convenience shorthand for `BoxProcessor::new(ProcessorFn::new(f))`.
65    fn from_fn<F, Fut>(f: F) -> BoxProcessor
66    where
67        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
68        Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static;
69}
70
71impl BoxProcessorExt for BoxProcessor {
72    fn from_fn<F, Fut>(f: F) -> BoxProcessor
73    where
74        F: Fn(Exchange) -> Fut + Send + Sync + 'static,
75        Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
76    {
77        BoxProcessor::new(ProcessorFn::new(f))
78    }
79}
80
/// Adapts an `Fn(Exchange) -> Future<Result<Exchange>>` closure into a Tower Service.
/// This allows user-provided async closures (via `.process()`) to participate
/// in the Tower pipeline.
///
/// The closure is stored behind an [`Arc`], so cloning the adapter only bumps
/// a reference count and never requires `F: Clone`.
pub struct ProcessorFn<F> {
    f: Arc<F>,
}

// Manual Clone impl: Arc<F> is always Clone, regardless of F.
// (`#[derive(Clone)]` would add an unwanted `F: Clone` bound.)
impl<F> Clone for ProcessorFn<F> {
    fn clone(&self) -> Self {
        Self {
            f: Arc::clone(&self.f),
        }
    }
}

// Manual Debug impl: `F` is usually a closure and therefore not `Debug`, so the
// wrapped function is rendered opaquely instead of requiring `F: Debug`.
impl<F> std::fmt::Debug for ProcessorFn<F> {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        formatter.debug_struct("ProcessorFn").finish_non_exhaustive()
    }
}

impl<F> ProcessorFn<F> {
    /// Wraps `f` so it can be used as a Tower service.
    pub fn new(f: F) -> Self {
        Self { f: Arc::new(f) }
    }
}
102
103impl<F, Fut> Service<Exchange> for ProcessorFn<F>
104where
105    F: Fn(Exchange) -> Fut + Send + Sync + 'static,
106    Fut: Future<Output = Result<Exchange, CamelError>> + Send + 'static,
107{
108    type Response = Exchange;
109    type Error = CamelError;
110    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
111
112    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113        Poll::Ready(Ok(()))
114    }
115
116    fn call(&mut self, exchange: Exchange) -> Self::Future {
117        let f = Arc::clone(&self.f);
118        Box::pin(async move { f(exchange).await })
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::Message;
    use tower::ServiceExt;

    /// Builds an exchange whose input message is created from `text`.
    fn text_exchange(text: &'static str) -> Exchange {
        Exchange::new(Message::new(text))
    }

    // --- IdentityProcessor -------------------------------------------------

    #[tokio::test]
    async fn test_identity_processor_passes_through() {
        let output = IdentityProcessor
            .oneshot(text_exchange("hello"))
            .await
            .unwrap();

        assert_eq!(output.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_identity_processor_preserves_headers() {
        let mut request = Exchange::new(Message::default());
        request
            .input
            .set_header("key", serde_json::Value::String("value".into()));

        let output = IdentityProcessor.oneshot(request).await.unwrap();

        let expected = serde_json::Value::String("value".into());
        assert_eq!(output.input.header("key"), Some(&expected));
    }

    #[tokio::test]
    async fn test_identity_processor_preserves_properties() {
        let mut request = Exchange::new(Message::default());
        request.set_property("prop", serde_json::Value::Bool(true));

        let output = IdentityProcessor.oneshot(request).await.unwrap();

        let expected = serde_json::Value::Bool(true);
        assert_eq!(output.property("prop"), Some(&expected));
    }

    // --- ProcessorFn -------------------------------------------------------

    #[tokio::test]
    async fn test_processor_fn_transforms_exchange() {
        let rewrite_body = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("transformed".into());
            Ok(ex)
        });

        let output = rewrite_body
            .oneshot(text_exchange("original"))
            .await
            .unwrap();

        assert_eq!(output.input.body.as_text(), Some("transformed"));
    }

    #[tokio::test]
    async fn test_processor_fn_can_return_error() {
        let always_fails = ProcessorFn::new(|_ex: Exchange| async move {
            Err(CamelError::ProcessorError("intentional error".into()))
        });

        let request = Exchange::new(Message::default());
        // The annotation pins the `Ok` type, which the closure never produces.
        let outcome: Result<Exchange, CamelError> = always_fails.oneshot(request).await;

        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_processor_fn_is_cloneable() {
        let original = ProcessorFn::new(|ex: Exchange| async move { Ok(ex) });
        let duplicate = original.clone();

        let output = duplicate.oneshot(text_exchange("test")).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("test"));
    }

    // --- BoxProcessor ------------------------------------------------------

    #[tokio::test]
    async fn test_box_processor_from_identity() {
        let boxed: BoxProcessor = BoxProcessor::new(IdentityProcessor);

        let output = boxed.oneshot(text_exchange("boxed")).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("boxed"));
    }

    #[tokio::test]
    async fn test_box_processor_from_processor_fn() {
        let rewrite_body = ProcessorFn::new(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("via_box".into());
            Ok(ex)
        });
        let boxed: BoxProcessor = BoxProcessor::new(rewrite_body);

        let output = boxed.oneshot(text_exchange("original")).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("via_box"));
    }

    #[tokio::test]
    async fn test_box_processor_ext_from_fn() {
        let boxed = BoxProcessor::from_fn(|mut ex: Exchange| async move {
            ex.input.body = crate::body::Body::Text("via_from_fn".into());
            Ok(ex)
        });

        let output = boxed.oneshot(text_exchange("original")).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("via_from_fn"));
    }
}
231}