use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::warn;
pub(crate) enum FunctionStagingMode {
DirectAdd,
DryCompile,
HotReload { generation: u64 },
}
use camel_api::{
BoxProcessor, CamelError, Exchange, FilterPredicate, FunctionInvoker, IdentityProcessor,
ProducerContext, Value,
body::Body,
loop_eip::{LoopConfig, LoopMode},
};
use camel_bean::BeanRegistry;
use camel_component_api::ComponentContext;
use camel_endpoint::parse_uri;
use camel_language_api::{Expression, Language, LanguageError, Predicate};
use camel_processor::script_mutator::ScriptMutator;
use camel_processor::{ChoiceService, WhenClause};
use crate::lifecycle::adapters::route_compiler::compose_pipeline;
use crate::lifecycle::adapters::route_controller::SharedLanguageRegistry;
use crate::lifecycle::application::route_definition::{
BuilderStep, LanguageExpressionDef, ValueSourceDef,
};
use crate::shared::components::domain::Registry;
pub(crate) fn resolve_language(
languages: &SharedLanguageRegistry,
language: &str,
) -> Result<Arc<dyn Language>, CamelError> {
let guard = languages
.lock()
.expect("mutex poisoned: another thread panicked while holding this lock"); guard.get(language).cloned().ok_or_else(|| {
CamelError::RouteError(format!(
"language `{language}` is not registered in CamelContext"
))
})
}
pub(crate) fn compile_language_expression(
languages: &SharedLanguageRegistry,
expression: &LanguageExpressionDef,
) -> Result<Arc<dyn Expression>, CamelError> {
let language = resolve_language(languages, &expression.language)?;
let compiled = language
.create_expression(&expression.source)
.map_err(|e| {
CamelError::RouteError(format!(
"failed to compile {} expression `{}`: {e}",
expression.language, expression.source
))
})?;
Ok(Arc::from(compiled))
}
pub(crate) fn compile_language_predicate(
languages: &SharedLanguageRegistry,
expression: &LanguageExpressionDef,
) -> Result<Arc<dyn Predicate>, CamelError> {
let language = resolve_language(languages, &expression.language)?;
let compiled = language.create_predicate(&expression.source).map_err(|e| {
CamelError::RouteError(format!(
"failed to compile {} predicate `{}`: {e}",
expression.language, expression.source
))
})?;
Ok(Arc::from(compiled))
}
pub(crate) fn compile_filter_predicate(
languages: &SharedLanguageRegistry,
expression: &LanguageExpressionDef,
) -> Result<FilterPredicate, CamelError> {
let predicate = compile_language_predicate(languages, expression)?;
Ok(Arc::new(move |exchange: &Exchange| {
predicate.matches(exchange).unwrap_or(false)
}))
}
fn value_to_body(value: Value) -> Body {
match value {
Value::Null => Body::Empty,
Value::String(text) => Body::Text(text),
other => Body::Json(other),
}
}
#[allow(clippy::only_used_in_recursion, clippy::too_many_arguments)]
pub(crate) fn resolve_steps(
steps: Vec<BuilderStep>,
producer_ctx: &ProducerContext,
registry: &Arc<std::sync::Mutex<Registry>>,
languages: &SharedLanguageRegistry,
beans: &Arc<std::sync::Mutex<BeanRegistry>>,
function_invoker: Option<Arc<dyn FunctionInvoker>>,
component_ctx: Arc<dyn ComponentContext>,
route_id: Option<&str>,
staging_mode: &FunctionStagingMode,
) -> Result<Vec<(BoxProcessor, Option<camel_api::BodyType>)>, CamelError> {
let resolve_producer = |uri: &str| -> Result<BoxProcessor, CamelError> {
let parsed = parse_uri(uri)?;
let component = component_ctx
.resolve_component(&parsed.scheme)
.ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
let endpoint = component.create_endpoint(uri, component_ctx.as_ref())?;
endpoint.create_producer(producer_ctx)
};
let mut processors: Vec<(BoxProcessor, Option<camel_api::BodyType>)> = Vec::new();
for (step_index, step) in steps.into_iter().enumerate() {
match step {
BuilderStep::Processor(svc) => {
processors.push((svc, None));
}
BuilderStep::To(uri) => {
let parsed = parse_uri(&uri)?;
let component = component_ctx
.resolve_component(&parsed.scheme)
.ok_or_else(|| CamelError::ComponentNotFound(parsed.scheme.clone()))?;
let endpoint = component.create_endpoint(&uri, component_ctx.as_ref())?;
let contract = endpoint.body_contract();
let producer = endpoint.create_producer(producer_ctx)?;
processors.push((producer, contract));
}
BuilderStep::Stop => {
processors.push((BoxProcessor::new(camel_processor::StopService), None));
}
BuilderStep::Delay { config } => {
let svc = camel_processor::delayer::DelayerService::new(config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Loop { config, steps } => {
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc = camel_processor::loop_eip::LoopService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeLoop {
count,
while_predicate,
steps,
} => {
let mode = match (count, while_predicate) {
(Some(n), None) => LoopMode::Count(n),
(None, Some(pred)) => {
let predicate = compile_filter_predicate(languages, &pred)?;
LoopMode::While(predicate)
}
(Some(_), Some(_)) => {
return Err(CamelError::RouteError(
"loop: cannot specify both 'count' and 'while'".into(),
));
}
(None, None) => {
return Err(CamelError::RouteError(
"loop: must specify either 'count' or 'while'".into(),
));
}
};
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let config = LoopConfig { mode };
let svc = camel_processor::loop_eip::LoopService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Log { level, message } => {
let svc = camel_processor::LogProcessor::new(level, message);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeSetHeader { key, value } => match value {
ValueSourceDef::Literal(value) => {
let svc = camel_processor::SetHeader::new(IdentityProcessor, key, value);
processors.push((BoxProcessor::new(svc), None));
}
ValueSourceDef::Expression(expression) => {
let expression = compile_language_expression(languages, &expression)?;
let svc = camel_processor::DynamicSetHeader::new(
IdentityProcessor,
key,
move |exchange: &Exchange| {
expression.evaluate(exchange).unwrap_or(Value::Null)
},
);
processors.push((BoxProcessor::new(svc), None));
}
},
BuilderStep::DeclarativeSetProperty { key, value_source } => match value_source {
ValueSourceDef::Literal(value) => {
let svc = camel_processor::set_property::SetProperty::new(
IdentityProcessor,
key,
value,
);
processors.push((BoxProcessor::new(svc), None));
}
ValueSourceDef::Expression(expression) => {
let expression = compile_language_expression(languages, &expression)?;
let svc = camel_processor::DynamicSetProperty::new(
IdentityProcessor,
key,
move |exchange: &Exchange| {
expression.evaluate(exchange).unwrap_or(Value::Null)
},
);
processors.push((BoxProcessor::new(svc), None));
}
},
BuilderStep::DeclarativeSetBody { value } => match value {
ValueSourceDef::Literal(value) => {
let body = value_to_body(value);
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |_exchange: &Exchange| body.clone(),
);
processors.push((BoxProcessor::new(svc), None));
}
ValueSourceDef::Expression(expression) => {
let expression = compile_language_expression(languages, &expression)?;
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
value_to_body(value)
},
);
processors.push((BoxProcessor::new(svc), None));
}
},
BuilderStep::DeclarativeFilter { predicate, steps } => {
let predicate = compile_filter_predicate(languages, &predicate)?;
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc = camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeChoice { whens, otherwise } => {
let mut when_clauses = Vec::new();
for when_step in whens {
let predicate = compile_filter_predicate(languages, &when_step.predicate)?;
let sub_pairs = resolve_steps(
when_step.steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let pipeline = compose_pipeline(sub_processors);
when_clauses.push(WhenClause {
predicate,
pipeline,
});
}
let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
let sub_pairs = resolve_steps(
otherwise_steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
Some(compose_pipeline(sub_processors))
} else {
None
};
let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeScript { expression } => {
let lang = resolve_language(languages, &expression.language)?;
match lang.create_mutating_expression(&expression.source) {
Ok(mut_expr) => {
processors.push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
}
Err(LanguageError::NotSupported { .. }) => {
let expression = compile_language_expression(languages, &expression)?;
let svc = camel_processor::SetBody::new(
IdentityProcessor,
move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
value_to_body(value)
},
);
processors.push((BoxProcessor::new(svc), None));
}
Err(e) => {
return Err(CamelError::RouteError(format!(
"Failed to create mutating expression for language '{}': {}",
expression.language, e
)));
}
}
}
BuilderStep::DeclarativeFunction { mut definition } => {
let Some(invoker) = function_invoker.clone() else {
return Err(CamelError::Config(
"function: step requires FunctionRuntimeService registered via with_lifecycle"
.into(),
));
};
definition.route_id = route_id.map(|s| s.to_string());
definition.step_index = Some(step_index);
match staging_mode {
FunctionStagingMode::DirectAdd => {
invoker.stage_pending(definition.clone(), route_id, 0);
}
FunctionStagingMode::HotReload { generation } => {
invoker.stage_pending(definition.clone(), route_id, *generation);
}
FunctionStagingMode::DryCompile => {}
}
let step = crate::step::function_step::FunctionStep::new(invoker, definition);
processors.push((BoxProcessor::new(step), None));
}
BuilderStep::Split { config, steps } => {
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let splitter =
camel_processor::splitter::SplitterService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(splitter), None));
}
BuilderStep::DeclarativeSplit {
expression,
aggregation,
parallel,
parallel_limit,
stop_on_exception,
steps,
} => {
let lang_expr = compile_language_expression(languages, &expression)?;
let split_fn = move |exchange: &Exchange| {
let value = lang_expr.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::String(s) => s
.lines()
.filter(|line| !line.is_empty())
.map(|line| {
let mut fragment = exchange.clone();
fragment.input.body = Body::from(line.to_string());
fragment
})
.collect(),
Value::Array(arr) => arr
.into_iter()
.map(|v| {
let mut fragment = exchange.clone();
fragment.input.body = Body::from(v);
fragment
})
.collect(),
_ => vec![exchange.clone()],
}
};
let mut config = camel_api::splitter::SplitterConfig::new(Arc::new(split_fn))
.aggregation(aggregation)
.parallel(parallel)
.stop_on_exception(stop_on_exception);
if let Some(limit) = parallel_limit {
config = config.parallel_limit(limit);
}
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let splitter =
camel_processor::splitter::SplitterService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(splitter), None));
}
BuilderStep::Aggregate { config } => {
let (late_tx, _late_rx) = mpsc::channel(256);
let registry: SharedLanguageRegistry =
Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
let cancel = CancellationToken::new();
let svc =
camel_processor::AggregatorService::new(config, late_tx, registry, cancel);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Filter { predicate, steps } => {
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc = camel_processor::FilterService::from_predicate(predicate, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Choice { whens, otherwise } => {
let mut when_clauses = Vec::new();
for when_step in whens {
let sub_pairs = resolve_steps(
when_step.steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let pipeline = compose_pipeline(sub_processors);
when_clauses.push(WhenClause {
predicate: when_step.predicate,
pipeline,
});
}
let otherwise_pipeline = if let Some(otherwise_steps) = otherwise {
let sub_pairs = resolve_steps(
otherwise_steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
Some(compose_pipeline(sub_processors))
} else {
None
};
let svc = ChoiceService::new(when_clauses, otherwise_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::WireTap { uri } => {
let producer = resolve_producer(&uri)?;
let svc = camel_processor::WireTapService::new(producer);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Multicast { config, steps } => {
let mut endpoints = Vec::new();
for step in steps {
let sub_pairs = resolve_steps(
vec![step],
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let endpoint = compose_pipeline(sub_processors);
endpoints.push(endpoint);
}
let svc = camel_processor::MulticastService::new(endpoints, config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeLog { level, message } => {
let ValueSourceDef::Expression(expression) = message else {
unreachable!(
"DeclarativeLog with Literal should have been compiled to a Processor"
);
};
let expression = compile_language_expression(languages, &expression)?;
let svc =
camel_processor::log::DynamicLog::new(level, move |exchange: &Exchange| {
expression
.evaluate(exchange)
.unwrap_or_else(|e| {
warn!(error = %e, "log expression evaluation failed");
Value::Null
})
.to_string()
});
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::Bean { name, method } => {
let beans = beans.lock().expect(
"beans mutex poisoned: another thread panicked while holding this lock",
);
let bean = beans.get(&name).ok_or_else(|| {
CamelError::ProcessorError(format!("Bean not found: {}", name))
})?;
let bean_clone = Arc::clone(&bean);
let method = method.clone();
let processor = tower::service_fn(move |mut exchange: Exchange| {
let bean = Arc::clone(&bean_clone);
let method = method.clone();
async move {
bean.call(&method, &mut exchange).await?;
Ok(exchange)
}
});
processors.push((BoxProcessor::new(processor), None));
}
BuilderStep::Script { language, script } => {
let lang = resolve_language(languages, &language)?;
match lang.create_mutating_expression(&script) {
Ok(mut_expr) => {
processors.push((BoxProcessor::new(ScriptMutator::new(mut_expr)), None));
}
Err(LanguageError::NotSupported {
feature,
language: ref lang_name,
}) => {
return Err(CamelError::RouteError(format!(
"Language '{}' does not support {} (required for .script() step)",
lang_name, feature
)));
}
Err(e) => {
return Err(CamelError::RouteError(format!(
"Failed to create mutating expression for language '{}': {}",
language, e
)));
}
}
}
BuilderStep::Throttle { config, steps } => {
let sub_pairs = resolve_steps(
steps,
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let sub_pipeline = compose_pipeline(sub_processors);
let svc = camel_processor::throttler::ThrottlerService::new(config, sub_pipeline);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::LoadBalance { config, steps } => {
let mut endpoints = Vec::new();
for step in steps {
let sub_pairs = resolve_steps(
vec![step],
producer_ctx,
registry,
languages,
beans,
function_invoker.clone(),
Arc::clone(&component_ctx),
route_id,
staging_mode,
)?;
let sub_processors: Vec<BoxProcessor> =
sub_pairs.into_iter().map(|(p, _)| p).collect();
let endpoint = compose_pipeline(sub_processors);
endpoints.push(endpoint);
}
let svc =
camel_processor::load_balancer::LoadBalancerService::new(endpoints, config);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DynamicRouter { config } => {
use camel_api::EndpointResolver;
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc =
camel_processor::dynamic_router::DynamicRouterService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeDynamicRouter {
expression,
uri_delimiter,
cache_size,
ignore_invalid_endpoints,
max_iterations,
} => {
use camel_api::EndpointResolver;
let expression = compile_language_expression(languages, &expression)?;
let expression: camel_api::RouterExpression =
Arc::new(move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::Null => None,
Value::String(s) => Some(s),
other => Some(other.to_string()),
}
});
let config = camel_api::DynamicRouterConfig::new(expression)
.uri_delimiter(uri_delimiter)
.cache_size(cache_size)
.ignore_invalid_endpoints(ignore_invalid_endpoints)
.max_iterations(max_iterations);
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc =
camel_processor::dynamic_router::DynamicRouterService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::RoutingSlip { config } => {
use camel_api::EndpointResolver;
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc = camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeRoutingSlip {
expression,
uri_delimiter,
cache_size,
ignore_invalid_endpoints,
} => {
use camel_api::EndpointResolver;
let expression = compile_language_expression(languages, &expression)?;
let expression: camel_api::RoutingSlipExpression =
Arc::new(move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::Null => None,
Value::String(s) => Some(s),
other => Some(other.to_string()),
}
});
let config = camel_api::RoutingSlipConfig::new(expression)
.uri_delimiter(uri_delimiter)
.cache_size(cache_size)
.ignore_invalid_endpoints(ignore_invalid_endpoints);
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc = camel_processor::routing_slip::RoutingSlipService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::RecipientList { config } => {
use camel_api::EndpointResolver;
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let svc =
camel_processor::recipient_list::RecipientListService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
BuilderStep::DeclarativeRecipientList {
expression,
delimiter,
parallel,
parallel_limit,
stop_on_exception,
aggregation,
} => {
use camel_api::EndpointResolver;
let expression = compile_language_expression(languages, &expression)?;
let expression: camel_api::recipient_list::RecipientListExpression =
Arc::new(move |exchange: &Exchange| {
let value = expression.evaluate(exchange).unwrap_or(Value::Null);
match value {
Value::Null => String::new(),
Value::String(s) => s,
other => other.to_string(),
}
});
let config = camel_api::recipient_list::RecipientListConfig::new(expression)
.delimiter(&delimiter)
.parallel(parallel)
.stop_on_exception(stop_on_exception);
let config = if let Some(limit) = parallel_limit {
config.parallel_limit(limit)
} else {
config
};
let producer_ctx_clone = producer_ctx.clone();
let component_ctx_clone = Arc::clone(&component_ctx);
let resolver: EndpointResolver = Arc::new(move |uri: &str| {
let parsed = match parse_uri(uri) {
Ok(p) => p,
Err(_) => return None,
};
let component = match component_ctx_clone.resolve_component(&parsed.scheme) {
Some(c) => c,
None => return None,
};
let endpoint =
match component.create_endpoint(uri, component_ctx_clone.as_ref()) {
Ok(e) => e,
Err(_) => return None,
};
let producer = match endpoint.create_producer(&producer_ctx_clone) {
Ok(p) => p,
Err(_) => return None,
};
Some(BoxProcessor::new(producer))
});
let _ = aggregation; let svc =
camel_processor::recipient_list::RecipientListService::new(config, resolver);
processors.push((BoxProcessor::new(svc), None));
}
}
}
Ok(processors)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::lifecycle::application::route_definition::LanguageExpressionDef;
use crate::shared::components::domain::Registry;
fn languages_with_simple() -> SharedLanguageRegistry {
let mut map: std::collections::HashMap<String, Arc<dyn Language>> =
std::collections::HashMap::new();
map.insert(
"simple".to_string(),
Arc::new(camel_language_simple::SimpleLanguage::new()),
);
Arc::new(std::sync::Mutex::new(map))
}
#[test]
fn resolve_language_returns_error_for_unregistered_name() {
let languages = Arc::new(std::sync::Mutex::new(std::collections::HashMap::new()));
let err = match resolve_language(&languages, "missing") {
Ok(_) => panic!("resolve_language should fail for unregistered language"),
Err(err) => err,
};
assert!(err.to_string().contains("missing"));
}
#[test]
fn compile_language_expression_and_predicate_work_for_simple_language() {
let languages = languages_with_simple();
let expression = LanguageExpressionDef {
language: "simple".into(),
source: "${header.answer}".into(),
};
let predicate_expression = LanguageExpressionDef {
language: "simple".into(),
source: "${header.answer} == '42'".into(),
};
let compiled_expression = compile_language_expression(&languages, &expression).unwrap();
let compiled_predicate =
compile_language_predicate(&languages, &predicate_expression).unwrap();
let mut msg = camel_api::message::Message::default();
msg.set_header("answer", Value::String("42".into()));
let exchange = Exchange::new(msg);
assert_eq!(
compiled_expression.evaluate(&exchange).unwrap(),
Value::String("42".into())
);
assert!(compiled_predicate.matches(&exchange).unwrap());
}
#[test]
fn compile_filter_predicate_returns_boolean_result() {
let languages = languages_with_simple();
let expression = LanguageExpressionDef {
language: "simple".into(),
source: "${header.flag} == 'yes'".into(),
};
let predicate = compile_filter_predicate(&languages, &expression).unwrap();
let mut msg = camel_api::message::Message::default();
msg.set_header("flag", Value::String("yes".into()));
let exchange = Exchange::new(msg);
assert!(predicate(&exchange));
}
#[test]
fn value_to_body_covers_null_string_and_json() {
assert!(matches!(value_to_body(Value::Null), Body::Empty));
assert!(matches!(
value_to_body(Value::String("x".into())),
Body::Text(ref s) if s == "x"
));
assert!(matches!(
value_to_body(Value::Number(serde_json::Number::from(7))),
Body::Json(Value::Number(_))
));
}
#[test]
fn resolve_steps_validates_declarative_loop_shape() {
let languages = languages_with_simple();
let producer_ctx = ProducerContext::new();
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let beans = Arc::new(std::sync::Mutex::new(BeanRegistry::new()));
let component_ctx: Arc<dyn ComponentContext> =
Arc::new(camel_component_api::NoOpComponentContext);
let both = resolve_steps(
vec![BuilderStep::DeclarativeLoop {
count: Some(2),
while_predicate: Some(LanguageExpressionDef {
language: "simple".into(),
source: "${header.k} == 'v'".into(),
}),
steps: vec![],
}],
&producer_ctx,
®istry,
&languages,
&beans,
None,
Arc::clone(&component_ctx),
Some("r1"),
&FunctionStagingMode::DirectAdd,
)
.unwrap_err();
assert!(
both.to_string()
.contains("cannot specify both 'count' and 'while'")
);
let neither = resolve_steps(
vec![BuilderStep::DeclarativeLoop {
count: None,
while_predicate: None,
steps: vec![],
}],
&producer_ctx,
®istry,
&languages,
&beans,
None,
component_ctx,
Some("r1"),
&FunctionStagingMode::DirectAdd,
)
.unwrap_err();
assert!(
neither
.to_string()
.contains("must specify either 'count' or 'while'")
);
}
#[test]
fn resolve_steps_returns_component_not_found_for_to_step() {
let languages = languages_with_simple();
let producer_ctx = ProducerContext::new();
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let beans = Arc::new(std::sync::Mutex::new(BeanRegistry::new()));
let component_ctx: Arc<dyn ComponentContext> =
Arc::new(camel_component_api::NoOpComponentContext);
let err = resolve_steps(
vec![BuilderStep::To("unknown:dest".into())],
&producer_ctx,
®istry,
&languages,
&beans,
None,
component_ctx,
Some("r1"),
&FunctionStagingMode::DirectAdd,
)
.unwrap_err();
assert!(err.to_string().contains("unknown"));
}
#[test]
fn compile_language_expression_and_predicate_propagate_compile_errors() {
let languages = languages_with_simple();
let bad_expr = LanguageExpressionDef {
language: "simple".into(),
source: "${header.a".into(),
};
let bad_pred = LanguageExpressionDef {
language: "simple".into(),
source: "${header.a == 'x'".into(),
};
let expr_err = match compile_language_expression(&languages, &bad_expr) {
Ok(_) => panic!("expression compile should fail"),
Err(err) => err,
};
assert!(
expr_err
.to_string()
.contains("failed to compile simple expression")
);
let pred_err = match compile_language_predicate(&languages, &bad_pred) {
Ok(_) => panic!("predicate compile should fail"),
Err(err) => err,
};
assert!(
pred_err
.to_string()
.contains("failed to compile simple predicate")
);
}
#[test]
fn resolve_steps_covers_non_endpoint_variants() {
use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
use std::time::Duration;
let expr = |source: &str| LanguageExpressionDef {
language: "simple".into(),
source: source.into(),
};
let languages = languages_with_simple();
let producer_ctx = ProducerContext::new();
let registry = Arc::new(std::sync::Mutex::new(Registry::new()));
let beans = Arc::new(std::sync::Mutex::new(BeanRegistry::new()));
let component_ctx: Arc<dyn ComponentContext> =
Arc::new(camel_component_api::NoOpComponentContext);
let steps = vec![
BuilderStep::Processor(BoxProcessor::new(IdentityProcessor)),
BuilderStep::Stop,
BuilderStep::Delay {
config: camel_api::DelayConfig::new(1),
},
BuilderStep::Loop {
config: camel_api::loop_eip::LoopConfig::new(camel_api::loop_eip::LoopMode::Count(
1,
)),
steps: vec![BuilderStep::Stop],
},
BuilderStep::DeclarativeLoop {
count: Some(1),
while_predicate: None,
steps: vec![BuilderStep::Stop],
},
BuilderStep::Log {
level: camel_processor::LogLevel::Info,
message: "log".into(),
},
BuilderStep::DeclarativeSetHeader {
key: "k".into(),
value: ValueSourceDef::Literal(Value::String("v".into())),
},
BuilderStep::DeclarativeSetProperty {
key: "p".into(),
value_source: ValueSourceDef::Expression(expr("${header.k}")),
},
BuilderStep::DeclarativeSetBody {
value: ValueSourceDef::Expression(expr("${header.k}")),
},
BuilderStep::DeclarativeFilter {
predicate: expr("${header.k} == 'v'"),
steps: vec![BuilderStep::Stop],
},
BuilderStep::DeclarativeChoice {
whens: vec![
crate::lifecycle::application::route_definition::DeclarativeWhenStep {
predicate: expr("${header.k} == 'v'"),
steps: vec![BuilderStep::Stop],
},
],
otherwise: Some(vec![BuilderStep::Stop]),
},
BuilderStep::DeclarativeScript {
expression: expr("${header.k}"),
},
BuilderStep::Split {
config: SplitterConfig::new(split_body_lines())
.aggregation(AggregationStrategy::CollectAll),
steps: vec![BuilderStep::Stop],
},
BuilderStep::DeclarativeSplit {
expression: expr("${body}"),
aggregation: AggregationStrategy::Original,
parallel: false,
parallel_limit: Some(2),
stop_on_exception: true,
steps: vec![BuilderStep::Stop],
},
BuilderStep::Aggregate {
config: camel_api::AggregatorConfig::correlate_by("id")
.complete_when_size(1)
.build(),
},
BuilderStep::Filter {
predicate: Arc::new(|_| true),
steps: vec![BuilderStep::Stop],
},
BuilderStep::Choice {
whens: vec![crate::lifecycle::application::route_definition::WhenStep {
predicate: Arc::new(|_| true),
steps: vec![BuilderStep::Stop],
}],
otherwise: Some(vec![BuilderStep::Stop]),
},
BuilderStep::Multicast {
steps: vec![BuilderStep::Stop, BuilderStep::Stop],
config: camel_api::MulticastConfig::new(),
},
BuilderStep::DeclarativeLog {
level: camel_processor::LogLevel::Info,
message: ValueSourceDef::Expression(expr("${header.k}")),
},
BuilderStep::Throttle {
config: camel_api::ThrottlerConfig::new(10, Duration::from_millis(10)),
steps: vec![BuilderStep::Stop],
},
BuilderStep::LoadBalance {
config: camel_api::LoadBalancerConfig::round_robin(),
steps: vec![BuilderStep::Stop, BuilderStep::Stop],
},
BuilderStep::DynamicRouter {
config: camel_api::DynamicRouterConfig::new(Arc::new(|_| None)),
},
BuilderStep::DeclarativeDynamicRouter {
expression: expr("${header.routes}"),
uri_delimiter: ",".into(),
cache_size: 8,
ignore_invalid_endpoints: true,
max_iterations: 3,
},
BuilderStep::RoutingSlip {
config: camel_api::RoutingSlipConfig::new(Arc::new(|_| None)),
},
BuilderStep::DeclarativeRoutingSlip {
expression: expr("${header.routes}"),
uri_delimiter: ";".into(),
cache_size: 16,
ignore_invalid_endpoints: true,
},
BuilderStep::RecipientList {
config: camel_api::recipient_list::RecipientListConfig::new(Arc::new(|_| {
String::new()
})),
},
BuilderStep::DeclarativeRecipientList {
expression: expr("${header.routes}"),
delimiter: ",".into(),
parallel: true,
parallel_limit: Some(2),
stop_on_exception: false,
aggregation: "noop".into(),
},
];
let resolved = resolve_steps(
steps,
&producer_ctx,
®istry,
&languages,
&beans,
None,
component_ctx,
Some("r1"),
&FunctionStagingMode::DirectAdd,
)
.unwrap();
assert!(!resolved.is_empty());
}
}