Skip to main content

camel_core/lifecycle/adapters/
route_compiler.rs

1// adapters/route_compiler.rs
2// Pipeline compilation functions: compose BuilderSteps into a Tower BoxProcessor.
3// Tower types live here as this is the adapter layer responsible for
4// translating declarative route definitions into executable pipelines.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10
11use tower::Service;
12
13use camel_api::metrics::MetricsCollector;
14use camel_api::{
15    BoxProcessor, CamelError, Exchange, IdentityProcessor, Message, ORIGINAL_MESSAGE_EXTENSION,
16    PipelineOutcome,
17};
18
19use camel_api::error_handler::{BoundaryKind, RetryOutcome, StepDisposition};
20use camel_processor::{
21    CircuitBreakerDecision, CircuitBreakerGate, RouteErrorHandler, invoke_processor,
22};
23use tracing::Instrument;
24
25use crate::lifecycle::adapters::body_coercing::wrap_if_needed;
26use crate::lifecycle::adapters::step_compilers::CompiledStep;
27use crate::shared::observability::adapters::TracingProcessor;
28use crate::shared::observability::domain::DetailLevel;
29
30// Re-export outcome composition types so existing step_compiler import paths
31// (`route_compiler::BoxProcessorSegment`, etc.) continue to work.
32pub(crate) use super::outcome_composition::{
33    BodyCoercingSegment, BoxProcessorSegment, StopSegment, compose_outcome_segment,
34};
35
36/// Compose a list of CompiledSteps into a sub-pipeline (EIP internal).
37///
38/// Uses `into_tower_result()` so `PipelineOutcome::Stopped` maps to `Ok(ex)`.
39/// Use [`compose_pipeline_with_handler`] for the top-level consumer-facing pipeline.
40pub fn compose_pipeline(processors: Vec<CompiledStep>) -> BoxProcessor {
41    if processors.is_empty() {
42        return BoxProcessor::new(IdentityProcessor);
43    }
44    BoxProcessor::new(SequentialPipeline {
45        steps: processors,
46        handler: None,
47    })
48}
49
50/// Compose a list of CompiledSteps with an optional route error handler.
51///
52/// When a handler is present, step readiness errors are swallowed (poll_ready
53/// returns Ready) and the handler's retry/recovery logic is invoked on step
54/// failures. Otherwise, step readiness errors propagate immediately.
55pub fn compose_pipeline_with_handler(
56    processors: Vec<CompiledStep>,
57    handler: Option<Arc<dyn RouteErrorHandler>>,
58) -> BoxProcessor {
59    if processors.is_empty() {
60        return BoxProcessor::new(IdentityProcessor);
61    }
62    BoxProcessor::new(SequentialPipeline {
63        steps: processors,
64        handler,
65    })
66}
67
68/// Compose a list of CompiledSteps into a traced pipeline with Stop→Ok translation.
69///
70/// Each processor is wrapped with TracingProcessor to emit spans for observability.
71/// When tracing is disabled, falls back to [`compose_pipeline_with_handler`] with zero overhead.
72pub fn compose_traced_pipeline(
73    processors: Vec<CompiledStep>,
74    route_id: &str,
75    trace_enabled: bool,
76    detail_level: DetailLevel,
77    metrics: Option<Arc<dyn MetricsCollector>>,
78    handler: Option<Arc<dyn RouteErrorHandler>>,
79) -> BoxProcessor {
80    if !trace_enabled {
81        return compose_pipeline_with_handler(processors, handler);
82    }
83
84    if processors.is_empty() {
85        return BoxProcessor::new(IdentityProcessor);
86    }
87
88    let wrapped: Vec<CompiledStep> = processors
89        .into_iter()
90        .enumerate()
91        .map(|(idx, step)| {
92            let (p, c, lc) = match step {
93                CompiledStep::Process {
94                    processor,
95                    body_contract,
96                    lifecycle,
97                } => (processor, body_contract, lifecycle),
98                CompiledStep::Stop => return CompiledStep::Stop,
99                CompiledStep::Segment { .. } => return step,
100            };
101            let traced = BoxProcessor::new(TracingProcessor::new(
102                p,
103                route_id.to_string(),
104                idx,
105                detail_level.clone(),
106                metrics.clone(),
107            ));
108            CompiledStep::Process {
109                processor: traced,
110                body_contract: c,
111                lifecycle: lc,
112            }
113        })
114        .collect();
115
116    BoxProcessor::new(TracedPipeline {
117        steps: wrapped,
118        handler,
119    })
120}
121
122/// Compose a list of `CompiledStep` items into a single pipeline with body coercion.
123///
124/// Each processor is optionally wrapped with [`BodyCoercingProcessor`] based on its
125/// contract. Processors with `None` contract are passed through with zero overhead.
126/// `CompiledStep::Stop` passes through without coercion.
127pub fn compose_pipeline_with_contracts(
128    processors: Vec<CompiledStep>,
129    handler: Option<Arc<dyn RouteErrorHandler>>,
130) -> BoxProcessor {
131    let wrapped: Vec<CompiledStep> = processors
132        .into_iter()
133        .map(|step| match step {
134            CompiledStep::Process {
135                processor,
136                body_contract,
137                lifecycle,
138            } => {
139                let coerced = wrap_if_needed(processor, body_contract);
140                CompiledStep::Process {
141                    processor: coerced,
142                    body_contract: None,
143                    lifecycle,
144                }
145            }
146            CompiledStep::Stop => CompiledStep::Stop,
147            CompiledStep::Segment { .. } => step,
148        })
149        .collect();
150    compose_pipeline_with_handler(wrapped, handler)
151}
152
153/// Compose a list of `CompiledStep` items into a traced pipeline with body coercion.
154///
155/// Applies body coercion contracts first, then wraps with `TracingProcessor`.
156/// When tracing is disabled, falls back to [`compose_pipeline_with_contracts`].
157pub(crate) fn compose_traced_pipeline_with_contracts(
158    processors: Vec<CompiledStep>,
159    route_id: &str,
160    trace_enabled: bool,
161    detail_level: DetailLevel,
162    metrics: Option<Arc<dyn MetricsCollector>>,
163    handler: Option<Arc<dyn RouteErrorHandler>>,
164) -> BoxProcessor {
165    if !trace_enabled {
166        return compose_pipeline_with_contracts(processors, handler);
167    }
168
169    if processors.is_empty() {
170        return BoxProcessor::new(IdentityProcessor);
171    }
172
173    let wrapped: Vec<CompiledStep> = processors
174        .into_iter()
175        .enumerate()
176        .map(|(idx, step)| match step {
177            CompiledStep::Process {
178                processor,
179                body_contract,
180                lifecycle,
181            } => {
182                let coerced = wrap_if_needed(processor, body_contract);
183                let traced = BoxProcessor::new(TracingProcessor::new(
184                    coerced,
185                    route_id.to_string(),
186                    idx,
187                    detail_level.clone(),
188                    metrics.clone(),
189                ));
190                CompiledStep::Process {
191                    processor: traced,
192                    body_contract: None,
193                    lifecycle,
194                }
195            }
196            CompiledStep::Stop => CompiledStep::Stop,
197            CompiledStep::Segment { .. } => step,
198        })
199        .collect();
200
201    BoxProcessor::new(TracedPipeline {
202        steps: wrapped,
203        handler,
204    })
205}
206
207/// A service that executes a sequence of CompiledSteps in order.
208///
209/// Uses `into_tower_result()` so `PipelineOutcome::Stopped(ex)` maps to
210/// `Ok(ex)` — the Bug B fix that makes Stop indistinguishable from Completed
211/// at the consumer boundary.
212#[derive(Clone)]
213struct SequentialPipeline {
214    steps: Vec<CompiledStep>,
215    handler: Option<Arc<dyn RouteErrorHandler>>,
216}
217
218impl Service<Exchange> for SequentialPipeline {
219    type Response = Exchange;
220    type Error = CamelError;
221    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
222
223    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
224        match self.steps.first() {
225            Some(CompiledStep::Process { processor, .. }) => {
226                let mut proc = processor.clone();
227                match proc.poll_ready(cx) {
228                    Poll::Pending => Poll::Pending,
229                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
230                    Poll::Ready(other) => Poll::Ready(other),
231                }
232            }
233            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
234            Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
235            None => Poll::Ready(Ok(())),
236        }
237    }
238
239    // ADR-0024 reply-channel adapter: PipelineOutcome → Result<Exchange, CamelError>.
240    // Completed(ex) and Stopped(ex) both map to Ok(ex); Failed(err) maps to Err.
241    // Downstream consumers (RouteChannelService, ExchangeUoWLayer, HTTP/Kafka reply
242    // finalisers) see Result<Exchange, CamelError> and treat Stop as success.
243    fn call(&mut self, exchange: Exchange) -> Self::Future {
244        let steps = self.steps.clone();
245        let handler = self.handler.clone();
246        Box::pin(async move {
247            let outcome = run_steps(steps, exchange, handler, false).await;
248            outcome.into_tower_result()
249        })
250    }
251}
252
253/// A traced service pipeline for wrapped CompiledSteps.
254#[derive(Clone)]
255struct TracedPipeline {
256    steps: Vec<CompiledStep>,
257    handler: Option<Arc<dyn RouteErrorHandler>>,
258}
259
260impl Service<Exchange> for TracedPipeline {
261    type Response = Exchange;
262    type Error = CamelError;
263    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
264
265    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
266        match self.steps.first() {
267            Some(CompiledStep::Process { processor, .. }) => {
268                let mut proc = processor.clone();
269                match proc.poll_ready(cx) {
270                    Poll::Pending => Poll::Pending,
271                    Poll::Ready(Err(_)) if self.handler.is_some() => Poll::Ready(Ok(())),
272                    Poll::Ready(other) => Poll::Ready(other),
273                }
274            }
275            Some(CompiledStep::Stop) => Poll::Ready(Ok(())),
276            Some(CompiledStep::Segment { .. }) => Poll::Ready(Ok(())),
277            None => Poll::Ready(Ok(())),
278        }
279    }
280
281    // ADR-0024 reply-channel adapter (same as SequentialPipeline::call):
282    // Completed(ex) and Stopped(ex) both map to Ok(ex). Bug B fix.
283    fn call(&mut self, exchange: Exchange) -> Self::Future {
284        let steps = self.steps.clone();
285        let handler = self.handler.clone();
286        Box::pin(async move {
287            let outcome = run_steps(steps, exchange, handler, true).await;
288            outcome.into_tower_result()
289        })
290    }
291}
292
293/// Run a sequence of CompiledSteps with optional error recovery.
294///
295/// Each step is unified under `Box<dyn RetryableStep>` — both Process and
296/// Segment variants are treated uniformly. On failure:
297/// 1. If a handler is present, `match_policy` selects a retry policy.
298/// 2. `retry_step` attempts recovery; if exhausted, `handle_step` determines
299///    the disposition:
300///    - `Propagate` — return the error
301///    - `Handled` — return the exchange early (success)
302///    - `Continued` — clear the error and continue to the next step
303/// 3. If no handler is present, the error is propagated directly.
304///
305/// CompiledStep::Stop short-circuits to `PipelineOutcome::Stopped(ex)` — the
306/// handler is bypassed and no Tower service is invoked (ADR-0024 §3.5).
307pub async fn run_steps(
308    steps: Vec<CompiledStep>,
309    exchange: Exchange,
310    handler: Option<Arc<dyn RouteErrorHandler>>,
311    trace: bool,
312) -> PipelineOutcome {
313    use camel_api::error_handler::RetryableStep;
314    let mut ex = exchange;
315    for (i, step) in steps.into_iter().enumerate() {
316        let (mut retryable, _body_contract): (Box<dyn RetryableStep>, _) = match step {
317            CompiledStep::Stop => return PipelineOutcome::Stopped(ex),
318            CompiledStep::Process {
319                processor,
320                body_contract,
321                ..
322            } => {
323                let boxed: Box<dyn RetryableStep> = Box::new(processor);
324                (boxed, body_contract)
325            }
326            CompiledStep::Segment {
327                segment,
328                body_contract,
329                ..
330            } => {
331                let boxed: Box<dyn RetryableStep> = Box::new(segment);
332                (boxed, body_contract)
333            }
334        };
335
336        let original = ex.clone();
337        let outcome = if trace {
338            invoke_with_span(&mut retryable, ex, i).await
339        } else {
340            retryable.invoke(ex).await
341        };
342
343        match outcome {
344            PipelineOutcome::Completed(next) => {
345                if camel_api::is_camel_stop(&next) {
346                    return PipelineOutcome::Stopped(next);
347                }
348                ex = next;
349            }
350            PipelineOutcome::Stopped(stopped_ex) => {
351                return PipelineOutcome::Stopped(stopped_ex);
352            }
353            PipelineOutcome::Failed(err) => {
354                let Some(handler) = handler.as_ref() else {
355                    return PipelineOutcome::Failed(err);
356                };
357                let policy = handler.match_policy(&err);
358                match handler
359                    .retry_step(policy, retryable.as_mut(), original, err)
360                    .await
361                {
362                    RetryOutcome::Recovered(exchange) => {
363                        ex = exchange;
364                    }
365                    RetryOutcome::Stopped(stopped_ex) => {
366                        return PipelineOutcome::Stopped(stopped_ex);
367                    }
368                    RetryOutcome::Exhausted {
369                        exchange,
370                        error,
371                        policy,
372                    } => {
373                        let disposition = if trace {
374                            handler
375                                .handle_step(policy, exchange, error)
376                                .instrument(tracing::debug_span!("error_handler", step_index = i))
377                                .await
378                        } else {
379                            handler.handle_step(policy, exchange, error).await
380                        };
381                        match disposition {
382                            Ok(StepDisposition::Propagate(e)) => {
383                                return PipelineOutcome::Failed(e);
384                            }
385                            Ok(StepDisposition::Handled(done)) => {
386                                return PipelineOutcome::Completed(done);
387                            }
388                            Ok(StepDisposition::Continued(next)) => {
389                                ex = next;
390                            }
391                            Err(e) => return PipelineOutcome::Failed(e),
392                        }
393                    }
394                }
395            }
396        }
397    }
398    PipelineOutcome::Completed(ex)
399}
400
401async fn invoke_with_span(
402    retryable: &mut Box<dyn camel_api::error_handler::RetryableStep>,
403    exchange: Exchange,
404    idx: usize,
405) -> PipelineOutcome {
406    retryable
407        .invoke(exchange)
408        .instrument(tracing::debug_span!("pipeline_step", index = idx))
409        .await
410}
411
412/// Route channel with explicit security and circuit-breaker gates.
413///
414/// Gate order: Security → CB(before_call) → Pipeline → CB(after_result).
415/// Errors from Security/CB gates go to `handler.handle_boundary`.
416/// Errors from Pipeline go through the injected handler's retry/handle_step.
417/// Pipeline Propagate returns Err — passed through to upstream.
418#[derive(Clone)]
419pub struct RouteChannelService {
420    handler: Arc<dyn RouteErrorHandler>,
421    security: Option<BoxProcessor>,
422    cb_gate: Option<CircuitBreakerGate>,
423    pipeline: BoxProcessor,
424    /// When true, stash the original Message as `ORIGINAL_MESSAGE_EXTENSION`
425    /// before any gate runs, so the error handler can restore it on failure.
426    use_original_message: bool,
427}
428
429impl RouteChannelService {
430    pub fn new(
431        handler: Arc<dyn RouteErrorHandler>,
432        security: Option<BoxProcessor>,
433        cb_gate: Option<CircuitBreakerGate>,
434        pipeline: BoxProcessor,
435        use_original_message: bool,
436    ) -> Self {
437        Self {
438            handler,
439            security,
440            cb_gate,
441            pipeline,
442            use_original_message,
443        }
444    }
445}
446
447impl Service<Exchange> for RouteChannelService {
448    type Response = Exchange;
449    type Error = CamelError;
450    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
451
452    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
453        // Swallow readiness errors from security gate — deferred to call()
454        if let Some(ref mut sec) = self.security {
455            match sec.clone().poll_ready(cx) {
456                Poll::Pending => return Poll::Pending,
457                Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
458            }
459        }
460        // Pipeline readiness — swallow errors when handler present
461        match self.pipeline.clone().poll_ready(cx) {
462            Poll::Pending => return Poll::Pending,
463            Poll::Ready(Err(_)) | Poll::Ready(Ok(())) => {}
464        }
465        Poll::Ready(Ok(()))
466    }
467
468    fn call(&mut self, exchange: Exchange) -> Self::Future {
469        let handler = self.handler.clone();
470        let security = self.security.clone();
471        let cb_gate = self.cb_gate.clone();
472        let mut pipeline = self.pipeline.clone();
473        let use_original_message = self.use_original_message;
474
475        Box::pin(async move {
476            let mut ex = exchange;
477
478            // Stash original message for use_original_message support.
479            // Done BEFORE any gate so the DLC can restore the pre-route message.
480            // Only stashes when the flag is true to avoid perf regression on every Exchange.
481            if use_original_message {
482                let original: Arc<Message> = Arc::new(ex.input.clone());
483                ex.set_extension(ORIGINAL_MESSAGE_EXTENSION, original);
484            }
485
486            // Gate 1: Security
487            if let Some(mut sec) = security {
488                let original = ex.clone();
489                match invoke_processor(&mut sec, ex).await {
490                    Ok(next) => ex = next,
491                    Err(err) => {
492                        return handler
493                            .handle_boundary(BoundaryKind::Security, original, err)
494                            .await;
495                    }
496                }
497            }
498
499            // Gate 2: CircuitBreaker — before_call
500            if let Some(ref cb) = cb_gate {
501                match cb.before_call() {
502                    CircuitBreakerDecision::Allow => { /* proceed to pipeline */ }
503                    CircuitBreakerDecision::Fallback(mut fb) => {
504                        // Circuit open with fallback — call fallback.
505                        // Fallback errors go through handle_boundary, not raw to upstream.
506                        let original = ex.clone();
507                        match invoke_processor(&mut fb, ex).await {
508                            Ok(result) => return Ok(result),
509                            Err(err) => {
510                                return handler
511                                    .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
512                                    .await;
513                            }
514                        }
515                    }
516                    CircuitBreakerDecision::Reject(err) => {
517                        let original = ex.clone();
518                        return handler
519                            .handle_boundary(BoundaryKind::CircuitBreaker, original, err)
520                            .await;
521                    }
522                }
523            }
524
525            // Pipeline (handler already injected for step errors)
526            let result = invoke_processor(&mut pipeline, ex).await;
527
528            // Gate 2: CircuitBreaker — after_result
529            if let Some(ref cb) = cb_gate {
530                cb.after_result(&result);
531            }
532
533            // Propagate from inner handler — pass through to upstream
534            result
535        })
536    }
537}
538
539#[cfg(test)]
540#[path = "route_compiler_tests.rs"]
541mod tests;