Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, SyncBoxProcessor};
5
6/// Camel-compatible header names for redelivery state.
7pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11/// Redelivery policy with exponential backoff and optional jitter.
12#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14    pub max_attempts: u32,
15    pub initial_delay: Duration,
16    pub multiplier: f64,
17    pub max_delay: Duration,
18    pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
78/// A rule that matches specific errors and defines retry + redirect behaviour.
79pub struct ExceptionPolicy {
80    /// Predicate: returns `true` if this policy applies to the given error.
81    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
82    /// Optional retry configuration; if absent, no retries are attempted.
83    pub retry: Option<RedeliveryPolicy>,
84    /// Optional URI of a specific endpoint to route failed exchanges to.
85    pub handled_by: Option<String>,
86    /// Optional custom pipeline executed when this policy triggers.
87    pub on_steps: Option<SyncBoxProcessor>,
88    /// Whether the exception is considered handled (suppresses re-throw).
89    pub handled: bool,
90}
91
92impl ExceptionPolicy {
93    /// Create a new policy that matches errors using the given predicate.
94    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
95        Self {
96            matches: Arc::new(matches),
97            retry: None,
98            handled_by: None,
99            on_steps: None,
100            handled: false,
101        }
102    }
103}
104
105impl Clone for ExceptionPolicy {
106    fn clone(&self) -> Self {
107        Self {
108            matches: Arc::clone(&self.matches),
109            retry: self.retry.clone(),
110            handled_by: self.handled_by.clone(),
111            on_steps: self.on_steps.clone(),
112            handled: self.handled,
113        }
114    }
115}
116
117/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
118#[derive(Clone)]
119pub struct ErrorHandlerConfig {
120    /// URI of the Dead Letter Channel endpoint (None = log only).
121    pub dlc_uri: Option<String>,
122    /// Per-exception policies evaluated in order; first match wins.
123    pub policies: Vec<ExceptionPolicy>,
124}
125
126impl ErrorHandlerConfig {
127    /// Log-only error handler: errors are logged but not forwarded anywhere.
128    pub fn log_only() -> Self {
129        Self {
130            dlc_uri: None,
131            policies: Vec::new(),
132        }
133    }
134
135    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
136    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
137        Self {
138            dlc_uri: Some(uri.into()),
139            policies: Vec::new(),
140        }
141    }
142
143    /// Start building an `ExceptionPolicy` attached to this config.
144    pub fn on_exception(
145        self,
146        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
147    ) -> ExceptionPolicyBuilder {
148        ExceptionPolicyBuilder {
149            config: self,
150            policy: ExceptionPolicy::new(matches),
151        }
152    }
153}
154
155/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
156pub struct ExceptionPolicyBuilder {
157    config: ErrorHandlerConfig,
158    policy: ExceptionPolicy,
159}
160
161impl ExceptionPolicyBuilder {
162    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
163    pub fn retry(mut self, max_attempts: u32) -> Self {
164        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
165        self
166    }
167
168    /// Override backoff parameters for the retry (call after `.retry()`).
169    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
170        if let Some(ref mut p) = self.policy.retry {
171            p.initial_delay = initial;
172            p.multiplier = multiplier;
173            p.max_delay = max;
174        }
175        self
176    }
177
178    /// Set jitter factor for retry delays (call after `.retry()`).
179    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
180    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
181        if let Some(ref mut p) = self.policy.retry {
182            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
183        }
184        self
185    }
186
187    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
188    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
189        self.policy.handled_by = Some(uri.into());
190        self
191    }
192
193    /// Attach a custom pipeline to execute when this policy triggers.
194    pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
195        self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
196        self
197    }
198
199    /// Mark the exception as handled (suppresses re-throw to upstream).
200    pub fn handled(mut self, handled: bool) -> Self {
201        self.policy.handled = handled;
202        self
203    }
204
205    /// Finish this policy and return the updated config.
206    pub fn build(mut self) -> ErrorHandlerConfig {
207        self.config.policies.push(self.policy);
208        self.config
209    }
210}
211
212// Backwards compatibility alias
213#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
214pub type ExponentialBackoff = RedeliveryPolicy;
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219    use crate::BoxProcessor;
220    use crate::CamelError;
221    use std::time::Duration;
222
223    #[test]
224    fn test_redelivery_policy_defaults() {
225        let p = RedeliveryPolicy::new(3);
226        assert_eq!(p.max_attempts, 3);
227        assert_eq!(p.initial_delay, Duration::from_millis(100));
228        assert_eq!(p.multiplier, 2.0);
229        assert_eq!(p.max_delay, Duration::from_secs(10));
230        assert_eq!(p.jitter_factor, 0.0);
231    }
232
233    #[test]
234    fn test_exception_policy_matches() {
235        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
236        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
237        assert!(!(policy.matches)(&CamelError::Io("io".into())));
238    }
239
240    #[test]
241    fn test_error_handler_config_log_only() {
242        let config = ErrorHandlerConfig::log_only();
243        assert!(config.dlc_uri.is_none());
244        assert!(config.policies.is_empty());
245    }
246
247    #[test]
248    fn test_error_handler_config_dlc() {
249        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
250        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
251    }
252
253    #[test]
254    fn test_error_handler_config_with_policy() {
255        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
256            .on_exception(|e| matches!(e, CamelError::Io(_)))
257            .retry(2)
258            .handled_by("log:io-errors")
259            .build();
260        assert_eq!(config.policies.len(), 1);
261        let p = &config.policies[0];
262        assert!(p.retry.is_some());
263        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
264        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
265    }
266
267    #[test]
268    fn test_jitter_applies_randomness() {
269        let policy = RedeliveryPolicy::new(3)
270            .with_initial_delay(Duration::from_millis(100))
271            .with_jitter(0.5);
272
273        let mut delays = std::collections::HashSet::new();
274        for _ in 0..10 {
275            delays.insert(policy.delay_for(0));
276        }
277
278        assert!(delays.len() > 1, "jitter should produce varying delays");
279    }
280
281    #[test]
282    fn test_jitter_stays_within_bounds() {
283        let policy = RedeliveryPolicy::new(3)
284            .with_initial_delay(Duration::from_millis(100))
285            .with_jitter(0.5);
286
287        for _ in 0..100 {
288            let delay = policy.delay_for(0);
289            assert!(
290                delay >= Duration::from_millis(50),
291                "delay too low: {:?}",
292                delay
293            );
294            assert!(
295                delay <= Duration::from_millis(150),
296                "delay too high: {:?}",
297                delay
298            );
299        }
300    }
301
302    #[test]
303    fn test_max_attempts_zero_means_no_retries() {
304        let policy = RedeliveryPolicy::new(0);
305        assert_eq!(policy.max_attempts, 0);
306    }
307
308    #[test]
309    fn test_jitter_zero_produces_exact_delay() {
310        let policy = RedeliveryPolicy::new(3)
311            .with_initial_delay(Duration::from_millis(100))
312            .with_jitter(0.0);
313
314        for _ in 0..10 {
315            let delay = policy.delay_for(0);
316            assert_eq!(delay, Duration::from_millis(100));
317        }
318    }
319
320    #[test]
321    fn test_jitter_one_produces_wide_range() {
322        let policy = RedeliveryPolicy::new(3)
323            .with_initial_delay(Duration::from_millis(100))
324            .with_jitter(1.0);
325
326        for _ in 0..100 {
327            let delay = policy.delay_for(0);
328            assert!(
329                delay >= Duration::from_millis(0),
330                "delay should be >= 0, got {:?}",
331                delay
332            );
333            assert!(
334                delay <= Duration::from_millis(200),
335                "delay should be <= 200ms, got {:?}",
336                delay
337            );
338        }
339    }
340
341    #[test]
342    fn test_redelivery_policy_builder_methods_apply_values() {
343        let p = RedeliveryPolicy::new(5)
344            .with_initial_delay(Duration::from_millis(250))
345            .with_multiplier(3.0)
346            .with_max_delay(Duration::from_secs(2))
347            .with_jitter(2.0);
348
349        assert_eq!(p.initial_delay, Duration::from_millis(250));
350        assert_eq!(p.multiplier, 3.0);
351        assert_eq!(p.max_delay, Duration::from_secs(2));
352        assert_eq!(p.jitter_factor, 1.0);
353    }
354
355    #[test]
356    fn test_with_jitter_clamps_low_bound() {
357        let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
358        assert_eq!(p.jitter_factor, 0.0);
359    }
360
361    #[test]
362    fn test_delay_for_exponential_growth_and_cap() {
363        let p = RedeliveryPolicy::new(3)
364            .with_initial_delay(Duration::from_millis(100))
365            .with_multiplier(2.0)
366            .with_max_delay(Duration::from_millis(250));
367
368        assert_eq!(p.delay_for(0), Duration::from_millis(100));
369        assert_eq!(p.delay_for(1), Duration::from_millis(200));
370        assert_eq!(p.delay_for(2), Duration::from_millis(250));
371        assert_eq!(p.delay_for(20), Duration::from_millis(250));
372    }
373
374    #[test]
375    fn test_exception_policy_builder_backoff_and_jitter() {
376        let config = ErrorHandlerConfig::log_only()
377            .on_exception(|e| matches!(e, CamelError::Io(_)))
378            .retry(4)
379            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
380            .with_jitter(1.5)
381            .build();
382
383        let retry = config.policies[0].retry.as_ref().unwrap();
384        assert_eq!(retry.max_attempts, 4);
385        assert_eq!(retry.initial_delay, Duration::from_millis(10));
386        assert_eq!(retry.multiplier, 1.5);
387        assert_eq!(retry.max_delay, Duration::from_millis(40));
388        assert_eq!(retry.jitter_factor, 1.0);
389    }
390
391    #[test]
392    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
393        let config = ErrorHandlerConfig::log_only()
394            .on_exception(|_| true)
395            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
396            .with_jitter(0.8)
397            .build();
398
399        assert!(config.policies[0].retry.is_none());
400    }
401
402    #[test]
403    fn test_exception_policy_clone_preserves_behavior_and_fields() {
404        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
405        let mut configured = policy;
406        configured.retry = Some(RedeliveryPolicy::new(2));
407        configured.handled_by = Some("log:route-errors".to_string());
408
409        let cloned = configured.clone();
410        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
411        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
412        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
413    }
414
415    #[test]
416    fn test_delay_for_respects_max_delay_with_jitter() {
417        let policy = RedeliveryPolicy::new(5)
418            .with_initial_delay(Duration::from_millis(200))
419            .with_multiplier(10.0)
420            .with_max_delay(Duration::from_millis(500))
421            .with_jitter(0.2);
422
423        for _ in 0..30 {
424            let delay = policy.delay_for(4);
425            assert!(delay <= Duration::from_millis(600));
426            assert!(delay >= Duration::from_millis(400));
427        }
428    }
429
430    #[test]
431    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
432        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
433            .on_exception(|e| matches!(e, CamelError::Io(_)))
434            .retry(1)
435            .build()
436            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
437            .handled_by("log:routes")
438            .build();
439
440        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
441        assert_eq!(config.policies.len(), 2);
442        assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
443        assert!((config.policies[1].matches)(&CamelError::RouteError(
444            "x".into()
445        )));
446    }
447
448    #[test]
449    fn test_backoff_without_retry_does_not_create_retry_config() {
450        let config = ErrorHandlerConfig::log_only()
451            .on_exception(|_| true)
452            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
453            .build();
454
455        assert!(config.policies[0].retry.is_none());
456    }
457}