Skip to main content

camel_component_mock/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_component_api::parse_uri;
11use camel_component_api::{BoxProcessor, CamelError, Exchange};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14// ---------------------------------------------------------------------------
15// MockComponent
16// ---------------------------------------------------------------------------
17
/// The Mock component is a testing utility that records every exchange it
/// receives via its producer.  It exposes helpers to inspect and assert on
/// the recorded exchanges.
///
/// URI format: `mock:name`
///
/// When `create_endpoint` is called multiple times with the same name, the
/// returned endpoints share the same received-exchanges storage. This enables
/// test assertions: create mock, register it, run routes, then inspect via
/// `component.get_endpoint("name")`.
///
/// The registry lives behind an `Arc`, so cloning the component yields another
/// handle to the same set of endpoints rather than an independent copy.
#[derive(Clone)]
pub struct MockComponent {
    /// Shared endpoint state, keyed by the path portion of the `mock:` URI.
    /// Guarded by a synchronous `std::sync::Mutex` because it is locked from
    /// the non-async `create_endpoint` and `get_endpoint` methods.
    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
}
32
33impl MockComponent {
34    pub fn new() -> Self {
35        Self {
36            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37        }
38    }
39
40    /// Retrieve a previously created endpoint's inner data by name.
41    ///
42    /// This is the primary way to inspect recorded exchanges in tests.
43    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44        let registry = self
45            .registry
46            .lock()
47            .expect("mutex poisoned: another thread panicked while holding this lock");
48        registry.get(name).cloned()
49    }
50}
51
52impl Default for MockComponent {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl Component for MockComponent {
59    fn scheme(&self) -> &str {
60        "mock"
61    }
62
63    fn create_endpoint(
64        &self,
65        uri: &str,
66        _ctx: &dyn camel_component_api::ComponentContext,
67    ) -> Result<Box<dyn Endpoint>, CamelError> {
68        let parts = parse_uri(uri)?;
69        if parts.scheme != "mock" {
70            return Err(CamelError::InvalidUri(format!(
71                "expected scheme 'mock', got '{}'",
72                parts.scheme
73            )));
74        }
75
76        let name = parts.path;
77        let mut registry = self.registry.lock().map_err(|e| {
78            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
79        })?;
80        let inner = registry
81            .entry(name.clone())
82            .or_insert_with(|| {
83                Arc::new(MockEndpointInner {
84                    uri: uri.to_string(),
85                    name,
86                    received: Arc::new(Mutex::new(Vec::new())),
87                    notify: Arc::new(Notify::new()),
88                })
89            })
90            .clone();
91
92        Ok(Box::new(MockEndpoint(inner)))
93    }
94}
95
96// ---------------------------------------------------------------------------
97// MockEndpoint / MockEndpointInner
98// ---------------------------------------------------------------------------
99
/// A mock endpoint that records all exchanges sent to it.
///
/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
/// `MockEndpoint` instances created with the same name share the same inner
/// storage.
///
/// The wrapped field is private; tests reach the shared state through
/// `MockComponent::get_endpoint` instead.
pub struct MockEndpoint(Arc<MockEndpointInner>);
106
/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
/// instances created with the same name via `MockComponent`.
///
/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
/// recorded exchanges in tests.
pub struct MockEndpointInner {
    /// The URI string passed to the `create_endpoint` call that first
    /// registered this endpoint.
    uri: String,
    /// The endpoint name (the path portion of the `mock:` URI).
    pub name: String,
    /// Every exchange recorded so far, in the order producers pushed them.
    /// Producers hold clones of this `Arc`.
    received: Arc<Mutex<Vec<Exchange>>>,
    /// Signalled by producers after each push; awaited by `await_exchanges`.
    notify: Arc<Notify>,
}
118
119impl MockEndpointInner {
120    /// Return a snapshot of all exchanges received so far.
121    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
122        self.received.lock().await.clone()
123    }
124
125    /// Assert that exactly `expected` exchanges have been received.
126    ///
127    /// # Panics
128    ///
129    /// Panics if the count does not match.
130    pub async fn assert_exchange_count(&self, expected: usize) {
131        let actual = self.received.lock().await.len();
132        assert_eq!(
133            actual, expected,
134            "MockEndpoint expected {expected} exchanges, got {actual}"
135        );
136    }
137
138    /// Wait until at least `count` exchanges have been received, or panic on timeout.
139    ///
140    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
141    /// exchanges are already present.
142    ///
143    /// # Panics
144    ///
145    /// Panics if `timeout` elapses before `count` exchanges arrive.
146    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
147        let deadline = tokio::time::Instant::now() + timeout;
148        loop {
149            {
150                let received = self.received.lock().await;
151                if received.len() >= count {
152                    return;
153                }
154            }
155            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
156            if remaining.is_zero() {
157                // Re-check in case the final exchange arrived between the lock drop
158                // above and entering the select — Notify does not buffer permits.
159                let got = self.received.lock().await.len();
160                if got >= count {
161                    return;
162                }
163                panic!(
164                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
165                    self.name, count, got, timeout
166                );
167            }
168            tokio::select! {
169                _ = self.notify.notified() => {}
170                _ = tokio::time::sleep(remaining) => {}
171            }
172        }
173    }
174
175    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
176    ///
177    /// # Panics
178    ///
179    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
180    /// to ensure the exchange has been received.
181    ///
182    /// Panics if called from a single-threaded tokio runtime. Use
183    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
184    ///
185    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
186    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
187        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
188        if idx >= received.len() {
189            panic!(
190                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
191                self.name,
192                idx,
193                received.len()
194            );
195        }
196        ExchangeAssert {
197            exchange: received[idx].clone(),
198            idx,
199            endpoint_name: self.name.clone(),
200        }
201    }
202}
203
204impl Endpoint for MockEndpoint {
205    fn uri(&self) -> &str {
206        &self.0.uri
207    }
208
209    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
210        Err(CamelError::EndpointCreationFailed(
211            "mock endpoint does not support consumers (it is a sink)".to_string(),
212        ))
213    }
214
215    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
216        Ok(BoxProcessor::new(MockProducer {
217            received: Arc::clone(&self.0.received),
218            notify: Arc::clone(&self.0.notify),
219        }))
220    }
221}
222
223// ---------------------------------------------------------------------------
224// MockProducer
225// ---------------------------------------------------------------------------
226
/// A producer that simply records each exchange it processes.
///
/// Both fields are `Arc` handles cloned from the owning endpoint's state, so
/// every producer of one endpoint appends to the same vector.
#[derive(Clone)]
struct MockProducer {
    /// Storage that each processed exchange is appended to.
    received: Arc<Mutex<Vec<Exchange>>>,
    /// Signalled after every append to wake tasks in `await_exchanges`.
    notify: Arc<Notify>,
}
233
234impl Service<Exchange> for MockProducer {
235    type Response = Exchange;
236    type Error = CamelError;
237    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
238
239    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240        Poll::Ready(Ok(()))
241    }
242
243    fn call(&mut self, exchange: Exchange) -> Self::Future {
244        let received = Arc::clone(&self.received);
245        let notify = Arc::clone(&self.notify);
246        Box::pin(async move {
247            received.lock().await.push(exchange.clone());
248            notify.notify_waiters();
249            Ok(exchange)
250        })
251    }
252}
253
254// ---------------------------------------------------------------------------
255// ExchangeAssert
256// ---------------------------------------------------------------------------
257
/// A handle for making synchronous assertions on a recorded exchange.
///
/// Obtain one via [`MockEndpointInner::exchange`] after calling
/// [`MockEndpointInner::await_exchanges`].
///
/// All methods panic with descriptive messages on failure, making test output
/// self-explanatory without additional context.
pub struct ExchangeAssert {
    /// Clone of the recorded exchange under inspection.
    exchange: Exchange,
    /// Position of the exchange in the endpoint's received list; used in
    /// failure messages.
    idx: usize,
    /// Name of the endpoint the exchange was recorded on; used in failure
    /// messages.
    endpoint_name: String,
}
270
271impl ExchangeAssert {
272    fn location(&self) -> String {
273        format!(
274            "MockEndpoint '{}' exchange[{}]",
275            self.endpoint_name, self.idx
276        )
277    }
278
279    /// Assert that the body is `Body::Text` equal to `expected`.
280    pub fn assert_body_text(self, expected: &str) -> Self {
281        match self.exchange.input.body.as_text() {
282            Some(actual) if actual == expected => {}
283            Some(actual) => panic!(
284                "{}: expected body text {:?}, got {:?}",
285                self.location(),
286                expected,
287                actual
288            ),
289            None => panic!(
290                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
291                self.location(),
292                expected,
293                self.exchange.input.body
294            ),
295        }
296        self
297    }
298
299    /// Assert that the body is `Body::Json` equal to `expected`.
300    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
301        match &self.exchange.input.body {
302            camel_component_api::Body::Json(actual) if *actual == expected => {}
303            camel_component_api::Body::Json(actual) => panic!(
304                "{}: expected body JSON {}, got {}",
305                self.location(),
306                expected,
307                actual
308            ),
309            other => panic!(
310                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
311                self.location(),
312                expected,
313                other
314            ),
315        }
316        self
317    }
318
319    /// Assert that the body is `Body::Bytes` equal to `expected`.
320    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
321        match &self.exchange.input.body {
322            camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
323            camel_component_api::Body::Bytes(actual) => panic!(
324                "{}: expected body bytes {:?}, got {:?}",
325                self.location(),
326                expected,
327                actual
328            ),
329            other => panic!(
330                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
331                self.location(),
332                expected,
333                other
334            ),
335        }
336        self
337    }
338
339    /// Assert that header `key` exists and equals `expected`.
340    ///
341    /// # Panics
342    ///
343    /// Panics if the header is missing or its value does not match `expected`.
344    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
345        match self.exchange.input.headers.get(key) {
346            Some(actual) if *actual == expected => {}
347            Some(actual) => panic!(
348                "{}: expected header {:?} = {}, got {}",
349                self.location(),
350                key,
351                expected,
352                actual
353            ),
354            None => panic!(
355                "{}: expected header {:?} = {}, but header is absent",
356                self.location(),
357                key,
358                expected
359            ),
360        }
361        self
362    }
363
364    /// Assert that header `key` is present (any value).
365    ///
366    /// # Panics
367    ///
368    /// Panics if the header key is absent.
369    pub fn assert_header_exists(self, key: &str) -> Self {
370        if !self.exchange.input.headers.contains_key(key) {
371            panic!(
372                "{}: expected header {:?} to be present, but it was absent",
373                self.location(),
374                key
375            );
376        }
377        self
378    }
379
380    /// Assert that the exchange has an error (`exchange.error` is `Some`).
381    ///
382    /// # Panics
383    ///
384    /// Panics if `exchange.error` is `None`.
385    pub fn assert_has_error(self) -> Self {
386        if self.exchange.error.is_none() {
387            panic!(
388                "{}: expected exchange to have an error, but error is None",
389                self.location()
390            );
391        }
392        self
393    }
394
395    /// Assert that the exchange has no error (`exchange.error` is `None`).
396    ///
397    /// # Panics
398    ///
399    /// Panics if `exchange.error` is `Some`.
400    pub fn assert_no_error(self) -> Self {
401        if let Some(ref err) = self.exchange.error {
402            panic!(
403                "{}: expected exchange to have no error, but got: {}",
404                self.location(),
405                err
406            );
407        }
408        self
409    }
410}
411
412// ---------------------------------------------------------------------------
413// Tests
414// ---------------------------------------------------------------------------
415
416#[cfg(test)]
417mod tests {
418    use super::*;
419    use camel_component_api::Message;
420    use camel_component_api::NoOpComponentContext;
421    use tower::ServiceExt;
422
    /// Build the `ProducerContext` passed to `create_producer` in these tests.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
426
427    #[test]
428    fn test_mock_component_scheme() {
429        let component = MockComponent::new();
430        assert_eq!(component.scheme(), "mock");
431    }
432
433    #[test]
434    fn test_mock_component_default() {
435        let component = MockComponent::default();
436        assert_eq!(component.scheme(), "mock");
437        assert!(component.get_endpoint("missing").is_none());
438    }
439
440    #[test]
441    fn test_mock_creates_endpoint() {
442        let component = MockComponent::new();
443        let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
444        assert!(endpoint.is_ok());
445    }
446
447    #[test]
448    fn test_mock_wrong_scheme() {
449        let component = MockComponent::new();
450        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
451        assert!(result.is_err());
452    }
453
454    #[test]
455    fn test_mock_endpoint_no_consumer() {
456        let component = MockComponent::new();
457        let endpoint = component
458            .create_endpoint("mock:result", &NoOpComponentContext)
459            .unwrap();
460        assert!(endpoint.create_consumer().is_err());
461    }
462
463    #[test]
464    fn test_mock_endpoint_creates_producer() {
465        let ctx = test_producer_ctx();
466        let component = MockComponent::new();
467        let endpoint = component
468            .create_endpoint("mock:result", &NoOpComponentContext)
469            .unwrap();
470        assert!(endpoint.create_producer(&ctx).is_ok());
471    }
472
473    #[test]
474    fn test_mock_endpoint_uri() {
475        let component = MockComponent::new();
476        let endpoint = component
477            .create_endpoint("mock:uri-check", &NoOpComponentContext)
478            .unwrap();
479        assert_eq!(endpoint.uri(), "mock:uri-check");
480    }
481
482    #[test]
483    fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
484        let component = MockComponent::new();
485        let _ = component
486            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
487            .unwrap();
488        let _ = component
489            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
490            .unwrap();
491
492        let first = component.get_endpoint("shared-inner").unwrap();
493        let second = component.get_endpoint("shared-inner").unwrap();
494        assert!(Arc::ptr_eq(&first, &second));
495    }
496
497    #[tokio::test]
498    async fn test_mock_producer_records_exchange() {
499        let ctx = test_producer_ctx();
500        let component = MockComponent::new();
501        let endpoint = component
502            .create_endpoint("mock:test", &NoOpComponentContext)
503            .unwrap();
504
505        let mut producer = endpoint.create_producer(&ctx).unwrap();
506
507        let ex1 = Exchange::new(Message::new("first"));
508        let ex2 = Exchange::new(Message::new("second"));
509
510        producer.call(ex1).await.unwrap();
511        producer.call(ex2).await.unwrap();
512
513        let inner = component.get_endpoint("test").unwrap();
514        inner.assert_exchange_count(2).await;
515
516        let received = inner.get_received_exchanges().await;
517        assert_eq!(received[0].input.body.as_text(), Some("first"));
518        assert_eq!(received[1].input.body.as_text(), Some("second"));
519    }
520
521    #[tokio::test]
522    async fn test_mock_producer_passes_through_exchange() {
523        let ctx = test_producer_ctx();
524        let component = MockComponent::new();
525        let endpoint = component
526            .create_endpoint("mock:passthrough", &NoOpComponentContext)
527            .unwrap();
528
529        let producer = endpoint.create_producer(&ctx).unwrap();
530        let exchange = Exchange::new(Message::new("hello"));
531        let result = producer.oneshot(exchange).await.unwrap();
532
533        // Producer should return the exchange unchanged
534        assert_eq!(result.input.body.as_text(), Some("hello"));
535    }
536
537    #[tokio::test]
538    async fn test_mock_assert_count_passes() {
539        let component = MockComponent::new();
540        let endpoint = component
541            .create_endpoint("mock:count", &NoOpComponentContext)
542            .unwrap();
543        let inner = component.get_endpoint("count").unwrap();
544
545        inner.assert_exchange_count(0).await;
546
547        let ctx = test_producer_ctx();
548        let mut producer = endpoint.create_producer(&ctx).unwrap();
549        producer
550            .call(Exchange::new(Message::new("one")))
551            .await
552            .unwrap();
553
554        inner.assert_exchange_count(1).await;
555    }
556
557    #[tokio::test]
558    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
559    async fn test_mock_assert_count_fails() {
560        let component = MockComponent::new();
561        // Endpoint not created yet, so get_endpoint returns None.
562        // Create it first, then assert.
563        let _endpoint = component
564            .create_endpoint("mock:fail", &NoOpComponentContext)
565            .unwrap();
566        let inner = component.get_endpoint("fail").unwrap();
567
568        inner.assert_exchange_count(5).await;
569    }
570
571    #[tokio::test]
572    async fn test_mock_component_shared_registry() {
573        let component = MockComponent::new();
574        let ep1 = component
575            .create_endpoint("mock:shared", &NoOpComponentContext)
576            .unwrap();
577        let ep2 = component
578            .create_endpoint("mock:shared", &NoOpComponentContext)
579            .unwrap();
580
581        // Producing via ep1's producer...
582        let ctx = test_producer_ctx();
583        let mut p1 = ep1.create_producer(&ctx).unwrap();
584        p1.call(Exchange::new(Message::new("from-ep1")))
585            .await
586            .unwrap();
587
588        // ...and via ep2's producer...
589        let mut p2 = ep2.create_producer(&ctx).unwrap();
590        p2.call(Exchange::new(Message::new("from-ep2")))
591            .await
592            .unwrap();
593
594        // ...both should be visible via the shared storage
595        let inner = component.get_endpoint("shared").unwrap();
596        inner.assert_exchange_count(2).await;
597
598        let received = inner.get_received_exchanges().await;
599        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
600        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
601    }
602
603    #[tokio::test]
604    async fn await_exchanges_resolves_immediately() {
605        // If exchanges are already present, await_exchanges returns without timeout.
606        let ctx = test_producer_ctx();
607        let component = MockComponent::new();
608        let endpoint = component
609            .create_endpoint("mock:immediate", &NoOpComponentContext)
610            .unwrap();
611        let inner = component.get_endpoint("immediate").unwrap();
612
613        let mut producer = endpoint.create_producer(&ctx).unwrap();
614        producer
615            .call(Exchange::new(Message::new("a")))
616            .await
617            .unwrap();
618        producer
619            .call(Exchange::new(Message::new("b")))
620            .await
621            .unwrap();
622
623        // Should return immediately — both exchanges already received.
624        inner
625            .await_exchanges(2, std::time::Duration::from_millis(100))
626            .await;
627    }
628
629    #[tokio::test]
630    async fn await_exchanges_waits_then_resolves() {
631        // await_exchanges unblocks when a producer sends after the call.
632        let ctx = test_producer_ctx();
633        let component = MockComponent::new();
634        let endpoint = component
635            .create_endpoint("mock:waiter", &NoOpComponentContext)
636            .unwrap();
637        let inner = component.get_endpoint("waiter").unwrap();
638
639        // Spawn producer that sends after a short delay.
640        let mut producer = endpoint.create_producer(&ctx).unwrap();
641        tokio::spawn(async move {
642            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
643            producer
644                .call(Exchange::new(Message::new("delayed")))
645                .await
646                .unwrap();
647        });
648
649        // This should block until the spawned task delivers the exchange.
650        inner
651            .await_exchanges(1, std::time::Duration::from_millis(500))
652            .await;
653
654        let received = inner.get_received_exchanges().await;
655        assert_eq!(received.len(), 1);
656        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
657    }
658
659    #[tokio::test]
660    #[should_panic(expected = "timed out waiting for 5 exchanges")]
661    async fn await_exchanges_times_out() {
662        let component = MockComponent::new();
663        let _endpoint = component
664            .create_endpoint("mock:timeout", &NoOpComponentContext)
665            .unwrap();
666        let inner = component.get_endpoint("timeout").unwrap();
667
668        // Nobody sends — should panic after timeout.
669        inner
670            .await_exchanges(5, std::time::Duration::from_millis(50))
671            .await;
672    }
673
674    #[tokio::test(flavor = "multi_thread")]
675    async fn exchange_idx_returns_assert() {
676        let ctx = test_producer_ctx();
677        let component = MockComponent::new();
678        let endpoint = component
679            .create_endpoint("mock:assert-idx", &NoOpComponentContext)
680            .unwrap();
681        let inner = component.get_endpoint("assert-idx").unwrap();
682
683        let mut producer = endpoint.create_producer(&ctx).unwrap();
684        producer
685            .call(Exchange::new(Message::new("hello")))
686            .await
687            .unwrap();
688
689        inner
690            .await_exchanges(1, std::time::Duration::from_millis(500))
691            .await;
692        // Should not panic — index 0 exists.
693        let _assert = inner.exchange(0);
694    }
695
696    #[tokio::test(flavor = "multi_thread")]
697    #[should_panic(expected = "exchange index 5 out of bounds")]
698    async fn exchange_idx_out_of_bounds() {
699        let ctx = test_producer_ctx();
700        let component = MockComponent::new();
701        let endpoint = component
702            .create_endpoint("mock:oob", &NoOpComponentContext)
703            .unwrap();
704        let inner = component.get_endpoint("oob").unwrap();
705
706        let mut producer = endpoint.create_producer(&ctx).unwrap();
707        producer
708            .call(Exchange::new(Message::new("only-one")))
709            .await
710            .unwrap();
711
712        inner
713            .await_exchanges(1, std::time::Duration::from_millis(500))
714            .await;
715        // Only 1 exchange, index 5 should panic.
716        let _assert = inner.exchange(5);
717    }
718
719    #[tokio::test(flavor = "multi_thread")]
720    async fn assert_body_text_pass() {
721        let ctx = test_producer_ctx();
722        let component = MockComponent::new();
723        let endpoint = component
724            .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
725            .unwrap();
726        let inner = component.get_endpoint("body-text-pass").unwrap();
727        let mut producer = endpoint.create_producer(&ctx).unwrap();
728        producer
729            .call(Exchange::new(Message::new("hello")))
730            .await
731            .unwrap();
732        inner
733            .await_exchanges(1, std::time::Duration::from_millis(500))
734            .await;
735        inner.exchange(0).assert_body_text("hello");
736    }
737
738    #[tokio::test(flavor = "multi_thread")]
739    #[should_panic(expected = "expected body text")]
740    async fn assert_body_text_fail() {
741        let ctx = test_producer_ctx();
742        let component = MockComponent::new();
743        let endpoint = component
744            .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
745            .unwrap();
746        let inner = component.get_endpoint("body-text-fail").unwrap();
747        let mut producer = endpoint.create_producer(&ctx).unwrap();
748        producer
749            .call(Exchange::new(Message::new("hello")))
750            .await
751            .unwrap();
752        inner
753            .await_exchanges(1, std::time::Duration::from_millis(500))
754            .await;
755        inner.exchange(0).assert_body_text("world");
756    }
757
758    #[tokio::test(flavor = "multi_thread")]
759    async fn assert_body_json_pass() {
760        use camel_component_api::Body;
761        let ctx = test_producer_ctx();
762        let component = MockComponent::new();
763        let endpoint = component
764            .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
765            .unwrap();
766        let inner = component.get_endpoint("body-json-pass").unwrap();
767        let mut producer = endpoint.create_producer(&ctx).unwrap();
768        let mut msg = Message::new("");
769        msg.body = Body::Json(serde_json::json!({"key": "value"}));
770        producer.call(Exchange::new(msg)).await.unwrap();
771        inner
772            .await_exchanges(1, std::time::Duration::from_millis(500))
773            .await;
774        inner
775            .exchange(0)
776            .assert_body_json(serde_json::json!({"key": "value"}));
777    }
778
779    #[tokio::test(flavor = "multi_thread")]
780    #[should_panic(expected = "expected body JSON")]
781    async fn assert_body_json_fail() {
782        use camel_component_api::Body;
783        let ctx = test_producer_ctx();
784        let component = MockComponent::new();
785        let endpoint = component
786            .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
787            .unwrap();
788        let inner = component.get_endpoint("body-json-fail").unwrap();
789        let mut producer = endpoint.create_producer(&ctx).unwrap();
790        let mut msg = Message::new("");
791        msg.body = Body::Json(serde_json::json!({"key": "value"}));
792        producer.call(Exchange::new(msg)).await.unwrap();
793        inner
794            .await_exchanges(1, std::time::Duration::from_millis(500))
795            .await;
796        inner
797            .exchange(0)
798            .assert_body_json(serde_json::json!({"key": "other"}));
799    }
800
801    #[tokio::test(flavor = "multi_thread")]
802    async fn assert_body_bytes_pass() {
803        use bytes::Bytes;
804        use camel_component_api::Body;
805        let ctx = test_producer_ctx();
806        let component = MockComponent::new();
807        let endpoint = component
808            .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
809            .unwrap();
810        let inner = component.get_endpoint("body-bytes-pass").unwrap();
811        let mut producer = endpoint.create_producer(&ctx).unwrap();
812        let mut msg = Message::new("");
813        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
814        producer.call(Exchange::new(msg)).await.unwrap();
815        inner
816            .await_exchanges(1, std::time::Duration::from_millis(500))
817            .await;
818        inner.exchange(0).assert_body_bytes(b"binary");
819    }
820
821    #[tokio::test(flavor = "multi_thread")]
822    #[should_panic(expected = "expected body bytes")]
823    async fn assert_body_bytes_fail() {
824        use bytes::Bytes;
825        use camel_component_api::Body;
826        let ctx = test_producer_ctx();
827        let component = MockComponent::new();
828        let endpoint = component
829            .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
830            .unwrap();
831        let inner = component.get_endpoint("body-bytes-fail").unwrap();
832        let mut producer = endpoint.create_producer(&ctx).unwrap();
833        let mut msg = Message::new("");
834        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
835        producer.call(Exchange::new(msg)).await.unwrap();
836        inner
837            .await_exchanges(1, std::time::Duration::from_millis(500))
838            .await;
839        inner.exchange(0).assert_body_bytes(b"different");
840    }
841
842    #[tokio::test(flavor = "multi_thread")]
843    async fn assert_header_pass() {
844        let ctx = test_producer_ctx();
845        let component = MockComponent::new();
846        let endpoint = component
847            .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
848            .unwrap();
849        let inner = component.get_endpoint("hdr-pass").unwrap();
850        let mut producer = endpoint.create_producer(&ctx).unwrap();
851        let mut msg = Message::new("body");
852        msg.headers
853            .insert("x-key".to_string(), serde_json::json!("value"));
854        producer.call(Exchange::new(msg)).await.unwrap();
855        inner
856            .await_exchanges(1, std::time::Duration::from_millis(500))
857            .await;
858        inner
859            .exchange(0)
860            .assert_header("x-key", serde_json::json!("value"));
861    }
862
863    #[tokio::test(flavor = "multi_thread")]
864    #[should_panic(expected = "expected header")]
865    async fn assert_header_fail() {
866        let ctx = test_producer_ctx();
867        let component = MockComponent::new();
868        let endpoint = component
869            .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
870            .unwrap();
871        let inner = component.get_endpoint("hdr-fail").unwrap();
872        let mut producer = endpoint.create_producer(&ctx).unwrap();
873        let mut msg = Message::new("body");
874        msg.headers
875            .insert("x-key".to_string(), serde_json::json!("value"));
876        producer.call(Exchange::new(msg)).await.unwrap();
877        inner
878            .await_exchanges(1, std::time::Duration::from_millis(500))
879            .await;
880        inner
881            .exchange(0)
882            .assert_header("x-key", serde_json::json!("other"));
883    }
884
885    #[tokio::test(flavor = "multi_thread")]
886    async fn assert_header_exists_pass() {
887        let ctx = test_producer_ctx();
888        let component = MockComponent::new();
889        let endpoint = component
890            .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
891            .unwrap();
892        let inner = component.get_endpoint("hdr-exists-pass").unwrap();
893        let mut producer = endpoint.create_producer(&ctx).unwrap();
894        let mut msg = Message::new("body");
895        msg.headers
896            .insert("x-present".to_string(), serde_json::json!(42));
897        producer.call(Exchange::new(msg)).await.unwrap();
898        inner
899            .await_exchanges(1, std::time::Duration::from_millis(500))
900            .await;
901        inner.exchange(0).assert_header_exists("x-present");
902    }
903
904    #[tokio::test(flavor = "multi_thread")]
905    #[should_panic(expected = "expected header")]
906    async fn assert_header_exists_fail() {
907        let ctx = test_producer_ctx();
908        let component = MockComponent::new();
909        let endpoint = component
910            .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
911            .unwrap();
912        let inner = component.get_endpoint("hdr-exists-fail").unwrap();
913        let mut producer = endpoint.create_producer(&ctx).unwrap();
914        producer
915            .call(Exchange::new(Message::new("body")))
916            .await
917            .unwrap();
918        inner
919            .await_exchanges(1, std::time::Duration::from_millis(500))
920            .await;
921        inner.exchange(0).assert_header_exists("x-missing");
922    }
923
924    #[tokio::test(flavor = "multi_thread")]
925    async fn assert_has_error_pass() {
926        let ctx = test_producer_ctx();
927        let component = MockComponent::new();
928        let endpoint = component
929            .create_endpoint("mock:err-pass", &NoOpComponentContext)
930            .unwrap();
931        let inner = component.get_endpoint("err-pass").unwrap();
932        let mut producer = endpoint.create_producer(&ctx).unwrap();
933        let mut ex = Exchange::new(Message::new("body"));
934        ex.error = Some(camel_component_api::CamelError::ProcessorError(
935            "oops".to_string(),
936        ));
937        producer.call(ex).await.unwrap();
938        inner
939            .await_exchanges(1, std::time::Duration::from_millis(500))
940            .await;
941        inner.exchange(0).assert_has_error();
942    }
943
944    #[tokio::test(flavor = "multi_thread")]
945    #[should_panic(expected = "expected exchange to have an error")]
946    async fn assert_has_error_fail() {
947        let ctx = test_producer_ctx();
948        let component = MockComponent::new();
949        let endpoint = component
950            .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
951            .unwrap();
952        let inner = component.get_endpoint("has-err-fail").unwrap();
953        let mut producer = endpoint.create_producer(&ctx).unwrap();
954        producer
955            .call(Exchange::new(Message::new("body")))
956            .await
957            .unwrap();
958        inner
959            .await_exchanges(1, std::time::Duration::from_millis(500))
960            .await;
961        inner.exchange(0).assert_has_error();
962    }
963
964    #[tokio::test(flavor = "multi_thread")]
965    async fn assert_no_error_pass() {
966        let ctx = test_producer_ctx();
967        let component = MockComponent::new();
968        let endpoint = component
969            .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
970            .unwrap();
971        let inner = component.get_endpoint("no-err-pass").unwrap();
972        let mut producer = endpoint.create_producer(&ctx).unwrap();
973        producer
974            .call(Exchange::new(Message::new("body")))
975            .await
976            .unwrap();
977        inner
978            .await_exchanges(1, std::time::Duration::from_millis(500))
979            .await;
980        inner.exchange(0).assert_no_error();
981    }
982
983    #[tokio::test(flavor = "multi_thread")]
984    #[should_panic(expected = "expected exchange to have no error")]
985    async fn assert_no_error_fail() {
986        let ctx = test_producer_ctx();
987        let component = MockComponent::new();
988        let endpoint = component
989            .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
990            .unwrap();
991        let inner = component.get_endpoint("no-err-fail").unwrap();
992        let mut producer = endpoint.create_producer(&ctx).unwrap();
993        let mut ex = Exchange::new(Message::new("body"));
994        ex.error = Some(camel_component_api::CamelError::ProcessorError(
995            "oops".to_string(),
996        ));
997        producer.call(ex).await.unwrap();
998        inner
999            .await_exchanges(1, std::time::Duration::from_millis(500))
1000            .await;
1001        inner.exchange(0).assert_no_error();
1002    }
1003}