Skip to main content

camel_core/lifecycle/adapters/
route_compiler.rs

1// adapters/route_compiler.rs
2// Pipeline compilation functions: compose BuilderSteps into a Tower BoxProcessor.
3// Tower types live here as this is the adapter layer responsible for
4// translating declarative route definitions into executable pipelines.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{BoxProcessor, CamelError, Exchange, IdentityProcessor, PipelineOutcome};
15
16use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
17use camel_processor::{
18    CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
19};
20use tracing::Instrument;
21
22use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
23use crate::lifecycle::adapters::step_compilers::CompiledStep;
24use crate::shared::observability::adapters::TracingProcessor;
25use crate::shared::observability::domain::DetailLevel;
26
27// Re-export outcome composition types so existing step_compiler import paths
28// (`route_compiler::BoxProcessorSegment`, etc.) continue to work.
29pub(crate) use super::outcome_composition::{
30    BodyCoercingSegment, BoxProcessorSegment, StopSegment, compose_outcome_segment,
31};
32
33/// Compose a list of CompiledSteps into a sub-pipeline (EIP internal).
34///
35/// Uses `into_tower_result()` so `PipelineOutcome::Stopped` maps to `Ok(ex)`.
36/// Use [`compose_pipeline_with_handler`] for the top-level consumer-facing pipeline.
37pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
38    if processors.is_empty() {
39        return BoxProcessor::new(IdentityProcessor);
40    }
41    BoxProcessor::new(SequentialPipeline {
42        steps: processors,
43        handler: None,
44    })
45}
46
47/// Compose a list of CompiledSteps with an optional route error handler.
48///
49/// When a handler is present, step readiness errors are swallowed (poll_ready
50/// returns Ready) and the handler's retry/recovery logic is invoked on step
51/// failures. Otherwise, step readiness errors propagate immediately.
52pub fn compose_pipeline_with_handler(
53    processors: Vec<CompiledStep>,
54    handler: Option<Arc<dyn RouteErrorHandler>>,
55) -> BoxProcessor {
56    if processors.is_empty() {
57        return BoxProcessor::new(IdentityProcessor);
58    }
59    BoxProcessor::new(SequentialPipeline {
60        steps: processors,
61        handler,
62    })
63}
64
65/// Compose a list of CompiledSteps into a traced pipeline with Stop→Ok translation.
66///
67/// Each processor is wrapped with TracingProcessor to emit spans for observability.
68/// When tracing is disabled, falls back to [`compose_pipeline_with_handler`] with zero overhead.
69pub fn compose_traced_pipeline(
70    processors: Vec<CompiledStep>,
71    route_id: &str,
72    trace_enabled: bool,
73    detail_level: DetailLevel,
74    metrics: Option<Arc<dyn MetricsCollector>>,
75    handler: Option<Arc<dyn RouteErrorHandler>>,
76) -> BoxProcessor {
77    if !trace_enabled {
78        return compose_pipeline_with_handler(processors, handler);
79    }
80
81    if processors.is_empty() {
82        return BoxProcessor::new(IdentityProcessor);
83    }
84
85    let wrapped: Vec<CompiledStep> = processors
86        .into_iter()
87        .enumerate()
88        .map(|(idx, step)| {
89            let (p, c) = match step {
90                CompiledStep::Process {
91                    processor,
92                    body_contract,
93                } => (processor, body_contract),
94                CompiledStep::Stop => return CompiledStep::Stop,
95                CompiledStep::Segment { .. } => return step,
96            };
97            let traced = BoxProcessor::new(TracingProcessor::new(
98                p,
99                route_id.to_string(),
100                idx,
101                detail_level.clone(),
102                metrics.clone(),
103            ));
104            CompiledStep::Process {
105                processor: traced,
106                body_contract: c,
107            }
108        })
109        .collect();
110
111    BoxProcessor::new(TracedPipeline {
112        steps: wrapped,
113        handler,
114    })
115}
116
117/// Compose a list of `CompiledStep` items into a single pipeline with body coercion.
118///
119/// Each processor is optionally wrapped with [`BodyCoercingProcessor`] based on its
120/// contract. Processors with `None` contract are passed through with zero overhead.
121/// `CompiledStep::Stop` passes through without coercion.
122pub fn compose_pipeline_with_contracts(
123    processors: Vec<CompiledStep>,
124    handler: Option<Arc<dyn RouteErrorHandler>>,
125) -> BoxProcessor {
126    let wrapped: Vec<CompiledStep> = processors
127        .into_iter()
128        .map(|step| match step {
129            CompiledStep::Process {
130                processor,
131                body_contract,
132            } => {
133                let coerced = wrap_if_needed(processor, body_contract);
134                CompiledStep::Process {
135                    processor: coerced,
136                    body_contract: None,
137                }
138            }
139            CompiledStep::Stop => CompiledStep::Stop,
140            CompiledStep::Segment { .. } => step,
141        })
142        .collect();
143    compose_pipeline_with_handler(wrapped, handler)
144}
145
146/// Compose a list of `CompiledStep` items into a traced pipeline with body coercion.
147///
148/// Applies body coercion contracts first, then wraps with `TracingProcessor`.
149/// When tracing is disabled, falls back to [`compose_pipeline_with_contracts`].
150pub(crate) fn compose_traced_pipeline_with_contracts(
151    processors: Vec<CompiledStep>,
152    route_id: &str,
153    trace_enabled: bool,
154    detail_level: DetailLevel,
155    metrics: Option<Arc<dyn MetricsCollector>>,
156    handler: Option<Arc<dyn RouteErrorHandler>>,
157) -> BoxProcessor {
158    if !trace_enabled {
159        return compose_pipeline_with_contracts(processors, handler);
160    }
161
162    if processors.is_empty() {
163        return BoxProcessor::new(IdentityProcessor);
164    }
165
166    let wrapped: Vec<CompiledStep> = processors
167        .into_iter()
168        .enumerate()
169        .map(|(idx, step)| match step {
170            CompiledStep::Process {
171                processor,
172                body_contract,
173            } => {
174                let coerced = wrap_if_needed(processor, body_contract);
175                let traced = BoxProcessor::new(TracingProcessor::new(
176                    coerced,
177                    route_id.to_string(),
178                    idx,
179                    detail_level.clone(),
180                    metrics.clone(),
181                ));
182                CompiledStep::Process {
183                    processor: traced,
184                    body_contract: None,
185                }
186            }
187            CompiledStep::Stop => CompiledStep::Stop,
188            CompiledStep::Segment { .. } => step,
189        })
190        .collect();
191
192    BoxProcessor::new(TracedPipeline {
193        steps: wrapped,
194        handler,
195    })
196}
197
198/// A service that executes a sequence of CompiledSteps in order.
199///
200/// Uses `into_tower_result()` so `PipelineOutcome::Stopped(ex)` maps to
201/// `Ok(ex)` — the Bug B fix that makes Stop indistinguishable from Completed
202/// at the consumer boundary.
203#[derive(Clone)]
204struct SequentialPipeline {
205    steps: Vec<CompiledStep>,
206    handler: Option<Arc<dyn RouteErrorHandler>>,
207}
208
209impl Service<Exchange> for SequentialPipeline {
210    type Response = Exchange;
211    type Error = CamelError;
212    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
213
214    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
215        match self.steps.first() {
216            Some(CompiledStep::Process { processor, .. }) => {
217                let mut proc = processor.clone();
218                match proc.poll_ready(cx) {
219                    Poll::Pending => Poll::Pending,
220                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
221                    Poll::Ready(other) => Poll::Ready(other),
222                }
223            }
224            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
225            Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
226            None => Poll::Ready(Ok(())),
227        }
228    }
229
230    // ADR-0024 reply-channel adapter: PipelineOutcome → Result<Exchange, CamelError>.
231    // Completed(ex) and Stopped(ex) both map to Ok(ex); Failed(err) maps to Err.
232    // Downstream consumers (RouteChannelService, ExchangeUoWLayer, HTTP/Kafka reply
233    // finalisers) see Result<Exchange, CamelError> and treat Stop as success.
234    fn call(&mut self, exchange: Exchange) -> Self::Future {
235        let steps = self.steps.clone();
236        let handler = self.handler.clone();
237        Box::pin(async move {
238            let outcome = run_steps(steps, exchange, handler, false).await;
239            outcome.into_tower_result()
240        })
241    }
242}
243
244/// A traced service pipeline for wrapped CompiledSteps.
245#[derive(Clone)]
246struct TracedPipeline {
247    steps: Vec<CompiledStep>,
248    handler: Option<Arc<dyn RouteErrorHandler>>,
249}
250
251impl Service<Exchange> for TracedPipeline {
252    type Response = Exchange;
253    type Error = CamelError;
254    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
255
256    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
257        match self.steps.first() {
258            Some(CompiledStep::Process { processor, .. }) => {
259                let mut proc = processor.clone();
260                match proc.poll_ready(cx) {
261                    Poll::Pending => Poll::Pending,
262                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
263                    Poll::Ready(other) => Poll::Ready(other),
264                }
265            }
266            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
267            Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
268            None => Poll::Ready(Ok(())),
269        }
270    }
271
272    // ADR-0024 reply-channel adapter (same as SequentialPipeline::call):
273    // Completed(ex) and Stopped(ex) both map to Ok(ex). Bug B fix.
274    fn call(&mut self, exchange: Exchange) -> Self::Future {
275        let steps = self.steps.clone();
276        let handler = self.handler.clone();
277        Box::pin(async move {
278            let outcome = run_steps(steps, exchange, handler, true).await;
279            outcome.into_tower_result()
280        })
281    }
282}
283
284/// Run a sequence of CompiledSteps with optional error recovery.
285///
286/// Each step is unified under `Box<dyn RetryableStep>` — both Process and
287/// Segment variants are treated uniformly. On failure:
288/// 1. If a handler is present, `match_policy` selects a retry policy.
289/// 2. `retry_step` attempts recovery; if exhausted, `handle_step` determines
290///    the disposition:
291///    - `Propagate` — return the error
292///    - `Handled` — return the exchange early (success)
293///    - `Continued` — clear the error and continue to the next step
294/// 3. If no handler is present, the error is propagated directly.
295///
296/// CompiledStep::Stop short-circuits to `PipelineOutcome::Stopped(ex)` — the
297/// handler is bypassed and no Tower service is invoked (ADR-0024 §3.5).
298pub async fn run_steps(
299    steps: Vec<CompiledStep>,
300    exchange: Exchange,
301    handler: Option<Arc<dyn RouteErrorHandler>>,
302    trace: bool,
303) -> PipelineOutcome {
304    use camel_api::error_handler::RetryableStep;
305    let mut ex = exchange;
306    for (i, step) in steps.into_iter().enumerate() {
307        let (mut retryable, _body_contract): (Box<dyn RetryableStep>, _) = match step {
308            CompiledStep::Stop => return PipelineOutcome::Stopped(ex),
309            CompiledStep::Process {
310                processor,
311                body_contract,
312            } => {
313                let boxed: Box<dyn RetryableStep> = Box::new(processor);
314                (boxed, body_contract)
315            }
316            CompiledStep::Segment {
317                segment,
318                body_contract,
319            } => {
320                let boxed: Box<dyn RetryableStep> = Box::new(segment);
321                (boxed, body_contract)
322            }
323        };
324
325        let original = ex.clone();
326        let outcome = if trace {
327            invoke_with_span(&mut retryable, ex, i).await
328        } else {
329            retryable.invoke(ex).await
330        };
331
332        match outcome {
333            PipelineOutcome::Completed(next) => {
334                ex = next;
335            }
336            PipelineOutcome::Stopped(stopped_ex) => {
337                return PipelineOutcome::Stopped(stopped_ex);
338            }
339            PipelineOutcome::Failed(err) => {
340                let Some(handler) = handler.as_ref() else {
341                    return PipelineOutcome::Failed(err);
342                };
343                let policy = handler.match_policy(&err);
344                match handler
345                    .retry_step(policy, retryable.as_mut(), original, err)
346                    .await
347                {
348                    RetryOutcome::Recovered(exchange) => {
349                        ex = exchange;
350                    }
351                    RetryOutcome::Stopped(stopped_ex) => {
352                        return PipelineOutcome::Stopped(stopped_ex);
353                    }
354                    RetryOutcome::Exhausted {
355                        exchange,
356                        error,
357                        policy,
358                    } => {
359                        let disposition = if trace {
360                            handler
361                                .handle_step(policy, exchange, error)
362                                .instrument(tracing::debug_span!("error_handler", step_index = i))
363                                .await
364                        } else {
365                            handler.handle_step(policy, exchange, error).await
366                        };
367                        match disposition {
368                            Ok(StepDisposition::Propagate(e)) => {
369                                return PipelineOutcome::Failed(e);
370                            }
371                            Ok(StepDisposition::Handled(done)) => {
372                                return PipelineOutcome::Completed(done);
373                            }
374                            Ok(StepDisposition::Continued(next)) => {
375                                ex = next;
376                            }
377                            Err(e) => return PipelineOutcome::Failed(e),
378                        }
379                    }
380                }
381            }
382        }
383    }
384    PipelineOutcome::Completed(ex)
385}
386
387async fn invoke_with_span(
388    retryable: &mut Box<dyn camel_api::error_handler::RetryableStep>,
389    exchange: Exchange,
390    idx: usize,
391) -> PipelineOutcome {
392    retryable
393        .invoke(exchange)
394        .instrument(tracing::debug_span!("pipeline_step", index = idx))
395        .await
396}
397
398/// Route channel with explicit security and circuit-breaker gates.
399///
400/// Gate order: Security → CB(before_call) → Pipeline → CB(after_result).
401/// Errors from Security/CB gates go to `handler.handle_boundary`.
402/// Errors from Pipeline go through the injected handler's retry/handle_step.
403/// Pipeline Propagate returns Err — passed through to upstream.
404#[derive(Clone)]
405pub struct RouteChannelService {
406    handler: Arc<dyn RouteErrorHandler>,
407    security: Option<BoxProcessor>,
408    cb_gate: Option<CircuitBreakerGate>,
409    pipeline: BoxProcessor,
410}
411
412impl RouteChannelService {
413    pub fn new(
414        handler: Arc<dyn RouteErrorHandler>,
415        security: Option<BoxProcessor>,
416        cb_gate: Option<CircuitBreakerGate>,
417        pipeline: BoxProcessor,
418    ) -> Self {
419        Self {
420            handler,
421            security,
422            cb_gate,
423            pipeline,
424        }
425    }
426}
427
428impl Service<Exchange> for RouteChannelService {
429    type Response = Exchange;
430    type Error = CamelError;
431    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
432
433    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
434        // Swallow readiness errors from security gate — deferred to call()
435        if let Some(ref mut sec) = self.security {
436            match sec.clone().poll_ready(cx) {
437                Poll::Pending => return Poll::Pending,
438                Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
439            }
440        }
441        // Pipeline readiness — swallow errors when handler present
442        match self.pipeline.clone().poll_ready(cx) {
443            Poll::Pending => return Poll::Pending,
444            Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
445        }
446        Poll::Ready(Ok(()))
447    }
448
449    fn call(&mut self, exchange: Exchange) -> Self::Future {
450        let handler = self.handler.clone();
451        let security = self.security.clone();
452        let cb_gate = self.cb_gate.clone();
453        let mut pipeline = self.pipeline.clone();
454
455        Box::pin(async move {
456            let mut ex = exchange;
457
458            // Gate 1: Security
459            if let Some(mut sec) = security {
460                let original = ex.clone();
461                match invoke_processor(&mut sec, ex).await {
462                    Ok(next) => ex = next,
463                    Err(err) => {
464                        return handler
465                            .handle_boundary(BoundaryKind::Security, original, err)
466                            .await;
467                    }
468                }
469            }
470
471            // Gate 2: CircuitBreaker — before_call
472            if let Some(ref cb) = cb_gate {
473                match cb.before_call() {
474                    CircuitBreakerDecision::Allow => { /* proceed to pipeline */ }
475                    CircuitBreakerDecision::Fallback(mut fb) => {
476                        // Circuit open with fallback — call fallback.
477                        // Fallback errors go through handle_boundary, not raw to upstream.
478                        let original = ex.clone();
479                        match invoke_processor(&mut fb, ex).await {
480                            Ok(result) => return Ok(result),
481                            Err(err) => {
482                                return handler
483                                    .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
484                                    .await;
485                            }
486                        }
487                    }
488                    CircuitBreakerDecision::Reject(err) => {
489                        let original = ex.clone();
490                        return handler
491                            .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
492                            .await;
493                    }
494                }
495            }
496
497            // Pipeline (handler already injected for step errors)
498            let result = invoke_processor(&mut pipeline, ex).await;
499
500            // Gate 2: CircuitBreaker — after_result
501            if let Some(ref cb) = cb_gate {
502                cb.after_result(&result);
503            }
504
505            // Propagate from inner handler — pass through to upstream
506            result
507        })
508    }
509}
510
511#[cfg(test)]
512#[path = "route_compiler_tests.rs"]
513mod tests;