1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{
6 CamelError, FunctionInvoker, Lifecycle, MetricsCollector, NoOpMetrics, NoopPlatformService,
7 PlatformService, SupervisionConfig,
8};
9use camel_language_api::Language;
10
11use super::context::{CamelContext, FromParts};
12use crate::claim_check::memory_repository::MemoryClaimCheckRepository;
13use crate::health_registry::HealthCheckRegistry;
14use crate::idempotent::memory_repository::MemoryIdempotentRepository;
15use crate::lifecycle::adapters::RuntimeExecutionAdapter;
16use crate::lifecycle::adapters::controller_actor::{
17 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
18};
19use crate::lifecycle::adapters::route_controller::{
20 DefaultRouteController, SharedLanguageRegistry,
21};
22use crate::lifecycle::application::runtime_bus::RuntimeBus;
23use crate::lifecycle::ports::RuntimeExecutionPort;
24use crate::registry::{ClaimCheckRegistry, IdempotentRegistry};
25use crate::shared::components::domain::Registry;
26use crate::template::TemplateRegistry;
27
28type ExecutionFactory =
29 Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
30
31pub struct CamelContextBuilder {
32 registry: Option<Arc<std::sync::Mutex<Registry>>>,
33 languages: Option<SharedLanguageRegistry>,
34 metrics: Option<Arc<dyn MetricsCollector>>,
35 platform_service: Option<Arc<dyn PlatformService>>,
37 supervision_config: Option<SupervisionConfig>,
38 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
39 shutdown_timeout: std::time::Duration,
40 beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
41 function_invoker: Option<Arc<dyn FunctionInvoker>>,
42 lifecycle_services: Vec<Box<dyn Lifecycle>>,
43 execution_factory: Option<ExecutionFactory>,
44 health_registry: Option<Arc<HealthCheckRegistry>>,
45 template_registry: Option<Arc<TemplateRegistry>>,
46}
47
48impl CamelContextBuilder {
49 pub fn new() -> Self {
50 Self {
51 registry: None,
52 languages: None,
53 metrics: None,
54 platform_service: None,
55 supervision_config: None,
56 runtime_store: None,
57 shutdown_timeout: std::time::Duration::from_secs(5),
58 beans: None,
59 function_invoker: None,
60 lifecycle_services: Vec::new(),
61 execution_factory: None,
62 health_registry: None,
63 template_registry: None,
64 }
65 }
66
67 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
68 self.registry = Some(registry);
69 self
70 }
71
72 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
73 self.languages = Some(languages);
74 self
75 }
76
77 pub fn with_execution_factory(
78 mut self,
79 factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
80 ) -> Self {
81 self.execution_factory = Some(Arc::new(factory));
82 self
83 }
84
85 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
86 self.metrics = Some(metrics);
87 self
88 }
89
90 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
92 self.platform_service = Some(platform_service);
93 self
94 }
95
96 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
97 self.supervision_config = Some(config);
98 self
99 }
100
101 pub fn runtime_store(
102 mut self,
103 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
104 ) -> Self {
105 self.runtime_store = Some(store);
106 self
107 }
108
109 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
110 self.shutdown_timeout = timeout;
111 self
112 }
113
114 pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
115 self.health_registry = Some(registry);
116 self
117 }
118
119 pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
121 self.beans = Some(beans);
122 self
123 }
124
125 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
131 if let Some(collector) = service.as_metrics_collector() {
132 self.metrics = Some(collector);
133 }
134 if let Some(invoker) = service.as_function_invoker() {
135 self.function_invoker = Some(invoker);
136 }
137 self.lifecycle_services.push(Box::new(service));
138 self
139 }
140
141 pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
145 self.template_registry = Some(registry);
146 self
147 }
148
149 fn built_in_languages() -> SharedLanguageRegistry {
150 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
151 languages.insert(
152 "simple".to_string(),
153 Arc::new(camel_language_simple::SimpleLanguage::new()),
154 );
155 #[cfg(feature = "lang-js")]
156 {
157 let js_lang = camel_language_js::JsLanguage::new();
158 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
159 languages.insert("javascript".to_string(), Arc::new(js_lang));
160 }
161 #[cfg(feature = "lang-rhai")]
162 {
163 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
164 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
165 }
166 #[cfg(feature = "lang-jsonpath")]
167 {
168 languages.insert(
169 "jsonpath".to_string(),
170 Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
171 );
172 }
173 #[cfg(feature = "lang-xpath")]
174 {
175 languages.insert(
176 "xpath".to_string(),
177 Arc::new(camel_language_xpath::XPathLanguage::new()),
178 );
179 }
180 Arc::new(std::sync::Mutex::new(languages))
181 }
182
183 fn build_runtime(
184 controller: RouteControllerHandle,
185 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
186 execution_factory: Option<ExecutionFactory>,
187 health_registry: Arc<HealthCheckRegistry>,
188 ) -> Arc<RuntimeBus> {
189 let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
190 factory(controller.clone())
191 } else {
192 Arc::new(RuntimeExecutionAdapter::new(controller))
193 };
194 Arc::new(
195 RuntimeBus::new(
196 Arc::new(store.clone()),
197 Arc::new(store.clone()),
198 Arc::new(store.clone()),
199 Arc::new(store.clone()),
200 )
201 .with_uow(Arc::new(store))
202 .with_execution(execution)
203 .with_health_registry(health_registry),
204 )
205 }
206
207 pub async fn build(self) -> Result<CamelContext, CamelError> {
208 let registry = self
209 .registry
210 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
211 let languages = self.languages.unwrap_or_else(Self::built_in_languages);
212 let simple_with_resolver: Arc<dyn Language> = Arc::new(
213 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
214 let languages = Arc::clone(&languages);
215 move |name| {
216 languages
217 .lock()
218 .ok()
219 .and_then(|registry| registry.get(name).cloned())
220 }
221 })),
222 );
223 languages
224 .lock()
225 .expect("mutex poisoned: another thread panicked while holding this lock") .insert("simple".to_string(), simple_with_resolver);
227 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
228 let platform_service = self
229 .platform_service
230 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
231 let health_registry = self.health_registry.unwrap_or_else(|| {
232 Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
233 });
234
235 let idempotent_repositories: crate::registry::SharedIdempotentRegistry = {
240 let reg = Arc::new(IdempotentRegistry::new());
241 let memory = Arc::new(MemoryIdempotentRepository::new("memory"));
242 reg.register("memory", memory)
245 .expect("built-in memory idempotent repository registration must succeed"); reg
247 };
248
249 let claim_check_repositories: crate::registry::SharedClaimCheckRegistry = {
251 let reg = Arc::new(ClaimCheckRegistry::new());
252 let memory = Arc::new(MemoryClaimCheckRepository::new("memory"));
253 reg.register("memory", memory)
254 .expect("built-in memory claim check repository registration must succeed"); reg
256 };
257
258 let (controller, actor_join, supervision_join) =
259 if let Some(config) = self.supervision_config {
260 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
261 let mut controller_impl = if let Some(ref beans) = self.beans {
262 DefaultRouteController::with_languages_and_beans(
263 Arc::clone(®istry),
264 Arc::clone(&languages),
265 Arc::clone(&platform_service),
266 Arc::clone(beans),
267 )
268 } else {
269 DefaultRouteController::with_languages(
270 Arc::clone(®istry),
271 Arc::clone(&languages),
272 Arc::clone(&platform_service),
273 )
274 };
275 if let Some(invoker) = self.function_invoker.clone() {
276 controller_impl = controller_impl.with_function_invoker(invoker);
277 }
278 controller_impl.set_idempotent_repositories(Arc::clone(&idempotent_repositories));
279 controller_impl.set_claim_check_repositories(Arc::clone(&claim_check_repositories));
280 controller_impl.set_health_registry(Arc::clone(&health_registry));
281 controller_impl.set_crash_notifier(crash_tx);
282 let (controller, actor_join) = spawn_controller_actor(controller_impl);
283 let supervision_join = spawn_supervision_task(
284 controller.clone(),
285 config,
286 Some(Arc::clone(&metrics)),
287 crash_rx,
288 );
289 (controller, actor_join, Some(supervision_join))
290 } else {
291 let mut controller_impl = if let Some(ref beans) = self.beans {
292 DefaultRouteController::with_languages_and_beans(
293 Arc::clone(®istry),
294 Arc::clone(&languages),
295 Arc::clone(&platform_service),
296 Arc::clone(beans),
297 )
298 } else {
299 DefaultRouteController::with_languages(
300 Arc::clone(®istry),
301 Arc::clone(&languages),
302 Arc::clone(&platform_service),
303 )
304 };
305 if let Some(invoker) = self.function_invoker.clone() {
306 controller_impl = controller_impl.with_function_invoker(invoker);
307 }
308 controller_impl.set_idempotent_repositories(Arc::clone(&idempotent_repositories));
309 controller_impl.set_claim_check_repositories(Arc::clone(&claim_check_repositories));
310 controller_impl.set_health_registry(Arc::clone(&health_registry));
311 let (controller, actor_join) = spawn_controller_actor(controller_impl);
312 (controller, actor_join, None)
313 };
314
315 let store = self.runtime_store.unwrap_or_default();
316 let runtime = Self::build_runtime(
317 controller.clone(),
318 store,
319 self.execution_factory,
320 Arc::clone(&health_registry),
321 );
322 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
323 controller
324 .try_set_runtime_handle(runtime_handle)
325 .expect("controller actor mailbox should accept initial runtime handle"); let template_registry = self
328 .template_registry
329 .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
330
331 Ok(CamelContext::from_parts(FromParts {
332 registry,
333 route_controller: controller,
334 _actor_join: actor_join,
335 supervision_join,
336 runtime,
337 cancel_token: CancellationToken::new(),
338 metrics,
339 platform_service,
340 languages,
341 shutdown_timeout: self.shutdown_timeout,
342 services: self.lifecycle_services,
343 health_registry,
344 component_configs: HashMap::new(),
345 function_invoker: self.function_invoker,
346 template_registry,
347 idempotent_repositories,
348 claim_check_repositories,
349 }))
350 }
351}
352
353impl Default for CamelContextBuilder {
354 fn default() -> Self {
355 Self::new()
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362
363 #[test]
364 fn builder_default_has_sane_timeout() {
365 let builder = CamelContextBuilder::new();
366 assert_eq!(builder.shutdown_timeout, std::time::Duration::from_secs(5));
367 }
368
369 #[tokio::test]
370 async fn builder_registers_default_memory_idempotent_repository() {
371 let ctx = CamelContext::builder()
372 .build()
373 .await
374 .expect("build context");
375 let repo = ctx.idempotent_repository("memory");
376 assert!(
377 repo.is_some(),
378 "default 'memory' idempotent repository should be registered"
379 );
380 }
381}