Skip to main content

camel_component_direct/
lib.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use tokio::sync::{Mutex, mpsc, oneshot};
9use tower::Service;
10
11use camel_component_api::UriConfig;
12use camel_component_api::{BoxProcessor, CamelError, Exchange};
13use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
14
15// ---------------------------------------------------------------------------
16// Shared state: maps endpoint names to senders that deliver exchanges to the
17// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
18// so the producer can wait for the consumer's pipeline to finish processing
19// and receive the (possibly transformed) exchange back.
20// ---------------------------------------------------------------------------
21
/// Sending half of the channel a consumer registers for its endpoint name.
/// Every message pairs the exchange to process with a oneshot sender on which
/// the processing outcome (`Ok(exchange)` or `Err`) is handed back.
type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
/// Shared, async-mutex-guarded map from endpoint name to the [`DirectSender`]
/// currently registered under that name.
type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
24
25// ---------------------------------------------------------------------------
26// DirectConfig
27// ---------------------------------------------------------------------------
28
/// Configuration for Direct endpoints parsed from URIs.
///
/// URI format: `direct:name`
///
/// Example: `direct:foo` creates an endpoint named "foo".
///
/// `DirectComponent::create_endpoint` obtains this value through
/// `DirectConfig::from_uri` (presumably generated by the `UriConfig` derive —
/// confirm against `camel_component_api`).
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "direct"]
#[uri_config(crate = "camel_component_api")]
pub struct DirectConfig {
    /// Endpoint name (path portion). It becomes the key under which the
    /// endpoint's consumer is registered and looked up by producers.
    pub name: String,
}
41
42// ---------------------------------------------------------------------------
43// DirectComponent
44// ---------------------------------------------------------------------------
45
/// The Direct component provides in-memory synchronous communication between
/// routes.
///
/// URI format: `direct:name`
///
/// A producer sending to `direct:foo` awaits until the consumer on
/// `direct:foo` has finished processing the exchange and returns that
/// consumer's result. Sending to a name with no registered consumer yields an
/// error instead of waiting.
pub struct DirectComponent {
    /// Name-to-sender registry shared (via `Arc`) with every endpoint this
    /// component creates.
    registry: DirectRegistry,
}
56
57impl DirectComponent {
58    pub fn new() -> Self {
59        Self {
60            registry: Arc::new(Mutex::new(HashMap::new())),
61        }
62    }
63}
64
65impl Default for DirectComponent {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71impl Component for DirectComponent {
72    fn scheme(&self) -> &str {
73        "direct"
74    }
75
76    fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
77        let config = DirectConfig::from_uri(uri)?;
78        Ok(Box::new(DirectEndpoint {
79            uri: uri.to_string(),
80            name: config.name,
81            registry: Arc::clone(&self.registry),
82        }))
83    }
84}
85
86// ---------------------------------------------------------------------------
87// DirectEndpoint
88// ---------------------------------------------------------------------------
89
/// Endpoint for a single `direct:name` URI. Consumers and producers created
/// from it share the registry of the component that built the endpoint.
struct DirectEndpoint {
    /// The URI string this endpoint was created from, returned by `uri()`.
    uri: String,
    /// Endpoint name taken from the parsed `DirectConfig`; used as registry key.
    name: String,
    /// Handle to the component-wide name-to-sender registry.
    registry: DirectRegistry,
}
95
96impl Endpoint for DirectEndpoint {
97    fn uri(&self) -> &str {
98        &self.uri
99    }
100
101    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
102        Ok(Box::new(DirectConsumer {
103            name: self.name.clone(),
104            registry: Arc::clone(&self.registry),
105        }))
106    }
107
108    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
109        Ok(BoxProcessor::new(DirectProducer {
110            name: self.name.clone(),
111            registry: Arc::clone(&self.registry),
112        }))
113    }
114}
115
116// ---------------------------------------------------------------------------
117// DirectConsumer
118// ---------------------------------------------------------------------------
119
/// The Direct consumer registers itself in the shared registry and forwards
/// incoming exchanges to the route pipeline via `ConsumerContext`.
///
/// Registration happens in `start`, which inserts a fresh channel sender
/// under `name`; producers find the consumer through that entry.
struct DirectConsumer {
    /// Registry key this consumer registers under.
    name: String,
    /// Registry shared with the producers of the same component.
    registry: DirectRegistry,
}
126
127#[async_trait]
128impl Consumer for DirectConsumer {
129    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
130        // Create a channel for producers to send exchanges to this consumer.
131        let (tx, mut rx) =
132            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
133
134        // Register ourselves so producers can find us.
135        {
136            let mut reg = self.registry.lock().await;
137            reg.insert(self.name.clone(), tx);
138        }
139
140        // Process incoming exchanges with cooperative cancellation.
141        loop {
142            tokio::select! {
143                _ = context.cancelled() => {
144                    tracing::debug!("Direct '{}' received cancellation, stopping", self.name);
145                    break;
146                }
147                msg = rx.recv() => {
148                    match msg {
149                        Some((exchange, reply_tx)) => {
150                            let result = context.send_and_wait(exchange).await;
151                            let _ = reply_tx.send(result);
152                        }
153                        None => break,
154                    }
155                }
156            }
157        }
158
159        // Cleanup: remove from registry on exit
160        {
161            let mut reg = self.registry.lock().await;
162            reg.remove(&self.name);
163        }
164
165        Ok(())
166    }
167
168    async fn stop(&mut self) -> Result<(), CamelError> {
169        // Remove from registry so no new producers can send to us.
170        let mut reg = self.registry.lock().await;
171        reg.remove(&self.name);
172        Ok(())
173    }
174}
175
176// ---------------------------------------------------------------------------
177// DirectProducer
178// ---------------------------------------------------------------------------
179
/// The Direct producer sends an exchange to the named direct endpoint and
/// waits for a reply (synchronous in-memory call).
///
/// The consumer is looked up by `name` in the registry on every call, so a
/// producer may be created before its consumer has started.
#[derive(Clone)]
struct DirectProducer {
    /// Name of the target endpoint; the registry key used for lookup.
    name: String,
    /// Registry shared with the consumers of the same component.
    registry: DirectRegistry,
}
187
188impl Service<Exchange> for DirectProducer {
189    type Response = Exchange;
190    type Error = CamelError;
191    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
192
193    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194        Poll::Ready(Ok(()))
195    }
196
197    fn call(&mut self, exchange: Exchange) -> Self::Future {
198        let name = self.name.clone();
199        let registry = Arc::clone(&self.registry);
200
201        Box::pin(async move {
202            let reg = registry.lock().await;
203            let sender = reg.get(&name).ok_or_else(|| {
204                CamelError::EndpointCreationFailed(format!(
205                    "no consumer registered for direct:{name}"
206                ))
207            })?;
208
209            let (reply_tx, reply_rx) = oneshot::channel();
210            sender
211                .send((exchange, reply_tx))
212                .await
213                .map_err(|_| CamelError::ChannelClosed)?;
214
215            // Drop the lock before awaiting the reply to avoid deadlocks.
216            drop(reg);
217
218            // Propagate Ok or Err from the subroute pipeline.
219            reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
220        })
221    }
222}
223
224// ---------------------------------------------------------------------------
225// Tests
226// ---------------------------------------------------------------------------
227
228#[cfg(test)]
229mod tests {
230    use super::*;
231    use camel_component_api::ExchangeEnvelope;
232    use camel_component_api::Message;
233    use tower::ServiceExt;
234
    /// Builds the `ProducerContext` handed to `create_producer` in these
    /// tests; the Direct endpoint ignores its contents.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
238
239    #[test]
240    fn test_direct_component_scheme() {
241        let component = DirectComponent::new();
242        assert_eq!(component.scheme(), "direct");
243    }
244
245    #[test]
246    fn test_direct_creates_endpoint() {
247        let component = DirectComponent::new();
248        let endpoint = component.create_endpoint("direct:foo");
249        assert!(endpoint.is_ok());
250    }
251
252    #[test]
253    fn test_direct_wrong_scheme() {
254        let component = DirectComponent::new();
255        let result = component.create_endpoint("timer:tick");
256        assert!(result.is_err());
257    }
258
259    #[test]
260    fn test_direct_endpoint_creates_consumer() {
261        let component = DirectComponent::new();
262        let endpoint = component.create_endpoint("direct:foo").unwrap();
263        assert!(endpoint.create_consumer().is_ok());
264    }
265
266    #[test]
267    fn test_direct_endpoint_creates_producer() {
268        let ctx = test_producer_ctx();
269        let component = DirectComponent::new();
270        let endpoint = component.create_endpoint("direct:foo").unwrap();
271        assert!(endpoint.create_producer(&ctx).is_ok());
272    }
273
274    #[tokio::test]
275    async fn test_direct_producer_no_consumer_registered() {
276        let ctx = test_producer_ctx();
277        let component = DirectComponent::new();
278        let endpoint = component.create_endpoint("direct:missing").unwrap();
279        let producer = endpoint.create_producer(&ctx).unwrap();
280
281        let exchange = Exchange::new(Message::new("test"));
282        let result = producer.oneshot(exchange).await;
283        assert!(result.is_err());
284    }
285
286    #[tokio::test]
287    async fn test_direct_producer_consumer_roundtrip() {
288        let component = DirectComponent::new();
289
290        // Create consumer endpoint and start it
291        let consumer_endpoint = component.create_endpoint("direct:test").unwrap();
292        let mut consumer = consumer_endpoint.create_consumer().unwrap();
293
294        // The route channel now carries ExchangeEnvelope (request-reply support).
295        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
296        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
297
298        // Start the consumer in a background task
299        tokio::spawn(async move {
300            consumer.start(ctx).await.unwrap();
301        });
302
303        // Give the consumer a moment to register
304        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
305
306        // Spawn a pipeline simulator that reads envelopes and replies Ok.
307        tokio::spawn(async move {
308            while let Some(envelope) = route_rx.recv().await {
309                let ExchangeEnvelope { exchange, reply_tx } = envelope;
310                if let Some(tx) = reply_tx {
311                    let _ = tx.send(Ok(exchange));
312                }
313            }
314        });
315
316        // Now send an exchange via the producer
317        let ctx = test_producer_ctx();
318        let producer_endpoint = component.create_endpoint("direct:test").unwrap();
319        let producer = producer_endpoint.create_producer(&ctx).unwrap();
320
321        let exchange = Exchange::new(Message::new("hello direct"));
322        let result = producer.oneshot(exchange).await;
323
324        assert!(result.is_ok());
325        let reply = result.unwrap();
326        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
327    }
328
329    #[tokio::test]
330    async fn test_direct_propagates_error_when_no_handler() {
331        let component = DirectComponent::new();
332
333        let consumer_endpoint = component.create_endpoint("direct:err-test").unwrap();
334        let mut consumer = consumer_endpoint.create_consumer().unwrap();
335
336        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
337        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
338
339        tokio::spawn(async move {
340            consumer.start(ctx).await.unwrap();
341        });
342
343        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
344
345        // Pipeline simulator that replies with Err (simulates no error handler).
346        tokio::spawn(async move {
347            while let Some(envelope) = route_rx.recv().await {
348                if let Some(tx) = envelope.reply_tx {
349                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
350                }
351            }
352        });
353
354        let ctx = test_producer_ctx();
355        let producer_endpoint = component.create_endpoint("direct:err-test").unwrap();
356        let producer = producer_endpoint.create_producer(&ctx).unwrap();
357
358        let exchange = Exchange::new(Message::new("test"));
359        let result = producer.oneshot(exchange).await;
360        assert!(result.is_err());
361        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
362    }
363
364    #[tokio::test]
365    async fn test_direct_consumer_stop_unregisters() {
366        let component = DirectComponent::new();
367        let endpoint = component.create_endpoint("direct:cleanup").unwrap();
368
369        // We need a consumer to register
370        let mut consumer = endpoint.create_consumer().unwrap();
371
372        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
373        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
374
375        // Start consumer in background
376        let handle = tokio::spawn(async move {
377            consumer.start(ctx).await.unwrap();
378        });
379
380        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
381
382        // Verify the name is registered
383        {
384            let reg = component.registry.lock().await;
385            assert!(reg.contains_key("cleanup"));
386        }
387
388        // Create a new consumer just to call stop (stop removes from registry)
389        let mut stop_consumer = DirectConsumer {
390            name: "cleanup".to_string(),
391            registry: Arc::clone(&component.registry),
392        };
393        stop_consumer.stop().await.unwrap();
394
395        // Verify removed from registry
396        {
397            let reg = component.registry.lock().await;
398            assert!(!reg.contains_key("cleanup"));
399        }
400
401        handle.abort();
402    }
403
404    #[tokio::test]
405    async fn test_direct_consumer_respects_cancellation() {
406        use tokio_util::sync::CancellationToken;
407
408        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
409        let token = CancellationToken::new();
410        let (tx, _rx) = mpsc::channel(16);
411        let ctx = ConsumerContext::new(tx, token.clone());
412
413        let mut consumer = DirectConsumer {
414            name: "cancel-test".to_string(),
415            registry: registry.clone(),
416        };
417
418        let handle = tokio::spawn(async move {
419            consumer.start(ctx).await.unwrap();
420        });
421
422        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
423        assert!(registry.lock().await.contains_key("cancel-test"));
424
425        token.cancel();
426        let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
427        assert!(
428            result.is_ok(),
429            "Consumer should have stopped after cancellation"
430        );
431
432        // After cancellation, the consumer should have cleaned up the registry
433        assert!(!registry.lock().await.contains_key("cancel-test"));
434    }
435}