1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::task::{Context, Poll};
11
12use tokio::sync::{Mutex, Notify};
13use tower::Service;
14
15use camel_component_api::parse_uri;
16use camel_component_api::{BoxProcessor, CamelError, Exchange};
17use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
18use tracing::debug;
19
20#[derive(Clone)]
35pub struct MockComponent {
36 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
37}
38
39impl MockComponent {
40 pub fn new() -> Self {
41 Self {
42 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
43 }
44 }
45
46 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
50 let registry = self
51 .registry
52 .lock()
53 .expect("mutex poisoned: another thread panicked while holding this lock"); registry.get(name).cloned()
55 }
56}
57
58impl Default for MockComponent {
59 fn default() -> Self {
60 Self::new()
61 }
62}
63
64impl Component for MockComponent {
65 fn scheme(&self) -> &str {
66 "mock"
67 }
68
69 fn create_endpoint(
70 &self,
71 uri: &str,
72 _ctx: &dyn camel_component_api::ComponentContext,
73 ) -> Result<Box<dyn Endpoint>, CamelError> {
74 let parts = parse_uri(uri)?;
75 if parts.scheme != "mock" {
76 return Err(CamelError::InvalidUri(format!(
77 "expected scheme 'mock', got '{}'",
78 parts.scheme
79 )));
80 }
81
82 let name = parts.path;
83 let mut registry = self.registry.lock().map_err(|e| {
84 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
85 })?;
86 let inner = registry
87 .entry(name.clone())
88 .or_insert_with(|| {
89 Arc::new(MockEndpointInner {
90 uri: uri.to_string(),
91 name,
92 received: Arc::new(Mutex::new(Vec::new())),
93 notify: Arc::new(Notify::new()),
94 })
95 })
96 .clone();
97
98 debug!(endpoint_name = %inner.name, "mock endpoint created");
99 Ok(Box::new(MockEndpoint(inner)))
100 }
101}
102
103pub struct MockEndpoint(Arc<MockEndpointInner>);
113
114pub struct MockEndpointInner {
120 uri: String,
121 pub name: String,
122 received: Arc<Mutex<Vec<Exchange>>>,
123 notify: Arc<Notify>,
124}
125
126impl MockEndpointInner {
127 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
129 self.received.lock().await.clone()
130 }
131
132 pub async fn assert_exchange_count(&self, expected: usize) {
138 let actual = self.received.lock().await.len();
139 assert_eq!(
140 actual, expected,
141 "MockEndpoint expected {expected} exchanges, got {actual}"
142 );
143 }
144
145 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
154 let deadline = tokio::time::Instant::now() + timeout;
155 loop {
156 {
157 let received = self.received.lock().await;
158 if received.len() >= count {
159 return;
160 }
161 }
162 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
163 if remaining.is_zero() {
164 let got = self.received.lock().await.len();
167 if got >= count {
168 return;
169 }
170 panic!(
171 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
172 self.name, count, got, timeout
173 );
174 }
175 tokio::select! {
176 _ = self.notify.notified() => {}
177 _ = tokio::time::sleep(remaining) => {}
178 }
179 }
180 }
181
182 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
194 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
195 if idx >= received.len() {
196 panic!(
197 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
198 self.name,
199 idx,
200 received.len()
201 );
202 }
203 ExchangeAssert {
204 exchange: received[idx].clone(),
205 idx,
206 endpoint_name: self.name.clone(),
207 }
208 }
209}
210
211impl Endpoint for MockEndpoint {
212 fn uri(&self) -> &str {
213 &self.0.uri
214 }
215
216 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
217 Err(CamelError::EndpointCreationFailed(
218 "mock endpoint does not support consumers (it is a sink)".to_string(),
219 ))
220 }
221
222 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
223 Ok(BoxProcessor::new(MockProducer {
224 name: self.0.name.clone(),
225 received: Arc::clone(&self.0.received),
226 notify: Arc::clone(&self.0.notify),
227 }))
228 }
229}
230
231#[derive(Clone)]
237struct MockProducer {
238 name: String,
239 received: Arc<Mutex<Vec<Exchange>>>,
240 notify: Arc<Notify>,
241}
242
243impl Service<Exchange> for MockProducer {
244 type Response = Exchange;
245 type Error = CamelError;
246 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
247
248 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
249 Poll::Ready(Ok(()))
250 }
251
252 fn call(&mut self, exchange: Exchange) -> Self::Future {
253 let name = self.name.clone();
254 let received = Arc::clone(&self.received);
255 let notify = Arc::clone(&self.notify);
256 Box::pin(async move {
257 received.lock().await.push(exchange.clone());
258 let count = received.lock().await.len();
259 debug!(endpoint_name = %name, count = %count, "exchange recorded on mock");
260 notify.notify_waiters();
261 Ok(exchange)
262 })
263 }
264}
265
266pub struct ExchangeAssert {
278 exchange: Exchange,
279 idx: usize,
280 endpoint_name: String,
281}
282
283impl ExchangeAssert {
284 fn location(&self) -> String {
285 format!(
286 "MockEndpoint '{}' exchange[{}]",
287 self.endpoint_name, self.idx
288 )
289 }
290
291 pub fn assert_body_text(self, expected: &str) -> Self {
293 match self.exchange.input.body.as_text() {
294 Some(actual) if actual == expected => {}
295 Some(actual) => panic!(
296 "{}: expected body text {:?}, got {:?}",
297 self.location(),
298 expected,
299 actual
300 ),
301 None => panic!(
302 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
303 self.location(),
304 expected,
305 self.exchange.input.body
306 ),
307 }
308 self
309 }
310
311 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
313 match &self.exchange.input.body {
314 camel_component_api::Body::Json(actual) if *actual == expected => {}
315 camel_component_api::Body::Json(actual) => panic!(
316 "{}: expected body JSON {}, got {}",
317 self.location(),
318 expected,
319 actual
320 ),
321 other => panic!(
322 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
323 self.location(),
324 expected,
325 other
326 ),
327 }
328 self
329 }
330
331 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
333 match &self.exchange.input.body {
334 camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
335 camel_component_api::Body::Bytes(actual) => panic!(
336 "{}: expected body bytes {:?}, got {:?}",
337 self.location(),
338 expected,
339 actual
340 ),
341 other => panic!(
342 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
343 self.location(),
344 expected,
345 other
346 ),
347 }
348 self
349 }
350
351 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
357 match self.exchange.input.headers.get(key) {
358 Some(actual) if *actual == expected => {}
359 Some(actual) => panic!(
360 "{}: expected header {:?} = {}, got {}",
361 self.location(),
362 key,
363 expected,
364 actual
365 ),
366 None => panic!(
367 "{}: expected header {:?} = {}, but header is absent",
368 self.location(),
369 key,
370 expected
371 ),
372 }
373 self
374 }
375
376 pub fn assert_header_exists(self, key: &str) -> Self {
382 if !self.exchange.input.headers.contains_key(key) {
383 panic!(
384 "{}: expected header {:?} to be present, but it was absent",
385 self.location(),
386 key
387 );
388 }
389 self
390 }
391
392 pub fn assert_has_error(self) -> Self {
398 if self.exchange.error.is_none() {
399 panic!(
400 "{}: expected exchange to have an error, but error is None",
401 self.location()
402 );
403 }
404 self
405 }
406
407 pub fn assert_no_error(self) -> Self {
413 if let Some(ref err) = self.exchange.error {
414 panic!(
415 "{}: expected exchange to have no error, but got: {}",
416 self.location(),
417 err
418 );
419 }
420 self
421 }
422}
423
424#[cfg(test)]
429mod tests {
430 use super::*;
431 use camel_component_api::Message;
432 use camel_component_api::NoOpComponentContext;
433 use tower::ServiceExt;
434
435 fn test_producer_ctx() -> ProducerContext {
436 ProducerContext::new()
437 }
438
439 #[test]
440 fn test_mock_component_scheme() {
441 let component = MockComponent::new();
442 assert_eq!(component.scheme(), "mock");
443 }
444
445 #[test]
446 fn test_mock_component_default() {
447 let component = MockComponent::default();
448 assert_eq!(component.scheme(), "mock");
449 assert!(component.get_endpoint("missing").is_none());
450 }
451
452 #[test]
453 fn test_mock_creates_endpoint() {
454 let component = MockComponent::new();
455 let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
456 assert!(endpoint.is_ok());
457 }
458
459 #[test]
460 fn test_mock_wrong_scheme() {
461 let component = MockComponent::new();
462 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
463 assert!(result.is_err());
464 }
465
466 #[test]
467 fn test_mock_endpoint_no_consumer() {
468 let component = MockComponent::new();
469 let endpoint = component
470 .create_endpoint("mock:result", &NoOpComponentContext)
471 .unwrap();
472 assert!(endpoint.create_consumer().is_err());
473 }
474
475 #[test]
476 fn test_mock_endpoint_creates_producer() {
477 let ctx = test_producer_ctx();
478 let component = MockComponent::new();
479 let endpoint = component
480 .create_endpoint("mock:result", &NoOpComponentContext)
481 .unwrap();
482 assert!(endpoint.create_producer(&ctx).is_ok());
483 }
484
485 #[test]
486 fn test_mock_endpoint_uri() {
487 let component = MockComponent::new();
488 let endpoint = component
489 .create_endpoint("mock:uri-check", &NoOpComponentContext)
490 .unwrap();
491 assert_eq!(endpoint.uri(), "mock:uri-check");
492 }
493
494 #[test]
495 fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
496 let component = MockComponent::new();
497 let _ = component
498 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
499 .unwrap();
500 let _ = component
501 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
502 .unwrap();
503
504 let first = component.get_endpoint("shared-inner").unwrap();
505 let second = component.get_endpoint("shared-inner").unwrap();
506 assert!(Arc::ptr_eq(&first, &second));
507 }
508
509 #[tokio::test]
510 async fn test_mock_producer_records_exchange() {
511 let ctx = test_producer_ctx();
512 let component = MockComponent::new();
513 let endpoint = component
514 .create_endpoint("mock:test", &NoOpComponentContext)
515 .unwrap();
516
517 let mut producer = endpoint.create_producer(&ctx).unwrap();
518
519 let ex1 = Exchange::new(Message::new("first"));
520 let ex2 = Exchange::new(Message::new("second"));
521
522 producer.call(ex1).await.unwrap();
523 producer.call(ex2).await.unwrap();
524
525 let inner = component.get_endpoint("test").unwrap();
526 inner.assert_exchange_count(2).await;
527
528 let received = inner.get_received_exchanges().await;
529 assert_eq!(received[0].input.body.as_text(), Some("first"));
530 assert_eq!(received[1].input.body.as_text(), Some("second"));
531 }
532
533 #[tokio::test]
534 async fn test_mock_producer_passes_through_exchange() {
535 let ctx = test_producer_ctx();
536 let component = MockComponent::new();
537 let endpoint = component
538 .create_endpoint("mock:passthrough", &NoOpComponentContext)
539 .unwrap();
540
541 let producer = endpoint.create_producer(&ctx).unwrap();
542 let exchange = Exchange::new(Message::new("hello"));
543 let result = producer.oneshot(exchange).await.unwrap();
544
545 assert_eq!(result.input.body.as_text(), Some("hello"));
547 }
548
549 #[tokio::test]
550 async fn test_mock_assert_count_passes() {
551 let component = MockComponent::new();
552 let endpoint = component
553 .create_endpoint("mock:count", &NoOpComponentContext)
554 .unwrap();
555 let inner = component.get_endpoint("count").unwrap();
556
557 inner.assert_exchange_count(0).await;
558
559 let ctx = test_producer_ctx();
560 let mut producer = endpoint.create_producer(&ctx).unwrap();
561 producer
562 .call(Exchange::new(Message::new("one")))
563 .await
564 .unwrap();
565
566 inner.assert_exchange_count(1).await;
567 }
568
569 #[tokio::test]
570 #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
571 async fn test_mock_assert_count_fails() {
572 let component = MockComponent::new();
573 let _endpoint = component
576 .create_endpoint("mock:fail", &NoOpComponentContext)
577 .unwrap();
578 let inner = component.get_endpoint("fail").unwrap();
579
580 inner.assert_exchange_count(5).await;
581 }
582
583 #[tokio::test]
584 async fn test_mock_component_shared_registry() {
585 let component = MockComponent::new();
586 let ep1 = component
587 .create_endpoint("mock:shared", &NoOpComponentContext)
588 .unwrap();
589 let ep2 = component
590 .create_endpoint("mock:shared", &NoOpComponentContext)
591 .unwrap();
592
593 let ctx = test_producer_ctx();
595 let mut p1 = ep1.create_producer(&ctx).unwrap();
596 p1.call(Exchange::new(Message::new("from-ep1")))
597 .await
598 .unwrap();
599
600 let mut p2 = ep2.create_producer(&ctx).unwrap();
602 p2.call(Exchange::new(Message::new("from-ep2")))
603 .await
604 .unwrap();
605
606 let inner = component.get_endpoint("shared").unwrap();
608 inner.assert_exchange_count(2).await;
609
610 let received = inner.get_received_exchanges().await;
611 assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
612 assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
613 }
614
615 #[tokio::test]
616 async fn await_exchanges_resolves_immediately() {
617 let ctx = test_producer_ctx();
619 let component = MockComponent::new();
620 let endpoint = component
621 .create_endpoint("mock:immediate", &NoOpComponentContext)
622 .unwrap();
623 let inner = component.get_endpoint("immediate").unwrap();
624
625 let mut producer = endpoint.create_producer(&ctx).unwrap();
626 producer
627 .call(Exchange::new(Message::new("a")))
628 .await
629 .unwrap();
630 producer
631 .call(Exchange::new(Message::new("b")))
632 .await
633 .unwrap();
634
635 inner
637 .await_exchanges(2, std::time::Duration::from_millis(100))
638 .await;
639 }
640
641 #[tokio::test]
642 async fn await_exchanges_waits_then_resolves() {
643 let ctx = test_producer_ctx();
645 let component = MockComponent::new();
646 let endpoint = component
647 .create_endpoint("mock:waiter", &NoOpComponentContext)
648 .unwrap();
649 let inner = component.get_endpoint("waiter").unwrap();
650
651 let mut producer = endpoint.create_producer(&ctx).unwrap();
653 tokio::spawn(async move {
654 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
655 producer
656 .call(Exchange::new(Message::new("delayed")))
657 .await
658 .unwrap();
659 });
660
661 inner
663 .await_exchanges(1, std::time::Duration::from_millis(500))
664 .await;
665
666 let received = inner.get_received_exchanges().await;
667 assert_eq!(received.len(), 1);
668 assert_eq!(received[0].input.body.as_text(), Some("delayed"));
669 }
670
671 #[tokio::test]
672 #[should_panic(expected = "timed out waiting for 5 exchanges")]
673 async fn await_exchanges_times_out() {
674 let component = MockComponent::new();
675 let _endpoint = component
676 .create_endpoint("mock:timeout", &NoOpComponentContext)
677 .unwrap();
678 let inner = component.get_endpoint("timeout").unwrap();
679
680 inner
682 .await_exchanges(5, std::time::Duration::from_millis(50))
683 .await;
684 }
685
686 #[tokio::test(flavor = "multi_thread")]
687 async fn exchange_idx_returns_assert() {
688 let ctx = test_producer_ctx();
689 let component = MockComponent::new();
690 let endpoint = component
691 .create_endpoint("mock:assert-idx", &NoOpComponentContext)
692 .unwrap();
693 let inner = component.get_endpoint("assert-idx").unwrap();
694
695 let mut producer = endpoint.create_producer(&ctx).unwrap();
696 producer
697 .call(Exchange::new(Message::new("hello")))
698 .await
699 .unwrap();
700
701 inner
702 .await_exchanges(1, std::time::Duration::from_millis(500))
703 .await;
704 let _assert = inner.exchange(0);
706 }
707
708 #[tokio::test(flavor = "multi_thread")]
709 #[should_panic(expected = "exchange index 5 out of bounds")]
710 async fn exchange_idx_out_of_bounds() {
711 let ctx = test_producer_ctx();
712 let component = MockComponent::new();
713 let endpoint = component
714 .create_endpoint("mock:oob", &NoOpComponentContext)
715 .unwrap();
716 let inner = component.get_endpoint("oob").unwrap();
717
718 let mut producer = endpoint.create_producer(&ctx).unwrap();
719 producer
720 .call(Exchange::new(Message::new("only-one")))
721 .await
722 .unwrap();
723
724 inner
725 .await_exchanges(1, std::time::Duration::from_millis(500))
726 .await;
727 let _assert = inner.exchange(5);
729 }
730
731 #[tokio::test(flavor = "multi_thread")]
732 async fn assert_body_text_pass() {
733 let ctx = test_producer_ctx();
734 let component = MockComponent::new();
735 let endpoint = component
736 .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
737 .unwrap();
738 let inner = component.get_endpoint("body-text-pass").unwrap();
739 let mut producer = endpoint.create_producer(&ctx).unwrap();
740 producer
741 .call(Exchange::new(Message::new("hello")))
742 .await
743 .unwrap();
744 inner
745 .await_exchanges(1, std::time::Duration::from_millis(500))
746 .await;
747 inner.exchange(0).assert_body_text("hello");
748 }
749
750 #[tokio::test(flavor = "multi_thread")]
751 #[should_panic(expected = "expected body text")]
752 async fn assert_body_text_fail() {
753 let ctx = test_producer_ctx();
754 let component = MockComponent::new();
755 let endpoint = component
756 .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
757 .unwrap();
758 let inner = component.get_endpoint("body-text-fail").unwrap();
759 let mut producer = endpoint.create_producer(&ctx).unwrap();
760 producer
761 .call(Exchange::new(Message::new("hello")))
762 .await
763 .unwrap();
764 inner
765 .await_exchanges(1, std::time::Duration::from_millis(500))
766 .await;
767 inner.exchange(0).assert_body_text("world");
768 }
769
770 #[tokio::test(flavor = "multi_thread")]
771 async fn assert_body_json_pass() {
772 use camel_component_api::Body;
773 let ctx = test_producer_ctx();
774 let component = MockComponent::new();
775 let endpoint = component
776 .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
777 .unwrap();
778 let inner = component.get_endpoint("body-json-pass").unwrap();
779 let mut producer = endpoint.create_producer(&ctx).unwrap();
780 let mut msg = Message::new("");
781 msg.body = Body::Json(serde_json::json!({"key": "value"}));
782 producer.call(Exchange::new(msg)).await.unwrap();
783 inner
784 .await_exchanges(1, std::time::Duration::from_millis(500))
785 .await;
786 inner
787 .exchange(0)
788 .assert_body_json(serde_json::json!({"key": "value"}));
789 }
790
791 #[tokio::test(flavor = "multi_thread")]
792 #[should_panic(expected = "expected body JSON")]
793 async fn assert_body_json_fail() {
794 use camel_component_api::Body;
795 let ctx = test_producer_ctx();
796 let component = MockComponent::new();
797 let endpoint = component
798 .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
799 .unwrap();
800 let inner = component.get_endpoint("body-json-fail").unwrap();
801 let mut producer = endpoint.create_producer(&ctx).unwrap();
802 let mut msg = Message::new("");
803 msg.body = Body::Json(serde_json::json!({"key": "value"}));
804 producer.call(Exchange::new(msg)).await.unwrap();
805 inner
806 .await_exchanges(1, std::time::Duration::from_millis(500))
807 .await;
808 inner
809 .exchange(0)
810 .assert_body_json(serde_json::json!({"key": "other"}));
811 }
812
813 #[tokio::test(flavor = "multi_thread")]
814 async fn assert_body_bytes_pass() {
815 use bytes::Bytes;
816 use camel_component_api::Body;
817 let ctx = test_producer_ctx();
818 let component = MockComponent::new();
819 let endpoint = component
820 .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
821 .unwrap();
822 let inner = component.get_endpoint("body-bytes-pass").unwrap();
823 let mut producer = endpoint.create_producer(&ctx).unwrap();
824 let mut msg = Message::new("");
825 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
826 producer.call(Exchange::new(msg)).await.unwrap();
827 inner
828 .await_exchanges(1, std::time::Duration::from_millis(500))
829 .await;
830 inner.exchange(0).assert_body_bytes(b"binary");
831 }
832
833 #[tokio::test(flavor = "multi_thread")]
834 #[should_panic(expected = "expected body bytes")]
835 async fn assert_body_bytes_fail() {
836 use bytes::Bytes;
837 use camel_component_api::Body;
838 let ctx = test_producer_ctx();
839 let component = MockComponent::new();
840 let endpoint = component
841 .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
842 .unwrap();
843 let inner = component.get_endpoint("body-bytes-fail").unwrap();
844 let mut producer = endpoint.create_producer(&ctx).unwrap();
845 let mut msg = Message::new("");
846 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
847 producer.call(Exchange::new(msg)).await.unwrap();
848 inner
849 .await_exchanges(1, std::time::Duration::from_millis(500))
850 .await;
851 inner.exchange(0).assert_body_bytes(b"different");
852 }
853
854 #[tokio::test(flavor = "multi_thread")]
855 async fn assert_header_pass() {
856 let ctx = test_producer_ctx();
857 let component = MockComponent::new();
858 let endpoint = component
859 .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
860 .unwrap();
861 let inner = component.get_endpoint("hdr-pass").unwrap();
862 let mut producer = endpoint.create_producer(&ctx).unwrap();
863 let mut msg = Message::new("body");
864 msg.headers
865 .insert("x-key".to_string(), serde_json::json!("value"));
866 producer.call(Exchange::new(msg)).await.unwrap();
867 inner
868 .await_exchanges(1, std::time::Duration::from_millis(500))
869 .await;
870 inner
871 .exchange(0)
872 .assert_header("x-key", serde_json::json!("value"));
873 }
874
875 #[tokio::test(flavor = "multi_thread")]
876 #[should_panic(expected = "expected header")]
877 async fn assert_header_fail() {
878 let ctx = test_producer_ctx();
879 let component = MockComponent::new();
880 let endpoint = component
881 .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
882 .unwrap();
883 let inner = component.get_endpoint("hdr-fail").unwrap();
884 let mut producer = endpoint.create_producer(&ctx).unwrap();
885 let mut msg = Message::new("body");
886 msg.headers
887 .insert("x-key".to_string(), serde_json::json!("value"));
888 producer.call(Exchange::new(msg)).await.unwrap();
889 inner
890 .await_exchanges(1, std::time::Duration::from_millis(500))
891 .await;
892 inner
893 .exchange(0)
894 .assert_header("x-key", serde_json::json!("other"));
895 }
896
897 #[tokio::test(flavor = "multi_thread")]
898 async fn assert_header_exists_pass() {
899 let ctx = test_producer_ctx();
900 let component = MockComponent::new();
901 let endpoint = component
902 .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
903 .unwrap();
904 let inner = component.get_endpoint("hdr-exists-pass").unwrap();
905 let mut producer = endpoint.create_producer(&ctx).unwrap();
906 let mut msg = Message::new("body");
907 msg.headers
908 .insert("x-present".to_string(), serde_json::json!(42));
909 producer.call(Exchange::new(msg)).await.unwrap();
910 inner
911 .await_exchanges(1, std::time::Duration::from_millis(500))
912 .await;
913 inner.exchange(0).assert_header_exists("x-present");
914 }
915
916 #[tokio::test(flavor = "multi_thread")]
917 #[should_panic(expected = "expected header")]
918 async fn assert_header_exists_fail() {
919 let ctx = test_producer_ctx();
920 let component = MockComponent::new();
921 let endpoint = component
922 .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
923 .unwrap();
924 let inner = component.get_endpoint("hdr-exists-fail").unwrap();
925 let mut producer = endpoint.create_producer(&ctx).unwrap();
926 producer
927 .call(Exchange::new(Message::new("body")))
928 .await
929 .unwrap();
930 inner
931 .await_exchanges(1, std::time::Duration::from_millis(500))
932 .await;
933 inner.exchange(0).assert_header_exists("x-missing");
934 }
935
936 #[tokio::test(flavor = "multi_thread")]
937 async fn assert_has_error_pass() {
938 let ctx = test_producer_ctx();
939 let component = MockComponent::new();
940 let endpoint = component
941 .create_endpoint("mock:err-pass", &NoOpComponentContext)
942 .unwrap();
943 let inner = component.get_endpoint("err-pass").unwrap();
944 let mut producer = endpoint.create_producer(&ctx).unwrap();
945 let mut ex = Exchange::new(Message::new("body"));
946 ex.error = Some(camel_component_api::CamelError::ProcessorError(
947 "oops".to_string(),
948 ));
949 producer.call(ex).await.unwrap();
950 inner
951 .await_exchanges(1, std::time::Duration::from_millis(500))
952 .await;
953 inner.exchange(0).assert_has_error();
954 }
955
956 #[tokio::test(flavor = "multi_thread")]
957 #[should_panic(expected = "expected exchange to have an error")]
958 async fn assert_has_error_fail() {
959 let ctx = test_producer_ctx();
960 let component = MockComponent::new();
961 let endpoint = component
962 .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
963 .unwrap();
964 let inner = component.get_endpoint("has-err-fail").unwrap();
965 let mut producer = endpoint.create_producer(&ctx).unwrap();
966 producer
967 .call(Exchange::new(Message::new("body")))
968 .await
969 .unwrap();
970 inner
971 .await_exchanges(1, std::time::Duration::from_millis(500))
972 .await;
973 inner.exchange(0).assert_has_error();
974 }
975
976 #[tokio::test(flavor = "multi_thread")]
977 async fn assert_no_error_pass() {
978 let ctx = test_producer_ctx();
979 let component = MockComponent::new();
980 let endpoint = component
981 .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
982 .unwrap();
983 let inner = component.get_endpoint("no-err-pass").unwrap();
984 let mut producer = endpoint.create_producer(&ctx).unwrap();
985 producer
986 .call(Exchange::new(Message::new("body")))
987 .await
988 .unwrap();
989 inner
990 .await_exchanges(1, std::time::Duration::from_millis(500))
991 .await;
992 inner.exchange(0).assert_no_error();
993 }
994
995 #[tokio::test(flavor = "multi_thread")]
996 #[should_panic(expected = "expected exchange to have no error")]
997 async fn assert_no_error_fail() {
998 let ctx = test_producer_ctx();
999 let component = MockComponent::new();
1000 let endpoint = component
1001 .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1002 .unwrap();
1003 let inner = component.get_endpoint("no-err-fail").unwrap();
1004 let mut producer = endpoint.create_producer(&ctx).unwrap();
1005 let mut ex = Exchange::new(Message::new("body"));
1006 ex.error = Some(camel_component_api::CamelError::ProcessorError(
1007 "oops".to_string(),
1008 ));
1009 producer.call(ex).await.unwrap();
1010 inner
1011 .await_exchanges(1, std::time::Duration::from_millis(500))
1012 .await;
1013 inner.exchange(0).assert_no_error();
1014 }
1015}