1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::{Mutex, Notify};
8use tower::Service;
9
10use camel_api::{BoxProcessor, CamelError, Exchange};
11use camel_component::{Component, Consumer, Endpoint, ProducerContext};
12use camel_endpoint::parse_uri;
13
14#[derive(Clone)]
29pub struct MockComponent {
30 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34 pub fn new() -> Self {
35 Self {
36 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37 }
38 }
39
40 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44 let registry = self
45 .registry
46 .lock()
47 .expect("mutex poisoned: another thread panicked while holding this lock");
48 registry.get(name).cloned()
49 }
50}
51
52impl Default for MockComponent {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl Component for MockComponent {
59 fn scheme(&self) -> &str {
60 "mock"
61 }
62
63 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
64 let parts = parse_uri(uri)?;
65 if parts.scheme != "mock" {
66 return Err(CamelError::InvalidUri(format!(
67 "expected scheme 'mock', got '{}'",
68 parts.scheme
69 )));
70 }
71
72 let name = parts.path;
73 let mut registry = self.registry.lock().map_err(|e| {
74 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
75 })?;
76 let inner = registry
77 .entry(name.clone())
78 .or_insert_with(|| {
79 Arc::new(MockEndpointInner {
80 uri: uri.to_string(),
81 name,
82 received: Arc::new(Mutex::new(Vec::new())),
83 notify: Arc::new(Notify::new()),
84 })
85 })
86 .clone();
87
88 Ok(Box::new(MockEndpoint(inner)))
89 }
90}
91
92pub struct MockEndpoint(Arc<MockEndpointInner>);
102
103pub struct MockEndpointInner {
109 uri: String,
110 pub name: String,
111 received: Arc<Mutex<Vec<Exchange>>>,
112 notify: Arc<Notify>,
113}
114
115impl MockEndpointInner {
116 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
118 self.received.lock().await.clone()
119 }
120
121 pub async fn assert_exchange_count(&self, expected: usize) {
127 let actual = self.received.lock().await.len();
128 assert_eq!(
129 actual, expected,
130 "MockEndpoint expected {expected} exchanges, got {actual}"
131 );
132 }
133
134 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
143 let deadline = tokio::time::Instant::now() + timeout;
144 loop {
145 {
146 let received = self.received.lock().await;
147 if received.len() >= count {
148 return;
149 }
150 }
151 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
152 if remaining.is_zero() {
153 let got = self.received.lock().await.len();
156 if got >= count {
157 return;
158 }
159 panic!(
160 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
161 self.name, count, got, timeout
162 );
163 }
164 tokio::select! {
165 _ = self.notify.notified() => {}
166 _ = tokio::time::sleep(remaining) => {}
167 }
168 }
169 }
170
171 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
183 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
184 if idx >= received.len() {
185 panic!(
186 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
187 self.name,
188 idx,
189 received.len()
190 );
191 }
192 ExchangeAssert {
193 exchange: received[idx].clone(),
194 idx,
195 endpoint_name: self.name.clone(),
196 }
197 }
198}
199
200impl Endpoint for MockEndpoint {
201 fn uri(&self) -> &str {
202 &self.0.uri
203 }
204
205 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
206 Err(CamelError::EndpointCreationFailed(
207 "mock endpoint does not support consumers (it is a sink)".to_string(),
208 ))
209 }
210
211 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
212 Ok(BoxProcessor::new(MockProducer {
213 received: Arc::clone(&self.0.received),
214 notify: Arc::clone(&self.0.notify),
215 }))
216 }
217}
218
219#[derive(Clone)]
225struct MockProducer {
226 received: Arc<Mutex<Vec<Exchange>>>,
227 notify: Arc<Notify>,
228}
229
230impl Service<Exchange> for MockProducer {
231 type Response = Exchange;
232 type Error = CamelError;
233 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
234
235 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
236 Poll::Ready(Ok(()))
237 }
238
239 fn call(&mut self, exchange: Exchange) -> Self::Future {
240 let received = Arc::clone(&self.received);
241 let notify = Arc::clone(&self.notify);
242 Box::pin(async move {
243 received.lock().await.push(exchange.clone());
244 notify.notify_waiters();
245 Ok(exchange)
246 })
247 }
248}
249
250pub struct ExchangeAssert {
262 exchange: Exchange,
263 idx: usize,
264 endpoint_name: String,
265}
266
267impl ExchangeAssert {
268 fn location(&self) -> String {
269 format!(
270 "MockEndpoint '{}' exchange[{}]",
271 self.endpoint_name, self.idx
272 )
273 }
274
275 pub fn assert_body_text(self, expected: &str) -> Self {
277 match self.exchange.input.body.as_text() {
278 Some(actual) if actual == expected => {}
279 Some(actual) => panic!(
280 "{}: expected body text {:?}, got {:?}",
281 self.location(),
282 expected,
283 actual
284 ),
285 None => panic!(
286 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
287 self.location(),
288 expected,
289 self.exchange.input.body
290 ),
291 }
292 self
293 }
294
295 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
297 match &self.exchange.input.body {
298 camel_api::Body::Json(actual) if *actual == expected => {}
299 camel_api::Body::Json(actual) => panic!(
300 "{}: expected body JSON {}, got {}",
301 self.location(),
302 expected,
303 actual
304 ),
305 other => panic!(
306 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
307 self.location(),
308 expected,
309 other
310 ),
311 }
312 self
313 }
314
315 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
317 match &self.exchange.input.body {
318 camel_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
319 camel_api::Body::Bytes(actual) => panic!(
320 "{}: expected body bytes {:?}, got {:?}",
321 self.location(),
322 expected,
323 actual
324 ),
325 other => panic!(
326 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
327 self.location(),
328 expected,
329 other
330 ),
331 }
332 self
333 }
334
335 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
341 match self.exchange.input.headers.get(key) {
342 Some(actual) if *actual == expected => {}
343 Some(actual) => panic!(
344 "{}: expected header {:?} = {}, got {}",
345 self.location(),
346 key,
347 expected,
348 actual
349 ),
350 None => panic!(
351 "{}: expected header {:?} = {}, but header is absent",
352 self.location(),
353 key,
354 expected
355 ),
356 }
357 self
358 }
359
360 pub fn assert_header_exists(self, key: &str) -> Self {
366 if !self.exchange.input.headers.contains_key(key) {
367 panic!(
368 "{}: expected header {:?} to be present, but it was absent",
369 self.location(),
370 key
371 );
372 }
373 self
374 }
375
376 pub fn assert_has_error(self) -> Self {
382 if self.exchange.error.is_none() {
383 panic!(
384 "{}: expected exchange to have an error, but error is None",
385 self.location()
386 );
387 }
388 self
389 }
390
391 pub fn assert_no_error(self) -> Self {
397 if let Some(ref err) = self.exchange.error {
398 panic!(
399 "{}: expected exchange to have no error, but got: {}",
400 self.location(),
401 err
402 );
403 }
404 self
405 }
406}
407
408#[cfg(test)]
413mod tests {
414 use super::*;
415 use camel_api::Message;
416 use tower::ServiceExt;
417
418 fn test_producer_ctx() -> ProducerContext {
419 ProducerContext::new()
420 }
421
422 #[test]
423 fn test_mock_component_scheme() {
424 let component = MockComponent::new();
425 assert_eq!(component.scheme(), "mock");
426 }
427
428 #[test]
429 fn test_mock_creates_endpoint() {
430 let component = MockComponent::new();
431 let endpoint = component.create_endpoint("mock:result");
432 assert!(endpoint.is_ok());
433 }
434
435 #[test]
436 fn test_mock_wrong_scheme() {
437 let component = MockComponent::new();
438 let result = component.create_endpoint("timer:tick");
439 assert!(result.is_err());
440 }
441
442 #[test]
443 fn test_mock_endpoint_no_consumer() {
444 let component = MockComponent::new();
445 let endpoint = component.create_endpoint("mock:result").unwrap();
446 assert!(endpoint.create_consumer().is_err());
447 }
448
449 #[test]
450 fn test_mock_endpoint_creates_producer() {
451 let ctx = test_producer_ctx();
452 let component = MockComponent::new();
453 let endpoint = component.create_endpoint("mock:result").unwrap();
454 assert!(endpoint.create_producer(&ctx).is_ok());
455 }
456
457 #[tokio::test]
458 async fn test_mock_producer_records_exchange() {
459 let ctx = test_producer_ctx();
460 let component = MockComponent::new();
461 let endpoint = component.create_endpoint("mock:test").unwrap();
462
463 let mut producer = endpoint.create_producer(&ctx).unwrap();
464
465 let ex1 = Exchange::new(Message::new("first"));
466 let ex2 = Exchange::new(Message::new("second"));
467
468 producer.call(ex1).await.unwrap();
469 producer.call(ex2).await.unwrap();
470
471 let inner = component.get_endpoint("test").unwrap();
472 inner.assert_exchange_count(2).await;
473
474 let received = inner.get_received_exchanges().await;
475 assert_eq!(received[0].input.body.as_text(), Some("first"));
476 assert_eq!(received[1].input.body.as_text(), Some("second"));
477 }
478
479 #[tokio::test]
480 async fn test_mock_producer_passes_through_exchange() {
481 let ctx = test_producer_ctx();
482 let component = MockComponent::new();
483 let endpoint = component.create_endpoint("mock:passthrough").unwrap();
484
485 let producer = endpoint.create_producer(&ctx).unwrap();
486 let exchange = Exchange::new(Message::new("hello"));
487 let result = producer.oneshot(exchange).await.unwrap();
488
489 assert_eq!(result.input.body.as_text(), Some("hello"));
491 }
492
493 #[tokio::test]
494 async fn test_mock_assert_count_passes() {
495 let component = MockComponent::new();
496 let endpoint = component.create_endpoint("mock:count").unwrap();
497 let inner = component.get_endpoint("count").unwrap();
498
499 inner.assert_exchange_count(0).await;
500
501 let ctx = test_producer_ctx();
502 let mut producer = endpoint.create_producer(&ctx).unwrap();
503 producer
504 .call(Exchange::new(Message::new("one")))
505 .await
506 .unwrap();
507
508 inner.assert_exchange_count(1).await;
509 }
510
511 #[tokio::test]
512 #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
513 async fn test_mock_assert_count_fails() {
514 let component = MockComponent::new();
515 let _endpoint = component.create_endpoint("mock:fail").unwrap();
518 let inner = component.get_endpoint("fail").unwrap();
519
520 inner.assert_exchange_count(5).await;
521 }
522
523 #[tokio::test]
524 async fn test_mock_component_shared_registry() {
525 let component = MockComponent::new();
526 let ep1 = component.create_endpoint("mock:shared").unwrap();
527 let ep2 = component.create_endpoint("mock:shared").unwrap();
528
529 let ctx = test_producer_ctx();
531 let mut p1 = ep1.create_producer(&ctx).unwrap();
532 p1.call(Exchange::new(Message::new("from-ep1")))
533 .await
534 .unwrap();
535
536 let mut p2 = ep2.create_producer(&ctx).unwrap();
538 p2.call(Exchange::new(Message::new("from-ep2")))
539 .await
540 .unwrap();
541
542 let inner = component.get_endpoint("shared").unwrap();
544 inner.assert_exchange_count(2).await;
545
546 let received = inner.get_received_exchanges().await;
547 assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
548 assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
549 }
550
551 #[tokio::test]
552 async fn await_exchanges_resolves_immediately() {
553 let ctx = test_producer_ctx();
555 let component = MockComponent::new();
556 let endpoint = component.create_endpoint("mock:immediate").unwrap();
557 let inner = component.get_endpoint("immediate").unwrap();
558
559 let mut producer = endpoint.create_producer(&ctx).unwrap();
560 producer
561 .call(Exchange::new(Message::new("a")))
562 .await
563 .unwrap();
564 producer
565 .call(Exchange::new(Message::new("b")))
566 .await
567 .unwrap();
568
569 inner
571 .await_exchanges(2, std::time::Duration::from_millis(100))
572 .await;
573 }
574
575 #[tokio::test]
576 async fn await_exchanges_waits_then_resolves() {
577 let ctx = test_producer_ctx();
579 let component = MockComponent::new();
580 let endpoint = component.create_endpoint("mock:waiter").unwrap();
581 let inner = component.get_endpoint("waiter").unwrap();
582
583 let mut producer = endpoint.create_producer(&ctx).unwrap();
585 tokio::spawn(async move {
586 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
587 producer
588 .call(Exchange::new(Message::new("delayed")))
589 .await
590 .unwrap();
591 });
592
593 inner
595 .await_exchanges(1, std::time::Duration::from_millis(500))
596 .await;
597
598 let received = inner.get_received_exchanges().await;
599 assert_eq!(received.len(), 1);
600 assert_eq!(received[0].input.body.as_text(), Some("delayed"));
601 }
602
603 #[tokio::test]
604 #[should_panic(expected = "timed out waiting for 5 exchanges")]
605 async fn await_exchanges_times_out() {
606 let component = MockComponent::new();
607 let _endpoint = component.create_endpoint("mock:timeout").unwrap();
608 let inner = component.get_endpoint("timeout").unwrap();
609
610 inner
612 .await_exchanges(5, std::time::Duration::from_millis(50))
613 .await;
614 }
615
616 #[tokio::test(flavor = "multi_thread")]
617 async fn exchange_idx_returns_assert() {
618 let ctx = test_producer_ctx();
619 let component = MockComponent::new();
620 let endpoint = component.create_endpoint("mock:assert-idx").unwrap();
621 let inner = component.get_endpoint("assert-idx").unwrap();
622
623 let mut producer = endpoint.create_producer(&ctx).unwrap();
624 producer
625 .call(Exchange::new(Message::new("hello")))
626 .await
627 .unwrap();
628
629 inner
630 .await_exchanges(1, std::time::Duration::from_millis(500))
631 .await;
632 let _assert = inner.exchange(0);
634 }
635
636 #[tokio::test(flavor = "multi_thread")]
637 #[should_panic(expected = "exchange index 5 out of bounds")]
638 async fn exchange_idx_out_of_bounds() {
639 let ctx = test_producer_ctx();
640 let component = MockComponent::new();
641 let endpoint = component.create_endpoint("mock:oob").unwrap();
642 let inner = component.get_endpoint("oob").unwrap();
643
644 let mut producer = endpoint.create_producer(&ctx).unwrap();
645 producer
646 .call(Exchange::new(Message::new("only-one")))
647 .await
648 .unwrap();
649
650 inner
651 .await_exchanges(1, std::time::Duration::from_millis(500))
652 .await;
653 let _assert = inner.exchange(5);
655 }
656
657 #[tokio::test(flavor = "multi_thread")]
658 async fn assert_body_text_pass() {
659 let ctx = test_producer_ctx();
660 let component = MockComponent::new();
661 let endpoint = component.create_endpoint("mock:body-text-pass").unwrap();
662 let inner = component.get_endpoint("body-text-pass").unwrap();
663 let mut producer = endpoint.create_producer(&ctx).unwrap();
664 producer
665 .call(Exchange::new(Message::new("hello")))
666 .await
667 .unwrap();
668 inner
669 .await_exchanges(1, std::time::Duration::from_millis(500))
670 .await;
671 inner.exchange(0).assert_body_text("hello");
672 }
673
674 #[tokio::test(flavor = "multi_thread")]
675 #[should_panic(expected = "expected body text")]
676 async fn assert_body_text_fail() {
677 let ctx = test_producer_ctx();
678 let component = MockComponent::new();
679 let endpoint = component.create_endpoint("mock:body-text-fail").unwrap();
680 let inner = component.get_endpoint("body-text-fail").unwrap();
681 let mut producer = endpoint.create_producer(&ctx).unwrap();
682 producer
683 .call(Exchange::new(Message::new("hello")))
684 .await
685 .unwrap();
686 inner
687 .await_exchanges(1, std::time::Duration::from_millis(500))
688 .await;
689 inner.exchange(0).assert_body_text("world");
690 }
691
692 #[tokio::test(flavor = "multi_thread")]
693 async fn assert_body_json_pass() {
694 use camel_api::Body;
695 let ctx = test_producer_ctx();
696 let component = MockComponent::new();
697 let endpoint = component.create_endpoint("mock:body-json-pass").unwrap();
698 let inner = component.get_endpoint("body-json-pass").unwrap();
699 let mut producer = endpoint.create_producer(&ctx).unwrap();
700 let mut msg = Message::new("");
701 msg.body = Body::Json(serde_json::json!({"key": "value"}));
702 producer.call(Exchange::new(msg)).await.unwrap();
703 inner
704 .await_exchanges(1, std::time::Duration::from_millis(500))
705 .await;
706 inner
707 .exchange(0)
708 .assert_body_json(serde_json::json!({"key": "value"}));
709 }
710
711 #[tokio::test(flavor = "multi_thread")]
712 #[should_panic(expected = "expected body JSON")]
713 async fn assert_body_json_fail() {
714 use camel_api::Body;
715 let ctx = test_producer_ctx();
716 let component = MockComponent::new();
717 let endpoint = component.create_endpoint("mock:body-json-fail").unwrap();
718 let inner = component.get_endpoint("body-json-fail").unwrap();
719 let mut producer = endpoint.create_producer(&ctx).unwrap();
720 let mut msg = Message::new("");
721 msg.body = Body::Json(serde_json::json!({"key": "value"}));
722 producer.call(Exchange::new(msg)).await.unwrap();
723 inner
724 .await_exchanges(1, std::time::Duration::from_millis(500))
725 .await;
726 inner
727 .exchange(0)
728 .assert_body_json(serde_json::json!({"key": "other"}));
729 }
730
731 #[tokio::test(flavor = "multi_thread")]
732 async fn assert_body_bytes_pass() {
733 use bytes::Bytes;
734 use camel_api::Body;
735 let ctx = test_producer_ctx();
736 let component = MockComponent::new();
737 let endpoint = component.create_endpoint("mock:body-bytes-pass").unwrap();
738 let inner = component.get_endpoint("body-bytes-pass").unwrap();
739 let mut producer = endpoint.create_producer(&ctx).unwrap();
740 let mut msg = Message::new("");
741 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
742 producer.call(Exchange::new(msg)).await.unwrap();
743 inner
744 .await_exchanges(1, std::time::Duration::from_millis(500))
745 .await;
746 inner.exchange(0).assert_body_bytes(b"binary");
747 }
748
749 #[tokio::test(flavor = "multi_thread")]
750 #[should_panic(expected = "expected body bytes")]
751 async fn assert_body_bytes_fail() {
752 use bytes::Bytes;
753 use camel_api::Body;
754 let ctx = test_producer_ctx();
755 let component = MockComponent::new();
756 let endpoint = component.create_endpoint("mock:body-bytes-fail").unwrap();
757 let inner = component.get_endpoint("body-bytes-fail").unwrap();
758 let mut producer = endpoint.create_producer(&ctx).unwrap();
759 let mut msg = Message::new("");
760 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
761 producer.call(Exchange::new(msg)).await.unwrap();
762 inner
763 .await_exchanges(1, std::time::Duration::from_millis(500))
764 .await;
765 inner.exchange(0).assert_body_bytes(b"different");
766 }
767
768 #[tokio::test(flavor = "multi_thread")]
769 async fn assert_header_pass() {
770 let ctx = test_producer_ctx();
771 let component = MockComponent::new();
772 let endpoint = component.create_endpoint("mock:hdr-pass").unwrap();
773 let inner = component.get_endpoint("hdr-pass").unwrap();
774 let mut producer = endpoint.create_producer(&ctx).unwrap();
775 let mut msg = Message::new("body");
776 msg.headers
777 .insert("x-key".to_string(), serde_json::json!("value"));
778 producer.call(Exchange::new(msg)).await.unwrap();
779 inner
780 .await_exchanges(1, std::time::Duration::from_millis(500))
781 .await;
782 inner
783 .exchange(0)
784 .assert_header("x-key", serde_json::json!("value"));
785 }
786
787 #[tokio::test(flavor = "multi_thread")]
788 #[should_panic(expected = "expected header")]
789 async fn assert_header_fail() {
790 let ctx = test_producer_ctx();
791 let component = MockComponent::new();
792 let endpoint = component.create_endpoint("mock:hdr-fail").unwrap();
793 let inner = component.get_endpoint("hdr-fail").unwrap();
794 let mut producer = endpoint.create_producer(&ctx).unwrap();
795 let mut msg = Message::new("body");
796 msg.headers
797 .insert("x-key".to_string(), serde_json::json!("value"));
798 producer.call(Exchange::new(msg)).await.unwrap();
799 inner
800 .await_exchanges(1, std::time::Duration::from_millis(500))
801 .await;
802 inner
803 .exchange(0)
804 .assert_header("x-key", serde_json::json!("other"));
805 }
806
807 #[tokio::test(flavor = "multi_thread")]
808 async fn assert_header_exists_pass() {
809 let ctx = test_producer_ctx();
810 let component = MockComponent::new();
811 let endpoint = component.create_endpoint("mock:hdr-exists-pass").unwrap();
812 let inner = component.get_endpoint("hdr-exists-pass").unwrap();
813 let mut producer = endpoint.create_producer(&ctx).unwrap();
814 let mut msg = Message::new("body");
815 msg.headers
816 .insert("x-present".to_string(), serde_json::json!(42));
817 producer.call(Exchange::new(msg)).await.unwrap();
818 inner
819 .await_exchanges(1, std::time::Duration::from_millis(500))
820 .await;
821 inner.exchange(0).assert_header_exists("x-present");
822 }
823
824 #[tokio::test(flavor = "multi_thread")]
825 #[should_panic(expected = "expected header")]
826 async fn assert_header_exists_fail() {
827 let ctx = test_producer_ctx();
828 let component = MockComponent::new();
829 let endpoint = component.create_endpoint("mock:hdr-exists-fail").unwrap();
830 let inner = component.get_endpoint("hdr-exists-fail").unwrap();
831 let mut producer = endpoint.create_producer(&ctx).unwrap();
832 producer
833 .call(Exchange::new(Message::new("body")))
834 .await
835 .unwrap();
836 inner
837 .await_exchanges(1, std::time::Duration::from_millis(500))
838 .await;
839 inner.exchange(0).assert_header_exists("x-missing");
840 }
841
842 #[tokio::test(flavor = "multi_thread")]
843 async fn assert_has_error_pass() {
844 let ctx = test_producer_ctx();
845 let component = MockComponent::new();
846 let endpoint = component.create_endpoint("mock:err-pass").unwrap();
847 let inner = component.get_endpoint("err-pass").unwrap();
848 let mut producer = endpoint.create_producer(&ctx).unwrap();
849 let mut ex = Exchange::new(Message::new("body"));
850 ex.error = Some(camel_api::CamelError::ProcessorError("oops".to_string()));
851 producer.call(ex).await.unwrap();
852 inner
853 .await_exchanges(1, std::time::Duration::from_millis(500))
854 .await;
855 inner.exchange(0).assert_has_error();
856 }
857
858 #[tokio::test(flavor = "multi_thread")]
859 #[should_panic(expected = "expected exchange to have an error")]
860 async fn assert_has_error_fail() {
861 let ctx = test_producer_ctx();
862 let component = MockComponent::new();
863 let endpoint = component.create_endpoint("mock:has-err-fail").unwrap();
864 let inner = component.get_endpoint("has-err-fail").unwrap();
865 let mut producer = endpoint.create_producer(&ctx).unwrap();
866 producer
867 .call(Exchange::new(Message::new("body")))
868 .await
869 .unwrap();
870 inner
871 .await_exchanges(1, std::time::Duration::from_millis(500))
872 .await;
873 inner.exchange(0).assert_has_error();
874 }
875
876 #[tokio::test(flavor = "multi_thread")]
877 async fn assert_no_error_pass() {
878 let ctx = test_producer_ctx();
879 let component = MockComponent::new();
880 let endpoint = component.create_endpoint("mock:no-err-pass").unwrap();
881 let inner = component.get_endpoint("no-err-pass").unwrap();
882 let mut producer = endpoint.create_producer(&ctx).unwrap();
883 producer
884 .call(Exchange::new(Message::new("body")))
885 .await
886 .unwrap();
887 inner
888 .await_exchanges(1, std::time::Duration::from_millis(500))
889 .await;
890 inner.exchange(0).assert_no_error();
891 }
892
893 #[tokio::test(flavor = "multi_thread")]
894 #[should_panic(expected = "expected exchange to have no error")]
895 async fn assert_no_error_fail() {
896 let ctx = test_producer_ctx();
897 let component = MockComponent::new();
898 let endpoint = component.create_endpoint("mock:no-err-fail").unwrap();
899 let inner = component.get_endpoint("no-err-fail").unwrap();
900 let mut producer = endpoint.create_producer(&ctx).unwrap();
901 let mut ex = Exchange::new(Message::new("body"));
902 ex.error = Some(camel_api::CamelError::ProcessorError("oops".to_string()));
903 producer.call(ex).await.unwrap();
904 inner
905 .await_exchanges(1, std::time::Duration::from_millis(500))
906 .await;
907 inner.exchange(0).assert_no_error();
908 }
909}