1use std::sync::Arc;
9
10use camel_api::{
11 BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext, StepLifecycle,
12};
13use camel_component_api::{ComponentContext, RuntimeObservability};
14use camel_endpoint::parse_uri;
15
16use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
17use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
18use crate::lifecycle::application::route_definition::BuilderStep;
19use crate::{ClaimCheckRegistry, IdempotentRegistry};
20use camel_bean::BeanRegistry;
21
22mod control_flow;
23mod core;
24mod endpoints;
25mod error_handling;
26mod routing;
27mod splitting;
28mod transforms;
29
30#[derive(Debug, Clone)]
44pub enum CompiledStep {
45 Process {
46 processor: BoxProcessor,
47 body_contract: Option<BodyType>,
48 lifecycle: Option<Arc<dyn StepLifecycle>>,
51 },
52 Stop,
55 Segment {
59 segment: camel_api::OutcomeSegment,
60 body_contract: Option<BodyType>,
61 lifecycle: Option<Vec<Arc<dyn StepLifecycle>>>,
66 },
67}
68
69pub(crate) enum StepCompileResult {
72 Matched(Result<CompiledStep, CamelError>),
73 NotHandled(BuilderStep),
74}
75
76pub(crate) trait StepCompiler: Send + Sync {
82 fn compile(
83 &self,
84 step: BuilderStep,
85 step_index: usize,
86 ctx: &CompilationContext,
87 registry: &StepCompilerRegistry,
88 ) -> StepCompileResult;
89}
90
91pub(crate) struct CompilationContext<'a> {
93 pub producer_ctx: &'a ProducerContext,
94 pub rt: Arc<dyn RuntimeObservability>,
95 pub languages: &'a SharedLanguageRegistry,
96 pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
97 pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
98 pub component_ctx: Arc<dyn ComponentContext>,
99 pub route_id: Option<&'a str>,
100 pub staging_mode: &'a FunctionStagingMode,
101 pub idempotent_repositories: &'a IdempotentRegistry,
104 pub claim_check_repositories: &'a ClaimCheckRegistry,
107}
108
109impl<'a> CompilationContext<'a> {
110 pub fn compile_children(
113 &self,
114 steps: Vec<BuilderStep>,
115 registry: &StepCompilerRegistry,
116 ) -> Result<Vec<CompiledStep>, CamelError> {
117 registry.compile_steps(steps, self)
118 }
119
120 #[allow(clippy::type_complexity)]
130 pub fn compile_children_segments(
131 &self,
132 steps: Vec<BuilderStep>,
133 registry: &StepCompilerRegistry,
134 ) -> Result<
135 (
136 Vec<Box<dyn camel_api::OutcomePipeline>>,
137 Vec<Arc<dyn camel_api::StepLifecycle>>,
138 ),
139 CamelError,
140 > {
141 let pairs = self.compile_children(steps, registry)?;
142 let mut lifecycle_handles: Vec<Arc<dyn camel_api::StepLifecycle>> = Vec::new();
143 let segments: Vec<Box<dyn camel_api::OutcomePipeline>> = pairs
144 .into_iter()
145 .map(|c| match c {
146 CompiledStep::Process {
147 processor,
148 body_contract,
149 lifecycle,
150 } => {
151 if let Some(lc) = lifecycle {
152 lifecycle_handles.push(lc);
153 }
154 let inner: Box<dyn camel_api::OutcomePipeline> = Box::new(
155 crate::lifecycle::adapters::route_compiler::BoxProcessorSegment::new(
156 processor,
157 ),
158 );
159 match body_contract {
160 Some(contract) => Box::new(
161 crate::lifecycle::adapters::route_compiler::BodyCoercingSegment::new(
162 inner, contract,
163 ),
164 ),
165 None => inner,
166 }
167 }
168 CompiledStep::Stop => {
169 Box::new(crate::lifecycle::adapters::route_compiler::StopSegment)
170 as Box<dyn camel_api::OutcomePipeline>
171 }
172 CompiledStep::Segment {
173 segment,
174 body_contract: _,
175 lifecycle,
176 } => {
177 if let Some(lcs) = lifecycle {
178 lifecycle_handles.extend(lcs);
179 }
180 Box::new(segment)
181 }
182 })
183 .collect();
184 Ok((segments, lifecycle_handles))
185 }
186}
187
188pub(crate) struct StepCompilerRegistry {
191 compilers: Vec<Box<dyn StepCompiler>>,
192}
193
194impl StepCompilerRegistry {
195 pub fn new() -> Self {
196 Self {
197 compilers: Vec::new(),
198 }
199 }
200
201 pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
202 self.compilers.push(compiler);
203 }
204
205 pub fn compile_step(
208 &self,
209 step: BuilderStep,
210 step_index: usize,
211 ctx: &CompilationContext,
212 ) -> Option<Result<CompiledStep, CamelError>> {
213 let mut step = step;
214 for compiler in &self.compilers {
215 match compiler.compile(step, step_index, ctx, self) {
216 StepCompileResult::Matched(result) => return Some(result),
217 StepCompileResult::NotHandled(s) => step = s,
218 }
219 }
220 None
221 }
222
223 pub fn compile_steps(
225 &self,
226 steps: Vec<BuilderStep>,
227 ctx: &CompilationContext,
228 ) -> Result<Vec<CompiledStep>, CamelError> {
229 let mut out = Vec::with_capacity(steps.len());
230 for (i, step) in steps.into_iter().enumerate() {
231 match self.compile_step(step, i, ctx) {
232 Some(Ok(c)) => out.push(c),
233 Some(Err(e)) => return Err(e),
234 None => {
235 return Err(CamelError::RouteError(
236 "no compiler registered for step variant".into(),
237 ));
238 }
239 }
240 }
241 Ok(out)
242 }
243}
244
245pub(crate) fn resolve_producer(
248 ctx: &CompilationContext,
249 uri: &str,
250) -> Result<BoxProcessor, CamelError> {
251 let parsed = parse_uri(uri)?;
252 let component = ctx
253 .component_ctx
254 .resolve_component(&parsed.scheme)
255 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
256 let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
257 endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
258}
259
260pub(super) fn pack_lifecycles(
263 lifecycles: Vec<Arc<dyn StepLifecycle>>,
264) -> Option<Vec<Arc<dyn StepLifecycle>>> {
265 if lifecycles.is_empty() {
266 None
267 } else {
268 Some(lifecycles)
269 }
270}
271
272pub(crate) fn build_registry() -> StepCompilerRegistry {
274 let mut reg = StepCompilerRegistry::new();
275 reg.register(Box::new(core::CoreCompiler));
276 reg.register(Box::new(endpoints::EndpointsCompiler));
277 reg.register(Box::new(transforms::TransformsCompiler));
278 reg.register(Box::new(routing::RoutingCompiler));
279 reg.register(Box::new(control_flow::ControlFlowCompiler));
280 reg.register(Box::new(splitting::SplittingCompiler));
281 reg.register(Box::new(error_handling::ErrorHandlingCompiler));
282 reg
283}
284
/// Tests for `CompiledStep::Segment` and for lifecycle-handle propagation
/// through nested step compilation.
#[cfg(test)]
mod segment_tests {
    use super::*;

    use std::collections::HashMap;
    use std::future::Future;
    use std::pin::Pin;
    use std::sync::Mutex;

    use camel_api::{BoxProcessorExt, Exchange, Message, OutcomePipeline, PipelineOutcome};
    use camel_component_api::{NoOpComponentContext, test_support::NoopRuntimeObservability};

    use crate::lifecycle::application::route_definition::WhenStep;

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Pipeline that completes immediately with the exchange it was given.
    #[derive(Clone)]
    struct EchoSegment;

    impl OutcomePipeline for EchoSegment {
        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
            Box::new(EchoSegment)
        }

        fn run<'a>(
            &'a mut self,
            exchange: Exchange,
        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
            Box::pin(async move { PipelineOutcome::Completed(exchange) })
        }
    }

    /// Lifecycle whose only observable property is its name.
    #[derive(Debug)]
    struct NamedLifecycle(&'static str);

    #[async_trait::async_trait]
    impl camel_api::StepLifecycle for NamedLifecycle {
        fn name(&self) -> &'static str {
            self.0
        }

        async fn shutdown(
            &self,
            _reason: camel_api::StepShutdownReason,
        ) -> Result<(), camel_api::CamelError> {
            Ok(())
        }
    }

    /// Compiler that turns every `BuilderStep::Processor` into a
    /// `CompiledStep::Process` carrying a clone of `handle`, and declines
    /// every other step.
    struct LifecycleInjectorCompiler {
        handle: Arc<dyn camel_api::StepLifecycle>,
    }

    impl StepCompiler for LifecycleInjectorCompiler {
        fn compile(
            &self,
            step: BuilderStep,
            _step_index: usize,
            _ctx: &CompilationContext,
            _registry: &StepCompilerRegistry,
        ) -> StepCompileResult {
            match step {
                BuilderStep::Processor(svc) => {
                    StepCompileResult::Matched(Ok(CompiledStep::Process {
                        processor: svc,
                        body_contract: None,
                        lifecycle: Some(self.handle.clone()),
                    }))
                }
                other => StepCompileResult::NotHandled(other),
            }
        }
    }

    /// Owns every value a `CompilationContext` borrows, so a context can be
    /// built with a single call in each test.
    struct ContextFixture {
        producer_ctx: ProducerContext,
        rt: Arc<dyn RuntimeObservability>,
        languages: SharedLanguageRegistry,
        beans: Arc<Mutex<BeanRegistry>>,
        component_ctx: Arc<dyn ComponentContext>,
        staging_mode: FunctionStagingMode,
        idempotent_repositories: IdempotentRegistry,
        claim_check_repositories: ClaimCheckRegistry,
    }

    impl ContextFixture {
        /// Builds the fixture from no-op / empty collaborators.
        fn new() -> Self {
            Self {
                producer_ctx: ProducerContext::default(),
                rt: Arc::new(NoopRuntimeObservability),
                languages: Arc::new(Mutex::new(HashMap::new())),
                beans: Arc::new(Mutex::new(BeanRegistry::new())),
                component_ctx: Arc::new(NoOpComponentContext),
                staging_mode: FunctionStagingMode::DirectAdd,
                idempotent_repositories: crate::IdempotentRegistry::new(),
                claim_check_repositories: crate::ClaimCheckRegistry::new(),
            }
        }

        /// Borrows the fixture as a `CompilationContext` with no route id and
        /// no function invoker.
        fn context(&self) -> CompilationContext<'_> {
            CompilationContext {
                producer_ctx: &self.producer_ctx,
                rt: Arc::clone(&self.rt),
                languages: &self.languages,
                beans: &self.beans,
                function_invoker: None,
                component_ctx: Arc::clone(&self.component_ctx),
                route_id: None,
                staging_mode: &self.staging_mode,
                idempotent_repositories: &self.idempotent_repositories,
                claim_check_repositories: &self.claim_check_repositories,
            }
        }
    }

    /// Registry holding a `LifecycleInjectorCompiler` (first) followed by the
    /// real `ControlFlowCompiler`. Every injected handle reports `name`.
    fn injector_registry(name: &'static str) -> StepCompilerRegistry {
        let handle: Arc<dyn StepLifecycle> = Arc::new(NamedLifecycle(name));
        let mut registry = StepCompilerRegistry::new();
        registry.register(Box::new(LifecycleInjectorCompiler { handle }));
        registry.register(Box::new(super::control_flow::ControlFlowCompiler));
        registry
    }

    /// Processor step that returns its exchange unchanged.
    fn passthrough_step() -> BuilderStep {
        BuilderStep::Processor(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
    }

    /// Filter step with an always-true predicate around `steps`.
    fn filter_over(steps: Vec<BuilderStep>) -> BuilderStep {
        BuilderStep::Filter {
            predicate: Arc::new(|_| true),
            steps,
        }
    }

    /// Compiles `step` at index 0 against a fresh fixture context.
    ///
    /// Panics when no compiler handles the step or compilation fails.
    fn compile_root(registry: &StepCompilerRegistry, step: BuilderStep) -> CompiledStep {
        let fixture = ContextFixture::new();
        let ctx = fixture.context();
        registry
            .compile_step(step, 0, &ctx)
            // Outer `Option`: `None` means every compiler declined the step.
            .expect("a registered compiler should match the step")
            // Inner `Result`: the matching compiler's outcome.
            .expect("compilation should succeed")
    }

    /// Extracts `(body_contract, lifecycle handles)` from a
    /// `CompiledStep::Segment`; panics on any other variant or when the
    /// segment carries no handles.
    fn segment_parts(compiled: CompiledStep) -> (Option<BodyType>, Vec<Arc<dyn StepLifecycle>>) {
        match compiled {
            CompiledStep::Segment {
                lifecycle,
                body_contract,
                ..
            } => (
                body_contract,
                lifecycle.expect("Segment should have lifecycle handles"),
            ),
            other => panic!("Expected CompiledStep::Segment, got {other:?}"),
        }
    }

    /// `CompiledStep::Segment` wrapping an `EchoSegment`, with no contract and
    /// no lifecycle handles.
    fn echo_segment_step() -> CompiledStep {
        CompiledStep::Segment {
            segment: camel_api::OutcomeSegment::new(Box::new(EchoSegment)),
            body_contract: None,
            lifecycle: None,
        }
    }

    // ---------------------------------------------------------------------
    // CompiledStep::Segment / OutcomeSegment trait behaviour
    // ---------------------------------------------------------------------

    #[test]
    fn compiled_step_segment_clone_compiles() {
        let cloned = echo_segment_step().clone();
        assert!(
            matches!(cloned, CompiledStep::Segment { .. }),
            "clone should preserve variant"
        );
    }

    #[test]
    fn compiled_step_segment_debug_renders() {
        let s = format!("{:?}", echo_segment_step());
        assert!(
            s.contains("Segment"),
            "debug should mention Segment variant: {s}"
        );
    }

    #[test]
    fn outcome_segment_satisfies_clone_send_static() {
        fn assert_traits<T: Clone + Send + 'static>() {}
        assert_traits::<camel_api::OutcomeSegment>();
    }

    #[tokio::test]
    // NOTE(review): presumably `OutcomeSegment` is not `Sync`, which is what
    // this lint would flag on the `Arc::new` calls below — confirm.
    #[allow(clippy::arc_with_non_send_sync)]
    async fn outcome_segment_survives_arcswap_swap() {
        use arc_swap::ArcSwap;

        let seg = camel_api::OutcomeSegment::new(Box::new(EchoSegment));
        let slot: ArcSwap<Option<camel_api::OutcomeSegment>> = ArcSwap::from_pointee(None);
        slot.store(Arc::new(Some(seg.clone())));
        slot.store(Arc::new(Some(seg)));

        let mut borrowed = slot.load().as_ref().clone().unwrap();
        let outcome = borrowed.run(Exchange::new(Message::new("ping"))).await;
        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
    }

    // ---------------------------------------------------------------------
    // Lifecycle propagation through compile_children_segments
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn compile_children_segments_bubbles_child_lifecycle() {
        let registry = injector_registry("test-lifecycle");
        let compiled = compile_root(&registry, filter_over(vec![passthrough_step()]));

        let (body_contract, handles) = segment_parts(compiled);
        assert_eq!(body_contract, None, "body_contract should be None");
        assert_eq!(handles.len(), 1, "expected 1 lifecycle handle");
        assert_eq!(handles[0].name(), "test-lifecycle", "handle name mismatch");
    }

    #[tokio::test]
    async fn compile_children_segments_multiple_stateful_children() {
        let registry = injector_registry("multi");
        let compiled = compile_root(
            &registry,
            filter_over(vec![passthrough_step(), passthrough_step()]),
        );

        let (_, handles) = segment_parts(compiled);
        assert_eq!(
            handles.len(),
            2,
            "expected 2 lifecycle handles for 2 children"
        );
        for h in &handles {
            assert_eq!(h.name(), "multi", "all handles should be 'multi'");
        }
    }

    #[tokio::test]
    async fn compile_children_segments_multi_branch_accumulation() {
        let registry = injector_registry("branch");
        let choice_step = BuilderStep::Choice {
            whens: vec![
                WhenStep {
                    predicate: Arc::new(|_| true),
                    steps: vec![passthrough_step()],
                },
                WhenStep {
                    predicate: Arc::new(|_| false),
                    steps: vec![passthrough_step()],
                },
            ],
            otherwise: None,
        };
        let compiled = compile_root(&registry, choice_step);

        let (_, handles) = segment_parts(compiled);
        assert_eq!(
            handles.len(),
            2,
            "expected 2 lifecycle handles from 2 branches"
        );
        for h in &handles {
            assert_eq!(h.name(), "branch", "all handles should be 'branch'");
        }
    }

    #[tokio::test]
    async fn compile_children_segments_nested_segment_flattening() {
        let registry = injector_registry("deep");
        // Filter inside a filter: the only lifecycle handle belongs to the
        // innermost processor and must surface on the outer segment.
        let inner_filter = filter_over(vec![passthrough_step()]);
        let outer_filter = filter_over(vec![inner_filter]);
        let compiled = compile_root(&registry, outer_filter);

        let (_, handles) = segment_parts(compiled);
        assert_eq!(handles.len(), 1, "expected 1 innermost lifecycle handle");
        assert_eq!(
            handles[0].name(),
            "deep",
            "handle should be from innermost child"
        );
    }
}