Skip to main content

camel_core/lifecycle/adapters/step_compilers/
mod.rs

1//! StepCompiler registry pattern — extract from step_resolution.rs
2//!
3//! Each compiler group is responsible for matching specific `BuilderStep` variants.
4//! The registry dispatches each step to compilers in registration order; the first
5//! compiler that returns `Matched` wins. Compilers that don't handle a variant
6//! return `NotHandled(step)` to pass it to the next compiler.
7
8use std::sync::Arc;
9
10use camel_api::{
11    BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext, StepLifecycle,
12};
13use camel_component_api::{ComponentContext, RuntimeObservability};
14use camel_endpoint::parse_uri;
15
16use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
17use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
18use crate::lifecycle::application::route_definition::BuilderStep;
19use crate::{ClaimCheckRegistry, IdempotentRegistry};
20use camel_bean::BeanRegistry;
21
22mod control_flow;
23mod core;
24mod endpoints;
25mod error_handling;
26mod routing;
27mod splitting;
28mod transforms;
29
30/// A compiled pipeline step.
31///
32/// `Process` is the normal case: a boxed processor plus its optional body
33/// contract. `Stop` (added in Task 3b) is the Stop EIP marker — `run_steps`
34/// recognises it and produces `PipelineOutcome::Stopped` without invoking a
35/// Tower service. `Segment` (added in Task 3) wraps an `OutcomeSegment` for
36/// structural EIPs with outcome-aware sub-pipelines.
37///
38/// **Boundary:** `CompiledStep` is the compile-time representation. At runtime,
39/// `run_steps` consumes a `Vec<CompiledStep>` (Stop variants included) and
40/// produces a `PipelineOutcome`; the wrapping `Service<Exchange>` impl
41/// translates `PipelineOutcome` back to `Result<Exchange, CamelError>`. See
42/// ADR-0024.
43#[derive(Debug, Clone)]
44pub enum CompiledStep {
45    Process {
46        processor: BoxProcessor,
47        body_contract: Option<BodyType>,
48        /// Lifecycle handle for this processor, if it is stateful.
49        /// `None` for stateless processors (the common case).
50        lifecycle: Option<Arc<dyn StepLifecycle>>,
51    },
52    /// Stop EIP marker. `run_steps` produces `PipelineOutcome::Stopped(ex)`
53    /// without invoking a Tower service. Replaces `StopService` (Task 7).
54    Stop,
55    /// Outcome-aware structural EIP segment. `run_steps` invokes
56    /// `segment.run(ex)` and matches on the returned `PipelineOutcome`.
57    /// See ADR-0025.
58    Segment {
59        segment: camel_api::OutcomeSegment,
60        body_contract: Option<BodyType>,
61        /// Lifecycle handles from children nested inside this segment.
62        /// `Option<Vec<...>>` (not `Option<Arc<...>>`) so multiple stateful
63        /// children (e.g. Idempotent+Resequencer inside Filter) each
64        /// register independently.
65        lifecycle: Option<Vec<Arc<dyn StepLifecycle>>>,
66    },
67}
68
69/// Result from a compiler: either it handled the step (with success or error),
70/// or it did not recognize the variant and returns the step for the next compiler.
71pub(crate) enum StepCompileResult {
72    Matched(Result<CompiledStep, CamelError>),
73    NotHandled(BuilderStep),
74}
75
76/// A compiler that can handle one or more `BuilderStep` variants.
77///
78/// The `compile` method receives ownership of the step. If the compiler recognizes
79/// the variant it returns `StepCompileResult::Matched(...)`. Otherwise it returns the
80/// step back via `NotHandled(step)`.
81pub(crate) trait StepCompiler: Send + Sync {
82    fn compile(
83        &self,
84        step: BuilderStep,
85        step_index: usize,
86        ctx: &CompilationContext,
87        registry: &StepCompilerRegistry,
88    ) -> StepCompileResult;
89}
90
91/// Shared context passed to every compiler invocation.
92pub(crate) struct CompilationContext<'a> {
93    pub producer_ctx: &'a ProducerContext,
94    pub rt: Arc<dyn RuntimeObservability>,
95    pub languages: &'a SharedLanguageRegistry,
96    pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
97    pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
98    pub component_ctx: Arc<dyn ComponentContext>,
99    pub route_id: Option<&'a str>,
100    pub staging_mode: &'a FunctionStagingMode,
101    /// Idempotent repository registry. Used by the `IdempotentConsumer`
102    /// compiler arm to resolve repository names into `Arc<dyn IdempotentRepository>`.
103    pub idempotent_repositories: &'a IdempotentRegistry,
104    /// Claim check repository registry. Used by the `ClaimCheck` compiler arm
105    /// to resolve repository names into `Arc<dyn ClaimCheckRepository>`.
106    pub claim_check_repositories: &'a ClaimCheckRegistry,
107}
108
109impl<'a> CompilationContext<'a> {
110    /// Recursively compile child steps. Used by compilers that have sub-pipelines
111    /// (Filter, Choice, Split, Loop, etc.).
112    pub fn compile_children(
113        &self,
114        steps: Vec<BuilderStep>,
115        registry: &StepCompilerRegistry,
116    ) -> Result<Vec<CompiledStep>, CamelError> {
117        registry.compile_steps(steps, self)
118    }
119
120    /// Recursively compile child steps and map them into outcome-aware segments.
121    ///
122    /// Each `CompiledStep` variant is converted to a `Box<dyn OutcomePipeline>`:
123    /// - `Process` → `BoxProcessorSegment`, optionally wrapped in `BodyCoercingSegment`
124    /// - `Stop` → `StopSegment` (produces `PipelineOutcome::Stopped(ex)`)
125    /// - `Segment` → its inner `OutcomeSegment` (which now implements OutcomePipeline)
126    ///
127    /// This replaces the 22-line duplicated closure in Filter/DeclarativeFilter
128    /// (and will prevent 14+ more duplicates in T9–T16).
129    #[allow(clippy::type_complexity)]
130    pub fn compile_children_segments(
131        &self,
132        steps: Vec<BuilderStep>,
133        registry: &StepCompilerRegistry,
134    ) -> Result<
135        (
136            Vec<Box<dyn camel_api::OutcomePipeline>>,
137            Vec<Arc<dyn camel_api::StepLifecycle>>,
138        ),
139        CamelError,
140    > {
141        let pairs = self.compile_children(steps, registry)?;
142        let mut lifecycle_handles: Vec<Arc<dyn camel_api::StepLifecycle>> = Vec::new();
143        let segments: Vec<Box<dyn camel_api::OutcomePipeline>> = pairs
144            .into_iter()
145            .map(|c| match c {
146                CompiledStep::Process {
147                    processor,
148                    body_contract,
149                    lifecycle,
150                } => {
151                    if let Some(lc) = lifecycle {
152                        lifecycle_handles.push(lc);
153                    }
154                    let inner: Box<dyn camel_api::OutcomePipeline> = Box::new(
155                        crate::lifecycle::adapters::route_compiler::BoxProcessorSegment::new(
156                            processor,
157                        ),
158                    );
159                    match body_contract {
160                        Some(contract) => Box::new(
161                            crate::lifecycle::adapters::route_compiler::BodyCoercingSegment::new(
162                                inner, contract,
163                            ),
164                        ),
165                        None => inner,
166                    }
167                }
168                CompiledStep::Stop => {
169                    Box::new(crate::lifecycle::adapters::route_compiler::StopSegment)
170                        as Box<dyn camel_api::OutcomePipeline>
171                }
172                CompiledStep::Segment {
173                    segment,
174                    body_contract: _,
175                    lifecycle,
176                } => {
177                    if let Some(lcs) = lifecycle {
178                        lifecycle_handles.extend(lcs);
179                    }
180                    Box::new(segment)
181                }
182            })
183            .collect();
184        Ok((segments, lifecycle_handles))
185    }
186}
187
188/// Registry of step compilers. Steps are dispatched to compilers in registration
189/// order. The first matching compiler handles the step.
190pub(crate) struct StepCompilerRegistry {
191    compilers: Vec<Box<dyn StepCompiler>>,
192}
193
194impl StepCompilerRegistry {
195    pub fn new() -> Self {
196        Self {
197            compilers: Vec::new(),
198        }
199    }
200
201    pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
202        self.compilers.push(compiler);
203    }
204
205    /// Try each compiler in order. The first to return `Matched` wins.
206    /// If all return `NotHandled`, returns `None`.
207    pub fn compile_step(
208        &self,
209        step: BuilderStep,
210        step_index: usize,
211        ctx: &CompilationContext,
212    ) -> Option<Result<CompiledStep, CamelError>> {
213        let mut step = step;
214        for compiler in &self.compilers {
215            match compiler.compile(step, step_index, ctx, self) {
216                StepCompileResult::Matched(result) => return Some(result),
217                StepCompileResult::NotHandled(s) => step = s,
218            }
219        }
220        None
221    }
222
223    /// Compile all steps in a vector.
224    pub fn compile_steps(
225        &self,
226        steps: Vec<BuilderStep>,
227        ctx: &CompilationContext,
228    ) -> Result<Vec<CompiledStep>, CamelError> {
229        let mut out = Vec::with_capacity(steps.len());
230        for (i, step) in steps.into_iter().enumerate() {
231            match self.compile_step(step, i, ctx) {
232                Some(Ok(c)) => out.push(c),
233                Some(Err(e)) => return Err(e),
234                None => {
235                    return Err(CamelError::RouteError(
236                        "no compiler registered for step variant".into(),
237                    ));
238                }
239            }
240        }
241        Ok(out)
242    }
243}
244
245/// Parse a URI and create a producer, reusing `component_ctx`, `rt`, and `producer_ctx`
246/// from the compilation context.
247pub(crate) fn resolve_producer(
248    ctx: &CompilationContext,
249    uri: &str,
250) -> Result<BoxProcessor, CamelError> {
251    let parsed = parse_uri(uri)?;
252    let component = ctx
253        .component_ctx
254        .resolve_component(&parsed.scheme)
255        .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
256    let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
257    endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
258}
259
260/// Pack a lifecycle Vec into `None` when empty, `Some` when non-empty.
261/// Preserves the invariant that `Some` always implies ≥1 handle.
262pub(super) fn pack_lifecycles(
263    lifecycles: Vec<Arc<dyn StepLifecycle>>,
264) -> Option<Vec<Arc<dyn StepLifecycle>>> {
265    if lifecycles.is_empty() {
266        None
267    } else {
268        Some(lifecycles)
269    }
270}
271
272/// Build the full registry with all compiler groups.
273pub(crate) fn build_registry() -> StepCompilerRegistry {
274    let mut reg = StepCompilerRegistry::new();
275    reg.register(Box::new(core::CoreCompiler));
276    reg.register(Box::new(endpoints::EndpointsCompiler));
277    reg.register(Box::new(transforms::TransformsCompiler));
278    reg.register(Box::new(routing::RoutingCompiler));
279    reg.register(Box::new(control_flow::ControlFlowCompiler));
280    reg.register(Box::new(splitting::SplittingCompiler));
281    reg.register(Box::new(error_handling::ErrorHandlingCompiler));
282    reg
283}
284
285#[cfg(test)]
286mod segment_tests {
287    use super::*;
288    use camel_api::{Exchange, OutcomePipeline, PipelineOutcome};
289    use std::future::Future;
290    use std::pin::Pin;
291
292    #[derive(Clone)]
293    struct EchoSegment;
294
295    impl OutcomePipeline for EchoSegment {
296        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
297            Box::new(EchoSegment)
298        }
299        fn run<'a>(
300            &'a mut self,
301            exchange: Exchange,
302        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
303            Box::pin(async move { PipelineOutcome::Completed(exchange) })
304        }
305    }
306
307    #[test]
308    fn compiled_step_segment_clone_compiles() {
309        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
310        let step = CompiledStep::Segment {
311            segment: seg,
312            body_contract: None,
313            lifecycle: None,
314        };
315        let _cloned = step.clone();
316        if let CompiledStep::Segment { segment: _, .. } = _cloned {
317            // ok
318        } else {
319            panic!("clone should preserve variant");
320        }
321    }
322
323    #[test]
324    fn compiled_step_segment_debug_renders() {
325        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
326        let step = CompiledStep::Segment {
327            segment: seg,
328            body_contract: None,
329            lifecycle: None,
330        };
331        let s = format!("{:?}", step);
332        assert!(
333            s.contains("Segment"),
334            "debug should mention Segment variant: {s}"
335        );
336    }
337
338    #[test]
339    fn outcome_segment_satisfies_clone_send_static() {
340        fn assert_traits<T: Clone + Send + 'static>() {}
341        assert_traits::<camel_api::OutcomeSegment>();
342    }
343
344    #[tokio::test]
345    #[allow(clippy::arc_with_non_send_sync)]
346    async fn outcome_segment_survives_arcswap_swap() {
347        use arc_swap::ArcSwap;
348        use camel_api::{Exchange, Message, OutcomePipeline, PipelineOutcome};
349        use std::sync::Arc;
350
351        #[derive(Clone)]
352        struct EchoSegment;
353        impl OutcomePipeline for EchoSegment {
354            fn clone_box(&self) -> Box<dyn OutcomePipeline> {
355                Box::new(EchoSegment)
356            }
357            fn run<'a>(
358                &'a mut self,
359                ex: Exchange,
360            ) -> std::pin::Pin<Box<dyn std::future::Future<Output = PipelineOutcome> + Send + 'a>>
361            {
362                Box::pin(async move { PipelineOutcome::Completed(ex) })
363            }
364        }
365
366        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
367        let slot: ArcSwap<Option<camel_api::OutcomeSegment>> = ArcSwap::from_pointee(None);
368        slot.store(Arc::new(Some(seg.clone())));
369        slot.store(Arc::new(Some(seg)));
370
371        let mut borrowed = slot.load().as_ref().clone().unwrap();
372        let outcome = borrowed.run(Exchange::new(Message::new("ping"))).await;
373        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
374    }
375
376    /// Test lifecycle handle used by compile_children_segments_bubbles_child_lifecycle.
377    #[derive(Debug)]
378    struct TestLifecycle;
379
380    #[async_trait::async_trait]
381    impl camel_api::StepLifecycle for TestLifecycle {
382        fn name(&self) -> &'static str {
383            "test-lifecycle"
384        }
385        async fn shutdown(
386            &self,
387            _reason: camel_api::StepShutdownReason,
388        ) -> Result<(), camel_api::CamelError> {
389            Ok(())
390        }
391    }
392
393    /// Custom compiler that injects a lifecycle handle into every
394    /// `BuilderStep::Processor` it compiles.
395    struct LifecycleInjectorCompiler {
396        handle: Arc<dyn camel_api::StepLifecycle>,
397    }
398
399    impl StepCompiler for LifecycleInjectorCompiler {
400        fn compile(
401            &self,
402            step: BuilderStep,
403            _step_index: usize,
404            _ctx: &CompilationContext,
405            _registry: &StepCompilerRegistry,
406        ) -> StepCompileResult {
407            match step {
408                BuilderStep::Processor(svc) => {
409                    StepCompileResult::Matched(Ok(CompiledStep::Process {
410                        processor: svc,
411                        body_contract: None,
412                        lifecycle: Some(self.handle.clone()),
413                    }))
414                }
415                other => StepCompileResult::NotHandled(other),
416            }
417        }
418    }
419
420    #[tokio::test]
421    async fn compile_children_segments_bubbles_child_lifecycle() {
422        use std::collections::HashMap;
423        use std::sync::Mutex;
424
425        use camel_api::{BoxProcessor, BoxProcessorExt, StepLifecycle};
426        use camel_bean::BeanRegistry;
427        use camel_component_api::{
428            ComponentContext, NoOpComponentContext, RuntimeObservability,
429            test_support::NoopRuntimeObservability,
430        };
431
432        use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
433
434        let handle: Arc<dyn StepLifecycle> = Arc::new(TestLifecycle);
435
436        // Register lifecycle injector + real control-flow compiler so
437        // compile_children_segments runs through a structural EIP path.
438        let mut reg = StepCompilerRegistry::new();
439        reg.register(Box::new(LifecycleInjectorCompiler {
440            handle: handle.clone(),
441        }));
442        reg.register(Box::new(super::control_flow::ControlFlowCompiler));
443
444        let pc = ProducerContext::default();
445        let rt: Arc<dyn RuntimeObservability> = Arc::new(NoopRuntimeObservability);
446        let languages: SharedLanguageRegistry = Arc::new(Mutex::new(HashMap::new()));
447        let beans: Arc<Mutex<BeanRegistry>> = Arc::new(Mutex::new(BeanRegistry::new()));
448        let component_ctx: Arc<dyn ComponentContext> = Arc::new(NoOpComponentContext);
449        let staging = FunctionStagingMode::DirectAdd;
450        let idempotent_repositories = crate::IdempotentRegistry::new();
451        let claim_check_repositories = crate::ClaimCheckRegistry::new();
452
453        let ctx = CompilationContext {
454            producer_ctx: &pc,
455            rt,
456            languages: &languages,
457            beans: &beans,
458            function_invoker: None,
459            component_ctx,
460            route_id: None,
461            staging_mode: &staging,
462            idempotent_repositories: &idempotent_repositories,
463            claim_check_repositories: &claim_check_repositories,
464        };
465
466        // Compile a Filter with a child Processor step.
467        let filter_step = BuilderStep::Filter {
468            predicate: Arc::new(|_| true),
469            steps: vec![BuilderStep::Processor(BoxProcessor::from_fn(|ex| {
470                Box::pin(async move { Ok(ex) })
471            }))],
472        };
473
474        let result = reg.compile_step(filter_step, 0, &ctx);
475        let compiled = result
476            .expect("compilation should succeed")
477            .expect("should match");
478
479        match compiled {
480            CompiledStep::Segment {
481                lifecycle,
482                body_contract,
483                ..
484            } => {
485                assert_eq!(body_contract, None, "body_contract should be None");
486                let handles = lifecycle.expect("Segment should have lifecycle handles");
487                assert_eq!(handles.len(), 1, "expected 1 lifecycle handle");
488                assert_eq!(handles[0].name(), "test-lifecycle", "handle name mismatch");
489            }
490            other => panic!("Expected CompiledStep::Segment, got {other:?}"),
491        }
492    }
493
494    /// A lifecycle handle with a configurable name for multi-handle tests.
495    #[derive(Debug)]
496    struct NamedLifecycle(&'static str);
497
498    #[async_trait::async_trait]
499    impl camel_api::StepLifecycle for NamedLifecycle {
500        fn name(&self) -> &'static str {
501            self.0
502        }
503        async fn shutdown(
504            &self,
505            _reason: camel_api::StepShutdownReason,
506        ) -> Result<(), camel_api::CamelError> {
507            Ok(())
508        }
509    }
510
511    /// Test A: Multiple stateful children in one Segment → Vec length 2.
512    #[tokio::test]
513    async fn compile_children_segments_multiple_stateful_children() {
514        use std::collections::HashMap;
515        use std::sync::Mutex;
516
517        use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
518        use camel_api::{BoxProcessor, BoxProcessorExt, StepLifecycle};
519        use camel_bean::BeanRegistry;
520        use camel_component_api::{
521            ComponentContext, NoOpComponentContext, RuntimeObservability,
522            test_support::NoopRuntimeObservability,
523        };
524
525        let handle: Arc<dyn StepLifecycle> = Arc::new(NamedLifecycle("multi"));
526
527        let mut reg = StepCompilerRegistry::new();
528        reg.register(Box::new(LifecycleInjectorCompiler {
529            handle: handle.clone(),
530        }));
531        reg.register(Box::new(super::control_flow::ControlFlowCompiler));
532
533        let pc = ProducerContext::default();
534        let rt: Arc<dyn RuntimeObservability> = Arc::new(NoopRuntimeObservability);
535        let languages: SharedLanguageRegistry = Arc::new(Mutex::new(HashMap::new()));
536        let beans: Arc<Mutex<BeanRegistry>> = Arc::new(Mutex::new(BeanRegistry::new()));
537        let component_ctx: Arc<dyn ComponentContext> = Arc::new(NoOpComponentContext);
538        let staging = FunctionStagingMode::DirectAdd;
539        let idempotent_repositories = crate::IdempotentRegistry::new();
540        let claim_check_repositories = crate::ClaimCheckRegistry::new();
541
542        let ctx = CompilationContext {
543            producer_ctx: &pc,
544            rt,
545            languages: &languages,
546            beans: &beans,
547            function_invoker: None,
548            component_ctx,
549            route_id: None,
550            staging_mode: &staging,
551            idempotent_repositories: &idempotent_repositories,
552            claim_check_repositories: &claim_check_repositories,
553        };
554
555        // Filter with TWO child Processors → both get the same lifecycle handle.
556        let filter_step = BuilderStep::Filter {
557            predicate: Arc::new(|_| true),
558            steps: vec![
559                BuilderStep::Processor(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))),
560                BuilderStep::Processor(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))),
561            ],
562        };
563
564        let result = reg.compile_step(filter_step, 0, &ctx);
565        let compiled = result
566            .expect("compilation should succeed")
567            .expect("should match");
568
569        match compiled {
570            CompiledStep::Segment { lifecycle, .. } => {
571                let handles = lifecycle.expect("Segment should have lifecycle handles");
572                assert_eq!(
573                    handles.len(),
574                    2,
575                    "expected 2 lifecycle handles for 2 children"
576                );
577                for h in &handles {
578                    assert_eq!(h.name(), "multi", "all handles should be 'multi'");
579                }
580            }
581            other => panic!("Expected CompiledStep::Segment, got {other:?}"),
582        }
583    }
584
585    /// Test B: Multi-branch accumulation across Choice when-clauses.
586    #[tokio::test]
587    async fn compile_children_segments_multi_branch_accumulation() {
588        use std::collections::HashMap;
589        use std::sync::Mutex;
590
591        use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
592        use crate::lifecycle::application::route_definition::WhenStep;
593        use camel_api::{BoxProcessor, BoxProcessorExt, StepLifecycle};
594        use camel_bean::BeanRegistry;
595        use camel_component_api::{
596            ComponentContext, NoOpComponentContext, RuntimeObservability,
597            test_support::NoopRuntimeObservability,
598        };
599
600        let handle: Arc<dyn StepLifecycle> = Arc::new(NamedLifecycle("branch"));
601
602        let mut reg = StepCompilerRegistry::new();
603        reg.register(Box::new(LifecycleInjectorCompiler {
604            handle: handle.clone(),
605        }));
606        reg.register(Box::new(super::control_flow::ControlFlowCompiler));
607
608        let pc = ProducerContext::default();
609        let rt: Arc<dyn RuntimeObservability> = Arc::new(NoopRuntimeObservability);
610        let languages: SharedLanguageRegistry = Arc::new(Mutex::new(HashMap::new()));
611        let beans: Arc<Mutex<BeanRegistry>> = Arc::new(Mutex::new(BeanRegistry::new()));
612        let component_ctx: Arc<dyn ComponentContext> = Arc::new(NoOpComponentContext);
613        let staging = FunctionStagingMode::DirectAdd;
614        let idempotent_repositories = crate::IdempotentRegistry::new();
615        let claim_check_repositories = crate::ClaimCheckRegistry::new();
616
617        let ctx = CompilationContext {
618            producer_ctx: &pc,
619            rt,
620            languages: &languages,
621            beans: &beans,
622            function_invoker: None,
623            component_ctx,
624            route_id: None,
625            staging_mode: &staging,
626            idempotent_repositories: &idempotent_repositories,
627            claim_check_repositories: &claim_check_repositories,
628        };
629
630        // Choice with 2 when branches, each containing 1 stateful child.
631        let choice_step = BuilderStep::Choice {
632            whens: vec![
633                WhenStep {
634                    predicate: Arc::new(|_| true),
635                    steps: vec![BuilderStep::Processor(BoxProcessor::from_fn(|ex| {
636                        Box::pin(async move { Ok(ex) })
637                    }))],
638                },
639                WhenStep {
640                    predicate: Arc::new(|_| false),
641                    steps: vec![BuilderStep::Processor(BoxProcessor::from_fn(|ex| {
642                        Box::pin(async move { Ok(ex) })
643                    }))],
644                },
645            ],
646            otherwise: None,
647        };
648
649        let result = reg.compile_step(choice_step, 0, &ctx);
650        let compiled = result
651            .expect("compilation should succeed")
652            .expect("should match");
653
654        match compiled {
655            CompiledStep::Segment { lifecycle, .. } => {
656                let handles = lifecycle.expect("Segment should have lifecycle handles");
657                assert_eq!(
658                    handles.len(),
659                    2,
660                    "expected 2 lifecycle handles from 2 branches"
661                );
662                for h in &handles {
663                    assert_eq!(h.name(), "branch", "all handles should be 'branch'");
664                }
665            }
666            other => panic!("Expected CompiledStep::Segment, got {other:?}"),
667        }
668    }
669
670    /// Test C: Nested Segment-in-Segment flattening — outer Segment contains
671    /// innermost lifecycle handle from a grandchild Processor.
672    #[tokio::test]
673    async fn compile_children_segments_nested_segment_flattening() {
674        use std::collections::HashMap;
675        use std::sync::Mutex;
676
677        use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
678        use camel_api::{BoxProcessor, BoxProcessorExt, StepLifecycle};
679        use camel_bean::BeanRegistry;
680        use camel_component_api::{
681            ComponentContext, NoOpComponentContext, RuntimeObservability,
682            test_support::NoopRuntimeObservability,
683        };
684
685        let inner_handle: Arc<dyn StepLifecycle> = Arc::new(NamedLifecycle("deep"));
686
687        let mut reg = StepCompilerRegistry::new();
688        reg.register(Box::new(LifecycleInjectorCompiler {
689            handle: inner_handle.clone(),
690        }));
691        reg.register(Box::new(super::control_flow::ControlFlowCompiler));
692
693        let pc = ProducerContext::default();
694        let rt: Arc<dyn RuntimeObservability> = Arc::new(NoopRuntimeObservability);
695        let languages: SharedLanguageRegistry = Arc::new(Mutex::new(HashMap::new()));
696        let beans: Arc<Mutex<BeanRegistry>> = Arc::new(Mutex::new(BeanRegistry::new()));
697        let component_ctx: Arc<dyn ComponentContext> = Arc::new(NoOpComponentContext);
698        let staging = FunctionStagingMode::DirectAdd;
699        let idempotent_repositories = crate::IdempotentRegistry::new();
700        let claim_check_repositories = crate::ClaimCheckRegistry::new();
701
702        let ctx = CompilationContext {
703            producer_ctx: &pc,
704            rt,
705            languages: &languages,
706            beans: &beans,
707            function_invoker: None,
708            component_ctx,
709            route_id: None,
710            staging_mode: &staging,
711            idempotent_repositories: &idempotent_repositories,
712            claim_check_repositories: &claim_check_repositories,
713        };
714
715        // Outer Filter containing an inner Filter that has a stateful Processor.
716        // The outer Segment's lifecycle should contain the innermost handle
717        // (proves recursive flattening through compile_children_segments).
718        let inner_filter = BuilderStep::Filter {
719            predicate: Arc::new(|_| true),
720            steps: vec![BuilderStep::Processor(BoxProcessor::from_fn(|ex| {
721                Box::pin(async move { Ok(ex) })
722            }))],
723        };
724
725        let outer_filter = BuilderStep::Filter {
726            predicate: Arc::new(|_| true),
727            steps: vec![inner_filter],
728        };
729
730        let result = reg.compile_step(outer_filter, 0, &ctx);
731        let compiled = result
732            .expect("compilation should succeed")
733            .expect("should match");
734
735        match compiled {
736            CompiledStep::Segment { lifecycle, .. } => {
737                let handles = lifecycle.expect("outer Segment should have lifecycle handles");
738                assert_eq!(handles.len(), 1, "expected 1 innermost lifecycle handle");
739                assert_eq!(
740                    handles[0].name(),
741                    "deep",
742                    "handle should be from innermost child"
743                );
744            }
745            other => panic!("Expected CompiledStep::Segment, got {other:?}"),
746        }
747    }
748}