Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::CamelError;
5
6/// Camel-compatible header names for redelivery state.
7pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11/// Redelivery policy with exponential backoff and optional jitter.
12#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14    pub max_attempts: u32,
15    pub initial_delay: Duration,
16    pub multiplier: f64,
17    pub max_delay: Duration,
18    pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
78/// A rule that matches specific errors and defines retry + redirect behaviour.
79pub struct ExceptionPolicy {
80    /// Predicate: returns `true` if this policy applies to the given error.
81    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
82    /// Optional retry configuration; if absent, no retries are attempted.
83    pub retry: Option<RedeliveryPolicy>,
84    /// Optional URI of a specific endpoint to route failed exchanges to.
85    pub handled_by: Option<String>,
86}
87
88impl ExceptionPolicy {
89    /// Create a new policy that matches errors using the given predicate.
90    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
91        Self {
92            matches: Arc::new(matches),
93            retry: None,
94            handled_by: None,
95        }
96    }
97}
98
99impl Clone for ExceptionPolicy {
100    fn clone(&self) -> Self {
101        Self {
102            matches: Arc::clone(&self.matches),
103            retry: self.retry.clone(),
104            handled_by: self.handled_by.clone(),
105        }
106    }
107}
108
109/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
110#[derive(Clone)]
111pub struct ErrorHandlerConfig {
112    /// URI of the Dead Letter Channel endpoint (None = log only).
113    pub dlc_uri: Option<String>,
114    /// Per-exception policies evaluated in order; first match wins.
115    pub policies: Vec<ExceptionPolicy>,
116}
117
118impl ErrorHandlerConfig {
119    /// Log-only error handler: errors are logged but not forwarded anywhere.
120    pub fn log_only() -> Self {
121        Self {
122            dlc_uri: None,
123            policies: Vec::new(),
124        }
125    }
126
127    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
128    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
129        Self {
130            dlc_uri: Some(uri.into()),
131            policies: Vec::new(),
132        }
133    }
134
135    /// Start building an `ExceptionPolicy` attached to this config.
136    pub fn on_exception(
137        self,
138        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
139    ) -> ExceptionPolicyBuilder {
140        ExceptionPolicyBuilder {
141            config: self,
142            policy: ExceptionPolicy::new(matches),
143        }
144    }
145}
146
147/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
148pub struct ExceptionPolicyBuilder {
149    config: ErrorHandlerConfig,
150    policy: ExceptionPolicy,
151}
152
153impl ExceptionPolicyBuilder {
154    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
155    pub fn retry(mut self, max_attempts: u32) -> Self {
156        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
157        self
158    }
159
160    /// Override backoff parameters for the retry (call after `.retry()`).
161    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
162        if let Some(ref mut p) = self.policy.retry {
163            p.initial_delay = initial;
164            p.multiplier = multiplier;
165            p.max_delay = max;
166        }
167        self
168    }
169
170    /// Set jitter factor for retry delays (call after `.retry()`).
171    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
172    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
173        if let Some(ref mut p) = self.policy.retry {
174            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
175        }
176        self
177    }
178
179    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
180    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
181        self.policy.handled_by = Some(uri.into());
182        self
183    }
184
185    /// Finish this policy and return the updated config.
186    pub fn build(mut self) -> ErrorHandlerConfig {
187        self.config.policies.push(self.policy);
188        self.config
189    }
190}
191
192// Backwards compatibility alias
193#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
194pub type ExponentialBackoff = RedeliveryPolicy;
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199    use crate::CamelError;
200    use std::time::Duration;
201
202    #[test]
203    fn test_redelivery_policy_defaults() {
204        let p = RedeliveryPolicy::new(3);
205        assert_eq!(p.max_attempts, 3);
206        assert_eq!(p.initial_delay, Duration::from_millis(100));
207        assert_eq!(p.multiplier, 2.0);
208        assert_eq!(p.max_delay, Duration::from_secs(10));
209        assert_eq!(p.jitter_factor, 0.0);
210    }
211
212    #[test]
213    fn test_exception_policy_matches() {
214        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
215        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
216        assert!(!(policy.matches)(&CamelError::Io("io".into())));
217    }
218
219    #[test]
220    fn test_error_handler_config_log_only() {
221        let config = ErrorHandlerConfig::log_only();
222        assert!(config.dlc_uri.is_none());
223        assert!(config.policies.is_empty());
224    }
225
226    #[test]
227    fn test_error_handler_config_dlc() {
228        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
229        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
230    }
231
232    #[test]
233    fn test_error_handler_config_with_policy() {
234        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
235            .on_exception(|e| matches!(e, CamelError::Io(_)))
236            .retry(2)
237            .handled_by("log:io-errors")
238            .build();
239        assert_eq!(config.policies.len(), 1);
240        let p = &config.policies[0];
241        assert!(p.retry.is_some());
242        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
243        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
244    }
245
246    #[test]
247    fn test_jitter_applies_randomness() {
248        let policy = RedeliveryPolicy::new(3)
249            .with_initial_delay(Duration::from_millis(100))
250            .with_jitter(0.5);
251
252        let mut delays = std::collections::HashSet::new();
253        for _ in 0..10 {
254            delays.insert(policy.delay_for(0));
255        }
256
257        assert!(delays.len() > 1, "jitter should produce varying delays");
258    }
259
260    #[test]
261    fn test_jitter_stays_within_bounds() {
262        let policy = RedeliveryPolicy::new(3)
263            .with_initial_delay(Duration::from_millis(100))
264            .with_jitter(0.5);
265
266        for _ in 0..100 {
267            let delay = policy.delay_for(0);
268            assert!(
269                delay >= Duration::from_millis(50),
270                "delay too low: {:?}",
271                delay
272            );
273            assert!(
274                delay <= Duration::from_millis(150),
275                "delay too high: {:?}",
276                delay
277            );
278        }
279    }
280
281    #[test]
282    fn test_max_attempts_zero_means_no_retries() {
283        let policy = RedeliveryPolicy::new(0);
284        assert_eq!(policy.max_attempts, 0);
285    }
286
287    #[test]
288    fn test_jitter_zero_produces_exact_delay() {
289        let policy = RedeliveryPolicy::new(3)
290            .with_initial_delay(Duration::from_millis(100))
291            .with_jitter(0.0);
292
293        for _ in 0..10 {
294            let delay = policy.delay_for(0);
295            assert_eq!(delay, Duration::from_millis(100));
296        }
297    }
298
299    #[test]
300    fn test_jitter_one_produces_wide_range() {
301        let policy = RedeliveryPolicy::new(3)
302            .with_initial_delay(Duration::from_millis(100))
303            .with_jitter(1.0);
304
305        for _ in 0..100 {
306            let delay = policy.delay_for(0);
307            assert!(
308                delay >= Duration::from_millis(0),
309                "delay should be >= 0, got {:?}",
310                delay
311            );
312            assert!(
313                delay <= Duration::from_millis(200),
314                "delay should be <= 200ms, got {:?}",
315                delay
316            );
317        }
318    }
319}