1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::{Layer, Service, ServiceExt};
6
7use camel_api::error_handler::{
8 BoundaryKind, ExceptionDisposition, ExceptionPolicy, HEADER_REDELIVERED,
9 HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER, PolicyId, RetryOutcome,
10 StepDisposition,
11};
12use camel_api::{BoxProcessor, CamelError, Exchange, PipelineOutcome, SyncBoxProcessor, Value};
13
14async fn execute_on_steps(
15 original: Exchange,
16 original_err: CamelError,
17 on_steps: &SyncBoxProcessor,
18 disposition: ExceptionDisposition,
19 handler: Option<BoxProcessor>,
20) -> Result<Exchange, CamelError> {
21 let snapshot = original.clone();
22 let mut ex = original;
23 ex.set_error(original_err.clone());
24 let mut pipeline = on_steps.clone_inner();
25 let step_result = async {
26 let svc = pipeline.ready().await?;
27 svc.call(ex).await
28 }
29 .await;
30
31 match step_result {
32 Ok(mut ex) => {
33 if disposition == ExceptionDisposition::Handled {
34 ex.handle_error();
35 Ok(ex)
36 } else {
37 Err(original_err)
40 }
41 }
42 Err(_) => {
43 tracing::warn!(error = %original_err, "on_steps pipeline failed, falling back to handler/DLC");
45 let mut ex = snapshot;
46 ex.set_error(original_err);
47 send_to_handler(ex, handler).await
48 }
49 }
50}
51
52pub async fn invoke_processor(
57 svc: &mut BoxProcessor,
58 ex: Exchange,
59) -> Result<Exchange, CamelError> {
60 match svc.ready().await {
61 Ok(ready) => ready.call(ex).await,
62 Err(err) => Err(err),
63 }
64}
65
66#[async_trait::async_trait]
71pub trait RouteErrorHandler: Send + Sync {
72 fn match_policy(&self, err: &CamelError) -> Option<PolicyId>;
74
75 async fn retry_step(
77 &self,
78 policy: Option<PolicyId>,
79 step: &mut dyn camel_api::error_handler::RetryableStep,
80 original: Exchange,
81 error: CamelError,
82 ) -> RetryOutcome;
83
84 async fn handle_step(
86 &self,
87 policy: Option<PolicyId>,
88 exchange: Exchange,
89 error: CamelError,
90 ) -> Result<StepDisposition, CamelError>;
91
92 async fn handle_boundary(
94 &self,
95 kind: BoundaryKind,
96 exchange: Exchange,
97 error: CamelError,
98 ) -> Result<Exchange, CamelError>;
99}
100
101pub struct DefaultRouteErrorHandler {
107 pub(crate) dlc_producer: Option<SyncBoxProcessor>,
108 pub(crate) policies: Vec<(ExceptionPolicy, Option<SyncBoxProcessor>)>,
109 pub use_original_message: bool,
113}
114
115impl DefaultRouteErrorHandler {
116 pub fn new(
117 dlc_producer: Option<BoxProcessor>,
118 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
119 ) -> Self {
120 Self {
121 dlc_producer: dlc_producer.map(SyncBoxProcessor::new),
122 policies: policies
123 .into_iter()
124 .map(|(p, prod)| (p, prod.map(SyncBoxProcessor::new)))
125 .collect(),
126 use_original_message: false,
127 }
128 }
129
130 pub fn with_use_original_message(mut self, enabled: bool) -> Self {
132 self.use_original_message = enabled;
133 self
134 }
135
136 fn restore_original_message_if_enabled(&self, exchange: &mut Exchange) {
140 if self.use_original_message
141 && let Some(orig) =
142 exchange.get_extension::<camel_api::Message>(camel_api::ORIGINAL_MESSAGE_EXTENSION)
143 {
144 exchange.input = orig.clone();
145 }
146 }
147
148 fn resolve_producer(
151 &self,
152 policy: Option<PolicyId>,
153 ) -> (ExceptionDisposition, Option<BoxProcessor>) {
154 match policy {
155 Some(PolicyId(idx)) => match self.policies.get(idx) {
156 Some((p, prod)) => (
157 p.disposition,
158 prod.as_ref()
159 .map(|p| p.clone_inner())
160 .or_else(|| self.dlc_producer.as_ref().map(|p| p.clone_inner())),
161 ),
162 None => (
163 ExceptionDisposition::Propagate,
164 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
165 ),
166 },
167 None => (
168 ExceptionDisposition::Propagate,
169 self.dlc_producer.as_ref().map(|p| p.clone_inner()),
170 ),
171 }
172 }
173}
174
175#[async_trait::async_trait]
176impl RouteErrorHandler for DefaultRouteErrorHandler {
177 fn match_policy(&self, err: &CamelError) -> Option<PolicyId> {
178 self.policies
179 .iter()
180 .position(|(p, _)| (p.matches)(err))
181 .map(PolicyId)
182 }
183
184 async fn retry_step(
185 &self,
186 policy: Option<PolicyId>,
187 step: &mut dyn camel_api::error_handler::RetryableStep,
188 original: Exchange,
189 error: CamelError,
190 ) -> RetryOutcome {
191 let Some(PolicyId(idx)) = policy else {
192 return RetryOutcome::Exhausted {
193 exchange: original,
194 error,
195 policy: None,
196 };
197 };
198 let Some((policy_def, _)) = self.policies.get(idx) else {
199 return RetryOutcome::Exhausted {
200 exchange: original,
201 error,
202 policy,
203 };
204 };
205 let Some(ref backoff) = policy_def.retry else {
206 return RetryOutcome::Exhausted {
207 exchange: original,
208 error,
209 policy,
210 };
211 };
212
213 for attempt in 0..backoff.max_attempts {
214 let delay = backoff.delay_for(attempt);
215 tokio::time::sleep(delay).await;
216
217 let mut ex = original.clone();
218 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
219 ex.input.set_header(
220 HEADER_REDELIVERY_COUNTER,
221 Value::Number((attempt + 1).into()),
222 );
223 ex.input.set_header(
224 HEADER_REDELIVERY_MAX_COUNTER,
225 Value::Number(backoff.max_attempts.into()),
226 );
227
228 match step.invoke(ex).await {
229 PipelineOutcome::Completed(exchange) => {
230 return RetryOutcome::Recovered(exchange);
231 }
232 PipelineOutcome::Stopped(stopped_ex) => {
233 return RetryOutcome::Stopped(stopped_ex);
234 }
235 PipelineOutcome::Failed(retry_err) => {
236 if attempt + 1 == backoff.max_attempts {
237 let mut final_ex = original;
238 final_ex
239 .input
240 .set_header(HEADER_REDELIVERED, Value::Bool(true));
241 final_ex.input.set_header(
242 HEADER_REDELIVERY_COUNTER,
243 Value::Number(backoff.max_attempts.into()),
244 );
245 final_ex.input.set_header(
246 HEADER_REDELIVERY_MAX_COUNTER,
247 Value::Number(backoff.max_attempts.into()),
248 );
249 return RetryOutcome::Exhausted {
250 exchange: final_ex,
251 error: retry_err,
252 policy,
253 };
254 }
255 }
256 }
257 }
258
259 RetryOutcome::Exhausted {
260 exchange: original,
261 error,
262 policy,
263 }
264 }
265
266 async fn handle_step(
267 &self,
268 policy: Option<PolicyId>,
269 mut exchange: Exchange,
270 error: CamelError,
271 ) -> Result<StepDisposition, CamelError> {
272 let (disposition, producer) = self.resolve_producer(policy);
273
274 if let Some(PolicyId(idx)) = policy
276 && let Some((p, _)) = self.policies.get(idx)
277 && let Some(ref steps) = p.on_steps
278 {
279 let snapshot = exchange.clone();
280 exchange.set_error(error.clone());
281 let mut step_pipeline = steps.clone_inner();
282 let step_result = async {
283 let svc = step_pipeline.ready().await?;
284 svc.call(exchange).await
285 }
286 .await;
287 match step_result {
288 Ok(mut ex) => match disposition {
289 ExceptionDisposition::Handled => {
290 ex.handle_error();
291 return Ok(StepDisposition::Handled(ex));
292 }
293 ExceptionDisposition::Continued => {
294 ex.clear_error();
295 return Ok(StepDisposition::Continued(ex));
296 }
297 ExceptionDisposition::Propagate => {
298 exchange = snapshot;
299 }
300 },
301 Err(_) => {
302 exchange = snapshot;
303 }
304 }
305 }
306
307 self.restore_original_message_if_enabled(&mut exchange);
310 exchange.set_error(error.clone());
311 match send_to_handler(exchange, producer).await {
312 Ok(handler_ex) => match disposition {
313 ExceptionDisposition::Propagate => Ok(StepDisposition::Propagate(error)),
314 ExceptionDisposition::Handled => {
315 let mut ex = handler_ex;
316 ex.clear_error();
317 Ok(StepDisposition::Handled(ex))
318 }
319 ExceptionDisposition::Continued => {
320 let mut ex = handler_ex;
321 ex.clear_error();
322 Ok(StepDisposition::Continued(ex))
323 }
324 },
325 Err(_) => Ok(StepDisposition::Propagate(error)),
327 }
328 }
329
330 async fn handle_boundary(
331 &self,
332 _kind: BoundaryKind,
333 mut exchange: Exchange,
334 error: CamelError,
335 ) -> Result<Exchange, CamelError> {
336 let policy = self.match_policy(&error);
342 let (disposition, producer) = self.resolve_producer(policy);
343
344 if let Some(PolicyId(idx)) = policy
346 && let Some((p, _)) = self.policies.get(idx)
347 && let Some(ref steps) = p.on_steps
348 {
349 let snapshot = exchange.clone();
350 exchange.set_error(error.clone());
351 let mut step_pipeline = steps.clone_inner();
352 let step_result = async {
353 let svc = step_pipeline.ready().await?;
354 svc.call(exchange).await
355 }
356 .await;
357 match step_result {
358 Ok(mut ex) => match disposition {
359 ExceptionDisposition::Handled => {
360 ex.handle_error();
361 return Ok(ex);
362 }
363 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
364 exchange = snapshot;
365 }
366 },
367 Err(_) => {
368 exchange = snapshot;
369 }
370 }
371 }
372
373 self.restore_original_message_if_enabled(&mut exchange);
375 exchange.set_error(error.clone());
376 match send_to_handler(exchange, producer).await {
377 Ok(handler_ex) => match disposition {
378 ExceptionDisposition::Handled => {
379 let mut ex = handler_ex;
380 ex.clear_error();
381 Ok(ex)
382 }
383 ExceptionDisposition::Propagate | ExceptionDisposition::Continued => {
384 let mut ex = handler_ex;
385 ex.set_error(error);
386 Ok(ex)
387 }
388 },
389 Err(e) => Err(e),
391 }
392 }
393}
394
395pub struct ErrorHandlerLayer {
399 dlc_producer: Option<BoxProcessor>,
401 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
403}
404
405impl ErrorHandlerLayer {
406 pub fn new(
408 dlc_producer: Option<BoxProcessor>,
409 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
410 ) -> Self {
411 Self {
412 dlc_producer,
413 policies,
414 }
415 }
416}
417
418impl<S> Layer<S> for ErrorHandlerLayer
419where
420 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
421 S::Future: Send + 'static,
422{
423 type Service = ErrorHandlerService<S>;
424
425 fn layer(&self, inner: S) -> Self::Service {
426 ErrorHandlerService {
427 inner,
428 dlc_producer: self.dlc_producer.clone(),
429 policies: self
430 .policies
431 .iter()
432 .map(|(p, prod)| (p.clone(), prod.clone()))
433 .collect(),
434 }
435 }
436}
437
438pub struct ErrorHandlerService<S> {
443 inner: S,
444 dlc_producer: Option<BoxProcessor>,
445 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
446}
447
448impl<S: Clone> Clone for ErrorHandlerService<S> {
449 fn clone(&self) -> Self {
450 Self {
451 inner: self.inner.clone(),
452 dlc_producer: self.dlc_producer.clone(),
453 policies: self
454 .policies
455 .iter()
456 .map(|(p, prod)| (p.clone(), prod.clone()))
457 .collect(),
458 }
459 }
460}
461
462impl<S> ErrorHandlerService<S>
463where
464 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
465 S::Future: Send + 'static,
466{
467 pub fn new(
469 inner: S,
470 dlc_producer: Option<BoxProcessor>,
471 policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)>,
472 ) -> Self {
473 Self {
474 inner,
475 dlc_producer,
476 policies,
477 }
478 }
479}
480
481impl<S> Service<Exchange> for ErrorHandlerService<S>
482where
483 S: Service<Exchange, Response = Exchange, Error = CamelError> + Send + Clone + 'static,
484 S::Future: Send + 'static,
485{
486 type Response = Exchange;
487 type Error = CamelError;
488 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
489
490 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
491 match self.inner.poll_ready(cx) {
496 Poll::Pending => Poll::Pending,
497 Poll::Ready(Err(_)) => Poll::Ready(Ok(())),
498 Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
499 }
500 }
501
502 fn call(&mut self, exchange: Exchange) -> Self::Future {
503 let mut inner = self.inner.clone();
504 let dlc = self.dlc_producer.clone();
505 let policies: Vec<(ExceptionPolicy, Option<BoxProcessor>)> = self
506 .policies
507 .iter()
508 .map(|(p, prod)| (p.clone(), prod.clone()))
509 .collect();
510
511 Box::pin(async move {
512 let original = exchange.clone();
513 let result = match inner.ready().await {
514 Ok(svc) => svc.call(exchange).await,
515 Err(e) => Err(e), };
517
518 let err = match result {
519 Ok(ex) => return Ok(ex),
520 Err(e) => e,
521 };
522
523 let matched = policies.into_iter().find(|(p, _)| (p.matches)(&err));
525
526 if let Some((policy, policy_producer)) = matched {
527 if let Some(ref backoff) = policy.retry {
529 for attempt in 0..backoff.max_attempts {
530 let delay = backoff.delay_for(attempt);
531 tokio::time::sleep(delay).await;
532
533 let mut ex = original.clone();
535 ex.input.set_header(HEADER_REDELIVERED, Value::Bool(true));
536 ex.input.set_header(
537 HEADER_REDELIVERY_COUNTER,
538 Value::Number((attempt + 1).into()),
539 );
540 ex.input.set_header(
541 HEADER_REDELIVERY_MAX_COUNTER,
542 Value::Number(backoff.max_attempts.into()),
543 );
544
545 let result = match inner.ready().await {
546 Ok(svc) => svc.call(ex).await,
547 Err(e) => Err(e), };
549 match result {
550 Ok(ex) => return Ok(ex),
551 Err(retry_err) => {
552 if attempt + 1 == backoff.max_attempts {
553 let mut original = original.clone();
555 original
556 .input
557 .set_header(HEADER_REDELIVERED, Value::Bool(true));
558 original.input.set_header(
559 HEADER_REDELIVERY_COUNTER,
560 Value::Number(backoff.max_attempts.into()),
561 );
562 original.input.set_header(
563 HEADER_REDELIVERY_MAX_COUNTER,
564 Value::Number(backoff.max_attempts.into()),
565 );
566 if let Some(ref steps) = policy.on_steps {
567 let handler = policy_producer.clone().or(dlc.clone());
568 return execute_on_steps(
569 original,
570 retry_err,
571 steps,
572 policy.disposition,
573 handler,
574 )
575 .await;
576 }
577 original.set_error(retry_err);
578 let handler = policy_producer.or(dlc);
579 return send_to_handler(original, handler).await;
580 }
581 }
582 }
583 }
584 }
585 if let Some(ref steps) = policy.on_steps {
587 let handler = policy_producer.or(dlc);
588 return execute_on_steps(original, err, steps, policy.disposition, handler)
589 .await;
590 }
591 let mut ex = original.clone();
592 ex.set_error(err);
593 let handler = policy_producer.or(dlc);
594 send_to_handler(ex, handler).await
595 } else {
596 let mut ex = original;
598 ex.set_error(err);
599 send_to_handler(ex, dlc).await
600 }
601 })
602 }
603}
604
605async fn send_to_handler(
606 exchange: Exchange,
607 producer: Option<BoxProcessor>,
608) -> Result<Exchange, CamelError> {
609 match producer {
610 None => {
611 tracing::error!(
613 error = ?exchange.error,
614 "Exchange failed with no error handler configured"
615 );
616 Ok(exchange)
617 }
618 Some(mut prod) => match prod.ready().await {
619 Err(e) => {
620 tracing::error!("DLC/handler not ready: {e}");
622 Ok(exchange)
623 }
624 Ok(svc) => match svc.call(exchange.clone()).await {
625 Ok(ex) => Ok(ex),
626 Err(e) => {
627 tracing::error!("DLC/handler call failed: {e}");
629 Ok(exchange)
631 }
632 },
633 },
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use super::*;
640 use camel_api::{
641 BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message, OutcomePipeline,
642 OutcomeSegment, PipelineOutcome, RetryableStep, SyncBoxProcessor, Value,
643 error_handler::RedeliveryPolicy,
644 };
645 use std::future::Future;
646 use std::pin::Pin;
647 use std::sync::{
648 Arc,
649 atomic::{AtomicU32, Ordering},
650 };
651 use std::time::Duration;
652 use tower::ServiceExt;
653
654 fn make_exchange() -> Exchange {
655 Exchange::new(Message::new("test"))
656 }
657
658 fn failing_processor() -> BoxProcessor {
659 BoxProcessor::from_fn(|_ex| {
660 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
661 })
662 }
663
664 fn ok_processor() -> BoxProcessor {
665 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
666 }
667
668 fn fail_n_times(n: u32) -> BoxProcessor {
669 let count = Arc::new(AtomicU32::new(0));
670 BoxProcessor::from_fn(move |ex| {
671 let count = Arc::clone(&count);
672 Box::pin(async move {
673 let c = count.fetch_add(1, Ordering::SeqCst);
674 if c < n {
675 Err(CamelError::ProcessorError(format!("attempt {c}")))
676 } else {
677 Ok(ex)
678 }
679 })
680 })
681 }
682
683 #[tokio::test]
684 async fn test_ok_passthrough() {
685 let svc = ErrorHandlerService::new(ok_processor(), None, vec![]);
686 let result = svc.oneshot(make_exchange()).await;
687 assert!(result.is_ok());
688 assert!(!result.unwrap().has_error());
689 }
690
691 #[tokio::test]
692 async fn test_error_goes_to_dlc() {
693 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
694 let received_clone = Arc::clone(&received);
695 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
696 let r = Arc::clone(&received_clone);
697 Box::pin(async move {
698 r.lock().unwrap().push(ex.clone());
699 Ok(ex)
700 })
701 });
702
703 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![]);
704 let result = svc.oneshot(make_exchange()).await;
705 assert!(result.is_ok());
706 let ex = result.unwrap();
707 assert!(ex.has_error());
708 assert_eq!(received.lock().unwrap().len(), 1);
709 }
710
711 #[tokio::test]
712 async fn test_retry_recovers() {
713 let inner = fail_n_times(2);
714 let policy = ExceptionPolicy {
715 matches: Arc::new(|_| true),
716 retry: Some(RedeliveryPolicy {
717 max_attempts: 3,
718 initial_delay: Duration::from_millis(1),
719 multiplier: 1.0,
720 max_delay: Duration::from_millis(10),
721 jitter_factor: 0.0,
722 }),
723 handled_by: None,
724 on_steps: None,
725 disposition: ExceptionDisposition::Propagate,
726 };
727 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
728 let result = svc.oneshot(make_exchange()).await;
729 assert!(result.is_ok());
730 assert!(!result.unwrap().has_error());
731 }
732
733 #[tokio::test]
734 async fn test_retry_exhausted_goes_to_dlc() {
735 let inner = fail_n_times(10);
736 let received = Arc::new(std::sync::Mutex::new(0u32));
737 let received_clone = Arc::clone(&received);
738 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
739 let r = Arc::clone(&received_clone);
740 Box::pin(async move {
741 *r.lock().unwrap() += 1;
742 Ok(ex)
743 })
744 });
745 let policy = ExceptionPolicy {
746 matches: Arc::new(|_| true),
747 retry: Some(RedeliveryPolicy {
748 max_attempts: 2,
749 initial_delay: Duration::from_millis(1),
750 multiplier: 1.0,
751 max_delay: Duration::from_millis(10),
752 jitter_factor: 0.0,
753 }),
754 handled_by: None,
755 on_steps: None,
756 disposition: ExceptionDisposition::Propagate,
757 };
758 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
759 let result = svc.oneshot(make_exchange()).await;
760 assert!(result.is_ok());
761 assert!(result.unwrap().has_error());
762 assert_eq!(*received.lock().unwrap(), 1);
763 }
764
765 #[test]
766 fn test_poll_ready_delegates_to_inner() {
767 use std::sync::atomic::AtomicBool;
768
769 #[derive(Clone)]
771 struct DelayedReadyService {
772 ready: Arc<AtomicBool>,
773 }
774
775 impl Service<Exchange> for DelayedReadyService {
776 type Response = Exchange;
777 type Error = CamelError;
778 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
779
780 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
781 if self.ready.fetch_or(true, Ordering::SeqCst) {
782 Poll::Ready(Ok(()))
784 } else {
785 cx.waker().wake_by_ref();
787 Poll::Pending
788 }
789 }
790
791 fn call(&mut self, ex: Exchange) -> Self::Future {
792 Box::pin(async move { Ok(ex) })
793 }
794 }
795
796 let waker = futures::task::noop_waker();
797 let mut cx = Context::from_waker(&waker);
798
799 let inner = DelayedReadyService {
800 ready: Arc::new(AtomicBool::new(false)),
801 };
802 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
803
804 let first = Pin::new(&mut svc).poll_ready(&mut cx);
806 assert!(first.is_pending(), "expected Pending on first poll_ready");
807
808 let second = Pin::new(&mut svc).poll_ready(&mut cx);
810 assert!(second.is_ready(), "expected Ready on second poll_ready");
811 }
812
813 #[tokio::test]
814 async fn test_no_matching_policy_uses_dlc() {
815 let received = Arc::new(std::sync::Mutex::new(0u32));
816 let received_clone = Arc::clone(&received);
817 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
818 let r = Arc::clone(&received_clone);
819 Box::pin(async move {
820 *r.lock().unwrap() += 1;
821 Ok(ex)
822 })
823 });
824 let policy = ExceptionPolicy::new(|e| matches!(e, CamelError::Io(_)));
825 let svc = ErrorHandlerService::new(failing_processor(), Some(dlc), vec![(policy, None)]);
826 let result = svc.oneshot(make_exchange()).await;
827 assert!(result.is_ok());
828 assert_eq!(*received.lock().unwrap(), 1);
829 }
830
831 #[tokio::test]
832 async fn test_redelivery_headers_are_set() {
833 use camel_api::error_handler::{
834 HEADER_REDELIVERED, HEADER_REDELIVERY_COUNTER, HEADER_REDELIVERY_MAX_COUNTER,
835 RedeliveryPolicy,
836 };
837
838 let inner = fail_n_times(10);
839 let received = Arc::new(std::sync::Mutex::new(None));
840 let received_clone = Arc::clone(&received);
841 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
842 let r = Arc::clone(&received_clone);
843 Box::pin(async move {
844 *r.lock().unwrap() = Some(ex.clone());
845 Ok(ex)
846 })
847 });
848
849 let policy = ExceptionPolicy {
850 matches: Arc::new(|_| true),
851 retry: Some(RedeliveryPolicy {
852 max_attempts: 2,
853 initial_delay: Duration::from_millis(1),
854 multiplier: 1.0,
855 max_delay: Duration::from_millis(10),
856 jitter_factor: 0.0,
857 }),
858 handled_by: None,
859 on_steps: None,
860 disposition: ExceptionDisposition::Propagate,
861 };
862
863 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
864 let _ = svc.oneshot(make_exchange()).await.unwrap();
865
866 let ex = received.lock().unwrap().take().unwrap();
867 assert_eq!(
868 ex.input.header(HEADER_REDELIVERED),
869 Some(&Value::Bool(true))
870 );
871 assert_eq!(
872 ex.input.header(HEADER_REDELIVERY_COUNTER),
873 Some(&Value::Number(2.into()))
874 );
875 assert_eq!(
876 ex.input.header(HEADER_REDELIVERY_MAX_COUNTER),
877 Some(&Value::Number(2.into()))
878 );
879 }
880
881 #[tokio::test]
882 async fn test_jitter_produces_varying_delays_in_retry_flow() {
883 use std::time::Instant;
884
885 let inner = fail_n_times(10);
886 let received = Arc::new(std::sync::Mutex::new(None));
887 let received_clone = Arc::clone(&received);
888 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
889 let r = Arc::clone(&received_clone);
890 Box::pin(async move {
891 *r.lock().unwrap() = Some(ex.clone());
892 Ok(ex)
893 })
894 });
895
896 let policy = ExceptionPolicy {
897 matches: Arc::new(|_| true),
898 retry: Some(RedeliveryPolicy {
899 max_attempts: 5,
900 initial_delay: Duration::from_millis(20),
901 multiplier: 1.0,
902 max_delay: Duration::from_millis(100),
903 jitter_factor: 0.5,
904 }),
905 handled_by: None,
906 on_steps: None,
907 disposition: ExceptionDisposition::Propagate,
908 };
909
910 let start = Instant::now();
911 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![(policy, None)]);
912 let _ = svc.oneshot(make_exchange()).await.unwrap();
913 let elapsed = start.elapsed();
914
915 assert!(
916 received.lock().unwrap().is_some(),
917 "DLC should have received exchange"
918 );
919
920 assert!(
921 elapsed >= Duration::from_millis(50),
922 "5 retries with 20ms base delay should take at least 50ms (with jitter low bound)"
923 );
924
925 assert!(
926 elapsed <= Duration::from_millis(500),
927 "5 retries with 20ms base delay + 50% jitter should not exceed 500ms"
928 );
929 }
930
931 #[tokio::test]
932 async fn test_on_steps_handled_true_consumes_error() {
933 use tower::ServiceExt;
934
935 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
936 ex.input.body = camel_api::Body::Bytes("handled".into());
937 async move { Ok(ex) }
938 }));
939 let policy = ExceptionPolicy {
940 matches: Arc::new(|_| true),
941 retry: None,
942 handled_by: None,
943 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
944 disposition: ExceptionDisposition::Handled,
945 };
946 let inner = tower::service_fn(|_ex: Exchange| async {
947 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
948 });
949 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
950 let ex = Exchange::default();
951 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
952 assert!(result.error.is_none(), "handled:true should clear error");
953 assert!(matches!(result.input.body, camel_api::Body::Bytes(_)));
954 }
955
956 #[tokio::test]
957 async fn test_on_steps_handled_false_propagates_error() {
958 use tower::ServiceExt;
959
960 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
961 ex.input.body = camel_api::Body::Bytes("handled".into());
962 async move { Ok(ex) }
963 }));
964 let policy = ExceptionPolicy {
965 matches: Arc::new(|_| true),
966 retry: None,
967 handled_by: None,
968 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
969 disposition: ExceptionDisposition::Propagate,
970 };
971 let inner = tower::service_fn(|_ex: Exchange| async {
972 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
973 });
974 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
975 let ex = Exchange::default();
976 let result = svc.ready().await.unwrap().call(ex).await;
977 assert!(result.is_err(), "handled:false should propagate error");
978 }
979
980 #[derive(Clone)]
988 struct ReadinessFailService {
989 error: CamelError,
990 }
991
992 impl ReadinessFailService {
993 fn new(error: CamelError) -> Self {
994 Self { error }
995 }
996 }
997
998 impl Service<Exchange> for ReadinessFailService {
999 type Response = Exchange;
1000 type Error = CamelError;
1001 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1002
1003 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1004 Poll::Ready(Err(self.error.clone()))
1005 }
1006
1007 fn call(&mut self, ex: Exchange) -> Self::Future {
1008 Box::pin(async move { Ok(ex) })
1011 }
1012 }
1013
1014 #[tokio::test]
1015 async fn test_readiness_error_goes_to_dlc() {
1016 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1017 let inner = ReadinessFailService {
1018 error: readiness_err,
1019 };
1020
1021 let received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
1022 let received_clone = Arc::clone(&received);
1023 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1024 let r = Arc::clone(&received_clone);
1025 Box::pin(async move {
1026 r.lock().unwrap().push(ex.clone());
1027 Ok(ex)
1028 })
1029 });
1030
1031 let svc = ErrorHandlerService::new(inner, Some(dlc), vec![]);
1032 let result = svc.oneshot(make_exchange()).await;
1033
1034 assert!(
1036 result.is_ok(),
1037 "readiness error should be captured and sent to DLC, got: {:?}",
1038 result
1039 );
1040 let ex = result.unwrap();
1041 assert!(ex.has_error(), "exchange should carry the readiness error");
1042 assert_eq!(
1043 received.lock().unwrap().len(),
1044 1,
1045 "DLC should have received the exchange exactly once"
1046 );
1047 }
1048
1049 #[tokio::test]
1050 async fn test_readiness_error_goes_to_matching_policy() {
1051 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1052 let inner = ReadinessFailService {
1053 error: readiness_err,
1054 };
1055
1056 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1057 ex.input.body = camel_api::Body::Bytes("handled-readiness".into());
1058 async move { Ok(ex) }
1059 }));
1060 let policy = ExceptionPolicy {
1061 matches: Arc::new(|_| true),
1062 retry: None,
1063 handled_by: None,
1064 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1065 disposition: ExceptionDisposition::Handled,
1066 };
1067
1068 let svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1069 let result = svc.oneshot(make_exchange()).await;
1070
1071 assert!(
1073 result.is_ok(),
1074 "readiness error should be captured by policy, got: {:?}",
1075 result
1076 );
1077 let ex = result.unwrap();
1078 assert!(ex.error.is_none(), "handled:true should clear error");
1079 assert!(
1080 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1081 "on_steps should have modified the body"
1082 );
1083 }
1084
1085 #[test]
1086 fn test_poll_ready_converts_readiness_error_to_ok() {
1087 let readiness_err = CamelError::ProcessorError("readiness-fail".into());
1088 let inner = ReadinessFailService {
1089 error: readiness_err,
1090 };
1091 let mut svc = ErrorHandlerService::new(inner, None, vec![]);
1092
1093 let waker = futures::task::noop_waker();
1094 let mut cx = Context::from_waker(&waker);
1095
1096 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
1098 match poll {
1099 Poll::Ready(Ok(())) => { }
1100 Poll::Ready(Err(e)) => panic!("poll_ready leaked readiness error: {:?}", e),
1101 Poll::Pending => panic!("poll_ready should be Ready for readiness errors"),
1102 }
1103 }
1104
1105 #[tokio::test]
1108 async fn test_invoke_processor_returns_ok_on_success() {
1109 let mut svc = ok_processor();
1110 let ex = make_exchange();
1111 let result = invoke_processor(&mut svc, ex).await;
1112 assert!(result.is_ok());
1113 }
1114
1115 #[tokio::test]
1116 async fn test_invoke_processor_captures_readiness_error() {
1117 let mut failing_ready: BoxProcessor = BoxProcessor::new(ReadinessFailService::new(
1118 CamelError::ProcessorError("not ready".into()),
1119 ));
1120 let ex = make_exchange();
1121 let result = invoke_processor(&mut failing_ready, ex).await;
1122 assert!(result.is_err());
1123 }
1124
1125 #[tokio::test]
1126 async fn test_on_steps_handled_true_clears_exception_properties() {
1127 use tower::ServiceExt;
1128
1129 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1130 ex.input.body = camel_api::Body::Bytes("handled".into());
1131 async move { Ok(ex) }
1132 }));
1133 let policy = ExceptionPolicy {
1134 matches: Arc::new(|_| true),
1135 retry: None,
1136 handled_by: None,
1137 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1138 disposition: ExceptionDisposition::Handled,
1139 };
1140 let inner = tower::service_fn(|_ex: Exchange| async {
1141 Err::<Exchange, CamelError>(CamelError::RouteError("fail".to_string()))
1142 });
1143 let mut svc = ErrorHandlerService::new(inner, None, vec![(policy, None)]);
1144 let ex = Exchange::default();
1145 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1146 assert!(result.error.is_none(), "handled:true should clear error");
1147 assert!(
1148 result
1149 .properties
1150 .get(camel_api::exchange::PROPERTY_EXCEPTION_MESSAGE)
1151 .is_none(),
1152 "handled:true should clear exception properties"
1153 );
1154 assert!(
1155 result
1156 .properties
1157 .get(camel_api::exchange::PROPERTY_EXCEPTION_KIND)
1158 .is_none(),
1159 "handled:true should clear exception kind property"
1160 );
1161 assert!(
1162 result
1163 .properties
1164 .get(camel_api::exchange::PROPERTY_EXCEPTION_CAUGHT)
1165 .is_none(),
1166 "handled:true should clear exception caught property"
1167 );
1168 }
1169
1170 #[test]
1173 fn test_match_policy_returns_id_for_matching_error() {
1174 let handler = DefaultRouteErrorHandler::new(
1175 None,
1176 vec![(
1177 ExceptionPolicy::new(|e| matches!(e, CamelError::ProcessorError(_))),
1178 None,
1179 )],
1180 );
1181 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1182 assert_eq!(id, Some(PolicyId(0)));
1183 }
1184
1185 #[test]
1186 fn test_match_policy_returns_none_for_unmatched() {
1187 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1188 let id = handler.match_policy(&CamelError::ProcessorError("test".into()));
1189 assert_eq!(id, None);
1190 }
1191
1192 #[tokio::test]
1195 async fn test_retry_step_succeeds_on_second_attempt() {
1196 let mut policy = ExceptionPolicy::new(|_| true);
1197 policy.retry = Some(RedeliveryPolicy::new(3));
1198 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1199 let mut step = fail_n_times(1); let ex = make_exchange();
1201 let outcome = handler
1202 .retry_step(
1203 Some(PolicyId(0)),
1204 &mut step,
1205 ex,
1206 CamelError::ProcessorError("attempt 0".into()),
1207 )
1208 .await;
1209 assert!(matches!(outcome, RetryOutcome::Recovered(_)));
1210 }
1211
1212 #[tokio::test]
1213 async fn test_retry_step_exhausted_when_all_fail() {
1214 let mut policy = ExceptionPolicy::new(|_| true);
1215 policy.retry = Some(RedeliveryPolicy::new(3));
1216 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1217 let mut step = failing_processor();
1218 let ex = make_exchange();
1219 let outcome = handler
1220 .retry_step(
1221 Some(PolicyId(0)),
1222 &mut step,
1223 ex,
1224 CamelError::ProcessorError("boom".into()),
1225 )
1226 .await;
1227 assert!(matches!(outcome, RetryOutcome::Exhausted { .. }));
1228 }
1229
1230 #[tokio::test]
1231 async fn test_retry_step_no_policy_returns_exhausted_immediately() {
1232 let handler = DefaultRouteErrorHandler::new(None, vec![]);
1233 let mut step = ok_processor();
1234 let ex = make_exchange();
1235 let outcome = handler
1236 .retry_step(
1237 None,
1238 &mut step,
1239 ex,
1240 CamelError::ProcessorError("boom".into()),
1241 )
1242 .await;
1243 assert!(matches!(
1244 outcome,
1245 RetryOutcome::Exhausted { policy: None, .. }
1246 ));
1247 }
1248
1249 #[tokio::test]
1252 async fn test_handle_step_propagate_sends_to_dlc() {
1253 let dlc = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
1254 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1255 let ex = make_exchange();
1256 let result = handler
1257 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1258 .await;
1259 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1260 }
1261
1262 #[tokio::test]
1263 async fn test_handle_step_handled_uses_handler_output() {
1264 let handler_producer = BoxProcessor::from_fn(|mut ex| {
1265 Box::pin(async move {
1266 ex.input.set_header("processed_by", Value::Bool(true));
1267 Ok(ex)
1268 })
1269 });
1270 let policy = ExceptionPolicy {
1271 matches: std::sync::Arc::new(|_| true),
1272 retry: None,
1273 handled_by: None,
1274 on_steps: None,
1275 disposition: ExceptionDisposition::Handled,
1276 };
1277 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, Some(handler_producer))]);
1278 let mut ex = make_exchange();
1279 ex.set_error(CamelError::ProcessorError("boom".into()));
1280 let result = handler
1281 .handle_step(
1282 Some(PolicyId(0)),
1283 ex,
1284 CamelError::ProcessorError("boom".into()),
1285 )
1286 .await;
1287 match result {
1288 Ok(StepDisposition::Handled(ex)) => {
1289 assert!(!ex.has_error(), "error should be cleared");
1290 assert_eq!(
1291 ex.input.header("processed_by"),
1292 Some(&Value::Bool(true)),
1293 "should use handler's output exchange"
1294 );
1295 }
1296 other => panic!("expected Handled, got {:?}", other.is_ok()),
1297 }
1298 }
1299
1300 #[tokio::test]
1301 async fn test_handle_step_continued_clears_error() {
1302 let policy = ExceptionPolicy {
1303 matches: std::sync::Arc::new(|_| true),
1304 retry: None,
1305 handled_by: None,
1306 on_steps: None,
1307 disposition: ExceptionDisposition::Continued,
1308 };
1309 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1310 let mut ex = make_exchange();
1311 ex.set_error(CamelError::ProcessorError("boom".into()));
1312 let result = handler
1313 .handle_step(
1314 Some(PolicyId(0)),
1315 ex,
1316 CamelError::ProcessorError("boom".into()),
1317 )
1318 .await;
1319 match result {
1320 Ok(StepDisposition::Continued(ex)) => assert!(!ex.has_error()),
1321 other => panic!("expected Continued, got {:?}", other.is_ok()),
1322 }
1323 }
1324
1325 #[tokio::test]
1326 async fn test_handle_step_with_on_steps_handled() {
1327 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1328 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1329 async move { Ok(ex) }
1330 }));
1331 let policy = ExceptionPolicy {
1332 matches: std::sync::Arc::new(|_| true),
1333 retry: None,
1334 handled_by: None,
1335 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1336 disposition: ExceptionDisposition::Handled,
1337 };
1338 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1339 let mut ex = make_exchange();
1340 ex.set_error(CamelError::ProcessorError("boom".into()));
1341 let result = handler
1342 .handle_step(
1343 Some(PolicyId(0)),
1344 ex,
1345 CamelError::ProcessorError("boom".into()),
1346 )
1347 .await;
1348 match result {
1349 Ok(StepDisposition::Handled(ex)) => {
1350 assert!(!ex.has_error(), "error should be cleared");
1351 assert!(
1352 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1353 "on_steps should have modified the body"
1354 );
1355 }
1356 other => panic!("expected Handled, got: {}", other.is_ok()),
1357 }
1358 }
1359
1360 #[tokio::test]
1361 async fn test_handle_step_with_on_steps_propagate_falls_through() {
1362 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1363 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1364 async move { Ok(ex) }
1365 }));
1366 let dlc_called = Arc::new(AtomicU32::new(0));
1367 let dlc_called_clone = dlc_called.clone();
1368 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1369 let c = dlc_called_clone.clone();
1370 Box::pin(async move {
1371 c.fetch_add(1, Ordering::SeqCst);
1372 Ok(ex)
1373 })
1374 });
1375 let policy = ExceptionPolicy {
1376 matches: std::sync::Arc::new(|_| true),
1377 retry: None,
1378 handled_by: None,
1379 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1380 disposition: ExceptionDisposition::Propagate,
1381 };
1382 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1383 let mut ex = make_exchange();
1384 ex.set_error(CamelError::ProcessorError("boom".into()));
1385 let result = handler
1386 .handle_step(
1387 Some(PolicyId(0)),
1388 ex,
1389 CamelError::ProcessorError("boom".into()),
1390 )
1391 .await;
1392 assert!(
1393 matches!(result, Ok(StepDisposition::Propagate(_))),
1394 "Propagate disposition should return Propagate"
1395 );
1396 assert_eq!(
1397 dlc_called.load(Ordering::SeqCst),
1398 1,
1399 "DLC should be called when on_steps disposition is Propagate"
1400 );
1401 }
1402
1403 #[tokio::test]
1404 async fn test_handle_step_dlc_failure_propagates() {
1405 let failing_dlc = BoxProcessor::from_fn(|_| {
1406 Box::pin(async { Err(CamelError::ProcessorError("dlc broken".into())) })
1407 });
1408 let handler = DefaultRouteErrorHandler::new(Some(failing_dlc), vec![]);
1409 let ex = make_exchange();
1410 let result = handler
1411 .handle_step(None, ex, CamelError::ProcessorError("original".into()))
1412 .await;
1413 assert!(
1414 matches!(result, Ok(StepDisposition::Propagate(_))),
1415 "DLC failure should still return Propagate with original error"
1416 );
1417 }
1418
1419 #[tokio::test]
1422 async fn test_handle_boundary_security_error_goes_to_dlc() {
1423 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1424 let count_clone = dlc_count.clone();
1425 let dlc = BoxProcessor::from_fn(move |ex| {
1426 let c = count_clone.clone();
1427 Box::pin(async move {
1428 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1429 Ok(ex)
1430 })
1431 });
1432 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1433 let ex = make_exchange();
1434 let result = handler
1435 .handle_boundary(
1436 BoundaryKind::Security,
1437 ex,
1438 CamelError::Unauthorized("denied".into()),
1439 )
1440 .await;
1441 assert!(result.is_ok());
1442 assert_eq!(dlc_count.load(std::sync::atomic::Ordering::SeqCst), 1);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_handle_boundary_handled_clears_error() {
1447 let policy = ExceptionPolicy {
1448 matches: std::sync::Arc::new(|_| true),
1449 retry: None,
1450 handled_by: None,
1451 on_steps: None,
1452 disposition: ExceptionDisposition::Handled,
1453 };
1454 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1455 let ex = make_exchange();
1456 let result = handler
1457 .handle_boundary(
1458 BoundaryKind::Security,
1459 ex,
1460 CamelError::Unauthorized("denied".into()),
1461 )
1462 .await;
1463 assert!(result.is_ok());
1464 assert!(
1465 !result.unwrap().has_error(),
1466 "Handled disposition should clear error"
1467 );
1468 }
1469
1470 #[tokio::test]
1471 async fn test_handle_boundary_propagate_preserves_error() {
1472 let policy = ExceptionPolicy {
1473 matches: std::sync::Arc::new(|_| true),
1474 retry: None,
1475 handled_by: None,
1476 on_steps: None,
1477 disposition: ExceptionDisposition::Propagate,
1478 };
1479 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1480 let ex = make_exchange();
1481 let result = handler
1482 .handle_boundary(
1483 BoundaryKind::CircuitBreaker,
1484 ex,
1485 CamelError::CircuitOpen("open".into()),
1486 )
1487 .await;
1488 assert!(result.is_ok(), "boundary errors always return Ok");
1489 assert!(
1490 result.unwrap().has_error(),
1491 "Propagate disposition should preserve error"
1492 );
1493 }
1494
1495 #[tokio::test]
1496 async fn test_handle_boundary_continued_preserves_error_like_propagate() {
1497 let dlc_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
1498 let count_clone = dlc_count.clone();
1499 let dlc = BoxProcessor::from_fn(move |ex| {
1500 let c = count_clone.clone();
1501 Box::pin(async move {
1502 c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1503 Ok(ex)
1504 })
1505 });
1506 let policy = ExceptionPolicy {
1507 matches: std::sync::Arc::new(|_| true),
1508 retry: None,
1509 handled_by: None,
1510 on_steps: None,
1511 disposition: ExceptionDisposition::Continued,
1512 };
1513 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![(policy, None)]);
1514 let ex = make_exchange();
1515 let result = handler
1516 .handle_boundary(
1517 BoundaryKind::Security,
1518 ex,
1519 CamelError::Unauthorized("denied".into()),
1520 )
1521 .await;
1522 assert!(result.is_ok(), "boundary errors always return Ok");
1523 assert!(
1524 result.unwrap().has_error(),
1525 "Continued at boundary should preserve error"
1526 );
1527 assert_eq!(
1528 dlc_count.load(std::sync::atomic::Ordering::SeqCst),
1529 1,
1530 "DLC should be called"
1531 );
1532 }
1533
1534 #[tokio::test]
1535 async fn test_handle_boundary_with_on_steps_handled() {
1536 let steps_pipeline = BoxProcessor::new(tower::service_fn(|mut ex: Exchange| {
1537 ex.input.body = camel_api::Body::Bytes("on_steps_ran".into());
1538 async move { Ok(ex) }
1539 }));
1540 let policy = ExceptionPolicy {
1541 matches: std::sync::Arc::new(|_| true),
1542 retry: None,
1543 handled_by: None,
1544 on_steps: Some(SyncBoxProcessor::new(steps_pipeline)),
1545 disposition: ExceptionDisposition::Handled,
1546 };
1547 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1548 let ex = make_exchange();
1549 let result = handler
1550 .handle_boundary(
1551 BoundaryKind::Security,
1552 ex,
1553 CamelError::Unauthorized("denied".into()),
1554 )
1555 .await;
1556 assert!(result.is_ok(), "boundary errors always return Ok");
1557 let ex = result.unwrap();
1558 assert!(!ex.has_error(), "Handled disposition should clear error");
1559 assert!(
1560 matches!(ex.input.body, camel_api::Body::Bytes(_)),
1561 "on_steps should have modified the body"
1562 );
1563 }
1564
1565 #[tokio::test]
1566 async fn retry_step_segment_stop_maps_to_retry_outcome_stopped() {
1567 use std::sync::Arc;
1568 use std::sync::atomic::{AtomicUsize, Ordering};
1569
1570 #[derive(Clone)]
1571 struct StoppingSegment {
1572 n: Arc<AtomicUsize>,
1573 }
1574 impl OutcomePipeline for StoppingSegment {
1575 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
1576 Box::new(self.clone())
1577 }
1578 fn run<'a>(
1579 &'a mut self,
1580 ex: Exchange,
1581 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1582 let n = self.n.clone();
1583 Box::pin(async move {
1584 n.fetch_add(1, Ordering::SeqCst);
1585 PipelineOutcome::Stopped(ex)
1586 })
1587 }
1588 }
1589
1590 let call_count = Arc::new(AtomicUsize::new(0));
1591 let seg = OutcomeSegment::new(Box::new(StoppingSegment {
1592 n: call_count.clone(),
1593 }));
1594 let mut retryable: Box<dyn RetryableStep> = Box::new(seg);
1595
1596 let mut policy = ExceptionPolicy::new(|_e: &CamelError| true);
1597 policy.retry = Some(RedeliveryPolicy::new(3));
1598 let handler = DefaultRouteErrorHandler::new(None, vec![(policy, None)]);
1599
1600 let original = Exchange::new(Message::new("retry-me"));
1601 let err = CamelError::ProcessorError("trigger retry".into());
1602 let outcome = handler
1603 .retry_step(Some(PolicyId(0)), retryable.as_mut(), original, err)
1604 .await;
1605
1606 assert!(
1607 matches!(outcome, RetryOutcome::Stopped(_)),
1608 "Segment Stop must map to RetryOutcome::Stopped, got {:?}",
1609 outcome
1610 );
1611 assert_eq!(
1612 call_count.load(Ordering::SeqCst),
1613 1,
1614 "Stop must short-circuit retry — only 1 invoke expected, got {}",
1615 call_count.load(Ordering::SeqCst)
1616 );
1617 }
1618
1619 #[tokio::test]
1620 async fn retry_step_new_signature_works_with_dlc_producer() {
1621 use std::sync::Arc;
1622 use std::sync::atomic::{AtomicUsize, Ordering};
1623
1624 #[derive(Clone)]
1625 struct CountingProducer {
1626 count: Arc<AtomicUsize>,
1627 succeed_on: usize,
1628 }
1629 impl tower::Service<Exchange> for CountingProducer {
1630 type Response = Exchange;
1631 type Error = CamelError;
1632 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
1633 fn poll_ready(
1634 &mut self,
1635 _cx: &mut std::task::Context<'_>,
1636 ) -> std::task::Poll<Result<(), Self::Error>> {
1637 std::task::Poll::Ready(Ok(()))
1638 }
1639 fn call(&mut self, ex: Exchange) -> Self::Future {
1640 let n = self.count.fetch_add(1, Ordering::SeqCst);
1641 let succeed_on = self.succeed_on;
1642 Box::pin(async move {
1643 if n >= succeed_on {
1644 Ok(ex)
1645 } else {
1646 Err(CamelError::ProcessorError("retry".into()))
1647 }
1648 })
1649 }
1650 }
1651
1652 let count = Arc::new(AtomicUsize::new(0));
1653 let producer = CountingProducer {
1654 count: count.clone(),
1655 succeed_on: 2,
1656 };
1657 let sync_bp = SyncBoxProcessor::new(BoxProcessor::new(producer));
1658 let bp1 = sync_bp.clone_inner();
1659 let bp2 = sync_bp.clone_inner();
1660 let mut retryable1: Box<dyn RetryableStep> = Box::new(bp1);
1661 let mut retryable2: Box<dyn RetryableStep> = Box::new(bp2);
1662
1663 let ex = Exchange::new(Message::new("dlc"));
1664 let outcome1 = retryable1.invoke(ex.clone()).await;
1665 let outcome2 = retryable2.invoke(ex).await;
1666 assert!(matches!(outcome1, PipelineOutcome::Failed(_)));
1667 assert!(matches!(outcome2, PipelineOutcome::Failed(_)));
1668 assert_eq!(
1669 count.load(Ordering::SeqCst),
1670 2,
1671 "DLC producer must be invoked exactly twice through SyncBoxProcessor"
1672 );
1673 drop(retryable1);
1674 drop(retryable2);
1675 drop(sync_bp);
1676 }
1677
1678 #[tokio::test]
1681 async fn test_use_original_message_restores_body_before_dlc() {
1682 let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1685 let dlc_received_clone = Arc::clone(&dlc_received);
1686 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1687 let r = Arc::clone(&dlc_received_clone);
1688 Box::pin(async move {
1689 *r.lock().unwrap() = Some(ex.clone());
1690 Ok(ex)
1691 })
1692 });
1693
1694 let mut handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1695 handler.use_original_message = true;
1696
1697 let mut ex = make_exchange();
1699 ex.input = Message::new("original-body");
1700
1701 let original: Arc<Message> = Arc::new(ex.input.clone());
1703 ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1704
1705 ex.input.body = camel_api::Body::Bytes("mutated-body".into());
1707
1708 let result = handler
1710 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1711 .await;
1712 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1713
1714 let received = dlc_received
1716 .lock()
1717 .unwrap()
1718 .take()
1719 .expect("DLC should have been called");
1720 let received_text = match &received.input.body {
1721 camel_api::Body::Text(s) => s.clone(),
1722 camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1723 camel_api::Body::Json(v) => v.to_string(),
1724 _ => String::new(),
1725 };
1726 assert_eq!(
1727 received_text, "original-body",
1728 "DLC should receive original message body, not mutated version"
1729 );
1730 }
1731
1732 #[tokio::test]
1733 async fn test_use_original_message_handle_boundary_restores_before_dlc() {
1734 let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1735 let dlc_received_clone = Arc::clone(&dlc_received);
1736 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1737 let r = Arc::clone(&dlc_received_clone);
1738 Box::pin(async move {
1739 *r.lock().unwrap() = Some(ex.clone());
1740 Ok(ex)
1741 })
1742 });
1743
1744 let mut handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1745 handler.use_original_message = true;
1746
1747 let mut ex = make_exchange();
1748 ex.input = Message::new("orig-boundary");
1749
1750 let original: Arc<Message> = Arc::new(ex.input.clone());
1751 ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1752
1753 ex.input.body = camel_api::Body::Bytes("mutated-boundary".into());
1755
1756 let result = handler
1757 .handle_boundary(
1758 BoundaryKind::Security,
1759 ex,
1760 CamelError::Unauthorized("denied".into()),
1761 )
1762 .await;
1763 assert!(result.is_ok());
1764
1765 let received = dlc_received
1766 .lock()
1767 .unwrap()
1768 .take()
1769 .expect("DLC should have been called");
1770 let received_text = match &received.input.body {
1771 camel_api::Body::Text(s) => s.clone(),
1772 camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1773 camel_api::Body::Json(v) => v.to_string(),
1774 _ => String::new(),
1775 };
1776 assert_eq!(
1777 received_text, "orig-boundary",
1778 "handle_boundary should restore original message before sending to DLC"
1779 );
1780 }
1781
1782 #[tokio::test]
1783 async fn test_use_original_message_false_does_not_restore() {
1784 let dlc_received = Arc::new(std::sync::Mutex::new(None::<Exchange>));
1786 let dlc_received_clone = Arc::clone(&dlc_received);
1787 let dlc = BoxProcessor::from_fn(move |ex: Exchange| {
1788 let r = Arc::clone(&dlc_received_clone);
1789 Box::pin(async move {
1790 *r.lock().unwrap() = Some(ex.clone());
1791 Ok(ex)
1792 })
1793 });
1794
1795 let handler = DefaultRouteErrorHandler::new(Some(dlc), vec![]);
1796 let mut ex = make_exchange();
1799 ex.input = Message::new("original-body");
1800
1801 let original: Arc<Message> = Arc::new(ex.input.clone());
1803 ex.set_extension(camel_api::ORIGINAL_MESSAGE_EXTENSION, original);
1804
1805 ex.input.body = camel_api::Body::Bytes("mutated-body".into());
1807
1808 let result = handler
1809 .handle_step(None, ex, CamelError::ProcessorError("boom".into()))
1810 .await;
1811 assert!(matches!(result, Ok(StepDisposition::Propagate(_))));
1812
1813 let received = dlc_received
1814 .lock()
1815 .unwrap()
1816 .take()
1817 .expect("DLC should have been called");
1818 let received_text = match &received.input.body {
1819 camel_api::Body::Text(s) => s.clone(),
1820 camel_api::Body::Bytes(b) => String::from_utf8_lossy(b).to_string(),
1821 camel_api::Body::Json(v) => v.to_string(),
1822 _ => String::new(),
1823 };
1824 assert_eq!(
1825 received_text, "mutated-body",
1826 "When use_original_message=false, DLC should see the mutated body"
1827 );
1828 }
1829}