use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tower::Service;
use tracing::{error, info, warn};
use camel_api::{CamelError, NoOpMetrics};
use camel_component_api::{ConcurrencyModel, ConsumerContext, consumer::ExchangeEnvelope};
use crate::lifecycle::adapters::consumer_management;
use crate::lifecycle::adapters::controller_component_context::ControllerComponentContext;
use crate::lifecycle::adapters::route_controller::DefaultRouteController;
#[cfg(test)]
use crate::lifecycle::adapters::route_helpers::emit_start_route_event;
use crate::lifecycle::adapters::route_helpers::{
handle_is_running, inferred_lifecycle_label, ready_with_backoff,
};
use crate::lifecycle::adapters::route_registry::DEFAULT_SHUTDOWN_TIMEOUT;
#[async_trait::async_trait]
impl camel_api::RouteController for DefaultRouteController {
async fn start_route(&mut self, route_id: &str) -> Result<(), CamelError> {
{
let managed = self
.routes
.get_mut(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
let consumer_running = handle_is_running(&managed.consumer_handle);
let pipeline_running = handle_is_running(&managed.pipeline_handle);
if consumer_running && pipeline_running {
return Ok(());
}
if !consumer_running && pipeline_running {
return Err(CamelError::RouteError(format!(
"Route '{}' is suspended; use resume_route() to resume, or stop_route() then start_route() for full restart",
route_id
)));
}
if consumer_running && !pipeline_running {
return Err(CamelError::RouteError(format!(
"Route '{}' has inconsistent execution state; stop_route() then retry start_route()",
route_id
)));
}
}
info!(route_id = %route_id, "Starting route");
let (from_uri, pipeline, concurrency) = {
let managed = self
.routes
.get(route_id)
.expect("invariant: route must exist after prior existence check"); (
managed.from_uri.clone(),
Arc::clone(&managed.pipeline),
managed.concurrency.clone(),
)
};
let crash_notifier = self.crash_notifier.clone();
let runtime_for_consumer = self.runtime.clone();
let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
Arc::clone(&self.registry),
Arc::clone(&self.languages),
self.tracer_metrics
.clone()
.unwrap_or_else(|| Arc::new(NoOpMetrics)),
Arc::clone(&self.platform_service),
self.health_registry(),
Some(route_id.to_string()),
));
let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
Arc::clone(&consumer_component_ctx) as Arc<_>;
let (mut consumer, consumer_concurrency) = consumer_management::create_route_consumer(
consumer_rt,
&self.registry,
&from_uri,
consumer_component_ctx.as_ref(),
)?;
let effective_concurrency = concurrency.unwrap_or(consumer_concurrency);
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
if let (Some(sp_config), Some(authenticator)) = (
managed.compiled.security_policy.as_ref(),
managed.compiled.security_authenticator.as_ref(),
) {
use camel_component_api::SecurityContext;
let sec_ctx =
SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
consumer.set_security_context(sec_ctx);
}
let (tx, mut rx) = mpsc::channel::<ExchangeEnvelope>(256);
let consumer_cancel = managed.consumer_cancel_token.child_token();
let pipeline_cancel = managed.pipeline_cancel_token.child_token();
let tx_for_storage = tx.clone();
let consumer_ctx = ConsumerContext::new(tx, consumer_cancel.clone(), route_id.to_string());
let split_clone = managed.aggregate_split.clone();
if let Some(split) = split_clone {
return self
.start_aggregate_route(
route_id,
split,
consumer,
consumer_ctx,
rx,
crash_notifier,
runtime_for_consumer,
tx_for_storage,
pipeline_cancel,
)
.await;
}
let pipeline_handle = match effective_concurrency {
ConcurrencyModel::Sequential => {
tokio::spawn(async move {
loop {
let envelope = tokio::select! {
envelope = rx.recv() => match envelope {
Some(e) => e,
None => return, },
_ = pipeline_cancel.cancelled() => {
return;
}
};
let ExchangeEnvelope { exchange, reply_tx } = envelope;
let mut pipeline = pipeline.load().clone_inner();
if let Err(e) = ready_with_backoff(&mut pipeline, &pipeline_cancel).await {
if let Some(tx) = reply_tx {
let _ = tx.send(Err(e));
}
return;
}
let result = pipeline.call(exchange).await;
if let Some(tx) = reply_tx {
let _ = tx.send(result);
} else if let Err(ref e) = result
&& !matches!(e, CamelError::Stopped)
{
error!("Pipeline error: {e}");
}
}
})
}
ConcurrencyModel::Concurrent { max } => {
let sem = max.map(|n| Arc::new(tokio::sync::Semaphore::new(n)));
tokio::spawn(async move {
loop {
let envelope = tokio::select! {
envelope = rx.recv() => match envelope {
Some(e) => e,
None => return, },
_ = pipeline_cancel.cancelled() => {
return;
}
};
let ExchangeEnvelope { exchange, reply_tx } = envelope;
let pipe_ref = Arc::clone(&pipeline);
let sem = sem.clone();
let cancel = pipeline_cancel.clone();
tokio::spawn(async move {
let _permit = match &sem {
Some(s) => Some(s.acquire().await.expect("semaphore closed")), None => None,
};
let mut pipe = pipe_ref.load().clone_inner();
if let Err(e) = ready_with_backoff(&mut pipe, &cancel).await {
if let Some(tx) = reply_tx {
let _ = tx.send(Err(e));
}
return;
}
let result = pipe.call(exchange).await;
if let Some(tx) = reply_tx {
let _ = tx.send(result);
} else if let Err(ref e) = result
&& !matches!(e, CamelError::Stopped)
{
error!("Pipeline error: {e}");
}
});
}
})
}
};
#[cfg(test)]
emit_start_route_event("pipeline_spawned");
let consumer_handle = consumer_management::spawn_consumer_task(
route_id.to_string(),
consumer,
consumer_ctx,
crash_notifier,
runtime_for_consumer,
false,
);
#[cfg(test)]
emit_start_route_event("consumer_spawned");
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
managed.pipeline_handle = Some(pipeline_handle);
managed.channel_sender = Some(tx_for_storage);
info!(route_id = %route_id, "Route started");
self.health_registry().mark_route_started(route_id);
Ok(())
}
async fn stop_route(&mut self, route_id: &str) -> Result<(), CamelError> {
self.stop_route_internal(route_id).await?;
self.health_registry().mark_route_stopped(route_id);
Ok(())
}
async fn restart_route(&mut self, route_id: &str) -> Result<(), CamelError> {
self.stop_route(route_id).await?;
tokio::time::sleep(Duration::from_millis(100)).await;
self.start_route(route_id).await
}
async fn suspend_route(&mut self, route_id: &str) -> Result<(), CamelError> {
let managed = self
.routes
.get_mut(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
let consumer_running = handle_is_running(&managed.consumer_handle);
let pipeline_running = handle_is_running(&managed.pipeline_handle);
if !consumer_running || !pipeline_running {
return Err(CamelError::RouteError(format!(
"Cannot suspend route '{}' with execution lifecycle {}",
route_id,
inferred_lifecycle_label(managed)
)));
}
info!(route_id = %route_id, "Suspending route (consumer only, keeping pipeline)");
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check"); managed.consumer_cancel_token.cancel();
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check"); let consumer_handle = managed.consumer_handle.take();
let timeout_result = tokio::time::timeout(DEFAULT_SHUTDOWN_TIMEOUT, async {
if let Some(handle) = consumer_handle {
let _ = handle.await;
}
})
.await;
if timeout_result.is_err() {
warn!(route_id = %route_id, "Consumer shutdown timed out during suspend");
}
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
managed.consumer_cancel_token = CancellationToken::new();
info!(route_id = %route_id, "Route suspended (pipeline still running)");
self.health_registry().mark_route_stopped(route_id);
Ok(())
}
async fn resume_route(&mut self, route_id: &str) -> Result<(), CamelError> {
let managed = self
.routes
.get(route_id)
.ok_or_else(|| CamelError::RouteError(format!("Route '{}' not found", route_id)))?;
let consumer_running = handle_is_running(&managed.consumer_handle);
let pipeline_running = handle_is_running(&managed.pipeline_handle);
if consumer_running || !pipeline_running {
return Err(CamelError::RouteError(format!(
"Cannot resume route '{}' with execution lifecycle {} (expected Suspended)",
route_id,
inferred_lifecycle_label(managed)
)));
}
let sender = managed.channel_sender.clone().ok_or_else(|| {
CamelError::RouteError("Suspended route has no channel sender".into())
})?;
let from_uri = managed.from_uri.clone();
info!(route_id = %route_id, "Resuming route (spawning consumer only)");
let consumer_component_ctx = Arc::new(ControllerComponentContext::new(
Arc::clone(&self.registry),
Arc::clone(&self.languages),
self.tracer_metrics
.clone()
.unwrap_or_else(|| Arc::new(NoOpMetrics)),
Arc::clone(&self.platform_service),
self.health_registry(),
Some(route_id.to_string()),
));
let consumer_rt: Arc<dyn camel_component_api::RuntimeObservability> =
Arc::clone(&consumer_component_ctx) as Arc<_>;
let (mut consumer, _) = consumer_management::create_route_consumer(
consumer_rt,
&self.registry,
&from_uri,
consumer_component_ctx.as_ref(),
)?;
let managed = self
.routes
.get(route_id)
.expect("invariant: route must exist after prior existence check"); if let (Some(sp_config), Some(authenticator)) = (
managed.compiled.security_policy.as_ref(),
managed.compiled.security_authenticator.as_ref(),
) {
use camel_component_api::SecurityContext;
let sec_ctx =
SecurityContext::from_arc(Arc::clone(&sp_config.policy), Arc::clone(authenticator));
consumer.set_security_context(sec_ctx);
}
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check");
let consumer_cancel = managed.consumer_cancel_token.child_token();
let crash_notifier = self.crash_notifier.clone();
let runtime_for_consumer = self.runtime.clone();
let consumer_ctx =
ConsumerContext::new(sender, consumer_cancel.clone(), route_id.to_string());
let consumer_handle = consumer_management::spawn_consumer_task(
route_id.to_string(),
consumer,
consumer_ctx,
crash_notifier,
runtime_for_consumer,
true,
);
let managed = self
.routes
.get_mut(route_id)
.expect("invariant: route must exist after prior existence check"); managed.consumer_handle = Some(consumer_handle);
info!(route_id = %route_id, "Route resumed");
self.health_registry().mark_route_started(route_id);
Ok(())
}
async fn start_all_routes(&mut self) -> Result<(), CamelError> {
let route_ids: Vec<String> = {
let pairs = self.routes.auto_startup_sorted();
pairs.into_iter().map(|(id, _)| id).collect()
};
info!("Starting {} auto-startup routes", route_ids.len());
let mut errors: Vec<String> = Vec::new();
for route_id in route_ids {
if let Err(e) = self.start_route(&route_id).await {
errors.push(format!("Route '{}': {}", route_id, e));
}
}
if !errors.is_empty() {
return Err(CamelError::RouteError(format!(
"Failed to start routes: {}",
errors.join(", ")
)));
}
info!("All auto-startup routes started");
Ok(())
}
async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
let route_ids: Vec<String> = {
let pairs = self.routes.shutdown_sorted();
pairs.into_iter().map(|(id, _)| id).collect()
};
info!("Stopping {} routes", route_ids.len());
for route_id in route_ids {
let _ = self.stop_route(&route_id).await;
}
info!("All routes stopped");
Ok(())
}
}