Skip to main content

camel_processor/
choice.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
8
9/// A single when-clause: a predicate + the sub-pipeline to execute when it matches.
10pub struct WhenClause {
11    pub predicate: FilterPredicate,
12    pub pipeline: BoxProcessor,
13}
14
15impl Clone for WhenClause {
16    fn clone(&self) -> Self {
17        Self {
18            predicate: self.predicate.clone(),
19            pipeline: self.pipeline.clone(),
20        }
21    }
22}
23
24/// Tower Service implementing the Choice EIP (Content-Based Router).
25///
26/// Evaluates `when` clauses in order. The first matching predicate routes the
27/// exchange through its sub-pipeline. If no predicate matches, the `otherwise`
28/// pipeline is used (if present); otherwise the exchange passes through unchanged.
29#[derive(Clone)]
30pub struct ChoiceService {
31    whens: Vec<WhenClause>,
32    otherwise: Option<BoxProcessor>,
33}
34
35impl ChoiceService {
36    /// Create a new `ChoiceService`.
37    ///
38    /// `whens` — ordered list of `(predicate, sub_pipeline)` pairs.
39    /// `otherwise` — optional fallback pipeline (executed when no `when` matches).
40    pub fn new(whens: Vec<WhenClause>, otherwise: Option<BoxProcessor>) -> Self {
41        Self { whens, otherwise }
42    }
43}
44
45impl Service<Exchange> for ChoiceService {
46    type Response = Exchange;
47    type Error = CamelError;
48    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
49
50    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
51        Poll::Ready(Ok(()))
52    }
53
54    fn call(&mut self, exchange: Exchange) -> Self::Future {
55        for when in &mut self.whens {
56            if (when.predicate)(&exchange) {
57                let fut = when.pipeline.call(exchange);
58                return Box::pin(fut);
59            }
60        }
61        if let Some(otherwise) = &mut self.otherwise {
62            let fut = otherwise.call(exchange);
63            return Box::pin(fut);
64        }
65        Box::pin(async move { Ok(exchange) })
66    }
67}
68
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use std::sync::Arc;
    use tower::ServiceExt;

    /// Processor that appends `suffix` to a text body; other bodies are left as they are.
    fn append_body(suffix: &'static str) -> BoxProcessor {
        BoxProcessor::from_fn(move |mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(current) = &ex.input.body {
                    ex.input.body = Body::Text(format!("{current}{suffix}"));
                }
                Ok(ex)
            })
        })
    }

    /// Processor that always fails with a "boom" processor error.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Predicate that matches when the input message carries header `name`.
    fn pred_header(name: &'static str) -> FilterPredicate {
        Arc::new(move |ex: &Exchange| ex.input.header(name).is_some())
    }

    /// A clause that fires on `header` and appends `suffix` to the body.
    fn clause(header: &'static str, suffix: &'static str) -> WhenClause {
        WhenClause {
            predicate: pred_header(header),
            pipeline: append_body(suffix),
        }
    }

    /// The two-branch fixture shared by the matching tests: "a" → "-A", then "b" → "-B".
    fn a_then_b() -> Vec<WhenClause> {
        vec![clause("a", "-A"), clause("b", "-B")]
    }

    /// Exchange with the given text body and each listed header set to `true`.
    fn exchange_with(body: &'static str, headers: &[&'static str]) -> Exchange {
        let mut ex = Exchange::new(Message::new(body));
        for name in headers {
            ex.input.set_header(*name, Value::Bool(true));
        }
        ex
    }

    /// Waits for readiness, then sends `ex` through the service.
    async fn route(svc: &mut ChoiceService, ex: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(ex).await
    }

    // 1. First matching when executes its pipeline.
    #[tokio::test]
    async fn test_choice_first_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let result = route(&mut svc, exchange_with("x", &["a"])).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("x-A"));
    }

    // 2. Second when executes when first does not match.
    #[tokio::test]
    async fn test_choice_second_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let result = route(&mut svc, exchange_with("x", &["b"])).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("x-B"));
    }

    // 3. Only the FIRST matching when fires (short-circuit — both a and b present).
    #[tokio::test]
    async fn test_choice_short_circuits_at_first_match() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let result = route(&mut svc, exchange_with("x", &["a", "b"]))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("x-A"));
    }

    // 4. Otherwise executes when no when matches.
    #[tokio::test]
    async fn test_choice_otherwise_fires_when_no_when_matches() {
        let mut svc = ChoiceService::new(vec![clause("a", "-A")], Some(append_body("-else")));
        let result = route(&mut svc, exchange_with("x", &[])).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("x-else"));
    }

    // 5. No match and no otherwise → exchange passes unchanged.
    #[tokio::test]
    async fn test_choice_no_match_no_otherwise_passthrough() {
        let mut svc = ChoiceService::new(vec![clause("a", "-A")], None);
        let result = route(&mut svc, exchange_with("untouched", &[]))
            .await
            .unwrap();
        assert_eq!(result.input.body.as_text(), Some("untouched"));
    }

    // 6. Errors in a matching when's pipeline propagate.
    #[tokio::test]
    async fn test_choice_error_in_when_propagates() {
        let failing_clause = WhenClause {
            predicate: pred_header("a"),
            pipeline: failing(),
        };
        let mut svc = ChoiceService::new(vec![failing_clause], None);
        let outcome = route(&mut svc, exchange_with("x", &["a"])).await;
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains("boom"));
    }

    // 7. Errors in otherwise pipeline propagate.
    #[tokio::test]
    async fn test_choice_error_in_otherwise_propagates() {
        let mut svc = ChoiceService::new(vec![], Some(failing()));
        let outcome = route(&mut svc, exchange_with("x", &[])).await;
        assert!(outcome.is_err());
    }
}