1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
9};
10use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
11
12async fn execute_on_steps(
13 original: Exchange,
14 original_err: CamelError,
15 on_steps: &SyncBoxProcessor,
16 handled: bool,
17 handler: Option<BoxProcessor>,
18) -> Result<Exchange, CamelError> {
19 let snapshot = original.clone();
20 let mut ex = original;
21 ex.set_error(original_err.clone());
22 let mut pipeline = on_steps.clone_inner();
23 let step_result = async {
24 let svc = pipeline.ready().await?;
25 svc.call(ex).await
26 }
27 .await;
28
29 match step_result {
30 Ok(mut ex) => {
31 if handled {
32 ex.handle_error();
33 Ok(ex)
34 } else {
35 Err(original_err)
38 }
39 }
40 Err(_) => {
41 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
42 let mut ex = snapshot;
43 ex.set_error(original_err);
44 send_to_handler(ex, handler).await
45 }
46 }
47}
48
49pub struct ErrorHandlerLayer {
53 dlc_producer: Option<BoxProcessor>,
55 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
57}
58
59impl ErrorHandlerLayer {
60 pub fn new(
62 dlc_producer: Option<BoxProcessor>,
63 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
64 ) -> Self {
65 Self {
66 dlc_producer,
67 policies,
68 }
69 }
70}
71
72impl<S> Layer<S> for ErrorHandlerLayer
73where
74 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
75 S::Future: Send + 'static,
76{
77 type Service = ErrorHandlerService<S>;
78
79 fn layer(&self, inner: S) -> Self::Service {
80 ErrorHandlerService {
81 inner,
82 dlc_producer: self.dlc_producer.clone(),
83 policies: self
84 .policies
85 .iter()
86 .map(|(p, prod)| (p.clone(), prod.clone()))
87 .collect(),
88 }
89 }
90}
91
92pub struct ErrorHandlerService<S> {
97 inner: S,
98 dlc_producer: Option<BoxProcessor>,
99 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
100}
101
102impl<S: Clone> Clone for ErrorHandlerService<S> {
103 fn clone(&self) -> Self {
104 Self {
105 inner: self.inner.clone(),
106 dlc_producer: self.dlc_producer.clone(),
107 policies: self
108 .policies
109 .iter()
110 .map(|(p, prod)| (p.clone(), prod.clone()))
111 .collect(),
112 }
113 }
114}
115
116impl<S> ErrorHandlerService<S>
117where
118 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
119 S::Future: Send + 'static,
120{
121 pub fn new(
123 inner: S,
124 dlc_producer: Option<BoxProcessor>,
125 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
126 ) -> Self {
127 Self {
128 inner,
129 dlc_producer,
130 policies,
131 }
132 }
133}
134
135impl<S> Service<Exchange> for ErrorHandlerService<S>
136where
137 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
138 S::Future: Send + 'static,
139{
140 type Response = Exchange;
141 type Error = CamelError;
142 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
143
144 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 self.inner.poll_ready(cx)
146 }
147
148 fn call(&mut self, exchange: Exchange) -> Self::Future {
149 let mut inner = self.inner.clone();
150 let dlc = self.dlc_producer.clone();
151 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
152 .policies
153 .iter()
154 .map(|(p, prod)| (p.clone(), prod.clone()))
155 .collect();
156
157 Box::pin(async move {
158 let original = exchange.clone();
159 let result = inner.ready().await?.call(exchange).await;
160
161 let err = match result {
162 Ok(ex) => return Ok(ex),
163 Err(e) => e,
164 };
165
166 if matches!(err, CamelError::Stopped) {
168 return Err(err);
169 }
170
171 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
173
174 if let Some((policy, policy_producer)) = matched {
175 if let Some(ref backoff) = policy.retry {
177 for attempt in 0..backoff.max_attempts {
178 let delay = backoff.delay_for(attempt);
179 tokio::time::sleep(delay).await;
180
181 let mut ex = original.clone();
183 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
184 ex.input.set_header(
185 HEADER_REDELIVERY_COUNTER,
186 Value::Number((attempt + 1).into()),
187 );
188 ex.input.set_header(
189 HEADER_REDELIVERY_MAX_COUNTER,
190 Value::Number(backoff.max_attempts.into()),
191 );
192
193 match inner.ready().await?.call(ex).await {
194 Ok(ex) => return Ok(ex),
195 Err(retry_err) => {
196 if attempt + 1 == backoff.max_attempts {
197 let mut original = original.clone();
199 original
200 .input
201 .set_header(HEADER_REDELIVERED, Value::Bool(true));
202 original.input.set_header(
203 HEADER_REDELIVERY_COUNTER,
204 Value::Number(backoff.max_attempts.into()),
205 );
206 original.input.set_header(
207 HEADER_REDELIVERY_MAX_COUNTER,
208 Value::Number(backoff.max_attempts.into()),
209 );
210 if let Some(ref steps) = policy.on_steps {
211 let handler = policy_producer.clone().or(dlc.clone());
212 return execute_on_steps(
213 original,
214 retry_err,
215 steps,
216 policy.handled,
217 handler,
218 )
219 .await;
220 }
221 original.set_error(retry_err);
222 let handler = policy_producer.or(dlc);
223 return send_to_handler(original, handler).await;
224 }
225 }
226 }
227 }
228 }
229 if let Some(ref steps) = policy.on_steps {
231 let handler = policy_producer.or(dlc);
232 return execute_on_steps(original, err, steps, policy.handled, handler).await;
233 }
234 let mut ex = original.clone();
235 ex.set_error(err);
236 let handler = policy_producer.or(dlc);
237 send_to_handler(ex, handler).await
238 } else {
239 let mut ex = original;
241 ex.set_error(err);
242 send_to_handler(ex, dlc).await
243 }
244 })
245 }
246}
247
248async fn send_to_handler(
249 exchange: Exchange,
250 producer: Option<BoxProcessor>,
251) -> Result<Exchange, CamelError> {
252 match producer {
253 None => {
254 tracing::error!(
256 error = ?exchange.error,
257 "Exchange failed with no error handler configured"
258 );
259 Ok(exchange)
260 }
261 Some(mut prod) => match prod.ready().await {
262 Err(e) => {
263 tracing::error!("DLC/handler not ready: {e}");
265 Ok(exchange)
266 }
267 Ok(svc) => match svc.call(exchange.clone()).await {
268 Ok(ex) => Ok(ex),
269 Err(e) => {
270 tracing::error!("DLC/handler call failed: {e}");
272 Ok(exchange)
274 }
275 },
276 },
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use camel_api::{
284 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
285 error_handler::RedeliveryPolicy,
286 };
287 use std::sync::{
288 Arc,
289 atomic::{AtomicU32, Ordering},
290 };
291 use std::time::Duration;
292 use tower::ServiceExt;
293
294 fn make_exchange() -> Exchange {
295 Exchange::new(Message::new("test"))
296 }
297
298 fn failing_processor() -> BoxProcessor {
299 BoxProcessor::from_fn(|_ex| {
300 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
301 })
302 }
303
304 fn ok_processor() -> BoxProcessor {
305 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
306 }
307
308 fn fail_n_times(n: u32) -> BoxProcessor {
309 let count = Arc::new(AtomicU32::new(0));
310 BoxProcessor::from_fn(move |ex| {
311 let count = Arc::clone(&count);
312 Box::pin(async move {
313 let c = count.fetch_add(1, Ordering::SeqCst);
314 if c < n {
315 Err(CamelError::ProcessorError(format!("attempt {c}")))
316 } else {
317 Ok(ex)
318 }
319 })
320 })
321 }
322
323 #[tokio::test]
324 async fn test_ok_passthrough() {
325 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
326 let result = svc.oneshot(make_exchange()).await;
327 assert!(result.is_ok());
328 assert!(!result.unwrap().has_error());
329 }
330
331 #[tokio::test]
332 async fn test_error_goes_to_dlc() {
333 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
334 let received_clone = Arc::clone(&received);
335 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
336 let r = Arc::clone(&received_clone);
337 Box::pin(async move {
338 r.lock().unwrap().push(ex.clone());
339 Ok(ex)
340 })
341 });
342
343 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
344 let result = svc.oneshot(make_exchange()).await;
345 assert!(result.is_ok());
346 let ex = result.unwrap();
347 assert!(ex.has_error());
348 assert_eq!(received.lock().unwrap().len(), 1);
349 }
350
351 #[tokio::test]
352 async fn test_retry_recovers() {
353 let inner = fail_n_times(2);
354 let policy = ExceptionPolicy {
355 matches: Arc::new(|_| true),
356 retry: Some(RedeliveryPolicy {
357 max_attempts: 3,
358 initial_delay: Duration::from_millis(1),
359 multiplier: 1.0,
360 max_delay: Duration::from_millis(10),
361 jitter_factor: 0.0,
362 }),
363 handled_by: None,
364 on_steps: None,
365 handled: false,
366 };
367 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
368 let result = svc.oneshot(make_exchange()).await;
369 assert!(result.is_ok());
370 assert!(!result.unwrap().has_error());
371 }
372
373 #[tokio::test]
374 async fn test_retry_exhausted_goes_to_dlc() {
375 let inner = fail_n_times(10);
376 let received = Arc::new(std::sync::Mutex::new(0u32));
377 let received_clone = Arc::clone(&received);
378 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
379 let r = Arc::clone(&received_clone);
380 Box::pin(async move {
381 *r.lock().unwrap() += 1;
382 Ok(ex)
383 })
384 });
385 let policy = ExceptionPolicy {
386 matches: Arc::new(|_| true),
387 retry: Some(RedeliveryPolicy {
388 max_attempts: 2,
389 initial_delay: Duration::from_millis(1),
390 multiplier: 1.0,
391 max_delay: Duration::from_millis(10),
392 jitter_factor: 0.0,
393 }),
394 handled_by: None,
395 on_steps: None,
396 handled: false,
397 };
398 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
399 let result = svc.oneshot(make_exchange()).await;
400 assert!(result.is_ok());
401 assert!(result.unwrap().has_error());
402 assert_eq!(*received.lock().unwrap(), 1);
403 }
404
405 #[test]
406 fn test_poll_ready_delegates_to_inner() {
407 use std::sync::atomic::AtomicBool;
408
409 #[derive(Clone)]
411 struct DelayedReadyService {
412 ready: Arc<AtomicBool>,
413 }
414
415 impl Service<Exchange> for DelayedReadyService {
416 type Response = Exchange;
417 type Error = CamelError;
418 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
419
420 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421 if self.ready.fetch_or(true, Ordering::SeqCst) {
422 Poll::Ready(Ok(()))
424 } else {
425 cx.waker().wake_by_ref();
427 Poll::Pending
428 }
429 }
430
431 fn call(&mut self, ex: Exchange) -> Self::Future {
432 Box::pin(async move { Ok(ex) })
433 }
434 }
435
436 let waker = futures::task::noop_waker();
437 let mut cx = Context::from_waker(&waker);
438
439 let inner = DelayedReadyService {
440 ready: Arc::new(AtomicBool::new(false)),
441 };
442 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
443
444 let first = Pin::new(&mut svc).poll_ready(&mut cx);
446 assert!(first.is_pending(), "expected Pending on first poll_ready");
447
448 let second = Pin::new(&mut svc).poll_ready(&mut cx);
450 assert!(second.is_ready(), "expected Ready on second poll_ready");
451 }
452
453 #[tokio::test]
454 async fn test_no_matching_policy_uses_dlc() {
455 let received = Arc::new(std::sync::Mutex::new(0u32));
456 let received_clone = Arc::clone(&received);
457 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
458 let r = Arc::clone(&received_clone);
459 Box::pin(async move {
460 *r.lock().unwrap() += 1;
461 Ok(ex)
462 })
463 });
464 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
465 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
466 let result = svc.oneshot(make_exchange()).await;
467 assert!(result.is_ok());
468 assert_eq!(*received.lock().unwrap(), 1);
469 }
470
471 #[tokio::test]
472 async fn test_redelivery_headers_are_set() {
473 use camel_api::error_handler::{
474 HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
475 RedeliveryPolicy,
476 };
477
478 let inner = fail_n_times(10);
479 let received = Arc::new(std::sync::Mutex::new(None));
480 let received_clone = Arc::clone(&received);
481 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
482 let r = Arc::clone(&received_clone);
483 Box::pin(async move {
484 *r.lock().unwrap() = Some(ex.clone());
485 Ok(ex)
486 })
487 });
488
489 let policy = ExceptionPolicy {
490 matches: Arc::new(|_| true),
491 retry: Some(RedeliveryPolicy {
492 max_attempts: 2,
493 initial_delay: Duration::from_millis(1),
494 multiplier: 1.0,
495 max_delay: Duration::from_millis(10),
496 jitter_factor: 0.0,
497 }),
498 handled_by: None,
499 on_steps: None,
500 handled: false,
501 };
502
503 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
504 let _ = svc.oneshot(make_exchange()).await.unwrap();
505
506 let ex = received.lock().unwrap().take().unwrap();
507 assert_eq!(
508 ex.input.header(HEADER_REDELIVERED),
509 Some(&Value::Bool(true))
510 );
511 assert_eq!(
512 ex.input.header(HEADER_REDELIVERY_COUNTER),
513 Some(&Value::Number(2.into()))
514 );
515 assert_eq!(
516 ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
517 Some(&Value::Number(2.into()))
518 );
519 }
520
521 #[tokio::test]
522 async fn test_jitter_produces_varying_delays_in_retry_flow() {
523 use std::time::Instant;
524
525 let inner = fail_n_times(10);
526 let received = Arc::new(std::sync::Mutex::new(None));
527 let received_clone = Arc::clone(&received);
528 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
529 let r = Arc::clone(&received_clone);
530 Box::pin(async move {
531 *r.lock().unwrap() = Some(ex.clone());
532 Ok(ex)
533 })
534 });
535
536 let policy = ExceptionPolicy {
537 matches: Arc::new(|_| true),
538 retry: Some(RedeliveryPolicy {
539 max_attempts: 5,
540 initial_delay: Duration::from_millis(20),
541 multiplier: 1.0,
542 max_delay: Duration::from_millis(100),
543 jitter_factor: 0.5,
544 }),
545 handled_by: None,
546 on_steps: None,
547 handled: false,
548 };
549
550 let start = Instant::now();
551 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
552 let _ = svc.oneshot(make_exchange()).await.unwrap();
553 let elapsed = start.elapsed();
554
555 assert!(
556 received.lock().unwrap().is_some(),
557 "DLC should have received exchange"
558 );
559
560 assert!(
561 elapsed >= Duration::from_millis(50),
562 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
563 );
564
565 assert!(
566 elapsed <= Duration::from_millis(500),
567 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
568 );
569 }
570
571 #[tokio::test]
574 async fn test_stopped_bypasses_error_handler() {
575 let stopped_inner =
576 BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
577
578 let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
580 let dlc_called_clone = Arc::clone(&dlc_called);
581 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
582 dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
583 Box::pin(async move { Ok(ex) })
584 });
585
586 let policy = ExceptionPolicy::new(|_| true); let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
588 let result = svc.oneshot(make_exchange()).await;
589
590 assert!(
592 matches!(result, Err(CamelError::Stopped)),
593 "expected Err(Stopped), got: {:?}",
594 result
595 );
596 assert!(
598 !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
599 "DLC should not be called for Stopped"
600 );
601 }
602
603 #[tokio::test]
604 async fn test_on_steps_handled_true_consumes_error() {
605 use tower::ServiceExt;
606
607 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
608 ex.input.body = camel_api::Body::Bytes("handled".into());
609 async move { Ok(ex) }
610 }));
611 let policy = ExceptionPolicy {
612 matches: Arc::new(|_| true),
613 retry: None,
614 handled_by: None,
615 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
616 handled: true,
617 };
618 let inner = tower::service_fn(|_ex: Exchange| async {
619 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
620 });
621 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
622 let ex = Exchange::default();
623 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
624 assert!(result.error.is_none(), "handled:true should clear error");
625 assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
626 }
627
628 #[tokio::test]
629 async fn test_on_steps_handled_false_propagates_error() {
630 use tower::ServiceExt;
631
632 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
633 ex.input.body = camel_api::Body::Bytes("handled".into());
634 async move { Ok(ex) }
635 }));
636 let policy = ExceptionPolicy {
637 matches: Arc::new(|_| true),
638 retry: None,
639 handled_by: None,
640 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
641 handled: false,
642 };
643 let inner = tower::service_fn(|_ex: Exchange| async {
644 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
645 });
646 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
647 let ex = Exchange::default();
648 let result = svc.ready().await.unwrap().call(ex).await;
649 assert!(result.is_err(), "handled:false should propagate error");
650 }
651
652 #[tokio::test]
653 async fn test_on_steps_handled_true_clears_exception_properties() {
654 use tower::ServiceExt;
655
656 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
657 ex.input.body = camel_api::Body::Bytes("handled".into());
658 async move { Ok(ex) }
659 }));
660 let policy = ExceptionPolicy {
661 matches: Arc::new(|_| true),
662 retry: None,
663 handled_by: None,
664 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
665 handled: true,
666 };
667 let inner = tower::service_fn(|_ex: Exchange| async {
668 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
669 });
670 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
671 let ex = Exchange::default();
672 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
673 assert!(result.error.is_none(), "handled:true should clear error");
674 assert!(
675 result
676 .properties
677 .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
678 .is_none(),
679 "handled:true should clear exception properties"
680 );
681 assert!(
682 result
683 .properties
684 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
685 .is_none(),
686 "handled:true should clear exception kind property"
687 );
688 assert!(
689 result
690 .properties
691 .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
692 .is_none(),
693 "handled:true should clear exception caught property"
694 );
695 }
696}