Skip to main content

camel_processor/
error_handler.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8    BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9    HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10    StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15    original: Exchange,
16    original_err: CamelError,
17    on_steps: &SyncBoxProcessor,
18    disposition: ExceptionDisposition,
19    handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21    let snapshot = original.clone();
22    let mut ex = original;
23    ex.set_error(original_err.clone());
24    let mut pipeline = on_steps.clone_inner();
25    let step_result = async {
26        let svc = pipeline.ready().await?;
27        svc.call(ex).await
28    }
29    .await;
30
31    match step_result {
32        Ok(mut ex) => {
33            if disposition == ExceptionDisposition::Handled {
34                ex.handle_error();
35                Ok(ex)
36            } else {
37                // Propagate or Continued — steps execute for side-effects (e.g. logging) but
38                // the modified exchange is discarded and the original error propagated.
39                Err(original_err)
40            }
41        }
42        Err(_) => {
43            // log-policy: handler-owned
44            tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
45            let mut ex = snapshot;
46            ex.set_error(original_err);
47            send_to_handler(ex, handler).await
48        }
49    }
50}
51
52/// Invoke a processor: readiness check + call, unified into a single Result.
53///
54/// Readiness errors and call errors are both returned as `Err(CamelError)`,
55/// allowing the pipeline's recovery loop to handle them uniformly.
56pub async fn invoke_processor(
57    svc: &mut BoxProcessor,
58    ex: Exchange,
59) -> Result<Exchange, CamelError> {
60    match svc.ready().await {
61        Ok(ready) => ready.call(ex).await,
62        Err(err) => Err(err),
63    }
64}
65
66/// Route-level error handler owning ALL error handling logic.
67///
68/// Single owner of DLC, retry, onException policies. Called from
69/// `RouteChannelService` (boundary errors) and `run_steps` (step errors).
70#[async_trait::async_trait]
71pub trait RouteErrorHandler: Send + Sync {
72    /// Match a policy for the given error. Called once before retry.
73    fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
74
75    /// Phase 1: Retry the failed step.
76    async fn retry_step(
77        &self,
78        policy: Option<PolicyId>,
79        step: &mut dyn camel_api::error_handler::RetryableStep,
80        original: Exchange,
81        error: CamelError,
82    ) -> RetryOutcome;
83
84    /// Phase 2: Determine step disposition after retry exhaustion.
85    async fn handle_step(
86        &self,
87        policy: Option<PolicyId>,
88        exchange: Exchange,
89        error: CamelError,
90    ) -> Result<StepDisposition, CamelError>;
91
92    /// Handle boundary (infrastructure) errors.
93    async fn handle_boundary(
94        &self,
95        kind: BoundaryKind,
96        exchange: Exchange,
97        error: CamelError,
98    ) -> Result<Exchange, CamelError>;
99}
100
101/// Default implementation of RouteErrorHandler.
102/// Owns DLC producer exclusively. Encapsulates retry/onException/DLC logic.
103///
104/// Uses `SyncBoxProcessor` internally so the handler is `Send + Sync` as required
105/// by the `RouteErrorHandler` trait.
106pub struct DefaultRouteErrorHandler {
107    pub(crate) dlc_producer: Option<SyncBoxProcessor>,
108    pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
109    /// When true, restore the original Message (body and headers, pre-route, pre-mutation)
110    /// before forwarding to the DLC/handler. The original message is stashed by
111    /// `RouteChannelService::call` as the `ORIGINAL_MESSAGE_EXTENSION` extension.
112    pub use_original_message: bool,
113}
114
115impl DefaultRouteErrorHandler {
116    pub fn new(
117        dlc_producer: Option<BoxProcessor>,
118        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
119    ) -> Self {
120        Self {
121            dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
122            policies: policies
123                .into_iter()
124                .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
125                .collect(),
126            use_original_message: false,
127        }
128    }
129
130    /// Enable restoration of the original message before forwarding to DLC.
131    pub fn with_use_original_message(mut self, enabled: bool) -> Self {
132        self.use_original_message = enabled;
133        self
134    }
135
136    /// Restore the original message from the `CamelOriginalMessage` extension if
137    /// `use_original_message` is enabled. Called immediately before `send_to_handler`
138    /// dispatch in both `handle_step` and `handle_boundary`.
139    fn restore_original_message_if_enabled(&self, exchange: &mut Exchange) {
140        if self.use_original_message
141            && let Some(orig) =
142                exchange.get_extension::<camel_api::Message>(camel_api::ORIGINAL_MESSAGE_EXTENSION)
143        {
144            exchange.input = orig.clone();
145        }
146    }
147
148    /// Resolve (disposition, producer) for a matched policy.
149    /// Shared by handle_step and handle_boundary.
150    fn resolve_producer(
151        &self,
152        policy: Option<PolicyId>,
153    ) -> (ExceptionDisposition, Option<BoxProcessor>) {
154        match policy {
155            Some(PolicyId(idx)) => match self.policies.get(idx) {
156                Some((p, prod)) => (
157                    p.disposition,
158                    prod.as_ref()
159                        .map(|p| p.clone_inner())
160                        .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
161                ),
162                None => (
163                    ExceptionDisposition::Propagate,
164                    self.dlc_producer.as_ref().map(|p| p.clone_inner()),
165                ),
166            },
167            None => (
168                ExceptionDisposition::Propagate,
169                self.dlc_producer.as_ref().map(|p| p.clone_inner()),
170            ),
171        }
172    }
173}
174
175#[async_trait::async_trait]
176impl RouteErrorHandler for DefaultRouteErrorHandler {
177    fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
178        self.policies
179            .iter()
180            .position(|(p, _)| (p.matches)(err))
181            .map(PolicyId)
182    }
183
184    async fn retry_step(
185        &self,
186        policy: Option<PolicyId>,
187        step: &mut dyn camel_api::error_handler::RetryableStep,
188        original: Exchange,
189        error: CamelError,
190    ) -> RetryOutcome {
191        let Some(PolicyId(idx)) = policy else {
192            return RetryOutcome::Exhausted {
193                exchange: original,
194                error,
195                policy: None,
196            };
197        };
198        let Some((policy_def, _)) = self.policies.get(idx) else {
199            return RetryOutcome::Exhausted {
200                exchange: original,
201                error,
202                policy,
203            };
204        };
205        let Some(ref backoff) = policy_def.retry else {
206            return RetryOutcome::Exhausted {
207                exchange: original,
208                error,
209                policy,
210            };
211        };
212
213        for attempt in 0..backoff.max_attempts {
214            let delay = backoff.delay_for(attempt);
215            tokio::time::sleep(delay).await;
216
217            let mut ex = original.clone();
218            ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
219            ex.input.set_header(
220                HEADER_REDELIVERY_COUNTER,
221                Value::Number((attempt + 1).into()),
222            );
223            ex.input.set_header(
224                HEADER_REDELIVERY_MAX_COUNTER,
225                Value::Number(backoff.max_attempts.into()),
226            );
227
228            match step.invoke(ex).await {
229                PipelineOutcome::Completed(exchange) => {
230                    return RetryOutcome::Recovered(exchange);
231                }
232                PipelineOutcome::Stopped(stopped_ex) => {
233                    return RetryOutcome::Stopped(stopped_ex);
234                }
235                PipelineOutcome::Failed(retry_err) => {
236                    if attempt + 1 == backoff.max_attempts {
237                        let mut final_ex = original;
238                        final_ex
239                            .input
240                            .set_header(HEADER_REDELIVERED, Value::Bool(true));
241                        final_ex.input.set_header(
242                            HEADER_REDELIVERY_COUNTER,
243                            Value::Number(backoff.max_attempts.into()),
244                        );
245                        final_ex.input.set_header(
246                            HEADER_REDELIVERY_MAX_COUNTER,
247                            Value::Number(backoff.max_attempts.into()),
248                        );
249                        return RetryOutcome::Exhausted {
250                            exchange: final_ex,
251                            error: retry_err,
252                            policy,
253                        };
254                    }
255                }
256            }
257        }
258
259        RetryOutcome::Exhausted {
260            exchange: original,
261            error,
262            policy,
263        }
264    }
265
266    async fn handle_step(
267        &self,
268        policy: Option<PolicyId>,
269        mut exchange: Exchange,
270        error: CamelError,
271    ) -> Result<StepDisposition, CamelError> {
272        let (disposition, producer) = self.resolve_producer(policy);
273
274        // Run on_steps if present (using the SAME policy identified by PolicyId)
275        if let Some(PolicyId(idx)) = policy
276            && let Some((p, _)) = self.policies.get(idx)
277            && let Some(ref steps) = p.on_steps
278        {
279            let snapshot = exchange.clone();
280            exchange.set_error(error.clone());
281            let mut step_pipeline = steps.clone_inner();
282            let step_result = async {
283                let svc = step_pipeline.ready().await?;
284                svc.call(exchange).await
285            }
286            .await;
287            match step_result {
288                Ok(mut ex) => match disposition {
289                    ExceptionDisposition::Handled => {
290                        ex.handle_error();
291                        return Ok(StepDisposition::Handled(ex));
292                    }
293                    ExceptionDisposition::Continued => {
294                        ex.clear_error();
295                        return Ok(StepDisposition::Continued(ex));
296                    }
297                    ExceptionDisposition::Propagate => {
298                        exchange = snapshot;
299                    }
300                },
301                Err(_) => {
302                    exchange = snapshot;
303                }
304            }
305        }
306
307        // No on_steps, on_steps failed, or Propagate — forward to DLC/handler.
308        // BIND the returned exchange (must use handler output).
309        self.restore_original_message_if_enabled(&mut exchange);
310        exchange.set_error(error.clone());
311        match send_to_handler(exchange, producer).await {
312            Ok(handler_ex) => match disposition {
313                ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
314                ExceptionDisposition::Handled => {
315                    let mut ex = handler_ex;
316                    ex.clear_error();
317                    Ok(StepDisposition::Handled(ex))
318                }
319                ExceptionDisposition::Continued => {
320                    let mut ex = handler_ex;
321                    ex.clear_error();
322                    Ok(StepDisposition::Continued(ex))
323                }
324            },
325            // Dead code by construction: send_to_handler always returns Ok.
326            Err(_) => Ok(StepDisposition::Propagate(error)),
327        }
328    }
329
330    async fn handle_boundary(
331        &self,
332        _kind: BoundaryKind,
333        mut exchange: Exchange,
334        error: CamelError,
335    ) -> Result<Exchange, CamelError> {
336        // Boundary errors: match policy, run on_steps, forward to DLC.
337        // Disposition mapping:
338        //   Handled → clear error, return Ok(exchange)
339        //   Propagate | Continued → forward to DLC, return Ok(exchange_with_error)
340        //   (Continued at boundary = Propagate — no next step to continue to)
341        let policy = self.match_policy(&error);
342        let (disposition, producer) = self.resolve_producer(policy);
343
344        // Run on_steps if present (shared logic with handle_step)
345        if let Some(PolicyId(idx)) = policy
346            && let Some((p, _)) = self.policies.get(idx)
347            && let Some(ref steps) = p.on_steps
348        {
349            let snapshot = exchange.clone();
350            exchange.set_error(error.clone());
351            let mut step_pipeline = steps.clone_inner();
352            let step_result = async {
353                let svc = step_pipeline.ready().await?;
354                svc.call(exchange).await
355            }
356            .await;
357            match step_result {
358                Ok(mut ex) => match disposition {
359                    ExceptionDisposition::Handled => {
360                        ex.handle_error();
361                        return Ok(ex);
362                    }
363                    ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
364                        exchange = snapshot;
365                    }
366                },
367                Err(_) => {
368                    exchange = snapshot;
369                }
370            }
371        }
372
373        // Forward to DLC/handler — BIND returned exchange
374        self.restore_original_message_if_enabled(&mut exchange);
375        exchange.set_error(error.clone());
376        match send_to_handler(exchange, producer).await {
377            Ok(handler_ex) => match disposition {
378                ExceptionDisposition::Handled => {
379                    let mut ex = handler_ex;
380                    ex.clear_error();
381                    Ok(ex)
382                }
383                ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
384                    let mut ex = handler_ex;
385                    ex.set_error(error);
386                    Ok(ex)
387                }
388            },
389            // Dead code by construction: send_to_handler always returns Ok.
390            Err(e) => Err(e),
391        }
392    }
393}
394
395/// Tower Layer that wraps a pipeline with error handling behaviour.
396///
397/// Constructed with already-resolved producers; URI resolution happens in `camel-core`.
398pub struct ErrorHandlerLayer {
399    /// Resolved DLC producer (None = log only).
400    dlc_producer: Option<BoxProcessor>,
401    /// Policies with their resolved `handled_by` producers.
402    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
403}
404
405impl ErrorHandlerLayer {
406    /// Create the layer with pre-resolved producers.
407    pub fn new(
408        dlc_producer: Option<BoxProcessor>,
409        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
410    ) -> Self {
411        Self {
412            dlc_producer,
413            policies,
414        }
415    }
416}
417
418impl<S> Layer<S> for ErrorHandlerLayer
419where
420    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
421    S::Future: Send + 'static,
422{
423    type Service = ErrorHandlerService<S>;
424
425    fn layer(&self, inner: S) -> Self::Service {
426        ErrorHandlerService {
427            inner,
428            dlc_producer: self.dlc_producer.clone(),
429            policies: self
430                .policies
431                .iter()
432                .map(|(p, prod)| (p.clone(), prod.clone()))
433                .collect(),
434        }
435    }
436}
437
438/// Tower Service that absorbs pipeline errors by retrying and/or forwarding to a DLC.
439///
440/// `call` always returns `Ok` — errors are absorbed. The returned exchange will have
441/// `has_error() == true` if the pipeline ultimately failed.
442pub struct ErrorHandlerService<S> {
443    inner: S,
444    dlc_producer: Option<BoxProcessor>,
445    policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
446}
447
448impl<S: Clone> Clone for ErrorHandlerService<S> {
449    fn clone(&self) -> Self {
450        Self {
451            inner: self.inner.clone(),
452            dlc_producer: self.dlc_producer.clone(),
453            policies: self
454                .policies
455                .iter()
456                .map(|(p, prod)| (p.clone(), prod.clone()))
457                .collect(),
458        }
459    }
460}
461
462impl<S> ErrorHandlerService<S>
463where
464    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
465    S::Future: Send + 'static,
466{
467    /// Create the service directly (used in unit tests; in production use the Layer).
468    pub fn new(
469        inner: S,
470        dlc_producer: Option<BoxProcessor>,
471        policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
472    ) -> Self {
473        Self {
474            inner,
475            dlc_producer,
476            policies,
477        }
478    }
479}
480
481impl<S> Service<Exchange> for ErrorHandlerService<S>
482where
483    S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
484    S::Future: Send + 'static,
485{
486    type Response = Exchange;
487    type Error = CamelError;
488    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
489
490    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
491        // Preserve backpressure (Pending) but never leak readiness errors upward.
492        // Readiness errors are deferred to call(), where they go through the same
493        // retry/onException/DLC path as call() errors. This is safe because call()
494        // re-checks readiness on a fresh inner clone via inner.ready().await.
495        match self.inner.poll_ready(cx) {
496            Poll::Pending => Poll::Pending,
497            Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
498            Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
499        }
500    }
501
502    fn call(&mut self, exchange: Exchange) -> Self::Future {
503        let mut inner = self.inner.clone();
504        let dlc = self.dlc_producer.clone();
505        let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
506            .policies
507            .iter()
508            .map(|(p, prod)| (p.clone(), prod.clone()))
509            .collect();
510
511        Box::pin(async move {
512            let original = exchange.clone();
513            let result = match inner.ready().await {
514                Ok(svc) => svc.call(exchange).await,
515                Err(e) => Err(e), // readiness error — enters retry/DLC/onException path below
516            };
517
518            let err = match result {
519                Ok(ex) => return Ok(ex),
520                Err(e) => e,
521            };
522
523            // Find the first matching policy.
524            let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
525
526            if let Some((policy, policy_producer)) = matched {
527                // Retry if configured.
528                if let Some(ref backoff) = policy.retry {
529                    for attempt in 0..backoff.max_attempts {
530                        let delay = backoff.delay_for(attempt);
531                        tokio::time::sleep(delay).await;
532
533                        // Set redelivery headers
534                        let mut ex = original.clone();
535                        ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
536                        ex.input.set_header(
537                            HEADER_REDELIVERY_COUNTER,
538                            Value::Number((attempt + 1).into()),
539                        );
540                        ex.input.set_header(
541                            HEADER_REDELIVERY_MAX_COUNTER,
542                            Value::Number(backoff.max_attempts.into()),
543                        );
544
545                        let result = match inner.ready().await {
546                            Ok(svc) => svc.call(ex).await,
547                            Err(e) => Err(e), // readiness error — enters retry exhaustion path
548                        };
549                        match result {
550                            Ok(ex) => return Ok(ex),
551                            Err(retry_err) => {
552                                if attempt + 1 == backoff.max_attempts {
553                                    // Retries exhausted — send to handler.
554                                    let mut original = original.clone();
555                                    original
556                                        .input
557                                        .set_header(HEADER_REDELIVERED, Value::Bool(true));
558                                    original.input.set_header(
559                                        HEADER_REDELIVERY_COUNTER,
560                                        Value::Number(backoff.max_attempts.into()),
561                                    );
562                                    original.input.set_header(
563                                        HEADER_REDELIVERY_MAX_COUNTER,
564                                        Value::Number(backoff.max_attempts.into()),
565                                    );
566                                    if let Some(ref steps) = policy.on_steps {
567                                        let handler = policy_producer.clone().or(dlc.clone());
568                                        return execute_on_steps(
569                                            original,
570                                            retry_err,
571                                            steps,
572                                            policy.disposition,
573                                            handler,
574                                        )
575                                        .await;
576                                    }
577                                    original.set_error(retry_err);
578                                    let handler = policy_producer.or(dlc);
579                                    return send_to_handler(original, handler).await;
580                                }
581                            }
582                        }
583                    }
584                }
585                // No retry configured (or 0 attempts) — send to policy handler or DLC.
586                if let Some(ref steps) = policy.on_steps {
587                    let handler = policy_producer.or(dlc);
588                    return execute_on_steps(original, err, steps, policy.disposition, handler)
589                        .await;
590                }
591                let mut ex = original.clone();
592                ex.set_error(err);
593                let handler = policy_producer.or(dlc);
594                send_to_handler(ex, handler).await
595            } else {
596                // No matching policy — forward directly to DLC.
597                let mut ex = original;
598                ex.set_error(err);
599                send_to_handler(ex, dlc).await
600            }
601        })
602    }
603}
604
605async fn send_to_handler(
606    exchange: Exchange,
607    producer: Option<BoxProcessor>,
608) -> Result<Exchange, CamelError> {
609    match producer {
610        None => {
611            // log-policy: system-broken
612            tracing::error!(
613                error = ?exchange.error,
614                "Exchange failed with no error handler configured"
615            );
616            Ok(exchange)
617        }
618        Some(mut prod) => match prod.ready().await {
619            Err(e) => {
620                // log-policy: system-broken
621                tracing::error!("DLC/handler not ready: {e}");
622                Ok(exchange)
623            }
624            Ok(svc) => match svc.call(exchange.clone()).await {
625                Ok(ex) => Ok(ex),
626                Err(e) => {
627                    // log-policy: system-broken
628                    tracing::error!("DLC/handler call failed: {e}");
629                    // Return the original exchange with original error intact.
630                    Ok(exchange)
631                }
632            },
633        },
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use camel_api::{
641        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, OutcomePipeline,
642        OutcomeSegment, PipelineOutcome, RetryableStep, SyncBoxProcessor, Value,
643        error_handler::RedeliveryPolicy,
644    };
645    use std::future::Future;
646    use std::pin::Pin;
647    use std::sync::{
648        Arc,
649        atomic::{AtomicU32, Ordering},
650    };
651    use std::time::Duration;
652    use tower::ServiceExt;
653
654    fn make_exchange() -> Exchange {
655        Exchange::new(Message::new("test"))
656    }
657
658    fn failing_processor() -> BoxProcessor {
659        BoxProcessor::from_fn(|_ex| {
660            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
661        })
662    }
663
664    fn ok_processor() -> BoxProcessor {
665        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
666    }
667
668    fn fail_n_times(n: u32) -> BoxProcessor {
669        let count = Arc::new(AtomicU32::new(0));
670        BoxProcessor::from_fn(move |ex| {
671            let count = Arc::clone(&count);
672            Box::pin(async move {
673                let c = count.fetch_add(1, Ordering::SeqCst);
674                if c < n {
675                    Err(CamelError::ProcessorError(format!("attempt {c}")))
676                } else {
677                    Ok(ex)
678                }
679            })
680        })
681    }
682
683    #[tokio::test]
684    async fn test_ok_passthrough() {
685        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
686        let result = svc.oneshot(make_exchange()).await;
687        assert!(result.is_ok());
688        assert!(!result.unwrap().has_error());
689    }
690
691    #[tokio::test]
692    async fn test_error_goes_to_dlc() {
693        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
694        let received_clone = Arc::clone(&received);
695        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
696            let r = Arc::clone(&received_clone);
697            Box::pin(async move {
698                r.lock().unwrap().push(ex.clone());
699                Ok(ex)
700            })
701        });
702
703        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
704        let result = svc.oneshot(make_exchange()).await;
705        assert!(result.is_ok());
706        let ex = result.unwrap();
707        assert!(ex.has_error());
708        assert_eq!(received.lock().unwrap().len(), 1);
709    }
710
711    #[tokio::test]
712    async fn test_retry_recovers() {
713        let inner = fail_n_times(2);
714        let policy = ExceptionPolicy {
715            matches: Arc::new(|_| true),
716            retry: Some(RedeliveryPolicy {
717                max_attempts: 3,
718                initial_delay: Duration::from_millis(1),
719                multiplier: 1.0,
720                max_delay: Duration::from_millis(10),
721                jitter_factor: 0.0,
722            }),
723            handled_by: None,
724            on_steps: None,
725            disposition: ExceptionDisposition::Propagate,
726        };
727        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
728        let result = svc.oneshot(make_exchange()).await;
729        assert!(result.is_ok());
730        assert!(!result.unwrap().has_error());
731    }
732
733    #[tokio::test]
734    async fn test_retry_exhausted_goes_to_dlc() {
735        let inner = fail_n_times(10);
736        let received = Arc::new(std::sync::Mutex::new(0u32));
737        let received_clone = Arc::clone(&received);
738        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
739            let r = Arc::clone(&received_clone);
740            Box::pin(async move {
741                *r.lock().unwrap() += 1;
742                Ok(ex)
743            })
744        });
745        let policy = ExceptionPolicy {
746            matches: Arc::new(|_| true),
747            retry: Some(RedeliveryPolicy {
748                max_attempts: 2,
749                initial_delay: Duration::from_millis(1),
750                multiplier: 1.0,
751                max_delay: Duration::from_millis(10),
752                jitter_factor: 0.0,
753            }),
754            handled_by: None,
755            on_steps: None,
756            disposition: ExceptionDisposition::Propagate,
757        };
758        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
759        let result = svc.oneshot(make_exchange()).await;
760        assert!(result.is_ok());
761        assert!(result.unwrap().has_error());
762        assert_eq!(*received.lock().unwrap(), 1);
763    }
764
/// Checks that `ErrorHandlerService::poll_ready` reports `Pending` while the
/// wrapped service is pending and `Ready` once the wrapped service is ready.
#[test]
fn test_poll_ready_delegates_to_inner() {
    use std::sync::atomic::AtomicBool;

    /// Inner service stub: `Pending` on the first readiness poll, `Ready(Ok(()))`
    /// on every later one.
    #[derive(Clone)]
    struct DelayedReadyService {
        ready: Arc<AtomicBool>,
    }

    impl Service<Exchange> for DelayedReadyService {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // Raise the flag atomically and learn whether an earlier poll already did.
            let polled_before = self.ready.swap(true, Ordering::SeqCst);
            if !polled_before {
                // First poll: ask to be polled again and report not-ready.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            Poll::Ready(Ok(()))
        }

        fn call(&mut self, ex: Exchange) -> Self::Future {
            // Echo the exchange back unchanged.
            Box::pin(async move { Ok(ex) })
        }
    }

    let mut svc = ErrorHandlerService::new(
        DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        },
        None,
        vec![],
    );

    let noop = futures::task::noop_waker();
    let mut cx = Context::from_waker(&noop);

    // Inner is pending on its first poll, so the wrapper has to be pending too.
    assert!(
        Pin::new(&mut svc).poll_ready(&mut cx).is_pending(),
        "expected Pending on first poll_ready"
    );

    // Inner is ready from the second poll on, so the wrapper has to be ready too.
    assert!(
        Pin::new(&mut svc).poll_ready(&mut cx).is_ready(),
        "expected Ready on second poll_ready"
    );
}
812
813    #[tokio::test]
814    async fn test_no_matching_policy_uses_dlc() {
815        let received = Arc::new(std::sync::Mutex::new(0u32));
816        let received_clone = Arc::clone(&received);
817        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
818            let r = Arc::clone(&received_clone);
819            Box::pin(async move {
820                *r.lock().unwrap() += 1;
821                Ok(ex)
822            })
823        });
824        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
825        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
826        let result = svc.oneshot(make_exchange()).await;
827        assert!(result.is_ok());
828        assert_eq!(*received.lock().unwrap(), 1);
829    }
830
831    #[tokio::test]
832    async fn test_redelivery_headers_are_set() {
833        use camel_api::error_handler::{
834            HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
835            RedeliveryPolicy,
836        };
837
838        let inner = fail_n_times(10);
839        let received = Arc::new(std::sync::Mutex::new(None));
840        let received_clone = Arc::clone(&received);
841        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
842            let r = Arc::clone(&received_clone);
843            Box::pin(async move {
844                *r.lock().unwrap() = Some(ex.clone());
845                Ok(ex)
846            })
847        });
848
849        let policy = ExceptionPolicy {
850            matches: Arc::new(|_| true),
851            retry: Some(RedeliveryPolicy {
852                max_attempts: 2,
853                initial_delay: Duration::from_millis(1),
854                multiplier: 1.0,
855                max_delay: Duration::from_millis(10),
856                jitter_factor: 0.0,
857            }),
858            handled_by: None,
859            on_steps: None,
860            disposition: ExceptionDisposition::Propagate,
861        };
862
863        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
864        let _ = svc.oneshot(make_exchange()).await.unwrap();
865
866        let ex = received.lock().unwrap().take().unwrap();
867        assert_eq!(
868            ex.input.header(HEADER_REDELIVERED),
869            Some(&Value::Bool(true))
870        );
871        assert_eq!(
872            ex.input.header(HEADER_REDELIVERY_COUNTER),
873            Some(&Value::Number(2.into()))
874        );
875        assert_eq!(
876            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
877            Some(&Value::Number(2.into()))
878        );
879    }
880
881    #[tokio::test]
882    async fn test_jitter_produces_varying_delays_in_retry_flow() {
883        use std::time::Instant;
884
885        let inner = fail_n_times(10);
886        let received = Arc::new(std::sync::Mutex::new(None));
887        let received_clone = Arc::clone(&received);
888        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
889            let r = Arc::clone(&received_clone);
890            Box::pin(async move {
891                *r.lock().unwrap() = Some(ex.clone());
892                Ok(ex)
893            })
894        });
895
896        let policy = ExceptionPolicy {
897            matches: Arc::new(|_| true),
898            retry: Some(RedeliveryPolicy {
899                max_attempts: 5,
900                initial_delay: Duration::from_millis(20),
901                multiplier: 1.0,
902                max_delay: Duration::from_millis(100),
903                jitter_factor: 0.5,
904            }),
905            handled_by: None,
906            on_steps: None,
907            disposition: ExceptionDisposition::Propagate,
908        };
909
910        let start = Instant::now();
911        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
912        let _ = svc.oneshot(make_exchange()).await.unwrap();
913        let elapsed = start.elapsed();
914
915        assert!(
916            received.lock().unwrap().is_some(),
917            "DLC should have received exchange"
918        );
919
920        assert!(
921            elapsed >= Duration::from_millis(50),
922            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
923        );
924
925        assert!(
926            elapsed <= Duration::from_millis(500),
927            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
928        );
929    }
930
931    #[tokio::test]
932    async fn test_on_steps_handled_true_consumes_error() {
933        use tower::ServiceExt;
934
935        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
936            ex.input.body = camel_api::Body::Bytes("handled".into());
937            async move { Ok(ex) }
938        }));
939        let policy = ExceptionPolicy {
940            matches: Arc::new(|_| true),
941            retry: None,
942            handled_by: None,
943            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
944            disposition: ExceptionDisposition::Handled,
945        };
946        let inner = tower::service_fn(|_ex: Exchange| async {
947            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
948        });
949        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
950        let ex = Exchange::default();
951        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
952        assert!(result.error.is_none(), "handled:true should clear error");
953        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
954    }
955
956    #[tokio::test]
957    async fn test_on_steps_handled_false_propagates_error() {
958        use tower::ServiceExt;
959
960        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
961            ex.input.body = camel_api::Body::Bytes("handled".into());
962            async move { Ok(ex) }
963        }));
964        let policy = ExceptionPolicy {
965            matches: Arc::new(|_| true),
966            retry: None,
967            handled_by: None,
968            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
969            disposition: ExceptionDisposition::Propagate,
970        };
971        let inner = tower::service_fn(|_ex: Exchange| async {
972            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
973        });
974        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
975        let ex = Exchange::default();
976        let result = svc.ready().await.unwrap().call(ex).await;
977        assert!(result.is_err(), "handled:false should propagate error");
978    }
979
980    // --- Readiness error capture tests ---
981    //
982    // ErrorHandlerService must capture readiness errors (poll_ready returning Err)
983    // and route them through retry/onException/DLC instead of propagating raw.
984
985    /// A service whose `poll_ready` always returns `Ready(Err(...))` but whose
986    /// `call` returns `Ok`. This simulates a permanently-not-ready endpoint.
987    #[derive(Clone)]
988    struct ReadinessFailService {
989        error: CamelError,
990    }
991
992    impl ReadinessFailService {
993        fn new(error: CamelError) -> Self {
994            Self { error }
995        }
996    }
997
998    impl Service<Exchange> for ReadinessFailService {
999        type Response = Exchange;
1000        type Error = CamelError;
1001        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1002
1003        fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1004            Poll::Ready(Err(self.error.clone()))
1005        }
1006
1007        fn call(&mut self, ex: Exchange) -> Self::Future {
1008            // call() should never be reached if poll_ready returns Err,
1009            // but Tower's ready().await on a clone will re-encounter the readiness error.
1010            Box::pin(async move { Ok(ex) })
1011        }
1012    }
1013
1014    #[tokio::test]
1015    async fn test_readiness_error_goes_to_dlc() {
1016        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1017        let inner = ReadinessFailService {
1018            error: readiness_err,
1019        };
1020
1021        let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1022        let received_clone = Arc::clone(&received);
1023        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1024            let r = Arc::clone(&received_clone);
1025            Box::pin(async move {
1026                r.lock().unwrap().push(ex.clone());
1027                Ok(ex)
1028            })
1029        });
1030
1031        let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1032        let result = svc.oneshot(make_exchange()).await;
1033
1034        // The error must be absorbed (Ok), not propagated raw (Err).
1035        assert!(
1036            result.is_ok(),
1037            "readiness error should be captured and sent to DLC, got: {:?}",
1038            result
1039        );
1040        let ex = result.unwrap();
1041        assert!(ex.has_error(), "exchange should carry the readiness error");
1042        assert_eq!(
1043            received.lock().unwrap().len(),
1044            1,
1045            "DLC should have received the exchange exactly once"
1046        );
1047    }
1048
1049    #[tokio::test]
1050    async fn test_readiness_error_goes_to_matching_policy() {
1051        let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1052        let inner = ReadinessFailService {
1053            error: readiness_err,
1054        };
1055
1056        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1057            ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1058            async move { Ok(ex) }
1059        }));
1060        let policy = ExceptionPolicy {
1061            matches: Arc::new(|_| true),
1062            retry: None,
1063            handled_by: None,
1064            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1065            disposition: ExceptionDisposition::Handled,
1066        };
1067
1068        let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1069        let result = svc.oneshot(make_exchange()).await;
1070
1071        // The error must be absorbed and routed to the on_steps handler.
1072        assert!(
1073            result.is_ok(),
1074            "readiness error should be captured by policy, got: {:?}",
1075            result
1076        );
1077        let ex = result.unwrap();
1078        assert!(ex.error.is_none(), "handled:true should clear error");
1079        assert!(
1080            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1081            "on_steps should have modified the body"
1082        );
1083    }
1084
/// `ErrorHandlerService::poll_ready` must answer `Ready(Ok(()))` even when the
/// wrapped service reports a readiness error.
#[test]
fn test_poll_ready_converts_readiness_error_to_ok() {
    let never_ready = ReadinessFailService {
        error: CamelError::ProcessorError("readiness-fail".into()),
    };
    let mut svc = ErrorHandlerService::new(never_ready, None, vec![]);

    let noop = futures::task::noop_waker();
    let mut cx = Context::from_waker(&noop);

    // Anything other than Ready(Ok) is a failure; each case has its own message.
    match Pin::new(&mut svc).poll_ready(&mut cx) {
        Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
        Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
        Poll::Ready(Ok(())) => {}
    }
}
1104
1105    // --- invoke_processor tests ---
1106
1107    #[tokio::test]
1108    async fn test_invoke_processor_returns_ok_on_success() {
1109        let mut svc = ok_processor();
1110        let ex = make_exchange();
1111        let result = invoke_processor(&mut svc, ex).await;
1112        assert!(result.is_ok());
1113    }
1114
1115    #[tokio::test]
1116    async fn test_invoke_processor_captures_readiness_error() {
1117        let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1118            CamelError::ProcessorError("not ready".into()),
1119        ));
1120        let ex = make_exchange();
1121        let result = invoke_processor(&mut failing_ready, ex).await;
1122        assert!(result.is_err());
1123    }
1124
1125    #[tokio::test]
1126    async fn test_on_steps_handled_true_clears_exception_properties() {
1127        use tower::ServiceExt;
1128
1129        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1130            ex.input.body = camel_api::Body::Bytes("handled".into());
1131            async move { Ok(ex) }
1132        }));
1133        let policy = ExceptionPolicy {
1134            matches: Arc::new(|_| true),
1135            retry: None,
1136            handled_by: None,
1137            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1138            disposition: ExceptionDisposition::Handled,
1139        };
1140        let inner = tower::service_fn(|_ex: Exchange| async {
1141            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1142        });
1143        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1144        let ex = Exchange::default();
1145        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1146        assert!(result.error.is_none(), "handled:true should clear error");
1147        assert!(
1148            result
1149                .properties
1150                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1151                .is_none(),
1152            "handled:true should clear exception properties"
1153        );
1154        assert!(
1155            result
1156                .properties
1157                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1158                .is_none(),
1159            "handled:true should clear exception kind property"
1160        );
1161        assert!(
1162            result
1163                .properties
1164                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1165                .is_none(),
1166            "handled:true should clear exception caught property"
1167        );
1168    }
1169
1170    // ── DefaultRouteErrorHandler tests ──
1171
/// A single policy matching `CamelError::ProcessorError` must be reported as
/// `PolicyId(0)` for such an error.
#[test]
fn test_match_policy_returns_id_for_matching_error() {
    let processor_errors_only =
        ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
    let handler = DefaultRouteErrorHandler::new(None, vec![(processor_errors_only, None)]);

    let probe = CamelError::ProcessorError("test".into());

    assert_eq!(handler.match_policy(&probe), Some(PolicyId(0)));
}
1184
/// With no policies registered, `match_policy` must return `None`.
#[test]
fn test_match_policy_returns_none_for_unmatched() {
    let handler = DefaultRouteErrorHandler::new(None, vec![]);
    let probe = CamelError::ProcessorError("test".into());
    assert_eq!(handler.match_policy(&probe), None);
}
1191
1192    // ── retry_step tests ──
1193
1194    #[tokio::test]
1195    async fn test_retry_step_succeeds_on_second_attempt() {
1196        let mut policy = ExceptionPolicy::new(|_| true);
1197        policy.retry = Some(RedeliveryPolicy::new(3));
1198        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1199        let mut step = fail_n_times(1); // fails once, then succeeds
1200        let ex = make_exchange();
1201        let outcome = handler
1202            .retry_step(
1203                Some(PolicyId(0)),
1204                &mut step,
1205                ex,
1206                CamelError::ProcessorError("attempt 0".into()),
1207            )
1208            .await;
1209        assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1210    }
1211
1212    #[tokio::test]
1213    async fn test_retry_step_exhausted_when_all_fail() {
1214        let mut policy = ExceptionPolicy::new(|_| true);
1215        policy.retry = Some(RedeliveryPolicy::new(3));
1216        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1217        let mut step = failing_processor();
1218        let ex = make_exchange();
1219        let outcome = handler
1220            .retry_step(
1221                Some(PolicyId(0)),
1222                &mut step,
1223                ex,
1224                CamelError::ProcessorError("boom".into()),
1225            )
1226            .await;
1227        assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1228    }
1229
1230    #[tokio::test]
1231    async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1232        let handler = DefaultRouteErrorHandler::new(None, vec![]);
1233        let mut step = ok_processor();
1234        let ex = make_exchange();
1235        let outcome = handler
1236            .retry_step(
1237                None,
1238                &mut step,
1239                ex,
1240                CamelError::ProcessorError("boom".into()),
1241            )
1242            .await;
1243        assert!(matches!(
1244            outcome,
1245            RetryOutcome::Exhausted { policy: None, .. }
1246        ));
1247    }
1248
1249    // ── handle_step tests ──
1250
1251    #[tokio::test]
1252    async fn test_handle_step_propagate_sends_to_dlc() {
1253        let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1254        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1255        let ex = make_exchange();
1256        let result = handler
1257            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1258            .await;
1259        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1260    }
1261
1262    #[tokio::test]
1263    async fn test_handle_step_handled_uses_handler_output() {
1264        let handler_producer = BoxProcessor::from_fn(|mut ex| {
1265            Box::pin(async move {
1266                ex.input.set_header("processed_by", Value::Bool(true));
1267                Ok(ex)
1268            })
1269        });
1270        let policy = ExceptionPolicy {
1271            matches: std::sync::Arc::new(|_| true),
1272            retry: None,
1273            handled_by: None,
1274            on_steps: None,
1275            disposition: ExceptionDisposition::Handled,
1276        };
1277        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1278        let mut ex = make_exchange();
1279        ex.set_error(CamelError::ProcessorError("boom".into()));
1280        let result = handler
1281            .handle_step(
1282                Some(PolicyId(0)),
1283                ex,
1284                CamelError::ProcessorError("boom".into()),
1285            )
1286            .await;
1287        match result {
1288            Ok(StepDisposition::Handled(ex)) => {
1289                assert!(!ex.has_error(), "error should be cleared");
1290                assert_eq!(
1291                    ex.input.header("processed_by"),
1292                    Some(&Value::Bool(true)),
1293                    "should use handler's output exchange"
1294                );
1295            }
1296            other => panic!("expected Handled, got {:?}", other.is_ok()),
1297        }
1298    }
1299
1300    #[tokio::test]
1301    async fn test_handle_step_continued_clears_error() {
1302        let policy = ExceptionPolicy {
1303            matches: std::sync::Arc::new(|_| true),
1304            retry: None,
1305            handled_by: None,
1306            on_steps: None,
1307            disposition: ExceptionDisposition::Continued,
1308        };
1309        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1310        let mut ex = make_exchange();
1311        ex.set_error(CamelError::ProcessorError("boom".into()));
1312        let result = handler
1313            .handle_step(
1314                Some(PolicyId(0)),
1315                ex,
1316                CamelError::ProcessorError("boom".into()),
1317            )
1318            .await;
1319        match result {
1320            Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1321            other => panic!("expected Continued, got {:?}", other.is_ok()),
1322        }
1323    }
1324
1325    #[tokio::test]
1326    async fn test_handle_step_with_on_steps_handled() {
1327        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1328            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1329            async move { Ok(ex) }
1330        }));
1331        let policy = ExceptionPolicy {
1332            matches: std::sync::Arc::new(|_| true),
1333            retry: None,
1334            handled_by: None,
1335            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1336            disposition: ExceptionDisposition::Handled,
1337        };
1338        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1339        let mut ex = make_exchange();
1340        ex.set_error(CamelError::ProcessorError("boom".into()));
1341        let result = handler
1342            .handle_step(
1343                Some(PolicyId(0)),
1344                ex,
1345                CamelError::ProcessorError("boom".into()),
1346            )
1347            .await;
1348        match result {
1349            Ok(StepDisposition::Handled(ex)) => {
1350                assert!(!ex.has_error(), "error should be cleared");
1351                assert!(
1352                    matches!(ex.input.body, camel_api::Body::Bytes(_)),
1353                    "on_steps should have modified the body"
1354                );
1355            }
1356            other => panic!("expected Handled, got: {}", other.is_ok()),
1357        }
1358    }
1359
1360    #[tokio::test]
1361    async fn test_handle_step_with_on_steps_propagate_falls_through() {
1362        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1363            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1364            async move { Ok(ex) }
1365        }));
1366        let dlc_called = Arc::new(AtomicU32::new(0));
1367        let dlc_called_clone = dlc_called.clone();
1368        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1369            let c = dlc_called_clone.clone();
1370            Box::pin(async move {
1371                c.fetch_add(1, Ordering::SeqCst);
1372                Ok(ex)
1373            })
1374        });
1375        let policy = ExceptionPolicy {
1376            matches: std::sync::Arc::new(|_| true),
1377            retry: None,
1378            handled_by: None,
1379            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1380            disposition: ExceptionDisposition::Propagate,
1381        };
1382        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1383        let mut ex = make_exchange();
1384        ex.set_error(CamelError::ProcessorError("boom".into()));
1385        let result = handler
1386            .handle_step(
1387                Some(PolicyId(0)),
1388                ex,
1389                CamelError::ProcessorError("boom".into()),
1390            )
1391            .await;
1392        assert!(
1393            matches!(result, Ok(StepDisposition::Propagate(_))),
1394            "Propagate disposition should return Propagate"
1395        );
1396        assert_eq!(
1397            dlc_called.load(Ordering::SeqCst),
1398            1,
1399            "DLC should be called when on_steps disposition is Propagate"
1400        );
1401    }
1402
1403    #[tokio::test]
1404    async fn test_handle_step_dlc_failure_propagates() {
1405        let failing_dlc = BoxProcessor::from_fn(|_| {
1406            Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1407        });
1408        let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1409        let ex = make_exchange();
1410        let result = handler
1411            .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1412            .await;
1413        assert!(
1414            matches!(result, Ok(StepDisposition::Propagate(_))),
1415            "DLC failure should still return Propagate with original error"
1416        );
1417    }
1418
1419    // ── handle_boundary tests ──
1420
1421    #[tokio::test]
1422    async fn test_handle_boundary_security_error_goes_to_dlc() {
1423        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1424        let count_clone = dlc_count.clone();
1425        let dlc = BoxProcessor::from_fn(move |ex| {
1426            let c = count_clone.clone();
1427            Box::pin(async move {
1428                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1429                Ok(ex)
1430            })
1431        });
1432        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1433        let ex = make_exchange();
1434        let result = handler
1435            .handle_boundary(
1436                BoundaryKind::Security,
1437                ex,
1438                CamelError::Unauthorized("denied".into()),
1439            )
1440            .await;
1441        assert!(result.is_ok());
1442        assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1443    }
1444
1445    #[tokio::test]
1446    async fn test_handle_boundary_handled_clears_error() {
1447        let policy = ExceptionPolicy {
1448            matches: std::sync::Arc::new(|_| true),
1449            retry: None,
1450            handled_by: None,
1451            on_steps: None,
1452            disposition: ExceptionDisposition::Handled,
1453        };
1454        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1455        let ex = make_exchange();
1456        let result = handler
1457            .handle_boundary(
1458                BoundaryKind::Security,
1459                ex,
1460                CamelError::Unauthorized("denied".into()),
1461            )
1462            .await;
1463        assert!(result.is_ok());
1464        assert!(
1465            !result.unwrap().has_error(),
1466            "Handled disposition should clear error"
1467        );
1468    }
1469
1470    #[tokio::test]
1471    async fn test_handle_boundary_propagate_preserves_error() {
1472        let policy = ExceptionPolicy {
1473            matches: std::sync::Arc::new(|_| true),
1474            retry: None,
1475            handled_by: None,
1476            on_steps: None,
1477            disposition: ExceptionDisposition::Propagate,
1478        };
1479        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1480        let ex = make_exchange();
1481        let result = handler
1482            .handle_boundary(
1483                BoundaryKind::CircuitBreaker,
1484                ex,
1485                CamelError::CircuitOpen("open".into()),
1486            )
1487            .await;
1488        assert!(result.is_ok(), "boundary errors always return Ok");
1489        assert!(
1490            result.unwrap().has_error(),
1491            "Propagate disposition should preserve error"
1492        );
1493    }
1494
1495    #[tokio::test]
1496    async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1497        let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1498        let count_clone = dlc_count.clone();
1499        let dlc = BoxProcessor::from_fn(move |ex| {
1500            let c = count_clone.clone();
1501            Box::pin(async move {
1502                c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1503                Ok(ex)
1504            })
1505        });
1506        let policy = ExceptionPolicy {
1507            matches: std::sync::Arc::new(|_| true),
1508            retry: None,
1509            handled_by: None,
1510            on_steps: None,
1511            disposition: ExceptionDisposition::Continued,
1512        };
1513        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1514        let ex = make_exchange();
1515        let result = handler
1516            .handle_boundary(
1517                BoundaryKind::Security,
1518                ex,
1519                CamelError::Unauthorized("denied".into()),
1520            )
1521            .await;
1522        assert!(result.is_ok(), "boundary errors always return Ok");
1523        assert!(
1524            result.unwrap().has_error(),
1525            "Continued at boundary should preserve error"
1526        );
1527        assert_eq!(
1528            dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1529            1,
1530            "DLC should be called"
1531        );
1532    }
1533
1534    #[tokio::test]
1535    async fn test_handle_boundary_with_on_steps_handled() {
1536        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1537            ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1538            async move { Ok(ex) }
1539        }));
1540        let policy = ExceptionPolicy {
1541            matches: std::sync::Arc::new(|_| true),
1542            retry: None,
1543            handled_by: None,
1544            on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1545            disposition: ExceptionDisposition::Handled,
1546        };
1547        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1548        let ex = make_exchange();
1549        let result = handler
1550            .handle_boundary(
1551                BoundaryKind::Security,
1552                ex,
1553                CamelError::Unauthorized("denied".into()),
1554            )
1555            .await;
1556        assert!(result.is_ok(), "boundary errors always return Ok");
1557        let ex = result.unwrap();
1558        assert!(!ex.has_error(), "Handled disposition should clear error");
1559        assert!(
1560            matches!(ex.input.body, camel_api::Body::Bytes(_)),
1561            "on_steps should have modified the body"
1562        );
1563    }
1564
1565    #[tokio::test]
1566    async fn retry_step_segment_stop_maps_to_retry_outcome_stopped() {
1567        use std::sync::Arc;
1568        use std::sync::atomic::{AtomicUsize, Ordering};
1569
1570        #[derive(Clone)]
1571        struct StoppingSegment {
1572            n: Arc<AtomicUsize>,
1573        }
1574        impl OutcomePipeline for StoppingSegment {
1575            fn clone_box(&self) -> Box<dyn OutcomePipeline> {
1576                Box::new(self.clone())
1577            }
1578            fn run<'a>(
1579                &'a mut self,
1580                ex: Exchange,
1581            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1582                let n = self.n.clone();
1583                Box::pin(async move {
1584                    n.fetch_add(1, Ordering::SeqCst);
1585                    PipelineOutcome::Stopped(ex)
1586                })
1587            }
1588        }
1589
1590        let call_count = Arc::new(AtomicUsize::new(0));
1591        let seg = OutcomeSegment::new(Box::new(StoppingSegment {
1592            n: call_count.clone(),
1593        }));
1594        let mut retryable: Box<dyn RetryableStep> = Box::new(seg);
1595
1596        let mut policy = ExceptionPolicy::new(|_e: &CamelError| true);
1597        policy.retry = Some(RedeliveryPolicy::new(3));
1598        let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1599
1600        let original = Exchange::new(Message::new("retry-me"));
1601        let err = CamelError::ProcessorError("trigger retry".into());
1602        let outcome = handler
1603            .retry_step(Some(PolicyId(0)), retryable.as_mut(), original, err)
1604            .await;
1605
1606        assert!(
1607            matches!(outcome, RetryOutcome::Stopped(_)),
1608            "Segment Stop must map to RetryOutcome::Stopped, got {:?}",
1609            outcome
1610        );
1611        assert_eq!(
1612            call_count.load(Ordering::SeqCst),
1613            1,
1614            "Stop must short-circuit retry — only 1 invoke expected, got {}",
1615            call_count.load(Ordering::SeqCst)
1616        );
1617    }
1618
1619    #[tokio::test]
1620    async fn retry_step_new_signature_works_with_dlc_producer() {
1621        use std::sync::Arc;
1622        use std::sync::atomic::{AtomicUsize, Ordering};
1623
1624        #[derive(Clone)]
1625        struct CountingProducer {
1626            count: Arc<AtomicUsize>,
1627            succeed_on: usize,
1628        }
1629        impl tower::Service<Exchange> for CountingProducer {
1630            type Response = Exchange;
1631            type Error = CamelError;
1632            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1633            fn poll_ready(
1634                &mut self,
1635                _cx: &mut std::task::Context<'_>,
1636            ) -> std::task::Poll<Result<(), Self::Error>> {
1637                std::task::Poll::Ready(Ok(()))
1638            }
1639            fn call(&mut self, ex: Exchange) -> Self::Future {
1640                let n = self.count.fetch_add(1, Ordering::SeqCst);
1641                let succeed_on = self.succeed_on;
1642                Box::pin(async move {
1643                    if n >= succeed_on {
1644                        Ok(ex)
1645                    } else {
1646                        Err(CamelError::ProcessorError("retry".into()))
1647                    }
1648                })
1649            }
1650        }
1651
1652        let count = Arc::new(AtomicUsize::new(0));
1653        let producer = CountingProducer {
1654            count: count.clone(),
1655            succeed_on: 2,
1656        };
1657        let sync_bp = SyncBoxProcessor::new(BoxProcessor::new(producer));
1658        let bp1 = sync_bp.clone_inner();
1659        let bp2 = sync_bp.clone_inner();
1660        let mut retryable1: Box<dyn RetryableStep> = Box::new(bp1);
1661        let mut retryable2: Box<dyn RetryableStep> = Box::new(bp2);
1662
1663        let ex = Exchange::new(Message::new("dlc"));
1664        let outcome1 = retryable1.invoke(ex.clone()).await;
1665        let outcome2 = retryable2.invoke(ex).await;
1666        assert!(matches!(outcome1, PipelineOutcome::Failed(_)));
1667        assert!(matches!(outcome2, PipelineOutcome::Failed(_)));
1668        assert_eq!(
1669            count.load(Ordering::SeqCst),
1670            2,
1671            "DLC producer must be invoked exactly twice through SyncBoxProcessor"
1672        );
1673        drop(retryable1);
1674        drop(retryable2);
1675        drop(sync_bp);
1676    }
1677
1678    // ── use_original_message tests ──
1679
1680    #[tokio::test]
1681    async fn test_use_original_message_restores_body_before_dlc() {
1682        // Verifies that when the extension is set, handle_step restores the
1683        // original message body before the DLC sees it.
1684        let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1685        let dlc_received_clone = Arc::clone(&dlc_received);
1686        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1687            let r = Arc::clone(&dlc_received_clone);
1688            Box::pin(async move {
1689                *r.lock().unwrap() = Some(ex.clone());
1690                Ok(ex)
1691            })
1692        });
1693
1694        let mut handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1695        handler.use_original_message = true;
1696
1697        // Build exchange with original body, stash it, then mutate.
1698        let mut ex = make_exchange();
1699        ex.input = Message::new("original-body");
1700
1701        // Simulate RouteChannelService stashing the original message.
1702        let original: Arc<Message> = Arc::new(ex.input.clone());
1703        ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1704
1705        // Mutate the body (simulating a pipeline step that transforms then fails).
1706        ex.input.body = camel_api::Body::Bytes("mutated-body".into());
1707
1708        // Call handle_step — the restore should fire before send_to_handler.
1709        let result = handler
1710            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1711            .await;
1712        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1713
1714        // The DLC must have received the exchange with the ORIGINAL body.
1715        let received = dlc_received
1716            .lock()
1717            .unwrap()
1718            .take()
1719            .expect("DLC should have been called");
1720        let received_text = match &received.input.body {
1721            camel_api::Body::Text(s) => s.clone(),
1722            camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1723            camel_api::Body::Json(v) => v.to_string(),
1724            _ => String::new(),
1725        };
1726        assert_eq!(
1727            received_text, "original-body",
1728            "DLC should receive original message body, not mutated version"
1729        );
1730    }
1731
1732    #[tokio::test]
1733    async fn test_use_original_message_handle_boundary_restores_before_dlc() {
1734        let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1735        let dlc_received_clone = Arc::clone(&dlc_received);
1736        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1737            let r = Arc::clone(&dlc_received_clone);
1738            Box::pin(async move {
1739                *r.lock().unwrap() = Some(ex.clone());
1740                Ok(ex)
1741            })
1742        });
1743
1744        let mut handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1745        handler.use_original_message = true;
1746
1747        let mut ex = make_exchange();
1748        ex.input = Message::new("orig-boundary");
1749
1750        let original: Arc<Message> = Arc::new(ex.input.clone());
1751        ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1752
1753        // Mutate body before boundary error.
1754        ex.input.body = camel_api::Body::Bytes("mutated-boundary".into());
1755
1756        let result = handler
1757            .handle_boundary(
1758                BoundaryKind::Security,
1759                ex,
1760                CamelError::Unauthorized("denied".into()),
1761            )
1762            .await;
1763        assert!(result.is_ok());
1764
1765        let received = dlc_received
1766            .lock()
1767            .unwrap()
1768            .take()
1769            .expect("DLC should have been called");
1770        let received_text = match &received.input.body {
1771            camel_api::Body::Text(s) => s.clone(),
1772            camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1773            camel_api::Body::Json(v) => v.to_string(),
1774            _ => String::new(),
1775        };
1776        assert_eq!(
1777            received_text, "orig-boundary",
1778            "handle_boundary should restore original message before sending to DLC"
1779        );
1780    }
1781
1782    #[tokio::test]
1783    async fn test_use_original_message_false_does_not_restore() {
1784        // When use_original_message is false (default), the mutation should PASS THROUGH.
1785        let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1786        let dlc_received_clone = Arc::clone(&dlc_received);
1787        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1788            let r = Arc::clone(&dlc_received_clone);
1789            Box::pin(async move {
1790                *r.lock().unwrap() = Some(ex.clone());
1791                Ok(ex)
1792            })
1793        });
1794
1795        let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1796        // use_original_message defaults to false
1797
1798        let mut ex = make_exchange();
1799        ex.input = Message::new("original-body");
1800
1801        // Stash still set but flag is false — must be ignored.
1802        let original: Arc<Message> = Arc::new(ex.input.clone());
1803        ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1804
1805        // Mutate the body.
1806        ex.input.body = camel_api::Body::Bytes("mutated-body".into());
1807
1808        let result = handler
1809            .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1810            .await;
1811        assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1812
1813        let received = dlc_received
1814            .lock()
1815            .unwrap()
1816            .take()
1817            .expect("DLC should have been called");
1818        let received_text = match &received.input.body {
1819            camel_api::Body::Text(s) => s.clone(),
1820            camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1821            camel_api::Body::Json(v) => v.to_string(),
1822            _ => String::new(),
1823        };
1824        assert_eq!(
1825            received_text, "mutated-body",
1826            "When use_original_message=false, DLC should see the mutated body"
1827        );
1828    }
1829}