1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio_util::sync::CancellationToken;
6use tracing::{info, warn};
7
8use camel_api::error_handler::ErrorHandlerConfig;
9use camel_api::{
10 CamelError, HealthReport, HealthStatus, Lifecycle, MetricsCollector, NoOpMetrics,
11 RuntimeCommandBus, RuntimeQueryBus, ServiceHealth, ServiceStatus, SupervisionConfig,
12};
13use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
14use camel_language_api::Language;
15
16use crate::lifecycle::adapters::RuntimeExecutionAdapter;
17use crate::lifecycle::adapters::controller_actor::{
18 RouteControllerHandle, spawn_controller_actor, spawn_supervision_task,
19};
20use crate::lifecycle::adapters::route_controller::{
21 DefaultRouteController, SharedLanguageRegistry,
22};
23use crate::lifecycle::application::route_definition::RouteDefinition;
24use crate::lifecycle::application::runtime_bus::RuntimeBus;
25use crate::lifecycle::domain::LanguageRegistryError;
26use crate::shared::components::domain::Registry;
27use crate::shared::observability::domain::TracerConfig;
28
/// Process-wide counter whose value becomes the trailing sequence number of
/// the command ids produced by `CamelContext::next_context_command_id`.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
30
/// Builder that collects the optional collaborators of a [`CamelContext`]
/// before `build` assembles it.
///
/// Every `Option` field left as `None` is replaced by a default in `build`.
pub struct CamelContextBuilder {
    /// Shared component registry; `build` creates `Registry::new()` when unset.
    registry: Option<Arc<std::sync::Mutex<Registry>>>,
    /// Shared language registry; `build` falls back to the built-in languages.
    languages: Option<SharedLanguageRegistry>,
    /// Metrics collector; `build` falls back to `NoOpMetrics`.
    metrics: Option<Arc<dyn MetricsCollector>>,
    /// When set, `build` also spawns the supervision task with this config.
    supervision_config: Option<SupervisionConfig>,
    /// Store backing the runtime bus; `build` uses the store's `Default` when unset.
    runtime_store: Option<crate::lifecycle::adapters::InMemoryRuntimeStore>,
    /// Shutdown timeout copied into the built context (`new` sets 30 seconds).
    shutdown_timeout: std::time::Duration,
}
39
/// Central context object: owns the component and language registries, the
/// route controller handle, the runtime bus, and the lifecycle services.
pub struct CamelContext {
    /// Component registry shared with the route controller.
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to talk to the spawned controller actor.
    route_controller: RouteControllerHandle,
    /// Join handle returned by `spawn_controller_actor`; stored but never read here.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Join handle of the supervision task, present only when supervision was
    /// configured; taken and aborted on `stop_timeout` / `abort`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used for commands, queries and route registration.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled at the start of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector exposed through `metrics()`.
    metrics: Arc<dyn MetricsCollector>,
    /// Language registry shared with the route controller.
    languages: SharedLanguageRegistry,
    /// Timeout value that `stop` passes to `stop_timeout`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services, started in insertion order and stopped in reverse.
    services: Vec<Box<dyn Lifecycle>>,
    /// Component configuration values keyed by their concrete type.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}
60
/// Cloneable handle bundling the route controller handle and the runtime bus,
/// obtained from `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Handle to the controller actor (compile, swap, in-flight queries).
    controller: RouteControllerHandle,
    /// Runtime bus used for registration, commands and queries.
    runtime: Arc<RuntimeBus>,
}
69
70impl RuntimeExecutionHandle {
    /// Registers `definition` with the runtime bus.
    ///
    /// # Errors
    /// Returns whatever error `register_route` reports.
    pub(crate) async fn add_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<(), CamelError> {
        // Presumably the trait that provides `register_route`; imported locally.
        use crate::lifecycle::ports::RouteRegistrationPort;
        self.runtime.register_route(definition).await
    }
78
    /// Asks the route controller to compile `definition` and returns the
    /// resulting processor.
    ///
    /// # Errors
    /// Returns the controller's error unchanged.
    pub(crate) async fn compile_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<camel_api::BoxProcessor, CamelError> {
        self.controller.compile_route_definition(definition).await
    }
85
    /// Asks the route controller to swap the pipeline of `route_id` for
    /// `pipeline`.
    ///
    /// # Errors
    /// Returns the controller's error unchanged.
    pub(crate) async fn swap_route_pipeline(
        &self,
        route_id: &str,
        pipeline: camel_api::BoxProcessor,
    ) -> Result<(), CamelError> {
        self.controller.swap_pipeline(route_id, pipeline).await
    }
93
    /// Executes `cmd` on the runtime bus and returns its result.
    ///
    /// # Errors
    /// Returns the runtime bus error unchanged.
    pub(crate) async fn execute_runtime_command(
        &self,
        cmd: camel_api::RuntimeCommand,
    ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
        self.runtime.execute(cmd).await
    }
100
101 pub(crate) async fn runtime_route_status(
102 &self,
103 route_id: &str,
104 ) -> Result<Option<String>, CamelError> {
105 match self
106 .runtime
107 .ask(camel_api::RuntimeQuery::GetRouteStatus {
108 route_id: route_id.to_string(),
109 })
110 .await
111 {
112 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
113 Ok(_) => Err(CamelError::RouteError(
114 "unexpected runtime query response for route status".to_string(),
115 )),
116 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
117 Err(err) => Err(err),
118 }
119 }
120
121 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
122 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
123 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
124 Ok(_) => Err(CamelError::RouteError(
125 "unexpected runtime query response for route listing".to_string(),
126 )),
127 Err(err) => Err(err),
128 }
129 }
130
    /// Returns the source hash the route controller reports for `route_id`,
    /// or `None` when the controller has none to report.
    pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
        self.controller.route_source_hash(route_id).await
    }
134
135 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
136 if !self.controller.route_exists(route_id).await? {
137 return Err(CamelError::RouteError(format!(
138 "Route '{}' not found",
139 route_id
140 )));
141 }
142 Ok(self
143 .controller
144 .in_flight_count(route_id)
145 .await?
146 .unwrap_or(0))
147 }
148
    /// Test-only helper: starts `route_id` directly through the route
    /// controller instead of sending a command over the runtime bus.
    #[cfg(test)]
    pub(crate) async fn force_start_route_for_test(
        &self,
        route_id: &str,
    ) -> Result<(), CamelError> {
        self.controller.start_route(route_id).await
    }
156
    /// Test-only helper: number of routes the controller reports, or `0` when
    /// the controller call fails.
    #[cfg(test)]
    pub(crate) async fn controller_route_count_for_test(&self) -> usize {
        self.controller.route_count().await.unwrap_or(0)
    }
161}
162
163impl CamelContext {
164 fn built_in_languages() -> SharedLanguageRegistry {
165 let mut languages: HashMap<String, Arc<dyn Language>> = HashMap::new();
166 languages.insert(
167 "simple".to_string(),
168 Arc::new(camel_language_simple::SimpleLanguage),
169 );
170 #[cfg(feature = "lang-js")]
171 {
172 let js_lang = camel_language_js::JsLanguage::new();
173 languages.insert("js".to_string(), Arc::new(js_lang.clone()));
174 languages.insert("javascript".to_string(), Arc::new(js_lang));
175 }
176 #[cfg(feature = "lang-rhai")]
177 {
178 let rhai_lang = camel_language_rhai::RhaiLanguage::new();
179 languages.insert("rhai".to_string(), Arc::new(rhai_lang));
180 }
181 #[cfg(feature = "lang-jsonpath")]
182 {
183 languages.insert(
184 "jsonpath".to_string(),
185 Arc::new(camel_language_jsonpath::JsonPathLanguage),
186 );
187 }
188 #[cfg(feature = "lang-xpath")]
189 {
190 languages.insert(
191 "xpath".to_string(),
192 Arc::new(camel_language_xpath::XPathLanguage),
193 );
194 }
195 Arc::new(std::sync::Mutex::new(languages))
196 }
197
198 fn build_runtime(
199 controller: RouteControllerHandle,
200 store: crate::lifecycle::adapters::InMemoryRuntimeStore,
201 ) -> Arc<RuntimeBus> {
202 let execution = Arc::new(RuntimeExecutionAdapter::new(controller));
203 Arc::new(
204 RuntimeBus::new(
205 Arc::new(store.clone()),
206 Arc::new(store.clone()),
207 Arc::new(store.clone()),
208 Arc::new(store.clone()),
209 )
210 .with_uow(Arc::new(store))
211 .with_execution(execution),
212 )
213 }
214
    /// Returns a fresh [`CamelContextBuilder`] (same as `CamelContextBuilder::new()`).
    pub fn builder() -> CamelContextBuilder {
        CamelContextBuilder::new()
    }
218
    /// Forwards `config` to the route controller as the error handler.
    ///
    /// Whatever the controller call returns is discarded, so a failure to
    /// apply the configuration is not reported to the caller.
    pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
        let _ = self.route_controller.set_error_handler(config).await;
    }
223
224 pub async fn set_tracing(&mut self, enabled: bool) {
226 let _ = self
227 .route_controller
228 .set_tracer_config(TracerConfig {
229 enabled,
230 ..Default::default()
231 })
232 .await;
233 }
234
235 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
237 let config = if config.metrics_collector.is_none() {
239 TracerConfig {
240 metrics_collector: Some(Arc::clone(&self.metrics)),
241 ..config
242 }
243 } else {
244 config
245 };
246
247 let _ = self.route_controller.set_tracer_config(config).await;
248 }
249
    /// Chainable form of `set_tracing(true)`: enables tracing and returns the
    /// context.
    pub async fn with_tracing(mut self) -> Self {
        self.set_tracing(true).await;
        self
    }
255
    /// Chainable form of `set_tracer_config`: applies `config` and returns the
    /// context.
    pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
        self.set_tracer_config(config).await;
        self
    }
263
264 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
266 if let Some(collector) = service.as_metrics_collector() {
268 self.metrics = collector;
269 }
270
271 self.services.push(Box::new(service));
272 self
273 }
274
    /// Registers `component` in the shared component registry and logs its
    /// scheme.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    pub fn register_component<C: Component + 'static>(&mut self, component: C) {
        info!(scheme = component.scheme(), "Registering component");
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
            .register(Arc::new(component));
    }
283
284 pub fn register_language(
291 &mut self,
292 name: impl Into<String>,
293 lang: Box<dyn Language>,
294 ) -> Result<(), LanguageRegistryError> {
295 let name = name.into();
296 let mut languages = self
297 .languages
298 .lock()
299 .expect("mutex poisoned: another thread panicked while holding this lock");
300 if languages.contains_key(&name) {
301 return Err(LanguageRegistryError::AlreadyRegistered { name });
302 }
303 languages.insert(name, Arc::from(lang));
304 Ok(())
305 }
306
307 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
309 let languages = self
310 .languages
311 .lock()
312 .expect("mutex poisoned: another thread panicked while holding this lock");
313 languages.get(name).cloned()
314 }
315
    /// Logs the route's source URI and id, then registers `definition` with
    /// the runtime bus.
    ///
    /// # Errors
    /// Returns whatever error `register_route` reports.
    pub async fn add_route_definition(
        &self,
        definition: RouteDefinition,
    ) -> Result<(), CamelError> {
        // Presumably the trait that provides `register_route`; imported locally.
        use crate::lifecycle::ports::RouteRegistrationPort;
        info!(
            from = definition.from_uri(),
            route_id = %definition.route_id(),
            "Adding route definition"
        );
        self.runtime.register_route(definition).await
    }
331
    /// Builds a command id of the form `context:{op}:{route_id}:{seq}`, where
    /// `seq` is the next value of the process-wide `CONTEXT_COMMAND_SEQ`
    /// counter.
    fn next_context_command_id(op: &str, route_id: &str) -> String {
        // Only the counter value is needed, so a relaxed increment is used.
        let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
        format!("context:{op}:{route_id}:{seq}")
    }
336
    /// Locks the component registry and returns the guard.
    ///
    /// The registry stays locked for as long as the returned guard is alive.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
    }
343
    /// Creates a [`RuntimeExecutionHandle`] sharing this context's route
    /// controller handle and runtime bus.
    pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
        RuntimeExecutionHandle {
            controller: self.route_controller.clone(),
            runtime: Arc::clone(&self.runtime),
        }
    }
351
    /// Returns a shared handle to the context's metrics collector.
    pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
356
    /// Returns the runtime bus as a shared `RuntimeHandle` trait object.
    pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
        self.runtime.clone()
    }
361
    /// Creates a new `ProducerContext` with this context's runtime handle
    /// attached.
    pub fn producer_context(&self) -> camel_api::ProducerContext {
        camel_api::ProducerContext::new().with_runtime(self.runtime())
    }
366
367 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
369 match self
370 .runtime()
371 .ask(camel_api::RuntimeQuery::GetRouteStatus {
372 route_id: route_id.to_string(),
373 })
374 .await
375 {
376 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
377 Ok(_) => Err(CamelError::RouteError(
378 "unexpected runtime query response for route status".to_string(),
379 )),
380 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
381 Err(err) => Err(err),
382 }
383 }
384
385 pub async fn start(&mut self) -> Result<(), CamelError> {
390 info!("Starting CamelContext");
391
392 for (i, service) in self.services.iter_mut().enumerate() {
394 info!("Starting service: {}", service.name());
395 if let Err(e) = service.start().await {
396 warn!(
398 "Service {} failed to start, rolling back {} services",
399 service.name(),
400 i
401 );
402 for j in (0..i).rev() {
403 if let Err(rollback_err) = self.services[j].stop().await {
404 warn!(
405 "Failed to stop service {} during rollback: {}",
406 self.services[j].name(),
407 rollback_err
408 );
409 }
410 }
411 return Err(e);
412 }
413 }
414
415 let route_ids = self.route_controller.auto_startup_route_ids().await?;
418 for route_id in route_ids {
419 self.runtime
420 .execute(camel_api::RuntimeCommand::StartRoute {
421 route_id: route_id.clone(),
422 command_id: Self::next_context_command_id("start", &route_id),
423 causation_id: None,
424 })
425 .await?;
426 }
427
428 info!("CamelContext started");
429 Ok(())
430 }
431
    /// Stops the context by calling `stop_timeout` with the configured
    /// `shutdown_timeout`.
    ///
    /// # Errors
    /// Returns whatever `stop_timeout` returns.
    pub async fn stop(&mut self) -> Result<(), CamelError> {
        self.stop_timeout(self.shutdown_timeout).await
    }
436
437 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
442 info!("Stopping CamelContext");
443
444 self.cancel_token.cancel();
446 if let Some(join) = self.supervision_join.take() {
447 join.abort();
448 }
449
450 let route_ids = self.route_controller.shutdown_route_ids().await?;
453 for route_id in route_ids {
454 if let Err(err) = self
455 .runtime
456 .execute(camel_api::RuntimeCommand::StopRoute {
457 route_id: route_id.clone(),
458 command_id: Self::next_context_command_id("stop", &route_id),
459 causation_id: None,
460 })
461 .await
462 {
463 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
464 }
465 }
466
467 let mut first_error = None;
470 for service in self.services.iter_mut().rev() {
471 info!("Stopping service: {}", service.name());
472 if let Err(e) = service.stop().await {
473 warn!("Service {} failed to stop: {}", service.name(), e);
474 if first_error.is_none() {
475 first_error = Some(e);
476 }
477 }
478 }
479
480 info!("CamelContext stopped");
481
482 if let Some(e) = first_error {
483 Err(e)
484 } else {
485 Ok(())
486 }
487 }
488
    /// Returns the timeout value that `stop` passes to `stop_timeout`.
    pub fn shutdown_timeout(&self) -> std::time::Duration {
        self.shutdown_timeout
    }
493
    /// Replaces the timeout value that `stop` passes to `stop_timeout`.
    pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
        self.shutdown_timeout = timeout;
    }
498
499 pub async fn abort(&mut self) {
501 self.cancel_token.cancel();
502 if let Some(join) = self.supervision_join.take() {
503 join.abort();
504 }
505 let route_ids = self
506 .route_controller
507 .shutdown_route_ids()
508 .await
509 .unwrap_or_default();
510 for route_id in route_ids {
511 let _ = self
512 .runtime
513 .execute(camel_api::RuntimeCommand::StopRoute {
514 route_id: route_id.clone(),
515 command_id: Self::next_context_command_id("abort-stop", &route_id),
516 causation_id: None,
517 })
518 .await;
519 }
520 }
521
522 pub fn health_check(&self) -> HealthReport {
524 let services: Vec<ServiceHealth> = self
525 .services
526 .iter()
527 .map(|s| ServiceHealth {
528 name: s.name().to_string(),
529 status: s.status(),
530 })
531 .collect();
532
533 let status = if services.iter().all(|s| s.status == ServiceStatus::Started) {
534 HealthStatus::Healthy
535 } else {
536 HealthStatus::Unhealthy
537 };
538
539 HealthReport {
540 status,
541 services,
542 ..Default::default()
543 }
544 }
545
    /// Stores `config` keyed by its concrete type `T`, replacing any value
    /// previously stored for the same type.
    pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
        self.component_configs
            .insert(TypeId::of::<T>(), Box::new(config));
    }
551
552 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
554 self.component_configs
555 .get(&TypeId::of::<T>())
556 .and_then(|b| b.downcast_ref::<T>())
557 }
558}
559
/// Lets code that only knows the `ComponentRegistrar` trait add components to
/// this context's registry.
impl ComponentRegistrar for CamelContext {
    /// Registers an already type-erased component in the shared registry.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
        self.registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
            .register(component);
    }
}
568
/// Read-only lookups exposed through the `ComponentContext` trait.
///
/// Unlike the inherent accessors, which panic on a poisoned mutex, the two
/// lookups here return `None` when the lock cannot be acquired.
impl ComponentContext for CamelContext {
    /// Looks up the component registered for `scheme`; `None` when there is no
    /// such component or the registry mutex is poisoned.
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
        self.registry.lock().ok()?.get(scheme)
    }

    /// Looks up the language registered under `name`; `None` when there is no
    /// such language or the language registry mutex is poisoned.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
        self.languages.lock().ok()?.get(name).cloned()
    }

    /// Returns a shared handle to the context's metrics collector.
    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        Arc::clone(&self.metrics)
    }
}
582
583impl CamelContextBuilder {
    /// Creates a builder with every optional part unset and a shutdown
    /// timeout of 30 seconds.
    pub fn new() -> Self {
        Self {
            registry: None,
            languages: None,
            metrics: None,
            supervision_config: None,
            runtime_store: None,
            shutdown_timeout: std::time::Duration::from_secs(30),
        }
    }
594
    /// Uses `registry` as the component registry of the built context.
    pub fn registry(mut self, registry: Arc<std::sync::Mutex<Registry>>) -> Self {
        self.registry = Some(registry);
        self
    }
599
    /// Uses `languages` as the language registry instead of the built-in set.
    pub fn languages(mut self, languages: SharedLanguageRegistry) -> Self {
        self.languages = Some(languages);
        self
    }
604
    /// Uses `metrics` as the metrics collector instead of `NoOpMetrics`.
    pub fn metrics(mut self, metrics: Arc<dyn MetricsCollector>) -> Self {
        self.metrics = Some(metrics);
        self
    }
609
    /// Enables supervision: `build` will spawn the supervision task with
    /// `config`.
    pub fn supervision(mut self, config: SupervisionConfig) -> Self {
        self.supervision_config = Some(config);
        self
    }
614
    /// Uses `store` as the runtime store instead of a default-constructed one.
    pub fn runtime_store(
        mut self,
        store: crate::lifecycle::adapters::InMemoryRuntimeStore,
    ) -> Self {
        self.runtime_store = Some(store);
        self
    }
622
    /// Sets the shutdown timeout stored in the built context (30 seconds if
    /// never called).
    pub fn shutdown_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.shutdown_timeout = timeout;
        self
    }
627
628 pub async fn build(self) -> Result<CamelContext, CamelError> {
629 let registry = self
630 .registry
631 .unwrap_or_else(|| Arc::new(std::sync::Mutex::new(Registry::new())));
632 let languages = self
633 .languages
634 .unwrap_or_else(CamelContext::built_in_languages);
635 let metrics = self.metrics.unwrap_or_else(|| Arc::new(NoOpMetrics));
636
637 let (controller, actor_join, supervision_join) =
638 if let Some(config) = self.supervision_config {
639 let (crash_tx, crash_rx) = tokio::sync::mpsc::channel(64);
640 let mut controller_impl = DefaultRouteController::with_languages(
641 Arc::clone(®istry),
642 Arc::clone(&languages),
643 );
644 controller_impl.set_crash_notifier(crash_tx);
645 let (controller, actor_join) = spawn_controller_actor(controller_impl);
646 let supervision_join = spawn_supervision_task(
647 controller.clone(),
648 config,
649 Some(Arc::clone(&metrics)),
650 crash_rx,
651 );
652 (controller, actor_join, Some(supervision_join))
653 } else {
654 let controller_impl = DefaultRouteController::with_languages(
655 Arc::clone(®istry),
656 Arc::clone(&languages),
657 );
658 let (controller, actor_join) = spawn_controller_actor(controller_impl);
659 (controller, actor_join, None)
660 };
661
662 let store = self.runtime_store.unwrap_or_default();
663 let runtime = CamelContext::build_runtime(controller.clone(), store);
664 let runtime_handle: Arc<dyn camel_api::RuntimeHandle> = runtime.clone();
665 controller
666 .try_set_runtime_handle(runtime_handle)
667 .expect("controller actor mailbox should accept initial runtime handle");
668
669 Ok(CamelContext {
670 registry,
671 route_controller: controller,
672 _actor_join: actor_join,
673 supervision_join,
674 runtime,
675 cancel_token: CancellationToken::new(),
676 metrics,
677 languages,
678 shutdown_timeout: self.shutdown_timeout,
679 services: Vec::new(),
680 component_configs: HashMap::new(),
681 })
682 }
683}
684
/// `Default` is the same as [`CamelContextBuilder::new`].
impl Default for CamelContextBuilder {
    fn default() -> Self {
        Self::new()
    }
}
690
// Unit tests live in the sibling file `context_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;