Skip to main content

camel_core/lifecycle/adapters/step_compilers/
mod.rs

//! StepCompiler registry pattern — extracted from step_resolution.rs
//!
//! Each compiler group is responsible for matching specific `BuilderStep` variants.
//! The registry dispatches each step to compilers in registration order; the first
//! compiler that returns `Matched` wins. Compilers that don't handle a variant
//! return `NotHandled(step)` to pass it to the next compiler.
7
8use std::sync::Arc;
9
10use camel_api::{BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext};
11use camel_component_api::{ComponentContext, RuntimeObservability};
12use camel_endpoint::parse_uri;
13
14use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
15use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
16use crate::lifecycle::application::route_definition::BuilderStep;
17use camel_bean::BeanRegistry;
18
19mod control_flow;
20mod core;
21mod endpoints;
22mod error_handling;
23mod routing;
24mod splitting;
25mod transforms;
26
27/// A compiled pipeline step.
28///
29/// `Process` is the normal case: a boxed processor plus its optional body
30/// contract. `Stop` (added in Task 3b) is the Stop EIP marker — `run_steps`
31/// recognises it and produces `PipelineOutcome::Stopped` without invoking a
32/// Tower service. `Segment` (added in Task 3) wraps an `OutcomeSegment` for
33/// structural EIPs with outcome-aware sub-pipelines.
34///
35/// **Boundary:** `CompiledStep` is the compile-time representation. At runtime,
36/// `run_steps` consumes a `Vec<CompiledStep>` (Stop variants included) and
37/// produces a `PipelineOutcome`; the wrapping `Service<Exchange>` impl
38/// translates `PipelineOutcome` back to `Result<Exchange, CamelError>`. See
39/// ADR-0024.
40#[derive(Debug, Clone)]
41pub enum CompiledStep {
42    Process {
43        processor: BoxProcessor,
44        body_contract: Option<BodyType>,
45    },
46    /// Stop EIP marker. `run_steps` produces `PipelineOutcome::Stopped(ex)`
47    /// without invoking a Tower service. Replaces `StopService` (Task 7).
48    Stop,
49    /// Outcome-aware structural EIP segment. `run_steps` invokes
50    /// `segment.run(ex)` and matches on the returned `PipelineOutcome`.
51    /// See ADR-0025.
52    Segment {
53        segment: camel_api::OutcomeSegment,
54        body_contract: Option<BodyType>,
55    },
56}
57
58/// Result from a compiler: either it handled the step (with success or error),
59/// or it did not recognize the variant and returns the step for the next compiler.
60pub(crate) enum StepCompileResult {
61    Matched(Result<CompiledStep, CamelError>),
62    NotHandled(BuilderStep),
63}
64
65/// A compiler that can handle one or more `BuilderStep` variants.
66///
67/// The `compile` method receives ownership of the step. If the compiler recognizes
68/// the variant it returns `StepCompileResult::Matched(...)`. Otherwise it returns the
69/// step back via `NotHandled(step)`.
70pub(crate) trait StepCompiler: Send + Sync {
71    fn compile(
72        &self,
73        step: BuilderStep,
74        step_index: usize,
75        ctx: &CompilationContext,
76        registry: &StepCompilerRegistry,
77    ) -> StepCompileResult;
78}
79
80/// Shared context passed to every compiler invocation.
81pub(crate) struct CompilationContext<'a> {
82    pub producer_ctx: &'a ProducerContext,
83    pub rt: Arc<dyn RuntimeObservability>,
84    pub languages: &'a SharedLanguageRegistry,
85    pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
86    pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
87    pub component_ctx: Arc<dyn ComponentContext>,
88    pub route_id: Option<&'a str>,
89    pub staging_mode: &'a FunctionStagingMode,
90}
91
92impl<'a> CompilationContext<'a> {
93    /// Recursively compile child steps. Used by compilers that have sub-pipelines
94    /// (Filter, Choice, Split, Loop, etc.).
95    pub fn compile_children(
96        &self,
97        steps: Vec<BuilderStep>,
98        registry: &StepCompilerRegistry,
99    ) -> Result<Vec<CompiledStep>, CamelError> {
100        registry.compile_steps(steps, self)
101    }
102
103    /// Recursively compile child steps and map them into outcome-aware segments.
104    ///
105    /// Each `CompiledStep` variant is converted to a `Box<dyn OutcomePipeline>`:
106    /// - `Process` → `BoxProcessorSegment`, optionally wrapped in `BodyCoercingSegment`
107    /// - `Stop` → `StopSegment` (produces `PipelineOutcome::Stopped(ex)`)
108    /// - `Segment` → its inner `OutcomeSegment` (which now implements OutcomePipeline)
109    ///
110    /// This replaces the 22-line duplicated closure in Filter/DeclarativeFilter
111    /// (and will prevent 14+ more duplicates in T9–T16).
112    pub fn compile_children_segments(
113        &self,
114        steps: Vec<BuilderStep>,
115        registry: &StepCompilerRegistry,
116    ) -> Result<Vec<Box<dyn camel_api::OutcomePipeline>>, CamelError> {
117        let pairs = self.compile_children(steps, registry)?;
118        let segments: Vec<Box<dyn camel_api::OutcomePipeline>> = pairs
119            .into_iter()
120            .map(|c| match c {
121                CompiledStep::Process {
122                    processor,
123                    body_contract,
124                } => {
125                    let inner: Box<dyn camel_api::OutcomePipeline> = Box::new(
126                        crate::lifecycle::adapters::route_compiler::BoxProcessorSegment::new(
127                            processor,
128                        ),
129                    );
130                    match body_contract {
131                        Some(contract) => Box::new(
132                            crate::lifecycle::adapters::route_compiler::BodyCoercingSegment::new(
133                                inner, contract,
134                            ),
135                        ),
136                        None => inner,
137                    }
138                }
139                CompiledStep::Stop => {
140                    Box::new(crate::lifecycle::adapters::route_compiler::StopSegment)
141                        as Box<dyn camel_api::OutcomePipeline>
142                }
143                CompiledStep::Segment { segment, .. } => Box::new(segment),
144            })
145            .collect();
146        Ok(segments)
147    }
148}
149
150/// Registry of step compilers. Steps are dispatched to compilers in registration
151/// order. The first matching compiler handles the step.
152pub(crate) struct StepCompilerRegistry {
153    compilers: Vec<Box<dyn StepCompiler>>,
154}
155
156impl StepCompilerRegistry {
157    pub fn new() -> Self {
158        Self {
159            compilers: Vec::new(),
160        }
161    }
162
163    pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
164        self.compilers.push(compiler);
165    }
166
167    /// Try each compiler in order. The first to return `Matched` wins.
168    /// If all return `NotHandled`, returns `None`.
169    pub fn compile_step(
170        &self,
171        step: BuilderStep,
172        step_index: usize,
173        ctx: &CompilationContext,
174    ) -> Option<Result<CompiledStep, CamelError>> {
175        let mut step = step;
176        for compiler in &self.compilers {
177            match compiler.compile(step, step_index, ctx, self) {
178                StepCompileResult::Matched(result) => return Some(result),
179                StepCompileResult::NotHandled(s) => step = s,
180            }
181        }
182        None
183    }
184
185    /// Compile all steps in a vector.
186    pub fn compile_steps(
187        &self,
188        steps: Vec<BuilderStep>,
189        ctx: &CompilationContext,
190    ) -> Result<Vec<CompiledStep>, CamelError> {
191        let mut out = Vec::with_capacity(steps.len());
192        for (i, step) in steps.into_iter().enumerate() {
193            match self.compile_step(step, i, ctx) {
194                Some(Ok(c)) => out.push(c),
195                Some(Err(e)) => return Err(e),
196                None => {
197                    return Err(CamelError::RouteError(
198                        "no compiler registered for step variant".into(),
199                    ));
200                }
201            }
202        }
203        Ok(out)
204    }
205}
206
207/// Parse a URI and create a producer, reusing `component_ctx`, `rt`, and `producer_ctx`
208/// from the compilation context.
209pub(crate) fn resolve_producer(
210    ctx: &CompilationContext,
211    uri: &str,
212) -> Result<BoxProcessor, CamelError> {
213    let parsed = parse_uri(uri)?;
214    let component = ctx
215        .component_ctx
216        .resolve_component(&parsed.scheme)
217        .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
218    let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
219    endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
220}
221
222#[cfg(test)]
223mod segment_tests {
224    use super::*;
225    use camel_api::{Exchange, OutcomePipeline, PipelineOutcome};
226    use std::future::Future;
227    use std::pin::Pin;
228
229    #[derive(Clone)]
230    struct EchoSegment;
231
232    impl OutcomePipeline for EchoSegment {
233        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
234            Box::new(EchoSegment)
235        }
236        fn run<'a>(
237            &'a mut self,
238            exchange: Exchange,
239        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
240            Box::pin(async move { PipelineOutcome::Completed(exchange) })
241        }
242    }
243
244    #[test]
245    fn compiled_step_segment_clone_compiles() {
246        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
247        let step = CompiledStep::Segment {
248            segment: seg,
249            body_contract: None,
250        };
251        let _cloned = step.clone();
252        if let CompiledStep::Segment { segment: _, .. } = _cloned {
253            // ok
254        } else {
255            panic!("clone should preserve variant");
256        }
257    }
258
259    #[test]
260    fn compiled_step_segment_debug_renders() {
261        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
262        let step = CompiledStep::Segment {
263            segment: seg,
264            body_contract: None,
265        };
266        let s = format!("{:?}", step);
267        assert!(
268            s.contains("Segment"),
269            "debug should mention Segment variant: {s}"
270        );
271    }
272
273    #[test]
274    fn outcome_segment_satisfies_clone_send_static() {
275        fn assert_traits<T: Clone + Send + 'static>() {}
276        assert_traits::<camel_api::OutcomeSegment>();
277    }
278
279    #[tokio::test]
280    async fn outcome_segment_survives_arcswap_swap() {
281        use arc_swap::ArcSwap;
282        use camel_api::{Exchange, Message, OutcomePipeline, PipelineOutcome};
283        use std::sync::Arc;
284
285        #[derive(Clone)]
286        struct EchoSegment;
287        impl OutcomePipeline for EchoSegment {
288            fn clone_box(&self) -> Box<dyn OutcomePipeline> {
289                Box::new(EchoSegment)
290            }
291            fn run<'a>(
292                &'a mut self,
293                ex: Exchange,
294            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>
295            {
296                Box::pin(async move { PipelineOutcome::Completed(ex) })
297            }
298        }
299
300        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
301        let slot: ArcSwap<Option<camel_api::OutcomeSegment>> = ArcSwap::from_pointee(None);
302        slot.store(Arc::new(Some(seg.clone())));
303        slot.store(Arc::new(Some(seg)));
304
305        let mut borrowed = slot.load().as_ref().clone().unwrap();
306        let outcome = borrowed.run(Exchange::new(Message::new("ping"))).await;
307        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
308    }
309}
310
311/// Build the full registry with all compiler groups.
312pub(crate) fn build_registry() -> StepCompilerRegistry {
313    let mut reg = StepCompilerRegistry::new();
314    reg.register(Box::new(core::CoreCompiler));
315    reg.register(Box::new(endpoints::EndpointsCompiler));
316    reg.register(Box::new(transforms::TransformsCompiler));
317    reg.register(Box::new(routing::RoutingCompiler));
318    reg.register(Box::new(control_flow::ControlFlowCompiler));
319    reg.register(Box::new(splitting::SplittingCompiler));
320    reg.register(Box::new(error_handling::ErrorHandlingCompiler));
321    reg
322}