camel_component_mock/
lib.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use tokio::sync::Mutex;
8use tower::Service;
9
10use camel_api::{BoxProcessor, CamelError, Exchange};
11use camel_component::{Component, Consumer, Endpoint, ProducerContext};
12use camel_endpoint::parse_uri;
13
14#[derive(Clone)]
29pub struct MockComponent {
30 registry: Arc<std::sync::Mutex<HashMap<String, Arc<MockEndpointInner>>>>,
31}
32
33impl MockComponent {
34 pub fn new() -> Self {
35 Self {
36 registry: Arc::new(std::sync::Mutex::new(HashMap::new())),
37 }
38 }
39
40 pub fn get_endpoint(&self, name: &str) -> Option<Arc<MockEndpointInner>> {
44 let registry = self
45 .registry
46 .lock()
47 .expect("mutex poisoned: another thread panicked while holding this lock");
48 registry.get(name).cloned()
49 }
50}
51
52impl Default for MockComponent {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl Component for MockComponent {
59 fn scheme(&self) -> &str {
60 "mock"
61 }
62
63 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
64 let parts = parse_uri(uri)?;
65 if parts.scheme != "mock" {
66 return Err(CamelError::InvalidUri(format!(
67 "expected scheme 'mock', got '{}'",
68 parts.scheme
69 )));
70 }
71
72 let name = parts.path;
73 let mut registry = self.registry.lock().map_err(|e| {
74 CamelError::EndpointCreationFailed(format!("mock registry lock poisoned: {e}"))
75 })?;
76 let inner = registry
77 .entry(name.clone())
78 .or_insert_with(|| {
79 Arc::new(MockEndpointInner {
80 uri: uri.to_string(),
81 name,
82 received: Arc::new(Mutex::new(Vec::new())),
83 })
84 })
85 .clone();
86
87 Ok(Box::new(MockEndpoint(inner)))
88 }
89}
90
91pub struct MockEndpoint(Arc<MockEndpointInner>);
101
102pub struct MockEndpointInner {
108 uri: String,
109 pub name: String,
110 received: Arc<Mutex<Vec<Exchange>>>,
111}
112
113impl MockEndpointInner {
114 pub async fn get_received_exchanges(&self) -> Vec<Exchange> {
116 self.received.lock().await.clone()
117 }
118
119 pub async fn assert_exchange_count(&self, expected: usize) {
125 let actual = self.received.lock().await.len();
126 assert_eq!(
127 actual, expected,
128 "MockEndpoint expected {expected} exchanges, got {actual}"
129 );
130 }
131}
132
133impl Endpoint for MockEndpoint {
134 fn uri(&self) -> &str {
135 &self.0.uri
136 }
137
138 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
139 Err(CamelError::EndpointCreationFailed(
140 "mock endpoint does not support consumers (it is a sink)".to_string(),
141 ))
142 }
143
144 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
145 Ok(BoxProcessor::new(MockProducer {
146 received: Arc::clone(&self.0.received),
147 }))
148 }
149}
150
151#[derive(Clone)]
157struct MockProducer {
158 received: Arc<Mutex<Vec<Exchange>>>,
159}
160
161impl Service<Exchange> for MockProducer {
162 type Response = Exchange;
163 type Error = CamelError;
164 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
165
166 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
167 Poll::Ready(Ok(()))
168 }
169
170 fn call(&mut self, exchange: Exchange) -> Self::Future {
171 let received = Arc::clone(&self.received);
172 Box::pin(async move {
173 received.lock().await.push(exchange.clone());
174 Ok(exchange)
175 })
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::Message;
    use std::sync::Arc;
    use tokio::sync::Mutex;
    use tower::ServiceExt;

    /// Route controller stub: every operation succeeds and no route has a status.
    struct NullRouteController;

    #[async_trait::async_trait]
    impl camel_api::RouteController for NullRouteController {
        async fn start_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn restart_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn suspend_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        async fn resume_route(&mut self, _: &str) -> Result<(), CamelError> {
            Ok(())
        }

        fn route_status(&self, _: &str) -> Option<camel_api::RouteStatus> {
            None
        }

        async fn start_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }

        async fn stop_all_routes(&mut self) -> Result<(), CamelError> {
            Ok(())
        }
    }

    /// Producer context backed by the no-op route controller.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new(Arc::new(Mutex::new(NullRouteController)))
    }

    /// Exchange whose input message carries `body` as its text.
    fn exchange_with_body(body: &'static str) -> Exchange {
        Exchange::new(Message::new(body))
    }

    #[test]
    fn test_mock_component_scheme() {
        assert_eq!(MockComponent::new().scheme(), "mock");
    }

    #[test]
    fn test_mock_creates_endpoint() {
        let mock = MockComponent::new();
        assert!(mock.create_endpoint("mock:result").is_ok());
    }

    #[test]
    fn test_mock_wrong_scheme() {
        let mock = MockComponent::new();
        assert!(mock.create_endpoint("timer:tick").is_err());
    }

    #[test]
    fn test_mock_endpoint_no_consumer() {
        let mock = MockComponent::new();
        let sink = mock.create_endpoint("mock:result").unwrap();
        assert!(sink.create_consumer().is_err());
    }

    #[test]
    fn test_mock_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let mock = MockComponent::new();
        let sink = mock.create_endpoint("mock:result").unwrap();
        assert!(sink.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_mock_producer_records_exchange() {
        let ctx = test_producer_ctx();
        let mock = MockComponent::new();
        let sink = mock.create_endpoint("mock:test").unwrap();
        let mut producer = sink.create_producer(&ctx).unwrap();

        producer.call(exchange_with_body("first")).await.unwrap();
        producer.call(exchange_with_body("second")).await.unwrap();

        let state = mock.get_endpoint("test").unwrap();
        state.assert_exchange_count(2).await;

        let recorded = state.get_received_exchanges().await;
        let bodies: Vec<_> = recorded
            .iter()
            .map(|exchange| exchange.input.body.as_text())
            .collect();
        assert_eq!(bodies, vec![Some("first"), Some("second")]);
    }

    #[tokio::test]
    async fn test_mock_producer_passes_through_exchange() {
        let ctx = test_producer_ctx();
        let mock = MockComponent::new();
        let sink = mock.create_endpoint("mock:passthrough").unwrap();
        let producer = sink.create_producer(&ctx).unwrap();

        let returned = producer.oneshot(exchange_with_body("hello")).await.unwrap();

        assert_eq!(returned.input.body.as_text(), Some("hello"));
    }

    #[tokio::test]
    async fn test_mock_assert_count_passes() {
        let mock = MockComponent::new();
        let sink = mock.create_endpoint("mock:count").unwrap();
        let state = mock.get_endpoint("count").unwrap();

        // Nothing has been sent yet.
        state.assert_exchange_count(0).await;

        let ctx = test_producer_ctx();
        let mut producer = sink.create_producer(&ctx).unwrap();
        producer.call(exchange_with_body("one")).await.unwrap();

        state.assert_exchange_count(1).await;
    }

    #[tokio::test]
    #[should_panic(expected = "MockEndpoint expected 5 exchanges, got 0")]
    async fn test_mock_assert_count_fails() {
        let mock = MockComponent::new();
        let _endpoint = mock.create_endpoint("mock:fail").unwrap();
        let state = mock.get_endpoint("fail").unwrap();

        state.assert_exchange_count(5).await;
    }

    #[tokio::test]
    async fn test_mock_component_shared_registry() {
        let mock = MockComponent::new();
        let first_endpoint = mock.create_endpoint("mock:shared").unwrap();
        let second_endpoint = mock.create_endpoint("mock:shared").unwrap();
        let ctx = test_producer_ctx();

        let mut first_producer = first_endpoint.create_producer(&ctx).unwrap();
        first_producer
            .call(exchange_with_body("from-ep1"))
            .await
            .unwrap();

        let mut second_producer = second_endpoint.create_producer(&ctx).unwrap();
        second_producer
            .call(exchange_with_body("from-ep2"))
            .await
            .unwrap();

        // Both endpoints record into the single registry entry for "shared".
        let state = mock.get_endpoint("shared").unwrap();
        state.assert_exchange_count(2).await;

        let recorded = state.get_received_exchanges().await;
        let bodies: Vec<_> = recorded
            .iter()
            .map(|exchange| exchange.input.body.as_text())
            .collect();
        assert_eq!(bodies, vec![Some("from-ep1"), Some("from-ep2")]);
    }
}