camel_core/lifecycle/adapters/step_compilers/
mod.rs1use std::sync::Arc;
9
10use camel_api::{BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext};
11use camel_component_api::{ComponentContext, RuntimeObservability};
12use camel_endpoint::parse_uri;
13
14use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
15use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
16use crate::lifecycle::application::route_definition::BuilderStep;
17use camel_bean::BeanRegistry;
18
19mod control_flow;
20mod core;
21mod endpoints;
22mod error_handling;
23mod routing;
24mod splitting;
25mod transforms;
26
27#[derive(Debug, Clone)]
40pub enum CompiledStep {
41 Process {
42 processor: BoxProcessor,
43 body_contract: Option<BodyType>,
44 },
45 Stop,
48}
49
50pub(crate) enum StepCompileResult {
53 Matched(Result<CompiledStep, CamelError>),
54 NotHandled(BuilderStep),
55}
56
57pub(crate) trait StepCompiler: Send + Sync {
63 fn compile(
64 &self,
65 step: BuilderStep,
66 step_index: usize,
67 ctx: &CompilationContext,
68 registry: &StepCompilerRegistry,
69 ) -> StepCompileResult;
70}
71
72pub(crate) struct CompilationContext<'a> {
74 pub producer_ctx: &'a ProducerContext,
75 pub rt: Arc<dyn RuntimeObservability>,
76 pub languages: &'a SharedLanguageRegistry,
77 pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
78 pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
79 pub component_ctx: Arc<dyn ComponentContext>,
80 pub route_id: Option<&'a str>,
81 pub staging_mode: &'a FunctionStagingMode,
82}
83
84impl<'a> CompilationContext<'a> {
85 pub fn compile_children(
88 &self,
89 steps: Vec<BuilderStep>,
90 registry: &StepCompilerRegistry,
91 ) -> Result<Vec<CompiledStep>, CamelError> {
92 registry.compile_steps(steps, self)
93 }
94}
95
96pub(crate) struct StepCompilerRegistry {
99 compilers: Vec<Box<dyn StepCompiler>>,
100}
101
102impl StepCompilerRegistry {
103 pub fn new() -> Self {
104 Self {
105 compilers: Vec::new(),
106 }
107 }
108
109 pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
110 self.compilers.push(compiler);
111 }
112
113 pub fn compile_step(
116 &self,
117 step: BuilderStep,
118 step_index: usize,
119 ctx: &CompilationContext,
120 ) -> Option<Result<CompiledStep, CamelError>> {
121 let mut step = step;
122 for compiler in &self.compilers {
123 match compiler.compile(step, step_index, ctx, self) {
124 StepCompileResult::Matched(result) => return Some(result),
125 StepCompileResult::NotHandled(s) => step = s,
126 }
127 }
128 None
129 }
130
131 pub fn compile_steps(
133 &self,
134 steps: Vec<BuilderStep>,
135 ctx: &CompilationContext,
136 ) -> Result<Vec<CompiledStep>, CamelError> {
137 let mut out = Vec::with_capacity(steps.len());
138 for (i, step) in steps.into_iter().enumerate() {
139 match self.compile_step(step, i, ctx) {
140 Some(Ok(c)) => out.push(c),
141 Some(Err(e)) => return Err(e),
142 None => {
143 return Err(CamelError::RouteError(
144 "no compiler registered for step variant".into(),
145 ));
146 }
147 }
148 }
149 Ok(out)
150 }
151}
152
153pub(crate) fn resolve_producer(
156 ctx: &CompilationContext,
157 uri: &str,
158) -> Result<BoxProcessor, CamelError> {
159 let parsed = parse_uri(uri)?;
160 let component = ctx
161 .component_ctx
162 .resolve_component(&parsed.scheme)
163 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
164 let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
165 endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
166}
167
168pub(crate) fn build_registry() -> StepCompilerRegistry {
170 let mut reg = StepCompilerRegistry::new();
171 reg.register(Box::new(core::CoreCompiler));
172 reg.register(Box::new(endpoints::EndpointsCompiler));
173 reg.register(Box::new(transforms::TransformsCompiler));
174 reg.register(Box::new(routing::RoutingCompiler));
175 reg.register(Box::new(control_flow::ControlFlowCompiler));
176 reg.register(Box::new(splitting::SplittingCompiler));
177 reg.register(Box::new(error_handling::ErrorHandlingCompiler));
178 reg
179}