Skip to main content

camel_test/
harness.rs

1// crates/camel-test/src/harness.rs
2
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, Ordering},
6};
7
8use camel_api::CamelError;
9use camel_component_direct::DirectComponent;
10use camel_component_log::LogComponent;
11use camel_component_mock::MockComponent;
12use camel_component_timer::TimerComponent;
13use camel_core::CamelContext;
14use camel_core::route::RouteDefinition;
15use tokio::sync::Mutex;
16
17use crate::time::TimeController;
18
19// ---------------------------------------------------------------------------
20// Typestates
21// ---------------------------------------------------------------------------
22
/// Typestate marker for a builder that has not requested time control.
///
/// This is the default state of [`CamelTestContextBuilder`]; its `build()`
/// returns only the harness.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NoTimeControl;

/// Typestate marker for a builder on which `with_time_control()` was called.
///
/// In this state `build()` pauses the tokio clock and additionally returns a
/// `TimeController`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WithTimeControl;
25
26// ---------------------------------------------------------------------------
27// Builder
28// ---------------------------------------------------------------------------
29
30type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
31
32/// Builder for [`CamelTestContext`].
33///
34/// Use [`CamelTestContext::builder()`] to obtain one.
35pub struct CamelTestContextBuilder<S = NoTimeControl> {
36    registrations: Vec<Registration>,
37    mock: MockComponent,
38    _state: std::marker::PhantomData<S>,
39}
40
41impl CamelTestContextBuilder<NoTimeControl> {
42    pub(crate) fn new() -> Self {
43        Self {
44            registrations: Vec::new(),
45            mock: MockComponent::new(),
46            _state: std::marker::PhantomData,
47        }
48    }
49
50    /// Activate tokio mock-time. `build()` will call `tokio::time::pause()`
51    /// and return a [`TimeController`] alongside the harness.
52    pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
53        CamelTestContextBuilder {
54            registrations: self.registrations,
55            mock: self.mock,
56            _state: std::marker::PhantomData,
57        }
58    }
59
60    /// Build the harness without time control.
61    pub async fn build(self) -> CamelTestContext {
62        build_context(self.registrations, self.mock).await
63    }
64}
65
66impl CamelTestContextBuilder<WithTimeControl> {
67    /// Build the harness with time control.
68    ///
69    /// Calls `tokio::time::pause()` before returning. Use the returned
70    /// [`TimeController`] to advance the clock inside the test.
71    pub async fn build(self) -> (CamelTestContext, TimeController) {
72        tokio::time::pause();
73        let ctx = build_context(self.registrations, self.mock).await;
74        (ctx, TimeController)
75    }
76}
77
78macro_rules! impl_builder_methods {
79    ($S:ty) => {
80        impl CamelTestContextBuilder<$S> {
81            /// Include `MockComponent` explicitly (always registered; this is a
82            /// documentation signal for call sites).
83            pub fn with_mock(self) -> Self {
84                self
85            }
86
87            /// Register `TimerComponent`.
88            pub fn with_timer(mut self) -> Self {
89                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
90                    ctx.register_component(TimerComponent::new());
91                }));
92                self
93            }
94
95            /// Register `LogComponent`.
96            pub fn with_log(mut self) -> Self {
97                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
98                    ctx.register_component(LogComponent::new());
99                }));
100                self
101            }
102
103            /// Register `DirectComponent`.
104            pub fn with_direct(mut self) -> Self {
105                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
106                    ctx.register_component(DirectComponent::new());
107                }));
108                self
109            }
110
111            /// Register any component that implements the `Component` trait.
112            pub fn with_component<C>(mut self, component: C) -> Self
113            where
114                C: camel_component_api::Component + 'static,
115            {
116                self.registrations
117                    .push(Box::new(move |ctx: &mut CamelContext| {
118                        ctx.register_component(component);
119                    }));
120                self
121            }
122        }
123    };
124}
125
126impl_builder_methods!(NoTimeControl);
127impl_builder_methods!(WithTimeControl);
128
129// ---------------------------------------------------------------------------
130// Internal build helper
131// ---------------------------------------------------------------------------
132
133async fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
134    let mut ctx = CamelContext::builder().build().await.unwrap(); // allow-unwrap
135
136    // MockComponent is always registered.
137    ctx.register_component(mock.clone());
138
139    // Run caller-declared registrations.
140    for register in registrations {
141        register(&mut ctx);
142    }
143
144    let ctx = Arc::new(Mutex::new(ctx));
145    let stopped = Arc::new(AtomicBool::new(false));
146
147    CamelTestContext {
148        ctx: ctx.clone(),
149        mock,
150        stopped: stopped.clone(),
151        _guard: TestGuard { ctx, stopped },
152    }
153}
154
155// ---------------------------------------------------------------------------
156// TestGuard — automatic stop on drop
157// ---------------------------------------------------------------------------
158
159pub(crate) struct TestGuard {
160    ctx: Arc<Mutex<CamelContext>>,
161    stopped: Arc<AtomicBool>,
162}
163
164impl Drop for TestGuard {
165    fn drop(&mut self) {
166        if self.stopped.swap(true, Ordering::SeqCst) {
167            // Already stopped explicitly — nothing to do.
168            return;
169        }
170        let ctx = self.ctx.clone();
171        if let Ok(handle) = tokio::runtime::Handle::try_current() {
172            match handle.runtime_flavor() {
173                tokio::runtime::RuntimeFlavor::MultiThread => {
174                    // Deterministic cleanup when blocking is supported.
175                    tokio::task::block_in_place(|| {
176                        handle.block_on(async move {
177                            let mut ctx = ctx.lock().await;
178                            let _ = ctx.stop().await;
179                        });
180                    });
181                }
182                tokio::runtime::RuntimeFlavor::CurrentThread => {
183                    // Best effort fallback for current-thread runtimes where
184                    // blocking in Drop is not possible.
185                    handle.spawn(async move {
186                        let mut ctx = ctx.lock().await;
187                        let _ = ctx.stop().await;
188                    });
189                }
190                _ => {
191                    handle.spawn(async move {
192                        let mut ctx = ctx.lock().await;
193                        let _ = ctx.stop().await;
194                    });
195                }
196            }
197        }
198    }
199}
200
201// ---------------------------------------------------------------------------
202// CamelTestContext
203// ---------------------------------------------------------------------------
204
205/// Test harness that wraps [`CamelContext`] with teardown helpers,
206/// pre-registered components, and a shared [`MockComponent`] accessor.
207///
208/// # Example
209///
210/// ```no_run
211/// # use camel_test::CamelTestContext;
212/// # use std::time::Duration;
213/// #[tokio::test]
214/// async fn my_test() {
215///     let h = CamelTestContext::builder()
216///         .with_timer()
217///         .with_mock()
218///         .build()
219///         .await;
220///
221///     // add routes, start, assert…
222///     h.stop().await; // deterministic teardown
223///     // Drop also performs best-effort cleanup if omitted
224/// }
225/// ```
226pub struct CamelTestContext {
227    ctx: Arc<Mutex<CamelContext>>,
228    mock: MockComponent,
229    stopped: Arc<AtomicBool>,
230    _guard: TestGuard,
231}
232
233impl CamelTestContext {
234    /// Obtain a builder.
235    pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
236        CamelTestContextBuilder::new()
237    }
238
239    /// Add a route definition to the context.
240    pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
241        let ctx = self.ctx.lock().await;
242        ctx.add_route_definition(route).await
243    }
244
245    /// Start all routes.
246    pub async fn start(&self) {
247        let mut ctx = self.ctx.lock().await;
248        ctx.start().await.expect("CamelTestContext: start failed"); // allow-unwrap
249    }
250
251    /// Stop all routes explicitly. Safe to call before the harness is dropped —
252    /// subsequent drop is a no-op.
253    pub async fn stop(&self) {
254        if self.stopped.swap(true, Ordering::SeqCst) {
255            return; // already stopped
256        }
257        let mut ctx = self.ctx.lock().await;
258        ctx.stop().await.expect("CamelTestContext: stop failed"); // allow-unwrap
259    }
260
261    /// Consume the harness and stop routes deterministically.
262    pub async fn shutdown(self) {
263        self.stop().await;
264    }
265
266    /// Access the shared mock component for assertions.
267    pub fn mock(&self) -> &MockComponent {
268        &self.mock
269    }
270
271    /// Escape hatch: access the underlying [`CamelContext`] directly.
272    pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
273        &self.ctx
274    }
275}
276
277#[cfg(test)]
278mod tests {
279    use super::*;
280    use camel_builder::{RouteBuilder, StepAccumulator};
281    use std::time::Duration;
282
283    #[tokio::test]
284    async fn builder_without_time_control_builds_context() {
285        let harness = CamelTestContext::builder()
286            .with_mock()
287            .with_timer()
288            .with_log()
289            .build()
290            .await;
291
292        assert!(harness.mock().get_endpoint("result").is_none());
293        let guard = harness.ctx().lock().await;
294        let _ = &*guard;
295    }
296
297    #[tokio::test]
298    async fn builder_with_time_control_builds_and_advances_clock() {
299        let (_harness, time) = CamelTestContext::builder()
300            .with_mock()
301            .with_timer()
302            .with_time_control()
303            .build()
304            .await;
305
306        time.advance(Duration::from_millis(1)).await;
307        time.resume();
308    }
309
310    #[tokio::test]
311    async fn stop_is_idempotent_and_shutdown_is_safe() {
312        let harness = CamelTestContext::builder().with_mock().build().await;
313        harness.stop().await;
314        harness.stop().await;
315        harness.shutdown().await;
316    }
317
318    #[tokio::test]
319    async fn add_route_returns_error_for_invalid_step_uri() {
320        let harness = CamelTestContext::builder().with_mock().build().await;
321
322        let route = RouteBuilder::from("direct:start")
323            .route_id("bad-route")
324            .to("not-a-uri")
325            .build()
326            .unwrap();
327
328        let err = harness.add_route(route).await.expect_err("must fail");
329        assert!(err.to_string().contains("Invalid") || err.to_string().contains("invalid"));
330    }
331
332    #[tokio::test]
333    async fn with_component_registers_custom_component() {
334        let harness = CamelTestContext::builder()
335            .with_component(camel_component_direct::DirectComponent::new())
336            .with_mock()
337            .build()
338            .await;
339
340        let route = RouteBuilder::from("direct:start")
341            .route_id("direct-route")
342            .to("mock:out")
343            .build()
344            .unwrap();
345
346        harness.add_route(route).await.unwrap();
347        harness.start().await;
348        harness.stop().await;
349
350        // Harness context remains accessible after lifecycle.
351        let _guard = harness.ctx().lock().await;
352    }
353
354    // ── TST-004: Route lifecycle (start/stop/restart) ─────────────────────────
355
356    #[tokio::test]
357    async fn tst004_route_lifecycle_start_stop_restart() {
358        let harness = CamelTestContext::builder()
359            .with_direct()
360            .with_mock()
361            .build()
362            .await;
363
364        let route = RouteBuilder::from("direct:lifecycle")
365            .route_id("lifecycle-route")
366            .to("mock:lifecycle-out")
367            .build()
368            .unwrap();
369
370        harness.add_route(route).await.unwrap();
371
372        // Start the context — routes transition to Started.
373        harness.start().await;
374
375        // Stop the context — routes transition to Stopped.
376        harness.stop().await;
377
378        // Restart via the underlying CamelContext to verify start-after-stop works.
379        {
380            let mut ctx = harness.ctx().lock().await;
381            ctx.start().await.expect("restart should succeed");
382            ctx.stop().await.expect("stop after restart should succeed");
383        }
384    }
385
386    // ── TST-005: Concurrent exchange processing ───────────────────────────────
387
388    #[tokio::test]
389    async fn tst005_concurrent_exchange_processing() {
390        use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
391        use camel_core::route::compose_pipeline;
392        use std::sync::Arc;
393        use std::sync::atomic::{AtomicU32, Ordering};
394        use tower::ServiceExt;
395
396        let counter = Arc::new(AtomicU32::new(0));
397        let processor: BoxProcessor = {
398            let c = Arc::clone(&counter);
399            BoxProcessor::from_fn(move |ex: Exchange| {
400                let c = Arc::clone(&c);
401                Box::pin(async move {
402                    c.fetch_add(1, Ordering::Relaxed);
403                    tokio::task::yield_now().await;
404                    Ok(ex)
405                })
406            })
407        };
408
409        let pipeline = compose_pipeline(vec![processor]);
410
411        let concurrency: u32 = 10;
412        let mut handles = Vec::with_capacity(concurrency as usize);
413        for i in 0..concurrency {
414            let p = pipeline.clone();
415            handles.push(tokio::spawn(async move {
416                let ex = Exchange::new(Message::new(format!("msg-{i}")));
417                p.oneshot(ex).await.unwrap()
418            }));
419        }
420
421        for h in handles {
422            let _ = h.await.unwrap();
423        }
424
425        assert_eq!(counter.load(Ordering::Relaxed), concurrency);
426    }
427
428    // ── TST-006: Error handler invocation ─────────────────────────────────────
429
430    #[tokio::test]
431    async fn tst006_error_handler_invoked_on_failure() {
432        use camel_api::error_handler::ExceptionPolicy;
433        use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
434        use camel_processor::ErrorHandlerService;
435        use std::sync::Arc;
436        use tower::ServiceExt;
437
438        let error_received = Arc::new(std::sync::Mutex::new(false));
439        let error_received_clone = Arc::clone(&error_received);
440
441        // Handler processor that records it was called.
442        let handler: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
443            let r = Arc::clone(&error_received_clone);
444            Box::pin(async move {
445                *r.lock().unwrap() = true;
446                Ok(ex)
447            })
448        });
449
450        // Inner processor that always fails.
451        let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
452            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
453        });
454
455        let policy = ExceptionPolicy::new(|_| true);
456        let svc = ErrorHandlerService::new(failing, Some(handler), vec![(policy, None)]);
457        let ex = Exchange::new(Message::new("test"));
458        let result = svc.oneshot(ex).await;
459
460        assert!(result.is_ok(), "error handler should absorb the error");
461        assert!(
462            result.unwrap().has_error(),
463            "exchange should have error set"
464        );
465        assert!(
466            *error_received.lock().unwrap(),
467            "error handler processor should have been invoked"
468        );
469    }
470
471    // ── TST-007: Dead letter channel ──────────────────────────────────────────
472
473    #[tokio::test]
474    async fn tst007_dead_letter_channel_receives_failed_exchange() {
475        use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
476        use camel_processor::ErrorHandlerService;
477        use std::sync::Arc;
478        use tower::ServiceExt;
479
480        let dlc_received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
481        let dlc_received_clone = Arc::clone(&dlc_received);
482
483        // DLC processor that captures the exchange.
484        let dlc: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
485            let r = Arc::clone(&dlc_received_clone);
486            Box::pin(async move {
487                r.lock().unwrap().push(ex.clone());
488                Ok(ex)
489            })
490        });
491
492        let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
493            Box::pin(async { Err(CamelError::ProcessorError("fail".into())) })
494        });
495
496        let svc = ErrorHandlerService::new(failing, Some(dlc), vec![]);
497        let ex = Exchange::new(Message::new("dlc-test"));
498        let result = svc.oneshot(ex).await;
499
500        assert!(result.is_ok());
501        let exchanges = dlc_received.lock().unwrap();
502        assert_eq!(
503            exchanges.len(),
504            1,
505            "DLC should have received exactly one exchange"
506        );
507        assert!(exchanges[0].has_error());
508    }
509
510    // ── TST-008: Header propagation across processors ─────────────────────────
511
512    #[tokio::test]
513    async fn tst008_header_propagation_across_processors() {
514        use camel_api::{Body, BoxProcessor, BoxProcessorExt, Exchange, Message, Value};
515        use camel_core::route::compose_pipeline;
516        use tower::ServiceExt;
517
518        let step1: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
519            Box::pin(async move {
520                ex.input
521                    .set_header("trace-id", Value::String("abc-123".into()));
522                Ok(ex)
523            })
524        });
525
526        let step2: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
527            Box::pin(async move {
528                // Modify body but leave headers intact.
529                ex.input.body = Body::Text("processed".to_string());
530                Ok(ex)
531            })
532        });
533
534        let pipeline = compose_pipeline(vec![step1, step2]);
535        let ex = Exchange::new(Message::new("input"));
536        let result = pipeline.oneshot(ex).await.unwrap();
537
538        assert_eq!(
539            result.input.header("trace-id"),
540            Some(&Value::String("abc-123".into())),
541            "header should survive across processors"
542        );
543        assert_eq!(result.input.body.as_text(), Some("processed"));
544    }
545
546    // ── TST-009: Exchange body type conversion ────────────────────────────────
547
548    #[tokio::test]
549    async fn tst009_exchange_body_type_conversion() {
550        use camel_api::body::Body;
551        use camel_api::body_converter::{BodyType, convert};
552
553        // String → Bytes
554        let text_body = Body::Text("hello".to_string());
555        let bytes_body = convert(text_body, BodyType::Bytes).unwrap();
556        assert!(matches!(bytes_body, Body::Bytes(_)));
557        if let Body::Bytes(ref b) = bytes_body {
558            assert_eq!(b.as_ref(), b"hello");
559        }
560
561        // Bytes → String
562        let text_body_back = convert(bytes_body, BodyType::Text).unwrap();
563        assert!(matches!(text_body_back, Body::Text(_)));
564        assert_eq!(text_body_back.as_text(), Some("hello"));
565    }
566
567    // ── TST-010: Multicast EIP ────────────────────────────────────────────────
568
569    #[tokio::test]
570    async fn tst010_multicast_delivers_to_multiple_endpoints() {
571        use camel_api::multicast::{MulticastConfig, MulticastStrategy};
572        use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
573        use camel_processor::MulticastService;
574        use std::sync::Arc;
575        use tower::ServiceExt;
576
577        let received_a = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
578        let received_b = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
579
580        let endpoint_a: BoxProcessor = {
581            let r = Arc::clone(&received_a);
582            BoxProcessor::from_fn(move |ex: Exchange| {
583                let r = Arc::clone(&r);
584                Box::pin(async move {
585                    r.lock().unwrap().push(ex.clone());
586                    Ok(ex)
587                })
588            })
589        };
590
591        let endpoint_b: BoxProcessor = {
592            let r = Arc::clone(&received_b);
593            BoxProcessor::from_fn(move |ex: Exchange| {
594                let r = Arc::clone(&r);
595                Box::pin(async move {
596                    r.lock().unwrap().push(ex.clone());
597                    Ok(ex)
598                })
599            })
600        };
601
602        let config = MulticastConfig::new().aggregation(MulticastStrategy::LastWins);
603
604        let svc = MulticastService::new(vec![endpoint_a, endpoint_b], config)
605            .expect("multicast service creation should succeed");
606        let ex = Exchange::new(Message::new("multicast-test"));
607        let _result = svc.oneshot(ex).await.unwrap();
608
609        assert_eq!(
610            received_a.lock().unwrap().len(),
611            1,
612            "endpoint A should receive exactly one exchange"
613        );
614        assert_eq!(
615            received_b.lock().unwrap().len(),
616            1,
617            "endpoint B should receive exactly one exchange"
618        );
619    }
620}