1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, SyncBoxProcessor};
5
/// Header name `"CamelRedelivered"`.
///
/// NOTE(review): presumably marks an exchange that has been redelivered; the code that sets
/// or reads this header is not in this file — confirm against the redelivery implementation.
pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
/// Header name `"CamelRedeliveryCounter"`.
///
/// NOTE(review): presumably carries the number of redelivery attempts made so far — confirm.
pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
/// Header name `"CamelRedeliveryMaxCounter"`.
///
/// NOTE(review): presumably carries the configured attempt limit — confirm.
pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14 pub max_attempts: u32,
15 pub initial_delay: Duration,
16 pub multiplier: f64,
17 pub max_delay: Duration,
18 pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78pub struct ExceptionPolicy {
80 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
82 pub retry: Option<RedeliveryPolicy>,
84 pub handled_by: Option<String>,
86 pub on_steps: Option<SyncBoxProcessor>,
88 pub handled: bool,
90}
91
92impl ExceptionPolicy {
93 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
95 Self {
96 matches: Arc::new(matches),
97 retry: None,
98 handled_by: None,
99 on_steps: None,
100 handled: false,
101 }
102 }
103}
104
105impl Clone for ExceptionPolicy {
106 fn clone(&self) -> Self {
107 Self {
108 matches: Arc::clone(&self.matches),
109 retry: self.retry.clone(),
110 handled_by: self.handled_by.clone(),
111 on_steps: self.on_steps.clone(),
112 handled: self.handled,
113 }
114 }
115}
116
117#[derive(Clone)]
119pub struct ErrorHandlerConfig {
120 pub dlc_uri: Option<String>,
122 pub policies: Vec<ExceptionPolicy>,
124}
125
126impl ErrorHandlerConfig {
127 pub fn log_only() -> Self {
129 Self {
130 dlc_uri: None,
131 policies: Vec::new(),
132 }
133 }
134
135 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
137 Self {
138 dlc_uri: Some(uri.into()),
139 policies: Vec::new(),
140 }
141 }
142
143 pub fn on_exception(
145 self,
146 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
147 ) -> ExceptionPolicyBuilder {
148 ExceptionPolicyBuilder {
149 config: self,
150 policy: ExceptionPolicy::new(matches),
151 }
152 }
153}
154
155pub struct ExceptionPolicyBuilder {
157 config: ErrorHandlerConfig,
158 policy: ExceptionPolicy,
159}
160
161impl ExceptionPolicyBuilder {
162 pub fn retry(mut self, max_attempts: u32) -> Self {
164 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
165 self
166 }
167
168 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
170 if let Some(ref mut p) = self.policy.retry {
171 p.initial_delay = initial;
172 p.multiplier = multiplier;
173 p.max_delay = max;
174 }
175 self
176 }
177
178 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
181 if let Some(ref mut p) = self.policy.retry {
182 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
183 }
184 self
185 }
186
187 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
189 self.policy.handled_by = Some(uri.into());
190 self
191 }
192
193 pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
195 self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
196 self
197 }
198
199 pub fn handled(mut self, handled: bool) -> Self {
201 self.policy.handled = handled;
202 self
203 }
204
205 pub fn build(mut self) -> ErrorHandlerConfig {
207 self.config.policies.push(self.policy);
208 self.config
209 }
210}
211
/// Deprecated alias for [`RedeliveryPolicy`]; both names refer to the same type.
#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
pub type ExponentialBackoff = RedeliveryPolicy;
215
#[cfg(test)]
mod tests {
    //! Unit tests for the redelivery policy and the error-handler configuration builders.

    use super::*;
    use crate::CamelError;
    use std::time::Duration;

    // `RedeliveryPolicy::new` applies the documented defaults.
    #[test]
    fn test_redelivery_policy_defaults() {
        let p = RedeliveryPolicy::new(3);
        assert_eq!(p.max_attempts, 3);
        assert_eq!(p.initial_delay, Duration::from_millis(100));
        assert_eq!(p.multiplier, 2.0);
        assert_eq!(p.max_delay, Duration::from_secs(10));
        assert_eq!(p.jitter_factor, 0.0);
    }

    // The stored predicate accepts matching errors and rejects others.
    #[test]
    fn test_exception_policy_matches() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
        assert!(!(policy.matches)(&CamelError::Io("io".into())));
    }

    // `log_only` has neither a dead-letter channel nor policies.
    #[test]
    fn test_error_handler_config_log_only() {
        let config = ErrorHandlerConfig::log_only();
        assert!(config.dlc_uri.is_none());
        assert!(config.policies.is_empty());
    }

    // `dead_letter_channel` stores the given URI.
    #[test]
    fn test_error_handler_config_dlc() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
    }

    // A policy built through the fluent API ends up in `policies` with its settings.
    #[test]
    fn test_error_handler_config_with_policy() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(2)
            .handled_by("log:io-errors")
            .build();
        assert_eq!(config.policies.len(), 1);
        let p = &config.policies[0];
        assert!(p.retry.is_some());
        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
    }

    // With jitter enabled, repeated calls should not all return the same delay.
    #[test]
    fn test_jitter_applies_randomness() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        let mut delays = std::collections::HashSet::new();
        for _ in 0..10 {
            delays.insert(policy.delay_for(0));
        }

        assert!(delays.len() > 1, "jitter should produce varying delays");
    }

    // Jitter of 0.5 on a 100 ms delay stays within 50..=150 ms.
    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(
                delay >= Duration::from_millis(50),
                "delay too low: {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(150),
                "delay too high: {:?}",
                delay
            );
        }
    }

    // A limit of zero is stored as-is.
    #[test]
    fn test_max_attempts_zero_means_no_retries() {
        let policy = RedeliveryPolicy::new(0);
        assert_eq!(policy.max_attempts, 0);
    }

    // Without jitter the delay is deterministic.
    #[test]
    fn test_jitter_zero_produces_exact_delay() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.0);

        for _ in 0..10 {
            let delay = policy.delay_for(0);
            assert_eq!(delay, Duration::from_millis(100));
        }
    }

    // Full jitter on a 100 ms delay stays within 0..=200 ms.
    #[test]
    fn test_jitter_one_produces_wide_range() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(1.0);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            // `Duration` cannot be negative, so this bound only documents intent.
            assert!(
                delay >= Duration::from_millis(0),
                "delay should be >= 0, got {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(200),
                "delay should be <= 200ms, got {:?}",
                delay
            );
        }
    }

    // Each `with_*` builder stores its value; jitter above 1.0 is clamped to 1.0.
    #[test]
    fn test_redelivery_policy_builder_methods_apply_values() {
        let p = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(250))
            .with_multiplier(3.0)
            .with_max_delay(Duration::from_secs(2))
            .with_jitter(2.0);

        assert_eq!(p.initial_delay, Duration::from_millis(250));
        assert_eq!(p.multiplier, 3.0);
        assert_eq!(p.max_delay, Duration::from_secs(2));
        assert_eq!(p.jitter_factor, 1.0);
    }

    // Negative jitter is clamped to 0.0.
    #[test]
    fn test_with_jitter_clamps_low_bound() {
        let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
        assert_eq!(p.jitter_factor, 0.0);
    }

    // The delay doubles per attempt until it reaches `max_delay`.
    #[test]
    fn test_delay_for_exponential_growth_and_cap() {
        let p = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_delay(Duration::from_millis(250));

        assert_eq!(p.delay_for(0), Duration::from_millis(100));
        assert_eq!(p.delay_for(1), Duration::from_millis(200));
        assert_eq!(p.delay_for(2), Duration::from_millis(250));
        assert_eq!(p.delay_for(20), Duration::from_millis(250));
    }

    // Builder-level backoff and jitter settings reach the retry schedule; jitter is clamped.
    #[test]
    fn test_exception_policy_builder_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(4)
            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
            .with_jitter(1.5)
            .build();

        let retry = config.policies[0].retry.as_ref().unwrap();
        assert_eq!(retry.max_attempts, 4);
        assert_eq!(retry.initial_delay, Duration::from_millis(10));
        assert_eq!(retry.multiplier, 1.5);
        assert_eq!(retry.max_delay, Duration::from_millis(40));
        assert_eq!(retry.jitter_factor, 1.0);
    }

    // Without a prior `retry`, backoff and jitter calls leave `retry` unset.
    #[test]
    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
            .with_jitter(0.8)
            .build();

        assert!(config.policies[0].retry.is_none());
    }

    // A clone keeps the predicate's behaviour and the configured fields.
    #[test]
    fn test_exception_policy_clone_preserves_behavior_and_fields() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
        let mut configured = policy;
        configured.retry = Some(RedeliveryPolicy::new(2));
        configured.handled_by = Some("log:route-errors".to_string());

        let cloned = configured.clone();
        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
    }

    // Jitter is applied around the capped delay: 500 ms with 0.2 jitter gives 400..=600 ms.
    #[test]
    fn test_delay_for_respects_max_delay_with_jitter() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(200))
            .with_multiplier(10.0)
            .with_max_delay(Duration::from_millis(500))
            .with_jitter(0.2);

        for _ in 0..30 {
            let delay = policy.delay_for(4);
            assert!(delay <= Duration::from_millis(600));
            assert!(delay >= Duration::from_millis(400));
        }
    }

    // Chaining two policies keeps the dead-letter URI and the insertion order.
    #[test]
    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(1)
            .build()
            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
            .handled_by("log:routes")
            .build();

        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
        assert_eq!(config.policies.len(), 2);
        assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
        assert!((config.policies[1].matches)(&CamelError::RouteError(
            "x".into()
        )));
    }

    // `with_backoff` alone never creates a retry schedule.
    #[test]
    fn test_backoff_without_retry_does_not_create_retry_config() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
            .build();

        assert!(config.policies[0].retry.is_none());
    }
}