use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use camel_api::CamelError;
use camel_component_direct::DirectComponent;
use camel_component_log::LogComponent;
use camel_component_mock::MockComponent;
use camel_component_timer::TimerComponent;
use camel_core::CamelContext;
use camel_core::route::RouteDefinition;
use tokio::sync::Mutex;
use crate::time::TimeController;
pub struct NoTimeControl;
pub struct WithTimeControl;
type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
pub struct CamelTestContextBuilder<S = NoTimeControl> {
registrations: Vec<Registration>,
mock: MockComponent,
_state: std::marker::PhantomData<S>,
}
impl CamelTestContextBuilder<NoTimeControl> {
pub(crate) fn new() -> Self {
Self {
registrations: Vec::new(),
mock: MockComponent::new(),
_state: std::marker::PhantomData,
}
}
pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
CamelTestContextBuilder {
registrations: self.registrations,
mock: self.mock,
_state: std::marker::PhantomData,
}
}
pub async fn build(self) -> CamelTestContext {
build_context(self.registrations, self.mock)
}
}
impl CamelTestContextBuilder<WithTimeControl> {
pub async fn build(self) -> (CamelTestContext, TimeController) {
tokio::time::pause();
let ctx = build_context(self.registrations, self.mock);
(ctx, TimeController)
}
}
macro_rules! impl_builder_methods {
($S:ty) => {
impl CamelTestContextBuilder<$S> {
pub fn with_mock(self) -> Self {
self
}
pub fn with_timer(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(TimerComponent::new());
}));
self
}
pub fn with_log(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(LogComponent::new());
}));
self
}
pub fn with_direct(mut self) -> Self {
self.registrations.push(Box::new(|ctx: &mut CamelContext| {
ctx.register_component(DirectComponent::new());
}));
self
}
pub fn with_component<C>(mut self, component: C) -> Self
where
C: camel_component::Component + 'static,
{
self.registrations
.push(Box::new(move |ctx: &mut CamelContext| {
ctx.register_component(component);
}));
self
}
}
};
}
impl_builder_methods!(NoTimeControl);
impl_builder_methods!(WithTimeControl);
fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
let mut ctx = CamelContext::new();
ctx.register_component(mock.clone());
for register in registrations {
register(&mut ctx);
}
let ctx = Arc::new(Mutex::new(ctx));
let stopped = Arc::new(AtomicBool::new(false));
CamelTestContext {
ctx: ctx.clone(),
mock,
stopped: stopped.clone(),
_guard: TestGuard { ctx, stopped },
}
}
pub(crate) struct TestGuard {
ctx: Arc<Mutex<CamelContext>>,
stopped: Arc<AtomicBool>,
}
impl Drop for TestGuard {
fn drop(&mut self) {
if self.stopped.swap(true, Ordering::SeqCst) {
return;
}
let ctx = self.ctx.clone();
if let Ok(handle) = tokio::runtime::Handle::try_current() {
match handle.runtime_flavor() {
tokio::runtime::RuntimeFlavor::MultiThread => {
tokio::task::block_in_place(|| {
handle.block_on(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
});
}
tokio::runtime::RuntimeFlavor::CurrentThread => {
handle.spawn(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
}
_ => {
handle.spawn(async move {
let mut ctx = ctx.lock().await;
let _ = ctx.stop().await;
});
}
}
}
}
}
pub struct CamelTestContext {
ctx: Arc<Mutex<CamelContext>>,
mock: MockComponent,
stopped: Arc<AtomicBool>,
_guard: TestGuard,
}
impl CamelTestContext {
pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
CamelTestContextBuilder::new()
}
pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
let ctx = self.ctx.lock().await;
ctx.add_route_definition(route).await
}
pub async fn start(&self) {
let mut ctx = self.ctx.lock().await;
ctx.start().await.expect("CamelTestContext: start failed");
}
pub async fn stop(&self) {
if self.stopped.swap(true, Ordering::SeqCst) {
return; }
let mut ctx = self.ctx.lock().await;
ctx.stop().await.expect("CamelTestContext: stop failed");
}
pub async fn shutdown(self) {
self.stop().await;
}
pub fn mock(&self) -> &MockComponent {
&self.mock
}
pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
&self.ctx
}
}
#[cfg(test)]
mod tests {
use super::*;
use camel_builder::{RouteBuilder, StepAccumulator};
use std::time::Duration;
#[tokio::test]
async fn builder_without_time_control_builds_context() {
let harness = CamelTestContext::builder()
.with_mock()
.with_timer()
.with_log()
.build()
.await;
assert!(harness.mock().get_endpoint("result").is_none());
let guard = harness.ctx().lock().await;
let _ = &*guard;
}
#[tokio::test]
async fn builder_with_time_control_builds_and_advances_clock() {
let (_harness, time) = CamelTestContext::builder()
.with_mock()
.with_timer()
.with_time_control()
.build()
.await;
time.advance(Duration::from_millis(1)).await;
time.resume();
}
#[tokio::test]
async fn stop_is_idempotent_and_shutdown_is_safe() {
let harness = CamelTestContext::builder().with_mock().build().await;
harness.stop().await;
harness.stop().await;
harness.shutdown().await;
}
#[tokio::test]
async fn add_route_returns_error_for_invalid_step_uri() {
let harness = CamelTestContext::builder().with_mock().build().await;
let route = RouteBuilder::from("direct:start")
.route_id("bad-route")
.to("not-a-uri")
.build()
.unwrap();
let err = harness.add_route(route).await.expect_err("must fail");
assert!(err.to_string().contains("Invalid") || err.to_string().contains("invalid"));
}
#[tokio::test]
async fn with_component_registers_custom_component() {
let harness = CamelTestContext::builder()
.with_component(camel_component_direct::DirectComponent::new())
.with_mock()
.build()
.await;
let route = RouteBuilder::from("direct:start")
.route_id("direct-route")
.to("mock:out")
.build()
.unwrap();
harness.add_route(route).await.unwrap();
harness.start().await;
harness.stop().await;
let _guard = harness.ctx().lock().await;
}
}