// camel_component_mock/lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_api::{BoxProcessor, CamelError, Exchange};
11use camel_component::{Component, Consumer, Endpoint, ProducerContext};
12use camel_endpoint::parse_uri;
13
14// ---------------------------------------------------------------------------
15// MockComponent
16// ---------------------------------------------------------------------------
17
18/// The Mock component is a testing utility that records every exchange it
19/// receives via its producer.  It exposes helpers to inspect and assert on
20/// the recorded exchanges.
21///
22/// URI format: `mock:name`
23///
24/// When `create_endpoint` is called multiple times with the same name, the
25/// returned endpoints share the same received-exchanges storage. This enables
26/// test assertions: create mock, register it, run routes, then inspect via
27/// `component.get_endpoint("name")`.
28#[derive(Clone)]
29pub struct MockComponent {
30    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34    pub fn new() -> Self {
35        Self {
36            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37        }
38    }
39
40    /// Retrieve a previously created endpoint's inner data by name.
41    ///
42    /// This is the primary way to inspect recorded exchanges in tests.
43    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44        let registry = self
45            .registry
46            .lock()
47            .expect("mutex poisoned: another thread panicked while holding this lock");
48        registry.get(name).cloned()
49    }
50}
51
52impl Default for MockComponent {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl Component for MockComponent {
59    fn scheme(&self) -> &str {
60        "mock"
61    }
62
63    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
64        let parts = parse_uri(uri)?;
65        if parts.scheme != "mock" {
66            return Err(CamelError::InvalidUri(format!(
67                "expected scheme 'mock', got '{}'",
68                parts.scheme
69            )));
70        }
71
72        let name = parts.path;
73        let mut registry = self.registry.lock().map_err(|e| {
74            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
75        })?;
76        let inner = registry
77            .entry(name.clone())
78            .or_insert_with(|| {
79                Arc::new(MockEndpointInner {
80                    uri: uri.to_string(),
81                    name,
82                    received: Arc::new(Mutex::new(Vec::new())),
83                    notify: Arc::new(Notify::new()),
84                })
85            })
86            .clone();
87
88        Ok(Box::new(MockEndpoint(inner)))
89    }
90}
91
92// ---------------------------------------------------------------------------
93// MockEndpoint / MockEndpointInner
94// ---------------------------------------------------------------------------
95
96/// A mock endpoint that records all exchanges sent to it.
97///
98/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
99/// `MockEndpoint` instances created with the same name share the same inner
100/// storage.
101pub struct MockEndpoint(Arc<MockEndpointInner>);
102
103/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
104/// instances created with the same name via `MockComponent`.
105///
106/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
107/// recorded exchanges in tests.
108pub struct MockEndpointInner {
109    uri: String,
110    pub name: String,
111    received: Arc<Mutex<Vec<Exchange>>>,
112    notify: Arc<Notify>,
113}
114
115impl MockEndpointInner {
116    /// Return a snapshot of all exchanges received so far.
117    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
118        self.received.lock().await.clone()
119    }
120
121    /// Assert that exactly `expected` exchanges have been received.
122    ///
123    /// # Panics
124    ///
125    /// Panics if the count does not match.
126    pub async fn assert_exchange_count(&self, expected: usize) {
127        let actual = self.received.lock().await.len();
128        assert_eq!(
129            actual, expected,
130            "MockEndpoint expected {expected} exchanges, got {actual}"
131        );
132    }
133
134    /// Wait until at least `count` exchanges have been received, or panic on timeout.
135    ///
136    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
137    /// exchanges are already present.
138    ///
139    /// # Panics
140    ///
141    /// Panics if `timeout` elapses before `count` exchanges arrive.
142    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
143        let deadline = tokio::time::Instant::now() + timeout;
144        loop {
145            {
146                let received = self.received.lock().await;
147                if received.len() >= count {
148                    return;
149                }
150            }
151            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
152            if remaining.is_zero() {
153                // Re-check in case the final exchange arrived between the lock drop
154                // above and entering the select — Notify does not buffer permits.
155                let got = self.received.lock().await.len();
156                if got >= count {
157                    return;
158                }
159                panic!(
160                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
161                    self.name, count, got, timeout
162                );
163            }
164            tokio::select! {
165                _ = self.notify.notified() => {}
166                _ = tokio::time::sleep(remaining) => {}
167            }
168        }
169    }
170
171    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
172    ///
173    /// # Panics
174    ///
175    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
176    /// to ensure the exchange has been received.
177    ///
178    /// Panics if called from a single-threaded tokio runtime. Use
179    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
180    ///
181    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
182    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
183        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
184        if idx >= received.len() {
185            panic!(
186                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
187                self.name,
188                idx,
189                received.len()
190            );
191        }
192        ExchangeAssert {
193            exchange: received[idx].clone(),
194            idx,
195            endpoint_name: self.name.clone(),
196        }
197    }
198}
199
200impl Endpoint for MockEndpoint {
201    fn uri(&self) -> &str {
202        &self.0.uri
203    }
204
205    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
206        Err(CamelError::EndpointCreationFailed(
207            "mock endpoint does not support consumers (it is a sink)".to_string(),
208        ))
209    }
210
211    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
212        Ok(BoxProcessor::new(MockProducer {
213            received: Arc::clone(&self.0.received),
214            notify: Arc::clone(&self.0.notify),
215        }))
216    }
217}
218
219// ---------------------------------------------------------------------------
220// MockProducer
221// ---------------------------------------------------------------------------
222
223/// A producer that simply records each exchange it processes.
224#[derive(Clone)]
225struct MockProducer {
226    received: Arc<Mutex<Vec<Exchange>>>,
227    notify: Arc<Notify>,
228}
229
230impl Service<Exchange> for MockProducer {
231    type Response = Exchange;
232    type Error = CamelError;
233    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
234
235    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236        Poll::Ready(Ok(()))
237    }
238
239    fn call(&mut self, exchange: Exchange) -> Self::Future {
240        let received = Arc::clone(&self.received);
241        let notify = Arc::clone(&self.notify);
242        Box::pin(async move {
243            received.lock().await.push(exchange.clone());
244            notify.notify_waiters();
245            Ok(exchange)
246        })
247    }
248}
249
250// ---------------------------------------------------------------------------
251// ExchangeAssert
252// ---------------------------------------------------------------------------
253
254/// A handle for making synchronous assertions on a recorded exchange.
255///
256/// Obtain one via [`MockEndpointInner::exchange`] after calling
257/// [`MockEndpointInner::await_exchanges`].
258///
259/// All methods panic with descriptive messages on failure, making test output
260/// self-explanatory without additional context.
261pub struct ExchangeAssert {
262    exchange: Exchange,
263    idx: usize,
264    endpoint_name: String,
265}
266
267impl ExchangeAssert {
268    fn location(&self) -> String {
269        format!(
270            "MockEndpoint '{}' exchange[{}]",
271            self.endpoint_name, self.idx
272        )
273    }
274
275    /// Assert that the body is `Body::Text` equal to `expected`.
276    pub fn assert_body_text(self, expected: &str) -> Self {
277        match self.exchange.input.body.as_text() {
278            Some(actual) if actual == expected => {}
279            Some(actual) => panic!(
280                "{}: expected body text {:?}, got {:?}",
281                self.location(),
282                expected,
283                actual
284            ),
285            None => panic!(
286                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
287                self.location(),
288                expected,
289                self.exchange.input.body
290            ),
291        }
292        self
293    }
294
295    /// Assert that the body is `Body::Json` equal to `expected`.
296    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
297        match &self.exchange.input.body {
298            camel_api::Body::Json(actual) if *actual == expected => {}
299            camel_api::Body::Json(actual) => panic!(
300                "{}: expected body JSON {}, got {}",
301                self.location(),
302                expected,
303                actual
304            ),
305            other => panic!(
306                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
307                self.location(),
308                expected,
309                other
310            ),
311        }
312        self
313    }
314
315    /// Assert that the body is `Body::Bytes` equal to `expected`.
316    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
317        match &self.exchange.input.body {
318            camel_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
319            camel_api::Body::Bytes(actual) => panic!(
320                "{}: expected body bytes {:?}, got {:?}",
321                self.location(),
322                expected,
323                actual
324            ),
325            other => panic!(
326                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
327                self.location(),
328                expected,
329                other
330            ),
331        }
332        self
333    }
334
335    /// Assert that header `key` exists and equals `expected`.
336    ///
337    /// # Panics
338    ///
339    /// Panics if the header is missing or its value does not match `expected`.
340    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
341        match self.exchange.input.headers.get(key) {
342            Some(actual) if *actual == expected => {}
343            Some(actual) => panic!(
344                "{}: expected header {:?} = {}, got {}",
345                self.location(),
346                key,
347                expected,
348                actual
349            ),
350            None => panic!(
351                "{}: expected header {:?} = {}, but header is absent",
352                self.location(),
353                key,
354                expected
355            ),
356        }
357        self
358    }
359
360    /// Assert that header `key` is present (any value).
361    ///
362    /// # Panics
363    ///
364    /// Panics if the header key is absent.
365    pub fn assert_header_exists(self, key: &str) -> Self {
366        if !self.exchange.input.headers.contains_key(key) {
367            panic!(
368                "{}: expected header {:?} to be present, but it was absent",
369                self.location(),
370                key
371            );
372        }
373        self
374    }
375
376    /// Assert that the exchange has an error (`exchange.error` is `Some`).
377    ///
378    /// # Panics
379    ///
380    /// Panics if `exchange.error` is `None`.
381    pub fn assert_has_error(self) -> Self {
382        if self.exchange.error.is_none() {
383            panic!(
384                "{}: expected exchange to have an error, but error is None",
385                self.location()
386            );
387        }
388        self
389    }
390
391    /// Assert that the exchange has no error (`exchange.error` is `None`).
392    ///
393    /// # Panics
394    ///
395    /// Panics if `exchange.error` is `Some`.
396    pub fn assert_no_error(self) -> Self {
397        if let Some(ref err) = self.exchange.error {
398            panic!(
399                "{}: expected exchange to have no error, but got: {}",
400                self.location(),
401                err
402            );
403        }
404        self
405    }
406}
407
408// ---------------------------------------------------------------------------
409// Tests
410// ---------------------------------------------------------------------------
411
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{Body, Message};
    use tower::ServiceExt;

    /// Upper bound used when waiting for exchanges that were already sent.
    const WAIT: std::time::Duration = std::time::Duration::from_millis(500);

    // -- fixtures -----------------------------------------------------------

    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Create a fresh component with a single `mock:<name>` endpoint and
    /// return its shared state together with a producer attached to it.
    ///
    /// The component and endpoint can be dropped here: the returned values
    /// hold their own references to the shared storage.
    fn fixture(name: &str) -> (Arc<MockEndpointInner>, BoxProcessor) {
        let component = MockComponent::new();
        let endpoint = component
            .create_endpoint(&format!("mock:{name}"))
            .unwrap();
        let inner = component.get_endpoint(name).unwrap();
        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
        (inner, producer)
    }

    /// Send `exchange` through a fresh `mock:<name>` endpoint, wait until it
    /// has been recorded, and return the endpoint state for assertions.
    async fn record(name: &str, exchange: Exchange) -> Arc<MockEndpointInner> {
        let (inner, mut producer) = fixture(name);
        producer.call(exchange).await.unwrap();
        inner.await_exchanges(1, WAIT).await;
        inner
    }

    /// Exchange with a text body.
    fn text_exchange(text: &'static str) -> Exchange {
        Exchange::new(Message::new(text))
    }

    /// Exchange whose body is replaced by `body`.
    fn body_exchange(body: Body) -> Exchange {
        let mut msg = Message::new("");
        msg.body = body;
        Exchange::new(msg)
    }

    /// Exchange with text body `"body"` and a single header.
    fn header_exchange(key: &str, value: serde_json::Value) -> Exchange {
        let mut msg = Message::new("body");
        msg.headers.insert(key.to_string(), value);
        Exchange::new(msg)
    }

    /// Exchange with text body `"body"` and its `error` field set.
    fn failed_exchange() -> Exchange {
        let mut ex = text_exchange("body");
        ex.error = Some(CamelError::ProcessorError("oops".to_string()));
        ex
    }

    // -- component / endpoint -----------------------------------------------

    #[test]
    fn test_mock_component_scheme() {
        let component = MockComponent::new();
        assert_eq!(component.scheme(), "mock");
    }

    #[test]
    fn test_mock_creates_endpoint() {
        let component = MockComponent::new();
        let endpoint = component.create_endpoint("mock:result");
        assert!(endpoint.is_ok());
    }

    #[test]
    fn test_mock_wrong_scheme() {
        let component = MockComponent::new();
        let result = component.create_endpoint("timer:tick");
        assert!(result.is_err());
    }

    #[test]
    fn test_mock_endpoint_no_consumer() {
        let component = MockComponent::new();
        let endpoint = component.create_endpoint("mock:result").unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_mock_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = MockComponent::new();
        let endpoint = component.create_endpoint("mock:result").unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    // -- producer -----------------------------------------------------------

    #[tokio::test]
    async fn test_mock_producer_records_exchange() {
        let ctx = test_producer_ctx();
        let component = MockComponent::new();
        let endpoint = component.create_endpoint("mock:test").unwrap();

        let mut producer = endpoint.create_producer(&ctx).unwrap();
        producer.call(text_exchange("first")).await.unwrap();
        producer.call(text_exchange("second")).await.unwrap();

        let inner = component.get_endpoint("test").unwrap();
        inner.assert_exchange_count(2).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received[0].input.body.as_text(), Some("first"));
        assert_eq!(received[1].input.body.as_text(), Some("second"));
    }

    #[tokio::test]
    async fn test_mock_producer_passes_through_exchange() {
        let (_inner, producer) = fixture("passthrough");
        let result = producer.oneshot(text_exchange("hello")).await.unwrap();

        // Producer should return the exchange unchanged
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_mock_assert_count_passes() {
        let (inner, mut producer) = fixture("count");

        inner.assert_exchange_count(0).await;

        producer.call(text_exchange("one")).await.unwrap();

        inner.assert_exchange_count(1).await;
    }

    #[tokio::test]
    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
    async fn test_mock_assert_count_fails() {
        let component = MockComponent::new();
        // `get_endpoint` only finds endpoints that were created first.
        let _endpoint = component.create_endpoint("mock:fail").unwrap();
        let inner = component.get_endpoint("fail").unwrap();

        inner.assert_exchange_count(5).await;
    }

    #[tokio::test]
    async fn test_mock_component_shared_registry() {
        let component = MockComponent::new();
        let ep1 = component.create_endpoint("mock:shared").unwrap();
        let ep2 = component.create_endpoint("mock:shared").unwrap();

        // Producing via ep1's producer...
        let ctx = test_producer_ctx();
        let mut p1 = ep1.create_producer(&ctx).unwrap();
        p1.call(text_exchange("from-ep1")).await.unwrap();

        // ...and via ep2's producer...
        let mut p2 = ep2.create_producer(&ctx).unwrap();
        p2.call(text_exchange("from-ep2")).await.unwrap();

        // ...both should be visible via the shared storage
        let inner = component.get_endpoint("shared").unwrap();
        inner.assert_exchange_count(2).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
    }

    // -- await_exchanges ----------------------------------------------------

    #[tokio::test]
    async fn await_exchanges_resolves_immediately() {
        // If exchanges are already present, await_exchanges returns without timeout.
        let (inner, mut producer) = fixture("immediate");

        producer.call(text_exchange("a")).await.unwrap();
        producer.call(text_exchange("b")).await.unwrap();

        // Should return immediately — both exchanges already received.
        inner
            .await_exchanges(2, std::time::Duration::from_millis(100))
            .await;
    }

    #[tokio::test]
    async fn await_exchanges_waits_then_resolves() {
        // await_exchanges unblocks when a producer sends after the call.
        let (inner, mut producer) = fixture("waiter");

        // Spawn producer that sends after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            producer.call(text_exchange("delayed")).await.unwrap();
        });

        // This should block until the spawned task delivers the exchange.
        inner.await_exchanges(1, WAIT).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
    }

    #[tokio::test]
    #[should_panic(expected = "timed out waiting for 5 exchanges")]
    async fn await_exchanges_times_out() {
        let (inner, _producer) = fixture("timeout");

        // Nobody sends — should panic after timeout.
        inner
            .await_exchanges(5, std::time::Duration::from_millis(50))
            .await;
    }

    // -- exchange(idx) ------------------------------------------------------
    // `exchange` blocks in place, so these tests need the multi-thread runtime.

    #[tokio::test(flavor = "multi_thread")]
    async fn exchange_idx_returns_assert() {
        let inner = record("assert-idx", text_exchange("hello")).await;
        // Should not panic — index 0 exists.
        let _assert = inner.exchange(0);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "exchange index 5 out of bounds")]
    async fn exchange_idx_out_of_bounds() {
        let inner = record("oob", text_exchange("only-one")).await;
        // Only 1 exchange, index 5 should panic.
        let _assert = inner.exchange(5);
    }

    // -- body assertions ----------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_text_pass() {
        let inner = record("body-text-pass", text_exchange("hello")).await;
        inner.exchange(0).assert_body_text("hello");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body text")]
    async fn assert_body_text_fail() {
        let inner = record("body-text-fail", text_exchange("hello")).await;
        inner.exchange(0).assert_body_text("world");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_json_pass() {
        let body = Body::Json(serde_json::json!({"key": "value"}));
        let inner = record("body-json-pass", body_exchange(body)).await;
        inner
            .exchange(0)
            .assert_body_json(serde_json::json!({"key": "value"}));
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body JSON")]
    async fn assert_body_json_fail() {
        let body = Body::Json(serde_json::json!({"key": "value"}));
        let inner = record("body-json-fail", body_exchange(body)).await;
        inner
            .exchange(0)
            .assert_body_json(serde_json::json!({"key": "other"}));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_bytes_pass() {
        use bytes::Bytes;
        let body = Body::Bytes(Bytes::from_static(b"binary"));
        let inner = record("body-bytes-pass", body_exchange(body)).await;
        inner.exchange(0).assert_body_bytes(b"binary");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body bytes")]
    async fn assert_body_bytes_fail() {
        use bytes::Bytes;
        let body = Body::Bytes(Bytes::from_static(b"binary"));
        let inner = record("body-bytes-fail", body_exchange(body)).await;
        inner.exchange(0).assert_body_bytes(b"different");
    }

    // -- header assertions --------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_header_pass() {
        let ex = header_exchange("x-key", serde_json::json!("value"));
        let inner = record("hdr-pass", ex).await;
        inner
            .exchange(0)
            .assert_header("x-key", serde_json::json!("value"));
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected header")]
    async fn assert_header_fail() {
        let ex = header_exchange("x-key", serde_json::json!("value"));
        let inner = record("hdr-fail", ex).await;
        inner
            .exchange(0)
            .assert_header("x-key", serde_json::json!("other"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_header_exists_pass() {
        let ex = header_exchange("x-present", serde_json::json!(42));
        let inner = record("hdr-exists-pass", ex).await;
        inner.exchange(0).assert_header_exists("x-present");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected header")]
    async fn assert_header_exists_fail() {
        let inner = record("hdr-exists-fail", text_exchange("body")).await;
        inner.exchange(0).assert_header_exists("x-missing");
    }

    // -- error assertions ---------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_has_error_pass() {
        let inner = record("err-pass", failed_exchange()).await;
        inner.exchange(0).assert_has_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected exchange to have an error")]
    async fn assert_has_error_fail() {
        let inner = record("has-err-fail", text_exchange("body")).await;
        inner.exchange(0).assert_has_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_no_error_pass() {
        let inner = record("no-err-pass", text_exchange("body")).await;
        inner.exchange(0).assert_no_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected exchange to have no error")]
    async fn assert_no_error_fail() {
        let inner = record("no-err-fail", failed_exchange()).await;
        inner.exchange(0).assert_no_error();
    }
}