// camel_api/error_handler.rs

use std::sync::Arc;
use std::time::Duration;

use crate::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor};

// Camel-compatible header names for redelivery state.

/// Header name `CamelRedelivered`.
/// NOTE(review): presumably flags an exchange that has been redelivered — confirm at the use site.
pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
/// Header name `CamelRedeliveryCounter`.
/// NOTE(review): presumably carries the current redelivery attempt number — confirm at the use site.
pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
/// Header name `CamelRedeliveryMaxCounter`.
/// NOTE(review): presumably carries the configured maximum number of redeliveries — confirm.
pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";

11/// Redelivery policy with exponential backoff and optional jitter.
12#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14    pub max_attempts: u32,
15    pub initial_delay: Duration,
16    pub multiplier: f64,
17    pub max_delay: Duration,
18    pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
/// Configures what happens after an error handler processes a matched exception.
///
/// The default is [`ExceptionDisposition::Propagate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExceptionDisposition {
    /// After retry exhaustion / on_steps / DLC, re-throw to upstream.
    #[default]
    Propagate,
    /// Suppress re-throw. The handler's exchange is the final result.
    Handled,
    /// Clear the error. The pipeline continues to the NEXT step.
    Continued,
}

/// Opaque identifier for a matched ExceptionPolicy within a RouteErrorHandler.
///
/// Wraps an index into the handler's policies `Vec`, so it is valid only within the handler
/// that created it. Reordering or filtering that `Vec` would make existing ids stale; the
/// design relies on policies being immutable after handler construction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PolicyId(pub usize);

97/// Result of Phase 1 (retry) of error handling.
98pub enum RetryOutcome {
99    /// Retry succeeded — pipeline continues normally.
100    Recovered(Exchange),
101    /// Retries exhausted or no retry configured.
102    Exhausted {
103        exchange: Exchange,
104        error: CamelError,
105        policy: Option<PolicyId>,
106    },
107}
108
109/// Result of Phase 2 (handle) of step error handling.
110/// Handled and Continued MUST have Exchange.error cleared.
111pub enum StepDisposition {
112    Propagate(CamelError),
113    Handled(Exchange),
114    Continued(Exchange),
115}
116
/// Identifies which boundary gate produced an infrastructure error.
// NOTE(review): what each gate checks is not visible in this file; the variant names are the
// only guide here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoundaryKind {
    /// The security gate.
    Security,
    /// The circuit-breaker gate.
    CircuitBreaker,
    /// The readiness gate.
    Readiness,
}

125/// A rule that matches specific errors and defines retry + redirect behaviour.
126pub struct ExceptionPolicy {
127    /// Predicate: returns `true` if this policy applies to the given error.
128    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
129    /// Optional retry configuration; if absent, no retries are attempted.
130    pub retry: Option<RedeliveryPolicy>,
131    /// Optional URI of a specific endpoint to route failed exchanges to.
132    pub handled_by: Option<String>,
133    /// Optional custom pipeline executed when this policy triggers.
134    pub on_steps: Option<SyncBoxProcessor>,
135    /// What to do after this policy's handler runs.
136    pub disposition: ExceptionDisposition,
137}
138
139impl ExceptionPolicy {
140    /// Create a new policy that matches errors using the given predicate.
141    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
142        Self {
143            matches: Arc::new(matches),
144            retry: None,
145            handled_by: None,
146            on_steps: None,
147            disposition: ExceptionDisposition::Propagate,
148        }
149    }
150}
151
152impl Clone for ExceptionPolicy {
153    fn clone(&self) -> Self {
154        Self {
155            matches: Arc::clone(&self.matches),
156            retry: self.retry.clone(),
157            handled_by: self.handled_by.clone(),
158            on_steps: self.on_steps.clone(),
159            disposition: self.disposition,
160        }
161    }
162}
163
164/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
165#[derive(Clone)]
166pub struct ErrorHandlerConfig {
167    /// URI of the Dead Letter Channel endpoint (None = log only).
168    pub dlc_uri: Option<String>,
169    /// Per-exception policies evaluated in order; first match wins.
170    pub policies: Vec<ExceptionPolicy>,
171}
172
173impl ErrorHandlerConfig {
174    /// Log-only error handler: errors are logged but not forwarded anywhere.
175    pub fn log_only() -> Self {
176        Self {
177            dlc_uri: None,
178            policies: Vec::new(),
179        }
180    }
181
182    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
183    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
184        Self {
185            dlc_uri: Some(uri.into()),
186            policies: Vec::new(),
187        }
188    }
189
190    /// Start building an `ExceptionPolicy` attached to this config.
191    pub fn on_exception(
192        self,
193        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
194    ) -> ExceptionPolicyBuilder {
195        ExceptionPolicyBuilder {
196            config: self,
197            policy: ExceptionPolicy::new(matches),
198        }
199    }
200}
201
202/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
203pub struct ExceptionPolicyBuilder {
204    config: ErrorHandlerConfig,
205    policy: ExceptionPolicy,
206}
207
208impl ExceptionPolicyBuilder {
209    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
210    pub fn retry(mut self, max_attempts: u32) -> Self {
211        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
212        self
213    }
214
215    /// Override backoff parameters for the retry (call after `.retry()`).
216    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
217        if let Some(ref mut p) = self.policy.retry {
218            p.initial_delay = initial;
219            p.multiplier = multiplier;
220            p.max_delay = max;
221        }
222        self
223    }
224
225    /// Set jitter factor for retry delays (call after `.retry()`).
226    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
227    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
228        if let Some(ref mut p) = self.policy.retry {
229            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
230        }
231        self
232    }
233
234    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
235    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
236        self.policy.handled_by = Some(uri.into());
237        self
238    }
239
240    /// Attach a custom pipeline to execute when this policy triggers.
241    pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
242        self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
243        self
244    }
245
246    /// Mark the exception as handled (suppresses re-throw to upstream).
247    pub fn handled(mut self, handled: bool) -> Self {
248        self.policy.disposition = if handled {
249            ExceptionDisposition::Handled
250        } else {
251            ExceptionDisposition::Propagate
252        };
253        self
254    }
255
256    /// Mark the exception as continued (clear error, pipeline continues to next step).
257    pub fn continued(mut self, continued: bool) -> Self {
258        self.policy.disposition = if continued {
259            ExceptionDisposition::Continued
260        } else {
261            ExceptionDisposition::Propagate
262        };
263        self
264    }
265
266    /// Explicitly set disposition to Propagate (default, no-op).
267    pub fn propagate(mut self) -> Self {
268        self.policy.disposition = ExceptionDisposition::Propagate;
269        self
270    }
271
272    /// Finish this policy and return the updated config.
273    pub fn build(mut self) -> ErrorHandlerConfig {
274        self.config.policies.push(self.policy);
275        self.config
276    }
277}
278
279// Backwards compatibility alias
280#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
281pub type ExponentialBackoff = RedeliveryPolicy;
282
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CamelError;
    use std::time::Duration;

    // --- RedeliveryPolicy -------------------------------------------------------------------

    #[test]
    fn test_redelivery_policy_defaults() {
        let p = RedeliveryPolicy::new(3);
        assert_eq!(p.max_attempts, 3);
        assert_eq!(p.initial_delay, Duration::from_millis(100));
        assert_eq!(p.multiplier, 2.0);
        assert_eq!(p.max_delay, Duration::from_secs(10));
        assert_eq!(p.jitter_factor, 0.0);
    }

    #[test]
    fn test_exception_policy_matches() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
        assert!(!(policy.matches)(&CamelError::Io("io".into())));
    }

    #[test]
    fn test_error_handler_config_log_only() {
        let config = ErrorHandlerConfig::log_only();
        assert!(config.dlc_uri.is_none());
        assert!(config.policies.is_empty());
    }

    #[test]
    fn test_error_handler_config_dlc() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
    }

    #[test]
    fn test_error_handler_config_with_policy() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(2)
            .handled_by("log:io-errors")
            .build();
        assert_eq!(config.policies.len(), 1);
        let p = &config.policies[0];
        assert!(p.retry.is_some());
        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
    }

    #[test]
    fn test_jitter_applies_randomness() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        let mut delays = std::collections::HashSet::new();
        for _ in 0..10 {
            delays.insert(policy.delay_for(0));
        }

        assert!(delays.len() > 1, "jitter should produce varying delays");
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(
                delay >= Duration::from_millis(50),
                "delay too low: {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(150),
                "delay too high: {:?}",
                delay
            );
        }
    }

    #[test]
    fn test_max_attempts_zero_means_no_retries() {
        let policy = RedeliveryPolicy::new(0);
        assert_eq!(policy.max_attempts, 0);
    }

    #[test]
    fn test_jitter_zero_produces_exact_delay() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.0);

        for _ in 0..10 {
            let delay = policy.delay_for(0);
            assert_eq!(delay, Duration::from_millis(100));
        }
    }

    #[test]
    fn test_jitter_one_produces_wide_range() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(1.0);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(
                delay >= Duration::from_millis(0),
                "delay should be >= 0, got {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(200),
                "delay should be <= 200ms, got {:?}",
                delay
            );
        }
    }

    #[test]
    fn test_redelivery_policy_builder_methods_apply_values() {
        let p = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(250))
            .with_multiplier(3.0)
            .with_max_delay(Duration::from_secs(2))
            .with_jitter(2.0);

        assert_eq!(p.initial_delay, Duration::from_millis(250));
        assert_eq!(p.multiplier, 3.0);
        assert_eq!(p.max_delay, Duration::from_secs(2));
        assert_eq!(p.jitter_factor, 1.0);
    }

    #[test]
    fn test_with_jitter_clamps_low_bound() {
        let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
        assert_eq!(p.jitter_factor, 0.0);
    }

    #[test]
    fn test_delay_for_exponential_growth_and_cap() {
        let p = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_delay(Duration::from_millis(250));

        assert_eq!(p.delay_for(0), Duration::from_millis(100));
        assert_eq!(p.delay_for(1), Duration::from_millis(200));
        assert_eq!(p.delay_for(2), Duration::from_millis(250));
        assert_eq!(p.delay_for(20), Duration::from_millis(250));
    }

    #[test]
    fn test_exception_policy_builder_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(4)
            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
            .with_jitter(1.5)
            .build();

        let retry = config.policies[0].retry.as_ref().unwrap();
        assert_eq!(retry.max_attempts, 4);
        assert_eq!(retry.initial_delay, Duration::from_millis(10));
        assert_eq!(retry.multiplier, 1.5);
        assert_eq!(retry.max_delay, Duration::from_millis(40));
        assert_eq!(retry.jitter_factor, 1.0);
    }

    #[test]
    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
            .with_jitter(0.8)
            .build();

        assert!(config.policies[0].retry.is_none());
    }

    #[test]
    fn test_exception_policy_clone_preserves_behavior_and_fields() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
        let mut configured = policy;
        configured.retry = Some(RedeliveryPolicy::new(2));
        configured.handled_by = Some("log:route-errors".to_string());

        let cloned = configured.clone();
        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
    }

    #[test]
    fn test_delay_for_respects_max_delay_with_jitter() {
        // Jitter is applied after the cap, so ±20% around the 500ms cap is expected.
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(200))
            .with_multiplier(10.0)
            .with_max_delay(Duration::from_millis(500))
            .with_jitter(0.2);

        for _ in 0..30 {
            let delay = policy.delay_for(4);
            assert!(delay <= Duration::from_millis(600));
            assert!(delay >= Duration::from_millis(400));
        }
    }

    #[test]
    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(1)
            .build()
            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
            .handled_by("log:routes")
            .build();

        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
        assert_eq!(config.policies.len(), 2);
        assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
        assert!((config.policies[1].matches)(&CamelError::RouteError(
            "x".into()
        )));
    }

    #[test]
    fn test_backoff_without_retry_does_not_create_retry_config() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
            .build();

        assert!(config.policies[0].retry.is_none());
    }

    // --- Dispositions and ids ---------------------------------------------------------------

    #[test]
    fn test_exception_disposition_default_is_propagate() {
        assert_eq!(
            ExceptionDisposition::default(),
            ExceptionDisposition::Propagate
        );
    }

    #[test]
    fn test_exception_policy_new_has_propagate_disposition() {
        let p = ExceptionPolicy::new(|_| true);
        assert_eq!(p.disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_policy_id_equality() {
        assert_eq!(PolicyId(0), PolicyId(0));
        assert_ne!(PolicyId(0), PolicyId(1));
    }

    #[test]
    fn test_builder_continued_sets_disposition() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .continued(true)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
    }

    #[test]
    fn test_builder_propagate_sets_disposition() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .propagate()
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_builder_handled_true_still_works() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .handled(true)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
    }

    #[test]
    fn test_builder_false_flags_reset_to_propagate() {
        // The last disposition call wins, and a `false` flag means Propagate.
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .handled(true)
            .continued(false)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);

        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .continued(true)
            .handled(false)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
    }
}