Skip to main content

camel_component_mock/
lib.rs

//! # camel-component-mock
//!
//! Mock component for rust-camel — a testing utility that records received
//! exchanges for later assertion, useful for verifying route output in tests.
//!
//! Main public types: `MockComponent`, `MockConfig`, `MockEndpoint`,
//! `MockEndpointInner`, `MockExpectations`, `ExchangeAssert`.
//! (`MockProducer` is the internal producer that records the exchanges.)
//!
//! # Example
//!
//! ```rust,no_run
//! use camel_component_mock::MockComponent;
//! use camel_component_api::{Component, NoOpComponentContext, Exchange, Message};
//!
//! // Create a mock component and endpoint
//! let component = MockComponent::new();
//! let endpoint = component
//!     .create_endpoint("mock:result", &NoOpComponentContext)
//!     .unwrap();
//!
//! // In a real route, the producer would be used as a Tower service.
//! // After sending exchanges, you can inspect them:
//! let inner = component.get_endpoint("result").unwrap();
//! // inner.assert_exchange_count(1).await;
//! // inner.exchange(0).assert_body_text("hello");
//! ```
26
27use std::collections::{HashMap, VecDeque};
28use std::future::Future;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use tokio::sync::{Mutex, Notify};
34use tower::Service;
35
36use camel_component_api::parse_uri;
37use camel_component_api::{BoxProcessor, CamelError, Exchange};
38use camel_component_api::{Component, Consumer, Endpoint, ProducerContext, RuntimeObservability};
39use tracing::debug;
40
/// Retention limit used when a [`MockConfig`] does not specify one.
const DEFAULT_MAX_RETAINED: usize = 10_000;

// ---------------------------------------------------------------------------
// MockConfig
// ---------------------------------------------------------------------------

/// Settings copied into every endpoint created by a [`MockComponent`].
///
/// Governs how many exchanges an endpoint retains and how its producer and
/// assertion helpers behave.
///
/// # Examples
///
/// ```rust
/// use camel_component_mock::MockConfig;
///
/// let config = MockConfig {
///     max_retained: 100,
///     copy_on_exchange: true,
///     fail_fast: false,
///     assert_period_ms: 0,
///     any_order: false,
/// };
/// ```
#[derive(Clone, Debug)]
pub struct MockConfig {
    /// Retention limit for recorded exchanges. Once the queue has reached
    /// this size the producer drops the oldest entry before storing a new
    /// one. Defaults to 10 000.
    pub max_retained: usize,
    /// When `true`, the stored copy of each exchange gets a separately
    /// rebuilt body so it does not alias the exchange handed back to the
    /// caller. Defaults to `false`.
    pub copy_on_exchange: bool,
    /// When `true`, the producer rejects further exchanges once a fail-fast
    /// error has been recorded on the endpoint. Defaults to `false`.
    pub fail_fast: bool,
    /// When non-zero, the timeout (in milliseconds) used by
    /// [`MockEndpointInner::await_exchanges_with_timeout`] instead of the
    /// caller-supplied fallback. Defaults to `0` (use the fallback).
    pub assert_period_ms: u64,
    /// When `true`, [`MockEndpointInner::assert_satisfied`] matches expected
    /// bodies in any order rather than by position. Defaults to `false`.
    pub any_order: bool,
}

impl MockConfig {
    /// Build a config with the given retention limit and every other
    /// setting at its default value.
    pub fn new(max_retained: usize) -> Self {
        Self {
            max_retained,
            copy_on_exchange: false,
            fail_fast: false,
            assert_period_ms: 0,
            any_order: false,
        }
    }
}

impl Default for MockConfig {
    /// All flags off, no assert period, retention capped at 10 000.
    fn default() -> Self {
        Self::new(DEFAULT_MAX_RETAINED)
    }
}
107
108// ---------------------------------------------------------------------------
109// MockExpectations
110// ---------------------------------------------------------------------------
111
112/// Expectations set on a mock endpoint for batch-style assertion.
113///
114/// Use [`MockEndpointInner::expect_body`] and
115/// [`MockEndpointInner::expect_header`] to populate expectations, then call
116/// [`MockEndpointInner::assert_satisfied`] after exchanges have been received.
117pub struct MockExpectations {
118    expected_bodies: Vec<camel_component_api::Body>,
119    expected_headers: Vec<(String, serde_json::Value)>,
120    expected_header_regexes: Vec<(String, String)>,
121}
122
123impl Default for MockExpectations {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
129impl MockExpectations {
130    /// Create an empty set of expectations.
131    pub fn new() -> Self {
132        Self {
133            expected_bodies: Vec::new(),
134            expected_headers: Vec::new(),
135            expected_header_regexes: Vec::new(),
136        }
137    }
138
139    /// Add an expected body value.
140    pub fn push_body(&mut self, body: camel_component_api::Body) {
141        self.expected_bodies.push(body);
142    }
143
144    /// Add an expected header key-value pair.
145    pub fn push_header(&mut self, key: String, value: serde_json::Value) {
146        self.expected_headers.push((key, value));
147    }
148
149    /// Add an expected header regex pattern.
150    pub fn push_header_regex(&mut self, key: String, pattern: String) {
151        self.expected_header_regexes.push((key, pattern));
152    }
153}
154
155// ---------------------------------------------------------------------------
156// MockComponent
157// ---------------------------------------------------------------------------
158
159/// The Mock component is a testing utility that records every exchange it
160/// receives via its producer.  It exposes helpers to inspect and assert on
161/// the recorded exchanges.
162///
163/// URI format: `mock:name`
164///
165/// When `create_endpoint` is called multiple times with the same name, the
166/// returned endpoints share the same received-exchanges storage. This enables
167/// test assertions: create mock, register it, run routes, then inspect via
168/// `component.get_endpoint("name")`.
169#[derive(Clone)]
170pub struct MockComponent {
171    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
172    config: MockConfig,
173}
174
175impl MockComponent {
176    pub fn new() -> Self {
177        Self::with_config(MockConfig::default())
178    }
179
180    /// Create a `MockComponent` with a custom [`MockConfig`].
181    pub fn with_config(config: MockConfig) -> Self {
182        Self {
183            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
184            config,
185        }
186    }
187
188    /// Retrieve a previously created endpoint's inner data by name.
189    ///
190    /// This is the primary way to inspect recorded exchanges in tests.
191    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
192        let registry = self
193            .registry
194            .lock()
195            .expect("mutex poisoned: another thread panicked while holding this lock"); // allow-unwrap
196        registry.get(name).cloned()
197    }
198}
199
200impl Default for MockComponent {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl Component for MockComponent {
207    fn scheme(&self) -> &str {
208        "mock"
209    }
210
211    fn create_endpoint(
212        &self,
213        uri: &str,
214        _ctx: &dyn camel_component_api::ComponentContext,
215    ) -> Result<Box<dyn Endpoint>, CamelError> {
216        let parts = parse_uri(uri)?;
217        if parts.scheme != "mock" {
218            return Err(CamelError::InvalidUri(format!(
219                "expected scheme 'mock', got '{}'",
220                parts.scheme
221            )));
222        }
223
224        let name = parts.path;
225        if name.is_empty() {
226            return Err(CamelError::InvalidUri(
227                "mock endpoint name must be non-empty (use 'mock:<name>')".to_string(),
228            ));
229        }
230        let mut registry = self.registry.lock().map_err(|e| {
231            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
232        })?;
233        let max_retained = self.config.max_retained;
234        let copy_on_exchange = self.config.copy_on_exchange;
235        let fail_fast = self.config.fail_fast;
236        let assert_period_ms = self.config.assert_period_ms;
237        let any_order = self.config.any_order;
238        let inner = registry
239            .entry(name.clone())
240            .or_insert_with(|| {
241                Arc::new(MockEndpointInner {
242                    uri: uri.to_string(),
243                    name,
244                    received: Arc::new(Mutex::new(VecDeque::new())),
245                    notify: Arc::new(Notify::new()),
246                    max_retained,
247                    copy_on_exchange,
248                    fail_fast,
249                    fail_fast_error: Arc::new(std::sync::Mutex::new(None)),
250                    assert_period_ms,
251                    any_order,
252                    expectations: Arc::new(std::sync::Mutex::new(MockExpectations::new())),
253                })
254            })
255            .clone();
256
257        debug!(endpoint_name = %inner.name, "mock endpoint created");
258        Ok(Box::new(MockEndpoint(inner)))
259    }
260}
261
262// ---------------------------------------------------------------------------
263// MockEndpoint / MockEndpointInner
264// ---------------------------------------------------------------------------
265
/// A mock endpoint that records all exchanges sent to it.
///
/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
/// `MockEndpoint` instances created with the same name share the same inner
/// storage, because `MockComponent` hands every one of them a clone of the
/// same `Arc`.
pub struct MockEndpoint(Arc<MockEndpointInner>);
272
/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
/// instances created with the same name via `MockComponent`.
///
/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
/// recorded exchanges in tests.
pub struct MockEndpointInner {
    // URI string passed to `create_endpoint` when this endpoint was first created.
    uri: String,
    /// Endpoint name (the path part of the `mock:<name>` URI); also the registry key.
    pub name: String,
    // Retained exchanges, oldest at the front; shared with every producer.
    received: Arc<Mutex<VecDeque<Exchange>>>,
    // Signalled by producers after each recorded exchange; awaited by `await_exchanges`.
    notify: Arc<Notify>,
    // The following four values are copied from `MockConfig` at creation time.
    max_retained: usize,
    copy_on_exchange: bool,
    fail_fast: bool,
    // Slot for a stored fail-fast error: read by producers and `fail_fast_error()`,
    // cleared by `reset()`.
    // NOTE(review): nothing in the visible part of this file stores an error
    // here — confirm where (or whether) it is ever set.
    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
    // Also copied from `MockConfig` at creation time.
    assert_period_ms: u64,
    any_order: bool,
    // Expectations registered via `expect_*`, checked by `assert_satisfied`.
    expectations: Arc<std::sync::Mutex<MockExpectations>>,
}
291
292impl MockEndpointInner {
293    /// Return a snapshot of all exchanges retained so far.
294    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
295        self.received.lock().await.iter().cloned().collect()
296    }
297
298    /// Return the number of currently retained exchanges.
299    pub async fn received_count(&self) -> usize {
300        self.received.lock().await.len()
301    }
302
303    /// Clear all retained exchanges and reset internal counters.
304    ///
305    /// Useful between test cases to reuse the same mock endpoint.
306    pub async fn reset(&self) {
307        self.received.lock().await.clear();
308        if let Ok(mut guard) = self.fail_fast_error.lock() {
309            *guard = None;
310        }
311    }
312
313    /// Assert that exactly `expected` exchanges have been received.
314    ///
315    /// # Panics
316    ///
317    /// Panics if the count does not match.
318    pub async fn assert_exchange_count(&self, expected: usize) {
319        let actual = self.received.lock().await.len();
320        assert_eq!(
321            actual, expected,
322            "MockEndpoint expected {expected} exchanges, got {actual}"
323        );
324    }
325
326    /// Wait until at least `count` exchanges have been received, or panic on timeout.
327    ///
328    /// Uses `tokio::sync::Notify` — no polling. Returns immediately if `count`
329    /// exchanges are already present.
330    ///
331    /// # Panics
332    ///
333    /// Panics if `timeout` elapses before `count` exchanges arrive.
334    pub async fn await_exchanges(&self, count: usize, timeout: std::time::Duration) {
335        let deadline = tokio::time::Instant::now() + timeout;
336        loop {
337            {
338                let received = self.received.lock().await;
339                if received.len() >= count {
340                    return;
341                }
342            }
343            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
344            if remaining.is_zero() {
345                // Re-check in case the final exchange arrived between the lock drop
346                // above and entering the select — Notify does not buffer permits.
347                let got = self.received.lock().await.len();
348                if got >= count {
349                    return;
350                }
351                panic!(
352                    "MockEndpoint '{}': timed out waiting for {} exchanges (got {} after {:?})",
353                    self.name, count, got, timeout
354                );
355            }
356            tokio::select! {
357                _ = self.notify.notified() => {}
358                _ = tokio::time::sleep(remaining) => {}
359            }
360        }
361    }
362
363    /// Wait for exchanges with a configurable timeout derived from `assert_period_ms`.
364    ///
365    /// If `assert_period_ms` is 0, uses the provided `fallback` duration.
366    /// Otherwise, waits for `assert_period_ms` milliseconds before checking.
367    pub async fn await_exchanges_with_timeout(&self, count: usize, fallback: std::time::Duration) {
368        let duration = if self.assert_period_ms > 0 {
369            std::time::Duration::from_millis(self.assert_period_ms)
370        } else {
371            fallback
372        };
373        self.await_exchanges(count, duration).await;
374    }
375
376    /// Return an [`ExchangeAssert`] for the exchange at `idx`.
377    ///
378    /// # Panics
379    ///
380    /// Panics if `idx` is out of bounds. Always call [`await_exchanges`] first
381    /// to ensure the exchange has been received.
382    ///
383    /// Panics if called from a single-threaded tokio runtime. Use
384    /// `#[tokio::test(flavor = "multi_thread")]` for tests that call this method.
385    ///
386    /// [`await_exchanges`]: MockEndpointInner::await_exchanges
387    // NOTE: requires multi-threaded Tokio runtime (current_thread will deadlock)
388    // due to `block_in_place` used for blocking_lock.
389    pub fn exchange(&self, idx: usize) -> ExchangeAssert {
390        let received = tokio::task::block_in_place(|| self.received.blocking_lock());
391        if idx >= received.len() {
392            panic!(
393                "MockEndpoint '{}': exchange index {} out of bounds (got {} exchanges)",
394                self.name,
395                idx,
396                received.len()
397            );
398        }
399        ExchangeAssert {
400            exchange: received[idx].clone(),
401            idx,
402            endpoint_name: self.name.clone(),
403        }
404    }
405
406    /// Add an expected body to the expectations list.
407    pub fn expect_body(&self, body: camel_component_api::Body) {
408        if let Ok(mut guard) = self.expectations.lock() {
409            guard.push_body(body);
410        }
411    }
412
413    /// Add an expected header key-value pair to the expectations list.
414    pub fn expect_header(&self, key: &str, value: impl Into<serde_json::Value>) {
415        if let Ok(mut guard) = self.expectations.lock() {
416            guard.push_header(key.to_string(), value.into());
417        }
418    }
419
420    /// Add an expected header regex pattern to the expectations list.
421    ///
422    /// After `await_exchanges()`, `assert_satisfied()` checks whether any
423    /// received exchange has the named header matching the given regex pattern.
424    pub fn expect_header_regex(&self, key: &str, pattern: &str) {
425        if let Ok(mut guard) = self.expectations.lock() {
426            guard.push_header_regex(key.to_string(), pattern.to_string());
427        }
428    }
429
430    /// Assert that all registered expectations are satisfied.
431    ///
432    /// # Panics
433    ///
434    /// Panics if expected bodies do not match received bodies (in order or any
435    /// order depending on `any_order` config), if expected headers are missing,
436    /// or if header regex patterns do not match.
437    pub async fn assert_satisfied(&self) {
438        let received = self.get_received_exchanges().await;
439
440        // Check expected bodies
441        {
442            let guard = self
443                .expectations
444                .lock()
445                .expect("expectations lock poisoned"); // allow-unwrap
446            if !guard.expected_bodies.is_empty() {
447                let received_bodies: Vec<_> = received.iter().map(|e| &e.input.body).collect();
448                if guard.expected_bodies.len() != received_bodies.len() {
449                    panic!(
450                        "MockEndpoint '{}': expected {} bodies, got {}",
451                        self.name,
452                        guard.expected_bodies.len(),
453                        received_bodies.len()
454                    );
455                }
456                if self.any_order {
457                    // Match in any order — each expected body must appear exactly once
458                    let mut unmatched: Vec<_> = received_bodies.iter().collect();
459                    for expected in &guard.expected_bodies {
460                        let idx = unmatched
461                            .iter()
462                            .position(|actual| body_eq(expected, actual));
463                        match idx {
464                            Some(i) => {
465                                unmatched.remove(i);
466                            }
467                            None => panic!(
468                                "MockEndpoint '{}': expected body {:?} not found in received exchanges (anyOrder mode)",
469                                self.name, expected
470                            ),
471                        }
472                    }
473                } else {
474                    for (i, expected) in guard.expected_bodies.iter().enumerate() {
475                        if !body_eq(expected, received_bodies[i]) {
476                            panic!(
477                                "MockEndpoint '{}': body[{}] expected {:?}, got {:?}",
478                                self.name, i, expected, received_bodies[i]
479                            );
480                        }
481                    }
482                }
483            }
484
485            // Check expected headers (must all be present on at least one exchange)
486            for (key, value) in &guard.expected_headers {
487                let found = received
488                    .iter()
489                    .any(|ex| ex.input.headers.get(key).is_some_and(|v| v == value));
490                if !found {
491                    panic!(
492                        "MockEndpoint '{}': expected header '{}' = {} not found in any received exchange",
493                        self.name, key, value
494                    );
495                }
496            }
497
498            // Check expected header regexes
499            for (key, pattern) in &guard.expected_header_regexes {
500                let re = regex::Regex::new(pattern).unwrap_or_else(|e| {
501                    panic!(
502                        "MockEndpoint '{}': invalid regex pattern {:?}: {e}",
503                        self.name, pattern
504                    )
505                });
506                let found = received.iter().any(|ex| {
507                    ex.input.headers.get(key).is_some_and(|v| {
508                        let s = match v {
509                            serde_json::Value::String(s) => s.clone(),
510                            other => other.to_string(),
511                        };
512                        re.is_match(&s)
513                    })
514                });
515                if !found {
516                    panic!(
517                        "MockEndpoint '{}': no received exchange has header '{}' matching regex {:?}",
518                        self.name, key, pattern
519                    );
520                }
521            }
522        }
523    }
524
525    /// Return the stored fail-fast error, if any.
526    pub fn fail_fast_error(&self) -> Option<CamelError> {
527        self.fail_fast_error.lock().ok().and_then(|g| g.clone())
528    }
529}
530
531/// Compare two `Body` values for equality (used by assert_satisfied).
532fn body_eq(a: &camel_component_api::Body, b: &camel_component_api::Body) -> bool {
533    match (a, b) {
534        (camel_component_api::Body::Empty, camel_component_api::Body::Empty) => true,
535        (camel_component_api::Body::Text(a), camel_component_api::Body::Text(b)) => a == b,
536        (camel_component_api::Body::Json(a), camel_component_api::Body::Json(b)) => a == b,
537        (camel_component_api::Body::Xml(a), camel_component_api::Body::Xml(b)) => a == b,
538        (camel_component_api::Body::Bytes(a), camel_component_api::Body::Bytes(b)) => a == b,
539        _ => false,
540    }
541}
542
543impl Endpoint for MockEndpoint {
544    fn uri(&self) -> &str {
545        &self.0.uri
546    }
547
548    fn create_consumer(
549        &self,
550        _rt: Arc<dyn RuntimeObservability>,
551    ) -> Result<Box<dyn Consumer>, CamelError> {
552        Err(CamelError::EndpointCreationFailed(
553            "mock endpoint does not support consumers (it is a sink)".to_string(),
554        ))
555    }
556
557    fn create_producer(
558        &self,
559        _rt: Arc<dyn RuntimeObservability>,
560        _ctx: &ProducerContext,
561    ) -> Result<BoxProcessor, CamelError> {
562        Ok(BoxProcessor::new(MockProducer {
563            name: self.0.name.clone(),
564            received: Arc::clone(&self.0.received),
565            notify: Arc::clone(&self.0.notify),
566            max_retained: self.0.max_retained,
567            copy_on_exchange: self.0.copy_on_exchange,
568            fail_fast: self.0.fail_fast,
569            fail_fast_error: Arc::clone(&self.0.fail_fast_error),
570        }))
571    }
572}
573
574// ---------------------------------------------------------------------------
575// MockProducer
576// ---------------------------------------------------------------------------
577
578/// A producer that simply records each exchange it processes.
579#[derive(Clone)]
580struct MockProducer {
581    name: String,
582    received: Arc<Mutex<VecDeque<Exchange>>>,
583    notify: Arc<Notify>,
584    max_retained: usize,
585    copy_on_exchange: bool,
586    fail_fast: bool,
587    fail_fast_error: Arc<std::sync::Mutex<Option<CamelError>>>,
588}
589
590impl Service<Exchange> for MockProducer {
591    type Response = Exchange;
592    type Error = CamelError;
593    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
594
595    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
596        // In fail-fast mode, reject new exchanges if a previous one failed
597        if self.fail_fast
598            && let Ok(guard) = self.fail_fast_error.lock()
599            && guard.is_some()
600        {
601            return Poll::Ready(Err(CamelError::ProcessorError(
602                "mock endpoint in fail-fast mode: a previous exchange caused an error".to_string(),
603            )));
604        }
605        Poll::Ready(Ok(()))
606    }
607
608    fn call(&mut self, exchange: Exchange) -> Self::Future {
609        let name = self.name.clone();
610        let received = Arc::clone(&self.received);
611        let notify = Arc::clone(&self.notify);
612        let max_retained = self.max_retained;
613        let copy_on_exchange = self.copy_on_exchange;
614        let fail_fast = self.fail_fast;
615        let fail_fast_error = Arc::clone(&self.fail_fast_error);
616        Box::pin(async move {
617            // In fail-fast mode, check if a previous error was recorded
618            if fail_fast
619                && let Ok(guard) = fail_fast_error.lock()
620                && guard.is_some()
621            {
622                return Err(CamelError::ProcessorError(
623                    "mock endpoint in fail-fast mode: a previous exchange caused an error"
624                        .to_string(),
625                ));
626            }
627
628            let correlation_id = exchange
629                .input
630                .headers
631                .get("CamelCorrelationId")
632                .and_then(|v| v.as_str())
633                .map(|s| s.to_string());
634
635            let exchange_to_store = if copy_on_exchange {
636                let mut cloned = exchange.clone();
637                // Deep-clone the body to break aliasing
638                cloned.input.body = clone_body(&exchange.input.body);
639                cloned
640            } else {
641                exchange.clone()
642            };
643
644            let mut guard = received.lock().await;
645            if guard.len() >= max_retained {
646                tracing::warn!(
647                    endpoint_name = %name,
648                    max = max_retained,
649                    "max retained exchanges reached, dropping oldest"
650                );
651                guard.pop_front();
652            }
653            guard.push_back(exchange_to_store);
654            let count = guard.len();
655            drop(guard);
656
657            debug!(
658                endpoint_name = %name,
659                count = %count,
660                correlation_id = correlation_id.as_deref().unwrap_or("none"),
661                "exchange recorded on mock"
662            );
663            notify.notify_waiters();
664
665            Ok(exchange)
666        })
667    }
668}
669
670/// Deep-clone a `Body` value.
671fn clone_body(body: &camel_component_api::Body) -> camel_component_api::Body {
672    match body {
673        camel_component_api::Body::Empty => camel_component_api::Body::Empty,
674        camel_component_api::Body::Text(s) => camel_component_api::Body::Text(s.clone()),
675        camel_component_api::Body::Json(v) => camel_component_api::Body::Json(v.clone()),
676        camel_component_api::Body::Xml(s) => camel_component_api::Body::Xml(s.clone()),
677        camel_component_api::Body::Bytes(b) => camel_component_api::Body::Bytes(b.clone()),
678        camel_component_api::Body::Stream(_) => {
679            // Streams cannot be cloned; use Empty as fallback
680            camel_component_api::Body::Empty
681        }
682    }
683}
684
685// ---------------------------------------------------------------------------
686// ExchangeAssert
687// ---------------------------------------------------------------------------
688
689/// A handle for making synchronous assertions on a recorded exchange.
690///
691/// Obtain one via [`MockEndpointInner::exchange`] after calling
692/// [`MockEndpointInner::await_exchanges`].
693///
694/// All methods panic with descriptive messages on failure, making test output
695/// self-explanatory without additional context.
696pub struct ExchangeAssert {
697    exchange: Exchange,
698    idx: usize,
699    endpoint_name: String,
700}
701
702impl ExchangeAssert {
703    fn location(&self) -> String {
704        format!(
705            "MockEndpoint '{}' exchange[{}]",
706            self.endpoint_name, self.idx
707        )
708    }
709
710    /// Assert that the body is `Body::Text` equal to `expected`.
711    pub fn assert_body_text(self, expected: &str) -> Self {
712        match self.exchange.input.body.as_text() {
713            Some(actual) if actual == expected => {}
714            Some(actual) => panic!(
715                "{}: expected body text {:?}, got {:?}",
716                self.location(),
717                expected,
718                actual
719            ),
720            None => panic!(
721                "{}: expected body text {:?}, but body is not Body::Text (got {:?})",
722                self.location(),
723                expected,
724                self.exchange.input.body
725            ),
726        }
727        self
728    }
729
730    /// Assert that the body is `Body::Json` equal to `expected`.
731    pub fn assert_body_json(self, expected: serde_json::Value) -> Self {
732        match &self.exchange.input.body {
733            camel_component_api::Body::Json(actual) if *actual == expected => {}
734            camel_component_api::Body::Json(actual) => panic!(
735                "{}: expected body JSON {}, got {}",
736                self.location(),
737                expected,
738                actual
739            ),
740            other => panic!(
741                "{}: expected body JSON {}, but body is not Body::Json (got {:?})",
742                self.location(),
743                expected,
744                other
745            ),
746        }
747        self
748    }
749
750    /// Assert that the body is `Body::Bytes` equal to `expected`.
751    pub fn assert_body_bytes(self, expected: &[u8]) -> Self {
752        match &self.exchange.input.body {
753            camel_component_api::Body::Bytes(actual) if actual.as_ref() == expected => {}
754            camel_component_api::Body::Bytes(actual) => panic!(
755                "{}: expected body bytes {:?}, got {:?}",
756                self.location(),
757                expected,
758                actual
759            ),
760            other => panic!(
761                "{}: expected body bytes {:?}, but body is not Body::Bytes (got {:?})",
762                self.location(),
763                expected,
764                other
765            ),
766        }
767        self
768    }
769
770    /// Assert that header `key` exists and equals `expected`.
771    ///
772    /// # Panics
773    ///
774    /// Panics if the header is missing or its value does not match `expected`.
775    pub fn assert_header(self, key: &str, expected: serde_json::Value) -> Self {
776        match self.exchange.input.headers.get(key) {
777            Some(actual) if *actual == expected => {}
778            Some(actual) => panic!(
779                "{}: expected header {:?} = {}, got {}",
780                self.location(),
781                key,
782                expected,
783                actual
784            ),
785            None => panic!(
786                "{}: expected header {:?} = {}, but header is absent",
787                self.location(),
788                key,
789                expected
790            ),
791        }
792        self
793    }
794
795    /// Assert that header `key` is present (any value).
796    ///
797    /// # Panics
798    ///
799    /// Panics if the header key is absent.
800    pub fn assert_header_exists(self, key: &str) -> Self {
801        if !self.exchange.input.headers.contains_key(key) {
802            panic!(
803                "{}: expected header {:?} to be present, but it was absent",
804                self.location(),
805                key
806            );
807        }
808        self
809    }
810
811    /// Assert that the exchange has an error (`exchange.error` is `Some`).
812    ///
813    /// # Panics
814    ///
815    /// Panics if `exchange.error` is `None`.
816    pub fn assert_has_error(self) -> Self {
817        if self.exchange.error.is_none() {
818            panic!(
819                "{}: expected exchange to have an error, but error is None",
820                self.location()
821            );
822        }
823        self
824    }
825
826    /// Assert that the exchange has no error (`exchange.error` is `None`).
827    ///
828    /// # Panics
829    ///
830    /// Panics if `exchange.error` is `Some`.
831    pub fn assert_no_error(self) -> Self {
832        if let Some(ref err) = self.exchange.error {
833            panic!(
834                "{}: expected exchange to have no error, but got: {}",
835                self.location(),
836                err
837            );
838        }
839        self
840    }
841}
842
843// ---------------------------------------------------------------------------
844// Tests
845// ---------------------------------------------------------------------------
846
847#[cfg(test)]
848mod tests {
849    use camel_component_api::test_support::PanicRuntimeObservability;
850    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
851        std::sync::Arc::new(PanicRuntimeObservability)
852    }
853
854    use super::*;
855    use camel_component_api::Message;
856    use camel_component_api::NoOpComponentContext;
857    use tower::ServiceExt;
858
859    fn test_producer_ctx() -> ProducerContext {
860        ProducerContext::new()
861    }
862
863    #[test]
864    fn test_mock_component_scheme() {
865        let component = MockComponent::new();
866        assert_eq!(component.scheme(), "mock");
867    }
868
869    #[test]
870    fn test_mock_component_default() {
871        let component = MockComponent::default();
872        assert_eq!(component.scheme(), "mock");
873        assert!(component.get_endpoint("missing").is_none());
874    }
875
876    #[test]
877    fn test_mock_creates_endpoint() {
878        let component = MockComponent::new();
879        let endpoint = component.create_endpoint("mock:result", &NoOpComponentContext);
880        assert!(endpoint.is_ok());
881    }
882
883    #[test]
884    fn test_mock_wrong_scheme() {
885        let component = MockComponent::new();
886        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
887        assert!(result.is_err());
888    }
889
890    #[test]
891    fn test_empty_mock_endpoint_name_rejected() {
892        let component = MockComponent::new();
893        let result = component.create_endpoint("mock:", &NoOpComponentContext);
894        assert!(result.is_err(), "empty mock name should be rejected");
895    }
896
897    #[test]
898    fn test_valid_mock_endpoint_name_accepted() {
899        let component = MockComponent::new();
900        let result = component.create_endpoint("mock:result", &NoOpComponentContext);
901        assert!(result.is_ok());
902    }
903
904    #[test]
905    fn test_mock_endpoint_no_consumer() {
906        let component = MockComponent::new();
907        let endpoint = component
908            .create_endpoint("mock:result", &NoOpComponentContext)
909            .unwrap();
910        assert!(endpoint.create_consumer(rt()).is_err());
911    }
912
913    #[test]
914    fn test_mock_endpoint_creates_producer() {
915        let ctx = test_producer_ctx();
916        let component = MockComponent::new();
917        let endpoint = component
918            .create_endpoint("mock:result", &NoOpComponentContext)
919            .unwrap();
920        assert!(endpoint.create_producer(rt(), &ctx).is_ok());
921    }
922
923    #[test]
924    fn test_mock_endpoint_uri() {
925        let component = MockComponent::new();
926        let endpoint = component
927            .create_endpoint("mock:uri-check", &NoOpComponentContext)
928            .unwrap();
929        assert_eq!(endpoint.uri(), "mock:uri-check");
930    }
931
932    #[test]
933    fn test_mock_get_endpoint_returns_same_inner_for_same_name() {
934        let component = MockComponent::new();
935        let _ = component
936            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
937            .unwrap();
938        let _ = component
939            .create_endpoint("mock:shared-inner", &NoOpComponentContext)
940            .unwrap();
941
942        let first = component.get_endpoint("shared-inner").unwrap();
943        let second = component.get_endpoint("shared-inner").unwrap();
944        assert!(Arc::ptr_eq(&first, &second));
945    }
946
947    #[tokio::test]
948    async fn test_mock_producer_records_exchange() {
949        let ctx = test_producer_ctx();
950        let component = MockComponent::new();
951        let endpoint = component
952            .create_endpoint("mock:test", &NoOpComponentContext)
953            .unwrap();
954
955        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
956
957        let ex1 = Exchange::new(Message::new("first"));
958        let ex2 = Exchange::new(Message::new("second"));
959
960        producer.call(ex1).await.unwrap();
961        producer.call(ex2).await.unwrap();
962
963        let inner = component.get_endpoint("test").unwrap();
964        inner.assert_exchange_count(2).await;
965
966        let received = inner.get_received_exchanges().await;
967        assert_eq!(received[0].input.body.as_text(), Some("first"));
968        assert_eq!(received[1].input.body.as_text(), Some("second"));
969    }
970
971    #[tokio::test]
972    async fn test_mock_producer_passes_through_exchange() {
973        let ctx = test_producer_ctx();
974        let component = MockComponent::new();
975        let endpoint = component
976            .create_endpoint("mock:passthrough", &NoOpComponentContext)
977            .unwrap();
978
979        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
980        let exchange = Exchange::new(Message::new("hello"));
981        let result = producer.oneshot(exchange).await.unwrap();
982
983        // Producer should return the exchange unchanged
984        assert_eq!(result.input.body.as_text(), Some("hello"));
985    }
986
987    #[tokio::test]
988    async fn test_mock_assert_count_passes() {
989        let component = MockComponent::new();
990        let endpoint = component
991            .create_endpoint("mock:count", &NoOpComponentContext)
992            .unwrap();
993        let inner = component.get_endpoint("count").unwrap();
994
995        inner.assert_exchange_count(0).await;
996
997        let ctx = test_producer_ctx();
998        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
999        producer
1000            .call(Exchange::new(Message::new("one")))
1001            .await
1002            .unwrap();
1003
1004        inner.assert_exchange_count(1).await;
1005    }
1006
1007    #[tokio::test]
1008    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
1009    async fn test_mock_assert_count_fails() {
1010        let component = MockComponent::new();
1011        // Endpoint not created yet, so get_endpoint returns None.
1012        // Create it first, then assert.
1013        let _endpoint = component
1014            .create_endpoint("mock:fail", &NoOpComponentContext)
1015            .unwrap();
1016        let inner = component.get_endpoint("fail").unwrap();
1017
1018        inner.assert_exchange_count(5).await;
1019    }
1020
1021    #[tokio::test]
1022    async fn test_mock_component_shared_registry() {
1023        let component = MockComponent::new();
1024        let ep1 = component
1025            .create_endpoint("mock:shared", &NoOpComponentContext)
1026            .unwrap();
1027        let ep2 = component
1028            .create_endpoint("mock:shared", &NoOpComponentContext)
1029            .unwrap();
1030
1031        // Producing via ep1's producer...
1032        let ctx = test_producer_ctx();
1033        let mut p1 = ep1.create_producer(rt(), &ctx).unwrap();
1034        p1.call(Exchange::new(Message::new("from-ep1")))
1035            .await
1036            .unwrap();
1037
1038        // ...and via ep2's producer...
1039        let mut p2 = ep2.create_producer(rt(), &ctx).unwrap();
1040        p2.call(Exchange::new(Message::new("from-ep2")))
1041            .await
1042            .unwrap();
1043
1044        // ...both should be visible via the shared storage
1045        let inner = component.get_endpoint("shared").unwrap();
1046        inner.assert_exchange_count(2).await;
1047
1048        let received = inner.get_received_exchanges().await;
1049        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
1050        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
1051    }
1052
1053    #[tokio::test]
1054    async fn await_exchanges_resolves_immediately() {
1055        // If exchanges are already present, await_exchanges returns without timeout.
1056        let ctx = test_producer_ctx();
1057        let component = MockComponent::new();
1058        let endpoint = component
1059            .create_endpoint("mock:immediate", &NoOpComponentContext)
1060            .unwrap();
1061        let inner = component.get_endpoint("immediate").unwrap();
1062
1063        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1064        producer
1065            .call(Exchange::new(Message::new("a")))
1066            .await
1067            .unwrap();
1068        producer
1069            .call(Exchange::new(Message::new("b")))
1070            .await
1071            .unwrap();
1072
1073        // Should return immediately — both exchanges already received.
1074        inner
1075            .await_exchanges(2, std::time::Duration::from_millis(100))
1076            .await;
1077    }
1078
1079    #[tokio::test]
1080    async fn await_exchanges_waits_then_resolves() {
1081        // await_exchanges unblocks when a producer sends after the call.
1082        let ctx = test_producer_ctx();
1083        let component = MockComponent::new();
1084        let endpoint = component
1085            .create_endpoint("mock:waiter", &NoOpComponentContext)
1086            .unwrap();
1087        let inner = component.get_endpoint("waiter").unwrap();
1088
1089        // Spawn producer that sends after a short delay.
1090        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1091        tokio::spawn(async move {
1092            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1093            producer
1094                .call(Exchange::new(Message::new("delayed")))
1095                .await
1096                .unwrap();
1097        });
1098
1099        // This should block until the spawned task delivers the exchange.
1100        inner
1101            .await_exchanges(1, std::time::Duration::from_millis(500))
1102            .await;
1103
1104        let received = inner.get_received_exchanges().await;
1105        assert_eq!(received.len(), 1);
1106        assert_eq!(received[0].input.body.as_text(), Some("delayed"));
1107    }
1108
1109    #[tokio::test]
1110    #[should_panic(expected = "timed out waiting for 5 exchanges")]
1111    async fn await_exchanges_times_out() {
1112        let component = MockComponent::new();
1113        let _endpoint = component
1114            .create_endpoint("mock:timeout", &NoOpComponentContext)
1115            .unwrap();
1116        let inner = component.get_endpoint("timeout").unwrap();
1117
1118        // Nobody sends — should panic after timeout.
1119        inner
1120            .await_exchanges(5, std::time::Duration::from_millis(50))
1121            .await;
1122    }
1123
1124    #[tokio::test(flavor = "multi_thread")]
1125    async fn exchange_idx_returns_assert() {
1126        let ctx = test_producer_ctx();
1127        let component = MockComponent::new();
1128        let endpoint = component
1129            .create_endpoint("mock:assert-idx", &NoOpComponentContext)
1130            .unwrap();
1131        let inner = component.get_endpoint("assert-idx").unwrap();
1132
1133        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1134        producer
1135            .call(Exchange::new(Message::new("hello")))
1136            .await
1137            .unwrap();
1138
1139        inner
1140            .await_exchanges(1, std::time::Duration::from_millis(500))
1141            .await;
1142        // Should not panic — index 0 exists.
1143        let _assert = inner.exchange(0);
1144    }
1145
1146    #[tokio::test(flavor = "multi_thread")]
1147    #[should_panic(expected = "exchange index 5 out of bounds")]
1148    async fn exchange_idx_out_of_bounds() {
1149        let ctx = test_producer_ctx();
1150        let component = MockComponent::new();
1151        let endpoint = component
1152            .create_endpoint("mock:oob", &NoOpComponentContext)
1153            .unwrap();
1154        let inner = component.get_endpoint("oob").unwrap();
1155
1156        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1157        producer
1158            .call(Exchange::new(Message::new("only-one")))
1159            .await
1160            .unwrap();
1161
1162        inner
1163            .await_exchanges(1, std::time::Duration::from_millis(500))
1164            .await;
1165        // Only 1 exchange, index 5 should panic.
1166        let _assert = inner.exchange(5);
1167    }
1168
1169    #[tokio::test(flavor = "multi_thread")]
1170    async fn assert_body_text_pass() {
1171        let ctx = test_producer_ctx();
1172        let component = MockComponent::new();
1173        let endpoint = component
1174            .create_endpoint("mock:body-text-pass", &NoOpComponentContext)
1175            .unwrap();
1176        let inner = component.get_endpoint("body-text-pass").unwrap();
1177        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1178        producer
1179            .call(Exchange::new(Message::new("hello")))
1180            .await
1181            .unwrap();
1182        inner
1183            .await_exchanges(1, std::time::Duration::from_millis(500))
1184            .await;
1185        inner.exchange(0).assert_body_text("hello");
1186    }
1187
1188    #[tokio::test(flavor = "multi_thread")]
1189    #[should_panic(expected = "expected body text")]
1190    async fn assert_body_text_fail() {
1191        let ctx = test_producer_ctx();
1192        let component = MockComponent::new();
1193        let endpoint = component
1194            .create_endpoint("mock:body-text-fail", &NoOpComponentContext)
1195            .unwrap();
1196        let inner = component.get_endpoint("body-text-fail").unwrap();
1197        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1198        producer
1199            .call(Exchange::new(Message::new("hello")))
1200            .await
1201            .unwrap();
1202        inner
1203            .await_exchanges(1, std::time::Duration::from_millis(500))
1204            .await;
1205        inner.exchange(0).assert_body_text("world");
1206    }
1207
1208    #[tokio::test(flavor = "multi_thread")]
1209    async fn assert_body_json_pass() {
1210        use camel_component_api::Body;
1211        let ctx = test_producer_ctx();
1212        let component = MockComponent::new();
1213        let endpoint = component
1214            .create_endpoint("mock:body-json-pass", &NoOpComponentContext)
1215            .unwrap();
1216        let inner = component.get_endpoint("body-json-pass").unwrap();
1217        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1218        let mut msg = Message::new("");
1219        msg.body = Body::Json(serde_json::json!({"key": "value"}));
1220        producer.call(Exchange::new(msg)).await.unwrap();
1221        inner
1222            .await_exchanges(1, std::time::Duration::from_millis(500))
1223            .await;
1224        inner
1225            .exchange(0)
1226            .assert_body_json(serde_json::json!({"key": "value"}));
1227    }
1228
1229    #[tokio::test(flavor = "multi_thread")]
1230    #[should_panic(expected = "expected body JSON")]
1231    async fn assert_body_json_fail() {
1232        use camel_component_api::Body;
1233        let ctx = test_producer_ctx();
1234        let component = MockComponent::new();
1235        let endpoint = component
1236            .create_endpoint("mock:body-json-fail", &NoOpComponentContext)
1237            .unwrap();
1238        let inner = component.get_endpoint("body-json-fail").unwrap();
1239        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1240        let mut msg = Message::new("");
1241        msg.body = Body::Json(serde_json::json!({"key": "value"}));
1242        producer.call(Exchange::new(msg)).await.unwrap();
1243        inner
1244            .await_exchanges(1, std::time::Duration::from_millis(500))
1245            .await;
1246        inner
1247            .exchange(0)
1248            .assert_body_json(serde_json::json!({"key": "other"}));
1249    }
1250
1251    #[tokio::test(flavor = "multi_thread")]
1252    async fn assert_body_bytes_pass() {
1253        use bytes::Bytes;
1254        use camel_component_api::Body;
1255        let ctx = test_producer_ctx();
1256        let component = MockComponent::new();
1257        let endpoint = component
1258            .create_endpoint("mock:body-bytes-pass", &NoOpComponentContext)
1259            .unwrap();
1260        let inner = component.get_endpoint("body-bytes-pass").unwrap();
1261        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1262        let mut msg = Message::new("");
1263        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1264        producer.call(Exchange::new(msg)).await.unwrap();
1265        inner
1266            .await_exchanges(1, std::time::Duration::from_millis(500))
1267            .await;
1268        inner.exchange(0).assert_body_bytes(b"binary");
1269    }
1270
1271    #[tokio::test(flavor = "multi_thread")]
1272    #[should_panic(expected = "expected body bytes")]
1273    async fn assert_body_bytes_fail() {
1274        use bytes::Bytes;
1275        use camel_component_api::Body;
1276        let ctx = test_producer_ctx();
1277        let component = MockComponent::new();
1278        let endpoint = component
1279            .create_endpoint("mock:body-bytes-fail", &NoOpComponentContext)
1280            .unwrap();
1281        let inner = component.get_endpoint("body-bytes-fail").unwrap();
1282        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1283        let mut msg = Message::new("");
1284        msg.body = Body::Bytes(Bytes::from_static(b"binary"));
1285        producer.call(Exchange::new(msg)).await.unwrap();
1286        inner
1287            .await_exchanges(1, std::time::Duration::from_millis(500))
1288            .await;
1289        inner.exchange(0).assert_body_bytes(b"different");
1290    }
1291
1292    #[tokio::test(flavor = "multi_thread")]
1293    async fn assert_header_pass() {
1294        let ctx = test_producer_ctx();
1295        let component = MockComponent::new();
1296        let endpoint = component
1297            .create_endpoint("mock:hdr-pass", &NoOpComponentContext)
1298            .unwrap();
1299        let inner = component.get_endpoint("hdr-pass").unwrap();
1300        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1301        let mut msg = Message::new("body");
1302        msg.headers
1303            .insert("x-key".to_string(), serde_json::json!("value"));
1304        producer.call(Exchange::new(msg)).await.unwrap();
1305        inner
1306            .await_exchanges(1, std::time::Duration::from_millis(500))
1307            .await;
1308        inner
1309            .exchange(0)
1310            .assert_header("x-key", serde_json::json!("value"));
1311    }
1312
1313    #[tokio::test(flavor = "multi_thread")]
1314    #[should_panic(expected = "expected header")]
1315    async fn assert_header_fail() {
1316        let ctx = test_producer_ctx();
1317        let component = MockComponent::new();
1318        let endpoint = component
1319            .create_endpoint("mock:hdr-fail", &NoOpComponentContext)
1320            .unwrap();
1321        let inner = component.get_endpoint("hdr-fail").unwrap();
1322        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1323        let mut msg = Message::new("body");
1324        msg.headers
1325            .insert("x-key".to_string(), serde_json::json!("value"));
1326        producer.call(Exchange::new(msg)).await.unwrap();
1327        inner
1328            .await_exchanges(1, std::time::Duration::from_millis(500))
1329            .await;
1330        inner
1331            .exchange(0)
1332            .assert_header("x-key", serde_json::json!("other"));
1333    }
1334
1335    #[tokio::test(flavor = "multi_thread")]
1336    async fn assert_header_exists_pass() {
1337        let ctx = test_producer_ctx();
1338        let component = MockComponent::new();
1339        let endpoint = component
1340            .create_endpoint("mock:hdr-exists-pass", &NoOpComponentContext)
1341            .unwrap();
1342        let inner = component.get_endpoint("hdr-exists-pass").unwrap();
1343        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1344        let mut msg = Message::new("body");
1345        msg.headers
1346            .insert("x-present".to_string(), serde_json::json!(42));
1347        producer.call(Exchange::new(msg)).await.unwrap();
1348        inner
1349            .await_exchanges(1, std::time::Duration::from_millis(500))
1350            .await;
1351        inner.exchange(0).assert_header_exists("x-present");
1352    }
1353
1354    #[tokio::test(flavor = "multi_thread")]
1355    #[should_panic(expected = "expected header")]
1356    async fn assert_header_exists_fail() {
1357        let ctx = test_producer_ctx();
1358        let component = MockComponent::new();
1359        let endpoint = component
1360            .create_endpoint("mock:hdr-exists-fail", &NoOpComponentContext)
1361            .unwrap();
1362        let inner = component.get_endpoint("hdr-exists-fail").unwrap();
1363        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1364        producer
1365            .call(Exchange::new(Message::new("body")))
1366            .await
1367            .unwrap();
1368        inner
1369            .await_exchanges(1, std::time::Duration::from_millis(500))
1370            .await;
1371        inner.exchange(0).assert_header_exists("x-missing");
1372    }
1373
1374    #[tokio::test(flavor = "multi_thread")]
1375    async fn assert_has_error_pass() {
1376        let ctx = test_producer_ctx();
1377        let component = MockComponent::new();
1378        let endpoint = component
1379            .create_endpoint("mock:err-pass", &NoOpComponentContext)
1380            .unwrap();
1381        let inner = component.get_endpoint("err-pass").unwrap();
1382        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1383        let mut ex = Exchange::new(Message::new("body"));
1384        ex.set_error(camel_component_api::CamelError::ProcessorError(
1385            "oops".to_string(),
1386        ));
1387        producer.call(ex).await.unwrap();
1388        inner
1389            .await_exchanges(1, std::time::Duration::from_millis(500))
1390            .await;
1391        inner.exchange(0).assert_has_error();
1392    }
1393
1394    #[tokio::test(flavor = "multi_thread")]
1395    #[should_panic(expected = "expected exchange to have an error")]
1396    async fn assert_has_error_fail() {
1397        let ctx = test_producer_ctx();
1398        let component = MockComponent::new();
1399        let endpoint = component
1400            .create_endpoint("mock:has-err-fail", &NoOpComponentContext)
1401            .unwrap();
1402        let inner = component.get_endpoint("has-err-fail").unwrap();
1403        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1404        producer
1405            .call(Exchange::new(Message::new("body")))
1406            .await
1407            .unwrap();
1408        inner
1409            .await_exchanges(1, std::time::Duration::from_millis(500))
1410            .await;
1411        inner.exchange(0).assert_has_error();
1412    }
1413
1414    #[tokio::test(flavor = "multi_thread")]
1415    async fn assert_no_error_pass() {
1416        let ctx = test_producer_ctx();
1417        let component = MockComponent::new();
1418        let endpoint = component
1419            .create_endpoint("mock:no-err-pass", &NoOpComponentContext)
1420            .unwrap();
1421        let inner = component.get_endpoint("no-err-pass").unwrap();
1422        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1423        producer
1424            .call(Exchange::new(Message::new("body")))
1425            .await
1426            .unwrap();
1427        inner
1428            .await_exchanges(1, std::time::Duration::from_millis(500))
1429            .await;
1430        inner.exchange(0).assert_no_error();
1431    }
1432
1433    // -----------------------------------------------------------------------
1434    // A-13: reset() and bounded retention tests
1435    // -----------------------------------------------------------------------
1436
1437    #[tokio::test]
1438    async fn test_mock_reset_clears_exchanges() {
1439        let component = MockComponent::new();
1440        let endpoint = component
1441            .create_endpoint("mock:reset-test", &NoOpComponentContext)
1442            .unwrap();
1443        let inner = component.get_endpoint("reset-test").unwrap();
1444
1445        let ctx = test_producer_ctx();
1446        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1447        producer
1448            .call(Exchange::new(Message::new("a")))
1449            .await
1450            .unwrap();
1451        producer
1452            .call(Exchange::new(Message::new("b")))
1453            .await
1454            .unwrap();
1455
1456        assert_eq!(inner.received_count().await, 2);
1457        inner.reset().await;
1458        assert_eq!(inner.received_count().await, 0);
1459    }
1460
1461    #[tokio::test]
1462    async fn test_mock_bounded_retention_drops_oldest() {
1463        let config = MockConfig {
1464            max_retained: 3,
1465            ..Default::default()
1466        };
1467        let component = MockComponent::with_config(config);
1468        let endpoint = component
1469            .create_endpoint("mock:bounded", &NoOpComponentContext)
1470            .unwrap();
1471        let inner = component.get_endpoint("bounded").unwrap();
1472
1473        let ctx = test_producer_ctx();
1474        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1475
1476        // Send 5 exchanges, but max_retained is 3
1477        for i in 0..5 {
1478            producer
1479                .call(Exchange::new(Message::new(format!("msg-{i}"))))
1480                .await
1481                .unwrap();
1482        }
1483
1484        assert_eq!(inner.received_count().await, 3);
1485        let received = inner.get_received_exchanges().await;
1486        // Oldest (msg-0, msg-1) should be dropped
1487        assert_eq!(received[0].input.body.as_text(), Some("msg-2"));
1488        assert_eq!(received[1].input.body.as_text(), Some("msg-3"));
1489        assert_eq!(received[2].input.body.as_text(), Some("msg-4"));
1490    }
1491
1492    #[tokio::test]
1493    async fn test_mock_reset_then_record_again() {
1494        let component = MockComponent::new();
1495        let endpoint = component
1496            .create_endpoint("mock:reset-reuse", &NoOpComponentContext)
1497            .unwrap();
1498        let inner = component.get_endpoint("reset-reuse").unwrap();
1499
1500        let ctx = test_producer_ctx();
1501        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1502        producer
1503            .call(Exchange::new(Message::new("before-reset")))
1504            .await
1505            .unwrap();
1506        inner.reset().await;
1507
1508        producer
1509            .call(Exchange::new(Message::new("after-reset")))
1510            .await
1511            .unwrap();
1512
1513        let received = inner.get_received_exchanges().await;
1514        assert_eq!(received.len(), 1);
1515        assert_eq!(received[0].input.body.as_text(), Some("after-reset"));
1516    }
1517
1518    #[tokio::test(flavor = "multi_thread")]
1519    #[should_panic(expected = "expected exchange to have no error")]
1520    async fn assert_no_error_fail() {
1521        let ctx = test_producer_ctx();
1522        let component = MockComponent::new();
1523        let endpoint = component
1524            .create_endpoint("mock:no-err-fail", &NoOpComponentContext)
1525            .unwrap();
1526        let inner = component.get_endpoint("no-err-fail").unwrap();
1527        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1528        let mut ex = Exchange::new(Message::new("body"));
1529        ex.set_error(camel_component_api::CamelError::ProcessorError(
1530            "oops".to_string(),
1531        ));
1532        producer.call(ex).await.unwrap();
1533        inner
1534            .await_exchanges(1, std::time::Duration::from_millis(500))
1535            .await;
1536        inner.exchange(0).assert_no_error();
1537    }
1538
1539    // -----------------------------------------------------------------------
1540    // MOCK-003: copy_on_exchange tests
1541    // -----------------------------------------------------------------------
1542
1543    #[tokio::test]
1544    async fn test_copy_on_exchange_stores_cloned_body() {
1545        let config = MockConfig {
1546            copy_on_exchange: true,
1547            ..Default::default()
1548        };
1549        let component = MockComponent::with_config(config);
1550        let endpoint = component
1551            .create_endpoint("mock:copy", &NoOpComponentContext)
1552            .unwrap();
1553        let inner = component.get_endpoint("copy").unwrap();
1554
1555        let ctx = test_producer_ctx();
1556        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1557
1558        let mut msg = Message::new("original");
1559        msg.headers.insert("x-test".into(), serde_json::json!(1));
1560        let ex = Exchange::new(msg);
1561        producer.call(ex).await.unwrap();
1562
1563        let received = inner.get_received_exchanges().await;
1564        assert_eq!(received[0].input.body.as_text(), Some("original"));
1565    }
1566
1567    #[tokio::test]
1568    async fn test_copy_on_exchange_false_shares_storage() {
1569        let config = MockConfig {
1570            copy_on_exchange: false,
1571            ..Default::default()
1572        };
1573        let component = MockComponent::with_config(config);
1574        let endpoint = component
1575            .create_endpoint("mock:no-copy", &NoOpComponentContext)
1576            .unwrap();
1577        let inner = component.get_endpoint("no-copy").unwrap();
1578
1579        let ctx = test_producer_ctx();
1580        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1581
1582        producer
1583            .call(Exchange::new(Message::new("direct")))
1584            .await
1585            .unwrap();
1586
1587        let received = inner.get_received_exchanges().await;
1588        assert_eq!(received[0].input.body.as_text(), Some("direct"));
1589    }
1590
1591    // -----------------------------------------------------------------------
1592    // MOCK-004: expect_body / expect_header / assert_satisfied tests
1593    // -----------------------------------------------------------------------
1594
1595    #[tokio::test]
1596    async fn test_assert_satisfied_bodies_in_order() {
1597        let component = MockComponent::new();
1598        let endpoint = component
1599            .create_endpoint("mock:sat-bodies", &NoOpComponentContext)
1600            .unwrap();
1601        let inner = component.get_endpoint("sat-bodies").unwrap();
1602
1603        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1604        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1605
1606        let ctx = test_producer_ctx();
1607        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1608        producer
1609            .call(Exchange::new(Message::new("alpha")))
1610            .await
1611            .unwrap();
1612        producer
1613            .call(Exchange::new(Message::new("beta")))
1614            .await
1615            .unwrap();
1616
1617        inner.assert_satisfied().await;
1618    }
1619
1620    #[tokio::test]
1621    #[should_panic(expected = "body[0] expected")]
1622    async fn test_assert_satisfied_bodies_wrong_order_fails() {
1623        let component = MockComponent::new();
1624        let endpoint = component
1625            .create_endpoint("mock:sat-bodies-fail", &NoOpComponentContext)
1626            .unwrap();
1627        let inner = component.get_endpoint("sat-bodies-fail").unwrap();
1628
1629        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1630        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1631
1632        let ctx = test_producer_ctx();
1633        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1634        producer
1635            .call(Exchange::new(Message::new("beta")))
1636            .await
1637            .unwrap();
1638        producer
1639            .call(Exchange::new(Message::new("alpha")))
1640            .await
1641            .unwrap();
1642
1643        inner.assert_satisfied().await;
1644    }
1645
1646    #[tokio::test]
1647    async fn test_assert_satisfied_headers() {
1648        let component = MockComponent::new();
1649        let endpoint = component
1650            .create_endpoint("mock:sat-hdr", &NoOpComponentContext)
1651            .unwrap();
1652        let inner = component.get_endpoint("sat-hdr").unwrap();
1653
1654        inner.expect_header("status", serde_json::json!("ok"));
1655
1656        let ctx = test_producer_ctx();
1657        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1658        let mut msg = Message::new("body");
1659        msg.headers.insert("status".into(), serde_json::json!("ok"));
1660        producer.call(Exchange::new(msg)).await.unwrap();
1661
1662        inner.assert_satisfied().await;
1663    }
1664
1665    #[tokio::test]
1666    #[should_panic(expected = "expected header 'missing' =")]
1667    async fn test_assert_satisfied_headers_missing() {
1668        let component = MockComponent::new();
1669        let endpoint = component
1670            .create_endpoint("mock:sat-hdr-missing", &NoOpComponentContext)
1671            .unwrap();
1672        let inner = component.get_endpoint("sat-hdr-missing").unwrap();
1673
1674        inner.expect_header("missing", serde_json::json!("value"));
1675
1676        let ctx = test_producer_ctx();
1677        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1678        producer
1679            .call(Exchange::new(Message::new("body")))
1680            .await
1681            .unwrap();
1682
1683        inner.assert_satisfied().await;
1684    }
1685
1686    // -----------------------------------------------------------------------
1687    // MOCK-005: fail_fast tests
1688    // -----------------------------------------------------------------------
1689
1690    #[tokio::test]
1691    async fn test_fail_fast_rejects_after_first_call() {
1692        let config = MockConfig {
1693            fail_fast: true,
1694            ..Default::default()
1695        };
1696        let component = MockComponent::with_config(config);
1697        let endpoint = component
1698            .create_endpoint("mock:ff", &NoOpComponentContext)
1699            .unwrap();
1700
1701        let ctx = test_producer_ctx();
1702        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1703
1704        // First call succeeds
1705        producer
1706            .call(Exchange::new(Message::new("ok")))
1707            .await
1708            .unwrap();
1709    }
1710
1711    #[tokio::test]
1712    async fn test_fail_fast_no_error_when_all_good() {
1713        let config = MockConfig {
1714            fail_fast: true,
1715            ..Default::default()
1716        };
1717        let component = MockComponent::with_config(config);
1718        let endpoint = component
1719            .create_endpoint("mock:ff-good", &NoOpComponentContext)
1720            .unwrap();
1721        let inner = component.get_endpoint("ff-good").unwrap();
1722
1723        let ctx = test_producer_ctx();
1724        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1725
1726        producer
1727            .call(Exchange::new(Message::new("a")))
1728            .await
1729            .unwrap();
1730        producer
1731            .call(Exchange::new(Message::new("b")))
1732            .await
1733            .unwrap();
1734
1735        assert!(inner.fail_fast_error().is_none());
1736        inner.assert_exchange_count(2).await;
1737    }
1738
1739    // -----------------------------------------------------------------------
1740    // MOCK-008: await_exchanges_with_timeout tests
1741    // -----------------------------------------------------------------------
1742
1743    #[tokio::test]
1744    async fn test_await_exchanges_with_timeout_uses_config_period() {
1745        let config = MockConfig {
1746            assert_period_ms: 100,
1747            ..Default::default()
1748        };
1749        let component = MockComponent::with_config(config);
1750        let endpoint = component
1751            .create_endpoint("mock:ap", &NoOpComponentContext)
1752            .unwrap();
1753        let inner = component.get_endpoint("ap").unwrap();
1754
1755        let ctx = test_producer_ctx();
1756        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1757        producer
1758            .call(Exchange::new(Message::new("x")))
1759            .await
1760            .unwrap();
1761
1762        inner
1763            .await_exchanges_with_timeout(1, std::time::Duration::from_millis(1))
1764            .await;
1765    }
1766
1767    #[tokio::test]
1768    async fn test_await_exchanges_with_timeout_uses_fallback_when_zero() {
1769        let config = MockConfig {
1770            assert_period_ms: 0,
1771            ..Default::default()
1772        };
1773        let component = MockComponent::with_config(config);
1774        let endpoint = component
1775            .create_endpoint("mock:ap-fb", &NoOpComponentContext)
1776            .unwrap();
1777        let inner = component.get_endpoint("ap-fb").unwrap();
1778
1779        let ctx = test_producer_ctx();
1780        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1781        producer
1782            .call(Exchange::new(Message::new("y")))
1783            .await
1784            .unwrap();
1785
1786        inner
1787            .await_exchanges_with_timeout(1, std::time::Duration::from_millis(200))
1788            .await;
1789    }
1790
1791    // -----------------------------------------------------------------------
1792    // MOCK-009: expect_header_regex tests
1793    // -----------------------------------------------------------------------
1794
1795    #[tokio::test]
1796    async fn test_expect_header_regex_match() {
1797        let component = MockComponent::new();
1798        let endpoint = component
1799            .create_endpoint("mock:re-hdr", &NoOpComponentContext)
1800            .unwrap();
1801        let inner = component.get_endpoint("re-hdr").unwrap();
1802
1803        inner.expect_header_regex("x-trace-id", r"^[a-f0-9]{8}$");
1804
1805        let ctx = test_producer_ctx();
1806        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1807        let mut msg = Message::new("body");
1808        msg.headers
1809            .insert("x-trace-id".into(), serde_json::json!("deadbeef"));
1810        producer.call(Exchange::new(msg)).await.unwrap();
1811
1812        inner.assert_satisfied().await;
1813    }
1814
1815    #[tokio::test]
1816    #[should_panic(expected = "no received exchange has header")]
1817    async fn test_expect_header_regex_no_match() {
1818        let component = MockComponent::new();
1819        let endpoint = component
1820            .create_endpoint("mock:re-hdr-fail", &NoOpComponentContext)
1821            .unwrap();
1822        let inner = component.get_endpoint("re-hdr-fail").unwrap();
1823
1824        inner.expect_header_regex("x-trace-id", r"^\d+$");
1825
1826        let ctx = test_producer_ctx();
1827        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1828        let mut msg = Message::new("body");
1829        msg.headers
1830            .insert("x-trace-id".into(), serde_json::json!("abc"));
1831        producer.call(Exchange::new(msg)).await.unwrap();
1832
1833        inner.assert_satisfied().await;
1834    }
1835
1836    // -----------------------------------------------------------------------
1837    // MOCK-010: any_order tests
1838    // -----------------------------------------------------------------------
1839
1840    #[tokio::test]
1841    async fn test_any_order_bodies_match() {
1842        let config = MockConfig {
1843            any_order: true,
1844            ..Default::default()
1845        };
1846        let component = MockComponent::with_config(config);
1847        let endpoint = component
1848            .create_endpoint("mock:anyorder", &NoOpComponentContext)
1849            .unwrap();
1850        let inner = component.get_endpoint("anyorder").unwrap();
1851
1852        inner.expect_body(camel_component_api::Body::Text("beta".into()));
1853        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1854
1855        let ctx = test_producer_ctx();
1856        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1857        producer
1858            .call(Exchange::new(Message::new("alpha")))
1859            .await
1860            .unwrap();
1861        producer
1862            .call(Exchange::new(Message::new("beta")))
1863            .await
1864            .unwrap();
1865
1866        inner.assert_satisfied().await;
1867    }
1868
1869    #[tokio::test]
1870    #[should_panic(expected = "not found in received exchanges (anyOrder mode)")]
1871    async fn test_any_order_bodies_missing() {
1872        let config = MockConfig {
1873            any_order: true,
1874            ..Default::default()
1875        };
1876        let component = MockComponent::with_config(config);
1877        let endpoint = component
1878            .create_endpoint("mock:anyorder-fail", &NoOpComponentContext)
1879            .unwrap();
1880        let inner = component.get_endpoint("anyorder-fail").unwrap();
1881
1882        inner.expect_body(camel_component_api::Body::Text("gamma".into()));
1883        inner.expect_body(camel_component_api::Body::Text("alpha".into()));
1884
1885        let ctx = test_producer_ctx();
1886        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1887        producer
1888            .call(Exchange::new(Message::new("alpha")))
1889            .await
1890            .unwrap();
1891        producer
1892            .call(Exchange::new(Message::new("beta")))
1893            .await
1894            .unwrap();
1895
1896        inner.assert_satisfied().await;
1897    }
1898
1899    // -----------------------------------------------------------------------
1900    // MOCK-012: tracing instrumentation tests (compilation + basic)
1901    // -----------------------------------------------------------------------
1902
1903    #[tokio::test]
1904    async fn test_tracing_logs_exchange_received() {
1905        // Verify the producer doesn't panic and the debug trace fires
1906        let ctx = test_producer_ctx();
1907        let component = MockComponent::new();
1908        let endpoint = component
1909            .create_endpoint("mock:trace", &NoOpComponentContext)
1910            .unwrap();
1911        let mut producer = endpoint.create_producer(rt(), &ctx).unwrap();
1912        producer
1913            .call(Exchange::new(Message::new("traced")))
1914            .await
1915            .unwrap();
1916
1917        let inner = component.get_endpoint("trace").unwrap();
1918        inner.assert_exchange_count(1).await;
1919    }
1920
1921    // -----------------------------------------------------------------------
1922    // MOCK-006 / MOCK-007: doctest exists on MockConfig
1923    // -----------------------------------------------------------------------
1924
/// `MockConfig::new(42)` must store 42 as `max_retained` and leave the
/// `copy_on_exchange`, `fail_fast` and `any_order` flags switched off.
#[test]
fn test_mock_config_new() {
    let MockConfig {
        max_retained,
        copy_on_exchange,
        fail_fast,
        any_order,
        ..
    } = MockConfig::new(42);

    assert_eq!(max_retained, 42);
    assert!(!copy_on_exchange);
    assert!(!fail_fast);
    assert!(!any_order);
}
1933}