camel_core/lifecycle/adapters/step_compilers/
mod.rs1use std::sync::Arc;
9
10use camel_api::{BodyType, BoxProcessor, CamelError, FunctionInvoker, ProducerContext};
11use camel_component_api::{ComponentContext, RuntimeObservability};
12use camel_endpoint::parse_uri;
13
14use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
15use crate::lifecycle::adapters::step_resolution::FunctionStagingMode;
16use crate::lifecycle::application::route_definition::BuilderStep;
17use camel_bean::BeanRegistry;
18
19mod control_flow;
20mod core;
21mod endpoints;
22mod error_handling;
23mod routing;
24mod splitting;
25mod transforms;
26
27#[derive(Debug, Clone)]
41pub enum CompiledStep {
42 Process {
43 processor: BoxProcessor,
44 body_contract: Option<BodyType>,
45 },
46 Stop,
49 Segment {
53 segment: camel_api::OutcomeSegment,
54 body_contract: Option<BodyType>,
55 },
56}
57
58pub(crate) enum StepCompileResult {
61 Matched(Result<CompiledStep, CamelError>),
62 NotHandled(BuilderStep),
63}
64
65pub(crate) trait StepCompiler: Send + Sync {
71 fn compile(
72 &self,
73 step: BuilderStep,
74 step_index: usize,
75 ctx: &CompilationContext,
76 registry: &StepCompilerRegistry,
77 ) -> StepCompileResult;
78}
79
80pub(crate) struct CompilationContext<'a> {
82 pub producer_ctx: &'a ProducerContext,
83 pub rt: Arc<dyn RuntimeObservability>,
84 pub languages: &'a SharedLanguageRegistry,
85 pub beans: &'a Arc<std::sync::Mutex<BeanRegistry>>,
86 pub function_invoker: Option<Arc<dyn FunctionInvoker>>,
87 pub component_ctx: Arc<dyn ComponentContext>,
88 pub route_id: Option<&'a str>,
89 pub staging_mode: &'a FunctionStagingMode,
90}
91
92impl<'a> CompilationContext<'a> {
93 pub fn compile_children(
96 &self,
97 steps: Vec<BuilderStep>,
98 registry: &StepCompilerRegistry,
99 ) -> Result<Vec<CompiledStep>, CamelError> {
100 registry.compile_steps(steps, self)
101 }
102
103 pub fn compile_children_segments(
113 &self,
114 steps: Vec<BuilderStep>,
115 registry: &StepCompilerRegistry,
116 ) -> Result<Vec<Box<dyn camel_api::OutcomePipeline>>, CamelError> {
117 let pairs = self.compile_children(steps, registry)?;
118 let segments: Vec<Box<dyn camel_api::OutcomePipeline>> = pairs
119 .into_iter()
120 .map(|c| match c {
121 CompiledStep::Process {
122 processor,
123 body_contract,
124 } => {
125 let inner: Box<dyn camel_api::OutcomePipeline> = Box::new(
126 crate::lifecycle::adapters::route_compiler::BoxProcessorSegment::new(
127 processor,
128 ),
129 );
130 match body_contract {
131 Some(contract) => Box::new(
132 crate::lifecycle::adapters::route_compiler::BodyCoercingSegment::new(
133 inner, contract,
134 ),
135 ),
136 None => inner,
137 }
138 }
139 CompiledStep::Stop => {
140 Box::new(crate::lifecycle::adapters::route_compiler::StopSegment)
141 as Box<dyn camel_api::OutcomePipeline>
142 }
143 CompiledStep::Segment { segment, .. } => Box::new(segment),
144 })
145 .collect();
146 Ok(segments)
147 }
148}
149
150pub(crate) struct StepCompilerRegistry {
153 compilers: Vec<Box<dyn StepCompiler>>,
154}
155
156impl StepCompilerRegistry {
157 pub fn new() -> Self {
158 Self {
159 compilers: Vec::new(),
160 }
161 }
162
163 pub fn register(&mut self, compiler: Box<dyn StepCompiler>) {
164 self.compilers.push(compiler);
165 }
166
167 pub fn compile_step(
170 &self,
171 step: BuilderStep,
172 step_index: usize,
173 ctx: &CompilationContext,
174 ) -> Option<Result<CompiledStep, CamelError>> {
175 let mut step = step;
176 for compiler in &self.compilers {
177 match compiler.compile(step, step_index, ctx, self) {
178 StepCompileResult::Matched(result) => return Some(result),
179 StepCompileResult::NotHandled(s) => step = s,
180 }
181 }
182 None
183 }
184
185 pub fn compile_steps(
187 &self,
188 steps: Vec<BuilderStep>,
189 ctx: &CompilationContext,
190 ) -> Result<Vec<CompiledStep>, CamelError> {
191 let mut out = Vec::with_capacity(steps.len());
192 for (i, step) in steps.into_iter().enumerate() {
193 match self.compile_step(step, i, ctx) {
194 Some(Ok(c)) => out.push(c),
195 Some(Err(e)) => return Err(e),
196 None => {
197 return Err(CamelError::RouteError(
198 "no compiler registered for step variant".into(),
199 ));
200 }
201 }
202 }
203 Ok(out)
204 }
205}
206
207pub(crate) fn resolve_producer(
210 ctx: &CompilationContext,
211 uri: &str,
212) -> Result<BoxProcessor, CamelError> {
213 let parsed = parse_uri(uri)?;
214 let component = ctx
215 .component_ctx
216 .resolve_component(&parsed.scheme)
217 .ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
218 let endpoint = component.create_endpoint(uri, ctx.component_ctx.as_ref())?;
219 endpoint.create_producer(Arc::clone(&ctx.rt), ctx.producer_ctx)
220}
221
#[cfg(test)]
mod segment_tests {
    use super::*;
    use camel_api::{Exchange, OutcomePipeline, PipelineOutcome};
    use std::future::Future;
    use std::pin::Pin;

    /// Minimal pipeline that completes with the exchange it receives.
    #[derive(Clone)]
    struct EchoSegment;

    impl OutcomePipeline for EchoSegment {
        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
            Box::new(EchoSegment)
        }

        fn run<'a>(
            &'a mut self,
            exchange: Exchange,
        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
            Box::pin(async move { PipelineOutcome::Completed(exchange) })
        }
    }

    /// Wraps an [`EchoSegment`] in an `OutcomeSegment`.
    fn echo_segment() -> camel_api::OutcomeSegment {
        camel_api::OutcomeSegment::new(Box::new(EchoSegment))
    }

    /// Builds a `CompiledStep::Segment` around an echo segment with no body
    /// contract.
    fn echo_step() -> CompiledStep {
        CompiledStep::Segment {
            segment: echo_segment(),
            body_contract: None,
        }
    }

    #[test]
    fn compiled_step_segment_clone_compiles() {
        let cloned = echo_step().clone();
        assert!(
            matches!(cloned, CompiledStep::Segment { .. }),
            "clone should preserve variant"
        );
    }

    #[test]
    fn compiled_step_segment_debug_renders() {
        let step = echo_step();
        let s = format!("{step:?}");
        assert!(
            s.contains("Segment"),
            "debug should mention Segment variant: {s}"
        );
    }

    #[test]
    fn outcome_segment_satisfies_clone_send_static() {
        // Compile-time check only: instantiating the helper proves the bounds.
        fn assert_traits<T: Clone + Send + 'static>() {}
        assert_traits::<camel_api::OutcomeSegment>();
    }

    #[tokio::test]
    async fn outcome_segment_survives_arcswap_swap() {
        use arc_swap::ArcSwap;
        use camel_api::Message;
        use std::sync::Arc;

        let seg = echo_segment();
        let slot: ArcSwap<Option<camel_api::OutcomeSegment>> = ArcSwap::from_pointee(None);

        // Store twice so the slot's previous value is replaced at least once.
        slot.store(Arc::new(Some(seg.clone())));
        slot.store(Arc::new(Some(seg)));

        let mut loaded = slot.load().as_ref().clone().unwrap();
        let outcome = loaded.run(Exchange::new(Message::new("ping"))).await;
        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
    }
}
310
311pub(crate) fn build_registry() -> StepCompilerRegistry {
313 let mut reg = StepCompilerRegistry::new();
314 reg.register(Box::new(core::CoreCompiler));
315 reg.register(Box::new(endpoints::EndpointsCompiler));
316 reg.register(Box::new(transforms::TransformsCompiler));
317 reg.register(Box::new(routing::RoutingCompiler));
318 reg.register(Box::new(control_flow::ControlFlowCompiler));
319 reg.register(Box::new(splitting::SplittingCompiler));
320 reg.register(Box::new(error_handling::ErrorHandlingCompiler));
321 reg
322}