1use std::sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering},
6};
7
8use camel_api::CamelError;
9use camel_component_direct::DirectComponent;
10use camel_component_log::LogComponent;
11use camel_component_mock::MockComponent;
12use camel_component_timer::TimerComponent;
13use camel_core::CamelContext;
14use camel_core::route::RouteDefinition;
15use tokio::sync::Mutex;
16
17use crate::time::TimeController;
18
/// Type-state marker: the builder will produce a context without pausing the Tokio clock.
///
/// This is the default state of [`CamelTestContextBuilder`].
#[derive(Debug, Clone, Copy)]
pub struct NoTimeControl;

/// Type-state marker: the builder will pause the Tokio clock and hand back a
/// `TimeController` alongside the context.
#[derive(Debug, Clone, Copy)]
pub struct WithTimeControl;
25
26type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
31
32pub struct CamelTestContextBuilder<S = NoTimeControl> {
36 registrations: Vec<Registration>,
37 mock: MockComponent,
38 _state: std::marker::PhantomData<S>,
39}
40
41impl CamelTestContextBuilder<NoTimeControl> {
42 pub(crate) fn new() -> Self {
43 Self {
44 registrations: Vec::new(),
45 mock: MockComponent::new(),
46 _state: std::marker::PhantomData,
47 }
48 }
49
50 pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
53 CamelTestContextBuilder {
54 registrations: self.registrations,
55 mock: self.mock,
56 _state: std::marker::PhantomData,
57 }
58 }
59
60 pub async fn build(self) -> CamelTestContext {
62 build_context(self.registrations, self.mock).await
63 }
64}
65
66impl CamelTestContextBuilder<WithTimeControl> {
67 pub async fn build(self) -> (CamelTestContext, TimeController) {
72 tokio::time::pause();
73 let ctx = build_context(self.registrations, self.mock).await;
74 (ctx, TimeController)
75 }
76}
77
78macro_rules! impl_builder_methods {
79 ($S:ty) => {
80 impl CamelTestContextBuilder<$S> {
81 pub fn with_mock(self) -> Self {
84 self
85 }
86
87 pub fn with_timer(mut self) -> Self {
89 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
90 ctx.register_component(TimerComponent::new());
91 }));
92 self
93 }
94
95 pub fn with_log(mut self) -> Self {
97 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
98 ctx.register_component(LogComponent::new());
99 }));
100 self
101 }
102
103 pub fn with_direct(mut self) -> Self {
105 self.registrations.push(Box::new(|ctx: &mut CamelContext| {
106 ctx.register_component(DirectComponent::new());
107 }));
108 self
109 }
110
111 pub fn with_component<C>(mut self, component: C) -> Self
113 where
114 C: camel_component_api::Component + 'static,
115 {
116 self.registrations
117 .push(Box::new(move |ctx: &mut CamelContext| {
118 ctx.register_component(component);
119 }));
120 self
121 }
122 }
123 };
124}
125
126impl_builder_methods!(NoTimeControl);
127impl_builder_methods!(WithTimeControl);
128
129async fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
134 let mut ctx = CamelContext::builder().build().await.unwrap(); ctx.register_component(mock.clone());
138
139 for register in registrations {
141 register(&mut ctx);
142 }
143
144 let ctx = Arc::new(Mutex::new(ctx));
145 let stopped = Arc::new(AtomicBool::new(false));
146
147 CamelTestContext {
148 ctx: ctx.clone(),
149 mock,
150 stopped: stopped.clone(),
151 _guard: TestGuard { ctx, stopped },
152 }
153}
154
155pub(crate) struct TestGuard {
160 ctx: Arc<Mutex<CamelContext>>,
161 stopped: Arc<AtomicBool>,
162}
163
164impl Drop for TestGuard {
165 fn drop(&mut self) {
166 if self.stopped.swap(true, Ordering::SeqCst) {
167 return;
169 }
170 let ctx = self.ctx.clone();
171 if let Ok(handle) = tokio::runtime::Handle::try_current() {
172 match handle.runtime_flavor() {
173 tokio::runtime::RuntimeFlavor::MultiThread => {
174 tokio::task::block_in_place(|| {
176 handle.block_on(async move {
177 let mut ctx = ctx.lock().await;
178 let _ = ctx.stop().await;
179 });
180 });
181 }
182 tokio::runtime::RuntimeFlavor::CurrentThread => {
183 handle.spawn(async move {
186 let mut ctx = ctx.lock().await;
187 let _ = ctx.stop().await;
188 });
189 }
190 _ => {
191 handle.spawn(async move {
192 let mut ctx = ctx.lock().await;
193 let _ = ctx.stop().await;
194 });
195 }
196 }
197 }
198 }
199}
200
201pub struct CamelTestContext {
227 ctx: Arc<Mutex<CamelContext>>,
228 mock: MockComponent,
229 stopped: Arc<AtomicBool>,
230 _guard: TestGuard,
231}
232
233impl CamelTestContext {
234 pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
236 CamelTestContextBuilder::new()
237 }
238
239 pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
241 let ctx = self.ctx.lock().await;
242 ctx.add_route_definition(route).await
243 }
244
245 pub async fn start(&self) {
247 let mut ctx = self.ctx.lock().await;
248 ctx.start().await.expect("CamelTestContext: start failed"); }
250
251 pub async fn stop(&self) {
254 if self.stopped.swap(true, Ordering::SeqCst) {
255 return; }
257 let mut ctx = self.ctx.lock().await;
258 ctx.stop().await.expect("CamelTestContext: stop failed"); }
260
261 pub async fn shutdown(self) {
263 self.stop().await;
264 }
265
266 pub fn mock(&self) -> &MockComponent {
268 &self.mock
269 }
270
271 pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
273 &self.ctx
274 }
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280 use camel_builder::{RouteBuilder, StepAccumulator};
281 use std::time::Duration;
282
283 #[tokio::test]
284 async fn builder_without_time_control_builds_context() {
285 let harness = CamelTestContext::builder()
286 .with_mock()
287 .with_timer()
288 .with_log()
289 .build()
290 .await;
291
292 assert!(harness.mock().get_endpoint("result").is_none());
293 let guard = harness.ctx().lock().await;
294 let _ = &*guard;
295 }
296
297 #[tokio::test]
298 async fn builder_with_time_control_builds_and_advances_clock() {
299 let (_harness, time) = CamelTestContext::builder()
300 .with_mock()
301 .with_timer()
302 .with_time_control()
303 .build()
304 .await;
305
306 time.advance(Duration::from_millis(1)).await;
307 time.resume();
308 }
309
310 #[tokio::test]
311 async fn stop_is_idempotent_and_shutdown_is_safe() {
312 let harness = CamelTestContext::builder().with_mock().build().await;
313 harness.stop().await;
314 harness.stop().await;
315 harness.shutdown().await;
316 }
317
318 #[tokio::test]
319 async fn add_route_returns_error_for_invalid_step_uri() {
320 let harness = CamelTestContext::builder().with_mock().build().await;
321
322 let route = RouteBuilder::from("direct:start")
323 .route_id("bad-route")
324 .to("not-a-uri")
325 .build()
326 .unwrap();
327
328 let err = harness.add_route(route).await.expect_err("must fail");
329 assert!(err.to_string().contains("Invalid") || err.to_string().contains("invalid"));
330 }
331
332 #[tokio::test]
333 async fn with_component_registers_custom_component() {
334 let harness = CamelTestContext::builder()
335 .with_component(camel_component_direct::DirectComponent::new())
336 .with_mock()
337 .build()
338 .await;
339
340 let route = RouteBuilder::from("direct:start")
341 .route_id("direct-route")
342 .to("mock:out")
343 .build()
344 .unwrap();
345
346 harness.add_route(route).await.unwrap();
347 harness.start().await;
348 harness.stop().await;
349
350 let _guard = harness.ctx().lock().await;
352 }
353
354 #[tokio::test]
357 async fn tst004_route_lifecycle_start_stop_restart() {
358 let harness = CamelTestContext::builder()
359 .with_direct()
360 .with_mock()
361 .build()
362 .await;
363
364 let route = RouteBuilder::from("direct:lifecycle")
365 .route_id("lifecycle-route")
366 .to("mock:lifecycle-out")
367 .build()
368 .unwrap();
369
370 harness.add_route(route).await.unwrap();
371
372 harness.start().await;
374
375 harness.stop().await;
377
378 {
380 let mut ctx = harness.ctx().lock().await;
381 ctx.start().await.expect("restart should succeed");
382 ctx.stop().await.expect("stop after restart should succeed");
383 }
384 }
385
386 #[tokio::test]
389 async fn tst005_concurrent_exchange_processing() {
390 use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
391 use camel_core::route::compose_pipeline;
392 use std::sync::Arc;
393 use std::sync::atomic::{AtomicU32, Ordering};
394 use tower::ServiceExt;
395
396 let counter = Arc::new(AtomicU32::new(0));
397 let processor: BoxProcessor = {
398 let c = Arc::clone(&counter);
399 BoxProcessor::from_fn(move |ex: Exchange| {
400 let c = Arc::clone(&c);
401 Box::pin(async move {
402 c.fetch_add(1, Ordering::Relaxed);
403 tokio::task::yield_now().await;
404 Ok(ex)
405 })
406 })
407 };
408
409 let pipeline = compose_pipeline(vec![processor]);
410
411 let concurrency: u32 = 10;
412 let mut handles = Vec::with_capacity(concurrency as usize);
413 for i in 0..concurrency {
414 let p = pipeline.clone();
415 handles.push(tokio::spawn(async move {
416 let ex = Exchange::new(Message::new(format!("msg-{i}")));
417 p.oneshot(ex).await.unwrap()
418 }));
419 }
420
421 for h in handles {
422 let _ = h.await.unwrap();
423 }
424
425 assert_eq!(counter.load(Ordering::Relaxed), concurrency);
426 }
427
428 #[tokio::test]
431 async fn tst006_error_handler_invoked_on_failure() {
432 use camel_api::error_handler::ExceptionPolicy;
433 use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
434 use camel_processor::ErrorHandlerService;
435 use std::sync::Arc;
436 use tower::ServiceExt;
437
438 let error_received = Arc::new(std::sync::Mutex::new(false));
439 let error_received_clone = Arc::clone(&error_received);
440
441 let handler: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
443 let r = Arc::clone(&error_received_clone);
444 Box::pin(async move {
445 *r.lock().unwrap() = true;
446 Ok(ex)
447 })
448 });
449
450 let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
452 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
453 });
454
455 let policy = ExceptionPolicy::new(|_| true);
456 let svc = ErrorHandlerService::new(failing, Some(handler), vec![(policy, None)]);
457 let ex = Exchange::new(Message::new("test"));
458 let result = svc.oneshot(ex).await;
459
460 assert!(result.is_ok(), "error handler should absorb the error");
461 assert!(
462 result.unwrap().has_error(),
463 "exchange should have error set"
464 );
465 assert!(
466 *error_received.lock().unwrap(),
467 "error handler processor should have been invoked"
468 );
469 }
470
471 #[tokio::test]
474 async fn tst007_dead_letter_channel_receives_failed_exchange() {
475 use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
476 use camel_processor::ErrorHandlerService;
477 use std::sync::Arc;
478 use tower::ServiceExt;
479
480 let dlc_received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
481 let dlc_received_clone = Arc::clone(&dlc_received);
482
483 let dlc: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
485 let r = Arc::clone(&dlc_received_clone);
486 Box::pin(async move {
487 r.lock().unwrap().push(ex.clone());
488 Ok(ex)
489 })
490 });
491
492 let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
493 Box::pin(async { Err(CamelError::ProcessorError("fail".into())) })
494 });
495
496 let svc = ErrorHandlerService::new(failing, Some(dlc), vec![]);
497 let ex = Exchange::new(Message::new("dlc-test"));
498 let result = svc.oneshot(ex).await;
499
500 assert!(result.is_ok());
501 let exchanges = dlc_received.lock().unwrap();
502 assert_eq!(
503 exchanges.len(),
504 1,
505 "DLC should have received exactly one exchange"
506 );
507 assert!(exchanges[0].has_error());
508 }
509
510 #[tokio::test]
513 async fn tst008_header_propagation_across_processors() {
514 use camel_api::{Body, BoxProcessor, BoxProcessorExt, Exchange, Message, Value};
515 use camel_core::route::compose_pipeline;
516 use tower::ServiceExt;
517
518 let step1: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
519 Box::pin(async move {
520 ex.input
521 .set_header("trace-id", Value::String("abc-123".into()));
522 Ok(ex)
523 })
524 });
525
526 let step2: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
527 Box::pin(async move {
528 ex.input.body = Body::Text("processed".to_string());
530 Ok(ex)
531 })
532 });
533
534 let pipeline = compose_pipeline(vec![step1, step2]);
535 let ex = Exchange::new(Message::new("input"));
536 let result = pipeline.oneshot(ex).await.unwrap();
537
538 assert_eq!(
539 result.input.header("trace-id"),
540 Some(&Value::String("abc-123".into())),
541 "header should survive across processors"
542 );
543 assert_eq!(result.input.body.as_text(), Some("processed"));
544 }
545
546 #[tokio::test]
549 async fn tst009_exchange_body_type_conversion() {
550 use camel_api::body::Body;
551 use camel_api::body_converter::{BodyType, convert};
552
553 let text_body = Body::Text("hello".to_string());
555 let bytes_body = convert(text_body, BodyType::Bytes).unwrap();
556 assert!(matches!(bytes_body, Body::Bytes(_)));
557 if let Body::Bytes(ref b) = bytes_body {
558 assert_eq!(b.as_ref(), b"hello");
559 }
560
561 let text_body_back = convert(bytes_body, BodyType::Text).unwrap();
563 assert!(matches!(text_body_back, Body::Text(_)));
564 assert_eq!(text_body_back.as_text(), Some("hello"));
565 }
566
567 #[tokio::test]
570 async fn tst010_multicast_delivers_to_multiple_endpoints() {
571 use camel_api::multicast::{MulticastConfig, MulticastStrategy};
572 use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
573 use camel_processor::MulticastService;
574 use std::sync::Arc;
575 use tower::ServiceExt;
576
577 let received_a = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
578 let received_b = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
579
580 let endpoint_a: BoxProcessor = {
581 let r = Arc::clone(&received_a);
582 BoxProcessor::from_fn(move |ex: Exchange| {
583 let r = Arc::clone(&r);
584 Box::pin(async move {
585 r.lock().unwrap().push(ex.clone());
586 Ok(ex)
587 })
588 })
589 };
590
591 let endpoint_b: BoxProcessor = {
592 let r = Arc::clone(&received_b);
593 BoxProcessor::from_fn(move |ex: Exchange| {
594 let r = Arc::clone(&r);
595 Box::pin(async move {
596 r.lock().unwrap().push(ex.clone());
597 Ok(ex)
598 })
599 })
600 };
601
602 let config = MulticastConfig::new().aggregation(MulticastStrategy::LastWins);
603
604 let svc = MulticastService::new(vec![endpoint_a, endpoint_b], config)
605 .expect("multicast service creation should succeed");
606 let ex = Exchange::new(Message::new("multicast-test"));
607 let _result = svc.oneshot(ex).await.unwrap();
608
609 assert_eq!(
610 received_a.lock().unwrap().len(),
611 1,
612 "endpoint A should receive exactly one exchange"
613 );
614 assert_eq!(
615 received_b.lock().unwrap().len(),
616 1,
617 "endpoint B should receive exactly one exchange"
618 );
619 }
620}