Skip to main content

camel_core/lifecycle/adapters/
route_compiler.rs

1// adapters/route_compiler.rs
2// Pipeline compilation functions: compose BuilderSteps into a Tower BoxProcessor.
3// Tower types live here as this is the adapter layer responsible for
4// translating declarative route definitions into executable pipelines.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{BoxProcessor, CamelError, Exchange, IdentityProcessor, PipelineOutcome};
15
16use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
17use camel_processor::{
18    CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
19};
20use tracing::Instrument;
21
22use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
23use crate::lifecycle::adapters::step_compilers::CompiledStep;
24use crate::shared::observability::adapters::TracingProcessor;
25use crate::shared::observability::domain::DetailLevel;
26
27/// Compose a list of CompiledSteps into a sub-pipeline (EIP internal).
28///
29/// Sub-pipelines preserve `Err(CamelError::Stopped)` so nested Stop
30/// propagates to the outer `run_steps` bypass. Use [`compose_pipeline_with_handler`]
31/// for the top-level consumer-facing pipeline.
32pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
33    if processors.is_empty() {
34        return BoxProcessor::new(IdentityProcessor);
35    }
36    BoxProcessor::new(SequentialPipeline {
37        steps: processors,
38        handler: None,
39        flatten_stop: false,
40    })
41}
42
43/// Compose a list of CompiledSteps with an optional route error handler.
44///
45/// When a handler is present, step readiness errors are swallowed (poll_ready
46/// returns Ready) and the handler's retry/recovery logic is invoked on step
47/// failures. Otherwise, step readiness errors propagate immediately.
48pub fn compose_pipeline_with_handler(
49    processors: Vec<CompiledStep>,
50    handler: Option<Arc<dyn RouteErrorHandler>>,
51) -> BoxProcessor {
52    if processors.is_empty() {
53        return BoxProcessor::new(IdentityProcessor);
54    }
55    BoxProcessor::new(SequentialPipeline {
56        steps: processors,
57        handler,
58        flatten_stop: true, // top-level: flatten Stop to Ok(ex) — Bug B fix
59    })
60}
61
62/// Compose a list of CompiledSteps into a traced pipeline.
63///
64/// Each processor is wrapped with TracingProcessor to emit spans for observability.
65/// When tracing is disabled, falls back to plain compose_pipeline with zero overhead.
66pub fn compose_traced_pipeline(
67    processors: Vec<CompiledStep>,
68    route_id: &str,
69    trace_enabled: bool,
70    detail_level: DetailLevel,
71    metrics: Option<Arc<dyn MetricsCollector>>,
72    handler: Option<Arc<dyn RouteErrorHandler>>,
73) -> BoxProcessor {
74    if !trace_enabled {
75        return compose_pipeline_with_handler(processors, handler);
76    }
77
78    if processors.is_empty() {
79        return BoxProcessor::new(IdentityProcessor);
80    }
81
82    let wrapped: Vec<CompiledStep> = processors
83        .into_iter()
84        .enumerate()
85        .map(|(idx, step)| {
86            let (p, c) = match step {
87                CompiledStep::Process {
88                    processor,
89                    body_contract,
90                } => (processor, body_contract),
91                CompiledStep::Stop => return CompiledStep::Stop,
92            };
93            let traced = BoxProcessor::new(TracingProcessor::new(
94                p,
95                route_id.to_string(),
96                idx,
97                detail_level.clone(),
98                metrics.clone(),
99            ));
100            CompiledStep::Process {
101                processor: traced,
102                body_contract: c,
103            }
104        })
105        .collect();
106
107    BoxProcessor::new(TracedPipeline {
108        steps: wrapped,
109        handler,
110        flatten_stop: false, // sub-pipeline: preserve Err(Stopped) for EIP nesting
111    })
112}
113
114/// Compose a list of `CompiledStep` items into a single pipeline with body coercion.
115///
116/// Each processor is optionally wrapped with [`BodyCoercingProcessor`] based on its
117/// contract. Processors with `None` contract are passed through with zero overhead.
118/// `CompiledStep::Stop` passes through without coercion.
119pub fn compose_pipeline_with_contracts(
120    processors: Vec<CompiledStep>,
121    handler: Option<Arc<dyn RouteErrorHandler>>,
122) -> BoxProcessor {
123    let wrapped: Vec<CompiledStep> = processors
124        .into_iter()
125        .map(|step| match step {
126            CompiledStep::Process {
127                processor,
128                body_contract,
129            } => {
130                let coerced = wrap_if_needed(processor, body_contract);
131                CompiledStep::Process {
132                    processor: coerced,
133                    body_contract: None,
134                }
135            }
136            CompiledStep::Stop => CompiledStep::Stop,
137        })
138        .collect();
139    compose_pipeline_with_handler(wrapped, handler)
140}
141
142/// Compose a list of `CompiledStep` items into a traced pipeline with body coercion.
143///
144/// Applies body coercion contracts first, then wraps with `TracingProcessor`.
145/// When tracing is disabled, falls back to [`compose_pipeline_with_contracts`].
146pub(crate) fn compose_traced_pipeline_with_contracts(
147    processors: Vec<CompiledStep>,
148    route_id: &str,
149    trace_enabled: bool,
150    detail_level: DetailLevel,
151    metrics: Option<Arc<dyn MetricsCollector>>,
152    handler: Option<Arc<dyn RouteErrorHandler>>,
153) -> BoxProcessor {
154    if !trace_enabled {
155        return compose_pipeline_with_contracts(processors, handler);
156    }
157
158    if processors.is_empty() {
159        return BoxProcessor::new(IdentityProcessor);
160    }
161
162    let wrapped: Vec<CompiledStep> = processors
163        .into_iter()
164        .enumerate()
165        .map(|(idx, step)| match step {
166            CompiledStep::Process {
167                processor,
168                body_contract,
169            } => {
170                let coerced = wrap_if_needed(processor, body_contract);
171                let traced = BoxProcessor::new(TracingProcessor::new(
172                    coerced,
173                    route_id.to_string(),
174                    idx,
175                    detail_level.clone(),
176                    metrics.clone(),
177                ));
178                CompiledStep::Process {
179                    processor: traced,
180                    body_contract: None,
181                }
182            }
183            CompiledStep::Stop => CompiledStep::Stop,
184        })
185        .collect();
186
187    BoxProcessor::new(TracedPipeline {
188        steps: wrapped,
189        handler,
190        flatten_stop: true, // top-level: flatten Stop to Ok(ex) — Bug B fix
191    })
192}
193
194/// A service that executes a sequence of CompiledSteps in order.
195///
196/// When `flatten_stop` is true (top-level route pipeline), `Stopped(ex)` is
197/// flattened to `Ok(ex)` via `into_tower_result()` — the Bug B fix that makes
198/// Stop indistinguishable from Completed at the consumer boundary.
199/// When `flatten_stop` is false (EIP sub-pipeline), `Stopped(ex)` remapped to
200/// `Err(CamelError::Stopped)` so nested Stop propagates to the outer pipeline's
201/// `run_steps` bypass (per e_gpt oracle Option E, 2026-06-22).
202#[derive(Clone)]
203struct SequentialPipeline {
204    steps: Vec<CompiledStep>,
205    handler: Option<Arc<dyn RouteErrorHandler>>,
206    flatten_stop: bool,
207}
208
209impl Service<Exchange> for SequentialPipeline {
210    type Response = Exchange;
211    type Error = CamelError;
212    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
213
214    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215        match self.steps.first() {
216            Some(CompiledStep::Process { processor, .. }) => {
217                let mut proc = processor.clone();
218                match proc.poll_ready(cx) {
219                    Poll::Pending => Poll::Pending,
220                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
221                    Poll::Ready(other) => Poll::Ready(other),
222                }
223            }
224            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
225            None => Poll::Ready(Ok(())),
226        }
227    }
228
229    // ADR-0024 reply-channel adapter: PipelineOutcome → Result<Exchange, CamelError>.
230    // This is the ONLY translation site. Completed(ex) and Stopped(ex) both map to
231    // Ok(ex); Failed(err) maps to Err. Downstream consumers (RouteChannelService,
232    // ExchangeUoWLayer, HTTP/Kafka reply finalisers) see Result<Exchange, CamelError>
233    // and treat Stop as success — the core fix for Bug B.
234    fn call(&mut self, exchange: Exchange) -> Self::Future {
235        let steps = self.steps.clone();
236        let handler = self.handler.clone();
237        let flatten = self.flatten_stop;
238        Box::pin(async move {
239            let outcome = run_steps(steps, exchange, handler, false).await;
240            if flatten {
241                outcome.into_tower_result()
242            } else {
243                eip_outcome_to_result(outcome)
244            }
245        })
246    }
247}
248
249/// A traced service pipeline for wrapped CompiledSteps.
250#[derive(Clone)]
251struct TracedPipeline {
252    steps: Vec<CompiledStep>,
253    handler: Option<Arc<dyn RouteErrorHandler>>,
254    flatten_stop: bool,
255}
256
257impl Service<Exchange> for TracedPipeline {
258    type Response = Exchange;
259    type Error = CamelError;
260    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
261
262    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
263        match self.steps.first() {
264            Some(CompiledStep::Process { processor, .. }) => {
265                let mut proc = processor.clone();
266                match proc.poll_ready(cx) {
267                    Poll::Pending => Poll::Pending,
268                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
269                    Poll::Ready(other) => Poll::Ready(other),
270                }
271            }
272            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
273            None => Poll::Ready(Ok(())),
274        }
275    }
276
277    // ADR-0024 reply-channel adapter (same as SequentialPipeline::call):
278    // Completed(ex) and Stopped(ex) both map to Ok(ex). Bug B fix.
279    fn call(&mut self, exchange: Exchange) -> Self::Future {
280        let steps = self.steps.clone();
281        let handler = self.handler.clone();
282        let flatten = self.flatten_stop;
283        Box::pin(async move {
284            let outcome = run_steps(steps, exchange, handler, true).await;
285            if flatten {
286                outcome.into_tower_result()
287            } else {
288                eip_outcome_to_result(outcome)
289            }
290        })
291    }
292}
293
294/// Translate a `PipelineOutcome` to `Result<Exchange, CamelError>` for the Tower
295/// boundary of an EIP sub-pipeline. Unlike `into_tower_result` (which maps
296/// `Stopped(ex)` → `Ok(ex)` for the top-level consumer reply), this preserves
297/// `Err(CamelError::Stopped)` so nested Stop propagates to the outer `run_steps`
298/// bypass in `stop_inside_filter_prevents_outer_pipeline` and similar EIP tests.
299/// This is the internal sub-pipeline adapter — removed once EIPs become
300/// outcome-aware (per e_gpt oracle Option E, 2026-06-22).
301fn eip_outcome_to_result(outcome: PipelineOutcome) -> Result<Exchange, CamelError> {
302    match outcome {
303        PipelineOutcome::Completed(ex) => Ok(ex),
304        PipelineOutcome::Stopped(_ex) => Err(CamelError::Stopped),
305        PipelineOutcome::Failed(err) => Err(err),
306    }
307}
308
309/// Run a sequence of CompiledSteps with optional error recovery.
310///
311/// Each CompiledStep::Process is executed via `invoke_processor`. On failure:
312/// 1. If a handler is present, `match_policy` selects a retry policy.
313/// 2. `retry_step` attempts recovery; if exhausted, `handle_step` determines
314///    the disposition:
315///    - `Propagate` — return the error
316///    - `Handled` — return the exchange early (success)
317///    - `Continued` — clear the error and continue to the next step
318/// 3. If no handler is present, the error is propagated directly.
319///
320/// CompiledStep::Stop short-circuits to `PipelineOutcome::Stopped(ex)` — the
321/// handler is bypassed and no Tower service is invoked (ADR-0024 §3.5).
322pub async fn run_steps(
323    steps: Vec<CompiledStep>,
324    exchange: Exchange,
325    handler: Option<Arc<dyn RouteErrorHandler>>,
326    trace: bool,
327) -> PipelineOutcome {
328    use camel_api::PipelineOutcome;
329    let mut ex = exchange;
330    for (i, step) in steps.into_iter().enumerate() {
331        let CompiledStep::Process { mut processor, .. } = step else {
332            // CompiledStep::Stop — short-circuit to Stopped outcome WITHOUT
333            // invoking a Tower service. The handler is bypassed (ADR-0024 §3.5).
334            return PipelineOutcome::Stopped(ex);
335        };
336        let original = ex.clone();
337        let invoke_future = invoke_processor(&mut processor, ex);
338        let result = if trace {
339            invoke_future
340                .instrument(tracing::debug_span!("pipeline_step", index = i))
341                .await
342        } else {
343            invoke_future.await
344        };
345        match result {
346            Ok(next) => {
347                ex = next;
348            }
349            Err(err) => {
350                // INTERIM (e_gpt oracle Option E, 2026-06-22): Err(CamelError::Stopped)
351                // comes from a nested sub-pipeline (CompiledStep::Stop was mapped to
352                // StopService in the sub-pipeline compiler — see control_flow.rs et al).
353                // The handler is bypassed; translate to PipelineOutcome::Stopped using
354                // the Exchange from BEFORE the step ran. This preserves Apache Camel
355                // nested-stop semantics (Stop inside filter/choice/loop/multicast/split
356                // halts the entire route). Removed in Task 22 once EIPs are outcome-aware.
357                if matches!(err, CamelError::Stopped) {
358                    return PipelineOutcome::Stopped(original);
359                }
360                // NOTE: The CompiledStep::Stop at the top of the loop handles the
361                // top-level Stop. The check above handles the DISTINCT case of
362                // nested Stop propagating through Tower Response from a sub-pipeline.
363
364                let Some(handler) = handler.as_ref() else {
365                    return PipelineOutcome::Failed(err);
366                };
367
368                let policy = handler.match_policy(&err);
369                match handler
370                    .retry_step(policy, &mut processor, original, err)
371                    .await
372                {
373                    RetryOutcome::Recovered(exchange) => {
374                        ex = exchange;
375                        continue;
376                    }
377                    RetryOutcome::Exhausted {
378                        exchange,
379                        error,
380                        policy,
381                    } => {
382                        let handle_future = handler.handle_step(policy, exchange, error);
383                        let disposition = if trace {
384                            handle_future
385                                .instrument(tracing::debug_span!("error_handler", step_index = i))
386                                .await
387                        } else {
388                            handle_future.await
389                        };
390                        match disposition {
391                            Ok(StepDisposition::Propagate(e)) => return PipelineOutcome::Failed(e),
392                            Ok(StepDisposition::Handled(done)) => {
393                                return PipelineOutcome::Completed(done);
394                            }
395                            Ok(StepDisposition::Continued(next)) => {
396                                ex = next;
397                            }
398                            Err(e) => return PipelineOutcome::Failed(e),
399                        }
400                    }
401                }
402            }
403        }
404    }
405    PipelineOutcome::Completed(ex)
406}
407
/// Route channel with explicit security and circuit-breaker gates.
///
/// Gate order: Security → CB(before_call) → Pipeline → CB(after_result).
/// Errors from Security/CB gates go to `handler.handle_boundary`.
/// Errors from Pipeline go through the injected handler's retry/handle_step.
/// Pipeline Propagate returns Err — passed through to upstream.
#[derive(Clone)]
pub struct RouteChannelService {
    // Receives gate failures via `handle_boundary`.
    handler: Arc<dyn RouteErrorHandler>,
    // Optional security gate, run before anything else; `None` skips it.
    security: Option<BoxProcessor>,
    // Optional circuit breaker consulted before and after the pipeline.
    cb_gate: Option<CircuitBreakerGate>,
    // The compiled route pipeline itself.
    pipeline: BoxProcessor,
}

impl RouteChannelService {
    /// Assemble a route channel from its handler, optional gates and pipeline.
    ///
    /// The arguments are stored as given; nothing is invoked at construction.
    pub fn new(
        handler: Arc<dyn RouteErrorHandler>,
        security: Option<BoxProcessor>,
        cb_gate: Option<CircuitBreakerGate>,
        pipeline: BoxProcessor,
    ) -> Self {
        Self {
            handler,
            security,
            cb_gate,
            pipeline,
        }
    }
}
437
438impl Service<Exchange> for RouteChannelService {
439    type Response = Exchange;
440    type Error = CamelError;
441    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
442
443    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
444        // Swallow readiness errors from security gate — deferred to call()
445        if let Some(ref mut sec) = self.security {
446            match sec.clone().poll_ready(cx) {
447                Poll::Pending => return Poll::Pending,
448                Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
449            }
450        }
451        // Pipeline readiness — swallow errors when handler present
452        match self.pipeline.clone().poll_ready(cx) {
453            Poll::Pending => return Poll::Pending,
454            Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
455        }
456        Poll::Ready(Ok(()))
457    }
458
459    fn call(&mut self, exchange: Exchange) -> Self::Future {
460        let handler = self.handler.clone();
461        let security = self.security.clone();
462        let cb_gate = self.cb_gate.clone();
463        let mut pipeline = self.pipeline.clone();
464
465        Box::pin(async move {
466            let mut ex = exchange;
467
468            // Gate 1: Security
469            if let Some(mut sec) = security {
470                let original = ex.clone();
471                match invoke_processor(&mut sec, ex).await {
472                    Ok(next) => ex = next,
473                    Err(err) => {
474                        return handler
475                            .handle_boundary(BoundaryKind::Security, original, err)
476                            .await;
477                    }
478                }
479            }
480
481            // Gate 2: CircuitBreaker — before_call
482            if let Some(ref cb) = cb_gate {
483                match cb.before_call() {
484                    CircuitBreakerDecision::Allow => { /* proceed to pipeline */ }
485                    CircuitBreakerDecision::Fallback(mut fb) => {
486                        // Circuit open with fallback — call fallback.
487                        // Fallback errors go through handle_boundary, not raw to upstream.
488                        let original = ex.clone();
489                        match invoke_processor(&mut fb, ex).await {
490                            Ok(result) => return Ok(result),
491                            Err(err) => {
492                                return handler
493                                    .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
494                                    .await;
495                            }
496                        }
497                    }
498                    CircuitBreakerDecision::Reject(err) => {
499                        let original = ex.clone();
500                        return handler
501                            .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
502                            .await;
503                    }
504                }
505            }
506
507            // Pipeline (handler already injected for step errors)
508            let result = invoke_processor(&mut pipeline, ex).await;
509
510            // Gate 2: CircuitBreaker — after_result
511            if let Some(ref cb) = cb_gate {
512                cb.after_result(&result);
513            }
514
515            // Propagate from inner handler — pass through to upstream
516            result
517        })
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use super::*;
524    use camel_api::error_handler::{BoundaryKind, PolicyId, RetryOutcome, StepDisposition};
525    use camel_api::{Body, BoxProcessorExt, CircuitBreakerConfig, Message, Value};
526    use camel_processor::RouteErrorHandler;
527    use serde_json::json;
528    use std::sync::Arc;
529    use std::sync::Mutex;
530    use std::sync::atomic::{AtomicBool, Ordering};
531    use std::time::Duration;
532    use tower::ServiceExt;
533
534    fn make_test_exchange() -> Exchange {
535        Exchange::new(Message::new("test"))
536    }
537
538    /// Test double for RouteErrorHandler that returns Continued disposition.
539    struct ContinuedHandler;
540
541    #[async_trait::async_trait]
542    impl RouteErrorHandler for ContinuedHandler {
543        fn match_policy(&self, _: &CamelError) -> Option<PolicyId> {
544            Some(PolicyId(0))
545        }
546
547        async fn retry_step(
548            &self,
549            _: Option<PolicyId>,
550            _: &mut BoxProcessor,
551            original: Exchange,
552            error: CamelError,
553        ) -> RetryOutcome {
554            RetryOutcome::Exhausted {
555                exchange: original,
556                error,
557                policy: Some(PolicyId(0)),
558            }
559        }
560
561        async fn handle_step(
562            &self,
563            _: Option<PolicyId>,
564            mut ex: Exchange,
565            _: CamelError,
566        ) -> Result<StepDisposition, CamelError> {
567            ex.clear_error();
568            Ok(StepDisposition::Continued(ex))
569        }
570
571        async fn handle_boundary(
572            &self,
573            _: BoundaryKind,
574            ex: Exchange,
575            _: CamelError,
576        ) -> Result<Exchange, CamelError> {
577            Ok(ex)
578        }
579    }
580
581    /// Test double for RouteErrorHandler that returns Propagate disposition.
582    struct PropagateHandler;
583
584    #[async_trait::async_trait]
585    impl RouteErrorHandler for PropagateHandler {
586        fn match_policy(&self, _: &CamelError) -> Option<PolicyId> {
587            None
588        }
589
590        async fn retry_step(
591            &self,
592            _: Option<PolicyId>,
593            _: &mut BoxProcessor,
594            original: Exchange,
595            error: CamelError,
596        ) -> RetryOutcome {
597            RetryOutcome::Exhausted {
598                exchange: original,
599                error,
600                policy: None,
601            }
602        }
603
604        async fn handle_step(
605            &self,
606            _: Option<PolicyId>,
607            _ex: Exchange,
608            error: CamelError,
609        ) -> Result<StepDisposition, CamelError> {
610            Ok(StepDisposition::Propagate(error))
611        }
612
613        async fn handle_boundary(
614            &self,
615            _: BoundaryKind,
616            ex: Exchange,
617            _: CamelError,
618        ) -> Result<Exchange, CamelError> {
619            Ok(ex)
620        }
621    }
622
623    /// A service that returns `Pending` on the first `poll_ready`, then `Ready`.
624    #[derive(Clone)]
625    struct DelayedReadyService {
626        ready: Arc<AtomicBool>,
627    }
628
629    impl Service<Exchange> for DelayedReadyService {
630        type Response = Exchange;
631        type Error = CamelError;
632        type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
633
634        fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
635            if self.ready.fetch_or(true, Ordering::SeqCst) {
636                Poll::Ready(Ok(()))
637            } else {
638                cx.waker().wake_by_ref();
639                Poll::Pending
640            }
641        }
642
643        fn call(&mut self, ex: Exchange) -> Self::Future {
644            Box::pin(async move { Ok(ex) })
645        }
646    }
647
648    #[test]
649    fn test_pipeline_poll_ready_delegates_to_first_step() {
650        let waker = futures::task::noop_waker();
651        let mut cx = Context::from_waker(&waker);
652
653        let inner = DelayedReadyService {
654            ready: Arc::new(AtomicBool::new(false)),
655        };
656        let boxed = BoxProcessor::new(inner);
657        let mut pipeline = SequentialPipeline {
658            steps: vec![CompiledStep::Process {
659                processor: boxed,
660                body_contract: None,
661            }],
662            handler: None,
663            flatten_stop: true,
664        };
665
666        let first = pipeline.poll_ready(&mut cx);
667        assert!(first.is_pending(), "expected Pending on first poll_ready");
668
669        let second = pipeline.poll_ready(&mut cx);
670        assert!(second.is_ready(), "expected Ready on second poll_ready");
671    }
672
673    #[test]
674    fn test_pipeline_poll_ready_with_empty_steps() {
675        let waker = futures::task::noop_waker();
676        let mut cx = Context::from_waker(&waker);
677
678        let mut pipeline = SequentialPipeline {
679            steps: vec![],
680            handler: None,
681            flatten_stop: true,
682        };
683        let result = pipeline.poll_ready(&mut cx);
684        assert!(result.is_ready(), "expected Ready for empty pipeline");
685    }
686
687    #[tokio::test]
688    async fn test_pipeline_stop_returns_ok_with_exchange() {
689        let stop_step = CompiledStep::Stop;
690        let after_called = Arc::new(AtomicBool::new(false));
691        let after_called_clone = after_called.clone();
692        let after_step = CompiledStep::Process {
693            processor: BoxProcessor::from_fn(move |ex| {
694                after_called_clone.store(true, Ordering::SeqCst);
695                Box::pin(async move { Ok(ex) })
696            }),
697            body_contract: None,
698        };
699
700        let mut pipeline = SequentialPipeline {
701            steps: vec![stop_step, after_step],
702            handler: None,
703            flatten_stop: true,
704        };
705
706        let ex = Exchange::new(camel_api::Message::new("hello"));
707        let result = pipeline.call(ex).await;
708        // Pipeline-level result is Ok(ex) — Stop arrives as success (ADR-0024).
709        assert!(result.is_ok(), "expected Ok, got: {:?}", result);
710        assert_eq!(result.unwrap().input.body.as_text(), Some("hello"));
711        assert!(
712            !after_called.load(Ordering::SeqCst),
713            "step after stop should not be called"
714        );
715    }
716
717    #[tokio::test]
718    async fn test_run_steps_stop_produces_pipeline_outcome_stopped() {
719        use camel_api::PipelineOutcome;
720        // A two-step pipeline where the first step is a Stop marker.
721        let steps = vec![
722            CompiledStep::Stop,
723            CompiledStep::Process {
724                processor: BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })),
725                body_contract: None,
726            },
727        ];
728        let ex = Exchange::new(camel_api::Message::new("payload"));
729        let outcome = run_steps(steps, ex, None, false).await;
730        match outcome {
731            PipelineOutcome::Stopped(returned) => {
732                assert_eq!(returned.input.body.as_text(), Some("payload"));
733            }
734            other => panic!(
735                "expected PipelineOutcome::Stopped, got {:?}",
736                other.is_success()
737            ),
738        }
739    }
740
741    #[tokio::test]
742    async fn test_run_steps_stop_bypasses_error_handler() {
743        use camel_api::PipelineOutcome;
744        use camel_api::error_handler::{BoundaryKind, PolicyId, RetryOutcome, StepDisposition};
745        use camel_processor::RouteErrorHandler;
746        use std::sync::atomic::{AtomicUsize, Ordering};
747
748        let handler_invocations = Arc::new(AtomicUsize::new(0));
749        let counter = Arc::clone(&handler_invocations);
750
751        // Handler that records every call. NONE of its methods should be invoked for Stop.
752        struct RecordingHandler {
753            counter: Arc<AtomicUsize>,
754        }
755        #[async_trait::async_trait]
756        impl RouteErrorHandler for RecordingHandler {
757            fn match_policy(&self, _err: &CamelError) -> Option<PolicyId> {
758                self.counter.fetch_add(1, Ordering::SeqCst);
759                None
760            }
761            async fn retry_step(
762                &self,
763                _policy: Option<PolicyId>,
764                _step: &mut camel_api::BoxProcessor,
765                _original: Exchange,
766                _error: CamelError,
767            ) -> RetryOutcome {
768                self.counter.fetch_add(1, Ordering::SeqCst);
769                unreachable!("retry_step must not be called for CompiledStep::Stop")
770            }
771            async fn handle_step(
772                &self,
773                _policy: Option<PolicyId>,
774                _exchange: Exchange,
775                _error: CamelError,
776            ) -> Result<StepDisposition, CamelError> {
777                self.counter.fetch_add(1, Ordering::SeqCst);
778                unreachable!("handle_step must not be called for CompiledStep::Stop")
779            }
780            async fn handle_boundary(
781                &self,
782                _kind: BoundaryKind,
783                _exchange: Exchange,
784                _error: CamelError,
785            ) -> Result<Exchange, CamelError> {
786                self.counter.fetch_add(1, Ordering::SeqCst);
787                unreachable!("handle_boundary must not be called for CompiledStep::Stop")
788            }
789        }
790
791        let steps = vec![CompiledStep::Stop];
792        let ex = Exchange::new(camel_api::Message::new("payload"));
793        let outcome = run_steps(
794            steps,
795            ex,
796            Some(Arc::new(RecordingHandler { counter })),
797            false,
798        )
799        .await;
800
801        assert!(matches!(outcome, PipelineOutcome::Stopped(_)));
802        assert_eq!(
803            handler_invocations.load(Ordering::SeqCst),
804            0,
805            "error handler MUST NOT be invoked for CompiledStep::Stop"
806        );
807    }
808
809    #[tokio::test]
810    async fn test_compose_traced_pipeline_disabled() {
811        let pipeline = compose_traced_pipeline(
812            vec![],
813            "test-route",
814            false,
815            DetailLevel::Minimal,
816            None,
817            None,
818        );
819        let ex = Exchange::new(camel_api::Message::new("hello"));
820        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
821        assert!(result.is_ok());
822    }
823
824    #[tokio::test]
825    async fn test_compose_traced_pipeline_enabled() {
826        let step = BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }));
827        let pipeline = compose_traced_pipeline(
828            vec![CompiledStep::Process {
829                processor: step,
830                body_contract: None,
831            }],
832            "test-route",
833            true,
834            DetailLevel::Minimal,
835            None,
836            None,
837        );
838        let ex = Exchange::new(camel_api::Message::new("hello"));
839        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
840        assert!(result.is_ok());
841    }
842
843    #[tokio::test]
844    async fn test_compose_pipeline_with_contracts_coerces_before_inner_processor() {
845        let seen_body = Arc::new(Mutex::new(None::<Body>));
846        let seen_body_clone = Arc::clone(&seen_body);
847
848        let inner = BoxProcessor::from_fn(move |ex: Exchange| {
849            let seen_body_clone = Arc::clone(&seen_body_clone);
850            Box::pin(async move {
851                *seen_body_clone.lock().expect("lock seen body") = Some(ex.input.body.clone());
852                Ok(ex)
853            })
854        });
855
856        let pipeline = compose_pipeline_with_contracts(
857            vec![CompiledStep::Process {
858                processor: inner,
859                body_contract: Some(camel_api::BodyType::Text),
860            }],
861            None,
862        );
863
864        let mut ex = Exchange::new(Message::default());
865        ex.input.body = Body::Json(json!("hello"));
866
867        let result = tower::ServiceExt::oneshot(pipeline, ex).await;
868        assert!(result.is_ok());
869
870        let observed = seen_body.lock().expect("lock seen body").clone();
871        assert_eq!(observed, Some(Body::Text("hello".to_string())));
872    }
873
874    #[tokio::test]
875    async fn test_run_steps_continued_skips_failed_step() {
876        let step1 = CompiledStep::Process {
877            processor: BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })),
878            body_contract: None,
879        };
880        let step2 = CompiledStep::Process {
881            processor: BoxProcessor::from_fn(|_ex| {
882                Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
883            }),
884            body_contract: None,
885        };
886        let step3_hit = Arc::new(AtomicBool::new(false));
887        let hit = step3_hit.clone();
888        let step3 = CompiledStep::Process {
889            processor: BoxProcessor::from_fn(move |ex| {
890                let hit = hit.clone();
891                Box::pin(async move {
892                    hit.store(true, Ordering::SeqCst);
893                    Ok(ex)
894                })
895            }),
896            body_contract: None,
897        };
898
899        let handler: Arc<dyn RouteErrorHandler> = Arc::new(ContinuedHandler);
900        let outcome = run_steps(
901            vec![step1, step2, step3],
902            make_test_exchange(),
903            Some(handler),
904            false,
905        )
906        .await;
907        assert!(
908            matches!(outcome, PipelineOutcome::Completed(_)),
909            "expected Completed, got: {:?}",
910            outcome.is_success()
911        );
912        assert!(
913            step3_hit.load(Ordering::SeqCst),
914            "step 3 should have executed after continued"
915        );
916    }
917
918    // ── RouteChannelService tests ─────────────────────────────────────────
919
920    #[tokio::test]
921    async fn test_route_channel_pipeline_propagate_returns_err() {
922        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
923        let failing_step = BoxProcessor::from_fn(|_ex| {
924            Box::pin(async { Err(CamelError::ProcessorError("step boom".into())) })
925        });
926        let pipeline = compose_pipeline_with_handler(
927            vec![CompiledStep::Process {
928                processor: failing_step,
929                body_contract: None,
930            }],
931            Some(handler.clone()),
932        );
933        let channel = RouteChannelService::new(handler.clone(), None, None, pipeline);
934        let mut svc = BoxProcessor::new(channel);
935        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
936        assert!(result.is_err(), "Propagate should return Err");
937    }
938
939    #[tokio::test]
940    async fn test_route_channel_security_error_calls_boundary() {
941        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
942        let deny_all = BoxProcessor::from_fn(|_ex| {
943            Box::pin(async { Err(CamelError::Unauthorized("denied".into())) })
944        });
945        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
946        let channel = RouteChannelService::new(handler.clone(), Some(deny_all), None, pipeline);
947        let mut svc = BoxProcessor::new(channel);
948        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
949        assert!(
950            result.is_ok(),
951            "boundary errors should be absorbed by handler"
952        );
953    }
954
955    #[tokio::test]
956    async fn test_route_channel_cb_reject_calls_boundary() {
957        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
958        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
959            failure_threshold: 1,
960            open_duration: Duration::from_secs(60),
961            success_threshold: 1,
962            fallback: None,
963        });
964        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));
965        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
966        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);
967        let mut svc = BoxProcessor::new(channel);
968        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
969        assert!(
970            result.is_ok(),
971            "CB reject should call handle_boundary and return Ok"
972        );
973    }
974
975    #[tokio::test]
976    async fn test_route_channel_cb_fallback_executes_fallback() {
977        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
978        let fallback = BoxProcessor::from_fn(|mut ex| {
979            Box::pin(async move {
980                ex.input.set_header("from_fallback", Value::Bool(true));
981                Ok(ex)
982            })
983        });
984        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
985            failure_threshold: 1,
986            open_duration: Duration::from_secs(60),
987            success_threshold: 1,
988            fallback: Some(fallback),
989        });
990        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));
991        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
992        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);
993        let mut svc = BoxProcessor::new(channel);
994        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
995        assert!(result.is_ok(), "fallback should succeed");
996        assert_eq!(
997            result.unwrap().input.header("from_fallback"),
998            Some(&Value::Bool(true)),
999            "should have executed fallback processor",
1000        );
1001    }
1002
1003    #[tokio::test]
1004    async fn test_route_channel_cb_fallback_failure_calls_boundary() {
1005        // CRITICAL: fallback failure must go through handle_boundary, NOT raw Err to upstream.
1006        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
1007        let failing_fallback = BoxProcessor::from_fn(|_ex| {
1008            Box::pin(async { Err(CamelError::ProcessorError("fallback broken".into())) })
1009        });
1010        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
1011            failure_threshold: 1,
1012            open_duration: Duration::from_secs(60),
1013            success_threshold: 1,
1014            fallback: Some(failing_fallback),
1015        });
1016        cb_gate.after_result(&Err(CamelError::ProcessorError("force open".into())));
1017
1018        let pipeline = compose_pipeline_with_handler(vec![], Some(handler.clone()));
1019        let channel = RouteChannelService::new(handler.clone(), None, Some(cb_gate), pipeline);
1020
1021        let mut svc = BoxProcessor::new(channel);
1022        let result = svc.ready().await.unwrap().call(make_test_exchange()).await;
1023        // PropagateHandler.handle_boundary returns Ok(ex) — so fallback failure is absorbed
1024        assert!(
1025            result.is_ok(),
1026            "fallback failure should go through handle_boundary, not raw Err"
1027        );
1028    }
1029
1030    #[tokio::test]
1031    async fn test_route_channel_cb_counts_stopped_as_success() {
1032        // ADR-0024 §3.5: PipelineOutcome::Stopped translates to Ok(ex) at the
1033        // Tower boundary. RouteChannelService::call invokes cb.after_result(&result)
1034        // where result = Ok(ex) for Stop. The CB must NOT trip.
1035        let handler: Arc<dyn RouteErrorHandler> = Arc::new(PropagateHandler);
1036        let cb_gate = CircuitBreakerGate::new(CircuitBreakerConfig {
1037            failure_threshold: 2,
1038            open_duration: Duration::from_secs(60),
1039            success_threshold: 1,
1040            fallback: None,
1041        });
1042        let cb_clone = cb_gate.clone();
1043
1044        // Pipeline emits Stop as the only step — top-level uses flatten_stop: true.
1045        let pipeline = compose_pipeline_with_handler(vec![CompiledStep::Stop], None);
1046
1047        let channel = RouteChannelService::new(handler, None, Some(cb_gate), pipeline);
1048
1049        // Two Stop invocations — would trip a 2-failure CB if Stop counted as failure.
1050        let ex1 = Exchange::new(camel_api::Message::new("a"));
1051        let ex2 = Exchange::new(camel_api::Message::new("b"));
1052        let r1 = tower::ServiceExt::oneshot(channel.clone(), ex1).await;
1053        let r2 = tower::ServiceExt::oneshot(channel, ex2).await;
1054        assert!(r1.is_ok(), "Stop must arrive as Ok via RouteChannelService");
1055        assert!(r2.is_ok(), "Stop must arrive as Ok via RouteChannelService");
1056
1057        // CB must still be in Allow state — Stop counted as success.
1058        assert!(
1059            matches!(cb_clone.before_call(), CircuitBreakerDecision::Allow),
1060            "CB should count Stop as success"
1061        );
1062    }
1063}