Skip to main content

camel_processor/
do_try.rs

1//! ## Stop semantics (ADR-0025)
2//!
3//! This segment implements `OutcomePipeline` and propagates `PipelineOutcome::Stopped(ex)` with the exchange state intact (including mutations made inside the segment body before Stop fired). See ADR-0025 §3 (stopped-exchange-state-preservation invariant).
4
5use camel_api::error_handler::ExceptionDisposition;
6use camel_api::exchange::PROPERTY_EXCEPTION_HANDLED;
7use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
8use tower::Service;
9use tower::ServiceExt;
10
11/// Matcher for a `doCatch` clause.
12#[derive(Clone)]
13pub enum CatchMatcher {
14    /// Match by CamelError variant name (e.g. ["ProcessorError", "Io"]).
15    /// `"*"` matches any variant — equivalent to Camel's `doCatch(Throwable.class)`.
16    ByVariant(Vec<String>),
17    /// Match by predicate over the Exchange.
18    Predicate(FilterPredicate),
19}
20
21impl CatchMatcher {
22    /// Returns `true` if this matcher matches the given error and exchange.
23    pub fn matches(&self, err: &CamelError, ex: &Exchange) -> bool {
24        match self {
25            CatchMatcher::ByVariant(names) => {
26                if names.iter().any(|n| n == "*") {
27                    return true;
28                }
29                names.iter().any(|n| n == err.variant_name())
30            }
31            CatchMatcher::Predicate(p) => p(ex),
32        }
33    }
34}
35
36/// A single `doCatch` clause.
37#[derive(Clone)]
38pub struct CatchClause {
39    /// The main matcher (variant-name list or predicate).
40    pub matcher: CatchMatcher,
41    /// Optional sub-predicate evaluated AFTER the main matcher passes.
42    pub on_when: Option<FilterPredicate>,
43    /// Sub-pipeline executed when the clause matches.
44    pub steps: Vec<BoxProcessor>,
45    /// ADR-0019 disposition: Handled (default), Propagate, or Continued (rejected at parse time).
46    /// In YAML, use lowercase: `handled`, `propagate`, `continued`.
47    pub disposition: ExceptionDisposition,
48}
49
50/// The `doTry` processor. Wrap with `BoxProcessor::new(DoTryService::new(...))`.
51#[derive(Clone)]
52pub struct DoTryService {
53    /// Steps in the try block.
54    pub try_steps: Vec<BoxProcessor>,
55    /// Catch clauses evaluated first-match-wins.
56    pub catch_clauses: Vec<CatchClause>,
57    /// Steps in the finally block (empty = no finally).
58    pub finally_steps: Vec<BoxProcessor>,
59    /// Optional onWhen predicate for finally.
60    pub finally_on_when: Option<FilterPredicate>,
61}
62
63impl DoTryService {
64    /// Create a new `DoTryService` with the given try steps.
65    pub fn new(try_steps: Vec<BoxProcessor>) -> Self {
66        Self {
67            try_steps,
68            catch_clauses: Vec::new(),
69            finally_steps: Vec::new(),
70            finally_on_when: None,
71        }
72    }
73
74    /// Full constructor used by the compile pipeline (Task 10b control_flow.rs).
75    /// Builder API (Task 8) constructs via `new()` + field mutation.
76    pub fn with_catch_and_finally(
77        try_steps: Vec<BoxProcessor>,
78        catch_clauses: Vec<CatchClause>,
79        finally_steps: Vec<BoxProcessor>,
80        finally_on_when: Option<FilterPredicate>,
81    ) -> Self {
82        Self {
83            try_steps,
84            catch_clauses,
85            finally_steps,
86            finally_on_when,
87        }
88    }
89}
90
91/// Run a sequence of steps, preserving the last exchange state on error.
92/// Returns `Err((last_ex, err))` so DoTry can populate exception properties.
93async fn run_pipeline(
94    steps: Vec<BoxProcessor>,
95    mut ex: Exchange,
96) -> Result<Exchange, (Exchange, CamelError)> {
97    for mut svc in steps {
98        match svc.ready().await {
99            Ok(ready) => {
100                let snapshot = ex.clone();
101                match ready.call(ex).await {
102                    Ok(new_ex) => ex = new_ex,
103                    Err(err) => return Err((snapshot, err)),
104                }
105            }
106            Err(err) => return Err((ex, err)),
107        }
108    }
109    Ok(ex)
110}
111
112/// Run the finally block. Camel parity for finally-throws:
113/// - If finally succeeds: return its exchange.
114/// - If finally throws AND there was a previous error: restore previous (log finally_err).
115/// - If finally throws AND no previous error: propagate finally_err.
116async fn run_finally(
117    finally_steps: Vec<BoxProcessor>,
118    finally_on_when: Option<FilterPredicate>,
119    ex: Exchange,
120    previous_err: Option<CamelError>,
121) -> Result<Exchange, CamelError> {
122    if finally_steps.is_empty() {
123        return Ok(ex);
124    }
125    if let Some(on_when) = &finally_on_when
126        && !on_when(&ex)
127    {
128        return Ok(ex);
129    }
130    match run_pipeline(finally_steps, ex).await {
131        Ok(ex) => Ok(ex),
132        Err((_, finally_err)) => match previous_err {
133            Some(prev) => {
134                tracing::warn!(
135                    finally_error = %finally_err,
136                    previous_error = %prev,
137                    "doFinally threw; restoring previous exception (Camel parity)"
138                );
139                Err(prev)
140            }
141            None => {
142                tracing::warn!(error = %finally_err, "doFinally threw");
143                Err(finally_err)
144            }
145        },
146    }
147}
148
149impl tower::Service<Exchange> for DoTryService {
150    type Response = Exchange;
151    type Error = CamelError;
152    type Future = std::pin::Pin<
153        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
154    >;
155
156    fn poll_ready(
157        &mut self,
158        _cx: &mut std::task::Context<'_>,
159    ) -> std::task::Poll<Result<(), Self::Error>> {
160        std::task::Poll::Ready(Ok(()))
161    }
162
163    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
164        // Clear stale CamelExceptionHandled marker from prior handlers in the same route.
165        // Exchange::clear_error() does NOT touch HANDLED, so direct map access is required.
166        exchange.properties.remove(PROPERTY_EXCEPTION_HANDLED);
167
168        let try_steps = self.try_steps.clone();
169        let catch_clauses = self.catch_clauses.clone();
170        let finally_steps = self.finally_steps.clone();
171        let finally_on_when = self.finally_on_when.clone();
172
173        Box::pin(async move {
174            let try_result = run_pipeline(try_steps, exchange).await;
175            match try_result {
176                Ok(ex) => run_finally(finally_steps, finally_on_when, ex, None).await,
177                Err((failed_ex, original_err)) => {
178                    let mut ex = failed_ex;
179                    ex.set_error(original_err.clone());
180
181                    for clause in catch_clauses {
182                        let CatchClause {
183                            matcher,
184                            on_when,
185                            steps,
186                            disposition,
187                        } = clause;
188                        if !matcher.matches(&original_err, &ex) {
189                            continue;
190                        }
191                        if let Some(ref on_when) = on_when
192                            && !on_when(&ex)
193                        {
194                            continue;
195                        }
196
197                        let catch_result = run_pipeline(steps, ex.clone()).await;
198
199                        return match catch_result {
200                            Ok(ok_ex) => {
201                                // Determine previous-error threading based on disposition.
202                                // IMPORTANT: do NOT call handle_error() before run_finally() —
203                                // handle_error() calls clear_error() which removes
204                                // PROPERTY_EXCEPTION_MESSAGE/KIND/CAUGHT, preventing finally
205                                // steps from inspecting the caught exception.
206                                //
207                                // disposition semantics (ADR-0019 strict):
208                                //   Handled    -> catch output is final, no propagation
209                                //   Propagate  -> catch ran for side-effects, original propagates
210                                //   Continued  -> rejected at parse time (defensive: treat as
211                                //                 Propagate + log if we ever reach runtime)
212                                let prev = match disposition {
213                                    ExceptionDisposition::Handled => None,
214                                    ExceptionDisposition::Propagate => Some(original_err.clone()),
215                                    ExceptionDisposition::Continued => {
216                                        tracing::warn!(
217                                            "ExceptionDisposition::Continued reached doTry runtime; \
218                                             treating as Propagate. Should have been rejected at parse time."
219                                        );
220                                        Some(original_err.clone())
221                                    }
222                                };
223                                let mut ex = run_finally(
224                                    finally_steps.clone(),
225                                    finally_on_when.clone(),
226                                    ok_ex,
227                                    prev,
228                                )
229                                .await?;
230                                // AFTER finally has run (and had access to exception props),
231                                // apply handle_error() for Handled disposition to clear the
232                                // error state and set CamelExceptionHandled=true marker.
233                                if matches!(disposition, ExceptionDisposition::Handled) {
234                                    ex.handle_error();
235                                }
236                                match disposition {
237                                    ExceptionDisposition::Handled => Ok(ex),
238                                    _ => Err(original_err),
239                                }
240                            }
241                            Err((catch_ex, catch_err)) => {
242                                // Catch threw. Run finally with previous=catch_err.
243                                // Per Camel parity, if finally itself throws, catch_err is restored.
244                                let _ex = run_finally(
245                                    finally_steps.clone(),
246                                    finally_on_when.clone(),
247                                    catch_ex,
248                                    Some(catch_err.clone()),
249                                )
250                                .await?;
251                                Err(catch_err)
252                            }
253                        };
254                    }
255
256                    // No catch matched. Run finally with previous=original. Propagate original.
257                    let _ex = run_finally(
258                        finally_steps,
259                        finally_on_when,
260                        ex,
261                        Some(original_err.clone()),
262                    )
263                    .await?;
264                    Err(original_err)
265                }
266            }
267        })
268    }
269}
270
271// ── DoTrySegment (ADR-0025 OutcomePipeline) ──────────────────────────────
272
273/// Compilable segment for a `doCatch` clause within a `DoTrySegment`.
274///
275/// `disposition` controls outcome when the catch body completes:
276
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Step that returns the exchange untouched.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(move |ex| Box::pin(async move { Ok(ex) }))
    }

    /// Step that bumps `flag` each time it runs and passes the exchange through.
    fn record_call(flag: Arc<AtomicU32>) -> BoxProcessor {
        BoxProcessor::from_fn(move |ex| {
            let f = flag.clone();
            Box::pin(async move {
                f.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        })
    }

    /// Step that always fails with a clone of `err`.
    fn always_fail(err: CamelError) -> BoxProcessor {
        BoxProcessor::from_fn(move |_ex| {
            let e = err.clone();
            Box::pin(async move { Err(e) })
        })
    }

    /// `Handled` clause without an `on_when` guard that matches a single
    /// variant name (`"*"` for catch-all). Tests override individual fields
    /// with struct-update syntax where they need something else.
    fn handled_on(variant: &str, steps: Vec<BoxProcessor>) -> CatchClause {
        CatchClause {
            matcher: CatchMatcher::ByVariant(vec![variant.into()]),
            on_when: None,
            steps,
            disposition: ExceptionDisposition::Handled,
        }
    }

    /// Boxes the service and drives a single exchange through it.
    async fn run(svc: DoTryService, ex: Exchange) -> Result<Exchange, CamelError> {
        let mut boxed = BoxProcessor::new(svc);
        boxed.ready().await.unwrap().call(ex).await
    }

    #[tokio::test]
    async fn happy_path_try_succeeds_finally_runs() {
        let finally_flag = Arc::new(AtomicU32::new(0));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![record_call(finally_flag.clone())];

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_ok());
        assert_eq!(finally_flag.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn catch_by_variant_handled_returns_ok() {
        let try_step = always_fail(CamelError::ProcessorError("boom".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("ProcessorError", vec![passthrough()]));

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_ok(), "Handled must return Ok");
        let ex = result.unwrap();
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_HANDLED),
            Some(&camel_api::Value::Bool(true)),
            "CamelExceptionHandled must be set via handle_error()"
        );
    }

    #[tokio::test]
    async fn catch_by_variant_propagate_runs_side_effects_and_rethrows() {
        let original = CamelError::ProcessorError("boom".into());
        let try_step = always_fail(original.clone());
        let side_effect = Arc::new(AtomicU32::new(0));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            disposition: ExceptionDisposition::Propagate,
            ..handled_on("ProcessorError", vec![record_call(side_effect.clone())])
        });

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_err(), "Propagate must rethrow original");
        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
        assert_eq!(
            side_effect.load(Ordering::SeqCst),
            1,
            "catch branch must have run for side-effects"
        );
    }

    #[tokio::test]
    async fn catch_by_predicate_matches_via_exception_kind() {
        let try_step = always_fail(CamelError::Io("disk full".into()));
        let predicate: FilterPredicate = Arc::new(|ex: &Exchange| {
            ex.properties
                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
                .map(|v| matches!(v, camel_api::Value::String(s) if s == "io"))
                .unwrap_or(false)
        });
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::Predicate(predicate),
            ..handled_on("*", vec![passthrough()])
        });

        let result = run(svc, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "Predicate matcher must catch the error and Handled must return Ok"
        );
    }

    #[tokio::test]
    async fn on_when_filters_clause_and_next_evaluated() {
        let try_step = always_fail(CamelError::ProcessorError("boom".into()));
        let first_call = Arc::new(AtomicU32::new(0));
        let second_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            on_when: Some(Arc::new(|_ex| false)),
            ..handled_on("ProcessorError", vec![record_call(first_call.clone())])
        });
        svc.catch_clauses
            .push(handled_on("*", vec![record_call(second_call.clone())]));

        let result = run(svc, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "the catch-all Handled clause must turn the failure into Ok"
        );
        assert_eq!(first_call.load(Ordering::SeqCst), 0);
        assert_eq!(second_call.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn first_match_wins_subsequent_clauses_not_evaluated() {
        let try_step = always_fail(CamelError::Io("err".into()));
        let first_call = Arc::new(AtomicU32::new(0));
        let second_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("Io", vec![record_call(first_call.clone())]));
        svc.catch_clauses
            .push(handled_on("*", vec![record_call(second_call.clone())]));

        let result = run(svc, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "the first matching Handled clause must return Ok"
        );
        assert_eq!(first_call.load(Ordering::SeqCst), 1);
        assert_eq!(second_call.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn no_clause_matches_propagates_original() {
        let try_step = always_fail(CamelError::CircuitOpen("cb".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("ProcessorError", vec![passthrough()]));

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::CircuitOpen(_)));
    }

    #[tokio::test]
    async fn catch_branch_throws_new_error_wins() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("ProcessorError", vec![catch_step]));

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::Io(_)));
    }

    #[tokio::test]
    async fn finally_throws_with_no_previous_error_propagates_finally_error() {
        let finally_step = always_fail(CamelError::Config("fin".into()));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![finally_step];

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::Config(_)));
    }

    #[tokio::test]
    async fn finally_throws_with_previous_error_restores_previous() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let finally_step = always_fail(CamelError::Config("fin".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.finally_steps = vec![finally_step];

        let result = run(svc, Exchange::default()).await;
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
            "previous error must be restored when finally throws (Camel parity)"
        );
    }

    #[tokio::test]
    async fn finally_on_when_false_skips_finally() {
        let finally_call = Arc::new(AtomicU32::new(0));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![record_call(finally_call.clone())];
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let result = run(svc, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "a skipped finally must not disturb a successful try"
        );
        assert_eq!(finally_call.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn stale_handled_marker_cleared_on_entry() {
        let mut ex = Exchange::default();
        ex.set_property(PROPERTY_EXCEPTION_HANDLED, camel_api::Value::Bool(true));
        let svc = DoTryService::new(vec![passthrough()]);

        let ex = run(svc, ex).await.unwrap();
        assert!(
            !ex.properties.contains_key(PROPERTY_EXCEPTION_HANDLED),
            "stale CamelExceptionHandled must be cleared on entry"
        );
    }

    #[tokio::test]
    async fn nested_do_try_inner_catch_does_not_leak_to_outer() {
        let inner = {
            let try_step = always_fail(CamelError::Io("inner".into()));
            let mut d = DoTryService::new(vec![try_step]);
            d.catch_clauses.push(handled_on("Io", vec![passthrough()]));
            BoxProcessor::new(d)
        };
        let mut outer = DoTryService::new(vec![inner]);
        outer
            .catch_clauses
            .push(handled_on("Io", vec![passthrough()]));

        let result = run(outer, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "outer must see Ok because inner handled its own error"
        );
    }

    #[tokio::test]
    async fn catch_all_only_fires_when_no_specific_clause_matches() {
        let try_step = always_fail(CamelError::Io("err".into()));
        let processor_call = Arc::new(AtomicU32::new(0));
        let catch_all_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        // First clause (specific) targets ProcessorError — won't match Io error.
        svc.catch_clauses.push(handled_on(
            "ProcessorError",
            vec![record_call(processor_call.clone())],
        ));
        // Second clause is the catch-all — should fire.
        svc.catch_clauses
            .push(handled_on("*", vec![record_call(catch_all_call.clone())]));

        let result = run(svc, Exchange::default()).await;
        assert!(
            result.is_ok(),
            "the catch-all Handled clause must turn the failure into Ok"
        );
        assert_eq!(
            processor_call.load(Ordering::SeqCst),
            0,
            "specific ProcessorError clause must not fire on Io error"
        );
        assert_eq!(
            catch_all_call.load(Ordering::SeqCst),
            1,
            "catch-all clause must fire when no specific clause matches"
        );
    }

    #[tokio::test]
    async fn catch_throws_with_finally_runs_finally_and_propagates_catch_err() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let finally_flag = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("ProcessorError", vec![catch_step]));
        svc.finally_steps = vec![record_call(finally_flag.clone())];

        let result = run(svc, Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::Io(_)),
            "catch_err must propagate (not original ProcessorError)"
        );
        assert_eq!(
            finally_flag.load(Ordering::SeqCst),
            1,
            "doFinally must run even when catch throws"
        );
    }

    #[tokio::test]
    async fn catch_throws_and_finally_throws_restores_catch_err() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let finally_step = always_fail(CamelError::Config("fin-fail".into()));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses
            .push(handled_on("ProcessorError", vec![catch_step]));
        svc.finally_steps = vec![finally_step];

        let result = run(svc, Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::Io(_)),
            "catch_err (Io) must be restored over finally_err (Config) per Camel parity"
        );
    }

    #[tokio::test]
    async fn finally_on_when_false_with_previous_error_still_propagates_original() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let finally_flag = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        // No catch clauses → original error stays.
        svc.finally_steps = vec![record_call(finally_flag.clone())];
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let result = run(svc, Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
            "original error must propagate even when finally_on_when skips finally"
        );
        assert_eq!(
            finally_flag.load(Ordering::SeqCst),
            0,
            "doFinally must NOT run when on_when returns false"
        );
    }
}