1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9 HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10 StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15 original: Exchange,
16 original_err: CamelError,
17 on_steps: &SyncBoxProcessor,
18 disposition: ExceptionDisposition,
19 handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21 let snapshot = original.clone();
22 let mut ex = original;
23 ex.set_error(original_err.clone());
24 let mut pipeline = on_steps.clone_inner();
25 let step_result = async {
26 let svc = pipeline.ready().await?;
27 svc.call(ex).await
28 }
29 .await;
30
31 match step_result {
32 Ok(mut ex) => {
33 if disposition == ExceptionDisposition::Handled {
34 ex.handle_error();
35 Ok(ex)
36 } else {
37 Err(original_err)
40 }
41 }
42 Err(_) => {
43 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44 let mut ex = snapshot;
45 ex.set_error(original_err);
46 send_to_handler(ex, handler).await
47 }
48 }
49}
50
51pub async fn invoke_processor(
56 svc: &mut BoxProcessor,
57 ex: Exchange,
58) -> Result<Exchange, CamelError> {
59 match svc.ready().await {
60 Ok(ready) => ready.call(ex).await,
61 Err(err) => Err(err),
62 }
63}
64
65#[async_trait::async_trait]
70pub trait RouteErrorHandler: Send + Sync {
71 fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
73
74 async fn retry_step(
76 &self,
77 policy: Option<PolicyId>,
78 step: &mut dyn camel_api::error_handler::RetryableStep,
79 original: Exchange,
80 error: CamelError,
81 ) -> RetryOutcome;
82
83 async fn handle_step(
85 &self,
86 policy: Option<PolicyId>,
87 exchange: Exchange,
88 error: CamelError,
89 ) -> Result<StepDisposition, CamelError>;
90
91 async fn handle_boundary(
93 &self,
94 kind: BoundaryKind,
95 exchange: Exchange,
96 error: CamelError,
97 ) -> Result<Exchange, CamelError>;
98}
99
100pub struct DefaultRouteErrorHandler {
106 pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107 pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111 pub fn new(
112 dlc_producer: Option<BoxProcessor>,
113 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114 ) -> Self {
115 Self {
116 dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117 policies: policies
118 .into_iter()
119 .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120 .collect(),
121 }
122 }
123
124 fn resolve_producer(
127 &self,
128 policy: Option<PolicyId>,
129 ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130 match policy {
131 Some(PolicyId(idx)) => match self.policies.get(idx) {
132 Some((p, prod)) => (
133 p.disposition,
134 prod.as_ref()
135 .map(|p| p.clone_inner())
136 .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137 ),
138 None => (
139 ExceptionDisposition::Propagate,
140 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141 ),
142 },
143 None => (
144 ExceptionDisposition::Propagate,
145 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146 ),
147 }
148 }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153 fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154 self.policies
155 .iter()
156 .position(|(p, _)| (p.matches)(err))
157 .map(PolicyId)
158 }
159
160 async fn retry_step(
161 &self,
162 policy: Option<PolicyId>,
163 step: &mut dyn camel_api::error_handler::RetryableStep,
164 original: Exchange,
165 error: CamelError,
166 ) -> RetryOutcome {
167 let Some(PolicyId(idx)) = policy else {
168 return RetryOutcome::Exhausted {
169 exchange: original,
170 error,
171 policy: None,
172 };
173 };
174 let Some((policy_def, _)) = self.policies.get(idx) else {
175 return RetryOutcome::Exhausted {
176 exchange: original,
177 error,
178 policy,
179 };
180 };
181 let Some(ref backoff) = policy_def.retry else {
182 return RetryOutcome::Exhausted {
183 exchange: original,
184 error,
185 policy,
186 };
187 };
188
189 for attempt in 0..backoff.max_attempts {
190 let delay = backoff.delay_for(attempt);
191 tokio::time::sleep(delay).await;
192
193 let mut ex = original.clone();
194 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195 ex.input.set_header(
196 HEADER_REDELIVERY_COUNTER,
197 Value::Number((attempt + 1).into()),
198 );
199 ex.input.set_header(
200 HEADER_REDELIVERY_MAX_COUNTER,
201 Value::Number(backoff.max_attempts.into()),
202 );
203
204 match step.invoke(ex).await {
205 PipelineOutcome::Completed(exchange) => {
206 return RetryOutcome::Recovered(exchange);
207 }
208 PipelineOutcome::Stopped(stopped_ex) => {
209 return RetryOutcome::Stopped(stopped_ex);
210 }
211 PipelineOutcome::Failed(retry_err) => {
212 if attempt + 1 == backoff.max_attempts {
213 let mut final_ex = original;
214 final_ex
215 .input
216 .set_header(HEADER_REDELIVERED, Value::Bool(true));
217 final_ex.input.set_header(
218 HEADER_REDELIVERY_COUNTER,
219 Value::Number(backoff.max_attempts.into()),
220 );
221 final_ex.input.set_header(
222 HEADER_REDELIVERY_MAX_COUNTER,
223 Value::Number(backoff.max_attempts.into()),
224 );
225 return RetryOutcome::Exhausted {
226 exchange: final_ex,
227 error: retry_err,
228 policy,
229 };
230 }
231 }
232 }
233 }
234
235 RetryOutcome::Exhausted {
236 exchange: original,
237 error,
238 policy,
239 }
240 }
241
242 async fn handle_step(
243 &self,
244 policy: Option<PolicyId>,
245 mut exchange: Exchange,
246 error: CamelError,
247 ) -> Result<StepDisposition, CamelError> {
248 let (disposition, producer) = self.resolve_producer(policy);
249
250 if let Some(PolicyId(idx)) = policy
252 && let Some((p, _)) = self.policies.get(idx)
253 && let Some(ref steps) = p.on_steps
254 {
255 let snapshot = exchange.clone();
256 exchange.set_error(error.clone());
257 let mut step_pipeline = steps.clone_inner();
258 let step_result = async {
259 let svc = step_pipeline.ready().await?;
260 svc.call(exchange).await
261 }
262 .await;
263 match step_result {
264 Ok(mut ex) => match disposition {
265 ExceptionDisposition::Handled => {
266 ex.handle_error();
267 return Ok(StepDisposition::Handled(ex));
268 }
269 ExceptionDisposition::Continued => {
270 ex.clear_error();
271 return Ok(StepDisposition::Continued(ex));
272 }
273 ExceptionDisposition::Propagate => {
274 exchange = snapshot;
275 }
276 },
277 Err(_) => {
278 exchange = snapshot;
279 }
280 }
281 }
282
283 exchange.set_error(error.clone());
286 match send_to_handler(exchange, producer).await {
287 Ok(handler_ex) => match disposition {
288 ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
289 ExceptionDisposition::Handled => {
290 let mut ex = handler_ex;
291 ex.clear_error();
292 Ok(StepDisposition::Handled(ex))
293 }
294 ExceptionDisposition::Continued => {
295 let mut ex = handler_ex;
296 ex.clear_error();
297 Ok(StepDisposition::Continued(ex))
298 }
299 },
300 Err(_) => Ok(StepDisposition::Propagate(error)),
302 }
303 }
304
305 async fn handle_boundary(
306 &self,
307 _kind: BoundaryKind,
308 mut exchange: Exchange,
309 error: CamelError,
310 ) -> Result<Exchange, CamelError> {
311 let policy = self.match_policy(&error);
317 let (disposition, producer) = self.resolve_producer(policy);
318
319 if let Some(PolicyId(idx)) = policy
321 && let Some((p, _)) = self.policies.get(idx)
322 && let Some(ref steps) = p.on_steps
323 {
324 let snapshot = exchange.clone();
325 exchange.set_error(error.clone());
326 let mut step_pipeline = steps.clone_inner();
327 let step_result = async {
328 let svc = step_pipeline.ready().await?;
329 svc.call(exchange).await
330 }
331 .await;
332 match step_result {
333 Ok(mut ex) => match disposition {
334 ExceptionDisposition::Handled => {
335 ex.handle_error();
336 return Ok(ex);
337 }
338 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
339 exchange = snapshot;
340 }
341 },
342 Err(_) => {
343 exchange = snapshot;
344 }
345 }
346 }
347
348 exchange.set_error(error.clone());
350 match send_to_handler(exchange, producer).await {
351 Ok(handler_ex) => match disposition {
352 ExceptionDisposition::Handled => {
353 let mut ex = handler_ex;
354 ex.clear_error();
355 Ok(ex)
356 }
357 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
358 let mut ex = handler_ex;
359 ex.set_error(error);
360 Ok(ex)
361 }
362 },
363 Err(e) => Err(e),
365 }
366 }
367}
368
369pub struct ErrorHandlerLayer {
373 dlc_producer: Option<BoxProcessor>,
375 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
377}
378
379impl ErrorHandlerLayer {
380 pub fn new(
382 dlc_producer: Option<BoxProcessor>,
383 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
384 ) -> Self {
385 Self {
386 dlc_producer,
387 policies,
388 }
389 }
390}
391
392impl<S> Layer<S> for ErrorHandlerLayer
393where
394 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
395 S::Future: Send + 'static,
396{
397 type Service = ErrorHandlerService<S>;
398
399 fn layer(&self, inner: S) -> Self::Service {
400 ErrorHandlerService {
401 inner,
402 dlc_producer: self.dlc_producer.clone(),
403 policies: self
404 .policies
405 .iter()
406 .map(|(p, prod)| (p.clone(), prod.clone()))
407 .collect(),
408 }
409 }
410}
411
412pub struct ErrorHandlerService<S> {
417 inner: S,
418 dlc_producer: Option<BoxProcessor>,
419 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
420}
421
422impl<S: Clone> Clone for ErrorHandlerService<S> {
423 fn clone(&self) -> Self {
424 Self {
425 inner: self.inner.clone(),
426 dlc_producer: self.dlc_producer.clone(),
427 policies: self
428 .policies
429 .iter()
430 .map(|(p, prod)| (p.clone(), prod.clone()))
431 .collect(),
432 }
433 }
434}
435
436impl<S> ErrorHandlerService<S>
437where
438 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
439 S::Future: Send + 'static,
440{
441 pub fn new(
443 inner: S,
444 dlc_producer: Option<BoxProcessor>,
445 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
446 ) -> Self {
447 Self {
448 inner,
449 dlc_producer,
450 policies,
451 }
452 }
453}
454
455impl<S> Service<Exchange> for ErrorHandlerService<S>
456where
457 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
458 S::Future: Send + 'static,
459{
460 type Response = Exchange;
461 type Error = CamelError;
462 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
463
464 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
465 match self.inner.poll_ready(cx) {
470 Poll::Pending => Poll::Pending,
471 Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
472 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
473 }
474 }
475
476 fn call(&mut self, exchange: Exchange) -> Self::Future {
477 let mut inner = self.inner.clone();
478 let dlc = self.dlc_producer.clone();
479 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
480 .policies
481 .iter()
482 .map(|(p, prod)| (p.clone(), prod.clone()))
483 .collect();
484
485 Box::pin(async move {
486 let original = exchange.clone();
487 let result = match inner.ready().await {
488 Ok(svc) => svc.call(exchange).await,
489 Err(e) => Err(e), };
491
492 let err = match result {
493 Ok(ex) => return Ok(ex),
494 Err(e) => e,
495 };
496
497 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
499
500 if let Some((policy, policy_producer)) = matched {
501 if let Some(ref backoff) = policy.retry {
503 for attempt in 0..backoff.max_attempts {
504 let delay = backoff.delay_for(attempt);
505 tokio::time::sleep(delay).await;
506
507 let mut ex = original.clone();
509 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
510 ex.input.set_header(
511 HEADER_REDELIVERY_COUNTER,
512 Value::Number((attempt + 1).into()),
513 );
514 ex.input.set_header(
515 HEADER_REDELIVERY_MAX_COUNTER,
516 Value::Number(backoff.max_attempts.into()),
517 );
518
519 let result = match inner.ready().await {
520 Ok(svc) => svc.call(ex).await,
521 Err(e) => Err(e), };
523 match result {
524 Ok(ex) => return Ok(ex),
525 Err(retry_err) => {
526 if attempt + 1 == backoff.max_attempts {
527 let mut original = original.clone();
529 original
530 .input
531 .set_header(HEADER_REDELIVERED, Value::Bool(true));
532 original.input.set_header(
533 HEADER_REDELIVERY_COUNTER,
534 Value::Number(backoff.max_attempts.into()),
535 );
536 original.input.set_header(
537 HEADER_REDELIVERY_MAX_COUNTER,
538 Value::Number(backoff.max_attempts.into()),
539 );
540 if let Some(ref steps) = policy.on_steps {
541 let handler = policy_producer.clone().or(dlc.clone());
542 return execute_on_steps(
543 original,
544 retry_err,
545 steps,
546 policy.disposition,
547 handler,
548 )
549 .await;
550 }
551 original.set_error(retry_err);
552 let handler = policy_producer.or(dlc);
553 return send_to_handler(original, handler).await;
554 }
555 }
556 }
557 }
558 }
559 if let Some(ref steps) = policy.on_steps {
561 let handler = policy_producer.or(dlc);
562 return execute_on_steps(original, err, steps, policy.disposition, handler)
563 .await;
564 }
565 let mut ex = original.clone();
566 ex.set_error(err);
567 let handler = policy_producer.or(dlc);
568 send_to_handler(ex, handler).await
569 } else {
570 let mut ex = original;
572 ex.set_error(err);
573 send_to_handler(ex, dlc).await
574 }
575 })
576 }
577}
578
579async fn send_to_handler(
580 exchange: Exchange,
581 producer: Option<BoxProcessor>,
582) -> Result<Exchange, CamelError> {
583 match producer {
584 None => {
585 tracing::error!(
587 error = ?exchange.error,
588 "Exchange failed with no error handler configured"
589 );
590 Ok(exchange)
591 }
592 Some(mut prod) => match prod.ready().await {
593 Err(e) => {
594 tracing::error!("DLC/handler not ready: {e}");
596 Ok(exchange)
597 }
598 Ok(svc) => match svc.call(exchange.clone()).await {
599 Ok(ex) => Ok(ex),
600 Err(e) => {
601 tracing::error!("DLC/handler call failed: {e}");
603 Ok(exchange)
605 }
606 },
607 },
608 }
609}
610
611#[cfg(test)]
612mod tests {
613 use super::*;
614 use camel_api::{
615 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, OutcomePipeline,
616 OutcomeSegment, PipelineOutcome, RetryableStep, SyncBoxProcessor, Value,
617 error_handler::RedeliveryPolicy,
618 };
619 use std::future::Future;
620 use std::pin::Pin;
621 use std::sync::{
622 Arc,
623 atomic::{AtomicU32, Ordering},
624 };
625 use std::time::Duration;
626 use tower::ServiceExt;
627
628 fn make_exchange() -> Exchange {
629 Exchange::new(Message::new("test"))
630 }
631
632 fn failing_processor() -> BoxProcessor {
633 BoxProcessor::from_fn(|_ex| {
634 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
635 })
636 }
637
638 fn ok_processor() -> BoxProcessor {
639 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
640 }
641
642 fn fail_n_times(n: u32) -> BoxProcessor {
643 let count = Arc::new(AtomicU32::new(0));
644 BoxProcessor::from_fn(move |ex| {
645 let count = Arc::clone(&count);
646 Box::pin(async move {
647 let c = count.fetch_add(1, Ordering::SeqCst);
648 if c < n {
649 Err(CamelError::ProcessorError(format!("attempt {c}")))
650 } else {
651 Ok(ex)
652 }
653 })
654 })
655 }
656
657 #[tokio::test]
658 async fn test_ok_passthrough() {
659 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
660 let result = svc.oneshot(make_exchange()).await;
661 assert!(result.is_ok());
662 assert!(!result.unwrap().has_error());
663 }
664
665 #[tokio::test]
666 async fn test_error_goes_to_dlc() {
667 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
668 let received_clone = Arc::clone(&received);
669 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
670 let r = Arc::clone(&received_clone);
671 Box::pin(async move {
672 r.lock().unwrap().push(ex.clone());
673 Ok(ex)
674 })
675 });
676
677 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
678 let result = svc.oneshot(make_exchange()).await;
679 assert!(result.is_ok());
680 let ex = result.unwrap();
681 assert!(ex.has_error());
682 assert_eq!(received.lock().unwrap().len(), 1);
683 }
684
685 #[tokio::test]
686 async fn test_retry_recovers() {
687 let inner = fail_n_times(2);
688 let policy = ExceptionPolicy {
689 matches: Arc::new(|_| true),
690 retry: Some(RedeliveryPolicy {
691 max_attempts: 3,
692 initial_delay: Duration::from_millis(1),
693 multiplier: 1.0,
694 max_delay: Duration::from_millis(10),
695 jitter_factor: 0.0,
696 }),
697 handled_by: None,
698 on_steps: None,
699 disposition: ExceptionDisposition::Propagate,
700 };
701 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
702 let result = svc.oneshot(make_exchange()).await;
703 assert!(result.is_ok());
704 assert!(!result.unwrap().has_error());
705 }
706
707 #[tokio::test]
708 async fn test_retry_exhausted_goes_to_dlc() {
709 let inner = fail_n_times(10);
710 let received = Arc::new(std::sync::Mutex::new(0u32));
711 let received_clone = Arc::clone(&received);
712 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
713 let r = Arc::clone(&received_clone);
714 Box::pin(async move {
715 *r.lock().unwrap() += 1;
716 Ok(ex)
717 })
718 });
719 let policy = ExceptionPolicy {
720 matches: Arc::new(|_| true),
721 retry: Some(RedeliveryPolicy {
722 max_attempts: 2,
723 initial_delay: Duration::from_millis(1),
724 multiplier: 1.0,
725 max_delay: Duration::from_millis(10),
726 jitter_factor: 0.0,
727 }),
728 handled_by: None,
729 on_steps: None,
730 disposition: ExceptionDisposition::Propagate,
731 };
732 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
733 let result = svc.oneshot(make_exchange()).await;
734 assert!(result.is_ok());
735 assert!(result.unwrap().has_error());
736 assert_eq!(*received.lock().unwrap(), 1);
737 }
738
739 #[test]
740 fn test_poll_ready_delegates_to_inner() {
741 use std::sync::atomic::AtomicBool;
742
743 #[derive(Clone)]
745 struct DelayedReadyService {
746 ready: Arc<AtomicBool>,
747 }
748
749 impl Service<Exchange> for DelayedReadyService {
750 type Response = Exchange;
751 type Error = CamelError;
752 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
753
754 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
755 if self.ready.fetch_or(true, Ordering::SeqCst) {
756 Poll::Ready(Ok(()))
758 } else {
759 cx.waker().wake_by_ref();
761 Poll::Pending
762 }
763 }
764
765 fn call(&mut self, ex: Exchange) -> Self::Future {
766 Box::pin(async move { Ok(ex) })
767 }
768 }
769
770 let waker = futures::task::noop_waker();
771 let mut cx = Context::from_waker(&waker);
772
773 let inner = DelayedReadyService {
774 ready: Arc::new(AtomicBool::new(false)),
775 };
776 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
777
778 let first = Pin::new(&mut svc).poll_ready(&mut cx);
780 assert!(first.is_pending(), "expected Pending on first poll_ready");
781
782 let second = Pin::new(&mut svc).poll_ready(&mut cx);
784 assert!(second.is_ready(), "expected Ready on second poll_ready");
785 }
786
787 #[tokio::test]
788 async fn test_no_matching_policy_uses_dlc() {
789 let received = Arc::new(std::sync::Mutex::new(0u32));
790 let received_clone = Arc::clone(&received);
791 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
792 let r = Arc::clone(&received_clone);
793 Box::pin(async move {
794 *r.lock().unwrap() += 1;
795 Ok(ex)
796 })
797 });
798 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
799 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
800 let result = svc.oneshot(make_exchange()).await;
801 assert!(result.is_ok());
802 assert_eq!(*received.lock().unwrap(), 1);
803 }
804
805 #[tokio::test]
806 async fn test_redelivery_headers_are_set() {
807 use camel_api::error_handler::{
808 HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
809 RedeliveryPolicy,
810 };
811
812 let inner = fail_n_times(10);
813 let received = Arc::new(std::sync::Mutex::new(None));
814 let received_clone = Arc::clone(&received);
815 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
816 let r = Arc::clone(&received_clone);
817 Box::pin(async move {
818 *r.lock().unwrap() = Some(ex.clone());
819 Ok(ex)
820 })
821 });
822
823 let policy = ExceptionPolicy {
824 matches: Arc::new(|_| true),
825 retry: Some(RedeliveryPolicy {
826 max_attempts: 2,
827 initial_delay: Duration::from_millis(1),
828 multiplier: 1.0,
829 max_delay: Duration::from_millis(10),
830 jitter_factor: 0.0,
831 }),
832 handled_by: None,
833 on_steps: None,
834 disposition: ExceptionDisposition::Propagate,
835 };
836
837 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
838 let _ = svc.oneshot(make_exchange()).await.unwrap();
839
840 let ex = received.lock().unwrap().take().unwrap();
841 assert_eq!(
842 ex.input.header(HEADER_REDELIVERED),
843 Some(&Value::Bool(true))
844 );
845 assert_eq!(
846 ex.input.header(HEADER_REDELIVERY_COUNTER),
847 Some(&Value::Number(2.into()))
848 );
849 assert_eq!(
850 ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
851 Some(&Value::Number(2.into()))
852 );
853 }
854
855 #[tokio::test]
856 async fn test_jitter_produces_varying_delays_in_retry_flow() {
857 use std::time::Instant;
858
859 let inner = fail_n_times(10);
860 let received = Arc::new(std::sync::Mutex::new(None));
861 let received_clone = Arc::clone(&received);
862 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
863 let r = Arc::clone(&received_clone);
864 Box::pin(async move {
865 *r.lock().unwrap() = Some(ex.clone());
866 Ok(ex)
867 })
868 });
869
870 let policy = ExceptionPolicy {
871 matches: Arc::new(|_| true),
872 retry: Some(RedeliveryPolicy {
873 max_attempts: 5,
874 initial_delay: Duration::from_millis(20),
875 multiplier: 1.0,
876 max_delay: Duration::from_millis(100),
877 jitter_factor: 0.5,
878 }),
879 handled_by: None,
880 on_steps: None,
881 disposition: ExceptionDisposition::Propagate,
882 };
883
884 let start = Instant::now();
885 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
886 let _ = svc.oneshot(make_exchange()).await.unwrap();
887 let elapsed = start.elapsed();
888
889 assert!(
890 received.lock().unwrap().is_some(),
891 "DLC should have received exchange"
892 );
893
894 assert!(
895 elapsed >= Duration::from_millis(50),
896 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
897 );
898
899 assert!(
900 elapsed <= Duration::from_millis(500),
901 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
902 );
903 }
904
905 #[tokio::test]
906 async fn test_on_steps_handled_true_consumes_error() {
907 use tower::ServiceExt;
908
909 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
910 ex.input.body = camel_api::Body::Bytes("handled".into());
911 async move { Ok(ex) }
912 }));
913 let policy = ExceptionPolicy {
914 matches: Arc::new(|_| true),
915 retry: None,
916 handled_by: None,
917 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
918 disposition: ExceptionDisposition::Handled,
919 };
920 let inner = tower::service_fn(|_ex: Exchange| async {
921 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
922 });
923 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
924 let ex = Exchange::default();
925 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
926 assert!(result.error.is_none(), "handled:true should clear error");
927 assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
928 }
929
930 #[tokio::test]
931 async fn test_on_steps_handled_false_propagates_error() {
932 use tower::ServiceExt;
933
934 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
935 ex.input.body = camel_api::Body::Bytes("handled".into());
936 async move { Ok(ex) }
937 }));
938 let policy = ExceptionPolicy {
939 matches: Arc::new(|_| true),
940 retry: None,
941 handled_by: None,
942 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
943 disposition: ExceptionDisposition::Propagate,
944 };
945 let inner = tower::service_fn(|_ex: Exchange| async {
946 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
947 });
948 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
949 let ex = Exchange::default();
950 let result = svc.ready().await.unwrap().call(ex).await;
951 assert!(result.is_err(), "handled:false should propagate error");
952 }
953
954 #[derive(Clone)]
962 struct ReadinessFailService {
963 error: CamelError,
964 }
965
966 impl ReadinessFailService {
967 fn new(error: CamelError) -> Self {
968 Self { error }
969 }
970 }
971
972 impl Service<Exchange> for ReadinessFailService {
973 type Response = Exchange;
974 type Error = CamelError;
975 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
976
977 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
978 Poll::Ready(Err(self.error.clone()))
979 }
980
981 fn call(&mut self, ex: Exchange) -> Self::Future {
982 Box::pin(async move { Ok(ex) })
985 }
986 }
987
988 #[tokio::test]
989 async fn test_readiness_error_goes_to_dlc() {
990 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
991 let inner = ReadinessFailService {
992 error: readiness_err,
993 };
994
995 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
996 let received_clone = Arc::clone(&received);
997 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
998 let r = Arc::clone(&received_clone);
999 Box::pin(async move {
1000 r.lock().unwrap().push(ex.clone());
1001 Ok(ex)
1002 })
1003 });
1004
1005 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1006 let result = svc.oneshot(make_exchange()).await;
1007
1008 assert!(
1010 result.is_ok(),
1011 "readiness error should be captured and sent to DLC, got: {:?}",
1012 result
1013 );
1014 let ex = result.unwrap();
1015 assert!(ex.has_error(), "exchange should carry the readiness error");
1016 assert_eq!(
1017 received.lock().unwrap().len(),
1018 1,
1019 "DLC should have received the exchange exactly once"
1020 );
1021 }
1022
1023 #[tokio::test]
1024 async fn test_readiness_error_goes_to_matching_policy() {
1025 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1026 let inner = ReadinessFailService {
1027 error: readiness_err,
1028 };
1029
1030 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1031 ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1032 async move { Ok(ex) }
1033 }));
1034 let policy = ExceptionPolicy {
1035 matches: Arc::new(|_| true),
1036 retry: None,
1037 handled_by: None,
1038 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1039 disposition: ExceptionDisposition::Handled,
1040 };
1041
1042 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1043 let result = svc.oneshot(make_exchange()).await;
1044
1045 assert!(
1047 result.is_ok(),
1048 "readiness error should be captured by policy, got: {:?}",
1049 result
1050 );
1051 let ex = result.unwrap();
1052 assert!(ex.error.is_none(), "handled:true should clear error");
1053 assert!(
1054 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1055 "on_steps should have modified the body"
1056 );
1057 }
1058
1059 #[test]
1060 fn test_poll_ready_converts_readiness_error_to_ok() {
1061 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1062 let inner = ReadinessFailService {
1063 error: readiness_err,
1064 };
1065 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1066
1067 let waker = futures::task::noop_waker();
1068 let mut cx = Context::from_waker(&waker);
1069
1070 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1072 match poll {
1073 Poll::Ready(Ok(())) => { }
1074 Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1075 Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1076 }
1077 }
1078
1079 #[tokio::test]
1082 async fn test_invoke_processor_returns_ok_on_success() {
1083 let mut svc = ok_processor();
1084 let ex = make_exchange();
1085 let result = invoke_processor(&mut svc, ex).await;
1086 assert!(result.is_ok());
1087 }
1088
1089 #[tokio::test]
1090 async fn test_invoke_processor_captures_readiness_error() {
1091 let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1092 CamelError::ProcessorError("not ready".into()),
1093 ));
1094 let ex = make_exchange();
1095 let result = invoke_processor(&mut failing_ready, ex).await;
1096 assert!(result.is_err());
1097 }
1098
1099 #[tokio::test]
1100 async fn test_on_steps_handled_true_clears_exception_properties() {
1101 use tower::ServiceExt;
1102
1103 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1104 ex.input.body = camel_api::Body::Bytes("handled".into());
1105 async move { Ok(ex) }
1106 }));
1107 let policy = ExceptionPolicy {
1108 matches: Arc::new(|_| true),
1109 retry: None,
1110 handled_by: None,
1111 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1112 disposition: ExceptionDisposition::Handled,
1113 };
1114 let inner = tower::service_fn(|_ex: Exchange| async {
1115 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1116 });
1117 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1118 let ex = Exchange::default();
1119 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1120 assert!(result.error.is_none(), "handled:true should clear error");
1121 assert!(
1122 result
1123 .properties
1124 .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1125 .is_none(),
1126 "handled:true should clear exception properties"
1127 );
1128 assert!(
1129 result
1130 .properties
1131 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1132 .is_none(),
1133 "handled:true should clear exception kind property"
1134 );
1135 assert!(
1136 result
1137 .properties
1138 .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1139 .is_none(),
1140 "handled:true should clear exception caught property"
1141 );
1142 }
1143
1144 #[test]
1147 fn test_match_policy_returns_id_for_matching_error() {
1148 let handler = DefaultRouteErrorHandler::new(
1149 None,
1150 vec![(
1151 ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1152 None,
1153 )],
1154 );
1155 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1156 assert_eq!(id, Some(PolicyId(0)));
1157 }
1158
1159 #[test]
1160 fn test_match_policy_returns_none_for_unmatched() {
1161 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1162 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1163 assert_eq!(id, None);
1164 }
1165
1166 #[tokio::test]
1169 async fn test_retry_step_succeeds_on_second_attempt() {
1170 let mut policy = ExceptionPolicy::new(|_| true);
1171 policy.retry = Some(RedeliveryPolicy::new(3));
1172 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1173 let mut step = fail_n_times(1); let ex = make_exchange();
1175 let outcome = handler
1176 .retry_step(
1177 Some(PolicyId(0)),
1178 &mut step,
1179 ex,
1180 CamelError::ProcessorError("attempt 0".into()),
1181 )
1182 .await;
1183 assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1184 }
1185
1186 #[tokio::test]
1187 async fn test_retry_step_exhausted_when_all_fail() {
1188 let mut policy = ExceptionPolicy::new(|_| true);
1189 policy.retry = Some(RedeliveryPolicy::new(3));
1190 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1191 let mut step = failing_processor();
1192 let ex = make_exchange();
1193 let outcome = handler
1194 .retry_step(
1195 Some(PolicyId(0)),
1196 &mut step,
1197 ex,
1198 CamelError::ProcessorError("boom".into()),
1199 )
1200 .await;
1201 assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1202 }
1203
1204 #[tokio::test]
1205 async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1206 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1207 let mut step = ok_processor();
1208 let ex = make_exchange();
1209 let outcome = handler
1210 .retry_step(
1211 None,
1212 &mut step,
1213 ex,
1214 CamelError::ProcessorError("boom".into()),
1215 )
1216 .await;
1217 assert!(matches!(
1218 outcome,
1219 RetryOutcome::Exhausted { policy: None, .. }
1220 ));
1221 }
1222
1223 #[tokio::test]
1226 async fn test_handle_step_propagate_sends_to_dlc() {
1227 let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1228 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1229 let ex = make_exchange();
1230 let result = handler
1231 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1232 .await;
1233 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1234 }
1235
1236 #[tokio::test]
1237 async fn test_handle_step_handled_uses_handler_output() {
1238 let handler_producer = BoxProcessor::from_fn(|mut ex| {
1239 Box::pin(async move {
1240 ex.input.set_header("processed_by", Value::Bool(true));
1241 Ok(ex)
1242 })
1243 });
1244 let policy = ExceptionPolicy {
1245 matches: std::sync::Arc::new(|_| true),
1246 retry: None,
1247 handled_by: None,
1248 on_steps: None,
1249 disposition: ExceptionDisposition::Handled,
1250 };
1251 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1252 let mut ex = make_exchange();
1253 ex.set_error(CamelError::ProcessorError("boom".into()));
1254 let result = handler
1255 .handle_step(
1256 Some(PolicyId(0)),
1257 ex,
1258 CamelError::ProcessorError("boom".into()),
1259 )
1260 .await;
1261 match result {
1262 Ok(StepDisposition::Handled(ex)) => {
1263 assert!(!ex.has_error(), "error should be cleared");
1264 assert_eq!(
1265 ex.input.header("processed_by"),
1266 Some(&Value::Bool(true)),
1267 "should use handler's output exchange"
1268 );
1269 }
1270 other => panic!("expected Handled, got {:?}", other.is_ok()),
1271 }
1272 }
1273
1274 #[tokio::test]
1275 async fn test_handle_step_continued_clears_error() {
1276 let policy = ExceptionPolicy {
1277 matches: std::sync::Arc::new(|_| true),
1278 retry: None,
1279 handled_by: None,
1280 on_steps: None,
1281 disposition: ExceptionDisposition::Continued,
1282 };
1283 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1284 let mut ex = make_exchange();
1285 ex.set_error(CamelError::ProcessorError("boom".into()));
1286 let result = handler
1287 .handle_step(
1288 Some(PolicyId(0)),
1289 ex,
1290 CamelError::ProcessorError("boom".into()),
1291 )
1292 .await;
1293 match result {
1294 Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1295 other => panic!("expected Continued, got {:?}", other.is_ok()),
1296 }
1297 }
1298
1299 #[tokio::test]
1300 async fn test_handle_step_with_on_steps_handled() {
1301 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1302 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1303 async move { Ok(ex) }
1304 }));
1305 let policy = ExceptionPolicy {
1306 matches: std::sync::Arc::new(|_| true),
1307 retry: None,
1308 handled_by: None,
1309 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1310 disposition: ExceptionDisposition::Handled,
1311 };
1312 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1313 let mut ex = make_exchange();
1314 ex.set_error(CamelError::ProcessorError("boom".into()));
1315 let result = handler
1316 .handle_step(
1317 Some(PolicyId(0)),
1318 ex,
1319 CamelError::ProcessorError("boom".into()),
1320 )
1321 .await;
1322 match result {
1323 Ok(StepDisposition::Handled(ex)) => {
1324 assert!(!ex.has_error(), "error should be cleared");
1325 assert!(
1326 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1327 "on_steps should have modified the body"
1328 );
1329 }
1330 other => panic!("expected Handled, got: {}", other.is_ok()),
1331 }
1332 }
1333
1334 #[tokio::test]
1335 async fn test_handle_step_with_on_steps_propagate_falls_through() {
1336 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1337 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1338 async move { Ok(ex) }
1339 }));
1340 let dlc_called = Arc::new(AtomicU32::new(0));
1341 let dlc_called_clone = dlc_called.clone();
1342 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1343 let c = dlc_called_clone.clone();
1344 Box::pin(async move {
1345 c.fetch_add(1, Ordering::SeqCst);
1346 Ok(ex)
1347 })
1348 });
1349 let policy = ExceptionPolicy {
1350 matches: std::sync::Arc::new(|_| true),
1351 retry: None,
1352 handled_by: None,
1353 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1354 disposition: ExceptionDisposition::Propagate,
1355 };
1356 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1357 let mut ex = make_exchange();
1358 ex.set_error(CamelError::ProcessorError("boom".into()));
1359 let result = handler
1360 .handle_step(
1361 Some(PolicyId(0)),
1362 ex,
1363 CamelError::ProcessorError("boom".into()),
1364 )
1365 .await;
1366 assert!(
1367 matches!(result, Ok(StepDisposition::Propagate(_))),
1368 "Propagate disposition should return Propagate"
1369 );
1370 assert_eq!(
1371 dlc_called.load(Ordering::SeqCst),
1372 1,
1373 "DLC should be called when on_steps disposition is Propagate"
1374 );
1375 }
1376
1377 #[tokio::test]
1378 async fn test_handle_step_dlc_failure_propagates() {
1379 let failing_dlc = BoxProcessor::from_fn(|_| {
1380 Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1381 });
1382 let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1383 let ex = make_exchange();
1384 let result = handler
1385 .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1386 .await;
1387 assert!(
1388 matches!(result, Ok(StepDisposition::Propagate(_))),
1389 "DLC failure should still return Propagate with original error"
1390 );
1391 }
1392
1393 #[tokio::test]
1396 async fn test_handle_boundary_security_error_goes_to_dlc() {
1397 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1398 let count_clone = dlc_count.clone();
1399 let dlc = BoxProcessor::from_fn(move |ex| {
1400 let c = count_clone.clone();
1401 Box::pin(async move {
1402 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1403 Ok(ex)
1404 })
1405 });
1406 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1407 let ex = make_exchange();
1408 let result = handler
1409 .handle_boundary(
1410 BoundaryKind::Security,
1411 ex,
1412 CamelError::Unauthorized("denied".into()),
1413 )
1414 .await;
1415 assert!(result.is_ok());
1416 assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1417 }
1418
1419 #[tokio::test]
1420 async fn test_handle_boundary_handled_clears_error() {
1421 let policy = ExceptionPolicy {
1422 matches: std::sync::Arc::new(|_| true),
1423 retry: None,
1424 handled_by: None,
1425 on_steps: None,
1426 disposition: ExceptionDisposition::Handled,
1427 };
1428 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1429 let ex = make_exchange();
1430 let result = handler
1431 .handle_boundary(
1432 BoundaryKind::Security,
1433 ex,
1434 CamelError::Unauthorized("denied".into()),
1435 )
1436 .await;
1437 assert!(result.is_ok());
1438 assert!(
1439 !result.unwrap().has_error(),
1440 "Handled disposition should clear error"
1441 );
1442 }
1443
1444 #[tokio::test]
1445 async fn test_handle_boundary_propagate_preserves_error() {
1446 let policy = ExceptionPolicy {
1447 matches: std::sync::Arc::new(|_| true),
1448 retry: None,
1449 handled_by: None,
1450 on_steps: None,
1451 disposition: ExceptionDisposition::Propagate,
1452 };
1453 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1454 let ex = make_exchange();
1455 let result = handler
1456 .handle_boundary(
1457 BoundaryKind::CircuitBreaker,
1458 ex,
1459 CamelError::CircuitOpen("open".into()),
1460 )
1461 .await;
1462 assert!(result.is_ok(), "boundary errors always return Ok");
1463 assert!(
1464 result.unwrap().has_error(),
1465 "Propagate disposition should preserve error"
1466 );
1467 }
1468
1469 #[tokio::test]
1470 async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1471 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1472 let count_clone = dlc_count.clone();
1473 let dlc = BoxProcessor::from_fn(move |ex| {
1474 let c = count_clone.clone();
1475 Box::pin(async move {
1476 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1477 Ok(ex)
1478 })
1479 });
1480 let policy = ExceptionPolicy {
1481 matches: std::sync::Arc::new(|_| true),
1482 retry: None,
1483 handled_by: None,
1484 on_steps: None,
1485 disposition: ExceptionDisposition::Continued,
1486 };
1487 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1488 let ex = make_exchange();
1489 let result = handler
1490 .handle_boundary(
1491 BoundaryKind::Security,
1492 ex,
1493 CamelError::Unauthorized("denied".into()),
1494 )
1495 .await;
1496 assert!(result.is_ok(), "boundary errors always return Ok");
1497 assert!(
1498 result.unwrap().has_error(),
1499 "Continued at boundary should preserve error"
1500 );
1501 assert_eq!(
1502 dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1503 1,
1504 "DLC should be called"
1505 );
1506 }
1507
1508 #[tokio::test]
1509 async fn test_handle_boundary_with_on_steps_handled() {
1510 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1511 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1512 async move { Ok(ex) }
1513 }));
1514 let policy = ExceptionPolicy {
1515 matches: std::sync::Arc::new(|_| true),
1516 retry: None,
1517 handled_by: None,
1518 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1519 disposition: ExceptionDisposition::Handled,
1520 };
1521 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1522 let ex = make_exchange();
1523 let result = handler
1524 .handle_boundary(
1525 BoundaryKind::Security,
1526 ex,
1527 CamelError::Unauthorized("denied".into()),
1528 )
1529 .await;
1530 assert!(result.is_ok(), "boundary errors always return Ok");
1531 let ex = result.unwrap();
1532 assert!(!ex.has_error(), "Handled disposition should clear error");
1533 assert!(
1534 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1535 "on_steps should have modified the body"
1536 );
1537 }
1538
1539 #[tokio::test]
1540 async fn retry_step_segment_stop_maps_to_retry_outcome_stopped() {
1541 use std::sync::Arc;
1542 use std::sync::atomic::{AtomicUsize, Ordering};
1543
1544 #[derive(Clone)]
1545 struct StoppingSegment {
1546 n: Arc<AtomicUsize>,
1547 }
1548 impl OutcomePipeline for StoppingSegment {
1549 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
1550 Box::new(self.clone())
1551 }
1552 fn run<'a>(
1553 &'a mut self,
1554 ex: Exchange,
1555 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1556 let n = self.n.clone();
1557 Box::pin(async move {
1558 n.fetch_add(1, Ordering::SeqCst);
1559 PipelineOutcome::Stopped(ex)
1560 })
1561 }
1562 }
1563
1564 let call_count = Arc::new(AtomicUsize::new(0));
1565 let seg = OutcomeSegment::new(Box::new(StoppingSegment {
1566 n: call_count.clone(),
1567 }));
1568 let mut retryable: Box<dyn RetryableStep> = Box::new(seg);
1569
1570 let mut policy = ExceptionPolicy::new(|_e: &CamelError| true);
1571 policy.retry = Some(RedeliveryPolicy::new(3));
1572 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1573
1574 let original = Exchange::new(Message::new("retry-me"));
1575 let err = CamelError::ProcessorError("trigger retry".into());
1576 let outcome = handler
1577 .retry_step(Some(PolicyId(0)), retryable.as_mut(), original, err)
1578 .await;
1579
1580 assert!(
1581 matches!(outcome, RetryOutcome::Stopped(_)),
1582 "Segment Stop must map to RetryOutcome::Stopped, got {:?}",
1583 outcome
1584 );
1585 assert_eq!(
1586 call_count.load(Ordering::SeqCst),
1587 1,
1588 "Stop must short-circuit retry — only 1 invoke expected, got {}",
1589 call_count.load(Ordering::SeqCst)
1590 );
1591 }
1592
1593 #[tokio::test]
1594 async fn retry_step_new_signature_works_with_dlc_producer() {
1595 use std::sync::Arc;
1596 use std::sync::atomic::{AtomicUsize, Ordering};
1597
1598 #[derive(Clone)]
1599 struct CountingProducer {
1600 count: Arc<AtomicUsize>,
1601 succeed_on: usize,
1602 }
1603 impl tower::Service<Exchange> for CountingProducer {
1604 type Response = Exchange;
1605 type Error = CamelError;
1606 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1607 fn poll_ready(
1608 &mut self,
1609 _cx: &mut std::task::Context<'_>,
1610 ) -> std::task::Poll<Result<(), Self::Error>> {
1611 std::task::Poll::Ready(Ok(()))
1612 }
1613 fn call(&mut self, ex: Exchange) -> Self::Future {
1614 let n = self.count.fetch_add(1, Ordering::SeqCst);
1615 let succeed_on = self.succeed_on;
1616 Box::pin(async move {
1617 if n >= succeed_on {
1618 Ok(ex)
1619 } else {
1620 Err(CamelError::ProcessorError("retry".into()))
1621 }
1622 })
1623 }
1624 }
1625
1626 let count = Arc::new(AtomicUsize::new(0));
1627 let producer = CountingProducer {
1628 count: count.clone(),
1629 succeed_on: 2,
1630 };
1631 let sync_bp = SyncBoxProcessor::new(BoxProcessor::new(producer));
1632 let bp1 = sync_bp.clone_inner();
1633 let bp2 = sync_bp.clone_inner();
1634 let mut retryable1: Box<dyn RetryableStep> = Box::new(bp1);
1635 let mut retryable2: Box<dyn RetryableStep> = Box::new(bp2);
1636
1637 let ex = Exchange::new(Message::new("dlc"));
1638 let outcome1 = retryable1.invoke(ex.clone()).await;
1639 let outcome2 = retryable2.invoke(ex).await;
1640 assert!(matches!(outcome1, PipelineOutcome::Failed(_)));
1641 assert!(matches!(outcome2, PipelineOutcome::Failed(_)));
1642 assert_eq!(
1643 count.load(Ordering::SeqCst),
1644 2,
1645 "DLC producer must be invoked exactly twice through SyncBoxProcessor"
1646 );
1647 drop(retryable1);
1648 drop(retryable2);
1649 drop(sync_bp);
1650 }
1651}