Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9    HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10    StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15    original: Exchange,
16    original_err: CamelError,
17    on_steps: &SyncBoxProcessor,
18    disposition: ExceptionDisposition,
19    handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21    let snapshot = original.clone();
22    let mut ex = original;
23    ex.set_error(original_err.clone());
24    let mut pipeline = on_steps.clone_inner();
25    let step_result = async {
26        let svc = pipeline.ready().await?;
27        svc.call(ex).await
28    }
29    .await;
30
31    match step_result {
32        Ok(mut ex) => {
33            if disposition == ExceptionDisposition::Handled {
34                ex.handle_error();
35                Ok(ex)
36            } else {
37                // Propagate or Continued — steps execute for side-effects (e.g. logging) but
38                // the modified exchange is discarded and the original error propagated.
39                Err(original_err)
40            }
41        }
42        Err(_) => {
43            tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44            let mut ex = snapshot;
45            ex.set_error(original_err);
46            send_to_handler(ex, handler).await
47        }
48    }
49}
50
51/// Invoke a processor: readiness check + call, unified into a single Result.
52///
53/// Readiness errors and call errors are both returned as `Err(CamelError)`,
54/// allowing the pipeline's recovery loop to handle them uniformly.
55pub async fn invoke_processor(
56    svc: &mut BoxProcessor,
57    ex: Exchange,
58) -> Result<Exchange, CamelError> {
59    match svc.ready().await {
60        Ok(ready) => ready.call(ex).await,
61        Err(err) => Err(err),
62    }
63}
64
65/// Route-level error handler owning ALL error handling logic.
66///
67/// Single owner of DLC, retry, onException policies. Called from
68/// `RouteChannelService` (boundary errors) and `run_steps` (step errors).
69#[async_trait::async_trait]
70pub trait RouteErrorHandler: Send + Sync {
71    /// Match a policy for the given error. Called once before retry.
72    fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
73
74    /// Phase 1: Retry the failed step.
75    async fn retry_step(
76        &self,
77        policy: Option<PolicyId>,
78        step: &mut dyn camel_api::error_handler::RetryableStep,
79        original: Exchange,
80        error: CamelError,
81    ) -> RetryOutcome;
82
83    /// Phase 2: Determine step disposition after retry exhaustion.
84    async fn handle_step(
85        &self,
86        policy: Option<PolicyId>,
87        exchange: Exchange,
88        error: CamelError,
89    ) -> Result<StepDisposition, CamelError>;
90
91    /// Handle boundary (infrastructure) errors.
92    async fn handle_boundary(
93        &self,
94        kind: BoundaryKind,
95        exchange: Exchange,
96        error: CamelError,
97    ) -> Result<Exchange, CamelError>;
98}
99
100/// Default implementation of RouteErrorHandler.
101/// Owns DLC producer exclusively. Encapsulates retry/onException/DLC logic.
102///
103/// Uses `SyncBoxProcessor` internally so the handler is `Send + Sync` as required
104/// by the `RouteErrorHandler` trait.
105pub struct DefaultRouteErrorHandler {
106    pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107    pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111    pub fn new(
112        dlc_producer: Option<BoxProcessor>,
113        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114    ) -> Self {
115        Self {
116            dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117            policies: policies
118                .into_iter()
119                .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120                .collect(),
121        }
122    }
123
124    /// Resolve (disposition, producer) for a matched policy.
125    /// Shared by handle_step and handle_boundary.
126    fn resolve_producer(
127        &self,
128        policy: Option<PolicyId>,
129    ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130        match policy {
131            Some(PolicyId(idx)) => match self.policies.get(idx) {
132                Some((p, prod)) => (
133                    p.disposition,
134                    prod.as_ref()
135                        .map(|p| p.clone_inner())
136                        .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137                ),
138                None => (
139                    ExceptionDisposition::Propagate,
140                    self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141                ),
142            },
143            None => (
144                ExceptionDisposition::Propagate,
145                self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146            ),
147        }
148    }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153    fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154        self.policies
155            .iter()
156            .position(|(p, _)| (p.matches)(err))
157            .map(PolicyId)
158    }
159
160    async fn retry_step(
161        &self,
162        policy: Option<PolicyId>,
163        step: &mut dyn camel_api::error_handler::RetryableStep,
164        original: Exchange,
165        error: CamelError,
166    ) -> RetryOutcome {
167        let Some(PolicyId(idx)) = policy else {
168            return RetryOutcome::Exhausted {
169                exchange: original,
170                error,
171                policy: None,
172            };
173        };
174        let Some((policy_def, _)) = self.policies.get(idx) else {
175            return RetryOutcome::Exhausted {
176                exchange: original,
177                error,
178                policy,
179            };
180        };
181        let Some(ref backoff) = policy_def.retry else {
182            return RetryOutcome::Exhausted {
183                exchange: original,
184                error,
185                policy,
186            };
187        };
188
189        for attempt in 0..backoff.max_attempts {
190            let delay = backoff.delay_for(attempt);
191            tokio::time::sleep(delay).await;
192
193            let mut ex = original.clone();
194            ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195            ex.input.set_header(
196                HEADER_REDELIVERY_COUNTER,
197                Value::Number((attempt + 1).into()),
198            );
199            ex.input.set_header(
200                HEADER_REDELIVERY_MAX_COUNTER,
201                Value::Number(backoff.max_attempts.into()),
202            );
203
204            match step.invoke(ex).await {
205                PipelineOutcome::Completed(exchange) => {
206                    return RetryOutcome::Recovered(exchange);
207                }
208                PipelineOutcome::Stopped(stopped_ex) => {
209                    return RetryOutcome::Stopped(stopped_ex);
210                }
211                PipelineOutcome::Failed(retry_err) => {
212                    if attempt + 1 == backoff.max_attempts {
213                        let mut final_ex = original;
214                        final_ex
215                            .input
216                            .set_header(HEADER_REDELIVERED, Value::Bool(true));
217                        final_ex.input.set_header(
218                            HEADER_REDELIVERY_COUNTER,
219                            Value::Number(backoff.max_attempts.into()),
220                        );
221                        final_ex.input.set_header(
222                            HEADER_REDELIVERY_MAX_COUNTER,
223                            Value::Number(backoff.max_attempts.into()),
224                        );
225                        return RetryOutcome::Exhausted {
226                            exchange: final_ex,
227                            error: retry_err,
228                            policy,
229                        };
230                    }
231                }
232            }
233        }
234
235        RetryOutcome::Exhausted {
236            exchange: original,
237            error,
238            policy,
239        }
240    }
241
242    async fn handle_step(
243        &self,
244        policy: Option<PolicyId>,
245        mut exchange: Exchange,
246        error: CamelError,
247    ) -> Result<StepDisposition, CamelError> {
248        let (disposition, producer) = self.resolve_producer(policy);
249
250        // Run on_steps if present (using the SAME policy identified by PolicyId)
251        if let Some(PolicyId(idx)) = policy
252            && let Some((p, _)) = self.policies.get(idx)
253            && let Some(ref steps) = p.on_steps
254        {
255            let snapshot = exchange.clone();
256            exchange.set_error(error.clone());
257            let mut step_pipeline = steps.clone_inner();
258            let step_result = async {
259                let svc = step_pipeline.ready().await?;
260                svc.call(exchange).await
261            }
262            .await;
263            match step_result {
264                Ok(mut ex) => match disposition {
265                    ExceptionDisposition::Handled => {
266                        ex.handle_error();
267                        return Ok(StepDisposition::Handled(ex));
268                    }
269                    ExceptionDisposition::Continued => {
270                        ex.clear_error();
271                        return Ok(StepDisposition::Continued(ex));
272                    }
273                    ExceptionDisposition::Propagate => {
274                        exchange = snapshot;
275                    }
276                },
277                Err(_) => {
278                    exchange = snapshot;
279                }
280            }
281        }
282
283        // No on_steps, on_steps failed, or Propagate — forward to DLC/handler.
284        // BIND the returned exchange (must use handler output).
285        exchange.set_error(error.clone());
286        match send_to_handler(exchange, producer).await {
287            Ok(handler_ex) => match disposition {
288                ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
289                ExceptionDisposition::Handled => {
290                    let mut ex = handler_ex;
291                    ex.clear_error();
292                    Ok(StepDisposition::Handled(ex))
293                }
294                ExceptionDisposition::Continued => {
295                    let mut ex = handler_ex;
296                    ex.clear_error();
297                    Ok(StepDisposition::Continued(ex))
298                }
299            },
300            // Dead code by construction: send_to_handler always returns Ok.
301            Err(_) => Ok(StepDisposition::Propagate(error)),
302        }
303    }
304
305    async fn handle_boundary(
306        &self,
307        _kind: BoundaryKind,
308        mut exchange: Exchange,
309        error: CamelError,
310    ) -> Result<Exchange, CamelError> {
311        // Boundary errors: match policy, run on_steps, forward to DLC.
312        // Disposition mapping:
313        //   Handled → clear error, return Ok(exchange)
314        //   Propagate | Continued → forward to DLC, return Ok(exchange_with_error)
315        //   (Continued at boundary = Propagate — no next step to continue to)
316        let policy = self.match_policy(&error);
317        let (disposition, producer) = self.resolve_producer(policy);
318
319        // Run on_steps if present (shared logic with handle_step)
320        if let Some(PolicyId(idx)) = policy
321            && let Some((p, _)) = self.policies.get(idx)
322            && let Some(ref steps) = p.on_steps
323        {
324            let snapshot = exchange.clone();
325            exchange.set_error(error.clone());
326            let mut step_pipeline = steps.clone_inner();
327            let step_result = async {
328                let svc = step_pipeline.ready().await?;
329                svc.call(exchange).await
330            }
331            .await;
332            match step_result {
333                Ok(mut ex) => match disposition {
334                    ExceptionDisposition::Handled => {
335                        ex.handle_error();
336                        return Ok(ex);
337                    }
338                    ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
339                        exchange = snapshot;
340                    }
341                },
342                Err(_) => {
343                    exchange = snapshot;
344                }
345            }
346        }
347
348        // Forward to DLC/handler — BIND returned exchange
349        exchange.set_error(error.clone());
350        match send_to_handler(exchange, producer).await {
351            Ok(handler_ex) => match disposition {
352                ExceptionDisposition::Handled => {
353                    let mut ex = handler_ex;
354                    ex.clear_error();
355                    Ok(ex)
356                }
357                ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
358                    let mut ex = handler_ex;
359                    ex.set_error(error);
360                    Ok(ex)
361                }
362            },
363            // Dead code by construction: send_to_handler always returns Ok.
364            Err(e) => Err(e),
365        }
366    }
367}
368
369/// Tower Layer that wraps a pipeline with error handling behaviour.
370///
371/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
372pub struct ErrorHandlerLayer {
373    /// Resolved DLC producer (None = log only).
374    dlc_producer: Option<BoxProcessor>,
375    /// Policies with their resolved `handled_by` producers.
376    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
377}
378
379impl ErrorHandlerLayer {
380    /// Create the layer with pre-resolved producers.
381    pub fn new(
382        dlc_producer: Option<BoxProcessor>,
383        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
384    ) -> Self {
385        Self {
386            dlc_producer,
387            policies,
388        }
389    }
390}
391
392impl<S> Layer<S> for ErrorHandlerLayer
393where
394    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
395    S::Future: Send + 'static,
396{
397    type Service = ErrorHandlerService<S>;
398
399    fn layer(&self, inner: S) -> Self::Service {
400        ErrorHandlerService {
401            inner,
402            dlc_producer: self.dlc_producer.clone(),
403            policies: self
404                .policies
405                .iter()
406                .map(|(p, prod)| (p.clone(), prod.clone()))
407                .collect(),
408        }
409    }
410}
411
412/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
413///
414/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
415/// `has_error() == true` if the pipeline ultimately failed.
416pub struct ErrorHandlerService<S> {
417    inner: S,
418    dlc_producer: Option<BoxProcessor>,
419    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
420}
421
422impl<S: Clone> Clone for ErrorHandlerService<S> {
423    fn clone(&self) -> Self {
424        Self {
425            inner: self.inner.clone(),
426            dlc_producer: self.dlc_producer.clone(),
427            policies: self
428                .policies
429                .iter()
430                .map(|(p, prod)| (p.clone(), prod.clone()))
431                .collect(),
432        }
433    }
434}
435
436impl<S> ErrorHandlerService<S>
437where
438    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
439    S::Future: Send + 'static,
440{
441    /// Create the service directly (used in unit tests; in production use the Layer).
442    pub fn new(
443        inner: S,
444        dlc_producer: Option<BoxProcessor>,
445        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
446    ) -> Self {
447        Self {
448            inner,
449            dlc_producer,
450            policies,
451        }
452    }
453}
454
455impl<S> Service<Exchange> for ErrorHandlerService<S>
456where
457    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
458    S::Future: Send + 'static,
459{
460    type Response = Exchange;
461    type Error = CamelError;
462    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
463
464    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
465        // Preserve backpressure (Pending) but never leak readiness errors upward.
466        // Readiness errors are deferred to call(), where they go through the same
467        // retry/onException/DLC path as call() errors. This is safe because call()
468        // re-checks readiness on a fresh inner clone via inner.ready().await.
469        match self.inner.poll_ready(cx) {
470            Poll::Pending => Poll::Pending,
471            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
472            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
473        }
474    }
475
476    fn call(&mut self, exchange: Exchange) -> Self::Future {
477        let mut inner = self.inner.clone();
478        let dlc = self.dlc_producer.clone();
479        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
480            .policies
481            .iter()
482            .map(|(p, prod)| (p.clone(), prod.clone()))
483            .collect();
484
485        Box::pin(async move {
486            let original = exchange.clone();
487            let result = match inner.ready().await {
488                Ok(svc) => svc.call(exchange).await,
489                Err(e) => Err(e), // readiness error — enters retry/DLC/onException path below
490            };
491
492            let err = match result {
493                Ok(ex) => return Ok(ex),
494                Err(e) => e,
495            };
496
497            // Find the first matching policy.
498            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
499
500            if let Some((policy, policy_producer)) = matched {
501                // Retry if configured.
502                if let Some(ref backoff) = policy.retry {
503                    for attempt in 0..backoff.max_attempts {
504                        let delay = backoff.delay_for(attempt);
505                        tokio::time::sleep(delay).await;
506
507                        // Set redelivery headers
508                        let mut ex = original.clone();
509                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
510                        ex.input.set_header(
511                            HEADER_REDELIVERY_COUNTER,
512                            Value::Number((attempt + 1).into()),
513                        );
514                        ex.input.set_header(
515                            HEADER_REDELIVERY_MAX_COUNTER,
516                            Value::Number(backoff.max_attempts.into()),
517                        );
518
519                        let result = match inner.ready().await {
520                            Ok(svc) => svc.call(ex).await,
521                            Err(e) => Err(e), // readiness error — enters retry exhaustion path
522                        };
523                        match result {
524                            Ok(ex) => return Ok(ex),
525                            Err(retry_err) => {
526                                if attempt + 1 == backoff.max_attempts {
527                                    // Retries exhausted — send to handler.
528                                    let mut original = original.clone();
529                                    original
530                                        .input
531                                        .set_header(HEADER_REDELIVERED, Value::Bool(true));
532                                    original.input.set_header(
533                                        HEADER_REDELIVERY_COUNTER,
534                                        Value::Number(backoff.max_attempts.into()),
535                                    );
536                                    original.input.set_header(
537                                        HEADER_REDELIVERY_MAX_COUNTER,
538                                        Value::Number(backoff.max_attempts.into()),
539                                    );
540                                    if let Some(ref steps) = policy.on_steps {
541                                        let handler = policy_producer.clone().or(dlc.clone());
542                                        return execute_on_steps(
543                                            original,
544                                            retry_err,
545                                            steps,
546                                            policy.disposition,
547                                            handler,
548                                        )
549                                        .await;
550                                    }
551                                    original.set_error(retry_err);
552                                    let handler = policy_producer.or(dlc);
553                                    return send_to_handler(original, handler).await;
554                                }
555                            }
556                        }
557                    }
558                }
559                // No retry configured (or 0 attempts) — send to policy handler or DLC.
560                if let Some(ref steps) = policy.on_steps {
561                    let handler = policy_producer.or(dlc);
562                    return execute_on_steps(original, err, steps, policy.disposition, handler)
563                        .await;
564                }
565                let mut ex = original.clone();
566                ex.set_error(err);
567                let handler = policy_producer.or(dlc);
568                send_to_handler(ex, handler).await
569            } else {
570                // No matching policy — forward directly to DLC.
571                let mut ex = original;
572                ex.set_error(err);
573                send_to_handler(ex, dlc).await
574            }
575        })
576    }
577}
578
579async fn send_to_handler(
580    exchange: Exchange,
581    producer: Option<BoxProcessor>,
582) -> Result<Exchange, CamelError> {
583    match producer {
584        None => {
585            // log-policy: system-broken
586            tracing::error!(
587                error = ?exchange.error,
588                "Exchange failed with no error handler configured"
589            );
590            Ok(exchange)
591        }
592        Some(mut prod) => match prod.ready().await {
593            Err(e) => {
594                // log-policy: system-broken
595                tracing::error!("DLC/handler not ready: {e}");
596                Ok(exchange)
597            }
598            Ok(svc) => match svc.call(exchange.clone()).await {
599                Ok(ex) => Ok(ex),
600                Err(e) => {
601                    // log-policy: system-broken
602                    tracing::error!("DLC/handler call failed: {e}");
603                    // Return the original exchange with original error intact.
604                    Ok(exchange)
605                }
606            },
607        },
608    }
609}
610
611#[cfg(test)]
612mod tests {
613    use super::*;
614    use camel_api::{
615        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, OutcomePipeline,
616        OutcomeSegment, PipelineOutcome, RetryableStep, SyncBoxProcessor, Value,
617        error_handler::RedeliveryPolicy,
618    };
619    use std::future::Future;
620    use std::pin::Pin;
621    use std::sync::{
622        Arc,
623        atomic::{AtomicU32, Ordering},
624    };
625    use std::time::Duration;
626    use tower::ServiceExt;
627
628    fn make_exchange() -> Exchange {
629        Exchange::new(Message::new("test"))
630    }
631
632    fn failing_processor() -> BoxProcessor {
633        BoxProcessor::from_fn(|_ex| {
634            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
635        })
636    }
637
638    fn ok_processor() -> BoxProcessor {
639        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
640    }
641
642    fn fail_n_times(n: u32) -> BoxProcessor {
643        let count = Arc::new(AtomicU32::new(0));
644        BoxProcessor::from_fn(move |ex| {
645            let count = Arc::clone(&count);
646            Box::pin(async move {
647                let c = count.fetch_add(1, Ordering::SeqCst);
648                if c < n {
649                    Err(CamelError::ProcessorError(format!("attempt {c}")))
650                } else {
651                    Ok(ex)
652                }
653            })
654        })
655    }
656
657    #[tokio::test]
658    async fn test_ok_passthrough() {
659        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
660        let result = svc.oneshot(make_exchange()).await;
661        assert!(result.is_ok());
662        assert!(!result.unwrap().has_error());
663    }
664
665    #[tokio::test]
666    async fn test_error_goes_to_dlc() {
667        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
668        let received_clone = Arc::clone(&received);
669        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
670            let r = Arc::clone(&received_clone);
671            Box::pin(async move {
672                r.lock().unwrap().push(ex.clone());
673                Ok(ex)
674            })
675        });
676
677        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
678        let result = svc.oneshot(make_exchange()).await;
679        assert!(result.is_ok());
680        let ex = result.unwrap();
681        assert!(ex.has_error());
682        assert_eq!(received.lock().unwrap().len(), 1);
683    }
684
685    #[tokio::test]
686    async fn test_retry_recovers() {
687        let inner = fail_n_times(2);
688        let policy = ExceptionPolicy {
689            matches: Arc::new(|_| true),
690            retry: Some(RedeliveryPolicy {
691                max_attempts: 3,
692                initial_delay: Duration::from_millis(1),
693                multiplier: 1.0,
694                max_delay: Duration::from_millis(10),
695                jitter_factor: 0.0,
696            }),
697            handled_by: None,
698            on_steps: None,
699            disposition: ExceptionDisposition::Propagate,
700        };
701        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
702        let result = svc.oneshot(make_exchange()).await;
703        assert!(result.is_ok());
704        assert!(!result.unwrap().has_error());
705    }
706
707    #[tokio::test]
708    async fn test_retry_exhausted_goes_to_dlc() {
709        let inner = fail_n_times(10);
710        let received = Arc::new(std::sync::Mutex::new(0u32));
711        let received_clone = Arc::clone(&received);
712        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
713            let r = Arc::clone(&received_clone);
714            Box::pin(async move {
715                *r.lock().unwrap() += 1;
716                Ok(ex)
717            })
718        });
719        let policy = ExceptionPolicy {
720            matches: Arc::new(|_| true),
721            retry: Some(RedeliveryPolicy {
722                max_attempts: 2,
723                initial_delay: Duration::from_millis(1),
724                multiplier: 1.0,
725                max_delay: Duration::from_millis(10),
726                jitter_factor: 0.0,
727            }),
728            handled_by: None,
729            on_steps: None,
730            disposition: ExceptionDisposition::Propagate,
731        };
732        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
733        let result = svc.oneshot(make_exchange()).await;
734        assert!(result.is_ok());
735        assert!(result.unwrap().has_error());
736        assert_eq!(*received.lock().unwrap(), 1);
737    }
738
739    #[test]
740    fn test_poll_ready_delegates_to_inner() {
741        use std::sync::atomic::AtomicBool;
742
743        /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
744        #[derive(Clone)]
745        struct DelayedReadyService {
746            ready: Arc<AtomicBool>,
747        }
748
749        impl Service<Exchange> for DelayedReadyService {
750            type Response = Exchange;
751            type Error = CamelError;
752            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
753
754            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
755                if self.ready.fetch_or(true, Ordering::SeqCst) {
756                    // Already marked ready (second+ call) → Ready
757                    Poll::Ready(Ok(()))
758                } else {
759                    // First call → Pending, schedule a wake
760                    cx.waker().wake_by_ref();
761                    Poll::Pending
762                }
763            }
764
765            fn call(&mut self, ex: Exchange) -> Self::Future {
766                Box::pin(async move { Ok(ex) })
767            }
768        }
769
770        let waker = futures::task::noop_waker();
771        let mut cx = Context::from_waker(&waker);
772
773        let inner = DelayedReadyService {
774            ready: Arc::new(AtomicBool::new(false)),
775        };
776        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
777
778        // First poll_ready: inner returns Pending, so ErrorHandlerService must too.
779        let first = Pin::new(&mut svc).poll_ready(&mut cx);
780        assert!(first.is_pending(), "expected Pending on first poll_ready");
781
782        // Second poll_ready: inner returns Ready, so ErrorHandlerService must too.
783        let second = Pin::new(&mut svc).poll_ready(&mut cx);
784        assert!(second.is_ready(), "expected Ready on second poll_ready");
785    }
786
787    #[tokio::test]
788    async fn test_no_matching_policy_uses_dlc() {
789        let received = Arc::new(std::sync::Mutex::new(0u32));
790        let received_clone = Arc::clone(&received);
791        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
792            let r = Arc::clone(&received_clone);
793            Box::pin(async move {
794                *r.lock().unwrap() += 1;
795                Ok(ex)
796            })
797        });
798        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
799        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
800        let result = svc.oneshot(make_exchange()).await;
801        assert!(result.is_ok());
802        assert_eq!(*received.lock().unwrap(), 1);
803    }
804
805    #[tokio::test]
806    async fn test_redelivery_headers_are_set() {
807        use camel_api::error_handler::{
808            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
809            RedeliveryPolicy,
810        };
811
812        let inner = fail_n_times(10);
813        let received = Arc::new(std::sync::Mutex::new(None));
814        let received_clone = Arc::clone(&received);
815        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
816            let r = Arc::clone(&received_clone);
817            Box::pin(async move {
818                *r.lock().unwrap() = Some(ex.clone());
819                Ok(ex)
820            })
821        });
822
823        let policy = ExceptionPolicy {
824            matches: Arc::new(|_| true),
825            retry: Some(RedeliveryPolicy {
826                max_attempts: 2,
827                initial_delay: Duration::from_millis(1),
828                multiplier: 1.0,
829                max_delay: Duration::from_millis(10),
830                jitter_factor: 0.0,
831            }),
832            handled_by: None,
833            on_steps: None,
834            disposition: ExceptionDisposition::Propagate,
835        };
836
837        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
838        let _ = svc.oneshot(make_exchange()).await.unwrap();
839
840        let ex = received.lock().unwrap().take().unwrap();
841        assert_eq!(
842            ex.input.header(HEADER_REDELIVERED),
843            Some(&Value::Bool(true))
844        );
845        assert_eq!(
846            ex.input.header(HEADER_REDELIVERY_COUNTER),
847            Some(&Value::Number(2.into()))
848        );
849        assert_eq!(
850            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
851            Some(&Value::Number(2.into()))
852        );
853    }
854
855    #[tokio::test]
856    async fn test_jitter_produces_varying_delays_in_retry_flow() {
857        use std::time::Instant;
858
859        let inner = fail_n_times(10);
860        let received = Arc::new(std::sync::Mutex::new(None));
861        let received_clone = Arc::clone(&received);
862        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
863            let r = Arc::clone(&received_clone);
864            Box::pin(async move {
865                *r.lock().unwrap() = Some(ex.clone());
866                Ok(ex)
867            })
868        });
869
870        let policy = ExceptionPolicy {
871            matches: Arc::new(|_| true),
872            retry: Some(RedeliveryPolicy {
873                max_attempts: 5,
874                initial_delay: Duration::from_millis(20),
875                multiplier: 1.0,
876                max_delay: Duration::from_millis(100),
877                jitter_factor: 0.5,
878            }),
879            handled_by: None,
880            on_steps: None,
881            disposition: ExceptionDisposition::Propagate,
882        };
883
884        let start = Instant::now();
885        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
886        let _ = svc.oneshot(make_exchange()).await.unwrap();
887        let elapsed = start.elapsed();
888
889        assert!(
890            received.lock().unwrap().is_some(),
891            "DLC should have received exchange"
892        );
893
894        assert!(
895            elapsed >= Duration::from_millis(50),
896            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
897        );
898
899        assert!(
900            elapsed <= Duration::from_millis(500),
901            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
902        );
903    }
904
905    #[tokio::test]
906    async fn test_on_steps_handled_true_consumes_error() {
907        use tower::ServiceExt;
908
909        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
910            ex.input.body = camel_api::Body::Bytes("handled".into());
911            async move { Ok(ex) }
912        }));
913        let policy = ExceptionPolicy {
914            matches: Arc::new(|_| true),
915            retry: None,
916            handled_by: None,
917            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
918            disposition: ExceptionDisposition::Handled,
919        };
920        let inner = tower::service_fn(|_ex: Exchange| async {
921            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
922        });
923        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
924        let ex = Exchange::default();
925        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
926        assert!(result.error.is_none(), "handled:true should clear error");
927        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
928    }
929
930    #[tokio::test]
931    async fn test_on_steps_handled_false_propagates_error() {
932        use tower::ServiceExt;
933
934        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
935            ex.input.body = camel_api::Body::Bytes("handled".into());
936            async move { Ok(ex) }
937        }));
938        let policy = ExceptionPolicy {
939            matches: Arc::new(|_| true),
940            retry: None,
941            handled_by: None,
942            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
943            disposition: ExceptionDisposition::Propagate,
944        };
945        let inner = tower::service_fn(|_ex: Exchange| async {
946            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
947        });
948        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
949        let ex = Exchange::default();
950        let result = svc.ready().await.unwrap().call(ex).await;
951        assert!(result.is_err(), "handled:false should propagate error");
952    }
953
954    // --- Readiness error capture tests ---
955    //
956    // ErrorHandlerService must capture readiness errors (poll_ready returning Err)
957    // and route them through retry/onException/DLC instead of propagating raw.
958
959    /// A service whose `poll_ready` always returns `Ready(Err(...))` but whose
960    /// `call` returns `Ok`. This simulates a permanently-not-ready endpoint.
961    #[derive(Clone)]
962    struct ReadinessFailService {
963        error: CamelError,
964    }
965
966    impl ReadinessFailService {
967        fn new(error: CamelError) -> Self {
968            Self { error }
969        }
970    }
971
972    impl Service<Exchange> for ReadinessFailService {
973        type Response = Exchange;
974        type Error = CamelError;
975        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
976
977        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
978            Poll::Ready(Err(self.error.clone()))
979        }
980
981        fn call(&mut self, ex: Exchange) -> Self::Future {
982            // call() should never be reached if poll_ready returns Err,
983            // but Tower's ready().await on a clone will re-encounter the readiness error.
984            Box::pin(async move { Ok(ex) })
985        }
986    }
987
988    #[tokio::test]
989    async fn test_readiness_error_goes_to_dlc() {
990        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
991        let inner = ReadinessFailService {
992            error: readiness_err,
993        };
994
995        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
996        let received_clone = Arc::clone(&received);
997        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
998            let r = Arc::clone(&received_clone);
999            Box::pin(async move {
1000                r.lock().unwrap().push(ex.clone());
1001                Ok(ex)
1002            })
1003        });
1004
1005        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1006        let result = svc.oneshot(make_exchange()).await;
1007
1008        // The error must be absorbed (Ok), not propagated raw (Err).
1009        assert!(
1010            result.is_ok(),
1011            "readiness error should be captured and sent to DLC, got: {:?}",
1012            result
1013        );
1014        let ex = result.unwrap();
1015        assert!(ex.has_error(), "exchange should carry the readiness error");
1016        assert_eq!(
1017            received.lock().unwrap().len(),
1018            1,
1019            "DLC should have received the exchange exactly once"
1020        );
1021    }
1022
1023    #[tokio::test]
1024    async fn test_readiness_error_goes_to_matching_policy() {
1025        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1026        let inner = ReadinessFailService {
1027            error: readiness_err,
1028        };
1029
1030        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1031            ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1032            async move { Ok(ex) }
1033        }));
1034        let policy = ExceptionPolicy {
1035            matches: Arc::new(|_| true),
1036            retry: None,
1037            handled_by: None,
1038            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1039            disposition: ExceptionDisposition::Handled,
1040        };
1041
1042        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1043        let result = svc.oneshot(make_exchange()).await;
1044
1045        // The error must be absorbed and routed to the on_steps handler.
1046        assert!(
1047            result.is_ok(),
1048            "readiness error should be captured by policy, got: {:?}",
1049            result
1050        );
1051        let ex = result.unwrap();
1052        assert!(ex.error.is_none(), "handled:true should clear error");
1053        assert!(
1054            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1055            "on_steps should have modified the body"
1056        );
1057    }
1058
1059    #[test]
1060    fn test_poll_ready_converts_readiness_error_to_ok() {
1061        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1062        let inner = ReadinessFailService {
1063            error: readiness_err,
1064        };
1065        let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1066
1067        let waker = futures::task::noop_waker();
1068        let mut cx = Context::from_waker(&waker);
1069
1070        // poll_ready must NOT propagate the readiness error — convert to Ok.
1071        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1072        match poll {
1073            Poll::Ready(Ok(())) => { /* correct */ }
1074            Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1075            Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1076        }
1077    }
1078
1079    // --- invoke_processor tests ---
1080
1081    #[tokio::test]
1082    async fn test_invoke_processor_returns_ok_on_success() {
1083        let mut svc = ok_processor();
1084        let ex = make_exchange();
1085        let result = invoke_processor(&mut svc, ex).await;
1086        assert!(result.is_ok());
1087    }
1088
1089    #[tokio::test]
1090    async fn test_invoke_processor_captures_readiness_error() {
1091        let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1092            CamelError::ProcessorError("not ready".into()),
1093        ));
1094        let ex = make_exchange();
1095        let result = invoke_processor(&mut failing_ready, ex).await;
1096        assert!(result.is_err());
1097    }
1098
1099    #[tokio::test]
1100    async fn test_on_steps_handled_true_clears_exception_properties() {
1101        use tower::ServiceExt;
1102
1103        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1104            ex.input.body = camel_api::Body::Bytes("handled".into());
1105            async move { Ok(ex) }
1106        }));
1107        let policy = ExceptionPolicy {
1108            matches: Arc::new(|_| true),
1109            retry: None,
1110            handled_by: None,
1111            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1112            disposition: ExceptionDisposition::Handled,
1113        };
1114        let inner = tower::service_fn(|_ex: Exchange| async {
1115            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1116        });
1117        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1118        let ex = Exchange::default();
1119        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1120        assert!(result.error.is_none(), "handled:true should clear error");
1121        assert!(
1122            result
1123                .properties
1124                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1125                .is_none(),
1126            "handled:true should clear exception properties"
1127        );
1128        assert!(
1129            result
1130                .properties
1131                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1132                .is_none(),
1133            "handled:true should clear exception kind property"
1134        );
1135        assert!(
1136            result
1137                .properties
1138                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1139                .is_none(),
1140            "handled:true should clear exception caught property"
1141        );
1142    }
1143
1144    // ── DefaultRouteErrorHandler tests ──
1145
1146    #[test]
1147    fn test_match_policy_returns_id_for_matching_error() {
1148        let handler = DefaultRouteErrorHandler::new(
1149            None,
1150            vec![(
1151                ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1152                None,
1153            )],
1154        );
1155        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1156        assert_eq!(id, Some(PolicyId(0)));
1157    }
1158
1159    #[test]
1160    fn test_match_policy_returns_none_for_unmatched() {
1161        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1162        let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1163        assert_eq!(id, None);
1164    }
1165
1166    // ── retry_step tests ──
1167
1168    #[tokio::test]
1169    async fn test_retry_step_succeeds_on_second_attempt() {
1170        let mut policy = ExceptionPolicy::new(|_| true);
1171        policy.retry = Some(RedeliveryPolicy::new(3));
1172        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1173        let mut step = fail_n_times(1); // fails once, then succeeds
1174        let ex = make_exchange();
1175        let outcome = handler
1176            .retry_step(
1177                Some(PolicyId(0)),
1178                &mut step,
1179                ex,
1180                CamelError::ProcessorError("attempt 0".into()),
1181            )
1182            .await;
1183        assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1184    }
1185
1186    #[tokio::test]
1187    async fn test_retry_step_exhausted_when_all_fail() {
1188        let mut policy = ExceptionPolicy::new(|_| true);
1189        policy.retry = Some(RedeliveryPolicy::new(3));
1190        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1191        let mut step = failing_processor();
1192        let ex = make_exchange();
1193        let outcome = handler
1194            .retry_step(
1195                Some(PolicyId(0)),
1196                &mut step,
1197                ex,
1198                CamelError::ProcessorError("boom".into()),
1199            )
1200            .await;
1201        assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1202    }
1203
1204    #[tokio::test]
1205    async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1206        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1207        let mut step = ok_processor();
1208        let ex = make_exchange();
1209        let outcome = handler
1210            .retry_step(
1211                None,
1212                &mut step,
1213                ex,
1214                CamelError::ProcessorError("boom".into()),
1215            )
1216            .await;
1217        assert!(matches!(
1218            outcome,
1219            RetryOutcome::Exhausted { policy: None, .. }
1220        ));
1221    }
1222
1223    // ── handle_step tests ──
1224
1225    #[tokio::test]
1226    async fn test_handle_step_propagate_sends_to_dlc() {
1227        let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1228        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1229        let ex = make_exchange();
1230        let result = handler
1231            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1232            .await;
1233        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1234    }
1235
1236    #[tokio::test]
1237    async fn test_handle_step_handled_uses_handler_output() {
1238        let handler_producer = BoxProcessor::from_fn(|mut ex| {
1239            Box::pin(async move {
1240                ex.input.set_header("processed_by", Value::Bool(true));
1241                Ok(ex)
1242            })
1243        });
1244        let policy = ExceptionPolicy {
1245            matches: std::sync::Arc::new(|_| true),
1246            retry: None,
1247            handled_by: None,
1248            on_steps: None,
1249            disposition: ExceptionDisposition::Handled,
1250        };
1251        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1252        let mut ex = make_exchange();
1253        ex.set_error(CamelError::ProcessorError("boom".into()));
1254        let result = handler
1255            .handle_step(
1256                Some(PolicyId(0)),
1257                ex,
1258                CamelError::ProcessorError("boom".into()),
1259            )
1260            .await;
1261        match result {
1262            Ok(StepDisposition::Handled(ex)) => {
1263                assert!(!ex.has_error(), "error should be cleared");
1264                assert_eq!(
1265                    ex.input.header("processed_by"),
1266                    Some(&Value::Bool(true)),
1267                    "should use handler's output exchange"
1268                );
1269            }
1270            other => panic!("expected Handled, got {:?}", other.is_ok()),
1271        }
1272    }
1273
1274    #[tokio::test]
1275    async fn test_handle_step_continued_clears_error() {
1276        let policy = ExceptionPolicy {
1277            matches: std::sync::Arc::new(|_| true),
1278            retry: None,
1279            handled_by: None,
1280            on_steps: None,
1281            disposition: ExceptionDisposition::Continued,
1282        };
1283        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1284        let mut ex = make_exchange();
1285        ex.set_error(CamelError::ProcessorError("boom".into()));
1286        let result = handler
1287            .handle_step(
1288                Some(PolicyId(0)),
1289                ex,
1290                CamelError::ProcessorError("boom".into()),
1291            )
1292            .await;
1293        match result {
1294            Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1295            other => panic!("expected Continued, got {:?}", other.is_ok()),
1296        }
1297    }
1298
1299    #[tokio::test]
1300    async fn test_handle_step_with_on_steps_handled() {
1301        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1302            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1303            async move { Ok(ex) }
1304        }));
1305        let policy = ExceptionPolicy {
1306            matches: std::sync::Arc::new(|_| true),
1307            retry: None,
1308            handled_by: None,
1309            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1310            disposition: ExceptionDisposition::Handled,
1311        };
1312        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1313        let mut ex = make_exchange();
1314        ex.set_error(CamelError::ProcessorError("boom".into()));
1315        let result = handler
1316            .handle_step(
1317                Some(PolicyId(0)),
1318                ex,
1319                CamelError::ProcessorError("boom".into()),
1320            )
1321            .await;
1322        match result {
1323            Ok(StepDisposition::Handled(ex)) => {
1324                assert!(!ex.has_error(), "error should be cleared");
1325                assert!(
1326                    matches!(ex.input.body, camel_api::Body::Bytes(_)),
1327                    "on_steps should have modified the body"
1328                );
1329            }
1330            other => panic!("expected Handled, got: {}", other.is_ok()),
1331        }
1332    }
1333
1334    #[tokio::test]
1335    async fn test_handle_step_with_on_steps_propagate_falls_through() {
1336        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1337            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1338            async move { Ok(ex) }
1339        }));
1340        let dlc_called = Arc::new(AtomicU32::new(0));
1341        let dlc_called_clone = dlc_called.clone();
1342        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1343            let c = dlc_called_clone.clone();
1344            Box::pin(async move {
1345                c.fetch_add(1, Ordering::SeqCst);
1346                Ok(ex)
1347            })
1348        });
1349        let policy = ExceptionPolicy {
1350            matches: std::sync::Arc::new(|_| true),
1351            retry: None,
1352            handled_by: None,
1353            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1354            disposition: ExceptionDisposition::Propagate,
1355        };
1356        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1357        let mut ex = make_exchange();
1358        ex.set_error(CamelError::ProcessorError("boom".into()));
1359        let result = handler
1360            .handle_step(
1361                Some(PolicyId(0)),
1362                ex,
1363                CamelError::ProcessorError("boom".into()),
1364            )
1365            .await;
1366        assert!(
1367            matches!(result, Ok(StepDisposition::Propagate(_))),
1368            "Propagate disposition should return Propagate"
1369        );
1370        assert_eq!(
1371            dlc_called.load(Ordering::SeqCst),
1372            1,
1373            "DLC should be called when on_steps disposition is Propagate"
1374        );
1375    }
1376
1377    #[tokio::test]
1378    async fn test_handle_step_dlc_failure_propagates() {
1379        let failing_dlc = BoxProcessor::from_fn(|_| {
1380            Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1381        });
1382        let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1383        let ex = make_exchange();
1384        let result = handler
1385            .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1386            .await;
1387        assert!(
1388            matches!(result, Ok(StepDisposition::Propagate(_))),
1389            "DLC failure should still return Propagate with original error"
1390        );
1391    }
1392
1393    // ── handle_boundary tests ──
1394
1395    #[tokio::test]
1396    async fn test_handle_boundary_security_error_goes_to_dlc() {
1397        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1398        let count_clone = dlc_count.clone();
1399        let dlc = BoxProcessor::from_fn(move |ex| {
1400            let c = count_clone.clone();
1401            Box::pin(async move {
1402                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1403                Ok(ex)
1404            })
1405        });
1406        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1407        let ex = make_exchange();
1408        let result = handler
1409            .handle_boundary(
1410                BoundaryKind::Security,
1411                ex,
1412                CamelError::Unauthorized("denied".into()),
1413            )
1414            .await;
1415        assert!(result.is_ok());
1416        assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1417    }
1418
1419    #[tokio::test]
1420    async fn test_handle_boundary_handled_clears_error() {
1421        let policy = ExceptionPolicy {
1422            matches: std::sync::Arc::new(|_| true),
1423            retry: None,
1424            handled_by: None,
1425            on_steps: None,
1426            disposition: ExceptionDisposition::Handled,
1427        };
1428        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1429        let ex = make_exchange();
1430        let result = handler
1431            .handle_boundary(
1432                BoundaryKind::Security,
1433                ex,
1434                CamelError::Unauthorized("denied".into()),
1435            )
1436            .await;
1437        assert!(result.is_ok());
1438        assert!(
1439            !result.unwrap().has_error(),
1440            "Handled disposition should clear error"
1441        );
1442    }
1443
1444    #[tokio::test]
1445    async fn test_handle_boundary_propagate_preserves_error() {
1446        let policy = ExceptionPolicy {
1447            matches: std::sync::Arc::new(|_| true),
1448            retry: None,
1449            handled_by: None,
1450            on_steps: None,
1451            disposition: ExceptionDisposition::Propagate,
1452        };
1453        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1454        let ex = make_exchange();
1455        let result = handler
1456            .handle_boundary(
1457                BoundaryKind::CircuitBreaker,
1458                ex,
1459                CamelError::CircuitOpen("open".into()),
1460            )
1461            .await;
1462        assert!(result.is_ok(), "boundary errors always return Ok");
1463        assert!(
1464            result.unwrap().has_error(),
1465            "Propagate disposition should preserve error"
1466        );
1467    }
1468
1469    #[tokio::test]
1470    async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1471        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1472        let count_clone = dlc_count.clone();
1473        let dlc = BoxProcessor::from_fn(move |ex| {
1474            let c = count_clone.clone();
1475            Box::pin(async move {
1476                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1477                Ok(ex)
1478            })
1479        });
1480        let policy = ExceptionPolicy {
1481            matches: std::sync::Arc::new(|_| true),
1482            retry: None,
1483            handled_by: None,
1484            on_steps: None,
1485            disposition: ExceptionDisposition::Continued,
1486        };
1487        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1488        let ex = make_exchange();
1489        let result = handler
1490            .handle_boundary(
1491                BoundaryKind::Security,
1492                ex,
1493                CamelError::Unauthorized("denied".into()),
1494            )
1495            .await;
1496        assert!(result.is_ok(), "boundary errors always return Ok");
1497        assert!(
1498            result.unwrap().has_error(),
1499            "Continued at boundary should preserve error"
1500        );
1501        assert_eq!(
1502            dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1503            1,
1504            "DLC should be called"
1505        );
1506    }
1507
1508    #[tokio::test]
1509    async fn test_handle_boundary_with_on_steps_handled() {
1510        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1511            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1512            async move { Ok(ex) }
1513        }));
1514        let policy = ExceptionPolicy {
1515            matches: std::sync::Arc::new(|_| true),
1516            retry: None,
1517            handled_by: None,
1518            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1519            disposition: ExceptionDisposition::Handled,
1520        };
1521        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1522        let ex = make_exchange();
1523        let result = handler
1524            .handle_boundary(
1525                BoundaryKind::Security,
1526                ex,
1527                CamelError::Unauthorized("denied".into()),
1528            )
1529            .await;
1530        assert!(result.is_ok(), "boundary errors always return Ok");
1531        let ex = result.unwrap();
1532        assert!(!ex.has_error(), "Handled disposition should clear error");
1533        assert!(
1534            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1535            "on_steps should have modified the body"
1536        );
1537    }
1538
1539    #[tokio::test]
1540    async fn retry_step_segment_stop_maps_to_retry_outcome_stopped() {
1541        use std::sync::Arc;
1542        use std::sync::atomic::{AtomicUsize, Ordering};
1543
1544        #[derive(Clone)]
1545        struct StoppingSegment {
1546            n: Arc<AtomicUsize>,
1547        }
1548        impl OutcomePipeline for StoppingSegment {
1549            fn clone_box(&self) -> Box<dyn OutcomePipeline> {
1550                Box::new(self.clone())
1551            }
1552            fn run<'a>(
1553                &'a mut self,
1554                ex: Exchange,
1555            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1556                let n = self.n.clone();
1557                Box::pin(async move {
1558                    n.fetch_add(1, Ordering::SeqCst);
1559                    PipelineOutcome::Stopped(ex)
1560                })
1561            }
1562        }
1563
1564        let call_count = Arc::new(AtomicUsize::new(0));
1565        let seg = OutcomeSegment::new(Box::new(StoppingSegment {
1566            n: call_count.clone(),
1567        }));
1568        let mut retryable: Box<dyn RetryableStep> = Box::new(seg);
1569
1570        let mut policy = ExceptionPolicy::new(|_e: &CamelError| true);
1571        policy.retry = Some(RedeliveryPolicy::new(3));
1572        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1573
1574        let original = Exchange::new(Message::new("retry-me"));
1575        let err = CamelError::ProcessorError("trigger retry".into());
1576        let outcome = handler
1577            .retry_step(Some(PolicyId(0)), retryable.as_mut(), original, err)
1578            .await;
1579
1580        assert!(
1581            matches!(outcome, RetryOutcome::Stopped(_)),
1582            "Segment Stop must map to RetryOutcome::Stopped, got {:?}",
1583            outcome
1584        );
1585        assert_eq!(
1586            call_count.load(Ordering::SeqCst),
1587            1,
1588            "Stop must short-circuit retry — only 1 invoke expected, got {}",
1589            call_count.load(Ordering::SeqCst)
1590        );
1591    }
1592
1593    #[tokio::test]
1594    async fn retry_step_new_signature_works_with_dlc_producer() {
1595        use std::sync::Arc;
1596        use std::sync::atomic::{AtomicUsize, Ordering};
1597
1598        #[derive(Clone)]
1599        struct CountingProducer {
1600            count: Arc<AtomicUsize>,
1601            succeed_on: usize,
1602        }
1603        impl tower::Service<Exchange> for CountingProducer {
1604            type Response = Exchange;
1605            type Error = CamelError;
1606            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1607            fn poll_ready(
1608                &mut self,
1609                _cx: &mut std::task::Context<'_>,
1610            ) -> std::task::Poll<Result<(), Self::Error>> {
1611                std::task::Poll::Ready(Ok(()))
1612            }
1613            fn call(&mut self, ex: Exchange) -> Self::Future {
1614                let n = self.count.fetch_add(1, Ordering::SeqCst);
1615                let succeed_on = self.succeed_on;
1616                Box::pin(async move {
1617                    if n >= succeed_on {
1618                        Ok(ex)
1619                    } else {
1620                        Err(CamelError::ProcessorError("retry".into()))
1621                    }
1622                })
1623            }
1624        }
1625
1626        let count = Arc::new(AtomicUsize::new(0));
1627        let producer = CountingProducer {
1628            count: count.clone(),
1629            succeed_on: 2,
1630        };
1631        let sync_bp = SyncBoxProcessor::new(BoxProcessor::new(producer));
1632        let bp1 = sync_bp.clone_inner();
1633        let bp2 = sync_bp.clone_inner();
1634        let mut retryable1: Box<dyn RetryableStep> = Box::new(bp1);
1635        let mut retryable2: Box<dyn RetryableStep> = Box::new(bp2);
1636
1637        let ex = Exchange::new(Message::new("dlc"));
1638        let outcome1 = retryable1.invoke(ex.clone()).await;
1639        let outcome2 = retryable2.invoke(ex).await;
1640        assert!(matches!(outcome1, PipelineOutcome::Failed(_)));
1641        assert!(matches!(outcome2, PipelineOutcome::Failed(_)));
1642        assert_eq!(
1643            count.load(Ordering::SeqCst),
1644            2,
1645            "DLC producer must be invoked exactly twice through SyncBoxProcessor"
1646        );
1647        drop(retryable1);
1648        drop(retryable2);
1649        drop(sync_bp);
1650    }
1651}