use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use camel_api::CamelError;
use camel_component_direct::DirectComponent;
use camel_component_log::LogComponent;
use camel_component_mock::MockComponent;
use camel_component_timer::TimerComponent;
use camel_core::CamelContext;
use camel_core::route::RouteDefinition;
use tokio::sync::Mutex;
use crate::time::TimeController;
/// Type-state marker for [`CamelTestContextBuilder`]: `build` returns only the
/// [`CamelTestContext`]. This is the builder's default state.
pub struct NoTimeControl;
/// Type-state marker for [`CamelTestContextBuilder`]: `build` calls
/// `tokio::time::pause()` and returns a [`TimeController`] alongside the context.
pub struct WithTimeControl;
/// A deferred component registration: a one-shot, `Send` closure that is handed
/// the freshly built [`CamelContext`] by `build_context`.
type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
/// Builder for [`CamelTestContext`].
///
/// `S` is a type-state marker ([`NoTimeControl`] by default, or
/// [`WithTimeControl`]) that selects which `build` method is available.
pub struct CamelTestContextBuilder<S = NoTimeControl> {
/// Registrations queued by the `with_*` methods; run in insertion order at build time.
registrations: Vec<Registration>,
/// Mock component created with the builder; it is always registered on the built context.
mock: MockComponent,
/// Zero-sized carrier for the type-state parameter `S`.
_state: std::marker::PhantomData<S>,
}
impl CamelTestContextBuilder<NoTimeControl> {
    /// Creates an empty builder: no queued registrations and a fresh
    /// [`MockComponent`].
    pub(crate) fn new() -> Self {
        let registrations = Vec::new();
        let mock = MockComponent::new();
        Self {
            registrations,
            mock,
            _state: std::marker::PhantomData,
        }
    }

    /// Switches the builder to the [`WithTimeControl`] state, carrying over the
    /// queued registrations and the mock component unchanged.
    pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
        let Self {
            registrations,
            mock,
            _state: _,
        } = self;
        CamelTestContextBuilder {
            registrations,
            mock,
            _state: std::marker::PhantomData,
        }
    }

    /// Consumes the builder and produces the test context via `build_context`.
    pub async fn build(self) -> CamelTestContext {
        let Self {
            registrations, mock, ..
        } = self;
        build_context(registrations, mock).await
    }
}
impl CamelTestContextBuilder<WithTimeControl> {
    /// Pauses Tokio's clock, then builds the test context and returns it
    /// together with a [`TimeController`].
    ///
    /// The clock is paused *before* the context is built.
    ///
    /// NOTE(review): `tokio::time::pause` is documented to panic when time is
    /// already paused or when called outside a current-thread runtime —
    /// confirm against the Tokio version in use.
    pub async fn build(self) -> (CamelTestContext, TimeController) {
        tokio::time::pause();
        let Self {
            registrations, mock, ..
        } = self;
        let context = build_context(registrations, mock).await;
        (context, TimeController)
    }
}
macro_rules! impl_builder_methods {
($S:ty) => {
impl CamelTestContextBuilder<$S> {
pub fn with_mock(self) -> Self {
self
}
pub fn with_timer(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(TimerComponent::new());
}));
self
}
pub fn with_log(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(LogComponent::new());
}));
self
}
pub fn with_direct(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(DirectComponent::new());
}));
self
}
pub fn with_seda(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(camel_component_seda::SedaComponent::new());
}));
self
}
pub fn with_component<C>(mut self, component: C) -> Self
where
C: camel_component_api::Component + 'static,
{
self.registrations
.push(Box::new(move |ctx: &mut CamelContext| {
ctx.register_component(component);
}));
self
}
}
};
}
impl_builder_methods!(NoTimeControl);
impl_builder_methods!(WithTimeControl);
/// Builds a [`CamelContext`], registers the mock component followed by every
/// queued registration (in order), and wraps the result in a
/// [`CamelTestContext`] with its drop guard.
///
/// # Panics
/// Panics if `CamelContext::builder().build()` returns an error.
async fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
    let mut context = CamelContext::builder().build().await.unwrap();
    // The context gets a clone; the original stays on the harness for `mock()`.
    context.register_component(mock.clone());
    registrations
        .into_iter()
        .for_each(|register| register(&mut context));

    let shared = Arc::new(Mutex::new(context));
    let stopped = Arc::new(AtomicBool::new(false));
    // The guard holds its own handles to the same context and flag.
    let guard = TestGuard {
        ctx: Arc::clone(&shared),
        stopped: Arc::clone(&stopped),
    };
    CamelTestContext {
        ctx: shared,
        mock,
        stopped,
        _guard: guard,
    }
}
/// Drop guard stored inside [`CamelTestContext`]; its `Drop` impl attempts to
/// stop the context if no stop has been claimed yet.
pub(crate) struct TestGuard {
/// Shared handle to the context that should be stopped on drop.
ctx: Arc<Mutex<CamelContext>>,
/// Flag shared with [`CamelTestContext`]; `true` once a stop has been claimed.
stopped: Arc<AtomicBool>,
}
impl Drop for TestGuard {
fn drop(&mut self) {
if self.stopped.swap(true, Ordering::SeqCst) {
return;
}
let ctx = self.ctx.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => {
tokio::task::block_in_place(|| {
handle.block_on(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
});
}
tokio::runtime::RuntimeFlavor::CurrentThread => {
handle.spawn(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
}
_ => {
handle.spawn(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
}
}
}
}
}
/// Test harness wrapping a shared [`CamelContext`] together with the
/// [`MockComponent`] that was registered on it.
pub struct CamelTestContext {
/// The context under test, shared with the drop guard.
ctx: Arc<Mutex<CamelContext>>,
/// The mock component handed out by `mock()`; a clone of it was registered on the context.
mock: MockComponent,
/// Set to `true` by `stop()` (or by the guard's drop) so the context is stopped at most once.
stopped: Arc<AtomicBool>,
/// Stops the context on drop if `stop()` was not called.
_guard: TestGuard,
}
impl CamelTestContext {
    /// Returns a builder in the default [`NoTimeControl`] state.
    pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
        CamelTestContextBuilder::new()
    }

    /// Locks the context and adds `route` to it.
    ///
    /// # Errors
    /// Returns the error reported by `CamelContext::add_route_definition`.
    pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
        let ctx = self.ctx.lock().await;
        ctx.add_route_definition(route).await
    }

    /// Locks the context and starts it.
    ///
    /// A successful start clears the shared `stopped` flag. Without this, a
    /// `stop()` followed by `start()` would leave the flag latched at `true`,
    /// so both a later `stop()` and the drop guard would skip stopping the
    /// restarted context.
    ///
    /// # Panics
    /// Panics with `"CamelTestContext: start failed"` if the context fails to
    /// start.
    pub async fn start(&self) {
        let mut ctx = self.ctx.lock().await;
        ctx.start().await.expect("CamelTestContext: start failed");
        // Re-arm stop()/Drop while still holding the context lock.
        self.stopped.store(false, Ordering::SeqCst);
    }

    /// Stops the context unless a stop has already been claimed.
    ///
    /// The `stopped` flag is swapped to `true` first, so repeated calls (and
    /// the drop guard) become no-ops.
    ///
    /// # Panics
    /// Panics with `"CamelTestContext: stop failed"` if the context fails to
    /// stop.
    pub async fn stop(&self) {
        if self.stopped.swap(true, Ordering::SeqCst) {
            return;
        }
        let mut ctx = self.ctx.lock().await;
        ctx.stop().await.expect("CamelTestContext: stop failed");
    }

    /// Stops the context (see [`CamelTestContext::stop`]) and consumes the
    /// harness.
    pub async fn shutdown(self) {
        self.stop().await;
    }

    /// Returns the mock component that was registered on the context.
    pub fn mock(&self) -> &MockComponent {
        &self.mock
    }

    /// Returns the shared, mutex-protected context for direct access.
    pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
        &self.ctx
    }
}
#[cfg(test)]
mod tests {
use super::*;
use camel_builder::{RouteBuilder, StepAccumulator};
use std::time::Duration;
#[tokio::test]
async fn builder_without_time_control_builds_context() {
    // Build a harness with mock, timer and log components and no time control.
    let builder = CamelTestContext::builder()
        .with_mock()
        .with_timer()
        .with_log();
    let harness = builder.build().await;

    // No "result" mock endpoint exists before any route references it.
    assert!(harness.mock().get_endpoint("result").is_none());

    // The shared context can be locked and dereferenced.
    let guard = harness.ctx().lock().await;
    let _ = &*guard;
}
#[tokio::test]
async fn builder_with_time_control_builds_and_advances_clock() {
    // The time-controlled builder yields both the harness and a controller.
    let builder = CamelTestContext::builder()
        .with_mock()
        .with_timer()
        .with_time_control();
    let (_harness, time) = builder.build().await;

    // Advance the clock by one millisecond, then resume it.
    let step = Duration::from_millis(1);
    time.advance(step).await;
    time.resume();
}
#[tokio::test]
async fn stop_is_idempotent_and_shutdown_is_safe() {
    let harness = CamelTestContext::builder().with_mock().build().await;

    // Calling stop twice must not panic.
    for _ in 0..2 {
        harness.stop().await;
    }

    // Shutting down after stop must not panic either.
    harness.shutdown().await;
}
#[tokio::test]
async fn add_route_returns_error_for_invalid_step_uri() {
    let harness = CamelTestContext::builder().with_mock().build().await;

    // A route whose `to` step is not a valid URI.
    let bad_route = RouteBuilder::from("direct:start")
        .route_id("bad-route")
        .to("not-a-uri")
        .build()
        .unwrap();

    let err = harness.add_route(bad_route).await.expect_err("must fail");

    // The error text mentions "invalid" in either capitalization.
    let message = err.to_string();
    assert!(message.contains("Invalid") || message.contains("invalid"));
}
#[tokio::test]
async fn with_component_registers_custom_component() {
    // Register the direct component through the generic `with_component` hook.
    let direct = camel_component_direct::DirectComponent::new();
    let harness = CamelTestContext::builder()
        .with_component(direct)
        .with_mock()
        .build()
        .await;

    // A direct -> mock route is accepted only if the direct component is registered.
    let direct_route = RouteBuilder::from("direct:start")
        .route_id("direct-route")
        .to("mock:out")
        .build()
        .unwrap();
    harness.add_route(direct_route).await.unwrap();

    harness.start().await;
    harness.stop().await;

    // The context is still lockable after the stop.
    let _guard = harness.ctx().lock().await;
}
#[tokio::test]
async fn tst004_route_lifecycle_start_stop_restart() {
    let harness = CamelTestContext::builder()
        .with_direct()
        .with_mock()
        .build()
        .await;

    let lifecycle_route = RouteBuilder::from("direct:lifecycle")
        .route_id("lifecycle-route")
        .to("mock:lifecycle-out")
        .build()
        .unwrap();
    harness.add_route(lifecycle_route).await.unwrap();

    // First cycle goes through the harness API.
    harness.start().await;
    harness.stop().await;

    // Second cycle drives the underlying context directly.
    let mut context = harness.ctx().lock().await;
    context.start().await.expect("restart should succeed");
    context
        .stop()
        .await
        .expect("stop after restart should succeed");
    drop(context);
}
#[tokio::test]
async fn tst005_concurrent_exchange_processing() {
    use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
    use camel_core::route::compose_pipeline;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};
    use tower::ServiceExt;

    let concurrency: u32 = 10;
    let invocations = Arc::new(AtomicU32::new(0));

    // Processor that counts each call and yields once before returning.
    let counting: BoxProcessor = {
        let shared = Arc::clone(&invocations);
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let hits = Arc::clone(&shared);
            Box::pin(async move {
                hits.fetch_add(1, Ordering::Relaxed);
                tokio::task::yield_now().await;
                Ok(exchange)
            })
        })
    };
    let pipeline = compose_pipeline(vec![counting]);

    // Spawn one task per exchange, each with its own clone of the pipeline.
    let tasks: Vec<_> = (0..concurrency)
        .map(|i| {
            let service = pipeline.clone();
            tokio::spawn(async move {
                let exchange = Exchange::new(Message::new(format!("msg-{i}")));
                service.oneshot(exchange).await.unwrap()
            })
        })
        .collect();

    for task in tasks {
        let _ = task.await.unwrap();
    }

    // Every spawned exchange must have hit the processor exactly once.
    assert_eq!(invocations.load(Ordering::Relaxed), concurrency);
}
#[tokio::test]
async fn tst006_error_handler_invoked_on_failure() {
    use camel_api::error_handler::ExceptionPolicy;
    use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
    use camel_processor::ErrorHandlerService;
    use std::sync::Arc;
    use tower::ServiceExt;

    // Flag flipped by the handler processor when it runs.
    let handler_invoked = Arc::new(std::sync::Mutex::new(false));
    let handler: BoxProcessor = {
        let flag = Arc::clone(&handler_invoked);
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let flag = Arc::clone(&flag);
            Box::pin(async move {
                *flag.lock().unwrap() = true;
                Ok(exchange)
            })
        })
    };

    // Inner processor that always fails.
    let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
        Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
    });

    // Policy whose predicate accepts every error.
    let catch_all = ExceptionPolicy::new(|_| true);
    let service = ErrorHandlerService::new(failing, Some(handler), vec![(catch_all, None)]);

    let outcome = service.oneshot(Exchange::new(Message::new("test"))).await;

    assert!(outcome.is_ok(), "error handler should absorb the error");
    assert!(
        outcome.unwrap().has_error(),
        "exchange should have error set"
    );
    assert!(
        *handler_invoked.lock().unwrap(),
        "error handler processor should have been invoked"
    );
}
#[tokio::test]
async fn tst007_dead_letter_channel_receives_failed_exchange() {
    use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
    use camel_processor::ErrorHandlerService;
    use std::sync::Arc;
    use tower::ServiceExt;

    // Exchanges captured by the dead-letter processor.
    let dead_letters = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
    let dead_letter_channel: BoxProcessor = {
        let sink = Arc::clone(&dead_letters);
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.lock().unwrap().push(exchange.clone());
                Ok(exchange)
            })
        })
    };

    // Inner processor that always fails.
    let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
        Box::pin(async { Err(CamelError::ProcessorError("fail".into())) })
    });

    // No exception policies: only the dead-letter processor is configured.
    let service = ErrorHandlerService::new(failing, Some(dead_letter_channel), vec![]);
    let outcome = service
        .oneshot(Exchange::new(Message::new("dlc-test")))
        .await;
    assert!(outcome.is_ok());

    let captured = dead_letters.lock().unwrap();
    assert_eq!(
        captured.len(),
        1,
        "DLC should have received exactly one exchange"
    );
    assert!(captured[0].has_error());
}
#[tokio::test]
async fn tst008_header_propagation_across_processors() {
    use camel_api::{Body, BoxProcessor, BoxProcessorExt, Exchange, Message, Value};
    use camel_core::route::compose_pipeline;
    use tower::ServiceExt;

    // First step stamps a header on the message.
    let set_header: BoxProcessor = BoxProcessor::from_fn(|mut exchange: Exchange| {
        Box::pin(async move {
            exchange
                .input
                .set_header("trace-id", Value::String("abc-123".into()));
            Ok(exchange)
        })
    });

    // Second step replaces the body and leaves headers alone.
    let set_body: BoxProcessor = BoxProcessor::from_fn(|mut exchange: Exchange| {
        Box::pin(async move {
            exchange.input.body = Body::Text("processed".to_string());
            Ok(exchange)
        })
    });

    let pipeline = compose_pipeline(vec![set_header, set_body]);
    let output = pipeline
        .oneshot(Exchange::new(Message::new("input")))
        .await
        .unwrap();

    let expected_header = Value::String("abc-123".into());
    assert_eq!(
        output.input.header("trace-id"),
        Some(&expected_header),
        "header should survive across processors"
    );
    assert_eq!(output.input.body.as_text(), Some("processed"));
}
#[tokio::test]
async fn tst009_exchange_body_type_conversion() {
    use camel_api::body::Body;
    use camel_api::body_converter::{BodyType, convert};

    // Text -> Bytes keeps the payload.
    let original = Body::Text("hello".to_string());
    let as_bytes = convert(original, BodyType::Bytes).unwrap();
    assert!(matches!(as_bytes, Body::Bytes(_)));
    match &as_bytes {
        Body::Bytes(b) => assert_eq!(b.as_ref(), b"hello"),
        _ => {}
    }

    // Bytes -> Text round-trips back to the same string.
    let round_tripped = convert(as_bytes, BodyType::Text).unwrap();
    assert!(matches!(round_tripped, Body::Text(_)));
    assert_eq!(round_tripped.as_text(), Some("hello"));
}
#[tokio::test]
async fn tst010_multicast_delivers_to_multiple_endpoints() {
    use camel_api::multicast::{MulticastConfig, MulticastStrategy};
    use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
    use camel_processor::MulticastService;
    use std::sync::Arc;
    use tower::ServiceExt;

    // One capture buffer per endpoint.
    let captured_a = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
    let captured_b = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));

    // Endpoint A records a copy of every exchange it sees.
    let endpoint_a: BoxProcessor = {
        let sink = Arc::clone(&captured_a);
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.lock().unwrap().push(exchange.clone());
                Ok(exchange)
            })
        })
    };

    // Endpoint B does the same into its own buffer.
    let endpoint_b: BoxProcessor = {
        let sink = Arc::clone(&captured_b);
        BoxProcessor::from_fn(move |exchange: Exchange| {
            let sink = Arc::clone(&sink);
            Box::pin(async move {
                sink.lock().unwrap().push(exchange.clone());
                Ok(exchange)
            })
        })
    };

    let config = MulticastConfig::new().aggregation(MulticastStrategy::LastWins);
    let endpoints = vec![endpoint_a, endpoint_b];
    let service = MulticastService::new(endpoints, config)
        .expect("multicast service creation should succeed");

    let _result = service
        .oneshot(Exchange::new(Message::new("multicast-test")))
        .await
        .unwrap();

    let count_a = captured_a.lock().unwrap().len();
    assert_eq!(count_a, 1, "endpoint A should receive exactly one exchange");
    let count_b = captured_b.lock().unwrap().len();
    assert_eq!(count_b, 1, "endpoint B should receive exactly one exchange");
}
}