Skip to main content

camel_component_direct/
lib.rs

1//! In-memory direct component for rust-camel — synchronous point-to-point
2//! channel between routes sharing the same context with no serialization overhead.
3//!
4//! Main types: `DirectComponent`, `DirectEndpoint`, `DirectConsumer`, `DirectProducer`.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24// ---------------------------------------------------------------------------
25// Shared state: maps endpoint names to senders that deliver exchanges to the
26// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
27// so the producer can wait for the consumer's pipeline to finish processing
28// and receive the (possibly transformed) exchange back.
29// ---------------------------------------------------------------------------
30
31type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
32type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
33type AcquirePermitFut =
34    Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36// ---------------------------------------------------------------------------
37// Validation helpers
38// ---------------------------------------------------------------------------
39
40/// Validate the direct endpoint name (the part after `direct:`).
41fn validate_name(name: &str) -> Result<(), CamelError> {
42    if name.trim().is_empty() {
43        return Err(CamelError::InvalidUri(
44            "direct: endpoint name must not be empty".to_string(),
45        ));
46    }
47    if name.contains(char::is_whitespace) {
48        return Err(CamelError::InvalidUri(
49            "direct: endpoint name must not contain whitespace".to_string(),
50        ));
51    }
52    Ok(())
53}
54
55// ---------------------------------------------------------------------------
56// DirectConfig
57// ---------------------------------------------------------------------------
58
59/// Configuration for Direct endpoints parsed from URIs.
60///
61/// URI format: `direct:name[?timeout_ms=30000]`
62///
63/// Example: `direct:foo` creates an endpoint named "foo"
64#[derive(Debug, Clone)]
65pub struct DirectConfig {
66    /// Endpoint name (path portion).
67    pub name: String,
68    /// Timeout in milliseconds for producer `call()`. Defaults to 30 000 ms.
69    pub timeout_ms: Option<u64>,
70    /// When false, the producer returns immediately if no consumer is registered.
71    /// TODO(DIR-001): implement non-blocking send
72    pub block: Option<bool>,
73    /// When false, skip readiness error if no consumer registered.
74    pub fail_if_no_consumers: Option<bool>,
75    /// TODO(DIR-005): implement bridgeErrorHandler routing
76    pub bridge_error_handler: Option<bool>,
77    /// TODO(DIR-005): implement exchangePattern override
78    pub exchange_pattern: Option<String>,
79}
80
81impl DirectConfig {
82    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
83        let parts = parse_uri(uri)?;
84        if parts.scheme != "direct" {
85            return Err(CamelError::InvalidUri(format!(
86                "invalid scheme '{}', expected 'direct'",
87                parts.scheme
88            )));
89        }
90
91        let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
92            match value.to_ascii_lowercase().as_str() {
93                "true" | "1" | "yes" => Ok(true),
94                "false" | "0" | "no" => Ok(false),
95                _ => Err(CamelError::InvalidUri(format!(
96                    "invalid value for {}: invalid boolean value: '{}'",
97                    name, value
98                ))),
99            }
100        };
101
102        let timeout_ms = parts
103            .params
104            .get("timeout_ms")
105            .map(|v| {
106                v.parse::<u64>().map_err(|e| {
107                    CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
108                })
109            })
110            .transpose()?;
111
112        let block = parts
113            .params
114            .get("block")
115            .map(|v| parse_bool("block", v))
116            .transpose()?;
117
118        let fail_if_no_consumers = parts
119            .params
120            .get("fail_if_no_consumers")
121            .or_else(|| parts.params.get("failIfNoConsumers"))
122            .map(|v| parse_bool("fail_if_no_consumers", v))
123            .transpose()?;
124
125        let bridge_error_handler = parts
126            .params
127            .get("bridge_error_handler")
128            .or_else(|| parts.params.get("bridgeErrorHandler"))
129            .map(|v| parse_bool("bridge_error_handler", v))
130            .transpose()?;
131
132        let exchange_pattern = parts
133            .params
134            .get("exchange_pattern")
135            .or_else(|| parts.params.get("exchangePattern"))
136            .cloned();
137
138        Ok(Self {
139            name: parts.path,
140            timeout_ms,
141            block,
142            fail_if_no_consumers,
143            bridge_error_handler,
144            exchange_pattern,
145        })
146    }
147}
148
149// ---------------------------------------------------------------------------
150// DirectComponent
151// ---------------------------------------------------------------------------
152
153/// The Direct component provides in-memory synchronous communication between
154/// routes.
155///
156/// URI format: `direct:name`
157///
158/// A producer sending to `direct:foo` will block until the consumer on
159/// `direct:foo` has finished processing the exchange.
160pub struct DirectComponent {
161    registry: DirectRegistry,
162}
163
164impl DirectComponent {
165    pub fn new() -> Self {
166        Self {
167            registry: Arc::new(Mutex::new(HashMap::new())),
168        }
169    }
170}
171
172impl Default for DirectComponent {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl Component for DirectComponent {
179    fn scheme(&self) -> &str {
180        "direct"
181    }
182
183    fn create_endpoint(
184        &self,
185        uri: &str,
186        _ctx: &dyn camel_component_api::ComponentContext,
187    ) -> Result<Box<dyn Endpoint>, CamelError> {
188        let config = DirectConfig::from_uri(uri)?;
189        validate_name(&config.name)?;
190        let name = config.name.clone();
191        debug!(endpoint_name = %name, "direct endpoint created");
192        Ok(Box::new(DirectEndpoint {
193            uri: uri.to_string(),
194            config,
195            registry: Arc::clone(&self.registry),
196        }))
197    }
198}
199
200// ---------------------------------------------------------------------------
201// DirectEndpoint
202// ---------------------------------------------------------------------------
203
204struct DirectEndpoint {
205    uri: String,
206    config: DirectConfig,
207    registry: DirectRegistry,
208}
209
210impl Endpoint for DirectEndpoint {
211    fn uri(&self) -> &str {
212        &self.uri
213    }
214
215    fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
216        Ok(Box::new(DirectConsumer {
217            name: self.config.name.clone(),
218            registry: Arc::clone(&self.registry),
219            cancel: None,
220            handle: None,
221        }))
222    }
223
224    fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
225        Ok(BoxProcessor::new(DirectProducer {
226            name: self.config.name.clone(),
227            registry: Arc::clone(&self.registry),
228            config: self.config.clone(),
229            semaphore: Arc::new(Semaphore::new(1)),
230            pending_permit: None,
231            acquire_fut: None,
232            fail_if_no_consumers: self.config.fail_if_no_consumers,
233        }))
234    }
235}
236
237// ---------------------------------------------------------------------------
238// DirectConsumer
239// ---------------------------------------------------------------------------
240
241/// The Direct consumer registers itself in the shared registry and forwards
242/// incoming exchanges to the route pipeline via `ConsumerContext`.
243struct DirectConsumer {
244    name: String,
245    registry: DirectRegistry,
246    cancel: Option<CancellationToken>,
247    handle: Option<JoinHandle<Result<(), CamelError>>>,
248}
249
250#[async_trait]
251impl Consumer for DirectConsumer {
252    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
253        // Create a channel for producers to send exchanges to this consumer.
254        let (tx, mut rx) =
255            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
256
257        // Register ourselves so producers can find us.
258        {
259            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
260            if let Some(existing) = reg.get(&self.name)
261                && !existing.is_closed()
262            {
263                return Err(CamelError::EndpointCreationFailed(format!(
264                    "direct endpoint '{}' already has a registered consumer",
265                    self.name
266                )));
267            }
268            reg.insert(self.name.clone(), tx);
269        }
270
271        let name = self.name.clone();
272        let registry = Arc::clone(&self.registry);
273        let cancel = context.cancel_token();
274        let cancel_clone = cancel.clone();
275
276        info!(endpoint_name = %self.name, "direct consumer started");
277
278        // Spawn the consumer loop so start() returns immediately.
279        let handle = tokio::spawn(async move {
280            loop {
281                tokio::select! {
282                    _ = cancel_clone.cancelled() => {
283                        debug!(endpoint_name = %name, "direct consumer received cancellation");
284                        break;
285                    }
286                    msg = rx.recv() => {
287                        match msg {
288                            Some((exchange, reply_tx)) => {
289                                debug!(
290                                    endpoint_name = %name,
291                                    exchange_id = %exchange.correlation_id,
292                                    "direct consumer received exchange"
293                                );
294                                let result = context.send_and_wait(exchange).await;
295                                if let Err(ref err) = result {
296                                    error!(
297                                        endpoint_name = %name,
298                                        error = %err,
299                                        "direct consumer pipeline error"
300                                    );
301                                }
302                                let _ = reply_tx.send(result);
303                            }
304                            None => break,
305                        }
306                    }
307                }
308            }
309
310            // Cleanup: remove from registry on exit
311            {
312                let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
313                reg.remove(&name);
314            }
315
316            debug!(endpoint_name = %name, "direct consumer stopped");
317            Ok(())
318        });
319
320        self.cancel = Some(cancel);
321        self.handle = Some(handle);
322        Ok(())
323    }
324
325    async fn stop(&mut self) -> Result<(), CamelError> {
326        // Cancel the consumer loop if we have a cancellation token.
327        if let Some(cancel) = self.cancel.take() {
328            cancel.cancel();
329        }
330
331        // Wait for the spawned task to finish (with a 5s timeout).
332        if let Some(mut h) = self.handle.take() {
333            if tokio::time::timeout(Duration::from_secs(5), &mut h)
334                .await
335                .is_err()
336            {
337                h.abort();
338                let _ = h.await;
339                warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
340                // Aborted task cannot clean up its own registry entry — do it here.
341                let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
342                reg.remove(&self.name);
343            }
344        } else {
345            // No join handle — just remove from registry.
346            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
347            reg.remove(&self.name);
348        }
349
350        debug!(endpoint_name = %self.name, "direct consumer stopped");
351        Ok(())
352    }
353
354    fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
355        // Take the handle so spawn_consumer_task can monitor it for unexpected
356        // panics. The consumer loop exits cleanly via the CancellationToken, so
357        // stop() drives shutdown through cancel.take(); the task removes itself
358        // from the registry on exit.
359        self.handle.take()
360    }
361}
362
363// ---------------------------------------------------------------------------
364// DirectProducer
365// ---------------------------------------------------------------------------
366
367/// The Direct producer sends an exchange to the named direct endpoint and
368/// waits for a reply (synchronous in-memory call).
369struct DirectProducer {
370    name: String,
371    registry: DirectRegistry,
372    config: DirectConfig,
373    semaphore: Arc<Semaphore>,
374    pending_permit: Option<OwnedSemaphorePermit>,
375    acquire_fut: Option<AcquirePermitFut>,
376    fail_if_no_consumers: Option<bool>,
377}
378
379impl Clone for DirectProducer {
380    fn clone(&self) -> Self {
381        Self {
382            name: self.name.clone(),
383            registry: self.registry.clone(),
384            config: self.config.clone(),
385            semaphore: self.semaphore.clone(),
386            pending_permit: None,
387            acquire_fut: None,
388            fail_if_no_consumers: self.fail_if_no_consumers,
389        }
390    }
391}
392
393impl Service<Exchange> for DirectProducer {
394    type Response = Exchange;
395    type Error = CamelError;
396    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
397
398    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
399        // If we already hold a permit we are ready.
400        if self.pending_permit.is_some() {
401            return Poll::Ready(Ok(()));
402        }
403
404        // Check that the endpoint is registered.
405        {
406            let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
407            match reg.get(&self.name) {
408                None => {
409                    if self.fail_if_no_consumers != Some(false) {
410                        return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
411                            "direct endpoint '{}' not registered",
412                            self.name
413                        ))));
414                    }
415                }
416                Some(sender) if sender.is_closed() => {
417                    return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
418                        "direct endpoint '{}' channel closed",
419                        self.name
420                    ))));
421                }
422                Some(_) => {}
423            }
424        }
425
426        // Acquire a semaphore permit (bounded concurrency).
427        let fut = self
428            .acquire_fut
429            .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
430        match fut.as_mut().poll(cx) {
431            Poll::Ready(Ok(permit)) => {
432                self.acquire_fut = None;
433                self.pending_permit = Some(permit);
434                Poll::Ready(Ok(()))
435            }
436            Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
437            Poll::Pending => Poll::Pending,
438        }
439    }
440
441    fn call(&mut self, exchange: Exchange) -> Self::Future {
442        let _permit = match self.pending_permit.take() {
443            Some(p) => p,
444            None => {
445                return Box::pin(async {
446                    Err(CamelError::ProcessorError(
447                        "call() invoked without poll_ready()".into(),
448                    ))
449                });
450            }
451        };
452
453        let name = self.name.clone();
454        let registry = Arc::clone(&self.registry);
455        let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
456        let exchange_id = exchange.correlation_id.clone();
457
458        debug!(
459            endpoint_name = %name,
460            exchange_id = %exchange_id,
461            "direct producer call entry"
462        );
463
464        Box::pin(async move {
465            tokio::time::timeout(timeout, async {
466                let sender = {
467                    let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
468                    reg.get(&name)
469                        .ok_or_else(|| {
470                            let err = CamelError::EndpointCreationFailed(format!(
471                                "no consumer registered for direct:{name}"
472                            ));
473                            error!(endpoint_name = %name, error = %err, "direct send failed");
474                            err
475                        })?
476                        .clone()
477                };
478
479                let (reply_tx, reply_rx) = oneshot::channel();
480                sender.send((exchange, reply_tx)).await.map_err(|err| {
481                    error!(endpoint_name = %name, error = %err, "direct send failed");
482                    CamelError::ChannelClosed
483                })?;
484
485                let result = reply_rx.await.map_err(|err| {
486                    error!(endpoint_name = %name, error = %err, "direct send failed");
487                    CamelError::ChannelClosed
488                })?;
489
490                debug!(endpoint_name = %name, "direct message sent");
491                result
492            })
493            .await
494            .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
495        })
496    }
497}
498
499// ---------------------------------------------------------------------------
500// Tests
501// ---------------------------------------------------------------------------
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use camel_component_api::ExchangeEnvelope;
507    use camel_component_api::Message;
508    use camel_component_api::NoOpComponentContext;
509    use std::task::RawWakerVTable;
510    use tower::ServiceExt;
511
512    fn noop_waker() -> std::task::Waker {
513        const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
514        const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
515        unsafe { std::task::Waker::from_raw(RAW) }
516    }
517
518    fn test_producer_ctx() -> ProducerContext {
519        ProducerContext::new()
520    }
521
522    #[test]
523    fn test_direct_component_scheme() {
524        let component = DirectComponent::new();
525        assert_eq!(component.scheme(), "direct");
526    }
527
528    #[test]
529    fn test_direct_component_default() {
530        let component = DirectComponent::default();
531        assert_eq!(component.scheme(), "direct");
532    }
533
534    #[test]
535    fn test_direct_config_from_uri() {
536        let config = DirectConfig::from_uri("direct:orders").unwrap();
537        assert_eq!(config.name, "orders");
538    }
539
540    #[test]
541    fn test_direct_endpoint_uri() {
542        let component = DirectComponent::new();
543        let endpoint = component
544            .create_endpoint("direct:uri-check", &NoOpComponentContext)
545            .unwrap();
546        assert_eq!(endpoint.uri(), "direct:uri-check");
547    }
548
549    #[test]
550    fn test_direct_creates_endpoint() {
551        let component = DirectComponent::new();
552        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
553        assert!(endpoint.is_ok());
554    }
555
556    #[test]
557    fn test_direct_wrong_scheme() {
558        let component = DirectComponent::new();
559        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
560        assert!(result.is_err());
561    }
562
563    #[test]
564    fn test_direct_endpoint_creates_consumer() {
565        let component = DirectComponent::new();
566        let endpoint = component
567            .create_endpoint("direct:foo", &NoOpComponentContext)
568            .unwrap();
569        assert!(endpoint.create_consumer().is_ok());
570    }
571
572    #[test]
573    fn test_direct_endpoint_creates_producer() {
574        let ctx = test_producer_ctx();
575        let component = DirectComponent::new();
576        let endpoint = component
577            .create_endpoint("direct:foo", &NoOpComponentContext)
578            .unwrap();
579        assert!(endpoint.create_producer(&ctx).is_ok());
580    }
581
582    #[test]
583    fn test_direct_empty_name_rejected() {
584        let component = DirectComponent::new();
585        match component.create_endpoint("direct:", &NoOpComponentContext) {
586            Err(e) => assert!(
587                e.to_string().contains("must not be empty"),
588                "unexpected error: {e}"
589            ),
590            Ok(_) => panic!("expected error for empty name"),
591        }
592    }
593
594    #[tokio::test]
595    async fn test_direct_producer_no_consumer_registered() {
596        let ctx = test_producer_ctx();
597        let component = DirectComponent::new();
598        let endpoint = component
599            .create_endpoint("direct:missing", &NoOpComponentContext)
600            .unwrap();
601        let producer = endpoint.create_producer(&ctx).unwrap();
602
603        let exchange = Exchange::new(Message::new("test"));
604        let result = producer.oneshot(exchange).await;
605        assert!(result.is_err());
606    }
607
608    #[tokio::test]
609    async fn test_direct_duplicate_consumer_returns_error() {
610        let component = DirectComponent::new();
611        let endpoint = component
612            .create_endpoint("direct:dup", &NoOpComponentContext)
613            .unwrap();
614
615        let mut consumer_a = endpoint.create_consumer().unwrap();
616        let mut consumer_b = endpoint.create_consumer().unwrap();
617
618        let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
619        let ctx_a = ConsumerContext::new(route_tx_a, tokio_util::sync::CancellationToken::new());
620        consumer_a.start(ctx_a).await.unwrap();
621
622        let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
623        let ctx_b = ConsumerContext::new(route_tx_b, tokio_util::sync::CancellationToken::new());
624        let result = consumer_b.start(ctx_b).await;
625
626        assert!(matches!(
627            result,
628            Err(CamelError::EndpointCreationFailed(msg))
629                if msg.contains("already has a registered consumer")
630        ));
631
632        consumer_a.stop().await.unwrap();
633    }
634
635    #[tokio::test]
636    async fn test_direct_producer_consumer_roundtrip() {
637        let component = DirectComponent::new();
638
639        // Create consumer endpoint and start it
640        let consumer_endpoint = component
641            .create_endpoint("direct:test", &NoOpComponentContext)
642            .unwrap();
643        let mut consumer = consumer_endpoint.create_consumer().unwrap();
644
645        // The route channel now carries ExchangeEnvelope (request-reply support).
646        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
647        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
648
649        // Start the consumer in a background task
650        tokio::spawn(async move {
651            consumer.start(ctx).await.unwrap();
652        });
653
654        // Give the consumer a moment to register
655        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
656
657        // Spawn a pipeline simulator that reads envelopes and replies Ok.
658        tokio::spawn(async move {
659            while let Some(envelope) = route_rx.recv().await {
660                let ExchangeEnvelope { exchange, reply_tx } = envelope;
661                if let Some(tx) = reply_tx {
662                    let _ = tx.send(Ok(exchange));
663                }
664            }
665        });
666
667        // Now send an exchange via the producer
668        let ctx = test_producer_ctx();
669        let producer_endpoint = component
670            .create_endpoint("direct:test", &NoOpComponentContext)
671            .unwrap();
672        let producer = producer_endpoint.create_producer(&ctx).unwrap();
673
674        let exchange = Exchange::new(Message::new("hello direct"));
675        let result = producer.oneshot(exchange).await;
676
677        assert!(result.is_ok());
678        let reply = result.unwrap();
679        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
680    }
681
682    #[tokio::test]
683    async fn test_direct_propagates_error_when_no_handler() {
684        let component = DirectComponent::new();
685
686        let consumer_endpoint = component
687            .create_endpoint("direct:err-test", &NoOpComponentContext)
688            .unwrap();
689        let mut consumer = consumer_endpoint.create_consumer().unwrap();
690
691        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
692        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
693
694        tokio::spawn(async move {
695            consumer.start(ctx).await.unwrap();
696        });
697
698        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
699
700        // Pipeline simulator that replies with Err (simulates no error handler).
701        tokio::spawn(async move {
702            while let Some(envelope) = route_rx.recv().await {
703                if let Some(tx) = envelope.reply_tx {
704                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
705                }
706            }
707        });
708
709        let ctx = test_producer_ctx();
710        let producer_endpoint = component
711            .create_endpoint("direct:err-test", &NoOpComponentContext)
712            .unwrap();
713        let producer = producer_endpoint.create_producer(&ctx).unwrap();
714
715        let exchange = Exchange::new(Message::new("test"));
716        let result = producer.oneshot(exchange).await;
717        assert!(result.is_err());
718        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
719    }
720
721    #[tokio::test]
722    async fn test_direct_consumer_stop_unregisters() {
723        let component = DirectComponent::new();
724        let endpoint = component
725            .create_endpoint("direct:cleanup", &NoOpComponentContext)
726            .unwrap();
727
728        // We need a consumer to register
729        let mut consumer = endpoint.create_consumer().unwrap();
730
731        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
732        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
733
734        // Start consumer in background
735        let handle = tokio::spawn(async move {
736            consumer.start(ctx).await.unwrap();
737        });
738
739        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
740
741        // Verify the name is registered
742        {
743            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
744            assert!(reg.contains_key("cleanup"));
745        }
746
747        // Create a new consumer just to call stop (stop removes from registry)
748        let mut stop_consumer = DirectConsumer {
749            name: "cleanup".to_string(),
750            registry: Arc::clone(&component.registry),
751            cancel: None,
752            handle: None,
753        };
754        stop_consumer.stop().await.unwrap();
755
756        // Verify removed from registry
757        {
758            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
759            assert!(!reg.contains_key("cleanup"));
760        }
761
762        handle.abort();
763    }
764
765    #[tokio::test]
766    async fn test_direct_consumer_respects_cancellation() {
767        use tokio_util::sync::CancellationToken;
768
769        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
770        let token = CancellationToken::new();
771        let (tx, _rx) = mpsc::channel(16);
772        let ctx = ConsumerContext::new(tx, token.clone());
773
774        let mut consumer = DirectConsumer {
775            name: "cancel-test".to_string(),
776            registry: registry.clone(),
777            cancel: None,
778            handle: None,
779        };
780
781        let handle = tokio::spawn(async move {
782            consumer.start(ctx).await.unwrap();
783        });
784
785        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
786        assert!(
787            registry
788                .lock()
789                .unwrap_or_else(|e| e.into_inner())
790                .contains_key("cancel-test")
791        );
792
793        token.cancel();
794        // start() now spawns the loop internally and returns immediately,
795        // so the outer handle completes right away. Give the inner task time
796        // to react to the cancellation and clean up the registry.
797        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
798
799        // After cancellation, the consumer should have cleaned up the registry
800        assert!(
801            !registry
802                .lock()
803                .unwrap_or_else(|e| e.into_inner())
804                .contains_key("cancel-test")
805        );
806
807        let _ = handle.await;
808    }
809
810    #[tokio::test]
811    async fn test_direct_consumer_stop_missing_entry_is_ok() {
812        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
813        let mut consumer = DirectConsumer {
814            name: "never-registered".to_string(),
815            registry,
816            cancel: None,
817            handle: None,
818        };
819        let result = consumer.stop().await;
820        assert!(result.is_ok());
821    }
822
823    #[test]
824    fn test_poll_ready_endpoint_not_registered() {
825        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
826        let producer = DirectProducer {
827            name: "missing".to_string(),
828            registry,
829            config: DirectConfig {
830                name: "missing".to_string(),
831                timeout_ms: None,
832                block: None,
833                fail_if_no_consumers: None,
834                bridge_error_handler: None,
835                exchange_pattern: None,
836            },
837            semaphore: Arc::new(Semaphore::new(1)),
838            pending_permit: None,
839            acquire_fut: None,
840            fail_if_no_consumers: None,
841        };
842        let waker = noop_waker();
843        let mut cx = Context::from_waker(&waker);
844        let mut producer = producer;
845        let result = producer.poll_ready(&mut cx);
846        assert!(matches!(
847            result,
848            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
849        ));
850    }
851
852    #[test]
853    fn test_poll_ready_endpoint_registered() {
854        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
855        let (tx, _rx) =
856            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
857        registry.lock().unwrap().insert("active".to_string(), tx);
858        let producer = DirectProducer {
859            name: "active".to_string(),
860            registry,
861            config: DirectConfig {
862                name: "active".to_string(),
863                timeout_ms: None,
864                block: None,
865                fail_if_no_consumers: None,
866                bridge_error_handler: None,
867                exchange_pattern: None,
868            },
869            semaphore: Arc::new(Semaphore::new(1)),
870            pending_permit: None,
871            acquire_fut: None,
872            fail_if_no_consumers: None,
873        };
874        let waker = noop_waker();
875        let mut cx = Context::from_waker(&waker);
876        let mut producer = producer;
877        let result = producer.poll_ready(&mut cx);
878        assert!(matches!(result, Poll::Ready(Ok(()))));
879    }
880
881    #[test]
882    fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
883        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
884        let producer = DirectProducer {
885            name: "missing-ok".to_string(),
886            registry,
887            config: DirectConfig {
888                name: "missing-ok".to_string(),
889                timeout_ms: None,
890                block: None,
891                fail_if_no_consumers: Some(false),
892                bridge_error_handler: None,
893                exchange_pattern: None,
894            },
895            semaphore: Arc::new(Semaphore::new(1)),
896            pending_permit: None,
897            acquire_fut: None,
898            fail_if_no_consumers: Some(false),
899        };
900
901        let waker = noop_waker();
902        let mut cx = Context::from_waker(&waker);
903        let mut producer = producer;
904        let result = producer.poll_ready(&mut cx);
905        assert!(matches!(result, Poll::Ready(Ok(()))));
906    }
907
908    #[test]
909    fn test_poll_ready_channel_closed() {
910        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
911        let (tx, rx) =
912            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
913        drop(rx);
914        registry.lock().unwrap().insert("closed".to_string(), tx);
915        let producer = DirectProducer {
916            name: "closed".to_string(),
917            registry,
918            config: DirectConfig {
919                name: "closed".to_string(),
920                timeout_ms: None,
921                block: None,
922                fail_if_no_consumers: None,
923                bridge_error_handler: None,
924                exchange_pattern: None,
925            },
926            semaphore: Arc::new(Semaphore::new(1)),
927            pending_permit: None,
928            acquire_fut: None,
929            fail_if_no_consumers: None,
930        };
931        let waker = noop_waker();
932        let mut cx = Context::from_waker(&waker);
933        let mut producer = producer;
934        let result = producer.poll_ready(&mut cx);
935        assert!(matches!(
936            result,
937            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
938        ));
939    }
940
941    #[tokio::test]
942    async fn test_direct_stop_cancels_loop() {
943        use tokio_util::sync::CancellationToken;
944
945        let component = DirectComponent::new();
946        let endpoint = component
947            .create_endpoint("direct:stop-test", &NoOpComponentContext)
948            .unwrap();
949        let mut consumer = endpoint.create_consumer().unwrap();
950
951        let token = CancellationToken::new();
952        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
953        let ctx = ConsumerContext::new(route_tx, token.clone());
954
955        // start() blocks — it should run the consumer loop on a JoinHandle
956        // and return immediately. But currently start() IS the loop.
957        // We test stop() cancels the loop.
958        let handle = tokio::spawn(async move {
959            consumer.start(ctx).await.unwrap();
960        });
961
962        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
963        assert!(
964            component
965                .registry
966                .lock()
967                .unwrap_or_else(|e| e.into_inner())
968                .contains_key("stop-test")
969        );
970
971        // Create a new consumer just for stop
972        let mut stop_consumer = DirectConsumer {
973            name: "stop-test".to_string(),
974            registry: Arc::clone(&component.registry),
975            cancel: Some(token.clone()),
976            handle: None,
977        };
978        stop_consumer.stop().await.unwrap();
979
980        // The consumer loop should finish within 2s after stop
981        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
982        assert!(result.is_ok(), "Consumer loop did not stop within 2s");
983
984        // Registry should be cleaned up
985        assert!(
986            !component
987                .registry
988                .lock()
989                .unwrap_or_else(|e| e.into_inner())
990                .contains_key("stop-test")
991        );
992    }
993
994    #[tokio::test]
995    async fn test_direct_producer_timeout() {
996        let component = DirectComponent::new();
997        let endpoint = component
998            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
999            .unwrap();
1000        let mut consumer = endpoint.create_consumer().unwrap();
1001
1002        // Consumer that never replies (simulates a stuck pipeline)
1003        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
1004        let token = tokio_util::sync::CancellationToken::new();
1005        let ctx = ConsumerContext::new(route_tx, token.clone());
1006        tokio::spawn(async move {
1007            consumer.start(ctx).await.unwrap();
1008        });
1009
1010        // Drain envelopes but hold onto reply_tx so the producer never gets a reply
1011        tokio::spawn(async move {
1012            let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
1013            while let Some(envelope) = route_rx.recv().await {
1014                held_reply.push(envelope.reply_tx.unwrap());
1015            }
1016            drop(held_reply);
1017        });
1018
1019        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1020
1021        // Create producer with a short timeout
1022        let _ = test_producer_ctx();
1023        let _producer_endpoint = component
1024            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1025            .unwrap();
1026        let producer = DirectProducer {
1027            name: "timeout-test".to_string(),
1028            registry: Arc::clone(&component.registry),
1029            config: DirectConfig {
1030                name: "timeout-test".to_string(),
1031                timeout_ms: Some(100), // 100ms timeout
1032                block: None,
1033                fail_if_no_consumers: None,
1034                bridge_error_handler: None,
1035                exchange_pattern: None,
1036            },
1037            semaphore: Arc::new(Semaphore::new(1)),
1038            pending_permit: None,
1039            acquire_fut: None,
1040            fail_if_no_consumers: None,
1041        };
1042
1043        let exchange = Exchange::new(Message::new("test"));
1044        let mut svc = producer;
1045        let _ = svc.poll_ready(&mut Context::from_waker(&noop_waker()));
1046        let result = svc.call(exchange).await;
1047        assert!(result.is_err(), "Expected timeout error");
1048        assert!(
1049            result.unwrap_err().to_string().contains("timed out"),
1050            "Expected timeout message"
1051        );
1052
1053        token.cancel();
1054    }
1055
1056    #[test]
1057    fn test_empty_endpoint_name_rejected() {
1058        let result = DirectConfig::from_uri("direct:");
1059        // from_uri may parse empty name — validate catches it
1060        if let Ok(config) = result {
1061            assert!(
1062                validate_name(&config.name).is_err(),
1063                "expected validation error for empty name"
1064            );
1065        }
1066        // Also verify via Component (the main entry point)
1067        let component = DirectComponent::new();
1068        let result = component.create_endpoint("direct:", &NoOpComponentContext);
1069        assert!(result.is_err(), "empty endpoint name must be rejected");
1070    }
1071
1072    #[test]
1073    fn test_whitespace_endpoint_name_rejected() {
1074        let result = DirectConfig::from_uri("direct:my endpoint");
1075        if let Ok(config) = result {
1076            assert!(
1077                validate_name(&config.name).is_err(),
1078                "expected validation error for whitespace in name"
1079            );
1080        }
1081        let component = DirectComponent::new();
1082        let result = component.create_endpoint("direct:my endpoint", &NoOpComponentContext);
1083        assert!(result.is_err(), "whitespace endpoint name must be rejected");
1084    }
1085
1086    #[test]
1087    fn test_valid_endpoint_name_accepted() {
1088        let component = DirectComponent::new();
1089        let result = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);
1090        assert!(result.is_ok(), "valid endpoint name should be accepted");
1091    }
1092}