use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tower::{Layer, ServiceExt};
use tracing::{info, warn};
use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::metrics::MetricsCollector;
#[allow(unused_imports)]
use camel_api::{
BoxProcessor, CamelError, Exchange, FunctionInvoker, IdentityProcessor, NoOpMetrics,
NoopPlatformService, PlatformService, ProducerContext, RouteController, RuntimeHandle,
};
use camel_component_api::{Consumer, ConsumerContext, consumer::ExchangeEnvelope};
use camel_processor::aggregator::AggregatorService;
pub use camel_processor::aggregator::SharedLanguageRegistry;
use crate::health_registry::HealthCheckRegistry;
use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
use crate::lifecycle::adapters::route_compiler::compose_pipeline;
use crate::lifecycle::adapters::route_compiler_ext::{RouteCompilerExt, build_eh_config_pipeline};
pub(crate) use crate::lifecycle::adapters::route_helpers::PreparedRoute;
use crate::lifecycle::adapters::route_helpers::{
AggregateSplitInfo, CrashNotification, ManagedRoute, find_top_level_aggregate_requiring_split,
handle_is_running, inferred_lifecycle_label, is_pending,
};
#[cfg(test)]
pub(super) use crate::lifecycle::adapters::route_helpers::{
emit_start_route_event, set_start_route_event_hook,
};
use crate::lifecycle::adapters::route_registry::RouteRegistry;
use crate::lifecycle::adapters::route_runtime_state;
use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinition};
use crate::shared::components::domain::Registry;
use crate::shared::observability::domain::{DetailLevel, TracerConfig};
use camel_bean::BeanRegistry;
pub struct DefaultRouteController {
pub(super) routes: RouteRegistry,
pub(super) registry: Arc<std::sync::Mutex<Registry>>,
pub(super) languages: SharedLanguageRegistry,
pub(super) beans: Arc<std::sync::Mutex<BeanRegistry>>,
pub(super) runtime: Option<Weak<dyn RuntimeHandle>>,
pub(super) global_error_handler: Option<ErrorHandlerConfig>,
pub(super) crash_notifier: Option<mpsc::Sender<CrashNotification>>,
pub(super) tracing_enabled: bool,
pub(super) tracer_detail_level: DetailLevel,
pub(super) tracer_metrics: Option<Arc<dyn MetricsCollector>>,
pub(super) platform_service: Arc<dyn PlatformService>,
pub(super) function_invoker: Option<Arc<dyn FunctionInvoker>>,
pub(super) health_registry: Option<Arc<HealthCheckRegistry>>,
}
impl DefaultRouteController {
pub(super) fn health_registry(&self) -> Arc<HealthCheckRegistry> {
self.health_registry.clone().unwrap_or_else(|| {
warn!("health_registry not configured — creating isolated fallback");
Arc::new(HealthCheckRegistry::new(Duration::from_secs(5)))
})
}
pub fn new(
registry: Arc<std::sync::Mutex<Registry>>,
platform_service: Arc<dyn PlatformService>,
) -> Self {
Self::with_beans_and_platform_service(
registry,
Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
platform_service,
)
}
pub fn with_beans(
registry: Arc<std::sync::Mutex<Registry>>,
beans: Arc<std::sync::Mutex<BeanRegistry>>,
) -> Self {
Self::with_beans_and_platform_service(
registry,
beans,
Arc::new(NoopPlatformService::default()),
)
}
fn with_beans_and_platform_service(
registry: Arc<std::sync::Mutex<Registry>>,
beans: Arc<std::sync::Mutex<BeanRegistry>>,
platform_service: Arc<dyn PlatformService>,
) -> Self {
Self {
routes: RouteRegistry::new(),
registry,
languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
beans,
runtime: None,
global_error_handler: None,
crash_notifier: None,
tracing_enabled: false,
tracer_detail_level: DetailLevel::Minimal,
tracer_metrics: None,
platform_service,
function_invoker: None,
health_registry: None,
}
}
pub fn with_languages(
registry: Arc<std::sync::Mutex<Registry>>,
languages: SharedLanguageRegistry,
platform_service: Arc<dyn PlatformService>,
) -> Self {
Self {
routes: RouteRegistry::new(),
registry,
languages,
beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
runtime: None,
global_error_handler: None,
crash_notifier: None,
tracing_enabled: false,
tracer_detail_level: DetailLevel::Minimal,
tracer_metrics: None,
platform_service,
function_invoker: None,
health_registry: None,
}
}
pub fn with_languages_and_beans(
registry: Arc<std::sync::Mutex<Registry>>,
languages: SharedLanguageRegistry,
platform_service: Arc<dyn PlatformService>,
beans: Arc<std::sync::Mutex<BeanRegistry>>,
) -> Self {
Self {
routes: RouteRegistry::new(),
registry,
languages,
beans,
runtime: None,
global_error_handler: None,
crash_notifier: None,
tracing_enabled: false,
tracer_detail_level: DetailLevel::Minimal,
tracer_metrics: None,
platform_service,
function_invoker: None,
health_registry: None,
}
}
pub fn with_function_invoker(mut self, function_invoker: Arc<dyn FunctionInvoker>) -> Self {
self.function_invoker = Some(function_invoker);
self
}
pub fn set_health_registry(&mut self, registry: Arc<HealthCheckRegistry>) {
self.health_registry = Some(registry);
}
pub fn set_function_invoker(&mut self, invoker: Arc<dyn FunctionInvoker>) {
self.function_invoker = Some(invoker);
}
pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
self.runtime = Some(Arc::downgrade(&runtime));
}
pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
self.crash_notifier = Some(tx);
}
pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
self.global_error_handler = Some(config);
}
pub fn set_tracer_config(&mut self, config: &TracerConfig) {
self.tracing_enabled = config.enabled;
self.tracer_detail_level = config.detail_level.clone();
self.tracer_metrics = config.metrics_collector.clone();
}
fn build_producer_context(&self, route_id: &str) -> Result<ProducerContext, CamelError> {
let mut producer_ctx = ProducerContext::new().with_route_id(route_id);
if let Some(runtime) = self.runtime.as_ref().and_then(Weak::upgrade) {
producer_ctx = producer_ctx.with_runtime(runtime);
}
Ok(producer_ctx)
}
fn route_compiler_ext(&self) -> RouteCompilerExt<'_> {
RouteCompilerExt {
registry: &self.registry,
languages: &self.languages,
beans: &self.beans,
function_invoker: &self.function_invoker,
tracing_enabled: self.tracing_enabled,
tracer_detail_level: &self.tracer_detail_level,
tracer_metrics: &self.tracer_metrics,
platform_service: &self.platform_service,
runtime: &self.runtime,
global_error_handler: &self.global_error_handler,
health_registry: &self.health_registry,
route_registry: &self.routes,
}
}
pub(crate) fn resolve_steps(
&self,
steps: Vec<BuilderStep>,
producer_ctx: &ProducerContext,
registry: &Arc<std::sync::Mutex<Registry>>,
route_id: Option<&str>,
staging_mode: &super::step_resolution::FunctionStagingMode,
) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
let component_ctx = Arc::new(ControllerComponentContext::new(
Arc::clone(registry),
Arc::clone(&self.languages),
self.tracer_metrics
.clone()
.unwrap_or_else(|| Arc::new(NoOpMetrics)),
Arc::clone(&self.platform_service),
self.health_registry(),
route_id.map(|s| s.to_string()),
));
let rt: Arc<dyn camel_component_api::RuntimeObservability> =
Arc::clone(&component_ctx) as Arc<_>;
super::step_resolution::resolve_steps(
steps,
producer_ctx,
rt,
registry,
&self.languages,
&self.beans,
self.function_invoker.clone(),
component_ctx,
route_id,
staging_mode,
)
}
pub async fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
let route_id = definition.route_id().to_string();
if self.routes.contains_key(&route_id) {
return Err(CamelError::RouteError(format!(
"Route '{}' already exists",
route_id
)));
}
info!(route_id = %route_id, "Adding route to controller");
let prepared = match self.build_managed_route(
definition,
&super::step_resolution::FunctionStagingMode::DirectAdd,
) {
Ok(prepared) => prepared,
Err(err) => {
self.discard_function_staging();
return Err(err);
}
};
if let Some(invoker) = &self.function_invoker
&& let Err(err) = invoker.commit_staged().await
{
invoker.discard_staging(0);
return Err(CamelError::Config(err.to_string()));
}
self.routes
.insert(prepared.route_id.clone(), prepared.managed);
Ok(())
}
fn build_managed_route(
&self,
definition: RouteDefinition,
staging_mode: &super::step_resolution::FunctionStagingMode,
) -> Result<PreparedRoute, CamelError> {
let route_id = definition.route_id().to_string();
let definition_info = definition.to_info();
let RouteDefinition {
from_uri,
steps,
error_handler,
circuit_breaker,
security_policy,
security_authenticator,
unit_of_work,
concurrency,
..
} = definition;
let producer_ctx = self.build_producer_context(&route_id)?;
let mut aggregate_split: Option<AggregateSplitInfo> = None;
let processors_with_contracts = match find_top_level_aggregate_requiring_split(&steps) {
Some((idx, agg_config)) => {
let mut pre_steps = steps;
let mut rest = pre_steps.split_off(idx);
let _agg_step = rest.remove(0);
let post_steps = rest;
let pre_pairs = self.resolve_steps(
pre_steps,
&producer_ctx,
&self.registry,
Some(&route_id),
staging_mode,
)?;
let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
let pre_pipeline =
super::pipeline_runtime::new_shared_pipeline(compose_pipeline(pre_procs));
let post_pairs = self.resolve_steps(
post_steps,
&producer_ctx,
&self.registry,
Some(&route_id),
staging_mode,
)?;
let post_procs: Vec<BoxProcessor> =
post_pairs.into_iter().map(|(p, _)| p).collect();
let post_pipeline =
super::pipeline_runtime::new_shared_pipeline(compose_pipeline(post_procs));
aggregate_split = Some(AggregateSplitInfo {
pre_pipeline,
agg_config,
post_pipeline,
});
vec![]
}
None => self.resolve_steps(
steps,
&producer_ctx,
&self.registry,
Some(&route_id),
staging_mode,
)?,
};
let route_id_for_tracing = route_id.clone();
let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
let mut pipeline = build_eh_config_pipeline(
eh_config.as_ref(),
Arc::clone(&self.registry),
Arc::clone(&self.languages),
self.tracer_metrics.clone(),
Arc::clone(&self.platform_service),
self.health_registry(),
&route_id_for_tracing,
&producer_ctx,
processors_with_contracts,
self.tracing_enabled,
self.tracer_detail_level.clone(),
security_policy.clone(),
circuit_breaker,
)?;
let uow_counter = if let Some(uow_config) = &unit_of_work {
let component_ctx = Arc::new(ControllerComponentContext::new(
Arc::clone(&self.registry),
Arc::clone(&self.languages),
self.tracer_metrics
.clone()
.unwrap_or_else(|| Arc::new(NoOpMetrics)),
Arc::clone(&self.platform_service),
self.health_registry(),
Some(route_id.clone()),
));
let rt: Arc<dyn camel_component_api::RuntimeObservability> =
Arc::clone(&component_ctx) as Arc<_>;
let (uow_layer, counter) = super::route_compiler_ext::resolve_uow_layer(
uow_config,
&producer_ctx,
rt,
component_ctx.as_ref(),
None,
)?;
pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
Some(counter)
} else {
None
};
Ok(PreparedRoute {
route_id,
managed: ManagedRoute {
definition: definition_info,
from_uri,
pipeline: super::pipeline_runtime::new_shared_pipeline(pipeline),
concurrency,
consumer_handle: None,
pipeline_handle: None,
consumer_cancel_token: CancellationToken::new(),
pipeline_cancel_token: CancellationToken::new(),
channel_sender: None,
in_flight: uow_counter,
aggregate_split,
agg_service: None,
compiled: route_runtime_state::CompiledRoute {
security_policy,
security_authenticator,
},
},
})
}
pub(crate) fn insert_prepared_route(
&mut self,
prepared: PreparedRoute,
) -> Result<(), CamelError> {
if self.routes.contains_key(&prepared.route_id) {
return Err(CamelError::RouteError(format!(
"Route '{}' already exists",
prepared.route_id
)));
}
self.routes
.insert(prepared.route_id.clone(), prepared.managed);
Ok(())
}
pub async fn add_route_with_generation(
&mut self,
definition: RouteDefinition,
generation: u64,
) -> Result<(), CamelError> {
let route_id = definition.route_id().to_string();
if self.routes.contains_key(&route_id) {
return Err(CamelError::RouteError(format!(
"Route '{}' already exists",
route_id
)));
}
info!(route_id = %route_id, generation, "Adding route to controller with generation");
let prepared = self.build_managed_route(
definition,
&super::step_resolution::FunctionStagingMode::HotReload { generation },
)?;
self.routes
.insert(prepared.route_id.clone(), prepared.managed);
Ok(())
}
pub(crate) fn prepare_route_definition_with_generation(
&self,
definition: RouteDefinition,
generation: u64,
) -> Result<PreparedRoute, CamelError> {
self.build_managed_route(
definition,
&super::step_resolution::FunctionStagingMode::HotReload { generation },
)
}
pub async fn remove_route_preserving_functions(
&mut self,
route_id: &str,
) -> Result<(), CamelError> {
let managed = self.routes.get(route_id).ok_or_else(|| {
CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
})?;
if handle_is_running(&managed.consumer_handle)
|| handle_is_running(&managed.pipeline_handle)
{
return Err(CamelError::RouteError(format!(
"Route '{}' must be stopped before removal (current execution lifecycle: {})",
route_id,
inferred_lifecycle_label(managed)
)));
}
self.routes.remove(route_id);
if let Some(reg) = &self.health_registry {
reg.unregister_for_route(route_id);
}
info!(route_id = %route_id, "Route removed from controller (functions preserved for reload finalize)");
Ok(())
}
pub fn compile_route_definition(
&self,
def: RouteDefinition,
) -> Result<BoxProcessor, CamelError> {
self.route_compiler_ext().compile_route_definition(def)
}
pub fn compile_route_definition_with_generation(
&self,
def: RouteDefinition,
generation: u64,
) -> Result<BoxProcessor, CamelError> {
self.route_compiler_ext()
.compile_route_definition_with_generation(def, generation)
}
pub async fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
let managed = self.routes.get(route_id).ok_or_else(|| {
CamelError::RouteError(format!("Route '{}' not found for removal", route_id))
})?;
if handle_is_running(&managed.consumer_handle)
|| handle_is_running(&managed.pipeline_handle)
{
return Err(CamelError::RouteError(format!(
"Route '{}' must be stopped before removal (current execution lifecycle: {})",
route_id,
inferred_lifecycle_label(managed)
)));
}
if let Some(invoker) = &self.function_invoker {
for (id, rid) in self.collect_function_refs(route_id) {
if let Err(e) = invoker.unregister(&id, rid.as_deref()).await {
warn!(route_id = %route_id, error = %e, "Failed to unregister function during route removal");
}
}
}
self.routes.remove(route_id);
if let Some(reg) = &self.health_registry {
reg.unregister_for_route(route_id);
}
info!(route_id = %route_id, "Route removed from controller");
Ok(())
}
fn collect_function_refs(
&self,
route_id: &str,
) -> Vec<(camel_api::FunctionId, Option<String>)> {
self.function_invoker
.as_ref()
.map(|invoker| invoker.function_refs_for_route(route_id))
.unwrap_or_default()
}
fn discard_function_staging(&self) {
if let Some(invoker) = &self.function_invoker {
invoker.discard_staging(0);
}
}
pub fn route_count(&self) -> usize {
self.routes.route_count()
}
pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
self.routes.in_flight_count(route_id)
}
pub fn route_exists(&self, route_id: &str) -> bool {
self.routes.route_exists(route_id)
}
pub fn route_ids(&self) -> Vec<String> {
self.routes.route_ids()
}
pub fn route_source_hash(&self, route_id: &str) -> Option<u64> {
self.routes.route_source_hash(route_id)
}
pub fn auto_startup_route_ids(&self) -> Vec<String> {
self.routes.auto_startup_route_ids()
}
pub fn shutdown_route_ids(&self) -> Vec<String> {
self.routes.shutdown_route_ids()
}
pub fn swap_pipeline(
&self,
route_id: &str,
new_pipeline: BoxProcessor,
) -> Result<(), CamelError> {
let managed = self
.routes
.get(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
if managed.aggregate_split.is_some() {
tracing::warn!(
route_id = %route_id,
"swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
);
}
super::pipeline_runtime::swap_pipeline(&managed.pipeline, new_pipeline);
info!(route_id = %route_id, "Pipeline swapped atomically");
Ok(())
}
pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
self.routes.route_from_uri(route_id)
}
pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
self.routes.get_pipeline(route_id)
}
pub(super) async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
self.routes.stop_route(route_id).await
}
pub async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
self.start_route(route_id).await
}
pub async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
self.stop_route(route_id).await
}
}
impl DefaultRouteController {
#[allow(clippy::too_many_arguments)]
pub(super) async fn start_aggregate_route(
&mut self,
route_id: &str,
split: AggregateSplitInfo,
consumer: Box<dyn Consumer>,
consumer_ctx: ConsumerContext,
mut rx: mpsc::Receiver<ExchangeEnvelope>,
crash_notifier: Option<mpsc::Sender<CrashNotification>>,
runtime_for_consumer: Option<Weak<dyn RuntimeHandle>>,
tx_for_storage: mpsc::Sender<ExchangeEnvelope>,
pipeline_cancel: CancellationToken,
) -> Result<(), CamelError> {
let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
let route_cancel_clone = pipeline_cancel.clone();
let svc = AggregatorService::new(
split.agg_config.clone(),
late_tx,
Arc::clone(&self.languages),
route_cancel_clone,
);
let agg = Arc::new(std::sync::Mutex::new(svc));
let pipeline_cancel_for_monitor = pipeline_cancel.clone();
let agg_for_monitor = Arc::clone(&agg);
{
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist"); managed.agg_service = Some(Arc::clone(&agg));
}
let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
let pre_pipeline = split.pre_pipeline;
let post_pipeline = split.post_pipeline;
let pipeline_handle = tokio::spawn(async move {
loop {
tokio::select! {
biased;
late_ex = async {
let mut rx = late_rx.lock().await;
rx.recv().await
} => {
match late_ex {
Some(ex) => {
let pipe = post_pipeline.load();
if let Err(e) = pipe.clone_inner().oneshot(ex).await {
tracing::warn!(error = %e, "late exchange post-pipeline failed");
}
}
None => return,
}
}
envelope_opt = rx.recv() => {
match envelope_opt {
Some(envelope) => {
let ExchangeEnvelope { exchange, reply_tx } = envelope;
let pre_pipe = pre_pipeline.load();
let ex = match pre_pipe.clone_inner().oneshot(exchange).await {
Ok(ex) => ex,
Err(e) => {
if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
continue;
}
};
let ex = {
let cloned_svc = agg
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock") .clone();
cloned_svc.oneshot(ex).await
};
match ex {
Ok(ex) => {
if !is_pending(&ex) {
let post_pipe = post_pipeline.load();
let out = post_pipe.clone_inner().oneshot(ex).await;
if let Some(tx) = reply_tx { let _ = tx.send(out); }
} else if let Some(tx) = reply_tx {
let _ = tx.send(Ok(ex));
}
}
Err(e) => {
if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
}
}
}
None => return,
}
}
_ = pipeline_cancel.cancelled() => {
{
let guard = agg
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
}
let mut rx_guard = late_rx.lock().await;
while let Ok(late_ex) = rx_guard.try_recv() {
let pipe = post_pipeline.load();
let _ = pipe.clone_inner().oneshot(late_ex).await;
}
break;
}
}
}
});
#[cfg(test)]
emit_start_route_event("pipeline_spawned");
let consumer_handle = super::consumer_management::spawn_consumer_task(
route_id.to_string(),
consumer,
consumer_ctx,
crash_notifier,
runtime_for_consumer,
false,
);
let force_on_stop = agg_for_monitor
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock") .config()
.force_completion_on_stop;
let consumer_handle = tokio::spawn(async move {
let _ = consumer_handle.await;
if !pipeline_cancel_for_monitor.is_cancelled() {
let guard = agg_for_monitor
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock"); guard.force_complete_all();
drop(guard);
if force_on_stop {
pipeline_cancel_for_monitor.cancel();
}
}
});
#[cfg(test)]
emit_start_route_event("consumer_spawned");
{
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist"); managed.consumer_handle = Some(consumer_handle);
managed.pipeline_handle = Some(pipeline_handle);
managed.channel_sender = Some(tx_for_storage);
}
info!(route_id = %route_id, "Route started (aggregate with timeout)");
Ok(())
}
}
#[cfg(test)]
#[path = "route_controller_tests.rs"]
mod tests;