1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::{
10 CamelError, HealthReport, HealthStatus, LeaderElector, Lifecycle, MetricsCollector,
11 NoOpMetrics, NoopLeaderElector, NoopReadinessGate, PlatformIdentity, ReadinessGate,
12 RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
13};
14use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
15use camel_language_api::Language;
16
17use crate::lifecycle::adapters::RuntimeExecutionAdapter;
18use crate::lifecycle::adapters::controller_actor::{
19 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
20};
21use crate::lifecycle::adapters::route_controller::{
22 DefaultRouteController, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::lifecycle::application::runtime_bus::RuntimeBus;
26use crate::lifecycle::domain::LanguageRegistryError;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29
30static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
31
32pub struct CamelContextBuilder {
33 registry: Option<Arc<std::sync::Mutex<Registry>>>,
34 languages: Option<SharedLanguageRegistry>,
35 metrics: Option<Arc<dyn MetricsCollector>>,
36 leader_elector: Option<Arc<dyn LeaderElector>>,
38 readiness_gate: Option<Arc<dyn ReadinessGate>>,
39 platform_identity: Option<PlatformIdentity>,
40 supervision_config: Option<SupervisionConfig>,
41 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
42 shutdown_timeout: std::time::Duration,
43}
44
45pub struct CamelContext {
53 registry: Arc<std::sync::Mutex<Registry>>,
54 route_controller: RouteControllerHandle,
55 _actor_join: tokio::task::JoinHandle<()>,
56 supervision_join: Option<tokio::task::JoinHandle<()>>,
57 runtime: Arc<RuntimeBus>,
58 cancel_token: CancellationToken,
59 metrics: Arc<dyn MetricsCollector>,
60 leader_elector: Arc<dyn LeaderElector>,
62 readiness_gate: Arc<dyn ReadinessGate>,
63 platform_identity: PlatformIdentity,
64 languages: SharedLanguageRegistry,
65 shutdown_timeout: std::time::Duration,
66 services: Vec<Box<dyn Lifecycle>>,
67 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
68}
69
70#[derive(Clone)]
74pub struct RuntimeExecutionHandle {
75 controller: RouteControllerHandle,
76 runtime: Arc<RuntimeBus>,
77}
78
79impl RuntimeExecutionHandle {
80 pub(crate) async fn add_route_definition(
81 &self,
82 definition: RouteDefinition,
83 ) -> Result<(), CamelError> {
84 use crate::lifecycle::ports::RouteRegistrationPort;
85 self.runtime.register_route(definition).await
86 }
87
88 pub(crate) async fn compile_route_definition(
89 &self,
90 definition: RouteDefinition,
91 ) -> Result<camel_api::BoxProcessor, CamelError> {
92 self.controller.compile_route_definition(definition).await
93 }
94
95 pub(crate) async fn swap_route_pipeline(
96 &self,
97 route_id: &str,
98 pipeline: camel_api::BoxProcessor,
99 ) -> Result<(), CamelError> {
100 self.controller.swap_pipeline(route_id, pipeline).await
101 }
102
103 pub(crate) async fn execute_runtime_command(
104 &self,
105 cmd: camel_api::RuntimeCommand,
106 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
107 self.runtime.execute(cmd).await
108 }
109
110 pub(crate) async fn runtime_route_status(
111 &self,
112 route_id: &str,
113 ) -> Result<Option<String>, CamelError> {
114 match self
115 .runtime
116 .ask(camel_api::RuntimeQuery::GetRouteStatus {
117 route_id: route_id.to_string(),
118 })
119 .await
120 {
121 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
122 Ok(_) => Err(CamelError::RouteError(
123 "unexpected runtime query response for route status".to_string(),
124 )),
125 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
126 Err(err) => Err(err),
127 }
128 }
129
130 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
131 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
132 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
133 Ok(_) => Err(CamelError::RouteError(
134 "unexpected runtime query response for route listing".to_string(),
135 )),
136 Err(err) => Err(err),
137 }
138 }
139
140 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
141 self.controller.route_source_hash(route_id).await
142 }
143
144 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
145 if !self.controller.route_exists(route_id).await? {
146 return Err(CamelError::RouteError(format!(
147 "Route '{}' not found",
148 route_id
149 )));
150 }
151 Ok(self
152 .controller
153 .in_flight_count(route_id)
154 .await?
155 .unwrap_or(0))
156 }
157
158 #[cfg(test)]
159 pub(crate) async fn force_start_route_for_test(
160 &self,
161 route_id: &str,
162 ) -> Result<(), CamelError> {
163 self.controller.start_route(route_id).await
164 }
165
166 #[cfg(test)]
167 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
168 self.controller.route_count().await.unwrap_or(0)
169 }
170}
171
172impl CamelContext {
173 fn built_in_languages() -> SharedLanguageRegistry {
174 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
175 languages.insert(
176 "simple".to_string(),
177 Arc::new(camel_language_simple::SimpleLanguage),
178 );
179 #[cfg(feature = "lang-js")]
180 {
181 let js_lang = camel_language_js::JsLanguage::new();
182 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
183 languages.insert("javascript".to_string(), Arc::new(js_lang));
184 }
185 #[cfg(feature = "lang-rhai")]
186 {
187 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
188 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
189 }
190 #[cfg(feature = "lang-jsonpath")]
191 {
192 languages.insert(
193 "jsonpath".to_string(),
194 Arc::new(camel_language_jsonpath::JsonPathLanguage),
195 );
196 }
197 #[cfg(feature = "lang-xpath")]
198 {
199 languages.insert(
200 "xpath".to_string(),
201 Arc::new(camel_language_xpath::XPathLanguage),
202 );
203 }
204 Arc::new(std::sync::Mutex::new(languages))
205 }
206
207 fn build_runtime(
208 controller: RouteControllerHandle,
209 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
210 ) -> Arc<RuntimeBus> {
211 let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
212 Arc::new(
213 RuntimeBus::new(
214 Arc::new(store.clone()),
215 Arc::new(store.clone()),
216 Arc::new(store.clone()),
217 Arc::new(store.clone()),
218 )
219 .with_uow(Arc::new(store))
220 .with_execution(execution),
221 )
222 }
223
224 pub fn builder() -> CamelContextBuilder {
225 CamelContextBuilder::new()
226 }
227
228 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
230 let _ = self.route_controller.set_error_handler(config).await;
231 }
232
233 pub async fn set_tracing(&mut self, enabled: bool) {
235 let _ = self
236 .route_controller
237 .set_tracer_config(TracerConfig {
238 enabled,
239 ..Default::default()
240 })
241 .await;
242 }
243
244 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
246 let config = if config.metrics_collector.is_none() {
248 TracerConfig {
249 metrics_collector: Some(Arc::clone(&self.metrics)),
250 ..config
251 }
252 } else {
253 config
254 };
255
256 let _ = self.route_controller.set_tracer_config(config).await;
257 }
258
259 pub async fn with_tracing(mut self) -> Self {
261 self.set_tracing(true).await;
262 self
263 }
264
265 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
269 self.set_tracer_config(config).await;
270 self
271 }
272
273 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
275 if let Some(collector) = service.as_metrics_collector() {
277 self.metrics = collector;
278 }
279
280 self.services.push(Box::new(service));
281 self
282 }
283
284 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
286 info!(scheme = component.scheme(), "Registering component");
287 self.registry
288 .lock()
289 .expect("mutex poisoned: another thread panicked while holding this lock")
290 .register(Arc::new(component));
291 }
292
293 pub fn register_language(
300 &mut self,
301 name: impl Into<String>,
302 lang: Box<dyn Language>,
303 ) -> Result<(), LanguageRegistryError> {
304 let name = name.into();
305 let mut languages = self
306 .languages
307 .lock()
308 .expect("mutex poisoned: another thread panicked while holding this lock");
309 if languages.contains_key(&name) {
310 return Err(LanguageRegistryError::AlreadyRegistered { name });
311 }
312 languages.insert(name, Arc::from(lang));
313 Ok(())
314 }
315
316 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
318 let languages = self
319 .languages
320 .lock()
321 .expect("mutex poisoned: another thread panicked while holding this lock");
322 languages.get(name).cloned()
323 }
324
325 pub async fn add_route_definition(
329 &self,
330 definition: RouteDefinition,
331 ) -> Result<(), CamelError> {
332 use crate::lifecycle::ports::RouteRegistrationPort;
333 info!(
334 from = definition.from_uri(),
335 route_id = %definition.route_id(),
336 "Adding route definition"
337 );
338 self.runtime.register_route(definition).await
339 }
340
341 fn next_context_command_id(op: &str, route_id: &str) -> String {
342 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
343 format!("context:{op}:{route_id}:{seq}")
344 }
345
346 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
348 self.registry
349 .lock()
350 .expect("mutex poisoned: another thread panicked while holding this lock")
351 }
352
353 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
355 RuntimeExecutionHandle {
356 controller: self.route_controller.clone(),
357 runtime: Arc::clone(&self.runtime),
358 }
359 }
360
361 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
363 Arc::clone(&self.metrics)
364 }
365
366 pub fn leader_elector(&self) -> Arc<dyn LeaderElector> {
368 Arc::clone(&self.leader_elector)
369 }
370
371 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
373 Arc::clone(&self.readiness_gate)
374 }
375
376 pub fn platform_identity(&self) -> &PlatformIdentity {
378 &self.platform_identity
379 }
380
381 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
383 self.runtime.clone()
384 }
385
386 pub fn producer_context(&self) -> camel_api::ProducerContext {
388 camel_api::ProducerContext::new().with_runtime(self.runtime())
389 }
390
391 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
393 match self
394 .runtime()
395 .ask(camel_api::RuntimeQuery::GetRouteStatus {
396 route_id: route_id.to_string(),
397 })
398 .await
399 {
400 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
401 Ok(_) => Err(CamelError::RouteError(
402 "unexpected runtime query response for route status".to_string(),
403 )),
404 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
405 Err(err) => Err(err),
406 }
407 }
408
409 pub async fn start(&mut self) -> Result<(), CamelError> {
414 info!("Starting CamelContext");
415
416 for (i, service) in self.services.iter_mut().enumerate() {
418 info!("Starting service: {}", service.name());
419 if let Err(e) = service.start().await {
420 warn!(
422 "Service {} failed to start, rolling back {} services",
423 service.name(),
424 i
425 );
426 for j in (0..i).rev() {
427 if let Err(rollback_err) = self.services[j].stop().await {
428 warn!(
429 "Failed to stop service {} during rollback: {}",
430 self.services[j].name(),
431 rollback_err
432 );
433 }
434 }
435 return Err(e);
436 }
437 }
438
439 let route_ids = self.route_controller.auto_startup_route_ids().await?;
442 for route_id in route_ids {
443 self.runtime
444 .execute(camel_api::RuntimeCommand::StartRoute {
445 route_id: route_id.clone(),
446 command_id: Self::next_context_command_id("start", &route_id),
447 causation_id: None,
448 })
449 .await?;
450 }
451
452 info!("CamelContext started");
453 Ok(())
454 }
455
456 pub async fn stop(&mut self) -> Result<(), CamelError> {
458 self.stop_timeout(self.shutdown_timeout).await
459 }
460
461 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
466 info!("Stopping CamelContext");
467
468 self.cancel_token.cancel();
470 if let Some(join) = self.supervision_join.take() {
471 join.abort();
472 }
473
474 let route_ids = self.route_controller.shutdown_route_ids().await?;
477 for route_id in route_ids {
478 if let Err(err) = self
479 .runtime
480 .execute(camel_api::RuntimeCommand::StopRoute {
481 route_id: route_id.clone(),
482 command_id: Self::next_context_command_id("stop", &route_id),
483 causation_id: None,
484 })
485 .await
486 {
487 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
488 }
489 }
490
491 let mut first_error = None;
494 for service in self.services.iter_mut().rev() {
495 info!("Stopping service: {}", service.name());
496 if let Err(e) = service.stop().await {
497 warn!("Service {} failed to stop: {}", service.name(), e);
498 if first_error.is_none() {
499 first_error = Some(e);
500 }
501 }
502 }
503
504 info!("CamelContext stopped");
505
506 if let Some(e) = first_error {
507 Err(e)
508 } else {
509 Ok(())
510 }
511 }
512
513 pub fn shutdown_timeout(&self) -> std::time::Duration {
515 self.shutdown_timeout
516 }
517
518 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
520 self.shutdown_timeout = timeout;
521 }
522
523 pub async fn abort(&mut self) {
525 self.cancel_token.cancel();
526 if let Some(join) = self.supervision_join.take() {
527 join.abort();
528 }
529 let route_ids = self
530 .route_controller
531 .shutdown_route_ids()
532 .await
533 .unwrap_or_default();
534 for route_id in route_ids {
535 let _ = self
536 .runtime
537 .execute(camel_api::RuntimeCommand::StopRoute {
538 route_id: route_id.clone(),
539 command_id: Self::next_context_command_id("abort-stop", &route_id),
540 causation_id: None,
541 })
542 .await;
543 }
544 }
545
546 pub fn health_check(&self) -> HealthReport {
548 let services: Vec<ServiceHealth> = self
549 .services
550 .iter()
551 .map(|s| ServiceHealth {
552 name: s.name().to_string(),
553 status: s.status(),
554 })
555 .collect();
556
557 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
558 HealthStatus::Healthy
559 } else {
560 HealthStatus::Unhealthy
561 };
562
563 HealthReport {
564 status,
565 services,
566 ..Default::default()
567 }
568 }
569
570 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
572 self.component_configs
573 .insert(TypeId::of::<T>(), Box::new(config));
574 }
575
576 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
578 self.component_configs
579 .get(&TypeId::of::<T>())
580 .and_then(|b| b.downcast_ref::<T>())
581 }
582}
583
584impl ComponentRegistrar for CamelContext {
585 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
586 self.registry
587 .lock()
588 .expect("mutex poisoned: another thread panicked while holding this lock")
589 .register(component);
590 }
591}
592
593impl ComponentContext for CamelContext {
594 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
595 self.registry.lock().ok()?.get(scheme)
596 }
597
598 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
599 self.languages.lock().ok()?.get(name).cloned()
600 }
601
602 fn metrics(&self) -> Arc<dyn MetricsCollector> {
603 Arc::clone(&self.metrics)
604 }
605
606 fn leader_elector(&self) -> Arc<dyn LeaderElector> {
607 Arc::clone(&self.leader_elector)
608 }
609}
610
611impl CamelContextBuilder {
612 pub fn new() -> Self {
613 Self {
614 registry: None,
615 languages: None,
616 metrics: None,
617 leader_elector: None,
618 readiness_gate: None,
619 platform_identity: None,
620 supervision_config: None,
621 runtime_store: None,
622 shutdown_timeout: std::time::Duration::from_secs(30),
623 }
624 }
625
626 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
627 self.registry = Some(registry);
628 self
629 }
630
631 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
632 self.languages = Some(languages);
633 self
634 }
635
636 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
637 self.metrics = Some(metrics);
638 self
639 }
640
641 pub fn leader_elector(mut self, elector: Arc<dyn LeaderElector>) -> Self {
643 self.leader_elector = Some(elector);
644 self
645 }
646
647 pub fn readiness_gate(mut self, gate: Arc<dyn ReadinessGate>) -> Self {
649 self.readiness_gate = Some(gate);
650 self
651 }
652
653 pub fn platform_identity(mut self, identity: PlatformIdentity) -> Self {
655 self.platform_identity = Some(identity);
656 self
657 }
658
659 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
660 self.supervision_config = Some(config);
661 self
662 }
663
664 pub fn runtime_store(
665 mut self,
666 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
667 ) -> Self {
668 self.runtime_store = Some(store);
669 self
670 }
671
672 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
673 self.shutdown_timeout = timeout;
674 self
675 }
676
677 pub async fn build(self) -> Result<CamelContext, CamelError> {
678 let registry = self
679 .registry
680 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
681 let languages = self
682 .languages
683 .unwrap_or_else(CamelContext::built_in_languages);
684 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
685 let leader_elector = self
686 .leader_elector
687 .unwrap_or_else(|| Arc::new(NoopLeaderElector));
688 let readiness_gate = self
689 .readiness_gate
690 .unwrap_or_else(|| Arc::new(NoopReadinessGate));
691 let platform_identity = self
692 .platform_identity
693 .unwrap_or_else(|| PlatformIdentity::local("local"));
694
695 let (controller, actor_join, supervision_join) =
696 if let Some(config) = self.supervision_config {
697 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
698 let mut controller_impl = DefaultRouteController::with_languages(
699 Arc::clone(®istry),
700 Arc::clone(&languages),
701 Arc::clone(&leader_elector),
702 );
703 controller_impl.set_crash_notifier(crash_tx);
704 let (controller, actor_join) = spawn_controller_actor(controller_impl);
705 let supervision_join = spawn_supervision_task(
706 controller.clone(),
707 config,
708 Some(Arc::clone(&metrics)),
709 crash_rx,
710 );
711 (controller, actor_join, Some(supervision_join))
712 } else {
713 let controller_impl = DefaultRouteController::with_languages(
714 Arc::clone(®istry),
715 Arc::clone(&languages),
716 Arc::clone(&leader_elector),
717 );
718 let (controller, actor_join) = spawn_controller_actor(controller_impl);
719 (controller, actor_join, None)
720 };
721
722 let store = self.runtime_store.unwrap_or_default();
723 let runtime = CamelContext::build_runtime(controller.clone(), store);
724 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
725 controller
726 .try_set_runtime_handle(runtime_handle)
727 .expect("controller actor mailbox should accept initial runtime handle");
728
729 Ok(CamelContext {
730 registry,
731 route_controller: controller,
732 _actor_join: actor_join,
733 supervision_join,
734 runtime,
735 cancel_token: CancellationToken::new(),
736 metrics,
737 leader_elector,
738 readiness_gate,
739 platform_identity,
740 languages,
741 shutdown_timeout: self.shutdown_timeout,
742 services: Vec::new(),
743 component_configs: HashMap::new(),
744 })
745 }
746}
747
748impl Default for CamelContextBuilder {
749 fn default() -> Self {
750 Self::new()
751 }
752}
753
754#[cfg(test)]
755#[path = "context_tests.rs"]
756mod context_tests;