1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
12 NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
13 RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23 DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub struct CamelContextBuilder {
34 registry: Option<Arc<std::sync::Mutex<Registry>>>,
35 languages: Option<SharedLanguageRegistry>,
36 metrics: Option<Arc<dyn MetricsCollector>>,
37 platform_service: Option<Arc<dyn PlatformService>>,
39 supervision_config: Option<SupervisionConfig>,
40 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
41 shutdown_timeout: std::time::Duration,
42 beans: Option<Arc<std::sync::Mutex<camel_bean::BeanRegistry>>>,
43}
44
45pub struct CamelContext {
53 registry: Arc<std::sync::Mutex<Registry>>,
54 route_controller: RouteControllerHandle,
55 _actor_join: tokio::task::JoinHandle<()>,
56 supervision_join: Option<tokio::task::JoinHandle<()>>,
57 runtime: Arc<RuntimeBus>,
58 cancel_token: CancellationToken,
59 metrics: Arc<dyn MetricsCollector>,
60 platform_service: Arc<dyn PlatformService>,
62 languages: SharedLanguageRegistry,
63 shutdown_timeout: std::time::Duration,
64 services: Vec<Box<dyn Lifecycle>>,
65 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
66}
67
68#[derive(Clone)]
72pub struct RuntimeExecutionHandle {
73 controller: RouteControllerHandle,
74 runtime: Arc<RuntimeBus>,
75}
76
77impl RuntimeExecutionHandle {
78 pub(crate) async fn add_route_definition(
79 &self,
80 definition: RouteDefinition,
81 ) -> Result<(), CamelError> {
82 use crate::lifecycle::ports::RouteRegistrationPort;
83 self.runtime.register_route(definition).await
84 }
85
86 pub(crate) async fn compile_route_definition(
87 &self,
88 definition: RouteDefinition,
89 ) -> Result<camel_api::BoxProcessor, CamelError> {
90 self.controller.compile_route_definition(definition).await
91 }
92
93 pub(crate) async fn swap_route_pipeline(
94 &self,
95 route_id: &str,
96 pipeline: camel_api::BoxProcessor,
97 ) -> Result<(), CamelError> {
98 self.controller.swap_pipeline(route_id, pipeline).await
99 }
100
101 pub(crate) async fn execute_runtime_command(
102 &self,
103 cmd: camel_api::RuntimeCommand,
104 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
105 self.runtime.execute(cmd).await
106 }
107
108 pub(crate) async fn runtime_route_status(
109 &self,
110 route_id: &str,
111 ) -> Result<Option<String>, CamelError> {
112 match self
113 .runtime
114 .ask(camel_api::RuntimeQuery::GetRouteStatus {
115 route_id: route_id.to_string(),
116 })
117 .await
118 {
119 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
120 Ok(_) => Err(CamelError::RouteError(
121 "unexpected runtime query response for route status".to_string(),
122 )),
123 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
124 Err(err) => Err(err),
125 }
126 }
127
128 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
129 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
130 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
131 Ok(_) => Err(CamelError::RouteError(
132 "unexpected runtime query response for route listing".to_string(),
133 )),
134 Err(err) => Err(err),
135 }
136 }
137
138 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
139 self.controller.route_source_hash(route_id).await
140 }
141
142 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
143 if !self.controller.route_exists(route_id).await? {
144 return Err(CamelError::RouteError(format!(
145 "Route '{}' not found",
146 route_id
147 )));
148 }
149 Ok(self
150 .controller
151 .in_flight_count(route_id)
152 .await?
153 .unwrap_or(0))
154 }
155
156 #[cfg(test)]
157 pub(crate) async fn force_start_route_for_test(
158 &self,
159 route_id: &str,
160 ) -> Result<(), CamelError> {
161 self.controller.start_route(route_id).await
162 }
163
164 #[cfg(test)]
165 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
166 self.controller.route_count().await.unwrap_or(0)
167 }
168}
169
170impl CamelContext {
171 fn built_in_languages() -> SharedLanguageRegistry {
172 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
173 languages.insert(
174 "simple".to_string(),
175 Arc::new(camel_language_simple::SimpleLanguage::new()),
176 );
177 #[cfg(feature = "lang-js")]
178 {
179 let js_lang = camel_language_js::JsLanguage::new();
180 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
181 languages.insert("javascript".to_string(), Arc::new(js_lang));
182 }
183 #[cfg(feature = "lang-rhai")]
184 {
185 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
186 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
187 }
188 #[cfg(feature = "lang-jsonpath")]
189 {
190 languages.insert(
191 "jsonpath".to_string(),
192 Arc::new(camel_language_jsonpath::JsonPathLanguage),
193 );
194 }
195 #[cfg(feature = "lang-xpath")]
196 {
197 languages.insert(
198 "xpath".to_string(),
199 Arc::new(camel_language_xpath::XPathLanguage),
200 );
201 }
202 Arc::new(std::sync::Mutex::new(languages))
203 }
204
205 fn build_runtime(
206 controller: RouteControllerHandle,
207 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
208 ) -> Arc<RuntimeBus> {
209 let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
210 Arc::new(
211 RuntimeBus::new(
212 Arc::new(store.clone()),
213 Arc::new(store.clone()),
214 Arc::new(store.clone()),
215 Arc::new(store.clone()),
216 )
217 .with_uow(Arc::new(store))
218 .with_execution(execution),
219 )
220 }
221
222 pub fn builder() -> CamelContextBuilder {
223 CamelContextBuilder::new()
224 }
225
226 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
228 let _ = self.route_controller.set_error_handler(config).await;
229 }
230
231 pub async fn set_tracing(&mut self, enabled: bool) {
233 let _ = self
234 .route_controller
235 .set_tracer_config(TracerConfig {
236 enabled,
237 ..Default::default()
238 })
239 .await;
240 }
241
242 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
244 let config = if config.metrics_collector.is_none() {
246 TracerConfig {
247 metrics_collector: Some(Arc::clone(&self.metrics)),
248 ..config
249 }
250 } else {
251 config
252 };
253
254 let _ = self.route_controller.set_tracer_config(config).await;
255 }
256
257 pub async fn with_tracing(mut self) -> Self {
259 self.set_tracing(true).await;
260 self
261 }
262
263 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
267 self.set_tracer_config(config).await;
268 self
269 }
270
271 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
273 if let Some(collector) = service.as_metrics_collector() {
275 self.metrics = collector;
276 }
277
278 self.services.push(Box::new(service));
279 self
280 }
281
282 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
284 info!(scheme = component.scheme(), "Registering component");
285 self.registry
286 .lock()
287 .expect("mutex poisoned: another thread panicked while holding this lock")
288 .register(Arc::new(component));
289 }
290
291 pub fn register_language(
298 &mut self,
299 name: impl Into<String>,
300 lang: Box<dyn Language>,
301 ) -> Result<(), LanguageRegistryError> {
302 let name = name.into();
303 let mut languages = self
304 .languages
305 .lock()
306 .expect("mutex poisoned: another thread panicked while holding this lock");
307 if languages.contains_key(&name) {
308 return Err(LanguageRegistryError::AlreadyRegistered { name });
309 }
310 languages.insert(name, Arc::from(lang));
311 Ok(())
312 }
313
314 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
316 let languages = self
317 .languages
318 .lock()
319 .expect("mutex poisoned: another thread panicked while holding this lock");
320 languages.get(name).cloned()
321 }
322
323 pub async fn add_route_definition(
327 &self,
328 definition: RouteDefinition,
329 ) -> Result<(), CamelError> {
330 use crate::lifecycle::ports::RouteRegistrationPort;
331 info!(
332 from = definition.from_uri(),
333 route_id = %definition.route_id(),
334 "Adding route definition"
335 );
336 self.runtime.register_route(definition).await
337 }
338
339 fn next_context_command_id(op: &str, route_id: &str) -> String {
340 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
341 format!("context:{op}:{route_id}:{seq}")
342 }
343
344 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
346 self.registry
347 .lock()
348 .expect("mutex poisoned: another thread panicked while holding this lock")
349 }
350
351 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
353 Arc::clone(&self.registry)
354 }
355
356 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
358 RuntimeExecutionHandle {
359 controller: self.route_controller.clone(),
360 runtime: Arc::clone(&self.runtime),
361 }
362 }
363
364 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
366 Arc::clone(&self.metrics)
367 }
368
369 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
371 Arc::clone(&self.platform_service)
372 }
373
374 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
376 self.platform_service.readiness_gate()
377 }
378
379 pub fn platform_identity(&self) -> PlatformIdentity {
381 self.platform_service.identity()
382 }
383
384 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
386 self.platform_service.leadership()
387 }
388
389 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
391 self.runtime.clone()
392 }
393
394 pub fn producer_context(&self) -> camel_api::ProducerContext {
396 camel_api::ProducerContext::new().with_runtime(self.runtime())
397 }
398
399 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
401 match self
402 .runtime()
403 .ask(camel_api::RuntimeQuery::GetRouteStatus {
404 route_id: route_id.to_string(),
405 })
406 .await
407 {
408 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
409 Ok(_) => Err(CamelError::RouteError(
410 "unexpected runtime query response for route status".to_string(),
411 )),
412 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
413 Err(err) => Err(err),
414 }
415 }
416
417 pub async fn start(&mut self) -> Result<(), CamelError> {
422 info!("Starting CamelContext");
423
424 for (i, service) in self.services.iter_mut().enumerate() {
426 info!("Starting service: {}", service.name());
427 if let Err(e) = service.start().await {
428 warn!(
430 "Service {} failed to start, rolling back {} services",
431 service.name(),
432 i
433 );
434 for j in (0..i).rev() {
435 if let Err(rollback_err) = self.services[j].stop().await {
436 warn!(
437 "Failed to stop service {} during rollback: {}",
438 self.services[j].name(),
439 rollback_err
440 );
441 }
442 }
443 return Err(e);
444 }
445 }
446
447 let route_ids = self.route_controller.auto_startup_route_ids().await?;
450 for route_id in route_ids {
451 self.runtime
452 .execute(camel_api::RuntimeCommand::StartRoute {
453 route_id: route_id.clone(),
454 command_id: Self::next_context_command_id("start", &route_id),
455 causation_id: None,
456 })
457 .await?;
458 }
459
460 info!("CamelContext started");
461 Ok(())
462 }
463
464 pub async fn stop(&mut self) -> Result<(), CamelError> {
466 self.stop_timeout(self.shutdown_timeout).await
467 }
468
469 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
474 info!("Stopping CamelContext");
475
476 self.cancel_token.cancel();
478 if let Some(join) = self.supervision_join.take() {
479 join.abort();
480 }
481
482 let route_ids = self.route_controller.shutdown_route_ids().await?;
485 for route_id in route_ids {
486 if let Err(err) = self
487 .runtime
488 .execute(camel_api::RuntimeCommand::StopRoute {
489 route_id: route_id.clone(),
490 command_id: Self::next_context_command_id("stop", &route_id),
491 causation_id: None,
492 })
493 .await
494 {
495 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
496 }
497 }
498
499 let mut first_error = None;
502 for service in self.services.iter_mut().rev() {
503 info!("Stopping service: {}", service.name());
504 if let Err(e) = service.stop().await {
505 warn!("Service {} failed to stop: {}", service.name(), e);
506 if first_error.is_none() {
507 first_error = Some(e);
508 }
509 }
510 }
511
512 info!("CamelContext stopped");
513
514 if let Some(e) = first_error {
515 Err(e)
516 } else {
517 Ok(())
518 }
519 }
520
521 pub fn shutdown_timeout(&self) -> std::time::Duration {
523 self.shutdown_timeout
524 }
525
526 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
528 self.shutdown_timeout = timeout;
529 }
530
531 pub async fn abort(&mut self) {
533 self.cancel_token.cancel();
534 if let Some(join) = self.supervision_join.take() {
535 join.abort();
536 }
537 let route_ids = self
538 .route_controller
539 .shutdown_route_ids()
540 .await
541 .unwrap_or_default();
542 for route_id in route_ids {
543 let _ = self
544 .runtime
545 .execute(camel_api::RuntimeCommand::StopRoute {
546 route_id: route_id.clone(),
547 command_id: Self::next_context_command_id("abort-stop", &route_id),
548 causation_id: None,
549 })
550 .await;
551 }
552
553 for service in self.services.iter_mut().rev() {
554 let name = service.name().to_string();
555 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
556 Ok(Ok(())) => info!("Aborted service: {}", name),
557 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
558 Err(_) => warn!("Service {} timed out during abort (5s)", name),
559 }
560 }
561 }
562
563 pub fn health_check(&self) -> HealthReport {
565 let services: Vec<ServiceHealth> = self
566 .services
567 .iter()
568 .map(|s| ServiceHealth {
569 name: s.name().to_string(),
570 status: s.status(),
571 })
572 .collect();
573
574 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
575 HealthStatus::Healthy
576 } else {
577 HealthStatus::Unhealthy
578 };
579
580 HealthReport {
581 status,
582 services,
583 ..Default::default()
584 }
585 }
586
587 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
589 self.component_configs
590 .insert(TypeId::of::<T>(), Box::new(config));
591 }
592
593 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
595 self.component_configs
596 .get(&TypeId::of::<T>())
597 .and_then(|b| b.downcast_ref::<T>())
598 }
599}
600
601impl ComponentRegistrar for CamelContext {
602 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
603 self.registry
604 .lock()
605 .expect("mutex poisoned: another thread panicked while holding this lock")
606 .register(component);
607 }
608}
609
610impl ComponentContext for CamelContext {
611 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
612 self.registry.lock().ok()?.get(scheme)
613 }
614
615 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
616 self.languages.lock().ok()?.get(name).cloned()
617 }
618
619 fn metrics(&self) -> Arc<dyn MetricsCollector> {
620 Arc::clone(&self.metrics)
621 }
622
623 fn platform_service(&self) -> Arc<dyn PlatformService> {
624 Arc::clone(&self.platform_service)
625 }
626}
627
628impl CamelContextBuilder {
629 pub fn new() -> Self {
630 Self {
631 registry: None,
632 languages: None,
633 metrics: None,
634 platform_service: None,
635 supervision_config: None,
636 runtime_store: None,
637 shutdown_timeout: std::time::Duration::from_secs(30),
638 beans: None,
639 }
640 }
641
642 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
643 self.registry = Some(registry);
644 self
645 }
646
647 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
648 self.languages = Some(languages);
649 self
650 }
651
652 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
653 self.metrics = Some(metrics);
654 self
655 }
656
657 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
659 self.platform_service = Some(platform_service);
660 self
661 }
662
663 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
664 self.supervision_config = Some(config);
665 self
666 }
667
668 pub fn runtime_store(
669 mut self,
670 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
671 ) -> Self {
672 self.runtime_store = Some(store);
673 self
674 }
675
676 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
677 self.shutdown_timeout = timeout;
678 self
679 }
680
681 pub fn beans(mut self, beans: Arc<std::sync::Mutex<camel_bean::BeanRegistry>>) -> Self {
683 self.beans = Some(beans);
684 self
685 }
686
687 pub async fn build(self) -> Result<CamelContext, CamelError> {
688 let registry = self
689 .registry
690 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
691 let languages = self
692 .languages
693 .unwrap_or_else(CamelContext::built_in_languages);
694 let simple_with_resolver: Arc<dyn Language> = Arc::new(
695 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
696 let languages = Arc::clone(&languages);
697 move |name| {
698 languages
699 .lock()
700 .ok()
701 .and_then(|registry| registry.get(name).cloned())
702 }
703 })),
704 );
705 languages
706 .lock()
707 .expect("mutex poisoned: another thread panicked while holding this lock")
708 .insert("simple".to_string(), simple_with_resolver);
709 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
710 let platform_service = self
711 .platform_service
712 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
713
714 let (controller, actor_join, supervision_join) =
715 if let Some(config) = self.supervision_config {
716 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
717 let mut controller_impl = if let Some(ref beans) = self.beans {
718 DefaultRouteController::with_languages_and_beans(
719 Arc::clone(®istry),
720 Arc::clone(&languages),
721 Arc::clone(&platform_service),
722 Arc::clone(beans),
723 )
724 } else {
725 DefaultRouteController::with_languages(
726 Arc::clone(®istry),
727 Arc::clone(&languages),
728 Arc::clone(&platform_service),
729 )
730 };
731 controller_impl.set_crash_notifier(crash_tx);
732 let (controller, actor_join) = spawn_controller_actor(controller_impl);
733 let supervision_join = spawn_supervision_task(
734 controller.clone(),
735 config,
736 Some(Arc::clone(&metrics)),
737 crash_rx,
738 );
739 (controller, actor_join, Some(supervision_join))
740 } else {
741 let controller_impl = if let Some(ref beans) = self.beans {
742 DefaultRouteController::with_languages_and_beans(
743 Arc::clone(®istry),
744 Arc::clone(&languages),
745 Arc::clone(&platform_service),
746 Arc::clone(beans),
747 )
748 } else {
749 DefaultRouteController::with_languages(
750 Arc::clone(®istry),
751 Arc::clone(&languages),
752 Arc::clone(&platform_service),
753 )
754 };
755 let (controller, actor_join) = spawn_controller_actor(controller_impl);
756 (controller, actor_join, None)
757 };
758
759 let store = self.runtime_store.unwrap_or_default();
760 let runtime = CamelContext::build_runtime(controller.clone(), store);
761 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
762 controller
763 .try_set_runtime_handle(runtime_handle)
764 .expect("controller actor mailbox should accept initial runtime handle");
765
766 Ok(CamelContext {
767 registry,
768 route_controller: controller,
769 _actor_join: actor_join,
770 supervision_join,
771 runtime,
772 cancel_token: CancellationToken::new(),
773 metrics,
774 platform_service,
775 languages,
776 shutdown_timeout: self.shutdown_timeout,
777 services: Vec::new(),
778 component_configs: HashMap::new(),
779 })
780 }
781}
782
783impl Default for CamelContextBuilder {
784 fn default() -> Self {
785 Self::new()
786 }
787}
788
789#[cfg(test)]
790#[path = "context_tests.rs"]
791mod context_tests;