Skip to main content

camel_component_mock/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::Mutex;
8use tower::Service;
9
10use camel_api::{BoxProcessor, CamelError, Exchange};
11use camel_component::{Component, Consumer, Endpoint, ProducerContext};
12use camel_endpoint::parse_uri;
13
14// ---------------------------------------------------------------------------
15// MockComponent
16// ---------------------------------------------------------------------------
17
18/// The Mock component is a testing utility that records every exchange it
19/// receives via its producer.  It exposes helpers to inspect and assert on
20/// the recorded exchanges.
21///
22/// URI format: `mock:name`
23///
24/// When `create_endpoint` is called multiple times with the same name, the
25/// returned endpoints share the same received-exchanges storage. This enables
26/// test assertions: create mock, register it, run routes, then inspect via
27/// `component.get_endpoint("name")`.
28#[derive(Clone)]
29pub struct MockComponent {
30    registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34    pub fn new() -> Self {
35        Self {
36            registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37        }
38    }
39
40    /// Retrieve a previously created endpoint's inner data by name.
41    ///
42    /// This is the primary way to inspect recorded exchanges in tests.
43    pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44        let registry = self
45            .registry
46            .lock()
47            .expect("mutex poisoned: another thread panicked while holding this lock");
48        registry.get(name).cloned()
49    }
50}
51
52impl Default for MockComponent {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl Component for MockComponent {
59    fn scheme(&self) -> &str {
60        "mock"
61    }
62
63    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
64        let parts = parse_uri(uri)?;
65        if parts.scheme != "mock" {
66            return Err(CamelError::InvalidUri(format!(
67                "expected scheme 'mock', got '{}'",
68                parts.scheme
69            )));
70        }
71
72        let name = parts.path;
73        let mut registry = self.registry.lock().map_err(|e| {
74            CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
75        })?;
76        let inner = registry
77            .entry(name.clone())
78            .or_insert_with(|| {
79                Arc::new(MockEndpointInner {
80                    uri: uri.to_string(),
81                    name,
82                    received: Arc::new(Mutex::new(Vec::new())),
83                })
84            })
85            .clone();
86
87        Ok(Box::new(MockEndpoint(inner)))
88    }
89}
90
91// ---------------------------------------------------------------------------
92// MockEndpoint / MockEndpointInner
93// ---------------------------------------------------------------------------
94
95/// A mock endpoint that records all exchanges sent to it.
96///
97/// This is a thin wrapper around `Arc<MockEndpointInner>`. Multiple
98/// `MockEndpoint` instances created with the same name share the same inner
99/// storage.
100pub struct MockEndpoint(Arc<MockEndpointInner>);
101
102/// The actual data behind a mock endpoint. Shared across all `MockEndpoint`
103/// instances created with the same name via `MockComponent`.
104///
105/// Use `get_received_exchanges` and `assert_exchange_count` to inspect
106/// recorded exchanges in tests.
107pub struct MockEndpointInner {
108    uri: String,
109    pub name: String,
110    received: Arc<Mutex<Vec<Exchange>>>,
111}
112
113impl MockEndpointInner {
114    /// Return a snapshot of all exchanges received so far.
115    pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
116        self.received.lock().await.clone()
117    }
118
119    /// Assert that exactly `expected` exchanges have been received.
120    ///
121    /// # Panics
122    ///
123    /// Panics if the count does not match.
124    pub async fn assert_exchange_count(&self, expected: usize) {
125        let actual = self.received.lock().await.len();
126        assert_eq!(
127            actual, expected,
128            "MockEndpoint expected {expected} exchanges, got {actual}"
129        );
130    }
131}
132
133impl Endpoint for MockEndpoint {
134    fn uri(&self) -> &str {
135        &self.0.uri
136    }
137
138    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
139        Err(CamelError::EndpointCreationFailed(
140            "mock endpoint does not support consumers (it is a sink)".to_string(),
141        ))
142    }
143
144    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
145        Ok(BoxProcessor::new(MockProducer {
146            received: Arc::clone(&self.0.received),
147        }))
148    }
149}
150
151// ---------------------------------------------------------------------------
152// MockProducer
153// ---------------------------------------------------------------------------
154
155/// A producer that simply records each exchange it processes.
156#[derive(Clone)]
157struct MockProducer {
158    received: Arc<Mutex<Vec<Exchange>>>,
159}
160
161impl Service<Exchange> for MockProducer {
162    type Response = Exchange;
163    type Error = CamelError;
164    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
165
166    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167        Poll::Ready(Ok(()))
168    }
169
170    fn call(&mut self, exchange: Exchange) -> Self::Future {
171        let received = Arc::clone(&self.received);
172        Box::pin(async move {
173            received.lock().await.push(exchange.clone());
174            Ok(exchange)
175        })
176    }
177}
178
179// ---------------------------------------------------------------------------
180// Tests
181// ---------------------------------------------------------------------------
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use camel_api::Message;
187    use std::sync::Arc;
188    use tokio::sync::Mutex;
189    use tower::ServiceExt;
190
191    // NullRouteController for testing
192    struct NullRouteController;
193    #[async_trait::async_trait]
194    impl camel_api::RouteController for NullRouteController {
195        async fn start_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
196            Ok(())
197        }
198        async fn stop_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
199            Ok(())
200        }
201        async fn restart_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
202            Ok(())
203        }
204        async fn suspend_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
205            Ok(())
206        }
207        async fn resume_route(&mut self, _: &str) -> Result<(), camel_api::CamelError> {
208            Ok(())
209        }
210        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
211            None
212        }
213        async fn start_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
214            Ok(())
215        }
216        async fn stop_all_routes(&mut self) -> Result<(), camel_api::CamelError> {
217            Ok(())
218        }
219    }
220
221    fn test_producer_ctx() -> ProducerContext {
222        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
223    }
224
225    #[test]
226    fn test_mock_component_scheme() {
227        let component = MockComponent::new();
228        assert_eq!(component.scheme(), "mock");
229    }
230
231    #[test]
232    fn test_mock_creates_endpoint() {
233        let component = MockComponent::new();
234        let endpoint = component.create_endpoint("mock:result");
235        assert!(endpoint.is_ok());
236    }
237
238    #[test]
239    fn test_mock_wrong_scheme() {
240        let component = MockComponent::new();
241        let result = component.create_endpoint("timer:tick");
242        assert!(result.is_err());
243    }
244
245    #[test]
246    fn test_mock_endpoint_no_consumer() {
247        let component = MockComponent::new();
248        let endpoint = component.create_endpoint("mock:result").unwrap();
249        assert!(endpoint.create_consumer().is_err());
250    }
251
252    #[test]
253    fn test_mock_endpoint_creates_producer() {
254        let ctx = test_producer_ctx();
255        let component = MockComponent::new();
256        let endpoint = component.create_endpoint("mock:result").unwrap();
257        assert!(endpoint.create_producer(&ctx).is_ok());
258    }
259
260    #[tokio::test]
261    async fn test_mock_producer_records_exchange() {
262        let ctx = test_producer_ctx();
263        let component = MockComponent::new();
264        let endpoint = component.create_endpoint("mock:test").unwrap();
265
266        let mut producer = endpoint.create_producer(&ctx).unwrap();
267
268        let ex1 = Exchange::new(Message::new("first"));
269        let ex2 = Exchange::new(Message::new("second"));
270
271        producer.call(ex1).await.unwrap();
272        producer.call(ex2).await.unwrap();
273
274        let inner = component.get_endpoint("test").unwrap();
275        inner.assert_exchange_count(2).await;
276
277        let received = inner.get_received_exchanges().await;
278        assert_eq!(received[0].input.body.as_text(), Some("first"));
279        assert_eq!(received[1].input.body.as_text(), Some("second"));
280    }
281
282    #[tokio::test]
283    async fn test_mock_producer_passes_through_exchange() {
284        let ctx = test_producer_ctx();
285        let component = MockComponent::new();
286        let endpoint = component.create_endpoint("mock:passthrough").unwrap();
287
288        let producer = endpoint.create_producer(&ctx).unwrap();
289        let exchange = Exchange::new(Message::new("hello"));
290        let result = producer.oneshot(exchange).await.unwrap();
291
292        // Producer should return the exchange unchanged
293        assert_eq!(result.input.body.as_text(), Some("hello"));
294    }
295
296    #[tokio::test]
297    async fn test_mock_assert_count_passes() {
298        let component = MockComponent::new();
299        let endpoint = component.create_endpoint("mock:count").unwrap();
300        let inner = component.get_endpoint("count").unwrap();
301
302        inner.assert_exchange_count(0).await;
303
304        let ctx = test_producer_ctx();
305        let mut producer = endpoint.create_producer(&ctx).unwrap();
306        producer
307            .call(Exchange::new(Message::new("one")))
308            .await
309            .unwrap();
310
311        inner.assert_exchange_count(1).await;
312    }
313
314    #[tokio::test]
315    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
316    async fn test_mock_assert_count_fails() {
317        let component = MockComponent::new();
318        // Endpoint not created yet, so get_endpoint returns None.
319        // Create it first, then assert.
320        let _endpoint = component.create_endpoint("mock:fail").unwrap();
321        let inner = component.get_endpoint("fail").unwrap();
322
323        inner.assert_exchange_count(5).await;
324    }
325
326    #[tokio::test]
327    async fn test_mock_component_shared_registry() {
328        let component = MockComponent::new();
329        let ep1 = component.create_endpoint("mock:shared").unwrap();
330        let ep2 = component.create_endpoint("mock:shared").unwrap();
331
332        // Producing via ep1's producer...
333        let ctx = test_producer_ctx();
334        let mut p1 = ep1.create_producer(&ctx).unwrap();
335        p1.call(Exchange::new(Message::new("from-ep1")))
336            .await
337            .unwrap();
338
339        // ...and via ep2's producer...
340        let mut p2 = ep2.create_producer(&ctx).unwrap();
341        p2.call(Exchange::new(Message::new("from-ep2")))
342            .await
343            .unwrap();
344
345        // ...both should be visible via the shared storage
346        let inner = component.get_endpoint("shared").unwrap();
347        inner.assert_exchange_count(2).await;
348
349        let received = inner.get_received_exchanges().await;
350        assert_eq!(received[0].input.body.as_text(), Some("from-ep1"));
351        assert_eq!(received[1].input.body.as_text(), Some("from-ep2"));
352    }
353}