1use std::sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6};
7
8use camel_api::CamelError;
9use camel_component_direct::DirectComponent;
10use camel_component_log::LogComponent;
11use camel_component_mock::MockComponent;
12use camel_component_timer::TimerComponent;
13use camel_core::CamelContext;
14use camel_core::route::RouteDefinition;
15use tokio::sync::Mutex;
16
17use crate::time::TimeController;
18
19pub struct NoTimeControl;
24pub struct WithTimeControl;
25
26type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
31
32pub struct CamelTestContextBuilder<S = NoTimeControl> {
36 registrations: Vec<Registration>,
37 mock: MockComponent,
38 _state: std::marker::PhantomData<S>,
39}
40
41impl CamelTestContextBuilder<NoTimeControl> {
42 pub(crate) fn new() -> Self {
43 Self {
44 registrations: Vec::new(),
45 mock: MockComponent::new(),
46 _state: std::marker::PhantomData,
47 }
48 }
49
50 pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
53 CamelTestContextBuilder {
54 registrations: self.registrations,
55 mock: self.mock,
56 _state: std::marker::PhantomData,
57 }
58 }
59
60 pub async fn build(self) -> CamelTestContext {
62 build_context(self.registrations, self.mock).await
63 }
64}
65
66impl CamelTestContextBuilder<WithTimeControl> {
67 pub async fn build(self) -> (CamelTestContext, TimeController) {
72 tokio::time::pause();
73 let ctx = build_context(self.registrations, self.mock).await;
74 (ctx, TimeController)
75 }
76}
77
78macro_rules! impl_builder_methods {
79 ($S:ty) => {
80 impl CamelTestContextBuilder<$S> {
81 pub fn with_mock(self) -> Self {
84 self
85 }
86
87 pub fn with_timer(mut self) -> Self {
89 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
90 ctx.register_component(TimerComponent::new());
91 }));
92 self
93 }
94
95 pub fn with_log(mut self) -> Self {
97 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
98 ctx.register_component(LogComponent::new());
99 }));
100 self
101 }
102
103 pub fn with_direct(mut self) -> Self {
105 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
106 ctx.register_component(DirectComponent::new());
107 }));
108 self
109 }
110
111 pub fn with_seda(mut self) -> Self {
113 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
114 ctx.register_component(camel_component_seda::SedaComponent::new());
115 }));
116 self
117 }
118
119 pub fn with_component<C>(mut self, component: C) -> Self
121 where
122 C: camel_component_api::Component + 'static,
123 {
124 self.registrations
125 .push(Box::new(move |ctx: &mut CamelContext| {
126 ctx.register_component(component);
127 }));
128 self
129 }
130 }
131 };
132}
133
134impl_builder_methods!(NoTimeControl);
135impl_builder_methods!(WithTimeControl);
136
137async fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
142 let mut ctx = CamelContext::builder().build().await.unwrap(); ctx.register_component(mock.clone());
146
147 for register in registrations {
149 register(&mut ctx);
150 }
151
152 let ctx = Arc::new(Mutex::new(ctx));
153 let stopped = Arc::new(AtomicBool::new(false));
154
155 CamelTestContext {
156 ctx: ctx.clone(),
157 mock,
158 stopped: stopped.clone(),
159 _guard: TestGuard { ctx, stopped },
160 }
161}
162
163pub(crate) struct TestGuard {
168 ctx: Arc<Mutex<CamelContext>>,
169 stopped: Arc<AtomicBool>,
170}
171
172impl Drop for TestGuard {
173 fn drop(&mut self) {
174 if self.stopped.swap(true, Ordering::SeqCst) {
175 return;
177 }
178 let ctx = self.ctx.clone();
179 if let Ok(handle) = tokio::runtime::Handle::try_current() {
180 match handle.runtime_flavor() {
181 tokio::runtime::RuntimeFlavor::MultiThread => {
182 tokio::task::block_in_place(|| {
184 handle.block_on(async move {
185 let mut ctx = ctx.lock().await;
186 let _ = ctx.stop().await;
187 });
188 });
189 }
190 tokio::runtime::RuntimeFlavor::CurrentThread => {
191 handle.spawn(async move {
194 let mut ctx = ctx.lock().await;
195 let _ = ctx.stop().await;
196 });
197 }
198 _ => {
199 handle.spawn(async move {
200 let mut ctx = ctx.lock().await;
201 let _ = ctx.stop().await;
202 });
203 }
204 }
205 }
206 }
207}
208
209pub struct CamelTestContext {
235 ctx: Arc<Mutex<CamelContext>>,
236 mock: MockComponent,
237 stopped: Arc<AtomicBool>,
238 _guard: TestGuard,
239}
240
241impl CamelTestContext {
242 pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
244 CamelTestContextBuilder::new()
245 }
246
247 pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
249 let ctx = self.ctx.lock().await;
250 ctx.add_route_definition(route).await
251 }
252
253 pub async fn start(&self) {
255 let mut ctx = self.ctx.lock().await;
256 ctx.start().await.expect("CamelTestContext: start failed"); }
258
259 pub async fn stop(&self) {
262 if self.stopped.swap(true, Ordering::SeqCst) {
263 return; }
265 let mut ctx = self.ctx.lock().await;
266 ctx.stop().await.expect("CamelTestContext: stop failed"); }
268
269 pub async fn shutdown(self) {
271 self.stop().await;
272 }
273
274 pub fn mock(&self) -> &MockComponent {
276 &self.mock
277 }
278
279 pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
281 &self.ctx
282 }
283}
284
285#[cfg(test)]
286mod tests {
287 use super::*;
288 use camel_builder::{RouteBuilder, StepAccumulator};
289 use std::time::Duration;
290
291 #[tokio::test]
292 async fn builder_without_time_control_builds_context() {
293 let harness = CamelTestContext::builder()
294 .with_mock()
295 .with_timer()
296 .with_log()
297 .build()
298 .await;
299
300 assert!(harness.mock().get_endpoint("result").is_none());
301 let guard = harness.ctx().lock().await;
302 let _ = &*guard;
303 }
304
305 #[tokio::test]
306 async fn builder_with_time_control_builds_and_advances_clock() {
307 let (_harness, time) = CamelTestContext::builder()
308 .with_mock()
309 .with_timer()
310 .with_time_control()
311 .build()
312 .await;
313
314 time.advance(Duration::from_millis(1)).await;
315 time.resume();
316 }
317
318 #[tokio::test]
319 async fn stop_is_idempotent_and_shutdown_is_safe() {
320 let harness = CamelTestContext::builder().with_mock().build().await;
321 harness.stop().await;
322 harness.stop().await;
323 harness.shutdown().await;
324 }
325
326 #[tokio::test]
327 async fn add_route_returns_error_for_invalid_step_uri() {
328 let harness = CamelTestContext::builder().with_mock().build().await;
329
330 let route = RouteBuilder::from("direct:start")
331 .route_id("bad-route")
332 .to("not-a-uri")
333 .build()
334 .unwrap();
335
336 let err = harness.add_route(route).await.expect_err("must fail");
337 assert!(err.to_string().contains("Invalid") || err.to_string().contains("invalid"));
338 }
339
340 #[tokio::test]
341 async fn with_component_registers_custom_component() {
342 let harness = CamelTestContext::builder()
343 .with_component(camel_component_direct::DirectComponent::new())
344 .with_mock()
345 .build()
346 .await;
347
348 let route = RouteBuilder::from("direct:start")
349 .route_id("direct-route")
350 .to("mock:out")
351 .build()
352 .unwrap();
353
354 harness.add_route(route).await.unwrap();
355 harness.start().await;
356 harness.stop().await;
357
358 let _guard = harness.ctx().lock().await;
360 }
361
362 #[tokio::test]
365 async fn tst004_route_lifecycle_start_stop_restart() {
366 let harness = CamelTestContext::builder()
367 .with_direct()
368 .with_mock()
369 .build()
370 .await;
371
372 let route = RouteBuilder::from("direct:lifecycle")
373 .route_id("lifecycle-route")
374 .to("mock:lifecycle-out")
375 .build()
376 .unwrap();
377
378 harness.add_route(route).await.unwrap();
379
380 harness.start().await;
382
383 harness.stop().await;
385
386 {
388 let mut ctx = harness.ctx().lock().await;
389 ctx.start().await.expect("restart should succeed");
390 ctx.stop().await.expect("stop after restart should succeed");
391 }
392 }
393
394 #[tokio::test]
397 async fn tst005_concurrent_exchange_processing() {
398 use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
399 use camel_core::route::compose_pipeline;
400 use std::sync::Arc;
401 use std::sync::atomic::{AtomicU32, Ordering};
402 use tower::ServiceExt;
403
404 let counter = Arc::new(AtomicU32::new(0));
405 let processor: BoxProcessor = {
406 let c = Arc::clone(&counter);
407 BoxProcessor::from_fn(move |ex: Exchange| {
408 let c = Arc::clone(&c);
409 Box::pin(async move {
410 c.fetch_add(1, Ordering::Relaxed);
411 tokio::task::yield_now().await;
412 Ok(ex)
413 })
414 })
415 };
416
417 let pipeline = compose_pipeline(vec![processor]);
418
419 let concurrency: u32 = 10;
420 let mut handles = Vec::with_capacity(concurrency as usize);
421 for i in 0..concurrency {
422 let p = pipeline.clone();
423 handles.push(tokio::spawn(async move {
424 let ex = Exchange::new(Message::new(format!("msg-{i}")));
425 p.oneshot(ex).await.unwrap()
426 }));
427 }
428
429 for h in handles {
430 let _ = h.await.unwrap();
431 }
432
433 assert_eq!(counter.load(Ordering::Relaxed), concurrency);
434 }
435
436 #[tokio::test]
439 async fn tst006_error_handler_invoked_on_failure() {
440 use camel_api::error_handler::ExceptionPolicy;
441 use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
442 use camel_processor::ErrorHandlerService;
443 use std::sync::Arc;
444 use tower::ServiceExt;
445
446 let error_received = Arc::new(std::sync::Mutex::new(false));
447 let error_received_clone = Arc::clone(&error_received);
448
449 let handler: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
451 let r = Arc::clone(&error_received_clone);
452 Box::pin(async move {
453 *r.lock().unwrap() = true;
454 Ok(ex)
455 })
456 });
457
458 let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
460 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
461 });
462
463 let policy = ExceptionPolicy::new(|_| true);
464 let svc = ErrorHandlerService::new(failing, Some(handler), vec![(policy, None)]);
465 let ex = Exchange::new(Message::new("test"));
466 let result = svc.oneshot(ex).await;
467
468 assert!(result.is_ok(), "error handler should absorb the error");
469 assert!(
470 result.unwrap().has_error(),
471 "exchange should have error set"
472 );
473 assert!(
474 *error_received.lock().unwrap(),
475 "error handler processor should have been invoked"
476 );
477 }
478
479 #[tokio::test]
482 async fn tst007_dead_letter_channel_receives_failed_exchange() {
483 use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
484 use camel_processor::ErrorHandlerService;
485 use std::sync::Arc;
486 use tower::ServiceExt;
487
488 let dlc_received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
489 let dlc_received_clone = Arc::clone(&dlc_received);
490
491 let dlc: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
493 let r = Arc::clone(&dlc_received_clone);
494 Box::pin(async move {
495 r.lock().unwrap().push(ex.clone());
496 Ok(ex)
497 })
498 });
499
500 let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
501 Box::pin(async { Err(CamelError::ProcessorError("fail".into())) })
502 });
503
504 let svc = ErrorHandlerService::new(failing, Some(dlc), vec![]);
505 let ex = Exchange::new(Message::new("dlc-test"));
506 let result = svc.oneshot(ex).await;
507
508 assert!(result.is_ok());
509 let exchanges = dlc_received.lock().unwrap();
510 assert_eq!(
511 exchanges.len(),
512 1,
513 "DLC should have received exactly one exchange"
514 );
515 assert!(exchanges[0].has_error());
516 }
517
518 #[tokio::test]
521 async fn tst008_header_propagation_across_processors() {
522 use camel_api::{Body, BoxProcessor, BoxProcessorExt, Exchange, Message, Value};
523 use camel_core::route::compose_pipeline;
524 use tower::ServiceExt;
525
526 let step1: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
527 Box::pin(async move {
528 ex.input
529 .set_header("trace-id", Value::String("abc-123".into()));
530 Ok(ex)
531 })
532 });
533
534 let step2: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
535 Box::pin(async move {
536 ex.input.body = Body::Text("processed".to_string());
538 Ok(ex)
539 })
540 });
541
542 let pipeline = compose_pipeline(vec![step1, step2]);
543 let ex = Exchange::new(Message::new("input"));
544 let result = pipeline.oneshot(ex).await.unwrap();
545
546 assert_eq!(
547 result.input.header("trace-id"),
548 Some(&Value::String("abc-123".into())),
549 "header should survive across processors"
550 );
551 assert_eq!(result.input.body.as_text(), Some("processed"));
552 }
553
554 #[tokio::test]
557 async fn tst009_exchange_body_type_conversion() {
558 use camel_api::body::Body;
559 use camel_api::body_converter::{BodyType, convert};
560
561 let text_body = Body::Text("hello".to_string());
563 let bytes_body = convert(text_body, BodyType::Bytes).unwrap();
564 assert!(matches!(bytes_body, Body::Bytes(_)));
565 if let Body::Bytes(ref b) = bytes_body {
566 assert_eq!(b.as_ref(), b"hello");
567 }
568
569 let text_body_back = convert(bytes_body, BodyType::Text).unwrap();
571 assert!(matches!(text_body_back, Body::Text(_)));
572 assert_eq!(text_body_back.as_text(), Some("hello"));
573 }
574
575 #[tokio::test]
578 async fn tst010_multicast_delivers_to_multiple_endpoints() {
579 use camel_api::multicast::{MulticastConfig, MulticastStrategy};
580 use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
581 use camel_processor::MulticastService;
582 use std::sync::Arc;
583 use tower::ServiceExt;
584
585 let received_a = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
586 let received_b = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
587
588 let endpoint_a: BoxProcessor = {
589 let r = Arc::clone(&received_a);
590 BoxProcessor::from_fn(move |ex: Exchange| {
591 let r = Arc::clone(&r);
592 Box::pin(async move {
593 r.lock().unwrap().push(ex.clone());
594 Ok(ex)
595 })
596 })
597 };
598
599 let endpoint_b: BoxProcessor = {
600 let r = Arc::clone(&received_b);
601 BoxProcessor::from_fn(move |ex: Exchange| {
602 let r = Arc::clone(&r);
603 Box::pin(async move {
604 r.lock().unwrap().push(ex.clone());
605 Ok(ex)
606 })
607 })
608 };
609
610 let config = MulticastConfig::new().aggregation(MulticastStrategy::LastWins);
611
612 let svc = MulticastService::new(vec![endpoint_a, endpoint_b], config)
613 .expect("multicast service creation should succeed");
614 let ex = Exchange::new(Message::new("multicast-test"));
615 let _result = svc.oneshot(ex).await.unwrap();
616
617 assert_eq!(
618 received_a.lock().unwrap().len(),
619 1,
620 "endpoint A should receive exactly one exchange"
621 );
622 assert_eq!(
623 received_b.lock().unwrap().len(),
624 1,
625 "endpoint B should receive exactly one exchange"
626 );
627 }
628}