Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9    HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10    StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15    original: Exchange,
16    original_err: CamelError,
17    on_steps: &SyncBoxProcessor,
18    disposition: ExceptionDisposition,
19    handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21    let snapshot = original.clone();
22    let mut ex = original;
23    ex.set_error(original_err.clone());
24    let mut pipeline = on_steps.clone_inner();
25    let step_result = async {
26        let svc = pipeline.ready().await?;
27        svc.call(ex).await
28    }
29    .await;
30
31    match step_result {
32        Ok(mut ex) => {
33            if disposition == ExceptionDisposition::Handled {
34                ex.handle_error();
35                Ok(ex)
36            } else {
37                // Propagate or Continued — steps execute for side-effects (e.g. logging) but
38                // the modified exchange is discarded and the original error propagated.
39                Err(original_err)
40            }
41        }
42        Err(_) => {
43            tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44            let mut ex = snapshot;
45            ex.set_error(original_err);
46            send_to_handler(ex, handler).await
47        }
48    }
49}
50
51/// Invoke a processor: readiness check + call, unified into a single Result.
52///
53/// Readiness errors and call errors are both returned as `Err(CamelError)`,
54/// allowing the pipeline's recovery loop to handle them uniformly.
55pub async fn invoke_processor(
56    svc: &mut BoxProcessor,
57    ex: Exchange,
58) -> Result<Exchange, CamelError> {
59    match svc.ready().await {
60        Ok(ready) => ready.call(ex).await,
61        Err(err) => Err(err),
62    }
63}
64
65/// Route-level error handler owning ALL error handling logic.
66///
67/// Single owner of DLC, retry, onException policies. Called from
68/// `RouteChannelService` (boundary errors) and `run_steps` (step errors).
69#[async_trait::async_trait]
70pub trait RouteErrorHandler: Send + Sync {
71    /// Match a policy for the given error. Called once before retry.
72    fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
73
74    /// Phase 1: Retry the failed step.
75    async fn retry_step(
76        &self,
77        policy: Option<PolicyId>,
78        step: &mut BoxProcessor,
79        original: Exchange,
80        error: CamelError,
81    ) -> RetryOutcome;
82
83    /// Phase 2: Determine step disposition after retry exhaustion.
84    async fn handle_step(
85        &self,
86        policy: Option<PolicyId>,
87        exchange: Exchange,
88        error: CamelError,
89    ) -> Result<StepDisposition, CamelError>;
90
91    /// Handle boundary (infrastructure) errors.
92    async fn handle_boundary(
93        &self,
94        kind: BoundaryKind,
95        exchange: Exchange,
96        error: CamelError,
97    ) -> Result<Exchange, CamelError>;
98}
99
100/// Default implementation of RouteErrorHandler.
101/// Owns DLC producer exclusively. Encapsulates retry/onException/DLC logic.
102///
103/// Uses `SyncBoxProcessor` internally so the handler is `Send + Sync` as required
104/// by the `RouteErrorHandler` trait.
105pub struct DefaultRouteErrorHandler {
106    pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107    pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111    pub fn new(
112        dlc_producer: Option<BoxProcessor>,
113        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114    ) -> Self {
115        Self {
116            dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117            policies: policies
118                .into_iter()
119                .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120                .collect(),
121        }
122    }
123
124    /// Resolve (disposition, producer) for a matched policy.
125    /// Shared by handle_step and handle_boundary.
126    fn resolve_producer(
127        &self,
128        policy: Option<PolicyId>,
129    ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130        match policy {
131            Some(PolicyId(idx)) => match self.policies.get(idx) {
132                Some((p, prod)) => (
133                    p.disposition,
134                    prod.as_ref()
135                        .map(|p| p.clone_inner())
136                        .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137                ),
138                None => (
139                    ExceptionDisposition::Propagate,
140                    self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141                ),
142            },
143            None => (
144                ExceptionDisposition::Propagate,
145                self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146            ),
147        }
148    }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153    fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154        self.policies
155            .iter()
156            .position(|(p, _)| (p.matches)(err))
157            .map(PolicyId)
158    }
159
160    async fn retry_step(
161        &self,
162        policy: Option<PolicyId>,
163        step: &mut BoxProcessor,
164        original: Exchange,
165        error: CamelError,
166    ) -> RetryOutcome {
167        let Some(PolicyId(idx)) = policy else {
168            return RetryOutcome::Exhausted {
169                exchange: original,
170                error,
171                policy: None,
172            };
173        };
174        let Some((policy_def, _)) = self.policies.get(idx) else {
175            return RetryOutcome::Exhausted {
176                exchange: original,
177                error,
178                policy,
179            };
180        };
181        let Some(ref backoff) = policy_def.retry else {
182            return RetryOutcome::Exhausted {
183                exchange: original,
184                error,
185                policy,
186            };
187        };
188
189        for attempt in 0..backoff.max_attempts {
190            let delay = backoff.delay_for(attempt);
191            tokio::time::sleep(delay).await;
192
193            let mut ex = original.clone();
194            ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195            ex.input.set_header(
196                HEADER_REDELIVERY_COUNTER,
197                Value::Number((attempt + 1).into()),
198            );
199            ex.input.set_header(
200                HEADER_REDELIVERY_MAX_COUNTER,
201                Value::Number(backoff.max_attempts.into()),
202            );
203
204            match invoke_processor(step, ex).await {
205                Ok(exchange) => return RetryOutcome::Recovered(exchange),
206                Err(retry_err) => {
207                    if attempt + 1 == backoff.max_attempts {
208                        let mut final_ex = original;
209                        final_ex
210                            .input
211                            .set_header(HEADER_REDELIVERED, Value::Bool(true));
212                        final_ex.input.set_header(
213                            HEADER_REDELIVERY_COUNTER,
214                            Value::Number(backoff.max_attempts.into()),
215                        );
216                        final_ex.input.set_header(
217                            HEADER_REDELIVERY_MAX_COUNTER,
218                            Value::Number(backoff.max_attempts.into()),
219                        );
220                        return RetryOutcome::Exhausted {
221                            exchange: final_ex,
222                            error: retry_err,
223                            policy,
224                        };
225                    }
226                }
227            }
228        }
229
230        RetryOutcome::Exhausted {
231            exchange: original,
232            error,
233            policy,
234        }
235    }
236
237    async fn handle_step(
238        &self,
239        policy: Option<PolicyId>,
240        mut exchange: Exchange,
241        error: CamelError,
242    ) -> Result<StepDisposition, CamelError> {
243        if matches!(error, CamelError::Stopped) {
244            return Ok(StepDisposition::Propagate(error));
245        }
246
247        let (disposition, producer) = self.resolve_producer(policy);
248
249        // Run on_steps if present (using the SAME policy identified by PolicyId)
250        if let Some(PolicyId(idx)) = policy
251            && let Some((p, _)) = self.policies.get(idx)
252            && let Some(ref steps) = p.on_steps
253        {
254            let snapshot = exchange.clone();
255            exchange.set_error(error.clone());
256            let mut step_pipeline = steps.clone_inner();
257            let step_result = async {
258                let svc = step_pipeline.ready().await?;
259                svc.call(exchange).await
260            }
261            .await;
262            match step_result {
263                Ok(mut ex) => match disposition {
264                    ExceptionDisposition::Handled => {
265                        ex.handle_error();
266                        return Ok(StepDisposition::Handled(ex));
267                    }
268                    ExceptionDisposition::Continued => {
269                        ex.clear_error();
270                        return Ok(StepDisposition::Continued(ex));
271                    }
272                    ExceptionDisposition::Propagate => {
273                        exchange = snapshot;
274                    }
275                },
276                Err(_) => {
277                    exchange = snapshot;
278                }
279            }
280        }
281
282        // No on_steps, on_steps failed, or Propagate — forward to DLC/handler.
283        // BIND the returned exchange (must use handler output).
284        exchange.set_error(error.clone());
285        match send_to_handler(exchange, producer).await {
286            Ok(handler_ex) => match disposition {
287                ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
288                ExceptionDisposition::Handled => {
289                    let mut ex = handler_ex;
290                    ex.clear_error();
291                    Ok(StepDisposition::Handled(ex))
292                }
293                ExceptionDisposition::Continued => {
294                    let mut ex = handler_ex;
295                    ex.clear_error();
296                    Ok(StepDisposition::Continued(ex))
297                }
298            },
299            // Dead code by construction: send_to_handler always returns Ok.
300            Err(_) => Ok(StepDisposition::Propagate(error)),
301        }
302    }
303
304    async fn handle_boundary(
305        &self,
306        _kind: BoundaryKind,
307        mut exchange: Exchange,
308        error: CamelError,
309    ) -> Result<Exchange, CamelError> {
310        // Boundary errors: match policy, run on_steps, forward to DLC.
311        // Disposition mapping:
312        //   Handled → clear error, return Ok(exchange)
313        //   Propagate | Continued → forward to DLC, return Ok(exchange_with_error)
314        //   (Continued at boundary = Propagate — no next step to continue to)
315        let policy = self.match_policy(&error);
316        let (disposition, producer) = self.resolve_producer(policy);
317
318        // Run on_steps if present (shared logic with handle_step)
319        if let Some(PolicyId(idx)) = policy
320            && let Some((p, _)) = self.policies.get(idx)
321            && let Some(ref steps) = p.on_steps
322        {
323            let snapshot = exchange.clone();
324            exchange.set_error(error.clone());
325            let mut step_pipeline = steps.clone_inner();
326            let step_result = async {
327                let svc = step_pipeline.ready().await?;
328                svc.call(exchange).await
329            }
330            .await;
331            match step_result {
332                Ok(mut ex) => match disposition {
333                    ExceptionDisposition::Handled => {
334                        ex.handle_error();
335                        return Ok(ex);
336                    }
337                    ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
338                        exchange = snapshot;
339                    }
340                },
341                Err(_) => {
342                    exchange = snapshot;
343                }
344            }
345        }
346
347        // Forward to DLC/handler — BIND returned exchange
348        exchange.set_error(error.clone());
349        match send_to_handler(exchange, producer).await {
350            Ok(handler_ex) => match disposition {
351                ExceptionDisposition::Handled => {
352                    let mut ex = handler_ex;
353                    ex.clear_error();
354                    Ok(ex)
355                }
356                ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
357                    let mut ex = handler_ex;
358                    ex.set_error(error);
359                    Ok(ex)
360                }
361            },
362            // Dead code by construction: send_to_handler always returns Ok.
363            Err(e) => Err(e),
364        }
365    }
366}
367
368/// Tower Layer that wraps a pipeline with error handling behaviour.
369///
370/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
371#[deprecated(
372    since = "0.16.0",
373    note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
374)]
375pub struct ErrorHandlerLayer {
376    /// Resolved DLC producer (None = log only).
377    dlc_producer: Option<BoxProcessor>,
378    /// Policies with their resolved `handled_by` producers.
379    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
380}
381
382#[allow(deprecated)]
383impl ErrorHandlerLayer {
384    /// Create the layer with pre-resolved producers.
385    pub fn new(
386        dlc_producer: Option<BoxProcessor>,
387        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
388    ) -> Self {
389        Self {
390            dlc_producer,
391            policies,
392        }
393    }
394}
395
396#[allow(deprecated)]
397impl<S> Layer<S> for ErrorHandlerLayer
398where
399    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
400    S::Future: Send + 'static,
401{
402    type Service = ErrorHandlerService<S>;
403
404    fn layer(&self, inner: S) -> Self::Service {
405        ErrorHandlerService {
406            inner,
407            dlc_producer: self.dlc_producer.clone(),
408            policies: self
409                .policies
410                .iter()
411                .map(|(p, prod)| (p.clone(), prod.clone()))
412                .collect(),
413        }
414    }
415}
416
417/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
418///
419/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
420/// `has_error() == true` if the pipeline ultimately failed.
421#[deprecated(
422    since = "0.16.0",
423    note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
424)]
425pub struct ErrorHandlerService<S> {
426    inner: S,
427    dlc_producer: Option<BoxProcessor>,
428    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
429}
430
431#[allow(deprecated)]
432impl<S: Clone> Clone for ErrorHandlerService<S> {
433    fn clone(&self) -> Self {
434        Self {
435            inner: self.inner.clone(),
436            dlc_producer: self.dlc_producer.clone(),
437            policies: self
438                .policies
439                .iter()
440                .map(|(p, prod)| (p.clone(), prod.clone()))
441                .collect(),
442        }
443    }
444}
445
446#[allow(deprecated)]
447impl<S> ErrorHandlerService<S>
448where
449    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
450    S::Future: Send + 'static,
451{
452    /// Create the service directly (used in unit tests; in production use the Layer).
453    pub fn new(
454        inner: S,
455        dlc_producer: Option<BoxProcessor>,
456        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
457    ) -> Self {
458        Self {
459            inner,
460            dlc_producer,
461            policies,
462        }
463    }
464}
465
466#[allow(deprecated)]
467impl<S> Service<Exchange> for ErrorHandlerService<S>
468where
469    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
470    S::Future: Send + 'static,
471{
472    type Response = Exchange;
473    type Error = CamelError;
474    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
475
476    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
477        // Preserve backpressure (Pending) but never leak readiness errors upward.
478        // Readiness errors are deferred to call(), where they go through the same
479        // retry/onException/DLC path as call() errors. This is safe because call()
480        // re-checks readiness on a fresh inner clone via inner.ready().await.
481        match self.inner.poll_ready(cx) {
482            Poll::Pending => Poll::Pending,
483            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
484            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
485        }
486    }
487
488    fn call(&mut self, exchange: Exchange) -> Self::Future {
489        let mut inner = self.inner.clone();
490        let dlc = self.dlc_producer.clone();
491        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
492            .policies
493            .iter()
494            .map(|(p, prod)| (p.clone(), prod.clone()))
495            .collect();
496
497        Box::pin(async move {
498            let original = exchange.clone();
499            let result = match inner.ready().await {
500                Ok(svc) => svc.call(exchange).await,
501                Err(e) => Err(e), // readiness error — enters retry/DLC/onException path below
502            };
503
504            let err = match result {
505                Ok(ex) => return Ok(ex),
506                Err(e) => e,
507            };
508
509            // Stop EIP is a control-flow sentinel — pass through without retry or DLC.
510            if matches!(err, CamelError::Stopped) {
511                return Err(err);
512            }
513
514            // Find the first matching policy.
515            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
516
517            if let Some((policy, policy_producer)) = matched {
518                // Retry if configured.
519                if let Some(ref backoff) = policy.retry {
520                    for attempt in 0..backoff.max_attempts {
521                        let delay = backoff.delay_for(attempt);
522                        tokio::time::sleep(delay).await;
523
524                        // Set redelivery headers
525                        let mut ex = original.clone();
526                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
527                        ex.input.set_header(
528                            HEADER_REDELIVERY_COUNTER,
529                            Value::Number((attempt + 1).into()),
530                        );
531                        ex.input.set_header(
532                            HEADER_REDELIVERY_MAX_COUNTER,
533                            Value::Number(backoff.max_attempts.into()),
534                        );
535
536                        let result = match inner.ready().await {
537                            Ok(svc) => svc.call(ex).await,
538                            Err(e) => Err(e), // readiness error — enters retry exhaustion path
539                        };
540                        match result {
541                            Ok(ex) => return Ok(ex),
542                            Err(retry_err) => {
543                                if attempt + 1 == backoff.max_attempts {
544                                    // Retries exhausted — send to handler.
545                                    let mut original = original.clone();
546                                    original
547                                        .input
548                                        .set_header(HEADER_REDELIVERED, Value::Bool(true));
549                                    original.input.set_header(
550                                        HEADER_REDELIVERY_COUNTER,
551                                        Value::Number(backoff.max_attempts.into()),
552                                    );
553                                    original.input.set_header(
554                                        HEADER_REDELIVERY_MAX_COUNTER,
555                                        Value::Number(backoff.max_attempts.into()),
556                                    );
557                                    if let Some(ref steps) = policy.on_steps {
558                                        let handler = policy_producer.clone().or(dlc.clone());
559                                        return execute_on_steps(
560                                            original,
561                                            retry_err,
562                                            steps,
563                                            policy.disposition,
564                                            handler,
565                                        )
566                                        .await;
567                                    }
568                                    original.set_error(retry_err);
569                                    let handler = policy_producer.or(dlc);
570                                    return send_to_handler(original, handler).await;
571                                }
572                            }
573                        }
574                    }
575                }
576                // No retry configured (or 0 attempts) — send to policy handler or DLC.
577                if let Some(ref steps) = policy.on_steps {
578                    let handler = policy_producer.or(dlc);
579                    return execute_on_steps(original, err, steps, policy.disposition, handler)
580                        .await;
581                }
582                let mut ex = original.clone();
583                ex.set_error(err);
584                let handler = policy_producer.or(dlc);
585                send_to_handler(ex, handler).await
586            } else {
587                // No matching policy — forward directly to DLC.
588                let mut ex = original;
589                ex.set_error(err);
590                send_to_handler(ex, dlc).await
591            }
592        })
593    }
594}
595
596async fn send_to_handler(
597    exchange: Exchange,
598    producer: Option<BoxProcessor>,
599) -> Result<Exchange, CamelError> {
600    match producer {
601        None => {
602            // log-policy: system-broken
603            tracing::error!(
604                error = ?exchange.error,
605                "Exchange failed with no error handler configured"
606            );
607            Ok(exchange)
608        }
609        Some(mut prod) => match prod.ready().await {
610            Err(e) => {
611                // log-policy: system-broken
612                tracing::error!("DLC/handler not ready: {e}");
613                Ok(exchange)
614            }
615            Ok(svc) => match svc.call(exchange.clone()).await {
616                Ok(ex) => Ok(ex),
617                Err(e) => {
618                    // log-policy: system-broken
619                    tracing::error!("DLC/handler call failed: {e}");
620                    // Return the original exchange with original error intact.
621                    Ok(exchange)
622                }
623            },
624        },
625    }
626}
627
628#[cfg(test)]
629#[allow(deprecated)]
630mod tests {
631    use super::*;
632    use camel_api::{
633        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
634        error_handler::RedeliveryPolicy,
635    };
636    use std::sync::{
637        Arc,
638        atomic::{AtomicU32, Ordering},
639    };
640    use std::time::Duration;
641    use tower::ServiceExt;
642
643    fn make_exchange() -> Exchange {
644        Exchange::new(Message::new("test"))
645    }
646
647    fn failing_processor() -> BoxProcessor {
648        BoxProcessor::from_fn(|_ex| {
649            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
650        })
651    }
652
653    fn ok_processor() -> BoxProcessor {
654        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
655    }
656
657    fn fail_n_times(n: u32) -> BoxProcessor {
658        let count = Arc::new(AtomicU32::new(0));
659        BoxProcessor::from_fn(move |ex| {
660            let count = Arc::clone(&count);
661            Box::pin(async move {
662                let c = count.fetch_add(1, Ordering::SeqCst);
663                if c < n {
664                    Err(CamelError::ProcessorError(format!("attempt {c}")))
665                } else {
666                    Ok(ex)
667                }
668            })
669        })
670    }
671
672    #[tokio::test]
673    async fn test_ok_passthrough() {
674        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
675        let result = svc.oneshot(make_exchange()).await;
676        assert!(result.is_ok());
677        assert!(!result.unwrap().has_error());
678    }
679
680    #[tokio::test]
681    async fn test_error_goes_to_dlc() {
682        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
683        let received_clone = Arc::clone(&received);
684        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
685            let r = Arc::clone(&received_clone);
686            Box::pin(async move {
687                r.lock().unwrap().push(ex.clone());
688                Ok(ex)
689            })
690        });
691
692        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
693        let result = svc.oneshot(make_exchange()).await;
694        assert!(result.is_ok());
695        let ex = result.unwrap();
696        assert!(ex.has_error());
697        assert_eq!(received.lock().unwrap().len(), 1);
698    }
699
700    #[tokio::test]
701    async fn test_retry_recovers() {
702        let inner = fail_n_times(2);
703        let policy = ExceptionPolicy {
704            matches: Arc::new(|_| true),
705            retry: Some(RedeliveryPolicy {
706                max_attempts: 3,
707                initial_delay: Duration::from_millis(1),
708                multiplier: 1.0,
709                max_delay: Duration::from_millis(10),
710                jitter_factor: 0.0,
711            }),
712            handled_by: None,
713            on_steps: None,
714            disposition: ExceptionDisposition::Propagate,
715        };
716        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
717        let result = svc.oneshot(make_exchange()).await;
718        assert!(result.is_ok());
719        assert!(!result.unwrap().has_error());
720    }
721
722    #[tokio::test]
723    async fn test_retry_exhausted_goes_to_dlc() {
724        let inner = fail_n_times(10);
725        let received = Arc::new(std::sync::Mutex::new(0u32));
726        let received_clone = Arc::clone(&received);
727        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
728            let r = Arc::clone(&received_clone);
729            Box::pin(async move {
730                *r.lock().unwrap() += 1;
731                Ok(ex)
732            })
733        });
734        let policy = ExceptionPolicy {
735            matches: Arc::new(|_| true),
736            retry: Some(RedeliveryPolicy {
737                max_attempts: 2,
738                initial_delay: Duration::from_millis(1),
739                multiplier: 1.0,
740                max_delay: Duration::from_millis(10),
741                jitter_factor: 0.0,
742            }),
743            handled_by: None,
744            on_steps: None,
745            disposition: ExceptionDisposition::Propagate,
746        };
747        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
748        let result = svc.oneshot(make_exchange()).await;
749        assert!(result.is_ok());
750        assert!(result.unwrap().has_error());
751        assert_eq!(*received.lock().unwrap(), 1);
752    }
753
754    #[test]
755    fn test_poll_ready_delegates_to_inner() {
756        use std::sync::atomic::AtomicBool;
757
758        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
759        #[derive(Clone)]
760        struct DelayedReadyService {
761            ready: Arc<AtomicBool>,
762        }
763
764        impl Service<Exchange> for DelayedReadyService {
765            type Response = Exchange;
766            type Error = CamelError;
767            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
768
769            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
770                if self.ready.fetch_or(true, Ordering::SeqCst) {
771                    // Already marked ready (second+ call) → Ready
772                    Poll::Ready(Ok(()))
773                } else {
774                    // First call → Pending, schedule a wake
775                    cx.waker().wake_by_ref();
776                    Poll::Pending
777                }
778            }
779
780            fn call(&mut self, ex: Exchange) -> Self::Future {
781                Box::pin(async move { Ok(ex) })
782            }
783        }
784
785        let waker = futures::task::noop_waker();
786        let mut cx = Context::from_waker(&waker);
787
788        let inner = DelayedReadyService {
789            ready: Arc::new(AtomicBool::new(false)),
790        };
791        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
792
793        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
794        let first = Pin::new(&mut svc).poll_ready(&mut cx);
795        assert!(first.is_pending(), "expected Pending on first poll_ready");
796
797        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
798        let second = Pin::new(&mut svc).poll_ready(&mut cx);
799        assert!(second.is_ready(), "expected Ready on second poll_ready");
800    }
801
802    #[tokio::test]
803    async fn test_no_matching_policy_uses_dlc() {
804        let received = Arc::new(std::sync::Mutex::new(0u32));
805        let received_clone = Arc::clone(&received);
806        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
807            let r = Arc::clone(&received_clone);
808            Box::pin(async move {
809                *r.lock().unwrap() += 1;
810                Ok(ex)
811            })
812        });
813        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
814        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
815        let result = svc.oneshot(make_exchange()).await;
816        assert!(result.is_ok());
817        assert_eq!(*received.lock().unwrap(), 1);
818    }
819
820    #[tokio::test]
821    async fn test_redelivery_headers_are_set() {
822        use camel_api::error_handler::{
823            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
824            RedeliveryPolicy,
825        };
826
827        let inner = fail_n_times(10);
828        let received = Arc::new(std::sync::Mutex::new(None));
829        let received_clone = Arc::clone(&received);
830        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
831            let r = Arc::clone(&received_clone);
832            Box::pin(async move {
833                *r.lock().unwrap() = Some(ex.clone());
834                Ok(ex)
835            })
836        });
837
838        let policy = ExceptionPolicy {
839            matches: Arc::new(|_| true),
840            retry: Some(RedeliveryPolicy {
841                max_attempts: 2,
842                initial_delay: Duration::from_millis(1),
843                multiplier: 1.0,
844                max_delay: Duration::from_millis(10),
845                jitter_factor: 0.0,
846            }),
847            handled_by: None,
848            on_steps: None,
849            disposition: ExceptionDisposition::Propagate,
850        };
851
852        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
853        let _ = svc.oneshot(make_exchange()).await.unwrap();
854
855        let ex = received.lock().unwrap().take().unwrap();
856        assert_eq!(
857            ex.input.header(HEADER_REDELIVERED),
858            Some(&Value::Bool(true))
859        );
860        assert_eq!(
861            ex.input.header(HEADER_REDELIVERY_COUNTER),
862            Some(&Value::Number(2.into()))
863        );
864        assert_eq!(
865            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
866            Some(&Value::Number(2.into()))
867        );
868    }
869
870    #[tokio::test]
871    async fn test_jitter_produces_varying_delays_in_retry_flow() {
872        use std::time::Instant;
873
874        let inner = fail_n_times(10);
875        let received = Arc::new(std::sync::Mutex::new(None));
876        let received_clone = Arc::clone(&received);
877        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
878            let r = Arc::clone(&received_clone);
879            Box::pin(async move {
880                *r.lock().unwrap() = Some(ex.clone());
881                Ok(ex)
882            })
883        });
884
885        let policy = ExceptionPolicy {
886            matches: Arc::new(|_| true),
887            retry: Some(RedeliveryPolicy {
888                max_attempts: 5,
889                initial_delay: Duration::from_millis(20),
890                multiplier: 1.0,
891                max_delay: Duration::from_millis(100),
892                jitter_factor: 0.5,
893            }),
894            handled_by: None,
895            on_steps: None,
896            disposition: ExceptionDisposition::Propagate,
897        };
898
899        let start = Instant::now();
900        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
901        let _ = svc.oneshot(make_exchange()).await.unwrap();
902        let elapsed = start.elapsed();
903
904        assert!(
905            received.lock().unwrap().is_some(),
906            "DLC should have received exchange"
907        );
908
909        assert!(
910            elapsed >= Duration::from_millis(50),
911            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
912        );
913
914        assert!(
915            elapsed <= Duration::from_millis(500),
916            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
917        );
918    }
919
920    // Stopped is a control-flow sentinel, not a real error.
921    // ErrorHandlerService must pass it through without retrying or forwarding to DLC.
922    #[tokio::test]
923    async fn test_stopped_bypasses_error_handler() {
924        let stopped_inner =
925            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
926
927        // DLC that tracks if it was ever called.
928        let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
929        let dlc_called_clone = Arc::clone(&dlc_called);
930        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
931            dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
932            Box::pin(async move { Ok(ex) })
933        });
934
935        let policy = ExceptionPolicy::new(|_| true); // matches everything
936        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
937        let result = svc.oneshot(make_exchange()).await;
938
939        // Must propagate Err(Stopped) — not absorb it.
940        assert!(
941            matches!(result, Err(CamelError::Stopped)),
942            "expected Err(Stopped), got: {:?}",
943            result
944        );
945        // DLC must NOT have been called.
946        assert!(
947            !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
948            "DLC should not be called for Stopped"
949        );
950    }
951
952    #[tokio::test]
953    async fn test_on_steps_handled_true_consumes_error() {
954        use tower::ServiceExt;
955
956        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
957            ex.input.body = camel_api::Body::Bytes("handled".into());
958            async move { Ok(ex) }
959        }));
960        let policy = ExceptionPolicy {
961            matches: Arc::new(|_| true),
962            retry: None,
963            handled_by: None,
964            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
965            disposition: ExceptionDisposition::Handled,
966        };
967        let inner = tower::service_fn(|_ex: Exchange| async {
968            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
969        });
970        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
971        let ex = Exchange::default();
972        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
973        assert!(result.error.is_none(), "handled:true should clear error");
974        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
975    }
976
977    #[tokio::test]
978    async fn test_on_steps_handled_false_propagates_error() {
979        use tower::ServiceExt;
980
981        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
982            ex.input.body = camel_api::Body::Bytes("handled".into());
983            async move { Ok(ex) }
984        }));
985        let policy = ExceptionPolicy {
986            matches: Arc::new(|_| true),
987            retry: None,
988            handled_by: None,
989            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
990            disposition: ExceptionDisposition::Propagate,
991        };
992        let inner = tower::service_fn(|_ex: Exchange| async {
993            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
994        });
995        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
996        let ex = Exchange::default();
997        let result = svc.ready().await.unwrap().call(ex).await;
998        assert!(result.is_err(), "handled:false should propagate error");
999    }
1000
1001    // --- Readiness error capture tests ---
1002    //
1003    // ErrorHandlerService must capture readiness errors (poll_ready returning Err)
1004    // and route them through retry/onException/DLC instead of propagating raw.
1005
1006    /// A service whose `poll_ready` always returns `Ready(Err(...))` but whose
1007    /// `call` returns `Ok`. This simulates a permanently-not-ready endpoint.
1008    #[derive(Clone)]
1009    struct ReadinessFailService {
1010        error: CamelError,
1011    }
1012
1013    impl ReadinessFailService {
1014        fn new(error: CamelError) -> Self {
1015            Self { error }
1016        }
1017    }
1018
1019    impl Service<Exchange> for ReadinessFailService {
1020        type Response = Exchange;
1021        type Error = CamelError;
1022        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1023
1024        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1025            Poll::Ready(Err(self.error.clone()))
1026        }
1027
1028        fn call(&mut self, ex: Exchange) -> Self::Future {
1029            // call() should never be reached if poll_ready returns Err,
1030            // but Tower's ready().await on a clone will re-encounter the readiness error.
1031            Box::pin(async move { Ok(ex) })
1032        }
1033    }
1034
1035    #[tokio::test]
1036    async fn test_readiness_error_goes_to_dlc() {
1037        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1038        let inner = ReadinessFailService {
1039            error: readiness_err,
1040        };
1041
1042        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1043        let received_clone = Arc::clone(&received);
1044        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1045            let r = Arc::clone(&received_clone);
1046            Box::pin(async move {
1047                r.lock().unwrap().push(ex.clone());
1048                Ok(ex)
1049            })
1050        });
1051
1052        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1053        let result = svc.oneshot(make_exchange()).await;
1054
1055        // The error must be absorbed (Ok), not propagated raw (Err).
1056        assert!(
1057            result.is_ok(),
1058            "readiness error should be captured and sent to DLC, got: {:?}",
1059            result
1060        );
1061        let ex = result.unwrap();
1062        assert!(ex.has_error(), "exchange should carry the readiness error");
1063        assert_eq!(
1064            received.lock().unwrap().len(),
1065            1,
1066            "DLC should have received the exchange exactly once"
1067        );
1068    }
1069
1070    #[tokio::test]
1071    async fn test_readiness_error_goes_to_matching_policy() {
1072        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1073        let inner = ReadinessFailService {
1074            error: readiness_err,
1075        };
1076
1077        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1078            ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1079            async move { Ok(ex) }
1080        }));
1081        let policy = ExceptionPolicy {
1082            matches: Arc::new(|_| true),
1083            retry: None,
1084            handled_by: None,
1085            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1086            disposition: ExceptionDisposition::Handled,
1087        };
1088
1089        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1090        let result = svc.oneshot(make_exchange()).await;
1091
1092        // The error must be absorbed and routed to the on_steps handler.
1093        assert!(
1094            result.is_ok(),
1095            "readiness error should be captured by policy, got: {:?}",
1096            result
1097        );
1098        let ex = result.unwrap();
1099        assert!(ex.error.is_none(), "handled:true should clear error");
1100        assert!(
1101            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1102            "on_steps should have modified the body"
1103        );
1104    }
1105
1106    #[test]
1107    fn test_poll_ready_converts_readiness_error_to_ok() {
1108        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1109        let inner = ReadinessFailService {
1110            error: readiness_err,
1111        };
1112        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1113
1114        let waker = futures::task::noop_waker();
1115        let mut cx = Context::from_waker(&waker);
1116
1117        // poll_ready must NOT propagate the readiness error — convert to Ok.
1118        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1119        match poll {
1120            Poll::Ready(Ok(())) => { /* correct */ }
1121            Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1122            Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1123        }
1124    }
1125
1126    // --- invoke_processor tests ---
1127
1128    #[tokio::test]
1129    async fn test_invoke_processor_returns_ok_on_success() {
1130        let mut svc = ok_processor();
1131        let ex = make_exchange();
1132        let result = invoke_processor(&mut svc, ex).await;
1133        assert!(result.is_ok());
1134    }
1135
1136    #[tokio::test]
1137    async fn test_invoke_processor_captures_readiness_error() {
1138        let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1139            CamelError::ProcessorError("not ready".into()),
1140        ));
1141        let ex = make_exchange();
1142        let result = invoke_processor(&mut failing_ready, ex).await;
1143        assert!(result.is_err());
1144    }
1145
1146    #[tokio::test]
1147    async fn test_on_steps_handled_true_clears_exception_properties() {
1148        use tower::ServiceExt;
1149
1150        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1151            ex.input.body = camel_api::Body::Bytes("handled".into());
1152            async move { Ok(ex) }
1153        }));
1154        let policy = ExceptionPolicy {
1155            matches: Arc::new(|_| true),
1156            retry: None,
1157            handled_by: None,
1158            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1159            disposition: ExceptionDisposition::Handled,
1160        };
1161        let inner = tower::service_fn(|_ex: Exchange| async {
1162            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1163        });
1164        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1165        let ex = Exchange::default();
1166        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1167        assert!(result.error.is_none(), "handled:true should clear error");
1168        assert!(
1169            result
1170                .properties
1171                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1172                .is_none(),
1173            "handled:true should clear exception properties"
1174        );
1175        assert!(
1176            result
1177                .properties
1178                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1179                .is_none(),
1180            "handled:true should clear exception kind property"
1181        );
1182        assert!(
1183            result
1184                .properties
1185                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1186                .is_none(),
1187            "handled:true should clear exception caught property"
1188        );
1189    }
1190
1191    // ── DefaultRouteErrorHandler tests ──
1192
1193    #[test]
1194    fn test_match_policy_returns_id_for_matching_error() {
1195        let handler = DefaultRouteErrorHandler::new(
1196            None,
1197            vec![(
1198                ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1199                None,
1200            )],
1201        );
1202        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1203        assert_eq!(id, Some(PolicyId(0)));
1204    }
1205
1206    #[test]
1207    fn test_match_policy_returns_none_for_unmatched() {
1208        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1209        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1210        assert_eq!(id, None);
1211    }
1212
1213    // ── retry_step tests ──
1214
1215    #[tokio::test]
1216    async fn test_retry_step_succeeds_on_second_attempt() {
1217        let mut policy = ExceptionPolicy::new(|_| true);
1218        policy.retry = Some(RedeliveryPolicy::new(3));
1219        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1220        let mut step = fail_n_times(1); // fails once, then succeeds
1221        let ex = make_exchange();
1222        let outcome = handler
1223            .retry_step(
1224                Some(PolicyId(0)),
1225                &mut step,
1226                ex,
1227                CamelError::ProcessorError("attempt 0".into()),
1228            )
1229            .await;
1230        assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1231    }
1232
1233    #[tokio::test]
1234    async fn test_retry_step_exhausted_when_all_fail() {
1235        let mut policy = ExceptionPolicy::new(|_| true);
1236        policy.retry = Some(RedeliveryPolicy::new(3));
1237        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1238        let mut step = failing_processor();
1239        let ex = make_exchange();
1240        let outcome = handler
1241            .retry_step(
1242                Some(PolicyId(0)),
1243                &mut step,
1244                ex,
1245                CamelError::ProcessorError("boom".into()),
1246            )
1247            .await;
1248        assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1249    }
1250
1251    #[tokio::test]
1252    async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1253        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1254        let mut step = ok_processor();
1255        let ex = make_exchange();
1256        let outcome = handler
1257            .retry_step(
1258                None,
1259                &mut step,
1260                ex,
1261                CamelError::ProcessorError("boom".into()),
1262            )
1263            .await;
1264        assert!(matches!(
1265            outcome,
1266            RetryOutcome::Exhausted { policy: None, .. }
1267        ));
1268    }
1269
1270    // ── handle_step tests ──
1271
1272    #[tokio::test]
1273    async fn test_handle_step_propagate_sends_to_dlc() {
1274        let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1275        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1276        let ex = make_exchange();
1277        let result = handler
1278            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1279            .await;
1280        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1281    }
1282
1283    #[tokio::test]
1284    async fn test_handle_step_handled_uses_handler_output() {
1285        let handler_producer = BoxProcessor::from_fn(|mut ex| {
1286            Box::pin(async move {
1287                ex.input.set_header("processed_by", Value::Bool(true));
1288                Ok(ex)
1289            })
1290        });
1291        let policy = ExceptionPolicy {
1292            matches: std::sync::Arc::new(|_| true),
1293            retry: None,
1294            handled_by: None,
1295            on_steps: None,
1296            disposition: ExceptionDisposition::Handled,
1297        };
1298        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1299        let mut ex = make_exchange();
1300        ex.set_error(CamelError::ProcessorError("boom".into()));
1301        let result = handler
1302            .handle_step(
1303                Some(PolicyId(0)),
1304                ex,
1305                CamelError::ProcessorError("boom".into()),
1306            )
1307            .await;
1308        match result {
1309            Ok(StepDisposition::Handled(ex)) => {
1310                assert!(!ex.has_error(), "error should be cleared");
1311                assert_eq!(
1312                    ex.input.header("processed_by"),
1313                    Some(&Value::Bool(true)),
1314                    "should use handler's output exchange"
1315                );
1316            }
1317            other => panic!("expected Handled, got {:?}", other.is_ok()),
1318        }
1319    }
1320
1321    #[tokio::test]
1322    async fn test_handle_step_continued_clears_error() {
1323        let policy = ExceptionPolicy {
1324            matches: std::sync::Arc::new(|_| true),
1325            retry: None,
1326            handled_by: None,
1327            on_steps: None,
1328            disposition: ExceptionDisposition::Continued,
1329        };
1330        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1331        let mut ex = make_exchange();
1332        ex.set_error(CamelError::ProcessorError("boom".into()));
1333        let result = handler
1334            .handle_step(
1335                Some(PolicyId(0)),
1336                ex,
1337                CamelError::ProcessorError("boom".into()),
1338            )
1339            .await;
1340        match result {
1341            Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1342            other => panic!("expected Continued, got {:?}", other.is_ok()),
1343        }
1344    }
1345
1346    #[tokio::test]
1347    async fn test_handle_step_stopped_propagates_immediately() {
1348        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1349        let ex = make_exchange();
1350        let result = handler.handle_step(None, ex, CamelError::Stopped).await;
1351        assert!(
1352            matches!(result, Ok(StepDisposition::Propagate(CamelError::Stopped))),
1353            "Stopped should propagate immediately"
1354        );
1355    }
1356
1357    #[tokio::test]
1358    async fn test_handle_step_with_on_steps_handled() {
1359        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1360            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1361            async move { Ok(ex) }
1362        }));
1363        let policy = ExceptionPolicy {
1364            matches: std::sync::Arc::new(|_| true),
1365            retry: None,
1366            handled_by: None,
1367            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1368            disposition: ExceptionDisposition::Handled,
1369        };
1370        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1371        let mut ex = make_exchange();
1372        ex.set_error(CamelError::ProcessorError("boom".into()));
1373        let result = handler
1374            .handle_step(
1375                Some(PolicyId(0)),
1376                ex,
1377                CamelError::ProcessorError("boom".into()),
1378            )
1379            .await;
1380        match result {
1381            Ok(StepDisposition::Handled(ex)) => {
1382                assert!(!ex.has_error(), "error should be cleared");
1383                assert!(
1384                    matches!(ex.input.body, camel_api::Body::Bytes(_)),
1385                    "on_steps should have modified the body"
1386                );
1387            }
1388            other => panic!("expected Handled, got: {}", other.is_ok()),
1389        }
1390    }
1391
1392    #[tokio::test]
1393    async fn test_handle_step_with_on_steps_propagate_falls_through() {
1394        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1395            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1396            async move { Ok(ex) }
1397        }));
1398        let dlc_called = Arc::new(AtomicU32::new(0));
1399        let dlc_called_clone = dlc_called.clone();
1400        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1401            let c = dlc_called_clone.clone();
1402            Box::pin(async move {
1403                c.fetch_add(1, Ordering::SeqCst);
1404                Ok(ex)
1405            })
1406        });
1407        let policy = ExceptionPolicy {
1408            matches: std::sync::Arc::new(|_| true),
1409            retry: None,
1410            handled_by: None,
1411            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1412            disposition: ExceptionDisposition::Propagate,
1413        };
1414        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1415        let mut ex = make_exchange();
1416        ex.set_error(CamelError::ProcessorError("boom".into()));
1417        let result = handler
1418            .handle_step(
1419                Some(PolicyId(0)),
1420                ex,
1421                CamelError::ProcessorError("boom".into()),
1422            )
1423            .await;
1424        assert!(
1425            matches!(result, Ok(StepDisposition::Propagate(_))),
1426            "Propagate disposition should return Propagate"
1427        );
1428        assert_eq!(
1429            dlc_called.load(Ordering::SeqCst),
1430            1,
1431            "DLC should be called when on_steps disposition is Propagate"
1432        );
1433    }
1434
1435    #[tokio::test]
1436    async fn test_handle_step_dlc_failure_propagates() {
1437        let failing_dlc = BoxProcessor::from_fn(|_| {
1438            Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1439        });
1440        let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1441        let ex = make_exchange();
1442        let result = handler
1443            .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1444            .await;
1445        assert!(
1446            matches!(result, Ok(StepDisposition::Propagate(_))),
1447            "DLC failure should still return Propagate with original error"
1448        );
1449    }
1450
1451    // ── handle_boundary tests ──
1452
1453    #[tokio::test]
1454    async fn test_handle_boundary_security_error_goes_to_dlc() {
1455        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1456        let count_clone = dlc_count.clone();
1457        let dlc = BoxProcessor::from_fn(move |ex| {
1458            let c = count_clone.clone();
1459            Box::pin(async move {
1460                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1461                Ok(ex)
1462            })
1463        });
1464        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1465        let ex = make_exchange();
1466        let result = handler
1467            .handle_boundary(
1468                BoundaryKind::Security,
1469                ex,
1470                CamelError::Unauthorized("denied".into()),
1471            )
1472            .await;
1473        assert!(result.is_ok());
1474        assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1475    }
1476
1477    #[tokio::test]
1478    async fn test_handle_boundary_handled_clears_error() {
1479        let policy = ExceptionPolicy {
1480            matches: std::sync::Arc::new(|_| true),
1481            retry: None,
1482            handled_by: None,
1483            on_steps: None,
1484            disposition: ExceptionDisposition::Handled,
1485        };
1486        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1487        let ex = make_exchange();
1488        let result = handler
1489            .handle_boundary(
1490                BoundaryKind::Security,
1491                ex,
1492                CamelError::Unauthorized("denied".into()),
1493            )
1494            .await;
1495        assert!(result.is_ok());
1496        assert!(
1497            !result.unwrap().has_error(),
1498            "Handled disposition should clear error"
1499        );
1500    }
1501
1502    #[tokio::test]
1503    async fn test_handle_boundary_propagate_preserves_error() {
1504        let policy = ExceptionPolicy {
1505            matches: std::sync::Arc::new(|_| true),
1506            retry: None,
1507            handled_by: None,
1508            on_steps: None,
1509            disposition: ExceptionDisposition::Propagate,
1510        };
1511        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1512        let ex = make_exchange();
1513        let result = handler
1514            .handle_boundary(
1515                BoundaryKind::CircuitBreaker,
1516                ex,
1517                CamelError::CircuitOpen("open".into()),
1518            )
1519            .await;
1520        assert!(result.is_ok(), "boundary errors always return Ok");
1521        assert!(
1522            result.unwrap().has_error(),
1523            "Propagate disposition should preserve error"
1524        );
1525    }
1526
1527    #[tokio::test]
1528    async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1529        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1530        let count_clone = dlc_count.clone();
1531        let dlc = BoxProcessor::from_fn(move |ex| {
1532            let c = count_clone.clone();
1533            Box::pin(async move {
1534                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1535                Ok(ex)
1536            })
1537        });
1538        let policy = ExceptionPolicy {
1539            matches: std::sync::Arc::new(|_| true),
1540            retry: None,
1541            handled_by: None,
1542            on_steps: None,
1543            disposition: ExceptionDisposition::Continued,
1544        };
1545        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1546        let ex = make_exchange();
1547        let result = handler
1548            .handle_boundary(
1549                BoundaryKind::Security,
1550                ex,
1551                CamelError::Unauthorized("denied".into()),
1552            )
1553            .await;
1554        assert!(result.is_ok(), "boundary errors always return Ok");
1555        assert!(
1556            result.unwrap().has_error(),
1557            "Continued at boundary should preserve error"
1558        );
1559        assert_eq!(
1560            dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1561            1,
1562            "DLC should be called"
1563        );
1564    }
1565
1566    #[tokio::test]
1567    async fn test_handle_boundary_with_on_steps_handled() {
1568        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1569            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1570            async move { Ok(ex) }
1571        }));
1572        let policy = ExceptionPolicy {
1573            matches: std::sync::Arc::new(|_| true),
1574            retry: None,
1575            handled_by: None,
1576            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1577            disposition: ExceptionDisposition::Handled,
1578        };
1579        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1580        let ex = make_exchange();
1581        let result = handler
1582            .handle_boundary(
1583                BoundaryKind::Security,
1584                ex,
1585                CamelError::Unauthorized("denied".into()),
1586            )
1587            .await;
1588        assert!(result.is_ok(), "boundary errors always return Ok");
1589        let ex = result.unwrap();
1590        assert!(!ex.has_error(), "Handled disposition should clear error");
1591        assert!(
1592            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1593            "on_steps should have modified the body"
1594        );
1595    }
1596}