Skip to main content

camel_core/
context_builder.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{
6    CamelError, FunctionInvoker, Lifecycle, MetricsCollector, NoOpMetrics, NoopPlatformService,
7    PlatformService, SupervisionConfig,
8};
9use camel_language_api::Language;
10
11use super::context::{CamelContext, FromParts};
12use crate::claim_check::memory_repository::MemoryClaimCheckRepository;
13use crate::health_registry::HealthCheckRegistry;
14use crate::idempotent::memory_repository::MemoryIdempotentRepository;
15use crate::lifecycle::adapters::RuntimeExecutionAdapter;
16use crate::lifecycle::adapters::controller_actor::{
17    RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
18};
19use crate::lifecycle::adapters::route_controller::{
20    DefaultRouteController, SharedLanguageRegistry,
21};
22use crate::lifecycle::application::runtime_bus::RuntimeBus;
23use crate::lifecycle::ports::RuntimeExecutionPort;
24use crate::registry::{ClaimCheckRegistry, IdempotentRegistry};
25use crate::shared::components::domain::Registry;
26use crate::template::TemplateRegistry;
27
/// Shared, thread-safe factory that turns a [`RouteControllerHandle`] into the
/// [`RuntimeExecutionPort`] used by the runtime bus.
///
/// Stored by `CamelContextBuilder::with_execution_factory` and invoked at most
/// once by `build_runtime`.
type ExecutionFactory =
    Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
30
/// Builder that collects the optional parts of a [`CamelContext`] and wires
/// them together in `build()`.
///
/// Every `Option` field left as `None` is replaced by a default when the
/// context is built.
pub struct CamelContextBuilder {
    /// Shared `Registry`; `build()` creates an empty one when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry; `build()` falls back to the built-in languages when unset.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build()` falls back to `NoOpMetrics` when unset.
    /// Also overwritten by `with_lifecycle` when the service exposes a collector.
    metrics: Option<Arc<dyn MetricsCollector>>,
    // Platform ports
    /// Platform service; `build()` falls back to `NoopPlatformService` when unset.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build()` wires a crash notifier into the controller and
    /// spawns a supervision task with this configuration.
    supervision_config: Option<SupervisionConfig>,
    /// Store backing the runtime bus; `build()` uses its `Default` when unset.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Shutdown timeout handed to the built context (5 seconds from `new()`).
    shutdown_timeout: std::time::Duration,
    /// Optional shared bean registry handed to the route controller.
    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Function invoker captured from a lifecycle service by `with_lifecycle`.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Services registered through `with_lifecycle`, passed on to the context.
    lifecycle_services: Vec<Box<dyn Lifecycle>>,
    /// Optional factory for the runtime execution port; when unset,
    /// `build_runtime` uses `RuntimeExecutionAdapter`.
    execution_factory: Option<ExecutionFactory>,
    /// Health check registry; `build()` creates one when unset.
    health_registry: Option<Arc<HealthCheckRegistry>>,
    /// Route template registry; `build()` creates an empty one when unset.
    template_registry: Option<Arc<TemplateRegistry>>,
}
47
48impl CamelContextBuilder {
49    pub fn new() -> Self {
50        Self {
51            registry: None,
52            languages: None,
53            metrics: None,
54            platform_service: None,
55            supervision_config: None,
56            runtime_store: None,
57            shutdown_timeout: std::time::Duration::from_secs(5),
58            beans: None,
59            function_invoker: None,
60            lifecycle_services: Vec::new(),
61            execution_factory: None,
62            health_registry: None,
63            template_registry: None,
64        }
65    }
66
67    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
68        self.registry = Some(registry);
69        self
70    }
71
72    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
73        self.languages = Some(languages);
74        self
75    }
76
77    pub fn with_execution_factory(
78        mut self,
79        factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
80    ) -> Self {
81        self.execution_factory = Some(Arc::new(factory));
82        self
83    }
84
85    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
86        self.metrics = Some(metrics);
87        self
88    }
89
90    /// Set a custom platform service.
91    pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
92        self.platform_service = Some(platform_service);
93        self
94    }
95
96    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
97        self.supervision_config = Some(config);
98        self
99    }
100
101    pub fn runtime_store(
102        mut self,
103        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
104    ) -> Self {
105        self.runtime_store = Some(store);
106        self
107    }
108
109    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
110        self.shutdown_timeout = timeout;
111        self
112    }
113
114    pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
115        self.health_registry = Some(registry);
116        self
117    }
118
119    /// Inject a shared `BeanRegistry` for bean resolution across routes.
120    pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
121        self.beans = Some(beans);
122        self
123    }
124
125    /// Register a lifecycle service (e.g., FunctionRuntimeService) at builder time.
126    ///
127    /// This is the recommended path for services that need to be wired into the
128    /// route controller before any routes are added. The function invoker (if any)
129    /// is extracted and passed to the `DefaultRouteController` during `build()`.
130    pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
131        if let Some(collector) = service.as_metrics_collector() {
132            self.metrics = Some(collector);
133        }
134        if let Some(invoker) = service.as_function_invoker() {
135            self.function_invoker = Some(invoker);
136        }
137        self.lifecycle_services.push(Box::new(service));
138        self
139    }
140
141    /// Set a custom `TemplateRegistry` for route template storage.
142    ///
143    /// If not provided, a default empty registry is created during `build()`.
144    pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
145        self.template_registry = Some(registry);
146        self
147    }
148
149    fn built_in_languages() -> SharedLanguageRegistry {
150        let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
151        languages.insert(
152            "simple".to_string(),
153            Arc::new(camel_language_simple::SimpleLanguage::new()),
154        );
155        #[cfg(feature = "lang-js")]
156        {
157            let js_lang = camel_language_js::JsLanguage::new();
158            languages.insert("js".to_string(), Arc::new(js_lang.clone()));
159            languages.insert("javascript".to_string(), Arc::new(js_lang));
160        }
161        #[cfg(feature = "lang-rhai")]
162        {
163            let rhai_lang = camel_language_rhai::RhaiLanguage::new();
164            languages.insert("rhai".to_string(), Arc::new(rhai_lang));
165        }
166        #[cfg(feature = "lang-jsonpath")]
167        {
168            languages.insert(
169                "jsonpath".to_string(),
170                Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
171            );
172        }
173        #[cfg(feature = "lang-xpath")]
174        {
175            languages.insert(
176                "xpath".to_string(),
177                Arc::new(camel_language_xpath::XPathLanguage::new()),
178            );
179        }
180        Arc::new(std::sync::Mutex::new(languages))
181    }
182
183    fn build_runtime(
184        controller: RouteControllerHandle,
185        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
186        execution_factory: Option<ExecutionFactory>,
187        health_registry: Arc<HealthCheckRegistry>,
188    ) -> Arc<RuntimeBus> {
189        let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
190            factory(controller.clone())
191        } else {
192            Arc::new(RuntimeExecutionAdapter::new(controller))
193        };
194        Arc::new(
195            RuntimeBus::new(
196                Arc::new(store.clone()),
197                Arc::new(store.clone()),
198                Arc::new(store.clone()),
199                Arc::new(store.clone()),
200            )
201            .with_uow(Arc::new(store))
202            .with_execution(execution)
203            .with_health_registry(health_registry),
204        )
205    }
206
207    pub async fn build(self) -> Result<CamelContext, CamelError> {
208        let registry = self
209            .registry
210            .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
211        let languages = self.languages.unwrap_or_else(Self::built_in_languages);
212        let simple_with_resolver: Arc<dyn Language> = Arc::new(
213            camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
214                let languages = Arc::clone(&languages);
215                move |name| {
216                    languages
217                        .lock()
218                        .ok()
219                        .and_then(|registry| registry.get(name).cloned())
220                }
221            })),
222        );
223        languages
224            .lock()
225            .expect("mutex poisoned: another thread panicked while holding this lock") // allow-unwrap
226            .insert("simple".to_string(), simple_with_resolver);
227        let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
228        let platform_service = self
229            .platform_service
230            .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
231        let health_registry = self.health_registry.unwrap_or_else(|| {
232            Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
233        });
234
235        // Default idempotent repository registry with a built-in memory repo.
236        // Built BEFORE the controller so the same Arc can be shared between
237        // CamelContext (user-facing register API) and DefaultRouteController
238        // (compile-time repository-name resolution for the idempotent_consumer step).
239        let idempotent_repositories: crate::registry::SharedIdempotentRegistry = {
240            let reg = Arc::new(IdempotentRegistry::new());
241            let memory = Arc::new(MemoryIdempotentRepository::new("memory"));
242            // If registration fails (e.g. someone already registered "memory"),
243            // it's a programming error — unwrap is safe.
244            reg.register("memory", memory)
245                .expect("built-in memory idempotent repository registration must succeed"); // allow-unwrap
246            reg
247        };
248
249        // Default claim check repository registry with a built-in memory repo.
250        let claim_check_repositories: crate::registry::SharedClaimCheckRegistry = {
251            let reg = Arc::new(ClaimCheckRegistry::new());
252            let memory = Arc::new(MemoryClaimCheckRepository::new("memory"));
253            reg.register("memory", memory)
254                .expect("built-in memory claim check repository registration must succeed"); // allow-unwrap
255            reg
256        };
257
258        let (controller, actor_join, supervision_join) =
259            if let Some(config) = self.supervision_config {
260                let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
261                let mut controller_impl = if let Some(ref beans) = self.beans {
262                    DefaultRouteController::with_languages_and_beans(
263                        Arc::clone(&registry),
264                        Arc::clone(&languages),
265                        Arc::clone(&platform_service),
266                        Arc::clone(beans),
267                    )
268                } else {
269                    DefaultRouteController::with_languages(
270                        Arc::clone(&registry),
271                        Arc::clone(&languages),
272                        Arc::clone(&platform_service),
273                    )
274                };
275                if let Some(invoker) = self.function_invoker.clone() {
276                    controller_impl = controller_impl.with_function_invoker(invoker);
277                }
278                controller_impl.set_idempotent_repositories(Arc::clone(&idempotent_repositories));
279                controller_impl.set_claim_check_repositories(Arc::clone(&claim_check_repositories));
280                controller_impl.set_health_registry(Arc::clone(&health_registry));
281                controller_impl.set_crash_notifier(crash_tx);
282                let (controller, actor_join) = spawn_controller_actor(controller_impl);
283                let supervision_join = spawn_supervision_task(
284                    controller.clone(),
285                    config,
286                    Some(Arc::clone(&metrics)),
287                    crash_rx,
288                );
289                (controller, actor_join, Some(supervision_join))
290            } else {
291                let mut controller_impl = if let Some(ref beans) = self.beans {
292                    DefaultRouteController::with_languages_and_beans(
293                        Arc::clone(&registry),
294                        Arc::clone(&languages),
295                        Arc::clone(&platform_service),
296                        Arc::clone(beans),
297                    )
298                } else {
299                    DefaultRouteController::with_languages(
300                        Arc::clone(&registry),
301                        Arc::clone(&languages),
302                        Arc::clone(&platform_service),
303                    )
304                };
305                if let Some(invoker) = self.function_invoker.clone() {
306                    controller_impl = controller_impl.with_function_invoker(invoker);
307                }
308                controller_impl.set_idempotent_repositories(Arc::clone(&idempotent_repositories));
309                controller_impl.set_claim_check_repositories(Arc::clone(&claim_check_repositories));
310                controller_impl.set_health_registry(Arc::clone(&health_registry));
311                let (controller, actor_join) = spawn_controller_actor(controller_impl);
312                (controller, actor_join, None)
313            };
314
315        let store = self.runtime_store.unwrap_or_default();
316        let runtime = Self::build_runtime(
317            controller.clone(),
318            store,
319            self.execution_factory,
320            Arc::clone(&health_registry),
321        );
322        let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
323        controller
324            .try_set_runtime_handle(runtime_handle)
325            .expect("controller actor mailbox should accept initial runtime handle"); // allow-unwrap
326
327        let template_registry = self
328            .template_registry
329            .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
330
331        Ok(CamelContext::from_parts(FromParts {
332            registry,
333            route_controller: controller,
334            _actor_join: actor_join,
335            supervision_join,
336            runtime,
337            cancel_token: CancellationToken::new(),
338            metrics,
339            platform_service,
340            languages,
341            shutdown_timeout: self.shutdown_timeout,
342            services: self.lifecycle_services,
343            health_registry,
344            component_configs: HashMap::new(),
345            function_invoker: self.function_invoker,
346            template_registry,
347            idempotent_repositories,
348            claim_check_repositories,
349        }))
350    }
351}
352
353impl Default for CamelContextBuilder {
354    fn default() -> Self {
355        Self::new()
356    }
357}
358
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh builder carries the 5-second shutdown timeout.
    #[test]
    fn builder_default_has_sane_timeout() {
        let expected = std::time::Duration::from_secs(5);
        let actual = CamelContextBuilder::new().shutdown_timeout;
        assert_eq!(actual, expected);
    }

    /// A context built with defaults exposes the built-in "memory"
    /// idempotent repository.
    #[tokio::test]
    async fn builder_registers_default_memory_idempotent_repository() {
        let context = CamelContext::builder().build().await.expect("build context");
        let memory_repo_present = context.idempotent_repository("memory").is_some();
        assert!(
            memory_repo_present,
            "default 'memory' idempotent repository should be registered"
        );
    }
}