Skip to main content

camel_component_mock/
lib.rs

1//! # camel-component-mock
2//!
3//! Mock component for rust-camel — testing utility that records received
4//! exchanges for later assertion, useful for verifying route output in tests.
5//!
6//! Main types: `MockComponent`, `MockEndpoint`, `MockProducer`, `MockExpectations`.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use camel_component_mock::MockComponent;
12//! use camel_component_api::{Component, NoOpComponentContext, Exchange, Message};
13//!
14//! // Create a mock component and endpoint
15//! let component = MockComponent::new();
16//! let endpoint = component
17//!     .create_endpoint("mock:result", &NoOpComponentContext)
18//!     .unwrap();
19//!
20//! // In a real route, the producer would be used as a Tower service.
21//! // After sending exchanges, you can inspect them:
22//! let inner = component.get_endpoint("result").unwrap();
23//! // inner.assert_exchange_count(1).await;
24//! // inner.exchange(0).assert_body_text("hello");
25//! ```
26
27use std::collections::{HashMap, VecDeque};
28use std::future::Future;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use tokio::sync::{Mutex, Notify};
34use tower::Service;
35
36use camel_component_api::parse_uri;
37use camel_component_api::{BoxProcessor, CamelError, Exchange};
38use camel_component_api::{Component, Consumer, Endpoint, ProducerContext};
39use tracing::debug;
40
41/// Default maximum number of exchanges retained by a mock endpoint.
42const DEFAULT_MAX_RETAINED: usize = 10_000;
43
44// ---------------------------------------------------------------------------
45// MockConfig
46// ---------------------------------------------------------------------------
47
48/// Configuration for [`MockComponent`].
49///
50/// Controls how many exchanges are retained before the oldest are dropped,
51/// and other behavioural flags for assertions.
52///
53/// # Examples
54///
55/// ```rust
56/// use camel_component_mock::MockConfig;
57///
58/// let config = MockConfig {
59///     max_retained: 100,
60///     copy_on_exchange: true,
61///     fail_fast: false,
62///     assert_period_ms: 0,
63///     any_order: false,
64/// };
65/// ```
66#[derive(Clone, Debug)]
67pub struct MockConfig {
68    /// Maximum number of exchanges to retain. When exceeded, the oldest
69    /// exchange is dropped. Defaults to 10 000.
70    pub max_retained: usize,
71    /// When `true`, clone the exchange body before storing it in the received
72    /// exchanges list. This prevents aliasing when the caller mutates the
73    /// original exchange after sending. Defaults to `false`.
74    pub copy_on_exchange: bool,
75    /// When `true`, after the first failing assertion the mock stops processing
76    /// exchanges and records the error. Defaults to `false`.
77    pub fail_fast: bool,
78    /// Time in milliseconds to wait before asserting expectations (to allow
79    /// async processing to complete). Defaults to `0` (no wait).
80    pub assert_period_ms: u64,
81    /// When `true`, [`MockEndpointInner::assert_satisfied`] matches expected
82    /// bodies in any order rather than strict sequence. Defaults to `false`.
83    pub any_order: bool,
84}
85
86impl Default for MockConfig {
87    fn default() -> Self {
88        Self {
89            max_retained: DEFAULT_MAX_RETAINED,
90            copy_on_exchange: false,
91            fail_fast: false,
92            assert_period_ms: 0,
93            any_order: false,
94        }
95    }
96}
97
98impl MockConfig {
99    /// Create a config with a custom retention limit.
100    pub fn new(max_retained: usize) -> Self {
101        Self {
102            max_retained,
103            ..Self::default()
104        }
105    }
106}
107
108// ---------------------------------------------------------------------------
109// MockExpectations
110// ---------------------------------------------------------------------------
111
112/// Expectations set on a mock endpoint for batch-style assertion.
113///
114/// Use [`MockEndpointInner::expect_body`] and
115/// [`MockEndpointInner::expect_header`] to populate expectations, then call
116/// [`MockEndpointInner::assert_satisfied`] after exchanges have been received.
117pub struct MockExpectations {
118    expected_bodies: Vec<camel_component_api::Body>,
119    expected_headers: Vec<(String, serde_json::Value)>,
120    expected_header_regexes: Vec<(String, String)>,
121}
122
123impl Default for MockExpectations {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
129impl MockExpectations {
130    /// Create an empty set of expectations.
131    pub fn new() -> Self {
132        Self {
133            expected_bodies: Vec::new(),
134            expected_headers: Vec::new(),
135            expected_header_regexes: Vec::new(),
136        }
137    }
138
139    /// Add an expected body value.
140    pub fn push_body(&mut self, body: camel_component_api::Body) {
141        self.expected_bodies.push(body);
142    }
143
144    /// Add an expected header key-value pair.
145    pub fn push_header(&mut self, key: String, value: serde_json::Value) {
146        self.expected_headers.push((key, value));
147    }
148
149    /// Add an expected header regex pattern.
150    pub fn push_header_regex(&mut self, key: String, pattern: String) {
151        self.expected_header_regexes.push((key, pattern));
152    }
153}
154
155// ---------------------------------------------------------------------------
156// MockComponent
157// ---------------------------------------------------------------------------
158
159/// The Mock component is a testing utility that records every exchange it
160/// receives via its producer.  It exposes helpers to inspect and assert on
161/// the recorded exchanges.
162///
163/// URI format: `mock:name`
164///
165/// When `create_endpoint` is called multiple times with the same name, the
166/// returned endpoints share the same received-exchanges storage. This enables
167/// test assertions: create mock, register it, run routes, then inspect via
168/// `component.get_endpoint("name")`.
169#[derive(Clone)]
170pub struct MockComponent {
171    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
172    config: MockConfig,
173}
174
175impl MockComponent {
176    pub fn new() -> Self {
177        Self::with_config(MockConfig::default())
178    }
179
180    /// Create a `MockComponent` with a custom [`MockConfig`].
181    pub fn with_config(config: MockConfig) -> Self {
182        Self {
183            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
184            config,
185        }
186    }
187
188    /// Retrieve a previously created endpoint's inner data by name.
189    ///
190    /// This is the primary way to inspect recorded exchanges in tests.
191    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
192        let registry = self
193            .registry
194            .lock()
195            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
196        registry.get(name).cloned()
197    }
198}
199
200impl Default for MockComponent {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl Component for MockComponent {
207    fn scheme(&self) -> &str {
208        "mock"
209    }
210
211    fn create_endpoint(
212        &self,
213        uri: &str,
214        _ctx: &dyn camel_component_api::ComponentContext,
215    ) -> Result<Box<dyn Endpoint>, CamelError> {
216        let parts = parse_uri(uri)?;
217        if parts.scheme != "mock" {
218            return Err(CamelError::InvalidUri(format!(
219                "expected scheme 'mock', got '{}'",
220                parts.scheme
221            )));
222        }
223
224        let name = parts.path;
225        if name.is_empty() {
226            return Err(CamelError::InvalidUri(
227                "mock endpoint name must be non-empty (use 'mock:<name>')".to_string(),
228            ));
229        }
230        let mut registry = self.registry.lock().map_err(|e| {
231            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
232        })?;
233        let max_retained = self.config.max_retained;
234        let copy_on_exchange = self.config.copy_on_exchange;
235        let fail_fast = self.config.fail_fast;
236        let assert_period_ms = self.config.assert_period_ms;
237        let any_order = self.config.any_order;
238        let inner = registry
239            .entry(name.clone())
240            .or_insert_with(|| {
241                Arc::new(MockEndpointInner {
242                    uri: uri.to_string(),
243                    name,
244                    received: Arc::new(Mutex::new(VecDeque::new())),
245                    notify: Arc::new(Notify::new()),
246                    max_retained,
247                    copy_on_exchange,
248                    fail_fast,
249                    fail_fast_error: Arc::new(std::sync::Mutex::new(None)),
250                    assert_period_ms,
251                    any_order,
252                    expectations: Arc::new(std::sync::Mutex::new(MockExpectations::new())),
253                })
254            })
255            .clone();
256
257        debug!(endpoint_name = %inner.name, "mock endpoint created");
258        Ok(Box::new(MockEndpoint(inner)))
259    }
260}
261
262// ---------------------------------------------------------------------------
263// MockEndpoint / MockEndpointInner
264// ---------------------------------------------------------------------------
265
266/// A mock endpoint that records all exchanges sent to it.
267///
268/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
269/// `MockEndpoint` instances created with the same name share the same inner
270/// storage.
271pub struct MockEndpoint(Arc<MockEndpointInner>);
272
273/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
274/// instances created with the same name via `MockComponent`.
275///
276/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
277/// recorded exchanges in tests.
278pub struct MockEndpointInner {
279    uri: String,
280    pub name: String,
281    received: Arc<Mutex<VecDeque<Exchange>>>,
282    notify: Arc<Notify>,
283    max_retained: usize,
284    copy_on_exchange: bool,
285    fail_fast: bool,
286    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
287    assert_period_ms: u64,
288    any_order: bool,
289    expectations: Arc<std::sync::Mutex<MockExpectations>>,
290}
291
292impl MockEndpointInner {
293    /// Return a snapshot of all exchanges retained so far.
294    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
295        self.received.lock().await.iter().cloned().collect()
296    }
297
298    /// Return the number of currently retained exchanges.
299    pub async fn received_count(&self) -> usize {
300        self.received.lock().await.len()
301    }
302
303    /// Clear all retained exchanges and reset internal counters.
304    ///
305    /// Useful between test cases to reuse the same mock endpoint.
306    pub async fn reset(&self) {
307        self.received.lock().await.clear();
308        if let Ok(mut guard) = self.fail_fast_error.lock() {
309            *guard = None;
310        }
311    }
312
313    /// Assert that exactly `expected` exchanges have been received.
314    ///
315    /// # Panics
316    ///
317    /// Panics if the count does not match.
318    pub async fn assert_exchange_count(&self, expected: usize) {
319        let actual = self.received.lock().await.len();
320        assert_eq!(
321            actual, expected,
322            "MockEndpoint expected {expected} exchanges, got {actual}"
323        );
324    }
325
326    /// Wait until at least `count` exchanges have been received, or panic on timeout.
327    ///
328    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
329    /// exchanges are already present.
330    ///
331    /// # Panics
332    ///
333    /// Panics if `timeout` elapses before `count` exchanges arrive.
334    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
335        let deadline = tokio::time::Instant::now() + timeout;
336        loop {
337            {
338                let received = self.received.lock().await;
339                if received.len() >= count {
340                    return;
341                }
342            }
343            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
344            if remaining.is_zero() {
345                // Re-check in case the final exchange arrived between the lock drop
346                // above and entering the select — Notify does not buffer permits.
347                let got = self.received.lock().await.len();
348                if got >= count {
349                    return;
350                }
351                panic!(
352                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
353                    self.name, count, got, timeout
354                );
355            }
356            tokio::select! {
357                _ = self.notify.notified() => {}
358                _ = tokio::time::sleep(remaining) => {}
359            }
360        }
361    }
362
363    /// Wait for exchanges with a configurable timeout derived from `assert_period_ms`.
364    ///
365    /// If `assert_period_ms` is 0, uses the provided `fallback` duration.
366    /// Otherwise, waits for `assert_period_ms` milliseconds before checking.
367    pub async fn await_exchanges_with_timeout(&self, count: usize, fallback: std::time::Duration) {
368        let duration = if self.assert_period_ms > 0 {
369            std::time::Duration::from_millis(self.assert_period_ms)
370        } else {
371            fallback
372        };
373        self.await_exchanges(count, duration).await;
374    }
375
376    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
377    ///
378    /// # Panics
379    ///
380    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
381    /// to ensure the exchange has been received.
382    ///
383    /// Panics if called from a single-threaded tokio runtime. Use
384    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
385    ///
386    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
387    // NOTE: requires multi-threaded Tokio runtime (current_thread will deadlock)
388    // due to `block_in_place` used for blocking_lock.
389    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
390        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
391        if idx >= received.len() {
392            panic!(
393                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
394                self.name,
395                idx,
396                received.len()
397            );
398        }
399        ExchangeAssert {
400            exchange: received[idx].clone(),
401            idx,
402            endpoint_name: self.name.clone(),
403        }
404    }
405
406    /// Add an expected body to the expectations list.
407    pub fn expect_body(&self, body: camel_component_api::Body) {
408        if let Ok(mut guard) = self.expectations.lock() {
409            guard.push_body(body);
410        }
411    }
412
413    /// Add an expected header key-value pair to the expectations list.
414    pub fn expect_header(&self, key: &str, value: impl Into<serde_json::Value>) {
415        if let Ok(mut guard) = self.expectations.lock() {
416            guard.push_header(key.to_string(), value.into());
417        }
418    }
419
420    /// Add an expected header regex pattern to the expectations list.
421    ///
422    /// After `await_exchanges()`, `assert_satisfied()` checks whether any
423    /// received exchange has the named header matching the given regex pattern.
424    pub fn expect_header_regex(&self, key: &str, pattern: &str) {
425        if let Ok(mut guard) = self.expectations.lock() {
426            guard.push_header_regex(key.to_string(), pattern.to_string());
427        }
428    }
429
430    /// Assert that all registered expectations are satisfied.
431    ///
432    /// # Panics
433    ///
434    /// Panics if expected bodies do not match received bodies (in order or any
435    /// order depending on `any_order` config), if expected headers are missing,
436    /// or if header regex patterns do not match.
437    pub async fn assert_satisfied(&self) {
438        let received = self.get_received_exchanges().await;
439
440        // Check expected bodies
441        {
442            let guard = self
443                .expectations
444                .lock()
445                .expect("expectations lock poisoned"); // allow-unwrap
446            if !guard.expected_bodies.is_empty() {
447                let received_bodies: Vec<_> = received.iter().map(|e| &e.input.body).collect();
448                if guard.expected_bodies.len() != received_bodies.len() {
449                    panic!(
450                        "MockEndpoint '{}': expected {} bodies, got {}",
451                        self.name,
452                        guard.expected_bodies.len(),
453                        received_bodies.len()
454                    );
455                }
456                if self.any_order {
457                    // Match in any order — each expected body must appear exactly once
458                    let mut unmatched: Vec<_> = received_bodies.iter().collect();
459                    for expected in &guard.expected_bodies {
460                        let idx = unmatched
461                            .iter()
462                            .position(|actual| body_eq(expected, actual));
463                        match idx {
464                            Some(i) => {
465                                unmatched.remove(i);
466                            }
467                            None => panic!(
468                                "MockEndpoint '{}': expected body {:?} not found in received exchanges (anyOrder mode)",
469                                self.name, expected
470                            ),
471                        }
472                    }
473                } else {
474                    for (i, expected) in guard.expected_bodies.iter().enumerate() {
475                        if !body_eq(expected, received_bodies[i]) {
476                            panic!(
477                                "MockEndpoint '{}': body[{}] expected {:?}, got {:?}",
478                                self.name, i, expected, received_bodies[i]
479                            );
480                        }
481                    }
482                }
483            }
484
485            // Check expected headers (must all be present on at least one exchange)
486            for (key, value) in &guard.expected_headers {
487                let found = received
488                    .iter()
489                    .any(|ex| ex.input.headers.get(key).is_some_and(|v| v == value));
490                if !found {
491                    panic!(
492                        "MockEndpoint '{}': expected header '{}' = {} not found in any received exchange",
493                        self.name, key, value
494                    );
495                }
496            }
497
498            // Check expected header regexes
499            for (key, pattern) in &guard.expected_header_regexes {
500                let re = regex::Regex::new(pattern).unwrap_or_else(|e| {
501                    panic!(
502                        "MockEndpoint '{}': invalid regex pattern {:?}: {e}",
503                        self.name, pattern
504                    )
505                });
506                let found = received.iter().any(|ex| {
507                    ex.input.headers.get(key).is_some_and(|v| {
508                        let s = match v {
509                            serde_json::Value::String(s) => s.clone(),
510                            other => other.to_string(),
511                        };
512                        re.is_match(&s)
513                    })
514                });
515                if !found {
516                    panic!(
517                        "MockEndpoint '{}': no received exchange has header '{}' matching regex {:?}",
518                        self.name, key, pattern
519                    );
520                }
521            }
522        }
523    }
524
525    /// Return the stored fail-fast error, if any.
526    pub fn fail_fast_error(&self) -> Option<CamelError> {
527        self.fail_fast_error.lock().ok().and_then(|g| g.clone())
528    }
529}
530
531/// Compare two `Body` values for equality (used by assert_satisfied).
532fn body_eq(a: &camel_component_api::Body, b: &camel_component_api::Body) -> bool {
533    match (a, b) {
534        (camel_component_api::Body::Empty, camel_component_api::Body::Empty) => true,
535        (camel_component_api::Body::Text(a), camel_component_api::Body::Text(b)) => a == b,
536        (camel_component_api::Body::Json(a), camel_component_api::Body::Json(b)) => a == b,
537        (camel_component_api::Body::Xml(a), camel_component_api::Body::Xml(b)) => a == b,
538        (camel_component_api::Body::Bytes(a), camel_component_api::Body::Bytes(b)) => a == b,
539        _ => false,
540    }
541}
542
543impl Endpoint for MockEndpoint {
544    fn uri(&self) -> &str {
545        &self.0.uri
546    }
547
548    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
549        Err(CamelError::EndpointCreationFailed(
550            "mock endpoint does not support consumers (it is a sink)".to_string(),
551        ))
552    }
553
554    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
555        Ok(BoxProcessor::new(MockProducer {
556            name: self.0.name.clone(),
557            received: Arc::clone(&self.0.received),
558            notify: Arc::clone(&self.0.notify),
559            max_retained: self.0.max_retained,
560            copy_on_exchange: self.0.copy_on_exchange,
561            fail_fast: self.0.fail_fast,
562            fail_fast_error: Arc::clone(&self.0.fail_fast_error),
563        }))
564    }
565}
566
567// ---------------------------------------------------------------------------
568// MockProducer
569// ---------------------------------------------------------------------------
570
571/// A producer that simply records each exchange it processes.
572#[derive(Clone)]
573struct MockProducer {
574    name: String,
575    received: Arc<Mutex<VecDeque<Exchange>>>,
576    notify: Arc<Notify>,
577    max_retained: usize,
578    copy_on_exchange: bool,
579    fail_fast: bool,
580    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
581}
582
583impl Service<Exchange> for MockProducer {
584    type Response = Exchange;
585    type Error = CamelError;
586    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
587
588    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
589        // In fail-fast mode, reject new exchanges if a previous one failed
590        if self.fail_fast
591            && let Ok(guard) = self.fail_fast_error.lock()
592            && guard.is_some()
593        {
594            return Poll::Ready(Err(CamelError::ProcessorError(
595                "mock endpoint in fail-fast mode: a previous exchange caused an error".to_string(),
596            )));
597        }
598        Poll::Ready(Ok(()))
599    }
600
601    fn call(&mut self, exchange: Exchange) -> Self::Future {
602        let name = self.name.clone();
603        let received = Arc::clone(&self.received);
604        let notify = Arc::clone(&self.notify);
605        let max_retained = self.max_retained;
606        let copy_on_exchange = self.copy_on_exchange;
607        let fail_fast = self.fail_fast;
608        let fail_fast_error = Arc::clone(&self.fail_fast_error);
609        Box::pin(async move {
610            // In fail-fast mode, check if a previous error was recorded
611            if fail_fast
612                && let Ok(guard) = fail_fast_error.lock()
613                && guard.is_some()
614            {
615                return Err(CamelError::ProcessorError(
616                    "mock endpoint in fail-fast mode: a previous exchange caused an error"
617                        .to_string(),
618                ));
619            }
620
621            let correlation_id = exchange
622                .input
623                .headers
624                .get("CamelCorrelationId")
625                .and_then(|v| v.as_str())
626                .map(|s| s.to_string());
627
628            let exchange_to_store = if copy_on_exchange {
629                let mut cloned = exchange.clone();
630                // Deep-clone the body to break aliasing
631                cloned.input.body = clone_body(&exchange.input.body);
632                cloned
633            } else {
634                exchange.clone()
635            };
636
637            let mut guard = received.lock().await;
638            if guard.len() >= max_retained {
639                tracing::warn!(
640                    endpoint_name = %name,
641                    max = max_retained,
642                    "max retained exchanges reached, dropping oldest"
643                );
644                guard.pop_front();
645            }
646            guard.push_back(exchange_to_store);
647            let count = guard.len();
648            drop(guard);
649
650            debug!(
651                endpoint_name = %name,
652                count = %count,
653                correlation_id = correlation_id.as_deref().unwrap_or("none"),
654                "exchange recorded on mock"
655            );
656            notify.notify_waiters();
657
658            Ok(exchange)
659        })
660    }
661}
662
663/// Deep-clone a `Body` value.
664fn clone_body(body: &camel_component_api::Body) -> camel_component_api::Body {
665    match body {
666        camel_component_api::Body::Empty => camel_component_api::Body::Empty,
667        camel_component_api::Body::Text(s) => camel_component_api::Body::Text(s.clone()),
668        camel_component_api::Body::Json(v) => camel_component_api::Body::Json(v.clone()),
669        camel_component_api::Body::Xml(s) => camel_component_api::Body::Xml(s.clone()),
670        camel_component_api::Body::Bytes(b) => camel_component_api::Body::Bytes(b.clone()),
671        camel_component_api::Body::Stream(_) => {
672            // Streams cannot be cloned; use Empty as fallback
673            camel_component_api::Body::Empty
674        }
675    }
676}
677
678// ---------------------------------------------------------------------------
679// ExchangeAssert
680// ---------------------------------------------------------------------------
681
682/// A handle for making synchronous assertions on a recorded exchange.
683///
684/// Obtain one via [`MockEndpointInner::exchange`] after calling
685/// [`MockEndpointInner::await_exchanges`].
686///
687/// All methods panic with descriptive messages on failure, making test output
688/// self-explanatory without additional context.
689pub struct ExchangeAssert {
690    exchange: Exchange,
691    idx: usize,
692    endpoint_name: String,
693}
694
695impl ExchangeAssert {
696    fn location(&self) -> String {
697        format!(
698            "MockEndpoint '{}' exchange[{}]",
699            self.endpoint_name, self.idx
700        )
701    }
702
703    /// Assert that the body is `Body::Text` equal to `expected`.
704    pub fn assert_body_text(self, expected: &str) -> Self {
705        match self.exchange.input.body.as_text() {
706            Some(actual) if actual == expected => {}
707            Some(actual) => panic!(
708                "{}: expected body text {:?}, got {:?}",
709                self.location(),
710                expected,
711                actual
712            ),
713            None => panic!(
714                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
715                self.location(),
716                expected,
717                self.exchange.input.body
718            ),
719        }
720        self
721    }
722
723    /// Assert that the body is `Body::Json` equal to `expected`.
724    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
725        match &self.exchange.input.body {
726            camel_component_api::Body::Json(actual) if *actual == expected => {}
727            camel_component_api::Body::Json(actual) => panic!(
728                "{}: expected body JSON {}, got {}",
729                self.location(),
730                expected,
731                actual
732            ),
733            other => panic!(
734                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
735                self.location(),
736                expected,
737                other
738            ),
739        }
740        self
741    }
742
743    /// Assert that the body is `Body::Bytes` equal to `expected`.
744    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
745        match &self.exchange.input.body {
746            camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
747            camel_component_api::Body::Bytes(actual) => panic!(
748                "{}: expected body bytes {:?}, got {:?}",
749                self.location(),
750                expected,
751                actual
752            ),
753            other => panic!(
754                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
755                self.location(),
756                expected,
757                other
758            ),
759        }
760        self
761    }
762
763    /// Assert that header `key` exists and equals `expected`.
764    ///
765    /// # Panics
766    ///
767    /// Panics if the header is missing or its value does not match `expected`.
768    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
769        match self.exchange.input.headers.get(key) {
770            Some(actual) if *actual == expected => {}
771            Some(actual) => panic!(
772                "{}: expected header {:?} = {}, got {}",
773                self.location(),
774                key,
775                expected,
776                actual
777            ),
778            None => panic!(
779                "{}: expected header {:?} = {}, but header is absent",
780                self.location(),
781                key,
782                expected
783            ),
784        }
785        self
786    }
787
788    /// Assert that header `key` is present (any value).
789    ///
790    /// # Panics
791    ///
792    /// Panics if the header key is absent.
793    pub fn assert_header_exists(self, key: &str) -> Self {
794        if !self.exchange.input.headers.contains_key(key) {
795            panic!(
796                "{}: expected header {:?} to be present, but it was absent",
797                self.location(),
798                key
799            );
800        }
801        self
802    }
803
804    /// Assert that the exchange has an error (`exchange.error` is `Some`).
805    ///
806    /// # Panics
807    ///
808    /// Panics if `exchange.error` is `None`.
809    pub fn assert_has_error(self) -> Self {
810        if self.exchange.error.is_none() {
811            panic!(
812                "{}: expected exchange to have an error, but error is None",
813                self.location()
814            );
815        }
816        self
817    }
818
819    /// Assert that the exchange has no error (`exchange.error` is `None`).
820    ///
821    /// # Panics
822    ///
823    /// Panics if `exchange.error` is `Some`.
824    pub fn assert_no_error(self) -> Self {
825        if let Some(ref err) = self.exchange.error {
826            panic!(
827                "{}: expected exchange to have no error, but got: {}",
828                self.location(),
829                err
830            );
831        }
832        self
833    }
834}
835
836// ---------------------------------------------------------------------------
837// Tests
838// ---------------------------------------------------------------------------
839
840#[cfg(test)]
841mod tests {
842    use super::*;
843    use camel_component_api::Message;
844    use camel_component_api::NoOpComponentContext;
845    use tower::ServiceExt;
846
847    fn test_producer_ctx() -> ProducerContext {
848        ProducerContext::new()
849    }
850
851    #[test]
852    fn test_mock_component_scheme() {
853        let component = MockComponent::new();
854        assert_eq!(component.scheme(), "mock");
855    }
856
857    #[test]
858    fn test_mock_component_default() {
859        let component = MockComponent::default();
860        assert_eq!(component.scheme(), "mock");
861        assert!(component.get_endpoint("missing").is_none());
862    }
863
864    #[test]
865    fn test_mock_creates_endpoint() {
866        let component = MockComponent::new();
867        let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
868        assert!(endpoint.is_ok());
869    }
870
871    #[test]
872    fn test_mock_wrong_scheme() {
873        let component = MockComponent::new();
874        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
875        assert!(result.is_err());
876    }
877
878    #[test]
879    fn test_empty_mock_endpoint_name_rejected() {
880        let component = MockComponent::new();
881        let result = component.create_endpoint("mock:", &NoOpComponentContext);
882        assert!(result.is_err(), "empty mock name should be rejected");
883    }
884
885    #[test]
886    fn test_valid_mock_endpoint_name_accepted() {
887        let component = MockComponent::new();
888        let result = component.create_endpoint("mock:result", &NoOpComponentContext);
889        assert!(result.is_ok());
890    }
891
892    #[test]
893    fn test_mock_endpoint_no_consumer() {
894        let component = MockComponent::new();
895        let endpoint = component
896            .create_endpoint("mock:result", &NoOpComponentContext)
897            .unwrap();
898        assert!(endpoint.create_consumer().is_err());
899    }
900
901    #[test]
902    fn test_mock_endpoint_creates_producer() {
903        let ctx = test_producer_ctx();
904        let component = MockComponent::new();
905        let endpoint = component
906            .create_endpoint("mock:result", &NoOpComponentContext)
907            .unwrap();
908        assert!(endpoint.create_producer(&ctx).is_ok());
909    }
910
911    #[test]
912    fn test_mock_endpoint_uri() {
913        let component = MockComponent::new();
914        let endpoint = component
915            .create_endpoint("mock:uri-check", &NoOpComponentContext)
916            .unwrap();
917        assert_eq!(endpoint.uri(), "mock:uri-check");
918    }
919
920    #[test]
921    fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
922        let component = MockComponent::new();
923        let _ = component
924            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
925            .unwrap();
926        let _ = component
927            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
928            .unwrap();
929
930        let first = component.get_endpoint("shared-inner").unwrap();
931        let second = component.get_endpoint("shared-inner").unwrap();
932        assert!(Arc::ptr_eq(&first, &second));
933    }
934
935    #[tokio::test]
936    async fn test_mock_producer_records_exchange() {
937        let ctx = test_producer_ctx();
938        let component = MockComponent::new();
939        let endpoint = component
940            .create_endpoint("mock:test", &NoOpComponentContext)
941            .unwrap();
942
943        let mut producer = endpoint.create_producer(&ctx).unwrap();
944
945        let ex1 = Exchange::new(Message::new("first"));
946        let ex2 = Exchange::new(Message::new("second"));
947
948        producer.call(ex1).await.unwrap();
949        producer.call(ex2).await.unwrap();
950
951        let inner = component.get_endpoint("test").unwrap();
952        inner.assert_exchange_count(2).await;
953
954        let received = inner.get_received_exchanges().await;
955        assert_eq!(received[0].input.body.as_text(), Some("first"));
956        assert_eq!(received[1].input.body.as_text(), Some("second"));
957    }
958
959    #[tokio::test]
960    async fn test_mock_producer_passes_through_exchange() {
961        let ctx = test_producer_ctx();
962        let component = MockComponent::new();
963        let endpoint = component
964            .create_endpoint("mock:passthrough", &NoOpComponentContext)
965            .unwrap();
966
967        let producer = endpoint.create_producer(&ctx).unwrap();
968        let exchange = Exchange::new(Message::new("hello"));
969        let result = producer.oneshot(exchange).await.unwrap();
970
971        // Producer should return the exchange unchanged
972        assert_eq!(result.input.body.as_text(), Some("hello"));
973    }
974
975    #[tokio::test]
976    async fn test_mock_assert_count_passes() {
977        let component = MockComponent::new();
978        let endpoint = component
979            .create_endpoint("mock:count", &NoOpComponentContext)
980            .unwrap();
981        let inner = component.get_endpoint("count").unwrap();
982
983        inner.assert_exchange_count(0).await;
984
985        let ctx = test_producer_ctx();
986        let mut producer = endpoint.create_producer(&ctx).unwrap();
987        producer
988            .call(Exchange::new(Message::new("one")))
989            .await
990            .unwrap();
991
992        inner.assert_exchange_count(1).await;
993    }
994
995    #[tokio::test]
996    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
997    async fn test_mock_assert_count_fails() {
998        let component = MockComponent::new();
999        // Endpoint not created yet, so get_endpoint returns None.
1000        // Create it first, then assert.
1001        let _endpoint = component
1002            .create_endpoint("mock:fail", &NoOpComponentContext)
1003            .unwrap();
1004        let inner = component.get_endpoint("fail").unwrap();
1005
1006        inner.assert_exchange_count(5).await;
1007    }
1008
1009    #[tokio::test]
1010    async fn test_mock_component_shared_registry() {
1011        let component = MockComponent::new();
1012        let ep1 = component
1013            .create_endpoint("mock:shared", &NoOpComponentContext)
1014            .unwrap();
1015        let ep2 = component
1016            .create_endpoint("mock:shared", &NoOpComponentContext)
1017            .unwrap();
1018
1019        // Producing via ep1's producer...
1020        let ctx = test_producer_ctx();
1021        let mut p1 = ep1.create_producer(&ctx).unwrap();
1022        p1.call(Exchange::new(Message::new("from-ep1")))
1023            .await
1024            .unwrap();
1025
1026        // ...and via ep2's producer...
1027        let mut p2 = ep2.create_producer(&ctx).unwrap();
1028        p2.call(Exchange::new(Message::new("from-ep2")))
1029            .await
1030            .unwrap();
1031
1032        // ...both should be visible via the shared storage
1033        let inner = component.get_endpoint("shared").unwrap();
1034        inner.assert_exchange_count(2).await;
1035
1036        let received = inner.get_received_exchanges().await;
1037        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
1038        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
1039    }
1040
1041    #[tokio::test]
1042    async fn await_exchanges_resolves_immediately() {
1043        // If exchanges are already present, await_exchanges returns without timeout.
1044        let ctx = test_producer_ctx();
1045        let component = MockComponent::new();
1046        let endpoint = component
1047            .create_endpoint("mock:immediate", &NoOpComponentContext)
1048            .unwrap();
1049        let inner = component.get_endpoint("immediate").unwrap();
1050
1051        let mut producer = endpoint.create_producer(&ctx).unwrap();
1052        producer
1053            .call(Exchange::new(Message::new("a")))
1054            .await
1055            .unwrap();
1056        producer
1057            .call(Exchange::new(Message::new("b")))
1058            .await
1059            .unwrap();
1060
1061        // Should return immediately — both exchanges already received.
1062        inner
1063            .await_exchanges(2, std::time::Duration::from_millis(100))
1064            .await;
1065    }
1066
1067    #[tokio::test]
1068    async fn await_exchanges_waits_then_resolves() {
1069        // await_exchanges unblocks when a producer sends after the call.
1070        let ctx = test_producer_ctx();
1071        let component = MockComponent::new();
1072        let endpoint = component
1073            .create_endpoint("mock:waiter", &NoOpComponentContext)
1074            .unwrap();
1075        let inner = component.get_endpoint("waiter").unwrap();
1076
1077        // Spawn producer that sends after a short delay.
1078        let mut producer = endpoint.create_producer(&ctx).unwrap();
1079        tokio::spawn(async move {
1080            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1081            producer
1082                .call(Exchange::new(Message::new("delayed")))
1083                .await
1084                .unwrap();
1085        });
1086
1087        // This should block until the spawned task delivers the exchange.
1088        inner
1089            .await_exchanges(1, std::time::Duration::from_millis(500))
1090            .await;
1091
1092        let received = inner.get_received_exchanges().await;
1093        assert_eq!(received.len(), 1);
1094        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
1095    }
1096
1097    #[tokio::test]
1098    #[should_panic(expected = "timed out waiting for 5 exchanges")]
1099    async fn await_exchanges_times_out() {
1100        let component = MockComponent::new();
1101        let _endpoint = component
1102            .create_endpoint("mock:timeout", &NoOpComponentContext)
1103            .unwrap();
1104        let inner = component.get_endpoint("timeout").unwrap();
1105
1106        // Nobody sends — should panic after timeout.
1107        inner
1108            .await_exchanges(5, std::time::Duration::from_millis(50))
1109            .await;
1110    }
1111
1112    #[tokio::test(flavor = "multi_thread")]
1113    async fn exchange_idx_returns_assert() {
1114        let ctx = test_producer_ctx();
1115        let component = MockComponent::new();
1116        let endpoint = component
1117            .create_endpoint("mock:assert-idx", &NoOpComponentContext)
1118            .unwrap();
1119        let inner = component.get_endpoint("assert-idx").unwrap();
1120
1121        let mut producer = endpoint.create_producer(&ctx).unwrap();
1122        producer
1123            .call(Exchange::new(Message::new("hello")))
1124            .await
1125            .unwrap();
1126
1127        inner
1128            .await_exchanges(1, std::time::Duration::from_millis(500))
1129            .await;
1130        // Should not panic — index 0 exists.
1131        let _assert = inner.exchange(0);
1132    }
1133
1134    #[tokio::test(flavor = "multi_thread")]
1135    #[should_panic(expected = "exchange index 5 out of bounds")]
1136    async fn exchange_idx_out_of_bounds() {
1137        let ctx = test_producer_ctx();
1138        let component = MockComponent::new();
1139        let endpoint = component
1140            .create_endpoint("mock:oob", &NoOpComponentContext)
1141            .unwrap();
1142        let inner = component.get_endpoint("oob").unwrap();
1143
1144        let mut producer = endpoint.create_producer(&ctx).unwrap();
1145        producer
1146            .call(Exchange::new(Message::new("only-one")))
1147            .await
1148            .unwrap();
1149
1150        inner
1151            .await_exchanges(1, std::time::Duration::from_millis(500))
1152            .await;
1153        // Only 1 exchange, index 5 should panic.
1154        let _assert = inner.exchange(5);
1155    }
1156
1157    #[tokio::test(flavor = "multi_thread")]
1158    async fn assert_body_text_pass() {
1159        let ctx = test_producer_ctx();
1160        let component = MockComponent::new();
1161        let endpoint = component
1162            .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
1163            .unwrap();
1164        let inner = component.get_endpoint("body-text-pass").unwrap();
1165        let mut producer = endpoint.create_producer(&ctx).unwrap();
1166        producer
1167            .call(Exchange::new(Message::new("hello")))
1168            .await
1169            .unwrap();
1170        inner
1171            .await_exchanges(1, std::time::Duration::from_millis(500))
1172            .await;
1173        inner.exchange(0).assert_body_text("hello");
1174    }
1175
1176    #[tokio::test(flavor = "multi_thread")]
1177    #[should_panic(expected = "expected body text")]
1178    async fn assert_body_text_fail() {
1179        let ctx = test_producer_ctx();
1180        let component = MockComponent::new();
1181        let endpoint = component
1182            .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
1183            .unwrap();
1184        let inner = component.get_endpoint("body-text-fail").unwrap();
1185        let mut producer = endpoint.create_producer(&ctx).unwrap();
1186        producer
1187            .call(Exchange::new(Message::new("hello")))
1188            .await
1189            .unwrap();
1190        inner
1191            .await_exchanges(1, std::time::Duration::from_millis(500))
1192            .await;
1193        inner.exchange(0).assert_body_text("world");
1194    }
1195
1196    #[tokio::test(flavor = "multi_thread")]
1197    async fn assert_body_json_pass() {
1198        use camel_component_api::Body;
1199        let ctx = test_producer_ctx();
1200        let component = MockComponent::new();
1201        let endpoint = component
1202            .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
1203            .unwrap();
1204        let inner = component.get_endpoint("body-json-pass").unwrap();
1205        let mut producer = endpoint.create_producer(&ctx).unwrap();
1206        let mut msg = Message::new("");
1207        msg.body = Body::Json(serde_json::json!({"key": "value"}));
1208        producer.call(Exchange::new(msg)).await.unwrap();
1209        inner
1210            .await_exchanges(1, std::time::Duration::from_millis(500))
1211            .await;
1212        inner
1213            .exchange(0)
1214            .assert_body_json(serde_json::json!({"key": "value"}));
1215    }
1216
1217    #[tokio::test(flavor = "multi_thread")]
1218    #[should_panic(expected = "expected body JSON")]
1219    async fn assert_body_json_fail() {
1220        use camel_component_api::Body;
1221        let ctx = test_producer_ctx();
1222        let component = MockComponent::new();
1223        let endpoint = component
1224            .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
1225            .unwrap();
1226        let inner = component.get_endpoint("body-json-fail").unwrap();
1227        let mut producer = endpoint.create_producer(&ctx).unwrap();
1228        let mut msg = Message::new("");
1229        msg.body = Body::Json(serde_json::json!({"key": "value"}));
1230        producer.call(Exchange::new(msg)).await.unwrap();
1231        inner
1232            .await_exchanges(1, std::time::Duration::from_millis(500))
1233            .await;
1234        inner
1235            .exchange(0)
1236            .assert_body_json(serde_json::json!({"key": "other"}));
1237    }
1238
1239    #[tokio::test(flavor = "multi_thread")]
1240    async fn assert_body_bytes_pass() {
1241        use bytes::Bytes;
1242        use camel_component_api::Body;
1243        let ctx = test_producer_ctx();
1244        let component = MockComponent::new();
1245        let endpoint = component
1246            .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
1247            .unwrap();
1248        let inner = component.get_endpoint("body-bytes-pass").unwrap();
1249        let mut producer = endpoint.create_producer(&ctx).unwrap();
1250        let mut msg = Message::new("");
1251        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1252        producer.call(Exchange::new(msg)).await.unwrap();
1253        inner
1254            .await_exchanges(1, std::time::Duration::from_millis(500))
1255            .await;
1256        inner.exchange(0).assert_body_bytes(b"binary");
1257    }
1258
1259    #[tokio::test(flavor = "multi_thread")]
1260    #[should_panic(expected = "expected body bytes")]
1261    async fn assert_body_bytes_fail() {
1262        use bytes::Bytes;
1263        use camel_component_api::Body;
1264        let ctx = test_producer_ctx();
1265        let component = MockComponent::new();
1266        let endpoint = component
1267            .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
1268            .unwrap();
1269        let inner = component.get_endpoint("body-bytes-fail").unwrap();
1270        let mut producer = endpoint.create_producer(&ctx).unwrap();
1271        let mut msg = Message::new("");
1272        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1273        producer.call(Exchange::new(msg)).await.unwrap();
1274        inner
1275            .await_exchanges(1, std::time::Duration::from_millis(500))
1276            .await;
1277        inner.exchange(0).assert_body_bytes(b"different");
1278    }
1279
1280    #[tokio::test(flavor = "multi_thread")]
1281    async fn assert_header_pass() {
1282        let ctx = test_producer_ctx();
1283        let component = MockComponent::new();
1284        let endpoint = component
1285            .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
1286            .unwrap();
1287        let inner = component.get_endpoint("hdr-pass").unwrap();
1288        let mut producer = endpoint.create_producer(&ctx).unwrap();
1289        let mut msg = Message::new("body");
1290        msg.headers
1291            .insert("x-key".to_string(), serde_json::json!("value"));
1292        producer.call(Exchange::new(msg)).await.unwrap();
1293        inner
1294            .await_exchanges(1, std::time::Duration::from_millis(500))
1295            .await;
1296        inner
1297            .exchange(0)
1298            .assert_header("x-key", serde_json::json!("value"));
1299    }
1300
1301    #[tokio::test(flavor = "multi_thread")]
1302    #[should_panic(expected = "expected header")]
1303    async fn assert_header_fail() {
1304        let ctx = test_producer_ctx();
1305        let component = MockComponent::new();
1306        let endpoint = component
1307            .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
1308            .unwrap();
1309        let inner = component.get_endpoint("hdr-fail").unwrap();
1310        let mut producer = endpoint.create_producer(&ctx).unwrap();
1311        let mut msg = Message::new("body");
1312        msg.headers
1313            .insert("x-key".to_string(), serde_json::json!("value"));
1314        producer.call(Exchange::new(msg)).await.unwrap();
1315        inner
1316            .await_exchanges(1, std::time::Duration::from_millis(500))
1317            .await;
1318        inner
1319            .exchange(0)
1320            .assert_header("x-key", serde_json::json!("other"));
1321    }
1322
1323    #[tokio::test(flavor = "multi_thread")]
1324    async fn assert_header_exists_pass() {
1325        let ctx = test_producer_ctx();
1326        let component = MockComponent::new();
1327        let endpoint = component
1328            .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
1329            .unwrap();
1330        let inner = component.get_endpoint("hdr-exists-pass").unwrap();
1331        let mut producer = endpoint.create_producer(&ctx).unwrap();
1332        let mut msg = Message::new("body");
1333        msg.headers
1334            .insert("x-present".to_string(), serde_json::json!(42));
1335        producer.call(Exchange::new(msg)).await.unwrap();
1336        inner
1337            .await_exchanges(1, std::time::Duration::from_millis(500))
1338            .await;
1339        inner.exchange(0).assert_header_exists("x-present");
1340    }
1341
1342    #[tokio::test(flavor = "multi_thread")]
1343    #[should_panic(expected = "expected header")]
1344    async fn assert_header_exists_fail() {
1345        let ctx = test_producer_ctx();
1346        let component = MockComponent::new();
1347        let endpoint = component
1348            .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
1349            .unwrap();
1350        let inner = component.get_endpoint("hdr-exists-fail").unwrap();
1351        let mut producer = endpoint.create_producer(&ctx).unwrap();
1352        producer
1353            .call(Exchange::new(Message::new("body")))
1354            .await
1355            .unwrap();
1356        inner
1357            .await_exchanges(1, std::time::Duration::from_millis(500))
1358            .await;
1359        inner.exchange(0).assert_header_exists("x-missing");
1360    }
1361
1362    #[tokio::test(flavor = "multi_thread")]
1363    async fn assert_has_error_pass() {
1364        let ctx = test_producer_ctx();
1365        let component = MockComponent::new();
1366        let endpoint = component
1367            .create_endpoint("mock:err-pass", &NoOpComponentContext)
1368            .unwrap();
1369        let inner = component.get_endpoint("err-pass").unwrap();
1370        let mut producer = endpoint.create_producer(&ctx).unwrap();
1371        let mut ex = Exchange::new(Message::new("body"));
1372        ex.set_error(camel_component_api::CamelError::ProcessorError(
1373            "oops".to_string(),
1374        ));
1375        producer.call(ex).await.unwrap();
1376        inner
1377            .await_exchanges(1, std::time::Duration::from_millis(500))
1378            .await;
1379        inner.exchange(0).assert_has_error();
1380    }
1381
1382    #[tokio::test(flavor = "multi_thread")]
1383    #[should_panic(expected = "expected exchange to have an error")]
1384    async fn assert_has_error_fail() {
1385        let ctx = test_producer_ctx();
1386        let component = MockComponent::new();
1387        let endpoint = component
1388            .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
1389            .unwrap();
1390        let inner = component.get_endpoint("has-err-fail").unwrap();
1391        let mut producer = endpoint.create_producer(&ctx).unwrap();
1392        producer
1393            .call(Exchange::new(Message::new("body")))
1394            .await
1395            .unwrap();
1396        inner
1397            .await_exchanges(1, std::time::Duration::from_millis(500))
1398            .await;
1399        inner.exchange(0).assert_has_error();
1400    }
1401
1402    #[tokio::test(flavor = "multi_thread")]
1403    async fn assert_no_error_pass() {
1404        let ctx = test_producer_ctx();
1405        let component = MockComponent::new();
1406        let endpoint = component
1407            .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
1408            .unwrap();
1409        let inner = component.get_endpoint("no-err-pass").unwrap();
1410        let mut producer = endpoint.create_producer(&ctx).unwrap();
1411        producer
1412            .call(Exchange::new(Message::new("body")))
1413            .await
1414            .unwrap();
1415        inner
1416            .await_exchanges(1, std::time::Duration::from_millis(500))
1417            .await;
1418        inner.exchange(0).assert_no_error();
1419    }
1420
1421    // -----------------------------------------------------------------------
1422    // A-13: reset() and bounded retention tests
1423    // -----------------------------------------------------------------------
1424
1425    #[tokio::test]
1426    async fn test_mock_reset_clears_exchanges() {
1427        let component = MockComponent::new();
1428        let endpoint = component
1429            .create_endpoint("mock:reset-test", &NoOpComponentContext)
1430            .unwrap();
1431        let inner = component.get_endpoint("reset-test").unwrap();
1432
1433        let ctx = test_producer_ctx();
1434        let mut producer = endpoint.create_producer(&ctx).unwrap();
1435        producer
1436            .call(Exchange::new(Message::new("a")))
1437            .await
1438            .unwrap();
1439        producer
1440            .call(Exchange::new(Message::new("b")))
1441            .await
1442            .unwrap();
1443
1444        assert_eq!(inner.received_count().await, 2);
1445        inner.reset().await;
1446        assert_eq!(inner.received_count().await, 0);
1447    }
1448
1449    #[tokio::test]
1450    async fn test_mock_bounded_retention_drops_oldest() {
1451        let config = MockConfig {
1452            max_retained: 3,
1453            ..Default::default()
1454        };
1455        let component = MockComponent::with_config(config);
1456        let endpoint = component
1457            .create_endpoint("mock:bounded", &NoOpComponentContext)
1458            .unwrap();
1459        let inner = component.get_endpoint("bounded").unwrap();
1460
1461        let ctx = test_producer_ctx();
1462        let mut producer = endpoint.create_producer(&ctx).unwrap();
1463
1464        // Send 5 exchanges, but max_retained is 3
1465        for i in 0..5 {
1466            producer
1467                .call(Exchange::new(Message::new(format!("msg-{i}"))))
1468                .await
1469                .unwrap();
1470        }
1471
1472        assert_eq!(inner.received_count().await, 3);
1473        let received = inner.get_received_exchanges().await;
1474        // Oldest (msg-0, msg-1) should be dropped
1475        assert_eq!(received[0].input.body.as_text(), Some("msg-2"));
1476        assert_eq!(received[1].input.body.as_text(), Some("msg-3"));
1477        assert_eq!(received[2].input.body.as_text(), Some("msg-4"));
1478    }
1479
1480    #[tokio::test]
1481    async fn test_mock_reset_then_record_again() {
1482        let component = MockComponent::new();
1483        let endpoint = component
1484            .create_endpoint("mock:reset-reuse", &NoOpComponentContext)
1485            .unwrap();
1486        let inner = component.get_endpoint("reset-reuse").unwrap();
1487
1488        let ctx = test_producer_ctx();
1489        let mut producer = endpoint.create_producer(&ctx).unwrap();
1490        producer
1491            .call(Exchange::new(Message::new("before-reset")))
1492            .await
1493            .unwrap();
1494        inner.reset().await;
1495
1496        producer
1497            .call(Exchange::new(Message::new("after-reset")))
1498            .await
1499            .unwrap();
1500
1501        let received = inner.get_received_exchanges().await;
1502        assert_eq!(received.len(), 1);
1503        assert_eq!(received[0].input.body.as_text(), Some("after-reset"));
1504    }
1505
1506    #[tokio::test(flavor = "multi_thread")]
1507    #[should_panic(expected = "expected exchange to have no error")]
1508    async fn assert_no_error_fail() {
1509        let ctx = test_producer_ctx();
1510        let component = MockComponent::new();
1511        let endpoint = component
1512            .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1513            .unwrap();
1514        let inner = component.get_endpoint("no-err-fail").unwrap();
1515        let mut producer = endpoint.create_producer(&ctx).unwrap();
1516        let mut ex = Exchange::new(Message::new("body"));
1517        ex.set_error(camel_component_api::CamelError::ProcessorError(
1518            "oops".to_string(),
1519        ));
1520        producer.call(ex).await.unwrap();
1521        inner
1522            .await_exchanges(1, std::time::Duration::from_millis(500))
1523            .await;
1524        inner.exchange(0).assert_no_error();
1525    }
1526
1527    // -----------------------------------------------------------------------
1528    // MOCK-003: copy_on_exchange tests
1529    // -----------------------------------------------------------------------
1530
1531    #[tokio::test]
1532    async fn test_copy_on_exchange_stores_cloned_body() {
1533        let config = MockConfig {
1534            copy_on_exchange: true,
1535            ..Default::default()
1536        };
1537        let component = MockComponent::with_config(config);
1538        let endpoint = component
1539            .create_endpoint("mock:copy", &NoOpComponentContext)
1540            .unwrap();
1541        let inner = component.get_endpoint("copy").unwrap();
1542
1543        let ctx = test_producer_ctx();
1544        let mut producer = endpoint.create_producer(&ctx).unwrap();
1545
1546        let mut msg = Message::new("original");
1547        msg.headers.insert("x-test".into(), serde_json::json!(1));
1548        let ex = Exchange::new(msg);
1549        producer.call(ex).await.unwrap();
1550
1551        let received = inner.get_received_exchanges().await;
1552        assert_eq!(received[0].input.body.as_text(), Some("original"));
1553    }
1554
1555    #[tokio::test]
1556    async fn test_copy_on_exchange_false_shares_storage() {
1557        let config = MockConfig {
1558            copy_on_exchange: false,
1559            ..Default::default()
1560        };
1561        let component = MockComponent::with_config(config);
1562        let endpoint = component
1563            .create_endpoint("mock:no-copy", &NoOpComponentContext)
1564            .unwrap();
1565        let inner = component.get_endpoint("no-copy").unwrap();
1566
1567        let ctx = test_producer_ctx();
1568        let mut producer = endpoint.create_producer(&ctx).unwrap();
1569
1570        producer
1571            .call(Exchange::new(Message::new("direct")))
1572            .await
1573            .unwrap();
1574
1575        let received = inner.get_received_exchanges().await;
1576        assert_eq!(received[0].input.body.as_text(), Some("direct"));
1577    }
1578
1579    // -----------------------------------------------------------------------
1580    // MOCK-004: expect_body / expect_header / assert_satisfied tests
1581    // -----------------------------------------------------------------------
1582
1583    #[tokio::test]
1584    async fn test_assert_satisfied_bodies_in_order() {
1585        let component = MockComponent::new();
1586        let endpoint = component
1587            .create_endpoint("mock:sat-bodies", &NoOpComponentContext)
1588            .unwrap();
1589        let inner = component.get_endpoint("sat-bodies").unwrap();
1590
1591        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1592        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1593
1594        let ctx = test_producer_ctx();
1595        let mut producer = endpoint.create_producer(&ctx).unwrap();
1596        producer
1597            .call(Exchange::new(Message::new("alpha")))
1598            .await
1599            .unwrap();
1600        producer
1601            .call(Exchange::new(Message::new("beta")))
1602            .await
1603            .unwrap();
1604
1605        inner.assert_satisfied().await;
1606    }
1607
1608    #[tokio::test]
1609    #[should_panic(expected = "body[0] expected")]
1610    async fn test_assert_satisfied_bodies_wrong_order_fails() {
1611        let component = MockComponent::new();
1612        let endpoint = component
1613            .create_endpoint("mock:sat-bodies-fail", &NoOpComponentContext)
1614            .unwrap();
1615        let inner = component.get_endpoint("sat-bodies-fail").unwrap();
1616
1617        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1618        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1619
1620        let ctx = test_producer_ctx();
1621        let mut producer = endpoint.create_producer(&ctx).unwrap();
1622        producer
1623            .call(Exchange::new(Message::new("beta")))
1624            .await
1625            .unwrap();
1626        producer
1627            .call(Exchange::new(Message::new("alpha")))
1628            .await
1629            .unwrap();
1630
1631        inner.assert_satisfied().await;
1632    }
1633
1634    #[tokio::test]
1635    async fn test_assert_satisfied_headers() {
1636        let component = MockComponent::new();
1637        let endpoint = component
1638            .create_endpoint("mock:sat-hdr", &NoOpComponentContext)
1639            .unwrap();
1640        let inner = component.get_endpoint("sat-hdr").unwrap();
1641
1642        inner.expect_header("status", serde_json::json!("ok"));
1643
1644        let ctx = test_producer_ctx();
1645        let mut producer = endpoint.create_producer(&ctx).unwrap();
1646        let mut msg = Message::new("body");
1647        msg.headers.insert("status".into(), serde_json::json!("ok"));
1648        producer.call(Exchange::new(msg)).await.unwrap();
1649
1650        inner.assert_satisfied().await;
1651    }
1652
1653    #[tokio::test]
1654    #[should_panic(expected = "expected header 'missing' =")]
1655    async fn test_assert_satisfied_headers_missing() {
1656        let component = MockComponent::new();
1657        let endpoint = component
1658            .create_endpoint("mock:sat-hdr-missing", &NoOpComponentContext)
1659            .unwrap();
1660        let inner = component.get_endpoint("sat-hdr-missing").unwrap();
1661
1662        inner.expect_header("missing", serde_json::json!("value"));
1663
1664        let ctx = test_producer_ctx();
1665        let mut producer = endpoint.create_producer(&ctx).unwrap();
1666        producer
1667            .call(Exchange::new(Message::new("body")))
1668            .await
1669            .unwrap();
1670
1671        inner.assert_satisfied().await;
1672    }
1673
1674    // -----------------------------------------------------------------------
1675    // MOCK-005: fail_fast tests
1676    // -----------------------------------------------------------------------
1677
1678    #[tokio::test]
1679    async fn test_fail_fast_rejects_after_first_call() {
1680        let config = MockConfig {
1681            fail_fast: true,
1682            ..Default::default()
1683        };
1684        let component = MockComponent::with_config(config);
1685        let endpoint = component
1686            .create_endpoint("mock:ff", &NoOpComponentContext)
1687            .unwrap();
1688
1689        let ctx = test_producer_ctx();
1690        let mut producer = endpoint.create_producer(&ctx).unwrap();
1691
1692        // First call succeeds
1693        producer
1694            .call(Exchange::new(Message::new("ok")))
1695            .await
1696            .unwrap();
1697    }
1698
1699    #[tokio::test]
1700    async fn test_fail_fast_no_error_when_all_good() {
1701        let config = MockConfig {
1702            fail_fast: true,
1703            ..Default::default()
1704        };
1705        let component = MockComponent::with_config(config);
1706        let endpoint = component
1707            .create_endpoint("mock:ff-good", &NoOpComponentContext)
1708            .unwrap();
1709        let inner = component.get_endpoint("ff-good").unwrap();
1710
1711        let ctx = test_producer_ctx();
1712        let mut producer = endpoint.create_producer(&ctx).unwrap();
1713
1714        producer
1715            .call(Exchange::new(Message::new("a")))
1716            .await
1717            .unwrap();
1718        producer
1719            .call(Exchange::new(Message::new("b")))
1720            .await
1721            .unwrap();
1722
1723        assert!(inner.fail_fast_error().is_none());
1724        inner.assert_exchange_count(2).await;
1725    }
1726
1727    // -----------------------------------------------------------------------
1728    // MOCK-008: await_exchanges_with_timeout tests
1729    // -----------------------------------------------------------------------
1730
1731    #[tokio::test]
1732    async fn test_await_exchanges_with_timeout_uses_config_period() {
1733        let config = MockConfig {
1734            assert_period_ms: 100,
1735            ..Default::default()
1736        };
1737        let component = MockComponent::with_config(config);
1738        let endpoint = component
1739            .create_endpoint("mock:ap", &NoOpComponentContext)
1740            .unwrap();
1741        let inner = component.get_endpoint("ap").unwrap();
1742
1743        let ctx = test_producer_ctx();
1744        let mut producer = endpoint.create_producer(&ctx).unwrap();
1745        producer
1746            .call(Exchange::new(Message::new("x")))
1747            .await
1748            .unwrap();
1749
1750        inner
1751            .await_exchanges_with_timeout(1, std::time::Duration::from_millis(1))
1752            .await;
1753    }
1754
1755    #[tokio::test]
1756    async fn test_await_exchanges_with_timeout_uses_fallback_when_zero() {
1757        let config = MockConfig {
1758            assert_period_ms: 0,
1759            ..Default::default()
1760        };
1761        let component = MockComponent::with_config(config);
1762        let endpoint = component
1763            .create_endpoint("mock:ap-fb", &NoOpComponentContext)
1764            .unwrap();
1765        let inner = component.get_endpoint("ap-fb").unwrap();
1766
1767        let ctx = test_producer_ctx();
1768        let mut producer = endpoint.create_producer(&ctx).unwrap();
1769        producer
1770            .call(Exchange::new(Message::new("y")))
1771            .await
1772            .unwrap();
1773
1774        inner
1775            .await_exchanges_with_timeout(1, std::time::Duration::from_millis(200))
1776            .await;
1777    }
1778
1779    // -----------------------------------------------------------------------
1780    // MOCK-009: expect_header_regex tests
1781    // -----------------------------------------------------------------------
1782
1783    #[tokio::test]
1784    async fn test_expect_header_regex_match() {
1785        let component = MockComponent::new();
1786        let endpoint = component
1787            .create_endpoint("mock:re-hdr", &NoOpComponentContext)
1788            .unwrap();
1789        let inner = component.get_endpoint("re-hdr").unwrap();
1790
1791        inner.expect_header_regex("x-trace-id", r"^[a-f0-9]{8}$");
1792
1793        let ctx = test_producer_ctx();
1794        let mut producer = endpoint.create_producer(&ctx).unwrap();
1795        let mut msg = Message::new("body");
1796        msg.headers
1797            .insert("x-trace-id".into(), serde_json::json!("deadbeef"));
1798        producer.call(Exchange::new(msg)).await.unwrap();
1799
1800        inner.assert_satisfied().await;
1801    }
1802
1803    #[tokio::test]
1804    #[should_panic(expected = "no received exchange has header")]
1805    async fn test_expect_header_regex_no_match() {
1806        let component = MockComponent::new();
1807        let endpoint = component
1808            .create_endpoint("mock:re-hdr-fail", &NoOpComponentContext)
1809            .unwrap();
1810        let inner = component.get_endpoint("re-hdr-fail").unwrap();
1811
1812        inner.expect_header_regex("x-trace-id", r"^\d+$");
1813
1814        let ctx = test_producer_ctx();
1815        let mut producer = endpoint.create_producer(&ctx).unwrap();
1816        let mut msg = Message::new("body");
1817        msg.headers
1818            .insert("x-trace-id".into(), serde_json::json!("abc"));
1819        producer.call(Exchange::new(msg)).await.unwrap();
1820
1821        inner.assert_satisfied().await;
1822    }
1823
1824    // -----------------------------------------------------------------------
1825    // MOCK-010: any_order tests
1826    // -----------------------------------------------------------------------
1827
1828    #[tokio::test]
1829    async fn test_any_order_bodies_match() {
1830        let config = MockConfig {
1831            any_order: true,
1832            ..Default::default()
1833        };
1834        let component = MockComponent::with_config(config);
1835        let endpoint = component
1836            .create_endpoint("mock:anyorder", &NoOpComponentContext)
1837            .unwrap();
1838        let inner = component.get_endpoint("anyorder").unwrap();
1839
1840        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1841        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1842
1843        let ctx = test_producer_ctx();
1844        let mut producer = endpoint.create_producer(&ctx).unwrap();
1845        producer
1846            .call(Exchange::new(Message::new("alpha")))
1847            .await
1848            .unwrap();
1849        producer
1850            .call(Exchange::new(Message::new("beta")))
1851            .await
1852            .unwrap();
1853
1854        inner.assert_satisfied().await;
1855    }
1856
1857    #[tokio::test]
1858    #[should_panic(expected = "not found in received exchanges (anyOrder mode)")]
1859    async fn test_any_order_bodies_missing() {
1860        let config = MockConfig {
1861            any_order: true,
1862            ..Default::default()
1863        };
1864        let component = MockComponent::with_config(config);
1865        let endpoint = component
1866            .create_endpoint("mock:anyorder-fail", &NoOpComponentContext)
1867            .unwrap();
1868        let inner = component.get_endpoint("anyorder-fail").unwrap();
1869
1870        inner.expect_body(camel_component_api::Body::Text("gamma".into()));
1871        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1872
1873        let ctx = test_producer_ctx();
1874        let mut producer = endpoint.create_producer(&ctx).unwrap();
1875        producer
1876            .call(Exchange::new(Message::new("alpha")))
1877            .await
1878            .unwrap();
1879        producer
1880            .call(Exchange::new(Message::new("beta")))
1881            .await
1882            .unwrap();
1883
1884        inner.assert_satisfied().await;
1885    }
1886
1887    // -----------------------------------------------------------------------
1888    // MOCK-012: tracing instrumentation tests (compilation + basic)
1889    // -----------------------------------------------------------------------
1890
1891    #[tokio::test]
1892    async fn test_tracing_logs_exchange_received() {
1893        // Verify the producer doesn't panic and the debug trace fires
1894        let ctx = test_producer_ctx();
1895        let component = MockComponent::new();
1896        let endpoint = component
1897            .create_endpoint("mock:trace", &NoOpComponentContext)
1898            .unwrap();
1899        let mut producer = endpoint.create_producer(&ctx).unwrap();
1900        producer
1901            .call(Exchange::new(Message::new("traced")))
1902            .await
1903            .unwrap();
1904
1905        let inner = component.get_endpoint("trace").unwrap();
1906        inner.assert_exchange_count(1).await;
1907    }
1908
1909    // -----------------------------------------------------------------------
1910    // MOCK-006 / MOCK-007: doctest exists on MockConfig
1911    // -----------------------------------------------------------------------
1912
1913    #[test]
1914    fn test_mock_config_new() {
1915        let cfg = MockConfig::new(42);
1916        assert_eq!(cfg.max_retained, 42);
1917        assert!(!cfg.copy_on_exchange);
1918        assert!(!cfg.fail_fast);
1919        assert!(!cfg.any_order);
1920    }
1921}