1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor};
5
6pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14 pub max_attempts: u32,
15 pub initial_delay: Duration,
16 pub multiplier: f64,
17 pub max_delay: Duration,
18 pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
92#[serde(rename_all = "lowercase")]
93pub enum ExceptionDisposition {
94 #[default]
96 Propagate,
97 Handled,
99 Continued,
101}
102
103#[derive(Clone, Copy, Debug, PartialEq, Eq)]
108pub struct PolicyId(pub usize);
109
110#[derive(Debug)]
112pub enum RetryOutcome {
113 Recovered(Exchange),
115 Stopped(Exchange),
119 Exhausted {
121 exchange: Exchange,
122 error: CamelError,
123 policy: Option<PolicyId>,
124 },
125}
126
127pub trait RetryableStep: Send {
133 fn invoke<'a>(
134 &'a mut self,
135 exchange: Exchange,
136 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>;
137}
138
139impl RetryableStep for crate::BoxProcessor {
144 fn invoke<'a>(
145 &'a mut self,
146 exchange: Exchange,
147 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>> {
148 use tower::ServiceExt;
149 Box::pin(async move {
150 match self.ready().await {
151 Ok(ready_svc) => match tower::Service::call(ready_svc, exchange).await {
152 Ok(ex) => PipelineOutcome::Completed(ex),
153 Err(err) => PipelineOutcome::Failed(err),
154 },
155 Err(err) => PipelineOutcome::Failed(err),
156 }
157 })
158 }
159}
160
161pub enum StepDisposition {
164 Propagate(CamelError),
165 Handled(Exchange),
166 Continued(Exchange),
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
171pub enum BoundaryKind {
172 Security,
173 CircuitBreaker,
174 Readiness,
175}
176
177pub struct ExceptionPolicy {
179 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
181 pub retry: Option<RedeliveryPolicy>,
183 pub handled_by: Option<String>,
185 pub on_steps: Option<SyncBoxProcessor>,
187 pub disposition: ExceptionDisposition,
189}
190
191impl ExceptionPolicy {
192 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
194 Self {
195 matches: Arc::new(matches),
196 retry: None,
197 handled_by: None,
198 on_steps: None,
199 disposition: ExceptionDisposition::Propagate,
200 }
201 }
202}
203
204impl Clone for ExceptionPolicy {
205 fn clone(&self) -> Self {
206 Self {
207 matches: Arc::clone(&self.matches),
208 retry: self.retry.clone(),
209 handled_by: self.handled_by.clone(),
210 on_steps: self.on_steps.clone(),
211 disposition: self.disposition,
212 }
213 }
214}
215
216#[derive(Clone)]
218pub struct ErrorHandlerConfig {
219 pub dlc_uri: Option<String>,
221 pub policies: Vec<ExceptionPolicy>,
223 pub use_original_message: bool,
226}
227
228impl ErrorHandlerConfig {
229 pub fn log_only() -> Self {
231 Self {
232 dlc_uri: None,
233 policies: Vec::new(),
234 use_original_message: false,
235 }
236 }
237
238 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
240 Self {
241 dlc_uri: Some(uri.into()),
242 policies: Vec::new(),
243 use_original_message: false,
244 }
245 }
246
247 pub fn on_exception(
249 self,
250 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
251 ) -> ExceptionPolicyBuilder {
252 ExceptionPolicyBuilder {
253 config: self,
254 policy: ExceptionPolicy::new(matches),
255 }
256 }
257}
258
259pub struct ExceptionPolicyBuilder {
261 config: ErrorHandlerConfig,
262 policy: ExceptionPolicy,
263}
264
265impl ExceptionPolicyBuilder {
266 pub fn retry(mut self, max_attempts: u32) -> Self {
268 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
269 self
270 }
271
272 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
274 if let Some(ref mut p) = self.policy.retry {
275 p.initial_delay = initial;
276 p.multiplier = multiplier;
277 p.max_delay = max;
278 }
279 self
280 }
281
282 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
285 if let Some(ref mut p) = self.policy.retry {
286 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
287 }
288 self
289 }
290
291 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
293 self.policy.handled_by = Some(uri.into());
294 self
295 }
296
297 pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
299 self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
300 self
301 }
302
303 pub fn handled(mut self, handled: bool) -> Self {
305 self.policy.disposition = if handled {
306 ExceptionDisposition::Handled
307 } else {
308 ExceptionDisposition::Propagate
309 };
310 self
311 }
312
313 pub fn continued(mut self, continued: bool) -> Self {
315 self.policy.disposition = if continued {
316 ExceptionDisposition::Continued
317 } else {
318 ExceptionDisposition::Propagate
319 };
320 self
321 }
322
323 pub fn propagate(mut self) -> Self {
325 self.policy.disposition = ExceptionDisposition::Propagate;
326 self
327 }
328
329 pub fn build(mut self) -> ErrorHandlerConfig {
331 self.config.policies.push(self.policy);
332 self.config
333 }
334}
335
336#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
338pub type ExponentialBackoff = RedeliveryPolicy;
339
340#[cfg(test)]
341mod retryable_step_tests {
342 use super::*;
343 use crate::{BoxProcessor, CamelError, Exchange, Message, PipelineOutcome};
344 use std::future::Future;
345 use std::pin::Pin;
346 use std::sync::Arc;
347 use std::sync::atomic::{AtomicUsize, Ordering};
348
349 struct CountingProcessor {
350 call_count: Arc<AtomicUsize>,
351 succeed: bool,
352 }
353
354 impl tower::Service<Exchange> for CountingProcessor {
355 type Response = Exchange;
356 type Error = CamelError;
357 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
358
359 fn poll_ready(
360 &mut self,
361 _cx: &mut std::task::Context<'_>,
362 ) -> std::task::Poll<Result<(), Self::Error>> {
363 std::task::Poll::Ready(Ok(()))
364 }
365
366 fn call(&mut self, exchange: Exchange) -> Self::Future {
367 let count = self.call_count.clone();
368 let succeed = self.succeed;
369 Box::pin(async move {
370 count.fetch_add(1, Ordering::SeqCst);
371 if succeed {
372 Ok(exchange)
373 } else {
374 Err(CamelError::ProcessorError("fail".into()))
375 }
376 })
377 }
378 }
379
380 impl Clone for CountingProcessor {
381 fn clone(&self) -> Self {
382 Self {
383 call_count: self.call_count.clone(),
384 succeed: self.succeed,
385 }
386 }
387 }
388
389 #[tokio::test]
390 async fn boxprocessor_adapter_maps_ok_to_completed() {
391 let count = Arc::new(AtomicUsize::new(0));
392 let processor = CountingProcessor {
393 call_count: count.clone(),
394 succeed: true,
395 };
396 let bp: BoxProcessor = BoxProcessor::new(processor);
397 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
398 let ex = Exchange::new(Message::new("hello"));
399 let outcome = retryable.invoke(ex).await;
400 assert!(matches!(outcome, PipelineOutcome::Completed(_)));
401 assert_eq!(count.load(Ordering::SeqCst), 1);
402 }
403
404 #[tokio::test]
405 async fn boxprocessor_adapter_maps_err_to_failed() {
406 let processor = CountingProcessor {
407 call_count: Arc::new(AtomicUsize::new(0)),
408 succeed: false,
409 };
410 let bp: BoxProcessor = BoxProcessor::new(processor);
411 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
412 let ex = Exchange::new(Message::new("hello"));
413 let outcome = retryable.invoke(ex).await;
414 assert!(matches!(outcome, PipelineOutcome::Failed(_)));
415 }
416
417 #[tokio::test]
418 async fn boxprocessor_readiness_error_propagates_to_failed() {
419 struct AlwaysNotReady;
420
421 impl tower::Service<Exchange> for AlwaysNotReady {
422 type Response = Exchange;
423 type Error = CamelError;
424 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
425
426 fn poll_ready(
427 &mut self,
428 _cx: &mut std::task::Context<'_>,
429 ) -> std::task::Poll<Result<(), Self::Error>> {
430 std::task::Poll::Ready(Err(CamelError::ProcessorError(
431 "readiness failed: consumer closed".into(),
432 )))
433 }
434
435 fn call(&mut self, _ex: Exchange) -> Self::Future {
436 Box::pin(async {
437 unreachable!("call() must not be reached when poll_ready errors")
438 })
439 }
440 }
441
442 impl Clone for AlwaysNotReady {
443 fn clone(&self) -> Self {
444 AlwaysNotReady
445 }
446 }
447
448 let bp: BoxProcessor = BoxProcessor::new(AlwaysNotReady);
449 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
450 let ex = Exchange::new(Message::new("hello"));
451 let outcome = retryable.invoke(ex).await;
452 match outcome {
453 PipelineOutcome::Failed(err) => {
454 let msg = err.to_string();
455 assert!(
456 msg.contains("readiness") || msg.contains("consumer"),
457 "readiness error message should be preserved, got: {msg}"
458 );
459 }
460 other => panic!(
461 "readiness failure must map to PipelineOutcome::Failed, got {:?}",
462 other
463 ),
464 }
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471 use crate::BoxProcessor;
472 use crate::CamelError;
473 use std::time::Duration;
474
475 #[test]
476 fn test_redelivery_policy_defaults() {
477 let p = RedeliveryPolicy::new(3);
478 assert_eq!(p.max_attempts, 3);
479 assert_eq!(p.initial_delay, Duration::from_millis(100));
480 assert_eq!(p.multiplier, 2.0);
481 assert_eq!(p.max_delay, Duration::from_secs(10));
482 assert_eq!(p.jitter_factor, 0.0);
483 }
484
485 #[test]
486 fn test_exception_policy_matches() {
487 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
488 assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
489 assert!(!(policy.matches)(&CamelError::Io("io".into())));
490 }
491
492 #[test]
493 fn test_error_handler_config_log_only() {
494 let config = ErrorHandlerConfig::log_only();
495 assert!(config.dlc_uri.is_none());
496 assert!(config.policies.is_empty());
497 }
498
499 #[test]
500 fn test_error_handler_config_dlc() {
501 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
502 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
503 }
504
505 #[test]
506 fn test_error_handler_config_with_policy() {
507 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
508 .on_exception(|e| matches!(e, CamelError::Io(_)))
509 .retry(2)
510 .handled_by("log:io-errors")
511 .build();
512 assert_eq!(config.policies.len(), 1);
513 let p = &config.policies[0];
514 assert!(p.retry.is_some());
515 assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
516 assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
517 }
518
519 #[test]
520 fn test_jitter_applies_randomness() {
521 let policy = RedeliveryPolicy::new(3)
522 .with_initial_delay(Duration::from_millis(100))
523 .with_jitter(0.5);
524
525 let mut delays = std::collections::HashSet::new();
526 for _ in 0..10 {
527 delays.insert(policy.delay_for(0));
528 }
529
530 assert!(delays.len() > 1, "jitter should produce varying delays");
531 }
532
533 #[test]
534 fn test_jitter_stays_within_bounds() {
535 let policy = RedeliveryPolicy::new(3)
536 .with_initial_delay(Duration::from_millis(100))
537 .with_jitter(0.5);
538
539 for _ in 0..100 {
540 let delay = policy.delay_for(0);
541 assert!(
542 delay >= Duration::from_millis(50),
543 "delay too low: {:?}",
544 delay
545 );
546 assert!(
547 delay <= Duration::from_millis(150),
548 "delay too high: {:?}",
549 delay
550 );
551 }
552 }
553
554 #[test]
555 fn test_max_attempts_zero_means_no_retries() {
556 let policy = RedeliveryPolicy::new(0);
557 assert_eq!(policy.max_attempts, 0);
558 }
559
560 #[test]
561 fn test_jitter_zero_produces_exact_delay() {
562 let policy = RedeliveryPolicy::new(3)
563 .with_initial_delay(Duration::from_millis(100))
564 .with_jitter(0.0);
565
566 for _ in 0..10 {
567 let delay = policy.delay_for(0);
568 assert_eq!(delay, Duration::from_millis(100));
569 }
570 }
571
572 #[test]
573 fn test_jitter_one_produces_wide_range() {
574 let policy = RedeliveryPolicy::new(3)
575 .with_initial_delay(Duration::from_millis(100))
576 .with_jitter(1.0);
577
578 for _ in 0..100 {
579 let delay = policy.delay_for(0);
580 assert!(
581 delay >= Duration::from_millis(0),
582 "delay should be >= 0, got {:?}",
583 delay
584 );
585 assert!(
586 delay <= Duration::from_millis(200),
587 "delay should be <= 200ms, got {:?}",
588 delay
589 );
590 }
591 }
592
593 #[test]
594 fn test_redelivery_policy_builder_methods_apply_values() {
595 let p = RedeliveryPolicy::new(5)
596 .with_initial_delay(Duration::from_millis(250))
597 .with_multiplier(3.0)
598 .with_max_delay(Duration::from_secs(2))
599 .with_jitter(2.0);
600
601 assert_eq!(p.initial_delay, Duration::from_millis(250));
602 assert_eq!(p.multiplier, 3.0);
603 assert_eq!(p.max_delay, Duration::from_secs(2));
604 assert_eq!(p.jitter_factor, 1.0);
605 }
606
607 #[test]
608 fn test_with_jitter_clamps_low_bound() {
609 let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
610 assert_eq!(p.jitter_factor, 0.0);
611 }
612
613 #[test]
614 fn test_delay_for_exponential_growth_and_cap() {
615 let p = RedeliveryPolicy::new(3)
616 .with_initial_delay(Duration::from_millis(100))
617 .with_multiplier(2.0)
618 .with_max_delay(Duration::from_millis(250));
619
620 assert_eq!(p.delay_for(0), Duration::from_millis(100));
621 assert_eq!(p.delay_for(1), Duration::from_millis(200));
622 assert_eq!(p.delay_for(2), Duration::from_millis(250));
623 assert_eq!(p.delay_for(20), Duration::from_millis(250));
624 }
625
626 #[test]
627 fn test_exception_policy_builder_backoff_and_jitter() {
628 let config = ErrorHandlerConfig::log_only()
629 .on_exception(|e| matches!(e, CamelError::Io(_)))
630 .retry(4)
631 .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
632 .with_jitter(1.5)
633 .build();
634
635 let retry = config.policies[0].retry.as_ref().unwrap();
636 assert_eq!(retry.max_attempts, 4);
637 assert_eq!(retry.initial_delay, Duration::from_millis(10));
638 assert_eq!(retry.multiplier, 1.5);
639 assert_eq!(retry.max_delay, Duration::from_millis(40));
640 assert_eq!(retry.jitter_factor, 1.0);
641 }
642
643 #[test]
644 fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
645 let config = ErrorHandlerConfig::log_only()
646 .on_exception(|_| true)
647 .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
648 .with_jitter(0.8)
649 .build();
650
651 assert!(config.policies[0].retry.is_none());
652 }
653
654 #[test]
655 fn test_exception_policy_clone_preserves_behavior_and_fields() {
656 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
657 let mut configured = policy;
658 configured.retry = Some(RedeliveryPolicy::new(2));
659 configured.handled_by = Some("log:route-errors".to_string());
660
661 let cloned = configured.clone();
662 assert!((cloned.matches)(&CamelError::RouteError("x".into())));
663 assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
664 assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
665 }
666
667 #[test]
668 fn test_delay_for_respects_max_delay_with_jitter() {
669 let policy = RedeliveryPolicy::new(5)
670 .with_initial_delay(Duration::from_millis(200))
671 .with_multiplier(10.0)
672 .with_max_delay(Duration::from_millis(500))
673 .with_jitter(0.2);
674
675 for _ in 0..30 {
676 let delay = policy.delay_for(4);
677 assert!(delay <= Duration::from_millis(600));
678 assert!(delay >= Duration::from_millis(400));
679 }
680 }
681
682 #[test]
683 fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
684 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
685 .on_exception(|e| matches!(e, CamelError::Io(_)))
686 .retry(1)
687 .build()
688 .on_exception(|e| matches!(e, CamelError::RouteError(_)))
689 .handled_by("log:routes")
690 .build();
691
692 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
693 assert_eq!(config.policies.len(), 2);
694 assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
695 assert!((config.policies[1].matches)(&CamelError::RouteError(
696 "x".into()
697 )));
698 }
699
700 #[test]
701 fn test_backoff_without_retry_does_not_create_retry_config() {
702 let config = ErrorHandlerConfig::log_only()
703 .on_exception(|_| true)
704 .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
705 .build();
706
707 assert!(config.policies[0].retry.is_none());
708 }
709
710 #[test]
711 fn test_exception_disposition_default_is_propagate() {
712 assert_eq!(
713 ExceptionDisposition::default(),
714 ExceptionDisposition::Propagate
715 );
716 }
717
718 #[test]
719 fn test_exception_policy_new_has_propagate_disposition() {
720 let p = ExceptionPolicy::new(|_| true);
721 assert_eq!(p.disposition, ExceptionDisposition::Propagate);
722 }
723
724 #[test]
725 fn test_policy_id_equality() {
726 assert_eq!(PolicyId(0), PolicyId(0));
727 assert_ne!(PolicyId(0), PolicyId(1));
728 }
729
730 #[test]
731 fn test_builder_continued_sets_disposition() {
732 let cfg = ErrorHandlerConfig::log_only()
733 .on_exception(|_| true)
734 .continued(true)
735 .build();
736 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
737 }
738
739 #[test]
740 fn test_builder_propagate_sets_disposition() {
741 let cfg = ErrorHandlerConfig::log_only()
742 .on_exception(|_| true)
743 .propagate()
744 .build();
745 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
746 }
747
748 #[test]
749 fn test_builder_handled_true_still_works() {
750 let cfg = ErrorHandlerConfig::log_only()
751 .on_exception(|_| true)
752 .handled(true)
753 .build();
754 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
755 }
756}