Skip to main content

camel_processor/
choice.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use tower::Service;
7
8use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
9
10/// A single when-clause: a predicate + the sub-pipeline to execute when it matches.
11pub struct WhenClause {
12    pub predicate: FilterPredicate,
13    pub pipeline: BoxProcessor,
14}
15
16impl Clone for WhenClause {
17    fn clone(&self) -> Self {
18        Self {
19            predicate: self.predicate.clone(),
20            pipeline: self.pipeline.clone(),
21        }
22    }
23}
24
25/// Tower Service implementing the Choice EIP (Content-Based Router).
26///
27/// Evaluates `when` clauses in order. The first matching predicate routes the
28/// exchange through its sub-pipeline. If no predicate matches, the `otherwise`
29/// pipeline is used (if present); otherwise the exchange passes through unchanged.
30#[derive(Clone)]
31pub struct ChoiceService {
32    whens: Vec<WhenClause>,
33    otherwise: Option<BoxProcessor>,
34}
35
36impl ChoiceService {
37    /// Create a new `ChoiceService`.
38    ///
39    /// `whens` — ordered list of `(predicate, sub_pipeline)` pairs.
40    /// `otherwise` — optional fallback pipeline (executed when no `when` matches).
41    pub fn new(whens: Vec<WhenClause>, otherwise: Option<BoxProcessor>) -> Self {
42        Self { whens, otherwise }
43    }
44}
45
46impl Service<Exchange> for ChoiceService {
47    type Response = Exchange;
48    type Error = CamelError;
49    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
50
51    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
52        Poll::Ready(Ok(()))
53    }
54
55    fn call(&mut self, exchange: Exchange) -> Self::Future {
56        for when in &mut self.whens {
57            if (when.predicate)(&exchange) {
58                let fut = when.pipeline.call(exchange);
59                return Box::pin(fut);
60            }
61        }
62        if let Some(otherwise) = &mut self.otherwise {
63            let fut = otherwise.call(exchange);
64            return Box::pin(fut);
65        }
66        Box::pin(async move { Ok(exchange) })
67    }
68}
69
70// ── ChoiceSegment + WhenClauseSegment (ADR-0025 OutcomePipeline) ──────────
71
72/// Outcome-aware structural EIP segment for a single when clause.
73pub struct WhenClauseSegment {
74    pub predicate: camel_api::FilterPredicate,
75    pub body: camel_api::OutcomeSegment,
76}
77
78impl Clone for WhenClauseSegment {
79    fn clone(&self) -> Self {
80        Self {
81            predicate: Arc::clone(&self.predicate),
82            body: self.body.clone(),
83        }
84    }
85}
86
87/// Outcome-aware structural EIP segment for the Choice pattern.
88///
89/// Evaluates `when` clauses in order. The first matching predicate runs its
90/// `body` (which can return `Completed`, `Stopped`, or `Failed`). If no
91/// predicate matches, the `otherwise` segment runs (if present); otherwise
92/// returns `Completed(original_exchange)`.
93///
94/// Unlike `ChoiceService` (which operates at the Tower layer and cannot
95/// preserve `Stopped(ex)` with mutations), `ChoiceSegment` operates at the
96/// `PipelineOutcome` layer and preserves the exchange at the Stop point
97/// including all mutations.
98pub struct ChoiceSegment {
99    pub clauses: Vec<WhenClauseSegment>,
100    pub otherwise: Option<camel_api::OutcomeSegment>,
101}
102
103impl Clone for ChoiceSegment {
104    fn clone(&self) -> Self {
105        Self {
106            clauses: self
107                .clauses
108                .iter()
109                .map(|c| WhenClauseSegment {
110                    predicate: Arc::clone(&c.predicate),
111                    body: c.body.clone(),
112                })
113                .collect(),
114            otherwise: self.otherwise.clone(),
115        }
116    }
117}
118
119impl camel_api::OutcomePipeline for ChoiceSegment {
120    fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
121        Box::new(self.clone())
122    }
123
124    fn run<'a>(
125        &'a mut self,
126        exchange: camel_api::Exchange,
127    ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
128        Box::pin(async move {
129            for clause in self.clauses.iter_mut() {
130                if (clause.predicate)(&exchange) {
131                    return clause.body.run(exchange).await;
132                }
133            }
134            if let Some(otherwise) = self.otherwise.as_mut() {
135                otherwise.run(exchange).await
136            } else {
137                camel_api::PipelineOutcome::Completed(exchange)
138            }
139        })
140    }
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, BoxProcessorExt, Message, Value};
    use std::sync::Arc;
    use tower::ServiceExt;

    /// Processor that appends `suffix` to a text body and leaves any other
    /// body untouched.
    fn append_body(suffix: &'static str) -> BoxProcessor {
        BoxProcessor::from_fn(move |mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &ex.input.body {
                    ex.input.body = Body::Text(format!("{s}{suffix}"));
                }
                Ok(ex)
            })
        })
    }

    /// Processor that always fails with a `ProcessorError("boom")`.
    fn failing() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Predicate that holds when the input message carries header `name`.
    fn pred_header(name: &'static str) -> FilterPredicate {
        Arc::new(move |ex: &Exchange| ex.input.header(name).is_some())
    }

    /// Clause that matches on `header` being present and routes through `pipeline`.
    fn when(header: &'static str, pipeline: BoxProcessor) -> WhenClause {
        WhenClause {
            predicate: pred_header(header),
            pipeline,
        }
    }

    /// Two clauses: header "a" appends "-A", header "b" appends "-B".
    fn a_then_b() -> Vec<WhenClause> {
        vec![
            when("a", append_body("-A")),
            when("b", append_body("-B")),
        ]
    }

    /// Single clause: header "a" appends "-A".
    fn only_a() -> Vec<WhenClause> {
        vec![when("a", append_body("-A"))]
    }

    /// Exchange with a text body and each listed header set to `true`.
    fn exchange_with(body: &'static str, headers: &[&'static str]) -> Exchange {
        let mut ex = Exchange::new(Message::new(body));
        for name in headers {
            ex.input.set_header(*name, Value::Bool(true));
        }
        ex
    }

    /// Waits for readiness, then sends `ex` through `svc`.
    async fn route(svc: &mut ChoiceService, ex: Exchange) -> Result<Exchange, CamelError> {
        svc.ready().await.unwrap().call(ex).await
    }

    // 1. First matching when executes its pipeline.
    #[tokio::test]
    async fn test_choice_first_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = route(&mut svc, exchange_with("x", &["a"])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-A"));
    }

    // 2. Second when executes when first does not match.
    #[tokio::test]
    async fn test_choice_second_when_matches() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = route(&mut svc, exchange_with("x", &["b"])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-B"));
    }

    // 3. Only the FIRST matching when fires (short-circuit — both a and b present).
    #[tokio::test]
    async fn test_choice_short_circuits_at_first_match() {
        let mut svc = ChoiceService::new(a_then_b(), None);
        let routed = route(&mut svc, exchange_with("x", &["a", "b"]))
            .await
            .unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-A"));
    }

    // 4. Otherwise executes when no when matches.
    #[tokio::test]
    async fn test_choice_otherwise_fires_when_no_when_matches() {
        let mut svc = ChoiceService::new(only_a(), Some(append_body("-else")));
        let routed = route(&mut svc, exchange_with("x", &[])).await.unwrap();
        assert_eq!(routed.input.body.as_text(), Some("x-else"));
    }

    // 5. No match and no otherwise → exchange passes unchanged.
    #[tokio::test]
    async fn test_choice_no_match_no_otherwise_passthrough() {
        let mut svc = ChoiceService::new(only_a(), None);
        let routed = route(&mut svc, exchange_with("untouched", &[]))
            .await
            .unwrap();
        assert_eq!(routed.input.body.as_text(), Some("untouched"));
    }

    // 6. Errors in a matching when's pipeline propagate.
    #[tokio::test]
    async fn test_choice_error_in_when_propagates() {
        let mut svc = ChoiceService::new(vec![when("a", failing())], None);
        let outcome = route(&mut svc, exchange_with("x", &["a"])).await;
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(err.to_string().contains("boom"));
    }

    // 7. Errors in otherwise pipeline propagate.
    #[tokio::test]
    async fn test_choice_error_in_otherwise_propagates() {
        let mut svc = ChoiceService::new(vec![], Some(failing()));
        let outcome = route(&mut svc, exchange_with("x", &[])).await;
        assert!(outcome.is_err());
    }
}