Skip to main content

camel_component_mock/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_component_api::parse_uri;
11use camel_component_api::{BoxProcessor, CamelError, Exchange};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14// ---------------------------------------------------------------------------
15// MockComponent
16// ---------------------------------------------------------------------------
17
18/// The Mock component is a testing utility that records every exchange it
19/// receives via its producer.  It exposes helpers to inspect and assert on
20/// the recorded exchanges.
21///
22/// URI format: `mock:name`
23///
24/// When `create_endpoint` is called multiple times with the same name, the
25/// returned endpoints share the same received-exchanges storage. This enables
26/// test assertions: create mock, register it, run routes, then inspect via
27/// `component.get_endpoint("name")`.
28#[derive(Clone)]
29pub struct MockComponent {
30    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34    pub fn new() -> Self {
35        Self {
36            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37        }
38    }
39
40    /// Retrieve a previously created endpoint's inner data by name.
41    ///
42    /// This is the primary way to inspect recorded exchanges in tests.
43    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44        let registry = self
45            .registry
46            .lock()
47            .expect("mutex poisoned: another thread panicked while holding this lock");
48        registry.get(name).cloned()
49    }
50}
51
52impl Default for MockComponent {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl Component for MockComponent {
59    fn scheme(&self) -> &str {
60        "mock"
61    }
62
63    fn create_endpoint(
64        &self,
65        uri: &str,
66        _ctx: &dyn camel_component_api::ComponentContext,
67    ) -> Result<Box<dyn Endpoint>, CamelError> {
68        let parts = parse_uri(uri)?;
69        if parts.scheme != "mock" {
70            return Err(CamelError::InvalidUri(format!(
71                "expected scheme 'mock', got '{}'",
72                parts.scheme
73            )));
74        }
75
76        let name = parts.path;
77        let mut registry = self.registry.lock().map_err(|e| {
78            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
79        })?;
80        let inner = registry
81            .entry(name.clone())
82            .or_insert_with(|| {
83                Arc::new(MockEndpointInner {
84                    uri: uri.to_string(),
85                    name,
86                    received: Arc::new(Mutex::new(Vec::new())),
87                    notify: Arc::new(Notify::new()),
88                })
89            })
90            .clone();
91
92        Ok(Box::new(MockEndpoint(inner)))
93    }
94}
95
96// ---------------------------------------------------------------------------
97// MockEndpoint / MockEndpointInner
98// ---------------------------------------------------------------------------
99
100/// A mock endpoint that records all exchanges sent to it.
101///
102/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
103/// `MockEndpoint` instances created with the same name share the same inner
104/// storage.
105pub struct MockEndpoint(Arc<MockEndpointInner>);
106
107/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
108/// instances created with the same name via `MockComponent`.
109///
110/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
111/// recorded exchanges in tests.
112pub struct MockEndpointInner {
113    uri: String,
114    pub name: String,
115    received: Arc<Mutex<Vec<Exchange>>>,
116    notify: Arc<Notify>,
117}
118
119impl MockEndpointInner {
120    /// Return a snapshot of all exchanges received so far.
121    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
122        self.received.lock().await.clone()
123    }
124
125    /// Assert that exactly `expected` exchanges have been received.
126    ///
127    /// # Panics
128    ///
129    /// Panics if the count does not match.
130    pub async fn assert_exchange_count(&self, expected: usize) {
131        let actual = self.received.lock().await.len();
132        assert_eq!(
133            actual, expected,
134            "MockEndpoint expected {expected} exchanges, got {actual}"
135        );
136    }
137
138    /// Wait until at least `count` exchanges have been received, or panic on timeout.
139    ///
140    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
141    /// exchanges are already present.
142    ///
143    /// # Panics
144    ///
145    /// Panics if `timeout` elapses before `count` exchanges arrive.
146    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
147        let deadline = tokio::time::Instant::now() + timeout;
148        loop {
149            {
150                let received = self.received.lock().await;
151                if received.len() >= count {
152                    return;
153                }
154            }
155            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
156            if remaining.is_zero() {
157                // Re-check in case the final exchange arrived between the lock drop
158                // above and entering the select — Notify does not buffer permits.
159                let got = self.received.lock().await.len();
160                if got >= count {
161                    return;
162                }
163                panic!(
164                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
165                    self.name, count, got, timeout
166                );
167            }
168            tokio::select! {
169                _ = self.notify.notified() => {}
170                _ = tokio::time::sleep(remaining) => {}
171            }
172        }
173    }
174
175    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
176    ///
177    /// # Panics
178    ///
179    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
180    /// to ensure the exchange has been received.
181    ///
182    /// Panics if called from a single-threaded tokio runtime. Use
183    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
184    ///
185    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
186    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
187        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
188        if idx >= received.len() {
189            panic!(
190                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
191                self.name,
192                idx,
193                received.len()
194            );
195        }
196        ExchangeAssert {
197            exchange: received[idx].clone(),
198            idx,
199            endpoint_name: self.name.clone(),
200        }
201    }
202}
203
204impl Endpoint for MockEndpoint {
205    fn uri(&self) -> &str {
206        &self.0.uri
207    }
208
209    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
210        Err(CamelError::EndpointCreationFailed(
211            "mock endpoint does not support consumers (it is a sink)".to_string(),
212        ))
213    }
214
215    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
216        Ok(BoxProcessor::new(MockProducer {
217            received: Arc::clone(&self.0.received),
218            notify: Arc::clone(&self.0.notify),
219        }))
220    }
221}
222
223// ---------------------------------------------------------------------------
224// MockProducer
225// ---------------------------------------------------------------------------
226
227/// A producer that simply records each exchange it processes.
228#[derive(Clone)]
229struct MockProducer {
230    received: Arc<Mutex<Vec<Exchange>>>,
231    notify: Arc<Notify>,
232}
233
234impl Service<Exchange> for MockProducer {
235    type Response = Exchange;
236    type Error = CamelError;
237    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
238
239    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240        Poll::Ready(Ok(()))
241    }
242
243    fn call(&mut self, exchange: Exchange) -> Self::Future {
244        let received = Arc::clone(&self.received);
245        let notify = Arc::clone(&self.notify);
246        Box::pin(async move {
247            received.lock().await.push(exchange.clone());
248            notify.notify_waiters();
249            Ok(exchange)
250        })
251    }
252}
253
254// ---------------------------------------------------------------------------
255// ExchangeAssert
256// ---------------------------------------------------------------------------
257
258/// A handle for making synchronous assertions on a recorded exchange.
259///
260/// Obtain one via [`MockEndpointInner::exchange`] after calling
261/// [`MockEndpointInner::await_exchanges`].
262///
263/// All methods panic with descriptive messages on failure, making test output
264/// self-explanatory without additional context.
265pub struct ExchangeAssert {
266    exchange: Exchange,
267    idx: usize,
268    endpoint_name: String,
269}
270
271impl ExchangeAssert {
272    fn location(&self) -> String {
273        format!(
274            "MockEndpoint '{}' exchange[{}]",
275            self.endpoint_name, self.idx
276        )
277    }
278
279    /// Assert that the body is `Body::Text` equal to `expected`.
280    pub fn assert_body_text(self, expected: &str) -> Self {
281        match self.exchange.input.body.as_text() {
282            Some(actual) if actual == expected => {}
283            Some(actual) => panic!(
284                "{}: expected body text {:?}, got {:?}",
285                self.location(),
286                expected,
287                actual
288            ),
289            None => panic!(
290                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
291                self.location(),
292                expected,
293                self.exchange.input.body
294            ),
295        }
296        self
297    }
298
299    /// Assert that the body is `Body::Json` equal to `expected`.
300    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
301        match &self.exchange.input.body {
302            camel_component_api::Body::Json(actual) if *actual == expected => {}
303            camel_component_api::Body::Json(actual) => panic!(
304                "{}: expected body JSON {}, got {}",
305                self.location(),
306                expected,
307                actual
308            ),
309            other => panic!(
310                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
311                self.location(),
312                expected,
313                other
314            ),
315        }
316        self
317    }
318
319    /// Assert that the body is `Body::Bytes` equal to `expected`.
320    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
321        match &self.exchange.input.body {
322            camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
323            camel_component_api::Body::Bytes(actual) => panic!(
324                "{}: expected body bytes {:?}, got {:?}",
325                self.location(),
326                expected,
327                actual
328            ),
329            other => panic!(
330                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
331                self.location(),
332                expected,
333                other
334            ),
335        }
336        self
337    }
338
339    /// Assert that header `key` exists and equals `expected`.
340    ///
341    /// # Panics
342    ///
343    /// Panics if the header is missing or its value does not match `expected`.
344    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
345        match self.exchange.input.headers.get(key) {
346            Some(actual) if *actual == expected => {}
347            Some(actual) => panic!(
348                "{}: expected header {:?} = {}, got {}",
349                self.location(),
350                key,
351                expected,
352                actual
353            ),
354            None => panic!(
355                "{}: expected header {:?} = {}, but header is absent",
356                self.location(),
357                key,
358                expected
359            ),
360        }
361        self
362    }
363
364    /// Assert that header `key` is present (any value).
365    ///
366    /// # Panics
367    ///
368    /// Panics if the header key is absent.
369    pub fn assert_header_exists(self, key: &str) -> Self {
370        if !self.exchange.input.headers.contains_key(key) {
371            panic!(
372                "{}: expected header {:?} to be present, but it was absent",
373                self.location(),
374                key
375            );
376        }
377        self
378    }
379
380    /// Assert that the exchange has an error (`exchange.error` is `Some`).
381    ///
382    /// # Panics
383    ///
384    /// Panics if `exchange.error` is `None`.
385    pub fn assert_has_error(self) -> Self {
386        if self.exchange.error.is_none() {
387            panic!(
388                "{}: expected exchange to have an error, but error is None",
389                self.location()
390            );
391        }
392        self
393    }
394
395    /// Assert that the exchange has no error (`exchange.error` is `None`).
396    ///
397    /// # Panics
398    ///
399    /// Panics if `exchange.error` is `Some`.
400    pub fn assert_no_error(self) -> Self {
401        if let Some(ref err) = self.exchange.error {
402            panic!(
403                "{}: expected exchange to have no error, but got: {}",
404                self.location(),
405                err
406            );
407        }
408        self
409    }
410}
411
412// ---------------------------------------------------------------------------
413// Tests
414// ---------------------------------------------------------------------------
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use camel_component_api::Message;
420    use camel_component_api::NoOpComponentContext;
421    use tower::ServiceExt;
422
423    fn test_producer_ctx() -> ProducerContext {
424        ProducerContext::new()
425    }
426
427    #[test]
428    fn test_mock_component_scheme() {
429        let component = MockComponent::new();
430        assert_eq!(component.scheme(), "mock");
431    }
432
433    #[test]
434    fn test_mock_creates_endpoint() {
435        let component = MockComponent::new();
436        let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
437        assert!(endpoint.is_ok());
438    }
439
440    #[test]
441    fn test_mock_wrong_scheme() {
442        let component = MockComponent::new();
443        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
444        assert!(result.is_err());
445    }
446
447    #[test]
448    fn test_mock_endpoint_no_consumer() {
449        let component = MockComponent::new();
450        let endpoint = component
451            .create_endpoint("mock:result", &NoOpComponentContext)
452            .unwrap();
453        assert!(endpoint.create_consumer().is_err());
454    }
455
456    #[test]
457    fn test_mock_endpoint_creates_producer() {
458        let ctx = test_producer_ctx();
459        let component = MockComponent::new();
460        let endpoint = component
461            .create_endpoint("mock:result", &NoOpComponentContext)
462            .unwrap();
463        assert!(endpoint.create_producer(&ctx).is_ok());
464    }
465
466    #[tokio::test]
467    async fn test_mock_producer_records_exchange() {
468        let ctx = test_producer_ctx();
469        let component = MockComponent::new();
470        let endpoint = component
471            .create_endpoint("mock:test", &NoOpComponentContext)
472            .unwrap();
473
474        let mut producer = endpoint.create_producer(&ctx).unwrap();
475
476        let ex1 = Exchange::new(Message::new("first"));
477        let ex2 = Exchange::new(Message::new("second"));
478
479        producer.call(ex1).await.unwrap();
480        producer.call(ex2).await.unwrap();
481
482        let inner = component.get_endpoint("test").unwrap();
483        inner.assert_exchange_count(2).await;
484
485        let received = inner.get_received_exchanges().await;
486        assert_eq!(received[0].input.body.as_text(), Some("first"));
487        assert_eq!(received[1].input.body.as_text(), Some("second"));
488    }
489
490    #[tokio::test]
491    async fn test_mock_producer_passes_through_exchange() {
492        let ctx = test_producer_ctx();
493        let component = MockComponent::new();
494        let endpoint = component
495            .create_endpoint("mock:passthrough", &NoOpComponentContext)
496            .unwrap();
497
498        let producer = endpoint.create_producer(&ctx).unwrap();
499        let exchange = Exchange::new(Message::new("hello"));
500        let result = producer.oneshot(exchange).await.unwrap();
501
502        // Producer should return the exchange unchanged
503        assert_eq!(result.input.body.as_text(), Some("hello"));
504    }
505
506    #[tokio::test]
507    async fn test_mock_assert_count_passes() {
508        let component = MockComponent::new();
509        let endpoint = component
510            .create_endpoint("mock:count", &NoOpComponentContext)
511            .unwrap();
512        let inner = component.get_endpoint("count").unwrap();
513
514        inner.assert_exchange_count(0).await;
515
516        let ctx = test_producer_ctx();
517        let mut producer = endpoint.create_producer(&ctx).unwrap();
518        producer
519            .call(Exchange::new(Message::new("one")))
520            .await
521            .unwrap();
522
523        inner.assert_exchange_count(1).await;
524    }
525
526    #[tokio::test]
527    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
528    async fn test_mock_assert_count_fails() {
529        let component = MockComponent::new();
530        // Endpoint not created yet, so get_endpoint returns None.
531        // Create it first, then assert.
532        let _endpoint = component
533            .create_endpoint("mock:fail", &NoOpComponentContext)
534            .unwrap();
535        let inner = component.get_endpoint("fail").unwrap();
536
537        inner.assert_exchange_count(5).await;
538    }
539
540    #[tokio::test]
541    async fn test_mock_component_shared_registry() {
542        let component = MockComponent::new();
543        let ep1 = component
544            .create_endpoint("mock:shared", &NoOpComponentContext)
545            .unwrap();
546        let ep2 = component
547            .create_endpoint("mock:shared", &NoOpComponentContext)
548            .unwrap();
549
550        // Producing via ep1's producer...
551        let ctx = test_producer_ctx();
552        let mut p1 = ep1.create_producer(&ctx).unwrap();
553        p1.call(Exchange::new(Message::new("from-ep1")))
554            .await
555            .unwrap();
556
557        // ...and via ep2's producer...
558        let mut p2 = ep2.create_producer(&ctx).unwrap();
559        p2.call(Exchange::new(Message::new("from-ep2")))
560            .await
561            .unwrap();
562
563        // ...both should be visible via the shared storage
564        let inner = component.get_endpoint("shared").unwrap();
565        inner.assert_exchange_count(2).await;
566
567        let received = inner.get_received_exchanges().await;
568        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
569        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
570    }
571
572    #[tokio::test]
573    async fn await_exchanges_resolves_immediately() {
574        // If exchanges are already present, await_exchanges returns without timeout.
575        let ctx = test_producer_ctx();
576        let component = MockComponent::new();
577        let endpoint = component
578            .create_endpoint("mock:immediate", &NoOpComponentContext)
579            .unwrap();
580        let inner = component.get_endpoint("immediate").unwrap();
581
582        let mut producer = endpoint.create_producer(&ctx).unwrap();
583        producer
584            .call(Exchange::new(Message::new("a")))
585            .await
586            .unwrap();
587        producer
588            .call(Exchange::new(Message::new("b")))
589            .await
590            .unwrap();
591
592        // Should return immediately — both exchanges already received.
593        inner
594            .await_exchanges(2, std::time::Duration::from_millis(100))
595            .await;
596    }
597
598    #[tokio::test]
599    async fn await_exchanges_waits_then_resolves() {
600        // await_exchanges unblocks when a producer sends after the call.
601        let ctx = test_producer_ctx();
602        let component = MockComponent::new();
603        let endpoint = component
604            .create_endpoint("mock:waiter", &NoOpComponentContext)
605            .unwrap();
606        let inner = component.get_endpoint("waiter").unwrap();
607
608        // Spawn producer that sends after a short delay.
609        let mut producer = endpoint.create_producer(&ctx).unwrap();
610        tokio::spawn(async move {
611            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
612            producer
613                .call(Exchange::new(Message::new("delayed")))
614                .await
615                .unwrap();
616        });
617
618        // This should block until the spawned task delivers the exchange.
619        inner
620            .await_exchanges(1, std::time::Duration::from_millis(500))
621            .await;
622
623        let received = inner.get_received_exchanges().await;
624        assert_eq!(received.len(), 1);
625        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
626    }
627
628    #[tokio::test]
629    #[should_panic(expected = "timed out waiting for 5 exchanges")]
630    async fn await_exchanges_times_out() {
631        let component = MockComponent::new();
632        let _endpoint = component
633            .create_endpoint("mock:timeout", &NoOpComponentContext)
634            .unwrap();
635        let inner = component.get_endpoint("timeout").unwrap();
636
637        // Nobody sends — should panic after timeout.
638        inner
639            .await_exchanges(5, std::time::Duration::from_millis(50))
640            .await;
641    }
642
643    #[tokio::test(flavor = "multi_thread")]
644    async fn exchange_idx_returns_assert() {
645        let ctx = test_producer_ctx();
646        let component = MockComponent::new();
647        let endpoint = component
648            .create_endpoint("mock:assert-idx", &NoOpComponentContext)
649            .unwrap();
650        let inner = component.get_endpoint("assert-idx").unwrap();
651
652        let mut producer = endpoint.create_producer(&ctx).unwrap();
653        producer
654            .call(Exchange::new(Message::new("hello")))
655            .await
656            .unwrap();
657
658        inner
659            .await_exchanges(1, std::time::Duration::from_millis(500))
660            .await;
661        // Should not panic — index 0 exists.
662        let _assert = inner.exchange(0);
663    }
664
665    #[tokio::test(flavor = "multi_thread")]
666    #[should_panic(expected = "exchange index 5 out of bounds")]
667    async fn exchange_idx_out_of_bounds() {
668        let ctx = test_producer_ctx();
669        let component = MockComponent::new();
670        let endpoint = component
671            .create_endpoint("mock:oob", &NoOpComponentContext)
672            .unwrap();
673        let inner = component.get_endpoint("oob").unwrap();
674
675        let mut producer = endpoint.create_producer(&ctx).unwrap();
676        producer
677            .call(Exchange::new(Message::new("only-one")))
678            .await
679            .unwrap();
680
681        inner
682            .await_exchanges(1, std::time::Duration::from_millis(500))
683            .await;
684        // Only 1 exchange, index 5 should panic.
685        let _assert = inner.exchange(5);
686    }
687
688    #[tokio::test(flavor = "multi_thread")]
689    async fn assert_body_text_pass() {
690        let ctx = test_producer_ctx();
691        let component = MockComponent::new();
692        let endpoint = component
693            .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
694            .unwrap();
695        let inner = component.get_endpoint("body-text-pass").unwrap();
696        let mut producer = endpoint.create_producer(&ctx).unwrap();
697        producer
698            .call(Exchange::new(Message::new("hello")))
699            .await
700            .unwrap();
701        inner
702            .await_exchanges(1, std::time::Duration::from_millis(500))
703            .await;
704        inner.exchange(0).assert_body_text("hello");
705    }
706
707    #[tokio::test(flavor = "multi_thread")]
708    #[should_panic(expected = "expected body text")]
709    async fn assert_body_text_fail() {
710        let ctx = test_producer_ctx();
711        let component = MockComponent::new();
712        let endpoint = component
713            .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
714            .unwrap();
715        let inner = component.get_endpoint("body-text-fail").unwrap();
716        let mut producer = endpoint.create_producer(&ctx).unwrap();
717        producer
718            .call(Exchange::new(Message::new("hello")))
719            .await
720            .unwrap();
721        inner
722            .await_exchanges(1, std::time::Duration::from_millis(500))
723            .await;
724        inner.exchange(0).assert_body_text("world");
725    }
726
727    #[tokio::test(flavor = "multi_thread")]
728    async fn assert_body_json_pass() {
729        use camel_component_api::Body;
730        let ctx = test_producer_ctx();
731        let component = MockComponent::new();
732        let endpoint = component
733            .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
734            .unwrap();
735        let inner = component.get_endpoint("body-json-pass").unwrap();
736        let mut producer = endpoint.create_producer(&ctx).unwrap();
737        let mut msg = Message::new("");
738        msg.body = Body::Json(serde_json::json!({"key": "value"}));
739        producer.call(Exchange::new(msg)).await.unwrap();
740        inner
741            .await_exchanges(1, std::time::Duration::from_millis(500))
742            .await;
743        inner
744            .exchange(0)
745            .assert_body_json(serde_json::json!({"key": "value"}));
746    }
747
748    #[tokio::test(flavor = "multi_thread")]
749    #[should_panic(expected = "expected body JSON")]
750    async fn assert_body_json_fail() {
751        use camel_component_api::Body;
752        let ctx = test_producer_ctx();
753        let component = MockComponent::new();
754        let endpoint = component
755            .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
756            .unwrap();
757        let inner = component.get_endpoint("body-json-fail").unwrap();
758        let mut producer = endpoint.create_producer(&ctx).unwrap();
759        let mut msg = Message::new("");
760        msg.body = Body::Json(serde_json::json!({"key": "value"}));
761        producer.call(Exchange::new(msg)).await.unwrap();
762        inner
763            .await_exchanges(1, std::time::Duration::from_millis(500))
764            .await;
765        inner
766            .exchange(0)
767            .assert_body_json(serde_json::json!({"key": "other"}));
768    }
769
770    #[tokio::test(flavor = "multi_thread")]
771    async fn assert_body_bytes_pass() {
772        use bytes::Bytes;
773        use camel_component_api::Body;
774        let ctx = test_producer_ctx();
775        let component = MockComponent::new();
776        let endpoint = component
777            .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
778            .unwrap();
779        let inner = component.get_endpoint("body-bytes-pass").unwrap();
780        let mut producer = endpoint.create_producer(&ctx).unwrap();
781        let mut msg = Message::new("");
782        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
783        producer.call(Exchange::new(msg)).await.unwrap();
784        inner
785            .await_exchanges(1, std::time::Duration::from_millis(500))
786            .await;
787        inner.exchange(0).assert_body_bytes(b"binary");
788    }
789
790    #[tokio::test(flavor = "multi_thread")]
791    #[should_panic(expected = "expected body bytes")]
792    async fn assert_body_bytes_fail() {
793        use bytes::Bytes;
794        use camel_component_api::Body;
795        let ctx = test_producer_ctx();
796        let component = MockComponent::new();
797        let endpoint = component
798            .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
799            .unwrap();
800        let inner = component.get_endpoint("body-bytes-fail").unwrap();
801        let mut producer = endpoint.create_producer(&ctx).unwrap();
802        let mut msg = Message::new("");
803        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
804        producer.call(Exchange::new(msg)).await.unwrap();
805        inner
806            .await_exchanges(1, std::time::Duration::from_millis(500))
807            .await;
808        inner.exchange(0).assert_body_bytes(b"different");
809    }
810
811    #[tokio::test(flavor = "multi_thread")]
812    async fn assert_header_pass() {
813        let ctx = test_producer_ctx();
814        let component = MockComponent::new();
815        let endpoint = component
816            .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
817            .unwrap();
818        let inner = component.get_endpoint("hdr-pass").unwrap();
819        let mut producer = endpoint.create_producer(&ctx).unwrap();
820        let mut msg = Message::new("body");
821        msg.headers
822            .insert("x-key".to_string(), serde_json::json!("value"));
823        producer.call(Exchange::new(msg)).await.unwrap();
824        inner
825            .await_exchanges(1, std::time::Duration::from_millis(500))
826            .await;
827        inner
828            .exchange(0)
829            .assert_header("x-key", serde_json::json!("value"));
830    }
831
832    #[tokio::test(flavor = "multi_thread")]
833    #[should_panic(expected = "expected header")]
834    async fn assert_header_fail() {
835        let ctx = test_producer_ctx();
836        let component = MockComponent::new();
837        let endpoint = component
838            .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
839            .unwrap();
840        let inner = component.get_endpoint("hdr-fail").unwrap();
841        let mut producer = endpoint.create_producer(&ctx).unwrap();
842        let mut msg = Message::new("body");
843        msg.headers
844            .insert("x-key".to_string(), serde_json::json!("value"));
845        producer.call(Exchange::new(msg)).await.unwrap();
846        inner
847            .await_exchanges(1, std::time::Duration::from_millis(500))
848            .await;
849        inner
850            .exchange(0)
851            .assert_header("x-key", serde_json::json!("other"));
852    }
853
854    #[tokio::test(flavor = "multi_thread")]
855    async fn assert_header_exists_pass() {
856        let ctx = test_producer_ctx();
857        let component = MockComponent::new();
858        let endpoint = component
859            .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
860            .unwrap();
861        let inner = component.get_endpoint("hdr-exists-pass").unwrap();
862        let mut producer = endpoint.create_producer(&ctx).unwrap();
863        let mut msg = Message::new("body");
864        msg.headers
865            .insert("x-present".to_string(), serde_json::json!(42));
866        producer.call(Exchange::new(msg)).await.unwrap();
867        inner
868            .await_exchanges(1, std::time::Duration::from_millis(500))
869            .await;
870        inner.exchange(0).assert_header_exists("x-present");
871    }
872
873    #[tokio::test(flavor = "multi_thread")]
874    #[should_panic(expected = "expected header")]
875    async fn assert_header_exists_fail() {
876        let ctx = test_producer_ctx();
877        let component = MockComponent::new();
878        let endpoint = component
879            .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
880            .unwrap();
881        let inner = component.get_endpoint("hdr-exists-fail").unwrap();
882        let mut producer = endpoint.create_producer(&ctx).unwrap();
883        producer
884            .call(Exchange::new(Message::new("body")))
885            .await
886            .unwrap();
887        inner
888            .await_exchanges(1, std::time::Duration::from_millis(500))
889            .await;
890        inner.exchange(0).assert_header_exists("x-missing");
891    }
892
893    #[tokio::test(flavor = "multi_thread")]
894    async fn assert_has_error_pass() {
895        let ctx = test_producer_ctx();
896        let component = MockComponent::new();
897        let endpoint = component
898            .create_endpoint("mock:err-pass", &NoOpComponentContext)
899            .unwrap();
900        let inner = component.get_endpoint("err-pass").unwrap();
901        let mut producer = endpoint.create_producer(&ctx).unwrap();
902        let mut ex = Exchange::new(Message::new("body"));
903        ex.error = Some(camel_component_api::CamelError::ProcessorError(
904            "oops".to_string(),
905        ));
906        producer.call(ex).await.unwrap();
907        inner
908            .await_exchanges(1, std::time::Duration::from_millis(500))
909            .await;
910        inner.exchange(0).assert_has_error();
911    }
912
913    #[tokio::test(flavor = "multi_thread")]
914    #[should_panic(expected = "expected exchange to have an error")]
915    async fn assert_has_error_fail() {
916        let ctx = test_producer_ctx();
917        let component = MockComponent::new();
918        let endpoint = component
919            .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
920            .unwrap();
921        let inner = component.get_endpoint("has-err-fail").unwrap();
922        let mut producer = endpoint.create_producer(&ctx).unwrap();
923        producer
924            .call(Exchange::new(Message::new("body")))
925            .await
926            .unwrap();
927        inner
928            .await_exchanges(1, std::time::Duration::from_millis(500))
929            .await;
930        inner.exchange(0).assert_has_error();
931    }
932
933    #[tokio::test(flavor = "multi_thread")]
934    async fn assert_no_error_pass() {
935        let ctx = test_producer_ctx();
936        let component = MockComponent::new();
937        let endpoint = component
938            .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
939            .unwrap();
940        let inner = component.get_endpoint("no-err-pass").unwrap();
941        let mut producer = endpoint.create_producer(&ctx).unwrap();
942        producer
943            .call(Exchange::new(Message::new("body")))
944            .await
945            .unwrap();
946        inner
947            .await_exchanges(1, std::time::Duration::from_millis(500))
948            .await;
949        inner.exchange(0).assert_no_error();
950    }
951
952    #[tokio::test(flavor = "multi_thread")]
953    #[should_panic(expected = "expected exchange to have no error")]
954    async fn assert_no_error_fail() {
955        let ctx = test_producer_ctx();
956        let component = MockComponent::new();
957        let endpoint = component
958            .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
959            .unwrap();
960        let inner = component.get_endpoint("no-err-fail").unwrap();
961        let mut producer = endpoint.create_producer(&ctx).unwrap();
962        let mut ex = Exchange::new(Message::new("body"));
963        ex.error = Some(camel_component_api::CamelError::ProcessorError(
964            "oops".to_string(),
965        ));
966        producer.call(ex).await.unwrap();
967        inner
968            .await_exchanges(1, std::time::Duration::from_millis(500))
969            .await;
970        inner.exchange(0).assert_no_error();
971    }
972}