use camel_api::Value;
use camel_api::aggregator::AggregatorConfig;
use camel_api::splitter::{AggregationStrategy, SplitterConfig, split_body_lines};
use camel_builder::{RouteBuilder, StepAccumulator};
use camel_component_file::FileComponent;
use camel_component_http::HttpComponent;
use camel_component_log::LogComponent;
use camel_test::CamelTestContext;
#[tokio::test]
async fn timer_to_mock() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
.route_id("test-route-1")
.to("mock:result")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("result").unwrap();
endpoint.assert_exchange_count(3).await;
let exchanges = endpoint.get_received_exchanges().await;
let first = &exchanges[0];
assert_eq!(
first.input.header("CamelTimerName"),
Some(&serde_json::Value::String("tick".into()))
);
assert_eq!(
first.input.header("CamelTimerCounter"),
Some(&serde_json::Value::Number(1.into()))
);
}
#[tokio::test]
async fn timer_filter_mock() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
.route_id("test-route-2")
.filter(|ex| {
ex.input
.header("CamelTimerCounter")
.and_then(|v| v.as_u64())
.map(|n| n % 2 == 0)
.unwrap_or(false)
})
.to("mock:result")
.end_filter()
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("result").unwrap();
endpoint.assert_exchange_count(2).await;
let exchanges = endpoint.get_received_exchanges().await;
let counters: Vec<u64> = exchanges
.iter()
.map(|ex| {
ex.input
.header("CamelTimerCounter")
.and_then(|v| v.as_u64())
.expect("CamelTimerCounter header missing")
})
.collect();
assert_eq!(
counters,
vec![2, 4],
"only even counters should pass filter"
);
}
#[tokio::test]
async fn filter_matching_exchanges_reach_inner_mock() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = std::sync::Arc::clone(&counter);
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
.route_id("test-route-3")
.process(move |mut ex: camel_api::Exchange| {
let c = std::sync::Arc::clone(&counter_clone);
async move {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
ex.input
.set_header("active", Value::Bool(n.is_multiple_of(2)));
Ok(ex)
}
})
.filter(|ex| ex.input.header("active") == Some(&Value::Bool(true)))
.to("mock:matched")
.end_filter()
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("matched").unwrap();
endpoint.assert_exchange_count(2).await;
}
#[tokio::test]
async fn filter_non_matching_continue_outer_pipeline() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = std::sync::Arc::clone(&counter);
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
.route_id("test-route-4")
.process(move |mut ex: camel_api::Exchange| {
let c = std::sync::Arc::clone(&counter_clone);
async move {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
ex.input
.set_header("active", Value::Bool(n.is_multiple_of(2)));
Ok(ex)
}
})
.filter(|ex| ex.input.header("active") == Some(&Value::Bool(true)))
.to("mock:inner")
.end_filter()
.to("mock:outer") .build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(400)).await;
h.stop().await;
let inner = h.mock().get_endpoint("inner").unwrap();
let outer = h.mock().get_endpoint("outer").unwrap();
inner.assert_exchange_count(2).await; outer.assert_exchange_count(4).await; }
#[tokio::test]
async fn timer_set_header_mock() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
.route_id("test-route-5")
.set_header("environment", Value::String("test".into()))
.set_header("version", Value::Number(1.into()))
.to("mock:result")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("result").unwrap();
endpoint.assert_exchange_count(3).await;
let exchanges = endpoint.get_received_exchanges().await;
for ex in exchanges.iter() {
assert_eq!(
ex.input.header("environment"),
Some(&Value::String("test".into())),
"Each exchange should have 'environment' header"
);
assert_eq!(
ex.input.header("version"),
Some(&Value::Number(1.into())),
"Each exchange should have 'version' header"
);
assert!(
ex.input.header("CamelTimerName").is_some(),
"Timer headers should still be present"
);
}
}
#[tokio::test]
async fn timer_to_log() {
let h = CamelTestContext::builder()
.with_timer()
.with_component(LogComponent::new())
.build()
.await;
let route = RouteBuilder::from("timer:tick?period=50&repeatCount=2")
.route_id("test-route-6")
.set_header("source", Value::String("integration-test".into()))
.to("log:test?showHeaders=true")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
h.stop().await;
}
#[tokio::test]
async fn multiple_routes() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route_a = RouteBuilder::from("timer:routeA?period=50&repeatCount=2")
.route_id("test-route-7")
.set_header("route", Value::String("A".into()))
.to("mock:resultA")
.build()
.unwrap();
let route_b = RouteBuilder::from("timer:routeB?period=50&repeatCount=3")
.route_id("test-route-8")
.set_header("route", Value::String("B".into()))
.to("mock:resultB")
.build()
.unwrap();
h.add_route(route_a).await.unwrap();
h.add_route(route_b).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
h.stop().await;
let endpoint_a = h.mock().get_endpoint("resultA").unwrap();
let endpoint_b = h.mock().get_endpoint("resultB").unwrap();
endpoint_a.assert_exchange_count(2).await;
endpoint_b.assert_exchange_count(3).await;
let a_exchanges = endpoint_a.get_received_exchanges().await;
let b_exchanges = endpoint_b.get_received_exchanges().await;
assert_eq!(
a_exchanges[0].input.header("route"),
Some(&Value::String("A".into()))
);
assert_eq!(
b_exchanges[0].input.header("route"),
Some(&Value::String("B".into()))
);
}
use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, CircuitBreakerConfig};
use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use std::time::Duration;
/// Builds a processor that ignores its exchange and always fails with
/// `CamelError::ProcessorError` carrying `msg`.
fn failing_step(msg: &'static str) -> BoxProcessor {
    BoxProcessor::from_fn(move |_ex| {
        Box::pin(async move {
            let failure = CamelError::ProcessorError(msg.into());
            Err(failure)
        })
    })
}
/// A route whose only step fails must hand the exchange, with its error still
/// attached, to the configured dead letter channel.
#[tokio::test]
async fn dlc_receives_failed_exchange() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let dead_letter = ErrorHandlerConfig::dead_letter_channel("mock:dlc");
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-9")
        .process_fn(failing_step("intentional"))
        .error_handler(dead_letter)
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let dlc_endpoint = harness.mock().get_endpoint("dlc").unwrap();
    let dead_letters = dlc_endpoint.get_received_exchanges().await;
    assert!(!dead_letters.is_empty());
    assert!(dead_letters[0].has_error());
}
/// A processor that fails twice and then succeeds must be recovered by the
/// retry policy, so nothing is delivered to the dead letter channel.
///
/// The invocation counter is also asserted. Without that check the test would
/// pass vacuously if the route never fired or the processor was never called,
/// because an empty (or absent) DLC endpoint is the outcome in both cases.
#[tokio::test]
async fn retry_recovers_before_dlc() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let attempt = Arc::new(AtomicU32::new(0));
    let attempt_clone = Arc::clone(&attempt);
    // Fails on invocations 0 and 1, succeeds from invocation 2 onwards.
    let processor = BoxProcessor::from_fn(move |ex| {
        let a = Arc::clone(&attempt_clone);
        Box::pin(async move {
            let n = a.fetch_add(1, Ordering::SeqCst);
            if n < 2 {
                Err(CamelError::ProcessorError("not yet".into()))
            } else {
                Ok(ex)
            }
        })
    });

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    // Retry any error up to three times with a tiny backoff so the test stays fast.
    let eh = ErrorHandlerConfig::dead_letter_channel("mock:dlc")
        .on_exception(|_| true)
        .retry(3)
        .with_backoff(Duration::from_millis(1), 1.0, Duration::from_millis(10))
        .build();
    let route = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-10")
        .process_fn(processor)
        .error_handler(eh)
        .build()
        .unwrap();
    h.add_route(route).await.unwrap();

    h.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    h.stop().await;

    // The DLC endpoint may not exist at all if nothing was ever routed to it.
    if let Some(ep) = h.mock().get_endpoint("dlc") {
        assert_eq!(ep.get_received_exchanges().await.len(), 0);
    }
    // One timer tick: two failed attempts followed by the successful one.
    assert_eq!(
        attempt.load(Ordering::SeqCst),
        3,
        "processor should run exactly three times (two failures, then success)"
    );
}
/// An `on_exception` clause matching `ProcessorError` must send the failed
/// exchange to its own endpoint rather than to the default dead letter channel.
#[tokio::test]
async fn on_exception_handled_by_specific_endpoint() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let handler = ErrorHandlerConfig::dead_letter_channel("mock:default-dlc")
        .on_exception(|e| match e {
            CamelError::ProcessorError(_) => true,
            _ => false,
        })
        .handled_by("mock:processor-errors")
        .build();
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-11")
        .process_fn(failing_step("processor error"))
        .error_handler(handler)
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let dedicated = harness.mock().get_endpoint("processor-errors").unwrap();
    let dedicated_exchanges = dedicated.get_received_exchanges().await;
    assert!(!dedicated_exchanges.is_empty());

    // The default DLC endpoint is only checked if it exists.
    if let Some(fallback) = harness.mock().get_endpoint("default-dlc") {
        assert_eq!(fallback.get_received_exchanges().await.len(), 0);
    }
}
/// A route without its own error handler must fall back to the one installed
/// on the context.
#[tokio::test]
async fn global_error_handler_fallback() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    // Install the context-wide handler before any route is added.
    let global_handler = ErrorHandlerConfig::dead_letter_channel("mock:global-dlc");
    harness
        .ctx()
        .lock()
        .await
        .set_error_handler(global_handler)
        .await;

    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-12")
        .process_fn(failing_step("global test"))
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let global_dlc = harness.mock().get_endpoint("global-dlc").unwrap();
    let dead_letters = global_dlc.get_received_exchanges().await;
    assert!(!dead_letters.is_empty());
}
/// When both a context-wide and a per-route error handler are configured, the
/// failed exchange must go to the route's dead letter channel only.
#[tokio::test]
async fn per_route_overrides_global() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let global_handler = ErrorHandlerConfig::dead_letter_channel("mock:global-dlc");
    harness
        .ctx()
        .lock()
        .await
        .set_error_handler(global_handler)
        .await;

    let route_handler = ErrorHandlerConfig::dead_letter_channel("mock:route-dlc");
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-13")
        .process_fn(failing_step("per-route test"))
        .error_handler(route_handler)
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let route_dlc = harness.mock().get_endpoint("route-dlc").unwrap();
    let route_dead_letters = route_dlc.get_received_exchanges().await;
    assert!(!route_dead_letters.is_empty());

    // The global DLC endpoint is only checked if it exists.
    if let Some(global_dlc) = harness.mock().get_endpoint("global-dlc") {
        assert_eq!(global_dlc.get_received_exchanges().await.len(), 0);
    }
}
/// A failure inside a `direct:` sub-route that has no error handler must reach
/// the calling route's dead letter channel.
#[tokio::test]
async fn direct_error_bubbles_to_caller() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_direct()
        .with_mock()
        .build()
        .await;

    // Callee: fails unconditionally and defines no handler of its own.
    let callee = RouteBuilder::from("direct:sub")
        .route_id("test-route-14")
        .process_fn(failing_step("subroute failure"))
        .build()
        .unwrap();
    harness.add_route(callee).await.unwrap();

    // Caller: owns the dead letter channel.
    let caller_handler = ErrorHandlerConfig::dead_letter_channel("mock:caller-dlc");
    let caller = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-15")
        .to("direct:sub")
        .error_handler(caller_handler)
        .build()
        .unwrap();
    harness
        .ctx()
        .lock()
        .await
        .add_route_definition(caller)
        .await
        .unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    harness.stop().await;

    let caller_dlc = harness.mock().get_endpoint("caller-dlc").unwrap();
    let dead_letters = caller_dlc.get_received_exchanges().await;
    assert!(!dead_letters.is_empty());
    assert!(dead_letters[0].has_error());
}
/// When the `direct:` sub-route has its own dead letter channel, the failure is
/// expected there, and the calling route is expected to carry on to its next step.
#[tokio::test]
async fn direct_error_contained_in_subroute() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_direct()
        .with_mock()
        .build()
        .await;

    // Callee: fails unconditionally but handles the failure itself.
    let callee_handler = ErrorHandlerConfig::dead_letter_channel("mock:sub-dlc");
    let callee = RouteBuilder::from("direct:sub2")
        .route_id("test-route-16")
        .process_fn(failing_step("contained failure"))
        .error_handler(callee_handler)
        .build()
        .unwrap();
    harness.add_route(callee).await.unwrap();

    // Caller: no handler, just a follow-up step after the sub-route call.
    let caller = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-17")
        .to("direct:sub2")
        .to("mock:caller-received")
        .build()
        .unwrap();
    harness
        .ctx()
        .lock()
        .await
        .add_route_definition(caller)
        .await
        .unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    harness.stop().await;

    let callee_dlc = harness.mock().get_endpoint("sub-dlc").unwrap();
    let callee_dead_letters = callee_dlc.get_received_exchanges().await;
    assert!(!callee_dead_letters.is_empty());

    let downstream = harness.mock().get_endpoint("caller-received").unwrap();
    let downstream_exchanges = downstream.get_received_exchanges().await;
    assert!(!downstream_exchanges.is_empty());
}
/// Smoke test: a route that always fails and has no error handler.
///
/// There are no assertions; the test passes as long as start, the three
/// failing ticks, and stop complete without panicking.
#[tokio::test]
async fn no_error_handler_logs_and_continues() {
    let harness = CamelTestContext::builder().with_timer().build().await;

    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
        .route_id("test-route-18")
        .process_fn(failing_step("no handler"))
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    let run_time = Duration::from_millis(500);
    tokio::time::sleep(run_time).await;
    harness.stop().await;
}
/// Combines a circuit breaker (threshold 2, open for 60 s) with a dead letter
/// channel on a route that always fails, and expects exactly two exchanges in
/// the DLC out of five ticks and none at the downstream sink.
#[tokio::test]
async fn circuit_breaker_with_error_handler() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    // The open duration far exceeds the test run time.
    let breaker = CircuitBreakerConfig::new()
        .failure_threshold(2)
        .open_duration(Duration::from_secs(60));
    let dead_letter = ErrorHandlerConfig::dead_letter_channel("mock:dlc");
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=5")
        .route_id("test-route-19")
        .process_fn(failing_step("cb test failure"))
        .circuit_breaker(breaker)
        .error_handler(dead_letter)
        .to("mock:sink")
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let dlc_endpoint = harness.mock().get_endpoint("dlc").unwrap();
    let dead_letters = dlc_endpoint.get_received_exchanges().await;
    assert_eq!(
        dead_letters.len(),
        2,
        "DLC should receive exactly failure_threshold (2) exchanges"
    );
    for dead in dead_letters.iter() {
        assert!(dead.has_error(), "Each DLC exchange should carry an error");
    }

    // The sink endpoint is only checked if it exists.
    if let Some(sink_endpoint) = harness.mock().get_endpoint("sink") {
        let delivered = sink_endpoint.get_received_exchanges().await.len();
        assert_eq!(delivered, 0, "mock:sink should receive zero exchanges");
    }
}
#[tokio::test]
async fn split_with_timer_and_mock() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:split-test?period=100&repeatCount=1")
.route_id("test-route-20")
.process(|mut ex: camel_api::Exchange| async move {
ex.input.body = camel_api::body::Body::Text("line1\nline2\nline3".to_string());
Ok(ex)
})
.split(SplitterConfig::new(split_body_lines()).aggregation(AggregationStrategy::CollectAll))
.to("mock:per-line")
.end_split()
.to("mock:final")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
h.stop().await;
let per_line = h.mock().get_endpoint("per-line").unwrap();
let per_line_count = per_line.get_received_exchanges().await.len();
assert_eq!(
per_line_count, 3,
"Expected 3 per-line exchanges, got {per_line_count}"
);
let final_ep = h.mock().get_endpoint("final").unwrap();
let final_exchanges = final_ep.get_received_exchanges().await;
assert_eq!(
final_exchanges.len(),
1,
"Expected 1 final exchange, got {}",
final_exchanges.len()
);
let expected = serde_json::json!(["line1", "line2", "line3"]);
match &final_exchanges[0].input.body {
camel_api::body::Body::Json(v) => assert_eq!(
*v, expected,
"CollectAll should produce JSON array of fragment bodies"
),
other => panic!("Expected JSON body from CollectAll, got {other:?}"),
}
}
#[tokio::test]
async fn split_with_error_handler() {
use camel_api::error_handler::ErrorHandlerConfig;
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:split-err?period=100&repeatCount=1")
.route_id("test-route-21")
.process(|mut ex: camel_api::Exchange| async move {
ex.input.body = camel_api::body::Body::Text("a\nb".to_string());
Ok(ex)
})
.error_handler(ErrorHandlerConfig::dead_letter_channel("mock:dlc"))
.split(SplitterConfig::new(split_body_lines()).stop_on_exception(true))
.process_fn(failing_step("fragment boom"))
.end_split()
.to("mock:sink")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
h.stop().await;
let dlc = h.mock().get_endpoint("dlc").unwrap();
let dlc_count = dlc.get_received_exchanges().await.len();
assert_eq!(
dlc_count, 1,
"Expected exactly 1 DLC exchange (stop_on_exception), got {dlc_count}"
);
let sink = h.mock().get_endpoint("sink").unwrap();
let sink_count = sink.get_received_exchanges().await.len();
assert_eq!(sink_count, 0, "Expected 0 sink exchanges, got {sink_count}");
}
#[tokio::test]
async fn file_consumer_to_mock() {
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path().to_str().unwrap();
std::fs::write(dir.path().join("a.txt"), "alpha").unwrap();
std::fs::write(dir.path().join("b.txt"), "beta").unwrap();
let h = CamelTestContext::builder()
.with_component(FileComponent::new())
.with_mock()
.build()
.await;
let route = RouteBuilder::from(&format!(
"file:{dir_path}?noop=true&initialDelay=0&delay=100"
))
.route_id("test-file-consumer")
.to("mock:result")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("result").unwrap();
let exchanges = endpoint.get_received_exchanges().await;
assert!(
exchanges.len() >= 2,
"Should have read at least 2 files, got {}",
exchanges.len()
);
for ex in &exchanges {
assert!(ex.input.header("CamelFileName").is_some());
assert!(ex.input.header("CamelFileNameOnly").is_some());
}
}
#[tokio::test]
async fn timer_to_file_producer() {
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path().to_str().unwrap();
let h = CamelTestContext::builder()
.with_timer()
.with_component(FileComponent::new())
.build()
.await;
let route = RouteBuilder::from("timer:write-test?period=50&repeatCount=2")
.route_id("test-route-22")
.set_header("CamelFileName", Value::String("output.txt".into()))
.to(format!("file:{dir_path}?fileExist=Append"))
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(300)).await;
h.stop().await;
assert!(
dir.path().join("output.txt").exists(),
"File should have been written"
);
}
#[tokio::test]
async fn file_to_file_pipeline() {
let input_dir = tempfile::tempdir().unwrap();
let output_dir = tempfile::tempdir().unwrap();
let input_path = input_dir.path().to_str().unwrap();
let output_path = output_dir.path().to_str().unwrap();
std::fs::write(input_dir.path().join("source.txt"), "hello world").unwrap();
let h = CamelTestContext::builder()
.with_component(FileComponent::new())
.build()
.await;
let route = RouteBuilder::from(&format!(
"file:{input_path}?noop=true&initialDelay=0&delay=100"
))
.route_id("test-file-pipeline")
.to(format!("file:{output_path}"))
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
h.stop().await;
assert!(
output_dir.path().join("source.txt").exists(),
"File should have been copied to output directory"
);
let content = std::fs::read_to_string(output_dir.path().join("source.txt")).unwrap();
assert_eq!(content, "hello world");
}
/// Checks the HTTP component's scheme and that an endpoint created from a URI
/// keeps the `httpMethod=POST` option in its reported URI.
#[tokio::test]
async fn http_component_registration_and_endpoint_creation() {
    use camel_component_api::Component;

    let http = HttpComponent::new();
    assert_eq!(http.scheme(), "http");

    let uri = "http://example.com/api?httpMethod=POST&connectTimeout=5000";
    let context = camel_component_api::NoOpComponentContext;
    let endpoint = http.create_endpoint(uri, &context).unwrap();
    assert!(endpoint.uri().contains("httpMethod=POST"));
}
/// Parses an HTTP endpoint URI and checks that unrecognised query parameters
/// (`apiKey`, `token`) are kept in `query_params` while `httpMethod` is not.
#[tokio::test]
async fn http_query_params_forwarding_config() {
    use camel_component_http::HttpEndpointConfig;
    use camel_endpoint::UriConfig;

    let uri = "http://api.example.com/v1/users?apiKey=secret123&httpMethod=GET&token=abc456";
    let config = HttpEndpointConfig::from_uri(uri).unwrap();
    let params = &config.query_params;

    assert!(params.contains_key("apiKey"), "apiKey should be preserved");
    assert!(params.contains_key("token"), "token should be preserved");
    assert_eq!(params.get("apiKey").unwrap(), "secret123");
    assert_eq!(params.get("token").unwrap(), "abc456");
    assert!(
        !params.contains_key("httpMethod"),
        "httpMethod should not be forwarded"
    );
}
/// End-to-end GET against a local wiremock server: the response body and the
/// `CamelHttpResponseCode` header must show up on the exchange at the mock.
#[tokio::test]
async fn http_get_e2e() {
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let server = MockServer::start().await;
    let reply = ResponseTemplate::new(200).set_body_string("hello from wiremock");
    Mock::given(method("GET"))
        .and(path("/api/greeting"))
        .respond_with(reply)
        .expect(1)
        .mount(&server)
        .await;

    let http_uri = format!(
        "http://127.0.0.1:{}/api/greeting?httpMethod=GET&allowPrivateIps=true",
        server.address().port()
    );

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-23")
        .to(&http_uri)
        .to("mock:result")
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let result = harness.mock().get_endpoint("result").unwrap();
    result.assert_exchange_count(1).await;
    let received = result.get_received_exchanges().await;
    let ex = &received[0];

    // The body check comes first, then the status header.
    let body = &ex.input.body;
    let camel_api::body::Body::Bytes(b) = body else {
        panic!("expected Body::Bytes, got {:?}", body);
    };
    assert_eq!(std::str::from_utf8(b).unwrap(), "hello from wiremock");

    let expected_status = serde_json::Value::Number(200.into());
    assert_eq!(
        ex.input.header("CamelHttpResponseCode"),
        Some(&expected_status)
    );
}
/// End-to-end POST against a local wiremock server that only matches when the
/// request body equals the payload set by the route; checks the 201 status
/// header and the response body on the resulting exchange.
#[tokio::test]
async fn http_post_with_body_e2e() {
    use wiremock::matchers::{body_string, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let server = MockServer::start().await;
    let reply = ResponseTemplate::new(201).set_body_string("created");
    Mock::given(method("POST"))
        .and(path("/api/data"))
        .and(body_string("payload-from-camel"))
        .respond_with(reply)
        .expect(1)
        .mount(&server)
        .await;

    let http_uri = format!(
        "http://127.0.0.1:{}/api/data?httpMethod=POST&allowPrivateIps=true",
        server.address().port()
    );

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-24")
        // Replace whatever body the timer produced with the request payload.
        .map_body(|_body| camel_api::body::Body::Text("payload-from-camel".into()))
        .to(&http_uri)
        .to("mock:result")
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let result = harness.mock().get_endpoint("result").unwrap();
    result.assert_exchange_count(1).await;
    let received = result.get_received_exchanges().await;
    let ex = &received[0];

    let expected_status = serde_json::Value::Number(201.into());
    assert_eq!(
        ex.input.header("CamelHttpResponseCode"),
        Some(&expected_status)
    );

    let body = &ex.input.body;
    let camel_api::body::Body::Bytes(b) = body else {
        panic!("expected Body::Bytes, got {:?}", body);
    };
    assert_eq!(std::str::from_utf8(b).unwrap(), "created");
}
/// End-to-end GET whose response carries two custom headers; both must be
/// present as string headers on the exchange received by the mock.
#[tokio::test]
async fn http_response_headers_mapped_e2e() {
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let server = MockServer::start().await;
    let reply = ResponseTemplate::new(200)
        .insert_header("x-custom-header", "custom-value")
        .insert_header("x-request-id", "req-42")
        .set_body_string("ok");
    Mock::given(method("GET"))
        .and(path("/api/headers"))
        .respond_with(reply)
        .expect(1)
        .mount(&server)
        .await;

    let http_uri = format!(
        "http://127.0.0.1:{}/api/headers?httpMethod=GET&allowPrivateIps=true",
        server.address().port()
    );

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-25")
        .to(&http_uri)
        .to("mock:result")
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let result = harness.mock().get_endpoint("result").unwrap();
    result.assert_exchange_count(1).await;
    let received = result.get_received_exchanges().await;
    let ex = &received[0];

    let expected_custom = serde_json::Value::String("custom-value".into());
    let expected_request_id = serde_json::Value::String("req-42".into());
    assert_eq!(ex.input.header("x-custom-header"), Some(&expected_custom));
    assert_eq!(ex.input.header("x-request-id"), Some(&expected_request_id));
}
/// End-to-end GET against a server answering 500: the test expects one
/// error-carrying exchange in the DLC and nothing at `mock:result`.
#[tokio::test]
async fn http_error_handling_e2e() {
    use camel_api::error_handler::ErrorHandlerConfig;
    use wiremock::matchers::{method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let server = MockServer::start().await;
    let reply = ResponseTemplate::new(500).set_body_string("internal server error");
    Mock::given(method("GET"))
        .and(path("/api/fail"))
        .respond_with(reply)
        .expect(1)
        .mount(&server)
        .await;

    let http_uri = format!(
        "http://127.0.0.1:{}/api/fail?httpMethod=GET&allowPrivateIps=true",
        server.address().port()
    );

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let dead_letter = ErrorHandlerConfig::dead_letter_channel("mock:dlc");
    let definition = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-26")
        .to(&http_uri)
        .error_handler(dead_letter)
        .to("mock:result")
        .build()
        .unwrap();
    harness.add_route(definition).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let dlc_endpoint = harness.mock().get_endpoint("dlc").unwrap();
    let dead_letters = dlc_endpoint.get_received_exchanges().await;
    assert_eq!(
        dead_letters.len(),
        1,
        "DLC should receive the failed exchange"
    );
    assert!(
        dead_letters[0].has_error(),
        "Exchange should carry an error"
    );

    // The result endpoint is only checked if it exists.
    if let Some(result_endpoint) = harness.mock().get_endpoint("result") {
        let delivered = result_endpoint.get_received_exchanges().await.len();
        assert_eq!(
            delivered, 0,
            "mock:result should not receive the failed exchange"
        );
    }
}
#[tokio::test]
async fn aggregator_collect_all() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = std::sync::Arc::clone(&counter);
let route = RouteBuilder::from("timer:agg-test?period=1&repeatCount=9")
.route_id("test-route-27")
.process(move |mut ex: camel_api::Exchange| {
let c = std::sync::Arc::clone(&counter_clone);
async move {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let key = ["A", "B", "C"][(n % 3) as usize];
ex.input
.headers
.insert("orderId".to_string(), serde_json::json!(key));
ex.input.body = camel_api::body::Body::Text(format!("item-{n}"));
Ok(ex)
}
})
.aggregate(
AggregatorConfig::correlate_by("orderId")
.complete_when_size(3)
.build(),
)
.to("mock:aggregated")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("aggregated").unwrap();
let exchanges = endpoint.get_received_exchanges().await;
let completed: Vec<_> = exchanges
.iter()
.filter(|e| e.property("CamelAggregatorPending").is_none())
.collect();
assert_eq!(
completed.len(),
3,
"expected 3 completed batches, got {}",
completed.len()
);
for ex in &completed {
let camel_api::body::Body::Json(v) = &ex.input.body else {
panic!("expected Body::Json, got {:?}", ex.input.body);
};
let arr = v.as_array().expect("expected JSON array");
assert_eq!(arr.len(), 3, "each batch should have 3 items");
}
}
#[tokio::test]
async fn aggregator_custom_strategy() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let counter = std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0));
let counter_clone = std::sync::Arc::clone(&counter);
let fold_fn: camel_api::aggregator::AggregationFn =
std::sync::Arc::new(|mut acc: camel_api::Exchange, next: camel_api::Exchange| {
let a = acc.input.body.as_text().unwrap_or("").to_string();
let b = next.input.body.as_text().unwrap_or("").to_string();
acc.input.body = camel_api::body::Body::Text(format!("{a}+{b}"));
acc
});
let route = RouteBuilder::from("timer:agg-custom?period=1&repeatCount=4")
.route_id("test-route-28")
.process(move |mut ex: camel_api::Exchange| {
let c = std::sync::Arc::clone(&counter_clone);
async move {
let n = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
ex.input
.headers
.insert("key".to_string(), serde_json::json!("X"));
ex.input.body = camel_api::body::Body::Text(n.to_string());
Ok(ex)
}
})
.aggregate(
AggregatorConfig::correlate_by("key")
.complete_when_size(4)
.strategy(camel_api::aggregator::AggregationStrategy::Custom(fold_fn))
.build(),
)
.to("mock:custom-agg")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("custom-agg").unwrap();
let exchanges = endpoint.get_received_exchanges().await;
let completed: Vec<_> = exchanges
.iter()
.filter(|e| e.property("CamelAggregatorPending").is_none())
.collect();
assert_eq!(
completed.len(),
1,
"expected 1 completed aggregate, got {}",
completed.len()
);
assert_eq!(completed[0].input.body.as_text(), Some("0+1+2+3"));
}
#[tokio::test]
async fn aggregator_scatter_gather() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:scatter?period=10&repeatCount=3")
.route_id("test-route-29")
.aggregate(
AggregatorConfig::correlate_by("CamelTimerName")
.complete_when_size(3)
.build(),
)
.to("mock:scatter-gather")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("scatter-gather").unwrap();
let exchanges = endpoint.get_received_exchanges().await;
let completed: Vec<_> = exchanges
.iter()
.filter(|e| e.property("CamelAggregatorPending").is_none())
.collect();
assert_eq!(
completed.len(),
1,
"expected 1 completed aggregate, got {}",
completed.len()
);
}
#[tokio::test]
async fn aggregator_agg_timeout_emits_after_inactivity() {
let h = CamelTestContext::builder()
.with_timer()
.with_mock()
.build()
.await;
let route = RouteBuilder::from("timer:agg-timeout?period=50&repeatCount=1")
.route_id("agg-timeout-route")
.process(|mut ex: camel_api::Exchange| async move {
ex.input.set_header("key", Value::String("A".into()));
Ok(ex)
})
.aggregate(
AggregatorConfig::correlate_by("key")
.complete_on_timeout(std::time::Duration::from_millis(200))
.build(),
)
.to("mock:agg-timeout-result")
.build()
.unwrap();
h.add_route(route).await.unwrap();
h.start().await;
tokio::time::sleep(std::time::Duration::from_millis(450)).await;
h.stop().await;
let endpoint = h.mock().get_endpoint("agg-timeout-result").unwrap();
endpoint.assert_exchange_count(1).await;
}
/// With a size threshold of 100 that a single tick can never reach, and
/// `force_completion_on_stop(true)` set, stopping the context is expected to
/// leave exactly one exchange at `mock:agg-force-result`.
#[tokio::test]
async fn aggregator_agg_force_completion_on_stop() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let force_policy = AggregatorConfig::correlate_by("key")
        .complete_when_size(100)
        .force_completion_on_stop(true)
        .build();
    let force_route = RouteBuilder::from("timer:agg-force?period=50&repeatCount=1")
        .route_id("agg-force-route")
        // Stamp the header the aggregator correlates on.
        .process(|mut exchange: camel_api::Exchange| async move {
            exchange.input.set_header("key", Value::String("A".into()));
            Ok(exchange)
        })
        .aggregate(force_policy)
        .to("mock:agg-force-result")
        .build()
        .unwrap();
    harness.add_route(force_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(80)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("agg-force-result").unwrap();
    sink.assert_exchange_count(1).await;
}
/// `set_body` with a static string must replace the body of the exchange
/// before it reaches the mock endpoint.
#[tokio::test]
async fn route_set_body_static() {
    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let static_body_route = RouteBuilder::from("timer:set-body-static?period=50&repeatCount=1")
        .route_id("test-route-30")
        .set_body("enriched")
        .to("mock:set-body-static")
        .build()
        .unwrap();
    harness.add_route(static_body_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(200)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("set-body-static").unwrap();
    sink.assert_exchange_count(1).await;

    let received = sink.get_received_exchanges().await;
    let body_text = received[0].input.body.as_text();
    assert_eq!(
        body_text,
        Some("enriched"),
        "set_body should replace body with static string"
    );
}
/// `set_body_fn` must compute the new body from the current exchange: the
/// body is first set to "hello" and then uppercased by the closure.
#[tokio::test]
async fn route_set_body_fn() {
    use camel_api::body::Body;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let body_fn_route = RouteBuilder::from("timer:set-body-fn?period=50&repeatCount=1")
        .route_id("test-route-31")
        .set_body("hello")
        // A non-text body falls back to the empty string before uppercasing.
        .set_body_fn(|exchange: &camel_api::Exchange| {
            Body::Text(exchange.input.body.as_text().unwrap_or("").to_uppercase())
        })
        .to("mock:set-body-fn")
        .build()
        .unwrap();
    harness.add_route(body_fn_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(200)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("set-body-fn").unwrap();
    sink.assert_exchange_count(1).await;

    let received = sink.get_received_exchanges().await;
    let body_text = received[0].input.body.as_text();
    assert_eq!(
        body_text,
        Some("HELLO"),
        "set_body_fn should uppercase the body read from the exchange"
    );
}
/// `set_header_fn` must derive a header value from the exchange: the body
/// text "ping" is copied into the `echo` header.
#[tokio::test]
async fn route_set_header_fn() {
    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let header_fn_route = RouteBuilder::from("timer:set-header-fn?period=50&repeatCount=1")
        .route_id("test-route-32")
        .set_body("ping")
        .set_header_fn("echo", |exchange: &camel_api::Exchange| {
            // A non-text body yields an empty header value.
            let body_text = exchange.input.body.as_text().unwrap_or("");
            Value::String(body_text.into())
        })
        .to("mock:set-header-fn")
        .build()
        .unwrap();
    harness.add_route(header_fn_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(200)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("set-header-fn").unwrap();
    sink.assert_exchange_count(1).await;

    let received = sink.get_received_exchanges().await;
    let echoed = received[0].input.header("echo");
    assert_eq!(
        echoed,
        Some(&Value::String("ping".into())),
        "set_header_fn should copy body text into the 'echo' header"
    );
}
/// End-to-end check that the query parameters `apiKey` and `lang` on an HTTP
/// producer URI reach the remote server.
///
/// The wiremock stub only matches a `GET /api/search` carrying both
/// parameters and is expected to be hit once; its "found" response body must
/// arrive at `mock:result` as `Body::Bytes`.
#[tokio::test]
async fn http_query_params_forwarded_e2e() {
    use wiremock::matchers::{method, path, query_param};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    let stub_server = MockServer::start().await;
    Mock::given(method("GET"))
        .and(path("/api/search"))
        .and(query_param("apiKey", "secret123"))
        .and(query_param("lang", "rust"))
        .respond_with(ResponseTemplate::new(200).set_body_string("found"))
        .expect(1)
        .mount(&stub_server)
        .await;

    let stub_port = stub_server.address().port();
    let http_uri = format!(
        "http://127.0.0.1:{}/api/search?httpMethod=GET&allowPrivateIps=true&apiKey=secret123&lang=rust",
        stub_port
    );

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let search_route = RouteBuilder::from("timer:tick?period=50&repeatCount=1")
        .route_id("test-route-33")
        .to(&http_uri)
        .to("mock:result")
        .build()
        .unwrap();
    harness.add_route(search_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("result").unwrap();
    sink.assert_exchange_count(1).await;

    let received = sink.get_received_exchanges().await;
    let response_body = &received[0].input.body;
    let camel_api::body::Body::Bytes(raw) = response_body else {
        panic!("expected Body::Bytes, got {:?}", response_body);
    };
    assert_eq!(std::str::from_utf8(raw).unwrap(), "found");
}
/// A `stop()` inside a filter block must end processing for matching
/// exchanges, so they never reach steps placed after `end_filter()`.
///
/// Four ticks are tagged alternately active/inactive by a shared counter.
/// The two active ones are expected at `mock:inner` only, and the two
/// inactive ones at `mock:outer` only.
#[tokio::test]
async fn stop_inside_filter_prevents_outer_pipeline() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let ticks_seen = Arc::new(AtomicU64::new(0));
    let ticks_for_route = Arc::clone(&ticks_seen);
    let stop_route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
        .route_id("test-route-34")
        .process(move |mut exchange: camel_api::Exchange| {
            let ticks = Arc::clone(&ticks_for_route);
            async move {
                // Ordinals 0 and 2 are marked active, 1 and 3 inactive.
                let ordinal = ticks.fetch_add(1, Ordering::SeqCst);
                let is_active = ordinal.is_multiple_of(2);
                exchange.input.set_header("active", Value::Bool(is_active));
                Ok(exchange)
            }
        })
        .filter(|exchange| exchange.input.header("active") == Some(&Value::Bool(true)))
        .to("mock:inner")
        .stop()
        .end_filter()
        .to("mock:outer")
        .build()
        .unwrap();
    harness.add_route(stop_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    harness.stop().await;

    let inner_sink = harness.mock().get_endpoint("inner").unwrap();
    let outer_sink = harness.mock().get_endpoint("outer").unwrap();
    inner_sink.assert_exchange_count(2).await;
    outer_sink.assert_exchange_count(2).await;
}
/// A multicast with three branches must deliver the single timer exchange to
/// each branch endpoint and then continue to the step after `end_multicast()`.
#[tokio::test]
async fn multicast_sends_to_multiple_endpoints() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let fan_out_route = RouteBuilder::from("timer:multicast-test?period=50&repeatCount=1")
        .route_id("test-route-35")
        .multicast()
        .to("mock:a")
        .to("mock:b")
        .to("mock:c")
        .end_multicast()
        .to("mock:final")
        .build()
        .unwrap();
    harness.add_route(fan_out_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    // Resolve every endpoint first, then verify each saw exactly one exchange.
    let sinks = ["a", "b", "c", "final"].map(|name| harness.mock().get_endpoint(name).unwrap());
    for sink in &sinks {
        sink.assert_exchange_count(1).await;
    }
}
/// Each multicast branch must see the `CAMEL_MULTICAST_INDEX` exchange
/// property set to its position in the multicast: 0, 1 and 2 for the three
/// branches in declaration order.
#[tokio::test]
async fn multicast_metadata_properties() {
    use camel_processor::CAMEL_MULTICAST_INDEX;
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let meta_route = RouteBuilder::from("timer:multicast-meta?period=50&repeatCount=1")
        .route_id("test-route-36")
        .multicast()
        .to("mock:meta-a")
        .to("mock:meta-b")
        .to("mock:meta-c")
        .end_multicast()
        .build()
        .unwrap();
    harness.add_route(meta_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let first_branch = harness
        .mock()
        .get_endpoint("meta-a")
        .unwrap()
        .get_received_exchanges()
        .await;
    let second_branch = harness
        .mock()
        .get_endpoint("meta-b")
        .unwrap()
        .get_received_exchanges()
        .await;
    let third_branch = harness
        .mock()
        .get_endpoint("meta-c")
        .unwrap()
        .get_received_exchanges()
        .await;

    // Every branch receives exactly one copy of the single timer exchange.
    assert_eq!(first_branch.len(), 1);
    assert_eq!(second_branch.len(), 1);
    assert_eq!(third_branch.len(), 1);

    let first_index = first_branch[0].property(CAMEL_MULTICAST_INDEX);
    let second_index = second_branch[0].property(CAMEL_MULTICAST_INDEX);
    let third_index = third_branch[0].property(CAMEL_MULTICAST_INDEX);

    // Presence is checked separately so a missing property names its branch.
    assert!(
        first_index.is_some(),
        "meta-a should have CAMEL_MULTICAST_INDEX"
    );
    assert!(
        second_index.is_some(),
        "meta-b should have CAMEL_MULTICAST_INDEX"
    );
    assert!(
        third_index.is_some(),
        "meta-c should have CAMEL_MULTICAST_INDEX"
    );

    assert_eq!(first_index, Some(&Value::from(0i64)));
    assert_eq!(second_index, Some(&Value::from(1i64)));
    assert_eq!(third_index, Some(&Value::from(2i64)));
}
/// A parallel multicast using `MulticastStrategy::CollectAll` must deliver to
/// both branches and pass a JSON array body on to the step after the
/// multicast.
#[tokio::test]
async fn multicast_parallel_collect_all() {
    use camel_api::body::Body;
    use camel_api::multicast::MulticastStrategy;
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let parallel_route = RouteBuilder::from("timer:multicast-parallel?period=50&repeatCount=1")
        .route_id("test-route-37")
        .multicast()
        .parallel(true)
        .aggregation(MulticastStrategy::CollectAll)
        .to("mock:p-a")
        .to("mock:p-b")
        .end_multicast()
        .to("mock:p-result")
        .build()
        .unwrap();
    harness.add_route(parallel_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let branch_a = harness.mock().get_endpoint("p-a").unwrap();
    let branch_b = harness.mock().get_endpoint("p-b").unwrap();
    let aggregate_sink = harness.mock().get_endpoint("p-result").unwrap();
    branch_a.assert_exchange_count(1).await;
    branch_b.assert_exchange_count(1).await;
    aggregate_sink.assert_exchange_count(1).await;

    let aggregated = aggregate_sink.get_received_exchanges().await;
    assert_eq!(aggregated.len(), 1);

    let aggregated_body = &aggregated[0].input.body;
    assert!(
        matches!(aggregated_body, Body::Json(json) if json.is_array()),
        "expected JSON array body from CollectAll aggregation, got {:?}",
        aggregated_body
    );
}
/// Five simultaneous HTTP requests against a route whose processor sleeps
/// 100 ms must all return 200 within 350 ms overall, which the test treats as
/// evidence that the pipeline ran them concurrently.
#[tokio::test]
async fn http_concurrent_pipeline() {
    use std::time::{Duration, Instant};

    let harness = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;

    let slow_route = RouteBuilder::from("http://0.0.0.0:18080/concurrent-test")
        .route_id("test-route-38")
        .process(|exchange| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(exchange)
        })
        .to("mock:concurrent-result")
        .build()
        .unwrap();
    harness.add_route(slow_route).await.unwrap();

    harness.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let http = reqwest::Client::new();
    let started = Instant::now();
    let requests: Vec<_> = (0..5)
        .map(|i| {
            let http = http.clone();
            tokio::spawn(async move {
                http.get(format!("http://127.0.0.1:18080/concurrent-test?i={i}"))
                    .send()
                    .await
                    .unwrap()
            })
        })
        .collect();
    for request in requests {
        let response = request.await.unwrap();
        assert_eq!(response.status(), 200);
    }
    let elapsed = started.elapsed();

    harness.stop().await;

    assert!(
        elapsed < Duration::from_millis(350),
        "Expected concurrent execution (<350ms), but took {:?}. \
         Pipeline may be running sequentially.",
        elapsed
    );
    let sink = harness.mock().get_endpoint("concurrent-result").unwrap();
    sink.assert_exchange_count(5).await;
}
/// With `.sequential()` on the route, three simultaneous requests that each
/// spend 100 ms in the processor must take at least 250 ms in total, which
/// the test treats as evidence that they were not processed concurrently.
#[tokio::test]
async fn http_sequential_override() {
    use std::time::{Duration, Instant};

    let harness = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;

    let serial_route = RouteBuilder::from("http://0.0.0.0:18081/sequential-test")
        .route_id("test-route-39")
        .sequential()
        .process(|exchange| async move {
            tokio::time::sleep(Duration::from_millis(100)).await;
            Ok(exchange)
        })
        .to("mock:sequential-result")
        .build()
        .unwrap();
    harness.add_route(serial_route).await.unwrap();

    harness.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let http = reqwest::Client::new();
    let started = Instant::now();
    let requests: Vec<_> = (0..3)
        .map(|i| {
            let http = http.clone();
            tokio::spawn(async move {
                http.get(format!("http://127.0.0.1:18081/sequential-test?i={i}"))
                    .send()
                    .await
                    .unwrap()
            })
        })
        .collect();
    for request in requests {
        let response = request.await.unwrap();
        assert_eq!(response.status(), 200);
    }
    let elapsed = started.elapsed();

    harness.stop().await;

    assert!(
        elapsed >= Duration::from_millis(250),
        "Expected sequential execution (>=250ms), but took {:?}. \
         Pipeline may be running concurrently despite .sequential() override.",
        elapsed
    );
    let sink = harness.mock().get_endpoint("sequential-result").unwrap();
    sink.assert_exchange_count(3).await;
}
/// With `.concurrent(2)` on the route, no more than two exchanges may be
/// inside the processor at once.
///
/// The processor tracks the number of in-flight exchanges and records the
/// highest value observed; six requests are sent and the recorded peak must
/// not exceed two.
#[tokio::test]
async fn http_concurrent_with_semaphore_limit() {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;

    let peak_in_flight = Arc::new(AtomicUsize::new(0));
    let in_flight = Arc::new(AtomicUsize::new(0));
    let peak_for_route = Arc::clone(&peak_in_flight);
    let in_flight_for_route = Arc::clone(&in_flight);

    let limited_route = RouteBuilder::from("http://0.0.0.0:18082/semaphore-test")
        .route_id("test-route-40")
        .concurrent(2)
        .process(move |exchange| {
            let peak = Arc::clone(&peak_for_route);
            let active = Arc::clone(&in_flight_for_route);
            async move {
                // Count this exchange in, remember the high-water mark, then
                // hold the slot long enough for other requests to overlap.
                let now_active = active.fetch_add(1, Ordering::SeqCst) + 1;
                peak.fetch_max(now_active, Ordering::SeqCst);
                tokio::time::sleep(Duration::from_millis(100)).await;
                active.fetch_sub(1, Ordering::SeqCst);
                Ok(exchange)
            }
        })
        .to("mock:semaphore-result")
        .build()
        .unwrap();
    harness.add_route(limited_route).await.unwrap();

    harness.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let http = reqwest::Client::new();
    let requests: Vec<_> = (0..6)
        .map(|i| {
            let http = http.clone();
            tokio::spawn(async move {
                http.get(format!("http://127.0.0.1:18082/semaphore-test?i={i}"))
                    .send()
                    .await
                    .unwrap()
            })
        })
        .collect();
    for request in requests {
        let response = request.await.unwrap();
        assert_eq!(response.status(), 200);
    }

    harness.stop().await;

    let observed_peak = peak_in_flight.load(Ordering::SeqCst);
    assert!(
        observed_peak <= 2,
        "Expected peak concurrency <= 2, but got {}. Semaphore not working.",
        observed_peak
    );
    let sink = harness.mock().get_endpoint("semaphore-result").unwrap();
    sink.assert_exchange_count(6).await;
}
/// Five concurrent requests hit a route whose first step is
/// `failing_step(..)` (defined elsewhere in this file; presumably it always
/// fails), guarded by a circuit breaker with a failure threshold of two and a
/// dead-letter channel.
///
/// Every response must be 200 or 500, the dead-letter endpoint must hold
/// between two and five exchanges that each carry an error, and `mock:sink`
/// must stay empty if it exists at all.
#[tokio::test]
async fn http_concurrent_with_circuit_breaker() {
    use camel_api::error_handler::ErrorHandlerConfig;

    let harness = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;

    let breaker = CircuitBreakerConfig::new()
        .failure_threshold(2)
        .open_duration(Duration::from_secs(1));
    let guarded_route = RouteBuilder::from("http://0.0.0.0:18083/cb-test")
        .route_id("test-route-41")
        .process_fn(failing_step("concurrent cb failure"))
        .circuit_breaker(breaker)
        .error_handler(ErrorHandlerConfig::dead_letter_channel("mock:dlc"))
        .to("mock:sink")
        .build()
        .unwrap();
    harness.add_route(guarded_route).await.unwrap();

    harness.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let http = reqwest::Client::new();
    let requests: Vec<_> = (0..5)
        .map(|i| {
            let http = http.clone();
            tokio::spawn(async move {
                http.get(format!("http://127.0.0.1:18083/cb-test?i={i}"))
                    .send()
                    .await
                    .unwrap()
            })
        })
        .collect();
    for request in requests {
        let status = request.await.unwrap().status();
        assert!(
            status == 200 || status == 500,
            "Expected 200 or 500, got {}",
            status
        );
    }

    harness.stop().await;

    let dead_letters = harness
        .mock()
        .get_endpoint("dlc")
        .unwrap()
        .get_received_exchanges()
        .await;
    let dead_letter_count = dead_letters.len();
    assert!(
        dead_letter_count >= 2,
        "DLC should receive at least 2 exchanges (failure_threshold), got {}",
        dead_letter_count
    );
    assert!(
        dead_letter_count <= 5,
        "DLC should receive at most 5 exchanges (total requests), got {}",
        dead_letter_count
    );
    assert!(
        dead_letters.iter().all(|exchange| exchange.has_error()),
        "Each DLC exchange should carry an error"
    );

    // The sink endpoint may never have been created; only check it if present.
    if let Some(sink) = harness.mock().get_endpoint("sink") {
        let delivered = sink.get_received_exchanges().await.len();
        assert_eq!(delivered, 0, "mock:sink should receive zero exchanges");
    }
}
/// Stopping the context while requests are still being processed must not
/// drop them: three requests that each spend 200 ms in the processor are
/// started, the context is stopped 50 ms later, and all three must still
/// return 200 and reach the mock endpoint.
#[tokio::test]
async fn http_concurrent_shutdown_drains_inflight() {
    let harness = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;

    let draining_route = RouteBuilder::from("http://0.0.0.0:18084/shutdown-test")
        .route_id("test-route-42")
        .process(|exchange| async move {
            tokio::time::sleep(Duration::from_millis(200)).await;
            Ok(exchange)
        })
        .to("mock:shutdown-result")
        .build()
        .unwrap();
    harness.add_route(draining_route).await.unwrap();

    harness.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let http = reqwest::Client::new();
    let requests: Vec<_> = (0..3)
        .map(|i| {
            let http = http.clone();
            tokio::spawn(async move {
                http.get(format!("http://127.0.0.1:18084/shutdown-test?i={i}"))
                    .send()
                    .await
                    .unwrap()
            })
        })
        .collect();

    // Stop while the 200 ms processors are still running.
    tokio::time::sleep(Duration::from_millis(50)).await;
    harness.stop().await;

    for request in requests {
        let response = request.await.unwrap();
        assert_eq!(response.status(), 200);
    }
    let sink = harness.mock().get_endpoint("shutdown-result").unwrap();
    sink.assert_exchange_count(3).await;
}
/// Processor errors must be reported per request when several requests run
/// at the same time.
///
/// Four requests are sent concurrently. Those with an even index carry
/// `fail=true` in the query string, which makes the processor return a
/// `CamelError::ProcessorError`; they must get a 500. The others must get a
/// 200, and only those two successful exchanges may reach
/// `mock:error-result`.
#[tokio::test]
async fn http_concurrent_error_propagation() {
    let h = CamelTestContext::builder()
        .with_component(HttpComponent::new())
        .with_mock()
        .build()
        .await;
    let route = RouteBuilder::from("http://0.0.0.0:18085/error-test")
        .route_id("test-route-43")
        .process(|ex| async move {
            // The failure switch is read from the raw query string header.
            let should_fail = ex
                .input
                .header("CamelHttpQuery")
                .and_then(|v| v.as_str())
                .map(|q| q.contains("fail=true"))
                .unwrap_or(false);
            if should_fail {
                Err(CamelError::ProcessorError("deliberate".into()))
            } else {
                Ok(ex)
            }
        })
        .to("mock:error-result")
        .build()
        .unwrap();
    h.add_route(route).await.unwrap();
    h.start().await;
    // Short pause after start before the first request is sent.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let client = reqwest::Client::new();
    let mut handles = Vec::new();
    for i in 0..4 {
        // Even-indexed requests are the ones that ask the route to fail.
        let should_fail = i % 2 == 0;
        let client = client.clone();
        handles.push(tokio::spawn(async move {
            let url = if should_fail {
                format!("http://127.0.0.1:18085/error-test?i={i}&fail=true")
            } else {
                format!("http://127.0.0.1:18085/error-test?i={i}&fail=false")
            };
            let resp = client.get(&url).send().await.unwrap();
            // Carry the expectation along with the response for the checks below.
            (i, should_fail, resp)
        }));
    }
    for handle in handles {
        let (i, should_fail, resp) = handle.await.unwrap();
        if should_fail {
            assert_eq!(
                resp.status(),
                500,
                "Request {i} with fail=true should return 500"
            );
        } else {
            assert_eq!(
                resp.status(),
                200,
                "Request {i} with fail=false should return 200"
            );
        }
    }
    h.stop().await;

    // Only the two non-failing exchanges make it past the processor.
    let endpoint = h.mock().get_endpoint("error-result").unwrap();
    endpoint.assert_exchange_count(2).await;
}
/// A `choice()` with two `when` branches must route each exchange to the
/// branch whose predicate matches: of four timer ticks, those with an even
/// `CamelTimerCounter` go to `mock:even` and those with an odd one to
/// `mock:odd`, two each.
#[tokio::test]
async fn choice_when_routes_matching_exchange() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let parity_route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
        .route_id("test-route-44")
        .choice()
        // A missing or non-integer counter matches neither branch.
        .when(|exchange| {
            exchange
                .input
                .header("CamelTimerCounter")
                .and_then(|value| value.as_u64())
                .map(|count| count.is_multiple_of(2))
                .unwrap_or(false)
        })
        .to("mock:even")
        .end_when()
        .when(|exchange| {
            exchange
                .input
                .header("CamelTimerCounter")
                .and_then(|value| value.as_u64())
                .map(|count| !count.is_multiple_of(2))
                .unwrap_or(false)
        })
        .to("mock:odd")
        .end_when()
        .end_choice()
        .build()
        .unwrap();
    harness.add_route(parity_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    harness.stop().await;

    let even_sink = harness.mock().get_endpoint("even").unwrap();
    let odd_sink = harness.mock().get_endpoint("odd").unwrap();
    even_sink.assert_exchange_count(2).await;
    odd_sink.assert_exchange_count(2).await;
}
/// When no `when` predicate matches, the `otherwise` branch must receive the
/// exchange: all three ticks are expected at `mock:fallback` and none at
/// `mock:never`.
#[tokio::test]
async fn choice_otherwise_fires_when_no_when_matches() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let fallback_route = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
        .route_id("test-route-45")
        .choice()
        // The route never sets this header, so the predicate never holds.
        .when(|exchange| exchange.input.header("nonexistent").is_some())
        .to("mock:never")
        .end_when()
        .otherwise()
        .to("mock:fallback")
        .end_otherwise()
        .end_choice()
        .build()
        .unwrap();
    harness.add_route(fallback_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let fallback_sink = harness.mock().get_endpoint("fallback").unwrap();
    fallback_sink.assert_exchange_count(3).await;

    // The unmatched endpoint may not exist at all; if it does it must be empty.
    if let Some(never_sink) = harness.mock().get_endpoint("never") {
        never_sink.assert_exchange_count(0).await;
    }
}
/// A `choice()` with no matching `when` and no `otherwise` must let the
/// exchange continue: all three ticks are expected at `mock:after`, the step
/// following `end_choice()`, and none at `mock:never`.
#[tokio::test]
async fn choice_no_match_no_otherwise_continues() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let passthrough_route = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
        .route_id("test-route-46")
        .choice()
        // The route never sets this header, so the predicate never holds.
        .when(|exchange| exchange.input.header("nonexistent").is_some())
        .to("mock:never")
        .end_when()
        .end_choice()
        .to("mock:after")
        .build()
        .unwrap();
    harness.add_route(passthrough_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(300)).await;
    harness.stop().await;

    let after_sink = harness.mock().get_endpoint("after").unwrap();
    after_sink.assert_exchange_count(3).await;

    // The unmatched endpoint may not exist at all; if it does it must be empty.
    if let Some(never_sink) = harness.mock().get_endpoint("never") {
        never_sink.assert_exchange_count(0).await;
    }
}
/// When several `when` predicates match, only the first branch may run: both
/// predicates always return true, yet all four ticks are expected at
/// `mock:first` and none at `mock:second`.
#[tokio::test]
async fn choice_short_circuits_first_match() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let first_wins_route = RouteBuilder::from("timer:tick?period=50&repeatCount=4")
        .route_id("test-route-47")
        .choice()
        .when(|_exchange| true)
        .to("mock:first")
        .end_when()
        .when(|_exchange| true)
        .to("mock:second")
        .end_when()
        .end_choice()
        .build()
        .unwrap();
    harness.add_route(first_wins_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(400)).await;
    harness.stop().await;

    let first_sink = harness.mock().get_endpoint("first").unwrap();
    first_sink.assert_exchange_count(4).await;

    // The shadowed endpoint may not exist at all; if it does it must be empty.
    if let Some(second_sink) = harness.mock().get_endpoint("second") {
        second_sink.assert_exchange_count(0).await;
    }
}
/// A `.delay(100ms)` step must hold the exchange for at least the configured
/// duration: the time a direct producer call takes to complete is measured
/// and must be 100 ms or more.
#[tokio::test]
async fn delay_step_waits_configured_duration() {
    use camel_api::body::Body;
    use camel_api::{Exchange, Message};
    use camel_component_direct::DirectComponent;
    use std::time::{Duration, Instant};
    use tower::util::ServiceExt;

    let configured_delay = Duration::from_millis(100);

    let harness = CamelTestContext::builder()
        .with_component(DirectComponent::new())
        .with_mock()
        .build()
        .await;

    let delayed_route = RouteBuilder::from("direct:delay-in")
        .route_id("test-delay-step")
        .delay(configured_delay)
        .to("mock:delay-out")
        .build()
        .unwrap();
    harness.add_route(delayed_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Build the producer inside its own scope so the context lock is released
    // before the exchange is sent.
    let direct_producer = {
        let context = harness.ctx().lock().await;
        let producer_ctx = context.producer_context();
        let direct_component = context.registry().get("direct").unwrap();
        let direct_endpoint = direct_component
            .create_endpoint("direct:delay-in", &*context)
            .unwrap();
        direct_endpoint.create_producer(&producer_ctx).unwrap()
    };

    let outgoing = Exchange::new(Message::new(Body::Text("hello".to_string())));
    let sent_at = Instant::now();
    let _ = direct_producer.oneshot(outgoing).await.unwrap();
    let elapsed = sent_at.elapsed();
    assert!(
        elapsed >= configured_delay,
        "expected at least 100ms delay, got {:?}",
        elapsed
    );

    let sink = harness.mock().get_endpoint("delay-out").unwrap();
    sink.await_exchanges(1, Duration::from_secs(2)).await;
    harness.stop().await;
}
/// An exchange carrying `Body::Xml` that passes through
/// `convert_body_to(BodyType::Text)` must arrive at the mock endpoint with
/// the same content readable as text.
#[tokio::test]
async fn xml_body_pipeline() {
    use camel_api::body::Body;
    use camel_api::{Exchange, Message};
    use camel_component_direct::DirectComponent;
    use std::time::Duration;
    use tower::util::ServiceExt;

    let harness = CamelTestContext::builder()
        .with_component(DirectComponent::new())
        .with_mock()
        .build()
        .await;

    let convert_route = RouteBuilder::from("direct:xml-in")
        .route_id("test-xml-body-pipeline")
        .convert_body_to(camel_api::BodyType::Text)
        .to("mock:xml-out")
        .build()
        .unwrap();
    harness.add_route(convert_route).await.unwrap();

    harness.start().await;
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Build the producer inside its own scope so the context lock is released
    // before the exchange is sent.
    let direct_producer = {
        let context = harness.ctx().lock().await;
        let producer_ctx = context.producer_context();
        let direct_component = context.registry().get("direct").unwrap();
        let direct_endpoint = direct_component
            .create_endpoint("direct:xml-in", &*context)
            .unwrap();
        direct_endpoint.create_producer(&producer_ctx).unwrap()
    };

    let xml_content = "<root><msg>hello</msg></root>";
    let outgoing = Exchange::new(Message::new(Body::Xml(xml_content.to_string())));
    let _ = direct_producer.oneshot(outgoing).await.unwrap();

    tokio::time::sleep(Duration::from_millis(100)).await;
    harness.stop().await;

    let sink = harness.mock().get_endpoint("xml-out").unwrap();
    sink.assert_exchange_count(1).await;

    let received = sink.get_received_exchanges().await;
    let converted_text = received[0].input.body.as_text();
    assert_eq!(
        converted_text,
        Some(xml_content),
        "Body should be converted from Xml to Text"
    );
}
/// Exercises the mock endpoint's `await_exchanges` / `exchange(i)` assertion
/// API: waits up to two seconds for three timer exchanges, then checks the
/// text body of each one by index.
#[tokio::test(flavor = "multi_thread")]
async fn mock_new_assertion_api() {
    use std::time::Duration;

    let harness = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let tick_route = RouteBuilder::from("timer:tick?period=50&repeatCount=3")
        .route_id("test-mock-new-assertion-api")
        .to("mock:assert-api-result")
        .build()
        .unwrap();
    harness.add_route(tick_route).await.unwrap();
    harness.start().await;

    // Wait for the exchanges rather than sleeping for a fixed interval.
    let sink = harness.mock().get_endpoint("assert-api-result").unwrap();
    let wait_limit = Duration::from_millis(2000);
    sink.await_exchanges(3, wait_limit).await;

    sink.exchange(0).assert_body_text("timer://tick tick #1");
    sink.exchange(1).assert_body_text("timer://tick tick #2");
    sink.exchange(2).assert_body_text("timer://tick tick #3");

    harness.stop().await;
}