Skip to main content

camel_processor/
filter.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10/// Tower Service implementing the Filter EIP.
11///
12/// If the predicate returns `true`, the exchange is forwarded through the sub-pipeline.
13/// If `false`, the exchange is returned as-is and the sub-pipeline is skipped entirely.
14#[derive(Clone)]
15pub struct FilterService {
16    predicate: FilterPredicate,
17    sub_pipeline: BoxProcessor,
18}
19
20impl FilterService {
21    /// Create from a closure predicate and a resolved sub-pipeline.
22    pub fn new(
23        predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
24        sub_pipeline: BoxProcessor,
25    ) -> Self {
26        Self {
27            predicate: Arc::new(predicate),
28            sub_pipeline,
29        }
30    }
31
32    /// Create from a pre-boxed `FilterPredicate` (used by `resolve_steps`).
33    pub fn from_predicate(predicate: FilterPredicate, sub_pipeline: BoxProcessor) -> Self {
34        Self {
35            predicate,
36            sub_pipeline,
37        }
38    }
39}
40
41impl Service<Exchange> for FilterService {
42    type Response = Exchange;
43    type Error = CamelError;
44    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
45
46    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47        self.sub_pipeline.poll_ready(cx)
48    }
49
50    fn call(&mut self, exchange: Exchange) -> Self::Future {
51        if (self.predicate)(&exchange) {
52            let fut = self.sub_pipeline.call(exchange);
53            Box::pin(fut)
54        } else {
55            Box::pin(async move { Ok(exchange) })
56        }
57    }
58}
59
60#[cfg(test)]
61mod tests {
62    use super::*;
63    use camel_api::{Body, BoxProcessorExt, Message, Value};
64    use tower::ServiceExt;
65
66    fn passthrough() -> BoxProcessor {
67        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
68    }
69
70    fn uppercase_body() -> BoxProcessor {
71        BoxProcessor::from_fn(|mut ex: Exchange| {
72            Box::pin(async move {
73                if let Body::Text(s) = &ex.input.body {
74                    ex.input.body = Body::Text(s.to_uppercase());
75                }
76                Ok(ex)
77            })
78        })
79    }
80
81    fn failing() -> BoxProcessor {
82        BoxProcessor::from_fn(|_ex| {
83            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
84        })
85    }
86
87    // 1. Matching exchange is forwarded to sub_pipeline.
88    #[tokio::test]
89    async fn test_filter_passes_matching_exchange() {
90        let mut svc = FilterService::new(
91            |ex: &Exchange| ex.input.header("active").is_some(),
92            uppercase_body(),
93        );
94        let mut ex = Exchange::new(Message::new("hello"));
95        ex.input.set_header("active", Value::Bool(true));
96        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
97        assert_eq!(result.input.body.as_text(), Some("HELLO"));
98    }
99
100    // 2. Non-matching exchange is returned as-is, sub_pipeline not called.
101    #[tokio::test]
102    async fn test_filter_blocks_non_matching_exchange() {
103        let mut svc = FilterService::new(
104            |ex: &Exchange| ex.input.header("active").is_some(),
105            uppercase_body(),
106        );
107        let ex = Exchange::new(Message::new("hello"));
108        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
109        // body unchanged — uppercase_body was NOT called
110        assert_eq!(result.input.body.as_text(), Some("hello"));
111    }
112
113    // 3. Result is the sub_pipeline's output, not the original exchange.
114    #[tokio::test]
115    async fn test_filter_sub_pipeline_transforms_body() {
116        let mut svc = FilterService::new(|_: &Exchange| true, uppercase_body());
117        let ex = Exchange::new(Message::new("world"));
118        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
119        assert_eq!(result.input.body.as_text(), Some("WORLD"));
120    }
121
122    // 4. Sub-pipeline errors propagate.
123    #[tokio::test]
124    async fn test_filter_sub_pipeline_error_propagates() {
125        let mut svc = FilterService::new(|_: &Exchange| true, failing());
126        let ex = Exchange::new(Message::new("x"));
127        let result = svc.ready().await.unwrap().call(ex).await;
128        assert!(result.is_err());
129        assert!(result.unwrap_err().to_string().contains("boom"));
130    }
131
132    // 5. Predicate receives the original exchange before sub_pipeline mutates it.
133    #[tokio::test]
134    async fn test_filter_predicate_receives_original_exchange() {
135        let mut svc = FilterService::new(
136            |ex: &Exchange| ex.input.body.as_text() == Some("check"),
137            uppercase_body(),
138        );
139        let ex = Exchange::new(Message::new("check"));
140        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
141        assert_eq!(result.input.body.as_text(), Some("CHECK"));
142    }
143
144    // 6. Cloned FilterService shares no mutable state (BoxProcessor clone is independent).
145    #[tokio::test]
146    async fn test_filter_clone_is_independent() {
147        let svc = FilterService::new(|_: &Exchange| true, passthrough());
148        let mut clone = svc.clone();
149        let ex = Exchange::new(Message::new("hi"));
150        let result = clone.ready().await.unwrap().call(ex).await.unwrap();
151        assert_eq!(result.input.body.as_text(), Some("hi"));
152    }
153}