camel_component_direct/
lib.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use tokio::sync::{Mutex, mpsc, oneshot};
9use tower::Service;
10
11use camel_component_api::UriConfig;
12use camel_component_api::{BoxProcessor, CamelError, Exchange};
13use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
14
15type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
23type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
24
25#[derive(Debug, Clone, UriConfig)]
35#[uri_scheme = "direct"]
36#[uri_config(crate = "camel_component_api")]
37pub struct DirectConfig {
38 pub name: String,
40}
41
42pub struct DirectComponent {
54 registry: DirectRegistry,
55}
56
57impl DirectComponent {
58 pub fn new() -> Self {
59 Self {
60 registry: Arc::new(Mutex::new(HashMap::new())),
61 }
62 }
63}
64
65impl Default for DirectComponent {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl Component for DirectComponent {
72 fn scheme(&self) -> &str {
73 "direct"
74 }
75
76 fn create_endpoint(
77 &self,
78 uri: &str,
79 _ctx: &dyn camel_component_api::ComponentContext,
80 ) -> Result<Box<dyn Endpoint>, CamelError> {
81 let config = DirectConfig::from_uri(uri)?;
82 Ok(Box::new(DirectEndpoint {
83 uri: uri.to_string(),
84 name: config.name,
85 registry: Arc::clone(&self.registry),
86 }))
87 }
88}
89
90struct DirectEndpoint {
95 uri: String,
96 name: String,
97 registry: DirectRegistry,
98}
99
100impl Endpoint for DirectEndpoint {
101 fn uri(&self) -> &str {
102 &self.uri
103 }
104
105 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
106 Ok(Box::new(DirectConsumer {
107 name: self.name.clone(),
108 registry: Arc::clone(&self.registry),
109 }))
110 }
111
112 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
113 Ok(BoxProcessor::new(DirectProducer {
114 name: self.name.clone(),
115 registry: Arc::clone(&self.registry),
116 }))
117 }
118}
119
120struct DirectConsumer {
127 name: String,
128 registry: DirectRegistry,
129}
130
131#[async_trait]
132impl Consumer for DirectConsumer {
133 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
134 let (tx, mut rx) =
136 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
137
138 {
140 let mut reg = self.registry.lock().await;
141 reg.insert(self.name.clone(), tx);
142 }
143
144 loop {
146 tokio::select! {
147 _ = context.cancelled() => {
148 tracing::debug!("Direct '{}' received cancellation, stopping", self.name);
149 break;
150 }
151 msg = rx.recv() => {
152 match msg {
153 Some((exchange, reply_tx)) => {
154 let result = context.send_and_wait(exchange).await;
155 let _ = reply_tx.send(result);
156 }
157 None => break,
158 }
159 }
160 }
161 }
162
163 {
165 let mut reg = self.registry.lock().await;
166 reg.remove(&self.name);
167 }
168
169 Ok(())
170 }
171
172 async fn stop(&mut self) -> Result<(), CamelError> {
173 let mut reg = self.registry.lock().await;
175 reg.remove(&self.name);
176 Ok(())
177 }
178}
179
180#[derive(Clone)]
187struct DirectProducer {
188 name: String,
189 registry: DirectRegistry,
190}
191
192impl Service<Exchange> for DirectProducer {
193 type Response = Exchange;
194 type Error = CamelError;
195 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
196
197 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
198 Poll::Ready(Ok(()))
199 }
200
201 fn call(&mut self, exchange: Exchange) -> Self::Future {
202 let name = self.name.clone();
203 let registry = Arc::clone(&self.registry);
204
205 Box::pin(async move {
206 let reg = registry.lock().await;
207 let sender = reg.get(&name).ok_or_else(|| {
208 CamelError::EndpointCreationFailed(format!(
209 "no consumer registered for direct:{name}"
210 ))
211 })?;
212
213 let (reply_tx, reply_rx) = oneshot::channel();
214 sender
215 .send((exchange, reply_tx))
216 .await
217 .map_err(|_| CamelError::ChannelClosed)?;
218
219 drop(reg);
221
222 reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
224 })
225 }
226}
227
228#[cfg(test)]
233mod tests {
234 use super::*;
235 use camel_component_api::ExchangeEnvelope;
236 use camel_component_api::Message;
237 use camel_component_api::NoOpComponentContext;
238 use tower::ServiceExt;
239
240 fn test_producer_ctx() -> ProducerContext {
241 ProducerContext::new()
242 }
243
244 #[test]
245 fn test_direct_component_scheme() {
246 let component = DirectComponent::new();
247 assert_eq!(component.scheme(), "direct");
248 }
249
250 #[test]
251 fn test_direct_creates_endpoint() {
252 let component = DirectComponent::new();
253 let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
254 assert!(endpoint.is_ok());
255 }
256
257 #[test]
258 fn test_direct_wrong_scheme() {
259 let component = DirectComponent::new();
260 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
261 assert!(result.is_err());
262 }
263
264 #[test]
265 fn test_direct_endpoint_creates_consumer() {
266 let component = DirectComponent::new();
267 let endpoint = component
268 .create_endpoint("direct:foo", &NoOpComponentContext)
269 .unwrap();
270 assert!(endpoint.create_consumer().is_ok());
271 }
272
273 #[test]
274 fn test_direct_endpoint_creates_producer() {
275 let ctx = test_producer_ctx();
276 let component = DirectComponent::new();
277 let endpoint = component
278 .create_endpoint("direct:foo", &NoOpComponentContext)
279 .unwrap();
280 assert!(endpoint.create_producer(&ctx).is_ok());
281 }
282
283 #[tokio::test]
284 async fn test_direct_producer_no_consumer_registered() {
285 let ctx = test_producer_ctx();
286 let component = DirectComponent::new();
287 let endpoint = component
288 .create_endpoint("direct:missing", &NoOpComponentContext)
289 .unwrap();
290 let producer = endpoint.create_producer(&ctx).unwrap();
291
292 let exchange = Exchange::new(Message::new("test"));
293 let result = producer.oneshot(exchange).await;
294 assert!(result.is_err());
295 }
296
297 #[tokio::test]
298 async fn test_direct_producer_consumer_roundtrip() {
299 let component = DirectComponent::new();
300
301 let consumer_endpoint = component
303 .create_endpoint("direct:test", &NoOpComponentContext)
304 .unwrap();
305 let mut consumer = consumer_endpoint.create_consumer().unwrap();
306
307 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
309 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
310
311 tokio::spawn(async move {
313 consumer.start(ctx).await.unwrap();
314 });
315
316 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
318
319 tokio::spawn(async move {
321 while let Some(envelope) = route_rx.recv().await {
322 let ExchangeEnvelope { exchange, reply_tx } = envelope;
323 if let Some(tx) = reply_tx {
324 let _ = tx.send(Ok(exchange));
325 }
326 }
327 });
328
329 let ctx = test_producer_ctx();
331 let producer_endpoint = component
332 .create_endpoint("direct:test", &NoOpComponentContext)
333 .unwrap();
334 let producer = producer_endpoint.create_producer(&ctx).unwrap();
335
336 let exchange = Exchange::new(Message::new("hello direct"));
337 let result = producer.oneshot(exchange).await;
338
339 assert!(result.is_ok());
340 let reply = result.unwrap();
341 assert_eq!(reply.input.body.as_text(), Some("hello direct"));
342 }
343
344 #[tokio::test]
345 async fn test_direct_propagates_error_when_no_handler() {
346 let component = DirectComponent::new();
347
348 let consumer_endpoint = component
349 .create_endpoint("direct:err-test", &NoOpComponentContext)
350 .unwrap();
351 let mut consumer = consumer_endpoint.create_consumer().unwrap();
352
353 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
354 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
355
356 tokio::spawn(async move {
357 consumer.start(ctx).await.unwrap();
358 });
359
360 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
361
362 tokio::spawn(async move {
364 while let Some(envelope) = route_rx.recv().await {
365 if let Some(tx) = envelope.reply_tx {
366 let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
367 }
368 }
369 });
370
371 let ctx = test_producer_ctx();
372 let producer_endpoint = component
373 .create_endpoint("direct:err-test", &NoOpComponentContext)
374 .unwrap();
375 let producer = producer_endpoint.create_producer(&ctx).unwrap();
376
377 let exchange = Exchange::new(Message::new("test"));
378 let result = producer.oneshot(exchange).await;
379 assert!(result.is_err());
380 assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
381 }
382
383 #[tokio::test]
384 async fn test_direct_consumer_stop_unregisters() {
385 let component = DirectComponent::new();
386 let endpoint = component
387 .create_endpoint("direct:cleanup", &NoOpComponentContext)
388 .unwrap();
389
390 let mut consumer = endpoint.create_consumer().unwrap();
392
393 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
394 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
395
396 let handle = tokio::spawn(async move {
398 consumer.start(ctx).await.unwrap();
399 });
400
401 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
402
403 {
405 let reg = component.registry.lock().await;
406 assert!(reg.contains_key("cleanup"));
407 }
408
409 let mut stop_consumer = DirectConsumer {
411 name: "cleanup".to_string(),
412 registry: Arc::clone(&component.registry),
413 };
414 stop_consumer.stop().await.unwrap();
415
416 {
418 let reg = component.registry.lock().await;
419 assert!(!reg.contains_key("cleanup"));
420 }
421
422 handle.abort();
423 }
424
425 #[tokio::test]
426 async fn test_direct_consumer_respects_cancellation() {
427 use tokio_util::sync::CancellationToken;
428
429 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
430 let token = CancellationToken::new();
431 let (tx, _rx) = mpsc::channel(16);
432 let ctx = ConsumerContext::new(tx, token.clone());
433
434 let mut consumer = DirectConsumer {
435 name: "cancel-test".to_string(),
436 registry: registry.clone(),
437 };
438
439 let handle = tokio::spawn(async move {
440 consumer.start(ctx).await.unwrap();
441 });
442
443 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
444 assert!(registry.lock().await.contains_key("cancel-test"));
445
446 token.cancel();
447 let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
448 assert!(
449 result.is_ok(),
450 "Consumer should have stopped after cancellation"
451 );
452
453 assert!(!registry.lock().await.contains_key("cancel-test"));
455 }
456}