1use std::collections::{HashMap, VecDeque};
28use std::future::Future;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use tokio::sync::{Mutex, Notify};
34use tower::Service;
35
36use camel_component_api::parse_uri;
37use camel_component_api::{BoxProcessor, CamelError, Exchange};
38use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
39use tracing::debug;
40
/// Retention limit applied when a configuration does not choose its own.
const DEFAULT_MAX_RETAINED: usize = 10_000;

/// Settings a [`MockComponent`] copies into every endpoint it creates.
#[derive(Clone, Debug)]
pub struct MockConfig {
    /// Retention bound: once this many exchanges are stored, the producer
    /// evicts the oldest one before recording the next.
    pub max_retained: usize,
    /// When `true`, the producer re-copies the body through `clone_body`
    /// before storing the exchange.
    pub copy_on_exchange: bool,
    /// When `true`, the producer rejects exchanges while a fail-fast error
    /// is recorded on the endpoint.
    pub fail_fast: bool,
    /// Timeout in milliseconds preferred by `await_exchanges_with_timeout`;
    /// `0` means "use the caller-supplied fallback".
    pub assert_period_ms: u64,
    /// When `true`, `assert_satisfied` matches expected bodies regardless of
    /// arrival order.
    pub any_order: bool,
}

impl Default for MockConfig {
    /// Every switch off, no assert period, and the default retention limit.
    fn default() -> Self {
        MockConfig {
            any_order: false,
            assert_period_ms: 0,
            fail_fast: false,
            copy_on_exchange: false,
            max_retained: DEFAULT_MAX_RETAINED,
        }
    }
}

impl MockConfig {
    /// Builds a configuration that differs from [`MockConfig::default`] only
    /// in its retention limit.
    pub fn new(max_retained: usize) -> Self {
        let defaults = Self::default();
        Self {
            max_retained,
            ..defaults
        }
    }
}
107
108pub struct MockExpectations {
118 expected_bodies: Vec<camel_component_api::Body>,
119 expected_headers: Vec<(String, serde_json::Value)>,
120 expected_header_regexes: Vec<(String, String)>,
121}
122
123impl Default for MockExpectations {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129impl MockExpectations {
130 pub fn new() -> Self {
132 Self {
133 expected_bodies: Vec::new(),
134 expected_headers: Vec::new(),
135 expected_header_regexes: Vec::new(),
136 }
137 }
138
139 pub fn push_body(&mut self, body: camel_component_api::Body) {
141 self.expected_bodies.push(body);
142 }
143
144 pub fn push_header(&mut self, key: String, value: serde_json::Value) {
146 self.expected_headers.push((key, value));
147 }
148
149 pub fn push_header_regex(&mut self, key: String, pattern: String) {
151 self.expected_header_regexes.push((key, pattern));
152 }
153}
154
155#[derive(Clone)]
170pub struct MockComponent {
171 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
172 config: MockConfig,
173}
174
175impl MockComponent {
176 pub fn new() -> Self {
177 Self::with_config(MockConfig::default())
178 }
179
180 pub fn with_config(config: MockConfig) -> Self {
182 Self {
183 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
184 config,
185 }
186 }
187
188 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
192 let registry = self
193 .registry
194 .lock()
195 .expect("mutex poisoned: another thread panicked while holding this lock"); registry.get(name).cloned()
197 }
198}
199
200impl Default for MockComponent {
201 fn default() -> Self {
202 Self::new()
203 }
204}
205
206impl Component for MockComponent {
207 fn scheme(&self) -> &str {
208 "mock"
209 }
210
211 fn create_endpoint(
212 &self,
213 uri: &str,
214 _ctx: &dyn camel_component_api::ComponentContext,
215 ) -> Result<Box<dyn Endpoint>, CamelError> {
216 let parts = parse_uri(uri)?;
217 if parts.scheme != "mock" {
218 return Err(CamelError::InvalidUri(format!(
219 "expected scheme 'mock', got '{}'",
220 parts.scheme
221 )));
222 }
223
224 let name = parts.path;
225 if name.is_empty() {
226 return Err(CamelError::InvalidUri(
227 "mock endpoint name must be non-empty (use 'mock:<name>')".to_string(),
228 ));
229 }
230 let mut registry = self.registry.lock().map_err(|e| {
231 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
232 })?;
233 let max_retained = self.config.max_retained;
234 let copy_on_exchange = self.config.copy_on_exchange;
235 let fail_fast = self.config.fail_fast;
236 let assert_period_ms = self.config.assert_period_ms;
237 let any_order = self.config.any_order;
238 let inner = registry
239 .entry(name.clone())
240 .or_insert_with(|| {
241 Arc::new(MockEndpointInner {
242 uri: uri.to_string(),
243 name,
244 received: Arc::new(Mutex::new(VecDeque::new())),
245 notify: Arc::new(Notify::new()),
246 max_retained,
247 copy_on_exchange,
248 fail_fast,
249 fail_fast_error: Arc::new(std::sync::Mutex::new(None)),
250 assert_period_ms,
251 any_order,
252 expectations: Arc::new(std::sync::Mutex::new(MockExpectations::new())),
253 })
254 })
255 .clone();
256
257 debug!(endpoint_name = %inner.name, "mock endpoint created");
258 Ok(Box::new(MockEndpoint(inner)))
259 }
260}
261
/// `Endpoint` handle that wraps the shared [`MockEndpointInner`] state of one
/// mock name.
pub struct MockEndpoint(Arc<MockEndpointInner>);
272
/// Shared state of one mock endpoint: the recorded exchanges plus the
/// settings and expectations used to assert on them.
pub struct MockEndpointInner {
    /// URI string passed to the `create_endpoint` call that first registered
    /// this name.
    uri: String,
    /// Endpoint name (the path component of the URI).
    pub name: String,
    /// Recorded exchanges, oldest at the front; shared with every producer.
    received: Arc<Mutex<VecDeque<Exchange>>>,
    /// Signalled by producers (`notify_waiters`) after each recorded exchange.
    notify: Arc<Notify>,
    /// Retention bound copied from `MockConfig::max_retained`.
    max_retained: usize,
    /// Copied from `MockConfig::copy_on_exchange`.
    copy_on_exchange: bool,
    /// Copied from `MockConfig::fail_fast`.
    fail_fast: bool,
    /// Error slot consulted by producers in fail-fast mode and cleared by
    /// `reset`. NOTE(review): nothing in the visible code stores `Some` here;
    /// presumably it is set elsewhere — confirm.
    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
    /// Copied from `MockConfig::assert_period_ms`.
    assert_period_ms: u64,
    /// Copied from `MockConfig::any_order`.
    any_order: bool,
    /// Expectations registered through the `expect_*` methods.
    expectations: Arc<std::sync::Mutex<MockExpectations>>,
}
291
292impl MockEndpointInner {
293 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
295 self.received.lock().await.iter().cloned().collect()
296 }
297
298 pub async fn received_count(&self) -> usize {
300 self.received.lock().await.len()
301 }
302
303 pub async fn reset(&self) {
307 self.received.lock().await.clear();
308 if let Ok(mut guard) = self.fail_fast_error.lock() {
309 *guard = None;
310 }
311 }
312
313 pub async fn assert_exchange_count(&self, expected: usize) {
319 let actual = self.received.lock().await.len();
320 assert_eq!(
321 actual, expected,
322 "MockEndpoint expected {expected} exchanges, got {actual}"
323 );
324 }
325
326 pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
335 let deadline = tokio::time::Instant::now() + timeout;
336 loop {
337 {
338 let received = self.received.lock().await;
339 if received.len() >= count {
340 return;
341 }
342 }
343 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
344 if remaining.is_zero() {
345 let got = self.received.lock().await.len();
348 if got >= count {
349 return;
350 }
351 panic!(
352 "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
353 self.name, count, got, timeout
354 );
355 }
356 tokio::select! {
357 _ = self.notify.notified() => {}
358 _ = tokio::time::sleep(remaining) => {}
359 }
360 }
361 }
362
363 pub async fn await_exchanges_with_timeout(&self, count: usize, fallback: std::time::Duration) {
368 let duration = if self.assert_period_ms > 0 {
369 std::time::Duration::from_millis(self.assert_period_ms)
370 } else {
371 fallback
372 };
373 self.await_exchanges(count, duration).await;
374 }
375
376 pub fn exchange(&self, idx: usize) -> ExchangeAssert {
390 let received = tokio::task::block_in_place(|| self.received.blocking_lock());
391 if idx >= received.len() {
392 panic!(
393 "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
394 self.name,
395 idx,
396 received.len()
397 );
398 }
399 ExchangeAssert {
400 exchange: received[idx].clone(),
401 idx,
402 endpoint_name: self.name.clone(),
403 }
404 }
405
406 pub fn expect_body(&self, body: camel_component_api::Body) {
408 if let Ok(mut guard) = self.expectations.lock() {
409 guard.push_body(body);
410 }
411 }
412
413 pub fn expect_header(&self, key: &str, value: impl Into<serde_json::Value>) {
415 if let Ok(mut guard) = self.expectations.lock() {
416 guard.push_header(key.to_string(), value.into());
417 }
418 }
419
420 pub fn expect_header_regex(&self, key: &str, pattern: &str) {
425 if let Ok(mut guard) = self.expectations.lock() {
426 guard.push_header_regex(key.to_string(), pattern.to_string());
427 }
428 }
429
430 pub async fn assert_satisfied(&self) {
438 let received = self.get_received_exchanges().await;
439
440 {
442 let guard = self
443 .expectations
444 .lock()
445 .expect("expectations lock poisoned"); if !guard.expected_bodies.is_empty() {
447 let received_bodies: Vec<_> = received.iter().map(|e| &e.input.body).collect();
448 if guard.expected_bodies.len() != received_bodies.len() {
449 panic!(
450 "MockEndpoint '{}': expected {} bodies, got {}",
451 self.name,
452 guard.expected_bodies.len(),
453 received_bodies.len()
454 );
455 }
456 if self.any_order {
457 let mut unmatched: Vec<_> = received_bodies.iter().collect();
459 for expected in &guard.expected_bodies {
460 let idx = unmatched
461 .iter()
462 .position(|actual| body_eq(expected, actual));
463 match idx {
464 Some(i) => {
465 unmatched.remove(i);
466 }
467 None => panic!(
468 "MockEndpoint '{}': expected body {:?} not found in received exchanges (anyOrder mode)",
469 self.name, expected
470 ),
471 }
472 }
473 } else {
474 for (i, expected) in guard.expected_bodies.iter().enumerate() {
475 if !body_eq(expected, received_bodies[i]) {
476 panic!(
477 "MockEndpoint '{}': body[{}] expected {:?}, got {:?}",
478 self.name, i, expected, received_bodies[i]
479 );
480 }
481 }
482 }
483 }
484
485 for (key, value) in &guard.expected_headers {
487 let found = received
488 .iter()
489 .any(|ex| ex.input.headers.get(key).is_some_and(|v| v == value));
490 if !found {
491 panic!(
492 "MockEndpoint '{}': expected header '{}' = {} not found in any received exchange",
493 self.name, key, value
494 );
495 }
496 }
497
498 for (key, pattern) in &guard.expected_header_regexes {
500 let re = regex::Regex::new(pattern).unwrap_or_else(|e| {
501 panic!(
502 "MockEndpoint '{}': invalid regex pattern {:?}: {e}",
503 self.name, pattern
504 )
505 });
506 let found = received.iter().any(|ex| {
507 ex.input.headers.get(key).is_some_and(|v| {
508 let s = match v {
509 serde_json::Value::String(s) => s.clone(),
510 other => other.to_string(),
511 };
512 re.is_match(&s)
513 })
514 });
515 if !found {
516 panic!(
517 "MockEndpoint '{}': no received exchange has header '{}' matching regex {:?}",
518 self.name, key, pattern
519 );
520 }
521 }
522 }
523 }
524
525 pub fn fail_fast_error(&self) -> Option<CamelError> {
527 self.fail_fast_error.lock().ok().and_then(|g| g.clone())
528 }
529}
530
531fn body_eq(a: &camel_component_api::Body, b: &camel_component_api::Body) -> bool {
533 match (a, b) {
534 (camel_component_api::Body::Empty, camel_component_api::Body::Empty) => true,
535 (camel_component_api::Body::Text(a), camel_component_api::Body::Text(b)) => a == b,
536 (camel_component_api::Body::Json(a), camel_component_api::Body::Json(b)) => a == b,
537 (camel_component_api::Body::Xml(a), camel_component_api::Body::Xml(b)) => a == b,
538 (camel_component_api::Body::Bytes(a), camel_component_api::Body::Bytes(b)) => a == b,
539 _ => false,
540 }
541}
542
543impl Endpoint for MockEndpoint {
544 fn uri(&self) -> &str {
545 &self.0.uri
546 }
547
548 fn create_consumer(
549 &self,
550 _rt: Arc<dyn RuntimeObservability>,
551 ) -> Result<Box<dyn Consumer>, CamelError> {
552 Err(CamelError::EndpointCreationFailed(
553 "mock endpoint does not support consumers (it is a sink)".to_string(),
554 ))
555 }
556
557 fn create_producer(
558 &self,
559 _rt: Arc<dyn RuntimeObservability>,
560 _ctx: &ProducerContext,
561 ) -> Result<BoxProcessor, CamelError> {
562 Ok(BoxProcessor::new(MockProducer {
563 name: self.0.name.clone(),
564 received: Arc::clone(&self.0.received),
565 notify: Arc::clone(&self.0.notify),
566 max_retained: self.0.max_retained,
567 copy_on_exchange: self.0.copy_on_exchange,
568 fail_fast: self.0.fail_fast,
569 fail_fast_error: Arc::clone(&self.0.fail_fast_error),
570 }))
571 }
572}
573
/// Producer side of a mock endpoint: records each exchange it is called with
/// into the endpoint's shared queue and returns the exchange to the caller.
#[derive(Clone)]
struct MockProducer {
    /// Endpoint name, used only in log fields.
    name: String,
    /// Queue shared with the owning `MockEndpointInner`.
    received: Arc<Mutex<VecDeque<Exchange>>>,
    /// Signalled with `notify_waiters` after each recorded exchange.
    notify: Arc<Notify>,
    /// Queue length at which the oldest exchange is evicted before a push.
    max_retained: usize,
    /// When `true`, the stored body is produced by `clone_body`.
    copy_on_exchange: bool,
    /// When `true`, calls are rejected while `fail_fast_error` holds an error.
    fail_fast: bool,
    /// Error slot shared with the owning `MockEndpointInner`.
    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
}
589
590impl Service<Exchange> for MockProducer {
591 type Response = Exchange;
592 type Error = CamelError;
593 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
594
595 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
596 if self.fail_fast
598 && let Ok(guard) = self.fail_fast_error.lock()
599 && guard.is_some()
600 {
601 return Poll::Ready(Err(CamelError::ProcessorError(
602 "mock endpoint in fail-fast mode: a previous exchange caused an error".to_string(),
603 )));
604 }
605 Poll::Ready(Ok(()))
606 }
607
608 fn call(&mut self, exchange: Exchange) -> Self::Future {
609 let name = self.name.clone();
610 let received = Arc::clone(&self.received);
611 let notify = Arc::clone(&self.notify);
612 let max_retained = self.max_retained;
613 let copy_on_exchange = self.copy_on_exchange;
614 let fail_fast = self.fail_fast;
615 let fail_fast_error = Arc::clone(&self.fail_fast_error);
616 Box::pin(async move {
617 if fail_fast
619 && let Ok(guard) = fail_fast_error.lock()
620 && guard.is_some()
621 {
622 return Err(CamelError::ProcessorError(
623 "mock endpoint in fail-fast mode: a previous exchange caused an error"
624 .to_string(),
625 ));
626 }
627
628 let correlation_id = exchange
629 .input
630 .headers
631 .get("CamelCorrelationId")
632 .and_then(|v| v.as_str())
633 .map(|s| s.to_string());
634
635 let exchange_to_store = if copy_on_exchange {
636 let mut cloned = exchange.clone();
637 cloned.input.body = clone_body(&exchange.input.body);
639 cloned
640 } else {
641 exchange.clone()
642 };
643
644 let mut guard = received.lock().await;
645 if guard.len() >= max_retained {
646 tracing::warn!(
647 endpoint_name = %name,
648 max = max_retained,
649 "max retained exchanges reached, dropping oldest"
650 );
651 guard.pop_front();
652 }
653 guard.push_back(exchange_to_store);
654 let count = guard.len();
655 drop(guard);
656
657 debug!(
658 endpoint_name = %name,
659 count = %count,
660 correlation_id = correlation_id.as_deref().unwrap_or("none"),
661 "exchange recorded on mock"
662 );
663 notify.notify_waiters();
664
665 Ok(exchange)
666 })
667 }
668}
669
670fn clone_body(body: &camel_component_api::Body) -> camel_component_api::Body {
672 match body {
673 camel_component_api::Body::Empty => camel_component_api::Body::Empty,
674 camel_component_api::Body::Text(s) => camel_component_api::Body::Text(s.clone()),
675 camel_component_api::Body::Json(v) => camel_component_api::Body::Json(v.clone()),
676 camel_component_api::Body::Xml(s) => camel_component_api::Body::Xml(s.clone()),
677 camel_component_api::Body::Bytes(b) => camel_component_api::Body::Bytes(b.clone()),
678 camel_component_api::Body::Stream(_) => {
679 camel_component_api::Body::Empty
681 }
682 }
683}
684
/// Chainable assertions over one recorded exchange, obtained from
/// `MockEndpointInner::exchange`.
pub struct ExchangeAssert {
    /// Clone of the exchange under test.
    exchange: Exchange,
    /// Position of the exchange in the endpoint's queue (used in messages).
    idx: usize,
    /// Name of the endpoint the exchange came from (used in messages).
    endpoint_name: String,
}
701
702impl ExchangeAssert {
703 fn location(&self) -> String {
704 format!(
705 "MockEndpoint '{}' exchange[{}]",
706 self.endpoint_name, self.idx
707 )
708 }
709
710 pub fn assert_body_text(self, expected: &str) -> Self {
712 match self.exchange.input.body.as_text() {
713 Some(actual) if actual == expected => {}
714 Some(actual) => panic!(
715 "{}: expected body text {:?}, got {:?}",
716 self.location(),
717 expected,
718 actual
719 ),
720 None => panic!(
721 "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
722 self.location(),
723 expected,
724 self.exchange.input.body
725 ),
726 }
727 self
728 }
729
730 pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
732 match &self.exchange.input.body {
733 camel_component_api::Body::Json(actual) if *actual == expected => {}
734 camel_component_api::Body::Json(actual) => panic!(
735 "{}: expected body JSON {}, got {}",
736 self.location(),
737 expected,
738 actual
739 ),
740 other => panic!(
741 "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
742 self.location(),
743 expected,
744 other
745 ),
746 }
747 self
748 }
749
750 pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
752 match &self.exchange.input.body {
753 camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
754 camel_component_api::Body::Bytes(actual) => panic!(
755 "{}: expected body bytes {:?}, got {:?}",
756 self.location(),
757 expected,
758 actual
759 ),
760 other => panic!(
761 "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
762 self.location(),
763 expected,
764 other
765 ),
766 }
767 self
768 }
769
770 pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
776 match self.exchange.input.headers.get(key) {
777 Some(actual) if *actual == expected => {}
778 Some(actual) => panic!(
779 "{}: expected header {:?} = {}, got {}",
780 self.location(),
781 key,
782 expected,
783 actual
784 ),
785 None => panic!(
786 "{}: expected header {:?} = {}, but header is absent",
787 self.location(),
788 key,
789 expected
790 ),
791 }
792 self
793 }
794
795 pub fn assert_header_exists(self, key: &str) -> Self {
801 if !self.exchange.input.headers.contains_key(key) {
802 panic!(
803 "{}: expected header {:?} to be present, but it was absent",
804 self.location(),
805 key
806 );
807 }
808 self
809 }
810
811 pub fn assert_has_error(self) -> Self {
817 if self.exchange.error.is_none() {
818 panic!(
819 "{}: expected exchange to have an error, but error is None",
820 self.location()
821 );
822 }
823 self
824 }
825
826 pub fn assert_no_error(self) -> Self {
832 if let Some(ref err) = self.exchange.error {
833 panic!(
834 "{}: expected exchange to have no error, but got: {}",
835 self.location(),
836 err
837 );
838 }
839 self
840 }
841}
842
843#[cfg(test)]
848mod tests {
849 use camel_component_api::test_support::PanicRuntimeObservability;
850 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
851 std::sync::Arc::new(PanicRuntimeObservability)
852 }
853
854 use super::*;
855 use camel_component_api::Message;
856 use camel_component_api::NoOpComponentContext;
857 use tower::ServiceExt;
858
    /// Fresh `ProducerContext` for tests that build a producer.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
862
863 #[test]
864 fn test_mock_component_scheme() {
865 let component = MockComponent::new();
866 assert_eq!(component.scheme(), "mock");
867 }
868
869 #[test]
870 fn test_mock_component_default() {
871 let component = MockComponent::default();
872 assert_eq!(component.scheme(), "mock");
873 assert!(component.get_endpoint("missing").is_none());
874 }
875
876 #[test]
877 fn test_mock_creates_endpoint() {
878 let component = MockComponent::new();
879 let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
880 assert!(endpoint.is_ok());
881 }
882
883 #[test]
884 fn test_mock_wrong_scheme() {
885 let component = MockComponent::new();
886 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
887 assert!(result.is_err());
888 }
889
890 #[test]
891 fn test_empty_mock_endpoint_name_rejected() {
892 let component = MockComponent::new();
893 let result = component.create_endpoint("mock:", &NoOpComponentContext);
894 assert!(result.is_err(), "empty mock name should be rejected");
895 }
896
897 #[test]
898 fn test_valid_mock_endpoint_name_accepted() {
899 let component = MockComponent::new();
900 let result = component.create_endpoint("mock:result", &NoOpComponentContext);
901 assert!(result.is_ok());
902 }
903
904 #[test]
905 fn test_mock_endpoint_no_consumer() {
906 let component = MockComponent::new();
907 let endpoint = component
908 .create_endpoint("mock:result", &NoOpComponentContext)
909 .unwrap();
910 assert!(endpoint.create_consumer(rt()).is_err());
911 }
912
913 #[test]
914 fn test_mock_endpoint_creates_producer() {
915 let ctx = test_producer_ctx();
916 let component = MockComponent::new();
917 let endpoint = component
918 .create_endpoint("mock:result", &NoOpComponentContext)
919 .unwrap();
920 assert!(endpoint.create_producer(rt(), &ctx).is_ok());
921 }
922
923 #[test]
924 fn test_mock_endpoint_uri() {
925 let component = MockComponent::new();
926 let endpoint = component
927 .create_endpoint("mock:uri-check", &NoOpComponentContext)
928 .unwrap();
929 assert_eq!(endpoint.uri(), "mock:uri-check");
930 }
931
932 #[test]
933 fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
934 let component = MockComponent::new();
935 let _ = component
936 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
937 .unwrap();
938 let _ = component
939 .create_endpoint("mock:shared-inner", &NoOpComponentContext)
940 .unwrap();
941
942 let first = component.get_endpoint("shared-inner").unwrap();
943 let second = component.get_endpoint("shared-inner").unwrap();
944 assert!(Arc::ptr_eq(&first, &second));
945 }
946
947 #[tokio::test]
948 async fn test_mock_producer_records_exchange() {
949 let ctx = test_producer_ctx();
950 let component = MockComponent::new();
951 let endpoint = component
952 .create_endpoint("mock:test", &NoOpComponentContext)
953 .unwrap();
954
955 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
956
957 let ex1 = Exchange::new(Message::new("first"));
958 let ex2 = Exchange::new(Message::new("second"));
959
960 producer.call(ex1).await.unwrap();
961 producer.call(ex2).await.unwrap();
962
963 let inner = component.get_endpoint("test").unwrap();
964 inner.assert_exchange_count(2).await;
965
966 let received = inner.get_received_exchanges().await;
967 assert_eq!(received[0].input.body.as_text(), Some("first"));
968 assert_eq!(received[1].input.body.as_text(), Some("second"));
969 }
970
971 #[tokio::test]
972 async fn test_mock_producer_passes_through_exchange() {
973 let ctx = test_producer_ctx();
974 let component = MockComponent::new();
975 let endpoint = component
976 .create_endpoint("mock:passthrough", &NoOpComponentContext)
977 .unwrap();
978
979 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
980 let exchange = Exchange::new(Message::new("hello"));
981 let result = producer.oneshot(exchange).await.unwrap();
982
983 assert_eq!(result.input.body.as_text(), Some("hello"));
985 }
986
987 #[tokio::test]
988 async fn test_mock_assert_count_passes() {
989 let component = MockComponent::new();
990 let endpoint = component
991 .create_endpoint("mock:count", &NoOpComponentContext)
992 .unwrap();
993 let inner = component.get_endpoint("count").unwrap();
994
995 inner.assert_exchange_count(0).await;
996
997 let ctx = test_producer_ctx();
998 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
999 producer
1000 .call(Exchange::new(Message::new("one")))
1001 .await
1002 .unwrap();
1003
1004 inner.assert_exchange_count(1).await;
1005 }
1006
1007 #[tokio::test]
1008 #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
1009 async fn test_mock_assert_count_fails() {
1010 let component = MockComponent::new();
1011 let _endpoint = component
1014 .create_endpoint("mock:fail", &NoOpComponentContext)
1015 .unwrap();
1016 let inner = component.get_endpoint("fail").unwrap();
1017
1018 inner.assert_exchange_count(5).await;
1019 }
1020
1021 #[tokio::test]
1022 async fn test_mock_component_shared_registry() {
1023 let component = MockComponent::new();
1024 let ep1 = component
1025 .create_endpoint("mock:shared", &NoOpComponentContext)
1026 .unwrap();
1027 let ep2 = component
1028 .create_endpoint("mock:shared", &NoOpComponentContext)
1029 .unwrap();
1030
1031 let ctx = test_producer_ctx();
1033 let mut p1 = ep1.create_producer(rt(), &ctx).unwrap();
1034 p1.call(Exchange::new(Message::new("from-ep1")))
1035 .await
1036 .unwrap();
1037
1038 let mut p2 = ep2.create_producer(rt(), &ctx).unwrap();
1040 p2.call(Exchange::new(Message::new("from-ep2")))
1041 .await
1042 .unwrap();
1043
1044 let inner = component.get_endpoint("shared").unwrap();
1046 inner.assert_exchange_count(2).await;
1047
1048 let received = inner.get_received_exchanges().await;
1049 assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
1050 assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
1051 }
1052
1053 #[tokio::test]
1054 async fn await_exchanges_resolves_immediately() {
1055 let ctx = test_producer_ctx();
1057 let component = MockComponent::new();
1058 let endpoint = component
1059 .create_endpoint("mock:immediate", &NoOpComponentContext)
1060 .unwrap();
1061 let inner = component.get_endpoint("immediate").unwrap();
1062
1063 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1064 producer
1065 .call(Exchange::new(Message::new("a")))
1066 .await
1067 .unwrap();
1068 producer
1069 .call(Exchange::new(Message::new("b")))
1070 .await
1071 .unwrap();
1072
1073 inner
1075 .await_exchanges(2, std::time::Duration::from_millis(100))
1076 .await;
1077 }
1078
1079 #[tokio::test]
1080 async fn await_exchanges_waits_then_resolves() {
1081 let ctx = test_producer_ctx();
1083 let component = MockComponent::new();
1084 let endpoint = component
1085 .create_endpoint("mock:waiter", &NoOpComponentContext)
1086 .unwrap();
1087 let inner = component.get_endpoint("waiter").unwrap();
1088
1089 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1091 tokio::spawn(async move {
1092 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1093 producer
1094 .call(Exchange::new(Message::new("delayed")))
1095 .await
1096 .unwrap();
1097 });
1098
1099 inner
1101 .await_exchanges(1, std::time::Duration::from_millis(500))
1102 .await;
1103
1104 let received = inner.get_received_exchanges().await;
1105 assert_eq!(received.len(), 1);
1106 assert_eq!(received[0].input.body.as_text(), Some("delayed"));
1107 }
1108
1109 #[tokio::test]
1110 #[should_panic(expected = "timed out waiting for 5 exchanges")]
1111 async fn await_exchanges_times_out() {
1112 let component = MockComponent::new();
1113 let _endpoint = component
1114 .create_endpoint("mock:timeout", &NoOpComponentContext)
1115 .unwrap();
1116 let inner = component.get_endpoint("timeout").unwrap();
1117
1118 inner
1120 .await_exchanges(5, std::time::Duration::from_millis(50))
1121 .await;
1122 }
1123
1124 #[tokio::test(flavor = "multi_thread")]
1125 async fn exchange_idx_returns_assert() {
1126 let ctx = test_producer_ctx();
1127 let component = MockComponent::new();
1128 let endpoint = component
1129 .create_endpoint("mock:assert-idx", &NoOpComponentContext)
1130 .unwrap();
1131 let inner = component.get_endpoint("assert-idx").unwrap();
1132
1133 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1134 producer
1135 .call(Exchange::new(Message::new("hello")))
1136 .await
1137 .unwrap();
1138
1139 inner
1140 .await_exchanges(1, std::time::Duration::from_millis(500))
1141 .await;
1142 let _assert = inner.exchange(0);
1144 }
1145
1146 #[tokio::test(flavor = "multi_thread")]
1147 #[should_panic(expected = "exchange index 5 out of bounds")]
1148 async fn exchange_idx_out_of_bounds() {
1149 let ctx = test_producer_ctx();
1150 let component = MockComponent::new();
1151 let endpoint = component
1152 .create_endpoint("mock:oob", &NoOpComponentContext)
1153 .unwrap();
1154 let inner = component.get_endpoint("oob").unwrap();
1155
1156 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1157 producer
1158 .call(Exchange::new(Message::new("only-one")))
1159 .await
1160 .unwrap();
1161
1162 inner
1163 .await_exchanges(1, std::time::Duration::from_millis(500))
1164 .await;
1165 let _assert = inner.exchange(5);
1167 }
1168
1169 #[tokio::test(flavor = "multi_thread")]
1170 async fn assert_body_text_pass() {
1171 let ctx = test_producer_ctx();
1172 let component = MockComponent::new();
1173 let endpoint = component
1174 .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
1175 .unwrap();
1176 let inner = component.get_endpoint("body-text-pass").unwrap();
1177 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1178 producer
1179 .call(Exchange::new(Message::new("hello")))
1180 .await
1181 .unwrap();
1182 inner
1183 .await_exchanges(1, std::time::Duration::from_millis(500))
1184 .await;
1185 inner.exchange(0).assert_body_text("hello");
1186 }
1187
1188 #[tokio::test(flavor = "multi_thread")]
1189 #[should_panic(expected = "expected body text")]
1190 async fn assert_body_text_fail() {
1191 let ctx = test_producer_ctx();
1192 let component = MockComponent::new();
1193 let endpoint = component
1194 .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
1195 .unwrap();
1196 let inner = component.get_endpoint("body-text-fail").unwrap();
1197 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1198 producer
1199 .call(Exchange::new(Message::new("hello")))
1200 .await
1201 .unwrap();
1202 inner
1203 .await_exchanges(1, std::time::Duration::from_millis(500))
1204 .await;
1205 inner.exchange(0).assert_body_text("world");
1206 }
1207
1208 #[tokio::test(flavor = "multi_thread")]
1209 async fn assert_body_json_pass() {
1210 use camel_component_api::Body;
1211 let ctx = test_producer_ctx();
1212 let component = MockComponent::new();
1213 let endpoint = component
1214 .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
1215 .unwrap();
1216 let inner = component.get_endpoint("body-json-pass").unwrap();
1217 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1218 let mut msg = Message::new("");
1219 msg.body = Body::Json(serde_json::json!({"key": "value"}));
1220 producer.call(Exchange::new(msg)).await.unwrap();
1221 inner
1222 .await_exchanges(1, std::time::Duration::from_millis(500))
1223 .await;
1224 inner
1225 .exchange(0)
1226 .assert_body_json(serde_json::json!({"key": "value"}));
1227 }
1228
1229 #[tokio::test(flavor = "multi_thread")]
1230 #[should_panic(expected = "expected body JSON")]
1231 async fn assert_body_json_fail() {
1232 use camel_component_api::Body;
1233 let ctx = test_producer_ctx();
1234 let component = MockComponent::new();
1235 let endpoint = component
1236 .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
1237 .unwrap();
1238 let inner = component.get_endpoint("body-json-fail").unwrap();
1239 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1240 let mut msg = Message::new("");
1241 msg.body = Body::Json(serde_json::json!({"key": "value"}));
1242 producer.call(Exchange::new(msg)).await.unwrap();
1243 inner
1244 .await_exchanges(1, std::time::Duration::from_millis(500))
1245 .await;
1246 inner
1247 .exchange(0)
1248 .assert_body_json(serde_json::json!({"key": "other"}));
1249 }
1250
1251 #[tokio::test(flavor = "multi_thread")]
1252 async fn assert_body_bytes_pass() {
1253 use bytes::Bytes;
1254 use camel_component_api::Body;
1255 let ctx = test_producer_ctx();
1256 let component = MockComponent::new();
1257 let endpoint = component
1258 .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
1259 .unwrap();
1260 let inner = component.get_endpoint("body-bytes-pass").unwrap();
1261 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1262 let mut msg = Message::new("");
1263 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1264 producer.call(Exchange::new(msg)).await.unwrap();
1265 inner
1266 .await_exchanges(1, std::time::Duration::from_millis(500))
1267 .await;
1268 inner.exchange(0).assert_body_bytes(b"binary");
1269 }
1270
1271 #[tokio::test(flavor = "multi_thread")]
1272 #[should_panic(expected = "expected body bytes")]
1273 async fn assert_body_bytes_fail() {
1274 use bytes::Bytes;
1275 use camel_component_api::Body;
1276 let ctx = test_producer_ctx();
1277 let component = MockComponent::new();
1278 let endpoint = component
1279 .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
1280 .unwrap();
1281 let inner = component.get_endpoint("body-bytes-fail").unwrap();
1282 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1283 let mut msg = Message::new("");
1284 msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1285 producer.call(Exchange::new(msg)).await.unwrap();
1286 inner
1287 .await_exchanges(1, std::time::Duration::from_millis(500))
1288 .await;
1289 inner.exchange(0).assert_body_bytes(b"different");
1290 }
1291
1292 #[tokio::test(flavor = "multi_thread")]
1293 async fn assert_header_pass() {
1294 let ctx = test_producer_ctx();
1295 let component = MockComponent::new();
1296 let endpoint = component
1297 .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
1298 .unwrap();
1299 let inner = component.get_endpoint("hdr-pass").unwrap();
1300 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1301 let mut msg = Message::new("body");
1302 msg.headers
1303 .insert("x-key".to_string(), serde_json::json!("value"));
1304 producer.call(Exchange::new(msg)).await.unwrap();
1305 inner
1306 .await_exchanges(1, std::time::Duration::from_millis(500))
1307 .await;
1308 inner
1309 .exchange(0)
1310 .assert_header("x-key", serde_json::json!("value"));
1311 }
1312
1313 #[tokio::test(flavor = "multi_thread")]
1314 #[should_panic(expected = "expected header")]
1315 async fn assert_header_fail() {
1316 let ctx = test_producer_ctx();
1317 let component = MockComponent::new();
1318 let endpoint = component
1319 .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
1320 .unwrap();
1321 let inner = component.get_endpoint("hdr-fail").unwrap();
1322 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1323 let mut msg = Message::new("body");
1324 msg.headers
1325 .insert("x-key".to_string(), serde_json::json!("value"));
1326 producer.call(Exchange::new(msg)).await.unwrap();
1327 inner
1328 .await_exchanges(1, std::time::Duration::from_millis(500))
1329 .await;
1330 inner
1331 .exchange(0)
1332 .assert_header("x-key", serde_json::json!("other"));
1333 }
1334
1335 #[tokio::test(flavor = "multi_thread")]
1336 async fn assert_header_exists_pass() {
1337 let ctx = test_producer_ctx();
1338 let component = MockComponent::new();
1339 let endpoint = component
1340 .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
1341 .unwrap();
1342 let inner = component.get_endpoint("hdr-exists-pass").unwrap();
1343 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1344 let mut msg = Message::new("body");
1345 msg.headers
1346 .insert("x-present".to_string(), serde_json::json!(42));
1347 producer.call(Exchange::new(msg)).await.unwrap();
1348 inner
1349 .await_exchanges(1, std::time::Duration::from_millis(500))
1350 .await;
1351 inner.exchange(0).assert_header_exists("x-present");
1352 }
1353
1354 #[tokio::test(flavor = "multi_thread")]
1355 #[should_panic(expected = "expected header")]
1356 async fn assert_header_exists_fail() {
1357 let ctx = test_producer_ctx();
1358 let component = MockComponent::new();
1359 let endpoint = component
1360 .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
1361 .unwrap();
1362 let inner = component.get_endpoint("hdr-exists-fail").unwrap();
1363 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1364 producer
1365 .call(Exchange::new(Message::new("body")))
1366 .await
1367 .unwrap();
1368 inner
1369 .await_exchanges(1, std::time::Duration::from_millis(500))
1370 .await;
1371 inner.exchange(0).assert_header_exists("x-missing");
1372 }
1373
1374 #[tokio::test(flavor = "multi_thread")]
1375 async fn assert_has_error_pass() {
1376 let ctx = test_producer_ctx();
1377 let component = MockComponent::new();
1378 let endpoint = component
1379 .create_endpoint("mock:err-pass", &NoOpComponentContext)
1380 .unwrap();
1381 let inner = component.get_endpoint("err-pass").unwrap();
1382 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1383 let mut ex = Exchange::new(Message::new("body"));
1384 ex.set_error(camel_component_api::CamelError::ProcessorError(
1385 "oops".to_string(),
1386 ));
1387 producer.call(ex).await.unwrap();
1388 inner
1389 .await_exchanges(1, std::time::Duration::from_millis(500))
1390 .await;
1391 inner.exchange(0).assert_has_error();
1392 }
1393
1394 #[tokio::test(flavor = "multi_thread")]
1395 #[should_panic(expected = "expected exchange to have an error")]
1396 async fn assert_has_error_fail() {
1397 let ctx = test_producer_ctx();
1398 let component = MockComponent::new();
1399 let endpoint = component
1400 .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
1401 .unwrap();
1402 let inner = component.get_endpoint("has-err-fail").unwrap();
1403 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1404 producer
1405 .call(Exchange::new(Message::new("body")))
1406 .await
1407 .unwrap();
1408 inner
1409 .await_exchanges(1, std::time::Duration::from_millis(500))
1410 .await;
1411 inner.exchange(0).assert_has_error();
1412 }
1413
1414 #[tokio::test(flavor = "multi_thread")]
1415 async fn assert_no_error_pass() {
1416 let ctx = test_producer_ctx();
1417 let component = MockComponent::new();
1418 let endpoint = component
1419 .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
1420 .unwrap();
1421 let inner = component.get_endpoint("no-err-pass").unwrap();
1422 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1423 producer
1424 .call(Exchange::new(Message::new("body")))
1425 .await
1426 .unwrap();
1427 inner
1428 .await_exchanges(1, std::time::Duration::from_millis(500))
1429 .await;
1430 inner.exchange(0).assert_no_error();
1431 }
1432
1433 #[tokio::test]
1438 async fn test_mock_reset_clears_exchanges() {
1439 let component = MockComponent::new();
1440 let endpoint = component
1441 .create_endpoint("mock:reset-test", &NoOpComponentContext)
1442 .unwrap();
1443 let inner = component.get_endpoint("reset-test").unwrap();
1444
1445 let ctx = test_producer_ctx();
1446 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1447 producer
1448 .call(Exchange::new(Message::new("a")))
1449 .await
1450 .unwrap();
1451 producer
1452 .call(Exchange::new(Message::new("b")))
1453 .await
1454 .unwrap();
1455
1456 assert_eq!(inner.received_count().await, 2);
1457 inner.reset().await;
1458 assert_eq!(inner.received_count().await, 0);
1459 }
1460
1461 #[tokio::test]
1462 async fn test_mock_bounded_retention_drops_oldest() {
1463 let config = MockConfig {
1464 max_retained: 3,
1465 ..Default::default()
1466 };
1467 let component = MockComponent::with_config(config);
1468 let endpoint = component
1469 .create_endpoint("mock:bounded", &NoOpComponentContext)
1470 .unwrap();
1471 let inner = component.get_endpoint("bounded").unwrap();
1472
1473 let ctx = test_producer_ctx();
1474 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1475
1476 for i in 0..5 {
1478 producer
1479 .call(Exchange::new(Message::new(format!("msg-{i}"))))
1480 .await
1481 .unwrap();
1482 }
1483
1484 assert_eq!(inner.received_count().await, 3);
1485 let received = inner.get_received_exchanges().await;
1486 assert_eq!(received[0].input.body.as_text(), Some("msg-2"));
1488 assert_eq!(received[1].input.body.as_text(), Some("msg-3"));
1489 assert_eq!(received[2].input.body.as_text(), Some("msg-4"));
1490 }
1491
1492 #[tokio::test]
1493 async fn test_mock_reset_then_record_again() {
1494 let component = MockComponent::new();
1495 let endpoint = component
1496 .create_endpoint("mock:reset-reuse", &NoOpComponentContext)
1497 .unwrap();
1498 let inner = component.get_endpoint("reset-reuse").unwrap();
1499
1500 let ctx = test_producer_ctx();
1501 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1502 producer
1503 .call(Exchange::new(Message::new("before-reset")))
1504 .await
1505 .unwrap();
1506 inner.reset().await;
1507
1508 producer
1509 .call(Exchange::new(Message::new("after-reset")))
1510 .await
1511 .unwrap();
1512
1513 let received = inner.get_received_exchanges().await;
1514 assert_eq!(received.len(), 1);
1515 assert_eq!(received[0].input.body.as_text(), Some("after-reset"));
1516 }
1517
1518 #[tokio::test(flavor = "multi_thread")]
1519 #[should_panic(expected = "expected exchange to have no error")]
1520 async fn assert_no_error_fail() {
1521 let ctx = test_producer_ctx();
1522 let component = MockComponent::new();
1523 let endpoint = component
1524 .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1525 .unwrap();
1526 let inner = component.get_endpoint("no-err-fail").unwrap();
1527 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1528 let mut ex = Exchange::new(Message::new("body"));
1529 ex.set_error(camel_component_api::CamelError::ProcessorError(
1530 "oops".to_string(),
1531 ));
1532 producer.call(ex).await.unwrap();
1533 inner
1534 .await_exchanges(1, std::time::Duration::from_millis(500))
1535 .await;
1536 inner.exchange(0).assert_no_error();
1537 }
1538
1539 #[tokio::test]
1544 async fn test_copy_on_exchange_stores_cloned_body() {
1545 let config = MockConfig {
1546 copy_on_exchange: true,
1547 ..Default::default()
1548 };
1549 let component = MockComponent::with_config(config);
1550 let endpoint = component
1551 .create_endpoint("mock:copy", &NoOpComponentContext)
1552 .unwrap();
1553 let inner = component.get_endpoint("copy").unwrap();
1554
1555 let ctx = test_producer_ctx();
1556 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1557
1558 let mut msg = Message::new("original");
1559 msg.headers.insert("x-test".into(), serde_json::json!(1));
1560 let ex = Exchange::new(msg);
1561 producer.call(ex).await.unwrap();
1562
1563 let received = inner.get_received_exchanges().await;
1564 assert_eq!(received[0].input.body.as_text(), Some("original"));
1565 }
1566
1567 #[tokio::test]
1568 async fn test_copy_on_exchange_false_shares_storage() {
1569 let config = MockConfig {
1570 copy_on_exchange: false,
1571 ..Default::default()
1572 };
1573 let component = MockComponent::with_config(config);
1574 let endpoint = component
1575 .create_endpoint("mock:no-copy", &NoOpComponentContext)
1576 .unwrap();
1577 let inner = component.get_endpoint("no-copy").unwrap();
1578
1579 let ctx = test_producer_ctx();
1580 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1581
1582 producer
1583 .call(Exchange::new(Message::new("direct")))
1584 .await
1585 .unwrap();
1586
1587 let received = inner.get_received_exchanges().await;
1588 assert_eq!(received[0].input.body.as_text(), Some("direct"));
1589 }
1590
1591 #[tokio::test]
1596 async fn test_assert_satisfied_bodies_in_order() {
1597 let component = MockComponent::new();
1598 let endpoint = component
1599 .create_endpoint("mock:sat-bodies", &NoOpComponentContext)
1600 .unwrap();
1601 let inner = component.get_endpoint("sat-bodies").unwrap();
1602
1603 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1604 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1605
1606 let ctx = test_producer_ctx();
1607 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1608 producer
1609 .call(Exchange::new(Message::new("alpha")))
1610 .await
1611 .unwrap();
1612 producer
1613 .call(Exchange::new(Message::new("beta")))
1614 .await
1615 .unwrap();
1616
1617 inner.assert_satisfied().await;
1618 }
1619
1620 #[tokio::test]
1621 #[should_panic(expected = "body[0] expected")]
1622 async fn test_assert_satisfied_bodies_wrong_order_fails() {
1623 let component = MockComponent::new();
1624 let endpoint = component
1625 .create_endpoint("mock:sat-bodies-fail", &NoOpComponentContext)
1626 .unwrap();
1627 let inner = component.get_endpoint("sat-bodies-fail").unwrap();
1628
1629 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1630 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1631
1632 let ctx = test_producer_ctx();
1633 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1634 producer
1635 .call(Exchange::new(Message::new("beta")))
1636 .await
1637 .unwrap();
1638 producer
1639 .call(Exchange::new(Message::new("alpha")))
1640 .await
1641 .unwrap();
1642
1643 inner.assert_satisfied().await;
1644 }
1645
1646 #[tokio::test]
1647 async fn test_assert_satisfied_headers() {
1648 let component = MockComponent::new();
1649 let endpoint = component
1650 .create_endpoint("mock:sat-hdr", &NoOpComponentContext)
1651 .unwrap();
1652 let inner = component.get_endpoint("sat-hdr").unwrap();
1653
1654 inner.expect_header("status", serde_json::json!("ok"));
1655
1656 let ctx = test_producer_ctx();
1657 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1658 let mut msg = Message::new("body");
1659 msg.headers.insert("status".into(), serde_json::json!("ok"));
1660 producer.call(Exchange::new(msg)).await.unwrap();
1661
1662 inner.assert_satisfied().await;
1663 }
1664
1665 #[tokio::test]
1666 #[should_panic(expected = "expected header 'missing' =")]
1667 async fn test_assert_satisfied_headers_missing() {
1668 let component = MockComponent::new();
1669 let endpoint = component
1670 .create_endpoint("mock:sat-hdr-missing", &NoOpComponentContext)
1671 .unwrap();
1672 let inner = component.get_endpoint("sat-hdr-missing").unwrap();
1673
1674 inner.expect_header("missing", serde_json::json!("value"));
1675
1676 let ctx = test_producer_ctx();
1677 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1678 producer
1679 .call(Exchange::new(Message::new("body")))
1680 .await
1681 .unwrap();
1682
1683 inner.assert_satisfied().await;
1684 }
1685
1686 #[tokio::test]
1691 async fn test_fail_fast_rejects_after_first_call() {
1692 let config = MockConfig {
1693 fail_fast: true,
1694 ..Default::default()
1695 };
1696 let component = MockComponent::with_config(config);
1697 let endpoint = component
1698 .create_endpoint("mock:ff", &NoOpComponentContext)
1699 .unwrap();
1700
1701 let ctx = test_producer_ctx();
1702 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1703
1704 producer
1706 .call(Exchange::new(Message::new("ok")))
1707 .await
1708 .unwrap();
1709 }
1710
1711 #[tokio::test]
1712 async fn test_fail_fast_no_error_when_all_good() {
1713 let config = MockConfig {
1714 fail_fast: true,
1715 ..Default::default()
1716 };
1717 let component = MockComponent::with_config(config);
1718 let endpoint = component
1719 .create_endpoint("mock:ff-good", &NoOpComponentContext)
1720 .unwrap();
1721 let inner = component.get_endpoint("ff-good").unwrap();
1722
1723 let ctx = test_producer_ctx();
1724 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1725
1726 producer
1727 .call(Exchange::new(Message::new("a")))
1728 .await
1729 .unwrap();
1730 producer
1731 .call(Exchange::new(Message::new("b")))
1732 .await
1733 .unwrap();
1734
1735 assert!(inner.fail_fast_error().is_none());
1736 inner.assert_exchange_count(2).await;
1737 }
1738
1739 #[tokio::test]
1744 async fn test_await_exchanges_with_timeout_uses_config_period() {
1745 let config = MockConfig {
1746 assert_period_ms: 100,
1747 ..Default::default()
1748 };
1749 let component = MockComponent::with_config(config);
1750 let endpoint = component
1751 .create_endpoint("mock:ap", &NoOpComponentContext)
1752 .unwrap();
1753 let inner = component.get_endpoint("ap").unwrap();
1754
1755 let ctx = test_producer_ctx();
1756 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1757 producer
1758 .call(Exchange::new(Message::new("x")))
1759 .await
1760 .unwrap();
1761
1762 inner
1763 .await_exchanges_with_timeout(1, std::time::Duration::from_millis(1))
1764 .await;
1765 }
1766
1767 #[tokio::test]
1768 async fn test_await_exchanges_with_timeout_uses_fallback_when_zero() {
1769 let config = MockConfig {
1770 assert_period_ms: 0,
1771 ..Default::default()
1772 };
1773 let component = MockComponent::with_config(config);
1774 let endpoint = component
1775 .create_endpoint("mock:ap-fb", &NoOpComponentContext)
1776 .unwrap();
1777 let inner = component.get_endpoint("ap-fb").unwrap();
1778
1779 let ctx = test_producer_ctx();
1780 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1781 producer
1782 .call(Exchange::new(Message::new("y")))
1783 .await
1784 .unwrap();
1785
1786 inner
1787 .await_exchanges_with_timeout(1, std::time::Duration::from_millis(200))
1788 .await;
1789 }
1790
1791 #[tokio::test]
1796 async fn test_expect_header_regex_match() {
1797 let component = MockComponent::new();
1798 let endpoint = component
1799 .create_endpoint("mock:re-hdr", &NoOpComponentContext)
1800 .unwrap();
1801 let inner = component.get_endpoint("re-hdr").unwrap();
1802
1803 inner.expect_header_regex("x-trace-id", r"^[a-f0-9]{8}$");
1804
1805 let ctx = test_producer_ctx();
1806 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1807 let mut msg = Message::new("body");
1808 msg.headers
1809 .insert("x-trace-id".into(), serde_json::json!("deadbeef"));
1810 producer.call(Exchange::new(msg)).await.unwrap();
1811
1812 inner.assert_satisfied().await;
1813 }
1814
1815 #[tokio::test]
1816 #[should_panic(expected = "no received exchange has header")]
1817 async fn test_expect_header_regex_no_match() {
1818 let component = MockComponent::new();
1819 let endpoint = component
1820 .create_endpoint("mock:re-hdr-fail", &NoOpComponentContext)
1821 .unwrap();
1822 let inner = component.get_endpoint("re-hdr-fail").unwrap();
1823
1824 inner.expect_header_regex("x-trace-id", r"^\d+$");
1825
1826 let ctx = test_producer_ctx();
1827 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1828 let mut msg = Message::new("body");
1829 msg.headers
1830 .insert("x-trace-id".into(), serde_json::json!("abc"));
1831 producer.call(Exchange::new(msg)).await.unwrap();
1832
1833 inner.assert_satisfied().await;
1834 }
1835
1836 #[tokio::test]
1841 async fn test_any_order_bodies_match() {
1842 let config = MockConfig {
1843 any_order: true,
1844 ..Default::default()
1845 };
1846 let component = MockComponent::with_config(config);
1847 let endpoint = component
1848 .create_endpoint("mock:anyorder", &NoOpComponentContext)
1849 .unwrap();
1850 let inner = component.get_endpoint("anyorder").unwrap();
1851
1852 inner.expect_body(camel_component_api::Body::Text("beta".into()));
1853 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1854
1855 let ctx = test_producer_ctx();
1856 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1857 producer
1858 .call(Exchange::new(Message::new("alpha")))
1859 .await
1860 .unwrap();
1861 producer
1862 .call(Exchange::new(Message::new("beta")))
1863 .await
1864 .unwrap();
1865
1866 inner.assert_satisfied().await;
1867 }
1868
1869 #[tokio::test]
1870 #[should_panic(expected = "not found in received exchanges (anyOrder mode)")]
1871 async fn test_any_order_bodies_missing() {
1872 let config = MockConfig {
1873 any_order: true,
1874 ..Default::default()
1875 };
1876 let component = MockComponent::with_config(config);
1877 let endpoint = component
1878 .create_endpoint("mock:anyorder-fail", &NoOpComponentContext)
1879 .unwrap();
1880 let inner = component.get_endpoint("anyorder-fail").unwrap();
1881
1882 inner.expect_body(camel_component_api::Body::Text("gamma".into()));
1883 inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1884
1885 let ctx = test_producer_ctx();
1886 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1887 producer
1888 .call(Exchange::new(Message::new("alpha")))
1889 .await
1890 .unwrap();
1891 producer
1892 .call(Exchange::new(Message::new("beta")))
1893 .await
1894 .unwrap();
1895
1896 inner.assert_satisfied().await;
1897 }
1898
1899 #[tokio::test]
1904 async fn test_tracing_logs_exchange_received() {
1905 let ctx = test_producer_ctx();
1907 let component = MockComponent::new();
1908 let endpoint = component
1909 .create_endpoint("mock:trace", &NoOpComponentContext)
1910 .unwrap();
1911 let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1912 producer
1913 .call(Exchange::new(Message::new("traced")))
1914 .await
1915 .unwrap();
1916
1917 let inner = component.get_endpoint("trace").unwrap();
1918 inner.assert_exchange_count(1).await;
1919 }
1920
/// `MockConfig::new` must set `max_retained` to the given value and leave
/// every other option at its `Default` value.
#[test]
fn test_mock_config_new() {
    let cfg = MockConfig::new(42);
    assert_eq!(cfg.max_retained, 42);
    assert!(!cfg.copy_on_exchange);
    assert!(!cfg.fail_fast);
    // Previously unchecked: the default assert period is zero.
    assert_eq!(cfg.assert_period_ms, 0);
    assert!(!cfg.any_order);
}
1933}