use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::{
CamelError, FunctionInvoker, HealthReport, Lifecycle, MetricsCollector, PlatformIdentity,
PlatformService, ReadinessGate, RouteTemplateSpec, RuntimeCommandBus, RuntimeQueryBus,
TemplateInstanceRecord,
};
use camel_component_api::{Component, ComponentContext, ComponentRegistrar};
use camel_language_api::Language;
use crate::health_registry::HealthCheckRegistry;
use crate::lifecycle::adapters::controller_actor::RouteControllerHandle;
use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
use crate::lifecycle::application::route_definition::RouteDefinition;
use crate::lifecycle::application::runtime_bus::RuntimeBus;
use crate::lifecycle::domain::LanguageRegistryError;
use crate::shared::components::domain::Registry;
use crate::shared::observability::domain::TracerConfig;
use crate::template::TemplateRegistry;
/// Process-wide counter appended to the command ids built by
/// `CamelContext::next_context_command_id`, so that successive ids differ.
static CONTEXT_COMMAND_SEQ: AtomicU64 = AtomicU64::new(0);
pub use crate::context_builder::CamelContextBuilder;
/// Runtime context owning the component registry, the route controller handle,
/// the runtime bus, the registered lifecycle services, the health-check registry,
/// per-type component configuration and the route template registry.
///
/// Call [`CamelContext::builder`] to obtain a [`CamelContextBuilder`].
pub struct CamelContext {
/// Component registry, shared behind a `std::sync::Mutex`.
registry: Arc<std::sync::Mutex<Registry>>,
/// Handle used to send requests to the route controller.
route_controller: RouteControllerHandle,
/// Join handle of a spawned Tokio task; stored but never read in this file.
/// NOTE(review): presumably the route controller actor task — confirm in the builder.
_actor_join: tokio::task::JoinHandle<()>,
/// Optional task handle that `stop_timeout` and `abort` take and abort.
supervision_join: Option<tokio::task::JoinHandle<()>>,
/// Runtime bus used for route registration, runtime commands and queries.
runtime: Arc<RuntimeBus>,
/// Token cancelled at the beginning of `stop_timeout` and `abort`.
cancel_token: CancellationToken,
/// Metrics collector; replaced by `with_lifecycle` when a service exposes one.
metrics: Arc<dyn MetricsCollector>,
/// Platform service providing identity, readiness gate and leadership.
platform_service: Arc<dyn PlatformService>,
/// Shared registry of expression languages, looked up by name.
languages: SharedLanguageRegistry,
/// Timeout that `stop` forwards to `stop_timeout`.
shutdown_timeout: std::time::Duration,
/// Lifecycle services: started in insertion order, stopped in reverse order.
services: Vec<Box<dyn Lifecycle>>,
/// Registry of health checks consulted by the `HealthSource` implementation.
health_registry: Arc<HealthCheckRegistry>,
/// Component configuration values keyed by the `TypeId` of their concrete type.
component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
/// Optional function invoker; set by `with_lifecycle` when a service exposes one.
function_invoker: Option<Arc<dyn FunctionInvoker>>,
/// Registry of route templates and their recorded instances.
template_registry: Arc<TemplateRegistry>,
}
/// Crate-internal bundle of every value needed to assemble a [`CamelContext`].
///
/// The fields mirror the private fields of `CamelContext` one-to-one and are moved
/// across unchanged by `CamelContext::from_parts`.
pub(crate) struct FromParts {
pub(crate) registry: Arc<std::sync::Mutex<Registry>>,
pub(crate) route_controller: RouteControllerHandle,
pub(crate) _actor_join: tokio::task::JoinHandle<()>,
pub(crate) supervision_join: Option<tokio::task::JoinHandle<()>>,
pub(crate) runtime: Arc<RuntimeBus>,
pub(crate) cancel_token: CancellationToken,
pub(crate) metrics: Arc<dyn MetricsCollector>,
pub(crate) platform_service: Arc<dyn PlatformService>,
pub(crate) languages: SharedLanguageRegistry,
pub(crate) shutdown_timeout: std::time::Duration,
pub(crate) services: Vec<Box<dyn Lifecycle>>,
pub(crate) health_registry: Arc<HealthCheckRegistry>,
pub(crate) component_configs: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
pub(crate) function_invoker: Option<Arc<dyn FunctionInvoker>>,
pub(crate) template_registry: Arc<TemplateRegistry>,
}
impl CamelContext {
pub(crate) fn from_parts(parts: FromParts) -> Self {
Self {
registry: parts.registry,
route_controller: parts.route_controller,
_actor_join: parts._actor_join,
supervision_join: parts.supervision_join,
runtime: parts.runtime,
cancel_token: parts.cancel_token,
metrics: parts.metrics,
platform_service: parts.platform_service,
languages: parts.languages,
shutdown_timeout: parts.shutdown_timeout,
services: parts.services,
health_registry: parts.health_registry,
component_configs: parts.component_configs,
function_invoker: parts.function_invoker,
template_registry: parts.template_registry,
}
}
}
/// Cloneable handle bundling the route controller handle, the runtime bus and the
/// optional function invoker of a [`CamelContext`].
///
/// Created by `CamelContext::runtime_execution_handle`.
#[derive(Clone)]
pub struct RuntimeExecutionHandle {
/// Handle used to send requests to the route controller.
controller: RouteControllerHandle,
/// Runtime bus used for route registration, commands and queries.
runtime: Arc<RuntimeBus>,
/// Function invoker captured from the context when the handle was created.
function_invoker: Option<Arc<dyn FunctionInvoker>>,
}
impl RuntimeExecutionHandle {
pub(crate) async fn add_route_definition(
&self,
definition: RouteDefinition,
) -> Result<(), CamelError> {
use crate::lifecycle::ports::RouteRegistrationPort;
self.runtime
.register_route(definition)
.await
.map_err(Into::into)
}
pub(crate) async fn compile_route_definition(
&self,
definition: RouteDefinition,
) -> Result<camel_api::BoxProcessor, CamelError> {
self.controller.compile_route_definition(definition).await
}
pub(crate) async fn compile_route_definition_with_generation(
&self,
definition: RouteDefinition,
generation: u64,
) -> Result<camel_api::BoxProcessor, CamelError> {
self.controller
.compile_route_definition_with_generation(definition, generation)
.await
}
pub(crate) async fn prepare_route_definition_with_generation(
&self,
definition: RouteDefinition,
generation: u64,
) -> Result<crate::lifecycle::adapters::route_controller::PreparedRoute, CamelError> {
self.controller
.prepare_route_definition_with_generation(definition, generation)
.await
}
pub(crate) async fn insert_prepared_route(
&self,
prepared: crate::lifecycle::adapters::route_controller::PreparedRoute,
) -> Result<(), CamelError> {
self.controller.insert_prepared_route(prepared).await
}
pub(crate) async fn remove_route_preserving_functions(
&self,
route_id: String,
) -> Result<(), CamelError> {
self.controller
.remove_route_preserving_functions(route_id)
.await
}
pub(crate) async fn register_route_aggregate(
&self,
route_id: String,
) -> Result<(), CamelError> {
self.runtime.register_aggregate_only(route_id).await
}
pub(crate) async fn swap_route_pipeline(
&self,
route_id: &str,
pipeline: camel_api::BoxProcessor,
) -> Result<(), CamelError> {
self.controller.swap_pipeline(route_id, pipeline).await
}
pub(crate) async fn execute_runtime_command(
&self,
cmd: camel_api::RuntimeCommand,
) -> Result<camel_api::RuntimeCommandResult, CamelError> {
self.runtime.execute(cmd).await
}
pub(crate) async fn runtime_route_status(
&self,
route_id: &str,
) -> Result<Option<String>, CamelError> {
match self
.runtime
.ask(camel_api::RuntimeQuery::GetRouteStatus {
route_id: route_id.to_string(),
})
.await
{
Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
Ok(_) => Err(CamelError::RouteError(
"unexpected runtime query response for route status".to_string(),
)),
Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
Err(err) => Err(err),
}
}
pub(crate) async fn runtime_route_ids(&self) -> Result<Vec<String>, CamelError> {
match self.runtime.ask(camel_api::RuntimeQuery::ListRoutes).await {
Ok(camel_api::RuntimeQueryResult::Routes { route_ids }) => Ok(route_ids),
Ok(_) => Err(CamelError::RouteError(
"unexpected runtime query response for route listing".to_string(),
)),
Err(err) => Err(err),
}
}
pub(crate) async fn route_source_hash(&self, route_id: &str) -> Option<u64> {
self.controller.route_source_hash(route_id).await
}
pub(crate) async fn in_flight_count(&self, route_id: &str) -> Result<u64, CamelError> {
if !self.controller.route_exists(route_id).await? {
return Err(CamelError::RouteError(format!(
"Route '{}' not found",
route_id
)));
}
Ok(self
.controller
.in_flight_count(route_id)
.await?
.unwrap_or(0))
}
pub(crate) fn function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
self.function_invoker.clone()
}
#[cfg(test)]
pub(crate) async fn force_start_route_for_test(
&self,
route_id: &str,
) -> Result<(), CamelError> {
self.controller.start_route(route_id).await
}
pub async fn controller_route_count_for_test(&self) -> usize {
self.controller.route_count().await.unwrap_or(0)
}
}
impl CamelContext {
/// Returns a fresh [`CamelContextBuilder`] (equivalent to `CamelContextBuilder::new()`).
pub fn builder() -> CamelContextBuilder {
CamelContextBuilder::new()
}
pub async fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
let _ = self.route_controller.set_error_handler(config).await;
}
/// Enables or disables tracing using an otherwise default [`TracerConfig`].
///
/// Delegates to [`Self::set_tracer_config`] so that the shorthand goes through the
/// same path as a full configuration, including the injection of the context's
/// metrics collector when the configuration carries none.
pub async fn set_tracing(&mut self, enabled: bool) {
    let config = TracerConfig {
        enabled,
        ..Default::default()
    };
    self.set_tracer_config(config).await;
}
pub async fn set_tracer_config(&mut self, config: TracerConfig) {
let config = if config.metrics_collector.is_none() {
TracerConfig {
metrics_collector: Some(Arc::clone(&self.metrics)),
..config
}
} else {
config
};
let _ = self.route_controller.set_tracer_config(config).await;
}
/// Consuming variant of `set_tracing(true)`: enables tracing and returns the context.
pub async fn with_tracing(mut self) -> Self {
self.set_tracing(true).await;
self
}
/// Consuming variant of `set_tracer_config`: applies `config` and returns the context.
pub async fn with_tracer_config(mut self, config: TracerConfig) -> Self {
self.set_tracer_config(config).await;
self
}
pub fn with_lifecycle<L: Lifecycle + 'static>(mut self, service: L) -> Self {
if let Some(collector) = service.as_metrics_collector() {
self.metrics = collector;
}
if let Some(invoker) = service.as_function_invoker() {
self.function_invoker = Some(invoker.clone());
if let Err(e) = self.route_controller.try_set_function_invoker(invoker) {
tracing::warn!("Failed to propagate function invoker to route controller: {e}");
}
}
self.services.push(Box::new(service));
self
}
/// Logs the component's scheme and registers the component in the registry.
///
/// # Panics
/// Panics if the registry mutex is poisoned.
pub fn register_component<C: Component + 'static>(&mut self, component: C) {
    info!(scheme = component.scheme(), "Registering component");

    let shared: Arc<dyn Component> = Arc::new(component);
    self.registry
        .lock()
        .expect("mutex poisoned: another thread panicked while holding this lock")
        .register(shared);
}
/// Registers `lang` under `name` in the shared language registry.
///
/// # Errors
/// Returns `LanguageRegistryError::AlreadyRegistered` when a language with the
/// same name is already present; the existing entry is left untouched.
///
/// # Panics
/// Panics if the language registry mutex is poisoned.
pub fn register_language(
    &mut self,
    name: impl Into<String>,
    lang: Box<dyn Language>,
) -> Result<(), LanguageRegistryError> {
    let name = name.into();
    let mut languages = self
        .languages
        .lock()
        .expect("mutex poisoned: another thread panicked while holding this lock");

    if languages.contains_key(&name) {
        Err(LanguageRegistryError::AlreadyRegistered { name })
    } else {
        let shared: Arc<dyn Language> = Arc::from(lang);
        languages.insert(name, shared);
        Ok(())
    }
}
/// Looks up the language registered under `name` and returns a clone of its `Arc`.
///
/// # Panics
/// Panics if the language registry mutex is poisoned.
pub fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
    self.languages
        .lock()
        .expect("mutex poisoned: another thread panicked while holding this lock")
        .get(name)
        .cloned()
}
/// Logs the route's source URI and id, then registers `definition` on the runtime
/// bus via `register_route`.
///
/// # Errors
/// Returns the registration error converted into [`CamelError`].
pub async fn add_route_definition(
    &self,
    definition: RouteDefinition,
) -> Result<(), CamelError> {
    use crate::lifecycle::ports::RouteRegistrationPort;

    info!(
        from = definition.from_uri(),
        route_id = %definition.route_id(),
        "Adding route definition"
    );

    let registered = self.runtime.register_route(definition).await;
    registered.map_err(|err| err.into())
}
/// Builds a command id of the form `context:<op>:<route_id>:<seq>`, where `<seq>`
/// is the next value of the process-wide `CONTEXT_COMMAND_SEQ` counter.
fn next_context_command_id(op: &str, route_id: &str) -> String {
    // Only uniqueness of the number matters, so relaxed ordering is sufficient.
    let sequence = CONTEXT_COMMAND_SEQ.fetch_add(1, Ordering::Relaxed);
    format!("context:{}:{}:{}", op, route_id, sequence)
}
/// Locks the component registry and returns the guard.
///
/// The registry stays locked for as long as the returned guard is alive.
///
/// # Panics
/// Panics if the registry mutex is poisoned.
pub fn registry(&self) -> std::sync::MutexGuard<'_, Registry> {
self.registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock") }
/// Returns another `Arc` pointing at the shared, mutex-protected component registry.
pub fn registry_arc(&self) -> Arc<std::sync::Mutex<Registry>> {
    self.registry.clone()
}
/// Creates a [`RuntimeExecutionHandle`] from clones of the route controller handle,
/// the runtime bus and the current function invoker.
pub fn runtime_execution_handle(&self) -> RuntimeExecutionHandle {
    let controller = self.route_controller.clone();
    let runtime = Arc::clone(&self.runtime);
    let function_invoker = self.function_invoker.clone();

    RuntimeExecutionHandle {
        controller,
        runtime,
        function_invoker,
    }
}
/// Returns another `Arc` pointing at the context's current metrics collector.
pub fn metrics(&self) -> Arc<dyn MetricsCollector> {
    self.metrics.clone()
}
/// Returns another `Arc` pointing at the context's platform service.
pub fn platform_service(&self) -> Arc<dyn PlatformService> {
    self.platform_service.clone()
}
/// Returns the readiness gate provided by the platform service.
pub fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
self.platform_service.readiness_gate()
}
/// Returns the identity reported by the platform service.
pub fn platform_identity(&self) -> PlatformIdentity {
self.platform_service.identity()
}
/// Returns the leadership service provided by the platform service.
pub fn leadership(&self) -> Arc<dyn camel_api::LeadershipService> {
self.platform_service.leadership()
}
/// Returns a clone of the runtime bus `Arc`, exposed as a `dyn RuntimeHandle`.
pub fn runtime(&self) -> Arc<dyn camel_api::RuntimeHandle> {
// Method-call `clone()` yields `Arc<RuntimeBus>`, which then coerces to the
// trait object at the return site.
self.runtime.clone()
}
/// Creates a new `ProducerContext` that carries this context's runtime handle.
pub fn producer_context(&self) -> camel_api::ProducerContext {
    let runtime = self.runtime();
    camel_api::ProducerContext::new().with_runtime(runtime)
}
/// Asks the runtime for the status of `route_id`.
///
/// Returns `Ok(Some(status))` for a `RouteStatus` reply and `Ok(None)` when the
/// query fails with a `RouteError` whose message contains `"not found"`.
///
/// # Errors
/// Any other query error is returned as is; any other reply variant yields a
/// `RouteError`.
pub async fn runtime_route_status(&self, route_id: &str) -> Result<Option<String>, CamelError> {
    let query = camel_api::RuntimeQuery::GetRouteStatus {
        route_id: route_id.to_owned(),
    };
    let reply = self.runtime().ask(query).await;

    match reply {
        Ok(camel_api::RuntimeQueryResult::RouteStatus { status, .. }) => Ok(Some(status)),
        Ok(_) => Err(CamelError::RouteError(
            "unexpected runtime query response for route status".to_owned(),
        )),
        // The "missing route" case is recognised by its message text only.
        Err(CamelError::RouteError(msg)) if msg.contains("not found") => Ok(None),
        Err(other) => Err(other),
    }
}
/// Starts the lifecycle services in insertion order, then issues a `StartRoute`
/// runtime command for every route id the controller lists for auto-startup.
///
/// If a service fails to start, the services started before it are stopped in
/// reverse order (rollback failures are only logged) and the start error is
/// returned.
///
/// # Errors
/// Returns the first service start error, the error from listing auto-startup
/// routes, or the first failing `StartRoute` command. Services are not rolled back
/// for the latter two.
pub async fn start(&mut self) -> Result<(), CamelError> {
    info!("Starting CamelContext");

    for idx in 0..self.services.len() {
        info!("Starting service: {}", self.services[idx].name());

        let outcome = self.services[idx].start().await;
        if let Err(start_err) = outcome {
            warn!(
                "Service {} failed to start, rolling back {} services",
                self.services[idx].name(),
                idx
            );

            // Everything before `idx` started successfully: undo newest first.
            for started in self.services[..idx].iter_mut().rev() {
                if let Err(rollback_err) = started.stop().await {
                    warn!(
                        "Failed to stop service {} during rollback: {}",
                        started.name(),
                        rollback_err
                    );
                }
            }
            return Err(start_err);
        }
    }

    for route_id in self.route_controller.auto_startup_route_ids().await? {
        let command_id = Self::next_context_command_id("start", &route_id);
        let command = camel_api::RuntimeCommand::StartRoute {
            route_id,
            command_id,
            causation_id: None,
        };
        self.runtime.execute(command).await?;
    }

    info!("CamelContext started");
    Ok(())
}
/// Stops the context by calling `stop_timeout` with the configured shutdown timeout.
pub async fn stop(&mut self) -> Result<(), CamelError> {
self.stop_timeout(self.shutdown_timeout).await
}
/// Stops the context: cancels the context token, aborts the supervision task,
/// issues a `StopRoute` command for every route listed for shutdown, cancels the
/// health registry token and finally stops the lifecycle services in reverse
/// order.
///
/// Shutdown always runs to completion. A failure to list the routes no longer
/// returns early (which left the health registry running and every service
/// started); it is recorded and the remaining steps still run. Failing
/// `StopRoute` commands are logged only.
///
/// NOTE(review): `_timeout` is currently not used by this method; no deadline is
/// applied to any of the steps here.
///
/// # Errors
/// Returns the first error encountered among the route listing and the service
/// stops, after all services have been asked to stop.
pub async fn stop_timeout(&mut self, _timeout: std::time::Duration) -> Result<(), CamelError> {
    info!("Stopping CamelContext");
    self.cancel_token.cancel();
    if let Some(join) = self.supervision_join.take() {
        join.abort();
    }

    let mut first_error: Option<CamelError> = None;

    // Keep going when the listing fails so that services are still stopped.
    let route_ids = match self.route_controller.shutdown_route_ids().await {
        Ok(ids) => ids,
        Err(err) => {
            let err: CamelError = err.into();
            warn!(
                error = %err,
                "Failed to list routes for shutdown; continuing with service shutdown"
            );
            first_error = Some(err);
            Default::default()
        }
    };

    for route_id in route_ids {
        if let Err(err) = self
            .runtime
            .execute(camel_api::RuntimeCommand::StopRoute {
                route_id: route_id.clone(),
                command_id: Self::next_context_command_id("stop", &route_id),
                causation_id: None,
            })
            .await
        {
            warn!(route_id = %route_id, error = %err, "Runtime stop command failed during context shutdown");
        }
    }

    self.health_registry.cancel_token().cancel();

    for service in self.services.iter_mut().rev() {
        info!("Stopping service: {}", service.name());
        if let Err(e) = service.stop().await {
            warn!("Service {} failed to stop: {}", service.name(), e);
            if first_error.is_none() {
                first_error = Some(e);
            }
        }
    }

    info!("CamelContext stopped");
    match first_error {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
/// Returns the configured shutdown timeout (the value `stop` passes to `stop_timeout`).
pub fn shutdown_timeout(&self) -> std::time::Duration {
self.shutdown_timeout
}
/// Replaces the configured shutdown timeout with `timeout`.
pub fn set_shutdown_timeout(&mut self, timeout: std::time::Duration) {
self.shutdown_timeout = timeout;
}
/// Best-effort shutdown that never fails.
///
/// Cancels the context token, aborts the supervision task, issues a `StopRoute`
/// command per route listed for shutdown (an unavailable listing is treated as
/// empty and command errors are ignored), then stops each service in reverse
/// order, giving every service at most five seconds.
pub async fn abort(&mut self) {
    self.cancel_token.cancel();
    if let Some(supervisor) = self.supervision_join.take() {
        supervisor.abort();
    }

    let route_ids = self
        .route_controller
        .shutdown_route_ids()
        .await
        .unwrap_or_default();
    for route_id in route_ids {
        let command_id = Self::next_context_command_id("abort-stop", &route_id);
        let command = camel_api::RuntimeCommand::StopRoute {
            route_id,
            command_id,
            causation_id: None,
        };
        // Errors are deliberately ignored: abort keeps going regardless.
        let _ = self.runtime.execute(command).await;
    }

    let per_service_limit = std::time::Duration::from_secs(5);
    for service in self.services.iter_mut().rev() {
        let name = service.name().to_string();
        let outcome = timeout(per_service_limit, service.stop()).await;
        match outcome {
            Err(_) => warn!("Service {} timed out during abort (5s)", name),
            Ok(Err(e)) => warn!("Service {} failed to stop during abort: {}", name, e),
            Ok(Ok(())) => info!("Aborted service: {}", name),
        }
    }
}
pub async fn health_check(&self) -> HealthReport {
use camel_api::HealthSource;
self.health_report().await
}
/// Returns another `Arc` pointing at the context's health-check registry.
pub fn health_registry(&self) -> Arc<HealthCheckRegistry> {
    self.health_registry.clone()
}
pub fn set_component_config<T: 'static + Send + Sync>(&mut self, config: T) {
self.component_configs
.insert(TypeId::of::<T>(), Box::new(config));
}
/// Returns the configuration value stored for type `T`, or `None` when nothing
/// was stored for that type.
pub fn get_component_config<T: 'static + Send + Sync>(&self) -> Option<&T> {
    let stored = self.component_configs.get(&TypeId::of::<T>())?;
    stored.downcast_ref::<T>()
}
/// Registers `spec` in the template registry and returns the registry's result.
pub fn add_route_template(&self, spec: RouteTemplateSpec) -> Result<(), CamelError> {
self.template_registry.register(spec)
}
/// Returns the template the registry holds under `id`, if any.
pub fn get_route_template(&self, id: &str) -> Option<RouteTemplateSpec> {
self.template_registry.get(id)
}
/// Returns the template ids reported by the template registry.
pub fn template_ids(&self) -> Vec<String> {
self.template_registry.template_ids()
}
/// Passes `record` to the template registry's `record_instance`.
pub fn record_template_instance(&self, record: TemplateInstanceRecord) {
self.template_registry.record_instance(record)
}
/// Returns the instance records the template registry holds for `template_id`.
pub fn template_instances(&self, template_id: &str) -> Vec<TemplateInstanceRecord> {
self.template_registry.instances(template_id)
}
}
impl ComponentRegistrar for CamelContext {
    /// Registers an already type-erased component in the registry.
    ///
    /// # Panics
    /// Panics if the registry mutex is poisoned.
    fn register_component_dyn(&mut self, component: Arc<dyn Component>) {
        let registry = &self.registry;
        registry
            .lock()
            .expect("mutex poisoned: another thread panicked while holding this lock")
            .register(component);
    }
}
impl ComponentContext for CamelContext {
    /// Looks up the component registered for `scheme`.
    ///
    /// Unlike the inherent accessors, a poisoned registry mutex yields `None`
    /// instead of a panic.
    fn resolve_component(&self, scheme: &str) -> Option<Arc<dyn Component>> {
        let registry = self.registry.lock().ok()?;
        registry.get(scheme)
    }

    /// Looks up the language registered under `name`; a poisoned mutex yields `None`.
    fn resolve_language(&self, name: &str) -> Option<Arc<dyn Language>> {
        let languages = self.languages.lock().ok()?;
        languages.get(name).cloned()
    }

    /// Returns another `Arc` pointing at the context's metrics collector.
    fn metrics(&self) -> Arc<dyn MetricsCollector> {
        self.metrics.clone()
    }

    /// Returns the health-check registry as the component-API trait object.
    fn health(&self) -> Arc<dyn camel_component_api::HealthCheckRegistry> {
        // Method-call `clone()` keeps the concrete type; the unsizing coercion to
        // the trait object happens at the return site.
        self.health_registry.clone()
    }

    /// Returns another `Arc` pointing at the context's platform service.
    fn platform_service(&self) -> Arc<dyn PlatformService> {
        self.platform_service.clone()
    }

    /// Registers `check` for `route_id` in the health-check registry.
    fn register_route_health_check(
        &self,
        route_id: &str,
        check: Arc<dyn camel_api::AsyncHealthCheck>,
    ) {
        self.health_registry.register_for_route(route_id, check);
    }

    /// Removes the health check registered for `route_id` from the registry.
    fn unregister_route_health_check(&self, route_id: &str) {
        self.health_registry.unregister_for_route(route_id);
    }
}
#[async_trait::async_trait]
impl camel_api::HealthSource for CamelContext {
async fn liveness(&self) -> camel_api::HealthStatus {
let has_failed = self
.services
.iter()
.any(|s| s.status() == camel_api::ServiceStatus::Failed);
if has_failed {
camel_api::HealthStatus::Unhealthy
} else {
camel_api::HealthStatus::Healthy
}
}
async fn readiness(&self) -> camel_api::HealthStatus {
let has_failed = self
.services
.iter()
.any(|s| s.status() == camel_api::ServiceStatus::Failed);
if has_failed {
return camel_api::HealthStatus::Unhealthy;
}
let has_stopped = self
.services
.iter()
.any(|s| s.status() == camel_api::ServiceStatus::Stopped);
if has_stopped {
return camel_api::HealthStatus::Degraded;
}
self.health_registry.check_all().await.status
}
async fn health_report(&self) -> camel_api::HealthReport {
let mut report = self.health_registry.check_all().await;
let mut worst = report.status;
for service in &self.services {
let svc_status = service.status();
let health = match svc_status {
camel_api::ServiceStatus::Started => camel_api::HealthStatus::Healthy,
camel_api::ServiceStatus::Stopped => camel_api::HealthStatus::Degraded,
camel_api::ServiceStatus::Failed => camel_api::HealthStatus::Unhealthy,
};
if matches!(worst, camel_api::HealthStatus::Healthy)
&& matches!(
health,
camel_api::HealthStatus::Degraded | camel_api::HealthStatus::Unhealthy
)
{
worst = health;
}
if matches!(worst, camel_api::HealthStatus::Degraded)
&& matches!(health, camel_api::HealthStatus::Unhealthy)
{
worst = health;
}
report.services.push(camel_api::ServiceHealth {
name: service.name().to_string(),
status: svc_status,
message: None,
});
}
report.status = worst;
report
}
async fn startup(&self) -> camel_api::HealthStatus {
camel_api::HealthStatus::Healthy
}
}
#[cfg(test)]
#[path = "context_tests.rs"]
mod context_tests;