1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_component_api::parse_uri;
11use camel_component_api::{BoxProcessor, CamelError, Exchange};
12use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
13
14#[derive(Clone)]
29pub struct MockComponent {
30 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34 pub fn new() -> Self {
35 Self {
36 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37 }
38 }
39
40 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44 let registry = self
45 .registry
46 .lock()
47 .expect("mutex poisoned: another thread panicked while holding this lock");
48 registry.get(name).cloned()
49 }
50}
51
52impl Default for MockComponent {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl Component for MockComponent {
59 fn scheme(&self) -> &str {
60 "mock"
61 }
62
63 fn create_endpoint(
64 &self,
65 uri: &str,
66 _ctx: &dyn camel_component_api::ComponentContext,
67 ) -> Result<Box<dyn Endpoint>, CamelError> {
68 let parts = parse_uri(uri)?;
69 if parts.scheme != "mock" {
70 return Err(CamelError::InvalidUri(format!(
71 "expected scheme 'mock', got '{}'",
72 parts.scheme
73 )));
74 }
75
76 let name = parts.path;
77 let mut registry = self.registry.lock().map_err(|e| {
78 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
79 })?;
80 let inner = registry
81 .entry(name.clone())
82 .or_insert_with(|| {
83 Arc::new(MockEndpointInner {
84 uri: uri.to_string(),
85 name,
86 received: Arc::new(Mutex::new(Vec::new())),
87 notify: Arc::new(Notify::new()),
88 })
89 })
90 .clone();
91
92 Ok(Box::new(MockEndpoint(inner)))
93 }
94}
95
96pub struct MockEndpoint(Arc<MockEndpointInner>);
106
107pub struct MockEndpointInner {
113 uri: String,
114 pub name: String,
115 received: Arc<Mutex<Vec<Exchange>>>,
116 notify: Arc<Notify>,
117}
118
119impl MockEndpointInner {
120 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
122 self.received.lock().await.clone()
123 }
124
125 pub async fn assert_exchange_count(&self, expected: usize) {
131 let actual = self.received.lock().await.len();
132 assert_eq!(
133 actual, expected,
134 "MockEndpoint expected {expected} exchanges, got {actual}"
135 );
136 }
137
138 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
147 let deadline = tokio::time::Instant::now() + timeout;
148 loop {
149 {
150 let received = self.received.lock().await;
151 if received.len() >= count {
152 return;
153 }
154 }
155 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
156 if remaining.is_zero() {
157 let got = self.received.lock().await.len();
160 if got >= count {
161 return;
162 }
163 panic!(
164 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
165 self.name, count, got, timeout
166 );
167 }
168 tokio::select! {
169 _ = self.notify.notified() => {}
170 _ = tokio::time::sleep(remaining) => {}
171 }
172 }
173 }
174
175 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
187 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
188 if idx >= received.len() {
189 panic!(
190 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
191 self.name,
192 idx,
193 received.len()
194 );
195 }
196 ExchangeAssert {
197 exchange: received[idx].clone(),
198 idx,
199 endpoint_name: self.name.clone(),
200 }
201 }
202}
203
204impl Endpoint for MockEndpoint {
205 fn uri(&self) -> &str {
206 &self.0.uri
207 }
208
209 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
210 Err(CamelError::EndpointCreationFailed(
211 "mock endpoint does not support consumers (it is a sink)".to_string(),
212 ))
213 }
214
215 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
216 Ok(BoxProcessor::new(MockProducer {
217 received: Arc::clone(&self.0.received),
218 notify: Arc::clone(&self.0.notify),
219 }))
220 }
221}
222
223#[derive(Clone)]
229struct MockProducer {
230 received: Arc<Mutex<Vec<Exchange>>>,
231 notify: Arc<Notify>,
232}
233
234impl Service<Exchange> for MockProducer {
235 type Response = Exchange;
236 type Error = CamelError;
237 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
238
239 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
240 Poll::Ready(Ok(()))
241 }
242
243 fn call(&mut self, exchange: Exchange) -> Self::Future {
244 let received = Arc::clone(&self.received);
245 let notify = Arc::clone(&self.notify);
246 Box::pin(async move {
247 received.lock().await.push(exchange.clone());
248 notify.notify_waiters();
249 Ok(exchange)
250 })
251 }
252}
253
254pub struct ExchangeAssert {
266 exchange: Exchange,
267 idx: usize,
268 endpoint_name: String,
269}
270
271impl ExchangeAssert {
272 fn location(&self) -> String {
273 format!(
274 "MockEndpoint '{}' exchange[{}]",
275 self.endpoint_name, self.idx
276 )
277 }
278
279 pub fn assert_body_text(self, expected: &str) -> Self {
281 match self.exchange.input.body.as_text() {
282 Some(actual) if actual == expected => {}
283 Some(actual) => panic!(
284 "{}: expected body text {:?}, got {:?}",
285 self.location(),
286 expected,
287 actual
288 ),
289 None => panic!(
290 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
291 self.location(),
292 expected,
293 self.exchange.input.body
294 ),
295 }
296 self
297 }
298
299 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
301 match &self.exchange.input.body {
302 camel_component_api::Body::Json(actual) if *actual == expected => {}
303 camel_component_api::Body::Json(actual) => panic!(
304 "{}: expected body JSON {}, got {}",
305 self.location(),
306 expected,
307 actual
308 ),
309 other => panic!(
310 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
311 self.location(),
312 expected,
313 other
314 ),
315 }
316 self
317 }
318
319 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
321 match &self.exchange.input.body {
322 camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
323 camel_component_api::Body::Bytes(actual) => panic!(
324 "{}: expected body bytes {:?}, got {:?}",
325 self.location(),
326 expected,
327 actual
328 ),
329 other => panic!(
330 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
331 self.location(),
332 expected,
333 other
334 ),
335 }
336 self
337 }
338
339 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
345 match self.exchange.input.headers.get(key) {
346 Some(actual) if *actual == expected => {}
347 Some(actual) => panic!(
348 "{}: expected header {:?} = {}, got {}",
349 self.location(),
350 key,
351 expected,
352 actual
353 ),
354 None => panic!(
355 "{}: expected header {:?} = {}, but header is absent",
356 self.location(),
357 key,
358 expected
359 ),
360 }
361 self
362 }
363
364 pub fn assert_header_exists(self, key: &str) -> Self {
370 if !self.exchange.input.headers.contains_key(key) {
371 panic!(
372 "{}: expected header {:?} to be present, but it was absent",
373 self.location(),
374 key
375 );
376 }
377 self
378 }
379
380 pub fn assert_has_error(self) -> Self {
386 if self.exchange.error.is_none() {
387 panic!(
388 "{}: expected exchange to have an error, but error is None",
389 self.location()
390 );
391 }
392 self
393 }
394
395 pub fn assert_no_error(self) -> Self {
401 if let Some(ref err) = self.exchange.error {
402 panic!(
403 "{}: expected exchange to have no error, but got: {}",
404 self.location(),
405 err
406 );
407 }
408 self
409 }
410}
411
412#[cfg(test)]
417mod tests {
418 use super::*;
419 use camel_component_api::Message;
420 use camel_component_api::NoOpComponentContext;
421 use tower::ServiceExt;
422
423 fn test_producer_ctx() -> ProducerContext {
424 ProducerContext::new()
425 }
426
427 #[test]
428 fn test_mock_component_scheme() {
429 let component = MockComponent::new();
430 assert_eq!(component.scheme(), "mock");
431 }
432
433 #[test]
434 fn test_mock_creates_endpoint() {
435 let component = MockComponent::new();
436 let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
437 assert!(endpoint.is_ok());
438 }
439
440 #[test]
441 fn test_mock_wrong_scheme() {
442 let component = MockComponent::new();
443 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
444 assert!(result.is_err());
445 }
446
447 #[test]
448 fn test_mock_endpoint_no_consumer() {
449 let component = MockComponent::new();
450 let endpoint = component
451 .create_endpoint("mock:result", &NoOpComponentContext)
452 .unwrap();
453 assert!(endpoint.create_consumer().is_err());
454 }
455
456 #[test]
457 fn test_mock_endpoint_creates_producer() {
458 let ctx = test_producer_ctx();
459 let component = MockComponent::new();
460 let endpoint = component
461 .create_endpoint("mock:result", &NoOpComponentContext)
462 .unwrap();
463 assert!(endpoint.create_producer(&ctx).is_ok());
464 }
465
466 #[tokio::test]
467 async fn test_mock_producer_records_exchange() {
468 let ctx = test_producer_ctx();
469 let component = MockComponent::new();
470 let endpoint = component
471 .create_endpoint("mock:test", &NoOpComponentContext)
472 .unwrap();
473
474 let mut producer = endpoint.create_producer(&ctx).unwrap();
475
476 let ex1 = Exchange::new(Message::new("first"));
477 let ex2 = Exchange::new(Message::new("second"));
478
479 producer.call(ex1).await.unwrap();
480 producer.call(ex2).await.unwrap();
481
482 let inner = component.get_endpoint("test").unwrap();
483 inner.assert_exchange_count(2).await;
484
485 let received = inner.get_received_exchanges().await;
486 assert_eq!(received[0].input.body.as_text(), Some("first"));
487 assert_eq!(received[1].input.body.as_text(), Some("second"));
488 }
489
490 #[tokio::test]
491 async fn test_mock_producer_passes_through_exchange() {
492 let ctx = test_producer_ctx();
493 let component = MockComponent::new();
494 let endpoint = component
495 .create_endpoint("mock:passthrough", &NoOpComponentContext)
496 .unwrap();
497
498 let producer = endpoint.create_producer(&ctx).unwrap();
499 let exchange = Exchange::new(Message::new("hello"));
500 let result = producer.oneshot(exchange).await.unwrap();
501
502 assert_eq!(result.input.body.as_text(), Some("hello"));
504 }
505
506 #[tokio::test]
507 async fn test_mock_assert_count_passes() {
508 let component = MockComponent::new();
509 let endpoint = component
510 .create_endpoint("mock:count", &NoOpComponentContext)
511 .unwrap();
512 let inner = component.get_endpoint("count").unwrap();
513
514 inner.assert_exchange_count(0).await;
515
516 let ctx = test_producer_ctx();
517 let mut producer = endpoint.create_producer(&ctx).unwrap();
518 producer
519 .call(Exchange::new(Message::new("one")))
520 .await
521 .unwrap();
522
523 inner.assert_exchange_count(1).await;
524 }
525
526 #[tokio::test]
527 #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
528 async fn test_mock_assert_count_fails() {
529 let component = MockComponent::new();
530 let _endpoint = component
533 .create_endpoint("mock:fail", &NoOpComponentContext)
534 .unwrap();
535 let inner = component.get_endpoint("fail").unwrap();
536
537 inner.assert_exchange_count(5).await;
538 }
539
540 #[tokio::test]
541 async fn test_mock_component_shared_registry() {
542 let component = MockComponent::new();
543 let ep1 = component
544 .create_endpoint("mock:shared", &NoOpComponentContext)
545 .unwrap();
546 let ep2 = component
547 .create_endpoint("mock:shared", &NoOpComponentContext)
548 .unwrap();
549
550 let ctx = test_producer_ctx();
552 let mut p1 = ep1.create_producer(&ctx).unwrap();
553 p1.call(Exchange::new(Message::new("from-ep1")))
554 .await
555 .unwrap();
556
557 let mut p2 = ep2.create_producer(&ctx).unwrap();
559 p2.call(Exchange::new(Message::new("from-ep2")))
560 .await
561 .unwrap();
562
563 let inner = component.get_endpoint("shared").unwrap();
565 inner.assert_exchange_count(2).await;
566
567 let received = inner.get_received_exchanges().await;
568 assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
569 assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
570 }
571
572 #[tokio::test]
573 async fn await_exchanges_resolves_immediately() {
574 let ctx = test_producer_ctx();
576 let component = MockComponent::new();
577 let endpoint = component
578 .create_endpoint("mock:immediate", &NoOpComponentContext)
579 .unwrap();
580 let inner = component.get_endpoint("immediate").unwrap();
581
582 let mut producer = endpoint.create_producer(&ctx).unwrap();
583 producer
584 .call(Exchange::new(Message::new("a")))
585 .await
586 .unwrap();
587 producer
588 .call(Exchange::new(Message::new("b")))
589 .await
590 .unwrap();
591
592 inner
594 .await_exchanges(2, std::time::Duration::from_millis(100))
595 .await;
596 }
597
598 #[tokio::test]
599 async fn await_exchanges_waits_then_resolves() {
600 let ctx = test_producer_ctx();
602 let component = MockComponent::new();
603 let endpoint = component
604 .create_endpoint("mock:waiter", &NoOpComponentContext)
605 .unwrap();
606 let inner = component.get_endpoint("waiter").unwrap();
607
608 let mut producer = endpoint.create_producer(&ctx).unwrap();
610 tokio::spawn(async move {
611 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
612 producer
613 .call(Exchange::new(Message::new("delayed")))
614 .await
615 .unwrap();
616 });
617
618 inner
620 .await_exchanges(1, std::time::Duration::from_millis(500))
621 .await;
622
623 let received = inner.get_received_exchanges().await;
624 assert_eq!(received.len(), 1);
625 assert_eq!(received[0].input.body.as_text(), Some("delayed"));
626 }
627
628 #[tokio::test]
629 #[should_panic(expected = "timed out waiting for 5 exchanges")]
630 async fn await_exchanges_times_out() {
631 let component = MockComponent::new();
632 let _endpoint = component
633 .create_endpoint("mock:timeout", &NoOpComponentContext)
634 .unwrap();
635 let inner = component.get_endpoint("timeout").unwrap();
636
637 inner
639 .await_exchanges(5, std::time::Duration::from_millis(50))
640 .await;
641 }
642
643 #[tokio::test(flavor = "multi_thread")]
644 async fn exchange_idx_returns_assert() {
645 let ctx = test_producer_ctx();
646 let component = MockComponent::new();
647 let endpoint = component
648 .create_endpoint("mock:assert-idx", &NoOpComponentContext)
649 .unwrap();
650 let inner = component.get_endpoint("assert-idx").unwrap();
651
652 let mut producer = endpoint.create_producer(&ctx).unwrap();
653 producer
654 .call(Exchange::new(Message::new("hello")))
655 .await
656 .unwrap();
657
658 inner
659 .await_exchanges(1, std::time::Duration::from_millis(500))
660 .await;
661 let _assert = inner.exchange(0);
663 }
664
665 #[tokio::test(flavor = "multi_thread")]
666 #[should_panic(expected = "exchange index 5 out of bounds")]
667 async fn exchange_idx_out_of_bounds() {
668 let ctx = test_producer_ctx();
669 let component = MockComponent::new();
670 let endpoint = component
671 .create_endpoint("mock:oob", &NoOpComponentContext)
672 .unwrap();
673 let inner = component.get_endpoint("oob").unwrap();
674
675 let mut producer = endpoint.create_producer(&ctx).unwrap();
676 producer
677 .call(Exchange::new(Message::new("only-one")))
678 .await
679 .unwrap();
680
681 inner
682 .await_exchanges(1, std::time::Duration::from_millis(500))
683 .await;
684 let _assert = inner.exchange(5);
686 }
687
688 #[tokio::test(flavor = "multi_thread")]
689 async fn assert_body_text_pass() {
690 let ctx = test_producer_ctx();
691 let component = MockComponent::new();
692 let endpoint = component
693 .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
694 .unwrap();
695 let inner = component.get_endpoint("body-text-pass").unwrap();
696 let mut producer = endpoint.create_producer(&ctx).unwrap();
697 producer
698 .call(Exchange::new(Message::new("hello")))
699 .await
700 .unwrap();
701 inner
702 .await_exchanges(1, std::time::Duration::from_millis(500))
703 .await;
704 inner.exchange(0).assert_body_text("hello");
705 }
706
707 #[tokio::test(flavor = "multi_thread")]
708 #[should_panic(expected = "expected body text")]
709 async fn assert_body_text_fail() {
710 let ctx = test_producer_ctx();
711 let component = MockComponent::new();
712 let endpoint = component
713 .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
714 .unwrap();
715 let inner = component.get_endpoint("body-text-fail").unwrap();
716 let mut producer = endpoint.create_producer(&ctx).unwrap();
717 producer
718 .call(Exchange::new(Message::new("hello")))
719 .await
720 .unwrap();
721 inner
722 .await_exchanges(1, std::time::Duration::from_millis(500))
723 .await;
724 inner.exchange(0).assert_body_text("world");
725 }
726
727 #[tokio::test(flavor = "multi_thread")]
728 async fn assert_body_json_pass() {
729 use camel_component_api::Body;
730 let ctx = test_producer_ctx();
731 let component = MockComponent::new();
732 let endpoint = component
733 .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
734 .unwrap();
735 let inner = component.get_endpoint("body-json-pass").unwrap();
736 let mut producer = endpoint.create_producer(&ctx).unwrap();
737 let mut msg = Message::new("");
738 msg.body = Body::Json(serde_json::json!({"key": "value"}));
739 producer.call(Exchange::new(msg)).await.unwrap();
740 inner
741 .await_exchanges(1, std::time::Duration::from_millis(500))
742 .await;
743 inner
744 .exchange(0)
745 .assert_body_json(serde_json::json!({"key": "value"}));
746 }
747
748 #[tokio::test(flavor = "multi_thread")]
749 #[should_panic(expected = "expected body JSON")]
750 async fn assert_body_json_fail() {
751 use camel_component_api::Body;
752 let ctx = test_producer_ctx();
753 let component = MockComponent::new();
754 let endpoint = component
755 .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
756 .unwrap();
757 let inner = component.get_endpoint("body-json-fail").unwrap();
758 let mut producer = endpoint.create_producer(&ctx).unwrap();
759 let mut msg = Message::new("");
760 msg.body = Body::Json(serde_json::json!({"key": "value"}));
761 producer.call(Exchange::new(msg)).await.unwrap();
762 inner
763 .await_exchanges(1, std::time::Duration::from_millis(500))
764 .await;
765 inner
766 .exchange(0)
767 .assert_body_json(serde_json::json!({"key": "other"}));
768 }
769
770 #[tokio::test(flavor = "multi_thread")]
771 async fn assert_body_bytes_pass() {
772 use bytes::Bytes;
773 use camel_component_api::Body;
774 let ctx = test_producer_ctx();
775 let component = MockComponent::new();
776 let endpoint = component
777 .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
778 .unwrap();
779 let inner = component.get_endpoint("body-bytes-pass").unwrap();
780 let mut producer = endpoint.create_producer(&ctx).unwrap();
781 let mut msg = Message::new("");
782 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
783 producer.call(Exchange::new(msg)).await.unwrap();
784 inner
785 .await_exchanges(1, std::time::Duration::from_millis(500))
786 .await;
787 inner.exchange(0).assert_body_bytes(b"binary");
788 }
789
790 #[tokio::test(flavor = "multi_thread")]
791 #[should_panic(expected = "expected body bytes")]
792 async fn assert_body_bytes_fail() {
793 use bytes::Bytes;
794 use camel_component_api::Body;
795 let ctx = test_producer_ctx();
796 let component = MockComponent::new();
797 let endpoint = component
798 .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
799 .unwrap();
800 let inner = component.get_endpoint("body-bytes-fail").unwrap();
801 let mut producer = endpoint.create_producer(&ctx).unwrap();
802 let mut msg = Message::new("");
803 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
804 producer.call(Exchange::new(msg)).await.unwrap();
805 inner
806 .await_exchanges(1, std::time::Duration::from_millis(500))
807 .await;
808 inner.exchange(0).assert_body_bytes(b"different");
809 }
810
811 #[tokio::test(flavor = "multi_thread")]
812 async fn assert_header_pass() {
813 let ctx = test_producer_ctx();
814 let component = MockComponent::new();
815 let endpoint = component
816 .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
817 .unwrap();
818 let inner = component.get_endpoint("hdr-pass").unwrap();
819 let mut producer = endpoint.create_producer(&ctx).unwrap();
820 let mut msg = Message::new("body");
821 msg.headers
822 .insert("x-key".to_string(), serde_json::json!("value"));
823 producer.call(Exchange::new(msg)).await.unwrap();
824 inner
825 .await_exchanges(1, std::time::Duration::from_millis(500))
826 .await;
827 inner
828 .exchange(0)
829 .assert_header("x-key", serde_json::json!("value"));
830 }
831
832 #[tokio::test(flavor = "multi_thread")]
833 #[should_panic(expected = "expected header")]
834 async fn assert_header_fail() {
835 let ctx = test_producer_ctx();
836 let component = MockComponent::new();
837 let endpoint = component
838 .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
839 .unwrap();
840 let inner = component.get_endpoint("hdr-fail").unwrap();
841 let mut producer = endpoint.create_producer(&ctx).unwrap();
842 let mut msg = Message::new("body");
843 msg.headers
844 .insert("x-key".to_string(), serde_json::json!("value"));
845 producer.call(Exchange::new(msg)).await.unwrap();
846 inner
847 .await_exchanges(1, std::time::Duration::from_millis(500))
848 .await;
849 inner
850 .exchange(0)
851 .assert_header("x-key", serde_json::json!("other"));
852 }
853
854 #[tokio::test(flavor = "multi_thread")]
855 async fn assert_header_exists_pass() {
856 let ctx = test_producer_ctx();
857 let component = MockComponent::new();
858 let endpoint = component
859 .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
860 .unwrap();
861 let inner = component.get_endpoint("hdr-exists-pass").unwrap();
862 let mut producer = endpoint.create_producer(&ctx).unwrap();
863 let mut msg = Message::new("body");
864 msg.headers
865 .insert("x-present".to_string(), serde_json::json!(42));
866 producer.call(Exchange::new(msg)).await.unwrap();
867 inner
868 .await_exchanges(1, std::time::Duration::from_millis(500))
869 .await;
870 inner.exchange(0).assert_header_exists("x-present");
871 }
872
873 #[tokio::test(flavor = "multi_thread")]
874 #[should_panic(expected = "expected header")]
875 async fn assert_header_exists_fail() {
876 let ctx = test_producer_ctx();
877 let component = MockComponent::new();
878 let endpoint = component
879 .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
880 .unwrap();
881 let inner = component.get_endpoint("hdr-exists-fail").unwrap();
882 let mut producer = endpoint.create_producer(&ctx).unwrap();
883 producer
884 .call(Exchange::new(Message::new("body")))
885 .await
886 .unwrap();
887 inner
888 .await_exchanges(1, std::time::Duration::from_millis(500))
889 .await;
890 inner.exchange(0).assert_header_exists("x-missing");
891 }
892
893 #[tokio::test(flavor = "multi_thread")]
894 async fn assert_has_error_pass() {
895 let ctx = test_producer_ctx();
896 let component = MockComponent::new();
897 let endpoint = component
898 .create_endpoint("mock:err-pass", &NoOpComponentContext)
899 .unwrap();
900 let inner = component.get_endpoint("err-pass").unwrap();
901 let mut producer = endpoint.create_producer(&ctx).unwrap();
902 let mut ex = Exchange::new(Message::new("body"));
903 ex.error = Some(camel_component_api::CamelError::ProcessorError(
904 "oops".to_string(),
905 ));
906 producer.call(ex).await.unwrap();
907 inner
908 .await_exchanges(1, std::time::Duration::from_millis(500))
909 .await;
910 inner.exchange(0).assert_has_error();
911 }
912
913 #[tokio::test(flavor = "multi_thread")]
914 #[should_panic(expected = "expected exchange to have an error")]
915 async fn assert_has_error_fail() {
916 let ctx = test_producer_ctx();
917 let component = MockComponent::new();
918 let endpoint = component
919 .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
920 .unwrap();
921 let inner = component.get_endpoint("has-err-fail").unwrap();
922 let mut producer = endpoint.create_producer(&ctx).unwrap();
923 producer
924 .call(Exchange::new(Message::new("body")))
925 .await
926 .unwrap();
927 inner
928 .await_exchanges(1, std::time::Duration::from_millis(500))
929 .await;
930 inner.exchange(0).assert_has_error();
931 }
932
933 #[tokio::test(flavor = "multi_thread")]
934 async fn assert_no_error_pass() {
935 let ctx = test_producer_ctx();
936 let component = MockComponent::new();
937 let endpoint = component
938 .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
939 .unwrap();
940 let inner = component.get_endpoint("no-err-pass").unwrap();
941 let mut producer = endpoint.create_producer(&ctx).unwrap();
942 producer
943 .call(Exchange::new(Message::new("body")))
944 .await
945 .unwrap();
946 inner
947 .await_exchanges(1, std::time::Duration::from_millis(500))
948 .await;
949 inner.exchange(0).assert_no_error();
950 }
951
952 #[tokio::test(flavor = "multi_thread")]
953 #[should_panic(expected = "expected exchange to have no error")]
954 async fn assert_no_error_fail() {
955 let ctx = test_producer_ctx();
956 let component = MockComponent::new();
957 let endpoint = component
958 .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
959 .unwrap();
960 let inner = component.get_endpoint("no-err-fail").unwrap();
961 let mut producer = endpoint.create_producer(&ctx).unwrap();
962 let mut ex = Exchange::new(Message::new("body"));
963 ex.error = Some(camel_component_api::CamelError::ProcessorError(
964 "oops".to_string(),
965 ));
966 producer.call(ex).await.unwrap();
967 inner
968 .await_exchanges(1, std::time::Duration::from_millis(500))
969 .await;
970 inner.exchange(0).assert_no_error();
971 }
972}