1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_component_api::parse_uri;
11use camel_component_api::{BoxProcessor, CamelError, Exchange};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14#[derive(Clone)]
29pub struct MockComponent {
30 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34 pub fn new() -> Self {
35 Self {
36 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37 }
38 }
39
40 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44 let registry = self
45 .registry
46 .lock()
47 .expect("mutex poisoned: another thread panicked while holding this lock");
48 registry.get(name).cloned()
49 }
50}
51
52impl Default for MockComponent {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl Component for MockComponent {
59 fn scheme(&self) -> &str {
60 "mock"
61 }
62
63 fn create_endpoint(
64 &self,
65 uri: &str,
66 _ctx: &dyn camel_component_api::ComponentContext,
67 ) -> Result<Box<dyn Endpoint>, CamelError> {
68 let parts = parse_uri(uri)?;
69 if parts.scheme != "mock" {
70 return Err(CamelError::InvalidUri(format!(
71 "expected scheme 'mock', got '{}'",
72 parts.scheme
73 )));
74 }
75
76 let name = parts.path;
77 let mut registry = self.registry.lock().map_err(|e| {
78 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
79 })?;
80 let inner = registry
81 .entry(name.clone())
82 .or_insert_with(|| {
83 Arc::new(MockEndpointInner {
84 uri: uri.to_string(),
85 name,
86 received: Arc::new(Mutex::new(Vec::new())),
87 notify: Arc::new(Notify::new()),
88 })
89 })
90 .clone();
91
92 Ok(Box::new(MockEndpoint(inner)))
93 }
94}
95
/// Endpoint handle for the `mock` scheme: a thin wrapper around the shared
/// [`MockEndpointInner`] state registered under the endpoint's name.
pub struct MockEndpoint(Arc<MockEndpointInner>);
106
107pub struct MockEndpointInner {
113 uri: String,
114 pub name: String,
115 received: Arc<Mutex<Vec<Exchange>>>,
116 notify: Arc<Notify>,
117}
118
119impl MockEndpointInner {
120 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
122 self.received.lock().await.clone()
123 }
124
125 pub async fn assert_exchange_count(&self, expected: usize) {
131 let actual = self.received.lock().await.len();
132 assert_eq!(
133 actual, expected,
134 "MockEndpoint expected {expected} exchanges, got {actual}"
135 );
136 }
137
138 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
147 let deadline = tokio::time::Instant::now() + timeout;
148 loop {
149 {
150 let received = self.received.lock().await;
151 if received.len() >= count {
152 return;
153 }
154 }
155 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
156 if remaining.is_zero() {
157 let got = self.received.lock().await.len();
160 if got >= count {
161 return;
162 }
163 panic!(
164 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
165 self.name, count, got, timeout
166 );
167 }
168 tokio::select! {
169 _ = self.notify.notified() => {}
170 _ = tokio::time::sleep(remaining) => {}
171 }
172 }
173 }
174
175 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
187 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
188 if idx >= received.len() {
189 panic!(
190 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
191 self.name,
192 idx,
193 received.len()
194 );
195 }
196 ExchangeAssert {
197 exchange: received[idx].clone(),
198 idx,
199 endpoint_name: self.name.clone(),
200 }
201 }
202}
203
204impl Endpoint for MockEndpoint {
205 fn uri(&self) -> &str {
206 &self.0.uri
207 }
208
209 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
210 Err(CamelError::EndpointCreationFailed(
211 "mock endpoint does not support consumers (it is a sink)".to_string(),
212 ))
213 }
214
215 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
216 Ok(BoxProcessor::new(MockProducer {
217 received: Arc::clone(&self.0.received),
218 notify: Arc::clone(&self.0.notify),
219 }))
220 }
221}
222
223#[derive(Clone)]
229struct MockProducer {
230 received: Arc<Mutex<Vec<Exchange>>>,
231 notify: Arc<Notify>,
232}
233
234impl Service<Exchange> for MockProducer {
235 type Response = Exchange;
236 type Error = CamelError;
237 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
238
239 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240 Poll::Ready(Ok(()))
241 }
242
243 fn call(&mut self, exchange: Exchange) -> Self::Future {
244 let received = Arc::clone(&self.received);
245 let notify = Arc::clone(&self.notify);
246 Box::pin(async move {
247 received.lock().await.push(exchange.clone());
248 notify.notify_waiters();
249 Ok(exchange)
250 })
251 }
252}
253
254pub struct ExchangeAssert {
266 exchange: Exchange,
267 idx: usize,
268 endpoint_name: String,
269}
270
271impl ExchangeAssert {
272 fn location(&self) -> String {
273 format!(
274 "MockEndpoint '{}' exchange[{}]",
275 self.endpoint_name, self.idx
276 )
277 }
278
279 pub fn assert_body_text(self, expected: &str) -> Self {
281 match self.exchange.input.body.as_text() {
282 Some(actual) if actual == expected => {}
283 Some(actual) => panic!(
284 "{}: expected body text {:?}, got {:?}",
285 self.location(),
286 expected,
287 actual
288 ),
289 None => panic!(
290 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
291 self.location(),
292 expected,
293 self.exchange.input.body
294 ),
295 }
296 self
297 }
298
299 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
301 match &self.exchange.input.body {
302 camel_component_api::Body::Json(actual) if *actual == expected => {}
303 camel_component_api::Body::Json(actual) => panic!(
304 "{}: expected body JSON {}, got {}",
305 self.location(),
306 expected,
307 actual
308 ),
309 other => panic!(
310 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
311 self.location(),
312 expected,
313 other
314 ),
315 }
316 self
317 }
318
319 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
321 match &self.exchange.input.body {
322 camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
323 camel_component_api::Body::Bytes(actual) => panic!(
324 "{}: expected body bytes {:?}, got {:?}",
325 self.location(),
326 expected,
327 actual
328 ),
329 other => panic!(
330 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
331 self.location(),
332 expected,
333 other
334 ),
335 }
336 self
337 }
338
339 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
345 match self.exchange.input.headers.get(key) {
346 Some(actual) if *actual == expected => {}
347 Some(actual) => panic!(
348 "{}: expected header {:?} = {}, got {}",
349 self.location(),
350 key,
351 expected,
352 actual
353 ),
354 None => panic!(
355 "{}: expected header {:?} = {}, but header is absent",
356 self.location(),
357 key,
358 expected
359 ),
360 }
361 self
362 }
363
364 pub fn assert_header_exists(self, key: &str) -> Self {
370 if !self.exchange.input.headers.contains_key(key) {
371 panic!(
372 "{}: expected header {:?} to be present, but it was absent",
373 self.location(),
374 key
375 );
376 }
377 self
378 }
379
380 pub fn assert_has_error(self) -> Self {
386 if self.exchange.error.is_none() {
387 panic!(
388 "{}: expected exchange to have an error, but error is None",
389 self.location()
390 );
391 }
392 self
393 }
394
395 pub fn assert_no_error(self) -> Self {
401 if let Some(ref err) = self.exchange.error {
402 panic!(
403 "{}: expected exchange to have no error, but got: {}",
404 self.location(),
405 err
406 );
407 }
408 self
409 }
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use camel_component_api::Body;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use std::time::Duration;
    use tower::ServiceExt;

    /// Upper bound used when waiting for an exchange that has already been sent.
    const RECORD_TIMEOUT: Duration = Duration::from_millis(500);

    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Creates the endpoint `mock:<name>` on a fresh component and returns its shared
    /// state together with a producer that records into it.
    fn mock_fixture(name: &str) -> (Arc<MockEndpointInner>, BoxProcessor) {
        let component = MockComponent::new();
        let uri = format!("mock:{name}");
        let endpoint = component
            .create_endpoint(&uri, &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&test_producer_ctx()).unwrap();
        let inner = component.get_endpoint(name).unwrap();
        (inner, producer)
    }

    /// Sends `exchange` through a fresh `mock:<name>` endpoint, waits until it has been
    /// recorded and returns the endpoint's shared state.
    async fn record_single(name: &str, exchange: Exchange) -> Arc<MockEndpointInner> {
        let (inner, mut producer) = mock_fixture(name);
        producer.call(exchange).await.unwrap();
        inner.await_exchanges(1, RECORD_TIMEOUT).await;
        inner
    }

    /// Exchange whose message was built from a text body.
    fn text_exchange(body: &'static str) -> Exchange {
        Exchange::new(Message::new(body))
    }

    /// Exchange whose message body was replaced with `body`.
    fn exchange_with_body(body: Body) -> Exchange {
        let mut msg = Message::new("");
        msg.body = body;
        Exchange::new(msg)
    }

    /// Exchange with text body `"body"` and a single header.
    fn exchange_with_header(key: &str, value: serde_json::Value) -> Exchange {
        let mut msg = Message::new("body");
        msg.headers.insert(key.to_string(), value);
        Exchange::new(msg)
    }

    /// Exchange with text body `"body"` that carries a processor error.
    fn failed_exchange() -> Exchange {
        let mut ex = Exchange::new(Message::new("body"));
        ex.error = Some(camel_component_api::CamelError::ProcessorError(
            "oops".to_string(),
        ));
        ex
    }

    // --- component / endpoint basics -------------------------------------------------

    #[test]
    fn test_mock_component_scheme() {
        assert_eq!(MockComponent::new().scheme(), "mock");
    }

    #[test]
    fn test_mock_component_default() {
        let component = MockComponent::default();
        assert_eq!(component.scheme(), "mock");
        assert!(component.get_endpoint("missing").is_none());
    }

    #[test]
    fn test_mock_creates_endpoint() {
        let created = MockComponent::new().create_endpoint("mock:result", &NoOpComponentContext);
        assert!(created.is_ok());
    }

    #[test]
    fn test_mock_wrong_scheme() {
        let created = MockComponent::new().create_endpoint("timer:tick", &NoOpComponentContext);
        assert!(created.is_err());
    }

    #[test]
    fn test_mock_endpoint_no_consumer() {
        let endpoint = MockComponent::new()
            .create_endpoint("mock:result", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_err());
    }

    #[test]
    fn test_mock_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let endpoint = MockComponent::new()
            .create_endpoint("mock:result", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[test]
    fn test_mock_endpoint_uri() {
        let endpoint = MockComponent::new()
            .create_endpoint("mock:uri-check", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "mock:uri-check");
    }

    #[test]
    fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
        let component = MockComponent::new();
        // Create the endpoint twice; both must resolve to one shared state.
        for _ in 0..2 {
            let _ = component
                .create_endpoint("mock:shared-inner", &NoOpComponentContext)
                .unwrap();
        }

        let first = component.get_endpoint("shared-inner").unwrap();
        let second = component.get_endpoint("shared-inner").unwrap();
        assert!(Arc::ptr_eq(&first, &second));
    }

    // --- producer behaviour ------------------------------------------------------------

    #[tokio::test]
    async fn test_mock_producer_records_exchange() {
        let (inner, mut producer) = mock_fixture("test");

        producer.call(text_exchange("first")).await.unwrap();
        producer.call(text_exchange("second")).await.unwrap();

        inner.assert_exchange_count(2).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received[0].input.body.as_text(), Some("first"));
        assert_eq!(received[1].input.body.as_text(), Some("second"));
    }

    #[tokio::test]
    async fn test_mock_producer_passes_through_exchange() {
        let (_inner, producer) = mock_fixture("passthrough");
        let echoed = producer.oneshot(text_exchange("hello")).await.unwrap();

        assert_eq!(echoed.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_mock_assert_count_passes() {
        let (inner, mut producer) = mock_fixture("count");

        inner.assert_exchange_count(0).await;
        producer.call(text_exchange("one")).await.unwrap();
        inner.assert_exchange_count(1).await;
    }

    #[tokio::test]
    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
    async fn test_mock_assert_count_fails() {
        let (inner, _producer) = mock_fixture("fail");

        inner.assert_exchange_count(5).await;
    }

    #[tokio::test]
    async fn test_mock_component_shared_registry() {
        let component = MockComponent::new();
        let ctx = test_producer_ctx();

        // Two endpoints with the same name must record into the same buffer.
        let ep1 = component
            .create_endpoint("mock:shared", &NoOpComponentContext)
            .unwrap();
        let ep2 = component
            .create_endpoint("mock:shared", &NoOpComponentContext)
            .unwrap();

        let mut p1 = ep1.create_producer(&ctx).unwrap();
        p1.call(text_exchange("from-ep1")).await.unwrap();

        let mut p2 = ep2.create_producer(&ctx).unwrap();
        p2.call(text_exchange("from-ep2")).await.unwrap();

        let inner = component.get_endpoint("shared").unwrap();
        inner.assert_exchange_count(2).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
    }

    // --- await_exchanges ---------------------------------------------------------------

    #[tokio::test]
    async fn await_exchanges_resolves_immediately() {
        let (inner, mut producer) = mock_fixture("immediate");

        producer.call(text_exchange("a")).await.unwrap();
        producer.call(text_exchange("b")).await.unwrap();

        inner.await_exchanges(2, Duration::from_millis(100)).await;
    }

    #[tokio::test]
    async fn await_exchanges_waits_then_resolves() {
        let (inner, mut producer) = mock_fixture("waiter");

        // Deliver the exchange only after the waiter has started waiting.
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            producer.call(text_exchange("delayed")).await.unwrap();
        });

        inner.await_exchanges(1, Duration::from_millis(500)).await;

        let received = inner.get_received_exchanges().await;
        assert_eq!(received.len(), 1);
        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
    }

    #[tokio::test]
    #[should_panic(expected = "timed out waiting for 5 exchanges")]
    async fn await_exchanges_times_out() {
        let (inner, _producer) = mock_fixture("timeout");

        inner.await_exchanges(5, Duration::from_millis(50)).await;
    }

    // --- exchange(idx) -----------------------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn exchange_idx_returns_assert() {
        let inner = record_single("assert-idx", text_exchange("hello")).await;
        let _assert = inner.exchange(0);
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "exchange index 5 out of bounds")]
    async fn exchange_idx_out_of_bounds() {
        let inner = record_single("oob", text_exchange("only-one")).await;
        let _assert = inner.exchange(5);
    }

    // --- body assertions ---------------------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_text_pass() {
        let inner = record_single("body-text-pass", text_exchange("hello")).await;
        inner.exchange(0).assert_body_text("hello");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body text")]
    async fn assert_body_text_fail() {
        let inner = record_single("body-text-fail", text_exchange("hello")).await;
        inner.exchange(0).assert_body_text("world");
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_json_pass() {
        let sent = exchange_with_body(Body::Json(serde_json::json!({"key": "value"})));
        let inner = record_single("body-json-pass", sent).await;
        inner
            .exchange(0)
            .assert_body_json(serde_json::json!({"key": "value"}));
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body JSON")]
    async fn assert_body_json_fail() {
        let sent = exchange_with_body(Body::Json(serde_json::json!({"key": "value"})));
        let inner = record_single("body-json-fail", sent).await;
        inner
            .exchange(0)
            .assert_body_json(serde_json::json!({"key": "other"}));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_body_bytes_pass() {
        let sent = exchange_with_body(Body::Bytes(Bytes::from_static(b"binary")));
        let inner = record_single("body-bytes-pass", sent).await;
        inner.exchange(0).assert_body_bytes(b"binary");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected body bytes")]
    async fn assert_body_bytes_fail() {
        let sent = exchange_with_body(Body::Bytes(Bytes::from_static(b"binary")));
        let inner = record_single("body-bytes-fail", sent).await;
        inner.exchange(0).assert_body_bytes(b"different");
    }

    // --- header assertions -------------------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_header_pass() {
        let sent = exchange_with_header("x-key", serde_json::json!("value"));
        let inner = record_single("hdr-pass", sent).await;
        inner
            .exchange(0)
            .assert_header("x-key", serde_json::json!("value"));
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected header")]
    async fn assert_header_fail() {
        let sent = exchange_with_header("x-key", serde_json::json!("value"));
        let inner = record_single("hdr-fail", sent).await;
        inner
            .exchange(0)
            .assert_header("x-key", serde_json::json!("other"));
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_header_exists_pass() {
        let sent = exchange_with_header("x-present", serde_json::json!(42));
        let inner = record_single("hdr-exists-pass", sent).await;
        inner.exchange(0).assert_header_exists("x-present");
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected header")]
    async fn assert_header_exists_fail() {
        let inner = record_single("hdr-exists-fail", text_exchange("body")).await;
        inner.exchange(0).assert_header_exists("x-missing");
    }

    // --- error assertions --------------------------------------------------------------

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_has_error_pass() {
        let inner = record_single("err-pass", failed_exchange()).await;
        inner.exchange(0).assert_has_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected exchange to have an error")]
    async fn assert_has_error_fail() {
        let inner = record_single("has-err-fail", text_exchange("body")).await;
        inner.exchange(0).assert_has_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn assert_no_error_pass() {
        let inner = record_single("no-err-pass", text_exchange("body")).await;
        inner.exchange(0).assert_no_error();
    }

    #[tokio::test(flavor = "multi_thread")]
    #[should_panic(expected = "expected exchange to have no error")]
    async fn assert_no_error_fail() {
        let inner = record_single("no-err-fail", failed_exchange()).await;
        inner.exchange(0).assert_no_error();
    }
}