1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER};
8use camel_api::{BoxProcessor, CamelError, Exchange, Value};
9
10pub struct ErrorHandlerLayer {
14 dlc_producer: Option<BoxProcessor>,
16 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
18}
19
20impl ErrorHandlerLayer {
21 pub fn new(
23 dlc_producer: Option<BoxProcessor>,
24 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
25 ) -> Self {
26 Self {
27 dlc_producer,
28 policies,
29 }
30 }
31}
32
33impl<S> Layer<S> for ErrorHandlerLayer
34where
35 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
36 S::Future: Send + 'static,
37{
38 type Service = ErrorHandlerService<S>;
39
40 fn layer(&self, inner: S) -> Self::Service {
41 ErrorHandlerService {
42 inner,
43 dlc_producer: self.dlc_producer.clone(),
44 policies: self
45 .policies
46 .iter()
47 .map(|(p, prod)| (p.clone(), prod.clone()))
48 .collect(),
49 }
50 }
51}
52
53pub struct ErrorHandlerService<S> {
58 inner: S,
59 dlc_producer: Option<BoxProcessor>,
60 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
61}
62
63impl<S: Clone> Clone for ErrorHandlerService<S> {
64 fn clone(&self) -> Self {
65 Self {
66 inner: self.inner.clone(),
67 dlc_producer: self.dlc_producer.clone(),
68 policies: self
69 .policies
70 .iter()
71 .map(|(p, prod)| (p.clone(), prod.clone()))
72 .collect(),
73 }
74 }
75}
76
77impl<S> ErrorHandlerService<S>
78where
79 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
80 S::Future: Send + 'static,
81{
82 pub fn new(
84 inner: S,
85 dlc_producer: Option<BoxProcessor>,
86 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
87 ) -> Self {
88 Self {
89 inner,
90 dlc_producer,
91 policies,
92 }
93 }
94}
95
96impl<S> Service<Exchange> for ErrorHandlerService<S>
97where
98 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
99 S::Future: Send + 'static,
100{
101 type Response = Exchange;
102 type Error = CamelError;
103 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
104
105 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 self.inner.poll_ready(cx)
107 }
108
109 fn call(&mut self, exchange: Exchange) -> Self::Future {
110 let mut inner = self.inner.clone();
111 let dlc = self.dlc_producer.clone();
112 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
113 .policies
114 .iter()
115 .map(|(p, prod)| (p.clone(), prod.clone()))
116 .collect();
117
118 Box::pin(async move {
119 let original = exchange.clone();
120 let result = inner.ready().await?.call(exchange).await;
121
122 let err = match result {
123 Ok(ex) => return Ok(ex),
124 Err(e) => e,
125 };
126
127 if matches!(err, CamelError::Stopped) {
129 return Err(err);
130 }
131
132 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
134
135 if let Some((policy, policy_producer)) = matched {
136 if let Some(ref backoff) = policy.retry {
138 for attempt in 0..backoff.max_attempts {
139 let delay = backoff.delay_for(attempt);
140 tokio::time::sleep(delay).await;
141
142 let mut ex = original.clone();
144 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
145 ex.input.set_header(HEADER_REDELIVERY_COUNTER, Value::Number((attempt + 1).into()));
146 ex.input.set_header(HEADER_REDELIVERY_MAX_COUNTER, Value::Number(backoff.max_attempts.into()));
147
148 match inner.ready().await?.call(ex).await {
149 Ok(ex) => return Ok(ex),
150 Err(_e) => {
151 if attempt + 1 == backoff.max_attempts {
152 let mut ex = original.clone();
154 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
155 ex.input.set_header(HEADER_REDELIVERY_COUNTER, Value::Number(backoff.max_attempts.into()));
156 ex.input.set_header(HEADER_REDELIVERY_MAX_COUNTER, Value::Number(backoff.max_attempts.into()));
157 ex.set_error(_e);
158 let handler = policy_producer.or(dlc);
159 return send_to_handler(ex, handler).await;
160 }
161 }
162 }
163 }
164 }
165 let mut ex = original.clone();
167 ex.set_error(err);
168 let handler = policy_producer.or(dlc);
169 send_to_handler(ex, handler).await
170 } else {
171 let mut ex = original;
173 ex.set_error(err);
174 send_to_handler(ex, dlc).await
175 }
176 })
177 }
178}
179
180async fn send_to_handler(
181 exchange: Exchange,
182 producer: Option<BoxProcessor>,
183) -> Result<Exchange, CamelError> {
184 match producer {
185 None => {
186 tracing::error!(
187 error = ?exchange.error,
188 "Exchange failed with no error handler configured"
189 );
190 Ok(exchange)
191 }
192 Some(mut prod) => match prod.ready().await {
193 Err(e) => {
194 tracing::error!("DLC/handler not ready: {e}");
195 Ok(exchange)
196 }
197 Ok(svc) => match svc.call(exchange.clone()).await {
198 Ok(ex) => Ok(ex),
199 Err(e) => {
200 tracing::error!("DLC/handler call failed: {e}");
201 Ok(exchange)
203 }
204 },
205 },
206 }
207}
208
209#[cfg(test)]
210mod tests {
211 use super::*;
212 use camel_api::{
213 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, Value,
214 error_handler::RedeliveryPolicy,
215 };
216 use std::sync::{
217 Arc,
218 atomic::{AtomicU32, Ordering},
219 };
220 use std::time::Duration;
221 use tower::ServiceExt;
222
223 fn make_exchange() -> Exchange {
224 Exchange::new(Message::new("test"))
225 }
226
227 fn failing_processor() -> BoxProcessor {
228 BoxProcessor::from_fn(|_ex| {
229 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
230 })
231 }
232
233 fn ok_processor() -> BoxProcessor {
234 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
235 }
236
237 fn fail_n_times(n: u32) -> BoxProcessor {
238 let count = Arc::new(AtomicU32::new(0));
239 BoxProcessor::from_fn(move |ex| {
240 let count = Arc::clone(&count);
241 Box::pin(async move {
242 let c = count.fetch_add(1, Ordering::SeqCst);
243 if c < n {
244 Err(CamelError::ProcessorError(format!("attempt {c}")))
245 } else {
246 Ok(ex)
247 }
248 })
249 })
250 }
251
252 #[tokio::test]
253 async fn test_ok_passthrough() {
254 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
255 let result = svc.oneshot(make_exchange()).await;
256 assert!(result.is_ok());
257 assert!(!result.unwrap().has_error());
258 }
259
260 #[tokio::test]
261 async fn test_error_goes_to_dlc() {
262 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
263 let received_clone = Arc::clone(&received);
264 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
265 let r = Arc::clone(&received_clone);
266 Box::pin(async move {
267 r.lock().unwrap().push(ex.clone());
268 Ok(ex)
269 })
270 });
271
272 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
273 let result = svc.oneshot(make_exchange()).await;
274 assert!(result.is_ok());
275 let ex = result.unwrap();
276 assert!(ex.has_error());
277 assert_eq!(received.lock().unwrap().len(), 1);
278 }
279
280 #[tokio::test]
281 async fn test_retry_recovers() {
282 let inner = fail_n_times(2);
283 let policy = ExceptionPolicy {
284 matches: Arc::new(|_| true),
285 retry: Some(RedeliveryPolicy {
286 max_attempts: 3,
287 initial_delay: Duration::from_millis(1),
288 multiplier: 1.0,
289 max_delay: Duration::from_millis(10),
290 jitter_factor: 0.0,
291 }),
292 handled_by: None,
293 };
294 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
295 let result = svc.oneshot(make_exchange()).await;
296 assert!(result.is_ok());
297 assert!(!result.unwrap().has_error());
298 }
299
300 #[tokio::test]
301 async fn test_retry_exhausted_goes_to_dlc() {
302 let inner = fail_n_times(10);
303 let received = Arc::new(std::sync::Mutex::new(0u32));
304 let received_clone = Arc::clone(&received);
305 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
306 let r = Arc::clone(&received_clone);
307 Box::pin(async move {
308 *r.lock().unwrap() += 1;
309 Ok(ex)
310 })
311 });
312 let policy = ExceptionPolicy {
313 matches: Arc::new(|_| true),
314 retry: Some(RedeliveryPolicy {
315 max_attempts: 2,
316 initial_delay: Duration::from_millis(1),
317 multiplier: 1.0,
318 max_delay: Duration::from_millis(10),
319 jitter_factor: 0.0,
320 }),
321 handled_by: None,
322 };
323 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
324 let result = svc.oneshot(make_exchange()).await;
325 assert!(result.is_ok());
326 assert!(result.unwrap().has_error());
327 assert_eq!(*received.lock().unwrap(), 1);
328 }
329
330 #[test]
331 fn test_poll_ready_delegates_to_inner() {
332 use std::sync::atomic::AtomicBool;
333
334 #[derive(Clone)]
336 struct DelayedReadyService {
337 ready: Arc<AtomicBool>,
338 }
339
340 impl Service<Exchange> for DelayedReadyService {
341 type Response = Exchange;
342 type Error = CamelError;
343 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
344
345 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
346 if self.ready.fetch_or(true, Ordering::SeqCst) {
347 Poll::Ready(Ok(()))
349 } else {
350 cx.waker().wake_by_ref();
352 Poll::Pending
353 }
354 }
355
356 fn call(&mut self, ex: Exchange) -> Self::Future {
357 Box::pin(async move { Ok(ex) })
358 }
359 }
360
361 let waker = futures::task::noop_waker();
362 let mut cx = Context::from_waker(&waker);
363
364 let inner = DelayedReadyService {
365 ready: Arc::new(AtomicBool::new(false)),
366 };
367 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
368
369 let first = Pin::new(&mut svc).poll_ready(&mut cx);
371 assert!(first.is_pending(), "expected Pending on first poll_ready");
372
373 let second = Pin::new(&mut svc).poll_ready(&mut cx);
375 assert!(second.is_ready(), "expected Ready on second poll_ready");
376 }
377
378 #[tokio::test]
379 async fn test_no_matching_policy_uses_dlc() {
380 let received = Arc::new(std::sync::Mutex::new(0u32));
381 let received_clone = Arc::clone(&received);
382 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
383 let r = Arc::clone(&received_clone);
384 Box::pin(async move {
385 *r.lock().unwrap() += 1;
386 Ok(ex)
387 })
388 });
389 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
390 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
391 let result = svc.oneshot(make_exchange()).await;
392 assert!(result.is_ok());
393 assert_eq!(*received.lock().unwrap(), 1);
394 }
395
396 #[tokio::test]
397 async fn test_redelivery_headers_are_set() {
398 use camel_api::error_handler::{RedeliveryPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER};
399
400 let inner = fail_n_times(10);
401 let received = Arc::new(std::sync::Mutex::new(None));
402 let received_clone = Arc::clone(&received);
403 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
404 let r = Arc::clone(&received_clone);
405 Box::pin(async move {
406 *r.lock().unwrap() = Some(ex.clone());
407 Ok(ex)
408 })
409 });
410
411 let policy = ExceptionPolicy {
412 matches: Arc::new(|_| true),
413 retry: Some(RedeliveryPolicy {
414 max_attempts: 2,
415 initial_delay: Duration::from_millis(1),
416 multiplier: 1.0,
417 max_delay: Duration::from_millis(10),
418 jitter_factor: 0.0,
419 }),
420 handled_by: None,
421 };
422
423 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
424 let _ = svc.oneshot(make_exchange()).await.unwrap();
425
426 let ex = received.lock().unwrap().take().unwrap();
427 assert_eq!(ex.input.header(HEADER_REDELIVERED), Some(&Value::Bool(true)));
428 assert_eq!(ex.input.header(HEADER_REDELIVERY_COUNTER), Some(&Value::Number(2.into())));
429 assert_eq!(ex.input.header(HEADER_REDELIVERY_MAX_COUNTER), Some(&Value::Number(2.into())));
430 }
431
432 #[tokio::test]
433 async fn test_jitter_produces_varying_delays_in_retry_flow() {
434 use std::time::Instant;
435
436 let inner = fail_n_times(10);
437 let received = Arc::new(std::sync::Mutex::new(None));
438 let received_clone = Arc::clone(&received);
439 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
440 let r = Arc::clone(&received_clone);
441 Box::pin(async move {
442 *r.lock().unwrap() = Some(ex.clone());
443 Ok(ex)
444 })
445 });
446
447 let policy = ExceptionPolicy {
448 matches: Arc::new(|_| true),
449 retry: Some(RedeliveryPolicy {
450 max_attempts: 5,
451 initial_delay: Duration::from_millis(20),
452 multiplier: 1.0,
453 max_delay: Duration::from_millis(100),
454 jitter_factor: 0.5,
455 }),
456 handled_by: None,
457 };
458
459 let start = Instant::now();
460 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
461 let _ = svc.oneshot(make_exchange()).await.unwrap();
462 let elapsed = start.elapsed();
463
464 assert!(received.lock().unwrap().is_some(), "DLC should have received exchange");
465
466 assert!(
467 elapsed >= Duration::from_millis(50),
468 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
469 );
470
471 assert!(
472 elapsed <= Duration::from_millis(500),
473 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
474 );
475 }
476
477 #[tokio::test]
480 async fn test_stopped_bypasses_error_handler() {
481 let stopped_inner =
482 BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
483
484 let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
486 let dlc_called_clone = Arc::clone(&dlc_called);
487 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
488 dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
489 Box::pin(async move { Ok(ex) })
490 });
491
492 let policy = ExceptionPolicy::new(|_| true); let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
494 let result = svc.oneshot(make_exchange()).await;
495
496 assert!(
498 matches!(result, Err(CamelError::Stopped)),
499 "expected Err(Stopped), got: {:?}",
500 result
501 );
502 assert!(
504 !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
505 "DLC should not be called for Stopped"
506 );
507 }
508}