1use std::collections::{HashMap, VecDeque};
28use std::future::Future;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use tokio::sync::{Mutex, Notify};
34use tower::Service;
35
36use camel_component_api::parse_uri;
37use camel_component_api::{BoxProcessor, CamelError, Exchange};
38use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
39use tracing::debug;
40
41const DEFAULT_MAX_RETAINED: usize = 10_000;
43
44#[derive(Clone, Debug)]
67pub struct MockConfig {
68 pub max_retained: usize,
71 pub copy_on_exchange: bool,
75 pub fail_fast: bool,
78 pub assert_period_ms: u64,
81 pub any_order: bool,
84}
85
86impl Default for MockConfig {
87 fn default() -> Self {
88 Self {
89 max_retained: DEFAULT_MAX_RETAINED,
90 copy_on_exchange: false,
91 fail_fast: false,
92 assert_period_ms: 0,
93 any_order: false,
94 }
95 }
96}
97
98impl MockConfig {
99 pub fn new(max_retained: usize) -> Self {
101 Self {
102 max_retained,
103 ..Self::default()
104 }
105 }
106}
107
108pub struct MockExpectations {
118 expected_bodies: Vec<camel_component_api::Body>,
119 expected_headers: Vec<(String, serde_json::Value)>,
120 expected_header_regexes: Vec<(String, String)>,
121}
122
123impl Default for MockExpectations {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129impl MockExpectations {
130 pub fn new() -> Self {
132 Self {
133 expected_bodies: Vec::new(),
134 expected_headers: Vec::new(),
135 expected_header_regexes: Vec::new(),
136 }
137 }
138
139 pub fn push_body(&mut self, body: camel_component_api::Body) {
141 self.expected_bodies.push(body);
142 }
143
144 pub fn push_header(&mut self, key: String, value: serde_json::Value) {
146 self.expected_headers.push((key, value));
147 }
148
149 pub fn push_header_regex(&mut self, key: String, pattern: String) {
151 self.expected_header_regexes.push((key, pattern));
152 }
153}
154
155#[derive(Clone)]
170pub struct MockComponent {
171 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
172 config: MockConfig,
173}
174
175impl MockComponent {
176 pub fn new() -> Self {
177 Self::with_config(MockConfig::default())
178 }
179
180 pub fn with_config(config: MockConfig) -> Self {
182 Self {
183 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
184 config,
185 }
186 }
187
188 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
192 let registry = self
193 .registry
194 .lock()
195 .expect("mutex poisoned: another thread panicked while holding this lock"); registry.get(name).cloned()
197 }
198}
199
200impl Default for MockComponent {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl Component for MockComponent {
207 fn scheme(&self) -> &str {
208 "mock"
209 }
210
211 fn create_endpoint(
212 &self,
213 uri: &str,
214 _ctx: &dyn camel_component_api::ComponentContext,
215 ) -> Result<Box<dyn Endpoint>, CamelError> {
216 let parts = parse_uri(uri)?;
217 if parts.scheme != "mock" {
218 return Err(CamelError::InvalidUri(format!(
219 "expected scheme 'mock', got '{}'",
220 parts.scheme
221 )));
222 }
223
224 let name = parts.path;
225 if name.is_empty() {
226 return Err(CamelError::InvalidUri(
227 "mock endpoint name must be non-empty (use 'mock:<name>')".to_string(),
228 ));
229 }
230 let mut registry = self.registry.lock().map_err(|e| {
231 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
232 })?;
233 let max_retained = self.config.max_retained;
234 let copy_on_exchange = self.config.copy_on_exchange;
235 let fail_fast = self.config.fail_fast;
236 let assert_period_ms = self.config.assert_period_ms;
237 let any_order = self.config.any_order;
238 let inner = registry
239 .entry(name.clone())
240 .or_insert_with(|| {
241 Arc::new(MockEndpointInner {
242 uri: uri.to_string(),
243 name,
244 received: Arc::new(Mutex::new(VecDeque::new())),
245 notify: Arc::new(Notify::new()),
246 max_retained,
247 copy_on_exchange,
248 fail_fast,
249 fail_fast_error: Arc::new(std::sync::Mutex::new(None)),
250 assert_period_ms,
251 any_order,
252 expectations: Arc::new(std::sync::Mutex::new(MockExpectations::new())),
253 })
254 })
255 .clone();
256
257 debug!(endpoint_name = %inner.name, "mock endpoint created");
258 Ok(Box::new(MockEndpoint(inner)))
259 }
260}
261
262pub struct MockEndpoint(Arc<MockEndpointInner>);
272
273pub struct MockEndpointInner {
279 uri: String,
280 pub name: String,
281 received: Arc<Mutex<VecDeque<Exchange>>>,
282 notify: Arc<Notify>,
283 max_retained: usize,
284 copy_on_exchange: bool,
285 fail_fast: bool,
286 fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
287 assert_period_ms: u64,
288 any_order: bool,
289 expectations: Arc<std::sync::Mutex<MockExpectations>>,
290}
291
292impl MockEndpointInner {
293 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
295 self.received.lock().await.iter().cloned().collect()
296 }
297
298 pub async fn received_count(&self) -> usize {
300 self.received.lock().await.len()
301 }
302
303 pub async fn reset(&self) {
307 self.received.lock().await.clear();
308 if let Ok(mut guard) = self.fail_fast_error.lock() {
309 *guard = None;
310 }
311 }
312
313 pub async fn assert_exchange_count(&self, expected: usize) {
319 let actual = self.received.lock().await.len();
320 assert_eq!(
321 actual, expected,
322 "MockEndpoint expected {expected} exchanges, got {actual}"
323 );
324 }
325
326 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
335 let deadline = tokio::time::Instant::now() + timeout;
336 loop {
337 {
338 let received = self.received.lock().await;
339 if received.len() >= count {
340 return;
341 }
342 }
343 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
344 if remaining.is_zero() {
345 let got = self.received.lock().await.len();
348 if got >= count {
349 return;
350 }
351 panic!(
352 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
353 self.name, count, got, timeout
354 );
355 }
356 tokio::select! {
357 _ = self.notify.notified() => {}
358 _ = tokio::time::sleep(remaining) => {}
359 }
360 }
361 }
362
363 pub async fn await_exchanges_with_timeout(&self, count: usize, fallback: std::time::Duration) {
368 let duration = if self.assert_period_ms > 0 {
369 std::time::Duration::from_millis(self.assert_period_ms)
370 } else {
371 fallback
372 };
373 self.await_exchanges(count, duration).await;
374 }
375
376 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
390 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
391 if idx >= received.len() {
392 panic!(
393 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
394 self.name,
395 idx,
396 received.len()
397 );
398 }
399 ExchangeAssert {
400 exchange: received[idx].clone(),
401 idx,
402 endpoint_name: self.name.clone(),
403 }
404 }
405
406 pub fn expect_body(&self, body: camel_component_api::Body) {
408 if let Ok(mut guard) = self.expectations.lock() {
409 guard.push_body(body);
410 }
411 }
412
413 pub fn expect_header(&self, key: &str, value: impl Into<serde_json::Value>) {
415 if let Ok(mut guard) = self.expectations.lock() {
416 guard.push_header(key.to_string(), value.into());
417 }
418 }
419
420 pub fn expect_header_regex(&self, key: &str, pattern: &str) {
425 if let Ok(mut guard) = self.expectations.lock() {
426 guard.push_header_regex(key.to_string(), pattern.to_string());
427 }
428 }
429
430 pub async fn assert_satisfied(&self) {
438 let received = self.get_received_exchanges().await;
439
440 {
442 let guard = self
443 .expectations
444 .lock()
445 .expect("expectations lock poisoned"); if !guard.expected_bodies.is_empty() {
447 let received_bodies: Vec<_> = received.iter().map(|e| &e.input.body).collect();
448 if guard.expected_bodies.len() != received_bodies.len() {
449 panic!(
450 "MockEndpoint '{}': expected {} bodies, got {}",
451 self.name,
452 guard.expected_bodies.len(),
453 received_bodies.len()
454 );
455 }
456 if self.any_order {
457 let mut unmatched: Vec<_> = received_bodies.iter().collect();
459 for expected in &guard.expected_bodies {
460 let idx = unmatched
461 .iter()
462 .position(|actual| body_eq(expected, actual));
463 match idx {
464 Some(i) => {
465 unmatched.remove(i);
466 }
467 None => panic!(
468 "MockEndpoint '{}': expected body {:?} not found in received exchanges (anyOrder mode)",
469 self.name, expected
470 ),
471 }
472 }
473 } else {
474 for (i, expected) in guard.expected_bodies.iter().enumerate() {
475 if !body_eq(expected, received_bodies[i]) {
476 panic!(
477 "MockEndpoint '{}': body[{}] expected {:?}, got {:?}",
478 self.name, i, expected, received_bodies[i]
479 );
480 }
481 }
482 }
483 }
484
485 for (key, value) in &guard.expected_headers {
487 let found = received
488 .iter()
489 .any(|ex| ex.input.headers.get(key).is_some_and(|v| v == value));
490 if !found {
491 panic!(
492 "MockEndpoint '{}': expected header '{}' = {} not found in any received exchange",
493 self.name, key, value
494 );
495 }
496 }
497
498 for (key, pattern) in &guard.expected_header_regexes {
500 let re = regex::Regex::new(pattern).unwrap_or_else(|e| {
501 panic!(
502 "MockEndpoint '{}': invalid regex pattern {:?}: {e}",
503 self.name, pattern
504 )
505 });
506 let found = received.iter().any(|ex| {
507 ex.input.headers.get(key).is_some_and(|v| {
508 let s = match v {
509 serde_json::Value::String(s) => s.clone(),
510 other => other.to_string(),
511 };
512 re.is_match(&s)
513 })
514 });
515 if !found {
516 panic!(
517 "MockEndpoint '{}': no received exchange has header '{}' matching regex {:?}",
518 self.name, key, pattern
519 );
520 }
521 }
522 }
523 }
524
525 pub fn fail_fast_error(&self) -> Option<CamelError> {
527 self.fail_fast_error.lock().ok().and_then(|g| g.clone())
528 }
529}
530
531fn body_eq(a: &camel_component_api::Body, b: &camel_component_api::Body) -> bool {
533 match (a, b) {
534 (camel_component_api::Body::Empty, camel_component_api::Body::Empty) => true,
535 (camel_component_api::Body::Text(a), camel_component_api::Body::Text(b)) => a == b,
536 (camel_component_api::Body::Json(a), camel_component_api::Body::Json(b)) => a == b,
537 (camel_component_api::Body::Xml(a), camel_component_api::Body::Xml(b)) => a == b,
538 (camel_component_api::Body::Bytes(a), camel_component_api::Body::Bytes(b)) => a == b,
539 _ => false,
540 }
541}
542
543impl Endpoint for MockEndpoint {
544 fn uri(&self) -> &str {
545 &self.0.uri
546 }
547
548 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
549 Err(CamelError::EndpointCreationFailed(
550 "mock endpoint does not support consumers (it is a sink)".to_string(),
551 ))
552 }
553
554 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
555 Ok(BoxProcessor::new(MockProducer {
556 name: self.0.name.clone(),
557 received: Arc::clone(&self.0.received),
558 notify: Arc::clone(&self.0.notify),
559 max_retained: self.0.max_retained,
560 copy_on_exchange: self.0.copy_on_exchange,
561 fail_fast: self.0.fail_fast,
562 fail_fast_error: Arc::clone(&self.0.fail_fast_error),
563 }))
564 }
565}
566
567#[derive(Clone)]
573struct MockProducer {
574 name: String,
575 received: Arc<Mutex<VecDeque<Exchange>>>,
576 notify: Arc<Notify>,
577 max_retained: usize,
578 copy_on_exchange: bool,
579 fail_fast: bool,
580 fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
581}
582
583impl Service<Exchange> for MockProducer {
584 type Response = Exchange;
585 type Error = CamelError;
586 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
587
588 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
589 if self.fail_fast
591 && let Ok(guard) = self.fail_fast_error.lock()
592 && guard.is_some()
593 {
594 return Poll::Ready(Err(CamelError::ProcessorError(
595 "mock endpoint in fail-fast mode: a previous exchange caused an error".to_string(),
596 )));
597 }
598 Poll::Ready(Ok(()))
599 }
600
601 fn call(&mut self, exchange: Exchange) -> Self::Future {
602 let name = self.name.clone();
603 let received = Arc::clone(&self.received);
604 let notify = Arc::clone(&self.notify);
605 let max_retained = self.max_retained;
606 let copy_on_exchange = self.copy_on_exchange;
607 let fail_fast = self.fail_fast;
608 let fail_fast_error = Arc::clone(&self.fail_fast_error);
609 Box::pin(async move {
610 if fail_fast
612 && let Ok(guard) = fail_fast_error.lock()
613 && guard.is_some()
614 {
615 return Err(CamelError::ProcessorError(
616 "mock endpoint in fail-fast mode: a previous exchange caused an error"
617 .to_string(),
618 ));
619 }
620
621 let correlation_id = exchange
622 .input
623 .headers
624 .get("CamelCorrelationId")
625 .and_then(|v| v.as_str())
626 .map(|s| s.to_string());
627
628 let exchange_to_store = if copy_on_exchange {
629 let mut cloned = exchange.clone();
630 cloned.input.body = clone_body(&exchange.input.body);
632 cloned
633 } else {
634 exchange.clone()
635 };
636
637 let mut guard = received.lock().await;
638 if guard.len() >= max_retained {
639 tracing::warn!(
640 endpoint_name = %name,
641 max = max_retained,
642 "max retained exchanges reached, dropping oldest"
643 );
644 guard.pop_front();
645 }
646 guard.push_back(exchange_to_store);
647 let count = guard.len();
648 drop(guard);
649
650 debug!(
651 endpoint_name = %name,
652 count = %count,
653 correlation_id = correlation_id.as_deref().unwrap_or("none"),
654 "exchange recorded on mock"
655 );
656 notify.notify_waiters();
657
658 Ok(exchange)
659 })
660 }
661}
662
663fn clone_body(body: &camel_component_api::Body) -> camel_component_api::Body {
665 match body {
666 camel_component_api::Body::Empty => camel_component_api::Body::Empty,
667 camel_component_api::Body::Text(s) => camel_component_api::Body::Text(s.clone()),
668 camel_component_api::Body::Json(v) => camel_component_api::Body::Json(v.clone()),
669 camel_component_api::Body::Xml(s) => camel_component_api::Body::Xml(s.clone()),
670 camel_component_api::Body::Bytes(b) => camel_component_api::Body::Bytes(b.clone()),
671 camel_component_api::Body::Stream(_) => {
672 camel_component_api::Body::Empty
674 }
675 }
676}
677
678pub struct ExchangeAssert {
690 exchange: Exchange,
691 idx: usize,
692 endpoint_name: String,
693}
694
695impl ExchangeAssert {
696 fn location(&self) -> String {
697 format!(
698 "MockEndpoint '{}' exchange[{}]",
699 self.endpoint_name, self.idx
700 )
701 }
702
703 pub fn assert_body_text(self, expected: &str) -> Self {
705 match self.exchange.input.body.as_text() {
706 Some(actual) if actual == expected => {}
707 Some(actual) => panic!(
708 "{}: expected body text {:?}, got {:?}",
709 self.location(),
710 expected,
711 actual
712 ),
713 None => panic!(
714 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
715 self.location(),
716 expected,
717 self.exchange.input.body
718 ),
719 }
720 self
721 }
722
723 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
725 match &self.exchange.input.body {
726 camel_component_api::Body::Json(actual) if *actual == expected => {}
727 camel_component_api::Body::Json(actual) => panic!(
728 "{}: expected body JSON {}, got {}",
729 self.location(),
730 expected,
731 actual
732 ),
733 other => panic!(
734 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
735 self.location(),
736 expected,
737 other
738 ),
739 }
740 self
741 }
742
743 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
745 match &self.exchange.input.body {
746 camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
747 camel_component_api::Body::Bytes(actual) => panic!(
748 "{}: expected body bytes {:?}, got {:?}",
749 self.location(),
750 expected,
751 actual
752 ),
753 other => panic!(
754 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
755 self.location(),
756 expected,
757 other
758 ),
759 }
760 self
761 }
762
763 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
769 match self.exchange.input.headers.get(key) {
770 Some(actual) if *actual == expected => {}
771 Some(actual) => panic!(
772 "{}: expected header {:?} = {}, got {}",
773 self.location(),
774 key,
775 expected,
776 actual
777 ),
778 None => panic!(
779 "{}: expected header {:?} = {}, but header is absent",
780 self.location(),
781 key,
782 expected
783 ),
784 }
785 self
786 }
787
788 pub fn assert_header_exists(self, key: &str) -> Self {
794 if !self.exchange.input.headers.contains_key(key) {
795 panic!(
796 "{}: expected header {:?} to be present, but it was absent",
797 self.location(),
798 key
799 );
800 }
801 self
802 }
803
804 pub fn assert_has_error(self) -> Self {
810 if self.exchange.error.is_none() {
811 panic!(
812 "{}: expected exchange to have an error, but error is None",
813 self.location()
814 );
815 }
816 self
817 }
818
819 pub fn assert_no_error(self) -> Self {
825 if let Some(ref err) = self.exchange.error {
826 panic!(
827 "{}: expected exchange to have no error, but got: {}",
828 self.location(),
829 err
830 );
831 }
832 self
833 }
834}
835
836#[cfg(test)]
841mod tests {
842 use super::*;
843 use camel_component_api::Message;
844 use camel_component_api::NoOpComponentContext;
845 use tower::ServiceExt;
846
847 fn test_producer_ctx() -> ProducerContext {
848 ProducerContext::new()
849 }
850
851 #[test]
852 fn test_mock_component_scheme() {
853 let component = MockComponent::new();
854 assert_eq!(component.scheme(), "mock");
855 }
856
857 #[test]
858 fn test_mock_component_default() {
859 let component = MockComponent::default();
860 assert_eq!(component.scheme(), "mock");
861 assert!(component.get_endpoint("missing").is_none());
862 }
863
864 #[test]
865 fn test_mock_creates_endpoint() {
866 let component = MockComponent::new();
867 let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
868 assert!(endpoint.is_ok());
869 }
870
871 #[test]
872 fn test_mock_wrong_scheme() {
873 let component = MockComponent::new();
874 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
875 assert!(result.is_err());
876 }
877
878 #[test]
879 fn test_empty_mock_endpoint_name_rejected() {
880 let component = MockComponent::new();
881 let result = component.create_endpoint("mock:", &NoOpComponentContext);
882 assert!(result.is_err(), "empty mock name should be rejected");
883 }
884
885 #[test]
886 fn test_valid_mock_endpoint_name_accepted() {
887 let component = MockComponent::new();
888 let result = component.create_endpoint("mock:result", &NoOpComponentContext);
889 assert!(result.is_ok());
890 }
891
892 #[test]
893 fn test_mock_endpoint_no_consumer() {
894 let component = MockComponent::new();
895 let endpoint = component
896 .create_endpoint("mock:result", &NoOpComponentContext)
897 .unwrap();
898 assert!(endpoint.create_consumer().is_err());
899 }
900
901 #[test]
902 fn test_mock_endpoint_creates_producer() {
903 let ctx = test_producer_ctx();
904 let component = MockComponent::new();
905 let endpoint = component
906 .create_endpoint("mock:result", &NoOpComponentContext)
907 .unwrap();
908 assert!(endpoint.create_producer(&ctx).is_ok());
909 }
910
911 #[test]
912 fn test_mock_endpoint_uri() {
913 let component = MockComponent::new();
914 let endpoint = component
915 .create_endpoint("mock:uri-check", &NoOpComponentContext)
916 .unwrap();
917 assert_eq!(endpoint.uri(), "mock:uri-check");
918 }
919
920 #[test]
921 fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
922 let component = MockComponent::new();
923 let _ = component
924 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
925 .unwrap();
926 let _ = component
927 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
928 .unwrap();
929
930 let first = component.get_endpoint("shared-inner").unwrap();
931 let second = component.get_endpoint("shared-inner").unwrap();
932 assert!(Arc::ptr_eq(&first, &second));
933 }
934
935 #[tokio::test]
936 async fn test_mock_producer_records_exchange() {
937 let ctx = test_producer_ctx();
938 let component = MockComponent::new();
939 let endpoint = component
940 .create_endpoint("mock:test", &NoOpComponentContext)
941 .unwrap();
942
943 let mut producer = endpoint.create_producer(&ctx).unwrap();
944
945 let ex1 = Exchange::new(Message::new("first"));
946 let ex2 = Exchange::new(Message::new("second"));
947
948 producer.call(ex1).await.unwrap();
949 producer.call(ex2).await.unwrap();
950
951 let inner = component.get_endpoint("test").unwrap();
952 inner.assert_exchange_count(2).await;
953
954 let received = inner.get_received_exchanges().await;
955 assert_eq!(received[0].input.body.as_text(), Some("first"));
956 assert_eq!(received[1].input.body.as_text(), Some("second"));
957 }
958
959 #[tokio::test]
960 async fn test_mock_producer_passes_through_exchange() {
961 let ctx = test_producer_ctx();
962 let component = MockComponent::new();
963 let endpoint = component
964 .create_endpoint("mock:passthrough", &NoOpComponentContext)
965 .unwrap();
966
967 let producer = endpoint.create_producer(&ctx).unwrap();
968 let exchange = Exchange::new(Message::new("hello"));
969 let result = producer.oneshot(exchange).await.unwrap();
970
971 assert_eq!(result.input.body.as_text(), Some("hello"));
973 }
974
975 #[tokio::test]
976 async fn test_mock_assert_count_passes() {
977 let component = MockComponent::new();
978 let endpoint = component
979 .create_endpoint("mock:count", &NoOpComponentContext)
980 .unwrap();
981 let inner = component.get_endpoint("count").unwrap();
982
983 inner.assert_exchange_count(0).await;
984
985 let ctx = test_producer_ctx();
986 let mut producer = endpoint.create_producer(&ctx).unwrap();
987 producer
988 .call(Exchange::new(Message::new("one")))
989 .await
990 .unwrap();
991
992 inner.assert_exchange_count(1).await;
993 }
994
995 #[tokio::test]
996 #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
997 async fn test_mock_assert_count_fails() {
998 let component = MockComponent::new();
999 let _endpoint = component
1002 .create_endpoint("mock:fail", &NoOpComponentContext)
1003 .unwrap();
1004 let inner = component.get_endpoint("fail").unwrap();
1005
1006 inner.assert_exchange_count(5).await;
1007 }
1008
1009 #[tokio::test]
1010 async fn test_mock_component_shared_registry() {
1011 let component = MockComponent::new();
1012 let ep1 = component
1013 .create_endpoint("mock:shared", &NoOpComponentContext)
1014 .unwrap();
1015 let ep2 = component
1016 .create_endpoint("mock:shared", &NoOpComponentContext)
1017 .unwrap();
1018
1019 let ctx = test_producer_ctx();
1021 let mut p1 = ep1.create_producer(&ctx).unwrap();
1022 p1.call(Exchange::new(Message::new("from-ep1")))
1023 .await
1024 .unwrap();
1025
1026 let mut p2 = ep2.create_producer(&ctx).unwrap();
1028 p2.call(Exchange::new(Message::new("from-ep2")))
1029 .await
1030 .unwrap();
1031
1032 let inner = component.get_endpoint("shared").unwrap();
1034 inner.assert_exchange_count(2).await;
1035
1036 let received = inner.get_received_exchanges().await;
1037 assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
1038 assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
1039 }
1040
1041 #[tokio::test]
1042 async fn await_exchanges_resolves_immediately() {
1043 let ctx = test_producer_ctx();
1045 let component = MockComponent::new();
1046 let endpoint = component
1047 .create_endpoint("mock:immediate", &NoOpComponentContext)
1048 .unwrap();
1049 let inner = component.get_endpoint("immediate").unwrap();
1050
1051 let mut producer = endpoint.create_producer(&ctx).unwrap();
1052 producer
1053 .call(Exchange::new(Message::new("a")))
1054 .await
1055 .unwrap();
1056 producer
1057 .call(Exchange::new(Message::new("b")))
1058 .await
1059 .unwrap();
1060
1061 inner
1063 .await_exchanges(2, std::time::Duration::from_millis(100))
1064 .await;
1065 }
1066
1067 #[tokio::test]
1068 async fn await_exchanges_waits_then_resolves() {
1069 let ctx = test_producer_ctx();
1071 let component = MockComponent::new();
1072 let endpoint = component
1073 .create_endpoint("mock:waiter", &NoOpComponentContext)
1074 .unwrap();
1075 let inner = component.get_endpoint("waiter").unwrap();
1076
1077 let mut producer = endpoint.create_producer(&ctx).unwrap();
1079 tokio::spawn(async move {
1080 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1081 producer
1082 .call(Exchange::new(Message::new("delayed")))
1083 .await
1084 .unwrap();
1085 });
1086
1087 inner
1089 .await_exchanges(1, std::time::Duration::from_millis(500))
1090 .await;
1091
1092 let received = inner.get_received_exchanges().await;
1093 assert_eq!(received.len(), 1);
1094 assert_eq!(received[0].input.body.as_text(), Some("delayed"));
1095 }
1096
1097 #[tokio::test]
1098 #[should_panic(expected = "timed out waiting for 5 exchanges")]
1099 async fn await_exchanges_times_out() {
1100 let component = MockComponent::new();
1101 let _endpoint = component
1102 .create_endpoint("mock:timeout", &NoOpComponentContext)
1103 .unwrap();
1104 let inner = component.get_endpoint("timeout").unwrap();
1105
1106 inner
1108 .await_exchanges(5, std::time::Duration::from_millis(50))
1109 .await;
1110 }
1111
1112 #[tokio::test(flavor = "multi_thread")]
1113 async fn exchange_idx_returns_assert() {
1114 let ctx = test_producer_ctx();
1115 let component = MockComponent::new();
1116 let endpoint = component
1117 .create_endpoint("mock:assert-idx", &NoOpComponentContext)
1118 .unwrap();
1119 let inner = component.get_endpoint("assert-idx").unwrap();
1120
1121 let mut producer = endpoint.create_producer(&ctx).unwrap();
1122 producer
1123 .call(Exchange::new(Message::new("hello")))
1124 .await
1125 .unwrap();
1126
1127 inner
1128 .await_exchanges(1, std::time::Duration::from_millis(500))
1129 .await;
1130 let _assert = inner.exchange(0);
1132 }
1133
1134 #[tokio::test(flavor = "multi_thread")]
1135 #[should_panic(expected = "exchange index 5 out of bounds")]
1136 async fn exchange_idx_out_of_bounds() {
1137 let ctx = test_producer_ctx();
1138 let component = MockComponent::new();
1139 let endpoint = component
1140 .create_endpoint("mock:oob", &NoOpComponentContext)
1141 .unwrap();
1142 let inner = component.get_endpoint("oob").unwrap();
1143
1144 let mut producer = endpoint.create_producer(&ctx).unwrap();
1145 producer
1146 .call(Exchange::new(Message::new("only-one")))
1147 .await
1148 .unwrap();
1149
1150 inner
1151 .await_exchanges(1, std::time::Duration::from_millis(500))
1152 .await;
1153 let _assert = inner.exchange(5);
1155 }
1156
1157 #[tokio::test(flavor = "multi_thread")]
1158 async fn assert_body_text_pass() {
1159 let ctx = test_producer_ctx();
1160 let component = MockComponent::new();
1161 let endpoint = component
1162 .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
1163 .unwrap();
1164 let inner = component.get_endpoint("body-text-pass").unwrap();
1165 let mut producer = endpoint.create_producer(&ctx).unwrap();
1166 producer
1167 .call(Exchange::new(Message::new("hello")))
1168 .await
1169 .unwrap();
1170 inner
1171 .await_exchanges(1, std::time::Duration::from_millis(500))
1172 .await;
1173 inner.exchange(0).assert_body_text("hello");
1174 }
1175
1176 #[tokio::test(flavor = "multi_thread")]
1177 #[should_panic(expected = "expected body text")]
1178 async fn assert_body_text_fail() {
1179 let ctx = test_producer_ctx();
1180 let component = MockComponent::new();
1181 let endpoint = component
1182 .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
1183 .unwrap();
1184 let inner = component.get_endpoint("body-text-fail").unwrap();
1185 let mut producer = endpoint.create_producer(&ctx).unwrap();
1186 producer
1187 .call(Exchange::new(Message::new("hello")))
1188 .await
1189 .unwrap();
1190 inner
1191 .await_exchanges(1, std::time::Duration::from_millis(500))
1192 .await;
1193 inner.exchange(0).assert_body_text("world");
1194 }
1195
1196 #[tokio::test(flavor = "multi_thread")]
1197 async fn assert_body_json_pass() {
1198 use camel_component_api::Body;
1199 let ctx = test_producer_ctx();
1200 let component = MockComponent::new();
1201 let endpoint = component
1202 .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
1203 .unwrap();
1204 let inner = component.get_endpoint("body-json-pass").unwrap();
1205 let mut producer = endpoint.create_producer(&ctx).unwrap();
1206 let mut msg = Message::new("");
1207 msg.body = Body::Json(serde_json::json!({"key": "value"}));
1208 producer.call(Exchange::new(msg)).await.unwrap();
1209 inner
1210 .await_exchanges(1, std::time::Duration::from_millis(500))
1211 .await;
1212 inner
1213 .exchange(0)
1214 .assert_body_json(serde_json::json!({"key": "value"}));
1215 }
1216
1217 #[tokio::test(flavor = "multi_thread")]
1218 #[should_panic(expected = "expected body JSON")]
1219 async fn assert_body_json_fail() {
1220 use camel_component_api::Body;
1221 let ctx = test_producer_ctx();
1222 let component = MockComponent::new();
1223 let endpoint = component
1224 .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
1225 .unwrap();
1226 let inner = component.get_endpoint("body-json-fail").unwrap();
1227 let mut producer = endpoint.create_producer(&ctx).unwrap();
1228 let mut msg = Message::new("");
1229 msg.body = Body::Json(serde_json::json!({"key": "value"}));
1230 producer.call(Exchange::new(msg)).await.unwrap();
1231 inner
1232 .await_exchanges(1, std::time::Duration::from_millis(500))
1233 .await;
1234 inner
1235 .exchange(0)
1236 .assert_body_json(serde_json::json!({"key": "other"}));
1237 }
1238
1239 #[tokio::test(flavor = "multi_thread")]
1240 async fn assert_body_bytes_pass() {
1241 use bytes::Bytes;
1242 use camel_component_api::Body;
1243 let ctx = test_producer_ctx();
1244 let component = MockComponent::new();
1245 let endpoint = component
1246 .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
1247 .unwrap();
1248 let inner = component.get_endpoint("body-bytes-pass").unwrap();
1249 let mut producer = endpoint.create_producer(&ctx).unwrap();
1250 let mut msg = Message::new("");
1251 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1252 producer.call(Exchange::new(msg)).await.unwrap();
1253 inner
1254 .await_exchanges(1, std::time::Duration::from_millis(500))
1255 .await;
1256 inner.exchange(0).assert_body_bytes(b"binary");
1257 }
1258
1259 #[tokio::test(flavor = "multi_thread")]
1260 #[should_panic(expected = "expected body bytes")]
1261 async fn assert_body_bytes_fail() {
1262 use bytes::Bytes;
1263 use camel_component_api::Body;
1264 let ctx = test_producer_ctx();
1265 let component = MockComponent::new();
1266 let endpoint = component
1267 .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
1268 .unwrap();
1269 let inner = component.get_endpoint("body-bytes-fail").unwrap();
1270 let mut producer = endpoint.create_producer(&ctx).unwrap();
1271 let mut msg = Message::new("");
1272 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1273 producer.call(Exchange::new(msg)).await.unwrap();
1274 inner
1275 .await_exchanges(1, std::time::Duration::from_millis(500))
1276 .await;
1277 inner.exchange(0).assert_body_bytes(b"different");
1278 }
1279
1280 #[tokio::test(flavor = "multi_thread")]
1281 async fn assert_header_pass() {
1282 let ctx = test_producer_ctx();
1283 let component = MockComponent::new();
1284 let endpoint = component
1285 .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
1286 .unwrap();
1287 let inner = component.get_endpoint("hdr-pass").unwrap();
1288 let mut producer = endpoint.create_producer(&ctx).unwrap();
1289 let mut msg = Message::new("body");
1290 msg.headers
1291 .insert("x-key".to_string(), serde_json::json!("value"));
1292 producer.call(Exchange::new(msg)).await.unwrap();
1293 inner
1294 .await_exchanges(1, std::time::Duration::from_millis(500))
1295 .await;
1296 inner
1297 .exchange(0)
1298 .assert_header("x-key", serde_json::json!("value"));
1299 }
1300
1301 #[tokio::test(flavor = "multi_thread")]
1302 #[should_panic(expected = "expected header")]
1303 async fn assert_header_fail() {
1304 let ctx = test_producer_ctx();
1305 let component = MockComponent::new();
1306 let endpoint = component
1307 .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
1308 .unwrap();
1309 let inner = component.get_endpoint("hdr-fail").unwrap();
1310 let mut producer = endpoint.create_producer(&ctx).unwrap();
1311 let mut msg = Message::new("body");
1312 msg.headers
1313 .insert("x-key".to_string(), serde_json::json!("value"));
1314 producer.call(Exchange::new(msg)).await.unwrap();
1315 inner
1316 .await_exchanges(1, std::time::Duration::from_millis(500))
1317 .await;
1318 inner
1319 .exchange(0)
1320 .assert_header("x-key", serde_json::json!("other"));
1321 }
1322
1323 #[tokio::test(flavor = "multi_thread")]
1324 async fn assert_header_exists_pass() {
1325 let ctx = test_producer_ctx();
1326 let component = MockComponent::new();
1327 let endpoint = component
1328 .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
1329 .unwrap();
1330 let inner = component.get_endpoint("hdr-exists-pass").unwrap();
1331 let mut producer = endpoint.create_producer(&ctx).unwrap();
1332 let mut msg = Message::new("body");
1333 msg.headers
1334 .insert("x-present".to_string(), serde_json::json!(42));
1335 producer.call(Exchange::new(msg)).await.unwrap();
1336 inner
1337 .await_exchanges(1, std::time::Duration::from_millis(500))
1338 .await;
1339 inner.exchange(0).assert_header_exists("x-present");
1340 }
1341
1342 #[tokio::test(flavor = "multi_thread")]
1343 #[should_panic(expected = "expected header")]
1344 async fn assert_header_exists_fail() {
1345 let ctx = test_producer_ctx();
1346 let component = MockComponent::new();
1347 let endpoint = component
1348 .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
1349 .unwrap();
1350 let inner = component.get_endpoint("hdr-exists-fail").unwrap();
1351 let mut producer = endpoint.create_producer(&ctx).unwrap();
1352 producer
1353 .call(Exchange::new(Message::new("body")))
1354 .await
1355 .unwrap();
1356 inner
1357 .await_exchanges(1, std::time::Duration::from_millis(500))
1358 .await;
1359 inner.exchange(0).assert_header_exists("x-missing");
1360 }
1361
1362 #[tokio::test(flavor = "multi_thread")]
1363 async fn assert_has_error_pass() {
1364 let ctx = test_producer_ctx();
1365 let component = MockComponent::new();
1366 let endpoint = component
1367 .create_endpoint("mock:err-pass", &NoOpComponentContext)
1368 .unwrap();
1369 let inner = component.get_endpoint("err-pass").unwrap();
1370 let mut producer = endpoint.create_producer(&ctx).unwrap();
1371 let mut ex = Exchange::new(Message::new("body"));
1372 ex.set_error(camel_component_api::CamelError::ProcessorError(
1373 "oops".to_string(),
1374 ));
1375 producer.call(ex).await.unwrap();
1376 inner
1377 .await_exchanges(1, std::time::Duration::from_millis(500))
1378 .await;
1379 inner.exchange(0).assert_has_error();
1380 }
1381
1382 #[tokio::test(flavor = "multi_thread")]
1383 #[should_panic(expected = "expected exchange to have an error")]
1384 async fn assert_has_error_fail() {
1385 let ctx = test_producer_ctx();
1386 let component = MockComponent::new();
1387 let endpoint = component
1388 .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
1389 .unwrap();
1390 let inner = component.get_endpoint("has-err-fail").unwrap();
1391 let mut producer = endpoint.create_producer(&ctx).unwrap();
1392 producer
1393 .call(Exchange::new(Message::new("body")))
1394 .await
1395 .unwrap();
1396 inner
1397 .await_exchanges(1, std::time::Duration::from_millis(500))
1398 .await;
1399 inner.exchange(0).assert_has_error();
1400 }
1401
1402 #[tokio::test(flavor = "multi_thread")]
1403 async fn assert_no_error_pass() {
1404 let ctx = test_producer_ctx();
1405 let component = MockComponent::new();
1406 let endpoint = component
1407 .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
1408 .unwrap();
1409 let inner = component.get_endpoint("no-err-pass").unwrap();
1410 let mut producer = endpoint.create_producer(&ctx).unwrap();
1411 producer
1412 .call(Exchange::new(Message::new("body")))
1413 .await
1414 .unwrap();
1415 inner
1416 .await_exchanges(1, std::time::Duration::from_millis(500))
1417 .await;
1418 inner.exchange(0).assert_no_error();
1419 }
1420
1421 #[tokio::test]
1426 async fn test_mock_reset_clears_exchanges() {
1427 let component = MockComponent::new();
1428 let endpoint = component
1429 .create_endpoint("mock:reset-test", &NoOpComponentContext)
1430 .unwrap();
1431 let inner = component.get_endpoint("reset-test").unwrap();
1432
1433 let ctx = test_producer_ctx();
1434 let mut producer = endpoint.create_producer(&ctx).unwrap();
1435 producer
1436 .call(Exchange::new(Message::new("a")))
1437 .await
1438 .unwrap();
1439 producer
1440 .call(Exchange::new(Message::new("b")))
1441 .await
1442 .unwrap();
1443
1444 assert_eq!(inner.received_count().await, 2);
1445 inner.reset().await;
1446 assert_eq!(inner.received_count().await, 0);
1447 }
1448
1449 #[tokio::test]
1450 async fn test_mock_bounded_retention_drops_oldest() {
1451 let config = MockConfig {
1452 max_retained: 3,
1453 ..Default::default()
1454 };
1455 let component = MockComponent::with_config(config);
1456 let endpoint = component
1457 .create_endpoint("mock:bounded", &NoOpComponentContext)
1458 .unwrap();
1459 let inner = component.get_endpoint("bounded").unwrap();
1460
1461 let ctx = test_producer_ctx();
1462 let mut producer = endpoint.create_producer(&ctx).unwrap();
1463
1464 for i in 0..5 {
1466 producer
1467 .call(Exchange::new(Message::new(format!("msg-{i}"))))
1468 .await
1469 .unwrap();
1470 }
1471
1472 assert_eq!(inner.received_count().await, 3);
1473 let received = inner.get_received_exchanges().await;
1474 assert_eq!(received[0].input.body.as_text(), Some("msg-2"));
1476 assert_eq!(received[1].input.body.as_text(), Some("msg-3"));
1477 assert_eq!(received[2].input.body.as_text(), Some("msg-4"));
1478 }
1479
1480 #[tokio::test]
1481 async fn test_mock_reset_then_record_again() {
1482 let component = MockComponent::new();
1483 let endpoint = component
1484 .create_endpoint("mock:reset-reuse", &NoOpComponentContext)
1485 .unwrap();
1486 let inner = component.get_endpoint("reset-reuse").unwrap();
1487
1488 let ctx = test_producer_ctx();
1489 let mut producer = endpoint.create_producer(&ctx).unwrap();
1490 producer
1491 .call(Exchange::new(Message::new("before-reset")))
1492 .await
1493 .unwrap();
1494 inner.reset().await;
1495
1496 producer
1497 .call(Exchange::new(Message::new("after-reset")))
1498 .await
1499 .unwrap();
1500
1501 let received = inner.get_received_exchanges().await;
1502 assert_eq!(received.len(), 1);
1503 assert_eq!(received[0].input.body.as_text(), Some("after-reset"));
1504 }
1505
1506 #[tokio::test(flavor = "multi_thread")]
1507 #[should_panic(expected = "expected exchange to have no error")]
1508 async fn assert_no_error_fail() {
1509 let ctx = test_producer_ctx();
1510 let component = MockComponent::new();
1511 let endpoint = component
1512 .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1513 .unwrap();
1514 let inner = component.get_endpoint("no-err-fail").unwrap();
1515 let mut producer = endpoint.create_producer(&ctx).unwrap();
1516 let mut ex = Exchange::new(Message::new("body"));
1517 ex.set_error(camel_component_api::CamelError::ProcessorError(
1518 "oops".to_string(),
1519 ));
1520 producer.call(ex).await.unwrap();
1521 inner
1522 .await_exchanges(1, std::time::Duration::from_millis(500))
1523 .await;
1524 inner.exchange(0).assert_no_error();
1525 }
1526
1527 #[tokio::test]
1532 async fn test_copy_on_exchange_stores_cloned_body() {
1533 let config = MockConfig {
1534 copy_on_exchange: true,
1535 ..Default::default()
1536 };
1537 let component = MockComponent::with_config(config);
1538 let endpoint = component
1539 .create_endpoint("mock:copy", &NoOpComponentContext)
1540 .unwrap();
1541 let inner = component.get_endpoint("copy").unwrap();
1542
1543 let ctx = test_producer_ctx();
1544 let mut producer = endpoint.create_producer(&ctx).unwrap();
1545
1546 let mut msg = Message::new("original");
1547 msg.headers.insert("x-test".into(), serde_json::json!(1));
1548 let ex = Exchange::new(msg);
1549 producer.call(ex).await.unwrap();
1550
1551 let received = inner.get_received_exchanges().await;
1552 assert_eq!(received[0].input.body.as_text(), Some("original"));
1553 }
1554
1555 #[tokio::test]
1556 async fn test_copy_on_exchange_false_shares_storage() {
1557 let config = MockConfig {
1558 copy_on_exchange: false,
1559 ..Default::default()
1560 };
1561 let component = MockComponent::with_config(config);
1562 let endpoint = component
1563 .create_endpoint("mock:no-copy", &NoOpComponentContext)
1564 .unwrap();
1565 let inner = component.get_endpoint("no-copy").unwrap();
1566
1567 let ctx = test_producer_ctx();
1568 let mut producer = endpoint.create_producer(&ctx).unwrap();
1569
1570 producer
1571 .call(Exchange::new(Message::new("direct")))
1572 .await
1573 .unwrap();
1574
1575 let received = inner.get_received_exchanges().await;
1576 assert_eq!(received[0].input.body.as_text(), Some("direct"));
1577 }
1578
1579 #[tokio::test]
1584 async fn test_assert_satisfied_bodies_in_order() {
1585 let component = MockComponent::new();
1586 let endpoint = component
1587 .create_endpoint("mock:sat-bodies", &NoOpComponentContext)
1588 .unwrap();
1589 let inner = component.get_endpoint("sat-bodies").unwrap();
1590
1591 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1592 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1593
1594 let ctx = test_producer_ctx();
1595 let mut producer = endpoint.create_producer(&ctx).unwrap();
1596 producer
1597 .call(Exchange::new(Message::new("alpha")))
1598 .await
1599 .unwrap();
1600 producer
1601 .call(Exchange::new(Message::new("beta")))
1602 .await
1603 .unwrap();
1604
1605 inner.assert_satisfied().await;
1606 }
1607
1608 #[tokio::test]
1609 #[should_panic(expected = "body[0] expected")]
1610 async fn test_assert_satisfied_bodies_wrong_order_fails() {
1611 let component = MockComponent::new();
1612 let endpoint = component
1613 .create_endpoint("mock:sat-bodies-fail", &NoOpComponentContext)
1614 .unwrap();
1615 let inner = component.get_endpoint("sat-bodies-fail").unwrap();
1616
1617 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1618 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1619
1620 let ctx = test_producer_ctx();
1621 let mut producer = endpoint.create_producer(&ctx).unwrap();
1622 producer
1623 .call(Exchange::new(Message::new("beta")))
1624 .await
1625 .unwrap();
1626 producer
1627 .call(Exchange::new(Message::new("alpha")))
1628 .await
1629 .unwrap();
1630
1631 inner.assert_satisfied().await;
1632 }
1633
1634 #[tokio::test]
1635 async fn test_assert_satisfied_headers() {
1636 let component = MockComponent::new();
1637 let endpoint = component
1638 .create_endpoint("mock:sat-hdr", &NoOpComponentContext)
1639 .unwrap();
1640 let inner = component.get_endpoint("sat-hdr").unwrap();
1641
1642 inner.expect_header("status", serde_json::json!("ok"));
1643
1644 let ctx = test_producer_ctx();
1645 let mut producer = endpoint.create_producer(&ctx).unwrap();
1646 let mut msg = Message::new("body");
1647 msg.headers.insert("status".into(), serde_json::json!("ok"));
1648 producer.call(Exchange::new(msg)).await.unwrap();
1649
1650 inner.assert_satisfied().await;
1651 }
1652
1653 #[tokio::test]
1654 #[should_panic(expected = "expected header 'missing' =")]
1655 async fn test_assert_satisfied_headers_missing() {
1656 let component = MockComponent::new();
1657 let endpoint = component
1658 .create_endpoint("mock:sat-hdr-missing", &NoOpComponentContext)
1659 .unwrap();
1660 let inner = component.get_endpoint("sat-hdr-missing").unwrap();
1661
1662 inner.expect_header("missing", serde_json::json!("value"));
1663
1664 let ctx = test_producer_ctx();
1665 let mut producer = endpoint.create_producer(&ctx).unwrap();
1666 producer
1667 .call(Exchange::new(Message::new("body")))
1668 .await
1669 .unwrap();
1670
1671 inner.assert_satisfied().await;
1672 }
1673
1674 #[tokio::test]
1679 async fn test_fail_fast_rejects_after_first_call() {
1680 let config = MockConfig {
1681 fail_fast: true,
1682 ..Default::default()
1683 };
1684 let component = MockComponent::with_config(config);
1685 let endpoint = component
1686 .create_endpoint("mock:ff", &NoOpComponentContext)
1687 .unwrap();
1688
1689 let ctx = test_producer_ctx();
1690 let mut producer = endpoint.create_producer(&ctx).unwrap();
1691
1692 producer
1694 .call(Exchange::new(Message::new("ok")))
1695 .await
1696 .unwrap();
1697 }
1698
1699 #[tokio::test]
1700 async fn test_fail_fast_no_error_when_all_good() {
1701 let config = MockConfig {
1702 fail_fast: true,
1703 ..Default::default()
1704 };
1705 let component = MockComponent::with_config(config);
1706 let endpoint = component
1707 .create_endpoint("mock:ff-good", &NoOpComponentContext)
1708 .unwrap();
1709 let inner = component.get_endpoint("ff-good").unwrap();
1710
1711 let ctx = test_producer_ctx();
1712 let mut producer = endpoint.create_producer(&ctx).unwrap();
1713
1714 producer
1715 .call(Exchange::new(Message::new("a")))
1716 .await
1717 .unwrap();
1718 producer
1719 .call(Exchange::new(Message::new("b")))
1720 .await
1721 .unwrap();
1722
1723 assert!(inner.fail_fast_error().is_none());
1724 inner.assert_exchange_count(2).await;
1725 }
1726
1727 #[tokio::test]
1732 async fn test_await_exchanges_with_timeout_uses_config_period() {
1733 let config = MockConfig {
1734 assert_period_ms: 100,
1735 ..Default::default()
1736 };
1737 let component = MockComponent::with_config(config);
1738 let endpoint = component
1739 .create_endpoint("mock:ap", &NoOpComponentContext)
1740 .unwrap();
1741 let inner = component.get_endpoint("ap").unwrap();
1742
1743 let ctx = test_producer_ctx();
1744 let mut producer = endpoint.create_producer(&ctx).unwrap();
1745 producer
1746 .call(Exchange::new(Message::new("x")))
1747 .await
1748 .unwrap();
1749
1750 inner
1751 .await_exchanges_with_timeout(1, std::time::Duration::from_millis(1))
1752 .await;
1753 }
1754
1755 #[tokio::test]
1756 async fn test_await_exchanges_with_timeout_uses_fallback_when_zero() {
1757 let config = MockConfig {
1758 assert_period_ms: 0,
1759 ..Default::default()
1760 };
1761 let component = MockComponent::with_config(config);
1762 let endpoint = component
1763 .create_endpoint("mock:ap-fb", &NoOpComponentContext)
1764 .unwrap();
1765 let inner = component.get_endpoint("ap-fb").unwrap();
1766
1767 let ctx = test_producer_ctx();
1768 let mut producer = endpoint.create_producer(&ctx).unwrap();
1769 producer
1770 .call(Exchange::new(Message::new("y")))
1771 .await
1772 .unwrap();
1773
1774 inner
1775 .await_exchanges_with_timeout(1, std::time::Duration::from_millis(200))
1776 .await;
1777 }
1778
1779 #[tokio::test]
1784 async fn test_expect_header_regex_match() {
1785 let component = MockComponent::new();
1786 let endpoint = component
1787 .create_endpoint("mock:re-hdr", &NoOpComponentContext)
1788 .unwrap();
1789 let inner = component.get_endpoint("re-hdr").unwrap();
1790
1791 inner.expect_header_regex("x-trace-id", r"^[a-f0-9]{8}$");
1792
1793 let ctx = test_producer_ctx();
1794 let mut producer = endpoint.create_producer(&ctx).unwrap();
1795 let mut msg = Message::new("body");
1796 msg.headers
1797 .insert("x-trace-id".into(), serde_json::json!("deadbeef"));
1798 producer.call(Exchange::new(msg)).await.unwrap();
1799
1800 inner.assert_satisfied().await;
1801 }
1802
1803 #[tokio::test]
1804 #[should_panic(expected = "no received exchange has header")]
1805 async fn test_expect_header_regex_no_match() {
1806 let component = MockComponent::new();
1807 let endpoint = component
1808 .create_endpoint("mock:re-hdr-fail", &NoOpComponentContext)
1809 .unwrap();
1810 let inner = component.get_endpoint("re-hdr-fail").unwrap();
1811
1812 inner.expect_header_regex("x-trace-id", r"^\d+$");
1813
1814 let ctx = test_producer_ctx();
1815 let mut producer = endpoint.create_producer(&ctx).unwrap();
1816 let mut msg = Message::new("body");
1817 msg.headers
1818 .insert("x-trace-id".into(), serde_json::json!("abc"));
1819 producer.call(Exchange::new(msg)).await.unwrap();
1820
1821 inner.assert_satisfied().await;
1822 }
1823
1824 #[tokio::test]
1829 async fn test_any_order_bodies_match() {
1830 let config = MockConfig {
1831 any_order: true,
1832 ..Default::default()
1833 };
1834 let component = MockComponent::with_config(config);
1835 let endpoint = component
1836 .create_endpoint("mock:anyorder", &NoOpComponentContext)
1837 .unwrap();
1838 let inner = component.get_endpoint("anyorder").unwrap();
1839
1840 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1841 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1842
1843 let ctx = test_producer_ctx();
1844 let mut producer = endpoint.create_producer(&ctx).unwrap();
1845 producer
1846 .call(Exchange::new(Message::new("alpha")))
1847 .await
1848 .unwrap();
1849 producer
1850 .call(Exchange::new(Message::new("beta")))
1851 .await
1852 .unwrap();
1853
1854 inner.assert_satisfied().await;
1855 }
1856
1857 #[tokio::test]
1858 #[should_panic(expected = "not found in received exchanges (anyOrder mode)")]
1859 async fn test_any_order_bodies_missing() {
1860 let config = MockConfig {
1861 any_order: true,
1862 ..Default::default()
1863 };
1864 let component = MockComponent::with_config(config);
1865 let endpoint = component
1866 .create_endpoint("mock:anyorder-fail", &NoOpComponentContext)
1867 .unwrap();
1868 let inner = component.get_endpoint("anyorder-fail").unwrap();
1869
1870 inner.expect_body(camel_component_api::Body::Text("gamma".into()));
1871 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1872
1873 let ctx = test_producer_ctx();
1874 let mut producer = endpoint.create_producer(&ctx).unwrap();
1875 producer
1876 .call(Exchange::new(Message::new("alpha")))
1877 .await
1878 .unwrap();
1879 producer
1880 .call(Exchange::new(Message::new("beta")))
1881 .await
1882 .unwrap();
1883
1884 inner.assert_satisfied().await;
1885 }
1886
1887 #[tokio::test]
1892 async fn test_tracing_logs_exchange_received() {
1893 let ctx = test_producer_ctx();
1895 let component = MockComponent::new();
1896 let endpoint = component
1897 .create_endpoint("mock:trace", &NoOpComponentContext)
1898 .unwrap();
1899 let mut producer = endpoint.create_producer(&ctx).unwrap();
1900 producer
1901 .call(Exchange::new(Message::new("traced")))
1902 .await
1903 .unwrap();
1904
1905 let inner = component.get_endpoint("trace").unwrap();
1906 inner.assert_exchange_count(1).await;
1907 }
1908
1909 #[test]
1914 fn test_mock_config_new() {
1915 let cfg = MockConfig::new(42);
1916 assert_eq!(cfg.max_retained, 42);
1917 assert!(!cfg.copy_on_exchange);
1918 assert!(!cfg.fail_fast);
1919 assert!(!cfg.any_order);
1920 }
1921}