1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor};
5
/// Header name `CamelRedelivered`.
///
/// NOTE(review): presumably marks an exchange that has been redelivered at least once —
/// confirm against the code that writes this header.
pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
/// Header name `CamelRedeliveryCounter`.
///
/// NOTE(review): presumably carries the number of redeliveries performed so far — confirm
/// against the code that writes this header.
pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
/// Header name `CamelRedeliveryMaxCounter`.
///
/// NOTE(review): presumably carries the configured redelivery limit — confirm against the
/// code that writes this header.
pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
/// Retry and backoff settings; the delay between attempts is computed by
/// [`RedeliveryPolicy::delay_for`].
#[derive(Debug, Clone)]
pub struct RedeliveryPolicy {
    /// Attempt limit. This type only stores the value; how attempts are counted is decided
    /// by whatever consumes the policy.
    pub max_attempts: u32,
    /// Delay used for attempt `0`, before any multiplication.
    pub initial_delay: Duration,
    /// Factor the delay is multiplied by for each further attempt.
    pub multiplier: f64,
    /// Upper bound applied to the computed delay before jitter is added.
    pub max_delay: Duration,
    /// Fraction of the capped delay used as the random jitter range; `0.0` disables jitter.
    /// The `with_jitter` setters clamp it to `0.0..=1.0`.
    pub jitter_factor: f64,
}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
91#[serde(rename_all = "lowercase")]
92pub enum ExceptionDisposition {
93 #[default]
95 Propagate,
96 Handled,
98 Continued,
100}
101
/// Newtype around a `usize` that identifies a policy.
///
/// NOTE(review): presumably an index into `ErrorHandlerConfig::policies` — confirm against
/// the code that creates these ids.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PolicyId(pub usize);
108
109pub enum RetryOutcome {
111 Recovered(Exchange),
113 Exhausted {
115 exchange: Exchange,
116 error: CamelError,
117 policy: Option<PolicyId>,
118 },
119}
120
121pub enum StepDisposition {
124 Propagate(CamelError),
125 Handled(Exchange),
126 Continued(Exchange),
127}
128
/// Kinds of boundary distinguished by the error handling code.
///
/// NOTE(review): this file only declares the variants; where they are used and what each
/// implies is defined elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoundaryKind {
    Security,
    CircuitBreaker,
    Readiness,
}
136
137pub struct ExceptionPolicy {
139 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
141 pub retry: Option<RedeliveryPolicy>,
143 pub handled_by: Option<String>,
145 pub on_steps: Option<SyncBoxProcessor>,
147 pub disposition: ExceptionDisposition,
149}
150
151impl ExceptionPolicy {
152 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
154 Self {
155 matches: Arc::new(matches),
156 retry: None,
157 handled_by: None,
158 on_steps: None,
159 disposition: ExceptionDisposition::Propagate,
160 }
161 }
162}
163
164impl Clone for ExceptionPolicy {
165 fn clone(&self) -> Self {
166 Self {
167 matches: Arc::clone(&self.matches),
168 retry: self.retry.clone(),
169 handled_by: self.handled_by.clone(),
170 on_steps: self.on_steps.clone(),
171 disposition: self.disposition,
172 }
173 }
174}
175
176#[derive(Clone)]
178pub struct ErrorHandlerConfig {
179 pub dlc_uri: Option<String>,
181 pub policies: Vec<ExceptionPolicy>,
183}
184
185impl ErrorHandlerConfig {
186 pub fn log_only() -> Self {
188 Self {
189 dlc_uri: None,
190 policies: Vec::new(),
191 }
192 }
193
194 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
196 Self {
197 dlc_uri: Some(uri.into()),
198 policies: Vec::new(),
199 }
200 }
201
202 pub fn on_exception(
204 self,
205 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
206 ) -> ExceptionPolicyBuilder {
207 ExceptionPolicyBuilder {
208 config: self,
209 policy: ExceptionPolicy::new(matches),
210 }
211 }
212}
213
214pub struct ExceptionPolicyBuilder {
216 config: ErrorHandlerConfig,
217 policy: ExceptionPolicy,
218}
219
220impl ExceptionPolicyBuilder {
221 pub fn retry(mut self, max_attempts: u32) -> Self {
223 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
224 self
225 }
226
227 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
229 if let Some(ref mut p) = self.policy.retry {
230 p.initial_delay = initial;
231 p.multiplier = multiplier;
232 p.max_delay = max;
233 }
234 self
235 }
236
237 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
240 if let Some(ref mut p) = self.policy.retry {
241 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
242 }
243 self
244 }
245
246 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
248 self.policy.handled_by = Some(uri.into());
249 self
250 }
251
252 pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
254 self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
255 self
256 }
257
258 pub fn handled(mut self, handled: bool) -> Self {
260 self.policy.disposition = if handled {
261 ExceptionDisposition::Handled
262 } else {
263 ExceptionDisposition::Propagate
264 };
265 self
266 }
267
268 pub fn continued(mut self, continued: bool) -> Self {
270 self.policy.disposition = if continued {
271 ExceptionDisposition::Continued
272 } else {
273 ExceptionDisposition::Propagate
274 };
275 self
276 }
277
278 pub fn propagate(mut self) -> Self {
280 self.policy.disposition = ExceptionDisposition::Propagate;
281 self
282 }
283
284 pub fn build(mut self) -> ErrorHandlerConfig {
286 self.config.policies.push(self.policy);
287 self.config
288 }
289}
290
291#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
293pub type ExponentialBackoff = RedeliveryPolicy;
294
#[cfg(test)]
mod tests {
    //! Unit tests for the redelivery policy, the exception policy and the builder API.

    use super::*;
    use crate::CamelError;
    use std::time::Duration;

    #[test]
    fn test_redelivery_policy_defaults() {
        let p = RedeliveryPolicy::new(3);
        assert_eq!(p.max_attempts, 3);
        assert_eq!(p.initial_delay, Duration::from_millis(100));
        assert_eq!(p.multiplier, 2.0);
        assert_eq!(p.max_delay, Duration::from_secs(10));
        assert_eq!(p.jitter_factor, 0.0);
    }

    #[test]
    fn test_exception_policy_matches() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
        assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
        assert!(!(policy.matches)(&CamelError::Io("io".into())));
    }

    #[test]
    fn test_error_handler_config_log_only() {
        let config = ErrorHandlerConfig::log_only();
        assert!(config.dlc_uri.is_none());
        assert!(config.policies.is_empty());
    }

    #[test]
    fn test_error_handler_config_dlc() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
    }

    #[test]
    fn test_error_handler_config_with_policy() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(2)
            .handled_by("log:io-errors")
            .build();
        assert_eq!(config.policies.len(), 1);
        let p = &config.policies[0];
        assert!(p.retry.is_some());
        assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
    }

    #[test]
    fn test_jitter_applies_randomness() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        let mut delays = std::collections::HashSet::new();
        for _ in 0..10 {
            delays.insert(policy.delay_for(0));
        }

        assert!(delays.len() > 1, "jitter should produce varying delays");
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(
                delay >= Duration::from_millis(50),
                "delay too low: {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(150),
                "delay too high: {:?}",
                delay
            );
        }
    }

    #[test]
    fn test_max_attempts_zero_means_no_retries() {
        let policy = RedeliveryPolicy::new(0);
        assert_eq!(policy.max_attempts, 0);
    }

    #[test]
    fn test_jitter_zero_produces_exact_delay() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.0);

        for _ in 0..10 {
            let delay = policy.delay_for(0);
            assert_eq!(delay, Duration::from_millis(100));
        }
    }

    #[test]
    fn test_jitter_one_produces_wide_range() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(1.0);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(
                delay >= Duration::from_millis(0),
                "delay should be >= 0, got {:?}",
                delay
            );
            assert!(
                delay <= Duration::from_millis(200),
                "delay should be <= 200ms, got {:?}",
                delay
            );
        }
    }

    #[test]
    fn test_redelivery_policy_builder_methods_apply_values() {
        let p = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(250))
            .with_multiplier(3.0)
            .with_max_delay(Duration::from_secs(2))
            .with_jitter(2.0);

        assert_eq!(p.initial_delay, Duration::from_millis(250));
        assert_eq!(p.multiplier, 3.0);
        assert_eq!(p.max_delay, Duration::from_secs(2));
        assert_eq!(p.jitter_factor, 1.0);
    }

    #[test]
    fn test_with_jitter_clamps_low_bound() {
        let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
        assert_eq!(p.jitter_factor, 0.0);
    }

    #[test]
    fn test_delay_for_exponential_growth_and_cap() {
        let p = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_delay(Duration::from_millis(250));

        assert_eq!(p.delay_for(0), Duration::from_millis(100));
        assert_eq!(p.delay_for(1), Duration::from_millis(200));
        assert_eq!(p.delay_for(2), Duration::from_millis(250));
        assert_eq!(p.delay_for(20), Duration::from_millis(250));
    }

    #[test]
    fn test_exception_policy_builder_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(4)
            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
            .with_jitter(1.5)
            .build();

        let retry = config.policies[0].retry.as_ref().unwrap();
        assert_eq!(retry.max_attempts, 4);
        assert_eq!(retry.initial_delay, Duration::from_millis(10));
        assert_eq!(retry.multiplier, 1.5);
        assert_eq!(retry.max_delay, Duration::from_millis(40));
        assert_eq!(retry.jitter_factor, 1.0);
    }

    #[test]
    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
            .with_jitter(0.8)
            .build();

        assert!(config.policies[0].retry.is_none());
    }

    #[test]
    fn test_exception_policy_clone_preserves_behavior_and_fields() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
        let mut configured = policy;
        configured.retry = Some(RedeliveryPolicy::new(2));
        configured.handled_by = Some("log:route-errors".to_string());

        let cloned = configured.clone();
        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
    }

    #[test]
    fn test_delay_for_respects_max_delay_with_jitter() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(200))
            .with_multiplier(10.0)
            .with_max_delay(Duration::from_millis(500))
            .with_jitter(0.2);

        // Jitter is applied after the cap, so the bounds are 500 ms ± 20 %.
        for _ in 0..30 {
            let delay = policy.delay_for(4);
            assert!(delay <= Duration::from_millis(600));
            assert!(delay >= Duration::from_millis(400));
        }
    }

    #[test]
    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(1)
            .build()
            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
            .handled_by("log:routes")
            .build();

        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
        assert_eq!(config.policies.len(), 2);
        assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
        assert!((config.policies[1].matches)(&CamelError::RouteError(
            "x".into()
        )));
    }

    #[test]
    fn test_backoff_without_retry_does_not_create_retry_config() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
            .build();

        assert!(config.policies[0].retry.is_none());
    }

    #[test]
    fn test_exception_disposition_default_is_propagate() {
        assert_eq!(
            ExceptionDisposition::default(),
            ExceptionDisposition::Propagate
        );
    }

    #[test]
    fn test_exception_policy_new_has_propagate_disposition() {
        let p = ExceptionPolicy::new(|_| true);
        assert_eq!(p.disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_policy_id_equality() {
        assert_eq!(PolicyId(0), PolicyId(0));
        assert_ne!(PolicyId(0), PolicyId(1));
    }

    #[test]
    fn test_builder_continued_sets_disposition() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .continued(true)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
    }

    #[test]
    fn test_builder_propagate_sets_disposition() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .propagate()
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_builder_handled_true_still_works() {
        let cfg = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .handled(true)
            .build();
        assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
    }
}