1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9 HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10 StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15 original: Exchange,
16 original_err: CamelError,
17 on_steps: &SyncBoxProcessor,
18 disposition: ExceptionDisposition,
19 handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21 let snapshot = original.clone();
22 let mut ex = original;
23 ex.set_error(original_err.clone());
24 let mut pipeline = on_steps.clone_inner();
25 let step_result = async {
26 let svc = pipeline.ready().await?;
27 svc.call(ex).await
28 }
29 .await;
30
31 match step_result {
32 Ok(mut ex) => {
33 if disposition == ExceptionDisposition::Handled {
34 ex.handle_error();
35 Ok(ex)
36 } else {
37 Err(original_err)
40 }
41 }
42 Err(_) => {
43 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44 let mut ex = snapshot;
45 ex.set_error(original_err);
46 send_to_handler(ex, handler).await
47 }
48 }
49}
50
51pub async fn invoke_processor(
56 svc: &mut BoxProcessor,
57 ex: Exchange,
58) -> Result<Exchange, CamelError> {
59 match svc.ready().await {
60 Ok(ready) => ready.call(ex).await,
61 Err(err) => Err(err),
62 }
63}
64
/// Error-handling strategy for a route.
///
/// `DefaultRouteErrorHandler` in this file is the reference implementation;
/// the per-method notes below describe the contract as that implementation
/// realises it.
#[async_trait::async_trait]
pub trait RouteErrorHandler: Send + Sync {
    /// Returns the id of the policy that applies to `err`, or `None` when no
    /// policy matches.
    fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;

    /// Re-runs `step` against copies of `original` according to `policy`.
    ///
    /// `error` is the failure that triggered the retry. The outcome reports
    /// either the recovered exchange or that retries were exhausted.
    async fn retry_step(
        &self,
        policy: Option<PolicyId>,
        step: &mut BoxProcessor,
        original: Exchange,
        error: CamelError,
    ) -> RetryOutcome;

    /// Decides what happens to `exchange` after a step failed with `error`
    /// under the (already matched) `policy`.
    async fn handle_step(
        &self,
        policy: Option<PolicyId>,
        exchange: Exchange,
        error: CamelError,
    ) -> Result<StepDisposition, CamelError>;

    /// Handles a failure reported at a route boundary of the given `kind`.
    // NOTE(review): the exact meaning of each `BoundaryKind` is defined in
    // `camel_api`; the default implementation in this file ignores it.
    async fn handle_boundary(
        &self,
        kind: BoundaryKind,
        exchange: Exchange,
        error: CamelError,
    ) -> Result<Exchange, CamelError>;
}
99
100pub struct DefaultRouteErrorHandler {
106 pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107 pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111 pub fn new(
112 dlc_producer: Option<BoxProcessor>,
113 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114 ) -> Self {
115 Self {
116 dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117 policies: policies
118 .into_iter()
119 .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120 .collect(),
121 }
122 }
123
124 fn resolve_producer(
127 &self,
128 policy: Option<PolicyId>,
129 ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130 match policy {
131 Some(PolicyId(idx)) => match self.policies.get(idx) {
132 Some((p, prod)) => (
133 p.disposition,
134 prod.as_ref()
135 .map(|p| p.clone_inner())
136 .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137 ),
138 None => (
139 ExceptionDisposition::Propagate,
140 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141 ),
142 },
143 None => (
144 ExceptionDisposition::Propagate,
145 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146 ),
147 }
148 }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153 fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154 self.policies
155 .iter()
156 .position(|(p, _)| (p.matches)(err))
157 .map(PolicyId)
158 }
159
160 async fn retry_step(
161 &self,
162 policy: Option<PolicyId>,
163 step: &mut BoxProcessor,
164 original: Exchange,
165 error: CamelError,
166 ) -> RetryOutcome {
167 let Some(PolicyId(idx)) = policy else {
168 return RetryOutcome::Exhausted {
169 exchange: original,
170 error,
171 policy: None,
172 };
173 };
174 let Some((policy_def, _)) = self.policies.get(idx) else {
175 return RetryOutcome::Exhausted {
176 exchange: original,
177 error,
178 policy,
179 };
180 };
181 let Some(ref backoff) = policy_def.retry else {
182 return RetryOutcome::Exhausted {
183 exchange: original,
184 error,
185 policy,
186 };
187 };
188
189 for attempt in 0..backoff.max_attempts {
190 let delay = backoff.delay_for(attempt);
191 tokio::time::sleep(delay).await;
192
193 let mut ex = original.clone();
194 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195 ex.input.set_header(
196 HEADER_REDELIVERY_COUNTER,
197 Value::Number((attempt + 1).into()),
198 );
199 ex.input.set_header(
200 HEADER_REDELIVERY_MAX_COUNTER,
201 Value::Number(backoff.max_attempts.into()),
202 );
203
204 match invoke_processor(step, ex).await {
205 Ok(exchange) => return RetryOutcome::Recovered(exchange),
206 Err(retry_err) => {
207 if attempt + 1 == backoff.max_attempts {
208 let mut final_ex = original;
209 final_ex
210 .input
211 .set_header(HEADER_REDELIVERED, Value::Bool(true));
212 final_ex.input.set_header(
213 HEADER_REDELIVERY_COUNTER,
214 Value::Number(backoff.max_attempts.into()),
215 );
216 final_ex.input.set_header(
217 HEADER_REDELIVERY_MAX_COUNTER,
218 Value::Number(backoff.max_attempts.into()),
219 );
220 return RetryOutcome::Exhausted {
221 exchange: final_ex,
222 error: retry_err,
223 policy,
224 };
225 }
226 }
227 }
228 }
229
230 RetryOutcome::Exhausted {
231 exchange: original,
232 error,
233 policy,
234 }
235 }
236
237 async fn handle_step(
238 &self,
239 policy: Option<PolicyId>,
240 mut exchange: Exchange,
241 error: CamelError,
242 ) -> Result<StepDisposition, CamelError> {
243 let (disposition, producer) = self.resolve_producer(policy);
244
245 if let Some(PolicyId(idx)) = policy
247 && let Some((p, _)) = self.policies.get(idx)
248 && let Some(ref steps) = p.on_steps
249 {
250 let snapshot = exchange.clone();
251 exchange.set_error(error.clone());
252 let mut step_pipeline = steps.clone_inner();
253 let step_result = async {
254 let svc = step_pipeline.ready().await?;
255 svc.call(exchange).await
256 }
257 .await;
258 match step_result {
259 Ok(mut ex) => match disposition {
260 ExceptionDisposition::Handled => {
261 ex.handle_error();
262 return Ok(StepDisposition::Handled(ex));
263 }
264 ExceptionDisposition::Continued => {
265 ex.clear_error();
266 return Ok(StepDisposition::Continued(ex));
267 }
268 ExceptionDisposition::Propagate => {
269 exchange = snapshot;
270 }
271 },
272 Err(_) => {
273 exchange = snapshot;
274 }
275 }
276 }
277
278 exchange.set_error(error.clone());
281 match send_to_handler(exchange, producer).await {
282 Ok(handler_ex) => match disposition {
283 ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
284 ExceptionDisposition::Handled => {
285 let mut ex = handler_ex;
286 ex.clear_error();
287 Ok(StepDisposition::Handled(ex))
288 }
289 ExceptionDisposition::Continued => {
290 let mut ex = handler_ex;
291 ex.clear_error();
292 Ok(StepDisposition::Continued(ex))
293 }
294 },
295 Err(_) => Ok(StepDisposition::Propagate(error)),
297 }
298 }
299
300 async fn handle_boundary(
301 &self,
302 _kind: BoundaryKind,
303 mut exchange: Exchange,
304 error: CamelError,
305 ) -> Result<Exchange, CamelError> {
306 let policy = self.match_policy(&error);
312 let (disposition, producer) = self.resolve_producer(policy);
313
314 if let Some(PolicyId(idx)) = policy
316 && let Some((p, _)) = self.policies.get(idx)
317 && let Some(ref steps) = p.on_steps
318 {
319 let snapshot = exchange.clone();
320 exchange.set_error(error.clone());
321 let mut step_pipeline = steps.clone_inner();
322 let step_result = async {
323 let svc = step_pipeline.ready().await?;
324 svc.call(exchange).await
325 }
326 .await;
327 match step_result {
328 Ok(mut ex) => match disposition {
329 ExceptionDisposition::Handled => {
330 ex.handle_error();
331 return Ok(ex);
332 }
333 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
334 exchange = snapshot;
335 }
336 },
337 Err(_) => {
338 exchange = snapshot;
339 }
340 }
341 }
342
343 exchange.set_error(error.clone());
345 match send_to_handler(exchange, producer).await {
346 Ok(handler_ex) => match disposition {
347 ExceptionDisposition::Handled => {
348 let mut ex = handler_ex;
349 ex.clear_error();
350 Ok(ex)
351 }
352 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
353 let mut ex = handler_ex;
354 ex.set_error(error);
355 Ok(ex)
356 }
357 },
358 Err(e) => Err(e),
360 }
361 }
362}
363
364#[deprecated(
368 since = "0.16.0",
369 note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
370)]
371pub struct ErrorHandlerLayer {
372 dlc_producer: Option<BoxProcessor>,
374 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
376}
377
378#[allow(deprecated)]
379impl ErrorHandlerLayer {
380 pub fn new(
382 dlc_producer: Option<BoxProcessor>,
383 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
384 ) -> Self {
385 Self {
386 dlc_producer,
387 policies,
388 }
389 }
390}
391
392#[allow(deprecated)]
393impl<S> Layer<S> for ErrorHandlerLayer
394where
395 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
396 S::Future: Send + 'static,
397{
398 type Service = ErrorHandlerService<S>;
399
400 fn layer(&self, inner: S) -> Self::Service {
401 ErrorHandlerService {
402 inner,
403 dlc_producer: self.dlc_producer.clone(),
404 policies: self
405 .policies
406 .iter()
407 .map(|(p, prod)| (p.clone(), prod.clone()))
408 .collect(),
409 }
410 }
411}
412
413#[deprecated(
418 since = "0.16.0",
419 note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
420)]
421pub struct ErrorHandlerService<S> {
422 inner: S,
423 dlc_producer: Option<BoxProcessor>,
424 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
425}
426
427#[allow(deprecated)]
428impl<S: Clone> Clone for ErrorHandlerService<S> {
429 fn clone(&self) -> Self {
430 Self {
431 inner: self.inner.clone(),
432 dlc_producer: self.dlc_producer.clone(),
433 policies: self
434 .policies
435 .iter()
436 .map(|(p, prod)| (p.clone(), prod.clone()))
437 .collect(),
438 }
439 }
440}
441
442#[allow(deprecated)]
443impl<S> ErrorHandlerService<S>
444where
445 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
446 S::Future: Send + 'static,
447{
448 pub fn new(
450 inner: S,
451 dlc_producer: Option<BoxProcessor>,
452 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
453 ) -> Self {
454 Self {
455 inner,
456 dlc_producer,
457 policies,
458 }
459 }
460}
461
462#[allow(deprecated)]
463impl<S> Service<Exchange> for ErrorHandlerService<S>
464where
465 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
466 S::Future: Send + 'static,
467{
468 type Response = Exchange;
469 type Error = CamelError;
470 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
471
472 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
473 match self.inner.poll_ready(cx) {
478 Poll::Pending => Poll::Pending,
479 Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
480 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
481 }
482 }
483
484 fn call(&mut self, exchange: Exchange) -> Self::Future {
485 let mut inner = self.inner.clone();
486 let dlc = self.dlc_producer.clone();
487 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
488 .policies
489 .iter()
490 .map(|(p, prod)| (p.clone(), prod.clone()))
491 .collect();
492
493 Box::pin(async move {
494 let original = exchange.clone();
495 let result = match inner.ready().await {
496 Ok(svc) => svc.call(exchange).await,
497 Err(e) => Err(e), };
499
500 let err = match result {
501 Ok(ex) => return Ok(ex),
502 Err(e) => e,
503 };
504
505 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
507
508 if let Some((policy, policy_producer)) = matched {
509 if let Some(ref backoff) = policy.retry {
511 for attempt in 0..backoff.max_attempts {
512 let delay = backoff.delay_for(attempt);
513 tokio::time::sleep(delay).await;
514
515 let mut ex = original.clone();
517 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
518 ex.input.set_header(
519 HEADER_REDELIVERY_COUNTER,
520 Value::Number((attempt + 1).into()),
521 );
522 ex.input.set_header(
523 HEADER_REDELIVERY_MAX_COUNTER,
524 Value::Number(backoff.max_attempts.into()),
525 );
526
527 let result = match inner.ready().await {
528 Ok(svc) => svc.call(ex).await,
529 Err(e) => Err(e), };
531 match result {
532 Ok(ex) => return Ok(ex),
533 Err(retry_err) => {
534 if attempt + 1 == backoff.max_attempts {
535 let mut original = original.clone();
537 original
538 .input
539 .set_header(HEADER_REDELIVERED, Value::Bool(true));
540 original.input.set_header(
541 HEADER_REDELIVERY_COUNTER,
542 Value::Number(backoff.max_attempts.into()),
543 );
544 original.input.set_header(
545 HEADER_REDELIVERY_MAX_COUNTER,
546 Value::Number(backoff.max_attempts.into()),
547 );
548 if let Some(ref steps) = policy.on_steps {
549 let handler = policy_producer.clone().or(dlc.clone());
550 return execute_on_steps(
551 original,
552 retry_err,
553 steps,
554 policy.disposition,
555 handler,
556 )
557 .await;
558 }
559 original.set_error(retry_err);
560 let handler = policy_producer.or(dlc);
561 return send_to_handler(original, handler).await;
562 }
563 }
564 }
565 }
566 }
567 if let Some(ref steps) = policy.on_steps {
569 let handler = policy_producer.or(dlc);
570 return execute_on_steps(original, err, steps, policy.disposition, handler)
571 .await;
572 }
573 let mut ex = original.clone();
574 ex.set_error(err);
575 let handler = policy_producer.or(dlc);
576 send_to_handler(ex, handler).await
577 } else {
578 let mut ex = original;
580 ex.set_error(err);
581 send_to_handler(ex, dlc).await
582 }
583 })
584 }
585}
586
587async fn send_to_handler(
588 exchange: Exchange,
589 producer: Option<BoxProcessor>,
590) -> Result<Exchange, CamelError> {
591 match producer {
592 None => {
593 tracing::error!(
595 error = ?exchange.error,
596 "Exchange failed with no error handler configured"
597 );
598 Ok(exchange)
599 }
600 Some(mut prod) => match prod.ready().await {
601 Err(e) => {
602 tracing::error!("DLC/handler not ready: {e}");
604 Ok(exchange)
605 }
606 Ok(svc) => match svc.call(exchange.clone()).await {
607 Ok(ex) => Ok(ex),
608 Err(e) => {
609 tracing::error!("DLC/handler call failed: {e}");
611 Ok(exchange)
613 }
614 },
615 },
616 }
617}
618
619#[cfg(test)]
620#[allow(deprecated)]
621mod tests {
622 use super::*;
623 use camel_api::{
624 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
625 error_handler::RedeliveryPolicy,
626 };
627 use std::sync::{
628 Arc,
629 atomic::{AtomicU32, Ordering},
630 };
631 use std::time::Duration;
632 use tower::ServiceExt;
633
634 fn make_exchange() -> Exchange {
635 Exchange::new(Message::new("test"))
636 }
637
638 fn failing_processor() -> BoxProcessor {
639 BoxProcessor::from_fn(|_ex| {
640 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
641 })
642 }
643
644 fn ok_processor() -> BoxProcessor {
645 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
646 }
647
648 fn fail_n_times(n: u32) -> BoxProcessor {
649 let count = Arc::new(AtomicU32::new(0));
650 BoxProcessor::from_fn(move |ex| {
651 let count = Arc::clone(&count);
652 Box::pin(async move {
653 let c = count.fetch_add(1, Ordering::SeqCst);
654 if c < n {
655 Err(CamelError::ProcessorError(format!("attempt {c}")))
656 } else {
657 Ok(ex)
658 }
659 })
660 })
661 }
662
663 #[tokio::test]
664 async fn test_ok_passthrough() {
665 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
666 let result = svc.oneshot(make_exchange()).await;
667 assert!(result.is_ok());
668 assert!(!result.unwrap().has_error());
669 }
670
671 #[tokio::test]
672 async fn test_error_goes_to_dlc() {
673 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
674 let received_clone = Arc::clone(&received);
675 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
676 let r = Arc::clone(&received_clone);
677 Box::pin(async move {
678 r.lock().unwrap().push(ex.clone());
679 Ok(ex)
680 })
681 });
682
683 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
684 let result = svc.oneshot(make_exchange()).await;
685 assert!(result.is_ok());
686 let ex = result.unwrap();
687 assert!(ex.has_error());
688 assert_eq!(received.lock().unwrap().len(), 1);
689 }
690
691 #[tokio::test]
692 async fn test_retry_recovers() {
693 let inner = fail_n_times(2);
694 let policy = ExceptionPolicy {
695 matches: Arc::new(|_| true),
696 retry: Some(RedeliveryPolicy {
697 max_attempts: 3,
698 initial_delay: Duration::from_millis(1),
699 multiplier: 1.0,
700 max_delay: Duration::from_millis(10),
701 jitter_factor: 0.0,
702 }),
703 handled_by: None,
704 on_steps: None,
705 disposition: ExceptionDisposition::Propagate,
706 };
707 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
708 let result = svc.oneshot(make_exchange()).await;
709 assert!(result.is_ok());
710 assert!(!result.unwrap().has_error());
711 }
712
713 #[tokio::test]
714 async fn test_retry_exhausted_goes_to_dlc() {
715 let inner = fail_n_times(10);
716 let received = Arc::new(std::sync::Mutex::new(0u32));
717 let received_clone = Arc::clone(&received);
718 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
719 let r = Arc::clone(&received_clone);
720 Box::pin(async move {
721 *r.lock().unwrap() += 1;
722 Ok(ex)
723 })
724 });
725 let policy = ExceptionPolicy {
726 matches: Arc::new(|_| true),
727 retry: Some(RedeliveryPolicy {
728 max_attempts: 2,
729 initial_delay: Duration::from_millis(1),
730 multiplier: 1.0,
731 max_delay: Duration::from_millis(10),
732 jitter_factor: 0.0,
733 }),
734 handled_by: None,
735 on_steps: None,
736 disposition: ExceptionDisposition::Propagate,
737 };
738 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
739 let result = svc.oneshot(make_exchange()).await;
740 assert!(result.is_ok());
741 assert!(result.unwrap().has_error());
742 assert_eq!(*received.lock().unwrap(), 1);
743 }
744
745 #[test]
746 fn test_poll_ready_delegates_to_inner() {
747 use std::sync::atomic::AtomicBool;
748
749 #[derive(Clone)]
751 struct DelayedReadyService {
752 ready: Arc<AtomicBool>,
753 }
754
755 impl Service<Exchange> for DelayedReadyService {
756 type Response = Exchange;
757 type Error = CamelError;
758 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
759
760 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
761 if self.ready.fetch_or(true, Ordering::SeqCst) {
762 Poll::Ready(Ok(()))
764 } else {
765 cx.waker().wake_by_ref();
767 Poll::Pending
768 }
769 }
770
771 fn call(&mut self, ex: Exchange) -> Self::Future {
772 Box::pin(async move { Ok(ex) })
773 }
774 }
775
776 let waker = futures::task::noop_waker();
777 let mut cx = Context::from_waker(&waker);
778
779 let inner = DelayedReadyService {
780 ready: Arc::new(AtomicBool::new(false)),
781 };
782 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
783
784 let first = Pin::new(&mut svc).poll_ready(&mut cx);
786 assert!(first.is_pending(), "expected Pending on first poll_ready");
787
788 let second = Pin::new(&mut svc).poll_ready(&mut cx);
790 assert!(second.is_ready(), "expected Ready on second poll_ready");
791 }
792
793 #[tokio::test]
794 async fn test_no_matching_policy_uses_dlc() {
795 let received = Arc::new(std::sync::Mutex::new(0u32));
796 let received_clone = Arc::clone(&received);
797 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
798 let r = Arc::clone(&received_clone);
799 Box::pin(async move {
800 *r.lock().unwrap() += 1;
801 Ok(ex)
802 })
803 });
804 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
805 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
806 let result = svc.oneshot(make_exchange()).await;
807 assert!(result.is_ok());
808 assert_eq!(*received.lock().unwrap(), 1);
809 }
810
811 #[tokio::test]
812 async fn test_redelivery_headers_are_set() {
813 use camel_api::error_handler::{
814 HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
815 RedeliveryPolicy,
816 };
817
818 let inner = fail_n_times(10);
819 let received = Arc::new(std::sync::Mutex::new(None));
820 let received_clone = Arc::clone(&received);
821 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
822 let r = Arc::clone(&received_clone);
823 Box::pin(async move {
824 *r.lock().unwrap() = Some(ex.clone());
825 Ok(ex)
826 })
827 });
828
829 let policy = ExceptionPolicy {
830 matches: Arc::new(|_| true),
831 retry: Some(RedeliveryPolicy {
832 max_attempts: 2,
833 initial_delay: Duration::from_millis(1),
834 multiplier: 1.0,
835 max_delay: Duration::from_millis(10),
836 jitter_factor: 0.0,
837 }),
838 handled_by: None,
839 on_steps: None,
840 disposition: ExceptionDisposition::Propagate,
841 };
842
843 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
844 let _ = svc.oneshot(make_exchange()).await.unwrap();
845
846 let ex = received.lock().unwrap().take().unwrap();
847 assert_eq!(
848 ex.input.header(HEADER_REDELIVERED),
849 Some(&Value::Bool(true))
850 );
851 assert_eq!(
852 ex.input.header(HEADER_REDELIVERY_COUNTER),
853 Some(&Value::Number(2.into()))
854 );
855 assert_eq!(
856 ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
857 Some(&Value::Number(2.into()))
858 );
859 }
860
861 #[tokio::test]
862 async fn test_jitter_produces_varying_delays_in_retry_flow() {
863 use std::time::Instant;
864
865 let inner = fail_n_times(10);
866 let received = Arc::new(std::sync::Mutex::new(None));
867 let received_clone = Arc::clone(&received);
868 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
869 let r = Arc::clone(&received_clone);
870 Box::pin(async move {
871 *r.lock().unwrap() = Some(ex.clone());
872 Ok(ex)
873 })
874 });
875
876 let policy = ExceptionPolicy {
877 matches: Arc::new(|_| true),
878 retry: Some(RedeliveryPolicy {
879 max_attempts: 5,
880 initial_delay: Duration::from_millis(20),
881 multiplier: 1.0,
882 max_delay: Duration::from_millis(100),
883 jitter_factor: 0.5,
884 }),
885 handled_by: None,
886 on_steps: None,
887 disposition: ExceptionDisposition::Propagate,
888 };
889
890 let start = Instant::now();
891 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
892 let _ = svc.oneshot(make_exchange()).await.unwrap();
893 let elapsed = start.elapsed();
894
895 assert!(
896 received.lock().unwrap().is_some(),
897 "DLC should have received exchange"
898 );
899
900 assert!(
901 elapsed >= Duration::from_millis(50),
902 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
903 );
904
905 assert!(
906 elapsed <= Duration::from_millis(500),
907 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
908 );
909 }
910
911 #[tokio::test]
912 async fn test_on_steps_handled_true_consumes_error() {
913 use tower::ServiceExt;
914
915 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
916 ex.input.body = camel_api::Body::Bytes("handled".into());
917 async move { Ok(ex) }
918 }));
919 let policy = ExceptionPolicy {
920 matches: Arc::new(|_| true),
921 retry: None,
922 handled_by: None,
923 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
924 disposition: ExceptionDisposition::Handled,
925 };
926 let inner = tower::service_fn(|_ex: Exchange| async {
927 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
928 });
929 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
930 let ex = Exchange::default();
931 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
932 assert!(result.error.is_none(), "handled:true should clear error");
933 assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
934 }
935
936 #[tokio::test]
937 async fn test_on_steps_handled_false_propagates_error() {
938 use tower::ServiceExt;
939
940 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
941 ex.input.body = camel_api::Body::Bytes("handled".into());
942 async move { Ok(ex) }
943 }));
944 let policy = ExceptionPolicy {
945 matches: Arc::new(|_| true),
946 retry: None,
947 handled_by: None,
948 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
949 disposition: ExceptionDisposition::Propagate,
950 };
951 let inner = tower::service_fn(|_ex: Exchange| async {
952 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
953 });
954 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
955 let ex = Exchange::default();
956 let result = svc.ready().await.unwrap().call(ex).await;
957 assert!(result.is_err(), "handled:false should propagate error");
958 }
959
960 #[derive(Clone)]
968 struct ReadinessFailService {
969 error: CamelError,
970 }
971
972 impl ReadinessFailService {
973 fn new(error: CamelError) -> Self {
974 Self { error }
975 }
976 }
977
978 impl Service<Exchange> for ReadinessFailService {
979 type Response = Exchange;
980 type Error = CamelError;
981 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
982
983 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
984 Poll::Ready(Err(self.error.clone()))
985 }
986
987 fn call(&mut self, ex: Exchange) -> Self::Future {
988 Box::pin(async move { Ok(ex) })
991 }
992 }
993
994 #[tokio::test]
995 async fn test_readiness_error_goes_to_dlc() {
996 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
997 let inner = ReadinessFailService {
998 error: readiness_err,
999 };
1000
1001 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1002 let received_clone = Arc::clone(&received);
1003 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1004 let r = Arc::clone(&received_clone);
1005 Box::pin(async move {
1006 r.lock().unwrap().push(ex.clone());
1007 Ok(ex)
1008 })
1009 });
1010
1011 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1012 let result = svc.oneshot(make_exchange()).await;
1013
1014 assert!(
1016 result.is_ok(),
1017 "readiness error should be captured and sent to DLC, got: {:?}",
1018 result
1019 );
1020 let ex = result.unwrap();
1021 assert!(ex.has_error(), "exchange should carry the readiness error");
1022 assert_eq!(
1023 received.lock().unwrap().len(),
1024 1,
1025 "DLC should have received the exchange exactly once"
1026 );
1027 }
1028
1029 #[tokio::test]
1030 async fn test_readiness_error_goes_to_matching_policy() {
1031 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1032 let inner = ReadinessFailService {
1033 error: readiness_err,
1034 };
1035
1036 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1037 ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1038 async move { Ok(ex) }
1039 }));
1040 let policy = ExceptionPolicy {
1041 matches: Arc::new(|_| true),
1042 retry: None,
1043 handled_by: None,
1044 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1045 disposition: ExceptionDisposition::Handled,
1046 };
1047
1048 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1049 let result = svc.oneshot(make_exchange()).await;
1050
1051 assert!(
1053 result.is_ok(),
1054 "readiness error should be captured by policy, got: {:?}",
1055 result
1056 );
1057 let ex = result.unwrap();
1058 assert!(ex.error.is_none(), "handled:true should clear error");
1059 assert!(
1060 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1061 "on_steps should have modified the body"
1062 );
1063 }
1064
1065 #[test]
1066 fn test_poll_ready_converts_readiness_error_to_ok() {
1067 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1068 let inner = ReadinessFailService {
1069 error: readiness_err,
1070 };
1071 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1072
1073 let waker = futures::task::noop_waker();
1074 let mut cx = Context::from_waker(&waker);
1075
1076 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1078 match poll {
1079 Poll::Ready(Ok(())) => { }
1080 Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1081 Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1082 }
1083 }
1084
1085 #[tokio::test]
1088 async fn test_invoke_processor_returns_ok_on_success() {
1089 let mut svc = ok_processor();
1090 let ex = make_exchange();
1091 let result = invoke_processor(&mut svc, ex).await;
1092 assert!(result.is_ok());
1093 }
1094
1095 #[tokio::test]
1096 async fn test_invoke_processor_captures_readiness_error() {
1097 let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1098 CamelError::ProcessorError("not ready".into()),
1099 ));
1100 let ex = make_exchange();
1101 let result = invoke_processor(&mut failing_ready, ex).await;
1102 assert!(result.is_err());
1103 }
1104
1105 #[tokio::test]
1106 async fn test_on_steps_handled_true_clears_exception_properties() {
1107 use tower::ServiceExt;
1108
1109 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1110 ex.input.body = camel_api::Body::Bytes("handled".into());
1111 async move { Ok(ex) }
1112 }));
1113 let policy = ExceptionPolicy {
1114 matches: Arc::new(|_| true),
1115 retry: None,
1116 handled_by: None,
1117 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1118 disposition: ExceptionDisposition::Handled,
1119 };
1120 let inner = tower::service_fn(|_ex: Exchange| async {
1121 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1122 });
1123 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1124 let ex = Exchange::default();
1125 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1126 assert!(result.error.is_none(), "handled:true should clear error");
1127 assert!(
1128 result
1129 .properties
1130 .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1131 .is_none(),
1132 "handled:true should clear exception properties"
1133 );
1134 assert!(
1135 result
1136 .properties
1137 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1138 .is_none(),
1139 "handled:true should clear exception kind property"
1140 );
1141 assert!(
1142 result
1143 .properties
1144 .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1145 .is_none(),
1146 "handled:true should clear exception caught property"
1147 );
1148 }
1149
1150 #[test]
1153 fn test_match_policy_returns_id_for_matching_error() {
1154 let handler = DefaultRouteErrorHandler::new(
1155 None,
1156 vec![(
1157 ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1158 None,
1159 )],
1160 );
1161 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1162 assert_eq!(id, Some(PolicyId(0)));
1163 }
1164
1165 #[test]
1166 fn test_match_policy_returns_none_for_unmatched() {
1167 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1168 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1169 assert_eq!(id, None);
1170 }
1171
1172 #[tokio::test]
1175 async fn test_retry_step_succeeds_on_second_attempt() {
1176 let mut policy = ExceptionPolicy::new(|_| true);
1177 policy.retry = Some(RedeliveryPolicy::new(3));
1178 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1179 let mut step = fail_n_times(1); let ex = make_exchange();
1181 let outcome = handler
1182 .retry_step(
1183 Some(PolicyId(0)),
1184 &mut step,
1185 ex,
1186 CamelError::ProcessorError("attempt 0".into()),
1187 )
1188 .await;
1189 assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1190 }
1191
1192 #[tokio::test]
1193 async fn test_retry_step_exhausted_when_all_fail() {
1194 let mut policy = ExceptionPolicy::new(|_| true);
1195 policy.retry = Some(RedeliveryPolicy::new(3));
1196 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1197 let mut step = failing_processor();
1198 let ex = make_exchange();
1199 let outcome = handler
1200 .retry_step(
1201 Some(PolicyId(0)),
1202 &mut step,
1203 ex,
1204 CamelError::ProcessorError("boom".into()),
1205 )
1206 .await;
1207 assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1208 }
1209
1210 #[tokio::test]
1211 async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1212 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1213 let mut step = ok_processor();
1214 let ex = make_exchange();
1215 let outcome = handler
1216 .retry_step(
1217 None,
1218 &mut step,
1219 ex,
1220 CamelError::ProcessorError("boom".into()),
1221 )
1222 .await;
1223 assert!(matches!(
1224 outcome,
1225 RetryOutcome::Exhausted { policy: None, .. }
1226 ));
1227 }
1228
1229 #[tokio::test]
1232 async fn test_handle_step_propagate_sends_to_dlc() {
1233 let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1234 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1235 let ex = make_exchange();
1236 let result = handler
1237 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1238 .await;
1239 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1240 }
1241
1242 #[tokio::test]
1243 async fn test_handle_step_handled_uses_handler_output() {
1244 let handler_producer = BoxProcessor::from_fn(|mut ex| {
1245 Box::pin(async move {
1246 ex.input.set_header("processed_by", Value::Bool(true));
1247 Ok(ex)
1248 })
1249 });
1250 let policy = ExceptionPolicy {
1251 matches: std::sync::Arc::new(|_| true),
1252 retry: None,
1253 handled_by: None,
1254 on_steps: None,
1255 disposition: ExceptionDisposition::Handled,
1256 };
1257 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1258 let mut ex = make_exchange();
1259 ex.set_error(CamelError::ProcessorError("boom".into()));
1260 let result = handler
1261 .handle_step(
1262 Some(PolicyId(0)),
1263 ex,
1264 CamelError::ProcessorError("boom".into()),
1265 )
1266 .await;
1267 match result {
1268 Ok(StepDisposition::Handled(ex)) => {
1269 assert!(!ex.has_error(), "error should be cleared");
1270 assert_eq!(
1271 ex.input.header("processed_by"),
1272 Some(&Value::Bool(true)),
1273 "should use handler's output exchange"
1274 );
1275 }
1276 other => panic!("expected Handled, got {:?}", other.is_ok()),
1277 }
1278 }
1279
1280 #[tokio::test]
1281 async fn test_handle_step_continued_clears_error() {
1282 let policy = ExceptionPolicy {
1283 matches: std::sync::Arc::new(|_| true),
1284 retry: None,
1285 handled_by: None,
1286 on_steps: None,
1287 disposition: ExceptionDisposition::Continued,
1288 };
1289 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1290 let mut ex = make_exchange();
1291 ex.set_error(CamelError::ProcessorError("boom".into()));
1292 let result = handler
1293 .handle_step(
1294 Some(PolicyId(0)),
1295 ex,
1296 CamelError::ProcessorError("boom".into()),
1297 )
1298 .await;
1299 match result {
1300 Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1301 other => panic!("expected Continued, got {:?}", other.is_ok()),
1302 }
1303 }
1304
1305 #[tokio::test]
1306 async fn test_handle_step_with_on_steps_handled() {
1307 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1308 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1309 async move { Ok(ex) }
1310 }));
1311 let policy = ExceptionPolicy {
1312 matches: std::sync::Arc::new(|_| true),
1313 retry: None,
1314 handled_by: None,
1315 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1316 disposition: ExceptionDisposition::Handled,
1317 };
1318 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1319 let mut ex = make_exchange();
1320 ex.set_error(CamelError::ProcessorError("boom".into()));
1321 let result = handler
1322 .handle_step(
1323 Some(PolicyId(0)),
1324 ex,
1325 CamelError::ProcessorError("boom".into()),
1326 )
1327 .await;
1328 match result {
1329 Ok(StepDisposition::Handled(ex)) => {
1330 assert!(!ex.has_error(), "error should be cleared");
1331 assert!(
1332 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1333 "on_steps should have modified the body"
1334 );
1335 }
1336 other => panic!("expected Handled, got: {}", other.is_ok()),
1337 }
1338 }
1339
1340 #[tokio::test]
1341 async fn test_handle_step_with_on_steps_propagate_falls_through() {
1342 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1343 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1344 async move { Ok(ex) }
1345 }));
1346 let dlc_called = Arc::new(AtomicU32::new(0));
1347 let dlc_called_clone = dlc_called.clone();
1348 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1349 let c = dlc_called_clone.clone();
1350 Box::pin(async move {
1351 c.fetch_add(1, Ordering::SeqCst);
1352 Ok(ex)
1353 })
1354 });
1355 let policy = ExceptionPolicy {
1356 matches: std::sync::Arc::new(|_| true),
1357 retry: None,
1358 handled_by: None,
1359 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1360 disposition: ExceptionDisposition::Propagate,
1361 };
1362 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1363 let mut ex = make_exchange();
1364 ex.set_error(CamelError::ProcessorError("boom".into()));
1365 let result = handler
1366 .handle_step(
1367 Some(PolicyId(0)),
1368 ex,
1369 CamelError::ProcessorError("boom".into()),
1370 )
1371 .await;
1372 assert!(
1373 matches!(result, Ok(StepDisposition::Propagate(_))),
1374 "Propagate disposition should return Propagate"
1375 );
1376 assert_eq!(
1377 dlc_called.load(Ordering::SeqCst),
1378 1,
1379 "DLC should be called when on_steps disposition is Propagate"
1380 );
1381 }
1382
1383 #[tokio::test]
1384 async fn test_handle_step_dlc_failure_propagates() {
1385 let failing_dlc = BoxProcessor::from_fn(|_| {
1386 Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1387 });
1388 let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1389 let ex = make_exchange();
1390 let result = handler
1391 .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1392 .await;
1393 assert!(
1394 matches!(result, Ok(StepDisposition::Propagate(_))),
1395 "DLC failure should still return Propagate with original error"
1396 );
1397 }
1398
1399 #[tokio::test]
1402 async fn test_handle_boundary_security_error_goes_to_dlc() {
1403 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1404 let count_clone = dlc_count.clone();
1405 let dlc = BoxProcessor::from_fn(move |ex| {
1406 let c = count_clone.clone();
1407 Box::pin(async move {
1408 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1409 Ok(ex)
1410 })
1411 });
1412 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1413 let ex = make_exchange();
1414 let result = handler
1415 .handle_boundary(
1416 BoundaryKind::Security,
1417 ex,
1418 CamelError::Unauthorized("denied".into()),
1419 )
1420 .await;
1421 assert!(result.is_ok());
1422 assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1423 }
1424
1425 #[tokio::test]
1426 async fn test_handle_boundary_handled_clears_error() {
1427 let policy = ExceptionPolicy {
1428 matches: std::sync::Arc::new(|_| true),
1429 retry: None,
1430 handled_by: None,
1431 on_steps: None,
1432 disposition: ExceptionDisposition::Handled,
1433 };
1434 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1435 let ex = make_exchange();
1436 let result = handler
1437 .handle_boundary(
1438 BoundaryKind::Security,
1439 ex,
1440 CamelError::Unauthorized("denied".into()),
1441 )
1442 .await;
1443 assert!(result.is_ok());
1444 assert!(
1445 !result.unwrap().has_error(),
1446 "Handled disposition should clear error"
1447 );
1448 }
1449
1450 #[tokio::test]
1451 async fn test_handle_boundary_propagate_preserves_error() {
1452 let policy = ExceptionPolicy {
1453 matches: std::sync::Arc::new(|_| true),
1454 retry: None,
1455 handled_by: None,
1456 on_steps: None,
1457 disposition: ExceptionDisposition::Propagate,
1458 };
1459 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1460 let ex = make_exchange();
1461 let result = handler
1462 .handle_boundary(
1463 BoundaryKind::CircuitBreaker,
1464 ex,
1465 CamelError::CircuitOpen("open".into()),
1466 )
1467 .await;
1468 assert!(result.is_ok(), "boundary errors always return Ok");
1469 assert!(
1470 result.unwrap().has_error(),
1471 "Propagate disposition should preserve error"
1472 );
1473 }
1474
1475 #[tokio::test]
1476 async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1477 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1478 let count_clone = dlc_count.clone();
1479 let dlc = BoxProcessor::from_fn(move |ex| {
1480 let c = count_clone.clone();
1481 Box::pin(async move {
1482 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1483 Ok(ex)
1484 })
1485 });
1486 let policy = ExceptionPolicy {
1487 matches: std::sync::Arc::new(|_| true),
1488 retry: None,
1489 handled_by: None,
1490 on_steps: None,
1491 disposition: ExceptionDisposition::Continued,
1492 };
1493 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1494 let ex = make_exchange();
1495 let result = handler
1496 .handle_boundary(
1497 BoundaryKind::Security,
1498 ex,
1499 CamelError::Unauthorized("denied".into()),
1500 )
1501 .await;
1502 assert!(result.is_ok(), "boundary errors always return Ok");
1503 assert!(
1504 result.unwrap().has_error(),
1505 "Continued at boundary should preserve error"
1506 );
1507 assert_eq!(
1508 dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1509 1,
1510 "DLC should be called"
1511 );
1512 }
1513
1514 #[tokio::test]
1515 async fn test_handle_boundary_with_on_steps_handled() {
1516 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1517 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1518 async move { Ok(ex) }
1519 }));
1520 let policy = ExceptionPolicy {
1521 matches: std::sync::Arc::new(|_| true),
1522 retry: None,
1523 handled_by: None,
1524 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1525 disposition: ExceptionDisposition::Handled,
1526 };
1527 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1528 let ex = make_exchange();
1529 let result = handler
1530 .handle_boundary(
1531 BoundaryKind::Security,
1532 ex,
1533 CamelError::Unauthorized("denied".into()),
1534 )
1535 .await;
1536 assert!(result.is_ok(), "boundary errors always return Ok");
1537 let ex = result.unwrap();
1538 assert!(!ex.has_error(), "Handled disposition should clear error");
1539 assert!(
1540 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1541 "on_steps should have modified the body"
1542 );
1543 }
1544}