use std::time::Duration;
use camel_api::{RouteStatus, RuntimeCommand};
use camel_builder::{RouteBuilder, StepAccumulator};
use camel_component_controlbus::ControlBusComponent;
use camel_test::CamelTestContext;
/// Queries the runtime for the status of `route_id` and maps the returned
/// status name onto a [`RouteStatus`].
///
/// Returns `None` when the runtime reports no status for the route, or when
/// the reported name is not one of those handled below. `"Registered"` is
/// treated the same as `"Stopped"`, and `"Failed"` is mapped with the fixed
/// placeholder message `"failed"`.
///
/// # Panics
///
/// Panics if the status query itself fails.
async fn route_status(h: &CamelTestContext, route_id: &str) -> Option<RouteStatus> {
    let guard = h.ctx().lock().await;
    let raw = guard
        .runtime_route_status(route_id)
        .await
        .expect("runtime route status query failed")?;

    let mapped = match raw.as_str() {
        "Stopped" | "Registered" => RouteStatus::Stopped,
        "Starting" => RouteStatus::Starting,
        "Started" => RouteStatus::Started,
        "Stopping" => RouteStatus::Stopping,
        "Suspended" => RouteStatus::Suspended,
        "Failed" => RouteStatus::Failed("failed".to_string()),
        // Unknown status names are reported as "no status".
        _ => return None,
    };
    Some(mapped)
}
/// Issues a `StartRoute` runtime command for `route_id`.
///
/// The context lock is only taken to obtain the runtime handle; it is released
/// before the command is executed.
///
/// # Panics
///
/// Panics with "failed to start route" if executing the command fails.
async fn start_route(h: &CamelTestContext, route_id: &str) {
    let command = RuntimeCommand::StartRoute {
        route_id: route_id.to_owned(),
        command_id: format!("test:start:{route_id}"),
        causation_id: None,
    };

    // The lock guard is a temporary dropped at the end of this statement.
    let runtime = h.ctx().lock().await.runtime();

    let outcome = runtime.execute(command).await;
    outcome.expect("failed to start route");
}
/// Issues a `SuspendRoute` runtime command for `route_id`.
///
/// The context lock is only taken to obtain the runtime handle; it is released
/// before the command is executed.
///
/// # Panics
///
/// Panics with "failed to suspend route" if executing the command fails.
async fn suspend_route(h: &CamelTestContext, route_id: &str) {
    let command = RuntimeCommand::SuspendRoute {
        route_id: route_id.to_owned(),
        command_id: format!("test:suspend:{route_id}"),
        causation_id: None,
    };

    // The lock guard is a temporary dropped at the end of this statement.
    let runtime = h.ctx().lock().await.runtime();

    let outcome = runtime.execute(command).await;
    outcome.expect("failed to suspend route");
}
/// Issues a `ResumeRoute` runtime command for `route_id`.
///
/// The context lock is only taken to obtain the runtime handle; it is released
/// before the command is executed.
///
/// # Panics
///
/// Panics with "failed to resume route" if executing the command fails.
async fn resume_route(h: &CamelTestContext, route_id: &str) {
    let command = RuntimeCommand::ResumeRoute {
        route_id: route_id.to_owned(),
        command_id: format!("test:resume:{route_id}"),
        causation_id: None,
    };

    // The lock guard is a temporary dropped at the end of this statement.
    let runtime = h.ctx().lock().await.runtime();

    let outcome = runtime.execute(command).await;
    outcome.expect("failed to resume route");
}
/// A route built with `auto_startup(false)` must still be `Stopped` after the
/// context starts, and its mock endpoint (if it exists) must stay empty.
#[tokio::test]
async fn autostartup_false_route_does_not_start() {
    const ROUTE_ID: &str = "lazy-route";
    const OBSERVATION_MS: u64 = 200;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let lazy_route = RouteBuilder::from("timer:lazy?period=50&repeatCount=3")
        .route_id(ROUTE_ID)
        .auto_startup(false)
        .to("mock:result")
        .build()
        .unwrap();
    h.add_route(lazy_route).await.unwrap();
    h.start().await;

    assert_eq!(
        route_status(&h, ROUTE_ID).await,
        Some(RouteStatus::Stopped),
        "Route with auto_startup(false) should remain Stopped after ctx.start()"
    );

    // Wait several timer periods so a wrongly started route would have fired.
    tokio::time::sleep(Duration::from_millis(OBSERVATION_MS)).await;

    // The endpoint is only inspected when the mock component knows about it.
    if let Some(result_ep) = h.mock().get_endpoint("result") {
        let received = result_ep.get_received_exchanges().await;
        assert_eq!(
            received.len(),
            0,
            "Lazy route should not have processed any exchanges"
        );
    }

    h.stop().await;
}
/// A trigger route sends to a `controlbus:` endpoint with `action=start`; the
/// targeted lazy route is expected to move from `Stopped` to `Started` and
/// then deliver its timer exchanges.
#[tokio::test]
async fn controlbus_starts_route() {
    const LAZY_ROUTE_ID: &str = "lazy-route";
    const TRIGGER_ROUTE_ID: &str = "trigger-route";
    const TRIGGER_WAIT_MS: u64 = 150;
    const LAZY_FLOW_MS: u64 = 250;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .with_component(ControlBusComponent::new())
        .build()
        .await;

    let lazy_route = RouteBuilder::from("timer:lazy?period=50&repeatCount=3")
        .route_id(LAZY_ROUTE_ID)
        .auto_startup(false)
        .to("mock:lazy-result")
        .build()
        .unwrap();
    let trigger_route = RouteBuilder::from("timer:trigger?period=50&repeatCount=1")
        .route_id(TRIGGER_ROUTE_ID)
        .to("controlbus:route?routeId=lazy-route&action=start")
        .to("mock:trigger-done")
        .build()
        .unwrap();

    h.add_route(lazy_route).await.unwrap();
    h.add_route(trigger_route).await.unwrap();

    assert_eq!(
        route_status(&h, LAZY_ROUTE_ID).await,
        Some(RouteStatus::Stopped),
        "Lazy route should be Stopped before context starts"
    );

    h.start().await;

    assert_eq!(
        route_status(&h, TRIGGER_ROUTE_ID).await,
        Some(RouteStatus::Started),
        "Trigger route should be Started"
    );

    // Allow the trigger timer to fire and the control bus action to be applied.
    tokio::time::sleep(Duration::from_millis(TRIGGER_WAIT_MS)).await;

    assert_eq!(
        route_status(&h, LAZY_ROUTE_ID).await,
        Some(RouteStatus::Started),
        "Lazy route should be Started after ControlBus action"
    );

    let trigger_done = h.mock().get_endpoint("trigger-done").unwrap();
    trigger_done.assert_exchange_count(1).await;

    // Let the freshly started lazy route run through its timer ticks.
    tokio::time::sleep(Duration::from_millis(LAZY_FLOW_MS)).await;
    h.stop().await;

    let lazy_result = h.mock().get_endpoint("lazy-result").unwrap();
    let delivered = lazy_result.get_received_exchanges().await;
    assert!(
        delivered.len() >= 3,
        "Lazy route should have processed at least 3 exchanges after being started, got {}",
        delivered.len()
    );
}
/// A lazy route stays `Stopped` after the context starts, becomes `Started`
/// once a `StartRoute` command is issued directly, and then delivers its
/// timer exchanges.
#[tokio::test]
async fn route_controller_starts_lazy_route() {
    const ROUTE_ID: &str = "direct-lazy-route";
    const FLOW_MS: u64 = 250;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let route = RouteBuilder::from("timer:direct-test?period=50&repeatCount=3")
        .route_id(ROUTE_ID)
        .auto_startup(false)
        .to("mock:direct-result")
        .build()
        .unwrap();
    h.add_route(route).await.unwrap();
    h.start().await;

    assert_eq!(
        route_status(&h, ROUTE_ID).await,
        Some(RouteStatus::Stopped),
        "Lazy route should be Stopped after ctx.start()"
    );

    start_route(&h, ROUTE_ID).await;

    assert_eq!(
        route_status(&h, ROUTE_ID).await,
        Some(RouteStatus::Started),
        "Lazy route should be Started after direct start_route call"
    );

    // Let the timer run through its repeat count before shutting down.
    tokio::time::sleep(Duration::from_millis(FLOW_MS)).await;
    h.stop().await;

    let result_ep = h.mock().get_endpoint("direct-result").unwrap();
    let delivered = result_ep.get_received_exchanges().await;
    assert!(
        delivered.len() >= 3,
        "Route should have processed at least 3 exchanges after being started, got {}",
        delivered.len()
    );
}
/// A trigger route sends to a `controlbus:` endpoint with `action=stop`; the
/// targeted auto-started route is expected to move from `Started` to
/// `Stopped`, and the trigger route to complete exactly one exchange.
#[tokio::test]
async fn controlbus_stops_route() {
    const TARGET_ROUTE_ID: &str = "auto-route";
    const TRIGGER_WAIT_MS: u64 = 150;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .with_component(ControlBusComponent::new())
        .build()
        .await;

    let target_route = RouteBuilder::from("timer:auto?period=100&repeatCount=10")
        .route_id(TARGET_ROUTE_ID)
        .auto_startup(true)
        .to("mock:auto-result")
        .build()
        .unwrap();
    let stop_trigger = RouteBuilder::from("timer:stop-trigger?period=50&repeatCount=1")
        .route_id("stop-trigger-route")
        .to("controlbus:route?routeId=auto-route&action=stop")
        .to("mock:stop-done")
        .build()
        .unwrap();

    h.add_route(target_route).await.unwrap();
    h.add_route(stop_trigger).await.unwrap();
    h.start().await;

    assert_eq!(
        route_status(&h, TARGET_ROUTE_ID).await,
        Some(RouteStatus::Started),
        "Auto route should be Started"
    );

    // Allow the trigger timer to fire and the control bus action to be applied.
    tokio::time::sleep(Duration::from_millis(TRIGGER_WAIT_MS)).await;

    assert_eq!(
        route_status(&h, TARGET_ROUTE_ID).await,
        Some(RouteStatus::Stopped),
        "Auto route should be Stopped after ControlBus stop action"
    );

    h.stop().await;

    let stop_done = h.mock().get_endpoint("stop-done").unwrap();
    stop_done.assert_exchange_count(1).await;
}
/// Registers three routes with different `startup_order` values and checks
/// that all of them are `Started` after the context starts and that each
/// delivers exactly one exchange.
///
/// NOTE(review): the relative order in which the routes start is not observed
/// by any assertion here — confirm whether an ordering check was intended.
#[tokio::test]
async fn startup_order_respected() {
    const FLOW_MS: u64 = 200;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    // Routes are registered in a different sequence than their startup orders.
    let order_10 = RouteBuilder::from("timer:order1?period=50&repeatCount=1")
        .route_id("route-order-1")
        .startup_order(10)
        .to("mock:order1")
        .build()
        .unwrap();
    let order_5 = RouteBuilder::from("timer:order2?period=50&repeatCount=1")
        .route_id("route-order-2")
        .startup_order(5)
        .to("mock:order2")
        .build()
        .unwrap();
    let order_20 = RouteBuilder::from("timer:order3?period=50&repeatCount=1")
        .route_id("route-order-3")
        .startup_order(20)
        .to("mock:order3")
        .build()
        .unwrap();

    h.add_route(order_10).await.unwrap();
    h.add_route(order_5).await.unwrap();
    h.add_route(order_20).await.unwrap();
    h.start().await;

    for id in ["route-order-1", "route-order-2", "route-order-3"] {
        assert_eq!(route_status(&h, id).await, Some(RouteStatus::Started));
    }

    tokio::time::sleep(Duration::from_millis(FLOW_MS)).await;
    h.stop().await;

    // Resolve every endpoint up front, then verify each one in turn.
    let endpoints =
        ["order1", "order2", "order3"].map(|name| h.mock().get_endpoint(name).unwrap());
    for endpoint in endpoints {
        endpoint.assert_exchange_count(1).await;
    }
}
/// With one auto-started and one lazy route in the same context, only the
/// auto-started route should be `Started` and deliver exchanges.
#[tokio::test]
async fn mixed_autostartup_routes() {
    const AUTO_ROUTE_ID: &str = "auto-start-route";
    const LAZY_ROUTE_ID: &str = "lazy-start-route";
    const FLOW_MS: u64 = 200;

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let eager = RouteBuilder::from("timer:auto-start?period=50&repeatCount=2")
        .route_id(AUTO_ROUTE_ID)
        .auto_startup(true)
        .to("mock:auto")
        .build()
        .unwrap();
    let lazy = RouteBuilder::from("timer:lazy-start?period=50&repeatCount=2")
        .route_id(LAZY_ROUTE_ID)
        .auto_startup(false)
        .to("mock:lazy")
        .build()
        .unwrap();

    h.add_route(eager).await.unwrap();
    h.add_route(lazy).await.unwrap();
    h.start().await;

    assert_eq!(
        route_status(&h, AUTO_ROUTE_ID).await,
        Some(RouteStatus::Started)
    );
    assert_eq!(
        route_status(&h, LAZY_ROUTE_ID).await,
        Some(RouteStatus::Stopped)
    );

    tokio::time::sleep(Duration::from_millis(FLOW_MS)).await;

    let auto_endpoint = h.mock().get_endpoint("auto").unwrap();
    auto_endpoint.assert_exchange_count(2).await;

    // The lazy endpoint is only inspected when the mock component knows about it.
    if let Some(lazy_endpoint) = h.mock().get_endpoint("lazy") {
        let received = lazy_endpoint.get_received_exchanges().await;
        assert_eq!(
            received.len(),
            0,
            "Lazy route should not have processed any exchanges"
        );
    }

    h.stop().await;
}
/// Suspending a started route must change its reported status to `Suspended`.
#[tokio::test]
async fn suspend_changes_status_to_suspended() {
    const ROUTE_ID: &str = "suspend-route";

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let timer_route = RouteBuilder::from("timer:suspend-test?period=100&repeatCount=10")
        .route_id(ROUTE_ID)
        .auto_startup(true)
        .to("mock:suspend-result")
        .build()
        .unwrap();
    h.add_route(timer_route).await.unwrap();
    h.start().await;

    let before = route_status(&h, ROUTE_ID).await;
    assert_eq!(
        before,
        Some(RouteStatus::Started),
        "Route should be Started initially"
    );

    suspend_route(&h, ROUTE_ID).await;

    let after = route_status(&h, ROUTE_ID).await;
    assert_eq!(
        after,
        Some(RouteStatus::Suspended),
        "Route should be Suspended after suspend_route call"
    );

    h.stop().await;
}
/// Resuming a suspended route must change its reported status back to
/// `Started`.
#[tokio::test]
async fn resume_changes_status_to_started() {
    const ROUTE_ID: &str = "resume-route";

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let timer_route = RouteBuilder::from("timer:resume-test?period=100&repeatCount=10")
        .route_id(ROUTE_ID)
        .auto_startup(true)
        .to("mock:resume-result")
        .build()
        .unwrap();
    h.add_route(timer_route).await.unwrap();
    h.start().await;

    suspend_route(&h, ROUTE_ID).await;
    let while_suspended = route_status(&h, ROUTE_ID).await;
    assert_eq!(
        while_suspended,
        Some(RouteStatus::Suspended),
        "Route should be Suspended"
    );

    resume_route(&h, ROUTE_ID).await;
    let after_resume = route_status(&h, ROUTE_ID).await;
    assert_eq!(
        after_resume,
        Some(RouteStatus::Started),
        "Route should be Started after resume_route call"
    );

    h.stop().await;
}
/// Lets a timer route deliver for a while, suspends it, and checks that the
/// number of received exchanges after the suspend call is not lower than the
/// number observed just before it.
///
/// NOTE(review): the `>=` comparison looks unable to fail if the received list
/// only ever grows — confirm whether a stricter drain check was intended.
#[tokio::test]
async fn suspend_drains_inflight_messages() {
    const TIMER_PERIOD_MS: u64 = 50;
    const INITIAL_FLOW_MS: u64 = 120;
    const ROUTE_ID: &str = "drain-route";

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let timer_uri = format!("timer:drain-test?period={TIMER_PERIOD_MS}&repeatCount=5");
    let timer_route = RouteBuilder::from(&timer_uri)
        .route_id(ROUTE_ID)
        .auto_startup(true)
        .to("mock:drain-result")
        .build()
        .unwrap();
    h.add_route(timer_route).await.unwrap();
    h.start().await;

    // Let a few timer ticks through before suspending.
    tokio::time::sleep(Duration::from_millis(INITIAL_FLOW_MS)).await;

    let drain_ep = h.mock().get_endpoint("drain-result").unwrap();
    let received_before = drain_ep.get_received_exchanges().await.len();

    // The elapsed time is only used in the failure message below.
    let started_at = std::time::Instant::now();
    suspend_route(&h, ROUTE_ID).await;
    let suspend_took = started_at.elapsed();

    let received_after = drain_ep.get_received_exchanges().await.len();
    assert!(
        received_after >= received_before,
        "Suspend should drain in-flight messages. Before: {}, After: {}, Duration: {:?}",
        received_before,
        received_after,
        suspend_took
    );

    h.stop().await;
}
/// While a route is suspended its mock endpoint must stop receiving new
/// exchanges (after a short settling window), and delivery must pick up again
/// once the route is resumed.
#[tokio::test]
async fn suspend_blocks_new_intake_until_resume() {
    const TIMER_PERIOD_MS: u64 = 20;
    const INITIAL_FLOW_MS: u64 = 100;
    const SETTLING_MS: u64 = 50;
    const SUSPENDED_OBSERVATION_MS: u64 = 150;
    const POST_RESUME_FLOW_MS: u64 = 100;
    const ROUTE_ID: &str = "block-route";

    let h = CamelTestContext::builder()
        .with_timer()
        .with_mock()
        .build()
        .await;

    let timer_uri = format!("timer:block-test?period={TIMER_PERIOD_MS}&repeatCount=50");
    let timer_route = RouteBuilder::from(&timer_uri)
        .route_id(ROUTE_ID)
        .auto_startup(true)
        .to("mock:block-result")
        .build()
        .unwrap();
    h.add_route(timer_route).await.unwrap();
    h.start().await;

    // Phase 1: normal flow.
    tokio::time::sleep(Duration::from_millis(INITIAL_FLOW_MS)).await;
    let block_ep = h.mock().get_endpoint("block-result").unwrap();
    let seen_before_suspend = block_ep.get_received_exchanges().await.len();

    // Phase 2: suspend, then take the baseline after a settling window.
    suspend_route(&h, ROUTE_ID).await;
    tokio::time::sleep(Duration::from_millis(SETTLING_MS)).await;
    let seen_after_settling = block_ep.get_received_exchanges().await.len();

    // Phase 3: the count must not move while the route stays suspended.
    tokio::time::sleep(Duration::from_millis(SUSPENDED_OBSERVATION_MS)).await;
    let seen_while_suspended = block_ep.get_received_exchanges().await.len();
    assert_eq!(
        seen_while_suspended, seen_after_settling,
        "After settling, no new messages should arrive while suspended. \
         Before suspend: {}, After settling: {}, While suspended: {}",
        seen_before_suspend, seen_after_settling, seen_while_suspended
    );

    // Phase 4: resuming must let exchanges through again.
    resume_route(&h, ROUTE_ID).await;
    tokio::time::sleep(Duration::from_millis(POST_RESUME_FLOW_MS)).await;
    let seen_after_resume = block_ep.get_received_exchanges().await.len();
    assert!(
        seen_after_resume > seen_while_suspended,
        "Messages should flow again after resume. While suspended: {}, After resume: {}",
        seen_while_suspended,
        seen_after_resume
    );

    h.stop().await;
}