1use std::any::{Any, TypeId};
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use tokio::time::timeout;
6use tokio_util::sync::CancellationToken;
7use tracing::{info, warn};
8
9use camel_api::error_handler::ErrorHandlerConfig;
10use camel_api::{
11 CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, PlatformIdentity,
12 PlatformService, ReadinessGate, RouteTemplateSpec, RuntimeCommandBus, RuntimeQueryBus,
13 TemplateInstanceRecord,
14};
15use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
16use camel_language_api::Language;
17
18use crate::health_registry::HealthCheckRegistry;
19use crate::lifecycle::adapters::controller_actor::RouteControllerHandle;
20use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
21use crate::lifecycle::application::route_definition::RouteDefinition;
22use crate::lifecycle::application::runtime_bus::RuntimeBus;
23use crate::lifecycle::domain::LanguageRegistryError;
24use crate::shared::components::domain::Registry;
25use crate::shared::observability::domain::TracerConfig;
26use crate::template::TemplateRegistry;
27
/// Process-wide counter whose value is appended to every command id built by
/// `CamelContext::next_context_command_id`, so successive ids differ.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
29
30pub use crate::context_builder::CamelContextBuilder;
31
/// Central object of the runtime: owns the component and language registries,
/// the lifecycle services, and the handles used to drive routes.
pub struct CamelContext {
    /// Shared, mutex-guarded component registry (components are looked up by scheme).
    registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle used to configure the route controller and to ask it which
    /// routes to start or shut down.
    route_controller: RouteControllerHandle,
    /// Join handle of a spawned task; never read in this file.
    /// NOTE(review): presumably the route controller actor task — confirm in the builder.
    _actor_join: tokio::task::JoinHandle<()>,
    /// Optional task handle that is aborted by `stop_timeout` and `abort`.
    supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus used to register routes and to execute commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Token cancelled at the beginning of `stop_timeout` and `abort`.
    cancel_token: CancellationToken,
    /// Metrics collector; replaced by `with_lifecycle` when a service provides one.
    metrics: Arc<dyn MetricsCollector>,
    /// Source of the readiness gate, platform identity and leadership service.
    platform_service: Arc<dyn PlatformService>,
    /// Shared language registry, keyed by language name.
    languages: SharedLanguageRegistry,
    /// Timeout that `stop` passes to `stop_timeout`.
    shutdown_timeout: std::time::Duration,
    /// Lifecycle services; started in insertion order and stopped in reverse.
    services: Vec<Box<dyn Lifecycle>>,
    /// Registry of health checks, also exposed to components.
    health_registry: Arc<HealthCheckRegistry>,
    /// Component configuration values, keyed by the `TypeId` of the stored type.
    component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Optional function invoker, set by `with_lifecycle` when a service provides one.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Registry of route templates and their recorded instances.
    template_registry: Arc<TemplateRegistry>,
}
57
/// Bundle of already-constructed parts consumed by `CamelContext::from_parts`.
///
/// Every field is moved into the `CamelContext` field of the same name.
pub(crate) struct FromParts {
    /// Shared, mutex-guarded component registry.
    pub(crate) registry: Arc<std::sync::Mutex<Registry>>,
    /// Handle to the route controller.
    pub(crate) route_controller: RouteControllerHandle,
    /// Join handle of a spawned task that the context keeps but never reads.
    pub(crate) _actor_join: tokio::task::JoinHandle<()>,
    /// Optional task handle that the context aborts on shutdown.
    pub(crate) supervision_join: Option<tokio::task::JoinHandle<()>>,
    /// Runtime bus for route registration, commands and queries.
    pub(crate) runtime: Arc<RuntimeBus>,
    /// Token the context cancels on shutdown.
    pub(crate) cancel_token: CancellationToken,
    /// Initial metrics collector.
    pub(crate) metrics: Arc<dyn MetricsCollector>,
    /// Platform service backing readiness, identity and leadership accessors.
    pub(crate) platform_service: Arc<dyn PlatformService>,
    /// Shared language registry.
    pub(crate) languages: SharedLanguageRegistry,
    /// Timeout used by `CamelContext::stop`.
    pub(crate) shutdown_timeout: std::time::Duration,
    /// Lifecycle services managed by the context.
    pub(crate) services: Vec<Box<dyn Lifecycle>>,
    /// Health check registry.
    pub(crate) health_registry: Arc<HealthCheckRegistry>,
    /// Component configuration values keyed by `TypeId`.
    pub(crate) component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
    /// Optional function invoker.
    pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
    /// Route template registry.
    pub(crate) template_registry: Arc<TemplateRegistry>,
}
77
78impl CamelContext {
79 pub(crate) fn from_parts(parts: FromParts) -> Self {
80 Self {
81 registry: parts.registry,
82 route_controller: parts.route_controller,
83 _actor_join: parts._actor_join,
84 supervision_join: parts.supervision_join,
85 runtime: parts.runtime,
86 cancel_token: parts.cancel_token,
87 metrics: parts.metrics,
88 platform_service: parts.platform_service,
89 languages: parts.languages,
90 shutdown_timeout: parts.shutdown_timeout,
91 services: parts.services,
92 health_registry: parts.health_registry,
93 component_configs: parts.component_configs,
94 function_invoker: parts.function_invoker,
95 template_registry: parts.template_registry,
96 }
97 }
98}
99
/// Cloneable handle bundling the route controller handle, the runtime bus and
/// the optional function invoker of a `CamelContext`.
///
/// Created by `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
    /// Handle used for compile/prepare/insert/remove operations on routes.
    controller: RouteControllerHandle,
    /// Runtime bus used for registration, commands and queries.
    runtime: Arc<RuntimeBus>,
    /// Function invoker captured from the context when the handle was created.
    function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
109
110impl RuntimeExecutionHandle {
111 pub(crate) async fn add_route_definition(
112 &self,
113 definition: RouteDefinition,
114 ) -> Result<(), CamelError> {
115 use crate::lifecycle::ports::RouteRegistrationPort;
116 self.runtime
117 .register_route(definition)
118 .await
119 .map_err(Into::into)
120 }
121
122 pub(crate) async fn compile_route_definition(
123 &self,
124 definition: RouteDefinition,
125 ) -> Result<camel_api::BoxProcessor, CamelError> {
126 self.controller.compile_route_definition(definition).await
127 }
128
129 pub(crate) async fn compile_route_definition_with_generation(
130 &self,
131 definition: RouteDefinition,
132 generation: u64,
133 ) -> Result<camel_api::BoxProcessor, CamelError> {
134 self.controller
135 .compile_route_definition_with_generation(definition, generation)
136 .await
137 }
138
139 pub(crate) async fn prepare_route_definition_with_generation(
140 &self,
141 definition: RouteDefinition,
142 generation: u64,
143 ) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
144 self.controller
145 .prepare_route_definition_with_generation(definition, generation)
146 .await
147 }
148
149 pub(crate) async fn insert_prepared_route(
150 &self,
151 prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
152 ) -> Result<(), CamelError> {
153 self.controller.insert_prepared_route(prepared).await
154 }
155
156 pub(crate) async fn remove_route_preserving_functions(
157 &self,
158 route_id: String,
159 ) -> Result<(), CamelError> {
160 self.controller
161 .remove_route_preserving_functions(route_id)
162 .await
163 }
164
165 pub(crate) async fn register_route_aggregate(
166 &self,
167 route_id: String,
168 ) -> Result<(), CamelError> {
169 self.runtime.register_aggregate_only(route_id).await
170 }
171
172 pub(crate) async fn swap_route_pipeline(
173 &self,
174 route_id: &str,
175 pipeline: camel_api::BoxProcessor,
176 ) -> Result<(), CamelError> {
177 self.controller.swap_pipeline(route_id, pipeline).await
178 }
179
180 pub(crate) async fn execute_runtime_command(
181 &self,
182 cmd: camel_api::RuntimeCommand,
183 ) -> Result<camel_api::RuntimeCommandResult, CamelError> {
184 self.runtime.execute(cmd).await
185 }
186
187 pub(crate) async fn runtime_route_status(
188 &self,
189 route_id: &str,
190 ) -> Result<Option<String>, CamelError> {
191 match self
192 .runtime
193 .ask(camel_api::RuntimeQuery::GetRouteStatus {
194 route_id: route_id.to_string(),
195 })
196 .await
197 {
198 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
199 Ok(_) => Err(CamelError::RouteError(
200 "unexpected runtime query response for route status".to_string(),
201 )),
202 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
203 Err(err) => Err(err),
204 }
205 }
206
207 pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
208 match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
209 Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
210 Ok(_) => Err(CamelError::RouteError(
211 "unexpected runtime query response for route listing".to_string(),
212 )),
213 Err(err) => Err(err),
214 }
215 }
216
217 pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
218 self.controller.route_source_hash(route_id).await
219 }
220
221 pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
222 if !self.controller.route_exists(route_id).await? {
223 return Err(CamelError::RouteError(format!(
224 "Route '{}' not found",
225 route_id
226 )));
227 }
228 Ok(self
229 .controller
230 .in_flight_count(route_id)
231 .await?
232 .unwrap_or(0))
233 }
234
235 pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
236 self.function_invoker.clone()
237 }
238
239 #[cfg(test)]
240 pub(crate) async fn force_start_route_for_test(
241 &self,
242 route_id: &str,
243 ) -> Result<(), CamelError> {
244 self.controller.start_route(route_id).await
245 }
246
247 pub async fn controller_route_count_for_test(&self) -> usize {
248 self.controller.route_count().await.unwrap_or(0)
249 }
250}
251
252impl CamelContext {
253 pub fn builder() -> CamelContextBuilder {
254 CamelContextBuilder::new()
255 }
256
257 pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
259 let _ = self.route_controller.set_error_handler(config).await;
260 }
261
262 pub async fn set_tracing(&mut self, enabled: bool) {
264 let _ = self
265 .route_controller
266 .set_tracer_config(TracerConfig {
267 enabled,
268 ..Default::default()
269 })
270 .await;
271 }
272
273 pub async fn set_tracer_config(&mut self, config: TracerConfig) {
275 let config = if config.metrics_collector.is_none() {
277 TracerConfig {
278 metrics_collector: Some(Arc::clone(&self.metrics)),
279 ..config
280 }
281 } else {
282 config
283 };
284
285 let _ = self.route_controller.set_tracer_config(config).await;
286 }
287
288 pub async fn with_tracing(mut self) -> Self {
290 self.set_tracing(true).await;
291 self
292 }
293
294 pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
298 self.set_tracer_config(config).await;
299 self
300 }
301
302 pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
311 if let Some(collector) = service.as_metrics_collector() {
312 self.metrics = collector;
313 }
314 if let Some(invoker) = service.as_function_invoker() {
315 self.function_invoker = Some(invoker.clone());
316 if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
317 tracing::warn!("Failed to propagate function invoker to route controller: {e}");
318 }
319 }
320
321 self.services.push(Box::new(service));
322 self
323 }
324
325 pub fn register_component<C: Component + 'static>(&mut self, component: C) {
327 info!(scheme = component.scheme(), "Registering component");
328 self.registry
329 .lock()
330 .expect("mutex poisoned: another thread panicked while holding this lock") .register(Arc::new(component));
332 }
333
334 pub fn register_language(
341 &mut self,
342 name: impl Into<String>,
343 lang: Box<dyn Language>,
344 ) -> Result<(), LanguageRegistryError> {
345 let name = name.into();
346 let mut languages = self
347 .languages
348 .lock()
349 .expect("mutex poisoned: another thread panicked while holding this lock"); if languages.contains_key(&name) {
351 return Err(LanguageRegistryError::AlreadyRegistered { name });
352 }
353 languages.insert(name, Arc::from(lang));
354 Ok(())
355 }
356
357 pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
359 let languages = self
360 .languages
361 .lock()
362 .expect("mutex poisoned: another thread panicked while holding this lock"); languages.get(name).cloned()
364 }
365
366 pub async fn add_route_definition(
370 &self,
371 definition: RouteDefinition,
372 ) -> Result<(), CamelError> {
373 use crate::lifecycle::ports::RouteRegistrationPort;
374 info!(
375 from = definition.from_uri(),
376 route_id = %definition.route_id(),
377 "Adding route definition"
378 );
379 self.runtime
380 .register_route(definition)
381 .await
382 .map_err(Into::into)
383 }
384
385 fn next_context_command_id(op: &str, route_id: &str) -> String {
386 let seq = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
387 format!("context:{op}:{route_id}:{seq}")
388 }
389
390 pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
392 self.registry
393 .lock()
394 .expect("mutex poisoned: another thread panicked while holding this lock") }
396
397 pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
399 Arc::clone(&self.registry)
400 }
401
402 pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
404 RuntimeExecutionHandle {
405 controller: self.route_controller.clone(),
406 runtime: Arc::clone(&self.runtime),
407 function_invoker: self.function_invoker.clone(),
408 }
409 }
410
411 pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
413 Arc::clone(&self.metrics)
414 }
415
416 pub fn platform_service(&self) -> Arc<dyn PlatformService> {
418 Arc::clone(&self.platform_service)
419 }
420
421 pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
423 self.platform_service.readiness_gate()
424 }
425
426 pub fn platform_identity(&self) -> PlatformIdentity {
428 self.platform_service.identity()
429 }
430
431 pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
433 self.platform_service.leadership()
434 }
435
436 pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
438 self.runtime.clone()
439 }
440
441 pub fn producer_context(&self) -> camel_api::ProducerContext {
443 camel_api::ProducerContext::new().with_runtime(self.runtime())
444 }
445
446 pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
448 match self
449 .runtime()
450 .ask(camel_api::RuntimeQuery::GetRouteStatus {
451 route_id: route_id.to_string(),
452 })
453 .await
454 {
455 Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
456 Ok(_) => Err(CamelError::RouteError(
457 "unexpected runtime query response for route status".to_string(),
458 )),
459 Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
460 Err(err) => Err(err),
461 }
462 }
463
464 pub async fn start(&mut self) -> Result<(), CamelError> {
469 info!("Starting CamelContext");
470
471 for (i, service) in self.services.iter_mut().enumerate() {
473 info!("Starting service: {}", service.name());
474 if let Err(e) = service.start().await {
475 warn!(
477 "Service {} failed to start, rolling back {} services",
478 service.name(),
479 i
480 );
481 for j in (0..i).rev() {
482 if let Err(rollback_err) = self.services[j].stop().await {
483 warn!(
484 "Failed to stop service {} during rollback: {}",
485 self.services[j].name(),
486 rollback_err
487 );
488 }
489 }
490 return Err(e);
491 }
492 }
493
494 let route_ids = self.route_controller.auto_startup_route_ids().await?;
497 for route_id in route_ids {
498 self.runtime
499 .execute(camel_api::RuntimeCommand::StartRoute {
500 route_id: route_id.clone(),
501 command_id: Self::next_context_command_id("start", &route_id),
502 causation_id: None,
503 })
504 .await?;
505 }
506
507 info!("CamelContext started");
508 Ok(())
509 }
510
511 pub async fn stop(&mut self) -> Result<(), CamelError> {
513 self.stop_timeout(self.shutdown_timeout).await
514 }
515
516 pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
523 info!("Stopping CamelContext");
524
525 self.cancel_token.cancel();
527 if let Some(join) = self.supervision_join.take() {
528 join.abort();
529 }
530
531 let route_ids = self.route_controller.shutdown_route_ids().await?;
534 for route_id in route_ids {
535 if let Err(err) = self
536 .runtime
537 .execute(camel_api::RuntimeCommand::StopRoute {
538 route_id: route_id.clone(),
539 command_id: Self::next_context_command_id("stop", &route_id),
540 causation_id: None,
541 })
542 .await
543 {
544 warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
545 }
546 }
547
548 self.health_registry.cancel_token().cancel();
549
550 let mut first_error = None;
553 for service in self.services.iter_mut().rev() {
554 info!("Stopping service: {}", service.name());
555 if let Err(e) = service.stop().await {
556 warn!("Service {} failed to stop: {}", service.name(), e);
557 if first_error.is_none() {
558 first_error = Some(e);
559 }
560 }
561 }
562
563 info!("CamelContext stopped");
564
565 if let Some(e) = first_error {
566 Err(e)
567 } else {
568 Ok(())
569 }
570 }
571
572 pub fn shutdown_timeout(&self) -> std::time::Duration {
574 self.shutdown_timeout
575 }
576
577 pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
579 self.shutdown_timeout = timeout;
580 }
581
582 pub async fn abort(&mut self) {
584 self.cancel_token.cancel();
585 if let Some(join) = self.supervision_join.take() {
586 join.abort();
587 }
588 let route_ids = self
589 .route_controller
590 .shutdown_route_ids()
591 .await
592 .unwrap_or_default();
593 for route_id in route_ids {
594 let _ = self
595 .runtime
596 .execute(camel_api::RuntimeCommand::StopRoute {
597 route_id: route_id.clone(),
598 command_id: Self::next_context_command_id("abort-stop", &route_id),
599 causation_id: None,
600 })
601 .await;
602 }
603
604 for service in self.services.iter_mut().rev() {
605 let name = service.name().to_string();
606 match timeout(std::time::Duration::from_secs(5), service.stop()).await {
607 Ok(Ok(())) => info!("Aborted service: {}", name),
608 Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
609 Err(_) => warn!("Service {} timed out during abort (5s)", name),
610 }
611 }
612 }
613
614 pub async fn health_check(&self) -> HealthReport {
616 use camel_api::HealthSource;
617 self.health_report().await
618 }
619
620 pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
621 Arc::clone(&self.health_registry)
622 }
623
624 pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
626 self.component_configs
627 .insert(TypeId::of::<T>(), Box::new(config));
628 }
629
630 pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
632 self.component_configs
633 .get(&TypeId::of::<T>())
634 .and_then(|b| b.downcast_ref::<T>())
635 }
636
637 pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
643 self.template_registry.register(spec)
644 }
645
646 pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
648 self.template_registry.get(id)
649 }
650
651 pub fn template_ids(&self) -> Vec<String> {
653 self.template_registry.template_ids()
654 }
655
656 pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
658 self.template_registry.record_instance(record)
659 }
660
661 pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
663 self.template_registry.instances(template_id)
664 }
665}
666
667impl ComponentRegistrar for CamelContext {
668 fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
669 self.registry
670 .lock()
671 .expect("mutex poisoned: another thread panicked while holding this lock") .register(component);
673 }
674}
675
676impl ComponentContext for CamelContext {
677 fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
678 self.registry.lock().ok()?.get(scheme)
679 }
680
681 fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
682 self.languages.lock().ok()?.get(name).cloned()
683 }
684
685 fn metrics(&self) -> Arc<dyn MetricsCollector> {
686 Arc::clone(&self.metrics)
687 }
688
689 fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
690 Arc::clone(&self.health_registry) as Arc<dyn camel_component_api::HealthCheckRegistry>
693 }
694
695 fn platform_service(&self) -> Arc<dyn PlatformService> {
696 Arc::clone(&self.platform_service)
697 }
698
699 fn register_route_health_check(
700 &self,
701 route_id: &str,
702 check: Arc<dyn camel_api::AsyncHealthCheck>,
703 ) {
704 self.health_registry.register_for_route(route_id, check);
705 }
706
707 fn unregister_route_health_check(&self, route_id: &str) {
708 self.health_registry.unregister_for_route(route_id);
709 }
710}
711
712#[async_trait::async_trait]
713impl camel_api::HealthSource for CamelContext {
714 async fn liveness(&self) -> camel_api::HealthStatus {
715 let has_failed = self
716 .services
717 .iter()
718 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
719 if has_failed {
720 camel_api::HealthStatus::Unhealthy
721 } else {
722 camel_api::HealthStatus::Healthy
723 }
724 }
725
726 async fn readiness(&self) -> camel_api::HealthStatus {
727 let has_failed = self
728 .services
729 .iter()
730 .any(|s| s.status() == camel_api::ServiceStatus::Failed);
731 if has_failed {
732 return camel_api::HealthStatus::Unhealthy;
733 }
734 let has_stopped = self
735 .services
736 .iter()
737 .any(|s| s.status() == camel_api::ServiceStatus::Stopped);
738 if has_stopped {
739 return camel_api::HealthStatus::Degraded;
740 }
741 self.health_registry.check_all().await.status
742 }
743
744 async fn health_report(&self) -> camel_api::HealthReport {
745 let mut report = self.health_registry.check_all().await;
746 let mut worst = report.status;
747 for service in &self.services {
748 let svc_status = service.status();
749 let health = match svc_status {
750 camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
751 camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
752 camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
753 };
754 if matches!(worst, camel_api::HealthStatus::Healthy)
755 && matches!(
756 health,
757 camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
758 )
759 {
760 worst = health;
761 }
762 if matches!(worst, camel_api::HealthStatus::Degraded)
763 && matches!(health, camel_api::HealthStatus::Unhealthy)
764 {
765 worst = health;
766 }
767 report.services.push(camel_api::ServiceHealth {
768 name: service.name().to_string(),
769 status: svc_status,
770 message: None,
771 });
772 }
773 report.status = worst;
774 report
775 }
776
777 async fn startup(&self) -> camel_api::HealthStatus {
778 camel_api::HealthStatus::Healthy
779 }
780}
781
782#[cfg(test)]
783#[path = "context_tests.rs"]
784mod context_tests;