Skip to main content

camel_component_direct/
lib.rs

1//! In-memory direct component for rust-camel — synchronous point-to-point
2//! channel between routes sharing the same context with no serialization overhead.
3//!
4//! Main types: `DirectComponent`, `DirectEndpoint`, `DirectConsumer`, `DirectProducer`.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24// ---------------------------------------------------------------------------
25// Shared state: maps endpoint names to senders that deliver exchanges to the
26// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
27// so the producer can wait for the consumer's pipeline to finish processing
28// and receive the (possibly transformed) exchange back.
29// ---------------------------------------------------------------------------
30
/// Sending half of a consumer's inbox: every message pairs an exchange with the
/// one-shot channel on which the processing result is sent back.
type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
/// Mutex-protected map from endpoint name to that endpoint's inbox sender,
/// shared via `Arc`.
type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
/// Boxed, sendable future resolving to an owned semaphore permit or an
/// acquire error.
type AcquirePermitFut =
    Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36// ---------------------------------------------------------------------------
37// Validation helpers
38// ---------------------------------------------------------------------------
39
40/// Validate the direct endpoint name (the part after `direct:`).
41fn validate_name(name: &str) -> Result<(), CamelError> {
42    if name.trim().is_empty() {
43        return Err(CamelError::InvalidUri(
44            "direct: endpoint name must not be empty".to_string(),
45        ));
46    }
47    if name.contains(char::is_whitespace) {
48        return Err(CamelError::InvalidUri(
49            "direct: endpoint name must not contain whitespace".to_string(),
50        ));
51    }
52    Ok(())
53}
54
55// ---------------------------------------------------------------------------
56// DirectConfig
57// ---------------------------------------------------------------------------
58
/// Configuration for Direct endpoints parsed from URIs.
///
/// URI format: `direct:name[?timeout_ms=30000]`
///
/// Besides `timeout_ms`, the parser also recognises `block`,
/// `fail_if_no_consumers` / `failIfNoConsumers`,
/// `bridge_error_handler` / `bridgeErrorHandler` and
/// `exchange_pattern` / `exchangePattern`. Every parameter is optional; a
/// parameter that is absent from the URI is stored as `None`.
///
/// Example: `direct:foo` creates an endpoint named "foo"
#[derive(Debug, Clone)]
pub struct DirectConfig {
    /// Endpoint name (path portion).
    pub name: String,
    /// Timeout in milliseconds for producer `call()`. Defaults to 30 000 ms.
    pub timeout_ms: Option<u64>,
    /// When false, the producer returns immediately if no consumer is registered.
    /// TODO(DIR-001): implement non-blocking send
    pub block: Option<bool>,
    /// When false, skip readiness error if no consumer registered.
    ///
    /// Only an explicit `Some(false)` disables the readiness error; `None`
    /// behaves like `Some(true)`.
    pub fail_if_no_consumers: Option<bool>,
    /// TODO(DIR-005): implement bridgeErrorHandler routing
    pub bridge_error_handler: Option<bool>,
    /// Raw `exchange_pattern` / `exchangePattern` value, stored without validation.
    /// TODO(DIR-005): implement exchangePattern override
    pub exchange_pattern: Option<String>,
}
80
81impl DirectConfig {
82    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
83        let parts = parse_uri(uri)?;
84        if parts.scheme != "direct" {
85            return Err(CamelError::InvalidUri(format!(
86                "invalid scheme '{}', expected 'direct'",
87                parts.scheme
88            )));
89        }
90
91        let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
92            match value.to_ascii_lowercase().as_str() {
93                "true" | "1" | "yes" => Ok(true),
94                "false" | "0" | "no" => Ok(false),
95                _ => Err(CamelError::InvalidUri(format!(
96                    "invalid value for {}: invalid boolean value: '{}'",
97                    name, value
98                ))),
99            }
100        };
101
102        let timeout_ms = parts
103            .params
104            .get("timeout_ms")
105            .map(|v| {
106                v.parse::<u64>().map_err(|e| {
107                    CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
108                })
109            })
110            .transpose()?;
111
112        let block = parts
113            .params
114            .get("block")
115            .map(|v| parse_bool("block", v))
116            .transpose()?;
117
118        let fail_if_no_consumers = parts
119            .params
120            .get("fail_if_no_consumers")
121            .or_else(|| parts.params.get("failIfNoConsumers"))
122            .map(|v| parse_bool("fail_if_no_consumers", v))
123            .transpose()?;
124
125        let bridge_error_handler = parts
126            .params
127            .get("bridge_error_handler")
128            .or_else(|| parts.params.get("bridgeErrorHandler"))
129            .map(|v| parse_bool("bridge_error_handler", v))
130            .transpose()?;
131
132        let exchange_pattern = parts
133            .params
134            .get("exchange_pattern")
135            .or_else(|| parts.params.get("exchangePattern"))
136            .cloned();
137
138        Ok(Self {
139            name: parts.path,
140            timeout_ms,
141            block,
142            fail_if_no_consumers,
143            bridge_error_handler,
144            exchange_pattern,
145        })
146    }
147}
148
149// ---------------------------------------------------------------------------
150// DirectComponent
151// ---------------------------------------------------------------------------
152
/// The Direct component provides in-memory synchronous communication between
/// routes.
///
/// URI format: `direct:name`
///
/// A producer sending to `direct:foo` will block until the consumer on
/// `direct:foo` has finished processing the exchange.
///
/// All endpoints created by one component instance share that instance's
/// registry, which is how producers locate the consumer for a given name.
pub struct DirectComponent {
    /// Endpoint-name → consumer-inbox map; a cloned `Arc` of it is handed to
    /// every endpoint this component creates.
    registry: DirectRegistry,
}
163
164impl DirectComponent {
165    pub fn new() -> Self {
166        Self {
167            registry: Arc::new(Mutex::new(HashMap::new())),
168        }
169    }
170}
171
172impl Default for DirectComponent {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl Component for DirectComponent {
179    fn scheme(&self) -> &str {
180        "direct"
181    }
182
183    fn create_endpoint(
184        &self,
185        uri: &str,
186        _ctx: &dyn camel_component_api::ComponentContext,
187    ) -> Result<Box<dyn Endpoint>, CamelError> {
188        let config = DirectConfig::from_uri(uri)?;
189        validate_name(&config.name)?;
190        let name = config.name.clone();
191        debug!(endpoint_name = %name, "direct endpoint created");
192        Ok(Box::new(DirectEndpoint {
193            uri: uri.to_string(),
194            config,
195            registry: Arc::clone(&self.registry),
196        }))
197    }
198}
199
200// ---------------------------------------------------------------------------
201// DirectEndpoint
202// ---------------------------------------------------------------------------
203
/// Endpoint for a single `direct:` URI; creates the consumer and producers
/// that meet through the shared registry.
struct DirectEndpoint {
    /// URI string this endpoint was created from; returned by `uri()`.
    uri: String,
    /// Configuration parsed from the URI.
    config: DirectConfig,
    /// Registry handle cloned from the component that created this endpoint.
    registry: DirectRegistry,
}
209
210impl Endpoint for DirectEndpoint {
211    fn uri(&self) -> &str {
212        &self.uri
213    }
214
215    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
216        Ok(Box::new(DirectConsumer {
217            name: self.config.name.clone(),
218            registry: Arc::clone(&self.registry),
219            cancel: None,
220            handle: None,
221        }))
222    }
223
224    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
225        Ok(BoxProcessor::new(DirectProducer {
226            name: self.config.name.clone(),
227            registry: Arc::clone(&self.registry),
228            config: self.config.clone(),
229            semaphore: Arc::new(Semaphore::new(1)),
230            pending_permit: None,
231            acquire_fut: None,
232            fail_if_no_consumers: self.config.fail_if_no_consumers,
233        }))
234    }
235}
236
237// ---------------------------------------------------------------------------
238// DirectConsumer
239// ---------------------------------------------------------------------------
240
/// The Direct consumer registers itself in the shared registry and forwards
/// incoming exchanges to the route pipeline via `ConsumerContext`.
struct DirectConsumer {
    /// Endpoint name; the key under which this consumer registers its inbox.
    name: String,
    /// Registry shared with the component and with producers.
    registry: DirectRegistry,
    /// Cancellation token obtained from the `ConsumerContext` in `start()`;
    /// `None` until started, and taken (then cancelled) by `stop()`.
    cancel: Option<CancellationToken>,
    /// Join handle of the background receive loop spawned by `start()`;
    /// `None` until started, and taken by `stop()`.
    handle: Option<JoinHandle<()>>,
}
249
250#[async_trait]
251impl Consumer for DirectConsumer {
252    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
253        // Create a channel for producers to send exchanges to this consumer.
254        let (tx, mut rx) =
255            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
256
257        // Register ourselves so producers can find us.
258        {
259            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
260            if let Some(existing) = reg.get(&self.name)
261                && !existing.is_closed()
262            {
263                return Err(CamelError::EndpointCreationFailed(format!(
264                    "direct endpoint '{}' already has a registered consumer",
265                    self.name
266                )));
267            }
268            reg.insert(self.name.clone(), tx);
269        }
270
271        let name = self.name.clone();
272        let registry = Arc::clone(&self.registry);
273        let cancel = context.cancel_token();
274        let cancel_clone = cancel.clone();
275
276        info!(endpoint_name = %self.name, "direct consumer started");
277
278        // Spawn the consumer loop so start() returns immediately.
279        let handle = tokio::spawn(async move {
280            loop {
281                tokio::select! {
282                    _ = cancel_clone.cancelled() => {
283                        debug!(endpoint_name = %name, "direct consumer received cancellation");
284                        break;
285                    }
286                    msg = rx.recv() => {
287                        match msg {
288                            Some((exchange, reply_tx)) => {
289                                debug!(
290                                    endpoint_name = %name,
291                                    exchange_id = %exchange.correlation_id,
292                                    "direct consumer received exchange"
293                                );
294                                let result = context.send_and_wait(exchange).await;
295                                if let Err(ref err) = result {
296                                    error!(
297                                        endpoint_name = %name,
298                                        error = %err,
299                                        "direct consumer pipeline error"
300                                    );
301                                }
302                                let _ = reply_tx.send(result);
303                            }
304                            None => break,
305                        }
306                    }
307                }
308            }
309
310            // Cleanup: remove from registry on exit
311            {
312                let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
313                reg.remove(&name);
314            }
315
316            debug!(endpoint_name = %name, "direct consumer stopped");
317        });
318
319        self.cancel = Some(cancel);
320        self.handle = Some(handle);
321        Ok(())
322    }
323
324    async fn stop(&mut self) -> Result<(), CamelError> {
325        // Cancel the consumer loop if we have a cancellation token.
326        if let Some(cancel) = self.cancel.take() {
327            cancel.cancel();
328        }
329
330        // Wait for the spawned task to finish (with a 5s timeout).
331        if let Some(mut h) = self.handle.take() {
332            if tokio::time::timeout(Duration::from_secs(5), &mut h)
333                .await
334                .is_err()
335            {
336                h.abort();
337                let _ = h.await;
338                warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
339                // Aborted task cannot clean up its own registry entry — do it here.
340                let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
341                reg.remove(&self.name);
342            }
343        } else {
344            // No join handle — just remove from registry.
345            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
346            reg.remove(&self.name);
347        }
348
349        debug!(endpoint_name = %self.name, "direct consumer stopped");
350        Ok(())
351    }
352}
353
354// ---------------------------------------------------------------------------
355// DirectProducer
356// ---------------------------------------------------------------------------
357
/// The Direct producer sends an exchange to the named direct endpoint and
/// waits for a reply (synchronous in-memory call).
struct DirectProducer {
    /// Endpoint name used to look up the consumer's inbox in the registry.
    name: String,
    /// Registry shared with the component and its consumers.
    registry: DirectRegistry,
    /// Endpoint configuration; `call()` reads `timeout_ms` from it.
    config: DirectConfig,
    /// Semaphore from which `poll_ready()` acquires a permit; clones of this
    /// producer share the same semaphore.
    semaphore: Arc<Semaphore>,
    /// Permit obtained by `poll_ready()` and taken by the next `call()`.
    pending_permit: Option<OwnedSemaphorePermit>,
    /// In-flight permit acquisition, kept between `poll_ready()` calls while
    /// the acquisition is still pending.
    acquire_fut: Option<AcquirePermitFut>,
    /// Copy of the endpoint's `fail_if_no_consumers` setting; only
    /// `Some(false)` suppresses the "not registered" readiness error.
    fail_if_no_consumers: Option<bool>,
}
369
370impl Clone for DirectProducer {
371    fn clone(&self) -> Self {
372        Self {
373            name: self.name.clone(),
374            registry: self.registry.clone(),
375            config: self.config.clone(),
376            semaphore: self.semaphore.clone(),
377            pending_permit: None,
378            acquire_fut: None,
379            fail_if_no_consumers: self.fail_if_no_consumers,
380        }
381    }
382}
383
384impl Service<Exchange> for DirectProducer {
385    type Response = Exchange;
386    type Error = CamelError;
387    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
388
389    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
390        // If we already hold a permit we are ready.
391        if self.pending_permit.is_some() {
392            return Poll::Ready(Ok(()));
393        }
394
395        // Check that the endpoint is registered.
396        {
397            let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
398            match reg.get(&self.name) {
399                None => {
400                    if self.fail_if_no_consumers != Some(false) {
401                        return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
402                            "direct endpoint '{}' not registered",
403                            self.name
404                        ))));
405                    }
406                }
407                Some(sender) if sender.is_closed() => {
408                    return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
409                        "direct endpoint '{}' channel closed",
410                        self.name
411                    ))));
412                }
413                Some(_) => {}
414            }
415        }
416
417        // Acquire a semaphore permit (bounded concurrency).
418        let fut = self
419            .acquire_fut
420            .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
421        match fut.as_mut().poll(cx) {
422            Poll::Ready(Ok(permit)) => {
423                self.acquire_fut = None;
424                self.pending_permit = Some(permit);
425                Poll::Ready(Ok(()))
426            }
427            Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
428            Poll::Pending => Poll::Pending,
429        }
430    }
431
432    fn call(&mut self, exchange: Exchange) -> Self::Future {
433        let _permit = match self.pending_permit.take() {
434            Some(p) => p,
435            None => {
436                return Box::pin(async {
437                    Err(CamelError::ProcessorError(
438                        "call() invoked without poll_ready()".into(),
439                    ))
440                });
441            }
442        };
443
444        let name = self.name.clone();
445        let registry = Arc::clone(&self.registry);
446        let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
447        let exchange_id = exchange.correlation_id.clone();
448
449        debug!(
450            endpoint_name = %name,
451            exchange_id = %exchange_id,
452            "direct producer call entry"
453        );
454
455        Box::pin(async move {
456            tokio::time::timeout(timeout, async {
457                let sender = {
458                    let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
459                    reg.get(&name)
460                        .ok_or_else(|| {
461                            let err = CamelError::EndpointCreationFailed(format!(
462                                "no consumer registered for direct:{name}"
463                            ));
464                            error!(endpoint_name = %name, error = %err, "direct send failed");
465                            err
466                        })?
467                        .clone()
468                };
469
470                let (reply_tx, reply_rx) = oneshot::channel();
471                sender.send((exchange, reply_tx)).await.map_err(|err| {
472                    error!(endpoint_name = %name, error = %err, "direct send failed");
473                    CamelError::ChannelClosed
474                })?;
475
476                let result = reply_rx.await.map_err(|err| {
477                    error!(endpoint_name = %name, error = %err, "direct send failed");
478                    CamelError::ChannelClosed
479                })?;
480
481                debug!(endpoint_name = %name, "direct message sent");
482                result
483            })
484            .await
485            .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
486        })
487    }
488}
489
490// ---------------------------------------------------------------------------
491// Tests
492// ---------------------------------------------------------------------------
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use camel_component_api::ExchangeEnvelope;
498    use camel_component_api::Message;
499    use camel_component_api::NoOpComponentContext;
500    use std::task::RawWakerVTable;
501    use tower::ServiceExt;
502
503    fn noop_waker() -> std::task::Waker {
504        const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
505        const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
506        unsafe { std::task::Waker::from_raw(RAW) }
507    }
508
    /// Returns a freshly constructed `ProducerContext` for tests that need to
    /// call `create_producer`.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }
512
513    #[test]
514    fn test_direct_component_scheme() {
515        let component = DirectComponent::new();
516        assert_eq!(component.scheme(), "direct");
517    }
518
519    #[test]
520    fn test_direct_component_default() {
521        let component = DirectComponent::default();
522        assert_eq!(component.scheme(), "direct");
523    }
524
525    #[test]
526    fn test_direct_config_from_uri() {
527        let config = DirectConfig::from_uri("direct:orders").unwrap();
528        assert_eq!(config.name, "orders");
529    }
530
531    #[test]
532    fn test_direct_endpoint_uri() {
533        let component = DirectComponent::new();
534        let endpoint = component
535            .create_endpoint("direct:uri-check", &NoOpComponentContext)
536            .unwrap();
537        assert_eq!(endpoint.uri(), "direct:uri-check");
538    }
539
540    #[test]
541    fn test_direct_creates_endpoint() {
542        let component = DirectComponent::new();
543        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
544        assert!(endpoint.is_ok());
545    }
546
547    #[test]
548    fn test_direct_wrong_scheme() {
549        let component = DirectComponent::new();
550        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
551        assert!(result.is_err());
552    }
553
554    #[test]
555    fn test_direct_endpoint_creates_consumer() {
556        let component = DirectComponent::new();
557        let endpoint = component
558            .create_endpoint("direct:foo", &NoOpComponentContext)
559            .unwrap();
560        assert!(endpoint.create_consumer().is_ok());
561    }
562
563    #[test]
564    fn test_direct_endpoint_creates_producer() {
565        let ctx = test_producer_ctx();
566        let component = DirectComponent::new();
567        let endpoint = component
568            .create_endpoint("direct:foo", &NoOpComponentContext)
569            .unwrap();
570        assert!(endpoint.create_producer(&ctx).is_ok());
571    }
572
573    #[test]
574    fn test_direct_empty_name_rejected() {
575        let component = DirectComponent::new();
576        match component.create_endpoint("direct:", &NoOpComponentContext) {
577            Err(e) => assert!(
578                e.to_string().contains("must not be empty"),
579                "unexpected error: {e}"
580            ),
581            Ok(_) => panic!("expected error for empty name"),
582        }
583    }
584
585    #[tokio::test]
586    async fn test_direct_producer_no_consumer_registered() {
587        let ctx = test_producer_ctx();
588        let component = DirectComponent::new();
589        let endpoint = component
590            .create_endpoint("direct:missing", &NoOpComponentContext)
591            .unwrap();
592        let producer = endpoint.create_producer(&ctx).unwrap();
593
594        let exchange = Exchange::new(Message::new("test"));
595        let result = producer.oneshot(exchange).await;
596        assert!(result.is_err());
597    }
598
599    #[tokio::test]
600    async fn test_direct_duplicate_consumer_returns_error() {
601        let component = DirectComponent::new();
602        let endpoint = component
603            .create_endpoint("direct:dup", &NoOpComponentContext)
604            .unwrap();
605
606        let mut consumer_a = endpoint.create_consumer().unwrap();
607        let mut consumer_b = endpoint.create_consumer().unwrap();
608
609        let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
610        let ctx_a = ConsumerContext::new(route_tx_a, tokio_util::sync::CancellationToken::new());
611        consumer_a.start(ctx_a).await.unwrap();
612
613        let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
614        let ctx_b = ConsumerContext::new(route_tx_b, tokio_util::sync::CancellationToken::new());
615        let result = consumer_b.start(ctx_b).await;
616
617        assert!(matches!(
618            result,
619            Err(CamelError::EndpointCreationFailed(msg))
620                if msg.contains("already has a registered consumer")
621        ));
622
623        consumer_a.stop().await.unwrap();
624    }
625
626    #[tokio::test]
627    async fn test_direct_producer_consumer_roundtrip() {
628        let component = DirectComponent::new();
629
630        // Create consumer endpoint and start it
631        let consumer_endpoint = component
632            .create_endpoint("direct:test", &NoOpComponentContext)
633            .unwrap();
634        let mut consumer = consumer_endpoint.create_consumer().unwrap();
635
636        // The route channel now carries ExchangeEnvelope (request-reply support).
637        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
638        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
639
640        // Start the consumer in a background task
641        tokio::spawn(async move {
642            consumer.start(ctx).await.unwrap();
643        });
644
645        // Give the consumer a moment to register
646        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
647
648        // Spawn a pipeline simulator that reads envelopes and replies Ok.
649        tokio::spawn(async move {
650            while let Some(envelope) = route_rx.recv().await {
651                let ExchangeEnvelope { exchange, reply_tx } = envelope;
652                if let Some(tx) = reply_tx {
653                    let _ = tx.send(Ok(exchange));
654                }
655            }
656        });
657
658        // Now send an exchange via the producer
659        let ctx = test_producer_ctx();
660        let producer_endpoint = component
661            .create_endpoint("direct:test", &NoOpComponentContext)
662            .unwrap();
663        let producer = producer_endpoint.create_producer(&ctx).unwrap();
664
665        let exchange = Exchange::new(Message::new("hello direct"));
666        let result = producer.oneshot(exchange).await;
667
668        assert!(result.is_ok());
669        let reply = result.unwrap();
670        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
671    }
672
673    #[tokio::test]
674    async fn test_direct_propagates_error_when_no_handler() {
675        let component = DirectComponent::new();
676
677        let consumer_endpoint = component
678            .create_endpoint("direct:err-test", &NoOpComponentContext)
679            .unwrap();
680        let mut consumer = consumer_endpoint.create_consumer().unwrap();
681
682        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
683        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
684
685        tokio::spawn(async move {
686            consumer.start(ctx).await.unwrap();
687        });
688
689        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
690
691        // Pipeline simulator that replies with Err (simulates no error handler).
692        tokio::spawn(async move {
693            while let Some(envelope) = route_rx.recv().await {
694                if let Some(tx) = envelope.reply_tx {
695                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
696                }
697            }
698        });
699
700        let ctx = test_producer_ctx();
701        let producer_endpoint = component
702            .create_endpoint("direct:err-test", &NoOpComponentContext)
703            .unwrap();
704        let producer = producer_endpoint.create_producer(&ctx).unwrap();
705
706        let exchange = Exchange::new(Message::new("test"));
707        let result = producer.oneshot(exchange).await;
708        assert!(result.is_err());
709        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
710    }
711
712    #[tokio::test]
713    async fn test_direct_consumer_stop_unregisters() {
714        let component = DirectComponent::new();
715        let endpoint = component
716            .create_endpoint("direct:cleanup", &NoOpComponentContext)
717            .unwrap();
718
719        // We need a consumer to register
720        let mut consumer = endpoint.create_consumer().unwrap();
721
722        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
723        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
724
725        // Start consumer in background
726        let handle = tokio::spawn(async move {
727            consumer.start(ctx).await.unwrap();
728        });
729
730        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
731
732        // Verify the name is registered
733        {
734            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
735            assert!(reg.contains_key("cleanup"));
736        }
737
738        // Create a new consumer just to call stop (stop removes from registry)
739        let mut stop_consumer = DirectConsumer {
740            name: "cleanup".to_string(),
741            registry: Arc::clone(&component.registry),
742            cancel: None,
743            handle: None,
744        };
745        stop_consumer.stop().await.unwrap();
746
747        // Verify removed from registry
748        {
749            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
750            assert!(!reg.contains_key("cleanup"));
751        }
752
753        handle.abort();
754    }
755
756    #[tokio::test]
757    async fn test_direct_consumer_respects_cancellation() {
758        use tokio_util::sync::CancellationToken;
759
760        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
761        let token = CancellationToken::new();
762        let (tx, _rx) = mpsc::channel(16);
763        let ctx = ConsumerContext::new(tx, token.clone());
764
765        let mut consumer = DirectConsumer {
766            name: "cancel-test".to_string(),
767            registry: registry.clone(),
768            cancel: None,
769            handle: None,
770        };
771
772        let handle = tokio::spawn(async move {
773            consumer.start(ctx).await.unwrap();
774        });
775
776        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
777        assert!(
778            registry
779                .lock()
780                .unwrap_or_else(|e| e.into_inner())
781                .contains_key("cancel-test")
782        );
783
784        token.cancel();
785        // start() now spawns the loop internally and returns immediately,
786        // so the outer handle completes right away. Give the inner task time
787        // to react to the cancellation and clean up the registry.
788        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
789
790        // After cancellation, the consumer should have cleaned up the registry
791        assert!(
792            !registry
793                .lock()
794                .unwrap_or_else(|e| e.into_inner())
795                .contains_key("cancel-test")
796        );
797
798        let _ = handle.await;
799    }
800
801    #[tokio::test]
802    async fn test_direct_consumer_stop_missing_entry_is_ok() {
803        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
804        let mut consumer = DirectConsumer {
805            name: "never-registered".to_string(),
806            registry,
807            cancel: None,
808            handle: None,
809        };
810        let result = consumer.stop().await;
811        assert!(result.is_ok());
812    }
813
814    #[test]
815    fn test_poll_ready_endpoint_not_registered() {
816        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
817        let producer = DirectProducer {
818            name: "missing".to_string(),
819            registry,
820            config: DirectConfig {
821                name: "missing".to_string(),
822                timeout_ms: None,
823                block: None,
824                fail_if_no_consumers: None,
825                bridge_error_handler: None,
826                exchange_pattern: None,
827            },
828            semaphore: Arc::new(Semaphore::new(1)),
829            pending_permit: None,
830            acquire_fut: None,
831            fail_if_no_consumers: None,
832        };
833        let waker = noop_waker();
834        let mut cx = Context::from_waker(&waker);
835        let mut producer = producer;
836        let result = producer.poll_ready(&mut cx);
837        assert!(matches!(
838            result,
839            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
840        ));
841    }
842
843    #[test]
844    fn test_poll_ready_endpoint_registered() {
845        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
846        let (tx, _rx) =
847            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
848        registry.lock().unwrap().insert("active".to_string(), tx);
849        let producer = DirectProducer {
850            name: "active".to_string(),
851            registry,
852            config: DirectConfig {
853                name: "active".to_string(),
854                timeout_ms: None,
855                block: None,
856                fail_if_no_consumers: None,
857                bridge_error_handler: None,
858                exchange_pattern: None,
859            },
860            semaphore: Arc::new(Semaphore::new(1)),
861            pending_permit: None,
862            acquire_fut: None,
863            fail_if_no_consumers: None,
864        };
865        let waker = noop_waker();
866        let mut cx = Context::from_waker(&waker);
867        let mut producer = producer;
868        let result = producer.poll_ready(&mut cx);
869        assert!(matches!(result, Poll::Ready(Ok(()))));
870    }
871
872    #[test]
873    fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
874        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
875        let producer = DirectProducer {
876            name: "missing-ok".to_string(),
877            registry,
878            config: DirectConfig {
879                name: "missing-ok".to_string(),
880                timeout_ms: None,
881                block: None,
882                fail_if_no_consumers: Some(false),
883                bridge_error_handler: None,
884                exchange_pattern: None,
885            },
886            semaphore: Arc::new(Semaphore::new(1)),
887            pending_permit: None,
888            acquire_fut: None,
889            fail_if_no_consumers: Some(false),
890        };
891
892        let waker = noop_waker();
893        let mut cx = Context::from_waker(&waker);
894        let mut producer = producer;
895        let result = producer.poll_ready(&mut cx);
896        assert!(matches!(result, Poll::Ready(Ok(()))));
897    }
898
899    #[test]
900    fn test_poll_ready_channel_closed() {
901        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
902        let (tx, rx) =
903            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
904        drop(rx);
905        registry.lock().unwrap().insert("closed".to_string(), tx);
906        let producer = DirectProducer {
907            name: "closed".to_string(),
908            registry,
909            config: DirectConfig {
910                name: "closed".to_string(),
911                timeout_ms: None,
912                block: None,
913                fail_if_no_consumers: None,
914                bridge_error_handler: None,
915                exchange_pattern: None,
916            },
917            semaphore: Arc::new(Semaphore::new(1)),
918            pending_permit: None,
919            acquire_fut: None,
920            fail_if_no_consumers: None,
921        };
922        let waker = noop_waker();
923        let mut cx = Context::from_waker(&waker);
924        let mut producer = producer;
925        let result = producer.poll_ready(&mut cx);
926        assert!(matches!(
927            result,
928            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
929        ));
930    }
931
932    #[tokio::test]
933    async fn test_direct_stop_cancels_loop() {
934        use tokio_util::sync::CancellationToken;
935
936        let component = DirectComponent::new();
937        let endpoint = component
938            .create_endpoint("direct:stop-test", &NoOpComponentContext)
939            .unwrap();
940        let mut consumer = endpoint.create_consumer().unwrap();
941
942        let token = CancellationToken::new();
943        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
944        let ctx = ConsumerContext::new(route_tx, token.clone());
945
946        // start() blocks — it should run the consumer loop on a JoinHandle
947        // and return immediately. But currently start() IS the loop.
948        // We test stop() cancels the loop.
949        let handle = tokio::spawn(async move {
950            consumer.start(ctx).await.unwrap();
951        });
952
953        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
954        assert!(
955            component
956                .registry
957                .lock()
958                .unwrap_or_else(|e| e.into_inner())
959                .contains_key("stop-test")
960        );
961
962        // Create a new consumer just for stop
963        let mut stop_consumer = DirectConsumer {
964            name: "stop-test".to_string(),
965            registry: Arc::clone(&component.registry),
966            cancel: Some(token.clone()),
967            handle: None,
968        };
969        stop_consumer.stop().await.unwrap();
970
971        // The consumer loop should finish within 2s after stop
972        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
973        assert!(result.is_ok(), "Consumer loop did not stop within 2s");
974
975        // Registry should be cleaned up
976        assert!(
977            !component
978                .registry
979                .lock()
980                .unwrap_or_else(|e| e.into_inner())
981                .contains_key("stop-test")
982        );
983    }
984
985    #[tokio::test]
986    async fn test_direct_producer_timeout() {
987        let component = DirectComponent::new();
988        let endpoint = component
989            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
990            .unwrap();
991        let mut consumer = endpoint.create_consumer().unwrap();
992
993        // Consumer that never replies (simulates a stuck pipeline)
994        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
995        let token = tokio_util::sync::CancellationToken::new();
996        let ctx = ConsumerContext::new(route_tx, token.clone());
997        tokio::spawn(async move {
998            consumer.start(ctx).await.unwrap();
999        });
1000
1001        // Drain envelopes but hold onto reply_tx so the producer never gets a reply
1002        tokio::spawn(async move {
1003            let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
1004            while let Some(envelope) = route_rx.recv().await {
1005                held_reply.push(envelope.reply_tx.unwrap());
1006            }
1007            drop(held_reply);
1008        });
1009
1010        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1011
1012        // Create producer with a short timeout
1013        let _ = test_producer_ctx();
1014        let _producer_endpoint = component
1015            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1016            .unwrap();
1017        let producer = DirectProducer {
1018            name: "timeout-test".to_string(),
1019            registry: Arc::clone(&component.registry),
1020            config: DirectConfig {
1021                name: "timeout-test".to_string(),
1022                timeout_ms: Some(100), // 100ms timeout
1023                block: None,
1024                fail_if_no_consumers: None,
1025                bridge_error_handler: None,
1026                exchange_pattern: None,
1027            },
1028            semaphore: Arc::new(Semaphore::new(1)),
1029            pending_permit: None,
1030            acquire_fut: None,
1031            fail_if_no_consumers: None,
1032        };
1033
1034        let exchange = Exchange::new(Message::new("test"));
1035        let mut svc = producer;
1036        let _ = svc.poll_ready(&mut Context::from_waker(&noop_waker()));
1037        let result = svc.call(exchange).await;
1038        assert!(result.is_err(), "Expected timeout error");
1039        assert!(
1040            result.unwrap_err().to_string().contains("timed out"),
1041            "Expected timeout message"
1042        );
1043
1044        token.cancel();
1045    }
1046
1047    #[test]
1048    fn test_empty_endpoint_name_rejected() {
1049        let result = DirectConfig::from_uri("direct:");
1050        // from_uri may parse empty name — validate catches it
1051        if let Ok(config) = result {
1052            assert!(
1053                validate_name(&config.name).is_err(),
1054                "expected validation error for empty name"
1055            );
1056        }
1057        // Also verify via Component (the main entry point)
1058        let component = DirectComponent::new();
1059        let result = component.create_endpoint("direct:", &NoOpComponentContext);
1060        assert!(result.is_err(), "empty endpoint name must be rejected");
1061    }
1062
1063    #[test]
1064    fn test_whitespace_endpoint_name_rejected() {
1065        let result = DirectConfig::from_uri("direct:my endpoint");
1066        if let Ok(config) = result {
1067            assert!(
1068                validate_name(&config.name).is_err(),
1069                "expected validation error for whitespace in name"
1070            );
1071        }
1072        let component = DirectComponent::new();
1073        let result = component.create_endpoint("direct:my endpoint", &NoOpComponentContext);
1074        assert!(result.is_err(), "whitespace endpoint name must be rejected");
1075    }
1076
1077    #[test]
1078    fn test_valid_endpoint_name_accepted() {
1079        let component = DirectComponent::new();
1080        let result = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);
1081        assert!(result.is_ok(), "valid endpoint name should be accepted");
1082    }
1083}