Skip to main content

camel_core/lifecycle/adapters/step_compilers/
mod.rs

1//! StepCompiler registry pattern — extract from step_resolution.rs
2//!
3//! Each compiler group is responsible for matching specific `BuilderStep` variants.
4//! The registry dispatches each step to compilers in registration order; the first
5//! compiler that returns `Matched` wins. Compilers that don't handle a variant
6//! return `NotHandled(step)` to pass it to the next compiler.
7
8use std::sync::Arc;
9
10use camel_api::{BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext};
11use camel_component_api::{ComponentContext, RuntimeObservability};
12use camel_endpoint::parse_uri;
13
14use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
15use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
16use crate::lifecycle::application::route_definition::BuilderStep;
17use camel_bean::BeanRegistry;
18
19mod control_flow;
20mod core;
21mod endpoints;
22mod error_handling;
23mod routing;
24mod splitting;
25mod transforms;
26
27/// A compiled pipeline step.
28///
29/// `Process` is the normal case: a boxed processor plus its optional body
30/// contract. `Stop` (added in Task 3b) is the Stop EIP marker — `run_steps`
31/// recognises it and produces `PipelineOutcome::Stopped` without invoking a
32/// Tower service.
33///
34/// **Boundary:** `CompiledStep` is the compile-time representation. At runtime,
35/// `run_steps` consumes a `Vec<CompiledStep>` (Stop variants included) and
36/// produces a `PipelineOutcome`; the wrapping `Service<Exchange>` impl
37/// translates `PipelineOutcome` back to `Result<Exchange, CamelError>`. See
38/// ADR-0024.
39#[derive(Debug, Clone)]
40pub enum CompiledStep {
41    Process {
42        processor: BoxProcessor,
43        body_contract: Option<BodyType>,
44    },
45    /// Stop EIP marker. `run_steps` produces `PipelineOutcome::Stopped(ex)`
46    /// without invoking a Tower service. Replaces `StopService` (Task 7).
47    Stop,
48}
49
50/// Result from a compiler: either it handled the step (with success or error),
51/// or it did not recognize the variant and returns the step for the next compiler.
52pub(crate) enum StepCompileResult {
53    Matched(Result<CompiledStep, CamelError>),
54    NotHandled(BuilderStep),
55}
56
57/// A compiler that can handle one or more `BuilderStep` variants.
58///
59/// The `compile` method receives ownership of the step. If the compiler recognizes
60/// the variant it returns `StepCompileResult::Matched(...)`. Otherwise it returns the
61/// step back via `NotHandled(step)`.
62pub(crate) trait StepCompiler: Send + Sync {
63    fn compile(
64        &self,
65        step: BuilderStep,
66        step_index: usize,
67        ctx: &CompilationContext,
68        registry: &StepCompilerRegistry,
69    ) -> StepCompileResult;
70}
71
72/// Shared context passed to every compiler invocation.
73pub(crate) struct CompilationContext<'a> {
74    pub producer_ctx: &'a ProducerContext,
75    pub rt: Arc<dyn RuntimeObservability>,
76    pub languages: &'a SharedLanguageRegistry,
77    pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
78    pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
79    pub component_ctx: Arc<dyn ComponentContext>,
80    pub route_id: Option<&'a str>,
81    pub staging_mode: &'a FunctionStagingMode,
82}
83
84impl<'a> CompilationContext<'a> {
85    /// Recursively compile child steps. Used by compilers that have sub-pipelines
86    /// (Filter, Choice, Split, Loop, etc.).
87    pub fn compile_children(
88        &self,
89        steps: Vec<BuilderStep>,
90        registry: &StepCompilerRegistry,
91    ) -> Result<Vec<CompiledStep>, CamelError> {
92        registry.compile_steps(steps, self)
93    }
94}
95
96/// Registry of step compilers. Steps are dispatched to compilers in registration
97/// order. The first matching compiler handles the step.
98pub(crate) struct StepCompilerRegistry {
99    compilers: Vec<Box<dyn StepCompiler>>,
100}
101
102impl StepCompilerRegistry {
103    pub fn new() -> Self {
104        Self {
105            compilers: Vec::new(),
106        }
107    }
108
109    pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
110        self.compilers.push(compiler);
111    }
112
113    /// Try each compiler in order. The first to return `Matched` wins.
114    /// If all return `NotHandled`, returns `None`.
115    pub fn compile_step(
116        &self,
117        step: BuilderStep,
118        step_index: usize,
119        ctx: &CompilationContext,
120    ) -> Option<Result<CompiledStep, CamelError>> {
121        let mut step = step;
122        for compiler in &self.compilers {
123            match compiler.compile(step, step_index, ctx, self) {
124                StepCompileResult::Matched(result) => return Some(result),
125                StepCompileResult::NotHandled(s) => step = s,
126            }
127        }
128        None
129    }
130
131    /// Compile all steps in a vector.
132    pub fn compile_steps(
133        &self,
134        steps: Vec<BuilderStep>,
135        ctx: &CompilationContext,
136    ) -> Result<Vec<CompiledStep>, CamelError> {
137        let mut out = Vec::with_capacity(steps.len());
138        for (i, step) in steps.into_iter().enumerate() {
139            match self.compile_step(step, i, ctx) {
140                Some(Ok(c)) => out.push(c),
141                Some(Err(e)) => return Err(e),
142                None => {
143                    return Err(CamelError::RouteError(
144                        "no compiler registered for step variant".into(),
145                    ));
146                }
147            }
148        }
149        Ok(out)
150    }
151}
152
153/// Parse a URI and create a producer, reusing `component_ctx`, `rt`, and `producer_ctx`
154/// from the compilation context.
155pub(crate) fn resolve_producer(
156    ctx: &CompilationContext,
157    uri: &str,
158) -> Result<BoxProcessor, CamelError> {
159    let parsed = parse_uri(uri)?;
160    let component = ctx
161        .component_ctx
162        .resolve_component(&parsed.scheme)
163        .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
164    let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
165    endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
166}
167
168/// Build the full registry with all compiler groups.
169pub(crate) fn build_registry() -> StepCompilerRegistry {
170    let mut reg = StepCompilerRegistry::new();
171    reg.register(Box::new(core::CoreCompiler));
172    reg.register(Box::new(endpoints::EndpointsCompiler));
173    reg.register(Box::new(transforms::TransformsCompiler));
174    reg.register(Box::new(routing::RoutingCompiler));
175    reg.register(Box::new(control_flow::ControlFlowCompiler));
176    reg.register(Box::new(splitting::SplittingCompiler));
177    reg.register(Box::new(error_handling::ErrorHandlingCompiler));
178    reg
179}