Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
9};
10use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
11
12async fn execute_on_steps(
13    original: Exchange,
14    original_err: CamelError,
15    on_steps: &SyncBoxProcessor,
16    handled: bool,
17    handler: Option<BoxProcessor>,
18) -> Result<Exchange, CamelError> {
19    let snapshot = original.clone();
20    let mut ex = original;
21    ex.set_error(original_err.clone());
22    let mut pipeline = on_steps.clone_inner();
23    let step_result = async {
24        let svc = pipeline.ready().await?;
25        svc.call(ex).await
26    }
27    .await;
28
29    match step_result {
30        Ok(mut ex) => {
31            if handled {
32                ex.handle_error();
33                Ok(ex)
34            } else {
35                // handled:false — steps execute for side-effects (e.g. logging) but
36                // the modified exchange is discarded and the original error propagated.
37                Err(original_err)
38            }
39        }
40        Err(_) => {
41            tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
42            let mut ex = snapshot;
43            ex.set_error(original_err);
44            send_to_handler(ex, handler).await
45        }
46    }
47}
48
49/// Tower Layer that wraps a pipeline with error handling behaviour.
50///
51/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
52pub struct ErrorHandlerLayer {
53    /// Resolved DLC producer (None = log only).
54    dlc_producer: Option<BoxProcessor>,
55    /// Policies with their resolved `handled_by` producers.
56    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
57}
58
59impl ErrorHandlerLayer {
60    /// Create the layer with pre-resolved producers.
61    pub fn new(
62        dlc_producer: Option<BoxProcessor>,
63        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
64    ) -> Self {
65        Self {
66            dlc_producer,
67            policies,
68        }
69    }
70}
71
72impl<S> Layer<S> for ErrorHandlerLayer
73where
74    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
75    S::Future: Send + 'static,
76{
77    type Service = ErrorHandlerService<S>;
78
79    fn layer(&self, inner: S) -> Self::Service {
80        ErrorHandlerService {
81            inner,
82            dlc_producer: self.dlc_producer.clone(),
83            policies: self
84                .policies
85                .iter()
86                .map(|(p, prod)| (p.clone(), prod.clone()))
87                .collect(),
88        }
89    }
90}
91
92/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
93///
94/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
95/// `has_error() == true` if the pipeline ultimately failed.
96pub struct ErrorHandlerService<S> {
97    inner: S,
98    dlc_producer: Option<BoxProcessor>,
99    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
100}
101
102impl<S: Clone> Clone for ErrorHandlerService<S> {
103    fn clone(&self) -> Self {
104        Self {
105            inner: self.inner.clone(),
106            dlc_producer: self.dlc_producer.clone(),
107            policies: self
108                .policies
109                .iter()
110                .map(|(p, prod)| (p.clone(), prod.clone()))
111                .collect(),
112        }
113    }
114}
115
116impl<S> ErrorHandlerService<S>
117where
118    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
119    S::Future: Send + 'static,
120{
121    /// Create the service directly (used in unit tests; in production use the Layer).
122    pub fn new(
123        inner: S,
124        dlc_producer: Option<BoxProcessor>,
125        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
126    ) -> Self {
127        Self {
128            inner,
129            dlc_producer,
130            policies,
131        }
132    }
133}
134
135impl<S> Service<Exchange> for ErrorHandlerService<S>
136where
137    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
138    S::Future: Send + 'static,
139{
140    type Response = Exchange;
141    type Error = CamelError;
142    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
143
144    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145        self.inner.poll_ready(cx)
146    }
147
148    fn call(&mut self, exchange: Exchange) -> Self::Future {
149        let mut inner = self.inner.clone();
150        let dlc = self.dlc_producer.clone();
151        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
152            .policies
153            .iter()
154            .map(|(p, prod)| (p.clone(), prod.clone()))
155            .collect();
156
157        Box::pin(async move {
158            let original = exchange.clone();
159            let result = inner.ready().await?.call(exchange).await;
160
161            let err = match result {
162                Ok(ex) => return Ok(ex),
163                Err(e) => e,
164            };
165
166            // Stop EIP is a control-flow sentinel — pass through without retry or DLC.
167            if matches!(err, CamelError::Stopped) {
168                return Err(err);
169            }
170
171            // Find the first matching policy.
172            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
173
174            if let Some((policy, policy_producer)) = matched {
175                // Retry if configured.
176                if let Some(ref backoff) = policy.retry {
177                    for attempt in 0..backoff.max_attempts {
178                        let delay = backoff.delay_for(attempt);
179                        tokio::time::sleep(delay).await;
180
181                        // Set redelivery headers
182                        let mut ex = original.clone();
183                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
184                        ex.input.set_header(
185                            HEADER_REDELIVERY_COUNTER,
186                            Value::Number((attempt + 1).into()),
187                        );
188                        ex.input.set_header(
189                            HEADER_REDELIVERY_MAX_COUNTER,
190                            Value::Number(backoff.max_attempts.into()),
191                        );
192
193                        match inner.ready().await?.call(ex).await {
194                            Ok(ex) => return Ok(ex),
195                            Err(retry_err) => {
196                                if attempt + 1 == backoff.max_attempts {
197                                    // Retries exhausted — send to handler.
198                                    let mut original = original.clone();
199                                    original
200                                        .input
201                                        .set_header(HEADER_REDELIVERED, Value::Bool(true));
202                                    original.input.set_header(
203                                        HEADER_REDELIVERY_COUNTER,
204                                        Value::Number(backoff.max_attempts.into()),
205                                    );
206                                    original.input.set_header(
207                                        HEADER_REDELIVERY_MAX_COUNTER,
208                                        Value::Number(backoff.max_attempts.into()),
209                                    );
210                                    if let Some(ref steps) = policy.on_steps {
211                                        let handler = policy_producer.clone().or(dlc.clone());
212                                        return execute_on_steps(
213                                            original,
214                                            retry_err,
215                                            steps,
216                                            policy.handled,
217                                            handler,
218                                        )
219                                        .await;
220                                    }
221                                    original.set_error(retry_err);
222                                    let handler = policy_producer.or(dlc);
223                                    return send_to_handler(original, handler).await;
224                                }
225                            }
226                        }
227                    }
228                }
229                // No retry configured (or 0 attempts) — send to policy handler or DLC.
230                if let Some(ref steps) = policy.on_steps {
231                    let handler = policy_producer.or(dlc);
232                    return execute_on_steps(original, err, steps, policy.handled, handler).await;
233                }
234                let mut ex = original.clone();
235                ex.set_error(err);
236                let handler = policy_producer.or(dlc);
237                send_to_handler(ex, handler).await
238            } else {
239                // No matching policy — forward directly to DLC.
240                let mut ex = original;
241                ex.set_error(err);
242                send_to_handler(ex, dlc).await
243            }
244        })
245    }
246}
247
248async fn send_to_handler(
249    exchange: Exchange,
250    producer: Option<BoxProcessor>,
251) -> Result<Exchange, CamelError> {
252    match producer {
253        None => {
254            // log-policy: system-broken
255            tracing::error!(
256                error = ?exchange.error,
257                "Exchange failed with no error handler configured"
258            );
259            Ok(exchange)
260        }
261        Some(mut prod) => match prod.ready().await {
262            Err(e) => {
263                // log-policy: system-broken
264                tracing::error!("DLC/handler not ready: {e}");
265                Ok(exchange)
266            }
267            Ok(svc) => match svc.call(exchange.clone()).await {
268                Ok(ex) => Ok(ex),
269                Err(e) => {
270                    // log-policy: system-broken
271                    tracing::error!("DLC/handler call failed: {e}");
272                    // Return the original exchange with original error intact.
273                    Ok(exchange)
274                }
275            },
276        },
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use camel_api::{
284        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
285        error_handler::RedeliveryPolicy,
286    };
287    use std::sync::{
288        Arc,
289        atomic::{AtomicU32, Ordering},
290    };
291    use std::time::Duration;
292    use tower::ServiceExt;
293
294    fn make_exchange() -> Exchange {
295        Exchange::new(Message::new("test"))
296    }
297
298    fn failing_processor() -> BoxProcessor {
299        BoxProcessor::from_fn(|_ex| {
300            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
301        })
302    }
303
304    fn ok_processor() -> BoxProcessor {
305        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
306    }
307
308    fn fail_n_times(n: u32) -> BoxProcessor {
309        let count = Arc::new(AtomicU32::new(0));
310        BoxProcessor::from_fn(move |ex| {
311            let count = Arc::clone(&count);
312            Box::pin(async move {
313                let c = count.fetch_add(1, Ordering::SeqCst);
314                if c < n {
315                    Err(CamelError::ProcessorError(format!("attempt {c}")))
316                } else {
317                    Ok(ex)
318                }
319            })
320        })
321    }
322
323    #[tokio::test]
324    async fn test_ok_passthrough() {
325        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
326        let result = svc.oneshot(make_exchange()).await;
327        assert!(result.is_ok());
328        assert!(!result.unwrap().has_error());
329    }
330
331    #[tokio::test]
332    async fn test_error_goes_to_dlc() {
333        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
334        let received_clone = Arc::clone(&received);
335        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
336            let r = Arc::clone(&received_clone);
337            Box::pin(async move {
338                r.lock().unwrap().push(ex.clone());
339                Ok(ex)
340            })
341        });
342
343        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
344        let result = svc.oneshot(make_exchange()).await;
345        assert!(result.is_ok());
346        let ex = result.unwrap();
347        assert!(ex.has_error());
348        assert_eq!(received.lock().unwrap().len(), 1);
349    }
350
351    #[tokio::test]
352    async fn test_retry_recovers() {
353        let inner = fail_n_times(2);
354        let policy = ExceptionPolicy {
355            matches: Arc::new(|_| true),
356            retry: Some(RedeliveryPolicy {
357                max_attempts: 3,
358                initial_delay: Duration::from_millis(1),
359                multiplier: 1.0,
360                max_delay: Duration::from_millis(10),
361                jitter_factor: 0.0,
362            }),
363            handled_by: None,
364            on_steps: None,
365            handled: false,
366        };
367        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
368        let result = svc.oneshot(make_exchange()).await;
369        assert!(result.is_ok());
370        assert!(!result.unwrap().has_error());
371    }
372
373    #[tokio::test]
374    async fn test_retry_exhausted_goes_to_dlc() {
375        let inner = fail_n_times(10);
376        let received = Arc::new(std::sync::Mutex::new(0u32));
377        let received_clone = Arc::clone(&received);
378        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
379            let r = Arc::clone(&received_clone);
380            Box::pin(async move {
381                *r.lock().unwrap() += 1;
382                Ok(ex)
383            })
384        });
385        let policy = ExceptionPolicy {
386            matches: Arc::new(|_| true),
387            retry: Some(RedeliveryPolicy {
388                max_attempts: 2,
389                initial_delay: Duration::from_millis(1),
390                multiplier: 1.0,
391                max_delay: Duration::from_millis(10),
392                jitter_factor: 0.0,
393            }),
394            handled_by: None,
395            on_steps: None,
396            handled: false,
397        };
398        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
399        let result = svc.oneshot(make_exchange()).await;
400        assert!(result.is_ok());
401        assert!(result.unwrap().has_error());
402        assert_eq!(*received.lock().unwrap(), 1);
403    }
404
405    #[test]
406    fn test_poll_ready_delegates_to_inner() {
407        use std::sync::atomic::AtomicBool;
408
409        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
410        #[derive(Clone)]
411        struct DelayedReadyService {
412            ready: Arc<AtomicBool>,
413        }
414
415        impl Service<Exchange> for DelayedReadyService {
416            type Response = Exchange;
417            type Error = CamelError;
418            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
419
420            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421                if self.ready.fetch_or(true, Ordering::SeqCst) {
422                    // Already marked ready (second+ call) → Ready
423                    Poll::Ready(Ok(()))
424                } else {
425                    // First call → Pending, schedule a wake
426                    cx.waker().wake_by_ref();
427                    Poll::Pending
428                }
429            }
430
431            fn call(&mut self, ex: Exchange) -> Self::Future {
432                Box::pin(async move { Ok(ex) })
433            }
434        }
435
436        let waker = futures::task::noop_waker();
437        let mut cx = Context::from_waker(&waker);
438
439        let inner = DelayedReadyService {
440            ready: Arc::new(AtomicBool::new(false)),
441        };
442        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
443
444        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
445        let first = Pin::new(&mut svc).poll_ready(&mut cx);
446        assert!(first.is_pending(), "expected Pending on first poll_ready");
447
448        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
449        let second = Pin::new(&mut svc).poll_ready(&mut cx);
450        assert!(second.is_ready(), "expected Ready on second poll_ready");
451    }
452
453    #[tokio::test]
454    async fn test_no_matching_policy_uses_dlc() {
455        let received = Arc::new(std::sync::Mutex::new(0u32));
456        let received_clone = Arc::clone(&received);
457        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
458            let r = Arc::clone(&received_clone);
459            Box::pin(async move {
460                *r.lock().unwrap() += 1;
461                Ok(ex)
462            })
463        });
464        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
465        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
466        let result = svc.oneshot(make_exchange()).await;
467        assert!(result.is_ok());
468        assert_eq!(*received.lock().unwrap(), 1);
469    }
470
471    #[tokio::test]
472    async fn test_redelivery_headers_are_set() {
473        use camel_api::error_handler::{
474            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
475            RedeliveryPolicy,
476        };
477
478        let inner = fail_n_times(10);
479        let received = Arc::new(std::sync::Mutex::new(None));
480        let received_clone = Arc::clone(&received);
481        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
482            let r = Arc::clone(&received_clone);
483            Box::pin(async move {
484                *r.lock().unwrap() = Some(ex.clone());
485                Ok(ex)
486            })
487        });
488
489        let policy = ExceptionPolicy {
490            matches: Arc::new(|_| true),
491            retry: Some(RedeliveryPolicy {
492                max_attempts: 2,
493                initial_delay: Duration::from_millis(1),
494                multiplier: 1.0,
495                max_delay: Duration::from_millis(10),
496                jitter_factor: 0.0,
497            }),
498            handled_by: None,
499            on_steps: None,
500            handled: false,
501        };
502
503        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
504        let _ = svc.oneshot(make_exchange()).await.unwrap();
505
506        let ex = received.lock().unwrap().take().unwrap();
507        assert_eq!(
508            ex.input.header(HEADER_REDELIVERED),
509            Some(&Value::Bool(true))
510        );
511        assert_eq!(
512            ex.input.header(HEADER_REDELIVERY_COUNTER),
513            Some(&Value::Number(2.into()))
514        );
515        assert_eq!(
516            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
517            Some(&Value::Number(2.into()))
518        );
519    }
520
521    #[tokio::test]
522    async fn test_jitter_produces_varying_delays_in_retry_flow() {
523        use std::time::Instant;
524
525        let inner = fail_n_times(10);
526        let received = Arc::new(std::sync::Mutex::new(None));
527        let received_clone = Arc::clone(&received);
528        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
529            let r = Arc::clone(&received_clone);
530            Box::pin(async move {
531                *r.lock().unwrap() = Some(ex.clone());
532                Ok(ex)
533            })
534        });
535
536        let policy = ExceptionPolicy {
537            matches: Arc::new(|_| true),
538            retry: Some(RedeliveryPolicy {
539                max_attempts: 5,
540                initial_delay: Duration::from_millis(20),
541                multiplier: 1.0,
542                max_delay: Duration::from_millis(100),
543                jitter_factor: 0.5,
544            }),
545            handled_by: None,
546            on_steps: None,
547            handled: false,
548        };
549
550        let start = Instant::now();
551        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
552        let _ = svc.oneshot(make_exchange()).await.unwrap();
553        let elapsed = start.elapsed();
554
555        assert!(
556            received.lock().unwrap().is_some(),
557            "DLC should have received exchange"
558        );
559
560        assert!(
561            elapsed >= Duration::from_millis(50),
562            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
563        );
564
565        assert!(
566            elapsed <= Duration::from_millis(500),
567            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
568        );
569    }
570
571    // Stopped is a control-flow sentinel, not a real error.
572    // ErrorHandlerService must pass it through without retrying or forwarding to DLC.
573    #[tokio::test]
574    async fn test_stopped_bypasses_error_handler() {
575        let stopped_inner =
576            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
577
578        // DLC that tracks if it was ever called.
579        let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
580        let dlc_called_clone = Arc::clone(&dlc_called);
581        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
582            dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
583            Box::pin(async move { Ok(ex) })
584        });
585
586        let policy = ExceptionPolicy::new(|_| true); // matches everything
587        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
588        let result = svc.oneshot(make_exchange()).await;
589
590        // Must propagate Err(Stopped) — not absorb it.
591        assert!(
592            matches!(result, Err(CamelError::Stopped)),
593            "expected Err(Stopped), got: {:?}",
594            result
595        );
596        // DLC must NOT have been called.
597        assert!(
598            !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
599            "DLC should not be called for Stopped"
600        );
601    }
602
603    #[tokio::test]
604    async fn test_on_steps_handled_true_consumes_error() {
605        use tower::ServiceExt;
606
607        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
608            ex.input.body = camel_api::Body::Bytes("handled".into());
609            async move { Ok(ex) }
610        }));
611        let policy = ExceptionPolicy {
612            matches: Arc::new(|_| true),
613            retry: None,
614            handled_by: None,
615            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
616            handled: true,
617        };
618        let inner = tower::service_fn(|_ex: Exchange| async {
619            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
620        });
621        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
622        let ex = Exchange::default();
623        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
624        assert!(result.error.is_none(), "handled:true should clear error");
625        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
626    }
627
628    #[tokio::test]
629    async fn test_on_steps_handled_false_propagates_error() {
630        use tower::ServiceExt;
631
632        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
633            ex.input.body = camel_api::Body::Bytes("handled".into());
634            async move { Ok(ex) }
635        }));
636        let policy = ExceptionPolicy {
637            matches: Arc::new(|_| true),
638            retry: None,
639            handled_by: None,
640            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
641            handled: false,
642        };
643        let inner = tower::service_fn(|_ex: Exchange| async {
644            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
645        });
646        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
647        let ex = Exchange::default();
648        let result = svc.ready().await.unwrap().call(ex).await;
649        assert!(result.is_err(), "handled:false should propagate error");
650    }
651
652    #[tokio::test]
653    async fn test_on_steps_handled_true_clears_exception_properties() {
654        use tower::ServiceExt;
655
656        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
657            ex.input.body = camel_api::Body::Bytes("handled".into());
658            async move { Ok(ex) }
659        }));
660        let policy = ExceptionPolicy {
661            matches: Arc::new(|_| true),
662            retry: None,
663            handled_by: None,
664            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
665            handled: true,
666        };
667        let inner = tower::service_fn(|_ex: Exchange| async {
668            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
669        });
670        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
671        let ex = Exchange::default();
672        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
673        assert!(result.error.is_none(), "handled:true should clear error");
674        assert!(
675            result
676                .properties
677                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
678                .is_none(),
679            "handled:true should clear exception properties"
680        );
681        assert!(
682            result
683                .properties
684                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
685                .is_none(),
686            "handled:true should clear exception kind property"
687        );
688        assert!(
689            result
690                .properties
691                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
692                .is_none(),
693            "handled:true should clear exception caught property"
694        );
695    }
696}