Skip to main content

camel_component_direct/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use tokio::sync::{Mutex, mpsc, oneshot};
9use tower::Service;
10
11use camel_component_api::UriConfig;
12use camel_component_api::{BoxProcessor, CamelError, Exchange};
13use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
14
15// ---------------------------------------------------------------------------
16// Shared state: maps endpoint names to senders that deliver exchanges to the
17// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
18// so the producer can wait for the consumer's pipeline to finish processing
19// and receive the (possibly transformed) exchange back.
20// ---------------------------------------------------------------------------
21
22type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
23type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
24
25// ---------------------------------------------------------------------------
26// DirectConfig
27// ---------------------------------------------------------------------------
28
/// Configuration for Direct endpoints parsed from URIs.
///
/// URI format: `direct:name`
///
/// Example: `direct:foo` creates an endpoint named "foo"
///
/// `DirectComponent::create_endpoint` obtains this value through
/// `DirectConfig::from_uri` (presumably generated by the `UriConfig` derive —
/// confirm in `camel_component_api`) and moves `name` into the endpoint.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "direct"]
#[uri_config(crate = "camel_component_api")]
pub struct DirectConfig {
    /// Endpoint name (path portion).
    ///
    /// Consumers register under this name in the shared registry and
    /// producers look it up there, so it is what ties the two sides together.
    pub name: String,
}
41
42// ---------------------------------------------------------------------------
43// DirectComponent
44// ---------------------------------------------------------------------------
45
46/// The Direct component provides in-memory synchronous communication between
47/// routes.
48///
49/// URI format: `direct:name`
50///
51/// A producer sending to `direct:foo` will block until the consumer on
52/// `direct:foo` has finished processing the exchange.
53pub struct DirectComponent {
54    registry: DirectRegistry,
55}
56
57impl DirectComponent {
58    pub fn new() -> Self {
59        Self {
60            registry: Arc::new(Mutex::new(HashMap::new())),
61        }
62    }
63}
64
65impl Default for DirectComponent {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl Component for DirectComponent {
72    fn scheme(&self) -> &str {
73        "direct"
74    }
75
76    fn create_endpoint(
77        &self,
78        uri: &str,
79        _ctx: &dyn camel_component_api::ComponentContext,
80    ) -> Result<Box<dyn Endpoint>, CamelError> {
81        let config = DirectConfig::from_uri(uri)?;
82        Ok(Box::new(DirectEndpoint {
83            uri: uri.to_string(),
84            name: config.name,
85            registry: Arc::clone(&self.registry),
86        }))
87    }
88}
89
90// ---------------------------------------------------------------------------
91// DirectEndpoint
92// ---------------------------------------------------------------------------
93
94struct DirectEndpoint {
95    uri: String,
96    name: String,
97    registry: DirectRegistry,
98}
99
100impl Endpoint for DirectEndpoint {
101    fn uri(&self) -> &str {
102        &self.uri
103    }
104
105    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
106        Ok(Box::new(DirectConsumer {
107            name: self.name.clone(),
108            registry: Arc::clone(&self.registry),
109        }))
110    }
111
112    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
113        Ok(BoxProcessor::new(DirectProducer {
114            name: self.name.clone(),
115            registry: Arc::clone(&self.registry),
116        }))
117    }
118}
119
120// ---------------------------------------------------------------------------
121// DirectConsumer
122// ---------------------------------------------------------------------------
123
124/// The Direct consumer registers itself in the shared registry and forwards
125/// incoming exchanges to the route pipeline via `ConsumerContext`.
126struct DirectConsumer {
127    name: String,
128    registry: DirectRegistry,
129}
130
131#[async_trait]
132impl Consumer for DirectConsumer {
133    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
134        // Create a channel for producers to send exchanges to this consumer.
135        let (tx, mut rx) =
136            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
137
138        // Register ourselves so producers can find us.
139        {
140            let mut reg = self.registry.lock().await;
141            reg.insert(self.name.clone(), tx);
142        }
143
144        // Process incoming exchanges with cooperative cancellation.
145        loop {
146            tokio::select! {
147                _ = context.cancelled() => {
148                    tracing::debug!("Direct '{}' received cancellation, stopping", self.name);
149                    break;
150                }
151                msg = rx.recv() => {
152                    match msg {
153                        Some((exchange, reply_tx)) => {
154                            let result = context.send_and_wait(exchange).await;
155                            let _ = reply_tx.send(result);
156                        }
157                        None => break,
158                    }
159                }
160            }
161        }
162
163        // Cleanup: remove from registry on exit
164        {
165            let mut reg = self.registry.lock().await;
166            reg.remove(&self.name);
167        }
168
169        Ok(())
170    }
171
172    async fn stop(&mut self) -> Result<(), CamelError> {
173        // Remove from registry so no new producers can send to us.
174        let mut reg = self.registry.lock().await;
175        reg.remove(&self.name);
176        Ok(())
177    }
178}
179
180// ---------------------------------------------------------------------------
181// DirectProducer
182// ---------------------------------------------------------------------------
183
184/// The Direct producer sends an exchange to the named direct endpoint and
185/// waits for a reply (synchronous in-memory call).
186#[derive(Clone)]
187struct DirectProducer {
188    name: String,
189    registry: DirectRegistry,
190}
191
192impl Service<Exchange> for DirectProducer {
193    type Response = Exchange;
194    type Error = CamelError;
195    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
196
197    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
198        Poll::Ready(Ok(()))
199    }
200
201    fn call(&mut self, exchange: Exchange) -> Self::Future {
202        let name = self.name.clone();
203        let registry = Arc::clone(&self.registry);
204
205        Box::pin(async move {
206            let reg = registry.lock().await;
207            let sender = reg.get(&name).ok_or_else(|| {
208                CamelError::EndpointCreationFailed(format!(
209                    "no consumer registered for direct:{name}"
210                ))
211            })?;
212
213            let (reply_tx, reply_rx) = oneshot::channel();
214            sender
215                .send((exchange, reply_tx))
216                .await
217                .map_err(|_| CamelError::ChannelClosed)?;
218
219            // Drop the lock before awaiting the reply to avoid deadlocks.
220            drop(reg);
221
222            // Propagate Ok or Err from the subroute pipeline.
223            reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
224        })
225    }
226}
227
228// ---------------------------------------------------------------------------
229// Tests
230// ---------------------------------------------------------------------------
231
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::ExchangeEnvelope;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;
    use tower::ServiceExt;

    /// Upper bound for any wait in these tests; hitting it fails the test.
    const WAIT_LIMIT: Duration = Duration::from_secs(1);

    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Polls `registry` until a consumer is registered under `name`.
    ///
    /// Replaces fixed sleeps, which made the tests depend on scheduler
    /// timing. Panics if nothing registers within `WAIT_LIMIT`.
    async fn wait_until_registered(registry: &DirectRegistry, name: &str) {
        let registered = async {
            // The lock guard is a temporary of the condition, so it is
            // released before each sleep.
            while !registry.lock().await.contains_key(name) {
                tokio::time::sleep(Duration::from_millis(1)).await;
            }
        };
        tokio::time::timeout(WAIT_LIMIT, registered)
            .await
            .unwrap_or_else(|_| panic!("consumer for direct:{name} did not register in time"));
    }

    #[test]
    fn test_direct_component_scheme() {
        let component = DirectComponent::new();
        assert_eq!(component.scheme(), "direct");
    }

    #[test]
    fn test_direct_creates_endpoint() {
        let component = DirectComponent::new();
        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    #[test]
    fn test_direct_wrong_scheme() {
        let component = DirectComponent::new();
        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[test]
    fn test_direct_endpoint_creates_consumer() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_ok());
    }

    #[test]
    fn test_direct_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[tokio::test]
    async fn test_direct_producer_no_consumer_registered() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:missing", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_direct_producer_consumer_roundtrip() {
        let component = DirectComponent::new();

        // Create consumer endpoint and start it
        let consumer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        // The route channel now carries ExchangeEnvelope (request-reply support).
        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());

        // Start the consumer in a background task
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Spawn a pipeline simulator that reads envelopes and replies Ok.
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                let ExchangeEnvelope { exchange, reply_tx } = envelope;
                if let Some(tx) = reply_tx {
                    let _ = tx.send(Ok(exchange));
                }
            }
        });

        // Wait until the consumer is actually visible to producers.
        wait_until_registered(&component.registry, "test").await;

        // Now send an exchange via the producer
        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("hello direct"));
        let reply = producer
            .oneshot(exchange)
            .await
            .expect("roundtrip through the direct endpoint should succeed");

        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
    }

    #[tokio::test]
    async fn test_direct_propagates_error_when_no_handler() {
        let component = DirectComponent::new();

        let consumer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Pipeline simulator that replies with Err (simulates no error handler).
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                if let Some(tx) = envelope.reply_tx {
                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
                }
            }
        });

        wait_until_registered(&component.registry, "err-test").await;

        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(matches!(result, Err(CamelError::ProcessorError(_))));
    }

    #[tokio::test]
    async fn test_direct_consumer_stop_unregisters() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:cleanup", &NoOpComponentContext)
            .unwrap();

        // We need a consumer to register
        let mut consumer = endpoint.create_consumer().unwrap();

        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());

        // Start consumer in background
        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Returns only once the name is registered (panics otherwise).
        wait_until_registered(&component.registry, "cleanup").await;

        // The started consumer was moved into the task, so build a second
        // consumer with the same name just to call stop (stop removes the
        // entry from the registry by name).
        let mut stop_consumer = DirectConsumer {
            name: "cleanup".to_string(),
            registry: Arc::clone(&component.registry),
        };
        stop_consumer.stop().await.unwrap();

        // Verify removed from registry
        {
            let reg = component.registry.lock().await;
            assert!(!reg.contains_key("cleanup"));
        }

        handle.abort();
    }

    #[tokio::test]
    async fn test_direct_consumer_respects_cancellation() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let token = CancellationToken::new();
        let (tx, _rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = DirectConsumer {
            name: "cancel-test".to_string(),
            registry: registry.clone(),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        wait_until_registered(&registry, "cancel-test").await;

        token.cancel();
        let result = tokio::time::timeout(WAIT_LIMIT, handle).await;
        assert!(
            result.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // After cancellation, the consumer should have cleaned up the registry
        assert!(!registry.lock().await.contains_key("cancel-test"));
    }
}
456}