1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor};
5
6pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14 pub max_attempts: u32,
15 pub initial_delay: Duration,
16 pub multiplier: f64,
17 pub max_delay: Duration,
18 pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
80pub enum ExceptionDisposition {
81 #[default]
83 Propagate,
84 Handled,
86 Continued,
88}
89
90#[derive(Clone, Copy, Debug, PartialEq, Eq)]
95pub struct PolicyId(pub usize);
96
97pub enum RetryOutcome {
99 Recovered(Exchange),
101 Exhausted {
103 exchange: Exchange,
104 error: CamelError,
105 policy: Option<PolicyId>,
106 },
107}
108
109pub enum StepDisposition {
112 Propagate(CamelError),
113 Handled(Exchange),
114 Continued(Exchange),
115}
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
119pub enum BoundaryKind {
120 Security,
121 CircuitBreaker,
122 Readiness,
123}
124
125pub struct ExceptionPolicy {
127 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
129 pub retry: Option<RedeliveryPolicy>,
131 pub handled_by: Option<String>,
133 pub on_steps: Option<SyncBoxProcessor>,
135 pub disposition: ExceptionDisposition,
137}
138
139impl ExceptionPolicy {
140 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
142 Self {
143 matches: Arc::new(matches),
144 retry: None,
145 handled_by: None,
146 on_steps: None,
147 disposition: ExceptionDisposition::Propagate,
148 }
149 }
150}
151
152impl Clone for ExceptionPolicy {
153 fn clone(&self) -> Self {
154 Self {
155 matches: Arc::clone(&self.matches),
156 retry: self.retry.clone(),
157 handled_by: self.handled_by.clone(),
158 on_steps: self.on_steps.clone(),
159 disposition: self.disposition,
160 }
161 }
162}
163
164#[derive(Clone)]
166pub struct ErrorHandlerConfig {
167 pub dlc_uri: Option<String>,
169 pub policies: Vec<ExceptionPolicy>,
171}
172
173impl ErrorHandlerConfig {
174 pub fn log_only() -> Self {
176 Self {
177 dlc_uri: None,
178 policies: Vec::new(),
179 }
180 }
181
182 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
184 Self {
185 dlc_uri: Some(uri.into()),
186 policies: Vec::new(),
187 }
188 }
189
190 pub fn on_exception(
192 self,
193 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
194 ) -> ExceptionPolicyBuilder {
195 ExceptionPolicyBuilder {
196 config: self,
197 policy: ExceptionPolicy::new(matches),
198 }
199 }
200}
201
202pub struct ExceptionPolicyBuilder {
204 config: ErrorHandlerConfig,
205 policy: ExceptionPolicy,
206}
207
208impl ExceptionPolicyBuilder {
209 pub fn retry(mut self, max_attempts: u32) -> Self {
211 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
212 self
213 }
214
215 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
217 if let Some(ref mut p) = self.policy.retry {
218 p.initial_delay = initial;
219 p.multiplier = multiplier;
220 p.max_delay = max;
221 }
222 self
223 }
224
225 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
228 if let Some(ref mut p) = self.policy.retry {
229 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
230 }
231 self
232 }
233
234 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
236 self.policy.handled_by = Some(uri.into());
237 self
238 }
239
240 pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
242 self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
243 self
244 }
245
246 pub fn handled(mut self, handled: bool) -> Self {
248 self.policy.disposition = if handled {
249 ExceptionDisposition::Handled
250 } else {
251 ExceptionDisposition::Propagate
252 };
253 self
254 }
255
256 pub fn continued(mut self, continued: bool) -> Self {
258 self.policy.disposition = if continued {
259 ExceptionDisposition::Continued
260 } else {
261 ExceptionDisposition::Propagate
262 };
263 self
264 }
265
266 pub fn propagate(mut self) -> Self {
268 self.policy.disposition = ExceptionDisposition::Propagate;
269 self
270 }
271
272 pub fn build(mut self) -> ErrorHandlerConfig {
274 self.config.policies.push(self.policy);
275 self.config
276 }
277}
278
279#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
281pub type ExponentialBackoff = RedeliveryPolicy;
282
283#[cfg(test)]
284mod tests {
285 use super::*;
286 use crate::BoxProcessor;
287 use crate::CamelError;
288 use std::time::Duration;
289
290 #[test]
291 fn test_redelivery_policy_defaults() {
292 let p = RedeliveryPolicy::new(3);
293 assert_eq!(p.max_attempts, 3);
294 assert_eq!(p.initial_delay, Duration::from_millis(100));
295 assert_eq!(p.multiplier, 2.0);
296 assert_eq!(p.max_delay, Duration::from_secs(10));
297 assert_eq!(p.jitter_factor, 0.0);
298 }
299
300 #[test]
301 fn test_exception_policy_matches() {
302 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
303 assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
304 assert!(!(policy.matches)(&CamelError::Io("io".into())));
305 }
306
307 #[test]
308 fn test_error_handler_config_log_only() {
309 let config = ErrorHandlerConfig::log_only();
310 assert!(config.dlc_uri.is_none());
311 assert!(config.policies.is_empty());
312 }
313
314 #[test]
315 fn test_error_handler_config_dlc() {
316 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
317 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
318 }
319
320 #[test]
321 fn test_error_handler_config_with_policy() {
322 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
323 .on_exception(|e| matches!(e, CamelError::Io(_)))
324 .retry(2)
325 .handled_by("log:io-errors")
326 .build();
327 assert_eq!(config.policies.len(), 1);
328 let p = &config.policies[0];
329 assert!(p.retry.is_some());
330 assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
331 assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
332 }
333
334 #[test]
335 fn test_jitter_applies_randomness() {
336 let policy = RedeliveryPolicy::new(3)
337 .with_initial_delay(Duration::from_millis(100))
338 .with_jitter(0.5);
339
340 let mut delays = std::collections::HashSet::new();
341 for _ in 0..10 {
342 delays.insert(policy.delay_for(0));
343 }
344
345 assert!(delays.len() > 1, "jitter should produce varying delays");
346 }
347
348 #[test]
349 fn test_jitter_stays_within_bounds() {
350 let policy = RedeliveryPolicy::new(3)
351 .with_initial_delay(Duration::from_millis(100))
352 .with_jitter(0.5);
353
354 for _ in 0..100 {
355 let delay = policy.delay_for(0);
356 assert!(
357 delay >= Duration::from_millis(50),
358 "delay too low: {:?}",
359 delay
360 );
361 assert!(
362 delay <= Duration::from_millis(150),
363 "delay too high: {:?}",
364 delay
365 );
366 }
367 }
368
369 #[test]
370 fn test_max_attempts_zero_means_no_retries() {
371 let policy = RedeliveryPolicy::new(0);
372 assert_eq!(policy.max_attempts, 0);
373 }
374
375 #[test]
376 fn test_jitter_zero_produces_exact_delay() {
377 let policy = RedeliveryPolicy::new(3)
378 .with_initial_delay(Duration::from_millis(100))
379 .with_jitter(0.0);
380
381 for _ in 0..10 {
382 let delay = policy.delay_for(0);
383 assert_eq!(delay, Duration::from_millis(100));
384 }
385 }
386
387 #[test]
388 fn test_jitter_one_produces_wide_range() {
389 let policy = RedeliveryPolicy::new(3)
390 .with_initial_delay(Duration::from_millis(100))
391 .with_jitter(1.0);
392
393 for _ in 0..100 {
394 let delay = policy.delay_for(0);
395 assert!(
396 delay >= Duration::from_millis(0),
397 "delay should be >= 0, got {:?}",
398 delay
399 );
400 assert!(
401 delay <= Duration::from_millis(200),
402 "delay should be <= 200ms, got {:?}",
403 delay
404 );
405 }
406 }
407
408 #[test]
409 fn test_redelivery_policy_builder_methods_apply_values() {
410 let p = RedeliveryPolicy::new(5)
411 .with_initial_delay(Duration::from_millis(250))
412 .with_multiplier(3.0)
413 .with_max_delay(Duration::from_secs(2))
414 .with_jitter(2.0);
415
416 assert_eq!(p.initial_delay, Duration::from_millis(250));
417 assert_eq!(p.multiplier, 3.0);
418 assert_eq!(p.max_delay, Duration::from_secs(2));
419 assert_eq!(p.jitter_factor, 1.0);
420 }
421
422 #[test]
423 fn test_with_jitter_clamps_low_bound() {
424 let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
425 assert_eq!(p.jitter_factor, 0.0);
426 }
427
428 #[test]
429 fn test_delay_for_exponential_growth_and_cap() {
430 let p = RedeliveryPolicy::new(3)
431 .with_initial_delay(Duration::from_millis(100))
432 .with_multiplier(2.0)
433 .with_max_delay(Duration::from_millis(250));
434
435 assert_eq!(p.delay_for(0), Duration::from_millis(100));
436 assert_eq!(p.delay_for(1), Duration::from_millis(200));
437 assert_eq!(p.delay_for(2), Duration::from_millis(250));
438 assert_eq!(p.delay_for(20), Duration::from_millis(250));
439 }
440
441 #[test]
442 fn test_exception_policy_builder_backoff_and_jitter() {
443 let config = ErrorHandlerConfig::log_only()
444 .on_exception(|e| matches!(e, CamelError::Io(_)))
445 .retry(4)
446 .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
447 .with_jitter(1.5)
448 .build();
449
450 let retry = config.policies[0].retry.as_ref().unwrap();
451 assert_eq!(retry.max_attempts, 4);
452 assert_eq!(retry.initial_delay, Duration::from_millis(10));
453 assert_eq!(retry.multiplier, 1.5);
454 assert_eq!(retry.max_delay, Duration::from_millis(40));
455 assert_eq!(retry.jitter_factor, 1.0);
456 }
457
458 #[test]
459 fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
460 let config = ErrorHandlerConfig::log_only()
461 .on_exception(|_| true)
462 .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
463 .with_jitter(0.8)
464 .build();
465
466 assert!(config.policies[0].retry.is_none());
467 }
468
469 #[test]
470 fn test_exception_policy_clone_preserves_behavior_and_fields() {
471 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
472 let mut configured = policy;
473 configured.retry = Some(RedeliveryPolicy::new(2));
474 configured.handled_by = Some("log:route-errors".to_string());
475
476 let cloned = configured.clone();
477 assert!((cloned.matches)(&CamelError::RouteError("x".into())));
478 assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
479 assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
480 }
481
482 #[test]
483 fn test_delay_for_respects_max_delay_with_jitter() {
484 let policy = RedeliveryPolicy::new(5)
485 .with_initial_delay(Duration::from_millis(200))
486 .with_multiplier(10.0)
487 .with_max_delay(Duration::from_millis(500))
488 .with_jitter(0.2);
489
490 for _ in 0..30 {
491 let delay = policy.delay_for(4);
492 assert!(delay <= Duration::from_millis(600));
493 assert!(delay >= Duration::from_millis(400));
494 }
495 }
496
497 #[test]
498 fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
499 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
500 .on_exception(|e| matches!(e, CamelError::Io(_)))
501 .retry(1)
502 .build()
503 .on_exception(|e| matches!(e, CamelError::RouteError(_)))
504 .handled_by("log:routes")
505 .build();
506
507 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
508 assert_eq!(config.policies.len(), 2);
509 assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
510 assert!((config.policies[1].matches)(&CamelError::RouteError(
511 "x".into()
512 )));
513 }
514
515 #[test]
516 fn test_backoff_without_retry_does_not_create_retry_config() {
517 let config = ErrorHandlerConfig::log_only()
518 .on_exception(|_| true)
519 .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
520 .build();
521
522 assert!(config.policies[0].retry.is_none());
523 }
524
525 #[test]
526 fn test_exception_disposition_default_is_propagate() {
527 assert_eq!(
528 ExceptionDisposition::default(),
529 ExceptionDisposition::Propagate
530 );
531 }
532
533 #[test]
534 fn test_exception_policy_new_has_propagate_disposition() {
535 let p = ExceptionPolicy::new(|_| true);
536 assert_eq!(p.disposition, ExceptionDisposition::Propagate);
537 }
538
539 #[test]
540 fn test_policy_id_equality() {
541 assert_eq!(PolicyId(0), PolicyId(0));
542 assert_ne!(PolicyId(0), PolicyId(1));
543 }
544
545 #[test]
546 fn test_builder_continued_sets_disposition() {
547 let cfg = ErrorHandlerConfig::log_only()
548 .on_exception(|_| true)
549 .continued(true)
550 .build();
551 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
552 }
553
554 #[test]
555 fn test_builder_propagate_sets_disposition() {
556 let cfg = ErrorHandlerConfig::log_only()
557 .on_exception(|_| true)
558 .propagate()
559 .build();
560 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
561 }
562
563 #[test]
564 fn test_builder_handled_true_still_works() {
565 let cfg = ErrorHandlerConfig::log_only()
566 .on_exception(|_| true)
567 .handled(true)
568 .build();
569 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
570 }
571}