Skip to main content

camel_processor/
do_try.rs

1use camel_api::error_handler::ExceptionDisposition;
2use camel_api::exchange::PROPERTY_EXCEPTION_HANDLED;
3use camel_api::{BoxProcessor, CamelError, Exchange, FilterPredicate};
4use tower::Service;
5use tower::ServiceExt;
6
/// Matcher for a `doCatch` clause.
///
/// Selects failures either by the error's variant name or by an arbitrary
/// predicate evaluated against the exchange.
#[derive(Clone)]
pub enum CatchMatcher {
    /// Match by CamelError variant name (e.g. ["ProcessorError", "Io"]).
    /// `"*"` matches any variant — equivalent to Camel's `doCatch(Throwable.class)`.
    ByVariant(Vec<String>),
    /// Match by predicate over the Exchange.
    Predicate(FilterPredicate),
}
16
17impl CatchMatcher {
18    /// Returns `true` if this matcher matches the given error and exchange.
19    pub fn matches(&self, err: &CamelError, ex: &Exchange) -> bool {
20        match self {
21            CatchMatcher::ByVariant(names) => {
22                if names.iter().any(|n| n == "*") {
23                    return true;
24                }
25                names.iter().any(|n| n == err.variant_name())
26            }
27            CatchMatcher::Predicate(p) => p(ex),
28        }
29    }
30}
31
/// A single `doCatch` clause: when it applies, which steps it runs, and how
/// the caught error is disposed of afterwards.
#[derive(Clone)]
pub struct CatchClause {
    /// The main matcher (variant-name list or predicate).
    pub matcher: CatchMatcher,
    /// Optional sub-predicate evaluated AFTER the main matcher passes.
    pub on_when: Option<FilterPredicate>,
    /// Sub-pipeline executed when the clause matches.
    pub steps: Vec<BoxProcessor>,
    /// ADR-0019 disposition: Handled (default), Propagate, or Continued (rejected at parse time).
    /// In YAML, use lowercase: `handled`, `propagate`, `continued`.
    pub disposition: ExceptionDisposition,
}
45
/// The `doTry` processor. Wrap with `BoxProcessor::new(DoTryService::new(...))`.
///
/// Holds the three sub-pipelines of a try/catch/finally construct. All fields
/// are public so callers can start from `DoTryService::new` and fill in the
/// catch and finally parts by direct field assignment.
#[derive(Clone)]
pub struct DoTryService {
    /// Steps in the try block.
    pub try_steps: Vec<BoxProcessor>,
    /// Catch clauses evaluated first-match-wins.
    pub catch_clauses: Vec<CatchClause>,
    /// Steps in the finally block (empty = no finally).
    pub finally_steps: Vec<BoxProcessor>,
    /// Optional onWhen predicate for finally.
    pub finally_on_when: Option<FilterPredicate>,
}
58
59impl DoTryService {
60    /// Create a new `DoTryService` with the given try steps.
61    pub fn new(try_steps: Vec<BoxProcessor>) -> Self {
62        Self {
63            try_steps,
64            catch_clauses: Vec::new(),
65            finally_steps: Vec::new(),
66            finally_on_when: None,
67        }
68    }
69
70    /// Full constructor used by the compile pipeline (Task 10b control_flow.rs).
71    /// Builder API (Task 8) constructs via `new()` + field mutation.
72    pub fn with_catch_and_finally(
73        try_steps: Vec<BoxProcessor>,
74        catch_clauses: Vec<CatchClause>,
75        finally_steps: Vec<BoxProcessor>,
76        finally_on_when: Option<FilterPredicate>,
77    ) -> Self {
78        Self {
79            try_steps,
80            catch_clauses,
81            finally_steps,
82            finally_on_when,
83        }
84    }
85}
86
87/// Run a sequence of steps, preserving the last exchange state on error.
88/// Returns `Err((last_ex, err))` so DoTry can populate exception properties.
89async fn run_pipeline(
90    steps: Vec<BoxProcessor>,
91    mut ex: Exchange,
92) -> Result<Exchange, (Exchange, CamelError)> {
93    for mut svc in steps {
94        match svc.ready().await {
95            Ok(ready) => {
96                let snapshot = ex.clone();
97                match ready.call(ex).await {
98                    Ok(new_ex) => ex = new_ex,
99                    Err(err) => return Err((snapshot, err)),
100                }
101            }
102            Err(err) => return Err((ex, err)),
103        }
104    }
105    Ok(ex)
106}
107
108/// Run the finally block. Camel parity for finally-throws:
109/// - If finally succeeds: return its exchange.
110/// - If finally throws AND there was a previous error: restore previous (log finally_err).
111/// - If finally throws AND no previous error: propagate finally_err.
112async fn run_finally(
113    finally_steps: Vec<BoxProcessor>,
114    finally_on_when: Option<FilterPredicate>,
115    ex: Exchange,
116    previous_err: Option<CamelError>,
117) -> Result<Exchange, CamelError> {
118    if finally_steps.is_empty() {
119        return Ok(ex);
120    }
121    if let Some(on_when) = &finally_on_when
122        && !on_when(&ex)
123    {
124        return Ok(ex);
125    }
126    match run_pipeline(finally_steps, ex).await {
127        Ok(ex) => Ok(ex),
128        Err((_, finally_err)) => match previous_err {
129            Some(prev) => {
130                tracing::warn!(
131                    finally_error = %finally_err,
132                    previous_error = %prev,
133                    "doFinally threw; restoring previous exception (Camel parity)"
134                );
135                Err(prev)
136            }
137            None => {
138                tracing::warn!(error = %finally_err, "doFinally threw");
139                Err(finally_err)
140            }
141        },
142    }
143}
144
145impl tower::Service<Exchange> for DoTryService {
146    type Response = Exchange;
147    type Error = CamelError;
148    type Future = std::pin::Pin<
149        Box<dyn std::future::Future<Output = Result<Self::Response, Self::Error>> + Send>,
150    >;
151
152    fn poll_ready(
153        &mut self,
154        _cx: &mut std::task::Context<'_>,
155    ) -> std::task::Poll<Result<(), Self::Error>> {
156        std::task::Poll::Ready(Ok(()))
157    }
158
159    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
160        // Clear stale CamelExceptionHandled marker from prior handlers in the same route.
161        // Exchange::clear_error() does NOT touch HANDLED, so direct map access is required.
162        exchange.properties.remove(PROPERTY_EXCEPTION_HANDLED);
163
164        let try_steps = self.try_steps.clone();
165        let catch_clauses = self.catch_clauses.clone();
166        let finally_steps = self.finally_steps.clone();
167        let finally_on_when = self.finally_on_when.clone();
168
169        Box::pin(async move {
170            let try_result = run_pipeline(try_steps, exchange).await;
171            match try_result {
172                Ok(ex) => run_finally(finally_steps, finally_on_when, ex, None).await,
173                Err((failed_ex, original_err)) => {
174                    let mut ex = failed_ex;
175                    ex.set_error(original_err.clone());
176
177                    for clause in catch_clauses {
178                        let CatchClause {
179                            matcher,
180                            on_when,
181                            steps,
182                            disposition,
183                        } = clause;
184                        if !matcher.matches(&original_err, &ex) {
185                            continue;
186                        }
187                        if let Some(ref on_when) = on_when
188                            && !on_when(&ex)
189                        {
190                            continue;
191                        }
192
193                        let catch_result = run_pipeline(steps, ex.clone()).await;
194
195                        return match catch_result {
196                            Ok(ok_ex) => {
197                                // Determine previous-error threading based on disposition.
198                                // IMPORTANT: do NOT call handle_error() before run_finally() —
199                                // handle_error() calls clear_error() which removes
200                                // PROPERTY_EXCEPTION_MESSAGE/KIND/CAUGHT, preventing finally
201                                // steps from inspecting the caught exception.
202                                //
203                                // disposition semantics (ADR-0019 strict):
204                                //   Handled    -> catch output is final, no propagation
205                                //   Propagate  -> catch ran for side-effects, original propagates
206                                //   Continued  -> rejected at parse time (defensive: treat as
207                                //                 Propagate + log if we ever reach runtime)
208                                let prev = match disposition {
209                                    ExceptionDisposition::Handled => None,
210                                    ExceptionDisposition::Propagate => Some(original_err.clone()),
211                                    ExceptionDisposition::Continued => {
212                                        tracing::warn!(
213                                            "ExceptionDisposition::Continued reached doTry runtime; \
214                                             treating as Propagate. Should have been rejected at parse time."
215                                        );
216                                        Some(original_err.clone())
217                                    }
218                                };
219                                let mut ex = run_finally(
220                                    finally_steps.clone(),
221                                    finally_on_when.clone(),
222                                    ok_ex,
223                                    prev,
224                                )
225                                .await?;
226                                // AFTER finally has run (and had access to exception props),
227                                // apply handle_error() for Handled disposition to clear the
228                                // error state and set CamelExceptionHandled=true marker.
229                                if matches!(disposition, ExceptionDisposition::Handled) {
230                                    ex.handle_error();
231                                }
232                                match disposition {
233                                    ExceptionDisposition::Handled => Ok(ex),
234                                    _ => Err(original_err),
235                                }
236                            }
237                            Err((catch_ex, catch_err)) => {
238                                // Catch threw. Run finally with previous=catch_err.
239                                // Per Camel parity, if finally itself throws, catch_err is restored.
240                                let _ex = run_finally(
241                                    finally_steps.clone(),
242                                    finally_on_when.clone(),
243                                    catch_ex,
244                                    Some(catch_err.clone()),
245                                )
246                                .await?;
247                                Err(catch_err)
248                            }
249                        };
250                    }
251
252                    // No catch matched. Run finally with previous=original. Propagate original.
253                    let _ex = run_finally(
254                        finally_steps,
255                        finally_on_when,
256                        ex,
257                        Some(original_err.clone()),
258                    )
259                    .await?;
260                    Err(original_err)
261                }
262            }
263        })
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessor, BoxProcessorExt};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    /// Step that returns the exchange unchanged.
    fn passthrough() -> BoxProcessor {
        BoxProcessor::from_fn(move |ex| Box::pin(async move { Ok(ex) }))
    }

    /// Step that increments `flag` on every invocation and passes the exchange through.
    fn record_call(flag: Arc<AtomicU32>) -> BoxProcessor {
        BoxProcessor::from_fn(move |ex| {
            let f = flag.clone();
            Box::pin(async move {
                f.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        })
    }

    /// Step that always fails with a clone of `err`.
    fn always_fail(err: CamelError) -> BoxProcessor {
        BoxProcessor::from_fn(move |_ex| {
            let e = err.clone();
            Box::pin(async move { Err(e) })
        })
    }

    /// Try succeeds: finally runs exactly once and the result is `Ok`.
    #[tokio::test]
    async fn happy_path_try_succeeds_finally_runs() {
        let finally_flag = Arc::new(AtomicU32::new(0));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![record_call(finally_flag.clone())];

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok());
        assert_eq!(finally_flag.load(Ordering::SeqCst), 1);
    }

    /// A `Handled` clause matched by variant name yields `Ok` and sets the handled marker.
    #[tokio::test]
    async fn catch_by_variant_handled_returns_ok() {
        let try_step = always_fail(CamelError::ProcessorError("boom".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![passthrough()],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok(), "Handled must return Ok");
        let ex = result.unwrap();
        assert_eq!(
            ex.properties.get(PROPERTY_EXCEPTION_HANDLED),
            Some(&camel_api::Value::Bool(true)),
            "CamelExceptionHandled must be set via handle_error()"
        );
    }

    /// A `Propagate` clause runs its steps for side-effects, then the original error is returned.
    #[tokio::test]
    async fn catch_by_variant_propagate_runs_side_effects_and_rethrows() {
        let original = CamelError::ProcessorError("boom".into());
        let try_step = always_fail(original.clone());
        let side_effect = Arc::new(AtomicU32::new(0));
        let catch_step = record_call(side_effect.clone());
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![catch_step],
            disposition: ExceptionDisposition::Propagate,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_err(), "Propagate must rethrow original");
        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
        assert_eq!(
            side_effect.load(Ordering::SeqCst),
            1,
            "catch branch must have run for side-effects"
        );
    }

    /// A predicate matcher can select the failure through the exception-kind property.
    #[tokio::test]
    async fn catch_by_predicate_matches_via_exception_kind() {
        let try_step = always_fail(CamelError::Io("disk full".into()));
        let predicate: FilterPredicate = Arc::new(|ex: &Exchange| {
            ex.properties
                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
                .map(|v| matches!(v, camel_api::Value::String(s) if s == "io"))
                .unwrap_or(false)
        });
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::Predicate(predicate),
            on_when: None,
            steps: vec![passthrough()],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(
            result.is_ok(),
            "Predicate matcher must catch the error and Handled must return Ok"
        );
    }

    /// A clause whose `on_when` is false is skipped and the next clause is tried.
    #[tokio::test]
    async fn on_when_filters_clause_and_next_evaluated() {
        let try_step = always_fail(CamelError::ProcessorError("boom".into()));
        let first_call = Arc::new(AtomicU32::new(0));
        let second_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: Some(Arc::new(|_ex| false)),
            steps: vec![record_call(first_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["*".into()]),
            on_when: None,
            steps: vec![record_call(second_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok(), "second (Handled) clause must handle the error");
        assert_eq!(first_call.load(Ordering::SeqCst), 0);
        assert_eq!(second_call.load(Ordering::SeqCst), 1);
    }

    /// Once a clause matches, later clauses are not run.
    #[tokio::test]
    async fn first_match_wins_subsequent_clauses_not_evaluated() {
        let try_step = always_fail(CamelError::Io("err".into()));
        let first_call = Arc::new(AtomicU32::new(0));
        let second_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
            on_when: None,
            steps: vec![record_call(first_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["*".into()]),
            on_when: None,
            steps: vec![record_call(second_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok(), "first (Handled) clause must handle the error");
        assert_eq!(first_call.load(Ordering::SeqCst), 1);
        assert_eq!(second_call.load(Ordering::SeqCst), 0);
    }

    /// With no matching clause the original error is returned unchanged.
    #[tokio::test]
    async fn no_clause_matches_propagates_original() {
        let try_step = always_fail(CamelError::CircuitOpen("cb".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![passthrough()],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::CircuitOpen(_)));
    }

    /// When the catch steps fail, their error replaces the original one.
    #[tokio::test]
    async fn catch_branch_throws_new_error_wins() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![catch_step],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::Io(_)));
    }

    /// Finally failing after a successful try surfaces the finally error.
    #[tokio::test]
    async fn finally_throws_with_no_previous_error_propagates_finally_error() {
        let finally_step = always_fail(CamelError::Config("fin".into()));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![finally_step];

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::Config(_)));
    }

    /// Finally failing after a failed try keeps the try error.
    #[tokio::test]
    async fn finally_throws_with_previous_error_restores_previous() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let finally_step = always_fail(CamelError::Config("fin".into()));
        let mut svc = DoTryService::new(vec![try_step]);
        svc.finally_steps = vec![finally_step];

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
            "previous error must be restored when finally throws (Camel parity)"
        );
    }

    /// A false `finally_on_when` skips the finally steps.
    #[tokio::test]
    async fn finally_on_when_false_skips_finally() {
        let finally_call = Arc::new(AtomicU32::new(0));
        let mut svc = DoTryService::new(vec![passthrough()]);
        svc.finally_steps = vec![record_call(finally_call.clone())];
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok(), "skipping finally must not turn success into an error");
        assert_eq!(finally_call.load(Ordering::SeqCst), 0);
    }

    /// A handled marker already present on the incoming exchange is removed.
    #[tokio::test]
    async fn stale_handled_marker_cleared_on_entry() {
        let mut ex = Exchange::default();
        ex.set_property(PROPERTY_EXCEPTION_HANDLED, camel_api::Value::Bool(true));
        let svc = DoTryService::new(vec![passthrough()]);
        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(ex).await;
        let ex = result.unwrap();
        assert!(
            !ex.properties.contains_key(PROPERTY_EXCEPTION_HANDLED),
            "stale CamelExceptionHandled must be cleared on entry"
        );
    }

    /// An error handled by an inner doTry must not reach the outer doTry's catch.
    #[tokio::test]
    async fn nested_do_try_inner_catch_does_not_leak_to_outer() {
        let inner = {
            let try_step = always_fail(CamelError::Io("inner".into()));
            let mut d = DoTryService::new(vec![try_step]);
            d.catch_clauses.push(CatchClause {
                matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
                on_when: None,
                steps: vec![passthrough()],
                disposition: ExceptionDisposition::Handled,
            });
            BoxProcessor::new(d)
        };
        // The outer clause would also handle `Io`, so `is_ok()` alone cannot
        // distinguish "inner handled it" from "outer caught a leak"; count
        // outer-catch invocations to tell the two apart.
        let outer_catch_call = Arc::new(AtomicU32::new(0));
        let mut outer = DoTryService::new(vec![inner]);
        outer.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["Io".into()]),
            on_when: None,
            steps: vec![record_call(outer_catch_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(outer);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(
            result.is_ok(),
            "outer must see Ok because inner handled its own error"
        );
        assert_eq!(
            outer_catch_call.load(Ordering::SeqCst),
            0,
            "outer catch must not run when inner handled its own error"
        );
    }

    /// The catch-all clause fires only after the specific clause fails to match.
    #[tokio::test]
    async fn catch_all_only_fires_when_no_specific_clause_matches() {
        let try_step = always_fail(CamelError::Io("err".into()));
        let processor_call = Arc::new(AtomicU32::new(0));
        let catch_all_call = Arc::new(AtomicU32::new(0));

        let mut svc = DoTryService::new(vec![try_step]);
        // First clause (specific) targets ProcessorError — won't match Io error.
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![record_call(processor_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });
        // Second clause is the catch-all — should fire.
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["*".into()]),
            on_when: None,
            steps: vec![record_call(catch_all_call.clone())],
            disposition: ExceptionDisposition::Handled,
        });

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;
        assert!(result.is_ok(), "catch-all (Handled) clause must handle the error");
        assert_eq!(
            processor_call.load(Ordering::SeqCst),
            0,
            "specific ProcessorError clause must not fire on Io error"
        );
        assert_eq!(
            catch_all_call.load(Ordering::SeqCst),
            1,
            "catch-all clause must fire when no specific clause matches"
        );
    }

    /// Finally still runs when the catch steps fail, and the catch error is returned.
    #[tokio::test]
    async fn catch_throws_with_finally_runs_finally_and_propagates_catch_err() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let finally_flag = Arc::new(AtomicU32::new(0));
        let finally_step = record_call(finally_flag.clone());

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![catch_step],
            disposition: ExceptionDisposition::Handled,
        });
        svc.finally_steps = vec![finally_step];

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::Io(_)),
            "catch_err must propagate (not original ProcessorError)"
        );
        assert_eq!(
            finally_flag.load(Ordering::SeqCst),
            1,
            "doFinally must run even when catch throws"
        );
    }

    /// When both catch and finally fail, the catch error is the one returned.
    #[tokio::test]
    async fn catch_throws_and_finally_throws_restores_catch_err() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let catch_step = always_fail(CamelError::Io("catch-fail".into()));
        let finally_step = always_fail(CamelError::Config("fin-fail".into()));

        let mut svc = DoTryService::new(vec![try_step]);
        svc.catch_clauses.push(CatchClause {
            matcher: CatchMatcher::ByVariant(vec!["ProcessorError".into()]),
            on_when: None,
            steps: vec![catch_step],
            disposition: ExceptionDisposition::Handled,
        });
        svc.finally_steps = vec![finally_step];

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::Io(_)),
            "catch_err (Io) must be restored over finally_err (Config) per Camel parity"
        );
    }

    /// Skipping finally via `finally_on_when` does not swallow the original error.
    #[tokio::test]
    async fn finally_on_when_false_with_previous_error_still_propagates_original() {
        let try_step = always_fail(CamelError::ProcessorError("orig".into()));
        let finally_flag = Arc::new(AtomicU32::new(0));
        let finally_step = record_call(finally_flag.clone());

        let mut svc = DoTryService::new(vec![try_step]);
        // No catch clauses → original error stays.
        svc.finally_steps = vec![finally_step];
        svc.finally_on_when = Some(Arc::new(|_ex| false));

        let mut boxed = BoxProcessor::new(svc);
        let result = boxed.ready().await.unwrap().call(Exchange::default()).await;

        assert!(result.is_err());
        assert!(
            matches!(result.unwrap_err(), CamelError::ProcessorError(_)),
            "original error must propagate even when finally_on_when skips finally"
        );
        assert_eq!(
            finally_flag.load(Ordering::SeqCst),
            0,
            "doFinally must NOT run when on_when returns false"
        );
    }
}