1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 ExceptionPolicy, HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
9};
10use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
11
12async fn execute_on_steps(
13 original: Exchange,
14 original_err: CamelError,
15 on_steps: &SyncBoxProcessor,
16 handled: bool,
17 handler: Option<BoxProcessor>,
18) -> Result<Exchange, CamelError> {
19 let snapshot = original.clone();
20 let mut ex = original;
21 ex.set_error(original_err.clone());
22 let mut pipeline = on_steps.clone_inner();
23 let step_result = async {
24 let svc = pipeline.ready().await?;
25 svc.call(ex).await
26 }
27 .await;
28
29 match step_result {
30 Ok(mut ex) => {
31 if handled {
32 ex.handle_error();
33 Ok(ex)
34 } else {
35 Err(original_err)
38 }
39 }
40 Err(_) => {
41 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
42 let mut ex = snapshot;
43 ex.set_error(original_err);
44 send_to_handler(ex, handler).await
45 }
46 }
47}
48
49pub struct ErrorHandlerLayer {
53 dlc_producer: Option<BoxProcessor>,
55 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
57}
58
59impl ErrorHandlerLayer {
60 pub fn new(
62 dlc_producer: Option<BoxProcessor>,
63 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
64 ) -> Self {
65 Self {
66 dlc_producer,
67 policies,
68 }
69 }
70}
71
72impl<S> Layer<S> for ErrorHandlerLayer
73where
74 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
75 S::Future: Send + 'static,
76{
77 type Service = ErrorHandlerService<S>;
78
79 fn layer(&self, inner: S) -> Self::Service {
80 ErrorHandlerService {
81 inner,
82 dlc_producer: self.dlc_producer.clone(),
83 policies: self
84 .policies
85 .iter()
86 .map(|(p, prod)| (p.clone(), prod.clone()))
87 .collect(),
88 }
89 }
90}
91
92pub struct ErrorHandlerService<S> {
97 inner: S,
98 dlc_producer: Option<BoxProcessor>,
99 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
100}
101
102impl<S: Clone> Clone for ErrorHandlerService<S> {
103 fn clone(&self) -> Self {
104 Self {
105 inner: self.inner.clone(),
106 dlc_producer: self.dlc_producer.clone(),
107 policies: self
108 .policies
109 .iter()
110 .map(|(p, prod)| (p.clone(), prod.clone()))
111 .collect(),
112 }
113 }
114}
115
116impl<S> ErrorHandlerService<S>
117where
118 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
119 S::Future: Send + 'static,
120{
121 pub fn new(
123 inner: S,
124 dlc_producer: Option<BoxProcessor>,
125 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
126 ) -> Self {
127 Self {
128 inner,
129 dlc_producer,
130 policies,
131 }
132 }
133}
134
135impl<S> Service<Exchange> for ErrorHandlerService<S>
136where
137 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
138 S::Future: Send + 'static,
139{
140 type Response = Exchange;
141 type Error = CamelError;
142 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
143
144 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 self.inner.poll_ready(cx)
146 }
147
148 fn call(&mut self, exchange: Exchange) -> Self::Future {
149 let mut inner = self.inner.clone();
150 let dlc = self.dlc_producer.clone();
151 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
152 .policies
153 .iter()
154 .map(|(p, prod)| (p.clone(), prod.clone()))
155 .collect();
156
157 Box::pin(async move {
158 let original = exchange.clone();
159 let result = inner.ready().await?.call(exchange).await;
160
161 let err = match result {
162 Ok(ex) => return Ok(ex),
163 Err(e) => e,
164 };
165
166 if matches!(err, CamelError::Stopped) {
168 return Err(err);
169 }
170
171 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
173
174 if let Some((policy, policy_producer)) = matched {
175 if let Some(ref backoff) = policy.retry {
177 for attempt in 0..backoff.max_attempts {
178 let delay = backoff.delay_for(attempt);
179 tokio::time::sleep(delay).await;
180
181 let mut ex = original.clone();
183 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
184 ex.input.set_header(
185 HEADER_REDELIVERY_COUNTER,
186 Value::Number((attempt + 1).into()),
187 );
188 ex.input.set_header(
189 HEADER_REDELIVERY_MAX_COUNTER,
190 Value::Number(backoff.max_attempts.into()),
191 );
192
193 match inner.ready().await?.call(ex).await {
194 Ok(ex) => return Ok(ex),
195 Err(retry_err) => {
196 if attempt + 1 == backoff.max_attempts {
197 let mut original = original.clone();
199 original
200 .input
201 .set_header(HEADER_REDELIVERED, Value::Bool(true));
202 original.input.set_header(
203 HEADER_REDELIVERY_COUNTER,
204 Value::Number(backoff.max_attempts.into()),
205 );
206 original.input.set_header(
207 HEADER_REDELIVERY_MAX_COUNTER,
208 Value::Number(backoff.max_attempts.into()),
209 );
210 if let Some(ref steps) = policy.on_steps {
211 let handler = policy_producer.clone().or(dlc.clone());
212 return execute_on_steps(
213 original,
214 retry_err,
215 steps,
216 policy.handled,
217 handler,
218 )
219 .await;
220 }
221 original.set_error(retry_err);
222 let handler = policy_producer.or(dlc);
223 return send_to_handler(original, handler).await;
224 }
225 }
226 }
227 }
228 }
229 if let Some(ref steps) = policy.on_steps {
231 let handler = policy_producer.or(dlc);
232 return execute_on_steps(original, err, steps, policy.handled, handler).await;
233 }
234 let mut ex = original.clone();
235 ex.set_error(err);
236 let handler = policy_producer.or(dlc);
237 send_to_handler(ex, handler).await
238 } else {
239 let mut ex = original;
241 ex.set_error(err);
242 send_to_handler(ex, dlc).await
243 }
244 })
245 }
246}
247
248async fn send_to_handler(
249 exchange: Exchange,
250 producer: Option<BoxProcessor>,
251) -> Result<Exchange, CamelError> {
252 match producer {
253 None => {
254 tracing::error!(
255 error = ?exchange.error,
256 "Exchange failed with no error handler configured"
257 );
258 Ok(exchange)
259 }
260 Some(mut prod) => match prod.ready().await {
261 Err(e) => {
262 tracing::error!("DLC/handler not ready: {e}");
263 Ok(exchange)
264 }
265 Ok(svc) => match svc.call(exchange.clone()).await {
266 Ok(ex) => Ok(ex),
267 Err(e) => {
268 tracing::error!("DLC/handler call failed: {e}");
269 Ok(exchange)
271 }
272 },
273 },
274 }
275}
276
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{
        BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
        error_handler::RedeliveryPolicy,
    };
    use std::sync::{
        Arc, Mutex,
        atomic::{AtomicBool, AtomicU32, Ordering},
    };
    use std::time::{Duration, Instant};
    use tower::ServiceExt;

    // ---- fixtures -------------------------------------------------------

    /// Exchange carrying a fixed `"test"` message.
    fn make_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Processor that fails every call with `ProcessorError("boom")`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    /// Processor that returns its input untouched.
    fn ok_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
    }

    /// Processor that fails its first `n` calls and succeeds afterwards.
    /// The call counter is shared between clones of the processor.
    fn fail_n_times(n: u32) -> BoxProcessor {
        let calls = Arc::new(AtomicU32::new(0));
        BoxProcessor::from_fn(move |ex| {
            let calls = Arc::clone(&calls);
            Box::pin(async move {
                let seen = calls.fetch_add(1, Ordering::SeqCst);
                if seen < n {
                    Err(CamelError::ProcessorError(format!("attempt {}", seen)))
                } else {
                    Ok(ex)
                }
            })
        })
    }

    /// Handler stub that counts the exchanges it receives and echoes them back.
    fn counting_dlc() -> (BoxProcessor, Arc<AtomicU32>) {
        let hits = Arc::new(AtomicU32::new(0));
        let sink = Arc::clone(&hits);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.fetch_add(1, Ordering::SeqCst);
                Ok(ex)
            })
        });
        (dlc, hits)
    }

    /// Handler stub that records every exchange it receives and echoes it back.
    fn capturing_dlc() -> (BoxProcessor, Arc<Mutex<Vec<Exchange>>>) {
        let captured = Arc::new(Mutex::new(Vec::<Exchange>::new()));
        let sink = Arc::clone(&captured);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.lock().unwrap().push(ex.clone());
                Ok(ex)
            })
        });
        (dlc, captured)
    }

    /// Policy that matches every error.
    fn catch_all_policy(
        retry: Option<RedeliveryPolicy>,
        on_steps: Option<SyncBoxProcessor>,
        handled: bool,
    ) -> ExceptionPolicy {
        ExceptionPolicy {
            matches: Arc::new(|_| true),
            retry,
            handled_by: None,
            on_steps,
            handled,
        }
    }

    /// Two redelivery attempts, 1ms flat delay, no jitter.
    fn fast_backoff() -> RedeliveryPolicy {
        RedeliveryPolicy {
            max_attempts: 2,
            initial_delay: Duration::from_millis(1),
            multiplier: 1.0,
            max_delay: Duration::from_millis(10),
            jitter_factor: 0.0,
        }
    }

    /// Drives an always-failing route through a catch-all policy whose
    /// `on_steps` pipeline overwrites the body, and returns the raw outcome.
    async fn fail_through_on_steps(handled: bool) -> Result<Exchange, CamelError> {
        let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
            ex.input.body = camel_api::Body::Bytes("handled".into());
            async move { Ok(ex) }
        }));
        let policy = catch_all_policy(None, Some(SyncBoxProcessor::new(steps_pipeline)), handled);
        let inner = tower::service_fn(|_ex: Exchange| async {
            Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
        });
        let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
        svc.ready().await.unwrap().call(Exchange::default()).await
    }

    // ---- tests ----------------------------------------------------------

    #[tokio::test]
    async fn test_ok_passthrough() {
        let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_error_goes_to_dlc() {
        let (dlc, captured) = capturing_dlc();

        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(captured.lock().unwrap().len(), 1);
    }

    #[tokio::test]
    async fn test_retry_recovers() {
        // Initial delivery and first retry fail; the second retry succeeds.
        let retry = RedeliveryPolicy {
            max_attempts: 3,
            ..fast_backoff()
        };
        let policy = catch_all_policy(Some(retry), None, false);

        let svc = ErrorHandlerService::new(fail_n_times(2), None, vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(!result.unwrap().has_error());
    }

    #[tokio::test]
    async fn test_retry_exhausted_goes_to_dlc() {
        let (dlc, hits) = counting_dlc();
        let policy = catch_all_policy(Some(fast_backoff()), None, false);

        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert!(result.unwrap().has_error());
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn test_poll_ready_delegates_to_inner() {
        /// Reports `Pending` on the first readiness poll and `Ready` afterwards.
        #[derive(Clone)]
        struct DelayedReadyService {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReadyService {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                // `swap` returns the previous value: false only on the first poll.
                let was_ready = self.ready.swap(true, Ordering::SeqCst);
                if !was_ready {
                    cx.waker().wake_by_ref();
                    return Poll::Pending;
                }
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, ex: Exchange) -> Self::Future {
                Box::pin(async move { Ok(ex) })
            }
        }

        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let mut svc = ErrorHandlerService::new(inner, None, vec![]);

        let first = svc.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        let second = svc.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    #[tokio::test]
    async fn test_no_matching_policy_uses_dlc() {
        let (dlc, hits) = counting_dlc();
        // Only matches I/O errors, so the processor's error goes unmatched.
        let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));

        let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;
        assert!(result.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn test_redelivery_headers_are_set() {
        let (dlc, captured) = capturing_dlc();
        let policy = catch_all_policy(Some(fast_backoff()), None, false);

        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();

        let ex = captured.lock().unwrap().pop().unwrap();
        assert_eq!(
            ex.input.header(HEADER_REDELIVERED),
            Some(&Value::Bool(true))
        );
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_COUNTER),
            Some(&Value::Number(2.into()))
        );
        assert_eq!(
            ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
            Some(&Value::Number(2.into()))
        );
    }

    #[tokio::test]
    async fn test_jitter_produces_varying_delays_in_retry_flow() {
        let (dlc, captured) = capturing_dlc();
        let retry = RedeliveryPolicy {
            max_attempts: 5,
            initial_delay: Duration::from_millis(20),
            multiplier: 1.0,
            max_delay: Duration::from_millis(100),
            jitter_factor: 0.5,
        };
        let policy = catch_all_policy(Some(retry), None, false);

        let start = Instant::now();
        let svc = ErrorHandlerService::new(fail_n_times(10), Some(dlc), vec![(policy, None)]);
        let _ = svc.oneshot(make_exchange()).await.unwrap();
        let elapsed = start.elapsed();

        assert!(
            !captured.lock().unwrap().is_empty(),
            "DLC should have received exchange"
        );

        assert!(
            elapsed >= Duration::from_millis(50),
            "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
        );

        assert!(
            elapsed <= Duration::from_millis(500),
            "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
        );
    }

    #[tokio::test]
    async fn test_stopped_bypasses_error_handler() {
        let stopped_inner =
            BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));

        let dlc_called = Arc::new(AtomicBool::new(false));
        let dlc_flag = Arc::clone(&dlc_called);
        let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
            dlc_flag.store(true, Ordering::SeqCst);
            Box::pin(async move { Ok(ex) })
        });

        let policy = ExceptionPolicy::new(|_| true);
        let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
        let result = svc.oneshot(make_exchange()).await;

        assert!(
            matches!(result, Err(CamelError::Stopped)),
            "expected Err(Stopped), got: {:?}",
            result
        );
        assert!(
            !dlc_called.load(Ordering::SeqCst),
            "DLC should not be called for Stopped"
        );
    }

    #[tokio::test]
    async fn test_on_steps_handled_true_consumes_error() {
        let result = fail_through_on_steps(true).await.unwrap();
        assert!(result.error.is_none(), "handled:true should clear error");
        assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
    }

    #[tokio::test]
    async fn test_on_steps_handled_false_propagates_error() {
        let result = fail_through_on_steps(false).await;
        assert!(result.is_err(), "handled:false should propagate error");
    }

    #[tokio::test]
    async fn test_on_steps_handled_true_clears_exception_properties() {
        let result = fail_through_on_steps(true).await.unwrap();
        assert!(result.error.is_none(), "handled:true should clear error");
        assert!(
            result
                .properties
                .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
                .is_none(),
            "handled:true should clear exception properties"
        );
        assert!(
            result
                .properties
                .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
                .is_none(),
            "handled:true should clear exception kind property"
        );
        assert!(
            result
                .properties
                .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
                .is_none(),
            "handled:true should clear exception caught property"
        );
    }
}