Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor};
5
6/// Camel-compatible header names for redelivery state.
7pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11/// Redelivery policy with exponential backoff and optional jitter.
12#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14    pub max_attempts: u32,
15    pub initial_delay: Duration,
16    pub multiplier: f64,
17    pub max_delay: Duration,
18    pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
78/// Disposition for an exception handler or catch clause (ADR-0019).
79///
80/// # YAML casing
81///
82/// serde uses `lowercase` casing: `handled`, `propagate`, `continued`.
83///
84/// # Default divergence
85///
86/// `ExceptionDisposition::default()` returns `Propagate` (first variant per `#[derive(Default)]`),
87/// but the YAML + builder layers default to `Handled` for doTry catch clauses
88/// (via `default_handled_disposition()` in `camel-dsl/src/route_ast.rs` and the `DoCatchBuilder::do_catch_exception` constructor).
89/// Direct struct construction should explicitly set disposition rather than relying on `Default`.
90#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
92#[serde(rename_all = "lowercase")]
93pub enum ExceptionDisposition {
94    /// After retry exhaustion / on_steps / DLC, re-throw to upstream.
95    #[default]
96    Propagate,
97    /// Suppress re-throw. The handler's exchange is the final result.
98    Handled,
99    /// Clear the error. The pipeline continues to the NEXT step.
100    Continued,
101}
102
103/// Opaque identifier for a matched ExceptionPolicy within a RouteErrorHandler.
104/// Index into the policies Vec — valid only within the handler that created it.
105/// If the policies vec is reordered or filtered, indices become stale.
106/// Safe because policies are immutable after handler construction.
107#[derive(Clone, Copy, Debug, PartialEq, Eq)]
108pub struct PolicyId(pub usize);
109
110/// Result of a single retry attempt by `RouteErrorHandler::retry_step`.
111#[derive(Debug)]
112pub enum RetryOutcome {
113    /// Retry succeeded; pipeline continues with this Exchange.
114    Recovered(Exchange),
115    /// Retry attempt returned `Stopped(ex)`. Bypasses `handle_step` —
116    /// Stop is successful control flow, not exhausted error handling.
117    /// Maps to `PipelineOutcome::Stopped(ex)` in `run_steps`.
118    Stopped(Exchange),
119    /// All retry attempts exhausted. Handler decides via `handle_step`.
120    Exhausted {
121        exchange: Exchange,
122        error: CamelError,
123        policy: Option<PolicyId>,
124    },
125}
126
127/// Object-safe retry abstraction. Unifies `BoxProcessor` and `OutcomeSegment`
128/// for `RouteErrorHandler::retry_step`. See ADR-0024 amendment (Phase 4).
129///
130/// `invoke` returns `PipelineOutcome` (not Tower `Result`), preserving
131/// `Stopped(ex)` state across retry attempts.
132pub trait RetryableStep: Send {
133    fn invoke<'a>(
134        &'a mut self,
135        exchange: Exchange,
136    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>;
137}
138
139/// BoxProcessor adapter: preserves readiness-error routing by calling
140/// `ServiceExt::ready().await` before `Service::call()`. Readiness
141/// failures (e.g. channel closed, circuit breaker open) become
142/// `PipelineOutcome::Failed`, NOT panic or silent skip.
143impl RetryableStep for crate::BoxProcessor {
144    fn invoke<'a>(
145        &'a mut self,
146        exchange: Exchange,
147    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>> {
148        use tower::ServiceExt;
149        Box::pin(async move {
150            match self.ready().await {
151                Ok(ready_svc) => match tower::Service::call(ready_svc, exchange).await {
152                    Ok(ex) => PipelineOutcome::Completed(ex),
153                    Err(err) => PipelineOutcome::Failed(err),
154                },
155                Err(err) => PipelineOutcome::Failed(err),
156            }
157        })
158    }
159}
160
161/// Result of Phase 2 (handle) of step error handling.
162/// Handled and Continued MUST have Exchange.error cleared.
163pub enum StepDisposition {
164    Propagate(CamelError),
165    Handled(Exchange),
166    Continued(Exchange),
167}
168
169/// Identifies which boundary gate produced an infrastructure error.
170#[derive(Debug, Clone, Copy, PartialEq, Eq)]
171pub enum BoundaryKind {
172    Security,
173    CircuitBreaker,
174    Readiness,
175}
176
177/// A rule that matches specific errors and defines retry + redirect behaviour.
178pub struct ExceptionPolicy {
179    /// Predicate: returns `true` if this policy applies to the given error.
180    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
181    /// Optional retry configuration; if absent, no retries are attempted.
182    pub retry: Option<RedeliveryPolicy>,
183    /// Optional URI of a specific endpoint to route failed exchanges to.
184    pub handled_by: Option<String>,
185    /// Optional custom pipeline executed when this policy triggers.
186    pub on_steps: Option<SyncBoxProcessor>,
187    /// What to do after this policy's handler runs.
188    pub disposition: ExceptionDisposition,
189}
190
191impl ExceptionPolicy {
192    /// Create a new policy that matches errors using the given predicate.
193    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
194        Self {
195            matches: Arc::new(matches),
196            retry: None,
197            handled_by: None,
198            on_steps: None,
199            disposition: ExceptionDisposition::Propagate,
200        }
201    }
202}
203
204impl Clone for ExceptionPolicy {
205    fn clone(&self) -> Self {
206        Self {
207            matches: Arc::clone(&self.matches),
208            retry: self.retry.clone(),
209            handled_by: self.handled_by.clone(),
210            on_steps: self.on_steps.clone(),
211            disposition: self.disposition,
212        }
213    }
214}
215
216/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
217#[derive(Clone)]
218pub struct ErrorHandlerConfig {
219    /// URI of the Dead Letter Channel endpoint (None = log only).
220    pub dlc_uri: Option<String>,
221    /// Per-exception policies evaluated in order; first match wins.
222    pub policies: Vec<ExceptionPolicy>,
223    /// When true, restore the original (pre-route, pre-mutation) Message
224    /// (body and headers) before forwarding to the DLC/handler.
225    pub use_original_message: bool,
226}
227
228impl ErrorHandlerConfig {
229    /// Log-only error handler: errors are logged but not forwarded anywhere.
230    pub fn log_only() -> Self {
231        Self {
232            dlc_uri: None,
233            policies: Vec::new(),
234            use_original_message: false,
235        }
236    }
237
238    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
239    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
240        Self {
241            dlc_uri: Some(uri.into()),
242            policies: Vec::new(),
243            use_original_message: false,
244        }
245    }
246
247    /// Start building an `ExceptionPolicy` attached to this config.
248    pub fn on_exception(
249        self,
250        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
251    ) -> ExceptionPolicyBuilder {
252        ExceptionPolicyBuilder {
253            config: self,
254            policy: ExceptionPolicy::new(matches),
255        }
256    }
257}
258
259/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
260pub struct ExceptionPolicyBuilder {
261    config: ErrorHandlerConfig,
262    policy: ExceptionPolicy,
263}
264
265impl ExceptionPolicyBuilder {
266    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
267    pub fn retry(mut self, max_attempts: u32) -> Self {
268        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
269        self
270    }
271
272    /// Override backoff parameters for the retry (call after `.retry()`).
273    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
274        if let Some(ref mut p) = self.policy.retry {
275            p.initial_delay = initial;
276            p.multiplier = multiplier;
277            p.max_delay = max;
278        }
279        self
280    }
281
282    /// Set jitter factor for retry delays (call after `.retry()`).
283    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
284    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
285        if let Some(ref mut p) = self.policy.retry {
286            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
287        }
288        self
289    }
290
291    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
292    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
293        self.policy.handled_by = Some(uri.into());
294        self
295    }
296
297    /// Attach a custom pipeline to execute when this policy triggers.
298    pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
299        self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
300        self
301    }
302
303    /// Mark the exception as handled (suppresses re-throw to upstream).
304    pub fn handled(mut self, handled: bool) -> Self {
305        self.policy.disposition = if handled {
306            ExceptionDisposition::Handled
307        } else {
308            ExceptionDisposition::Propagate
309        };
310        self
311    }
312
313    /// Mark the exception as continued (clear error, pipeline continues to next step).
314    pub fn continued(mut self, continued: bool) -> Self {
315        self.policy.disposition = if continued {
316            ExceptionDisposition::Continued
317        } else {
318            ExceptionDisposition::Propagate
319        };
320        self
321    }
322
323    /// Explicitly set disposition to Propagate (default, no-op).
324    pub fn propagate(mut self) -> Self {
325        self.policy.disposition = ExceptionDisposition::Propagate;
326        self
327    }
328
329    /// Finish this policy and return the updated config.
330    pub fn build(mut self) -> ErrorHandlerConfig {
331        self.config.policies.push(self.policy);
332        self.config
333    }
334}
335
336// Backwards compatibility alias
337#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
338pub type ExponentialBackoff = RedeliveryPolicy;
339
340#[cfg(test)]
341mod retryable_step_tests {
342    use super::*;
343    use crate::{BoxProcessor, CamelError, Exchange, Message, PipelineOutcome};
344    use std::future::Future;
345    use std::pin::Pin;
346    use std::sync::Arc;
347    use std::sync::atomic::{AtomicUsize, Ordering};
348
349    struct CountingProcessor {
350        call_count: Arc<AtomicUsize>,
351        succeed: bool,
352    }
353
354    impl tower::Service<Exchange> for CountingProcessor {
355        type Response = Exchange;
356        type Error = CamelError;
357        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
358
359        fn poll_ready(
360            &mut self,
361            _cx: &mut std::task::Context<'_>,
362        ) -> std::task::Poll<Result<(), Self::Error>> {
363            std::task::Poll::Ready(Ok(()))
364        }
365
366        fn call(&mut self, exchange: Exchange) -> Self::Future {
367            let count = self.call_count.clone();
368            let succeed = self.succeed;
369            Box::pin(async move {
370                count.fetch_add(1, Ordering::SeqCst);
371                if succeed {
372                    Ok(exchange)
373                } else {
374                    Err(CamelError::ProcessorError("fail".into()))
375                }
376            })
377        }
378    }
379
380    impl Clone for CountingProcessor {
381        fn clone(&self) -> Self {
382            Self {
383                call_count: self.call_count.clone(),
384                succeed: self.succeed,
385            }
386        }
387    }
388
389    #[tokio::test]
390    async fn boxprocessor_adapter_maps_ok_to_completed() {
391        let count = Arc::new(AtomicUsize::new(0));
392        let processor = CountingProcessor {
393            call_count: count.clone(),
394            succeed: true,
395        };
396        let bp: BoxProcessor = BoxProcessor::new(processor);
397        let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
398        let ex = Exchange::new(Message::new("hello"));
399        let outcome = retryable.invoke(ex).await;
400        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
401        assert_eq!(count.load(Ordering::SeqCst), 1);
402    }
403
404    #[tokio::test]
405    async fn boxprocessor_adapter_maps_err_to_failed() {
406        let processor = CountingProcessor {
407            call_count: Arc::new(AtomicUsize::new(0)),
408            succeed: false,
409        };
410        let bp: BoxProcessor = BoxProcessor::new(processor);
411        let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
412        let ex = Exchange::new(Message::new("hello"));
413        let outcome = retryable.invoke(ex).await;
414        assert!(matches!(outcome, PipelineOutcome::Failed(_)));
415    }
416
417    #[tokio::test]
418    async fn boxprocessor_readiness_error_propagates_to_failed() {
419        struct AlwaysNotReady;
420
421        impl tower::Service<Exchange> for AlwaysNotReady {
422            type Response = Exchange;
423            type Error = CamelError;
424            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
425
426            fn poll_ready(
427                &mut self,
428                _cx: &mut std::task::Context<'_>,
429            ) -> std::task::Poll<Result<(), Self::Error>> {
430                std::task::Poll::Ready(Err(CamelError::ProcessorError(
431                    "readiness failed: consumer closed".into(),
432                )))
433            }
434
435            fn call(&mut self, _ex: Exchange) -> Self::Future {
436                Box::pin(async {
437                    unreachable!("call() must not be reached when poll_ready errors")
438                })
439            }
440        }
441
442        impl Clone for AlwaysNotReady {
443            fn clone(&self) -> Self {
444                AlwaysNotReady
445            }
446        }
447
448        let bp: BoxProcessor = BoxProcessor::new(AlwaysNotReady);
449        let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
450        let ex = Exchange::new(Message::new("hello"));
451        let outcome = retryable.invoke(ex).await;
452        match outcome {
453            PipelineOutcome::Failed(err) => {
454                let msg = err.to_string();
455                assert!(
456                    msg.contains("readiness") || msg.contains("consumer"),
457                    "readiness error message should be preserved, got: {msg}"
458                );
459            }
460            other => panic!(
461                "readiness failure must map to PipelineOutcome::Failed, got {:?}",
462                other
463            ),
464        }
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use crate::BoxProcessor;
472    use crate::CamelError;
473    use std::time::Duration;
474
475    #[test]
476    fn test_redelivery_policy_defaults() {
477        let p = RedeliveryPolicy::new(3);
478        assert_eq!(p.max_attempts, 3);
479        assert_eq!(p.initial_delay, Duration::from_millis(100));
480        assert_eq!(p.multiplier, 2.0);
481        assert_eq!(p.max_delay, Duration::from_secs(10));
482        assert_eq!(p.jitter_factor, 0.0);
483    }
484
485    #[test]
486    fn test_exception_policy_matches() {
487        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
488        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
489        assert!(!(policy.matches)(&CamelError::Io("io".into())));
490    }
491
492    #[test]
493    fn test_error_handler_config_log_only() {
494        let config = ErrorHandlerConfig::log_only();
495        assert!(config.dlc_uri.is_none());
496        assert!(config.policies.is_empty());
497    }
498
499    #[test]
500    fn test_error_handler_config_dlc() {
501        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
502        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
503    }
504
505    #[test]
506    fn test_error_handler_config_with_policy() {
507        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
508            .on_exception(|e| matches!(e, CamelError::Io(_)))
509            .retry(2)
510            .handled_by("log:io-errors")
511            .build();
512        assert_eq!(config.policies.len(), 1);
513        let p = &config.policies[0];
514        assert!(p.retry.is_some());
515        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
516        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
517    }
518
519    #[test]
520    fn test_jitter_applies_randomness() {
521        let policy = RedeliveryPolicy::new(3)
522            .with_initial_delay(Duration::from_millis(100))
523            .with_jitter(0.5);
524
525        let mut delays = std::collections::HashSet::new();
526        for _ in 0..10 {
527            delays.insert(policy.delay_for(0));
528        }
529
530        assert!(delays.len() > 1, "jitter should produce varying delays");
531    }
532
533    #[test]
534    fn test_jitter_stays_within_bounds() {
535        let policy = RedeliveryPolicy::new(3)
536            .with_initial_delay(Duration::from_millis(100))
537            .with_jitter(0.5);
538
539        for _ in 0..100 {
540            let delay = policy.delay_for(0);
541            assert!(
542                delay >= Duration::from_millis(50),
543                "delay too low: {:?}",
544                delay
545            );
546            assert!(
547                delay <= Duration::from_millis(150),
548                "delay too high: {:?}",
549                delay
550            );
551        }
552    }
553
554    #[test]
555    fn test_max_attempts_zero_means_no_retries() {
556        let policy = RedeliveryPolicy::new(0);
557        assert_eq!(policy.max_attempts, 0);
558    }
559
560    #[test]
561    fn test_jitter_zero_produces_exact_delay() {
562        let policy = RedeliveryPolicy::new(3)
563            .with_initial_delay(Duration::from_millis(100))
564            .with_jitter(0.0);
565
566        for _ in 0..10 {
567            let delay = policy.delay_for(0);
568            assert_eq!(delay, Duration::from_millis(100));
569        }
570    }
571
572    #[test]
573    fn test_jitter_one_produces_wide_range() {
574        let policy = RedeliveryPolicy::new(3)
575            .with_initial_delay(Duration::from_millis(100))
576            .with_jitter(1.0);
577
578        for _ in 0..100 {
579            let delay = policy.delay_for(0);
580            assert!(
581                delay >= Duration::from_millis(0),
582                "delay should be >= 0, got {:?}",
583                delay
584            );
585            assert!(
586                delay <= Duration::from_millis(200),
587                "delay should be <= 200ms, got {:?}",
588                delay
589            );
590        }
591    }
592
593    #[test]
594    fn test_redelivery_policy_builder_methods_apply_values() {
595        let p = RedeliveryPolicy::new(5)
596            .with_initial_delay(Duration::from_millis(250))
597            .with_multiplier(3.0)
598            .with_max_delay(Duration::from_secs(2))
599            .with_jitter(2.0);
600
601        assert_eq!(p.initial_delay, Duration::from_millis(250));
602        assert_eq!(p.multiplier, 3.0);
603        assert_eq!(p.max_delay, Duration::from_secs(2));
604        assert_eq!(p.jitter_factor, 1.0);
605    }
606
607    #[test]
608    fn test_with_jitter_clamps_low_bound() {
609        let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
610        assert_eq!(p.jitter_factor, 0.0);
611    }
612
613    #[test]
614    fn test_delay_for_exponential_growth_and_cap() {
615        let p = RedeliveryPolicy::new(3)
616            .with_initial_delay(Duration::from_millis(100))
617            .with_multiplier(2.0)
618            .with_max_delay(Duration::from_millis(250));
619
620        assert_eq!(p.delay_for(0), Duration::from_millis(100));
621        assert_eq!(p.delay_for(1), Duration::from_millis(200));
622        assert_eq!(p.delay_for(2), Duration::from_millis(250));
623        assert_eq!(p.delay_for(20), Duration::from_millis(250));
624    }
625
626    #[test]
627    fn test_exception_policy_builder_backoff_and_jitter() {
628        let config = ErrorHandlerConfig::log_only()
629            .on_exception(|e| matches!(e, CamelError::Io(_)))
630            .retry(4)
631            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
632            .with_jitter(1.5)
633            .build();
634
635        let retry = config.policies[0].retry.as_ref().unwrap();
636        assert_eq!(retry.max_attempts, 4);
637        assert_eq!(retry.initial_delay, Duration::from_millis(10));
638        assert_eq!(retry.multiplier, 1.5);
639        assert_eq!(retry.max_delay, Duration::from_millis(40));
640        assert_eq!(retry.jitter_factor, 1.0);
641    }
642
643    #[test]
644    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
645        let config = ErrorHandlerConfig::log_only()
646            .on_exception(|_| true)
647            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
648            .with_jitter(0.8)
649            .build();
650
651        assert!(config.policies[0].retry.is_none());
652    }
653
654    #[test]
655    fn test_exception_policy_clone_preserves_behavior_and_fields() {
656        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
657        let mut configured = policy;
658        configured.retry = Some(RedeliveryPolicy::new(2));
659        configured.handled_by = Some("log:route-errors".to_string());
660
661        let cloned = configured.clone();
662        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
663        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
664        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
665    }
666
667    #[test]
668    fn test_delay_for_respects_max_delay_with_jitter() {
669        let policy = RedeliveryPolicy::new(5)
670            .with_initial_delay(Duration::from_millis(200))
671            .with_multiplier(10.0)
672            .with_max_delay(Duration::from_millis(500))
673            .with_jitter(0.2);
674
675        for _ in 0..30 {
676            let delay = policy.delay_for(4);
677            assert!(delay <= Duration::from_millis(600));
678            assert!(delay >= Duration::from_millis(400));
679        }
680    }
681
682    #[test]
683    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
684        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
685            .on_exception(|e| matches!(e, CamelError::Io(_)))
686            .retry(1)
687            .build()
688            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
689            .handled_by("log:routes")
690            .build();
691
692        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
693        assert_eq!(config.policies.len(), 2);
694        assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
695        assert!((config.policies[1].matches)(&CamelError::RouteError(
696            "x".into()
697        )));
698    }
699
700    #[test]
701    fn test_backoff_without_retry_does_not_create_retry_config() {
702        let config = ErrorHandlerConfig::log_only()
703            .on_exception(|_| true)
704            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
705            .build();
706
707        assert!(config.policies[0].retry.is_none());
708    }
709
710    #[test]
711    fn test_exception_disposition_default_is_propagate() {
712        assert_eq!(
713            ExceptionDisposition::default(),
714            ExceptionDisposition::Propagate
715        );
716    }
717
718    #[test]
719    fn test_exception_policy_new_has_propagate_disposition() {
720        let p = ExceptionPolicy::new(|_| true);
721        assert_eq!(p.disposition, ExceptionDisposition::Propagate);
722    }
723
724    #[test]
725    fn test_policy_id_equality() {
726        assert_eq!(PolicyId(0), PolicyId(0));
727        assert_ne!(PolicyId(0), PolicyId(1));
728    }
729
730    #[test]
731    fn test_builder_continued_sets_disposition() {
732        let cfg = ErrorHandlerConfig::log_only()
733            .on_exception(|_| true)
734            .continued(true)
735            .build();
736        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
737    }
738
739    #[test]
740    fn test_builder_propagate_sets_disposition() {
741        let cfg = ErrorHandlerConfig::log_only()
742            .on_exception(|_| true)
743            .propagate()
744            .build();
745        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
746    }
747
748    #[test]
749    fn test_builder_handled_true_still_works() {
750        let cfg = ErrorHandlerConfig::log_only()
751            .on_exception(|_| true)
752            .handled(true)
753            .build();
754        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
755    }
756}