camel_api/
error_handler.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::CamelError;
5
6#[derive(Debug, Clone)]
8pub struct ExponentialBackoff {
9 pub max_attempts: u32,
10 pub initial_delay: Duration,
11 pub multiplier: f64,
12 pub max_delay: Duration,
13}
14
15impl ExponentialBackoff {
16 pub fn new(max_attempts: u32) -> Self {
18 Self {
19 max_attempts,
20 initial_delay: Duration::from_millis(100),
21 multiplier: 2.0,
22 max_delay: Duration::from_secs(10),
23 }
24 }
25
26 pub fn with_initial_delay(mut self, d: Duration) -> Self {
28 self.initial_delay = d;
29 self
30 }
31
32 pub fn with_multiplier(mut self, m: f64) -> Self {
34 self.multiplier = m;
35 self
36 }
37
38 pub fn with_max_delay(mut self, d: Duration) -> Self {
40 self.max_delay = d;
41 self
42 }
43
44 pub fn delay_for(&self, attempt: u32) -> Duration {
46 let millis = self.initial_delay.as_millis() as f64 * self.multiplier.powi(attempt as i32);
47 let d = Duration::from_millis(millis as u64);
48 d.min(self.max_delay)
49 }
50}
51
52pub struct ExceptionPolicy {
54 pub matches: Arc<dyn Fn(&CamelError) -> bool + Send + Sync>,
56 pub retry: Option<ExponentialBackoff>,
58 pub handled_by: Option<String>,
60}
61
62impl ExceptionPolicy {
63 pub fn new(matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static) -> Self {
65 Self {
66 matches: Arc::new(matches),
67 retry: None,
68 handled_by: None,
69 }
70 }
71}
72
73impl Clone for ExceptionPolicy {
74 fn clone(&self) -> Self {
75 Self {
76 matches: Arc::clone(&self.matches),
77 retry: self.retry.clone(),
78 handled_by: self.handled_by.clone(),
79 }
80 }
81}
82
83#[derive(Clone)]
85pub struct ErrorHandlerConfig {
86 pub dlc_uri: Option<String>,
88 pub policies: Vec<ExceptionPolicy>,
90}
91
92impl ErrorHandlerConfig {
93 pub fn log_only() -> Self {
95 Self {
96 dlc_uri: None,
97 policies: Vec::new(),
98 }
99 }
100
101 pub fn dead_letter_channel(uri: impl Into<String>) -> Self {
103 Self {
104 dlc_uri: Some(uri.into()),
105 policies: Vec::new(),
106 }
107 }
108
109 pub fn on_exception(
111 self,
112 matches: impl Fn(&CamelError) -> bool + Send + Sync + 'static,
113 ) -> ExceptionPolicyBuilder {
114 ExceptionPolicyBuilder {
115 config: self,
116 policy: ExceptionPolicy::new(matches),
117 }
118 }
119}
120
121pub struct ExceptionPolicyBuilder {
123 config: ErrorHandlerConfig,
124 policy: ExceptionPolicy,
125}
126
127impl ExceptionPolicyBuilder {
128 pub fn retry(mut self, max_attempts: u32) -> Self {
130 self.policy.retry = Some(ExponentialBackoff::new(max_attempts));
131 self
132 }
133
134 pub fn with_backoff(mut self, initial: Duration, multiplier: f64, max: Duration) -> Self {
136 if let Some(ref mut b) = self.policy.retry {
137 b.initial_delay = initial;
138 b.multiplier = multiplier;
139 b.max_delay = max;
140 }
141 self
142 }
143
144 pub fn handled_by(mut self, uri: impl Into<String>) -> Self {
146 self.policy.handled_by = Some(uri.into());
147 self
148 }
149
150 pub fn build(mut self) -> ErrorHandlerConfig {
152 self.config.policies.push(self.policy);
153 self.config
154 }
155}
156
157#[cfg(test)]
158mod tests {
159 use super::*;
160 use crate::CamelError;
161 use std::time::Duration;
162
163 #[test]
164 fn test_exponential_backoff_defaults() {
165 let b = ExponentialBackoff::new(3);
166 assert_eq!(b.max_attempts, 3);
167 assert_eq!(b.initial_delay, Duration::from_millis(100));
168 assert_eq!(b.multiplier, 2.0);
169 assert_eq!(b.max_delay, Duration::from_secs(10));
170 }
171
172 #[test]
173 fn test_exception_policy_matches() {
174 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_)));
175 assert!((policy.matches)(&CamelError::ProcessorError("oops".into())));
176 assert!(!(policy.matches)(&CamelError::Io("io".into())));
177 }
178
179 #[test]
180 fn test_error_handler_config_log_only() {
181 let config = ErrorHandlerConfig::log_only();
182 assert!(config.dlc_uri.is_none());
183 assert!(config.policies.is_empty());
184 }
185
186 #[test]
187 fn test_error_handler_config_dlc() {
188 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc");
189 assert_eq!(config.dlc_uri.as_deref(), Some("log:dlc"));
190 }
191
192 #[test]
193 fn test_error_handler_config_with_policy() {
194 let config = ErrorHandlerConfig::dead_letter_channel("log:dlc")
195 .on_exception(|e| matches!(e, CamelError::Io(_)))
196 .retry(2)
197 .handled_by("log:io-errors")
198 .build();
199 assert_eq!(config.policies.len(), 1);
200 let p = &config.policies[0];
201 assert!(p.retry.is_some());
202 assert_eq!(p.retry.as_ref().unwrap().max_attempts, 2);
203 assert_eq!(p.handled_by.as_deref(), Some("log:io-errors"));
204 }
205}