use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tower::ServiceExt;
use tracing::warn;
use camel_api::aggregator::AggregatorConfig;
use camel_api::{CamelError, Exchange, RuntimeCommand, RuntimeHandle};
use camel_component_api::{ConcurrencyModel, consumer::ExchangeEnvelope};
use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
use crate::lifecycle::adapters::pipeline_runtime::SharedPipeline;
use crate::lifecycle::adapters::route_runtime_state;
use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinitionInfo};
/// Message describing a route failure: which route it concerns and the error text.
///
/// NOTE(review): the producers and consumers of this type are not visible in this
/// part of the file — confirm where it is sent before relying on delivery semantics.
#[derive(Debug, Clone)]
pub struct CrashNotification {
/// Identifier of the affected route.
pub route_id: String,
/// Error description, carried as a plain string.
pub error: String,
}
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
#[cfg(test)]
pub(super) fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
*START_ROUTE_EVENT_HOOK
.lock()
.expect("start route event hook lock") = hook;
}
#[cfg(test)]
pub(super) fn emit_start_route_event(event: &'static str) {
if let Some(hook) = START_ROUTE_EVENT_HOOK
.lock()
.expect("start route event hook lock")
.as_ref()
{
hook(event);
}
}
/// Bundles two pipelines with an aggregator configuration.
///
/// NOTE(review): judging by the field names and by
/// `find_top_level_aggregate_requiring_split`, this presumably describes a route
/// that was split around a top-level aggregate step — confirm against the code
/// that constructs it.
#[derive(Clone)]
pub(super) struct AggregateSplitInfo {
/// Pipeline presumably covering the steps before the aggregate step.
pub(super) pre_pipeline: SharedPipeline,
/// Configuration of the aggregator.
pub(super) agg_config: AggregatorConfig,
/// Pipeline presumably covering the steps after the aggregate step.
pub(super) post_pipeline: SharedPipeline,
}
/// Per-route runtime record: the route's definition plus the task handles,
/// cancellation tokens and optional channel/aggregator state kept for it.
///
/// `inferred_lifecycle_label` derives a lifecycle label from `consumer_handle`
/// and `pipeline_handle` alone.
pub(super) struct ManagedRoute {
/// Definition metadata for the route.
pub(super) definition: RouteDefinitionInfo,
/// URI string; presumably the route's source endpoint — TODO confirm.
pub(super) from_uri: String,
/// Shared pipeline for this route.
pub(super) pipeline: SharedPipeline,
/// Optional concurrency model; `None` semantics are decided elsewhere.
pub(super) concurrency: Option<ConcurrencyModel>,
/// Join handle of the consumer task, if one has been stored.
pub(super) consumer_handle: Option<JoinHandle<()>>,
/// Join handle of the pipeline task, if one has been stored.
pub(super) pipeline_handle: Option<JoinHandle<()>>,
/// Cancellation token; presumably signals the consumer task — verify at the spawn site.
pub(super) consumer_cancel_token: CancellationToken,
/// Cancellation token; presumably signals the pipeline task — verify at the spawn site.
pub(super) pipeline_cancel_token: CancellationToken,
/// Optional sender of `ExchangeEnvelope` values.
pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
/// Optional shared atomic counter; presumably counts in-flight exchanges — TODO confirm.
pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
/// Optional pre/post pipelines and aggregator configuration (see `AggregateSplitInfo`).
pub(super) aggregate_split: Option<AggregateSplitInfo>,
/// Optional shared, mutex-guarded aggregator service.
pub(super) agg_service: Option<Arc<std::sync::Mutex<AggregatorService>>>,
/// Compiled form of the route as defined by `route_runtime_state`.
pub(super) compiled: route_runtime_state::CompiledRoute,
}
/// Pairs a route identifier with its `ManagedRoute` record.
///
/// The id is visible crate-wide, while the managed record is only visible to the
/// parent module.
pub(crate) struct PreparedRoute {
/// Identifier of the route.
pub(crate) route_id: String,
/// Runtime record for the route.
pub(super) managed: ManagedRoute,
}
/// Reports whether `handle` holds a task that has not finished yet.
///
/// Returns `false` when no handle is stored.
pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
    match handle {
        Some(task) => !task.is_finished(),
        None => false,
    }
}
/// Derives a lifecycle label from which of the route's two tasks are still running.
///
/// | consumer | pipeline | label         |
/// |----------|----------|---------------|
/// | running  | running  | `"Started"`   |
/// | stopped  | running  | `"Suspended"` |
/// | running  | stopped  | `"Stopping"`  |
/// | stopped  | stopped  | `"Stopped"`   |
pub(super) fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
    let consumer_running = handle_is_running(&managed.consumer_handle);
    let pipeline_running = handle_is_running(&managed.pipeline_handle);

    if consumer_running && pipeline_running {
        "Started"
    } else if pipeline_running {
        "Suspended"
    } else if consumer_running {
        "Stopping"
    } else {
        "Stopped"
    }
}
/// Locates the first top-level `Aggregate` step and returns its index and a clone
/// of its configuration if that step needs the route to be split.
///
/// A split is required when the aggregator's completion has a timeout condition
/// or when `force_completion_on_stop` is set. Only the first `Aggregate` step is
/// examined: if it does not require a split, later ones are ignored and `None`
/// is returned.
pub(super) fn find_top_level_aggregate_requiring_split(
    steps: &[BuilderStep],
) -> Option<(usize, AggregatorConfig)> {
    // Stop at the first aggregate step, whatever its configuration.
    let (index, config) = steps
        .iter()
        .enumerate()
        .find_map(|(index, step)| match step {
            BuilderStep::Aggregate { config } => Some((index, config)),
            _ => None,
        })?;

    let needs_split =
        has_timeout_condition(&config.completion) || config.force_completion_on_stop;
    needs_split.then(|| (index, config.clone()))
}
/// Returns `true` only when the exchange carries a `CamelAggregatorPending`
/// property whose boolean value is `true`.
///
/// A missing property, or one that is not a boolean, yields `false`.
pub(super) fn is_pending(ex: &Exchange) -> bool {
    let flag = ex
        .property("CamelAggregatorPending")
        .and_then(|value| value.as_bool());
    matches!(flag, Some(true))
}
/// Waits until `pipeline` reports ready, retrying while its circuit is open.
///
/// * Ready: returns `Ok(())`.
/// * `CamelError::CircuitOpen`: logs a warning, then waits one second and polls
///   again. If `cancel` fires during the wait, the circuit-open error is returned.
/// * Any other error: logged at error level and returned immediately.
pub(super) async fn ready_with_backoff(
    pipeline: &mut camel_api::BoxProcessor,
    cancel: &CancellationToken,
) -> Result<(), CamelError> {
    loop {
        let failure = match pipeline.ready().await {
            Ok(_) => return Ok(()),
            Err(failure) => failure,
        };

        // Only an open circuit is retried; everything else is fatal here.
        let msg = match failure {
            CamelError::CircuitOpen(msg) => msg,
            e => {
                tracing::error!("Pipeline not ready: {e}");
                return Err(e);
            }
        };

        warn!("Circuit open, backing off: {msg}");
        tokio::select! {
            _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}
            _ = cancel.cancelled() => {
                return Err(CamelError::CircuitOpen(msg));
            }
        }
    }
}
/// Builds a `RuntimeCommand::FailRoute` for `route_id` carrying `error`.
///
/// The command id has the form `ctrl-fail-<route_id>-<nanos>`, where `<nanos>` is
/// the current wall-clock time in nanoseconds since the Unix epoch (or `0` if the
/// system clock reads earlier than the epoch). No causation id is attached.
pub(super) fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
    use std::time::{SystemTime, UNIX_EPOCH};

    let stamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    };
    let command_id = format!("ctrl-fail-{route_id}-{stamp}");

    RuntimeCommand::FailRoute {
        route_id: route_id.to_owned(),
        error: error.to_owned(),
        command_id,
        causation_id: None,
    }
}
pub(super) async fn publish_runtime_failure(
runtime: Option<std::sync::Weak<dyn RuntimeHandle>>,
route_id: &str,
error: &str,
) {
let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
return;
};
let command = runtime_failure_command(route_id, error);
if let Err(runtime_error) = runtime.execute(command).await {
warn!(
route_id = %route_id,
error = %runtime_error,
"failed to synchronize route crash with runtime projection"
);
}
}