Skip to main content

camel_api/
error_handler.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor};
5
/// Camel-compatible header name `CamelRedelivered` (part of the redelivery state headers).
pub const HEADER_REDELIVERED: &str = "CamelRedelivered";
/// Camel-compatible header name `CamelRedeliveryCounter` (part of the redelivery state headers).
pub const HEADER_REDELIVERY_COUNTER: &str = "CamelRedeliveryCounter";
/// Camel-compatible header name `CamelRedeliveryMaxCounter` (part of the redelivery state headers).
pub const HEADER_REDELIVERY_MAX_COUNTER: &str = "CamelRedeliveryMaxCounter";
10
11/// Redelivery policy with exponential backoff and optional jitter.
12#[derive(Debug, Clone)]
13pub struct RedeliveryPolicy {
14    pub max_attempts: u32,
15    pub initial_delay: Duration,
16    pub multiplier: f64,
17    pub max_delay: Duration,
18    pub jitter_factor: f64,
19}
20
21impl RedeliveryPolicy {
22    /// Create a new policy with default delays (100ms initial, 2x multiplier, 10s max, no jitter).
23    ///
24    /// Note: `max_attempts = 0` means no retries (immediate failure to DLC/handler).
25    /// Use `max_attempts > 0` to enable retry behavior.
26    pub fn new(max_attempts: u32) -> Self {
27        Self {
28            max_attempts,
29            initial_delay: Duration::from_millis(100),
30            multiplier: 2.0,
31            max_delay: Duration::from_secs(10),
32            jitter_factor: 0.0,
33        }
34    }
35
36    /// Override the initial delay before the first retry.
37    pub fn with_initial_delay(mut self, d: Duration) -> Self {
38        self.initial_delay = d;
39        self
40    }
41
42    /// Override the backoff multiplier applied after each attempt.
43    pub fn with_multiplier(mut self, m: f64) -> Self {
44        self.multiplier = m;
45        self
46    }
47
48    /// Cap the maximum delay between retries.
49    pub fn with_max_delay(mut self, d: Duration) -> Self {
50        self.max_delay = d;
51        self
52    }
53
54    /// Set jitter factor (0.0 = no jitter, 0.2 = ±20% randomization).
55    ///
56    /// Recommended values: 0.1-0.3 (10-30%) for most use cases.
57    /// Helps prevent thundering herd problems in distributed systems
58    /// by adding randomization to retry timing.
59    pub fn with_jitter(mut self, j: f64) -> Self {
60        self.jitter_factor = j.clamp(0.0, 1.0);
61        self
62    }
63
64    /// Compute the sleep duration before retry attempt N (0-indexed) with jitter applied.
65    pub fn delay_for(&self, attempt: u32) -> Duration {
66        let base_ms = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
67        let capped_ms = base_ms.min(self.max_delay.as_millis() as f64);
68
69        if self.jitter_factor > 0.0 {
70            let jitter = capped_ms * self.jitter_factor * (rand::random::<f64>() * 2.0 - 1.0);
71            Duration::from_millis((capped_ms + jitter).max(0.0) as u64)
72        } else {
73            Duration::from_millis(capped_ms as u64)
74        }
75    }
76}
77
78/// Disposition for an exception handler or catch clause (ADR-0019).
79///
80/// # YAML casing
81///
82/// serde uses `lowercase` casing: `handled`, `propagate`, `continued`.
83///
84/// # Default divergence
85///
86/// `ExceptionDisposition::default()` returns `Propagate` (first variant per `#[derive(Default)]`),
87/// but the YAML + builder layers default to `Handled` for doTry catch clauses
88/// (via `default_handled_disposition()` in `camel-dsl/src/yaml_ast.rs` and the `DoCatchBuilder::do_catch_exception` constructor).
89/// Direct struct construction should explicitly set disposition rather than relying on `Default`.
90#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, serde::Deserialize)]
91#[serde(rename_all = "lowercase")]
92pub enum ExceptionDisposition {
93    /// After retry exhaustion / on_steps / DLC, re-throw to upstream.
94    #[default]
95    Propagate,
96    /// Suppress re-throw. The handler's exchange is the final result.
97    Handled,
98    /// Clear the error. The pipeline continues to the NEXT step.
99    Continued,
100}
101
/// Opaque handle naming the `ExceptionPolicy` that matched inside a RouteErrorHandler.
///
/// The wrapped value is an index into that handler's policies `Vec`, so it is
/// only meaningful for the handler that produced it. Reordering or filtering
/// the policies would leave stale indices; this is safe in practice because
/// policies are immutable after handler construction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PolicyId(pub usize);
108
109/// Result of Phase 1 (retry) of error handling.
110pub enum RetryOutcome {
111    /// Retry succeeded — pipeline continues normally.
112    Recovered(Exchange),
113    /// Retries exhausted or no retry configured.
114    Exhausted {
115        exchange: Exchange,
116        error: CamelError,
117        policy: Option<PolicyId>,
118    },
119}
120
121/// Result of Phase 2 (handle) of step error handling.
122/// Handled and Continued MUST have Exchange.error cleared.
123pub enum StepDisposition {
124    Propagate(CamelError),
125    Handled(Exchange),
126    Continued(Exchange),
127}
128
/// Names the boundary gate that produced an infrastructure error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoundaryKind {
    /// The security gate.
    Security,
    /// The circuit-breaker gate.
    CircuitBreaker,
    /// The readiness gate.
    Readiness,
}
136
137/// A rule that matches specific errors and defines retry + redirect behaviour.
138pub struct ExceptionPolicy {
139    /// Predicate: returns `true` if this policy applies to the given error.
140    pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
141    /// Optional retry configuration; if absent, no retries are attempted.
142    pub retry: Option<RedeliveryPolicy>,
143    /// Optional URI of a specific endpoint to route failed exchanges to.
144    pub handled_by: Option<String>,
145    /// Optional custom pipeline executed when this policy triggers.
146    pub on_steps: Option<SyncBoxProcessor>,
147    /// What to do after this policy's handler runs.
148    pub disposition: ExceptionDisposition,
149}
150
151impl ExceptionPolicy {
152    /// Create a new policy that matches errors using the given predicate.
153    pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
154        Self {
155            matches: Arc::new(matches),
156            retry: None,
157            handled_by: None,
158            on_steps: None,
159            disposition: ExceptionDisposition::Propagate,
160        }
161    }
162}
163
164impl Clone for ExceptionPolicy {
165    fn clone(&self) -> Self {
166        Self {
167            matches: Arc::clone(&self.matches),
168            retry: self.retry.clone(),
169            handled_by: self.handled_by.clone(),
170            on_steps: self.on_steps.clone(),
171            disposition: self.disposition,
172        }
173    }
174}
175
176/// Full error handler configuration: Dead Letter Channel URI and per-exception policies.
177#[derive(Clone)]
178pub struct ErrorHandlerConfig {
179    /// URI of the Dead Letter Channel endpoint (None = log only).
180    pub dlc_uri: Option<String>,
181    /// Per-exception policies evaluated in order; first match wins.
182    pub policies: Vec<ExceptionPolicy>,
183}
184
185impl ErrorHandlerConfig {
186    /// Log-only error handler: errors are logged but not forwarded anywhere.
187    pub fn log_only() -> Self {
188        Self {
189            dlc_uri: None,
190            policies: Vec::new(),
191        }
192    }
193
194    /// Dead Letter Channel: failed exchanges are forwarded to the given URI.
195    pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
196        Self {
197            dlc_uri: Some(uri.into()),
198            policies: Vec::new(),
199        }
200    }
201
202    /// Start building an `ExceptionPolicy` attached to this config.
203    pub fn on_exception(
204        self,
205        matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
206    ) -> ExceptionPolicyBuilder {
207        ExceptionPolicyBuilder {
208            config: self,
209            policy: ExceptionPolicy::new(matches),
210        }
211    }
212}
213
214/// Builder for a single [`ExceptionPolicy`] attached to an [`ErrorHandlerConfig`].
215pub struct ExceptionPolicyBuilder {
216    config: ErrorHandlerConfig,
217    policy: ExceptionPolicy,
218}
219
220impl ExceptionPolicyBuilder {
221    /// Configure retry with the given maximum number of attempts (exponential backoff defaults).
222    pub fn retry(mut self, max_attempts: u32) -> Self {
223        self.policy.retry = Some(RedeliveryPolicy::new(max_attempts));
224        self
225    }
226
227    /// Override backoff parameters for the retry (call after `.retry()`).
228    pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
229        if let Some(ref mut p) = self.policy.retry {
230            p.initial_delay = initial;
231            p.multiplier = multiplier;
232            p.max_delay = max;
233        }
234        self
235    }
236
237    /// Set jitter factor for retry delays (call after `.retry()`).
238    /// Valid range: 0.0 (no jitter) to 1.0 (±100% randomization).
239    pub fn with_jitter(mut self, jitter_factor: f64) -> Self {
240        if let Some(ref mut p) = self.policy.retry {
241            p.jitter_factor = jitter_factor.clamp(0.0, 1.0);
242        }
243        self
244    }
245
246    /// Route failed exchanges matching this policy to the given URI instead of the DLC.
247    pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
248        self.policy.handled_by = Some(uri.into());
249        self
250    }
251
252    /// Attach a custom pipeline to execute when this policy triggers.
253    pub fn on_steps(mut self, pipeline: BoxProcessor) -> Self {
254        self.policy.on_steps = Some(SyncBoxProcessor::new(pipeline));
255        self
256    }
257
258    /// Mark the exception as handled (suppresses re-throw to upstream).
259    pub fn handled(mut self, handled: bool) -> Self {
260        self.policy.disposition = if handled {
261            ExceptionDisposition::Handled
262        } else {
263            ExceptionDisposition::Propagate
264        };
265        self
266    }
267
268    /// Mark the exception as continued (clear error, pipeline continues to next step).
269    pub fn continued(mut self, continued: bool) -> Self {
270        self.policy.disposition = if continued {
271            ExceptionDisposition::Continued
272        } else {
273            ExceptionDisposition::Propagate
274        };
275        self
276    }
277
278    /// Explicitly set disposition to Propagate (default, no-op).
279    pub fn propagate(mut self) -> Self {
280        self.policy.disposition = ExceptionDisposition::Propagate;
281        self
282    }
283
284    /// Finish this policy and return the updated config.
285    pub fn build(mut self) -> ErrorHandlerConfig {
286        self.config.policies.push(self.policy);
287        self.config
288    }
289}
290
291// Backwards compatibility alias
292#[deprecated(since = "0.1.0", note = "Use `RedeliveryPolicy` instead")]
293pub type ExponentialBackoff = RedeliveryPolicy;
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BoxProcessor;
    use crate::CamelError;
    use std::time::Duration;

    // ---- RedeliveryPolicy: construction and builder methods ----

    #[test]
    fn test_redelivery_policy_defaults() {
        let policy = RedeliveryPolicy::new(3);
        assert_eq!(policy.max_attempts, 3);
        assert_eq!(policy.initial_delay, Duration::from_millis(100));
        assert_eq!(policy.multiplier, 2.0);
        assert_eq!(policy.max_delay, Duration::from_secs(10));
        assert_eq!(policy.jitter_factor, 0.0);
    }

    #[test]
    fn test_exception_policy_matches() {
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
        let matching = CamelError::ProcessorError("oops".into());
        let other = CamelError::Io("io".into());
        assert!((policy.matches)(&matching));
        assert!(!(policy.matches)(&other));
    }

    #[test]
    fn test_error_handler_config_log_only() {
        let ErrorHandlerConfig { dlc_uri, policies } = ErrorHandlerConfig::log_only();
        assert!(dlc_uri.is_none());
        assert!(policies.is_empty());
    }

    #[test]
    fn test_error_handler_config_dlc() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
    }

    #[test]
    fn test_error_handler_config_with_policy() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(2)
            .handled_by("log:io-errors")
            .build();
        assert_eq!(config.policies.len(), 1);

        let policy = &config.policies[0];
        let retry = policy.retry.as_ref();
        assert!(retry.is_some());
        assert_eq!(retry.unwrap().max_attempts, 2);
        assert_eq!(policy.handled_by.as_deref(), Some("log:io-errors"));
    }

    // ---- RedeliveryPolicy: jitter behaviour ----

    #[test]
    fn test_jitter_applies_randomness() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);

        // Ten samples collapsed into a set: more than one distinct value expected.
        let distinct: std::collections::HashSet<Duration> =
            (0..10).map(|_| policy.delay_for(0)).collect();

        assert!(distinct.len() > 1, "jitter should produce varying delays");
    }

    #[test]
    fn test_jitter_stays_within_bounds() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.5);
        let lower = Duration::from_millis(50);
        let upper = Duration::from_millis(150);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(delay >= lower, "delay too low: {:?}", delay);
            assert!(delay <= upper, "delay too high: {:?}", delay);
        }
    }

    #[test]
    fn test_max_attempts_zero_means_no_retries() {
        let policy = RedeliveryPolicy::new(0);
        assert_eq!(policy.max_attempts, 0);
    }

    #[test]
    fn test_jitter_zero_produces_exact_delay() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(0.0);
        let expected = Duration::from_millis(100);

        for _ in 0..10 {
            assert_eq!(policy.delay_for(0), expected);
        }
    }

    #[test]
    fn test_jitter_one_produces_wide_range() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_jitter(1.0);
        let lower = Duration::from_millis(0);
        let upper = Duration::from_millis(200);

        for _ in 0..100 {
            let delay = policy.delay_for(0);
            assert!(delay >= lower, "delay should be >= 0, got {:?}", delay);
            assert!(delay <= upper, "delay should be <= 200ms, got {:?}", delay);
        }
    }

    #[test]
    fn test_redelivery_policy_builder_methods_apply_values() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(250))
            .with_multiplier(3.0)
            .with_max_delay(Duration::from_secs(2))
            .with_jitter(2.0);

        assert_eq!(policy.initial_delay, Duration::from_millis(250));
        assert_eq!(policy.multiplier, 3.0);
        assert_eq!(policy.max_delay, Duration::from_secs(2));
        // 2.0 is outside the valid range and is clamped down to 1.0.
        assert_eq!(policy.jitter_factor, 1.0);
    }

    #[test]
    fn test_with_jitter_clamps_low_bound() {
        let policy = RedeliveryPolicy::new(1).with_jitter(-0.2);
        assert_eq!(policy.jitter_factor, 0.0);
    }

    #[test]
    fn test_delay_for_exponential_growth_and_cap() {
        let policy = RedeliveryPolicy::new(3)
            .with_initial_delay(Duration::from_millis(100))
            .with_multiplier(2.0)
            .with_max_delay(Duration::from_millis(250));

        // (attempt, expected delay in ms): doubles until the 250ms cap applies.
        for (attempt, expected_ms) in [(0, 100), (1, 200), (2, 250), (20, 250)] {
            assert_eq!(policy.delay_for(attempt), Duration::from_millis(expected_ms));
        }
    }

    // ---- ExceptionPolicyBuilder ----

    #[test]
    fn test_exception_policy_builder_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(4)
            .with_backoff(Duration::from_millis(10), 1.5, Duration::from_millis(40))
            .with_jitter(1.5)
            .build();

        let redelivery = config.policies[0].retry.as_ref().unwrap();
        assert_eq!(redelivery.max_attempts, 4);
        assert_eq!(redelivery.initial_delay, Duration::from_millis(10));
        assert_eq!(redelivery.multiplier, 1.5);
        assert_eq!(redelivery.max_delay, Duration::from_millis(40));
        assert_eq!(redelivery.jitter_factor, 1.0);
    }

    #[test]
    fn test_exception_policy_builder_no_retry_ignores_backoff_and_jitter() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_secs(1), 9.0, Duration::from_secs(2))
            .with_jitter(0.8)
            .build();

        let first = &config.policies[0];
        assert!(first.retry.is_none());
    }

    #[test]
    fn test_exception_policy_clone_preserves_behavior_and_fields() {
        let mut configured = ExceptionPolicy::new(|e| matches!(e, CamelError::RouteError(_)));
        configured.retry = Some(RedeliveryPolicy::new(2));
        configured.handled_by = Some("log:route-errors".to_string());

        let cloned = configured.clone();
        assert!((cloned.matches)(&CamelError::RouteError("x".into())));
        assert_eq!(cloned.retry.as_ref().unwrap().max_attempts, 2);
        assert_eq!(cloned.handled_by.as_deref(), Some("log:route-errors"));
    }

    #[test]
    fn test_delay_for_respects_max_delay_with_jitter() {
        let policy = RedeliveryPolicy::new(5)
            .with_initial_delay(Duration::from_millis(200))
            .with_multiplier(10.0)
            .with_max_delay(Duration::from_millis(500))
            .with_jitter(0.2);

        // Capped at 500ms, then ±20% jitter: the result must land in 400..=600ms.
        for _ in 0..30 {
            let delay = policy.delay_for(4);
            assert!(delay <= Duration::from_millis(600));
            assert!(delay >= Duration::from_millis(400));
        }
    }

    #[test]
    fn test_exception_policy_builder_keeps_dlc_and_policy_order() {
        let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
            .on_exception(|e| matches!(e, CamelError::Io(_)))
            .retry(1)
            .build()
            .on_exception(|e| matches!(e, CamelError::RouteError(_)))
            .handled_by("log:routes")
            .build();

        assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
        assert_eq!(config.policies.len(), 2);

        let io_error = CamelError::Io("x".into());
        let route_error = CamelError::RouteError("x".into());
        assert!((config.policies[0].matches)(&io_error));
        assert!((config.policies[1].matches)(&route_error));
    }

    #[test]
    fn test_backoff_without_retry_does_not_create_retry_config() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .with_backoff(Duration::from_millis(1), 3.0, Duration::from_millis(9))
            .build();

        let first = &config.policies[0];
        assert!(first.retry.is_none());
    }

    // ---- Dispositions and ids ----

    #[test]
    fn test_exception_disposition_default_is_propagate() {
        let default = ExceptionDisposition::default();
        assert_eq!(default, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_exception_policy_new_has_propagate_disposition() {
        let policy = ExceptionPolicy::new(|_| true);
        assert_eq!(policy.disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_policy_id_equality() {
        assert_eq!(PolicyId(0), PolicyId(0));
        assert_ne!(PolicyId(0), PolicyId(1));
    }

    #[test]
    fn test_builder_continued_sets_disposition() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .continued(true)
            .build();
        let disposition = config.policies[0].disposition;
        assert_eq!(disposition, ExceptionDisposition::Continued);
    }

    #[test]
    fn test_builder_propagate_sets_disposition() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .propagate()
            .build();
        let disposition = config.policies[0].disposition;
        assert_eq!(disposition, ExceptionDisposition::Propagate);
    }

    #[test]
    fn test_builder_handled_true_still_works() {
        let config = ErrorHandlerConfig::log_only()
            .on_exception(|_| true)
            .handled(true)
            .build();
        let disposition = config.policies[0].disposition;
        assert_eq!(disposition, ExceptionDisposition::Handled);
    }
}