Skip to main content

camel_core/
context_builder.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{
6    CamelError, FunctionInvoker, Lifecycle, MetricsCollector, NoOpMetrics, NoopPlatformService,
7    PlatformService, SupervisionConfig,
8};
9use camel_language_api::Language;
10
11use super::context::{CamelContext, FromParts};
12use crate::health_registry::HealthCheckRegistry;
13use crate::lifecycle::adapters::RuntimeExecutionAdapter;
14use crate::lifecycle::adapters::controller_actor::{
15    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
16};
17use crate::lifecycle::adapters::route_controller::{
18    DefaultRouteController, SharedLanguageRegistry,
19};
20use crate::lifecycle::application::runtime_bus::RuntimeBus;
21use crate::lifecycle::ports::RuntimeExecutionPort;
22use crate::shared::components::domain::Registry;
23use crate::template::TemplateRegistry;
24
/// Shared, thread-safe factory that turns a `RouteControllerHandle` into the
/// `RuntimeExecutionPort` attached to the runtime bus.
///
/// When a factory is configured, `build_runtime` calls it instead of
/// constructing the default `RuntimeExecutionAdapter`.
25type ExecutionFactory =
26    Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
27
/// Fluent builder that gathers the collaborators of a `CamelContext` and
/// assembles the context in `build()`.
///
/// Every `Option` field that is still `None` when `build()` runs is replaced
/// by a default chosen there.
28pub struct CamelContextBuilder {
    /// Shared `Registry`; `build()` creates an empty one when unset.
29    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry; `build()` falls back to the built-in languages.
30    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build()` falls back to `NoOpMetrics`. Also set by
    /// `with_lifecycle` when the registered service exposes a collector.
31    metrics: Option<Arc<dyn MetricsCollector>>,
32    // Platform ports
    /// Platform service; `build()` falls back to `NoopPlatformService`.
33    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build()` wires a crash channel into the route controller
    /// and spawns a supervision task with this configuration.
34    supervision_config: Option<SupervisionConfig>,
    /// Store backing the runtime bus; `build()` uses the default store when unset.
35    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Shutdown timeout forwarded to the built context (five seconds from `new()`).
36    shutdown_timeout: std::time::Duration,
    /// Optional shared `BeanRegistry` passed to the route controller.
37    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Function invoker captured by `with_lifecycle`; forwarded to the route
    /// controller and to the built context.
38    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Services registered through `with_lifecycle`, handed to the context.
39    lifecycle_services: Vec<Box<dyn Lifecycle>>,
    /// Optional factory overriding the default runtime execution adapter.
40    execution_factory: Option<ExecutionFactory>,
    /// Health check registry; `build()` creates one when unset.
41    health_registry: Option<Arc<HealthCheckRegistry>>,
    /// Route template registry; `build()` creates an empty one when unset.
42    template_registry: Option<Arc<TemplateRegistry>>,
43}
44
45impl CamelContextBuilder {
46    pub fn new() -> Self {
47        Self {
48            registry: None,
49            languages: None,
50            metrics: None,
51            platform_service: None,
52            supervision_config: None,
53            runtime_store: None,
54            shutdown_timeout: std::time::Duration::from_secs(5),
55            beans: None,
56            function_invoker: None,
57            lifecycle_services: Vec::new(),
58            execution_factory: None,
59            health_registry: None,
60            template_registry: None,
61        }
62    }
63
64    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
65        self.registry = Some(registry);
66        self
67    }
68
69    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
70        self.languages = Some(languages);
71        self
72    }
73
74    pub fn with_execution_factory(
75        mut self,
76        factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
77    ) -> Self {
78        self.execution_factory = Some(Arc::new(factory));
79        self
80    }
81
82    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
83        self.metrics = Some(metrics);
84        self
85    }
86
87    /// Set a custom platform service.
88    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
89        self.platform_service = Some(platform_service);
90        self
91    }
92
93    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
94        self.supervision_config = Some(config);
95        self
96    }
97
98    pub fn runtime_store(
99        mut self,
100        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
101    ) -> Self {
102        self.runtime_store = Some(store);
103        self
104    }
105
106    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
107        self.shutdown_timeout = timeout;
108        self
109    }
110
111    pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
112        self.health_registry = Some(registry);
113        self
114    }
115
116    /// Inject a shared `BeanRegistry` for bean resolution across routes.
117    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
118        self.beans = Some(beans);
119        self
120    }
121
122    /// Register a lifecycle service (e.g., FunctionRuntimeService) at builder time.
123    ///
124    /// This is the recommended path for services that need to be wired into the
125    /// route controller before any routes are added. The function invoker (if any)
126    /// is extracted and passed to the `DefaultRouteController` during `build()`.
127    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
128        if let Some(collector) = service.as_metrics_collector() {
129            self.metrics = Some(collector);
130        }
131        if let Some(invoker) = service.as_function_invoker() {
132            self.function_invoker = Some(invoker);
133        }
134        self.lifecycle_services.push(Box::new(service));
135        self
136    }
137
138    /// Set a custom `TemplateRegistry` for route template storage.
139    ///
140    /// If not provided, a default empty registry is created during `build()`.
141    pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
142        self.template_registry = Some(registry);
143        self
144    }
145
146    fn built_in_languages() -> SharedLanguageRegistry {
147        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
148        languages.insert(
149            "simple".to_string(),
150            Arc::new(camel_language_simple::SimpleLanguage::new()),
151        );
152        #[cfg(feature = "lang-js")]
153        {
154            let js_lang = camel_language_js::JsLanguage::new();
155            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
156            languages.insert("javascript".to_string(), Arc::new(js_lang));
157        }
158        #[cfg(feature = "lang-rhai")]
159        {
160            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
161            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
162        }
163        #[cfg(feature = "lang-jsonpath")]
164        {
165            languages.insert(
166                "jsonpath".to_string(),
167                Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
168            );
169        }
170        #[cfg(feature = "lang-xpath")]
171        {
172            languages.insert(
173                "xpath".to_string(),
174                Arc::new(camel_language_xpath::XPathLanguage::new()),
175            );
176        }
177        Arc::new(std::sync::Mutex::new(languages))
178    }
179
180    fn build_runtime(
181        controller: RouteControllerHandle,
182        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
183        execution_factory: Option<ExecutionFactory>,
184        health_registry: Arc<HealthCheckRegistry>,
185    ) -> Arc<RuntimeBus> {
186        let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
187            factory(controller.clone())
188        } else {
189            Arc::new(RuntimeExecutionAdapter::new(controller))
190        };
191        Arc::new(
192            RuntimeBus::new(
193                Arc::new(store.clone()),
194                Arc::new(store.clone()),
195                Arc::new(store.clone()),
196                Arc::new(store.clone()),
197            )
198            .with_uow(Arc::new(store))
199            .with_execution(execution)
200            .with_health_registry(health_registry),
201        )
202    }
203
204    pub async fn build(self) -> Result<CamelContext, CamelError> {
205        let registry = self
206            .registry
207            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
208        let languages = self.languages.unwrap_or_else(Self::built_in_languages);
209        let simple_with_resolver: Arc<dyn Language> = Arc::new(
210            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
211                let languages = Arc::clone(&languages);
212                move |name| {
213                    languages
214                        .lock()
215                        .ok()
216                        .and_then(|registry| registry.get(name).cloned())
217                }
218            })),
219        );
220        languages
221            .lock()
222            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
223            .insert("simple".to_string(), simple_with_resolver);
224        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
225        let platform_service = self
226            .platform_service
227            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
228        let health_registry = self.health_registry.unwrap_or_else(|| {
229            Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
230        });
231
232        let (controller, actor_join, supervision_join) =
233            if let Some(config) = self.supervision_config {
234                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
235                let mut controller_impl = if let Some(ref beans) = self.beans {
236                    DefaultRouteController::with_languages_and_beans(
237                        Arc::clone(&registry),
238                        Arc::clone(&languages),
239                        Arc::clone(&platform_service),
240                        Arc::clone(beans),
241                    )
242                } else {
243                    DefaultRouteController::with_languages(
244                        Arc::clone(&registry),
245                        Arc::clone(&languages),
246                        Arc::clone(&platform_service),
247                    )
248                };
249                if let Some(invoker) = self.function_invoker.clone() {
250                    controller_impl = controller_impl.with_function_invoker(invoker);
251                }
252                controller_impl.set_health_registry(Arc::clone(&health_registry));
253                controller_impl.set_crash_notifier(crash_tx);
254                let (controller, actor_join) = spawn_controller_actor(controller_impl);
255                let supervision_join = spawn_supervision_task(
256                    controller.clone(),
257                    config,
258                    Some(Arc::clone(&metrics)),
259                    crash_rx,
260                );
261                (controller, actor_join, Some(supervision_join))
262            } else {
263                let mut controller_impl = if let Some(ref beans) = self.beans {
264                    DefaultRouteController::with_languages_and_beans(
265                        Arc::clone(&registry),
266                        Arc::clone(&languages),
267                        Arc::clone(&platform_service),
268                        Arc::clone(beans),
269                    )
270                } else {
271                    DefaultRouteController::with_languages(
272                        Arc::clone(&registry),
273                        Arc::clone(&languages),
274                        Arc::clone(&platform_service),
275                    )
276                };
277                if let Some(invoker) = self.function_invoker.clone() {
278                    controller_impl = controller_impl.with_function_invoker(invoker);
279                }
280                controller_impl.set_health_registry(Arc::clone(&health_registry));
281                let (controller, actor_join) = spawn_controller_actor(controller_impl);
282                (controller, actor_join, None)
283            };
284
285        let store = self.runtime_store.unwrap_or_default();
286        let runtime = Self::build_runtime(
287            controller.clone(),
288            store,
289            self.execution_factory,
290            Arc::clone(&health_registry),
291        );
292        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
293        controller
294            .try_set_runtime_handle(runtime_handle)
295            .expect("controller actor mailbox should accept initial runtime handle"); // allow-unwrap
296
297        let template_registry = self
298            .template_registry
299            .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
300
301        Ok(CamelContext::from_parts(FromParts {
302            registry,
303            route_controller: controller,
304            _actor_join: actor_join,
305            supervision_join,
306            runtime,
307            cancel_token: CancellationToken::new(),
308            metrics,
309            platform_service,
310            languages,
311            shutdown_timeout: self.shutdown_timeout,
312            services: self.lifecycle_services,
313            health_registry,
314            component_configs: HashMap::new(),
315            function_invoker: self.function_invoker,
316            template_registry,
317        }))
318    }
319}
320
321impl Default for CamelContextBuilder {
322    fn default() -> Self {
323        Self::new()
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created builder carries the five-second shutdown timeout
    /// assigned in `CamelContextBuilder::new()`.
    #[test]
    fn builder_default_has_sane_timeout() {
        let builder = CamelContextBuilder::new();
        assert_eq!(builder.shutdown_timeout, std::time::Duration::from_secs(5));
    }
}
337}