1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{BoxProcessor, CamelError, Exchange, IdentityProcessor, PipelineOutcome};
15
16use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
17use camel_processor::{
18 CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
19};
20use tracing::Instrument;
21
22use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
23use crate::lifecycle::adapters::step_compilers::CompiledStep;
24use crate::shared::observability::adapters::TracingProcessor;
25use crate::shared::observability::domain::DetailLevel;
26
27pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
33 if processors.is_empty() {
34 return BoxProcessor::new(IdentityProcessor);
35 }
36 BoxProcessor::new(SequentialPipeline {
37 steps: processors,
38 handler: None,
39 flatten_stop: false,
40 })
41}
42
43pub fn compose_pipeline_with_handler(
49 processors: Vec<CompiledStep>,
50 handler: Option<Arc<dyn RouteErrorHandler>>,
51) -> BoxProcessor {
52 if processors.is_empty() {
53 return BoxProcessor::new(IdentityProcessor);
54 }
55 BoxProcessor::new(SequentialPipeline {
56 steps: processors,
57 handler,
58 flatten_stop: true, })
60}
61
62pub fn compose_traced_pipeline(
67 processors: Vec<CompiledStep>,
68 route_id: &str,
69 trace_enabled: bool,
70 detail_level: DetailLevel,
71 metrics: Option<Arc<dyn MetricsCollector>>,
72 handler: Option<Arc<dyn RouteErrorHandler>>,
73) -> BoxProcessor {
74 if !trace_enabled {
75 return compose_pipeline_with_handler(processors, handler);
76 }
77
78 if processors.is_empty() {
79 return BoxProcessor::new(IdentityProcessor);
80 }
81
82 let wrapped: Vec<CompiledStep> = processors
83 .into_iter()
84 .enumerate()
85 .map(|(idx, step)| {
86 let (p, c) = match step {
87 CompiledStep::Process {
88 processor,
89 body_contract,
90 } => (processor, body_contract),
91 CompiledStep::Stop => return CompiledStep::Stop,
92 };
93 let traced = BoxProcessor::new(TracingProcessor::new(
94 p,
95 route_id.to_string(),
96 idx,
97 detail_level.clone(),
98 metrics.clone(),
99 ));
100 CompiledStep::Process {
101 processor: traced,
102 body_contract: c,
103 }
104 })
105 .collect();
106
107 BoxProcessor::new(TracedPipeline {
108 steps: wrapped,
109 handler,
110 flatten_stop: false, })
112}
113
114pub fn compose_pipeline_with_contracts(
120 processors: Vec<CompiledStep>,
121 handler: Option<Arc<dyn RouteErrorHandler>>,
122) -> BoxProcessor {
123 let wrapped: Vec<CompiledStep> = processors
124 .into_iter()
125 .map(|step| match step {
126 CompiledStep::Process {
127 processor,
128 body_contract,
129 } => {
130 let coerced = wrap_if_needed(processor, body_contract);
131 CompiledStep::Process {
132 processor: coerced,
133 body_contract: None,
134 }
135 }
136 CompiledStep::Stop => CompiledStep::Stop,
137 })
138 .collect();
139 compose_pipeline_with_handler(wrapped, handler)
140}
141
142pub(crate) fn compose_traced_pipeline_with_contracts(
147 processors: Vec<CompiledStep>,
148 route_id: &str,
149 trace_enabled: bool,
150 detail_level: DetailLevel,
151 metrics: Option<Arc<dyn MetricsCollector>>,
152 handler: Option<Arc<dyn RouteErrorHandler>>,
153) -> BoxProcessor {
154 if !trace_enabled {
155 return compose_pipeline_with_contracts(processors, handler);
156 }
157
158 if processors.is_empty() {
159 return BoxProcessor::new(IdentityProcessor);
160 }
161
162 let wrapped: Vec<CompiledStep> = processors
163 .into_iter()
164 .enumerate()
165 .map(|(idx, step)| match step {
166 CompiledStep::Process {
167 processor,
168 body_contract,
169 } => {
170 let coerced = wrap_if_needed(processor, body_contract);
171 let traced = BoxProcessor::new(TracingProcessor::new(
172 coerced,
173 route_id.to_string(),
174 idx,
175 detail_level.clone(),
176 metrics.clone(),
177 ));
178 CompiledStep::Process {
179 processor: traced,
180 body_contract: None,
181 }
182 }
183 CompiledStep::Stop => CompiledStep::Stop,
184 })
185 .collect();
186
187 BoxProcessor::new(TracedPipeline {
188 steps: wrapped,
189 handler,
190 flatten_stop: true, })
192}
193
/// Pipeline that runs its steps sequentially through `run_steps` with tracing disabled.
///
/// Built by `compose_pipeline` and `compose_pipeline_with_handler`.
#[derive(Clone)]
struct SequentialPipeline {
    /// Steps executed in order.
    steps: Vec<CompiledStep>,
    /// Optional route error handler consulted when a step fails.
    handler: Option<Arc<dyn RouteErrorHandler>>,
    /// When `true`, `call` converts the outcome with `PipelineOutcome::into_tower_result`;
    /// when `false`, it uses `eip_outcome_to_result`, which reports a stop as
    /// `Err(CamelError::Stopped)`.
    flatten_stop: bool,
}
208
209impl Service<Exchange> for SequentialPipeline {
210 type Response = Exchange;
211 type Error = CamelError;
212 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
213
214 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215 match self.steps.first() {
216 Some(CompiledStep::Process { processor, .. }) => {
217 let mut proc = processor.clone();
218 match proc.poll_ready(cx) {
219 Poll::Pending => Poll::Pending,
220 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
221 Poll::Ready(other) => Poll::Ready(other),
222 }
223 }
224 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
225 None => Poll::Ready(Ok(())),
226 }
227 }
228
229 fn call(&mut self, exchange: Exchange) -> Self::Future {
235 let steps = self.steps.clone();
236 let handler = self.handler.clone();
237 let flatten = self.flatten_stop;
238 Box::pin(async move {
239 let outcome = run_steps(steps, exchange, handler, false).await;
240 if flatten {
241 outcome.into_tower_result()
242 } else {
243 eip_outcome_to_result(outcome)
244 }
245 })
246 }
247}
248
/// Pipeline that runs its steps sequentially through `run_steps` with tracing enabled.
///
/// Built by `compose_traced_pipeline` and `compose_traced_pipeline_with_contracts`.
#[derive(Clone)]
struct TracedPipeline {
    /// Steps executed in order.
    steps: Vec<CompiledStep>,
    /// Optional route error handler consulted when a step fails.
    handler: Option<Arc<dyn RouteErrorHandler>>,
    /// When `true`, `call` converts the outcome with `PipelineOutcome::into_tower_result`;
    /// when `false`, it uses `eip_outcome_to_result`, which reports a stop as
    /// `Err(CamelError::Stopped)`.
    flatten_stop: bool,
}
256
257impl Service<Exchange> for TracedPipeline {
258 type Response = Exchange;
259 type Error = CamelError;
260 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
261
262 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263 match self.steps.first() {
264 Some(CompiledStep::Process { processor, .. }) => {
265 let mut proc = processor.clone();
266 match proc.poll_ready(cx) {
267 Poll::Pending => Poll::Pending,
268 Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
269 Poll::Ready(other) => Poll::Ready(other),
270 }
271 }
272 Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
273 None => Poll::Ready(Ok(())),
274 }
275 }
276
277 fn call(&mut self, exchange: Exchange) -> Self::Future {
280 let steps = self.steps.clone();
281 let handler = self.handler.clone();
282 let flatten = self.flatten_stop;
283 Box::pin(async move {
284 let outcome = run_steps(steps, exchange, handler, true).await;
285 if flatten {
286 outcome.into_tower_result()
287 } else {
288 eip_outcome_to_result(outcome)
289 }
290 })
291 }
292}
293
294fn eip_outcome_to_result(outcome: PipelineOutcome) -> Result<Exchange, CamelError> {
302 match outcome {
303 PipelineOutcome::Completed(ex) => Ok(ex),
304 PipelineOutcome::Stopped(_ex) => Err(CamelError::Stopped),
305 PipelineOutcome::Failed(err) => Err(err),
306 }
307}
308
309pub async fn run_steps(
323 steps: Vec<CompiledStep>,
324 exchange: Exchange,
325 handler: Option<Arc<dyn RouteErrorHandler>>,
326 trace: bool,
327) -> PipelineOutcome {
328 use camel_api::PipelineOutcome;
329 let mut ex = exchange;
330 for (i, step) in steps.into_iter().enumerate() {
331 let CompiledStep::Process { mut processor, .. } = step else {
332 return PipelineOutcome::Stopped(ex);
335 };
336 let original = ex.clone();
337 let invoke_future = invoke_processor(&mut processor, ex);
338 let result = if trace {
339 invoke_future
340 .instrument(tracing::debug_span!("pipeline_step", index = i))
341 .await
342 } else {
343 invoke_future.await
344 };
345 match result {
346 Ok(next) => {
347 ex = next;
348 }
349 Err(err) => {
350 if matches!(err, CamelError::Stopped) {
358 return PipelineOutcome::Stopped(original);
359 }
360 let Some(handler) = handler.as_ref() else {
365 return PipelineOutcome::Failed(err);
366 };
367
368 let policy = handler.match_policy(&err);
369 match handler
370 .retry_step(policy, &mut processor, original, err)
371 .await
372 {
373 RetryOutcome::Recovered(exchange) => {
374 ex = exchange;
375 continue;
376 }
377 RetryOutcome::Exhausted {
378 exchange,
379 error,
380 policy,
381 } => {
382 let handle_future = handler.handle_step(policy, exchange, error);
383 let disposition = if trace {
384 handle_future
385 .instrument(tracing::debug_span!("error_handler", step_index = i))
386 .await
387 } else {
388 handle_future.await
389 };
390 match disposition {
391 Ok(StepDisposition::Propagate(e)) => return PipelineOutcome::Failed(e),
392 Ok(StepDisposition::Handled(done)) => {
393 return PipelineOutcome::Completed(done);
394 }
395 Ok(StepDisposition::Continued(next)) => {
396 ex = next;
397 }
398 Err(e) => return PipelineOutcome::Failed(e),
399 }
400 }
401 }
402 }
403 }
404 }
405 PipelineOutcome::Completed(ex)
406}
407
/// Route-level service that runs an optional security processor, consults an optional
/// circuit breaker gate, and then invokes the route pipeline.
#[derive(Clone)]
pub struct RouteChannelService {
    /// Error handler whose `handle_boundary` receives security and circuit-breaker failures.
    handler: Arc<dyn RouteErrorHandler>,
    /// Optional processor invoked before anything else in `call`.
    security: Option<BoxProcessor>,
    /// Optional circuit breaker gate consulted before the pipeline and told its result.
    cb_gate: Option<CircuitBreakerGate>,
    /// The route pipeline itself.
    pipeline: BoxProcessor,
}
421
impl RouteChannelService {
    /// Creates a route channel from its parts.
    ///
    /// * `handler` - error handler used for security and circuit-breaker failures.
    /// * `security` - optional processor run before the pipeline.
    /// * `cb_gate` - optional circuit breaker gate guarding the pipeline.
    /// * `pipeline` - the route pipeline to invoke.
    pub fn new(
        handler: Arc<dyn RouteErrorHandler>,
        security: Option<BoxProcessor>,
        cb_gate: Option<CircuitBreakerGate>,
        pipeline: BoxProcessor,
    ) -> Self {
        Self {
            handler,
            security,
            cb_gate,
            pipeline,
        }
    }
}
437
438impl Service<Exchange> for RouteChannelService {
439 type Response = Exchange;
440 type Error = CamelError;
441 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
442
443 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
444 if let Some(ref mut sec) = self.security {
446 match sec.clone().poll_ready(cx) {
447 Poll::Pending => return Poll::Pending,
448 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
449 }
450 }
451 match self.pipeline.clone().poll_ready(cx) {
453 Poll::Pending => return Poll::Pending,
454 Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
455 }
456 Poll::Ready(Ok(()))
457 }
458
459 fn call(&mut self, exchange: Exchange) -> Self::Future {
460 let handler = self.handler.clone();
461 let security = self.security.clone();
462 let cb_gate = self.cb_gate.clone();
463 let mut pipeline = self.pipeline.clone();
464
465 Box::pin(async move {
466 let mut ex = exchange;
467
468 if let Some(mut sec) = security {
470 let original = ex.clone();
471 match invoke_processor(&mut sec, ex).await {
472 Ok(next) => ex = next,
473 Err(err) => {
474 return handler
475 .handle_boundary(BoundaryKind::Security, original, err)
476 .await;
477 }
478 }
479 }
480
481 if let Some(ref cb) = cb_gate {
483 match cb.before_call() {
484 CircuitBreakerDecision::Allow => { }
485 CircuitBreakerDecision::Fallback(mut fb) => {
486 let original = ex.clone();
489 match invoke_processor(&mut fb, ex).await {
490 Ok(result) => return Ok(result),
491 Err(err) => {
492 return handler
493 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
494 .await;
495 }
496 }
497 }
498 CircuitBreakerDecision::Reject(err) => {
499 let original = ex.clone();
500 return handler
501 .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
502 .await;
503 }
504 }
505 }
506
507 let result = invoke_processor(&mut pipeline, ex).await;
509
510 if let Some(ref cb) = cb_gate {
512 cb.after_result(&result);
513 }
514
515 result
517 })
518 }
519}
520
// Unit tests for pipeline composition, `run_steps`, and `RouteChannelService`.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::error_handler::{BoundaryKind, PolicyId, RetryOutcome, StepDisposition};
    use camel_api::{Body, BoxProcessorExt, CircuitBreakerConfig, Message, Value};
    use camel_processor::RouteErrorHandler;
    use serde_json::json;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;
    use tower::ServiceExt;

    /// Builds an exchange whose message is created from the text "test".
    fn make_test_exchange() -> Exchange {
        Exchange::new(Message::new("test"))
    }

    /// Handler mock: never recovers on retry, then answers `Continued` from `handle_step`.
    struct ContinuedHandler;

    #[async_trait::async_trait]
    impl RouteErrorHandler for ContinuedHandler {
        fn match_policy(&self, _: &CamelError) -> Option<PolicyId> {
            Some(PolicyId(0))
        }

        // Reports retries as exhausted straight away, handing back the original exchange.
        async fn retry_step(
            &self,
            _: Option<PolicyId>,
            _: &mut BoxProcessor,
            original: Exchange,
            error: CamelError,
        ) -> RetryOutcome {
            RetryOutcome::Exhausted {
                exchange: original,
                error,
                policy: Some(PolicyId(0)),
            }
        }

        // Clears the error and asks the pipeline to continue with the next step.
        async fn handle_step(
            &self,
            _: Option<PolicyId>,
            mut ex: Exchange,
            _: CamelError,
        ) -> Result<StepDisposition, CamelError> {
            ex.clear_error();
            Ok(StepDisposition::Continued(ex))
        }

        // Boundary failures are absorbed: the exchange is returned as `Ok`.
        async fn handle_boundary(
            &self,
            _: BoundaryKind,
            ex: Exchange,
            _: CamelError,
        ) -> Result<Exchange, CamelError> {
            Ok(ex)
        }
    }

    /// Handler mock: matches no policy and answers `Propagate` from `handle_step`.
    struct PropagateHandler;

    #[async_trait::async_trait]
    impl RouteErrorHandler for PropagateHandler {
        fn match_policy(&self, _: &CamelError) -> Option<PolicyId> {
            None
        }

        // Reports retries as exhausted straight away, handing back the original exchange.
        async fn retry_step(
            &self,
            _: Option<PolicyId>,
            _: &mut BoxProcessor,
            original: Exchange,
            error: CamelError,
        ) -> RetryOutcome {
            RetryOutcome::Exhausted {
                exchange: original,
                error,
                policy: None,
            }
        }

        // Step errors are propagated unchanged.
        async fn handle_step(
            &self,
            _: Option<PolicyId>,
            _ex: Exchange,
            error: CamelError,
        ) -> Result<StepDisposition, CamelError> {
            Ok(StepDisposition::Propagate(error))
        }

        // Boundary failures are absorbed: the exchange is returned as `Ok`.
        async fn handle_boundary(
            &self,
            _: BoundaryKind,
            ex: Exchange,
            _: CamelError,
        ) -> Result<Exchange, CamelError> {
            Ok(ex)
        }
    }

    /// Service that is pending on its first readiness poll and ready afterwards.
    ///
    /// The flag lives in an `Arc`, so clones of the service observe the same state.
    #[derive(Clone)]
    struct DelayedReadyService {
        ready: Arc<AtomicBool>,
    }

    impl Service<Exchange> for DelayedReadyService {
        type Response = Exchange;
        type Error = CamelError;
        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
            // `fetch_or` returns the previous value: `false` only on the very first poll.
            if self.ready.fetch_or(true, Ordering::SeqCst) {
                Poll::Ready(Ok(()))
            } else {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }

        fn call(&mut self, ex: Exchange) -> Self::Future {
            Box::pin(async move { Ok(ex) })
        }
    }

    // The pipeline's readiness follows its first step: pending first, then ready.
    #[test]
    fn test_pipeline_poll_ready_delegates_to_first_step() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let inner = DelayedReadyService {
            ready: Arc::new(AtomicBool::new(false)),
        };
        let boxed = BoxProcessor::new(inner);
        let mut pipeline = SequentialPipeline {
            steps: vec![CompiledStep::Process {
                processor: boxed,
                body_contract: None,
            }],
            handler: None,
            flatten_stop: true,
        };

        let first = pipeline.poll_ready(&mut cx);
        assert!(first.is_pending(), "expected Pending on first poll_ready");

        let second = pipeline.poll_ready(&mut cx);
        assert!(second.is_ready(), "expected Ready on second poll_ready");
    }

    // A pipeline without steps is ready immediately.
    #[test]
    fn test_pipeline_poll_ready_with_empty_steps() {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);

        let mut pipeline = SequentialPipeline {
            steps: vec![],
            handler: None,
            flatten_stop: true,
        };
        let result = pipeline.poll_ready(&mut cx);
        assert!(result.is_ready(), "expected Ready for empty pipeline");
    }

    // With `flatten_stop: true`, a leading Stop yields `Ok` with the untouched exchange
    // and the following step never runs.
    #[tokio::test]
    async fn test_pipeline_stop_returns_ok_with_exchange() {
        let stop_step = CompiledStep::Stop;
        let after_called = Arc::new(AtomicBool::new(false));
        let after_called_clone = after_called.clone();
        let after_step = CompiledStep::Process {
            processor: BoxProcessor::from_fn(move |ex| {
                after_called_clone.store(true, Ordering::SeqCst);
                Box::pin(async move { Ok(ex) })
            }),
            body_contract: None,
        };

        let mut pipeline = SequentialPipeline {
            steps: vec![stop_step, after_step],
            handler: None,
            flatten_stop: true,
        };

        let ex = Exchange::new(camel_api::Message::new("hello"));
        let result = pipeline.call(ex).await;
        assert!(result.is_ok(), "expected Ok, got: {:?}", result);
        assert_eq!(result.unwrap().input.body.as_text(), Some("hello"));
        assert!(
            !after_called.load(Ordering::SeqCst),
            "step after stop should not be called"
        );
    }

    // `run_steps` itself reports a Stop marker as `PipelineOutcome::Stopped` with the
    // exchange it received.
    #[tokio::test]
    async fn test_run_steps_stop_produces_pipeline_outcome_stopped() {
        use camel_api::PipelineOutcome;
        let steps = vec![
            CompiledStep::Stop,
            CompiledStep::Process {
                processor: BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })),
                body_contract: None,
            },
        ];
        let ex = Exchange::new(camel_api::Message::new("payload"));
        let outcome = run_steps(steps, ex, None, false).await;
        match outcome {
            PipelineOutcome::Stopped(returned) => {
                assert_eq!(returned.input.body.as_text(), Some("payload"));
            }
            // The message prints `is_success()` rather than the outcome itself.
            other => panic!(
                "expected PipelineOutcome::Stopped, got {:?}",
                other.is_success()
            ),
        }
    }

    // A Stop marker must not reach any method of the error handler.
    #[tokio::test]
    async fn test_run_steps_stop_bypasses_error_handler() {
        use camel_api::PipelineOutcome;
        use camel_api::error_handler::{BoundaryKind, PolicyId, RetryOutcome, StepDisposition};
        use camel_processor::RouteErrorHandler;
        use std::sync::atomic::{AtomicUsize, Ordering};

        let handler_invocations = Arc::new(AtomicUsize::new(0));
        let counter = Arc::clone(&handler_invocations);

        // Counts every handler call; all methods except `match_policy` panic when reached.
        struct RecordingHandler {
            counter: Arc<AtomicUsize>,
        }
        #[async_trait::async_trait]
        impl RouteErrorHandler for RecordingHandler {
            fn match_policy(&self, _err: &CamelError) -> Option<PolicyId> {
                self.counter.fetch_add(1, Ordering::SeqCst);
                None
            }
            async fn retry_step(
                &self,
                _policy: Option<PolicyId>,
                _step: &mut camel_api::BoxProcessor,
                _original: Exchange,
                _error: CamelError,
            ) -> RetryOutcome {
                self.counter.fetch_add(1, Ordering::SeqCst);
                unreachable!("retry_step must not be called for CompiledStep::Stop")
            }
            async fn handle_step(
                &self,
                _policy: Option<PolicyId>,
                _exchange: Exchange,
                _error: CamelError,
            ) -> Result<StepDisposition, CamelError> {
                self.counter.fetch_add(1, Ordering::SeqCst);
                unreachable!("handle_step must not be called for CompiledStep::Stop")
            }
            async fn handle_boundary(
                &self,
                _kind: BoundaryKind,
                _exchange: Exchange,
                _error: CamelError,
            ) -> Result<Exchange, CamelError> {
                self.counter.fetch_add(1, Ordering::SeqCst);
                unreachable!("handle_boundary must not be called for CompiledStep::Stop")
            }
        }

        let steps = vec![CompiledStep::Stop];
        let ex = Exchange::new(camel_api::Message::new("payload"));
        let outcome = run_steps(
            steps,
            ex,
            Some(Arc::new(RecordingHandler { counter })),
            false,
        )
        .await;

        assert!(matches!(outcome, PipelineOutcome::Stopped(_)));
        assert_eq!(
            handler_invocations.load(Ordering::SeqCst),
            0,
            "error handler MUST NOT be invoked for CompiledStep::Stop"
        );
    }

    // Tracing disabled with no steps: the composed processor still returns `Ok`.
    #[tokio::test]
    async fn test_compose_traced_pipeline_disabled() {
        let pipeline = compose_traced_pipeline(
            vec![],
            "test-route",
            false,
            DetailLevel::Minimal,
            None,
            None,
        );
        let ex = Exchange::new(camel_api::Message::new("hello"));
        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
        assert!(result.is_ok());
    }

    // Tracing enabled with one pass-through step: the composed processor returns `Ok`.
    #[tokio::test]
    async fn test_compose_traced_pipeline_enabled() {
        let step = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
        let pipeline = compose_traced_pipeline(
            vec![CompiledStep::Process {
                processor: step,
                body_contract: None,
            }],
            "test-route",
            true,
            DetailLevel::Minimal,
            None,
            None,
        );
        let ex = Exchange::new(camel_api::Message::new("hello"));
        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
        assert!(result.is_ok());
    }

    // A `Text` body contract must be applied before the inner processor sees the body:
    // a JSON string goes in, the processor observes `Body::Text`.
    #[tokio::test]
    async fn test_compose_pipeline_with_contracts_coerces_before_inner_processor() {
        let seen_body = Arc::new(Mutex::new(None::<Body>));
        let seen_body_clone = Arc::clone(&seen_body);

        // Records the body the inner processor actually receives.
        let inner = BoxProcessor::from_fn(move |ex: Exchange| {
            let seen_body_clone = Arc::clone(&seen_body_clone);
            Box::pin(async move {
                *seen_body_clone.lock().expect("lock seen body") = Some(ex.input.body.clone());
                Ok(ex)
            })
        });

        let pipeline = compose_pipeline_with_contracts(
            vec![CompiledStep::Process {
                processor: inner,
                body_contract: Some(camel_api::BodyType::Text),
            }],
            None,
        );

        let mut ex = Exchange::new(Message::default());
        ex.input.body = Body::Json(json!("hello"));

        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
        assert!(result.is_ok());

        let observed = seen_body.lock().expect("lock seen body").clone();
        assert_eq!(observed, Some(Body::Text("hello".to_string())));
    }

    // When the handler answers `Continued` for a failed step, the pipeline moves on to
    // the next step and completes.
    #[tokio::test]
    async fn test_run_steps_continued_skips_failed_step() {
        let step1 = CompiledStep::Process {
            processor: BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })),
            body_contract: None,
        };
        // Always fails.
        let step2 = CompiledStep::Process {
            processor: BoxProcessor::from_fn(|_ex| {
                Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
            }),
            body_contract: None,
        };
        let step3_hit = Arc::new(AtomicBool::new(false));
        let hit = step3_hit.clone();
        // Records that it ran.
        let step3 = CompiledStep::Process {
            processor: BoxProcessor::from_fn(move |ex| {
                let hit = hit.clone();
                Box::pin(async move {
                    hit.store(true, Ordering::SeqCst);
                    Ok(ex)
                })
            }),
            body_contract: None,
        };

        let handler: Arc<dyn RouteErrorHandler> = Arc::new(ContinuedHandler);
        let outcome = run_steps(
            vec![step1, step2, step3],
            make_test_exchange(),
            Some(handler),
            false,
        )
        .await;
        assert!(
            matches!(outcome, PipelineOutcome::Completed(_)),
            "expected Completed, got: {:?}",
            outcome.is_success()
        );
        assert!(
            step3_hit.load(Ordering::SeqCst),
            "step 3 should have executed after continued"
        );
    }

    // A step error that the handler propagates comes out of the route channel as `Err`.
    #[tokio::test]
    async fn test_route_channel_pipeline_propagate_returns_err() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        let failing_step = BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("step boom".into())) })
        });
        let pipeline = compose_pipeline_with_handler(
            vec![CompiledStep::Process {
                processor: failing_step,
                body_contract: None,
            }],
            Some(handler.clone()),
        );
        let channel = RouteChannelService::new(handler.clone(), None, None, pipeline);
        let mut svc = BoxProcessor::new(channel);
        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
        assert!(result.is_err(), "Propagate should return Err");
    }

    // A failing security processor is routed through `handle_boundary`, which here
    // returns `Ok`.
    #[tokio::test]
    async fn test_route_channel_security_error_calls_boundary() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        let deny_all = BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::Unauthorized("denied".into())) })
        });
        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
        let channel = RouteChannelService::new(handler.clone(), Some(deny_all), None, pipeline);
        let mut svc = BoxProcessor::new(channel);
        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
        assert!(
            result.is_ok(),
            "boundary errors should be absorbed by handler"
        );
    }

    // A breaker without a fallback that has recorded a failure: the call is expected to
    // be rejected and handed to `handle_boundary`, which returns `Ok`.
    #[tokio::test]
    async fn test_route_channel_cb_reject_calls_boundary() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        // One recorded failure against a threshold of 1, intended to open the breaker.
        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));
        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);
        let mut svc = BoxProcessor::new(channel);
        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
        assert!(
            result.is_ok(),
            "CB reject should call handle_boundary and return Ok"
        );
    }

    // With a fallback configured, the fallback processor's result is returned.
    #[tokio::test]
    async fn test_route_channel_cb_fallback_executes_fallback() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        // Marks the exchange so the test can tell the fallback ran.
        let fallback = BoxProcessor::from_fn(|mut ex| {
            Box::pin(async move {
                ex.input.set_header("from_fallback", Value::Bool(true));
                Ok(ex)
            })
        });
        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: Some(fallback),
        });
        // One recorded failure against a threshold of 1, intended to open the breaker.
        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));
        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);
        let mut svc = BoxProcessor::new(channel);
        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
        assert!(result.is_ok(), "fallback should succeed");
        assert_eq!(
            result.unwrap().input.header("from_fallback"),
            Some(&Value::Bool(true)),
            "should have executed fallback processor",
        );
    }

    // A failing fallback is routed through `handle_boundary` instead of surfacing as a
    // raw error.
    #[tokio::test]
    async fn test_route_channel_cb_fallback_failure_calls_boundary() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        let failing_fallback = BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("fallback broken".into())) })
        });
        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 1,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: Some(failing_fallback),
        });
        // One recorded failure against a threshold of 1, intended to open the breaker.
        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));

        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);

        let mut svc = BoxProcessor::new(channel);
        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
        assert!(
            result.is_ok(),
            "fallback failure should go through handle_boundary, not raw Err"
        );
    }

    // Two stopped runs against a failure threshold of 2 must leave the breaker allowing
    // calls, i.e. a Stop is not counted as a failure.
    #[tokio::test]
    async fn test_route_channel_cb_counts_stopped_as_success() {
        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
            failure_threshold: 2,
            open_duration: Duration::from_secs(60),
            success_threshold: 1,
            fallback: None,
        });
        // Kept so the breaker can be inspected after the channel has been consumed.
        // NOTE(review): assumes clones of the gate share state — confirm.
        let cb_clone = cb_gate.clone();

        let pipeline = compose_pipeline_with_handler(vec![CompiledStep::Stop], None);

        let channel = RouteChannelService::new(handler, None, Some(cb_gate), pipeline);

        let ex1 = Exchange::new(camel_api::Message::new("a"));
        let ex2 = Exchange::new(camel_api::Message::new("b"));
        let r1 = tower::ServiceExt::oneshot(channel.clone(), ex1).await;
        let r2 = tower::ServiceExt::oneshot(channel, ex2).await;
        assert!(r1.is_ok(), "Stop must arrive as Ok via RouteChannelService");
        assert!(r2.is_ok(), "Stop must arrive as Ok via RouteChannelService");

        assert!(
            matches!(cb_clone.before_call(), CircuitBreakerDecision::Allow),
            "CB should count Stop as success"
        );
    }
}