1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor};
5
6pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
8pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
9pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14 pub max_attempts: u32,
15 pub initial_delay: Duration,
16 pub multiplier: f64,
17 pub max_delay: Duration,
18 pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22 pub fn new(max_attempts: u32) -> Self {
27 Self {
28 max_attempts,
29 initial_delay: Duration::from_millis(100),
30 multiplier: 2.0,
31 max_delay: Duration::from_secs(10),
32 jitter_factor: 0.0,
33 }
34 }
35
36 pub fn with_initial_delay(mut self, d: Duration) -> Self {
38 self.initial_delay = d;
39 self
40 }
41
42 pub fn with_multiplier(mut self, m: f64) -> Self {
44 self.multiplier = m;
45 self
46 }
47
48 pub fn with_max_delay(mut self, d: Duration) -> Self {
50 self.max_delay = d;
51 self
52 }
53
54 pub fn with_jitter(mut self, j: f64) -> Self {
60 self.jitter_factor = j.clamp(0.0, 1.0);
61 self
62 }
63
64 pub fn delay_for(&self, attempt: u32) -> Duration {
66 let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67 let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69 if self.jitter_factor > 0.0 {
70 let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71 Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72 } else {
73 Duration::from_millis(capped_ms as u64)
74 }
75 }
76}
77
78#[cfg_attr(feature = "schema", derive(schemars::JsonSchema, ts_rs::TS))]
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
92#[serde(rename_all = "lowercase")]
93pub enum ExceptionDisposition {
94 #[default]
96 Propagate,
97 Handled,
99 Continued,
101}
102
103#[derive(Clone, Copy, Debug, PartialEq, Eq)]
108pub struct PolicyId(pub usize);
109
110#[derive(Debug)]
112pub enum RetryOutcome {
113 Recovered(Exchange),
115 Stopped(Exchange),
119 Exhausted {
121 exchange: Exchange,
122 error: CamelError,
123 policy: Option<PolicyId>,
124 },
125}
126
127pub trait RetryableStep: Send {
133 fn invoke<'a>(
134 &'a mut self,
135 exchange: Exchange,
136 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>;
137}
138
139impl RetryableStep for crate::BoxProcessor {
144 fn invoke<'a>(
145 &'a mut self,
146 exchange: Exchange,
147 ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>> {
148 use tower::ServiceExt;
149 Box::pin(async move {
150 match self.ready().await {
151 Ok(ready_svc) => match tower::Service::call(ready_svc, exchange).await {
152 Ok(ex) => PipelineOutcome::Completed(ex),
153 Err(err) => PipelineOutcome::Failed(err),
154 },
155 Err(err) => PipelineOutcome::Failed(err),
156 }
157 })
158 }
159}
160
161pub enum StepDisposition {
164 Propagate(CamelError),
165 Handled(Exchange),
166 Continued(Exchange),
167}
168
169#[derive(Debug, Clone, Copy, PartialEq, Eq)]
171pub enum BoundaryKind {
172 Security,
173 CircuitBreaker,
174 Readiness,
175}
176
177pub struct ExceptionPolicy {
179 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
181 pub retry: Option<RedeliveryPolicy>,
183 pub handled_by: Option<String>,
185 pub on_steps: Option<SyncBoxProcessor>,
187 pub disposition: ExceptionDisposition,
189}
190
191impl ExceptionPolicy {
192 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
194 Self {
195 matches: Arc::new(matches),
196 retry: None,
197 handled_by: None,
198 on_steps: None,
199 disposition: ExceptionDisposition::Propagate,
200 }
201 }
202}
203
204impl Clone for ExceptionPolicy {
205 fn clone(&self) -> Self {
206 Self {
207 matches: Arc::clone(&self.matches),
208 retry: self.retry.clone(),
209 handled_by: self.handled_by.clone(),
210 on_steps: self.on_steps.clone(),
211 disposition: self.disposition,
212 }
213 }
214}
215
216#[derive(Clone)]
218pub struct ErrorHandlerConfig {
219 pub dlc_uri: Option<String>,
221 pub policies: Vec<ExceptionPolicy>,
223}
224
225impl ErrorHandlerConfig {
226 pub fn log_only() -> Self {
228 Self {
229 dlc_uri: None,
230 policies: Vec::new(),
231 }
232 }
233
234 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
236 Self {
237 dlc_uri: Some(uri.into()),
238 policies: Vec::new(),
239 }
240 }
241
242 pub fn on_exception(
244 self,
245 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
246 ) -> ExceptionPolicyBuilder {
247 ExceptionPolicyBuilder {
248 config: self,
249 policy: ExceptionPolicy::new(matches),
250 }
251 }
252}
253
254pub struct ExceptionPolicyBuilder {
256 config: ErrorHandlerConfig,
257 policy: ExceptionPolicy,
258}
259
260impl ExceptionPolicyBuilder {
261 pub fn retry(mut self, max_attempts: u32) -> Self {
263 self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
264 self
265 }
266
267 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
269 if let Some(ref mut p) = self.policy.retry {
270 p.initial_delay = initial;
271 p.multiplier = multiplier;
272 p.max_delay = max;
273 }
274 self
275 }
276
277 pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
280 if let Some(ref mut p) = self.policy.retry {
281 p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
282 }
283 self
284 }
285
286 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
288 self.policy.handled_by = Some(uri.into());
289 self
290 }
291
292 pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
294 self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
295 self
296 }
297
298 pub fn handled(mut self, handled: bool) -> Self {
300 self.policy.disposition = if handled {
301 ExceptionDisposition::Handled
302 } else {
303 ExceptionDisposition::Propagate
304 };
305 self
306 }
307
308 pub fn continued(mut self, continued: bool) -> Self {
310 self.policy.disposition = if continued {
311 ExceptionDisposition::Continued
312 } else {
313 ExceptionDisposition::Propagate
314 };
315 self
316 }
317
318 pub fn propagate(mut self) -> Self {
320 self.policy.disposition = ExceptionDisposition::Propagate;
321 self
322 }
323
324 pub fn build(mut self) -> ErrorHandlerConfig {
326 self.config.policies.push(self.policy);
327 self.config
328 }
329}
330
331#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
333pub type ExponentialBackoff = RedeliveryPolicy;
334
335#[cfg(test)]
336mod retryable_step_tests {
337 use super::*;
338 use crate::{BoxProcessor, CamelError, Exchange, Message, PipelineOutcome};
339 use std::future::Future;
340 use std::pin::Pin;
341 use std::sync::Arc;
342 use std::sync::atomic::{AtomicUsize, Ordering};
343
344 struct CountingProcessor {
345 call_count: Arc<AtomicUsize>,
346 succeed: bool,
347 }
348
349 impl tower::Service<Exchange> for CountingProcessor {
350 type Response = Exchange;
351 type Error = CamelError;
352 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
353
354 fn poll_ready(
355 &mut self,
356 _cx: &mut std::task::Context<'_>,
357 ) -> std::task::Poll<Result<(), Self::Error>> {
358 std::task::Poll::Ready(Ok(()))
359 }
360
361 fn call(&mut self, exchange: Exchange) -> Self::Future {
362 let count = self.call_count.clone();
363 let succeed = self.succeed;
364 Box::pin(async move {
365 count.fetch_add(1, Ordering::SeqCst);
366 if succeed {
367 Ok(exchange)
368 } else {
369 Err(CamelError::ProcessorError("fail".into()))
370 }
371 })
372 }
373 }
374
375 impl Clone for CountingProcessor {
376 fn clone(&self) -> Self {
377 Self {
378 call_count: self.call_count.clone(),
379 succeed: self.succeed,
380 }
381 }
382 }
383
384 #[tokio::test]
385 async fn boxprocessor_adapter_maps_ok_to_completed() {
386 let count = Arc::new(AtomicUsize::new(0));
387 let processor = CountingProcessor {
388 call_count: count.clone(),
389 succeed: true,
390 };
391 let bp: BoxProcessor = BoxProcessor::new(processor);
392 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
393 let ex = Exchange::new(Message::new("hello"));
394 let outcome = retryable.invoke(ex).await;
395 assert!(matches!(outcome, PipelineOutcome::Completed(_)));
396 assert_eq!(count.load(Ordering::SeqCst), 1);
397 }
398
399 #[tokio::test]
400 async fn boxprocessor_adapter_maps_err_to_failed() {
401 let processor = CountingProcessor {
402 call_count: Arc::new(AtomicUsize::new(0)),
403 succeed: false,
404 };
405 let bp: BoxProcessor = BoxProcessor::new(processor);
406 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
407 let ex = Exchange::new(Message::new("hello"));
408 let outcome = retryable.invoke(ex).await;
409 assert!(matches!(outcome, PipelineOutcome::Failed(_)));
410 }
411
412 #[tokio::test]
413 async fn boxprocessor_readiness_error_propagates_to_failed() {
414 struct AlwaysNotReady;
415
416 impl tower::Service<Exchange> for AlwaysNotReady {
417 type Response = Exchange;
418 type Error = CamelError;
419 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
420
421 fn poll_ready(
422 &mut self,
423 _cx: &mut std::task::Context<'_>,
424 ) -> std::task::Poll<Result<(), Self::Error>> {
425 std::task::Poll::Ready(Err(CamelError::ProcessorError(
426 "readiness failed: consumer closed".into(),
427 )))
428 }
429
430 fn call(&mut self, _ex: Exchange) -> Self::Future {
431 Box::pin(async {
432 unreachable!("call() must not be reached when poll_ready errors")
433 })
434 }
435 }
436
437 impl Clone for AlwaysNotReady {
438 fn clone(&self) -> Self {
439 AlwaysNotReady
440 }
441 }
442
443 let bp: BoxProcessor = BoxProcessor::new(AlwaysNotReady);
444 let mut retryable: Box<dyn RetryableStep> = Box::new(bp);
445 let ex = Exchange::new(Message::new("hello"));
446 let outcome = retryable.invoke(ex).await;
447 match outcome {
448 PipelineOutcome::Failed(err) => {
449 let msg = err.to_string();
450 assert!(
451 msg.contains("readiness") || msg.contains("consumer"),
452 "readiness error message should be preserved, got: {msg}"
453 );
454 }
455 other => panic!(
456 "readiness failure must map to PipelineOutcome::Failed, got {:?}",
457 other
458 ),
459 }
460 }
461}
462
463#[cfg(test)]
464mod tests {
465 use super::*;
466 use crate::BoxProcessor;
467 use crate::CamelError;
468 use std::time::Duration;
469
470 #[test]
471 fn test_redelivery_policy_defaults() {
472 let p = RedeliveryPolicy::new(3);
473 assert_eq!(p.max_attempts, 3);
474 assert_eq!(p.initial_delay, Duration::from_millis(100));
475 assert_eq!(p.multiplier, 2.0);
476 assert_eq!(p.max_delay, Duration::from_secs(10));
477 assert_eq!(p.jitter_factor, 0.0);
478 }
479
480 #[test]
481 fn test_exception_policy_matches() {
482 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
483 assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
484 assert!(!(policy.matches)(&CamelError::Io("io".into())));
485 }
486
487 #[test]
488 fn test_error_handler_config_log_only() {
489 let config = ErrorHandlerConfig::log_only();
490 assert!(config.dlc_uri.is_none());
491 assert!(config.policies.is_empty());
492 }
493
494 #[test]
495 fn test_error_handler_config_dlc() {
496 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
497 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
498 }
499
500 #[test]
501 fn test_error_handler_config_with_policy() {
502 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
503 .on_exception(|e| matches!(e, CamelError::Io(_)))
504 .retry(2)
505 .handled_by("log:io-errors")
506 .build();
507 assert_eq!(config.policies.len(), 1);
508 let p = &config.policies[0];
509 assert!(p.retry.is_some());
510 assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
511 assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
512 }
513
514 #[test]
515 fn test_jitter_applies_randomness() {
516 let policy = RedeliveryPolicy::new(3)
517 .with_initial_delay(Duration::from_millis(100))
518 .with_jitter(0.5);
519
520 let mut delays = std::collections::HashSet::new();
521 for _ in 0..10 {
522 delays.insert(policy.delay_for(0));
523 }
524
525 assert!(delays.len() > 1, "jitter should produce varying delays");
526 }
527
528 #[test]
529 fn test_jitter_stays_within_bounds() {
530 let policy = RedeliveryPolicy::new(3)
531 .with_initial_delay(Duration::from_millis(100))
532 .with_jitter(0.5);
533
534 for _ in 0..100 {
535 let delay = policy.delay_for(0);
536 assert!(
537 delay >= Duration::from_millis(50),
538 "delay too low: {:?}",
539 delay
540 );
541 assert!(
542 delay <= Duration::from_millis(150),
543 "delay too high: {:?}",
544 delay
545 );
546 }
547 }
548
549 #[test]
550 fn test_max_attempts_zero_means_no_retries() {
551 let policy = RedeliveryPolicy::new(0);
552 assert_eq!(policy.max_attempts, 0);
553 }
554
555 #[test]
556 fn test_jitter_zero_produces_exact_delay() {
557 let policy = RedeliveryPolicy::new(3)
558 .with_initial_delay(Duration::from_millis(100))
559 .with_jitter(0.0);
560
561 for _ in 0..10 {
562 let delay = policy.delay_for(0);
563 assert_eq!(delay, Duration::from_millis(100));
564 }
565 }
566
567 #[test]
568 fn test_jitter_one_produces_wide_range() {
569 let policy = RedeliveryPolicy::new(3)
570 .with_initial_delay(Duration::from_millis(100))
571 .with_jitter(1.0);
572
573 for _ in 0..100 {
574 let delay = policy.delay_for(0);
575 assert!(
576 delay >= Duration::from_millis(0),
577 "delay should be >= 0, got {:?}",
578 delay
579 );
580 assert!(
581 delay <= Duration::from_millis(200),
582 "delay should be <= 200ms, got {:?}",
583 delay
584 );
585 }
586 }
587
588 #[test]
589 fn test_redelivery_policy_builder_methods_apply_values() {
590 let p = RedeliveryPolicy::new(5)
591 .with_initial_delay(Duration::from_millis(250))
592 .with_multiplier(3.0)
593 .with_max_delay(Duration::from_secs(2))
594 .with_jitter(2.0);
595
596 assert_eq!(p.initial_delay, Duration::from_millis(250));
597 assert_eq!(p.multiplier, 3.0);
598 assert_eq!(p.max_delay, Duration::from_secs(2));
599 assert_eq!(p.jitter_factor, 1.0);
600 }
601
602 #[test]
603 fn test_with_jitter_clamps_low_bound() {
604 let p = RedeliveryPolicy::new(1).with_jitter(-0.2);
605 assert_eq!(p.jitter_factor, 0.0);
606 }
607
608 #[test]
609 fn test_delay_for_exponential_growth_and_cap() {
610 let p = RedeliveryPolicy::new(3)
611 .with_initial_delay(Duration::from_millis(100))
612 .with_multiplier(2.0)
613 .with_max_delay(Duration::from_millis(250));
614
615 assert_eq!(p.delay_for(0), Duration::from_millis(100));
616 assert_eq!(p.delay_for(1), Duration::from_millis(200));
617 assert_eq!(p.delay_for(2), Duration::from_millis(250));
618 assert_eq!(p.delay_for(20), Duration::from_millis(250));
619 }
620
621 #[test]
622 fn test_exception_policy_builder_backoff_and_jitter() {
623 let config = ErrorHandlerConfig::log_only()
624 .on_exception(|e| matches!(e, CamelError::Io(_)))
625 .retry(4)
626 .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
627 .with_jitter(1.5)
628 .build();
629
630 let retry = config.policies[0].retry.as_ref().unwrap();
631 assert_eq!(retry.max_attempts, 4);
632 assert_eq!(retry.initial_delay, Duration::from_millis(10));
633 assert_eq!(retry.multiplier, 1.5);
634 assert_eq!(retry.max_delay, Duration::from_millis(40));
635 assert_eq!(retry.jitter_factor, 1.0);
636 }
637
638 #[test]
639 fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
640 let config = ErrorHandlerConfig::log_only()
641 .on_exception(|_| true)
642 .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
643 .with_jitter(0.8)
644 .build();
645
646 assert!(config.policies[0].retry.is_none());
647 }
648
649 #[test]
650 fn test_exception_policy_clone_preserves_behavior_and_fields() {
651 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
652 let mut configured = policy;
653 configured.retry = Some(RedeliveryPolicy::new(2));
654 configured.handled_by = Some("log:route-errors".to_string());
655
656 let cloned = configured.clone();
657 assert!((cloned.matches)(&CamelError::RouteError("x".into())));
658 assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
659 assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
660 }
661
662 #[test]
663 fn test_delay_for_respects_max_delay_with_jitter() {
664 let policy = RedeliveryPolicy::new(5)
665 .with_initial_delay(Duration::from_millis(200))
666 .with_multiplier(10.0)
667 .with_max_delay(Duration::from_millis(500))
668 .with_jitter(0.2);
669
670 for _ in 0..30 {
671 let delay = policy.delay_for(4);
672 assert!(delay <= Duration::from_millis(600));
673 assert!(delay >= Duration::from_millis(400));
674 }
675 }
676
677 #[test]
678 fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
679 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
680 .on_exception(|e| matches!(e, CamelError::Io(_)))
681 .retry(1)
682 .build()
683 .on_exception(|e| matches!(e, CamelError::RouteError(_)))
684 .handled_by("log:routes")
685 .build();
686
687 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
688 assert_eq!(config.policies.len(), 2);
689 assert!((config.policies[0].matches)(&CamelError::Io("x".into())));
690 assert!((config.policies[1].matches)(&CamelError::RouteError(
691 "x".into()
692 )));
693 }
694
695 #[test]
696 fn test_backoff_without_retry_does_not_create_retry_config() {
697 let config = ErrorHandlerConfig::log_only()
698 .on_exception(|_| true)
699 .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
700 .build();
701
702 assert!(config.policies[0].retry.is_none());
703 }
704
705 #[test]
706 fn test_exception_disposition_default_is_propagate() {
707 assert_eq!(
708 ExceptionDisposition::default(),
709 ExceptionDisposition::Propagate
710 );
711 }
712
713 #[test]
714 fn test_exception_policy_new_has_propagate_disposition() {
715 let p = ExceptionPolicy::new(|_| true);
716 assert_eq!(p.disposition, ExceptionDisposition::Propagate);
717 }
718
719 #[test]
720 fn test_policy_id_equality() {
721 assert_eq!(PolicyId(0), PolicyId(0));
722 assert_ne!(PolicyId(0), PolicyId(1));
723 }
724
725 #[test]
726 fn test_builder_continued_sets_disposition() {
727 let cfg = ErrorHandlerConfig::log_only()
728 .on_exception(|_| true)
729 .continued(true)
730 .build();
731 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Continued);
732 }
733
734 #[test]
735 fn test_builder_propagate_sets_disposition() {
736 let cfg = ErrorHandlerConfig::log_only()
737 .on_exception(|_| true)
738 .propagate()
739 .build();
740 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Propagate);
741 }
742
743 #[test]
744 fn test_builder_handled_true_still_works() {
745 let cfg = ErrorHandlerConfig::log_only()
746 .on_exception(|_| true)
747 .handled(true)
748 .build();
749 assert_eq!(cfg.policies[0].disposition, ExceptionDisposition::Handled);
750 }
751}