Skip to main content

camel_test/
harness.rs

1// crates/camel-test/src/harness.rs
2
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, Ordering},
6};
7
8use camel_api::CamelError;
9use camel_component_direct::DirectComponent;
10use camel_component_log::LogComponent;
11use camel_component_mock::MockComponent;
12use camel_component_timer::TimerComponent;
13use camel_core::CamelContext;
14use camel_core::route::RouteDefinition;
15use tokio::sync::Mutex;
16
17use crate::time::TimeController;
18
19// ---------------------------------------------------------------------------
20// Typestates
21// ---------------------------------------------------------------------------
22
/// Typestate marker: the builder produces a harness without tokio mock-time.
///
/// This is the default state of [`CamelTestContextBuilder`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct NoTimeControl;

/// Typestate marker: `build()` pauses tokio time and additionally returns a
/// [`TimeController`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct WithTimeControl;
25
26// ---------------------------------------------------------------------------
27// Builder
28// ---------------------------------------------------------------------------
29
/// Deferred component registration: a boxed, sendable, one-shot closure that
/// receives mutable access to a [`CamelContext`].
type Registration = Box<dyn FnOnce(&mut CamelContext) + Send>;
31
/// Builder for [`CamelTestContext`].
///
/// Use [`CamelTestContext::builder()`] to obtain one.
///
/// The type parameter `S` is a typestate marker ([`NoTimeControl`] by
/// default, or [`WithTimeControl`]) that selects which `build()` is available.
pub struct CamelTestContextBuilder<S = NoTimeControl> {
    /// Registrations queued by the `with_*` methods, run against the context
    /// when the harness is built.
    registrations: Vec<Registration>,
    /// Mock component that is registered on the context and also handed to the
    /// harness for assertions.
    mock: MockComponent,
    /// Zero-sized marker carrying the typestate `S`.
    _state: std::marker::PhantomData<S>,
}
40
41impl CamelTestContextBuilder<NoTimeControl> {
42    pub(crate) fn new() -> Self {
43        Self {
44            registrations: Vec::new(),
45            mock: MockComponent::new(),
46            _state: std::marker::PhantomData,
47        }
48    }
49
50    /// Activate tokio mock-time. `build()` will call `tokio::time::pause()`
51    /// and return a [`TimeController`] alongside the harness.
52    pub fn with_time_control(self) -> CamelTestContextBuilder<WithTimeControl> {
53        CamelTestContextBuilder {
54            registrations: self.registrations,
55            mock: self.mock,
56            _state: std::marker::PhantomData,
57        }
58    }
59
60    /// Build the harness without time control.
61    pub async fn build(self) -> CamelTestContext {
62        build_context(self.registrations, self.mock).await
63    }
64}
65
66impl CamelTestContextBuilder<WithTimeControl> {
67    /// Build the harness with time control.
68    ///
69    /// Calls `tokio::time::pause()` before returning. Use the returned
70    /// [`TimeController`] to advance the clock inside the test.
71    pub async fn build(self) -> (CamelTestContext, TimeController) {
72        tokio::time::pause();
73        let ctx = build_context(self.registrations, self.mock).await;
74        (ctx, TimeController)
75    }
76}
77
78macro_rules! impl_builder_methods {
79    ($S:ty) => {
80        impl CamelTestContextBuilder<$S> {
81            /// Include `MockComponent` explicitly (always registered; this is a
82            /// documentation signal for call sites).
83            pub fn with_mock(self) -> Self {
84                self
85            }
86
87            /// Register `TimerComponent`.
88            pub fn with_timer(mut self) -> Self {
89                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
90                    ctx.register_component(TimerComponent::new());
91                }));
92                self
93            }
94
95            /// Register `LogComponent`.
96            pub fn with_log(mut self) -> Self {
97                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
98                    ctx.register_component(LogComponent::new());
99                }));
100                self
101            }
102
103            /// Register `DirectComponent`.
104            pub fn with_direct(mut self) -> Self {
105                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
106                    ctx.register_component(DirectComponent::new());
107                }));
108                self
109            }
110
111            /// Register `SedaComponent`.
112            pub fn with_seda(mut self) -> Self {
113                self.registrations.push(Box::new(|ctx: &mut CamelContext| {
114                    ctx.register_component(camel_component_seda::SedaComponent::new());
115                }));
116                self
117            }
118
119            /// Register any component that implements the `Component` trait.
120            pub fn with_component<C>(mut self, component: C) -> Self
121            where
122                C: camel_component_api::Component + 'static,
123            {
124                self.registrations
125                    .push(Box::new(move |ctx: &mut CamelContext| {
126                        ctx.register_component(component);
127                    }));
128                self
129            }
130        }
131    };
132}
133
134impl_builder_methods!(NoTimeControl);
135impl_builder_methods!(WithTimeControl);
136
137// ---------------------------------------------------------------------------
138// Internal build helper
139// ---------------------------------------------------------------------------
140
141async fn build_context(registrations: Vec<Registration>, mock: MockComponent) -> CamelTestContext {
142    let mut ctx = CamelContext::builder().build().await.unwrap(); // allow-unwrap
143
144    // MockComponent is always registered.
145    ctx.register_component(mock.clone());
146
147    // Run caller-declared registrations.
148    for register in registrations {
149        register(&mut ctx);
150    }
151
152    let ctx = Arc::new(Mutex::new(ctx));
153    let stopped = Arc::new(AtomicBool::new(false));
154
155    CamelTestContext {
156        ctx: ctx.clone(),
157        mock,
158        stopped: stopped.clone(),
159        _guard: TestGuard { ctx, stopped },
160    }
161}
162
163// ---------------------------------------------------------------------------
164// TestGuard — automatic stop on drop
165// ---------------------------------------------------------------------------
166
/// Drop guard that stops the wrapped context when the harness goes away,
/// unless the `stopped` flag is already set.
pub(crate) struct TestGuard {
    /// Shared handle to the context that is stopped on drop.
    ctx: Arc<Mutex<CamelContext>>,
    /// Flag shared with the harness; `true` means teardown already happened
    /// (or has been claimed) and drop must do nothing.
    stopped: Arc<AtomicBool>,
}
171
172impl Drop for TestGuard {
173    fn drop(&mut self) {
174        if self.stopped.swap(true, Ordering::SeqCst) {
175            // Already stopped explicitly — nothing to do.
176            return;
177        }
178        let ctx = self.ctx.clone();
179        if let Ok(handle) = tokio::runtime::Handle::try_current() {
180            match handle.runtime_flavor() {
181                tokio::runtime::RuntimeFlavor::MultiThread => {
182                    // Deterministic cleanup when blocking is supported.
183                    tokio::task::block_in_place(|| {
184                        handle.block_on(async move {
185                            let mut ctx = ctx.lock().await;
186                            let _ = ctx.stop().await;
187                        });
188                    });
189                }
190                tokio::runtime::RuntimeFlavor::CurrentThread => {
191                    // Best effort fallback for current-thread runtimes where
192                    // blocking in Drop is not possible.
193                    handle.spawn(async move {
194                        let mut ctx = ctx.lock().await;
195                        let _ = ctx.stop().await;
196                    });
197                }
198                _ => {
199                    handle.spawn(async move {
200                        let mut ctx = ctx.lock().await;
201                        let _ = ctx.stop().await;
202                    });
203                }
204            }
205        }
206    }
207}
208
209// ---------------------------------------------------------------------------
210// CamelTestContext
211// ---------------------------------------------------------------------------
212
/// Test harness that wraps [`CamelContext`] with teardown helpers,
/// pre-registered components, and a shared [`MockComponent`] accessor.
///
/// When the harness is dropped without an explicit stop, its internal guard
/// stops the context: synchronously on a multi-thread tokio runtime, via a
/// spawned task on other runtime flavors, and not at all outside a runtime.
///
/// # Example
///
/// ```no_run
/// # use camel_test::CamelTestContext;
/// # use std::time::Duration;
/// #[tokio::test]
/// async fn my_test() {
///     let h = CamelTestContext::builder()
///         .with_timer()
///         .with_mock()
///         .build()
///         .await;
///
///     // add routes, start, assert…
///     h.stop().await; // deterministic teardown
///     // Drop also performs best-effort cleanup if omitted
/// }
/// ```
pub struct CamelTestContext {
    /// The wrapped context, shared with the drop guard.
    ctx: Arc<Mutex<CamelContext>>,
    /// Mock component registered on the context; exposed via `mock()`.
    mock: MockComponent,
    /// Set once teardown has been claimed; shared with the drop guard.
    stopped: Arc<AtomicBool>,
    /// Stops the context on drop unless `stopped` is already set.
    _guard: TestGuard,
}
240
241impl CamelTestContext {
242    /// Obtain a builder.
243    pub fn builder() -> CamelTestContextBuilder<NoTimeControl> {
244        CamelTestContextBuilder::new()
245    }
246
247    /// Add a route definition to the context.
248    pub async fn add_route(&self, route: RouteDefinition) -> Result<(), CamelError> {
249        let ctx = self.ctx.lock().await;
250        ctx.add_route_definition(route).await
251    }
252
253    /// Start all routes.
254    pub async fn start(&self) {
255        let mut ctx = self.ctx.lock().await;
256        ctx.start().await.expect("CamelTestContext: start failed"); // allow-unwrap
257    }
258
259    /// Stop all routes explicitly. Safe to call before the harness is dropped —
260    /// subsequent drop is a no-op.
261    pub async fn stop(&self) {
262        if self.stopped.swap(true, Ordering::SeqCst) {
263            return; // already stopped
264        }
265        let mut ctx = self.ctx.lock().await;
266        ctx.stop().await.expect("CamelTestContext: stop failed"); // allow-unwrap
267    }
268
269    /// Consume the harness and stop routes deterministically.
270    pub async fn shutdown(self) {
271        self.stop().await;
272    }
273
274    /// Access the shared mock component for assertions.
275    pub fn mock(&self) -> &MockComponent {
276        &self.mock
277    }
278
279    /// Escape hatch: access the underlying [`CamelContext`] directly.
280    pub fn ctx(&self) -> &Arc<Mutex<CamelContext>> {
281        &self.ctx
282    }
283}
284
285#[cfg(test)]
286mod tests {
287    use super::*;
288    use camel_builder::{RouteBuilder, StepAccumulator};
289    use std::time::Duration;
290
291    #[tokio::test]
292    async fn builder_without_time_control_builds_context() {
293        let harness = CamelTestContext::builder()
294            .with_mock()
295            .with_timer()
296            .with_log()
297            .build()
298            .await;
299
300        assert!(harness.mock().get_endpoint("result").is_none());
301        let guard = harness.ctx().lock().await;
302        let _ = &*guard;
303    }
304
305    #[tokio::test]
306    async fn builder_with_time_control_builds_and_advances_clock() {
307        let (_harness, time) = CamelTestContext::builder()
308            .with_mock()
309            .with_timer()
310            .with_time_control()
311            .build()
312            .await;
313
314        time.advance(Duration::from_millis(1)).await;
315        time.resume();
316    }
317
318    #[tokio::test]
319    async fn stop_is_idempotent_and_shutdown_is_safe() {
320        let harness = CamelTestContext::builder().with_mock().build().await;
321        harness.stop().await;
322        harness.stop().await;
323        harness.shutdown().await;
324    }
325
326    #[tokio::test]
327    async fn add_route_returns_error_for_invalid_step_uri() {
328        let harness = CamelTestContext::builder().with_mock().build().await;
329
330        let route = RouteBuilder::from("direct:start")
331            .route_id("bad-route")
332            .to("not-a-uri")
333            .build()
334            .unwrap();
335
336        let err = harness.add_route(route).await.expect_err("must fail");
337        assert!(err.to_string().contains("Invalid") || err.to_string().contains("invalid"));
338    }
339
340    #[tokio::test]
341    async fn with_component_registers_custom_component() {
342        let harness = CamelTestContext::builder()
343            .with_component(camel_component_direct::DirectComponent::new())
344            .with_mock()
345            .build()
346            .await;
347
348        let route = RouteBuilder::from("direct:start")
349            .route_id("direct-route")
350            .to("mock:out")
351            .build()
352            .unwrap();
353
354        harness.add_route(route).await.unwrap();
355        harness.start().await;
356        harness.stop().await;
357
358        // Harness context remains accessible after lifecycle.
359        let _guard = harness.ctx().lock().await;
360    }
361
362    // ── TST-004: Route lifecycle (start/stop/restart) ─────────────────────────
363
364    #[tokio::test]
365    async fn tst004_route_lifecycle_start_stop_restart() {
366        let harness = CamelTestContext::builder()
367            .with_direct()
368            .with_mock()
369            .build()
370            .await;
371
372        let route = RouteBuilder::from("direct:lifecycle")
373            .route_id("lifecycle-route")
374            .to("mock:lifecycle-out")
375            .build()
376            .unwrap();
377
378        harness.add_route(route).await.unwrap();
379
380        // Start the context — routes transition to Started.
381        harness.start().await;
382
383        // Stop the context — routes transition to Stopped.
384        harness.stop().await;
385
386        // Restart via the underlying CamelContext to verify start-after-stop works.
387        {
388            let mut ctx = harness.ctx().lock().await;
389            ctx.start().await.expect("restart should succeed");
390            ctx.stop().await.expect("stop after restart should succeed");
391        }
392    }
393
394    // ── TST-005: Concurrent exchange processing ───────────────────────────────
395
396    #[tokio::test]
397    async fn tst005_concurrent_exchange_processing() {
398        use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
399        use camel_core::route::compose_pipeline;
400        use std::sync::Arc;
401        use std::sync::atomic::{AtomicU32, Ordering};
402        use tower::ServiceExt;
403
404        let counter = Arc::new(AtomicU32::new(0));
405        let processor: BoxProcessor = {
406            let c = Arc::clone(&counter);
407            BoxProcessor::from_fn(move |ex: Exchange| {
408                let c = Arc::clone(&c);
409                Box::pin(async move {
410                    c.fetch_add(1, Ordering::Relaxed);
411                    tokio::task::yield_now().await;
412                    Ok(ex)
413                })
414            })
415        };
416
417        let pipeline = compose_pipeline(vec![processor]);
418
419        let concurrency: u32 = 10;
420        let mut handles = Vec::with_capacity(concurrency as usize);
421        for i in 0..concurrency {
422            let p = pipeline.clone();
423            handles.push(tokio::spawn(async move {
424                let ex = Exchange::new(Message::new(format!("msg-{i}")));
425                p.oneshot(ex).await.unwrap()
426            }));
427        }
428
429        for h in handles {
430            let _ = h.await.unwrap();
431        }
432
433        assert_eq!(counter.load(Ordering::Relaxed), concurrency);
434    }
435
436    // ── TST-006: Error handler invocation ─────────────────────────────────────
437
438    #[tokio::test]
439    async fn tst006_error_handler_invoked_on_failure() {
440        use camel_api::error_handler::ExceptionPolicy;
441        use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
442        use camel_processor::ErrorHandlerService;
443        use std::sync::Arc;
444        use tower::ServiceExt;
445
446        let error_received = Arc::new(std::sync::Mutex::new(false));
447        let error_received_clone = Arc::clone(&error_received);
448
449        // Handler processor that records it was called.
450        let handler: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
451            let r = Arc::clone(&error_received_clone);
452            Box::pin(async move {
453                *r.lock().unwrap() = true;
454                Ok(ex)
455            })
456        });
457
458        // Inner processor that always fails.
459        let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
460            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
461        });
462
463        let policy = ExceptionPolicy::new(|_| true);
464        let svc = ErrorHandlerService::new(failing, Some(handler), vec![(policy, None)]);
465        let ex = Exchange::new(Message::new("test"));
466        let result = svc.oneshot(ex).await;
467
468        assert!(result.is_ok(), "error handler should absorb the error");
469        assert!(
470            result.unwrap().has_error(),
471            "exchange should have error set"
472        );
473        assert!(
474            *error_received.lock().unwrap(),
475            "error handler processor should have been invoked"
476        );
477    }
478
479    // ── TST-007: Dead letter channel ──────────────────────────────────────────
480
481    #[tokio::test]
482    async fn tst007_dead_letter_channel_receives_failed_exchange() {
483        use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Exchange, Message};
484        use camel_processor::ErrorHandlerService;
485        use std::sync::Arc;
486        use tower::ServiceExt;
487
488        let dlc_received = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
489        let dlc_received_clone = Arc::clone(&dlc_received);
490
491        // DLC processor that captures the exchange.
492        let dlc: BoxProcessor = BoxProcessor::from_fn(move |ex: Exchange| {
493            let r = Arc::clone(&dlc_received_clone);
494            Box::pin(async move {
495                r.lock().unwrap().push(ex.clone());
496                Ok(ex)
497            })
498        });
499
500        let failing: BoxProcessor = BoxProcessor::from_fn(|_| {
501            Box::pin(async { Err(CamelError::ProcessorError("fail".into())) })
502        });
503
504        let svc = ErrorHandlerService::new(failing, Some(dlc), vec![]);
505        let ex = Exchange::new(Message::new("dlc-test"));
506        let result = svc.oneshot(ex).await;
507
508        assert!(result.is_ok());
509        let exchanges = dlc_received.lock().unwrap();
510        assert_eq!(
511            exchanges.len(),
512            1,
513            "DLC should have received exactly one exchange"
514        );
515        assert!(exchanges[0].has_error());
516    }
517
518    // ── TST-008: Header propagation across processors ─────────────────────────
519
520    #[tokio::test]
521    async fn tst008_header_propagation_across_processors() {
522        use camel_api::{Body, BoxProcessor, BoxProcessorExt, Exchange, Message, Value};
523        use camel_core::route::compose_pipeline;
524        use tower::ServiceExt;
525
526        let step1: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
527            Box::pin(async move {
528                ex.input
529                    .set_header("trace-id", Value::String("abc-123".into()));
530                Ok(ex)
531            })
532        });
533
534        let step2: BoxProcessor = BoxProcessor::from_fn(|mut ex: Exchange| {
535            Box::pin(async move {
536                // Modify body but leave headers intact.
537                ex.input.body = Body::Text("processed".to_string());
538                Ok(ex)
539            })
540        });
541
542        let pipeline = compose_pipeline(vec![step1, step2]);
543        let ex = Exchange::new(Message::new("input"));
544        let result = pipeline.oneshot(ex).await.unwrap();
545
546        assert_eq!(
547            result.input.header("trace-id"),
548            Some(&Value::String("abc-123".into())),
549            "header should survive across processors"
550        );
551        assert_eq!(result.input.body.as_text(), Some("processed"));
552    }
553
554    // ── TST-009: Exchange body type conversion ────────────────────────────────
555
556    #[tokio::test]
557    async fn tst009_exchange_body_type_conversion() {
558        use camel_api::body::Body;
559        use camel_api::body_converter::{BodyType, convert};
560
561        // String → Bytes
562        let text_body = Body::Text("hello".to_string());
563        let bytes_body = convert(text_body, BodyType::Bytes).unwrap();
564        assert!(matches!(bytes_body, Body::Bytes(_)));
565        if let Body::Bytes(ref b) = bytes_body {
566            assert_eq!(b.as_ref(), b"hello");
567        }
568
569        // Bytes → String
570        let text_body_back = convert(bytes_body, BodyType::Text).unwrap();
571        assert!(matches!(text_body_back, Body::Text(_)));
572        assert_eq!(text_body_back.as_text(), Some("hello"));
573    }
574
575    // ── TST-010: Multicast EIP ────────────────────────────────────────────────
576
577    #[tokio::test]
578    async fn tst010_multicast_delivers_to_multiple_endpoints() {
579        use camel_api::multicast::{MulticastConfig, MulticastStrategy};
580        use camel_api::{BoxProcessor, BoxProcessorExt, Exchange, Message};
581        use camel_processor::MulticastService;
582        use std::sync::Arc;
583        use tower::ServiceExt;
584
585        let received_a = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
586        let received_b = Arc::new(std::sync::Mutex::new(Vec::<Exchange>::new()));
587
588        let endpoint_a: BoxProcessor = {
589            let r = Arc::clone(&received_a);
590            BoxProcessor::from_fn(move |ex: Exchange| {
591                let r = Arc::clone(&r);
592                Box::pin(async move {
593                    r.lock().unwrap().push(ex.clone());
594                    Ok(ex)
595                })
596            })
597        };
598
599        let endpoint_b: BoxProcessor = {
600            let r = Arc::clone(&received_b);
601            BoxProcessor::from_fn(move |ex: Exchange| {
602                let r = Arc::clone(&r);
603                Box::pin(async move {
604                    r.lock().unwrap().push(ex.clone());
605                    Ok(ex)
606                })
607            })
608        };
609
610        let config = MulticastConfig::new().aggregation(MulticastStrategy::LastWins);
611
612        let svc = MulticastService::new(vec![endpoint_a, endpoint_b], config)
613            .expect("multicast service creation should succeed");
614        let ex = Exchange::new(Message::new("multicast-test"));
615        let _result = svc.oneshot(ex).await.unwrap();
616
617        assert_eq!(
618            received_a.lock().unwrap().len(),
619            1,
620            "endpoint A should receive exactly one exchange"
621        );
622        assert_eq!(
623            received_b.lock().unwrap().len(),
624            1,
625            "endpoint B should receive exactly one exchange"
626        );
627    }
628}