camel_api/
error_handler.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::CamelError;
5
6pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14 pub max_attempts: u32,
15 pub initial_delay: Duration,
16 pub multiplier: f64,
17 pub max_delay: Duration,
18 pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78pub struct ExceptionPolicy {
80 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
82 pub retry: Option<RedeliveryPolicy>,
84 pub handled_by: Option<String>,
86}
87
88impl ExceptionPolicy {
89 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
91 Self {
92 matches: Arc::new(matches),
93 retry: None,
94 handled_by: None,
95 }
96 }
97}
98
99impl Clone for ExceptionPolicy {
100 fn clone(&self) -> Self {
101 Self {
102 matches: Arc::clone(&self.matches),
103 retry: self.retry.clone(),
104 handled_by: self.handled_by.clone(),
105 }
106 }
107}
108
109#[derive(Clone)]
111pub struct ErrorHandlerConfig {
112 pub dlc_uri: Option<String>,
114 pub policies: Vec<ExceptionPolicy>,
116}
117
118impl ErrorHandlerConfig {
119 pub fn log_only() -> Self {
121 Self {
122 dlc_uri: None,
123 policies: Vec::new(),
124 }
125 }
126
127 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
129 Self {
130 dlc_uri: Some(uri.into()),
131 policies: Vec::new(),
132 }
133 }
134
135 pub fn on_exception(
137 self,
138 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
139 ) -> ExceptionPolicyBuilder {
140 ExceptionPolicyBuilder {
141 config: self,
142 policy: ExceptionPolicy::new(matches),
143 }
144 }
145}
146
147pub struct ExceptionPolicyBuilder {
149 config: ErrorHandlerConfig,
150 policy: ExceptionPolicy,
151}
152
153impl ExceptionPolicyBuilder {
154 pub fn retry(mut self, max_attempts: u32) -> Self {
156 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
157 self
158 }
159
160 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
162 if let Some(ref mut p) = self.policy.retry {
163 p.initial_delay = initial;
164 p.multiplier = multiplier;
165 p.max_delay = max;
166 }
167 self
168 }
169
170 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
173 if let Some(ref mut p) = self.policy.retry {
174 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
175 }
176 self
177 }
178
179 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
181 self.policy.handled_by = Some(uri.into());
182 self
183 }
184
185 pub fn build(mut self) -> ErrorHandlerConfig {
187 self.config.policies.push(self.policy);
188 self.config
189 }
190}
191
192#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
194pub type ExponentialBackoff = RedeliveryPolicy;
195
196#[cfg(test)]
197mod tests {
198 use super::*;
199 use crate::CamelError;
200 use std::time::Duration;
201
202 #[test]
203 fn test_redelivery_policy_defaults() {
204 let p = RedeliveryPolicy::new(3);
205 assert_eq!(p.max_attempts, 3);
206 assert_eq!(p.initial_delay, Duration::from_millis(100));
207 assert_eq!(p.multiplier, 2.0);
208 assert_eq!(p.max_delay, Duration::from_secs(10));
209 assert_eq!(p.jitter_factor, 0.0);
210 }
211
212 #[test]
213 fn test_exception_policy_matches() {
214 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
215 assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
216 assert!(!(policy.matches)(&CamelError::Io("io".into())));
217 }
218
219 #[test]
220 fn test_error_handler_config_log_only() {
221 let config = ErrorHandlerConfig::log_only();
222 assert!(config.dlc_uri.is_none());
223 assert!(config.policies.is_empty());
224 }
225
226 #[test]
227 fn test_error_handler_config_dlc() {
228 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
229 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
230 }
231
232 #[test]
233 fn test_error_handler_config_with_policy() {
234 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
235 .on_exception(|e| matches!(e, CamelError::Io(_)))
236 .retry(2)
237 .handled_by("log:io-errors")
238 .build();
239 assert_eq!(config.policies.len(), 1);
240 let p = &config.policies[0];
241 assert!(p.retry.is_some());
242 assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
243 assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
244 }
245
246 #[test]
247 fn test_jitter_applies_randomness() {
248 let policy = RedeliveryPolicy::new(3)
249 .with_initial_delay(Duration::from_millis(100))
250 .with_jitter(0.5);
251
252 let mut delays = std::collections::HashSet::new();
253 for _ in 0..10 {
254 delays.insert(policy.delay_for(0));
255 }
256
257 assert!(delays.len() > 1, "jitter should produce varying delays");
258 }
259
260 #[test]
261 fn test_jitter_stays_within_bounds() {
262 let policy = RedeliveryPolicy::new(3)
263 .with_initial_delay(Duration::from_millis(100))
264 .with_jitter(0.5);
265
266 for _ in 0..100 {
267 let delay = policy.delay_for(0);
268 assert!(
269 delay >= Duration::from_millis(50),
270 "delay too low: {:?}",
271 delay
272 );
273 assert!(
274 delay <= Duration::from_millis(150),
275 "delay too high: {:?}",
276 delay
277 );
278 }
279 }
280
281 #[test]
282 fn test_max_attempts_zero_means_no_retries() {
283 let policy = RedeliveryPolicy::new(0);
284 assert_eq!(policy.max_attempts, 0);
285 }
286
287 #[test]
288 fn test_jitter_zero_produces_exact_delay() {
289 let policy = RedeliveryPolicy::new(3)
290 .with_initial_delay(Duration::from_millis(100))
291 .with_jitter(0.0);
292
293 for _ in 0..10 {
294 let delay = policy.delay_for(0);
295 assert_eq!(delay, Duration::from_millis(100));
296 }
297 }
298
299 #[test]
300 fn test_jitter_one_produces_wide_range() {
301 let policy = RedeliveryPolicy::new(3)
302 .with_initial_delay(Duration::from_millis(100))
303 .with_jitter(1.0);
304
305 for _ in 0..100 {
306 let delay = policy.delay_for(0);
307 assert!(
308 delay >= Duration::from_millis(0),
309 "delay should be >= 0, got {:?}",
310 delay
311 );
312 assert!(
313 delay <= Duration::from_millis(200),
314 "delay should be <= 200ms, got {:?}",
315 delay
316 );
317 }
318 }
319}