1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::{
10 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
11 NoopPlatformService, PlatformIdentity, PlatformService, ReadinessGate, RuntimeCommandBus,
12 RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
13};
14use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
15use camel_language_api::Language;
16
17use crate::lifecycle::adapters::RuntimeExecutionAdapter;
18use crate::lifecycle::adapters::controller_actor::{
19 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
20};
21use crate::lifecycle::adapters::route_controller::{
22 DefaultRouteController, SharedLanguageRegistry,
23};
24use crate::lifecycle::application::route_definition::RouteDefinition;
25use crate::lifecycle::application::runtime_bus::RuntimeBus;
26use crate::lifecycle::domain::LanguageRegistryError;
27use crate::shared::components::domain::Registry;
28use crate::shared::observability::domain::TracerConfig;
29
/// Process-wide counter whose value `CamelContext::next_context_command_id`
/// appends to each generated command id.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
31
/// Builder for [`CamelContext`].
///
/// Every `Option` field left as `None` is replaced by a default inside
/// `CamelContextBuilder::build`.
pub struct CamelContextBuilder {
    /// Shared component registry; `build` creates a fresh `Registry` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Language registry; `build` uses `CamelContext::built_in_languages` when unset.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build` falls back to `NoOpMetrics`.
    metrics: Option<Arc<dyn MetricsCollector>>,
    /// Platform service; `build` falls back to `NoopPlatformService::default()`.
    platform_service: Option<Arc<dyn PlatformService>>,
    /// When set, `build` installs a crash notifier and spawns a supervision task.
    supervision_config: Option<SupervisionConfig>,
    /// Runtime store; `build` falls back to the store's `Default` value.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Copied into the built context; `new` initialises it to 30 seconds.
    shutdown_timeout: std::time::Duration,
}
42
/// Central context tying together the component registry, the route
/// controller actor, the runtime bus and the registered lifecycle services.
pub struct CamelContext {
    /// Component registry, shared with the route controller created in `build`.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to talk to the route controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle returned by `spawn_controller_actor`; stored but not read
    /// by the code visible in this file.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task (only when supervision was
    /// configured); taken and aborted by `stop_timeout` and `abort`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used for route registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled at the beginning of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector; may be replaced by `with_lifecycle`.
    metrics: Arc<dyn MetricsCollector>,
    /// Platform service backing readiness, identity and leadership accessors.
    platform_service: Arc<dyn PlatformService>,
    /// Language registry, shared with the route controller created in `build`.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop` forwards to `stop_timeout`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services: started in insertion order by `start`, stopped in
    /// reverse order by `stop_timeout`.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configuration values keyed by the `TypeId` of their type.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
65
/// Cloneable handle bundling the route controller handle and the runtime bus.
///
/// Created by `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Handle to the route controller actor.
    controller: RouteControllerHandle,
    /// Runtime bus shared with the owning context.
    runtime: Arc<RuntimeBus>,
}
74
75impl RuntimeExecutionHandle {
76 pub(crate) async fn add_route_definition(
77 &self,
78 definition: RouteDefinition,
79 ) -> Result<(), CamelError> {
80 use crate::lifecycle::ports::RouteRegistrationPort;
81 self.runtime.register_route(definition).await
82 }
83
84 pub(crate) async fn compile_route_definition(
85 &self,
86 definition: RouteDefinition,
87 ) -> Result<camel_api::BoxProcessor, CamelError> {
88 self.controller.compile_route_definition(definition).await
89 }
90
91 pub(crate) async fn swap_route_pipeline(
92 &self,
93 route_id: &str,
94 pipeline: camel_api::BoxProcessor,
95 ) -> Result<(), CamelError> {
96 self.controller.swap_pipeline(route_id, pipeline).await
97 }
98
99 pub(crate) async fn execute_runtime_command(
100 &self,
101 cmd: camel_api::RuntimeCommand,
102 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
103 self.runtime.execute(cmd).await
104 }
105
106 pub(crate) async fn runtime_route_status(
107 &self,
108 route_id: &str,
109 ) -> Result<Option<String>, CamelError> {
110 match self
111 .runtime
112 .ask(camel_api::RuntimeQuery::GetRouteStatus {
113 route_id: route_id.to_string(),
114 })
115 .await
116 {
117 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
118 Ok(_) => Err(CamelError::RouteError(
119 "unexpected runtime query response for route status".to_string(),
120 )),
121 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
122 Err(err) => Err(err),
123 }
124 }
125
126 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
127 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
128 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
129 Ok(_) => Err(CamelError::RouteError(
130 "unexpected runtime query response for route listing".to_string(),
131 )),
132 Err(err) => Err(err),
133 }
134 }
135
136 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
137 self.controller.route_source_hash(route_id).await
138 }
139
140 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
141 if !self.controller.route_exists(route_id).await? {
142 return Err(CamelError::RouteError(format!(
143 "Route '{}' not found",
144 route_id
145 )));
146 }
147 Ok(self
148 .controller
149 .in_flight_count(route_id)
150 .await?
151 .unwrap_or(0))
152 }
153
154 #[cfg(test)]
155 pub(crate) async fn force_start_route_for_test(
156 &self,
157 route_id: &str,
158 ) -> Result<(), CamelError> {
159 self.controller.start_route(route_id).await
160 }
161
162 #[cfg(test)]
163 pub(crate) async fn controller_route_count_for_test(&self) -> usize {
164 self.controller.route_count().await.unwrap_or(0)
165 }
166}
167
168impl CamelContext {
169 fn built_in_languages() -> SharedLanguageRegistry {
170 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
171 languages.insert(
172 "simple".to_string(),
173 Arc::new(camel_language_simple::SimpleLanguage),
174 );
175 #[cfg(feature = "lang-js")]
176 {
177 let js_lang = camel_language_js::JsLanguage::new();
178 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
179 languages.insert("javascript".to_string(), Arc::new(js_lang));
180 }
181 #[cfg(feature = "lang-rhai")]
182 {
183 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
184 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
185 }
186 #[cfg(feature = "lang-jsonpath")]
187 {
188 languages.insert(
189 "jsonpath".to_string(),
190 Arc::new(camel_language_jsonpath::JsonPathLanguage),
191 );
192 }
193 #[cfg(feature = "lang-xpath")]
194 {
195 languages.insert(
196 "xpath".to_string(),
197 Arc::new(camel_language_xpath::XPathLanguage),
198 );
199 }
200 Arc::new(std::sync::Mutex::new(languages))
201 }
202
203 fn build_runtime(
204 controller: RouteControllerHandle,
205 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
206 ) -> Arc<RuntimeBus> {
207 let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
208 Arc::new(
209 RuntimeBus::new(
210 Arc::new(store.clone()),
211 Arc::new(store.clone()),
212 Arc::new(store.clone()),
213 Arc::new(store.clone()),
214 )
215 .with_uow(Arc::new(store))
216 .with_execution(execution),
217 )
218 }
219
220 pub fn builder() -> CamelContextBuilder {
221 CamelContextBuilder::new()
222 }
223
224 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
226 let _ = self.route_controller.set_error_handler(config).await;
227 }
228
229 pub async fn set_tracing(&mut self, enabled: bool) {
231 let _ = self
232 .route_controller
233 .set_tracer_config(TracerConfig {
234 enabled,
235 ..Default::default()
236 })
237 .await;
238 }
239
240 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
242 let config = if config.metrics_collector.is_none() {
244 TracerConfig {
245 metrics_collector: Some(Arc::clone(&self.metrics)),
246 ..config
247 }
248 } else {
249 config
250 };
251
252 let _ = self.route_controller.set_tracer_config(config).await;
253 }
254
255 pub async fn with_tracing(mut self) -> Self {
257 self.set_tracing(true).await;
258 self
259 }
260
261 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
265 self.set_tracer_config(config).await;
266 self
267 }
268
269 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
271 if let Some(collector) = service.as_metrics_collector() {
273 self.metrics = collector;
274 }
275
276 self.services.push(Box::new(service));
277 self
278 }
279
280 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
282 info!(scheme = component.scheme(), "Registering component");
283 self.registry
284 .lock()
285 .expect("mutex poisoned: another thread panicked while holding this lock")
286 .register(Arc::new(component));
287 }
288
289 pub fn register_language(
296 &mut self,
297 name: impl Into<String>,
298 lang: Box<dyn Language>,
299 ) -> Result<(), LanguageRegistryError> {
300 let name = name.into();
301 let mut languages = self
302 .languages
303 .lock()
304 .expect("mutex poisoned: another thread panicked while holding this lock");
305 if languages.contains_key(&name) {
306 return Err(LanguageRegistryError::AlreadyRegistered { name });
307 }
308 languages.insert(name, Arc::from(lang));
309 Ok(())
310 }
311
312 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
314 let languages = self
315 .languages
316 .lock()
317 .expect("mutex poisoned: another thread panicked while holding this lock");
318 languages.get(name).cloned()
319 }
320
321 pub async fn add_route_definition(
325 &self,
326 definition: RouteDefinition,
327 ) -> Result<(), CamelError> {
328 use crate::lifecycle::ports::RouteRegistrationPort;
329 info!(
330 from = definition.from_uri(),
331 route_id = %definition.route_id(),
332 "Adding route definition"
333 );
334 self.runtime.register_route(definition).await
335 }
336
337 fn next_context_command_id(op: &str, route_id: &str) -> String {
338 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
339 format!("context:{op}:{route_id}:{seq}")
340 }
341
342 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
344 self.registry
345 .lock()
346 .expect("mutex poisoned: another thread panicked while holding this lock")
347 }
348
349 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
351 RuntimeExecutionHandle {
352 controller: self.route_controller.clone(),
353 runtime: Arc::clone(&self.runtime),
354 }
355 }
356
357 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
359 Arc::clone(&self.metrics)
360 }
361
362 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
364 Arc::clone(&self.platform_service)
365 }
366
367 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
369 self.platform_service.readiness_gate()
370 }
371
372 pub fn platform_identity(&self) -> PlatformIdentity {
374 self.platform_service.identity()
375 }
376
377 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
379 self.platform_service.leadership()
380 }
381
382 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
384 self.runtime.clone()
385 }
386
387 pub fn producer_context(&self) -> camel_api::ProducerContext {
389 camel_api::ProducerContext::new().with_runtime(self.runtime())
390 }
391
392 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
394 match self
395 .runtime()
396 .ask(camel_api::RuntimeQuery::GetRouteStatus {
397 route_id: route_id.to_string(),
398 })
399 .await
400 {
401 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
402 Ok(_) => Err(CamelError::RouteError(
403 "unexpected runtime query response for route status".to_string(),
404 )),
405 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
406 Err(err) => Err(err),
407 }
408 }
409
410 pub async fn start(&mut self) -> Result<(), CamelError> {
415 info!("Starting CamelContext");
416
417 for (i, service) in self.services.iter_mut().enumerate() {
419 info!("Starting service: {}", service.name());
420 if let Err(e) = service.start().await {
421 warn!(
423 "Service {} failed to start, rolling back {} services",
424 service.name(),
425 i
426 );
427 for j in (0..i).rev() {
428 if let Err(rollback_err) = self.services[j].stop().await {
429 warn!(
430 "Failed to stop service {} during rollback: {}",
431 self.services[j].name(),
432 rollback_err
433 );
434 }
435 }
436 return Err(e);
437 }
438 }
439
440 let route_ids = self.route_controller.auto_startup_route_ids().await?;
443 for route_id in route_ids {
444 self.runtime
445 .execute(camel_api::RuntimeCommand::StartRoute {
446 route_id: route_id.clone(),
447 command_id: Self::next_context_command_id("start", &route_id),
448 causation_id: None,
449 })
450 .await?;
451 }
452
453 info!("CamelContext started");
454 Ok(())
455 }
456
457 pub async fn stop(&mut self) -> Result<(), CamelError> {
459 self.stop_timeout(self.shutdown_timeout).await
460 }
461
462 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
467 info!("Stopping CamelContext");
468
469 self.cancel_token.cancel();
471 if let Some(join) = self.supervision_join.take() {
472 join.abort();
473 }
474
475 let route_ids = self.route_controller.shutdown_route_ids().await?;
478 for route_id in route_ids {
479 if let Err(err) = self
480 .runtime
481 .execute(camel_api::RuntimeCommand::StopRoute {
482 route_id: route_id.clone(),
483 command_id: Self::next_context_command_id("stop", &route_id),
484 causation_id: None,
485 })
486 .await
487 {
488 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
489 }
490 }
491
492 let mut first_error = None;
495 for service in self.services.iter_mut().rev() {
496 info!("Stopping service: {}", service.name());
497 if let Err(e) = service.stop().await {
498 warn!("Service {} failed to stop: {}", service.name(), e);
499 if first_error.is_none() {
500 first_error = Some(e);
501 }
502 }
503 }
504
505 info!("CamelContext stopped");
506
507 if let Some(e) = first_error {
508 Err(e)
509 } else {
510 Ok(())
511 }
512 }
513
514 pub fn shutdown_timeout(&self) -> std::time::Duration {
516 self.shutdown_timeout
517 }
518
519 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
521 self.shutdown_timeout = timeout;
522 }
523
524 pub async fn abort(&mut self) {
526 self.cancel_token.cancel();
527 if let Some(join) = self.supervision_join.take() {
528 join.abort();
529 }
530 let route_ids = self
531 .route_controller
532 .shutdown_route_ids()
533 .await
534 .unwrap_or_default();
535 for route_id in route_ids {
536 let _ = self
537 .runtime
538 .execute(camel_api::RuntimeCommand::StopRoute {
539 route_id: route_id.clone(),
540 command_id: Self::next_context_command_id("abort-stop", &route_id),
541 causation_id: None,
542 })
543 .await;
544 }
545 }
546
547 pub fn health_check(&self) -> HealthReport {
549 let services: Vec<ServiceHealth> = self
550 .services
551 .iter()
552 .map(|s| ServiceHealth {
553 name: s.name().to_string(),
554 status: s.status(),
555 })
556 .collect();
557
558 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
559 HealthStatus::Healthy
560 } else {
561 HealthStatus::Unhealthy
562 };
563
564 HealthReport {
565 status,
566 services,
567 ..Default::default()
568 }
569 }
570
571 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
573 self.component_configs
574 .insert(TypeId::of::<T>(), Box::new(config));
575 }
576
577 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
579 self.component_configs
580 .get(&TypeId::of::<T>())
581 .and_then(|b| b.downcast_ref::<T>())
582 }
583}
584
585impl ComponentRegistrar for CamelContext {
586 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
587 self.registry
588 .lock()
589 .expect("mutex poisoned: another thread panicked while holding this lock")
590 .register(component);
591 }
592}
593
594impl ComponentContext for CamelContext {
595 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
596 self.registry.lock().ok()?.get(scheme)
597 }
598
599 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
600 self.languages.lock().ok()?.get(name).cloned()
601 }
602
603 fn metrics(&self) -> Arc<dyn MetricsCollector> {
604 Arc::clone(&self.metrics)
605 }
606
607 fn platform_service(&self) -> Arc<dyn PlatformService> {
608 Arc::clone(&self.platform_service)
609 }
610}
611
612impl CamelContextBuilder {
613 pub fn new() -> Self {
614 Self {
615 registry: None,
616 languages: None,
617 metrics: None,
618 platform_service: None,
619 supervision_config: None,
620 runtime_store: None,
621 shutdown_timeout: std::time::Duration::from_secs(30),
622 }
623 }
624
625 pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
626 self.registry = Some(registry);
627 self
628 }
629
630 pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
631 self.languages = Some(languages);
632 self
633 }
634
635 pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
636 self.metrics = Some(metrics);
637 self
638 }
639
640 pub fn platform_service(mut self, platform_service: Arc<dyn PlatformService>) -> Self {
642 self.platform_service = Some(platform_service);
643 self
644 }
645
646 pub fn supervision(mut self, config: SupervisionConfig) -> Self {
647 self.supervision_config = Some(config);
648 self
649 }
650
651 pub fn runtime_store(
652 mut self,
653 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
654 ) -> Self {
655 self.runtime_store = Some(store);
656 self
657 }
658
659 pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
660 self.shutdown_timeout = timeout;
661 self
662 }
663
664 pub async fn build(self) -> Result<CamelContext, CamelError> {
665 let registry = self
666 .registry
667 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
668 let languages = self
669 .languages
670 .unwrap_or_else(CamelContext::built_in_languages);
671 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
672 let platform_service = self
673 .platform_service
674 .unwrap_or_else(|| Arc::new(NoopPlatformService::default()));
675
676 let (controller, actor_join, supervision_join) =
677 if let Some(config) = self.supervision_config {
678 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
679 let mut controller_impl = DefaultRouteController::with_languages(
680 Arc::clone(®istry),
681 Arc::clone(&languages),
682 Arc::clone(&platform_service),
683 );
684 controller_impl.set_crash_notifier(crash_tx);
685 let (controller, actor_join) = spawn_controller_actor(controller_impl);
686 let supervision_join = spawn_supervision_task(
687 controller.clone(),
688 config,
689 Some(Arc::clone(&metrics)),
690 crash_rx,
691 );
692 (controller, actor_join, Some(supervision_join))
693 } else {
694 let controller_impl = DefaultRouteController::with_languages(
695 Arc::clone(®istry),
696 Arc::clone(&languages),
697 Arc::clone(&platform_service),
698 );
699 let (controller, actor_join) = spawn_controller_actor(controller_impl);
700 (controller, actor_join, None)
701 };
702
703 let store = self.runtime_store.unwrap_or_default();
704 let runtime = CamelContext::build_runtime(controller.clone(), store);
705 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
706 controller
707 .try_set_runtime_handle(runtime_handle)
708 .expect("controller actor mailbox should accept initial runtime handle");
709
710 Ok(CamelContext {
711 registry,
712 route_controller: controller,
713 _actor_join: actor_join,
714 supervision_join,
715 runtime,
716 cancel_token: CancellationToken::new(),
717 metrics,
718 platform_service,
719 languages,
720 shutdown_timeout: self.shutdown_timeout,
721 services: Vec::new(),
722 component_configs: HashMap::new(),
723 })
724 }
725}
726
727impl Default for CamelContextBuilder {
728 fn default() -> Self {
729 Self::new()
730 }
731}
732
// Test-only module; its source is loaded from `context_tests.rs` via `#[path]`.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;