1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
12 NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
13 RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::lifecycle::adapters::RuntimeExecutionAdapter;
19use crate::lifecycle::adapters::controller_actor::{
20 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
21};
22use crate::lifecycle::adapters::route_controller::{
23 DefaultRouteController, SharedLanguageRegistry,
24};
25use crate::lifecycle::application::route_definition::RouteDefinition;
26use crate::lifecycle::application::runtime_bus::RuntimeBus;
27use crate::lifecycle::domain::LanguageRegistryError;
28use crate::shared::components::domain::Registry;
29use crate::shared::observability::domain::TracerConfig;
30
31static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
32
33pub struct CamelContextBuilder {
34 registry: Option<Arc<std::sync::Mutex<Registry>>>,
35 languages: Option<SharedLanguageRegistry>,
36 metrics: Option<Arc<dyn MetricsCollector>>,
37 platform_service: Option<Arc<dyn PlatformService>>,
39 supervision_config: Option<SupervisionConfig>,
40 runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
41 shutdown_timeout: std::time::Duration,
42}
43
44pub struct CamelContext {
52 registry: Arc<std::sync::Mutex<Registry>>,
53 route_controller: RouteControllerHandle,
54 _actor_join: tokio::task::JoinHandle<()>,
55 supervision_join: Option<tokio::task::JoinHandle<()>>,
56 runtime: Arc<RuntimeBus>,
57 cancel_token: CancellationToken,
58 metrics: Arc<dyn MetricsCollector>,
59 platform_service: Arc<dyn PlatformService>,
61 languages: SharedLanguageRegistry,
62 shutdown_timeout: std::time::Duration,
63 services: Vec<Box<dyn Lifecycle>>,
64 component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
65}
66
67#[derive(Clone)]
71pub struct RuntimeExecutionHandle {
72 controller: RouteControllerHandle,
73 runtime: Arc<RuntimeBus>,
74}
75
76impl RuntimeExecutionHandle {
77 pub(crate) async fn add_route_definition(
78 &self,
79 definition: RouteDefinition,
80 ) -> Result<(), CamelError> {
81 use crate::lifecycle::ports::RouteRegistrationPort;
82 self.runtime.register_route(definition).await
83 }
84
85 pub(crate) async fn compile_route_definition(
86 &self,
87 definition: RouteDefinition,
88 ) -> Result<camel_api::BoxProcessor, CamelError> {
89 self.controller.compile_route_definition(definition).await
90 }
91
92 pub(crate) async fn swap_route_pipeline(
93 &self,
94 route_id: &str,
95 pipeline: camel_api::BoxProcessor,
96 ) -> Result<(), CamelError> {
97 self.controller.swap_pipeline(route_id, pipeline).await
98 }
99
100 pub(crate) async fn execute_runtime_command(
101 &self,
102 cmd: camel_api::RuntimeCommand,
103 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
104 self.runtime.execute(cmd).await
105 }
106
107 pub(crate) async fn runtime_route_status(
108 &self,
109 route_id: &str,
110 ) -> Result<Option<String>, CamelError> {
111 match self
112 .runtime
113 .ask(camel_api::RuntimeQuery::GetRouteStatus {
114 route_id: route_id.to_string(),
115 })
116 .await
117 {
118 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
119 Ok(_) => Err(CamelError::RouteError(
120 "unexpected runtime query response for route status".to_string(),
121 )),
122 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
123 Err(err) => Err(err),
124 }
125 }
126
127 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
128 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
129 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
130 Ok(_) => Err(CamelError::RouteError(
131 "unexpected runtime query response for route listing".to_string(),
132 )),
133 Err(err) => Err(err),
134 }
135 }
136
137 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
138 self.controller.route_source_hash(route_id).await
139 }
140
141 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
142 if !self.controller.route_exists(route_id).await? {
143 return Err(CamelError::RouteError(format!(
144 "Route '{}' not found",
145 route_id
146 )));
147 }
148 Ok(self
149 .controller
150 .in_flight_count(route_id)
151 .await?
152 .unwrap_or(0))
153 }
154
155 #[cfg(test)]
156 pub(crate) async fn force_start_route_for_test(
157 &self,
158 route_id: &str,
159 ) -> Result<(), CamelError> {
160 self.controller.start_route(route_id).await
161 }
162
163 #[cfg(test)]
164 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
165 self.controller.route_count().await.unwrap_or(0)
166 }
167}
168
169impl CamelContext {
170 fn built_in_languages() -> SharedLanguageRegistry {
171 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
172 languages.insert(
173 "simple".to_string(),
174 Arc::new(camel_language_simple::SimpleLanguage::new()),
175 );
176 #[cfg(feature = "lang-js")]
177 {
178 let js_lang = camel_language_js::JsLanguage::new();
179 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
180 languages.insert("javascript".to_string(), Arc::new(js_lang));
181 }
182 #[cfg(feature = "lang-rhai")]
183 {
184 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
185 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
186 }
187 #[cfg(feature = "lang-jsonpath")]
188 {
189 languages.insert(
190 "jsonpath".to_string(),
191 Arc::new(camel_language_jsonpath::JsonPathLanguage),
192 );
193 }
194 #[cfg(feature = "lang-xpath")]
195 {
196 languages.insert(
197 "xpath".to_string(),
198 Arc::new(camel_language_xpath::XPathLanguage),
199 );
200 }
201 Arc::new(std::sync::Mutex::new(languages))
202 }
203
204 fn build_runtime(
205 controller: RouteControllerHandle,
206 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
207 ) -> Arc<RuntimeBus> {
208 let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
209 Arc::new(
210 RuntimeBus::new(
211 Arc::new(store.clone()),
212 Arc::new(store.clone()),
213 Arc::new(store.clone()),
214 Arc::new(store.clone()),
215 )
216 .with_uow(Arc::new(store))
217 .with_execution(execution),
218 )
219 }
220
221 pub fn builder() -> CamelContextBuilder {
222 CamelContextBuilder::new()
223 }
224
225 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
227 let _ = self.route_controller.set_error_handler(config).await;
228 }
229
230 pub async fn set_tracing(&mut self, enabled: bool) {
232 let _ = self
233 .route_controller
234 .set_tracer_config(TracerConfig {
235 enabled,
236 ..Default::default()
237 })
238 .await;
239 }
240
241 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
243 let config = if config.metrics_collector.is_none() {
245 TracerConfig {
246 metrics_collector: Some(Arc::clone(&self.metrics)),
247 ..config
248 }
249 } else {
250 config
251 };
252
253 let _ = self.route_controller.set_tracer_config(config).await;
254 }
255
256 pub async fn with_tracing(mut self) -> Self {
258 self.set_tracing(true).await;
259 self
260 }
261
262 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
266 self.set_tracer_config(config).await;
267 self
268 }
269
270 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
272 if let Some(collector) = service.as_metrics_collector() {
274 self.metrics = collector;
275 }
276
277 self.services.push(Box::new(service));
278 self
279 }
280
281 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
283 info!(scheme = component.scheme(), "Registering component");
284 self.registry
285 .lock()
286 .expect("mutex poisoned: another thread panicked while holding this lock")
287 .register(Arc::new(component));
288 }
289
290 pub fn register_language(
297 &mut self,
298 name: impl Into<String>,
299 lang: Box<dyn Language>,
300 ) -> Result<(), LanguageRegistryError> {
301 let name = name.into();
302 let mut languages = self
303 .languages
304 .lock()
305 .expect("mutex poisoned: another thread panicked while holding this lock");
306 if languages.contains_key(&name) {
307 return Err(LanguageRegistryError::AlreadyRegistered { name });
308 }
309 languages.insert(name, Arc::from(lang));
310 Ok(())
311 }
312
313 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
315 let languages = self
316 .languages
317 .lock()
318 .expect("mutex poisoned: another thread panicked while holding this lock");
319 languages.get(name).cloned()
320 }
321
322 pub async fn add_route_definition(
326 &self,
327 definition: RouteDefinition,
328 ) -> Result<(), CamelError> {
329 use crate::lifecycle::ports::RouteRegistrationPort;
330 info!(
331 from = definition.from_uri(),
332 route_id = %definition.route_id(),
333 "Adding route definition"
334 );
335 self.runtime.register_route(definition).await
336 }
337
338 fn next_context_command_id(op: &str, route_id: &str) -> String {
339 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
340 format!("context:{op}:{route_id}:{seq}")
341 }
342
343 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
345 self.registry
346 .lock()
347 .expect("mutex poisoned: another thread panicked while holding this lock")
348 }
349
350 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
352 Arc::clone(&self.registry)
353 }
354
355 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
357 RuntimeExecutionHandle {
358 controller: self.route_controller.clone(),
359 runtime: Arc::clone(&self.runtime),
360 }
361 }
362
363 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
365 Arc::clone(&self.metrics)
366 }
367
368 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
370 Arc::clone(&self.platform_service)
371 }
372
373 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
375 self.platform_service.readiness_gate()
376 }
377
378 pub fn platform_identity(&self) -> PlatformIdentity {
380 self.platform_service.identity()
381 }
382
383 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
385 self.platform_service.leadership()
386 }
387
388 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
390 self.runtime.clone()
391 }
392
393 pub fn producer_context(&self) -> camel_api::ProducerContext {
395 camel_api::ProducerContext::new().with_runtime(self.runtime())
396 }
397
398 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
400 match self
401 .runtime()
402 .ask(camel_api::RuntimeQuery::GetRouteStatus {
403 route_id: route_id.to_string(),
404 })
405 .await
406 {
407 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
408 Ok(_) => Err(CamelError::RouteError(
409 "unexpected runtime query response for route status".to_string(),
410 )),
411 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
412 Err(err) => Err(err),
413 }
414 }
415
416 pub async fn start(&mut self) -> Result<(), CamelError> {
421 info!("Starting CamelContext");
422
423 for (i, service) in self.services.iter_mut().enumerate() {
425 info!("Starting service: {}", service.name());
426 if let Err(e) = service.start().await {
427 warn!(
429 "Service {} failed to start, rolling back {} services",
430 service.name(),
431 i
432 );
433 for j in (0..i).rev() {
434 if let Err(rollback_err) = self.services[j].stop().await {
435 warn!(
436 "Failed to stop service {} during rollback: {}",
437 self.services[j].name(),
438 rollback_err
439 );
440 }
441 }
442 return Err(e);
443 }
444 }
445
446 let route_ids = self.route_controller.auto_startup_route_ids().await?;
449 for route_id in route_ids {
450 self.runtime
451 .execute(camel_api::RuntimeCommand::StartRoute {
452 route_id: route_id.clone(),
453 command_id: Self::next_context_command_id("start", &route_id),
454 causation_id: None,
455 })
456 .await?;
457 }
458
459 info!("CamelContext started");
460 Ok(())
461 }
462
463 pub async fn stop(&mut self) -> Result<(), CamelError> {
465 self.stop_timeout(self.shutdown_timeout).await
466 }
467
468 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
473 info!("Stopping CamelContext");
474
475 self.cancel_token.cancel();
477 if let Some(join) = self.supervision_join.take() {
478 join.abort();
479 }
480
481 let route_ids = self.route_controller.shutdown_route_ids().await?;
484 for route_id in route_ids {
485 if let Err(err) = self
486 .runtime
487 .execute(camel_api::RuntimeCommand::StopRoute {
488 route_id: route_id.clone(),
489 command_id: Self::next_context_command_id("stop", &route_id),
490 causation_id: None,
491 })
492 .await
493 {
494 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
495 }
496 }
497
498 let mut first_error = None;
501 for service in self.services.iter_mut().rev() {
502 info!("Stopping service: {}", service.name());
503 if let Err(e) = service.stop().await {
504 warn!("Service {} failed to stop: {}", service.name(), e);
505 if first_error.is_none() {
506 first_error = Some(e);
507 }
508 }
509 }
510
511 info!("CamelContext stopped");
512
513 if let Some(e) = first_error {
514 Err(e)
515 } else {
516 Ok(())
517 }
518 }
519
520 pub fn shutdown_timeout(&self) -> std::time::Duration {
522 self.shutdown_timeout
523 }
524
525 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
527 self.shutdown_timeout = timeout;
528 }
529
530 pub async fn abort(&mut self) {
532 self.cancel_token.cancel();
533 if let Some(join) = self.supervision_join.take() {
534 join.abort();
535 }
536 let route_ids = self
537 .route_controller
538 .shutdown_route_ids()
539 .await
540 .unwrap_or_default();
541 for route_id in route_ids {
542 let _ = self
543 .runtime
544 .execute(camel_api::RuntimeCommand::StopRoute {
545 route_id: route_id.clone(),
546 command_id: Self::next_context_command_id("abort-stop", &route_id),
547 causation_id: None,
548 })
549 .await;
550 }
551
552 for service in self.services.iter_mut().rev() {
553 let name = service.name().to_string();
554 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
555 Ok(Ok(())) => info!("Aborted service: {}", name),
556 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
557 Err(_) => warn!("Service {} timed out during abort (5s)", name),
558 }
559 }
560 }
561
562 pub fn health_check(&self) -> HealthReport {
564 let services: Vec<ServiceHealth> = self
565 .services
566 .iter()
567 .map(|s| ServiceHealth {
568 name: s.name().to_string(),
569 status: s.status(),
570 })
571 .collect();
572
573 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
574 HealthStatus::Healthy
575 } else {
576 HealthStatus::Unhealthy
577 };
578
579 HealthReport {
580 status,
581 services,
582 ..Default::default()
583 }
584 }
585
586 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
588 self.component_configs
589 .insert(TypeId::of::<T>(), Box::new(config));
590 }
591
592 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
594 self.component_configs
595 .get(&TypeId::of::<T>())
596 .and_then(|b| b.downcast_ref::<T>())
597 }
598}
599
600impl ComponentRegistrar for CamelContext {
601 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
602 self.registry
603 .lock()
604 .expect("mutex poisoned: another thread panicked while holding this lock")
605 .register(component);
606 }
607}
608
609impl ComponentContext for CamelContext {
610 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
611 self.registry.lock().ok()?.get(scheme)
612 }
613
614 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
615 self.languages.lock().ok()?.get(name).cloned()
616 }
617
618 fn metrics(&self) -> Arc<dyn MetricsCollector> {
619 Arc::clone(&self.metrics)
620 }
621
622 fn platform_service(&self) -> Arc<dyn PlatformService> {
623 Arc::clone(&self.platform_service)
624 }
625}
626
627impl CamelContextBuilder {
628 pub fn new() -> Self {
629 Self {
630 registry: None,
631 languages: None,
632 metrics: None,
633 platform_service: None,
634 supervision_config: None,
635 runtime_store: None,
636 shutdown_timeout: std::time::Duration::from_secs(30),
637 }
638 }
639
640 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
641 self.registry = Some(registry);
642 self
643 }
644
645 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
646 self.languages = Some(languages);
647 self
648 }
649
650 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
651 self.metrics = Some(metrics);
652 self
653 }
654
655 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
657 self.platform_service = Some(platform_service);
658 self
659 }
660
661 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
662 self.supervision_config = Some(config);
663 self
664 }
665
666 pub fn runtime_store(
667 mut self,
668 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
669 ) -> Self {
670 self.runtime_store = Some(store);
671 self
672 }
673
674 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
675 self.shutdown_timeout = timeout;
676 self
677 }
678
679 pub async fn build(self) -> Result<CamelContext, CamelError> {
680 let registry = self
681 .registry
682 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
683 let languages = self
684 .languages
685 .unwrap_or_else(CamelContext::built_in_languages);
686 let simple_with_resolver: Arc<dyn Language> = Arc::new(
687 camel_language_simple::SimpleLanguage::with_resolver(Arc::new({
688 let languages = Arc::clone(&languages);
689 move |name| {
690 languages
691 .lock()
692 .ok()
693 .and_then(|registry| registry.get(name).cloned())
694 }
695 })),
696 );
697 languages
698 .lock()
699 .expect("mutex poisoned: another thread panicked while holding this lock")
700 .insert("simple".to_string(), simple_with_resolver);
701 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
702 let platform_service = self
703 .platform_service
704 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
705
706 let (controller, actor_join, supervision_join) =
707 if let Some(config) = self.supervision_config {
708 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
709 let mut controller_impl = DefaultRouteController::with_languages(
710 Arc::clone(®istry),
711 Arc::clone(&languages),
712 Arc::clone(&platform_service),
713 );
714 controller_impl.set_crash_notifier(crash_tx);
715 let (controller, actor_join) = spawn_controller_actor(controller_impl);
716 let supervision_join = spawn_supervision_task(
717 controller.clone(),
718 config,
719 Some(Arc::clone(&metrics)),
720 crash_rx,
721 );
722 (controller, actor_join, Some(supervision_join))
723 } else {
724 let controller_impl = DefaultRouteController::with_languages(
725 Arc::clone(®istry),
726 Arc::clone(&languages),
727 Arc::clone(&platform_service),
728 );
729 let (controller, actor_join) = spawn_controller_actor(controller_impl);
730 (controller, actor_join, None)
731 };
732
733 let store = self.runtime_store.unwrap_or_default();
734 let runtime = CamelContext::build_runtime(controller.clone(), store);
735 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
736 controller
737 .try_set_runtime_handle(runtime_handle)
738 .expect("controller actor mailbox should accept initial runtime handle");
739
740 Ok(CamelContext {
741 registry,
742 route_controller: controller,
743 _actor_join: actor_join,
744 supervision_join,
745 runtime,
746 cancel_token: CancellationToken::new(),
747 metrics,
748 platform_service,
749 languages,
750 shutdown_timeout: self.shutdown_timeout,
751 services: Vec::new(),
752 component_configs: HashMap::new(),
753 })
754 }
755}
756
757impl Default for CamelContextBuilder {
758 fn default() -> Self {
759 Self::new()
760 }
761}
762
763#[cfg(test)]
764#[path = "context_tests.rs"]
765mod context_tests;