1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9 HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10 StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15 original: Exchange,
16 original_err: CamelError,
17 on_steps: &SyncBoxProcessor,
18 disposition: ExceptionDisposition,
19 handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21 let snapshot = original.clone();
22 let mut ex = original;
23 ex.set_error(original_err.clone());
24 let mut pipeline = on_steps.clone_inner();
25 let step_result = async {
26 let svc = pipeline.ready().await?;
27 svc.call(ex).await
28 }
29 .await;
30
31 match step_result {
32 Ok(mut ex) => {
33 if disposition == ExceptionDisposition::Handled {
34 ex.handle_error();
35 Ok(ex)
36 } else {
37 Err(original_err)
40 }
41 }
42 Err(_) => {
43 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
44 let mut ex = snapshot;
45 ex.set_error(original_err);
46 send_to_handler(ex, handler).await
47 }
48 }
49}
50
51pub async fn invoke_processor(
56 svc: &mut BoxProcessor,
57 ex: Exchange,
58) -> Result<Exchange, CamelError> {
59 match svc.ready().await {
60 Ok(ready) => ready.call(ex).await,
61 Err(err) => Err(err),
62 }
63}
64
65#[async_trait::async_trait]
70pub trait RouteErrorHandler: Send + Sync {
71 fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
73
74 async fn retry_step(
76 &self,
77 policy: Option<PolicyId>,
78 step: &mut BoxProcessor,
79 original: Exchange,
80 error: CamelError,
81 ) -> RetryOutcome;
82
83 async fn handle_step(
85 &self,
86 policy: Option<PolicyId>,
87 exchange: Exchange,
88 error: CamelError,
89 ) -> Result<StepDisposition, CamelError>;
90
91 async fn handle_boundary(
93 &self,
94 kind: BoundaryKind,
95 exchange: Exchange,
96 error: CamelError,
97 ) -> Result<Exchange, CamelError>;
98}
99
100pub struct DefaultRouteErrorHandler {
106 pub(crate) dlc_producer: Option<SyncBoxProcessor>,
107 pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
108}
109
110impl DefaultRouteErrorHandler {
111 pub fn new(
112 dlc_producer: Option<BoxProcessor>,
113 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
114 ) -> Self {
115 Self {
116 dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
117 policies: policies
118 .into_iter()
119 .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
120 .collect(),
121 }
122 }
123
124 fn resolve_producer(
127 &self,
128 policy: Option<PolicyId>,
129 ) -> (ExceptionDisposition, Option<BoxProcessor>) {
130 match policy {
131 Some(PolicyId(idx)) => match self.policies.get(idx) {
132 Some((p, prod)) => (
133 p.disposition,
134 prod.as_ref()
135 .map(|p| p.clone_inner())
136 .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
137 ),
138 None => (
139 ExceptionDisposition::Propagate,
140 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
141 ),
142 },
143 None => (
144 ExceptionDisposition::Propagate,
145 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
146 ),
147 }
148 }
149}
150
151#[async_trait::async_trait]
152impl RouteErrorHandler for DefaultRouteErrorHandler {
153 fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
154 self.policies
155 .iter()
156 .position(|(p, _)| (p.matches)(err))
157 .map(PolicyId)
158 }
159
160 async fn retry_step(
161 &self,
162 policy: Option<PolicyId>,
163 step: &mut BoxProcessor,
164 original: Exchange,
165 error: CamelError,
166 ) -> RetryOutcome {
167 let Some(PolicyId(idx)) = policy else {
168 return RetryOutcome::Exhausted {
169 exchange: original,
170 error,
171 policy: None,
172 };
173 };
174 let Some((policy_def, _)) = self.policies.get(idx) else {
175 return RetryOutcome::Exhausted {
176 exchange: original,
177 error,
178 policy,
179 };
180 };
181 let Some(ref backoff) = policy_def.retry else {
182 return RetryOutcome::Exhausted {
183 exchange: original,
184 error,
185 policy,
186 };
187 };
188
189 for attempt in 0..backoff.max_attempts {
190 let delay = backoff.delay_for(attempt);
191 tokio::time::sleep(delay).await;
192
193 let mut ex = original.clone();
194 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
195 ex.input.set_header(
196 HEADER_REDELIVERY_COUNTER,
197 Value::Number((attempt + 1).into()),
198 );
199 ex.input.set_header(
200 HEADER_REDELIVERY_MAX_COUNTER,
201 Value::Number(backoff.max_attempts.into()),
202 );
203
204 match invoke_processor(step, ex).await {
205 Ok(exchange) => return RetryOutcome::Recovered(exchange),
206 Err(retry_err) => {
207 if attempt + 1 == backoff.max_attempts {
208 let mut final_ex = original;
209 final_ex
210 .input
211 .set_header(HEADER_REDELIVERED, Value::Bool(true));
212 final_ex.input.set_header(
213 HEADER_REDELIVERY_COUNTER,
214 Value::Number(backoff.max_attempts.into()),
215 );
216 final_ex.input.set_header(
217 HEADER_REDELIVERY_MAX_COUNTER,
218 Value::Number(backoff.max_attempts.into()),
219 );
220 return RetryOutcome::Exhausted {
221 exchange: final_ex,
222 error: retry_err,
223 policy,
224 };
225 }
226 }
227 }
228 }
229
230 RetryOutcome::Exhausted {
231 exchange: original,
232 error,
233 policy,
234 }
235 }
236
237 async fn handle_step(
238 &self,
239 policy: Option<PolicyId>,
240 mut exchange: Exchange,
241 error: CamelError,
242 ) -> Result<StepDisposition, CamelError> {
243 if matches!(error, CamelError::Stopped) {
244 return Ok(StepDisposition::Propagate(error));
245 }
246
247 let (disposition, producer) = self.resolve_producer(policy);
248
249 if let Some(PolicyId(idx)) = policy
251 && let Some((p, _)) = self.policies.get(idx)
252 && let Some(ref steps) = p.on_steps
253 {
254 let snapshot = exchange.clone();
255 exchange.set_error(error.clone());
256 let mut step_pipeline = steps.clone_inner();
257 let step_result = async {
258 let svc = step_pipeline.ready().await?;
259 svc.call(exchange).await
260 }
261 .await;
262 match step_result {
263 Ok(mut ex) => match disposition {
264 ExceptionDisposition::Handled => {
265 ex.handle_error();
266 return Ok(StepDisposition::Handled(ex));
267 }
268 ExceptionDisposition::Continued => {
269 ex.clear_error();
270 return Ok(StepDisposition::Continued(ex));
271 }
272 ExceptionDisposition::Propagate => {
273 exchange = snapshot;
274 }
275 },
276 Err(_) => {
277 exchange = snapshot;
278 }
279 }
280 }
281
282 exchange.set_error(error.clone());
285 match send_to_handler(exchange, producer).await {
286 Ok(handler_ex) => match disposition {
287 ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
288 ExceptionDisposition::Handled => {
289 let mut ex = handler_ex;
290 ex.clear_error();
291 Ok(StepDisposition::Handled(ex))
292 }
293 ExceptionDisposition::Continued => {
294 let mut ex = handler_ex;
295 ex.clear_error();
296 Ok(StepDisposition::Continued(ex))
297 }
298 },
299 Err(_) => Ok(StepDisposition::Propagate(error)),
301 }
302 }
303
304 async fn handle_boundary(
305 &self,
306 _kind: BoundaryKind,
307 mut exchange: Exchange,
308 error: CamelError,
309 ) -> Result<Exchange, CamelError> {
310 let policy = self.match_policy(&error);
316 let (disposition, producer) = self.resolve_producer(policy);
317
318 if let Some(PolicyId(idx)) = policy
320 && let Some((p, _)) = self.policies.get(idx)
321 && let Some(ref steps) = p.on_steps
322 {
323 let snapshot = exchange.clone();
324 exchange.set_error(error.clone());
325 let mut step_pipeline = steps.clone_inner();
326 let step_result = async {
327 let svc = step_pipeline.ready().await?;
328 svc.call(exchange).await
329 }
330 .await;
331 match step_result {
332 Ok(mut ex) => match disposition {
333 ExceptionDisposition::Handled => {
334 ex.handle_error();
335 return Ok(ex);
336 }
337 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
338 exchange = snapshot;
339 }
340 },
341 Err(_) => {
342 exchange = snapshot;
343 }
344 }
345 }
346
347 exchange.set_error(error.clone());
349 match send_to_handler(exchange, producer).await {
350 Ok(handler_ex) => match disposition {
351 ExceptionDisposition::Handled => {
352 let mut ex = handler_ex;
353 ex.clear_error();
354 Ok(ex)
355 }
356 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
357 let mut ex = handler_ex;
358 ex.set_error(error);
359 Ok(ex)
360 }
361 },
362 Err(e) => Err(e),
364 }
365 }
366}
367
368#[deprecated(
372 since = "0.16.0",
373 note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
374)]
375pub struct ErrorHandlerLayer {
376 dlc_producer: Option<BoxProcessor>,
378 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
380}
381
382#[allow(deprecated)]
383impl ErrorHandlerLayer {
384 pub fn new(
386 dlc_producer: Option<BoxProcessor>,
387 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
388 ) -> Self {
389 Self {
390 dlc_producer,
391 policies,
392 }
393 }
394}
395
396#[allow(deprecated)]
397impl<S> Layer<S> for ErrorHandlerLayer
398where
399 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
400 S::Future: Send + 'static,
401{
402 type Service = ErrorHandlerService<S>;
403
404 fn layer(&self, inner: S) -> Self::Service {
405 ErrorHandlerService {
406 inner,
407 dlc_producer: self.dlc_producer.clone(),
408 policies: self
409 .policies
410 .iter()
411 .map(|(p, prod)| (p.clone(), prod.clone()))
412 .collect(),
413 }
414 }
415}
416
417#[deprecated(
422 since = "0.16.0",
423 note = "Use RouteChannelService + DefaultRouteErrorHandler instead"
424)]
425pub struct ErrorHandlerService<S> {
426 inner: S,
427 dlc_producer: Option<BoxProcessor>,
428 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
429}
430
431#[allow(deprecated)]
432impl<S: Clone> Clone for ErrorHandlerService<S> {
433 fn clone(&self) -> Self {
434 Self {
435 inner: self.inner.clone(),
436 dlc_producer: self.dlc_producer.clone(),
437 policies: self
438 .policies
439 .iter()
440 .map(|(p, prod)| (p.clone(), prod.clone()))
441 .collect(),
442 }
443 }
444}
445
446#[allow(deprecated)]
447impl<S> ErrorHandlerService<S>
448where
449 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
450 S::Future: Send + 'static,
451{
452 pub fn new(
454 inner: S,
455 dlc_producer: Option<BoxProcessor>,
456 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
457 ) -> Self {
458 Self {
459 inner,
460 dlc_producer,
461 policies,
462 }
463 }
464}
465
466#[allow(deprecated)]
467impl<S> Service<Exchange> for ErrorHandlerService<S>
468where
469 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
470 S::Future: Send + 'static,
471{
472 type Response = Exchange;
473 type Error = CamelError;
474 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
475
476 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
477 match self.inner.poll_ready(cx) {
482 Poll::Pending => Poll::Pending,
483 Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
484 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
485 }
486 }
487
488 fn call(&mut self, exchange: Exchange) -> Self::Future {
489 let mut inner = self.inner.clone();
490 let dlc = self.dlc_producer.clone();
491 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
492 .policies
493 .iter()
494 .map(|(p, prod)| (p.clone(), prod.clone()))
495 .collect();
496
497 Box::pin(async move {
498 let original = exchange.clone();
499 let result = match inner.ready().await {
500 Ok(svc) => svc.call(exchange).await,
501 Err(e) => Err(e), };
503
504 let err = match result {
505 Ok(ex) => return Ok(ex),
506 Err(e) => e,
507 };
508
509 if matches!(err, CamelError::Stopped) {
511 return Err(err);
512 }
513
514 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
516
517 if let Some((policy, policy_producer)) = matched {
518 if let Some(ref backoff) = policy.retry {
520 for attempt in 0..backoff.max_attempts {
521 let delay = backoff.delay_for(attempt);
522 tokio::time::sleep(delay).await;
523
524 let mut ex = original.clone();
526 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
527 ex.input.set_header(
528 HEADER_REDELIVERY_COUNTER,
529 Value::Number((attempt + 1).into()),
530 );
531 ex.input.set_header(
532 HEADER_REDELIVERY_MAX_COUNTER,
533 Value::Number(backoff.max_attempts.into()),
534 );
535
536 let result = match inner.ready().await {
537 Ok(svc) => svc.call(ex).await,
538 Err(e) => Err(e), };
540 match result {
541 Ok(ex) => return Ok(ex),
542 Err(retry_err) => {
543 if attempt + 1 == backoff.max_attempts {
544 let mut original = original.clone();
546 original
547 .input
548 .set_header(HEADER_REDELIVERED, Value::Bool(true));
549 original.input.set_header(
550 HEADER_REDELIVERY_COUNTER,
551 Value::Number(backoff.max_attempts.into()),
552 );
553 original.input.set_header(
554 HEADER_REDELIVERY_MAX_COUNTER,
555 Value::Number(backoff.max_attempts.into()),
556 );
557 if let Some(ref steps) = policy.on_steps {
558 let handler = policy_producer.clone().or(dlc.clone());
559 return execute_on_steps(
560 original,
561 retry_err,
562 steps,
563 policy.disposition,
564 handler,
565 )
566 .await;
567 }
568 original.set_error(retry_err);
569 let handler = policy_producer.or(dlc);
570 return send_to_handler(original, handler).await;
571 }
572 }
573 }
574 }
575 }
576 if let Some(ref steps) = policy.on_steps {
578 let handler = policy_producer.or(dlc);
579 return execute_on_steps(original, err, steps, policy.disposition, handler)
580 .await;
581 }
582 let mut ex = original.clone();
583 ex.set_error(err);
584 let handler = policy_producer.or(dlc);
585 send_to_handler(ex, handler).await
586 } else {
587 let mut ex = original;
589 ex.set_error(err);
590 send_to_handler(ex, dlc).await
591 }
592 })
593 }
594}
595
596async fn send_to_handler(
597 exchange: Exchange,
598 producer: Option<BoxProcessor>,
599) -> Result<Exchange, CamelError> {
600 match producer {
601 None => {
602 tracing::error!(
604 error = ?exchange.error,
605 "Exchange failed with no error handler configured"
606 );
607 Ok(exchange)
608 }
609 Some(mut prod) => match prod.ready().await {
610 Err(e) => {
611 tracing::error!("DLC/handler not ready: {e}");
613 Ok(exchange)
614 }
615 Ok(svc) => match svc.call(exchange.clone()).await {
616 Ok(ex) => Ok(ex),
617 Err(e) => {
618 tracing::error!("DLC/handler call failed: {e}");
620 Ok(exchange)
622 }
623 },
624 },
625 }
626}
627
628#[cfg(test)]
629#[allow(deprecated)]
630mod tests {
631 use super::*;
632 use camel_api::{
633 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, SyncBoxProcessor, Value,
634 error_handler::RedeliveryPolicy,
635 };
636 use std::sync::{
637 Arc,
638 atomic::{AtomicU32, Ordering},
639 };
640 use std::time::Duration;
641 use tower::ServiceExt;
642
643 fn make_exchange() -> Exchange {
644 Exchange::new(Message::new("test"))
645 }
646
647 fn failing_processor() -> BoxProcessor {
648 BoxProcessor::from_fn(|_ex| {
649 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
650 })
651 }
652
653 fn ok_processor() -> BoxProcessor {
654 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
655 }
656
657 fn fail_n_times(n: u32) -> BoxProcessor {
658 let count = Arc::new(AtomicU32::new(0));
659 BoxProcessor::from_fn(move |ex| {
660 let count = Arc::clone(&count);
661 Box::pin(async move {
662 let c = count.fetch_add(1, Ordering::SeqCst);
663 if c < n {
664 Err(CamelError::ProcessorError(format!("attempt {c}")))
665 } else {
666 Ok(ex)
667 }
668 })
669 })
670 }
671
672 #[tokio::test]
673 async fn test_ok_passthrough() {
674 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
675 let result = svc.oneshot(make_exchange()).await;
676 assert!(result.is_ok());
677 assert!(!result.unwrap().has_error());
678 }
679
680 #[tokio::test]
681 async fn test_error_goes_to_dlc() {
682 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
683 let received_clone = Arc::clone(&received);
684 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
685 let r = Arc::clone(&received_clone);
686 Box::pin(async move {
687 r.lock().unwrap().push(ex.clone());
688 Ok(ex)
689 })
690 });
691
692 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
693 let result = svc.oneshot(make_exchange()).await;
694 assert!(result.is_ok());
695 let ex = result.unwrap();
696 assert!(ex.has_error());
697 assert_eq!(received.lock().unwrap().len(), 1);
698 }
699
700 #[tokio::test]
701 async fn test_retry_recovers() {
702 let inner = fail_n_times(2);
703 let policy = ExceptionPolicy {
704 matches: Arc::new(|_| true),
705 retry: Some(RedeliveryPolicy {
706 max_attempts: 3,
707 initial_delay: Duration::from_millis(1),
708 multiplier: 1.0,
709 max_delay: Duration::from_millis(10),
710 jitter_factor: 0.0,
711 }),
712 handled_by: None,
713 on_steps: None,
714 disposition: ExceptionDisposition::Propagate,
715 };
716 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
717 let result = svc.oneshot(make_exchange()).await;
718 assert!(result.is_ok());
719 assert!(!result.unwrap().has_error());
720 }
721
722 #[tokio::test]
723 async fn test_retry_exhausted_goes_to_dlc() {
724 let inner = fail_n_times(10);
725 let received = Arc::new(std::sync::Mutex::new(0u32));
726 let received_clone = Arc::clone(&received);
727 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
728 let r = Arc::clone(&received_clone);
729 Box::pin(async move {
730 *r.lock().unwrap() += 1;
731 Ok(ex)
732 })
733 });
734 let policy = ExceptionPolicy {
735 matches: Arc::new(|_| true),
736 retry: Some(RedeliveryPolicy {
737 max_attempts: 2,
738 initial_delay: Duration::from_millis(1),
739 multiplier: 1.0,
740 max_delay: Duration::from_millis(10),
741 jitter_factor: 0.0,
742 }),
743 handled_by: None,
744 on_steps: None,
745 disposition: ExceptionDisposition::Propagate,
746 };
747 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
748 let result = svc.oneshot(make_exchange()).await;
749 assert!(result.is_ok());
750 assert!(result.unwrap().has_error());
751 assert_eq!(*received.lock().unwrap(), 1);
752 }
753
754 #[test]
755 fn test_poll_ready_delegates_to_inner() {
756 use std::sync::atomic::AtomicBool;
757
758 #[derive(Clone)]
760 struct DelayedReadyService {
761 ready: Arc<AtomicBool>,
762 }
763
764 impl Service<Exchange> for DelayedReadyService {
765 type Response = Exchange;
766 type Error = CamelError;
767 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
768
769 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
770 if self.ready.fetch_or(true, Ordering::SeqCst) {
771 Poll::Ready(Ok(()))
773 } else {
774 cx.waker().wake_by_ref();
776 Poll::Pending
777 }
778 }
779
780 fn call(&mut self, ex: Exchange) -> Self::Future {
781 Box::pin(async move { Ok(ex) })
782 }
783 }
784
785 let waker = futures::task::noop_waker();
786 let mut cx = Context::from_waker(&waker);
787
788 let inner = DelayedReadyService {
789 ready: Arc::new(AtomicBool::new(false)),
790 };
791 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
792
793 let first = Pin::new(&mut svc).poll_ready(&mut cx);
795 assert!(first.is_pending(), "expected Pending on first poll_ready");
796
797 let second = Pin::new(&mut svc).poll_ready(&mut cx);
799 assert!(second.is_ready(), "expected Ready on second poll_ready");
800 }
801
802 #[tokio::test]
803 async fn test_no_matching_policy_uses_dlc() {
804 let received = Arc::new(std::sync::Mutex::new(0u32));
805 let received_clone = Arc::clone(&received);
806 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
807 let r = Arc::clone(&received_clone);
808 Box::pin(async move {
809 *r.lock().unwrap() += 1;
810 Ok(ex)
811 })
812 });
813 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
814 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
815 let result = svc.oneshot(make_exchange()).await;
816 assert!(result.is_ok());
817 assert_eq!(*received.lock().unwrap(), 1);
818 }
819
820 #[tokio::test]
821 async fn test_redelivery_headers_are_set() {
822 use camel_api::error_handler::{
823 HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
824 RedeliveryPolicy,
825 };
826
827 let inner = fail_n_times(10);
828 let received = Arc::new(std::sync::Mutex::new(None));
829 let received_clone = Arc::clone(&received);
830 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
831 let r = Arc::clone(&received_clone);
832 Box::pin(async move {
833 *r.lock().unwrap() = Some(ex.clone());
834 Ok(ex)
835 })
836 });
837
838 let policy = ExceptionPolicy {
839 matches: Arc::new(|_| true),
840 retry: Some(RedeliveryPolicy {
841 max_attempts: 2,
842 initial_delay: Duration::from_millis(1),
843 multiplier: 1.0,
844 max_delay: Duration::from_millis(10),
845 jitter_factor: 0.0,
846 }),
847 handled_by: None,
848 on_steps: None,
849 disposition: ExceptionDisposition::Propagate,
850 };
851
852 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
853 let _ = svc.oneshot(make_exchange()).await.unwrap();
854
855 let ex = received.lock().unwrap().take().unwrap();
856 assert_eq!(
857 ex.input.header(HEADER_REDELIVERED),
858 Some(&Value::Bool(true))
859 );
860 assert_eq!(
861 ex.input.header(HEADER_REDELIVERY_COUNTER),
862 Some(&Value::Number(2.into()))
863 );
864 assert_eq!(
865 ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
866 Some(&Value::Number(2.into()))
867 );
868 }
869
870 #[tokio::test]
871 async fn test_jitter_produces_varying_delays_in_retry_flow() {
872 use std::time::Instant;
873
874 let inner = fail_n_times(10);
875 let received = Arc::new(std::sync::Mutex::new(None));
876 let received_clone = Arc::clone(&received);
877 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
878 let r = Arc::clone(&received_clone);
879 Box::pin(async move {
880 *r.lock().unwrap() = Some(ex.clone());
881 Ok(ex)
882 })
883 });
884
885 let policy = ExceptionPolicy {
886 matches: Arc::new(|_| true),
887 retry: Some(RedeliveryPolicy {
888 max_attempts: 5,
889 initial_delay: Duration::from_millis(20),
890 multiplier: 1.0,
891 max_delay: Duration::from_millis(100),
892 jitter_factor: 0.5,
893 }),
894 handled_by: None,
895 on_steps: None,
896 disposition: ExceptionDisposition::Propagate,
897 };
898
899 let start = Instant::now();
900 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
901 let _ = svc.oneshot(make_exchange()).await.unwrap();
902 let elapsed = start.elapsed();
903
904 assert!(
905 received.lock().unwrap().is_some(),
906 "DLC should have received exchange"
907 );
908
909 assert!(
910 elapsed >= Duration::from_millis(50),
911 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
912 );
913
914 assert!(
915 elapsed <= Duration::from_millis(500),
916 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
917 );
918 }
919
920 #[tokio::test]
923 async fn test_stopped_bypasses_error_handler() {
924 let stopped_inner =
925 BoxProcessor::from_fn(|_ex| Box::pin(async { Err(CamelError::Stopped) }));
926
927 let dlc_called = Arc::new(std::sync::atomic::AtomicBool::new(false));
929 let dlc_called_clone = Arc::clone(&dlc_called);
930 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
931 dlc_called_clone.store(true, std::sync::atomic::Ordering::SeqCst);
932 Box::pin(async move { Ok(ex) })
933 });
934
935 let policy = ExceptionPolicy::new(|_| true); let svc = ErrorHandlerService::new(stopped_inner, Some(dlc), vec![(policy, None)]);
937 let result = svc.oneshot(make_exchange()).await;
938
939 assert!(
941 matches!(result, Err(CamelError::Stopped)),
942 "expected Err(Stopped), got: {:?}",
943 result
944 );
945 assert!(
947 !dlc_called.load(std::sync::atomic::Ordering::SeqCst),
948 "DLC should not be called for Stopped"
949 );
950 }
951
952 #[tokio::test]
953 async fn test_on_steps_handled_true_consumes_error() {
954 use tower::ServiceExt;
955
956 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
957 ex.input.body = camel_api::Body::Bytes("handled".into());
958 async move { Ok(ex) }
959 }));
960 let policy = ExceptionPolicy {
961 matches: Arc::new(|_| true),
962 retry: None,
963 handled_by: None,
964 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
965 disposition: ExceptionDisposition::Handled,
966 };
967 let inner = tower::service_fn(|_ex: Exchange| async {
968 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
969 });
970 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
971 let ex = Exchange::default();
972 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
973 assert!(result.error.is_none(), "handled:true should clear error");
974 assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
975 }
976
977 #[tokio::test]
978 async fn test_on_steps_handled_false_propagates_error() {
979 use tower::ServiceExt;
980
981 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
982 ex.input.body = camel_api::Body::Bytes("handled".into());
983 async move { Ok(ex) }
984 }));
985 let policy = ExceptionPolicy {
986 matches: Arc::new(|_| true),
987 retry: None,
988 handled_by: None,
989 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
990 disposition: ExceptionDisposition::Propagate,
991 };
992 let inner = tower::service_fn(|_ex: Exchange| async {
993 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
994 });
995 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
996 let ex = Exchange::default();
997 let result = svc.ready().await.unwrap().call(ex).await;
998 assert!(result.is_err(), "handled:false should propagate error");
999 }
1000
1001 #[derive(Clone)]
1009 struct ReadinessFailService {
1010 error: CamelError,
1011 }
1012
1013 impl ReadinessFailService {
1014 fn new(error: CamelError) -> Self {
1015 Self { error }
1016 }
1017 }
1018
1019 impl Service<Exchange> for ReadinessFailService {
1020 type Response = Exchange;
1021 type Error = CamelError;
1022 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1023
1024 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1025 Poll::Ready(Err(self.error.clone()))
1026 }
1027
1028 fn call(&mut self, ex: Exchange) -> Self::Future {
1029 Box::pin(async move { Ok(ex) })
1032 }
1033 }
1034
1035 #[tokio::test]
1036 async fn test_readiness_error_goes_to_dlc() {
1037 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1038 let inner = ReadinessFailService {
1039 error: readiness_err,
1040 };
1041
1042 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1043 let received_clone = Arc::clone(&received);
1044 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1045 let r = Arc::clone(&received_clone);
1046 Box::pin(async move {
1047 r.lock().unwrap().push(ex.clone());
1048 Ok(ex)
1049 })
1050 });
1051
1052 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1053 let result = svc.oneshot(make_exchange()).await;
1054
1055 assert!(
1057 result.is_ok(),
1058 "readiness error should be captured and sent to DLC, got: {:?}",
1059 result
1060 );
1061 let ex = result.unwrap();
1062 assert!(ex.has_error(), "exchange should carry the readiness error");
1063 assert_eq!(
1064 received.lock().unwrap().len(),
1065 1,
1066 "DLC should have received the exchange exactly once"
1067 );
1068 }
1069
1070 #[tokio::test]
1071 async fn test_readiness_error_goes_to_matching_policy() {
1072 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1073 let inner = ReadinessFailService {
1074 error: readiness_err,
1075 };
1076
1077 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1078 ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1079 async move { Ok(ex) }
1080 }));
1081 let policy = ExceptionPolicy {
1082 matches: Arc::new(|_| true),
1083 retry: None,
1084 handled_by: None,
1085 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1086 disposition: ExceptionDisposition::Handled,
1087 };
1088
1089 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1090 let result = svc.oneshot(make_exchange()).await;
1091
1092 assert!(
1094 result.is_ok(),
1095 "readiness error should be captured by policy, got: {:?}",
1096 result
1097 );
1098 let ex = result.unwrap();
1099 assert!(ex.error.is_none(), "handled:true should clear error");
1100 assert!(
1101 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1102 "on_steps should have modified the body"
1103 );
1104 }
1105
1106 #[test]
1107 fn test_poll_ready_converts_readiness_error_to_ok() {
1108 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1109 let inner = ReadinessFailService {
1110 error: readiness_err,
1111 };
1112 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1113
1114 let waker = futures::task::noop_waker();
1115 let mut cx = Context::from_waker(&waker);
1116
1117 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1119 match poll {
1120 Poll::Ready(Ok(())) => { }
1121 Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1122 Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1123 }
1124 }
1125
1126 #[tokio::test]
1129 async fn test_invoke_processor_returns_ok_on_success() {
1130 let mut svc = ok_processor();
1131 let ex = make_exchange();
1132 let result = invoke_processor(&mut svc, ex).await;
1133 assert!(result.is_ok());
1134 }
1135
1136 #[tokio::test]
1137 async fn test_invoke_processor_captures_readiness_error() {
1138 let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1139 CamelError::ProcessorError("not ready".into()),
1140 ));
1141 let ex = make_exchange();
1142 let result = invoke_processor(&mut failing_ready, ex).await;
1143 assert!(result.is_err());
1144 }
1145
1146 #[tokio::test]
1147 async fn test_on_steps_handled_true_clears_exception_properties() {
1148 use tower::ServiceExt;
1149
1150 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1151 ex.input.body = camel_api::Body::Bytes("handled".into());
1152 async move { Ok(ex) }
1153 }));
1154 let policy = ExceptionPolicy {
1155 matches: Arc::new(|_| true),
1156 retry: None,
1157 handled_by: None,
1158 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1159 disposition: ExceptionDisposition::Handled,
1160 };
1161 let inner = tower::service_fn(|_ex: Exchange| async {
1162 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1163 });
1164 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1165 let ex = Exchange::default();
1166 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1167 assert!(result.error.is_none(), "handled:true should clear error");
1168 assert!(
1169 result
1170 .properties
1171 .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1172 .is_none(),
1173 "handled:true should clear exception properties"
1174 );
1175 assert!(
1176 result
1177 .properties
1178 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1179 .is_none(),
1180 "handled:true should clear exception kind property"
1181 );
1182 assert!(
1183 result
1184 .properties
1185 .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1186 .is_none(),
1187 "handled:true should clear exception caught property"
1188 );
1189 }
1190
1191 #[test]
1194 fn test_match_policy_returns_id_for_matching_error() {
1195 let handler = DefaultRouteErrorHandler::new(
1196 None,
1197 vec![(
1198 ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1199 None,
1200 )],
1201 );
1202 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1203 assert_eq!(id, Some(PolicyId(0)));
1204 }
1205
1206 #[test]
1207 fn test_match_policy_returns_none_for_unmatched() {
1208 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1209 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1210 assert_eq!(id, None);
1211 }
1212
1213 #[tokio::test]
1216 async fn test_retry_step_succeeds_on_second_attempt() {
1217 let mut policy = ExceptionPolicy::new(|_| true);
1218 policy.retry = Some(RedeliveryPolicy::new(3));
1219 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1220 let mut step = fail_n_times(1); let ex = make_exchange();
1222 let outcome = handler
1223 .retry_step(
1224 Some(PolicyId(0)),
1225 &mut step,
1226 ex,
1227 CamelError::ProcessorError("attempt 0".into()),
1228 )
1229 .await;
1230 assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1231 }
1232
1233 #[tokio::test]
1234 async fn test_retry_step_exhausted_when_all_fail() {
1235 let mut policy = ExceptionPolicy::new(|_| true);
1236 policy.retry = Some(RedeliveryPolicy::new(3));
1237 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1238 let mut step = failing_processor();
1239 let ex = make_exchange();
1240 let outcome = handler
1241 .retry_step(
1242 Some(PolicyId(0)),
1243 &mut step,
1244 ex,
1245 CamelError::ProcessorError("boom".into()),
1246 )
1247 .await;
1248 assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1249 }
1250
1251 #[tokio::test]
1252 async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1253 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1254 let mut step = ok_processor();
1255 let ex = make_exchange();
1256 let outcome = handler
1257 .retry_step(
1258 None,
1259 &mut step,
1260 ex,
1261 CamelError::ProcessorError("boom".into()),
1262 )
1263 .await;
1264 assert!(matches!(
1265 outcome,
1266 RetryOutcome::Exhausted { policy: None, .. }
1267 ));
1268 }
1269
1270 #[tokio::test]
1273 async fn test_handle_step_propagate_sends_to_dlc() {
1274 let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1275 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1276 let ex = make_exchange();
1277 let result = handler
1278 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1279 .await;
1280 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1281 }
1282
1283 #[tokio::test]
1284 async fn test_handle_step_handled_uses_handler_output() {
1285 let handler_producer = BoxProcessor::from_fn(|mut ex| {
1286 Box::pin(async move {
1287 ex.input.set_header("processed_by", Value::Bool(true));
1288 Ok(ex)
1289 })
1290 });
1291 let policy = ExceptionPolicy {
1292 matches: std::sync::Arc::new(|_| true),
1293 retry: None,
1294 handled_by: None,
1295 on_steps: None,
1296 disposition: ExceptionDisposition::Handled,
1297 };
1298 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1299 let mut ex = make_exchange();
1300 ex.set_error(CamelError::ProcessorError("boom".into()));
1301 let result = handler
1302 .handle_step(
1303 Some(PolicyId(0)),
1304 ex,
1305 CamelError::ProcessorError("boom".into()),
1306 )
1307 .await;
1308 match result {
1309 Ok(StepDisposition::Handled(ex)) => {
1310 assert!(!ex.has_error(), "error should be cleared");
1311 assert_eq!(
1312 ex.input.header("processed_by"),
1313 Some(&Value::Bool(true)),
1314 "should use handler's output exchange"
1315 );
1316 }
1317 other => panic!("expected Handled, got {:?}", other.is_ok()),
1318 }
1319 }
1320
1321 #[tokio::test]
1322 async fn test_handle_step_continued_clears_error() {
1323 let policy = ExceptionPolicy {
1324 matches: std::sync::Arc::new(|_| true),
1325 retry: None,
1326 handled_by: None,
1327 on_steps: None,
1328 disposition: ExceptionDisposition::Continued,
1329 };
1330 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1331 let mut ex = make_exchange();
1332 ex.set_error(CamelError::ProcessorError("boom".into()));
1333 let result = handler
1334 .handle_step(
1335 Some(PolicyId(0)),
1336 ex,
1337 CamelError::ProcessorError("boom".into()),
1338 )
1339 .await;
1340 match result {
1341 Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1342 other => panic!("expected Continued, got {:?}", other.is_ok()),
1343 }
1344 }
1345
1346 #[tokio::test]
1347 async fn test_handle_step_stopped_propagates_immediately() {
1348 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1349 let ex = make_exchange();
1350 let result = handler.handle_step(None, ex, CamelError::Stopped).await;
1351 assert!(
1352 matches!(result, Ok(StepDisposition::Propagate(CamelError::Stopped))),
1353 "Stopped should propagate immediately"
1354 );
1355 }
1356
1357 #[tokio::test]
1358 async fn test_handle_step_with_on_steps_handled() {
1359 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1360 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1361 async move { Ok(ex) }
1362 }));
1363 let policy = ExceptionPolicy {
1364 matches: std::sync::Arc::new(|_| true),
1365 retry: None,
1366 handled_by: None,
1367 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1368 disposition: ExceptionDisposition::Handled,
1369 };
1370 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1371 let mut ex = make_exchange();
1372 ex.set_error(CamelError::ProcessorError("boom".into()));
1373 let result = handler
1374 .handle_step(
1375 Some(PolicyId(0)),
1376 ex,
1377 CamelError::ProcessorError("boom".into()),
1378 )
1379 .await;
1380 match result {
1381 Ok(StepDisposition::Handled(ex)) => {
1382 assert!(!ex.has_error(), "error should be cleared");
1383 assert!(
1384 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1385 "on_steps should have modified the body"
1386 );
1387 }
1388 other => panic!("expected Handled, got: {}", other.is_ok()),
1389 }
1390 }
1391
1392 #[tokio::test]
1393 async fn test_handle_step_with_on_steps_propagate_falls_through() {
1394 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1395 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1396 async move { Ok(ex) }
1397 }));
1398 let dlc_called = Arc::new(AtomicU32::new(0));
1399 let dlc_called_clone = dlc_called.clone();
1400 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1401 let c = dlc_called_clone.clone();
1402 Box::pin(async move {
1403 c.fetch_add(1, Ordering::SeqCst);
1404 Ok(ex)
1405 })
1406 });
1407 let policy = ExceptionPolicy {
1408 matches: std::sync::Arc::new(|_| true),
1409 retry: None,
1410 handled_by: None,
1411 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1412 disposition: ExceptionDisposition::Propagate,
1413 };
1414 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1415 let mut ex = make_exchange();
1416 ex.set_error(CamelError::ProcessorError("boom".into()));
1417 let result = handler
1418 .handle_step(
1419 Some(PolicyId(0)),
1420 ex,
1421 CamelError::ProcessorError("boom".into()),
1422 )
1423 .await;
1424 assert!(
1425 matches!(result, Ok(StepDisposition::Propagate(_))),
1426 "Propagate disposition should return Propagate"
1427 );
1428 assert_eq!(
1429 dlc_called.load(Ordering::SeqCst),
1430 1,
1431 "DLC should be called when on_steps disposition is Propagate"
1432 );
1433 }
1434
1435 #[tokio::test]
1436 async fn test_handle_step_dlc_failure_propagates() {
1437 let failing_dlc = BoxProcessor::from_fn(|_| {
1438 Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1439 });
1440 let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1441 let ex = make_exchange();
1442 let result = handler
1443 .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1444 .await;
1445 assert!(
1446 matches!(result, Ok(StepDisposition::Propagate(_))),
1447 "DLC failure should still return Propagate with original error"
1448 );
1449 }
1450
1451 #[tokio::test]
1454 async fn test_handle_boundary_security_error_goes_to_dlc() {
1455 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1456 let count_clone = dlc_count.clone();
1457 let dlc = BoxProcessor::from_fn(move |ex| {
1458 let c = count_clone.clone();
1459 Box::pin(async move {
1460 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1461 Ok(ex)
1462 })
1463 });
1464 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1465 let ex = make_exchange();
1466 let result = handler
1467 .handle_boundary(
1468 BoundaryKind::Security,
1469 ex,
1470 CamelError::Unauthorized("denied".into()),
1471 )
1472 .await;
1473 assert!(result.is_ok());
1474 assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1475 }
1476
1477 #[tokio::test]
1478 async fn test_handle_boundary_handled_clears_error() {
1479 let policy = ExceptionPolicy {
1480 matches: std::sync::Arc::new(|_| true),
1481 retry: None,
1482 handled_by: None,
1483 on_steps: None,
1484 disposition: ExceptionDisposition::Handled,
1485 };
1486 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1487 let ex = make_exchange();
1488 let result = handler
1489 .handle_boundary(
1490 BoundaryKind::Security,
1491 ex,
1492 CamelError::Unauthorized("denied".into()),
1493 )
1494 .await;
1495 assert!(result.is_ok());
1496 assert!(
1497 !result.unwrap().has_error(),
1498 "Handled disposition should clear error"
1499 );
1500 }
1501
1502 #[tokio::test]
1503 async fn test_handle_boundary_propagate_preserves_error() {
1504 let policy = ExceptionPolicy {
1505 matches: std::sync::Arc::new(|_| true),
1506 retry: None,
1507 handled_by: None,
1508 on_steps: None,
1509 disposition: ExceptionDisposition::Propagate,
1510 };
1511 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1512 let ex = make_exchange();
1513 let result = handler
1514 .handle_boundary(
1515 BoundaryKind::CircuitBreaker,
1516 ex,
1517 CamelError::CircuitOpen("open".into()),
1518 )
1519 .await;
1520 assert!(result.is_ok(), "boundary errors always return Ok");
1521 assert!(
1522 result.unwrap().has_error(),
1523 "Propagate disposition should preserve error"
1524 );
1525 }
1526
1527 #[tokio::test]
1528 async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1529 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1530 let count_clone = dlc_count.clone();
1531 let dlc = BoxProcessor::from_fn(move |ex| {
1532 let c = count_clone.clone();
1533 Box::pin(async move {
1534 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1535 Ok(ex)
1536 })
1537 });
1538 let policy = ExceptionPolicy {
1539 matches: std::sync::Arc::new(|_| true),
1540 retry: None,
1541 handled_by: None,
1542 on_steps: None,
1543 disposition: ExceptionDisposition::Continued,
1544 };
1545 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1546 let ex = make_exchange();
1547 let result = handler
1548 .handle_boundary(
1549 BoundaryKind::Security,
1550 ex,
1551 CamelError::Unauthorized("denied".into()),
1552 )
1553 .await;
1554 assert!(result.is_ok(), "boundary errors always return Ok");
1555 assert!(
1556 result.unwrap().has_error(),
1557 "Continued at boundary should preserve error"
1558 );
1559 assert_eq!(
1560 dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1561 1,
1562 "DLC should be called"
1563 );
1564 }
1565
1566 #[tokio::test]
1567 async fn test_handle_boundary_with_on_steps_handled() {
1568 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1569 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1570 async move { Ok(ex) }
1571 }));
1572 let policy = ExceptionPolicy {
1573 matches: std::sync::Arc::new(|_| true),
1574 retry: None,
1575 handled_by: None,
1576 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1577 disposition: ExceptionDisposition::Handled,
1578 };
1579 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1580 let ex = make_exchange();
1581 let result = handler
1582 .handle_boundary(
1583 BoundaryKind::Security,
1584 ex,
1585 CamelError::Unauthorized("denied".into()),
1586 )
1587 .await;
1588 assert!(result.is_ok(), "boundary errors always return Ok");
1589 let ex = result.unwrap();
1590 assert!(!ex.has_error(), "Handled disposition should clear error");
1591 assert!(
1592 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1593 "on_steps should have modified the body"
1594 );
1595 }
1596}