Skip to main content

camel_processor/
do_try_segment.rs

1//! ## Stop semantics (ADR-0025)
2//!
3//! This segment implements `OutcomePipeline` and propagates `PipelineOutcome::Stopped(ex)`
4//! with the exchange state intact. See ADR-0025 §3.
5
6use crate::do_try::CatchMatcher;
7use camel_api::error_handler::ExceptionDisposition;
8use camel_api::outcome_pipeline::OutcomePipeline;
9use camel_api::pipeline_outcome::PipelineOutcome;
10use camel_api::{CamelError, Exchange, FilterPredicate};
11use std::future::Future;
12use std::pin::Pin;
13
/// Segment for a single catch clause within a `DoTrySegment`.
///
/// A clause is selected when `matcher` accepts the try-body error and
/// `on_when` is unset or returns `true` for the exchange. Once its body
/// completes, `disposition` decides how the result is surfaced:
///
/// - `Handled`: the exchange continues through finally and downstream.
/// - `Propagate`: the catch body runs for side effects, finally runs with the
///   catch body's exchange, and the original try-error re-throws as `Failed`.
/// - `Continued`: handled the same way as `Handled` by `DoTrySegment::run`.
#[derive(Clone)]
pub struct CatchClauseSegment {
    /// Decides whether this clause applies to a given error/exchange pair.
    pub matcher: CatchMatcher,
    /// Optional extra guard evaluated against the exchange; `None` means
    /// the clause applies whenever `matcher` does.
    pub on_when: Option<FilterPredicate>,
    /// Pipeline run when this clause is selected.
    pub body: camel_api::OutcomeSegment,
    /// How a completed catch body is surfaced to the caller.
    pub disposition: ExceptionDisposition,
}
24
/// Compilable segment for a `doFinally` clause within a `DoTrySegment`.
///
/// The body runs only when `on_when` is unset or returns `true` for the
/// exchange handed to the finally step.
#[derive(Clone)]
pub struct FinallyClauseSegment {
    /// Optional guard; when it returns `false` the finally body is skipped
    /// and the exchange passes through unchanged.
    pub on_when: Option<FilterPredicate>,
    /// Pipeline run as the finally step.
    pub body: camel_api::OutcomeSegment,
}
31
32/// Outcome-aware structural EIP segment for the doTry/doCatch/doFinally pattern.
33///
34/// Operates at the `PipelineOutcome` layer so that `Stopped(ex)` from a
35/// sub-step (e.g. Stop EIP) is preserved with the exchange including all
36/// mutations. See ADR-0025 §5.1 for full semantics.
37pub struct DoTrySegment {
38    pub try_body: camel_api::OutcomeSegment,
39    pub catches: Vec<CatchClauseSegment>,
40    pub finally: Option<FinallyClauseSegment>,
41}
42
43impl Clone for DoTrySegment {
44    fn clone(&self) -> Self {
45        Self {
46            try_body: self.try_body.clone(),
47            catches: self.catches.clone(),
48            finally: self.finally.clone(),
49        }
50    }
51}
52
/// Narrow outcome type for `run_finally_body` so the compiler enforces
/// exhaustiveness at the two call sites instead of `unreachable!()`.
/// This is a private, transient type — the large-enum-variant lint fires
/// because Exchange is a fat struct, but boxing it here would just add
/// allocation churn in a hot path where the outcome is immediately consumed
/// at the call site.
///
/// Only the non-completing results are represented; a completed finally body
/// is reported through the `Ok` side of `run_finally_body`'s `Result`.
#[allow(clippy::large_enum_variant)]
enum FinallyOutcome {
    /// The finally body returned `PipelineOutcome::Stopped` with this exchange.
    Stopped(Exchange),
    /// The finally body returned `PipelineOutcome::Failed` with this error.
    Failed(CamelError),
}
64
65/// Run the finally body if present and on_when permits.
66/// Free function (not a method) to avoid borrow conflict with
67/// `self.catches.iter_mut()` inside the catch loop.
68async fn run_finally_body(
69    finally: &mut Option<FinallyClauseSegment>,
70    ex: Exchange,
71) -> Result<Exchange, FinallyOutcome> {
72    let Some(f) = finally.as_mut() else {
73        return Ok(ex);
74    };
75    if !f.on_when.as_ref().map(|p| p(&ex)).unwrap_or(true) {
76        return Ok(ex);
77    }
78    match f.body.run(ex).await {
79        PipelineOutcome::Completed(e) => Ok(e),
80        PipelineOutcome::Stopped(e) => Err(FinallyOutcome::Stopped(e)),
81        PipelineOutcome::Failed(e) => Err(FinallyOutcome::Failed(e)),
82    }
83}
84
85impl OutcomePipeline for DoTrySegment {
86    fn clone_box(&self) -> Box<dyn OutcomePipeline> {
87        Box::new(self.clone())
88    }
89
90    fn run<'a>(
91        &'a mut self,
92        exchange: Exchange,
93    ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
94        Box::pin(async move {
95            // Pre-clone for the "no catch matched" path; `exchange` itself is
96            // consumed by `try_body.run(...)` below.
97            let exchange_for_unmatched = exchange.clone();
98
99            // 1. Run try_body. If Stopped, return immediately (skip catch AND finally).
100            let try_outcome = self.try_body.run(exchange).await;
101            let returned_ex = match try_outcome {
102                PipelineOutcome::Stopped(ex) => return PipelineOutcome::Stopped(ex),
103                PipelineOutcome::Completed(ex) => ex,
104                PipelineOutcome::Failed(err) => {
105                    // 2. try failed — try each catch in order. Walk the catch
106                    // chain starting from the failed try-exchange. On the first
107                    // matching catch whose `on_when` is true (or unset), run it.
108                    let mut current_ex = exchange_for_unmatched;
109                    current_ex.set_error(err.clone());
110                    for catch in self.catches.iter_mut() {
111                        if catch.matcher.matches(&err, &current_ex)
112                            && catch
113                                .on_when
114                                .as_ref()
115                                .map(|p| p(&current_ex))
116                                .unwrap_or(true)
117                        {
118                            match catch.body.run(current_ex).await {
119                                // Stop in catch: skip finally AND outer route halts.
120                                PipelineOutcome::Stopped(stopped_ex) => {
121                                    return PipelineOutcome::Stopped(stopped_ex);
122                                }
123                                // Catch handled — exit catch chain with this exchange.
124                                PipelineOutcome::Completed(next) => {
125                                    match catch.disposition {
126                                        ExceptionDisposition::Propagate => {
127                                            // Run finally with the exchange from the catch body,
128                                            // then propagate the original try error.
129                                            match run_finally_body(&mut self.finally, next).await {
130                                                Ok(_) => {}
131                                                Err(FinallyOutcome::Stopped(e)) => {
132                                                    return PipelineOutcome::Stopped(e);
133                                                }
134                                                Err(FinallyOutcome::Failed(_finally_err)) => {
135                                                    tracing::warn!(
136                                                        error = %err,
137                                                        "doFinally threw during Propagate; \
138                                                         restoring original"
139                                                    );
140                                                    return PipelineOutcome::Failed(err);
141                                                }
142                                            }
143                                            return PipelineOutcome::Failed(err);
144                                        }
145                                        ExceptionDisposition::Handled => {
146                                            current_ex = next;
147                                            break;
148                                        }
149                                        ExceptionDisposition::Continued => {
150                                            current_ex = next;
151                                            break;
152                                        }
153                                    }
154                                }
155                                // Catch-body Failed: surface THAT error to outer
156                                // route. Per ADR-0025 invariant #4 ("doTry is a
157                                // local error-handler island"), a failing catch
158                                // body propagates as Failed — it does NOT re-enter
159                                // the catch chain (no recursive catch-of-catch in
160                                // Camel). Skip remaining catches and finally.
161                                PipelineOutcome::Failed(catch_err) => {
162                                    return PipelineOutcome::Failed(catch_err);
163                                }
164                            }
165                        }
166                    }
167                    current_ex
168                }
169            };
170            // 3. Run finally if present (skip if try/catch returned Stopped —
171            // already returned above; skip if catch-body Failed — also returned).
172            match run_finally_body(&mut self.finally, returned_ex).await {
173                Ok(ex) => PipelineOutcome::Completed(ex),
174                Err(FinallyOutcome::Stopped(e)) => PipelineOutcome::Stopped(e),
175                Err(FinallyOutcome::Failed(finally_err)) => {
176                    tracing::warn!(
177                        error = %finally_err,
178                        "doFinally threw during/after catch; surfacing finally error"
179                    );
180                    PipelineOutcome::Failed(finally_err)
181                }
182            }
183        })
184    }
185}
186
187#[cfg(test)]
188mod tests {
189    use super::*;
190    use crate::do_try::CatchMatcher;
191    use camel_api::pipeline_outcome::PipelineOutcome;
192    use std::sync::Arc;
193    use std::sync::atomic::{AtomicU32, Ordering};
194
195    // ── Helpers for DoTrySegment tests ──
196
197    struct CompleteSegment;
198
199    impl OutcomePipeline for CompleteSegment {
200        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
201            Box::new(CompleteSegment)
202        }
203        fn run<'a>(
204            &'a mut self,
205            exchange: Exchange,
206        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
207            Box::pin(async move { PipelineOutcome::Completed(exchange) })
208        }
209    }
210
211    fn seg_complete() -> camel_api::OutcomeSegment {
212        camel_api::OutcomeSegment::new(Box::new(CompleteSegment))
213    }
214
215    struct FailSegment(CamelError);
216
217    impl OutcomePipeline for FailSegment {
218        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
219            Box::new(FailSegment(self.0.clone()))
220        }
221        fn run<'a>(
222            &'a mut self,
223            _exchange: Exchange,
224        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
225            let e = self.0.clone();
226            Box::pin(async move { PipelineOutcome::Failed(e) })
227        }
228    }
229
230    fn seg_fail(err: CamelError) -> camel_api::OutcomeSegment {
231        camel_api::OutcomeSegment::new(Box::new(FailSegment(err)))
232    }
233
234    struct MutateThenStop {
235        mutator: Arc<dyn Fn(&mut Exchange) + Send + Sync>,
236    }
237
238    impl OutcomePipeline for MutateThenStop {
239        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
240            Box::new(MutateThenStop {
241                mutator: Arc::clone(&self.mutator),
242            })
243        }
244        fn run<'a>(
245            &'a mut self,
246            mut exchange: Exchange,
247        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
248            let m = Arc::clone(&self.mutator);
249            Box::pin(async move {
250                m(&mut exchange);
251                PipelineOutcome::Stopped(exchange)
252            })
253        }
254    }
255
256    fn seg_stop_with(
257        mutator: impl Fn(&mut Exchange) + Send + Sync + 'static,
258    ) -> camel_api::OutcomeSegment {
259        camel_api::OutcomeSegment::new(Box::new(MutateThenStop {
260            mutator: Arc::new(mutator),
261        }))
262    }
263
264    struct RecordCall {
265        counter: Arc<AtomicU32>,
266    }
267
268    impl OutcomePipeline for RecordCall {
269        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
270            Box::new(RecordCall {
271                counter: Arc::clone(&self.counter),
272            })
273        }
274        fn run<'a>(
275            &'a mut self,
276            exchange: Exchange,
277        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
278            let c = Arc::clone(&self.counter);
279            Box::pin(async move {
280                c.fetch_add(1, Ordering::SeqCst);
281                PipelineOutcome::Completed(exchange)
282            })
283        }
284    }
285
286    fn seg_record(counter: Arc<AtomicU32>) -> camel_api::OutcomeSegment {
287        camel_api::OutcomeSegment::new(Box::new(RecordCall { counter }))
288    }
289
290    // ── 5 New tests for DoTrySegment (ADR-0025) ──
291
292    #[tokio::test]
293    async fn stop_inside_try_skips_catch_and_finally() {
294        let catch_call = Arc::new(AtomicU32::new(0));
295        let finally_call = Arc::new(AtomicU32::new(0));
296
297        let mut seg = DoTrySegment {
298            try_body: seg_stop_with(|ex| {
299                ex.set_property("mutated", camel_api::Value::Bool(true));
300            }),
301            catches: vec![CatchClauseSegment {
302                matcher: CatchMatcher::ByVariant(vec!["*".into()]),
303                on_when: None,
304                body: seg_record(catch_call.clone()),
305                disposition: ExceptionDisposition::Handled,
306            }],
307            finally: Some(FinallyClauseSegment {
308                on_when: None,
309                body: seg_record(finally_call.clone()),
310            }),
311        };
312
313        let result = seg.run(Exchange::default()).await;
314        match result {
315            PipelineOutcome::Stopped(ex) => {
316                assert_eq!(
317                    ex.properties.get("mutated"),
318                    Some(&camel_api::Value::Bool(true)),
319                    "try body mutation must be preserved in Stopped exchange"
320                );
321            }
322            other => panic!("expected Stopped, got {:?}", other),
323        }
324        assert_eq!(
325            catch_call.load(Ordering::SeqCst),
326            0,
327            "catch must NOT run when try stops"
328        );
329        assert_eq!(
330            finally_call.load(Ordering::SeqCst),
331            0,
332            "finally must NOT run when try stops"
333        );
334    }
335
336    #[tokio::test]
337    async fn stop_inside_catch_skips_finally() {
338        let finally_call = Arc::new(AtomicU32::new(0));
339
340        let mut seg = DoTrySegment {
341            try_body: seg_fail(CamelError::ProcessorError("boom".into())),
342            catches: vec![CatchClauseSegment {
343                matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
344                on_when: None,
345                body: seg_stop_with(|ex| {
346                    ex.set_property("catch_mutated", camel_api::Value::Bool(true));
347                }),
348                disposition: ExceptionDisposition::Handled,
349            }],
350            finally: Some(FinallyClauseSegment {
351                on_when: None,
352                body: seg_record(finally_call.clone()),
353            }),
354        };
355
356        let result = seg.run(Exchange::default()).await;
357        match result {
358            PipelineOutcome::Stopped(ex) => {
359                assert_eq!(
360                    ex.properties.get("catch_mutated"),
361                    Some(&camel_api::Value::Bool(true)),
362                    "catch body mutation must be preserved in Stopped exchange"
363                );
364            }
365            other => panic!("expected Stopped, got {:?}", other),
366        }
367        assert_eq!(
368            finally_call.load(Ordering::SeqCst),
369            0,
370            "finally must NOT run when catch stops"
371        );
372    }
373
374    #[tokio::test]
375    async fn stop_inside_finally_stops_outer_route() {
376        let mut seg = DoTrySegment {
377            try_body: seg_complete(),
378            catches: vec![],
379            finally: Some(FinallyClauseSegment {
380                on_when: None,
381                body: seg_stop_with(|ex| {
382                    ex.set_property("finally_mutated", camel_api::Value::Bool(true));
383                }),
384            }),
385        };
386
387        let result = seg.run(Exchange::default()).await;
388        match result {
389            PipelineOutcome::Stopped(ex) => {
390                assert_eq!(
391                    ex.properties.get("finally_mutated"),
392                    Some(&camel_api::Value::Bool(true)),
393                    "finally body mutation must be preserved in Stopped exchange"
394                );
395            }
396            other => panic!("expected Stopped, got {:?}", other),
397        }
398    }
399
400    #[tokio::test]
401    async fn catch_on_when_false_falls_through_to_next_catch() {
402        let first_call = Arc::new(AtomicU32::new(0));
403        let second_call = Arc::new(AtomicU32::new(0));
404
405        let mut seg = DoTrySegment {
406            try_body: seg_fail(CamelError::Io("disk err".into())),
407            catches: vec![
408                CatchClauseSegment {
409                    matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
410                    on_when: Some(Arc::new(|_ex| false)),
411                    body: seg_record(first_call.clone()),
412                    disposition: ExceptionDisposition::Handled,
413                },
414                CatchClauseSegment {
415                    matcher: CatchMatcher::ByVariant(vec!["*".into()]),
416                    on_when: None,
417                    body: seg_record(second_call.clone()),
418                    disposition: ExceptionDisposition::Handled,
419                },
420            ],
421            finally: None,
422        };
423
424        let result = seg.run(Exchange::default()).await;
425        assert!(
426            matches!(result, PipelineOutcome::Completed(_)),
427            "expected Completed after second catch"
428        );
429        assert_eq!(
430            first_call.load(Ordering::SeqCst),
431            0,
432            "first catch must NOT fire (on_when=false)"
433        );
434        assert_eq!(
435            second_call.load(Ordering::SeqCst),
436            1,
437            "second catch must fire"
438        );
439    }
440
441    #[tokio::test]
442    async fn finally_on_when_false_skips_finally_entirely() {
443        let finally_call = Arc::new(AtomicU32::new(0));
444        let mut ex = Exchange::default();
445        ex.set_property("try_set", camel_api::Value::Bool(true));
446
447        let mut seg = DoTrySegment {
448            try_body: seg_complete(),
449            catches: vec![],
450            finally: Some(FinallyClauseSegment {
451                on_when: Some(Arc::new(|_ex| false)),
452                body: seg_record(finally_call.clone()),
453            }),
454        };
455
456        let result = seg.run(ex).await;
457        match result {
458            PipelineOutcome::Completed(ex) => {
459                assert_eq!(
460                    ex.properties.get("try_set"),
461                    Some(&camel_api::Value::Bool(true)),
462                    "exchange state from try must be preserved"
463                );
464            }
465            other => panic!("expected Completed, got {:?}", other),
466        }
467        assert_eq!(
468            finally_call.load(Ordering::SeqCst),
469            0,
470            "finally must NOT run when on_when=false"
471        );
472    }
473}