1use std::collections::HashMap;
2use std::sync::Arc;
3use tokio_util::sync::CancellationToken;
4
5use camel_api::{
6 CamelError, FunctionInvoker, Lifecycle, MetricsCollector, NoOpMetrics, NoopPlatformService,
7 PlatformService, SupervisionConfig,
8};
9use camel_language_api::Language;
10
11use super::context::{CamelContext, FromParts};
12use crate::health_registry::HealthCheckRegistry;
13use crate::lifecycle::adapters::RuntimeExecutionAdapter;
14use crate::lifecycle::adapters::controller_actor::{
15 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
16};
17use crate::lifecycle::adapters::route_controller::{
18 DefaultRouteController, SharedLanguageRegistry,
19};
20use crate::lifecycle::application::runtime_bus::RuntimeBus;
21use crate::lifecycle::ports::RuntimeExecutionPort;
22use crate::shared::components::domain::Registry;
23use crate::template::TemplateRegistry;
24
25type ExecutionFactory =
26 Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
27
28pub struct CamelContextBuilder {
29 registry: Option<Arc<std::sync::Mutex<Registry>>>,
30 languages: Option<SharedLanguageRegistry>,
31 metrics: Option<Arc<dyn MetricsCollector>>,
32 platform_service: Option<Arc<dyn PlatformService>>,
34 supervision_config: Option<SupervisionConfig>,
35 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
36 shutdown_timeout: std::time::Duration,
37 beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
38 function_invoker: Option<Arc<dyn FunctionInvoker>>,
39 lifecycle_services: Vec<Box<dyn Lifecycle>>,
40 execution_factory: Option<ExecutionFactory>,
41 health_registry: Option<Arc<HealthCheckRegistry>>,
42 template_registry: Option<Arc<TemplateRegistry>>,
43}
44
45impl CamelContextBuilder {
46 pub fn new() -> Self {
47 Self {
48 registry: None,
49 languages: None,
50 metrics: None,
51 platform_service: None,
52 supervision_config: None,
53 runtime_store: None,
54 shutdown_timeout: std::time::Duration::from_secs(5),
55 beans: None,
56 function_invoker: None,
57 lifecycle_services: Vec::new(),
58 execution_factory: None,
59 health_registry: None,
60 template_registry: None,
61 }
62 }
63
64 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
65 self.registry = Some(registry);
66 self
67 }
68
69 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
70 self.languages = Some(languages);
71 self
72 }
73
74 pub fn with_execution_factory(
75 mut self,
76 factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
77 ) -> Self {
78 self.execution_factory = Some(Arc::new(factory));
79 self
80 }
81
82 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
83 self.metrics = Some(metrics);
84 self
85 }
86
87 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
89 self.platform_service = Some(platform_service);
90 self
91 }
92
93 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
94 self.supervision_config = Some(config);
95 self
96 }
97
98 pub fn runtime_store(
99 mut self,
100 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
101 ) -> Self {
102 self.runtime_store = Some(store);
103 self
104 }
105
106 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
107 self.shutdown_timeout = timeout;
108 self
109 }
110
111 pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
112 self.health_registry = Some(registry);
113 self
114 }
115
116 pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
118 self.beans = Some(beans);
119 self
120 }
121
122 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
128 if let Some(collector) = service.as_metrics_collector() {
129 self.metrics = Some(collector);
130 }
131 if let Some(invoker) = service.as_function_invoker() {
132 self.function_invoker = Some(invoker);
133 }
134 self.lifecycle_services.push(Box::new(service));
135 self
136 }
137
138 pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
142 self.template_registry = Some(registry);
143 self
144 }
145
146 fn built_in_languages() -> SharedLanguageRegistry {
147 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
148 languages.insert(
149 "simple".to_string(),
150 Arc::new(camel_language_simple::SimpleLanguage::new()),
151 );
152 #[cfg(feature = "lang-js")]
153 {
154 let js_lang = camel_language_js::JsLanguage::new();
155 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
156 languages.insert("javascript".to_string(), Arc::new(js_lang));
157 }
158 #[cfg(feature = "lang-rhai")]
159 {
160 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
161 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
162 }
163 #[cfg(feature = "lang-jsonpath")]
164 {
165 languages.insert(
166 "jsonpath".to_string(),
167 Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
168 );
169 }
170 #[cfg(feature = "lang-xpath")]
171 {
172 languages.insert(
173 "xpath".to_string(),
174 Arc::new(camel_language_xpath::XPathLanguage::new()),
175 );
176 }
177 Arc::new(std::sync::Mutex::new(languages))
178 }
179
180 fn build_runtime(
181 controller: RouteControllerHandle,
182 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
183 execution_factory: Option<ExecutionFactory>,
184 health_registry: Arc<HealthCheckRegistry>,
185 ) -> Arc<RuntimeBus> {
186 let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
187 factory(controller.clone())
188 } else {
189 Arc::new(RuntimeExecutionAdapter::new(controller))
190 };
191 Arc::new(
192 RuntimeBus::new(
193 Arc::new(store.clone()),
194 Arc::new(store.clone()),
195 Arc::new(store.clone()),
196 Arc::new(store.clone()),
197 )
198 .with_uow(Arc::new(store))
199 .with_execution(execution)
200 .with_health_registry(health_registry),
201 )
202 }
203
204 pub async fn build(self) -> Result<CamelContext, CamelError> {
205 let registry = self
206 .registry
207 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
208 let languages = self.languages.unwrap_or_else(Self::built_in_languages);
209 let simple_with_resolver: Arc<dyn Language> = Arc::new(
210 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
211 let languages = Arc::clone(&languages);
212 move |name| {
213 languages
214 .lock()
215 .ok()
216 .and_then(|registry| registry.get(name).cloned())
217 }
218 })),
219 );
220 languages
221 .lock()
222 .expect("mutex poisoned: another thread panicked while holding this lock") .insert("simple".to_string(), simple_with_resolver);
224 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
225 let platform_service = self
226 .platform_service
227 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
228 let health_registry = self.health_registry.unwrap_or_else(|| {
229 Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
230 });
231
232 let (controller, actor_join, supervision_join) =
233 if let Some(config) = self.supervision_config {
234 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
235 let mut controller_impl = if let Some(ref beans) = self.beans {
236 DefaultRouteController::with_languages_and_beans(
237 Arc::clone(®istry),
238 Arc::clone(&languages),
239 Arc::clone(&platform_service),
240 Arc::clone(beans),
241 )
242 } else {
243 DefaultRouteController::with_languages(
244 Arc::clone(®istry),
245 Arc::clone(&languages),
246 Arc::clone(&platform_service),
247 )
248 };
249 if let Some(invoker) = self.function_invoker.clone() {
250 controller_impl = controller_impl.with_function_invoker(invoker);
251 }
252 controller_impl.set_health_registry(Arc::clone(&health_registry));
253 controller_impl.set_crash_notifier(crash_tx);
254 let (controller, actor_join) = spawn_controller_actor(controller_impl);
255 let supervision_join = spawn_supervision_task(
256 controller.clone(),
257 config,
258 Some(Arc::clone(&metrics)),
259 crash_rx,
260 );
261 (controller, actor_join, Some(supervision_join))
262 } else {
263 let mut controller_impl = if let Some(ref beans) = self.beans {
264 DefaultRouteController::with_languages_and_beans(
265 Arc::clone(®istry),
266 Arc::clone(&languages),
267 Arc::clone(&platform_service),
268 Arc::clone(beans),
269 )
270 } else {
271 DefaultRouteController::with_languages(
272 Arc::clone(®istry),
273 Arc::clone(&languages),
274 Arc::clone(&platform_service),
275 )
276 };
277 if let Some(invoker) = self.function_invoker.clone() {
278 controller_impl = controller_impl.with_function_invoker(invoker);
279 }
280 controller_impl.set_health_registry(Arc::clone(&health_registry));
281 let (controller, actor_join) = spawn_controller_actor(controller_impl);
282 (controller, actor_join, None)
283 };
284
285 let store = self.runtime_store.unwrap_or_default();
286 let runtime = Self::build_runtime(
287 controller.clone(),
288 store,
289 self.execution_factory,
290 Arc::clone(&health_registry),
291 );
292 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
293 controller
294 .try_set_runtime_handle(runtime_handle)
295 .expect("controller actor mailbox should accept initial runtime handle"); let template_registry = self
298 .template_registry
299 .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
300
301 Ok(CamelContext::from_parts(FromParts {
302 registry,
303 route_controller: controller,
304 _actor_join: actor_join,
305 supervision_join,
306 runtime,
307 cancel_token: CancellationToken::new(),
308 metrics,
309 platform_service,
310 languages,
311 shutdown_timeout: self.shutdown_timeout,
312 services: self.lifecycle_services,
313 health_registry,
314 component_configs: HashMap::new(),
315 function_invoker: self.function_invoker,
316 template_registry,
317 }))
318 }
319}
320
321impl Default for CamelContextBuilder {
322 fn default() -> Self {
323 Self::new()
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330 use crate::context::FromParts;
331
332 #[test]
333 fn builder_default_has_sane_timeout() {
334 let builder = CamelContextBuilder::new();
335 assert_eq!(builder.shutdown_timeout, std::time::Duration::from_secs(5));
336 }
337}