1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, NoOpMetrics,
12 NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RouteTemplateSpec,
13 RuntimeCommandBus, RuntimeQueryBus, SupervisionConfig, TemplateInstanceRecord,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::health_registry::HealthCheckRegistry;
19use crate::lifecycle::adapters::RuntimeExecutionAdapter;
20use crate::lifecycle::adapters::controller_actor::{
21 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
22};
23use crate::lifecycle::adapters::route_controller::{
24 DefaultRouteController, SharedLanguageRegistry,
25};
26use crate::lifecycle::application::route_definition::RouteDefinition;
27use crate::lifecycle::application::runtime_bus::RuntimeBus;
28use crate::lifecycle::domain::LanguageRegistryError;
29use crate::lifecycle::ports::RuntimeExecutionPort;
30use crate::shared::components::domain::Registry;
31use crate::shared::observability::domain::TracerConfig;
32use crate::template::TemplateRegistry;
33
/// Counter shared by every `CamelContext` in the process. Its value is appended to the
/// command ids produced by `CamelContext::next_context_command_id`.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
35
/// Shareable factory that maps a [`RouteControllerHandle`] to the
/// [`RuntimeExecutionPort`] installed on the runtime bus by `CamelContext::build_runtime`.
type ExecutionFactory =
    Arc<dyn Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync>;
38
/// Collects the optional collaborators of a [`CamelContext`] before it is built.
///
/// Every `Option` field left as `None` is replaced by a default inside `build`.
pub struct CamelContextBuilder {
    /// Component registry; `build` creates an empty `Registry` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry; `build` falls back to `CamelContext::built_in_languages`.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build` falls back to `NoOpMetrics`.
    metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service; `build` falls back to `NoopPlatformService::default()`.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build` wires a crash channel into the controller and spawns a
    /// supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Runtime store; `build` falls back to its `Default` value.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Copied onto the built context; initialised to 5 seconds by `new`.
    shutdown_timeout: std::time::Duration,
    /// Optional bean registry handed to the route controller.
    beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
    /// Optional function invoker, passed to the route controller and stored on the context.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Lifecycle services that become the context's `services` list.
    lifecycle_services: Vec<Box<dyn Lifecycle>>,
    /// Optional override for the runtime execution port; without it `build_runtime`
    /// uses `RuntimeExecutionAdapter`.
    execution_factory: Option<ExecutionFactory>,
    /// Health registry; `build` falls back to a `HealthCheckRegistry` created with a
    /// 5-second duration.
    health_registry: Option<Arc<HealthCheckRegistry>>,
    /// Template registry; `build` falls back to `TemplateRegistry::new()`.
    template_registry: Option<Arc<TemplateRegistry>>,
}
55
/// Owns the component registry, route controller handle, runtime bus and lifecycle
/// services, and drives their start/stop sequence.
pub struct CamelContext {
    /// Shared component registry.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle to the controller actor spawned in `CamelContextBuilder::build`.
    route_controller: RouteControllerHandle,
    /// Join handle of the controller actor task; stored but never read.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task; `Some` only when supervision was configured.
    /// Taken and aborted by `stop_timeout` and `abort`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used for commands, queries and route registration.
    runtime: Arc<RuntimeBus>,
    /// Cancelled at the beginning of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector; may be replaced by `with_lifecycle`.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the readiness gate, platform identity and leadership service.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry, keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop` forwards to `stop_timeout`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services; started in order by `start`, stopped in reverse order.
    services: Vec<Box<dyn Lifecycle>>,
    /// Health-check registry, also exposed through `ComponentContext::health`.
    health_registry: Arc<HealthCheckRegistry>,
    /// Component configuration values keyed by the `TypeId` of their concrete type.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Optional function invoker shared with `RuntimeExecutionHandle`.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Registry of route templates and their recorded instances.
    template_registry: Arc<TemplateRegistry>,
}
81
/// Cloneable bundle of the controller handle, the runtime bus and the optional function
/// invoker, created by `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Handle to the route controller actor.
    controller: RouteControllerHandle,
    /// Runtime bus used for commands, queries and route registration.
    runtime: Arc<RuntimeBus>,
    /// Function invoker copied from the context at handle creation time, if any.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
91
92impl RuntimeExecutionHandle {
93 pub(crate) async fn add_route_definition(
94 &self,
95 definition: RouteDefinition,
96 ) -> Result<(), CamelError> {
97 use crate::lifecycle::ports::RouteRegistrationPort;
98 self.runtime
99 .register_route(definition)
100 .await
101 .map_err(Into::into)
102 }
103
104 pub(crate) async fn compile_route_definition(
105 &self,
106 definition: RouteDefinition,
107 ) -> Result<camel_api::BoxProcessor, CamelError> {
108 self.controller.compile_route_definition(definition).await
109 }
110
111 pub(crate) async fn compile_route_definition_with_generation(
112 &self,
113 definition: RouteDefinition,
114 generation: u64,
115 ) -> Result<camel_api::BoxProcessor, CamelError> {
116 self.controller
117 .compile_route_definition_with_generation(definition, generation)
118 .await
119 }
120
121 pub(crate) async fn prepare_route_definition_with_generation(
122 &self,
123 definition: RouteDefinition,
124 generation: u64,
125 ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
126 self.controller
127 .prepare_route_definition_with_generation(definition, generation)
128 .await
129 }
130
131 pub(crate) async fn insert_prepared_route(
132 &self,
133 prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
134 ) -> Result<(), CamelError> {
135 self.controller.insert_prepared_route(prepared).await
136 }
137
138 pub(crate) async fn remove_route_preserving_functions(
139 &self,
140 route_id: String,
141 ) -> Result<(), CamelError> {
142 self.controller
143 .remove_route_preserving_functions(route_id)
144 .await
145 }
146
147 pub(crate) async fn register_route_aggregate(
148 &self,
149 route_id: String,
150 ) -> Result<(), CamelError> {
151 self.runtime.register_aggregate_only(route_id).await
152 }
153
154 pub(crate) async fn swap_route_pipeline(
155 &self,
156 route_id: &str,
157 pipeline: camel_api::BoxProcessor,
158 ) -> Result<(), CamelError> {
159 self.controller.swap_pipeline(route_id, pipeline).await
160 }
161
162 pub(crate) async fn execute_runtime_command(
163 &self,
164 cmd: camel_api::RuntimeCommand,
165 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
166 self.runtime.execute(cmd).await
167 }
168
169 pub(crate) async fn runtime_route_status(
170 &self,
171 route_id: &str,
172 ) -> Result<Option<String>, CamelError> {
173 match self
174 .runtime
175 .ask(camel_api::RuntimeQuery::GetRouteStatus {
176 route_id: route_id.to_string(),
177 })
178 .await
179 {
180 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
181 Ok(_) => Err(CamelError::RouteError(
182 "unexpected runtime query response for route status".to_string(),
183 )),
184 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
185 Err(err) => Err(err),
186 }
187 }
188
189 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
190 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
191 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
192 Ok(_) => Err(CamelError::RouteError(
193 "unexpected runtime query response for route listing".to_string(),
194 )),
195 Err(err) => Err(err),
196 }
197 }
198
199 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
200 self.controller.route_source_hash(route_id).await
201 }
202
203 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
204 if !self.controller.route_exists(route_id).await? {
205 return Err(CamelError::RouteError(format!(
206 "Route '{}' not found",
207 route_id
208 )));
209 }
210 Ok(self
211 .controller
212 .in_flight_count(route_id)
213 .await?
214 .unwrap_or(0))
215 }
216
217 pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
218 self.function_invoker.clone()
219 }
220
221 #[cfg(test)]
222 pub(crate) async fn force_start_route_for_test(
223 &self,
224 route_id: &str,
225 ) -> Result<(), CamelError> {
226 self.controller.start_route(route_id).await
227 }
228
229 pub async fn controller_route_count_for_test(&self) -> usize {
230 self.controller.route_count().await.unwrap_or(0)
231 }
232}
233
234impl CamelContext {
235 fn built_in_languages() -> SharedLanguageRegistry {
236 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
237 languages.insert(
238 "simple".to_string(),
239 Arc::new(camel_language_simple::SimpleLanguage::new()),
240 );
241 #[cfg(feature = "lang-js")]
242 {
243 let js_lang = camel_language_js::JsLanguage::new();
244 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
245 languages.insert("javascript".to_string(), Arc::new(js_lang));
246 }
247 #[cfg(feature = "lang-rhai")]
248 {
249 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
250 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
251 }
252 #[cfg(feature = "lang-jsonpath")]
253 {
254 languages.insert(
255 "jsonpath".to_string(),
256 Arc::new(camel_language_jsonpath::JsonPathLanguage::new()),
257 );
258 }
259 #[cfg(feature = "lang-xpath")]
260 {
261 languages.insert(
262 "xpath".to_string(),
263 Arc::new(camel_language_xpath::XPathLanguage::new()),
264 );
265 }
266 Arc::new(std::sync::Mutex::new(languages))
267 }
268
269 fn build_runtime(
270 controller: RouteControllerHandle,
271 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
272 execution_factory: Option<ExecutionFactory>,
273 health_registry: Arc<HealthCheckRegistry>,
274 ) -> Arc<RuntimeBus> {
275 let execution: Arc<dyn RuntimeExecutionPort> = if let Some(factory) = execution_factory {
276 factory(controller.clone())
277 } else {
278 Arc::new(RuntimeExecutionAdapter::new(controller))
279 };
280 Arc::new(
281 RuntimeBus::new(
282 Arc::new(store.clone()),
283 Arc::new(store.clone()),
284 Arc::new(store.clone()),
285 Arc::new(store.clone()),
286 )
287 .with_uow(Arc::new(store))
288 .with_execution(execution)
289 .with_health_registry(health_registry),
290 )
291 }
292
293 pub fn builder() -> CamelContextBuilder {
294 CamelContextBuilder::new()
295 }
296
297 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
299 let _ = self.route_controller.set_error_handler(config).await;
300 }
301
302 pub async fn set_tracing(&mut self, enabled: bool) {
304 let _ = self
305 .route_controller
306 .set_tracer_config(TracerConfig {
307 enabled,
308 ..Default::default()
309 })
310 .await;
311 }
312
313 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
315 let config = if config.metrics_collector.is_none() {
317 TracerConfig {
318 metrics_collector: Some(Arc::clone(&self.metrics)),
319 ..config
320 }
321 } else {
322 config
323 };
324
325 let _ = self.route_controller.set_tracer_config(config).await;
326 }
327
328 pub async fn with_tracing(mut self) -> Self {
330 self.set_tracing(true).await;
331 self
332 }
333
334 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
338 self.set_tracer_config(config).await;
339 self
340 }
341
342 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
351 if let Some(collector) = service.as_metrics_collector() {
352 self.metrics = collector;
353 }
354 if let Some(invoker) = service.as_function_invoker() {
355 self.function_invoker = Some(invoker.clone());
356 if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
357 tracing::warn!("Failed to propagate function invoker to route controller: {e}");
358 }
359 }
360
361 self.services.push(Box::new(service));
362 self
363 }
364
365 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
367 info!(scheme = component.scheme(), "Registering component");
368 self.registry
369 .lock()
370 .expect("mutex poisoned: another thread panicked while holding this lock") .register(Arc::new(component));
372 }
373
374 pub fn register_language(
381 &mut self,
382 name: impl Into<String>,
383 lang: Box<dyn Language>,
384 ) -> Result<(), LanguageRegistryError> {
385 let name = name.into();
386 let mut languages = self
387 .languages
388 .lock()
389 .expect("mutex poisoned: another thread panicked while holding this lock"); if languages.contains_key(&name) {
391 return Err(LanguageRegistryError::AlreadyRegistered { name });
392 }
393 languages.insert(name, Arc::from(lang));
394 Ok(())
395 }
396
397 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
399 let languages = self
400 .languages
401 .lock()
402 .expect("mutex poisoned: another thread panicked while holding this lock"); languages.get(name).cloned()
404 }
405
406 pub async fn add_route_definition(
410 &self,
411 definition: RouteDefinition,
412 ) -> Result<(), CamelError> {
413 use crate::lifecycle::ports::RouteRegistrationPort;
414 info!(
415 from = definition.from_uri(),
416 route_id = %definition.route_id(),
417 "Adding route definition"
418 );
419 self.runtime
420 .register_route(definition)
421 .await
422 .map_err(Into::into)
423 }
424
425 fn next_context_command_id(op: &str, route_id: &str) -> String {
426 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
427 format!("context:{op}:{route_id}:{seq}")
428 }
429
430 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
432 self.registry
433 .lock()
434 .expect("mutex poisoned: another thread panicked while holding this lock") }
436
437 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
439 Arc::clone(&self.registry)
440 }
441
442 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
444 RuntimeExecutionHandle {
445 controller: self.route_controller.clone(),
446 runtime: Arc::clone(&self.runtime),
447 function_invoker: self.function_invoker.clone(),
448 }
449 }
450
451 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
453 Arc::clone(&self.metrics)
454 }
455
456 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
458 Arc::clone(&self.platform_service)
459 }
460
461 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
463 self.platform_service.readiness_gate()
464 }
465
466 pub fn platform_identity(&self) -> PlatformIdentity {
468 self.platform_service.identity()
469 }
470
471 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
473 self.platform_service.leadership()
474 }
475
476 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
478 self.runtime.clone()
479 }
480
481 pub fn producer_context(&self) -> camel_api::ProducerContext {
483 camel_api::ProducerContext::new().with_runtime(self.runtime())
484 }
485
486 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
488 match self
489 .runtime()
490 .ask(camel_api::RuntimeQuery::GetRouteStatus {
491 route_id: route_id.to_string(),
492 })
493 .await
494 {
495 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
496 Ok(_) => Err(CamelError::RouteError(
497 "unexpected runtime query response for route status".to_string(),
498 )),
499 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
500 Err(err) => Err(err),
501 }
502 }
503
504 pub async fn start(&mut self) -> Result<(), CamelError> {
509 info!("Starting CamelContext");
510
511 for (i, service) in self.services.iter_mut().enumerate() {
513 info!("Starting service: {}", service.name());
514 if let Err(e) = service.start().await {
515 warn!(
517 "Service {} failed to start, rolling back {} services",
518 service.name(),
519 i
520 );
521 for j in (0..i).rev() {
522 if let Err(rollback_err) = self.services[j].stop().await {
523 warn!(
524 "Failed to stop service {} during rollback: {}",
525 self.services[j].name(),
526 rollback_err
527 );
528 }
529 }
530 return Err(e);
531 }
532 }
533
534 let route_ids = self.route_controller.auto_startup_route_ids().await?;
537 for route_id in route_ids {
538 self.runtime
539 .execute(camel_api::RuntimeCommand::StartRoute {
540 route_id: route_id.clone(),
541 command_id: Self::next_context_command_id("start", &route_id),
542 causation_id: None,
543 })
544 .await?;
545 }
546
547 info!("CamelContext started");
548 Ok(())
549 }
550
551 pub async fn stop(&mut self) -> Result<(), CamelError> {
553 self.stop_timeout(self.shutdown_timeout).await
554 }
555
556 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
563 info!("Stopping CamelContext");
564
565 self.cancel_token.cancel();
567 if let Some(join) = self.supervision_join.take() {
568 join.abort();
569 }
570
571 let route_ids = self.route_controller.shutdown_route_ids().await?;
574 for route_id in route_ids {
575 if let Err(err) = self
576 .runtime
577 .execute(camel_api::RuntimeCommand::StopRoute {
578 route_id: route_id.clone(),
579 command_id: Self::next_context_command_id("stop", &route_id),
580 causation_id: None,
581 })
582 .await
583 {
584 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
585 }
586 }
587
588 self.health_registry.cancel_token().cancel();
589
590 let mut first_error = None;
593 for service in self.services.iter_mut().rev() {
594 info!("Stopping service: {}", service.name());
595 if let Err(e) = service.stop().await {
596 warn!("Service {} failed to stop: {}", service.name(), e);
597 if first_error.is_none() {
598 first_error = Some(e);
599 }
600 }
601 }
602
603 info!("CamelContext stopped");
604
605 if let Some(e) = first_error {
606 Err(e)
607 } else {
608 Ok(())
609 }
610 }
611
612 pub fn shutdown_timeout(&self) -> std::time::Duration {
614 self.shutdown_timeout
615 }
616
617 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
619 self.shutdown_timeout = timeout;
620 }
621
622 pub async fn abort(&mut self) {
624 self.cancel_token.cancel();
625 if let Some(join) = self.supervision_join.take() {
626 join.abort();
627 }
628 let route_ids = self
629 .route_controller
630 .shutdown_route_ids()
631 .await
632 .unwrap_or_default();
633 for route_id in route_ids {
634 let _ = self
635 .runtime
636 .execute(camel_api::RuntimeCommand::StopRoute {
637 route_id: route_id.clone(),
638 command_id: Self::next_context_command_id("abort-stop", &route_id),
639 causation_id: None,
640 })
641 .await;
642 }
643
644 for service in self.services.iter_mut().rev() {
645 let name = service.name().to_string();
646 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
647 Ok(Ok(())) => info!("Aborted service: {}", name),
648 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
649 Err(_) => warn!("Service {} timed out during abort (5s)", name),
650 }
651 }
652 }
653
654 pub async fn health_check(&self) -> HealthReport {
656 use camel_api::HealthSource;
657 self.health_report().await
658 }
659
660 pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
661 Arc::clone(&self.health_registry)
662 }
663
664 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
666 self.component_configs
667 .insert(TypeId::of::<T>(), Box::new(config));
668 }
669
670 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
672 self.component_configs
673 .get(&TypeId::of::<T>())
674 .and_then(|b| b.downcast_ref::<T>())
675 }
676
677 pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
683 self.template_registry.register(spec)
684 }
685
686 pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
688 self.template_registry.get(id)
689 }
690
691 pub fn template_ids(&self) -> Vec<String> {
693 self.template_registry.template_ids()
694 }
695
696 pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
698 self.template_registry.record_instance(record)
699 }
700
701 pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
703 self.template_registry.instances(template_id)
704 }
705}
706
707impl ComponentRegistrar for CamelContext {
708 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
709 self.registry
710 .lock()
711 .expect("mutex poisoned: another thread panicked while holding this lock") .register(component);
713 }
714}
715
716impl ComponentContext for CamelContext {
717 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
718 self.registry.lock().ok()?.get(scheme)
719 }
720
721 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
722 self.languages.lock().ok()?.get(name).cloned()
723 }
724
725 fn metrics(&self) -> Arc<dyn MetricsCollector> {
726 Arc::clone(&self.metrics)
727 }
728
729 fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
730 Arc::clone(&self.health_registry) as Arc<dyn camel_component_api::HealthCheckRegistry>
733 }
734
735 fn platform_service(&self) -> Arc<dyn PlatformService> {
736 Arc::clone(&self.platform_service)
737 }
738
739 fn register_route_health_check(
740 &self,
741 route_id: &str,
742 check: Arc<dyn camel_api::AsyncHealthCheck>,
743 ) {
744 self.health_registry.register_for_route(route_id, check);
745 }
746
747 fn unregister_route_health_check(&self, route_id: &str) {
748 self.health_registry.unregister_for_route(route_id);
749 }
750}
751
752#[async_trait::async_trait]
753impl camel_api::HealthSource for CamelContext {
754 async fn liveness(&self) -> camel_api::HealthStatus {
755 let has_failed = self
756 .services
757 .iter()
758 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
759 if has_failed {
760 camel_api::HealthStatus::Unhealthy
761 } else {
762 camel_api::HealthStatus::Healthy
763 }
764 }
765
766 async fn readiness(&self) -> camel_api::HealthStatus {
767 let has_failed = self
768 .services
769 .iter()
770 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
771 if has_failed {
772 return camel_api::HealthStatus::Unhealthy;
773 }
774 let has_stopped = self
775 .services
776 .iter()
777 .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
778 if has_stopped {
779 return camel_api::HealthStatus::Degraded;
780 }
781 self.health_registry.check_all().await.status
782 }
783
784 async fn health_report(&self) -> camel_api::HealthReport {
785 let mut report = self.health_registry.check_all().await;
786 let mut worst = report.status;
787 for service in &self.services {
788 let svc_status = service.status();
789 let health = match svc_status {
790 camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
791 camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
792 camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
793 };
794 if matches!(worst, camel_api::HealthStatus::Healthy)
795 && matches!(
796 health,
797 camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
798 )
799 {
800 worst = health;
801 }
802 if matches!(worst, camel_api::HealthStatus::Degraded)
803 && matches!(health, camel_api::HealthStatus::Unhealthy)
804 {
805 worst = health;
806 }
807 report.services.push(camel_api::ServiceHealth {
808 name: service.name().to_string(),
809 status: svc_status,
810 message: None,
811 });
812 }
813 report.status = worst;
814 report
815 }
816
817 async fn startup(&self) -> camel_api::HealthStatus {
818 camel_api::HealthStatus::Healthy
819 }
820}
821
822impl CamelContextBuilder {
823 pub fn new() -> Self {
824 Self {
825 registry: None,
826 languages: None,
827 metrics: None,
828 platform_service: None,
829 supervision_config: None,
830 runtime_store: None,
831 shutdown_timeout: std::time::Duration::from_secs(5),
832 beans: None,
833 function_invoker: None,
834 lifecycle_services: Vec::new(),
835 execution_factory: None,
836 health_registry: None,
837 template_registry: None,
838 }
839 }
840
841 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
842 self.registry = Some(registry);
843 self
844 }
845
846 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
847 self.languages = Some(languages);
848 self
849 }
850
851 pub fn with_execution_factory(
852 mut self,
853 factory: impl Fn(RouteControllerHandle) -> Arc<dyn RuntimeExecutionPort> + Send + Sync + 'static,
854 ) -> Self {
855 self.execution_factory = Some(Arc::new(factory));
856 self
857 }
858
859 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
860 self.metrics = Some(metrics);
861 self
862 }
863
864 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
866 self.platform_service = Some(platform_service);
867 self
868 }
869
870 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
871 self.supervision_config = Some(config);
872 self
873 }
874
875 pub fn runtime_store(
876 mut self,
877 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
878 ) -> Self {
879 self.runtime_store = Some(store);
880 self
881 }
882
883 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
884 self.shutdown_timeout = timeout;
885 self
886 }
887
888 pub fn health_registry(mut self, registry: Arc<HealthCheckRegistry>) -> Self {
889 self.health_registry = Some(registry);
890 self
891 }
892
893 pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
895 self.beans = Some(beans);
896 self
897 }
898
899 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
905 if let Some(collector) = service.as_metrics_collector() {
906 self.metrics = Some(collector);
907 }
908 if let Some(invoker) = service.as_function_invoker() {
909 self.function_invoker = Some(invoker);
910 }
911 self.lifecycle_services.push(Box::new(service));
912 self
913 }
914
915 pub fn template_registry(mut self, registry: Arc<TemplateRegistry>) -> Self {
919 self.template_registry = Some(registry);
920 self
921 }
922
923 pub async fn build(self) -> Result<CamelContext, CamelError> {
924 let registry = self
925 .registry
926 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
927 let languages = self
928 .languages
929 .unwrap_or_else(CamelContext::built_in_languages);
930 let simple_with_resolver: Arc<dyn Language> = Arc::new(
931 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
932 let languages = Arc::clone(&languages);
933 move |name| {
934 languages
935 .lock()
936 .ok()
937 .and_then(|registry| registry.get(name).cloned())
938 }
939 })),
940 );
941 languages
942 .lock()
943 .expect("mutex poisoned: another thread panicked while holding this lock") .insert("simple".to_string(), simple_with_resolver);
945 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
946 let platform_service = self
947 .platform_service
948 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
949 let health_registry = self.health_registry.unwrap_or_else(|| {
950 Arc::new(HealthCheckRegistry::new(std::time::Duration::from_secs(5)))
951 });
952
953 let (controller, actor_join, supervision_join) =
954 if let Some(config) = self.supervision_config {
955 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
956 let mut controller_impl = if let Some(ref beans) = self.beans {
957 DefaultRouteController::with_languages_and_beans(
958 Arc::clone(®istry),
959 Arc::clone(&languages),
960 Arc::clone(&platform_service),
961 Arc::clone(beans),
962 )
963 } else {
964 DefaultRouteController::with_languages(
965 Arc::clone(®istry),
966 Arc::clone(&languages),
967 Arc::clone(&platform_service),
968 )
969 };
970 if let Some(invoker) = self.function_invoker.clone() {
971 controller_impl = controller_impl.with_function_invoker(invoker);
972 }
973 controller_impl.set_health_registry(Arc::clone(&health_registry));
974 controller_impl.set_crash_notifier(crash_tx);
975 let (controller, actor_join) = spawn_controller_actor(controller_impl);
976 let supervision_join = spawn_supervision_task(
977 controller.clone(),
978 config,
979 Some(Arc::clone(&metrics)),
980 crash_rx,
981 );
982 (controller, actor_join, Some(supervision_join))
983 } else {
984 let mut controller_impl = if let Some(ref beans) = self.beans {
985 DefaultRouteController::with_languages_and_beans(
986 Arc::clone(®istry),
987 Arc::clone(&languages),
988 Arc::clone(&platform_service),
989 Arc::clone(beans),
990 )
991 } else {
992 DefaultRouteController::with_languages(
993 Arc::clone(®istry),
994 Arc::clone(&languages),
995 Arc::clone(&platform_service),
996 )
997 };
998 if let Some(invoker) = self.function_invoker.clone() {
999 controller_impl = controller_impl.with_function_invoker(invoker);
1000 }
1001 controller_impl.set_health_registry(Arc::clone(&health_registry));
1002 let (controller, actor_join) = spawn_controller_actor(controller_impl);
1003 (controller, actor_join, None)
1004 };
1005
1006 let store = self.runtime_store.unwrap_or_default();
1007 let runtime = CamelContext::build_runtime(
1008 controller.clone(),
1009 store,
1010 self.execution_factory,
1011 Arc::clone(&health_registry),
1012 );
1013 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
1014 controller
1015 .try_set_runtime_handle(runtime_handle)
1016 .expect("controller actor mailbox should accept initial runtime handle"); let template_registry = self
1019 .template_registry
1020 .unwrap_or_else(|| Arc::new(TemplateRegistry::new()));
1021
1022 Ok(CamelContext {
1023 registry,
1024 route_controller: controller,
1025 _actor_join: actor_join,
1026 supervision_join,
1027 runtime,
1028 cancel_token: CancellationToken::new(),
1029 metrics,
1030 platform_service,
1031 languages,
1032 shutdown_timeout: self.shutdown_timeout,
1033 services: self.lifecycle_services,
1034 health_registry,
1035 component_configs: HashMap::new(),
1036 function_invoker: self.function_invoker,
1037 template_registry,
1038 })
1039 }
1040}
1041
1042impl Default for CamelContextBuilder {
1043 fn default() -> Self {
1044 Self::new()
1045 }
1046}
1047
// Unit tests for this module; compiled only for test builds and loaded from
// `context_tests.rs`.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;