Skip to main content

camel_component_mock/
lib.rs

1//! Mock component for rust-camel — testing utility that records received
2//! exchanges for later assertion, useful for verifying route output in tests.
3//!
4//! Main types: `MockComponent`, `MockEndpoint`, `MockProducer`.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use tokio::sync::{Mutex, Notify};
13use tower::Service;
14
15use camel_component_api::parse_uri;
16use camel_component_api::{BoxProcessor, CamelError, Exchange};
17use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
18use tracing::debug;
19
20// ---------------------------------------------------------------------------
21// MockComponent
22// ---------------------------------------------------------------------------
23
24/// The Mock component is a testing utility that records every exchange it
25/// receives via its producer.  It exposes helpers to inspect and assert on
26/// the recorded exchanges.
27///
28/// URI format: `mock:name`
29///
30/// When `create_endpoint` is called multiple times with the same name, the
31/// returned endpoints share the same received-exchanges storage. This enables
32/// test assertions: create mock, register it, run routes, then inspect via
33/// `component.get_endpoint("name")`.
34#[derive(Clone)]
35pub struct MockComponent {
36    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
37}
38
39impl MockComponent {
40    pub fn new() -> Self {
41        Self {
42            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
43        }
44    }
45
46    /// Retrieve a previously created endpoint's inner data by name.
47    ///
48    /// This is the primary way to inspect recorded exchanges in tests.
49    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
50        let registry = self
51            .registry
52            .lock()
53            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
54        registry.get(name).cloned()
55    }
56}
57
58impl Default for MockComponent {
59    fn default() -> Self {
60        Self::new()
61    }
62}
63
64impl Component for MockComponent {
65    fn scheme(&self) -> &str {
66        "mock"
67    }
68
69    fn create_endpoint(
70        &self,
71        uri: &str,
72        _ctx: &dyn camel_component_api::ComponentContext,
73    ) -> Result<Box<dyn Endpoint>, CamelError> {
74        let parts = parse_uri(uri)?;
75        if parts.scheme != "mock" {
76            return Err(CamelError::InvalidUri(format!(
77                "expected scheme 'mock', got '{}'",
78                parts.scheme
79            )));
80        }
81
82        let name = parts.path;
83        let mut registry = self.registry.lock().map_err(|e| {
84            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
85        })?;
86        let inner = registry
87            .entry(name.clone())
88            .or_insert_with(|| {
89                Arc::new(MockEndpointInner {
90                    uri: uri.to_string(),
91                    name,
92                    received: Arc::new(Mutex::new(Vec::new())),
93                    notify: Arc::new(Notify::new()),
94                })
95            })
96            .clone();
97
98        debug!(endpoint_name = %inner.name, "mock endpoint created");
99        Ok(Box::new(MockEndpoint(inner)))
100    }
101}
102
103// ---------------------------------------------------------------------------
104// MockEndpoint / MockEndpointInner
105// ---------------------------------------------------------------------------
106
107/// A mock endpoint that records all exchanges sent to it.
108///
109/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
110/// `MockEndpoint` instances created with the same name share the same inner
111/// storage.
112pub struct MockEndpoint(Arc<MockEndpointInner>);
113
114/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
115/// instances created with the same name via `MockComponent`.
116///
117/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
118/// recorded exchanges in tests.
119pub struct MockEndpointInner {
120    uri: String,
121    pub name: String,
122    received: Arc<Mutex<Vec<Exchange>>>,
123    notify: Arc<Notify>,
124}
125
126impl MockEndpointInner {
127    /// Return a snapshot of all exchanges received so far.
128    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
129        self.received.lock().await.clone()
130    }
131
132    /// Assert that exactly `expected` exchanges have been received.
133    ///
134    /// # Panics
135    ///
136    /// Panics if the count does not match.
137    pub async fn assert_exchange_count(&self, expected: usize) {
138        let actual = self.received.lock().await.len();
139        assert_eq!(
140            actual, expected,
141            "MockEndpoint expected {expected} exchanges, got {actual}"
142        );
143    }
144
145    /// Wait until at least `count` exchanges have been received, or panic on timeout.
146    ///
147    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
148    /// exchanges are already present.
149    ///
150    /// # Panics
151    ///
152    /// Panics if `timeout` elapses before `count` exchanges arrive.
153    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
154        let deadline = tokio::time::Instant::now() + timeout;
155        loop {
156            {
157                let received = self.received.lock().await;
158                if received.len() >= count {
159                    return;
160                }
161            }
162            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
163            if remaining.is_zero() {
164                // Re-check in case the final exchange arrived between the lock drop
165                // above and entering the select — Notify does not buffer permits.
166                let got = self.received.lock().await.len();
167                if got >= count {
168                    return;
169                }
170                panic!(
171                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
172                    self.name, count, got, timeout
173                );
174            }
175            tokio::select! {
176                _ = self.notify.notified() => {}
177                _ = tokio::time::sleep(remaining) => {}
178            }
179        }
180    }
181
182    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
183    ///
184    /// # Panics
185    ///
186    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
187    /// to ensure the exchange has been received.
188    ///
189    /// Panics if called from a single-threaded tokio runtime. Use
190    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
191    ///
192    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
193    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
194        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
195        if idx >= received.len() {
196            panic!(
197                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
198                self.name,
199                idx,
200                received.len()
201            );
202        }
203        ExchangeAssert {
204            exchange: received[idx].clone(),
205            idx,
206            endpoint_name: self.name.clone(),
207        }
208    }
209}
210
211impl Endpoint for MockEndpoint {
212    fn uri(&self) -> &str {
213        &self.0.uri
214    }
215
216    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
217        Err(CamelError::EndpointCreationFailed(
218            "mock endpoint does not support consumers (it is a sink)".to_string(),
219        ))
220    }
221
222    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
223        Ok(BoxProcessor::new(MockProducer {
224            name: self.0.name.clone(),
225            received: Arc::clone(&self.0.received),
226            notify: Arc::clone(&self.0.notify),
227        }))
228    }
229}
230
231// ---------------------------------------------------------------------------
232// MockProducer
233// ---------------------------------------------------------------------------
234
235/// A producer that simply records each exchange it processes.
236#[derive(Clone)]
237struct MockProducer {
238    name: String,
239    received: Arc<Mutex<Vec<Exchange>>>,
240    notify: Arc<Notify>,
241}
242
243impl Service<Exchange> for MockProducer {
244    type Response = Exchange;
245    type Error = CamelError;
246    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
247
248    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
249        Poll::Ready(Ok(()))
250    }
251
252    fn call(&mut self, exchange: Exchange) -> Self::Future {
253        let name = self.name.clone();
254        let received = Arc::clone(&self.received);
255        let notify = Arc::clone(&self.notify);
256        Box::pin(async move {
257            received.lock().await.push(exchange.clone());
258            let count = received.lock().await.len();
259            debug!(endpoint_name = %name, count = %count, "exchange recorded on mock");
260            notify.notify_waiters();
261            Ok(exchange)
262        })
263    }
264}
265
266// ---------------------------------------------------------------------------
267// ExchangeAssert
268// ---------------------------------------------------------------------------
269
270/// A handle for making synchronous assertions on a recorded exchange.
271///
272/// Obtain one via [`MockEndpointInner::exchange`] after calling
273/// [`MockEndpointInner::await_exchanges`].
274///
275/// All methods panic with descriptive messages on failure, making test output
276/// self-explanatory without additional context.
277pub struct ExchangeAssert {
278    exchange: Exchange,
279    idx: usize,
280    endpoint_name: String,
281}
282
283impl ExchangeAssert {
284    fn location(&self) -> String {
285        format!(
286            "MockEndpoint '{}' exchange[{}]",
287            self.endpoint_name, self.idx
288        )
289    }
290
291    /// Assert that the body is `Body::Text` equal to `expected`.
292    pub fn assert_body_text(self, expected: &str) -> Self {
293        match self.exchange.input.body.as_text() {
294            Some(actual) if actual == expected => {}
295            Some(actual) => panic!(
296                "{}: expected body text {:?}, got {:?}",
297                self.location(),
298                expected,
299                actual
300            ),
301            None => panic!(
302                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
303                self.location(),
304                expected,
305                self.exchange.input.body
306            ),
307        }
308        self
309    }
310
311    /// Assert that the body is `Body::Json` equal to `expected`.
312    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
313        match &self.exchange.input.body {
314            camel_component_api::Body::Json(actual) if *actual == expected => {}
315            camel_component_api::Body::Json(actual) => panic!(
316                "{}: expected body JSON {}, got {}",
317                self.location(),
318                expected,
319                actual
320            ),
321            other => panic!(
322                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
323                self.location(),
324                expected,
325                other
326            ),
327        }
328        self
329    }
330
331    /// Assert that the body is `Body::Bytes` equal to `expected`.
332    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
333        match &self.exchange.input.body {
334            camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
335            camel_component_api::Body::Bytes(actual) => panic!(
336                "{}: expected body bytes {:?}, got {:?}",
337                self.location(),
338                expected,
339                actual
340            ),
341            other => panic!(
342                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
343                self.location(),
344                expected,
345                other
346            ),
347        }
348        self
349    }
350
351    /// Assert that header `key` exists and equals `expected`.
352    ///
353    /// # Panics
354    ///
355    /// Panics if the header is missing or its value does not match `expected`.
356    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
357        match self.exchange.input.headers.get(key) {
358            Some(actual) if *actual == expected => {}
359            Some(actual) => panic!(
360                "{}: expected header {:?} = {}, got {}",
361                self.location(),
362                key,
363                expected,
364                actual
365            ),
366            None => panic!(
367                "{}: expected header {:?} = {}, but header is absent",
368                self.location(),
369                key,
370                expected
371            ),
372        }
373        self
374    }
375
376    /// Assert that header `key` is present (any value).
377    ///
378    /// # Panics
379    ///
380    /// Panics if the header key is absent.
381    pub fn assert_header_exists(self, key: &str) -> Self {
382        if !self.exchange.input.headers.contains_key(key) {
383            panic!(
384                "{}: expected header {:?} to be present, but it was absent",
385                self.location(),
386                key
387            );
388        }
389        self
390    }
391
392    /// Assert that the exchange has an error (`exchange.error` is `Some`).
393    ///
394    /// # Panics
395    ///
396    /// Panics if `exchange.error` is `None`.
397    pub fn assert_has_error(self) -> Self {
398        if self.exchange.error.is_none() {
399            panic!(
400                "{}: expected exchange to have an error, but error is None",
401                self.location()
402            );
403        }
404        self
405    }
406
407    /// Assert that the exchange has no error (`exchange.error` is `None`).
408    ///
409    /// # Panics
410    ///
411    /// Panics if `exchange.error` is `Some`.
412    pub fn assert_no_error(self) -> Self {
413        if let Some(ref err) = self.exchange.error {
414            panic!(
415                "{}: expected exchange to have no error, but got: {}",
416                self.location(),
417                err
418            );
419        }
420        self
421    }
422}
423
424// ---------------------------------------------------------------------------
425// Tests
426// ---------------------------------------------------------------------------
427
428#[cfg(test)]
429mod tests {
430    use super::*;
431    use camel_component_api::Message;
432    use camel_component_api::NoOpComponentContext;
433    use tower::ServiceExt;
434
435    fn test_producer_ctx() -> ProducerContext {
436        ProducerContext::new()
437    }
438
439    #[test]
440    fn test_mock_component_scheme() {
441        let component = MockComponent::new();
442        assert_eq!(component.scheme(), "mock");
443    }
444
445    #[test]
446    fn test_mock_component_default() {
447        let component = MockComponent::default();
448        assert_eq!(component.scheme(), "mock");
449        assert!(component.get_endpoint("missing").is_none());
450    }
451
452    #[test]
453    fn test_mock_creates_endpoint() {
454        let component = MockComponent::new();
455        let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
456        assert!(endpoint.is_ok());
457    }
458
459    #[test]
460    fn test_mock_wrong_scheme() {
461        let component = MockComponent::new();
462        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
463        assert!(result.is_err());
464    }
465
466    #[test]
467    fn test_mock_endpoint_no_consumer() {
468        let component = MockComponent::new();
469        let endpoint = component
470            .create_endpoint("mock:result", &NoOpComponentContext)
471            .unwrap();
472        assert!(endpoint.create_consumer().is_err());
473    }
474
475    #[test]
476    fn test_mock_endpoint_creates_producer() {
477        let ctx = test_producer_ctx();
478        let component = MockComponent::new();
479        let endpoint = component
480            .create_endpoint("mock:result", &NoOpComponentContext)
481            .unwrap();
482        assert!(endpoint.create_producer(&ctx).is_ok());
483    }
484
485    #[test]
486    fn test_mock_endpoint_uri() {
487        let component = MockComponent::new();
488        let endpoint = component
489            .create_endpoint("mock:uri-check", &NoOpComponentContext)
490            .unwrap();
491        assert_eq!(endpoint.uri(), "mock:uri-check");
492    }
493
494    #[test]
495    fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
496        let component = MockComponent::new();
497        let _ = component
498            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
499            .unwrap();
500        let _ = component
501            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
502            .unwrap();
503
504        let first = component.get_endpoint("shared-inner").unwrap();
505        let second = component.get_endpoint("shared-inner").unwrap();
506        assert!(Arc::ptr_eq(&first, &second));
507    }
508
509    #[tokio::test]
510    async fn test_mock_producer_records_exchange() {
511        let ctx = test_producer_ctx();
512        let component = MockComponent::new();
513        let endpoint = component
514            .create_endpoint("mock:test", &NoOpComponentContext)
515            .unwrap();
516
517        let mut producer = endpoint.create_producer(&ctx).unwrap();
518
519        let ex1 = Exchange::new(Message::new("first"));
520        let ex2 = Exchange::new(Message::new("second"));
521
522        producer.call(ex1).await.unwrap();
523        producer.call(ex2).await.unwrap();
524
525        let inner = component.get_endpoint("test").unwrap();
526        inner.assert_exchange_count(2).await;
527
528        let received = inner.get_received_exchanges().await;
529        assert_eq!(received[0].input.body.as_text(), Some("first"));
530        assert_eq!(received[1].input.body.as_text(), Some("second"));
531    }
532
533    #[tokio::test]
534    async fn test_mock_producer_passes_through_exchange() {
535        let ctx = test_producer_ctx();
536        let component = MockComponent::new();
537        let endpoint = component
538            .create_endpoint("mock:passthrough", &NoOpComponentContext)
539            .unwrap();
540
541        let producer = endpoint.create_producer(&ctx).unwrap();
542        let exchange = Exchange::new(Message::new("hello"));
543        let result = producer.oneshot(exchange).await.unwrap();
544
545        // Producer should return the exchange unchanged
546        assert_eq!(result.input.body.as_text(), Some("hello"));
547    }
548
549    #[tokio::test]
550    async fn test_mock_assert_count_passes() {
551        let component = MockComponent::new();
552        let endpoint = component
553            .create_endpoint("mock:count", &NoOpComponentContext)
554            .unwrap();
555        let inner = component.get_endpoint("count").unwrap();
556
557        inner.assert_exchange_count(0).await;
558
559        let ctx = test_producer_ctx();
560        let mut producer = endpoint.create_producer(&ctx).unwrap();
561        producer
562            .call(Exchange::new(Message::new("one")))
563            .await
564            .unwrap();
565
566        inner.assert_exchange_count(1).await;
567    }
568
569    #[tokio::test]
570    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
571    async fn test_mock_assert_count_fails() {
572        let component = MockComponent::new();
573        // Endpoint not created yet, so get_endpoint returns None.
574        // Create it first, then assert.
575        let _endpoint = component
576            .create_endpoint("mock:fail", &NoOpComponentContext)
577            .unwrap();
578        let inner = component.get_endpoint("fail").unwrap();
579
580        inner.assert_exchange_count(5).await;
581    }
582
583    #[tokio::test]
584    async fn test_mock_component_shared_registry() {
585        let component = MockComponent::new();
586        let ep1 = component
587            .create_endpoint("mock:shared", &NoOpComponentContext)
588            .unwrap();
589        let ep2 = component
590            .create_endpoint("mock:shared", &NoOpComponentContext)
591            .unwrap();
592
593        // Producing via ep1's producer...
594        let ctx = test_producer_ctx();
595        let mut p1 = ep1.create_producer(&ctx).unwrap();
596        p1.call(Exchange::new(Message::new("from-ep1")))
597            .await
598            .unwrap();
599
600        // ...and via ep2's producer...
601        let mut p2 = ep2.create_producer(&ctx).unwrap();
602        p2.call(Exchange::new(Message::new("from-ep2")))
603            .await
604            .unwrap();
605
606        // ...both should be visible via the shared storage
607        let inner = component.get_endpoint("shared").unwrap();
608        inner.assert_exchange_count(2).await;
609
610        let received = inner.get_received_exchanges().await;
611        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
612        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
613    }
614
615    #[tokio::test]
616    async fn await_exchanges_resolves_immediately() {
617        // If exchanges are already present, await_exchanges returns without timeout.
618        let ctx = test_producer_ctx();
619        let component = MockComponent::new();
620        let endpoint = component
621            .create_endpoint("mock:immediate", &NoOpComponentContext)
622            .unwrap();
623        let inner = component.get_endpoint("immediate").unwrap();
624
625        let mut producer = endpoint.create_producer(&ctx).unwrap();
626        producer
627            .call(Exchange::new(Message::new("a")))
628            .await
629            .unwrap();
630        producer
631            .call(Exchange::new(Message::new("b")))
632            .await
633            .unwrap();
634
635        // Should return immediately — both exchanges already received.
636        inner
637            .await_exchanges(2, std::time::Duration::from_millis(100))
638            .await;
639    }
640
641    #[tokio::test]
642    async fn await_exchanges_waits_then_resolves() {
643        // await_exchanges unblocks when a producer sends after the call.
644        let ctx = test_producer_ctx();
645        let component = MockComponent::new();
646        let endpoint = component
647            .create_endpoint("mock:waiter", &NoOpComponentContext)
648            .unwrap();
649        let inner = component.get_endpoint("waiter").unwrap();
650
651        // Spawn producer that sends after a short delay.
652        let mut producer = endpoint.create_producer(&ctx).unwrap();
653        tokio::spawn(async move {
654            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
655            producer
656                .call(Exchange::new(Message::new("delayed")))
657                .await
658                .unwrap();
659        });
660
661        // This should block until the spawned task delivers the exchange.
662        inner
663            .await_exchanges(1, std::time::Duration::from_millis(500))
664            .await;
665
666        let received = inner.get_received_exchanges().await;
667        assert_eq!(received.len(), 1);
668        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
669    }
670
671    #[tokio::test]
672    #[should_panic(expected = "timed out waiting for 5 exchanges")]
673    async fn await_exchanges_times_out() {
674        let component = MockComponent::new();
675        let _endpoint = component
676            .create_endpoint("mock:timeout", &NoOpComponentContext)
677            .unwrap();
678        let inner = component.get_endpoint("timeout").unwrap();
679
680        // Nobody sends — should panic after timeout.
681        inner
682            .await_exchanges(5, std::time::Duration::from_millis(50))
683            .await;
684    }
685
686    #[tokio::test(flavor = "multi_thread")]
687    async fn exchange_idx_returns_assert() {
688        let ctx = test_producer_ctx();
689        let component = MockComponent::new();
690        let endpoint = component
691            .create_endpoint("mock:assert-idx", &NoOpComponentContext)
692            .unwrap();
693        let inner = component.get_endpoint("assert-idx").unwrap();
694
695        let mut producer = endpoint.create_producer(&ctx).unwrap();
696        producer
697            .call(Exchange::new(Message::new("hello")))
698            .await
699            .unwrap();
700
701        inner
702            .await_exchanges(1, std::time::Duration::from_millis(500))
703            .await;
704        // Should not panic — index 0 exists.
705        let _assert = inner.exchange(0);
706    }
707
708    #[tokio::test(flavor = "multi_thread")]
709    #[should_panic(expected = "exchange index 5 out of bounds")]
710    async fn exchange_idx_out_of_bounds() {
711        let ctx = test_producer_ctx();
712        let component = MockComponent::new();
713        let endpoint = component
714            .create_endpoint("mock:oob", &NoOpComponentContext)
715            .unwrap();
716        let inner = component.get_endpoint("oob").unwrap();
717
718        let mut producer = endpoint.create_producer(&ctx).unwrap();
719        producer
720            .call(Exchange::new(Message::new("only-one")))
721            .await
722            .unwrap();
723
724        inner
725            .await_exchanges(1, std::time::Duration::from_millis(500))
726            .await;
727        // Only 1 exchange, index 5 should panic.
728        let _assert = inner.exchange(5);
729    }
730
731    #[tokio::test(flavor = "multi_thread")]
732    async fn assert_body_text_pass() {
733        let ctx = test_producer_ctx();
734        let component = MockComponent::new();
735        let endpoint = component
736            .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
737            .unwrap();
738        let inner = component.get_endpoint("body-text-pass").unwrap();
739        let mut producer = endpoint.create_producer(&ctx).unwrap();
740        producer
741            .call(Exchange::new(Message::new("hello")))
742            .await
743            .unwrap();
744        inner
745            .await_exchanges(1, std::time::Duration::from_millis(500))
746            .await;
747        inner.exchange(0).assert_body_text("hello");
748    }
749
750    #[tokio::test(flavor = "multi_thread")]
751    #[should_panic(expected = "expected body text")]
752    async fn assert_body_text_fail() {
753        let ctx = test_producer_ctx();
754        let component = MockComponent::new();
755        let endpoint = component
756            .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
757            .unwrap();
758        let inner = component.get_endpoint("body-text-fail").unwrap();
759        let mut producer = endpoint.create_producer(&ctx).unwrap();
760        producer
761            .call(Exchange::new(Message::new("hello")))
762            .await
763            .unwrap();
764        inner
765            .await_exchanges(1, std::time::Duration::from_millis(500))
766            .await;
767        inner.exchange(0).assert_body_text("world");
768    }
769
770    #[tokio::test(flavor = "multi_thread")]
771    async fn assert_body_json_pass() {
772        use camel_component_api::Body;
773        let ctx = test_producer_ctx();
774        let component = MockComponent::new();
775        let endpoint = component
776            .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
777            .unwrap();
778        let inner = component.get_endpoint("body-json-pass").unwrap();
779        let mut producer = endpoint.create_producer(&ctx).unwrap();
780        let mut msg = Message::new("");
781        msg.body = Body::Json(serde_json::json!({"key": "value"}));
782        producer.call(Exchange::new(msg)).await.unwrap();
783        inner
784            .await_exchanges(1, std::time::Duration::from_millis(500))
785            .await;
786        inner
787            .exchange(0)
788            .assert_body_json(serde_json::json!({"key": "value"}));
789    }
790
791    #[tokio::test(flavor = "multi_thread")]
792    #[should_panic(expected = "expected body JSON")]
793    async fn assert_body_json_fail() {
794        use camel_component_api::Body;
795        let ctx = test_producer_ctx();
796        let component = MockComponent::new();
797        let endpoint = component
798            .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
799            .unwrap();
800        let inner = component.get_endpoint("body-json-fail").unwrap();
801        let mut producer = endpoint.create_producer(&ctx).unwrap();
802        let mut msg = Message::new("");
803        msg.body = Body::Json(serde_json::json!({"key": "value"}));
804        producer.call(Exchange::new(msg)).await.unwrap();
805        inner
806            .await_exchanges(1, std::time::Duration::from_millis(500))
807            .await;
808        inner
809            .exchange(0)
810            .assert_body_json(serde_json::json!({"key": "other"}));
811    }
812
813    #[tokio::test(flavor = "multi_thread")]
814    async fn assert_body_bytes_pass() {
815        use bytes::Bytes;
816        use camel_component_api::Body;
817        let ctx = test_producer_ctx();
818        let component = MockComponent::new();
819        let endpoint = component
820            .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
821            .unwrap();
822        let inner = component.get_endpoint("body-bytes-pass").unwrap();
823        let mut producer = endpoint.create_producer(&ctx).unwrap();
824        let mut msg = Message::new("");
825        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
826        producer.call(Exchange::new(msg)).await.unwrap();
827        inner
828            .await_exchanges(1, std::time::Duration::from_millis(500))
829            .await;
830        inner.exchange(0).assert_body_bytes(b"binary");
831    }
832
833    #[tokio::test(flavor = "multi_thread")]
834    #[should_panic(expected = "expected body bytes")]
835    async fn assert_body_bytes_fail() {
836        use bytes::Bytes;
837        use camel_component_api::Body;
838        let ctx = test_producer_ctx();
839        let component = MockComponent::new();
840        let endpoint = component
841            .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
842            .unwrap();
843        let inner = component.get_endpoint("body-bytes-fail").unwrap();
844        let mut producer = endpoint.create_producer(&ctx).unwrap();
845        let mut msg = Message::new("");
846        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
847        producer.call(Exchange::new(msg)).await.unwrap();
848        inner
849            .await_exchanges(1, std::time::Duration::from_millis(500))
850            .await;
851        inner.exchange(0).assert_body_bytes(b"different");
852    }
853
854    #[tokio::test(flavor = "multi_thread")]
855    async fn assert_header_pass() {
856        let ctx = test_producer_ctx();
857        let component = MockComponent::new();
858        let endpoint = component
859            .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
860            .unwrap();
861        let inner = component.get_endpoint("hdr-pass").unwrap();
862        let mut producer = endpoint.create_producer(&ctx).unwrap();
863        let mut msg = Message::new("body");
864        msg.headers
865            .insert("x-key".to_string(), serde_json::json!("value"));
866        producer.call(Exchange::new(msg)).await.unwrap();
867        inner
868            .await_exchanges(1, std::time::Duration::from_millis(500))
869            .await;
870        inner
871            .exchange(0)
872            .assert_header("x-key", serde_json::json!("value"));
873    }
874
875    #[tokio::test(flavor = "multi_thread")]
876    #[should_panic(expected = "expected header")]
877    async fn assert_header_fail() {
878        let ctx = test_producer_ctx();
879        let component = MockComponent::new();
880        let endpoint = component
881            .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
882            .unwrap();
883        let inner = component.get_endpoint("hdr-fail").unwrap();
884        let mut producer = endpoint.create_producer(&ctx).unwrap();
885        let mut msg = Message::new("body");
886        msg.headers
887            .insert("x-key".to_string(), serde_json::json!("value"));
888        producer.call(Exchange::new(msg)).await.unwrap();
889        inner
890            .await_exchanges(1, std::time::Duration::from_millis(500))
891            .await;
892        inner
893            .exchange(0)
894            .assert_header("x-key", serde_json::json!("other"));
895    }
896
897    #[tokio::test(flavor = "multi_thread")]
898    async fn assert_header_exists_pass() {
899        let ctx = test_producer_ctx();
900        let component = MockComponent::new();
901        let endpoint = component
902            .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
903            .unwrap();
904        let inner = component.get_endpoint("hdr-exists-pass").unwrap();
905        let mut producer = endpoint.create_producer(&ctx).unwrap();
906        let mut msg = Message::new("body");
907        msg.headers
908            .insert("x-present".to_string(), serde_json::json!(42));
909        producer.call(Exchange::new(msg)).await.unwrap();
910        inner
911            .await_exchanges(1, std::time::Duration::from_millis(500))
912            .await;
913        inner.exchange(0).assert_header_exists("x-present");
914    }
915
916    #[tokio::test(flavor = "multi_thread")]
917    #[should_panic(expected = "expected header")]
918    async fn assert_header_exists_fail() {
919        let ctx = test_producer_ctx();
920        let component = MockComponent::new();
921        let endpoint = component
922            .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
923            .unwrap();
924        let inner = component.get_endpoint("hdr-exists-fail").unwrap();
925        let mut producer = endpoint.create_producer(&ctx).unwrap();
926        producer
927            .call(Exchange::new(Message::new("body")))
928            .await
929            .unwrap();
930        inner
931            .await_exchanges(1, std::time::Duration::from_millis(500))
932            .await;
933        inner.exchange(0).assert_header_exists("x-missing");
934    }
935
936    #[tokio::test(flavor = "multi_thread")]
937    async fn assert_has_error_pass() {
938        let ctx = test_producer_ctx();
939        let component = MockComponent::new();
940        let endpoint = component
941            .create_endpoint("mock:err-pass", &NoOpComponentContext)
942            .unwrap();
943        let inner = component.get_endpoint("err-pass").unwrap();
944        let mut producer = endpoint.create_producer(&ctx).unwrap();
945        let mut ex = Exchange::new(Message::new("body"));
946        ex.error = Some(camel_component_api::CamelError::ProcessorError(
947            "oops".to_string(),
948        ));
949        producer.call(ex).await.unwrap();
950        inner
951            .await_exchanges(1, std::time::Duration::from_millis(500))
952            .await;
953        inner.exchange(0).assert_has_error();
954    }
955
956    #[tokio::test(flavor = "multi_thread")]
957    #[should_panic(expected = "expected exchange to have an error")]
958    async fn assert_has_error_fail() {
959        let ctx = test_producer_ctx();
960        let component = MockComponent::new();
961        let endpoint = component
962            .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
963            .unwrap();
964        let inner = component.get_endpoint("has-err-fail").unwrap();
965        let mut producer = endpoint.create_producer(&ctx).unwrap();
966        producer
967            .call(Exchange::new(Message::new("body")))
968            .await
969            .unwrap();
970        inner
971            .await_exchanges(1, std::time::Duration::from_millis(500))
972            .await;
973        inner.exchange(0).assert_has_error();
974    }
975
976    #[tokio::test(flavor = "multi_thread")]
977    async fn assert_no_error_pass() {
978        let ctx = test_producer_ctx();
979        let component = MockComponent::new();
980        let endpoint = component
981            .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
982            .unwrap();
983        let inner = component.get_endpoint("no-err-pass").unwrap();
984        let mut producer = endpoint.create_producer(&ctx).unwrap();
985        producer
986            .call(Exchange::new(Message::new("body")))
987            .await
988            .unwrap();
989        inner
990            .await_exchanges(1, std::time::Duration::from_millis(500))
991            .await;
992        inner.exchange(0).assert_no_error();
993    }
994
995    #[tokio::test(flavor = "multi_thread")]
996    #[should_panic(expected = "expected exchange to have no error")]
997    async fn assert_no_error_fail() {
998        let ctx = test_producer_ctx();
999        let component = MockComponent::new();
1000        let endpoint = component
1001            .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1002            .unwrap();
1003        let inner = component.get_endpoint("no-err-fail").unwrap();
1004        let mut producer = endpoint.create_producer(&ctx).unwrap();
1005        let mut ex = Exchange::new(Message::new("body"));
1006        ex.error = Some(camel_component_api::CamelError::ProcessorError(
1007            "oops".to_string(),
1008        ));
1009        producer.call(ex).await.unwrap();
1010        inner
1011            .await_exchanges(1, std::time::Duration::from_millis(500))
1012            .await;
1013        inner.exchange(0).assert_no_error();
1014    }
1015}