Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor};
5
// Camel-compatible header names for redelivery state.

/// Header name `CamelRedelivered`.
///
/// NOTE(review): presumably flags an exchange that has been redelivered at
/// least once — confirm against the code that sets this header.
pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
/// Header name `CamelRedeliveryCounter`.
///
/// NOTE(review): presumably carries the number of redelivery attempts made
/// so far — confirm against the code that sets this header.
pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
/// Header name `CamelRedeliveryMaxCounter`.
///
/// NOTE(review): presumably carries the configured maximum number of
/// redelivery attempts — confirm against the code that sets this header.
pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
/// Redelivery policy with exponential backoff and optional jitter.
///
/// All fields are public, so a policy can also be built or adjusted by direct
/// field access; only the `with_jitter` builder clamps its input.
#[derive(Debug, Clone)]
pub struct RedeliveryPolicy {
    /// Maximum number of retry attempts; `0` means no retries.
    pub max_attempts: u32,
    /// Delay before the first retry (attempt 0 in `delay_for`).
    pub initial_delay: Duration,
    /// Backoff multiplier; `delay_for` raises it to the power of the attempt index.
    pub multiplier: f64,
    /// Upper bound applied to the backoff delay before jitter is added.
    pub max_delay: Duration,
    /// Jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
    pub jitter_factor: f64,
}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
/// Disposition for an exception handler or catch clause (ADR-0019).
///
/// # YAML casing
///
/// serde uses `lowercase` casing: `handled`, `propagate`, `continued`.
///
/// # Default divergence
///
/// `ExceptionDisposition::default()` returns `Propagate` because that variant carries the
/// `#[default]` attribute (`#[derive(Default)]` on an enum follows the attribute, not declaration order),
/// but the YAML + builder layers default to `Handled` for doTry catch clauses
/// (via `default_handled_disposition()` in `camel-dsl/src/route_ast.rs` and the `DoCatchBuilder::do_catch_exception` constructor).
/// Direct struct construction should explicitly set disposition rather than relying on `Default`.
#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ExceptionDisposition {
    /// After retry exhaustion / on_steps / DLC, re-throw to upstream.
    #[default]
    Propagate,
    /// Suppress re-throw. The handler's exchange is the final result.
    Handled,
    /// Clear the error. The pipeline continues to the NEXT step.
    Continued,
}
102
/// Opaque handle naming the `ExceptionPolicy` that matched inside a `RouteErrorHandler`.
///
/// The wrapped value is an index into that handler's policies `Vec`, so it is
/// only meaningful for the handler that produced it. Reordering or filtering
/// the vec would invalidate it; this is safe in practice because policies are
/// immutable once the handler has been constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PolicyId(pub usize);
109
/// Result of a single retry attempt by `RouteErrorHandler::retry_step`.
#[derive(Debug)]
pub enum RetryOutcome {
    /// Retry succeeded; pipeline continues with this Exchange.
    Recovered(Exchange),
    /// Retry attempt returned `Stopped(ex)`. Bypasses `handle_step` —
    /// Stop is successful control flow, not exhausted error handling.
    /// Maps to `PipelineOutcome::Stopped(ex)` in `run_steps`.
    Stopped(Exchange),
    /// All retry attempts exhausted. Handler decides via `handle_step`.
    Exhausted {
        /// The exchange handed on to `handle_step`.
        exchange: Exchange,
        /// The error reported alongside the exchange.
        /// NOTE(review): presumably the error from the last failed attempt — confirm in `retry_step`.
        error: CamelError,
        /// Identifier of the matched policy; `None` presumably means no policy matched — confirm in `retry_step`.
        policy: Option<PolicyId>,
    },
}
126
/// Object-safe retry abstraction. Unifies `BoxProcessor` and `OutcomeSegment`
/// for `RouteErrorHandler::retry_step`. See ADR-0024 amendment (Phase 4).
///
/// `invoke` returns `PipelineOutcome` (not Tower `Result`), preserving
/// `Stopped(ex)` state across retry attempts.
pub trait RetryableStep: Send {
    /// Run the step once against `exchange`, which is taken by value.
    ///
    /// Returns a boxed, `Send` future that borrows `self` mutably for `'a`
    /// and resolves to the step's `PipelineOutcome`.
    fn invoke<'a>(
        &'a mut self,
        exchange: Exchange,
    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>;
}
138
139/// BoxProcessor adapter: preserves readiness-error routing by calling
140/// `ServiceExt::ready().await` before `Service::call()`. Readiness
141/// failures (e.g. channel closed, circuit breaker open) become
142/// `PipelineOutcome::Failed`, NOT panic or silent skip.
143impl RetryableStep for crate::BoxProcessor {
144    fn invoke<'a>(
145        &'a mut self,
146        exchange: Exchange,
147    ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>> {
148        use tower::ServiceExt;
149        Box::pin(async move {
150            match self.ready().await {
151                Ok(ready_svc) => match tower::Service::call(ready_svc, exchange).await {
152                    Ok(ex) => PipelineOutcome::Completed(ex),
153                    Err(err) => PipelineOutcome::Failed(err),
154                },
155                Err(err) => PipelineOutcome::Failed(err),
156            }
157        })
158    }
159}
160
/// Result of Phase 2 (handle) of step error handling.
/// Handled and Continued MUST have Exchange.error cleared.
pub enum StepDisposition {
    /// Carries the error onward.
    /// NOTE(review): presumably mirrors `ExceptionDisposition::Propagate` (re-throw to upstream) — confirm at the call site.
    Propagate(CamelError),
    /// Carries the exchange with its error cleared.
    /// NOTE(review): presumably mirrors `ExceptionDisposition::Handled` (this exchange is the final result) — confirm.
    Handled(Exchange),
    /// Carries the exchange with its error cleared.
    /// NOTE(review): presumably mirrors `ExceptionDisposition::Continued` (pipeline continues to the next step) — confirm.
    Continued(Exchange),
}
168
/// Names the boundary gate that raised an infrastructure error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BoundaryKind {
    Security,
    CircuitBreaker,
    Readiness,
}
176
/// A rule that matches specific errors and defines retry + redirect behaviour.
///
/// Policies are stored in `ErrorHandlerConfig::policies`, where they are
/// evaluated in order and the first match wins.
pub struct ExceptionPolicy {
    /// Predicate: returns `true` if this policy applies to the given error.
    /// Held in an `Arc` so clones of the policy share the same closure.
    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
    /// Optional retry configuration; if absent, no retries are attempted.
    pub retry: Option<RedeliveryPolicy>,
    /// Optional URI of a specific endpoint to route failed exchanges to.
    pub handled_by: Option<String>,
    /// Optional custom pipeline executed when this policy triggers.
    pub on_steps: Option<SyncBoxProcessor>,
    /// What to do after this policy's handler runs.
    pub disposition: ExceptionDisposition,
}
190
191impl ExceptionPolicy {
192    /// Create a new policy that matches errors using the given predicate.
193    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
194        Self {
195            matches: Arc::new(matches),
196            retry: None,
197            handled_by: None,
198            on_steps: None,
199            disposition: ExceptionDisposition::Propagate,
200        }
201    }
202}
203
204impl Clone for ExceptionPolicy {
205    fn clone(&self) -> Self {
206        Self {
207            matches: Arc::clone(&self.matches),
208            retry: self.retry.clone(),
209            handled_by: self.handled_by.clone(),
210            on_steps: self.on_steps.clone(),
211            disposition: self.disposition,
212        }
213    }
214}
215
/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
///
/// Start from `log_only()` or `dead_letter_channel(uri)`, then append policies
/// with `on_exception(..)` followed by the builder's `build()`.
#[derive(Clone)]
pub struct ErrorHandlerConfig {
    /// URI of the Dead Letter Channel endpoint (None = log only).
    pub dlc_uri: Option<String>,
    /// Per-exception policies evaluated in order; first match wins.
    pub policies: Vec<ExceptionPolicy>,
}
224
225impl ErrorHandlerConfig {
226    /// Log-only error handler: errors are logged but not forwarded anywhere.
227    pub fn log_only() -> Self {
228        Self {
229            dlc_uri: None,
230            policies: Vec::new(),
231        }
232    }
233
234    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
235    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
236        Self {
237            dlc_uri: Some(uri.into()),
238            policies: Vec::new(),
239        }
240    }
241
242    /// Start building an `ExceptionPolicy` attached to this config.
243    pub fn on_exception(
244        self,
245        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
246    ) -> ExceptionPolicyBuilder {
247        ExceptionPolicyBuilder {
248            config: self,
249            policy: ExceptionPolicy::new(matches),
250        }
251    }
252}
253
/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
///
/// Created by `ErrorHandlerConfig::on_exception`; `build()` pushes the policy
/// onto the config's `policies` and returns the config.
pub struct ExceptionPolicyBuilder {
    /// The config the finished policy is appended to.
    config: ErrorHandlerConfig,
    /// The policy under construction.
    policy: ExceptionPolicy,
}
259
260impl ExceptionPolicyBuilder {
261    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
262    pub fn retry(mut self, max_attempts: u32) -> Self {
263        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
264        self
265    }
266
267    /// Override backoff parameters for the retry (call after `.retry()`).
268    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
269        if let Some(ref mut p) = self.policy.retry {
270            p.initial_delay = initial;
271            p.multiplier = multiplier;
272            p.max_delay = max;
273        }
274        self
275    }
276
277    /// Set jitter factor for retry delays (call after `.retry()`).
278    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
279    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
280        if let Some(ref mut p) = self.policy.retry {
281            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
282        }
283        self
284    }
285
286    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
287    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
288        self.policy.handled_by = Some(uri.into());
289        self
290    }
291
292    /// Attach a custom pipeline to execute when this policy triggers.
293    pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
294        self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
295        self
296    }
297
298    /// Mark the exception as handled (suppresses re-throw to upstream).
299    pub fn handled(mut self, handled: bool) -> Self {
300        self.policy.disposition = if handled {
301            ExceptionDisposition::Handled
302        } else {
303            ExceptionDisposition::Propagate
304        };
305        self
306    }
307
308    /// Mark the exception as continued (clear error, pipeline continues to next step).
309    pub fn continued(mut self, continued: bool) -> Self {
310        self.policy.disposition = if continued {
311            ExceptionDisposition::Continued
312        } else {
313            ExceptionDisposition::Propagate
314        };
315        self
316    }
317
318    /// Explicitly set disposition to Propagate (default, no-op).
319    pub fn propagate(mut self) -> Self {
320        self.policy.disposition = ExceptionDisposition::Propagate;
321        self
322    }
323
324    /// Finish this policy and return the updated config.
325    pub fn build(mut self) -> ErrorHandlerConfig {
326        self.config.policies.push(self.policy);
327        self.config
328    }
329}
330
/// Backwards-compatibility alias for [`RedeliveryPolicy`].
///
/// Deprecated since 0.1.0; use `RedeliveryPolicy` directly.
#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
pub type ExponentialBackoff = RedeliveryPolicy;
334
#[cfg(test)]
mod retryable_step_tests {
    use super::*;
    use crate::{BoxProcessor, CamelError, Exchange, Message, PipelineOutcome};
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Boxed future type shared by the test services below.
    type ExchangeFuture = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Always-ready test service that counts its calls and either echoes the
    /// exchange back or fails, depending on `succeed`.
    #[derive(Clone)]
    struct CountingProcessor {
        call_count: Arc<AtomicUsize>,
        succeed: bool,
    }

    impl tower::Service<Exchange> for CountingProcessor {
        type Response = Exchange;
        type Error = CamelError;
        type Future = ExchangeFuture;

        fn poll_ready(
            &mut self,
            _cx: &mut std::task::Context<'_>,
        ) -> std::task::Poll<Result<(), Self::Error>> {
            std::task::Poll::Ready(Ok(()))
        }

        fn call(&mut self, exchange: Exchange) -> Self::Future {
            let calls = Arc::clone(&self.call_count);
            let should_succeed = self.succeed;
            Box::pin(async move {
                calls.fetch_add(1, Ordering::SeqCst);
                if !should_succeed {
                    return Err(CamelError::ProcessorError("fail".into()));
                }
                Ok(exchange)
            })
        }
    }

    /// Drive `bp` once through the `RetryableStep` trait object with a fresh exchange.
    async fn invoke_once(bp: BoxProcessor) -> PipelineOutcome {
        let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
        let ex = Exchange::new(Message::new("hello"));
        let outcome = retryable.invoke(ex).await;
        outcome
    }

    #[tokio::test]
    async fn boxprocessor_adapter_maps_ok_to_completed() {
        let calls = Arc::new(AtomicUsize::new(0));
        let service = CountingProcessor {
            call_count: Arc::clone(&calls),
            succeed: true,
        };

        let outcome = invoke_once(BoxProcessor::new(service)).await;

        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
        assert_eq!(calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn boxprocessor_adapter_maps_err_to_failed() {
        let service = CountingProcessor {
            call_count: Arc::new(AtomicUsize::new(0)),
            succeed: false,
        };

        let outcome = invoke_once(BoxProcessor::new(service)).await;

        assert!(matches!(outcome, PipelineOutcome::Failed(_)));
    }

    #[tokio::test]
    async fn boxprocessor_readiness_error_propagates_to_failed() {
        /// Service whose `poll_ready` always fails, so `call` must never run.
        #[derive(Clone)]
        struct AlwaysNotReady;

        impl tower::Service<Exchange> for AlwaysNotReady {
            type Response = Exchange;
            type Error = CamelError;
            type Future = ExchangeFuture;

            fn poll_ready(
                &mut self,
                _cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Result<(), Self::Error>> {
                std::task::Poll::Ready(Err(CamelError::ProcessorError(
                    "readiness failed: consumer closed".into(),
                )))
            }

            fn call(&mut self, _ex: Exchange) -> Self::Future {
                Box::pin(async {
                    unreachable!("call() must not be reached when poll_ready errors")
                })
            }
        }

        let outcome = invoke_once(BoxProcessor::new(AlwaysNotReady)).await;

        // Anything other than `Failed` means the readiness error was lost.
        let err = match outcome {
            PipelineOutcome::Failed(err) => err,
            other => panic!(
                "readiness failure must map to PipelineOutcome::Failed, got {:?}",
                other
            ),
        };
        let msg = err.to_string();
        assert!(
            msg.contains("readiness") || msg.contains("consumer"),
            "readiness error message should be preserved, got: {msg}"
        );
    }
}
462
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BoxProcessor;
    use crate::CamelError;
    use std::time::Duration;

    /// Shorthand for a millisecond `Duration`.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Three-attempt policy with a 100ms initial delay and the given jitter factor.
    fn policy_with_jitter(jitter: f64) -> RedeliveryPolicy {
        RedeliveryPolicy::new(3)
            .with_initial_delay(ms(100))
            .with_jitter(jitter)
    }

    /// Build a log-only config with one catch-all policy, apply `configure` to
    /// its builder, and return the resulting policy's disposition.
    fn built_disposition(
        configure: impl FnOnce(ExceptionPolicyBuilder) -> ExceptionPolicyBuilder,
    ) -> ExceptionDisposition {
        let builder = ErrorHandlerConfig::log_only().on_exception(|_| true);
        let cfg = configure(builder).build();
        cfg.policies[0].disposition
    }

    #[test]
    fn test_redelivery_policy_defaults() {
        let RedeliveryPolicy {
            max_attempts,
            initial_delay,
            multiplier,
            max_delay,
            jitter_factor,
        } = RedeliveryPolicy::new(3);

        assert_eq!(max_attempts, 3);
        assert_eq!(initial_delay, Duration::from_millis(100));
        assert_eq!(multiplier, 2.0);
        assert_eq!(max_delay, Duration::from_secs(10));
        assert_eq!(jitter_factor, 0.0);
    }

    #[test]
    fn test_exception_policy_matches() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
        let applies_to = |err: CamelError| (policy.matches)(&err);

        assert!(applies_to(CamelError::ProcessorError("oops".into())));
        assert!(!applies_to(CamelError::Io("io".into())));
    }

    #[test]
    fn test_error_handler_config_log_only() {
        let ErrorHandlerConfig { dlc_uri, policies } = ErrorHandlerConfig::log_only();
        assert!(dlc_uri.is_none());
        assert!(policies.is_empty());
    }

    #[test]
    fn test_error_handler_config_dlc() {
        let cfg = ErrorHandlerConfig::dead_letter_channel("log:dlc");
        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
    }

    #[test]
    fn test_error_handler_config_with_policy() {
        let cfg = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(2)
            .handled_by("log:io-errors")
            .build();

        assert_eq!(cfg.policies.len(), 1);
        let policy = &cfg.policies[0];
        assert!(policy.retry.is_some());
        assert_eq!(policy.retry.as_ref().map(|r| r.max_attempts), Some(2));
        assert_eq!(policy.handled_by.as_deref(), Some("log:io-errors"));
    }

    #[test]
    fn test_jitter_applies_randomness() {
        let policy = policy_with_jitter(0.5);

        let distinct: std::collections::HashSet<Duration> =
            (0..10).map(|_| policy.delay_for(0)).collect();

        assert!(distinct.len() > 1, "jitter should produce varying delays");
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = policy_with_jitter(0.5);
        let (floor, ceiling) = (ms(50), ms(150));

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(delay >= floor, "delay too low: {:?}", delay);
            assert!(delay <= ceiling, "delay too high: {:?}", delay);
        }
    }

    #[test]
    fn test_max_attempts_zero_means_no_retries() {
        assert_eq!(RedeliveryPolicy::new(0).max_attempts, 0);
    }

    #[test]
    fn test_jitter_zero_produces_exact_delay() {
        let policy = policy_with_jitter(0.0);
        let expected = ms(100);

        for _ in 0..10 {
            assert_eq!(policy.delay_for(0), expected);
        }
    }

    #[test]
    fn test_jitter_one_produces_wide_range() {
        let policy = policy_with_jitter(1.0);
        let (floor, ceiling) = (ms(0), ms(200));

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(delay >= floor, "delay should be >= 0, got {:?}", delay);
            assert!(delay <= ceiling, "delay should be <= 200ms, got {:?}", delay);
        }
    }

    #[test]
    fn test_redelivery_policy_builder_methods_apply_values() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(ms(250))
            .with_multiplier(3.0)
            .with_max_delay(Duration::from_secs(2))
            .with_jitter(2.0);

        // Jitter of 2.0 is expected to come back clamped to 1.0.
        assert_eq!(
            (
                policy.initial_delay,
                policy.multiplier,
                policy.max_delay,
                policy.jitter_factor
            ),
            (ms(250), 3.0, Duration::from_secs(2), 1.0)
        );
    }

    #[test]
    fn test_with_jitter_clamps_low_bound() {
        let clamped = RedeliveryPolicy::new(1).with_jitter(-0.2).jitter_factor;
        assert_eq!(clamped, 0.0);
    }

    #[test]
    fn test_delay_for_exponential_growth_and_cap() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(ms(100))
            .with_multiplier(2.0)
            .with_max_delay(ms(250));

        // (attempt, expected delay in ms): doubles until the 250ms cap takes over.
        for (attempt, expected_ms) in [(0, 100), (1, 200), (2, 250), (20, 250)] {
            assert_eq!(policy.delay_for(attempt), ms(expected_ms));
        }
    }

    #[test]
    fn test_exception_policy_builder_backoff_and_jitter() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(4)
            .with_backoff(ms(10), 1.5, ms(40))
            .with_jitter(1.5)
            .build();

        let RedeliveryPolicy {
            max_attempts,
            initial_delay,
            multiplier,
            max_delay,
            jitter_factor,
        } = cfg.policies[0].retry.clone().unwrap();

        assert_eq!(max_attempts, 4);
        assert_eq!(initial_delay, ms(10));
        assert_eq!(multiplier, 1.5);
        assert_eq!(max_delay, ms(40));
        assert_eq!(jitter_factor, 1.0);
    }

    #[test]
    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
        let builder = ErrorHandlerConfig::log_only().on_exception(|_| true);
        let cfg = builder
            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
            .with_jitter(0.8)
            .build();

        assert!(cfg.policies[0].retry.is_none());
    }

    #[test]
    fn test_exception_policy_clone_preserves_behavior_and_fields() {
        let mut original = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
        original.retry = Some(RedeliveryPolicy::new(2));
        original.handled_by = Some(String::from("log:route-errors"));

        let copy = original.clone();

        assert!((copy.matches)(&CamelError::RouteError("x".into())));
        assert_eq!(copy.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(copy.handled_by.as_deref(), Some("log:route-errors"));
    }

    #[test]
    fn test_delay_for_respects_max_delay_with_jitter() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(ms(200))
            .with_multiplier(10.0)
            .with_max_delay(ms(500))
            .with_jitter(0.2);
        // 500ms cap with ±20% jitter applied on top of it.
        let (floor, ceiling) = (ms(400), ms(600));

        for _ in 0..30 {
            let delay = policy.delay_for(4);
            assert!(delay <= ceiling);
            assert!(delay >= floor);
        }
    }

    #[test]
    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
        let with_io_policy = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(1)
            .build();
        let cfg = with_io_policy
            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
            .handled_by("log:routes")
            .build();

        assert_eq!(cfg.dlc_uri.as_deref(), Some("log:dlc"));
        assert_eq!(cfg.policies.len(), 2);

        let io_error = CamelError::Io("x".into());
        let route_error = CamelError::RouteError("x".into());
        assert!((cfg.policies[0].matches)(&io_error));
        assert!((cfg.policies[1].matches)(&route_error));
    }

    #[test]
    fn test_backoff_without_retry_does_not_create_retry_config() {
        let builder = ErrorHandlerConfig::log_only().on_exception(|_| true);
        let cfg = builder.with_backoff(ms(1), 3.0, ms(9)).build();

        assert!(cfg.policies[0].retry.is_none());
    }

    #[test]
    fn test_exception_disposition_default_is_propagate() {
        let default_disposition = ExceptionDisposition::default();
        assert_eq!(default_disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_exception_policy_new_has_propagate_disposition() {
        let disposition = ExceptionPolicy::new(|_| true).disposition;
        assert_eq!(disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_policy_id_equality() {
        let zero = PolicyId(0);
        assert_eq!(zero, PolicyId(0));
        assert_ne!(zero, PolicyId(1));
    }

    #[test]
    fn test_builder_continued_sets_disposition() {
        assert_eq!(
            built_disposition(|b| b.continued(true)),
            ExceptionDisposition::Continued
        );
    }

    #[test]
    fn test_builder_propagate_sets_disposition() {
        assert_eq!(
            built_disposition(|b| b.propagate()),
            ExceptionDisposition::Propagate
        );
    }

    #[test]
    fn test_builder_handled_true_still_works() {
        assert_eq!(
            built_disposition(|b| b.handled(true)),
            ExceptionDisposition::Handled
        );
    }
}