use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::Duration;
use tokio::sync::{Mutex, mpsc};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tower::{Layer, Service, ServiceExt};
use tracing::{error, info, warn};
use camel_api::UnitOfWorkConfig;
use camel_api::aggregator::AggregatorConfig;
use camel_api::error_handler::ErrorHandlerConfig;
use camel_api::metrics::MetricsCollector;
use camel_api::{
BoxProcessor, CamelError, Exchange, FilterPredicate, IdentityProcessor, ProducerContext,
RouteController, RuntimeCommand, RuntimeHandle, Value, body::Body,
};
use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
use camel_endpoint::parse_uri;
use camel_language_api::{Expression, Language, LanguageError, Predicate};
pub use camel_processor::aggregator::SharedLanguageRegistry;
use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
use camel_processor::circuit_breaker::CircuitBreakerLayer;
use camel_processor::error_handler::ErrorHandlerLayer;
use camel_processor::script_mutator::ScriptMutator;
use camel_processor::{ChoiceService, WhenClause};
use crate::lifecycle::adapters::exchange_uow::ExchangeUoWLayer;
use crate::lifecycle::adapters::route_compiler::{
compose_pipeline, compose_traced_pipeline_with_contracts,
};
use crate::lifecycle::application::route_definition::{
BuilderStep, LanguageExpressionDef, RouteDefinition, RouteDefinitionInfo, ValueSourceDef,
};
use crate::shared::components::domain::Registry;
use crate::shared::observability::domain::{DetailLevel, TracerConfig};
use arc_swap::ArcSwap;
use camel_bean::BeanRegistry;
/// Payload type of the crash-notifier channel that can be installed on
/// `DefaultRouteController` via `set_crash_notifier`.
#[derive(Debug, Clone)]
pub struct CrashNotification {
/// Identifier of the route the notification refers to.
pub route_id: String,
/// Textual description of the error being reported.
pub error: String,
}
/// Newtype around [`BoxProcessor`] that is manually marked `Sync` so it can be
/// stored inside the `ArcSwap`-backed [`SharedPipeline`].
pub(crate) struct SyncBoxProcessor(pub(crate) BoxProcessor);
// SAFETY: NOTE(review) — this file does not show why sharing `&SyncBoxProcessor`
// across threads is sound. Presumably the wrapped processor is never driven
// through a shared reference (callers would need to clone or take exclusive
// access before use) — confirm and record the actual invariant here.
unsafe impl Sync for SyncBoxProcessor {}
/// Atomically swappable, shareable handle to a route pipeline.
type SharedPipeline = Arc<ArcSwap<SyncBoxProcessor>>;
/// Crate-internal extension of [`RouteController`] exposing route management,
/// introspection and wiring operations on top of the public controller API.
///
/// The per-method notes below describe the contract as suggested by the
/// signatures; implementors are the authority on exact behaviour.
#[async_trait::async_trait]
pub trait RouteControllerInternal: RouteController + Send {
/// Registers a route built from `def`.
fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError>;
/// Replaces the pipeline of the route identified by `route_id`.
fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError>;
/// Returns the `from` URI of the given route, or `None` if it cannot be provided.
fn route_from_uri(&self, route_id: &str) -> Option<String>;
/// Sets the error-handler configuration held by the controller.
fn set_error_handler(&mut self, config: ErrorHandlerConfig);
/// Hands the controller a shared reference to a route controller
/// (presumably itself, judging by the name — confirm with implementors).
fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>);
/// Hands the controller a runtime handle.
fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>);
/// Number of routes known to the controller.
fn route_count(&self) -> usize;
/// In-flight exchange count for a route; `None` presumably means the route is unknown.
fn in_flight_count(&self, route_id: &str) -> Option<u64>;
/// Whether a route with this id is registered.
fn route_exists(&self, route_id: &str) -> bool;
/// Ids of all registered routes.
fn route_ids(&self) -> Vec<String>;
/// Ids of the routes to start automatically.
fn auto_startup_route_ids(&self) -> Vec<String>;
/// Ids of the routes to shut down.
fn shutdown_route_ids(&self) -> Vec<String>;
/// Applies tracer settings from `config`.
fn set_tracer_config(&mut self, config: &TracerConfig);
/// Compiles `def` into a processor without registering it.
fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError>;
/// Removes the route identified by `route_id`.
fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError>;
/// Starts a route as part of a reload (naming-based description — confirm).
async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
/// Stops a route as part of a reload (naming-based description — confirm).
async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError>;
}
/// Pieces of a route that was split around its first top-level aggregate step
/// with a timeout completion condition (built in `add_route`).
struct AggregateSplitInfo {
/// Pipeline composed from the steps that precede the aggregate step.
pre_pipeline: SharedPipeline,
/// Configuration of the aggregate step itself.
agg_config: AggregatorConfig,
/// Pipeline composed from the steps that follow the aggregate step.
post_pipeline: SharedPipeline,
}
/// Per-route bookkeeping kept by `DefaultRouteController`.
struct ManagedRoute {
/// Summary of the route definition (obtained via `RouteDefinition::to_info`).
definition: RouteDefinitionInfo,
/// Source URI the route consumes from.
from_uri: String,
/// Swappable handle to the route's compiled pipeline.
pipeline: SharedPipeline,
/// Optional concurrency model taken from the route definition.
concurrency: Option<ConcurrencyModel>,
/// Handle of the consumer task; `None` until one exists (spawn site is not in this part of the file).
consumer_handle: Option<JoinHandle<()>>,
/// Handle of the pipeline task; `None` until one exists (spawn site is not in this part of the file).
pipeline_handle: Option<JoinHandle<()>>,
/// Cancellation token associated with the consumer task.
consumer_cancel_token: CancellationToken,
/// Cancellation token associated with the pipeline task.
pipeline_cancel_token: CancellationToken,
/// Sender side of the exchange channel, when one is set up.
channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
/// In-flight counter shared with the unit-of-work layer; only present when
/// the route was added with a unit-of-work configuration.
in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
/// Present when the route contains a top-level aggregate step with a timeout condition.
aggregate_split: Option<AggregateSplitInfo>,
/// Shared aggregator service; initialised to `None` when the route is added.
agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
}
/// Reports whether `handle` holds a task that has not finished yet.
///
/// A missing handle counts as "not running".
fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
    match handle {
        Some(task) => !task.is_finished(),
        None => false,
    }
}
/// Derives a human-readable lifecycle label from which of the route's two
/// tasks (consumer, pipeline) are still running.
fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
    let consumer_alive = handle_is_running(&managed.consumer_handle);
    let pipeline_alive = handle_is_running(&managed.pipeline_handle);

    if consumer_alive && pipeline_alive {
        "Started"
    } else if pipeline_alive {
        // Pipeline still draining while nothing is consuming.
        "Suspended"
    } else if consumer_alive {
        // Consumer alive without a pipeline behind it.
        "Stopping"
    } else {
        "Stopped"
    }
}
/// Locates the first top-level `Aggregate` step and returns its index and a
/// copy of its configuration, but only if that step has a timeout completion
/// condition.
///
/// Only the *first* aggregate step is considered: if it has no timeout
/// condition the result is `None`, even when a later aggregate step has one.
fn find_top_level_aggregate_with_timeout(
    steps: &[BuilderStep],
) -> Option<(usize, AggregatorConfig)> {
    let (position, first_aggregate) =
        steps
            .iter()
            .enumerate()
            .find_map(|(position, step)| match step {
                BuilderStep::Aggregate { config } => Some((position, config)),
                _ => None,
            })?;

    if has_timeout_condition(&first_aggregate.completion) {
        Some((position, first_aggregate.clone()))
    } else {
        None
    }
}
/// Returns `true` only when the exchange carries the
/// `CamelAggregatorPending` property and it holds the boolean `true`.
fn is_pending(ex: &Exchange) -> bool {
    let pending_flag = ex
        .property("CamelAggregatorPending")
        .and_then(|v| v.as_bool());
    matches!(pending_flag, Some(true))
}
/// Waits until `pipeline` reports readiness.
///
/// While the pipeline fails with `CamelError::CircuitOpen`, the call sleeps one
/// second and retries. If `cancel` fires during that sleep the circuit-open
/// error is returned. Any other readiness error is logged and returned
/// immediately.
async fn ready_with_backoff(
    pipeline: &mut BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        let failure = match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(failure) => failure,
        };

        match failure {
            CamelError::CircuitOpen(msg) => {
                warn!("Circuit open, backing off: {msg}");
                tokio::select! {
                    // Back-off elapsed: fall through to the next loop iteration.
                    _ = tokio::time::sleep(Duration::from_secs(1)) => {}
                    _ = cancel.cancelled() => {
                        return Err(CamelError::CircuitOpen(msg));
                    }
                }
            }
            e => {
                error!("Pipeline not ready: {e}");
                return Err(e);
            }
        }
    }
}
/// Builds a `RuntimeCommand::FailRoute` for `route_id` carrying `error`.
///
/// The command id embeds the current time in nanoseconds since the Unix epoch
/// (`0` if the system clock is before the epoch).
fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
    let stamp = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let command_id = format!("ctrl-fail-{route_id}-{stamp}");

    RuntimeCommand::FailRoute {
        route_id: route_id.to_owned(),
        error: error.to_owned(),
        command_id,
        causation_id: None,
    }
}
/// Sends a fail-route command for `route_id` to the runtime, if one is still
/// alive.
///
/// Does nothing when `runtime` is `None` or the weak reference can no longer
/// be upgraded. A failure to execute the command is only logged.
async fn publish_runtime_failure(
    runtime: Option<Weak<dyn RuntimeHandle>>,
    route_id: &str,
    error: &str,
) {
    let live_runtime = match runtime {
        Some(weak) => weak.upgrade(),
        None => None,
    };

    if let Some(runtime) = live_runtime {
        let command = runtime_failure_command(route_id, error);
        if let Err(runtime_error) = runtime.execute(command).await {
            warn!(
                route_id = %route_id,
                error = %runtime_error,
                "failed to synchronize route crash with runtime projection"
            );
        }
    }
}
/// Default route controller: owns the managed routes and the shared
/// registries needed to compile route definitions into pipelines.
pub struct DefaultRouteController {
/// Managed routes keyed by route id.
routes: HashMap<String, ManagedRoute>,
/// Shared component registry used to resolve endpoint URIs by scheme.
registry: Arc<std::sync::Mutex<Registry>>,
/// Shared language registry used to compile expressions and predicates.
languages: SharedLanguageRegistry,
/// Shared bean registry used by `Bean` steps.
beans: Arc<std::sync::Mutex<BeanRegistry>>,
/// Reference handed in through `set_self_ref`; exposed for supervision.
self_ref: Option<Arc<Mutex<dyn RouteController>>>,
/// Weak handle to the runtime (downgraded in `set_runtime_handle`).
runtime: Option<Weak<dyn RuntimeHandle>>,
/// Fallback error handler used when a route defines none of its own.
global_error_handler: Option<ErrorHandlerConfig>,
/// Optional channel for `CrashNotification` messages.
crash_notifier: Option<mpsc::Sender<CrashNotification>>,
/// Tracer settings copied from the `TracerConfig` passed to `set_tracer_config`.
tracing_enabled: bool,
tracer_detail_level: DetailLevel,
tracer_metrics: Option<Arc<dyn MetricsCollector>>,
}
impl DefaultRouteController {
/// Creates a controller backed by `registry`, with a fresh, empty bean
/// registry.
pub fn new(registry: Arc<std::sync::Mutex<Registry>>) -> Self {
    let empty_beans = Arc::new(std::sync::Mutex::new(BeanRegistry::new()));
    Self::with_beans(registry, empty_beans)
}
pub fn with_beans(
registry: Arc<std::sync::Mutex<Registry>>,
beans: Arc<std::sync::Mutex<BeanRegistry>>,
) -> Self {
Self {
routes: HashMap::new(),
registry,
languages: Arc::new(std::sync::Mutex::new(HashMap::new())),
beans,
self_ref: None,
runtime: None,
global_error_handler: None,
crash_notifier: None,
tracing_enabled: false,
tracer_detail_level: DetailLevel::Minimal,
tracer_metrics: None,
}
}
pub fn with_languages(
registry: Arc<std::sync::Mutex<Registry>>,
languages: SharedLanguageRegistry,
) -> Self {
Self {
routes: HashMap::new(),
registry,
languages,
beans: Arc::new(std::sync::Mutex::new(BeanRegistry::new())),
self_ref: None,
runtime: None,
global_error_handler: None,
crash_notifier: None,
tracing_enabled: false,
tracer_detail_level: DetailLevel::Minimal,
tracer_metrics: None,
}
}
/// Stores `self_ref`, replacing any previously stored reference. The stored
/// value is what `self_ref_for_supervision` hands out.
pub fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
self.self_ref = Some(self_ref);
}
/// Remembers the runtime handle as a weak reference, so the controller itself
/// does not keep the runtime alive.
pub fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
self.runtime = Some(Arc::downgrade(&runtime));
}
/// Returns another handle to the reference stored by `set_self_ref`, if any.
pub fn self_ref_for_supervision(&self) -> Option<Arc<Mutex<dyn RouteController>>> {
    self.self_ref.as_ref().map(Arc::clone)
}
pub fn runtime_handle_for_supervision(&self) -> Option<Arc<dyn RuntimeHandle>> {
self.runtime.as_ref().and_then(Weak::upgrade)
}
/// Installs the sender used for `CrashNotification` messages, replacing any
/// previously installed one.
pub fn set_crash_notifier(&mut self, tx: mpsc::Sender<CrashNotification>) {
self.crash_notifier = Some(tx);
}
/// Sets the global error-handler configuration. It is used as the fallback
/// for routes whose definition carries no error handler of its own.
pub fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
self.global_error_handler = Some(config);
}
pub fn set_tracer_config(&mut self, config: &TracerConfig) {
self.tracing_enabled = config.enabled;
self.tracer_detail_level = config.detail_level.clone();
self.tracer_metrics = config.metrics_collector.clone();
}
/// Builds a `ProducerContext`, attaching the runtime handle when one is set
/// and still alive.
///
/// Currently always returns `Ok`.
fn build_producer_context(&self) -> Result<ProducerContext, CamelError> {
    let base_ctx = ProducerContext::new();
    let live_runtime = self.runtime.as_ref().and_then(Weak::upgrade);

    Ok(match live_runtime {
        Some(runtime) => base_ctx.with_runtime(runtime),
        None => base_ctx,
    })
}
fn resolve_error_handler(
&self,
config: ErrorHandlerConfig,
producer_ctx: &ProducerContext,
registry: &Registry,
) -> Result<ErrorHandlerLayer, CamelError> {
let dlc_producer = if let Some(ref uri) = config.dlc_uri {
let parsed = parse_uri(uri)?;
let component = registry.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(uri)?;
Some(endpoint.create_producer(producer_ctx)?)
} else {
None
};
let mut resolved_policies = Vec::new();
for policy in config.policies {
let handler_producer = if let Some(ref uri) = policy.handled_by {
let parsed = parse_uri(uri)?;
let component = registry.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(uri)?;
Some(endpoint.create_producer(producer_ctx)?)
} else {
None
};
resolved_policies.push((policy, handler_producer));
}
Ok(ErrorHandlerLayer::new(dlc_producer, resolved_policies))
}
/// Builds the unit-of-work layer for a route together with its in-flight
/// counter.
///
/// The optional `on_complete` / `on_failure` hook URIs are resolved to
/// producers through `registry`. When `counter` is `Some`, that counter is
/// reused; otherwise a new one starting at zero is created. The counter used
/// by the layer is returned alongside it.
///
/// # Errors
/// Propagates URI parsing, component lookup and endpoint creation errors
/// as-is. A producer creation failure is wrapped in
/// `CamelError::RouteError` naming the hook URI.
fn resolve_uow_layer(
    &self,
    config: &UnitOfWorkConfig,
    producer_ctx: &ProducerContext,
    registry: &Registry,
    counter: Option<Arc<AtomicU64>>,
) -> Result<(ExchangeUoWLayer, Arc<AtomicU64>), CamelError> {
    let hook_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
        let parsed = parse_uri(uri)?;
        let component = registry.get_or_err(&parsed.scheme)?;
        let endpoint = component.create_endpoint(uri)?;
        match endpoint.create_producer(producer_ctx) {
            Ok(producer) => Ok(producer),
            Err(e) => Err(CamelError::RouteError(format!(
                "UoW hook URI '{uri}' could not be resolved: {e}"
            ))),
        }
    };

    let on_complete = match config.on_complete.as_deref() {
        Some(uri) => Some(hook_producer(uri)?),
        None => None,
    };
    let on_failure = match config.on_failure.as_deref() {
        Some(uri) => Some(hook_producer(uri)?),
        None => None,
    };

    let counter = match counter {
        Some(existing) => existing,
        None => Arc::new(AtomicU64::new(0)),
    };
    let layer = ExchangeUoWLayer::new(counter.clone(), on_complete, on_failure);
    Ok((layer, counter))
}
/// Looks `language` up in the shared language registry.
///
/// # Errors
/// Returns `CamelError::RouteError` when no language with that name is
/// registered.
///
/// # Panics
/// Panics if the language registry mutex is poisoned.
fn resolve_language(&self, language: &str) -> Result<Arc<dyn Language>, CamelError> {
    let languages = self
        .languages
        .lock()
        .expect("mutex poisoned: another thread panicked while holding this lock");

    match languages.get(language) {
        Some(found) => Ok(Arc::clone(found)),
        None => Err(CamelError::RouteError(format!(
            "language `{language}` is not registered in CamelContext"
        ))),
    }
}
/// Compiles `expression` with the language it names.
///
/// # Errors
/// Fails when the language is not registered, or when compilation fails (the
/// error then names the language and the expression source).
fn compile_language_expression(
    &self,
    expression: &LanguageExpressionDef,
) -> Result<Arc<dyn Expression>, CamelError> {
    let language = self.resolve_language(&expression.language)?;
    match language.create_expression(&expression.source) {
        Ok(compiled) => Ok(Arc::from(compiled)),
        Err(e) => Err(CamelError::RouteError(format!(
            "failed to compile {} expression `{}`: {e}",
            expression.language, expression.source
        ))),
    }
}
/// Compiles `expression` as a predicate with the language it names.
///
/// # Errors
/// Fails when the language is not registered, or when compilation fails (the
/// error then names the language and the predicate source).
fn compile_language_predicate(
    &self,
    expression: &LanguageExpressionDef,
) -> Result<Arc<dyn Predicate>, CamelError> {
    let language = self.resolve_language(&expression.language)?;
    match language.create_predicate(&expression.source) {
        Ok(compiled) => Ok(Arc::from(compiled)),
        Err(e) => Err(CamelError::RouteError(format!(
            "failed to compile {} predicate `{}`: {e}",
            expression.language, expression.source
        ))),
    }
}
/// Compiles `expression` into a `FilterPredicate` closure.
///
/// The closure yields `true` only when the predicate evaluates successfully
/// to `true`; an evaluation error is treated as "no match".
fn compile_filter_predicate(
    &self,
    expression: &LanguageExpressionDef,
) -> Result<FilterPredicate, CamelError> {
    let compiled = self.compile_language_predicate(expression)?;
    let filter: FilterPredicate =
        Arc::new(move |exchange: &Exchange| matches!(compiled.matches(exchange), Ok(true)));
    Ok(filter)
}
/// Converts an evaluated `Value` into a message `Body`: strings become text
/// bodies, `Null` becomes an empty body, everything else is kept as JSON.
fn value_to_body(value: Value) -> Body {
    match value {
        Value::String(text) => Body::Text(text),
        Value::Null => Body::Empty,
        json => Body::Json(json),
    }
}
pub(crate) fn resolve_steps(
&self,
steps: Vec<BuilderStep>,
producer_ctx: &ProducerContext,
registry: &Arc<std::sync::Mutex<Registry>>,
) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
let parsed = parse_uri(uri)?;
let registry_guard = registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock");
let component = registry_guard.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(uri)?;
endpoint.create_producer(producer_ctx)
};
let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
for step in steps {
match step {
BuilderStep::Processor(svc) => {
processors.push((svc, None));
}
BuilderStep::To(uri) => {
let parsed = parse_uri(&uri)?;
let registry_guard = registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock");
let component = registry_guard.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(&uri)?;
let contract = endpoint.body_contract();
let producer = endpoint.create_producer(producer_ctx)?;
processors.push((producer, contract));
}
BuilderStep::Stop => {
processors.push((BoxProcessor::new(camel_processor::StopService), None));
}
BuilderStep::Delay { config } => {
let svc = camel_processor::delayer::DelayerService::new(config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Log { level, message } => {
let svc = camel_processor::LogProcessor::new(level, message);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeSetHeader { key, value } => match value {
ValueSourceDef::Literal(value) => {
let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
processors.push((BoxProcessor::new(svc), None));
}
ValueSourceDef::Expression(expression) => {
let expression = self.compile_language_expression(&expression)?;
let svc = camel_processor::DynamicSetHeader::new(
IdentityProcessor,
key,
move |exchange: &Exchange| {
expression.evaluate(exchange).unwrap_or(Value::Null)
},
);
processors.push((BoxProcessor::new(svc), None));
}
},
BuilderStep::DeclarativeSetBody { value } => match value {
ValueSourceDef::Literal(value) => {
let body = Self::value_to_body(value);
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |_exchange: &Exchange| body.clone(),
);
processors.push((BoxProcessor::new(svc), None));
}
ValueSourceDef::Expression(expression) => {
let expression = self.compile_language_expression(&expression)?;
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
Self::value_to_body(value)
},
);
processors.push((BoxProcessor::new(svc), None));
}
},
BuilderStep::DeclarativeFilter { predicate, steps } => {
let predicate = self.compile_filter_predicate(&predicate)?;
let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc =
camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeChoice { whens, otherwise } => {
let mut when_clauses = Vec::new();
for when_step in whens {
let predicate = self.compile_filter_predicate(&when_step.predicate)?;
let sub_pairs =
self.resolve_steps(when_step.steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let pipeline = compose_pipeline(sub_processors);
when_clauses.push(WhenClause {
predicate,
pipeline,
});
}
let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
let sub_pairs =
self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
Some(compose_pipeline(sub_processors))
} else {
None
};
let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeScript { expression } => {
let lang = self.resolve_language(&expression.language)?;
match lang.create_mutating_expression(&expression.source) {
Ok(mut_expr) => {
processors
.push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
}
Err(LanguageError::NotSupported { .. }) => {
let expression = self.compile_language_expression(&expression)?;
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |exchange: &Exchange| {
let value =
expression.evaluate(exchange).unwrap_or(Value::Null);
Self::value_to_body(value)
},
);
processors.push((BoxProcessor::new(svc), None));
}
Err(e) => {
return Err(CamelError::RouteError(format!(
"Failed to create mutating expression for language '{}': {}",
expression.language, e
)));
}
}
}
BuilderStep::Split { config, steps } => {
let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let splitter =
camel_processor::splitter::SplitterService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(splitter), None));
}
BuilderStep::DeclarativeSplit {
expression,
aggregation,
parallel,
parallel_limit,
stop_on_exception,
steps,
} => {
let lang_expr = self.compile_language_expression(&expression)?;
let split_fn = move |exchange: &Exchange| {
let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::String(s) => s
.lines()
.filter(|line| !line.is_empty())
.map(|line| {
let mut fragment = exchange.clone();
fragment.input.body = Body::from(line.to_string());
fragment
})
.collect(),
Value::Array(arr) => arr
.into_iter()
.map(|v| {
let mut fragment = exchange.clone();
fragment.input.body = Body::from(v);
fragment
})
.collect(),
_ => vec![exchange.clone()],
}
};
let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
.aggregation(aggregation)
.parallel(parallel)
.stop_on_exception(stop_on_exception);
if let Some(limit) = parallel_limit {
config = config.parallel_limit(limit);
}
let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let splitter =
camel_processor::splitter::SplitterService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(splitter), None));
}
BuilderStep::Aggregate { config } => {
let (late_tx, _late_rx) = mpsc::channel(256);
let registry: SharedLanguageRegistry =
Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
let cancel = CancellationToken::new();
let svc =
camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Filter { predicate, steps } => {
let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc =
camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Choice { whens, otherwise } => {
let mut when_clauses = Vec::new();
for when_step in whens {
let sub_pairs =
self.resolve_steps(when_step.steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let pipeline = compose_pipeline(sub_processors);
when_clauses.push(WhenClause {
predicate: when_step.predicate,
pipeline,
});
}
let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
let sub_pairs =
self.resolve_steps(otherwise_steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
Some(compose_pipeline(sub_processors))
} else {
None
};
let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::WireTap { uri } => {
let producer = resolve_producer(&uri)?;
let svc = camel_processor::WireTapService::new(producer);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Multicast { config, steps } => {
let mut endpoints = Vec::new();
for step in steps {
let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let endpoint = compose_pipeline(sub_processors);
endpoints.push(endpoint);
}
let svc = camel_processor::MulticastService::new(endpoints, config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeLog { level, message } => {
let ValueSourceDef::Expression(expression) = message else {
unreachable!(
"DeclarativeLog with Literal should have been compiled to a Processor"
);
};
let expression = self.compile_language_expression(&expression)?;
let svc =
camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
expression
.evaluate(exchange)
.unwrap_or_else(|e| {
warn!(error = %e, "log expression evaluation failed");
Value::Null
})
.to_string()
});
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Bean { name, method } => {
let beans = self.beans.lock().expect(
"beans mutex poisoned: another thread panicked while holding this lock",
);
let bean = beans.get(&name).ok_or_else(|| {
CamelError::ProcessorError(format!("Bean not found: {}", name))
})?;
let bean_clone = Arc::clone(&bean);
let method = method.clone();
let processor = tower::service_fn(move |mut exchange: Exchange| {
let bean = Arc::clone(&bean_clone);
let method = method.clone();
async move {
bean.call(&method, &mut exchange).await?;
Ok(exchange)
}
});
processors.push((BoxProcessor::new(processor), None));
}
BuilderStep::Script { language, script } => {
let lang = self.resolve_language(&language)?;
match lang.create_mutating_expression(&script) {
Ok(mut_expr) => {
processors
.push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
}
Err(LanguageError::NotSupported {
feature,
language: ref lang_name,
}) => {
return Err(CamelError::RouteError(format!(
"Language '{}' does not support {} (required for .script() step)",
lang_name, feature
)));
}
Err(e) => {
return Err(CamelError::RouteError(format!(
"Failed to create mutating expression for language '{}': {}",
language, e
)));
}
}
}
BuilderStep::Throttle { config, steps } => {
let sub_pairs = self.resolve_steps(steps, producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc =
camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::LoadBalance { config, steps } => {
let mut endpoints = Vec::new();
for step in steps {
let sub_pairs = self.resolve_steps(vec![step], producer_ctx, registry)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let endpoint = compose_pipeline(sub_processors);
endpoints.push(endpoint);
}
let svc =
camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DynamicRouter { config } => {
use camel_api::EndpointResolver;
let producer_ctx_clone = producer_ctx.clone();
let registry_clone = Arc::clone(registry);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let registry_guard = match registry_clone.lock() {
Ok(g) => g,
Err(_) => return None, };
let component = match registry_guard.get_or_err(&parsed.scheme) {
Ok(c) => c,
Err(_) => return None,
};
let endpoint = match component.create_endpoint(uri) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc = camel_processor::dynamic_router::DynamicRouterService::new(
config, resolver,
);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeDynamicRouter {
expression,
uri_delimiter,
cache_size,
ignore_invalid_endpoints,
max_iterations,
} => {
use camel_api::EndpointResolver;
let expression = self.compile_language_expression(&expression)?;
let expression: camel_api::RouterExpression =
Arc::new(move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::Null => None,
Value::String(s) => Some(s),
other => Some(other.to_string()),
}
});
let config = camel_api::DynamicRouterConfig::new(expression)
.uri_delimiter(uri_delimiter)
.cache_size(cache_size)
.ignore_invalid_endpoints(ignore_invalid_endpoints)
.max_iterations(max_iterations);
let producer_ctx_clone = producer_ctx.clone();
let registry_clone = Arc::clone(registry);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let registry_guard = match registry_clone.lock() {
Ok(g) => g,
Err(_) => return None,
};
let component = match registry_guard.get_or_err(&parsed.scheme) {
Ok(c) => c,
Err(_) => return None,
};
let endpoint = match component.create_endpoint(uri) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc = camel_processor::dynamic_router::DynamicRouterService::new(
config, resolver,
);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::RoutingSlip { config } => {
use camel_api::EndpointResolver;
let producer_ctx_clone = producer_ctx.clone();
let registry_clone = Arc::clone(registry);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let registry_guard = match registry_clone.lock() {
Ok(g) => g,
Err(_) => return None,
};
let component = match registry_guard.get_or_err(&parsed.scheme) {
Ok(c) => c,
Err(_) => return None,
};
let endpoint = match component.create_endpoint(uri) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc =
camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeRoutingSlip {
expression,
uri_delimiter,
cache_size,
ignore_invalid_endpoints,
} => {
use camel_api::EndpointResolver;
let expression = self.compile_language_expression(&expression)?;
let expression: camel_api::RoutingSlipExpression =
Arc::new(move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::Null => None,
Value::String(s) => Some(s),
other => Some(other.to_string()),
}
});
let config = camel_api::RoutingSlipConfig::new(expression)
.uri_delimiter(uri_delimiter)
.cache_size(cache_size)
.ignore_invalid_endpoints(ignore_invalid_endpoints);
let producer_ctx_clone = producer_ctx.clone();
let registry_clone = Arc::clone(registry);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let registry_guard = match registry_clone.lock() {
Ok(g) => g,
Err(_) => return None,
};
let component = match registry_guard.get_or_err(&parsed.scheme) {
Ok(c) => c,
Err(_) => return None,
};
let endpoint = match component.create_endpoint(uri) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc =
camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
}
}
Ok(processors)
}
pub fn add_route(&mut self, definition: RouteDefinition) -> Result<(), CamelError> {
let route_id = definition.route_id().to_string();
if self.routes.contains_key(&route_id) {
return Err(CamelError::RouteError(format!(
"Route '{}' already exists",
route_id
)));
}
info!(route_id = %route_id, "Adding route to controller");
let definition_info = definition.to_info();
let RouteDefinition {
from_uri,
steps,
error_handler,
circuit_breaker,
unit_of_work,
concurrency,
..
} = definition;
let producer_ctx = self.build_producer_context()?;
let mut aggregate_split: Option<AggregateSplitInfo> = None;
let processors_with_contracts = match find_top_level_aggregate_with_timeout(&steps) {
Some((idx, agg_config)) => {
let mut pre_steps = steps;
let mut rest = pre_steps.split_off(idx);
let _agg_step = rest.remove(0);
let post_steps = rest;
let pre_pairs = self.resolve_steps(pre_steps, &producer_ctx, &self.registry)?;
let pre_procs: Vec<BoxProcessor> = pre_pairs.into_iter().map(|(p, _)| p).collect();
let pre_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
compose_pipeline(pre_procs),
)));
let post_pairs = self.resolve_steps(post_steps, &producer_ctx, &self.registry)?;
let post_procs: Vec<BoxProcessor> =
post_pairs.into_iter().map(|(p, _)| p).collect();
let post_pipeline = Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(
compose_pipeline(post_procs),
)));
aggregate_split = Some(AggregateSplitInfo {
pre_pipeline,
agg_config,
post_pipeline,
});
vec![]
}
None => self.resolve_steps(steps, &producer_ctx, &self.registry)?,
};
let route_id_for_tracing = route_id.clone();
let mut pipeline = if processors_with_contracts.is_empty() {
BoxProcessor::new(IdentityProcessor)
} else {
compose_traced_pipeline_with_contracts(
processors_with_contracts,
&route_id_for_tracing,
self.tracing_enabled,
self.tracer_detail_level.clone(),
self.tracer_metrics.clone(),
)
};
if let Some(cb_config) = circuit_breaker {
let cb_layer = CircuitBreakerLayer::new(cb_config);
pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
}
let eh_config = error_handler.or_else(|| self.global_error_handler.clone());
if let Some(config) = eh_config {
let registry = self
.registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock");
let layer = self.resolve_error_handler(config, &producer_ctx, ®istry)?;
pipeline = BoxProcessor::new(layer.layer(pipeline));
}
let uow_counter = if let Some(uow_config) = &unit_of_work {
let registry = self
.registry
.lock()
.expect("mutex poisoned: registry lock in add_route uow");
let (uow_layer, counter) =
self.resolve_uow_layer(uow_config, &producer_ctx, ®istry, None)?;
pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
Some(counter)
} else {
None
};
self.routes.insert(
route_id.clone(),
ManagedRoute {
definition: definition_info,
from_uri,
pipeline: Arc::new(ArcSwap::from_pointee(SyncBoxProcessor(pipeline))),
concurrency,
consumer_handle: None,
pipeline_handle: None,
consumer_cancel_token: CancellationToken::new(),
pipeline_cancel_token: CancellationToken::new(),
channel_sender: None,
in_flight: uow_counter,
aggregate_split,
agg_service: None,
},
);
Ok(())
}
/// Compiles `def` into a standalone pipeline without registering it.
///
/// All steps are compiled into a traced pipeline, which is then wrapped by
/// the optional circuit breaker, the error handler (route-level, falling
/// back to the global one) and the optional unit-of-work layer. When a
/// route with the same id is already registered and has an in-flight
/// counter, the unit-of-work layer reuses that counter.
///
/// # Errors
/// Propagates any error from step, error-handler or unit-of-work resolution.
///
/// # Panics
/// Panics if the component registry mutex is poisoned.
pub fn compile_route_definition(
    &self,
    def: RouteDefinition,
) -> Result<BoxProcessor, CamelError> {
    let route_id = def.route_id().to_string();
    let producer_ctx = self.build_producer_context()?;

    let processors_with_contracts =
        self.resolve_steps(def.steps, &producer_ctx, &self.registry)?;
    let mut pipeline = compose_traced_pipeline_with_contracts(
        processors_with_contracts,
        &route_id,
        self.tracing_enabled,
        self.tracer_detail_level.clone(),
        self.tracer_metrics.clone(),
    );

    if let Some(cb_config) = def.circuit_breaker {
        let cb_layer = CircuitBreakerLayer::new(cb_config);
        pipeline = BoxProcessor::new(cb_layer.layer(pipeline));
    }

    // A route-level error handler wins over the global one.
    let eh_config = def
        .error_handler
        .or_else(|| self.global_error_handler.clone());
    if let Some(config) = eh_config {
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: registry lock in compile_route_definition");
        let layer = self.resolve_error_handler(config, &producer_ctx, &registry)?;
        pipeline = BoxProcessor::new(layer.layer(pipeline));
    }

    if let Some(uow_config) = &def.unit_of_work {
        // Keep counting on the live route's counter so in-flight tracking
        // carries over to the newly compiled pipeline.
        let existing_counter = self
            .routes
            .get(&route_id)
            .and_then(|r| r.in_flight.as_ref().map(Arc::clone));
        let registry = self
            .registry
            .lock()
            .expect("mutex poisoned: registry lock in compile_route_definition uow");
        let (uow_layer, _counter) =
            self.resolve_uow_layer(uow_config, &producer_ctx, &registry, existing_counter)?;
        pipeline = BoxProcessor::new(uow_layer.layer(pipeline));
    }

    Ok(pipeline)
}
/// Drops a stopped route from the controller.
///
/// # Errors
/// Returns [`CamelError::RouteError`] when `route_id` is unknown, or when the
/// route's consumer or pipeline task is still running.
pub fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
    let Some(route) = self.routes.get(route_id) else {
        return Err(CamelError::RouteError(format!(
            "Route '{}' not found for removal",
            route_id
        )));
    };
    let still_running =
        handle_is_running(&route.consumer_handle) || handle_is_running(&route.pipeline_handle);
    if still_running {
        let message = format!(
            "Route '{}' must be stopped before removal (current execution lifecycle: {})",
            route_id,
            inferred_lifecycle_label(route)
        );
        return Err(CamelError::RouteError(message));
    }
    self.routes.remove(route_id);
    info!(route_id = %route_id, "Route removed from controller");
    Ok(())
}
/// Number of routes currently registered on this controller.
pub fn route_count(&self) -> usize {
    let registered = &self.routes;
    registered.len()
}
/// Reads the in-flight counter of a route.
///
/// Returns `None` when `route_id` is unknown and `Some(0)` when the route
/// exists but has no in-flight counter attached.
pub fn in_flight_count(&self, route_id: &str) -> Option<u64> {
    let route = self.routes.get(route_id)?;
    let current = match route.in_flight.as_ref() {
        Some(counter) => counter.load(Ordering::Relaxed),
        None => 0,
    };
    Some(current)
}
/// Whether a route with the given id is registered.
pub fn route_exists(&self, route_id: &str) -> bool {
    let registered = &self.routes;
    registered.contains_key(route_id)
}
/// Ids of all registered routes, in the map's iteration order.
pub fn route_ids(&self) -> Vec<String> {
    let mut ids = Vec::with_capacity(self.routes.len());
    ids.extend(self.routes.keys().cloned());
    ids
}
/// Ids of the routes flagged for auto-startup, sorted by ascending startup
/// order (stable for routes sharing the same order).
pub fn auto_startup_route_ids(&self) -> Vec<String> {
    let mut ordered: Vec<(i32, &String)> = Vec::new();
    for (id, managed) in &self.routes {
        if managed.definition.auto_startup() {
            ordered.push((managed.definition.startup_order(), id));
        }
    }
    ordered.sort_by_key(|&(order, _)| order);
    ordered.into_iter().map(|(_, id)| id.clone()).collect()
}
/// Ids of every registered route, sorted by descending startup order so that
/// routes are shut down in the reverse of the order they are started in.
pub fn shutdown_route_ids(&self) -> Vec<String> {
    let mut ordered: Vec<(i32, &String)> = Vec::with_capacity(self.routes.len());
    for (id, managed) in &self.routes {
        ordered.push((managed.definition.startup_order(), id));
    }
    ordered.sort_by_key(|&(order, _)| std::cmp::Reverse(order));
    ordered.into_iter().map(|(_, id)| id.clone()).collect()
}
pub fn swap_pipeline(
&self,
route_id: &str,
new_pipeline: BoxProcessor,
) -> Result<(), CamelError> {
let managed = self
.routes
.get(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
if managed.aggregate_split.is_some() {
tracing::warn!(
route_id = %route_id,
"swap_pipeline: aggregate routes with timeout do not support hot-reload of pre/post segments"
);
}
managed
.pipeline
.store(Arc::new(SyncBoxProcessor(new_pipeline)));
info!(route_id = %route_id, "Pipeline swapped atomically");
Ok(())
}
/// The `from` URI of a registered route, or `None` when the id is unknown.
pub fn route_from_uri(&self, route_id: &str) -> Option<String> {
    let route = self.routes.get(route_id)?;
    Some(route.from_uri.clone())
}
/// A clone of the pipeline currently stored for a route, or `None` when the
/// id is unknown.
pub fn get_pipeline(&self, route_id: &str) -> Option<BoxProcessor> {
    let route = self.routes.get(route_id)?;
    let current = route.pipeline.load();
    Some(current.0.clone())
}
/// Stops both tasks of a route and resets its cancellation tokens.
///
/// Stopping an already stopped route is a no-op. Otherwise the consumer token
/// is cancelled first, pending aggregations (if any) are force-completed, the
/// pipeline token is cancelled, and both task handles are awaited for at most
/// 30 seconds. A timeout is only logged; the handles are dropped either way.
///
/// # Errors
/// Returns [`CamelError::RouteError`] when `route_id` is unknown.
async fn stop_route_internal(&mut self, route_id: &str) -> Result<(), CamelError> {
    // All synchronous bookkeeping happens under one scoped borrow, which ends
    // before the first await.
    let (consumer_task, pipeline_task) = {
        let Some(route) = self.routes.get_mut(route_id) else {
            return Err(CamelError::RouteError(format!(
                "Route '{}' not found",
                route_id
            )));
        };
        let idle = !handle_is_running(&route.consumer_handle)
            && !handle_is_running(&route.pipeline_handle);
        if idle {
            return Ok(());
        }
        info!(route_id = %route_id, "Stopping route");
        route.consumer_cancel_token.cancel();
        if let Some(aggregator) = &route.agg_service {
            aggregator.lock().unwrap().force_complete_all();
        }
        route.pipeline_cancel_token.cancel();
        let tasks = (route.consumer_handle.take(), route.pipeline_handle.take());
        route.channel_sender = None;
        tasks
    };

    let joined = tokio::time::timeout(Duration::from_secs(30), async move {
        match (consumer_task, pipeline_task) {
            (None, None) => {}
            (Some(only), None) | (None, Some(only)) => {
                let _ = only.await;
            }
            (Some(consumer), Some(pipeline)) => {
                let _ = tokio::join!(consumer, pipeline);
            }
        }
    })
    .await;
    if joined.is_err() {
        warn!(route_id = %route_id, "Route shutdown timed out after 30s — tasks may still be running");
    }

    // Fresh tokens so the route can be started again later.
    let route = self
        .routes
        .get_mut(route_id)
        .expect("invariant: route must exist after prior existence check");
    route.consumer_cancel_token = CancellationToken::new();
    route.pipeline_cancel_token = CancellationToken::new();
    info!(route_id = %route_id, "Route stopped");
    Ok(())
}
}
#[async_trait::async_trait]
impl RouteController for DefaultRouteController {
/// Starts a registered route by spawning its consumer task and its pipeline task.
///
/// Behaviour by current state (derived from the two task handles):
/// - both running: no-op, returns `Ok(())`;
/// - only the pipeline running (suspended): error pointing at `resume_route()`;
/// - only the consumer running: error asking for `stop_route()` first;
/// - neither running: the route is started.
///
/// Routes with an `aggregate_split` get a dedicated pipeline loop that runs the
/// pre-pipeline, the aggregator and the post-pipeline; all other routes run the
/// shared pipeline either sequentially or concurrently, depending on the
/// effective concurrency model.
///
/// # Errors
/// Returns [`CamelError`] when the route is unknown, is in one of the rejected
/// states above, or when URI parsing / component lookup / endpoint or consumer
/// creation fails.
///
/// # Panics
/// Panics if the component registry mutex is poisoned.
async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
{
// State validation; the borrow is scoped so it ends before anything is spawned.
let managed = self
.routes
.get_mut(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
let consumer_running = handle_is_running(&managed.consumer_handle);
let pipeline_running = handle_is_running(&managed.pipeline_handle);
if consumer_running && pipeline_running {
return Ok(());
}
if !consumer_running && pipeline_running {
return Err(CamelError::RouteError(format!(
"Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
route_id
)));
}
if consumer_running && !pipeline_running {
return Err(CamelError::RouteError(format!(
"Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
route_id
)));
}
}
info!(route_id = %route_id, "Starting route");
// Snapshot what the spawned tasks need so no borrow of `self.routes` is kept.
let (from_uri, pipeline, concurrency) = {
let managed = self
.routes
.get(route_id)
.expect("invariant: route must exist after prior existence check");
(
managed.from_uri.clone(),
Arc::clone(&managed.pipeline),
managed.concurrency.clone(),
)
};
let crash_notifier = self.crash_notifier.clone();
let runtime_for_consumer = self.runtime.clone();
let parsed = parse_uri(&from_uri)?;
// The registry lock is held only while the consumer is created and is
// released explicitly before any task is spawned.
let registry = self
.registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock");
let component = registry.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(&from_uri)?;
let mut consumer = endpoint.create_consumer()?;
let consumer_concurrency = consumer.concurrency_model();
drop(registry);
// A concurrency model set on the route overrides the consumer's own model.
let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
// Bounded channel (capacity 256) carrying exchanges from consumer to pipeline.
let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
// Child tokens: cancelling the route-level tokens cancels these as well.
let consumer_cancel = managed.consumer_cancel_token.child_token();
let pipeline_cancel = managed.pipeline_cancel_token.child_token();
// A sender clone is stored on the route; `resume_route` reads it back to
// attach a new consumer to the same channel.
let tx_for_storage = tx.clone();
let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone());
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
// Aggregate-split routes: pre-pipeline -> aggregator -> post-pipeline.
if let Some(split) = managed.aggregate_split.as_ref() {
// Second channel on which the aggregator service is given the sender; the
// pipeline loop below feeds whatever arrives on it into the post-pipeline.
let (late_tx, late_rx) = mpsc::channel::<Exchange>(256);
let route_cancel_clone = pipeline_cancel.clone();
let svc = AggregatorService::new(
split.agg_config.clone(),
late_tx,
Arc::clone(&self.languages),
route_cancel_clone,
);
let agg = Arc::new(std::sync::Mutex::new(svc));
// Stored on the route so `stop_route_internal` can force-complete it.
managed.agg_service = Some(Arc::clone(&agg));
let late_rx = Arc::new(tokio::sync::Mutex::new(late_rx));
let pre_pipeline = Arc::clone(&split.pre_pipeline);
let post_pipeline = Arc::clone(&split.post_pipeline);
let route_id_for_consumer = route_id.to_string();
// Consumer task: on failure, log, notify the crash channel (if configured)
// and publish a runtime failure.
let consumer_handle = tokio::spawn(async move {
if let Err(e) = consumer.start(consumer_ctx).await {
error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
let error_msg = e.to_string();
if let Some(tx) = crash_notifier {
let _ = tx
.send(CrashNotification {
route_id: route_id_for_consumer.clone(),
error: error_msg.clone(),
})
.await;
}
publish_runtime_failure(
runtime_for_consumer,
&route_id_for_consumer,
&error_msg,
)
.await;
}
});
let pipeline_handle = tokio::spawn(async move {
loop {
tokio::select! {
// `biased` makes select! poll the branches in declaration order:
// late exchanges, then new envelopes, then cancellation.
biased;
late_ex = async {
let mut rx = late_rx.lock().await;
rx.recv().await
} => {
match late_ex {
Some(ex) => {
let pipe = post_pipeline.load();
if let Err(e) = pipe.0.clone().oneshot(ex).await {
tracing::warn!(error = %e, "late exchange post-pipeline failed");
}
}
// Late channel closed: end the pipeline task.
None => return,
}
}
envelope_opt = rx.recv() => {
match envelope_opt {
Some(envelope) => {
let ExchangeEnvelope { exchange, reply_tx } = envelope;
let pre_pipe = pre_pipeline.load();
let ex = match pre_pipe.0.clone().oneshot(exchange).await {
Ok(ex) => ex,
Err(e) => {
if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
continue;
}
};
let ex = {
// Clone the service out of the std mutex so the guard is not held
// across the await below.
let cloned_svc = agg.lock().unwrap().clone();
cloned_svc.oneshot(ex).await
};
match ex {
Ok(ex) => {
// Only exchanges that `is_pending` rejects go through the post-pipeline;
// pending ones are returned to the caller as-is.
// NOTE(review): presumably "pending" means the aggregation is not yet
// complete — confirm against `is_pending`.
if !is_pending(&ex) {
let post_pipe = post_pipeline.load();
let out = post_pipe.0.clone().oneshot(ex).await;
if let Some(tx) = reply_tx { let _ = tx.send(out); }
} else if let Some(tx) = reply_tx {
let _ = tx.send(Ok(ex));
}
}
Err(e) => {
if let Some(tx) = reply_tx { let _ = tx.send(Err(e)); }
}
}
}
// Consumer channel closed: end the pipeline task.
None => return,
}
}
_ = pipeline_cancel.cancelled() => {
// On cancellation: force-complete the aggregator, then drain whatever
// is already queued on the late channel through the post-pipeline.
{
let guard = agg.lock().unwrap();
guard.force_complete_all();
}
let mut rx_guard = late_rx.lock().await;
while let Ok(late_ex) = rx_guard.try_recv() {
let pipe = post_pipeline.load();
let _ = pipe.0.clone().oneshot(late_ex).await;
}
break;
}
}
}
});
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist");
managed.consumer_handle = Some(consumer_handle);
managed.pipeline_handle = Some(pipeline_handle);
managed.channel_sender = Some(tx_for_storage);
info!(route_id = %route_id, "Route started (aggregate with timeout)");
return Ok(());
}
// Regular (non-aggregate) routes from here on.
let route_id_for_consumer = route_id.to_string();
// Consumer task: same failure reporting as in the aggregate branch above.
let consumer_handle = tokio::spawn(async move {
if let Err(e) = consumer.start(consumer_ctx).await {
error!(route_id = %route_id_for_consumer, "Consumer error: {e}");
let error_msg = e.to_string();
if let Some(tx) = crash_notifier {
let _ = tx
.send(CrashNotification {
route_id: route_id_for_consumer.clone(),
error: error_msg.clone(),
})
.await;
}
publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
.await;
}
});
let pipeline_handle = match effective_concurrency {
// Sequential: one exchange at a time, processed inside the pipeline task.
ConcurrencyModel::Sequential => {
tokio::spawn(async move {
loop {
let envelope = tokio::select! {
envelope = rx.recv() => match envelope {
Some(e) => e,
None => return, },
_ = pipeline_cancel.cancelled() => {
return;
}
};
let ExchangeEnvelope { exchange, reply_tx } = envelope;
// Load the current pipeline per exchange so a `swap_pipeline` takes effect
// on the next exchange.
let mut pipeline = pipeline.load().0.clone();
// A readiness failure ends the whole pipeline task (after replying, if asked).
if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
if let Some(tx) = reply_tx {
let _ = tx.send(Err(e));
}
return;
}
let result = pipeline.call(exchange).await;
// Errors go to the reply channel when there is one; otherwise they are
// logged, except `CamelError::Stopped`.
if let Some(tx) = reply_tx {
let _ = tx.send(result);
} else if let Err(ref e) = result
&& !matches!(e, CamelError::Stopped)
{
error!("Pipeline error: {e}");
}
}
})
}
// Concurrent: every exchange gets its own task; `max`, when set, bounds how
// many of them run the pipeline at once via a semaphore.
ConcurrencyModel::Concurrent { max } => {
let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
tokio::spawn(async move {
loop {
let envelope = tokio::select! {
envelope = rx.recv() => match envelope {
Some(e) => e,
None => return, },
_ = pipeline_cancel.cancelled() => {
return;
}
};
let ExchangeEnvelope { exchange, reply_tx } = envelope;
let pipe_ref = Arc::clone(&pipeline);
let sem = sem.clone();
let cancel = pipeline_cancel.clone();
tokio::spawn(async move {
// The permit is acquired inside the per-exchange task, so the task is
// spawned first and waits here when the limit is reached.
let _permit = match &sem {
Some(s) => Some(s.acquire().await.expect("semaphore closed")),
None => None,
};
let mut pipe = pipe_ref.load().0.clone();
// Unlike the sequential case, a readiness failure only ends this
// exchange's task, not the pipeline loop.
if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
if let Some(tx) = reply_tx {
let _ = tx.send(Err(e));
}
return;
}
let result = pipe.call(exchange).await;
if let Some(tx) = reply_tx {
let _ = tx.send(result);
} else if let Err(ref e) = result
&& !matches!(e, CamelError::Stopped)
{
error!("Pipeline error: {e}");
}
});
}
})
}
};
// Record the running tasks and the channel sender on the route.
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
managed.consumer_handle = Some(consumer_handle);
managed.pipeline_handle = Some(pipeline_handle);
managed.channel_sender = Some(tx_for_storage);
info!(route_id = %route_id, "Route started");
Ok(())
}
async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
self.stop_route_internal(route_id).await
}
/// Stops the route, waits 100 ms, then starts it again.
///
/// # Errors
/// Returns the first error produced by the stop or the start step.
async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
    let settle_delay = Duration::from_millis(100);
    self.stop_route(route_id).await?;
    tokio::time::sleep(settle_delay).await;
    self.start_route(route_id).await
}
/// Suspends a fully running route: the consumer task is cancelled and awaited
/// (for at most 30 seconds) while the pipeline task keeps running.
///
/// # Errors
/// Returns [`CamelError::RouteError`] when `route_id` is unknown or when the
/// consumer and pipeline tasks are not both running.
async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
    // Validate and cancel under one scoped borrow that ends before the await.
    let consumer_task = {
        let Some(route) = self.routes.get_mut(route_id) else {
            return Err(CamelError::RouteError(format!(
                "Route '{}' not found",
                route_id
            )));
        };
        let consumer_active = handle_is_running(&route.consumer_handle);
        let pipeline_active = handle_is_running(&route.pipeline_handle);
        if !(consumer_active && pipeline_active) {
            return Err(CamelError::RouteError(format!(
                "Cannot suspend route '{}' with execution lifecycle {}",
                route_id,
                inferred_lifecycle_label(route)
            )));
        }
        info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
        route.consumer_cancel_token.cancel();
        route.consumer_handle.take()
    };

    let waited = tokio::time::timeout(Duration::from_secs(30), async move {
        if let Some(task) = consumer_task {
            let _ = task.await;
        }
    })
    .await;
    if waited.is_err() {
        warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
    }

    // A fresh token lets `resume_route` hand out a new, uncancelled child.
    let route = self
        .routes
        .get_mut(route_id)
        .expect("invariant: route must exist after prior existence check");
    route.consumer_cancel_token = CancellationToken::new();
    info!(route_id = %route_id, "Route suspended (pipeline still running)");
    Ok(())
}
async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
let managed = self
.routes
.get(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
let consumer_running = handle_is_running(&managed.consumer_handle);
let pipeline_running = handle_is_running(&managed.pipeline_handle);
if consumer_running || !pipeline_running {
return Err(CamelError::RouteError(format!(
"Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
route_id,
inferred_lifecycle_label(managed)
)));
}
let sender = managed.channel_sender.clone().ok_or_else(|| {
CamelError::RouteError("Suspended route has no channel sender".into())
})?;
let from_uri = managed.from_uri.clone();
info!(route_id = %route_id, "Resuming route (spawning consumer only)");
let parsed = parse_uri(&from_uri)?;
let registry = self
.registry
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock");
let component = registry.get_or_err(&parsed.scheme)?;
let endpoint = component.create_endpoint(&from_uri)?;
let mut consumer = endpoint.create_consumer()?;
drop(registry);
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
let consumer_cancel = managed.consumer_cancel_token.child_token();
let crash_notifier = self.crash_notifier.clone();
let runtime_for_consumer = self.runtime.clone();
let consumer_ctx = ConsumerContext::new(sender, consumer_cancel.clone());
let route_id_for_consumer = route_id.to_string();
let consumer_handle = tokio::spawn(async move {
if let Err(e) = consumer.start(consumer_ctx).await {
error!(route_id = %route_id_for_consumer, "Consumer error on resume: {e}");
let error_msg = e.to_string();
if let Some(tx) = crash_notifier {
let _ = tx
.send(CrashNotification {
route_id: route_id_for_consumer.clone(),
error: error_msg.clone(),
})
.await;
}
publish_runtime_failure(runtime_for_consumer, &route_id_for_consumer, &error_msg)
.await;
}
});
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
managed.consumer_handle = Some(consumer_handle);
info!(route_id = %route_id, "Route resumed");
Ok(())
}
/// Starts every auto-startup route in ascending startup order.
///
/// The ordering comes from [`DefaultRouteController::auto_startup_route_ids`]
/// rather than a private copy of the same filter-and-sort logic, so the two
/// cannot drift apart. A failing route does not stop the remaining routes
/// from being started.
///
/// # Errors
/// Returns a single [`CamelError::RouteError`] listing every route that failed
/// to start.
async fn start_all_routes(&mut self) -> Result<(), CamelError> {
    let route_ids = self.auto_startup_route_ids();
    info!("Starting {} auto-startup routes", route_ids.len());
    let mut errors: Vec<String> = Vec::new();
    for route_id in route_ids {
        if let Err(e) = self.start_route(&route_id).await {
            errors.push(format!("Route '{}': {}", route_id, e));
        }
    }
    if !errors.is_empty() {
        return Err(CamelError::RouteError(format!(
            "Failed to start routes: {}",
            errors.join(", ")
        )));
    }
    info!("All auto-startup routes started");
    Ok(())
}
async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
let route_ids: Vec<String> = {
let mut pairs: Vec<_> = self
.routes
.iter()
.map(|(id, r)| (id.clone(), r.definition.startup_order()))
.collect();
pairs.sort_by_key(|(_, order)| std::cmp::Reverse(*order));
pairs.into_iter().map(|(id, _)| id).collect()
};
info!("Stopping {} routes", route_ids.len());
for route_id in route_ids {
let _ = self.stop_route(&route_id).await;
}
info!("All routes stopped");
Ok(())
}
}
#[async_trait::async_trait]
impl RouteControllerInternal for DefaultRouteController {
fn add_route(&mut self, def: RouteDefinition) -> Result<(), CamelError> {
DefaultRouteController::add_route(self, def)
}
fn swap_pipeline(&self, route_id: &str, pipeline: BoxProcessor) -> Result<(), CamelError> {
DefaultRouteController::swap_pipeline(self, route_id, pipeline)
}
fn route_from_uri(&self, route_id: &str) -> Option<String> {
DefaultRouteController::route_from_uri(self, route_id)
}
fn set_error_handler(&mut self, config: ErrorHandlerConfig) {
DefaultRouteController::set_error_handler(self, config)
}
fn set_self_ref(&mut self, self_ref: Arc<Mutex<dyn RouteController>>) {
DefaultRouteController::set_self_ref(self, self_ref)
}
fn set_runtime_handle(&mut self, runtime: Arc<dyn RuntimeHandle>) {
DefaultRouteController::set_runtime_handle(self, runtime)
}
fn route_count(&self) -> usize {
DefaultRouteController::route_count(self)
}
fn in_flight_count(&self, route_id: &str) -> Option<u64> {
DefaultRouteController::in_flight_count(self, route_id)
}
fn route_exists(&self, route_id: &str) -> bool {
DefaultRouteController::route_exists(self, route_id)
}
fn route_ids(&self) -> Vec<String> {
DefaultRouteController::route_ids(self)
}
fn auto_startup_route_ids(&self) -> Vec<String> {
DefaultRouteController::auto_startup_route_ids(self)
}
fn shutdown_route_ids(&self) -> Vec<String> {
DefaultRouteController::shutdown_route_ids(self)
}
fn set_tracer_config(&mut self, config: &TracerConfig) {
DefaultRouteController::set_tracer_config(self, config)
}
fn compile_route_definition(&self, def: RouteDefinition) -> Result<BoxProcessor, CamelError> {
DefaultRouteController::compile_route_definition(self, def)
}
fn remove_route(&mut self, route_id: &str) -> Result<(), CamelError> {
DefaultRouteController::remove_route(self, route_id)
}
async fn start_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
DefaultRouteController::start_route(self, route_id).await
}
async fn stop_route_reload(&mut self, route_id: &str) -> Result<(), CamelError> {
DefaultRouteController::stop_route(self, route_id).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::shared::components::domain::Registry;
/// Controller backed by an empty component registry.
fn build_controller() -> DefaultRouteController {
    let empty_registry = Arc::new(std::sync::Mutex::new(Registry::new()));
    DefaultRouteController::new(empty_registry)
}
/// Controller whose registry knows the timer, mock and log components.
fn build_controller_with_components() -> DefaultRouteController {
    // Populate the registry before it is wrapped, so no lock is needed.
    let mut components = Registry::new();
    components.register(camel_component_timer::TimerComponent::new());
    components.register(camel_component_mock::MockComponent::new());
    components.register(camel_component_log::LogComponent::new());
    DefaultRouteController::new(Arc::new(std::sync::Mutex::new(components)))
}
/// Gives `controller` a self reference pointing at a separate throwaway
/// controller, which is enough for the tests in this module.
fn set_self_ref(controller: &mut DefaultRouteController) {
    let stand_in =
        DefaultRouteController::new(Arc::new(std::sync::Mutex::new(Registry::new())));
    let shared: Arc<Mutex<dyn RouteController>> = Arc::new(Mutex::new(stand_in));
    controller.set_self_ref(shared);
}
/// Registers the `simple` language on the controller's language registry.
fn register_simple_language(controller: &mut DefaultRouteController) {
    let simple = Arc::new(camel_language_simple::SimpleLanguage);
    let mut languages = controller.languages.lock().expect("languages lock");
    languages.insert("simple".into(), simple);
}
/// Compile-time check: the trait can be used as a trait object.
#[test]
fn test_route_controller_internal_is_object_safe() {
    let boxed: Option<Box<dyn RouteControllerInternal>> = None;
    drop(boxed);
}
/// Exercises the synchronous helpers on a route that was never started.
#[test]
fn helper_functions_cover_non_async_branches() {
    let definition = RouteDefinition::new("timer:a", vec![])
        .with_route_id("r")
        .to_info();
    let identity = SyncBoxProcessor(BoxProcessor::new(IdentityProcessor));
    let idle_route = ManagedRoute {
        definition,
        from_uri: "timer:a".into(),
        pipeline: Arc::new(ArcSwap::from_pointee(identity)),
        concurrency: None,
        consumer_handle: None,
        pipeline_handle: None,
        consumer_cancel_token: CancellationToken::new(),
        pipeline_cancel_token: CancellationToken::new(),
        channel_sender: None,
        in_flight: None,
        aggregate_split: None,
        agg_service: None,
    };
    assert_eq!(inferred_lifecycle_label(&idle_route), "Stopped");
    assert!(!handle_is_running(&idle_route.consumer_handle));

    let RuntimeCommand::FailRoute {
        route_id, error, ..
    } = runtime_failure_command("route-x", "boom")
    else {
        panic!("expected FailRoute command");
    };
    assert_eq!(route_id, "route-x");
    assert_eq!(error, "boom");
}
/// Adding the same route id twice must be rejected.
#[test]
fn add_route_detects_duplicates() {
    let mut controller = build_controller();
    set_self_ref(&mut controller);
    let make_route = || RouteDefinition::new("timer:tick", vec![]).with_route_id("r1");

    controller.add_route(make_route()).expect("add route");
    let rejection = controller
        .add_route(make_route())
        .expect_err("duplicate must fail");
    assert!(rejection.to_string().contains("already exists"));
}
/// Checks counting, URI lookup, id listing and the startup/shutdown orderings.
#[test]
fn route_introspection_and_ordering_helpers_work() {
    let mut controller = build_controller();
    set_self_ref(&mut controller);

    let definitions = [
        RouteDefinition::new("timer:a", vec![])
            .with_route_id("a")
            .with_startup_order(20),
        RouteDefinition::new("timer:b", vec![])
            .with_route_id("b")
            .with_startup_order(10),
        RouteDefinition::new("timer:c", vec![])
            .with_route_id("c")
            .with_auto_startup(false)
            .with_startup_order(5),
    ];
    for definition in definitions {
        controller.add_route(definition).unwrap();
    }

    assert_eq!(controller.route_count(), 3);
    assert_eq!(controller.route_from_uri("a").as_deref(), Some("timer:a"));
    assert!(controller.route_ids().iter().any(|id| id == "a"));
    // "c" is excluded from auto-startup; the rest is sorted ascending.
    assert_eq!(controller.auto_startup_route_ids(), ["b", "a"]);
    // Shutdown covers every route, sorted descending.
    assert_eq!(controller.shutdown_route_ids(), ["a", "b", "c"]);
}
/// Swap works on a known route; removal works once and then reports "not found".
#[test]
fn swap_pipeline_and_remove_route_behaviors() {
    const ROUTE_ID: &str = "swap";
    let mut controller = build_controller();
    set_self_ref(&mut controller);

    let definition = RouteDefinition::new("timer:a", vec![]).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();

    let replacement = BoxProcessor::new(IdentityProcessor);
    controller.swap_pipeline(ROUTE_ID, replacement).unwrap();
    assert!(controller.get_pipeline(ROUTE_ID).is_some());

    controller.remove_route(ROUTE_ID).unwrap();
    assert_eq!(controller.route_count(), 0);

    let missing = controller
        .remove_route(ROUTE_ID)
        .expect_err("missing route must fail");
    assert!(missing.to_string().contains("not found"));
}
/// Feeds one step of (nearly) every `BuilderStep` variant through
/// `resolve_steps` and expects resolution to succeed.
#[test]
fn resolve_steps_covers_declarative_and_eip_variants() {
    use crate::lifecycle::application::route_definition::{DeclarativeWhenStep, WhenStep};
    use camel_api::LanguageExpressionDef;
    use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};

    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);
    register_simple_language(&mut controller);

    // Shorthand for an expression in the `simple` language.
    let simple = |source: &str| LanguageExpressionDef {
        language: "simple".into(),
        source: source.into(),
    };

    let all_variants = vec![
        BuilderStep::To("mock:out".into()),
        BuilderStep::Stop,
        BuilderStep::Log {
            level: camel_processor::LogLevel::Info,
            message: "log".into(),
        },
        BuilderStep::DeclarativeSetHeader {
            key: "k".into(),
            value: ValueSourceDef::Literal(Value::String("v".into())),
        },
        BuilderStep::DeclarativeSetHeader {
            key: "k2".into(),
            value: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::DeclarativeSetBody {
            value: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::DeclarativeFilter {
            predicate: simple("${body} != null"),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::DeclarativeChoice {
            whens: vec![DeclarativeWhenStep {
                predicate: simple("${body} == 'x'"),
                steps: vec![BuilderStep::Stop],
            }],
            otherwise: Some(vec![BuilderStep::Stop]),
        },
        BuilderStep::DeclarativeScript {
            expression: simple("${body}"),
        },
        BuilderStep::Split {
            config: SplitterConfig::new(split_body_lines())
                .aggregation(AggregationStrategy::CollectAll),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::DeclarativeSplit {
            expression: simple("${body}"),
            aggregation: AggregationStrategy::Original,
            parallel: false,
            parallel_limit: Some(2),
            stop_on_exception: true,
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::Aggregate {
            config: camel_api::AggregatorConfig::correlate_by("id")
                .complete_when_size(1)
                .build(),
        },
        BuilderStep::Filter {
            predicate: Arc::new(|_| true),
            steps: vec![BuilderStep::Stop],
        },
        BuilderStep::Choice {
            whens: vec![WhenStep {
                predicate: Arc::new(|_| true),
                steps: vec![BuilderStep::Stop],
            }],
            otherwise: Some(vec![BuilderStep::Stop]),
        },
        BuilderStep::WireTap {
            uri: "mock:tap".into(),
        },
        BuilderStep::Multicast {
            steps: vec![
                BuilderStep::To("mock:m1".into()),
                BuilderStep::To("mock:m2".into()),
            ],
            config: camel_api::MulticastConfig::new(),
        },
        BuilderStep::DeclarativeLog {
            level: camel_processor::LogLevel::Info,
            message: ValueSourceDef::Expression(simple("${body}")),
        },
        BuilderStep::Throttle {
            config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(100)),
            steps: vec![BuilderStep::To("mock:t".into())],
        },
        BuilderStep::LoadBalance {
            config: camel_api::LoadBalancerConfig::round_robin(),
            steps: vec![
                BuilderStep::To("mock:l1".into()),
                BuilderStep::To("mock:l2".into()),
            ],
        },
        BuilderStep::DynamicRouter {
            config: camel_api::DynamicRouterConfig::new(Arc::new(|_| Some("mock:dr".into()))),
        },
        BuilderStep::RoutingSlip {
            config: camel_api::RoutingSlipConfig::new(Arc::new(|_| Some("mock:rs".into()))),
        },
    ];

    let ctx = ProducerContext::new();
    let processors = controller
        .resolve_steps(all_variants, &ctx, &controller.registry)
        .expect("resolve should succeed");
    assert!(!processors.is_empty());
}
/// Three failure paths of `resolve_steps`: a script in a language without
/// mutating support, an unknown bean, and an unregistered language.
#[test]
fn resolve_steps_script_requires_mutating_language_support() {
    use camel_api::LanguageExpressionDef;

    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);
    register_simple_language(&mut controller);

    // Each call resolves against a fresh producer context.
    let resolve = |steps: Vec<BuilderStep>| {
        controller.resolve_steps(steps, &ProducerContext::new(), &controller.registry)
    };

    let script_step = BuilderStep::Script {
        language: "simple".into(),
        script: "${body}".into(),
    };
    let script_failure =
        resolve(vec![script_step]).expect_err("simple script should fail for mutating expression");
    assert!(script_failure.to_string().contains("does not support"));

    let bean_step = BuilderStep::Bean {
        name: "unknown".into(),
        method: "run".into(),
    };
    let bean_failure = resolve(vec![bean_step]).expect_err("missing bean must fail");
    assert!(bean_failure.to_string().contains("Bean not found"));

    let unknown_language_step = BuilderStep::DeclarativeScript {
        expression: LanguageExpressionDef {
            language: "unknown".into(),
            source: "x".into(),
        },
    };
    let language_failure =
        resolve(vec![unknown_language_step]).expect_err("unknown language must fail");
    assert!(language_failure.to_string().contains("not registered"));
}
/// Every lifecycle operation fails on an unknown route id.
#[tokio::test]
async fn lifecycle_methods_report_missing_routes() {
    const UNKNOWN: &str = "missing";
    let mut controller = build_controller();

    let started = controller.start_route(UNKNOWN).await;
    assert!(started.is_err());
    let stopped = controller.stop_route(UNKNOWN).await;
    assert!(stopped.is_err());
    let suspended = controller.suspend_route(UNKNOWN).await;
    assert!(suspended.is_err());
    let resumed = controller.resume_route(UNKNOWN).await;
    assert!(resumed.is_err());
}
/// A timer-to-mock route can be added, started, stopped and removed.
#[tokio::test]
async fn start_stop_route_happy_path_with_timer_and_mock() {
    const ROUTE_ID: &str = "rt-1";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let steps = vec![BuilderStep::To("mock:out".into())];
    let definition =
        RouteDefinition::new("timer:tick?period=10&repeatCount=1", steps).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();

    controller.start_route(ROUTE_ID).await.unwrap();
    // Give the timer a moment to fire before stopping.
    tokio::time::sleep(Duration::from_millis(40)).await;
    controller.stop_route(ROUTE_ID).await.unwrap();
    controller.remove_route(ROUTE_ID).unwrap();
}
/// Walks a route through start, suspend, resume, restart and stop.
#[tokio::test]
async fn suspend_resume_and_restart_cover_execution_transitions() {
    const ROUTE_ID: &str = "rt-2";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let steps = vec![BuilderStep::To("mock:out".into())];
    let definition = RouteDefinition::new("timer:tick?period=30", steps).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();

    controller.start_route(ROUTE_ID).await.unwrap();
    controller.suspend_route(ROUTE_ID).await.unwrap();
    controller.resume_route(ROUTE_ID).await.unwrap();
    controller.restart_route(ROUTE_ID).await.unwrap();
    controller.stop_route(ROUTE_ID).await.unwrap();
}
/// A running route cannot be removed; a stopped one can.
#[tokio::test]
async fn remove_route_rejects_running_route() {
    const ROUTE_ID: &str = "rt-running";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let steps = vec![BuilderStep::To("mock:out".into())];
    let definition = RouteDefinition::new("timer:tick?period=25", steps).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();
    controller.start_route(ROUTE_ID).await.unwrap();

    let rejection = controller
        .remove_route(ROUTE_ID)
        .expect_err("running route removal must fail");
    assert!(
        rejection
            .to_string()
            .contains("must be stopped before removal")
    );

    controller.stop_route(ROUTE_ID).await.unwrap();
    controller.remove_route(ROUTE_ID).unwrap();
}
/// Starting a suspended route fails with a hint to use `resume_route`.
#[tokio::test]
async fn start_route_on_suspended_state_returns_guidance_error() {
    const ROUTE_ID: &str = "rt-suspend";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let steps = vec![BuilderStep::To("mock:out".into())];
    let definition = RouteDefinition::new("timer:tick?period=40", steps).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();
    controller.start_route(ROUTE_ID).await.unwrap();
    controller.suspend_route(ROUTE_ID).await.unwrap();

    let guidance = controller
        .start_route(ROUTE_ID)
        .await
        .expect_err("start from suspended must fail");
    assert!(guidance.to_string().contains("use resume_route"));

    controller.resume_route(ROUTE_ID).await.unwrap();
    controller.stop_route(ROUTE_ID).await.unwrap();
}
/// Suspend is rejected before start; resume is rejected while fully running.
#[tokio::test]
async fn suspend_and_resume_validate_execution_state() {
    const ROUTE_ID: &str = "rt-state";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let definition = RouteDefinition::new("timer:tick?period=50", vec![]).with_route_id(ROUTE_ID);
    controller.add_route(definition).unwrap();

    let premature_suspend = controller
        .suspend_route(ROUTE_ID)
        .await
        .expect_err("suspend before start must fail");
    assert!(
        premature_suspend
            .to_string()
            .contains("Cannot suspend route")
    );

    controller.start_route(ROUTE_ID).await.unwrap();
    let premature_resume = controller
        .resume_route(ROUTE_ID)
        .await
        .expect_err("resume while started must fail");
    assert!(premature_resume.to_string().contains("Cannot resume route"));

    controller.stop_route(ROUTE_ID).await.unwrap();
}
/// A route overriding its concurrency to `Concurrent { max: Some(2) }` starts
/// and stops cleanly.
#[tokio::test]
async fn concurrent_concurrency_override_path_executes() {
    const ROUTE_ID: &str = "rt-concurrent";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let steps = vec![BuilderStep::To("mock:out".into())];
    let definition = RouteDefinition::new("timer:tick?period=10&repeatCount=2", steps)
        .with_route_id(ROUTE_ID)
        .with_concurrency(ConcurrencyModel::Concurrent { max: Some(2) });
    controller.add_route(definition).unwrap();

    controller.start_route(ROUTE_ID).await.unwrap();
    // Let the timer fire so the concurrent branch actually processes exchanges.
    tokio::time::sleep(Duration::from_millis(50)).await;
    controller.stop_route(ROUTE_ID).await.unwrap();
}
/// A route with both a circuit breaker and a dead-letter error handler can be
/// added, started and stopped.
#[tokio::test]
async fn add_route_with_circuit_breaker_and_error_handler_compiles() {
    use camel_api::circuit_breaker::CircuitBreakerConfig;
    use camel_api::error_handler::ErrorHandlerConfig;

    const ROUTE_ID: &str = "rt-eh";
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let breaker = CircuitBreakerConfig::new();
    let dead_letter = ErrorHandlerConfig::dead_letter_channel("log:dlq");
    let definition = RouteDefinition::new("timer:tick?period=25", vec![BuilderStep::Stop])
        .with_route_id(ROUTE_ID)
        .with_circuit_breaker(breaker)
        .with_error_handler(dead_letter);
    controller
        .add_route(definition)
        .expect("route with layers should compile");

    controller.start_route(ROUTE_ID).await.unwrap();
    controller.stop_route(ROUTE_ID).await.unwrap();
}
/// Compiling works for an unregistered route id, but swapping the result into
/// a route that does not exist fails with "not found".
#[tokio::test]
async fn compile_and_swap_errors_for_missing_route() {
    let mut controller = build_controller_with_components();
    set_self_ref(&mut controller);

    let definition = RouteDefinition::new("timer:tick?period=10", vec![BuilderStep::Stop])
        .with_route_id("compiled");
    let pipeline = controller
        .compile_route_definition(definition)
        .expect("compile should work");

    let missing = controller
        .swap_pipeline("nope", pipeline)
        .expect_err("missing route swap must fail");
    assert!(missing.to_string().contains("not found"));
}
}