use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tower::ServiceExt;
use tracing::warn;
use camel_api::aggregator::AggregatorConfig;
use camel_api::{CamelError, Exchange, ResequencePolicyConfig, RuntimeCommand, RuntimeHandle};
use camel_component_api::{ConcurrencyModel, consumer::ExchangeEnvelope};
use camel_processor::aggregator::{AggregatorService, has_timeout_condition};
use crate::lifecycle::adapters::pipeline_runtime::SharedPipeline;
use crate::lifecycle::adapters::route_runtime_state;
use crate::lifecycle::adapters::step_compilers::CompiledStep;
use crate::lifecycle::application::route_definition::{BuilderStep, RouteDefinitionInfo};
/// Gathers the lifecycle hooks attached to `steps`, preserving step order.
///
/// A `Process` step contributes its single hook when it has one, a `Segment`
/// step contributes every hook in its list when it has one, and `Stop`
/// contributes nothing. Each hook is shared via `Arc::clone`, not deep-copied.
pub(crate) fn collect_lifecycle(steps: &[CompiledStep]) -> Vec<Arc<dyn camel_api::StepLifecycle>> {
    let mut hooks = Vec::new();
    for compiled in steps {
        match compiled {
            CompiledStep::Process {
                lifecycle: Some(single),
                ..
            } => hooks.push(Arc::clone(single)),
            CompiledStep::Segment {
                lifecycle: Some(many),
                ..
            } => hooks.extend(many.iter().map(Arc::clone)),
            // Steps without hooks add nothing; listed explicitly so a new
            // variant still triggers a non-exhaustive-match compile error.
            CompiledStep::Process {
                lifecycle: None, ..
            }
            | CompiledStep::Segment {
                lifecycle: None, ..
            }
            | CompiledStep::Stop => {}
        }
    }
    hooks
}
/// Message describing a crashed route.
///
/// NOTE(review): producers and consumers of this type are not visible in this
/// file section — confirm who sends and who receives it.
#[derive(Debug, Clone)]
pub struct CrashNotification {
    /// Identifier of the route the notification refers to.
    pub route_id: String,
    /// Textual description of the failure.
    pub error: String,
}
/// Test-only callback type: a shareable, thread-safe function that receives a
/// static event name.
#[cfg(test)]
type StartRouteEventHook = Arc<dyn Fn(&'static str) + Send + Sync + 'static>;
/// Test-only, process-wide slot holding the currently installed
/// `StartRouteEventHook` (or `None`). Lazily initialised to `None` on first use
/// and guarded by a standard (non-reentrant) mutex.
#[cfg(test)]
static START_ROUTE_EVENT_HOOK: std::sync::LazyLock<std::sync::Mutex<Option<StartRouteEventHook>>> =
    std::sync::LazyLock::new(|| std::sync::Mutex::new(None));
/// Installs `hook` as the global start-route event hook, replacing any hook
/// that was installed before. Passing `None` removes the current hook.
///
/// # Panics
///
/// Panics if the hook mutex has been poisoned.
#[cfg(test)]
pub(super) fn set_start_route_event_hook(hook: Option<StartRouteEventHook>) {
    let mut slot = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock");
    *slot = hook;
}
/// Invokes the installed start-route event hook, if any, with `event`.
///
/// The hook is cloned out of the global slot and the mutex is released
/// *before* the hook runs. This way a hook that panics (for example on a
/// failed test assertion) cannot poison the slot for later callers, and a hook
/// that itself calls `set_start_route_event_hook` cannot deadlock on the
/// non-reentrant mutex.
///
/// # Panics
///
/// Panics if the hook mutex has been poisoned, or if the hook itself panics.
#[cfg(test)]
pub(super) fn emit_start_route_event(event: &'static str) {
    // The guard is a temporary of this statement, so the lock is dropped at
    // the `;` — only the cheap `Arc` clone happens under the lock.
    let installed = START_ROUTE_EVENT_HOOK
        .lock()
        .expect("start route event hook lock")
        .clone();
    if let Some(hook) = installed {
        hook(event);
    }
}
/// Pieces of a route that has been split around an aggregate step.
///
/// NOTE(review): field meanings below are inferred from names and types; the
/// code that builds this struct is not visible here — confirm.
#[derive(Clone)]
pub(crate) struct AggregateSplitInfo {
    /// Pipeline for the steps preceding the aggregate (presumably).
    pub(super) pre_pipeline: SharedPipeline,
    /// Configuration of the aggregate step itself.
    pub(super) agg_config: AggregatorConfig,
    /// Pipeline for the steps following the aggregate (presumably).
    pub(super) post_pipeline: SharedPipeline,
}
/// Runtime bookkeeping for a single route: its definition, compiled form and
/// the task handles / cancellation tokens of its consumer and pipeline.
///
/// NOTE(review): per-field semantics are inferred from names and types unless
/// stated otherwise; the code that populates them is outside this section.
pub(super) struct ManagedRoute {
    /// Definition metadata of the route.
    pub(super) definition: RouteDefinitionInfo,
    /// Source endpoint URI of the route (presumably the `from(...)` URI).
    pub(super) from_uri: String,
    /// Shared handle to the route's processing pipeline.
    pub(super) pipeline: SharedPipeline,
    /// Optional concurrency model for the route; `None` when unset.
    pub(super) concurrency: Option<ConcurrencyModel>,
    /// Join handle of the consumer task, if any. Together with
    /// `pipeline_handle` it drives `inferred_lifecycle_label`.
    pub(super) consumer_handle: Option<JoinHandle<()>>,
    /// Join handle of the pipeline task, if any.
    pub(super) pipeline_handle: Option<JoinHandle<()>>,
    /// Cancellation token associated with the consumer task.
    pub(super) consumer_cancel_token: CancellationToken,
    /// Cancellation token associated with the pipeline task.
    pub(super) pipeline_cancel_token: CancellationToken,
    /// Sender side of the exchange-envelope channel, if one exists
    /// (presumably feeding the pipeline task — TODO confirm).
    pub(super) channel_sender: Option<mpsc::Sender<ExchangeEnvelope>>,
    /// Optional shared atomic counter (presumably in-flight exchanges).
    pub(super) in_flight: Option<Arc<std::sync::atomic::AtomicU64>>,
    /// Split information when the route was split around an aggregate step.
    pub(super) aggregate_split: Option<AggregateSplitInfo>,
    /// Shared aggregator service, if the route has one.
    pub(super) agg_service: Option<Arc<AggregatorService>>,
    /// Compiled representation of the route.
    pub(super) compiled: route_runtime_state::CompiledRoute,
}
/// A boxed processor paired with a list of step lifecycle hooks.
///
/// NOTE(review): presumably the hooks are those of the steps the processor
/// was compiled from — confirm at the construction site.
#[derive(Debug)]
pub(crate) struct CompiledPipeline {
    /// The boxed processor.
    pub(crate) processor: camel_api::BoxProcessor,
    /// Lifecycle hooks that accompany the processor.
    pub(crate) lifecycle: Vec<Arc<dyn camel_api::StepLifecycle>>,
}
/// A route id bundled with its `ManagedRoute` state.
///
/// NOTE(review): presumably a fully built route that has not yet been
/// registered — confirm against the code that consumes it.
pub(crate) struct PreparedRoute {
    /// Identifier of the route.
    pub(crate) route_id: String,
    /// The managed runtime state for the route.
    pub(super) managed: ManagedRoute,
}
/// Reports whether `handle` holds a task that has not finished yet.
///
/// Returns `false` when there is no handle at all.
pub(super) fn handle_is_running(handle: &Option<JoinHandle<()>>) -> bool {
    match handle {
        Some(task) => !task.is_finished(),
        None => false,
    }
}
/// Derives a lifecycle label from which of the route's tasks are still running.
///
/// | consumer | pipeline | label         |
/// |----------|----------|---------------|
/// | running  | running  | `"Started"`   |
/// | stopped  | running  | `"Suspended"` |
/// | running  | stopped  | `"Stopping"`  |
/// | stopped  | stopped  | `"Stopped"`   |
pub(super) fn inferred_lifecycle_label(managed: &ManagedRoute) -> &'static str {
    let consumer_alive = handle_is_running(&managed.consumer_handle);
    let pipeline_alive = handle_is_running(&managed.pipeline_handle);

    if pipeline_alive {
        if consumer_alive { "Started" } else { "Suspended" }
    } else if consumer_alive {
        "Stopping"
    } else {
        "Stopped"
    }
}
/// Checks whether the first top-level `Aggregate` step needs split handling.
///
/// Returns the step's index and a clone of its config when that aggregate has
/// a timeout completion condition or has `force_completion_on_stop` set.
/// Returns `None` when there is no aggregate step or the first one needs no
/// split; aggregate steps after the first are never inspected.
pub(super) fn find_top_level_aggregate_requiring_split(
    steps: &[BuilderStep],
) -> Option<(usize, AggregatorConfig)> {
    // Locate the first aggregate only; `?` bails out when there is none.
    let (position, config) = steps
        .iter()
        .enumerate()
        .find_map(|(position, step)| match step {
            BuilderStep::Aggregate { config } => Some((position, config)),
            _ => None,
        })?;

    let needs_split = has_timeout_condition(&config.completion) || config.force_completion_on_stop;
    needs_split.then(|| (position, config.clone()))
}
/// Location and policy of the single top-level `Resequence` step of a route,
/// as produced by `find_top_level_resequencer_requiring_split`.
#[derive(Debug, Clone)]
pub(super) struct ResequenceSplitInfo {
    /// Index of the `Resequence` step within the top-level step list.
    pub(super) index: usize,
    /// Clone of the step's resequence policy configuration.
    pub(super) policy_config: ResequencePolicyConfig,
}
/// Locates the top-level `Resequence` step, if the route has one.
///
/// Returns `Ok(None)` when no such step exists and `Ok(Some(info))` with the
/// step's index and a clone of its policy config when exactly one exists.
///
/// # Errors
///
/// Returns `CamelError::RouteError` when more than one top-level `Resequence`
/// step is present.
pub(super) fn find_top_level_resequencer_requiring_split(
    steps: &[BuilderStep],
) -> Result<Option<ResequenceSplitInfo>, CamelError> {
    let mut resequencers = steps.iter().enumerate().filter_map(|(index, step)| {
        if let BuilderStep::Resequence { policy_config } = step {
            Some((index, policy_config))
        } else {
            None
        }
    });

    let Some((index, policy_config)) = resequencers.next() else {
        return Ok(None);
    };

    // Any further match means the route declares more than one resequencer.
    if resequencers.next().is_some() {
        return Err(CamelError::RouteError(
            "Multiple top-level Resequence steps found — at most one allowed".into(),
        ));
    }

    Ok(Some(ResequenceSplitInfo {
        index,
        policy_config: policy_config.clone(),
    }))
}
/// Rejects routes that combine both top-level split mechanisms.
///
/// A route is rejected when it contains any `Aggregate` step that requires a
/// split (timeout completion condition or `force_completion_on_stop`) together
/// with any `Resequence` step, regardless of their order.
///
/// # Errors
///
/// Returns `CamelError::RouteError` when both kinds of step are present.
pub(super) fn assert_no_mixed_top_level_splits(steps: &[BuilderStep]) -> Result<(), CamelError> {
    let aggregate_needs_split = |step: &BuilderStep| match step {
        BuilderStep::Aggregate { config } => {
            has_timeout_condition(&config.completion) || config.force_completion_on_stop
        }
        _ => false,
    };
    let is_resequence = |step: &BuilderStep| matches!(step, BuilderStep::Resequence { .. });

    let mixes_split_kinds =
        steps.iter().any(aggregate_needs_split) && steps.iter().any(is_resequence);
    if !mixes_split_kinds {
        return Ok(());
    }

    Err(CamelError::RouteError(
        "Route contains both a top-level Aggregate (requiring split) and a Resequence step — these split mechanisms are mutually exclusive".into(),
    ))
}
/// Returns `true` only when the exchange carries the `CamelAggregatorPending`
/// property and that property is the boolean `true`. A missing property or a
/// non-boolean value counts as not pending.
pub(super) fn is_pending(ex: &Exchange) -> bool {
    let flag = ex
        .property("CamelAggregatorPending")
        .and_then(|value| value.as_bool());
    matches!(flag, Some(true))
}
pub(super) async fn ready_with_backoff(
pipeline: &mut camel_api::BoxProcessor,
cancel: &CancellationToken,
) -> Result<(), CamelError> {
loop {
match pipeline.ready().await {
Ok(_) => return Ok(()),
Err(CamelError::CircuitOpen(ref msg)) => {
warn!("Circuit open, backing off: {msg}");
tokio::select! {
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {
continue;
}
_ = cancel.cancelled() => {
return Err(CamelError::CircuitOpen(msg.clone()));
}
}
}
Err(e) => {
tracing::error!("Pipeline not ready: {e}");
return Err(e);
}
}
}
}
/// Builds the `RuntimeCommand::FailRoute` command for a failed route.
///
/// The command id has the form `ctrl-fail-<route_id>-<nanos>`, where `<nanos>`
/// is the current wall-clock time in nanoseconds since the Unix epoch (or `0`
/// if the system clock reads earlier than the epoch). No causation id is set.
pub(super) fn runtime_failure_command(route_id: &str, error: &str) -> RuntimeCommand {
    let nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_nanos());
    let command_id = format!("ctrl-fail-{route_id}-{nanos}");

    RuntimeCommand::FailRoute {
        route_id: route_id.to_owned(),
        error: error.to_owned(),
        command_id,
        causation_id: None,
    }
}
pub(super) async fn publish_runtime_failure(
runtime: Option<std::sync::Weak<dyn RuntimeHandle>>,
route_id: &str,
error: &str,
) {
let Some(runtime) = runtime.and_then(|weak| weak.upgrade()) else {
return;
};
let command = runtime_failure_command(route_id, error);
if let Err(runtime_error) = runtime.execute(command).await {
warn!(
route_id = %route_id,
error = %runtime_error,
"failed to synchronize route crash with runtime projection"
);
}
}
/// Tests for top-level resequencer detection and the mixed-split guard.
#[cfg(test)]
mod resequence_tests {
    use super::*;
    use camel_api::aggregator::AggregatorConfig;

    /// Aggregate step that completes on a timeout (requires a split).
    fn make_aggregate_with_timeout() -> BuilderStep {
        let config = AggregatorConfig::correlate_by("id")
            .complete_on_timeout(std::time::Duration::from_secs(5))
            .build()
            .expect("build aggregate config");
        BuilderStep::Aggregate { config }
    }

    /// Size-based aggregate step with forced completion on stop (requires a split).
    fn make_aggregate_with_force_completion() -> BuilderStep {
        let config = AggregatorConfig::correlate_by("id")
            .complete_when_size(1)
            .force_completion_on_stop(true)
            .build()
            .expect("build aggregate config");
        BuilderStep::Aggregate { config }
    }

    /// Purely size-based aggregate step (no split required).
    fn make_aggregate_simple() -> BuilderStep {
        let config = AggregatorConfig::correlate_by("id")
            .complete_when_size(5)
            .build()
            .expect("build aggregate config");
        BuilderStep::Aggregate { config }
    }

    /// Resequence step with the default policy.
    fn make_resequence() -> BuilderStep {
        let policy_config = ResequencePolicyConfig::default();
        BuilderStep::Resequence { policy_config }
    }

    /// Info-level log step with the given message.
    fn make_log(msg: &str) -> BuilderStep {
        BuilderStep::Log {
            level: camel_processor::LogLevel::Info,
            message: msg.to_string(),
        }
    }

    /// Declarative set-body step using a `simple` language expression.
    fn make_set_body(body: &str) -> BuilderStep {
        use camel_api::{LanguageExpressionDef, ValueSourceDef};
        let expression = LanguageExpressionDef {
            language: "simple".into(),
            source: body.to_string(),
        };
        BuilderStep::DeclarativeSetBody {
            value: ValueSourceDef::Expression(expression),
        }
    }

    #[test]
    fn find_top_level_resequencer_requiring_split_detects_single() {
        let steps = [
            make_log("A"),
            make_resequence(),
            make_set_body("B"),
            make_log("C"),
        ];
        let info = find_top_level_resequencer_requiring_split(&steps)
            .expect("should succeed")
            .expect("should detect resequence");
        assert_eq!(info.index, 1);

        // Everything before the resequencer vs. everything after it.
        let (pre, from_resequence) = steps.split_at(info.index);
        let post = &from_resequence[1..];
        assert_eq!(pre.len(), 1);
        assert!(matches!(pre[0], BuilderStep::Log { .. }));
        assert_eq!(post.len(), 2);
    }

    #[test]
    fn find_top_level_resequencer_requiring_split_rejects_multiple() {
        let steps = [make_resequence(), make_resequence()];
        let outcome = find_top_level_resequencer_requiring_split(&steps);
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(
            matches!(err, CamelError::RouteError(_)),
            "should be RouteError, got {err:?}"
        );
    }

    #[test]
    fn find_top_level_resequencer_requiring_split_none_when_absent() {
        let steps = [make_log("A"), make_set_body("B")];
        let found = find_top_level_resequencer_requiring_split(&steps).expect("should succeed");
        assert!(found.is_none());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_rejects_aggregate_plus_resequencer() {
        let steps = [make_aggregate_with_timeout(), make_resequence()];
        assert!(assert_no_mixed_top_level_splits(&steps).is_err());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_rejects_force_completion_aggregate() {
        let steps = [make_resequence(), make_aggregate_with_force_completion()];
        assert!(assert_no_mixed_top_level_splits(&steps).is_err());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_allows_no_split() {
        let steps = [make_log("A"), make_set_body("B")];
        assert!(assert_no_mixed_top_level_splits(&steps).is_ok());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_allows_aggregate_only() {
        let steps = [make_aggregate_with_timeout()];
        assert!(assert_no_mixed_top_level_splits(&steps).is_ok());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_allows_simple_aggregate() {
        let steps = [make_aggregate_simple(), make_resequence()];
        assert!(assert_no_mixed_top_level_splits(&steps).is_ok());
    }

    #[test]
    fn assert_no_mixed_top_level_splits_allows_resequence_only() {
        let steps = [make_resequence()];
        assert!(assert_no_mixed_top_level_splits(&steps).is_ok());
    }

    #[test]
    fn find_top_level_resequencer_requiring_split_at_index_zero() {
        let steps = [make_resequence(), make_log("C"), make_set_body("D")];
        let info = find_top_level_resequencer_requiring_split(&steps)
            .expect("should succeed")
            .expect("should detect resequence at index 0");
        assert_eq!(info.index, 0);

        let (pre, from_resequence) = steps.split_at(info.index);
        let post = &from_resequence[1..];
        assert!(
            pre.is_empty(),
            "pre should be empty when resequence is first"
        );
        assert_eq!(post.len(), 2, "post should contain Log and SetBody");
    }

    #[test]
    fn find_top_level_resequencer_requiring_split_at_last_index() {
        let steps = [make_log("A"), make_set_body("B"), make_resequence()];
        let info = find_top_level_resequencer_requiring_split(&steps)
            .expect("should succeed")
            .expect("should detect resequence at last index");
        assert_eq!(info.index, 2);

        let (pre, from_resequence) = steps.split_at(info.index);
        let post = &from_resequence[1..];
        assert_eq!(pre.len(), 2, "pre should contain Log and SetBody");
        assert!(
            post.is_empty(),
            "post should be empty when resequence is last"
        );
    }
}