Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9    HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10    StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15    original: Exchange,
16    original_err: CamelError,
17    on_steps: &SyncBoxProcessor,
18    disposition: ExceptionDisposition,
19    handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21    let snapshot = original.clone();
22    let mut ex = original;
23    ex.set_error(original_err.clone());
24    let mut pipeline = on_steps.clone_inner();
25    let step_result = async {
26        let svc = pipeline.ready().await?;
27        svc.call(ex).await
28    }
29    .await;
30
31    match step_result {
32        Ok(mut ex) => {
33            if disposition == ExceptionDisposition::Handled {
34                ex.handle_error();
35                Ok(ex)
36            } else {
37                // Propagate or Continued — steps execute for side-effects (e.g. logging) but
38                // the modified exchange is discarded and the original error propagated.
39                Err(original_err)
40            }
41        }
42        Err(_) => {
43            tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44            let mut ex = snapshot;
45            ex.set_error(original_err);
46            send_to_handler(ex, handler).await
47        }
48    }
49}
50
51/// Invoke a processor: readiness check + call, unified into a single Result.
52///
53/// Readiness errors and call errors are both returned as `Err(CamelError)`,
54/// allowing the pipeline's recovery loop to handle them uniformly.
55pub async fn invoke_processor(
56    svc: &mut BoxProcessor,
57    ex: Exchange,
58) -> Result<Exchange, CamelError> {
59    match svc.ready().await {
60        Ok(ready) => ready.call(ex).await,
61        Err(err) => Err(err),
62    }
63}
64
65/// Route-level error handler owning ALL error handling logic.
66///
67/// Single owner of DLC, retry, onException policies. Called from
68/// `RouteChannelService` (boundary errors) and `run_steps` (step errors).
69#[async_trait::async_trait]
70pub trait RouteErrorHandler: Send + Sync {
71    /// Match a policy for the given error. Called once before retry.
72    fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
73
74    /// Phase 1: Retry the failed step.
75    async fn retry_step(
76        &self,
77        policy: Option<PolicyId>,
78        step: &mut BoxProcessor,
79        original: Exchange,
80        error: CamelError,
81    ) -> RetryOutcome;
82
83    /// Phase 2: Determine step disposition after retry exhaustion.
84    async fn handle_step(
85        &self,
86        policy: Option<PolicyId>,
87        exchange: Exchange,
88        error: CamelError,
89    ) -> Result<StepDisposition, CamelError>;
90
91    /// Handle boundary (infrastructure) errors.
92    async fn handle_boundary(
93        &self,
94        kind: BoundaryKind,
95        exchange: Exchange,
96        error: CamelError,
97    ) -> Result<Exchange, CamelError>;
98}
99
100/// Default implementation of RouteErrorHandler.
101/// Owns DLC producer exclusively. Encapsulates retry/onException/DLC logic.
102///
103/// Uses `SyncBoxProcessor` internally so the handler is `Send + Sync` as required
104/// by the `RouteErrorHandler` trait.
105pub struct DefaultRouteErrorHandler {
106    pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107    pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111    pub fn new(
112        dlc_producer: Option<BoxProcessor>,
113        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114    ) -> Self {
115        Self {
116            dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117            policies: policies
118                .into_iter()
119                .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120                .collect(),
121        }
122    }
123
124    /// Resolve (disposition, producer) for a matched policy.
125    /// Shared by handle_step and handle_boundary.
126    fn resolve_producer(
127        &self,
128        policy: Option<PolicyId>,
129    ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130        match policy {
131            Some(PolicyId(idx)) => match self.policies.get(idx) {
132                Some((p, prod)) => (
133                    p.disposition,
134                    prod.as_ref()
135                        .map(|p| p.clone_inner())
136                        .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137                ),
138                None => (
139                    ExceptionDisposition::Propagate,
140                    self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141                ),
142            },
143            None => (
144                ExceptionDisposition::Propagate,
145                self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146            ),
147        }
148    }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153    fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154        self.policies
155            .iter()
156            .position(|(p, _)| (p.matches)(err))
157            .map(PolicyId)
158    }
159
160    async fn retry_step(
161        &self,
162        policy: Option<PolicyId>,
163        step: &mut BoxProcessor,
164        original: Exchange,
165        error: CamelError,
166    ) -> RetryOutcome {
167        let Some(PolicyId(idx)) = policy else {
168            return RetryOutcome::Exhausted {
169                exchange: original,
170                error,
171                policy: None,
172            };
173        };
174        let Some((policy_def, _)) = self.policies.get(idx) else {
175            return RetryOutcome::Exhausted {
176                exchange: original,
177                error,
178                policy,
179            };
180        };
181        let Some(ref backoff) = policy_def.retry else {
182            return RetryOutcome::Exhausted {
183                exchange: original,
184                error,
185                policy,
186            };
187        };
188
189        for attempt in 0..backoff.max_attempts {
190            let delay = backoff.delay_for(attempt);
191            tokio::time::sleep(delay).await;
192
193            let mut ex = original.clone();
194            ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195            ex.input.set_header(
196                HEADER_REDELIVERY_COUNTER,
197                Value::Number((attempt + 1).into()),
198            );
199            ex.input.set_header(
200                HEADER_REDELIVERY_MAX_COUNTER,
201                Value::Number(backoff.max_attempts.into()),
202            );
203
204            match invoke_processor(step, ex).await {
205                Ok(exchange) => return RetryOutcome::Recovered(exchange),
206                Err(retry_err) => {
207                    if attempt + 1 == backoff.max_attempts {
208                        let mut final_ex = original;
209                        final_ex
210                            .input
211                            .set_header(HEADER_REDELIVERED, Value::Bool(true));
212                        final_ex.input.set_header(
213                            HEADER_REDELIVERY_COUNTER,
214                            Value::Number(backoff.max_attempts.into()),
215                        );
216                        final_ex.input.set_header(
217                            HEADER_REDELIVERY_MAX_COUNTER,
218                            Value::Number(backoff.max_attempts.into()),
219                        );
220                        return RetryOutcome::Exhausted {
221                            exchange: final_ex,
222                            error: retry_err,
223                            policy,
224                        };
225                    }
226                }
227            }
228        }
229
230        RetryOutcome::Exhausted {
231            exchange: original,
232            error,
233            policy,
234        }
235    }
236
237    async fn handle_step(
238        &self,
239        policy: Option<PolicyId>,
240        mut exchange: Exchange,
241        error: CamelError,
242    ) -> Result<StepDisposition, CamelError> {
243        let (disposition, producer) = self.resolve_producer(policy);
244
245        // Run on_steps if present (using the SAME policy identified by PolicyId)
246        if let Some(PolicyId(idx)) = policy
247            && let Some((p, _)) = self.policies.get(idx)
248            && let Some(ref steps) = p.on_steps
249        {
250            let snapshot = exchange.clone();
251            exchange.set_error(error.clone());
252            let mut step_pipeline = steps.clone_inner();
253            let step_result = async {
254                let svc = step_pipeline.ready().await?;
255                svc.call(exchange).await
256            }
257            .await;
258            match step_result {
259                Ok(mut ex) => match disposition {
260                    ExceptionDisposition::Handled => {
261                        ex.handle_error();
262                        return Ok(StepDisposition::Handled(ex));
263                    }
264                    ExceptionDisposition::Continued => {
265                        ex.clear_error();
266                        return Ok(StepDisposition::Continued(ex));
267                    }
268                    ExceptionDisposition::Propagate => {
269                        exchange = snapshot;
270                    }
271                },
272                Err(_) => {
273                    exchange = snapshot;
274                }
275            }
276        }
277
278        // No on_steps, on_steps failed, or Propagate — forward to DLC/handler.
279        // BIND the returned exchange (must use handler output).
280        exchange.set_error(error.clone());
281        match send_to_handler(exchange, producer).await {
282            Ok(handler_ex) => match disposition {
283                ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
284                ExceptionDisposition::Handled => {
285                    let mut ex = handler_ex;
286                    ex.clear_error();
287                    Ok(StepDisposition::Handled(ex))
288                }
289                ExceptionDisposition::Continued => {
290                    let mut ex = handler_ex;
291                    ex.clear_error();
292                    Ok(StepDisposition::Continued(ex))
293                }
294            },
295            // Dead code by construction: send_to_handler always returns Ok.
296            Err(_) => Ok(StepDisposition::Propagate(error)),
297        }
298    }
299
300    async fn handle_boundary(
301        &self,
302        _kind: BoundaryKind,
303        mut exchange: Exchange,
304        error: CamelError,
305    ) -> Result<Exchange, CamelError> {
306        // Boundary errors: match policy, run on_steps, forward to DLC.
307        // Disposition mapping:
308        //   Handled → clear error, return Ok(exchange)
309        //   Propagate | Continued → forward to DLC, return Ok(exchange_with_error)
310        //   (Continued at boundary = Propagate — no next step to continue to)
311        let policy = self.match_policy(&error);
312        let (disposition, producer) = self.resolve_producer(policy);
313
314        // Run on_steps if present (shared logic with handle_step)
315        if let Some(PolicyId(idx)) = policy
316            && let Some((p, _)) = self.policies.get(idx)
317            && let Some(ref steps) = p.on_steps
318        {
319            let snapshot = exchange.clone();
320            exchange.set_error(error.clone());
321            let mut step_pipeline = steps.clone_inner();
322            let step_result = async {
323                let svc = step_pipeline.ready().await?;
324                svc.call(exchange).await
325            }
326            .await;
327            match step_result {
328                Ok(mut ex) => match disposition {
329                    ExceptionDisposition::Handled => {
330                        ex.handle_error();
331                        return Ok(ex);
332                    }
333                    ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
334                        exchange = snapshot;
335                    }
336                },
337                Err(_) => {
338                    exchange = snapshot;
339                }
340            }
341        }
342
343        // Forward to DLC/handler — BIND returned exchange
344        exchange.set_error(error.clone());
345        match send_to_handler(exchange, producer).await {
346            Ok(handler_ex) => match disposition {
347                ExceptionDisposition::Handled => {
348                    let mut ex = handler_ex;
349                    ex.clear_error();
350                    Ok(ex)
351                }
352                ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
353                    let mut ex = handler_ex;
354                    ex.set_error(error);
355                    Ok(ex)
356                }
357            },
358            // Dead code by construction: send_to_handler always returns Ok.
359            Err(e) => Err(e),
360        }
361    }
362}
363
364/// Tower Layer that wraps a pipeline with error handling behaviour.
365///
366/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
367#[deprecated(
368    since = "0.16.0",
369    note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
370)]
371pub struct ErrorHandlerLayer {
372    /// Resolved DLC producer (None = log only).
373    dlc_producer: Option<BoxProcessor>,
374    /// Policies with their resolved `handled_by` producers.
375    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
376}
377
378#[allow(deprecated)]
379impl ErrorHandlerLayer {
380    /// Create the layer with pre-resolved producers.
381    pub fn new(
382        dlc_producer: Option<BoxProcessor>,
383        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
384    ) -> Self {
385        Self {
386            dlc_producer,
387            policies,
388        }
389    }
390}
391
392#[allow(deprecated)]
393impl<S> Layer<S> for ErrorHandlerLayer
394where
395    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
396    S::Future: Send + 'static,
397{
398    type Service = ErrorHandlerService<S>;
399
400    fn layer(&self, inner: S) -> Self::Service {
401        ErrorHandlerService {
402            inner,
403            dlc_producer: self.dlc_producer.clone(),
404            policies: self
405                .policies
406                .iter()
407                .map(|(p, prod)| (p.clone(), prod.clone()))
408                .collect(),
409        }
410    }
411}
412
413/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
414///
415/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
416/// `has_error() == true` if the pipeline ultimately failed.
417#[deprecated(
418    since = "0.16.0",
419    note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
420)]
421pub struct ErrorHandlerService<S> {
422    inner: S,
423    dlc_producer: Option<BoxProcessor>,
424    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
425}
426
427#[allow(deprecated)]
428impl<S: Clone> Clone for ErrorHandlerService<S> {
429    fn clone(&self) -> Self {
430        Self {
431            inner: self.inner.clone(),
432            dlc_producer: self.dlc_producer.clone(),
433            policies: self
434                .policies
435                .iter()
436                .map(|(p, prod)| (p.clone(), prod.clone()))
437                .collect(),
438        }
439    }
440}
441
442#[allow(deprecated)]
443impl<S> ErrorHandlerService<S>
444where
445    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
446    S::Future: Send + 'static,
447{
448    /// Create the service directly (used in unit tests; in production use the Layer).
449    pub fn new(
450        inner: S,
451        dlc_producer: Option<BoxProcessor>,
452        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
453    ) -> Self {
454        Self {
455            inner,
456            dlc_producer,
457            policies,
458        }
459    }
460}
461
462#[allow(deprecated)]
463impl<S> Service<Exchange> for ErrorHandlerService<S>
464where
465    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
466    S::Future: Send + 'static,
467{
468    type Response = Exchange;
469    type Error = CamelError;
470    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
471
472    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
473        // Preserve backpressure (Pending) but never leak readiness errors upward.
474        // Readiness errors are deferred to call(), where they go through the same
475        // retry/onException/DLC path as call() errors. This is safe because call()
476        // re-checks readiness on a fresh inner clone via inner.ready().await.
477        match self.inner.poll_ready(cx) {
478            Poll::Pending => Poll::Pending,
479            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
480            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
481        }
482    }
483
484    fn call(&mut self, exchange: Exchange) -> Self::Future {
485        let mut inner = self.inner.clone();
486        let dlc = self.dlc_producer.clone();
487        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
488            .policies
489            .iter()
490            .map(|(p, prod)| (p.clone(), prod.clone()))
491            .collect();
492
493        Box::pin(async move {
494            let original = exchange.clone();
495            let result = match inner.ready().await {
496                Ok(svc) => svc.call(exchange).await,
497                Err(e) => Err(e), // readiness error — enters retry/DLC/onException path below
498            };
499
500            let err = match result {
501                Ok(ex) => return Ok(ex),
502                Err(e) => e,
503            };
504
505            // Find the first matching policy.
506            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
507
508            if let Some((policy, policy_producer)) = matched {
509                // Retry if configured.
510                if let Some(ref backoff) = policy.retry {
511                    for attempt in 0..backoff.max_attempts {
512                        let delay = backoff.delay_for(attempt);
513                        tokio::time::sleep(delay).await;
514
515                        // Set redelivery headers
516                        let mut ex = original.clone();
517                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
518                        ex.input.set_header(
519                            HEADER_REDELIVERY_COUNTER,
520                            Value::Number((attempt + 1).into()),
521                        );
522                        ex.input.set_header(
523                            HEADER_REDELIVERY_MAX_COUNTER,
524                            Value::Number(backoff.max_attempts.into()),
525                        );
526
527                        let result = match inner.ready().await {
528                            Ok(svc) => svc.call(ex).await,
529                            Err(e) => Err(e), // readiness error — enters retry exhaustion path
530                        };
531                        match result {
532                            Ok(ex) => return Ok(ex),
533                            Err(retry_err) => {
534                                if attempt + 1 == backoff.max_attempts {
535                                    // Retries exhausted — send to handler.
536                                    let mut original = original.clone();
537                                    original
538                                        .input
539                                        .set_header(HEADER_REDELIVERED, Value::Bool(true));
540                                    original.input.set_header(
541                                        HEADER_REDELIVERY_COUNTER,
542                                        Value::Number(backoff.max_attempts.into()),
543                                    );
544                                    original.input.set_header(
545                                        HEADER_REDELIVERY_MAX_COUNTER,
546                                        Value::Number(backoff.max_attempts.into()),
547                                    );
548                                    if let Some(ref steps) = policy.on_steps {
549                                        let handler = policy_producer.clone().or(dlc.clone());
550                                        return execute_on_steps(
551                                            original,
552                                            retry_err,
553                                            steps,
554                                            policy.disposition,
555                                            handler,
556                                        )
557                                        .await;
558                                    }
559                                    original.set_error(retry_err);
560                                    let handler = policy_producer.or(dlc);
561                                    return send_to_handler(original, handler).await;
562                                }
563                            }
564                        }
565                    }
566                }
567                // No retry configured (or 0 attempts) — send to policy handler or DLC.
568                if let Some(ref steps) = policy.on_steps {
569                    let handler = policy_producer.or(dlc);
570                    return execute_on_steps(original, err, steps, policy.disposition, handler)
571                        .await;
572                }
573                let mut ex = original.clone();
574                ex.set_error(err);
575                let handler = policy_producer.or(dlc);
576                send_to_handler(ex, handler).await
577            } else {
578                // No matching policy — forward directly to DLC.
579                let mut ex = original;
580                ex.set_error(err);
581                send_to_handler(ex, dlc).await
582            }
583        })
584    }
585}
586
587async fn send_to_handler(
588    exchange: Exchange,
589    producer: Option<BoxProcessor>,
590) -> Result<Exchange, CamelError> {
591    match producer {
592        None => {
593            // log-policy: system-broken
594            tracing::error!(
595                error = ?exchange.error,
596                "Exchange failed with no error handler configured"
597            );
598            Ok(exchange)
599        }
600        Some(mut prod) => match prod.ready().await {
601            Err(e) => {
602                // log-policy: system-broken
603                tracing::error!("DLC/handler not ready: {e}");
604                Ok(exchange)
605            }
606            Ok(svc) => match svc.call(exchange.clone()).await {
607                Ok(ex) => Ok(ex),
608                Err(e) => {
609                    // log-policy: system-broken
610                    tracing::error!("DLC/handler call failed: {e}");
611                    // Return the original exchange with original error intact.
612                    Ok(exchange)
613                }
614            },
615        },
616    }
617}
618
619#[cfg(test)]
620#[allow(deprecated)]
621mod tests {
622    use super::*;
623    use camel_api::{
624        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
625        error_handler::RedeliveryPolicy,
626    };
627    use std::sync::{
628        Arc,
629        atomic::{AtomicU32, Ordering},
630    };
631    use std::time::Duration;
632    use tower::ServiceExt;
633
634    fn make_exchange() -> Exchange {
635        Exchange::new(Message::new("test"))
636    }
637
638    fn failing_processor() -> BoxProcessor {
639        BoxProcessor::from_fn(|_ex| {
640            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
641        })
642    }
643
644    fn ok_processor() -> BoxProcessor {
645        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
646    }
647
648    fn fail_n_times(n: u32) -> BoxProcessor {
649        let count = Arc::new(AtomicU32::new(0));
650        BoxProcessor::from_fn(move |ex| {
651            let count = Arc::clone(&count);
652            Box::pin(async move {
653                let c = count.fetch_add(1, Ordering::SeqCst);
654                if c < n {
655                    Err(CamelError::ProcessorError(format!("attempt {c}")))
656                } else {
657                    Ok(ex)
658                }
659            })
660        })
661    }
662
663    #[tokio::test]
664    async fn test_ok_passthrough() {
665        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
666        let result = svc.oneshot(make_exchange()).await;
667        assert!(result.is_ok());
668        assert!(!result.unwrap().has_error());
669    }
670
671    #[tokio::test]
672    async fn test_error_goes_to_dlc() {
673        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
674        let received_clone = Arc::clone(&received);
675        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
676            let r = Arc::clone(&received_clone);
677            Box::pin(async move {
678                r.lock().unwrap().push(ex.clone());
679                Ok(ex)
680            })
681        });
682
683        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
684        let result = svc.oneshot(make_exchange()).await;
685        assert!(result.is_ok());
686        let ex = result.unwrap();
687        assert!(ex.has_error());
688        assert_eq!(received.lock().unwrap().len(), 1);
689    }
690
691    #[tokio::test]
692    async fn test_retry_recovers() {
693        let inner = fail_n_times(2);
694        let policy = ExceptionPolicy {
695            matches: Arc::new(|_| true),
696            retry: Some(RedeliveryPolicy {
697                max_attempts: 3,
698                initial_delay: Duration::from_millis(1),
699                multiplier: 1.0,
700                max_delay: Duration::from_millis(10),
701                jitter_factor: 0.0,
702            }),
703            handled_by: None,
704            on_steps: None,
705            disposition: ExceptionDisposition::Propagate,
706        };
707        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
708        let result = svc.oneshot(make_exchange()).await;
709        assert!(result.is_ok());
710        assert!(!result.unwrap().has_error());
711    }
712
713    #[tokio::test]
714    async fn test_retry_exhausted_goes_to_dlc() {
715        let inner = fail_n_times(10);
716        let received = Arc::new(std::sync::Mutex::new(0u32));
717        let received_clone = Arc::clone(&received);
718        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
719            let r = Arc::clone(&received_clone);
720            Box::pin(async move {
721                *r.lock().unwrap() += 1;
722                Ok(ex)
723            })
724        });
725        let policy = ExceptionPolicy {
726            matches: Arc::new(|_| true),
727            retry: Some(RedeliveryPolicy {
728                max_attempts: 2,
729                initial_delay: Duration::from_millis(1),
730                multiplier: 1.0,
731                max_delay: Duration::from_millis(10),
732                jitter_factor: 0.0,
733            }),
734            handled_by: None,
735            on_steps: None,
736            disposition: ExceptionDisposition::Propagate,
737        };
738        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
739        let result = svc.oneshot(make_exchange()).await;
740        assert!(result.is_ok());
741        assert!(result.unwrap().has_error());
742        assert_eq!(*received.lock().unwrap(), 1);
743    }
744
745    #[test]
746    fn test_poll_ready_delegates_to_inner() {
747        use std::sync::atomic::AtomicBool;
748
749        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
750        #[derive(Clone)]
751        struct DelayedReadyService {
752            ready: Arc<AtomicBool>,
753        }
754
755        impl Service<Exchange> for DelayedReadyService {
756            type Response = Exchange;
757            type Error = CamelError;
758            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
759
760            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
761                if self.ready.fetch_or(true, Ordering::SeqCst) {
762                    // Already marked ready (second+ call) → Ready
763                    Poll::Ready(Ok(()))
764                } else {
765                    // First call → Pending, schedule a wake
766                    cx.waker().wake_by_ref();
767                    Poll::Pending
768                }
769            }
770
771            fn call(&mut self, ex: Exchange) -> Self::Future {
772                Box::pin(async move { Ok(ex) })
773            }
774        }
775
776        let waker = futures::task::noop_waker();
777        let mut cx = Context::from_waker(&waker);
778
779        let inner = DelayedReadyService {
780            ready: Arc::new(AtomicBool::new(false)),
781        };
782        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
783
784        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
785        let first = Pin::new(&mut svc).poll_ready(&mut cx);
786        assert!(first.is_pending(), "expected Pending on first poll_ready");
787
788        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
789        let second = Pin::new(&mut svc).poll_ready(&mut cx);
790        assert!(second.is_ready(), "expected Ready on second poll_ready");
791    }
792
793    #[tokio::test]
794    async fn test_no_matching_policy_uses_dlc() {
795        let received = Arc::new(std::sync::Mutex::new(0u32));
796        let received_clone = Arc::clone(&received);
797        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
798            let r = Arc::clone(&received_clone);
799            Box::pin(async move {
800                *r.lock().unwrap() += 1;
801                Ok(ex)
802            })
803        });
804        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
805        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
806        let result = svc.oneshot(make_exchange()).await;
807        assert!(result.is_ok());
808        assert_eq!(*received.lock().unwrap(), 1);
809    }
810
811    #[tokio::test]
812    async fn test_redelivery_headers_are_set() {
813        use camel_api::error_handler::{
814            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
815            RedeliveryPolicy,
816        };
817
818        let inner = fail_n_times(10);
819        let received = Arc::new(std::sync::Mutex::new(None));
820        let received_clone = Arc::clone(&received);
821        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
822            let r = Arc::clone(&received_clone);
823            Box::pin(async move {
824                *r.lock().unwrap() = Some(ex.clone());
825                Ok(ex)
826            })
827        });
828
829        let policy = ExceptionPolicy {
830            matches: Arc::new(|_| true),
831            retry: Some(RedeliveryPolicy {
832                max_attempts: 2,
833                initial_delay: Duration::from_millis(1),
834                multiplier: 1.0,
835                max_delay: Duration::from_millis(10),
836                jitter_factor: 0.0,
837            }),
838            handled_by: None,
839            on_steps: None,
840            disposition: ExceptionDisposition::Propagate,
841        };
842
843        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
844        let _ = svc.oneshot(make_exchange()).await.unwrap();
845
846        let ex = received.lock().unwrap().take().unwrap();
847        assert_eq!(
848            ex.input.header(HEADER_REDELIVERED),
849            Some(&Value::Bool(true))
850        );
851        assert_eq!(
852            ex.input.header(HEADER_REDELIVERY_COUNTER),
853            Some(&Value::Number(2.into()))
854        );
855        assert_eq!(
856            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
857            Some(&Value::Number(2.into()))
858        );
859    }
860
861    #[tokio::test]
862    async fn test_jitter_produces_varying_delays_in_retry_flow() {
863        use std::time::Instant;
864
865        let inner = fail_n_times(10);
866        let received = Arc::new(std::sync::Mutex::new(None));
867        let received_clone = Arc::clone(&received);
868        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
869            let r = Arc::clone(&received_clone);
870            Box::pin(async move {
871                *r.lock().unwrap() = Some(ex.clone());
872                Ok(ex)
873            })
874        });
875
876        let policy = ExceptionPolicy {
877            matches: Arc::new(|_| true),
878            retry: Some(RedeliveryPolicy {
879                max_attempts: 5,
880                initial_delay: Duration::from_millis(20),
881                multiplier: 1.0,
882                max_delay: Duration::from_millis(100),
883                jitter_factor: 0.5,
884            }),
885            handled_by: None,
886            on_steps: None,
887            disposition: ExceptionDisposition::Propagate,
888        };
889
890        let start = Instant::now();
891        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
892        let _ = svc.oneshot(make_exchange()).await.unwrap();
893        let elapsed = start.elapsed();
894
895        assert!(
896            received.lock().unwrap().is_some(),
897            "DLC should have received exchange"
898        );
899
900        assert!(
901            elapsed >= Duration::from_millis(50),
902            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
903        );
904
905        assert!(
906            elapsed <= Duration::from_millis(500),
907            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
908        );
909    }
910
911    #[tokio::test]
912    async fn test_on_steps_handled_true_consumes_error() {
913        use tower::ServiceExt;
914
915        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
916            ex.input.body = camel_api::Body::Bytes("handled".into());
917            async move { Ok(ex) }
918        }));
919        let policy = ExceptionPolicy {
920            matches: Arc::new(|_| true),
921            retry: None,
922            handled_by: None,
923            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
924            disposition: ExceptionDisposition::Handled,
925        };
926        let inner = tower::service_fn(|_ex: Exchange| async {
927            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
928        });
929        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
930        let ex = Exchange::default();
931        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
932        assert!(result.error.is_none(), "handled:true should clear error");
933        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
934    }
935
936    #[tokio::test]
937    async fn test_on_steps_handled_false_propagates_error() {
938        use tower::ServiceExt;
939
940        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
941            ex.input.body = camel_api::Body::Bytes("handled".into());
942            async move { Ok(ex) }
943        }));
944        let policy = ExceptionPolicy {
945            matches: Arc::new(|_| true),
946            retry: None,
947            handled_by: None,
948            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
949            disposition: ExceptionDisposition::Propagate,
950        };
951        let inner = tower::service_fn(|_ex: Exchange| async {
952            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
953        });
954        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
955        let ex = Exchange::default();
956        let result = svc.ready().await.unwrap().call(ex).await;
957        assert!(result.is_err(), "handled:false should propagate error");
958    }
959
960    // --- Readiness error capture tests ---
961    //
962    // ErrorHandlerService must capture readiness errors (poll_ready returning Err)
963    // and route them through retry/onException/DLC instead of propagating raw.
964
965    /// A service whose `poll_ready` always returns `Ready(Err(...))` but whose
966    /// `call` returns `Ok`. This simulates a permanently-not-ready endpoint.
967    #[derive(Clone)]
968    struct ReadinessFailService {
969        error: CamelError,
970    }
971
972    impl ReadinessFailService {
973        fn new(error: CamelError) -> Self {
974            Self { error }
975        }
976    }
977
978    impl Service<Exchange> for ReadinessFailService {
979        type Response = Exchange;
980        type Error = CamelError;
981        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
982
983        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
984            Poll::Ready(Err(self.error.clone()))
985        }
986
987        fn call(&mut self, ex: Exchange) -> Self::Future {
988            // call() should never be reached if poll_ready returns Err,
989            // but Tower's ready().await on a clone will re-encounter the readiness error.
990            Box::pin(async move { Ok(ex) })
991        }
992    }
993
994    #[tokio::test]
995    async fn test_readiness_error_goes_to_dlc() {
996        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
997        let inner = ReadinessFailService {
998            error: readiness_err,
999        };
1000
1001        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1002        let received_clone = Arc::clone(&received);
1003        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1004            let r = Arc::clone(&received_clone);
1005            Box::pin(async move {
1006                r.lock().unwrap().push(ex.clone());
1007                Ok(ex)
1008            })
1009        });
1010
1011        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1012        let result = svc.oneshot(make_exchange()).await;
1013
1014        // The error must be absorbed (Ok), not propagated raw (Err).
1015        assert!(
1016            result.is_ok(),
1017            "readiness error should be captured and sent to DLC, got: {:?}",
1018            result
1019        );
1020        let ex = result.unwrap();
1021        assert!(ex.has_error(), "exchange should carry the readiness error");
1022        assert_eq!(
1023            received.lock().unwrap().len(),
1024            1,
1025            "DLC should have received the exchange exactly once"
1026        );
1027    }
1028
1029    #[tokio::test]
1030    async fn test_readiness_error_goes_to_matching_policy() {
1031        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1032        let inner = ReadinessFailService {
1033            error: readiness_err,
1034        };
1035
1036        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1037            ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1038            async move { Ok(ex) }
1039        }));
1040        let policy = ExceptionPolicy {
1041            matches: Arc::new(|_| true),
1042            retry: None,
1043            handled_by: None,
1044            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1045            disposition: ExceptionDisposition::Handled,
1046        };
1047
1048        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1049        let result = svc.oneshot(make_exchange()).await;
1050
1051        // The error must be absorbed and routed to the on_steps handler.
1052        assert!(
1053            result.is_ok(),
1054            "readiness error should be captured by policy, got: {:?}",
1055            result
1056        );
1057        let ex = result.unwrap();
1058        assert!(ex.error.is_none(), "handled:true should clear error");
1059        assert!(
1060            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1061            "on_steps should have modified the body"
1062        );
1063    }
1064
1065    #[test]
1066    fn test_poll_ready_converts_readiness_error_to_ok() {
1067        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1068        let inner = ReadinessFailService {
1069            error: readiness_err,
1070        };
1071        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1072
1073        let waker = futures::task::noop_waker();
1074        let mut cx = Context::from_waker(&waker);
1075
1076        // poll_ready must NOT propagate the readiness error — convert to Ok.
1077        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1078        match poll {
1079            Poll::Ready(Ok(())) => { /* correct */ }
1080            Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1081            Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1082        }
1083    }
1084
1085    // --- invoke_processor tests ---
1086
1087    #[tokio::test]
1088    async fn test_invoke_processor_returns_ok_on_success() {
1089        let mut svc = ok_processor();
1090        let ex = make_exchange();
1091        let result = invoke_processor(&mut svc, ex).await;
1092        assert!(result.is_ok());
1093    }
1094
1095    #[tokio::test]
1096    async fn test_invoke_processor_captures_readiness_error() {
1097        let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1098            CamelError::ProcessorError("not ready".into()),
1099        ));
1100        let ex = make_exchange();
1101        let result = invoke_processor(&mut failing_ready, ex).await;
1102        assert!(result.is_err());
1103    }
1104
1105    #[tokio::test]
1106    async fn test_on_steps_handled_true_clears_exception_properties() {
1107        use tower::ServiceExt;
1108
1109        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1110            ex.input.body = camel_api::Body::Bytes("handled".into());
1111            async move { Ok(ex) }
1112        }));
1113        let policy = ExceptionPolicy {
1114            matches: Arc::new(|_| true),
1115            retry: None,
1116            handled_by: None,
1117            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1118            disposition: ExceptionDisposition::Handled,
1119        };
1120        let inner = tower::service_fn(|_ex: Exchange| async {
1121            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1122        });
1123        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1124        let ex = Exchange::default();
1125        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1126        assert!(result.error.is_none(), "handled:true should clear error");
1127        assert!(
1128            result
1129                .properties
1130                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1131                .is_none(),
1132            "handled:true should clear exception properties"
1133        );
1134        assert!(
1135            result
1136                .properties
1137                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1138                .is_none(),
1139            "handled:true should clear exception kind property"
1140        );
1141        assert!(
1142            result
1143                .properties
1144                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1145                .is_none(),
1146            "handled:true should clear exception caught property"
1147        );
1148    }
1149
1150    // ── DefaultRouteErrorHandler tests ──
1151
1152    #[test]
1153    fn test_match_policy_returns_id_for_matching_error() {
1154        let handler = DefaultRouteErrorHandler::new(
1155            None,
1156            vec![(
1157                ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1158                None,
1159            )],
1160        );
1161        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1162        assert_eq!(id, Some(PolicyId(0)));
1163    }
1164
1165    #[test]
1166    fn test_match_policy_returns_none_for_unmatched() {
1167        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1168        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1169        assert_eq!(id, None);
1170    }
1171
1172    // ── retry_step tests ──
1173
1174    #[tokio::test]
1175    async fn test_retry_step_succeeds_on_second_attempt() {
1176        let mut policy = ExceptionPolicy::new(|_| true);
1177        policy.retry = Some(RedeliveryPolicy::new(3));
1178        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1179        let mut step = fail_n_times(1); // fails once, then succeeds
1180        let ex = make_exchange();
1181        let outcome = handler
1182            .retry_step(
1183                Some(PolicyId(0)),
1184                &mut step,
1185                ex,
1186                CamelError::ProcessorError("attempt 0".into()),
1187            )
1188            .await;
1189        assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1190    }
1191
1192    #[tokio::test]
1193    async fn test_retry_step_exhausted_when_all_fail() {
1194        let mut policy = ExceptionPolicy::new(|_| true);
1195        policy.retry = Some(RedeliveryPolicy::new(3));
1196        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1197        let mut step = failing_processor();
1198        let ex = make_exchange();
1199        let outcome = handler
1200            .retry_step(
1201                Some(PolicyId(0)),
1202                &mut step,
1203                ex,
1204                CamelError::ProcessorError("boom".into()),
1205            )
1206            .await;
1207        assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1208    }
1209
1210    #[tokio::test]
1211    async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1212        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1213        let mut step = ok_processor();
1214        let ex = make_exchange();
1215        let outcome = handler
1216            .retry_step(
1217                None,
1218                &mut step,
1219                ex,
1220                CamelError::ProcessorError("boom".into()),
1221            )
1222            .await;
1223        assert!(matches!(
1224            outcome,
1225            RetryOutcome::Exhausted { policy: None, .. }
1226        ));
1227    }
1228
1229    // ── handle_step tests ──
1230
1231    #[tokio::test]
1232    async fn test_handle_step_propagate_sends_to_dlc() {
1233        let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1234        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1235        let ex = make_exchange();
1236        let result = handler
1237            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1238            .await;
1239        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1240    }
1241
1242    #[tokio::test]
1243    async fn test_handle_step_handled_uses_handler_output() {
1244        let handler_producer = BoxProcessor::from_fn(|mut ex| {
1245            Box::pin(async move {
1246                ex.input.set_header("processed_by", Value::Bool(true));
1247                Ok(ex)
1248            })
1249        });
1250        let policy = ExceptionPolicy {
1251            matches: std::sync::Arc::new(|_| true),
1252            retry: None,
1253            handled_by: None,
1254            on_steps: None,
1255            disposition: ExceptionDisposition::Handled,
1256        };
1257        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1258        let mut ex = make_exchange();
1259        ex.set_error(CamelError::ProcessorError("boom".into()));
1260        let result = handler
1261            .handle_step(
1262                Some(PolicyId(0)),
1263                ex,
1264                CamelError::ProcessorError("boom".into()),
1265            )
1266            .await;
1267        match result {
1268            Ok(StepDisposition::Handled(ex)) => {
1269                assert!(!ex.has_error(), "error should be cleared");
1270                assert_eq!(
1271                    ex.input.header("processed_by"),
1272                    Some(&Value::Bool(true)),
1273                    "should use handler's output exchange"
1274                );
1275            }
1276            other => panic!("expected Handled, got {:?}", other.is_ok()),
1277        }
1278    }
1279
1280    #[tokio::test]
1281    async fn test_handle_step_continued_clears_error() {
1282        let policy = ExceptionPolicy {
1283            matches: std::sync::Arc::new(|_| true),
1284            retry: None,
1285            handled_by: None,
1286            on_steps: None,
1287            disposition: ExceptionDisposition::Continued,
1288        };
1289        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1290        let mut ex = make_exchange();
1291        ex.set_error(CamelError::ProcessorError("boom".into()));
1292        let result = handler
1293            .handle_step(
1294                Some(PolicyId(0)),
1295                ex,
1296                CamelError::ProcessorError("boom".into()),
1297            )
1298            .await;
1299        match result {
1300            Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1301            other => panic!("expected Continued, got {:?}", other.is_ok()),
1302        }
1303    }
1304
1305    #[tokio::test]
1306    async fn test_handle_step_with_on_steps_handled() {
1307        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1308            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1309            async move { Ok(ex) }
1310        }));
1311        let policy = ExceptionPolicy {
1312            matches: std::sync::Arc::new(|_| true),
1313            retry: None,
1314            handled_by: None,
1315            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1316            disposition: ExceptionDisposition::Handled,
1317        };
1318        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1319        let mut ex = make_exchange();
1320        ex.set_error(CamelError::ProcessorError("boom".into()));
1321        let result = handler
1322            .handle_step(
1323                Some(PolicyId(0)),
1324                ex,
1325                CamelError::ProcessorError("boom".into()),
1326            )
1327            .await;
1328        match result {
1329            Ok(StepDisposition::Handled(ex)) => {
1330                assert!(!ex.has_error(), "error should be cleared");
1331                assert!(
1332                    matches!(ex.input.body, camel_api::Body::Bytes(_)),
1333                    "on_steps should have modified the body"
1334                );
1335            }
1336            other => panic!("expected Handled, got: {}", other.is_ok()),
1337        }
1338    }
1339
1340    #[tokio::test]
1341    async fn test_handle_step_with_on_steps_propagate_falls_through() {
1342        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1343            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1344            async move { Ok(ex) }
1345        }));
1346        let dlc_called = Arc::new(AtomicU32::new(0));
1347        let dlc_called_clone = dlc_called.clone();
1348        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1349            let c = dlc_called_clone.clone();
1350            Box::pin(async move {
1351                c.fetch_add(1, Ordering::SeqCst);
1352                Ok(ex)
1353            })
1354        });
1355        let policy = ExceptionPolicy {
1356            matches: std::sync::Arc::new(|_| true),
1357            retry: None,
1358            handled_by: None,
1359            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1360            disposition: ExceptionDisposition::Propagate,
1361        };
1362        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1363        let mut ex = make_exchange();
1364        ex.set_error(CamelError::ProcessorError("boom".into()));
1365        let result = handler
1366            .handle_step(
1367                Some(PolicyId(0)),
1368                ex,
1369                CamelError::ProcessorError("boom".into()),
1370            )
1371            .await;
1372        assert!(
1373            matches!(result, Ok(StepDisposition::Propagate(_))),
1374            "Propagate disposition should return Propagate"
1375        );
1376        assert_eq!(
1377            dlc_called.load(Ordering::SeqCst),
1378            1,
1379            "DLC should be called when on_steps disposition is Propagate"
1380        );
1381    }
1382
1383    #[tokio::test]
1384    async fn test_handle_step_dlc_failure_propagates() {
1385        let failing_dlc = BoxProcessor::from_fn(|_| {
1386            Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1387        });
1388        let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1389        let ex = make_exchange();
1390        let result = handler
1391            .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1392            .await;
1393        assert!(
1394            matches!(result, Ok(StepDisposition::Propagate(_))),
1395            "DLC failure should still return Propagate with original error"
1396        );
1397    }
1398
1399    // ── handle_boundary tests ──
1400
1401    #[tokio::test]
1402    async fn test_handle_boundary_security_error_goes_to_dlc() {
1403        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1404        let count_clone = dlc_count.clone();
1405        let dlc = BoxProcessor::from_fn(move |ex| {
1406            let c = count_clone.clone();
1407            Box::pin(async move {
1408                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1409                Ok(ex)
1410            })
1411        });
1412        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1413        let ex = make_exchange();
1414        let result = handler
1415            .handle_boundary(
1416                BoundaryKind::Security,
1417                ex,
1418                CamelError::Unauthorized("denied".into()),
1419            )
1420            .await;
1421        assert!(result.is_ok());
1422        assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1423    }
1424
1425    #[tokio::test]
1426    async fn test_handle_boundary_handled_clears_error() {
1427        let policy = ExceptionPolicy {
1428            matches: std::sync::Arc::new(|_| true),
1429            retry: None,
1430            handled_by: None,
1431            on_steps: None,
1432            disposition: ExceptionDisposition::Handled,
1433        };
1434        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1435        let ex = make_exchange();
1436        let result = handler
1437            .handle_boundary(
1438                BoundaryKind::Security,
1439                ex,
1440                CamelError::Unauthorized("denied".into()),
1441            )
1442            .await;
1443        assert!(result.is_ok());
1444        assert!(
1445            !result.unwrap().has_error(),
1446            "Handled disposition should clear error"
1447        );
1448    }
1449
1450    #[tokio::test]
1451    async fn test_handle_boundary_propagate_preserves_error() {
1452        let policy = ExceptionPolicy {
1453            matches: std::sync::Arc::new(|_| true),
1454            retry: None,
1455            handled_by: None,
1456            on_steps: None,
1457            disposition: ExceptionDisposition::Propagate,
1458        };
1459        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1460        let ex = make_exchange();
1461        let result = handler
1462            .handle_boundary(
1463                BoundaryKind::CircuitBreaker,
1464                ex,
1465                CamelError::CircuitOpen("open".into()),
1466            )
1467            .await;
1468        assert!(result.is_ok(), "boundary errors always return Ok");
1469        assert!(
1470            result.unwrap().has_error(),
1471            "Propagate disposition should preserve error"
1472        );
1473    }
1474
1475    #[tokio::test]
1476    async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1477        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1478        let count_clone = dlc_count.clone();
1479        let dlc = BoxProcessor::from_fn(move |ex| {
1480            let c = count_clone.clone();
1481            Box::pin(async move {
1482                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1483                Ok(ex)
1484            })
1485        });
1486        let policy = ExceptionPolicy {
1487            matches: std::sync::Arc::new(|_| true),
1488            retry: None,
1489            handled_by: None,
1490            on_steps: None,
1491            disposition: ExceptionDisposition::Continued,
1492        };
1493        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1494        let ex = make_exchange();
1495        let result = handler
1496            .handle_boundary(
1497                BoundaryKind::Security,
1498                ex,
1499                CamelError::Unauthorized("denied".into()),
1500            )
1501            .await;
1502        assert!(result.is_ok(), "boundary errors always return Ok");
1503        assert!(
1504            result.unwrap().has_error(),
1505            "Continued at boundary should preserve error"
1506        );
1507        assert_eq!(
1508            dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1509            1,
1510            "DLC should be called"
1511        );
1512    }
1513
1514    #[tokio::test]
1515    async fn test_handle_boundary_with_on_steps_handled() {
1516        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1517            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1518            async move { Ok(ex) }
1519        }));
1520        let policy = ExceptionPolicy {
1521            matches: std::sync::Arc::new(|_| true),
1522            retry: None,
1523            handled_by: None,
1524            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1525            disposition: ExceptionDisposition::Handled,
1526        };
1527        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1528        let ex = make_exchange();
1529        let result = handler
1530            .handle_boundary(
1531                BoundaryKind::Security,
1532                ex,
1533                CamelError::Unauthorized("denied".into()),
1534            )
1535            .await;
1536        assert!(result.is_ok(), "boundary errors always return Ok");
1537        let ex = result.unwrap();
1538        assert!(!ex.has_error(), "Handled disposition should clear error");
1539        assert!(
1540            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1541            "on_steps should have modified the body"
1542        );
1543    }
1544}