Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::CamelError;
5
6/// Exponential backoff configuration for retry.
7#[derive(Debug, Clone)]
8pub struct ExponentialBackoff {
9    pub max_attempts: u32,
10    pub initial_delay: Duration,
11    pub multiplier: f64,
12    pub max_delay: Duration,
13}
14
15impl ExponentialBackoff {
16    /// Create a new backoff config with default delays (100ms initial, 2x multiplier, 10s max).
17    pub fn new(max_attempts: u32) -> Self {
18        Self {
19            max_attempts,
20            initial_delay: Duration::from_millis(100),
21            multiplier: 2.0,
22            max_delay: Duration::from_secs(10),
23        }
24    }
25
26    /// Override the initial delay before the first retry.
27    pub fn with_initial_delay(mut self, d: Duration) -> Self {
28        self.initial_delay = d;
29        self
30    }
31
32    /// Override the backoff multiplier applied after each attempt.
33    pub fn with_multiplier(mut self, m: f64) -> Self {
34        self.multiplier = m;
35        self
36    }
37
38    /// Cap the maximum delay between retries.
39    pub fn with_max_delay(mut self, d: Duration) -> Self {
40        self.max_delay = d;
41        self
42    }
43
44    /// Compute the sleep duration before retry attempt N (0-indexed).
45    pub fn delay_for(&self, attempt: u32) -> Duration {
46        let millis = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
47        let d = Duration::from_millis(millis as u64);
48        d.min(self.max_delay)
49    }
50}
51
52/// A rule that matches specific errors and defines retry + redirect behaviour.
53pub struct ExceptionPolicy {
54    /// Predicate: returns `true` if this policy applies to the given error.
55    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
56    /// Optional retry configuration; if absent, no retries are attempted.
57    pub retry: Option<ExponentialBackoff>,
58    /// Optional URI of a specific endpoint to route failed exchanges to.
59    pub handled_by: Option<String>,
60}
61
62impl ExceptionPolicy {
63    /// Create a new policy that matches errors using the given predicate.
64    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
65        Self {
66            matches: Arc::new(matches),
67            retry: None,
68            handled_by: None,
69        }
70    }
71}
72
73impl Clone for ExceptionPolicy {
74    fn clone(&self) -> Self {
75        Self {
76            matches: Arc::clone(&self.matches),
77            retry: self.retry.clone(),
78            handled_by: self.handled_by.clone(),
79        }
80    }
81}
82
83/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
84#[derive(Clone)]
85pub struct ErrorHandlerConfig {
86    /// URI of the Dead Letter Channel endpoint (None = log only).
87    pub dlc_uri: Option<String>,
88    /// Per-exception policies evaluated in order; first match wins.
89    pub policies: Vec<ExceptionPolicy>,
90}
91
92impl ErrorHandlerConfig {
93    /// Log-only error handler: errors are logged but not forwarded anywhere.
94    pub fn log_only() -> Self {
95        Self {
96            dlc_uri: None,
97            policies: Vec::new(),
98        }
99    }
100
101    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
102    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
103        Self {
104            dlc_uri: Some(uri.into()),
105            policies: Vec::new(),
106        }
107    }
108
109    /// Start building an `ExceptionPolicy` attached to this config.
110    pub fn on_exception(
111        self,
112        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
113    ) -> ExceptionPolicyBuilder {
114        ExceptionPolicyBuilder {
115            config: self,
116            policy: ExceptionPolicy::new(matches),
117        }
118    }
119}
120
121/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
122pub struct ExceptionPolicyBuilder {
123    config: ErrorHandlerConfig,
124    policy: ExceptionPolicy,
125}
126
127impl ExceptionPolicyBuilder {
128    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
129    pub fn retry(mut self, max_attempts: u32) -> Self {
130        self.policy.retry = Some(ExponentialBackoff::new(max_attempts));
131        self
132    }
133
134    /// Override backoff parameters for the retry (call after `.retry()`).
135    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
136        if let Some(ref mut b) = self.policy.retry {
137            b.initial_delay = initial;
138            b.multiplier = multiplier;
139            b.max_delay = max;
140        }
141        self
142    }
143
144    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
145    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
146        self.policy.handled_by = Some(uri.into());
147        self
148    }
149
150    /// Finish this policy and return the updated config.
151    pub fn build(mut self) -> ErrorHandlerConfig {
152        self.config.policies.push(self.policy);
153        self.config
154    }
155}
156
157#[cfg(test)]
158mod tests {
159    use super::*;
160    use crate::CamelError;
161    use std::time::Duration;
162
163    #[test]
164    fn test_exponential_backoff_defaults() {
165        let b = ExponentialBackoff::new(3);
166        assert_eq!(b.max_attempts, 3);
167        assert_eq!(b.initial_delay, Duration::from_millis(100));
168        assert_eq!(b.multiplier, 2.0);
169        assert_eq!(b.max_delay, Duration::from_secs(10));
170    }
171
172    #[test]
173    fn test_exception_policy_matches() {
174        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
175        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
176        assert!(!(policy.matches)(&CamelError::Io("io".into())));
177    }
178
179    #[test]
180    fn test_error_handler_config_log_only() {
181        let config = ErrorHandlerConfig::log_only();
182        assert!(config.dlc_uri.is_none());
183        assert!(config.policies.is_empty());
184    }
185
186    #[test]
187    fn test_error_handler_config_dlc() {
188        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
189        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
190    }
191
192    #[test]
193    fn test_error_handler_config_with_policy() {
194        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
195            .on_exception(|e| matches!(e, CamelError::Io(_)))
196            .retry(2)
197            .handled_by("log:io-errors")
198            .build();
199        assert_eq!(config.policies.len(), 1);
200        let p = &config.policies[0];
201        assert!(p.retry.is_some());
202        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
203        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
204    }
205}