// camel_processor/filter.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10/// Tower Service implementing the Filter EIP.
11///
12/// If the predicate returns `true`, the exchange is forwarded through the sub-pipeline.
13/// If `false`, the exchange is returned as-is and the sub-pipeline is skipped entirely.
14#[derive(Clone)]
15pub struct FilterService {
16    predicate: FilterPredicate,
17    sub_pipeline: BoxProcessor,
18}
19
20impl FilterService {
21    /// Create from a closure predicate and a resolved sub-pipeline.
22    pub fn new(
23        predicate: impl Fn(&Exchange) -> bool + Send + Sync + 'static,
24        sub_pipeline: BoxProcessor,
25    ) -> Self {
26        Self {
27            predicate: Arc::new(predicate),
28            sub_pipeline,
29        }
30    }
31
32    /// Create from a pre-boxed `FilterPredicate` (used by `resolve_steps`).
33    pub fn from_predicate(predicate: FilterPredicate, sub_pipeline: BoxProcessor) -> Self {
34        Self {
35            predicate,
36            sub_pipeline,
37        }
38    }
39}
40
41impl Service<Exchange> for FilterService {
42    type Response = Exchange;
43    type Error = CamelError;
44    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
45
46    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
47        self.sub_pipeline.poll_ready(cx)
48    }
49
50    fn call(&mut self, exchange: Exchange) -> Self::Future {
51        if (self.predicate)(&exchange) {
52            let fut = self.sub_pipeline.call(exchange);
53            Box::pin(fut)
54        } else {
55            Box::pin(async move { Ok(exchange) })
56        }
57    }
58}
59
// ── FilterSegment (ADR-0025 OutcomePipeline) ─────────────────────────────
61
62/// Outcome-aware structural EIP segment for the Filter pattern.
63///
64/// When the predicate passes, delegates to `body` (which can return
65/// `Completed`, `Stopped`, or `Failed`). When the predicate fails,
66/// returns `Completed(original_exchange)` — the exchange is returned
67/// as-is and the body is skipped entirely.
68///
69/// Unlike `FilterService` (which operates at the Tower layer and cannot
70/// preserve `Stopped(ex)` with mutations), `FilterSegment` operates at
71/// the `PipelineOutcome` layer and preserves the exchange at the Stop
72/// point including all mutations.
73pub struct FilterSegment {
74    pub predicate: camel_api::FilterPredicate,
75    pub body: camel_api::OutcomeSegment,
76}
77
78impl Clone for FilterSegment {
79    fn clone(&self) -> Self {
80        Self {
81            predicate: Arc::clone(&self.predicate),
82            body: self.body.clone(),
83        }
84    }
85}
86
87impl camel_api::OutcomePipeline for FilterSegment {
88    fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
89        Box::new(self.clone())
90    }
91
92    fn run<'a>(
93        &'a mut self,
94        exchange: camel_api::Exchange,
95    ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
96        Box::pin(async move {
97            if (self.predicate)(&exchange) {
98                self.body.run(exchange).await
99            } else {
100                camel_api::PipelineOutcome::Completed(exchange)
101            }
102        })
103    }
104}
105
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use tower::ServiceExt;

    /// Sub-pipeline that hands the exchange back untouched.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Sub-pipeline that upper-cases a text body and leaves other bodies alone.
    fn uppercase_body() -> BoxProcessor {
        BoxProcessor::from_fn(|mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(text) = &mut ex.input.body {
                    *text = text.to_uppercase();
                }
                Ok(ex)
            })
        })
    }

    /// Sub-pipeline that always fails with a "boom" processor error.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Predicate shared by the header-based tests: true when "active" is set.
    fn has_active_header(ex: &Exchange) -> bool {
        ex.input.header("active").is_some()
    }

    /// Wait for the service to become ready, then send one exchange through it.
    async fn drive(svc: &mut FilterService, ex: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(ex).await
    }

    /// 1. An exchange accepted by the predicate is forwarded to the sub-pipeline.
    #[tokio::test]
    async fn test_filter_passes_matching_exchange() {
        let mut filter = FilterService::new(has_active_header, uppercase_body());

        let mut exchange = Exchange::new(Message::new("hello"));
        exchange.input.set_header("active", Value::Bool(true));

        let output = drive(&mut filter, exchange).await.unwrap();
        assert_eq!(output.input.body.as_text(), Some("HELLO"));
    }

    /// 2. A rejected exchange comes back as-is; the sub-pipeline is not called.
    #[tokio::test]
    async fn test_filter_blocks_non_matching_exchange() {
        let mut filter = FilterService::new(has_active_header, uppercase_body());

        let exchange = Exchange::new(Message::new("hello"));
        let output = drive(&mut filter, exchange).await.unwrap();

        // The body is still lower-case, so `uppercase_body` never ran.
        assert_eq!(output.input.body.as_text(), Some("hello"));
    }

    /// 3. The result is the sub-pipeline's output, not the original exchange.
    #[tokio::test]
    async fn test_filter_sub_pipeline_transforms_body() {
        let mut filter = FilterService::new(|_: &Exchange| true, uppercase_body());

        let exchange = Exchange::new(Message::new("world"));
        let output = drive(&mut filter, exchange).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("WORLD"));
    }

    /// 4. Errors raised by the sub-pipeline propagate to the caller.
    #[tokio::test]
    async fn test_filter_sub_pipeline_error_propagates() {
        let mut filter = FilterService::new(|_: &Exchange| true, failing());

        let exchange = Exchange::new(Message::new("x"));
        let outcome = drive(&mut filter, exchange).await;

        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("boom"));
    }

    /// 5. The predicate sees the original exchange, before the sub-pipeline mutates it.
    #[tokio::test]
    async fn test_filter_predicate_receives_original_exchange() {
        let mut filter = FilterService::new(
            |ex: &Exchange| ex.input.body.as_text() == Some("check"),
            uppercase_body(),
        );

        let exchange = Exchange::new(Message::new("check"));
        let output = drive(&mut filter, exchange).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("CHECK"));
    }

    /// 6. A cloned `FilterService` can be driven on its own.
    #[tokio::test]
    async fn test_filter_clone_is_independent() {
        let original = FilterService::new(|_: &Exchange| true, passthrough());
        let mut copy = original.clone();

        let exchange = Exchange::new(Message::new("hi"));
        let output = drive(&mut copy, exchange).await.unwrap();

        assert_eq!(output.input.body.as_text(), Some("hi"));
    }
}