Skip to main content

camel_component_direct/
lib.rs

1//! In-memory direct component for rust-camel — synchronous point-to-point
2//! channel between routes sharing the same context with no serialization overhead.
3//!
4//! Main types: `DirectComponent`, `DirectEndpoint`, `DirectConsumer`, `DirectProducer`.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use tokio::sync::{mpsc, oneshot};
14use tower::Service;
15
16use camel_component_api::UriConfig;
17use camel_component_api::{BoxProcessor, CamelError, Exchange};
18use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
19use tracing::{debug, error, info};
20
21// ---------------------------------------------------------------------------
22// Shared state: maps endpoint names to senders that deliver exchanges to the
23// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
24// so the producer can wait for the consumer's pipeline to finish processing
25// and receive the (possibly transformed) exchange back.
26// ---------------------------------------------------------------------------
27
28type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
29type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
30
31// ---------------------------------------------------------------------------
32// DirectConfig
33// ---------------------------------------------------------------------------
34
35/// Configuration for Direct endpoints parsed from URIs.
36///
37/// URI format: `direct:name`
38///
39/// Example: `direct:foo` creates an endpoint named "foo"
40#[derive(Debug, Clone, UriConfig)]
41#[uri_scheme = "direct"]
42#[uri_config(crate = "camel_component_api")]
43pub struct DirectConfig {
44    /// Endpoint name (path portion).
45    pub name: String,
46}
47
48// ---------------------------------------------------------------------------
49// DirectComponent
50// ---------------------------------------------------------------------------
51
52/// The Direct component provides in-memory synchronous communication between
53/// routes.
54///
55/// URI format: `direct:name`
56///
57/// A producer sending to `direct:foo` will block until the consumer on
58/// `direct:foo` has finished processing the exchange.
59pub struct DirectComponent {
60    registry: DirectRegistry,
61}
62
63impl DirectComponent {
64    pub fn new() -> Self {
65        Self {
66            registry: Arc::new(Mutex::new(HashMap::new())),
67        }
68    }
69}
70
71impl Default for DirectComponent {
72    fn default() -> Self {
73        Self::new()
74    }
75}
76
77impl Component for DirectComponent {
78    fn scheme(&self) -> &str {
79        "direct"
80    }
81
82    fn create_endpoint(
83        &self,
84        uri: &str,
85        _ctx: &dyn camel_component_api::ComponentContext,
86    ) -> Result<Box<dyn Endpoint>, CamelError> {
87        let config = DirectConfig::from_uri(uri)?;
88        if config.name.trim().is_empty() {
89            return Err(CamelError::InvalidUri(
90                "direct: endpoint name must not be empty".to_string(),
91            ));
92        }
93        let name = config.name.clone();
94        debug!(endpoint_name = %name, "direct endpoint created");
95        Ok(Box::new(DirectEndpoint {
96            uri: uri.to_string(),
97            name: config.name,
98            registry: Arc::clone(&self.registry),
99        }))
100    }
101}
102
103// ---------------------------------------------------------------------------
104// DirectEndpoint
105// ---------------------------------------------------------------------------
106
107struct DirectEndpoint {
108    uri: String,
109    name: String,
110    registry: DirectRegistry,
111}
112
113impl Endpoint for DirectEndpoint {
114    fn uri(&self) -> &str {
115        &self.uri
116    }
117
118    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119        Ok(Box::new(DirectConsumer {
120            name: self.name.clone(),
121            registry: Arc::clone(&self.registry),
122        }))
123    }
124
125    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
126        Ok(BoxProcessor::new(DirectProducer {
127            name: self.name.clone(),
128            registry: Arc::clone(&self.registry),
129        }))
130    }
131}
132
133// ---------------------------------------------------------------------------
134// DirectConsumer
135// ---------------------------------------------------------------------------
136
137/// The Direct consumer registers itself in the shared registry and forwards
138/// incoming exchanges to the route pipeline via `ConsumerContext`.
139struct DirectConsumer {
140    name: String,
141    registry: DirectRegistry,
142}
143
144#[async_trait]
145impl Consumer for DirectConsumer {
146    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
147        // Create a channel for producers to send exchanges to this consumer.
148        let (tx, mut rx) =
149            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
150
151        // Register ourselves so producers can find us.
152        {
153            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
154            reg.insert(self.name.clone(), tx);
155        }
156
157        info!(endpoint_name = %self.name, "direct consumer started");
158
159        // Process incoming exchanges with cooperative cancellation.
160        loop {
161            tokio::select! {
162                _ = context.cancelled() => {
163                    debug!(endpoint_name = %self.name, "direct consumer received cancellation");
164                    break;
165                }
166                msg = rx.recv() => {
167                    match msg {
168                        Some((exchange, reply_tx)) => {
169                            let result = context.send_and_wait(exchange).await;
170                            let _ = reply_tx.send(result);
171                        }
172                        None => break,
173                    }
174                }
175            }
176        }
177
178        // Cleanup: remove from registry on exit
179        {
180            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
181            reg.remove(&self.name);
182        }
183
184        debug!(endpoint_name = %self.name, "direct consumer stopped");
185
186        Ok(())
187    }
188
189    async fn stop(&mut self) -> Result<(), CamelError> {
190        // Remove from registry so no new producers can send to us.
191        let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
192        reg.remove(&self.name);
193        debug!(endpoint_name = %self.name, "direct consumer stopped");
194        Ok(())
195    }
196}
197
198// ---------------------------------------------------------------------------
199// DirectProducer
200// ---------------------------------------------------------------------------
201
202/// The Direct producer sends an exchange to the named direct endpoint and
203/// waits for a reply (synchronous in-memory call).
204#[derive(Clone)]
205struct DirectProducer {
206    name: String,
207    registry: DirectRegistry,
208}
209
210impl Service<Exchange> for DirectProducer {
211    type Response = Exchange;
212    type Error = CamelError;
213    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
214
215    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216        let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
217        match reg.get(&self.name) {
218            None => Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
219                "direct endpoint '{}' not registered",
220                self.name
221            )))),
222            Some(sender) if sender.is_closed() => {
223                Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
224                    "direct endpoint '{}' channel closed",
225                    self.name
226                ))))
227            }
228            Some(_) => Poll::Ready(Ok(())),
229        }
230    }
231
232    fn call(&mut self, exchange: Exchange) -> Self::Future {
233        let name = self.name.clone();
234        let registry = Arc::clone(&self.registry);
235
236        Box::pin(async move {
237            let sender = {
238                let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
239                reg.get(&name)
240                    .ok_or_else(|| {
241                        let err = CamelError::EndpointCreationFailed(format!(
242                            "no consumer registered for direct:{name}"
243                        ));
244                        error!(endpoint_name = %name, error = %err, "direct send failed");
245                        err
246                    })?
247                    .clone()
248            };
249
250            let (reply_tx, reply_rx) = oneshot::channel();
251            sender.send((exchange, reply_tx)).await.map_err(|err| {
252                error!(endpoint_name = %name, error = %err, "direct send failed");
253                CamelError::ChannelClosed
254            })?;
255
256            let result = reply_rx.await.map_err(|err| {
257                error!(endpoint_name = %name, error = %err, "direct send failed");
258                CamelError::ChannelClosed
259            })?;
260
261            debug!(endpoint_name = %name, "direct message sent");
262            result
263        })
264    }
265}
266
267// ---------------------------------------------------------------------------
268// Tests
269// ---------------------------------------------------------------------------
270
271#[cfg(test)]
272mod tests {
273    use super::*;
274    use camel_component_api::ExchangeEnvelope;
275    use camel_component_api::Message;
276    use camel_component_api::NoOpComponentContext;
277    use std::task::RawWakerVTable;
278    use tower::ServiceExt;
279
280    fn noop_waker() -> std::task::Waker {
281        const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
282        const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
283        unsafe { std::task::Waker::from_raw(RAW) }
284    }
285
286    fn test_producer_ctx() -> ProducerContext {
287        ProducerContext::new()
288    }
289
290    #[test]
291    fn test_direct_component_scheme() {
292        let component = DirectComponent::new();
293        assert_eq!(component.scheme(), "direct");
294    }
295
296    #[test]
297    fn test_direct_component_default() {
298        let component = DirectComponent::default();
299        assert_eq!(component.scheme(), "direct");
300    }
301
302    #[test]
303    fn test_direct_config_from_uri() {
304        let config = DirectConfig::from_uri("direct:orders").unwrap();
305        assert_eq!(config.name, "orders");
306    }
307
308    #[test]
309    fn test_direct_endpoint_uri() {
310        let component = DirectComponent::new();
311        let endpoint = component
312            .create_endpoint("direct:uri-check", &NoOpComponentContext)
313            .unwrap();
314        assert_eq!(endpoint.uri(), "direct:uri-check");
315    }
316
317    #[test]
318    fn test_direct_creates_endpoint() {
319        let component = DirectComponent::new();
320        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
321        assert!(endpoint.is_ok());
322    }
323
324    #[test]
325    fn test_direct_wrong_scheme() {
326        let component = DirectComponent::new();
327        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
328        assert!(result.is_err());
329    }
330
331    #[test]
332    fn test_direct_endpoint_creates_consumer() {
333        let component = DirectComponent::new();
334        let endpoint = component
335            .create_endpoint("direct:foo", &NoOpComponentContext)
336            .unwrap();
337        assert!(endpoint.create_consumer().is_ok());
338    }
339
340    #[test]
341    fn test_direct_endpoint_creates_producer() {
342        let ctx = test_producer_ctx();
343        let component = DirectComponent::new();
344        let endpoint = component
345            .create_endpoint("direct:foo", &NoOpComponentContext)
346            .unwrap();
347        assert!(endpoint.create_producer(&ctx).is_ok());
348    }
349
350    #[test]
351    fn test_direct_empty_name_rejected() {
352        let component = DirectComponent::new();
353        match component.create_endpoint("direct:", &NoOpComponentContext) {
354            Err(e) => assert!(
355                e.to_string().contains("must not be empty"),
356                "unexpected error: {e}"
357            ),
358            Ok(_) => panic!("expected error for empty name"),
359        }
360    }
361
362    #[tokio::test]
363    async fn test_direct_producer_no_consumer_registered() {
364        let ctx = test_producer_ctx();
365        let component = DirectComponent::new();
366        let endpoint = component
367            .create_endpoint("direct:missing", &NoOpComponentContext)
368            .unwrap();
369        let producer = endpoint.create_producer(&ctx).unwrap();
370
371        let exchange = Exchange::new(Message::new("test"));
372        let result = producer.oneshot(exchange).await;
373        assert!(result.is_err());
374    }
375
376    #[tokio::test]
377    async fn test_direct_producer_consumer_roundtrip() {
378        let component = DirectComponent::new();
379
380        // Create consumer endpoint and start it
381        let consumer_endpoint = component
382            .create_endpoint("direct:test", &NoOpComponentContext)
383            .unwrap();
384        let mut consumer = consumer_endpoint.create_consumer().unwrap();
385
386        // The route channel now carries ExchangeEnvelope (request-reply support).
387        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
388        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
389
390        // Start the consumer in a background task
391        tokio::spawn(async move {
392            consumer.start(ctx).await.unwrap();
393        });
394
395        // Give the consumer a moment to register
396        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
397
398        // Spawn a pipeline simulator that reads envelopes and replies Ok.
399        tokio::spawn(async move {
400            while let Some(envelope) = route_rx.recv().await {
401                let ExchangeEnvelope { exchange, reply_tx } = envelope;
402                if let Some(tx) = reply_tx {
403                    let _ = tx.send(Ok(exchange));
404                }
405            }
406        });
407
408        // Now send an exchange via the producer
409        let ctx = test_producer_ctx();
410        let producer_endpoint = component
411            .create_endpoint("direct:test", &NoOpComponentContext)
412            .unwrap();
413        let producer = producer_endpoint.create_producer(&ctx).unwrap();
414
415        let exchange = Exchange::new(Message::new("hello direct"));
416        let result = producer.oneshot(exchange).await;
417
418        assert!(result.is_ok());
419        let reply = result.unwrap();
420        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
421    }
422
423    #[tokio::test]
424    async fn test_direct_propagates_error_when_no_handler() {
425        let component = DirectComponent::new();
426
427        let consumer_endpoint = component
428            .create_endpoint("direct:err-test", &NoOpComponentContext)
429            .unwrap();
430        let mut consumer = consumer_endpoint.create_consumer().unwrap();
431
432        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
433        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
434
435        tokio::spawn(async move {
436            consumer.start(ctx).await.unwrap();
437        });
438
439        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
440
441        // Pipeline simulator that replies with Err (simulates no error handler).
442        tokio::spawn(async move {
443            while let Some(envelope) = route_rx.recv().await {
444                if let Some(tx) = envelope.reply_tx {
445                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
446                }
447            }
448        });
449
450        let ctx = test_producer_ctx();
451        let producer_endpoint = component
452            .create_endpoint("direct:err-test", &NoOpComponentContext)
453            .unwrap();
454        let producer = producer_endpoint.create_producer(&ctx).unwrap();
455
456        let exchange = Exchange::new(Message::new("test"));
457        let result = producer.oneshot(exchange).await;
458        assert!(result.is_err());
459        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
460    }
461
462    #[tokio::test]
463    async fn test_direct_consumer_stop_unregisters() {
464        let component = DirectComponent::new();
465        let endpoint = component
466            .create_endpoint("direct:cleanup", &NoOpComponentContext)
467            .unwrap();
468
469        // We need a consumer to register
470        let mut consumer = endpoint.create_consumer().unwrap();
471
472        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
473        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
474
475        // Start consumer in background
476        let handle = tokio::spawn(async move {
477            consumer.start(ctx).await.unwrap();
478        });
479
480        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
481
482        // Verify the name is registered
483        {
484            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
485            assert!(reg.contains_key("cleanup"));
486        }
487
488        // Create a new consumer just to call stop (stop removes from registry)
489        let mut stop_consumer = DirectConsumer {
490            name: "cleanup".to_string(),
491            registry: Arc::clone(&component.registry),
492        };
493        stop_consumer.stop().await.unwrap();
494
495        // Verify removed from registry
496        {
497            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
498            assert!(!reg.contains_key("cleanup"));
499        }
500
501        handle.abort();
502    }
503
504    #[tokio::test]
505    async fn test_direct_consumer_respects_cancellation() {
506        use tokio_util::sync::CancellationToken;
507
508        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
509        let token = CancellationToken::new();
510        let (tx, _rx) = mpsc::channel(16);
511        let ctx = ConsumerContext::new(tx, token.clone());
512
513        let mut consumer = DirectConsumer {
514            name: "cancel-test".to_string(),
515            registry: registry.clone(),
516        };
517
518        let handle = tokio::spawn(async move {
519            consumer.start(ctx).await.unwrap();
520        });
521
522        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
523        assert!(
524            registry
525                .lock()
526                .unwrap_or_else(|e| e.into_inner())
527                .contains_key("cancel-test")
528        );
529
530        token.cancel();
531        let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
532        assert!(
533            result.is_ok(),
534            "Consumer should have stopped after cancellation"
535        );
536
537        // After cancellation, the consumer should have cleaned up the registry
538        assert!(
539            !registry
540                .lock()
541                .unwrap_or_else(|e| e.into_inner())
542                .contains_key("cancel-test")
543        );
544    }
545
546    #[tokio::test]
547    async fn test_direct_consumer_stop_missing_entry_is_ok() {
548        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
549        let mut consumer = DirectConsumer {
550            name: "never-registered".to_string(),
551            registry,
552        };
553        let result = consumer.stop().await;
554        assert!(result.is_ok());
555    }
556
557    #[test]
558    fn test_poll_ready_endpoint_not_registered() {
559        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
560        let producer = DirectProducer {
561            name: "missing".to_string(),
562            registry,
563        };
564        let waker = noop_waker();
565        let mut cx = Context::from_waker(&waker);
566        let mut producer = producer;
567        let result = producer.poll_ready(&mut cx);
568        assert!(matches!(
569            result,
570            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
571        ));
572    }
573
574    #[test]
575    fn test_poll_ready_endpoint_registered() {
576        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
577        let (tx, _rx) =
578            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
579        registry.lock().unwrap().insert("active".to_string(), tx);
580        let producer = DirectProducer {
581            name: "active".to_string(),
582            registry,
583        };
584        let waker = noop_waker();
585        let mut cx = Context::from_waker(&waker);
586        let mut producer = producer;
587        let result = producer.poll_ready(&mut cx);
588        assert!(matches!(result, Poll::Ready(Ok(()))));
589    }
590
591    #[test]
592    fn test_poll_ready_channel_closed() {
593        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
594        let (tx, rx) =
595            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
596        drop(rx);
597        registry.lock().unwrap().insert("closed".to_string(), tx);
598        let producer = DirectProducer {
599            name: "closed".to_string(),
600            registry,
601        };
602        let waker = noop_waker();
603        let mut cx = Context::from_waker(&waker);
604        let mut producer = producer;
605        let result = producer.poll_ready(&mut cx);
606        assert!(matches!(
607            result,
608            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
609        ));
610    }
611}