Skip to main content

camel_component_direct/
lib.rs

1//! In-memory direct component for rust-camel — synchronous point-to-point
2//! channel between routes sharing the same context with no serialization overhead.
3//!
4//! Main types: `DirectComponent`, `DirectEndpoint`, `DirectConsumer`, `DirectProducer`.
5
6use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24// ---------------------------------------------------------------------------
25// Shared state: maps endpoint names to senders that deliver exchanges to the
26// consumer side.  Each entry holds a sender of `(Exchange, oneshot::Sender)`
27// so the producer can wait for the consumer's pipeline to finish processing
28// and receive the (possibly transformed) exchange back.
29// ---------------------------------------------------------------------------
30
31type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
32type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
33type AcquirePermitFut =
34    Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36// ---------------------------------------------------------------------------
37// Validation helpers
38// ---------------------------------------------------------------------------
39
40/// Validate the direct endpoint name (the part after `direct:`).
41fn validate_name(name: &str) -> Result<(), CamelError> {
42    if name.trim().is_empty() {
43        return Err(CamelError::InvalidUri(
44            "direct: endpoint name must not be empty".to_string(),
45        ));
46    }
47    if name.contains(char::is_whitespace) {
48        return Err(CamelError::InvalidUri(
49            "direct: endpoint name must not contain whitespace".to_string(),
50        ));
51    }
52    Ok(())
53}
54
55// ---------------------------------------------------------------------------
56// DirectConfig
57// ---------------------------------------------------------------------------
58
/// Settings for a Direct endpoint, as extracted from its URI.
///
/// URI format: `direct:name[?timeout_ms=30000]`
///
/// Example: `direct:foo` yields a configuration whose `name` is `"foo"`.
#[derive(Clone, Debug)]
pub struct DirectConfig {
    /// Endpoint name, taken from the path portion of the URI.
    pub name: String,
    /// Upper bound, in milliseconds, on a producer `call()`. The producer
    /// falls back to 30 000 ms when this is `None`.
    pub timeout_ms: Option<u64>,
    /// When false, the producer returns immediately if no consumer is registered.
    /// TODO(DIR-001): implement non-blocking send
    pub block: Option<bool>,
    /// `Some(false)` suppresses the producer's readiness error when no
    /// consumer is registered; `None` and `Some(true)` keep the error.
    pub fail_if_no_consumers: Option<bool>,
    /// Raw exchange-pattern value from the URI, stored verbatim.
    /// TODO(DIR-005): implement exchangePattern override
    pub exchange_pattern: Option<String>,
}
78
79impl DirectConfig {
80    pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
81        let parts = parse_uri(uri)?;
82        if parts.scheme != "direct" {
83            return Err(CamelError::InvalidUri(format!(
84                "invalid scheme '{}', expected 'direct'",
85                parts.scheme
86            )));
87        }
88
89        let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
90            match value.to_ascii_lowercase().as_str() {
91                "true" | "1" | "yes" => Ok(true),
92                "false" | "0" | "no" => Ok(false),
93                _ => Err(CamelError::InvalidUri(format!(
94                    "invalid value for {}: invalid boolean value: '{}'",
95                    name, value
96                ))),
97            }
98        };
99
100        let timeout_ms = parts
101            .params
102            .get("timeout_ms")
103            .map(|v| {
104                v.parse::<u64>().map_err(|e| {
105                    CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
106                })
107            })
108            .transpose()?;
109
110        let block = parts
111            .params
112            .get("block")
113            .map(|v| parse_bool("block", v))
114            .transpose()?;
115
116        let fail_if_no_consumers = parts
117            .params
118            .get("fail_if_no_consumers")
119            .or_else(|| parts.params.get("failIfNoConsumers"))
120            .map(|v| parse_bool("fail_if_no_consumers", v))
121            .transpose()?;
122
123        let exchange_pattern = parts
124            .params
125            .get("exchange_pattern")
126            .or_else(|| parts.params.get("exchangePattern"))
127            .cloned();
128
129        Ok(Self {
130            name: parts.path,
131            timeout_ms,
132            block,
133            fail_if_no_consumers,
134            exchange_pattern,
135        })
136    }
137}
138
139// ---------------------------------------------------------------------------
140// DirectComponent
141// ---------------------------------------------------------------------------
142
143/// The Direct component provides in-memory synchronous communication between
144/// routes.
145///
146/// URI format: `direct:name`
147///
148/// A producer sending to `direct:foo` will block until the consumer on
149/// `direct:foo` has finished processing the exchange.
150pub struct DirectComponent {
151    registry: DirectRegistry,
152}
153
154impl DirectComponent {
155    pub fn new() -> Self {
156        Self {
157            registry: Arc::new(Mutex::new(HashMap::new())),
158        }
159    }
160}
161
162impl Default for DirectComponent {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168impl Component for DirectComponent {
169    fn scheme(&self) -> &str {
170        "direct"
171    }
172
173    fn create_endpoint(
174        &self,
175        uri: &str,
176        _ctx: &dyn camel_component_api::ComponentContext,
177    ) -> Result<Box<dyn Endpoint>, CamelError> {
178        let config = DirectConfig::from_uri(uri)?;
179        validate_name(&config.name)?;
180        let name = config.name.clone();
181        debug!(endpoint_name = %name, "direct endpoint created");
182        Ok(Box::new(DirectEndpoint {
183            uri: uri.to_string(),
184            config,
185            registry: Arc::clone(&self.registry),
186        }))
187    }
188}
189
190// ---------------------------------------------------------------------------
191// DirectEndpoint
192// ---------------------------------------------------------------------------
193
194struct DirectEndpoint {
195    uri: String,
196    config: DirectConfig,
197    registry: DirectRegistry,
198}
199
200impl Endpoint for DirectEndpoint {
201    fn uri(&self) -> &str {
202        &self.uri
203    }
204
205    fn create_consumer(
206        &self,
207        rt: Arc<dyn camel_component_api::RuntimeObservability>,
208    ) -> Result<Box<dyn Consumer>, CamelError> {
209        Ok(Box::new(DirectConsumer::new(
210            self.config.name.clone(),
211            Arc::clone(&self.registry),
212            rt,
213        )))
214    }
215
216    fn create_producer(
217        &self,
218        _rt: Arc<dyn camel_component_api::RuntimeObservability>,
219        _ctx: &ProducerContext,
220    ) -> Result<BoxProcessor, CamelError> {
221        Ok(BoxProcessor::new(DirectProducer {
222            name: self.config.name.clone(),
223            registry: Arc::clone(&self.registry),
224            config: self.config.clone(),
225            semaphore: Arc::new(Semaphore::new(1)),
226            pending_permit: None,
227            acquire_fut: None,
228            fail_if_no_consumers: self.config.fail_if_no_consumers,
229        }))
230    }
231}
232
233// ---------------------------------------------------------------------------
234// DirectConsumer
235// ---------------------------------------------------------------------------
236
237/// The Direct consumer registers itself in the shared registry and forwards
238/// incoming exchanges to the route pipeline via `ConsumerContext`.
239struct DirectConsumer {
240    name: String,
241    registry: DirectRegistry,
242    cancel: Option<CancellationToken>,
243    handle: Option<JoinHandle<Result<(), CamelError>>>,
244    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
245    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
246    #[allow(dead_code)]
247    runtime: Arc<dyn camel_component_api::RuntimeObservability>,
248}
249
250impl DirectConsumer {
251    fn new(
252        name: String,
253        registry: DirectRegistry,
254        runtime: Arc<dyn camel_component_api::RuntimeObservability>,
255    ) -> Self {
256        Self {
257            name,
258            registry,
259            cancel: None,
260            handle: None,
261            runtime,
262        }
263    }
264}
265
266#[async_trait]
267impl Consumer for DirectConsumer {
268    async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
269        // Create a channel for producers to send exchanges to this consumer.
270        let (tx, mut rx) =
271            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
272
273        // Register ourselves so producers can find us.
274        {
275            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
276            if let Some(existing) = reg.get(&self.name)
277                && !existing.is_closed()
278            {
279                return Err(CamelError::EndpointCreationFailed(format!(
280                    "direct endpoint '{}' already has a registered consumer",
281                    self.name
282                )));
283            }
284            reg.insert(self.name.clone(), tx);
285        }
286
287        let name = self.name.clone();
288        let registry = Arc::clone(&self.registry);
289        let cancel = context.cancel_token();
290        let cancel_clone = cancel.clone();
291
292        info!(endpoint_name = %self.name, "direct consumer started");
293
294        // Spawn the consumer loop so start() returns immediately.
295        let handle = tokio::spawn(async move {
296            loop {
297                tokio::select! {
298                    _ = cancel_clone.cancelled() => {
299                        debug!(endpoint_name = %name, "direct consumer received cancellation");
300                        break;
301                    }
302                    msg = rx.recv() => {
303                        match msg {
304                            Some((exchange, reply_tx)) => {
305                                debug!(
306                                    endpoint_name = %name,
307                                    exchange_id = %exchange.correlation_id,
308                                    "direct consumer received exchange"
309                                );
310                                let result = context.send_and_wait(exchange).await;
311                                if let Err(ref err) = result {
312                                    // (category b′: send_and_wait returned Err on a normal-data send,
313                                    // meaning the route handler did NOT absorb the failure —
314                                    // see ADR-0012 "b-bridged discriminator". This emitter is the
315                                    // only ERROR signal for the unhandled failure; must stay loud.)
316                                    // TODO(ADR-0012-e-metrics): wire increment_errors("b-prime:direct:send-and-wait") via bd rc-mf3
317                                    // log-policy: outside-contract
318                                    error!( // allow-log-levels
319                                        endpoint_name = %name,
320                                        error = %err,
321                                        "direct consumer pipeline error"
322                                    );
323                                }
324                                let _ = reply_tx.send(result);
325                            }
326                            None => break,
327                        }
328                    }
329                }
330            }
331
332            // Cleanup: remove from registry on exit
333            {
334                let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
335                reg.remove(&name);
336            }
337
338            debug!(endpoint_name = %name, "direct consumer stopped");
339            Ok(())
340        });
341
342        self.cancel = Some(cancel);
343        self.handle = Some(handle);
344        Ok(())
345    }
346
347    async fn stop(&mut self) -> Result<(), CamelError> {
348        // Cancel the consumer loop if we have a cancellation token.
349        if let Some(cancel) = self.cancel.take() {
350            cancel.cancel();
351        }
352
353        // Wait for the spawned task to finish (with a 5s timeout).
354        if let Some(mut h) = self.handle.take() {
355            if tokio::time::timeout(Duration::from_secs(5), &mut h)
356                .await
357                .is_err()
358            {
359                h.abort();
360                let _ = h.await;
361                warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
362                // Aborted task cannot clean up its own registry entry — do it here.
363                let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
364                reg.remove(&self.name);
365            }
366        } else {
367            // No join handle — just remove from registry.
368            let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
369            reg.remove(&self.name);
370        }
371
372        debug!(endpoint_name = %self.name, "direct consumer stopped");
373        Ok(())
374    }
375
376    fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
377        // Take the handle so spawn_consumer_task can monitor it for unexpected
378        // panics. The consumer loop exits cleanly via the CancellationToken, so
379        // stop() drives shutdown through cancel.take(); the task removes itself
380        // from the registry on exit.
381        self.handle.take()
382    }
383}
384
385// ---------------------------------------------------------------------------
386// DirectProducer
387// ---------------------------------------------------------------------------
388
389/// The Direct producer sends an exchange to the named direct endpoint and
390/// waits for a reply (synchronous in-memory call).
391struct DirectProducer {
392    name: String,
393    registry: DirectRegistry,
394    config: DirectConfig,
395    semaphore: Arc<Semaphore>,
396    pending_permit: Option<OwnedSemaphorePermit>,
397    acquire_fut: Option<AcquirePermitFut>,
398    fail_if_no_consumers: Option<bool>,
399}
400
401impl Clone for DirectProducer {
402    fn clone(&self) -> Self {
403        Self {
404            name: self.name.clone(),
405            registry: self.registry.clone(),
406            config: self.config.clone(),
407            semaphore: self.semaphore.clone(),
408            pending_permit: None,
409            acquire_fut: None,
410            fail_if_no_consumers: self.fail_if_no_consumers,
411        }
412    }
413}
414
415impl Service<Exchange> for DirectProducer {
416    type Response = Exchange;
417    type Error = CamelError;
418    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
419
420    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421        // If we already hold a permit we are ready.
422        if self.pending_permit.is_some() {
423            return Poll::Ready(Ok(()));
424        }
425
426        // Check that the endpoint is registered.
427        {
428            let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
429            match reg.get(&self.name) {
430                None => {
431                    if self.fail_if_no_consumers != Some(false) {
432                        return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
433                            "direct endpoint '{}' not registered",
434                            self.name
435                        ))));
436                    }
437                }
438                Some(sender) if sender.is_closed() => {
439                    return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
440                        "direct endpoint '{}' channel closed",
441                        self.name
442                    ))));
443                }
444                Some(_) => {}
445            }
446        }
447
448        // Acquire a semaphore permit (bounded concurrency).
449        let fut = self
450            .acquire_fut
451            .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
452        match fut.as_mut().poll(cx) {
453            Poll::Ready(Ok(permit)) => {
454                self.acquire_fut = None;
455                self.pending_permit = Some(permit);
456                Poll::Ready(Ok(()))
457            }
458            Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
459            Poll::Pending => Poll::Pending,
460        }
461    }
462
463    fn call(&mut self, exchange: Exchange) -> Self::Future {
464        let _permit = match self.pending_permit.take() {
465            Some(p) => p,
466            None => {
467                return Box::pin(async {
468                    Err(CamelError::ProcessorError(
469                        "call() invoked without poll_ready()".into(),
470                    ))
471                });
472            }
473        };
474
475        let name = self.name.clone();
476        let registry = Arc::clone(&self.registry);
477        let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
478        let exchange_id = exchange.correlation_id.clone();
479
480        debug!(
481            endpoint_name = %name,
482            exchange_id = %exchange_id,
483            "direct producer call entry"
484        );
485
486        Box::pin(async move {
487            tokio::time::timeout(timeout, async {
488                let sender = {
489                    let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
490                    reg.get(&name)
491                        .ok_or_else(|| {
492                            let err = CamelError::EndpointCreationFailed(format!(
493                                "no consumer registered for direct:{name}"
494                            ));
495                            // log-policy: handler-owned
496                            // (category a: producer send failure inside the route pipeline)
497                            warn!(endpoint_name = %name, error = %err, "direct send failed");
498                            err
499                        })?
500                        .clone()
501                };
502
503                let (reply_tx, reply_rx) = oneshot::channel();
504                sender.send((exchange, reply_tx)).await.map_err(|err| {
505                    // log-policy: handler-owned
506                    // (category a: producer send failure inside the route pipeline)
507                    warn!(endpoint_name = %name, error = %err, "direct send failed");
508                    CamelError::ChannelClosed
509                })?;
510
511                let result = reply_rx.await.map_err(|err| {
512                    // log-policy: handler-owned
513                    // (category a: producer send failure inside the route pipeline)
514                    warn!(endpoint_name = %name, error = %err, "direct send failed");
515                    CamelError::ChannelClosed
516                })?;
517
518                debug!(endpoint_name = %name, "direct message sent");
519                result
520            })
521            .await
522            .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
523        })
524    }
525}
526
527// ---------------------------------------------------------------------------
528// Tests
529// ---------------------------------------------------------------------------
530
531#[cfg(test)]
532mod tests {
533    use camel_component_api::test_support::PanicRuntimeObservability;
534    fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
535        std::sync::Arc::new(PanicRuntimeObservability)
536    }
537
538    use super::*;
539    use camel_component_api::ExchangeEnvelope;
540    use camel_component_api::Message;
541    use camel_component_api::NoOpComponentContext;
542    use std::task::RawWakerVTable;
543    use tower::ServiceExt;
544
545    fn noop_waker() -> std::task::Waker {
546        const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
547        const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
548        unsafe { std::task::Waker::from_raw(RAW) }
549    }
550
551    fn test_producer_ctx() -> ProducerContext {
552        ProducerContext::new()
553    }
554
555    #[test]
556    fn test_direct_component_scheme() {
557        let component = DirectComponent::new();
558        assert_eq!(component.scheme(), "direct");
559    }
560
561    #[test]
562    fn test_direct_component_default() {
563        let component = DirectComponent::default();
564        assert_eq!(component.scheme(), "direct");
565    }
566
567    #[test]
568    fn test_direct_config_from_uri() {
569        let config = DirectConfig::from_uri("direct:orders").unwrap();
570        assert_eq!(config.name, "orders");
571    }
572
573    #[test]
574    fn test_direct_endpoint_uri() {
575        let component = DirectComponent::new();
576        let endpoint = component
577            .create_endpoint("direct:uri-check", &NoOpComponentContext)
578            .unwrap();
579        assert_eq!(endpoint.uri(), "direct:uri-check");
580    }
581
582    #[test]
583    fn test_direct_creates_endpoint() {
584        let component = DirectComponent::new();
585        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
586        assert!(endpoint.is_ok());
587    }
588
589    #[test]
590    fn test_direct_wrong_scheme() {
591        let component = DirectComponent::new();
592        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
593        assert!(result.is_err());
594    }
595
596    #[test]
597    fn test_direct_endpoint_creates_consumer() {
598        let component = DirectComponent::new();
599        let endpoint = component
600            .create_endpoint("direct:foo", &NoOpComponentContext)
601            .unwrap();
602        assert!(endpoint.create_consumer(rt()).is_ok());
603    }
604
605    #[test]
606    fn test_direct_endpoint_creates_producer() {
607        let ctx = test_producer_ctx();
608        let component = DirectComponent::new();
609        let endpoint = component
610            .create_endpoint("direct:foo", &NoOpComponentContext)
611            .unwrap();
612        assert!(endpoint.create_producer(rt(), &ctx).is_ok());
613    }
614
615    #[test]
616    fn test_direct_empty_name_rejected() {
617        let component = DirectComponent::new();
618        match component.create_endpoint("direct:", &NoOpComponentContext) {
619            Err(e) => assert!(
620                e.to_string().contains("must not be empty"),
621                "unexpected error: {e}"
622            ),
623            Ok(_) => panic!("expected error for empty name"),
624        }
625    }
626
627    #[tokio::test]
628    async fn test_direct_producer_no_consumer_registered() {
629        let ctx = test_producer_ctx();
630        let component = DirectComponent::new();
631        let endpoint = component
632            .create_endpoint("direct:missing", &NoOpComponentContext)
633            .unwrap();
634        let producer = endpoint.create_producer(rt(), &ctx).unwrap();
635
636        let exchange = Exchange::new(Message::new("test"));
637        let result = producer.oneshot(exchange).await;
638        assert!(result.is_err());
639    }
640
641    #[tokio::test]
642    async fn test_direct_duplicate_consumer_returns_error() {
643        let component = DirectComponent::new();
644        let endpoint = component
645            .create_endpoint("direct:dup", &NoOpComponentContext)
646            .unwrap();
647
648        let mut consumer_a = endpoint.create_consumer(rt()).unwrap();
649        let mut consumer_b = endpoint.create_consumer(rt()).unwrap();
650
651        let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
652        let ctx_a = ConsumerContext::new(route_tx_a, tokio_util::sync::CancellationToken::new());
653        consumer_a.start(ctx_a).await.unwrap();
654
655        let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
656        let ctx_b = ConsumerContext::new(route_tx_b, tokio_util::sync::CancellationToken::new());
657        let result = consumer_b.start(ctx_b).await;
658
659        assert!(matches!(
660            result,
661            Err(CamelError::EndpointCreationFailed(msg))
662                if msg.contains("already has a registered consumer")
663        ));
664
665        consumer_a.stop().await.unwrap();
666    }
667
668    #[tokio::test]
669    async fn test_direct_producer_consumer_roundtrip() {
670        let component = DirectComponent::new();
671
672        // Create consumer endpoint and start it
673        let consumer_endpoint = component
674            .create_endpoint("direct:test", &NoOpComponentContext)
675            .unwrap();
676        let mut consumer = consumer_endpoint.create_consumer(rt()).unwrap();
677
678        // The route channel now carries ExchangeEnvelope (request-reply support).
679        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
680        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
681
682        // Start the consumer in a background task
683        tokio::spawn(async move {
684            consumer.start(ctx).await.unwrap();
685        });
686
687        // Give the consumer a moment to register
688        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
689
690        // Spawn a pipeline simulator that reads envelopes and replies Ok.
691        tokio::spawn(async move {
692            while let Some(envelope) = route_rx.recv().await {
693                let ExchangeEnvelope { exchange, reply_tx } = envelope;
694                if let Some(tx) = reply_tx {
695                    let _ = tx.send(Ok(exchange));
696                }
697            }
698        });
699
700        // Now send an exchange via the producer
701        let ctx = test_producer_ctx();
702        let producer_endpoint = component
703            .create_endpoint("direct:test", &NoOpComponentContext)
704            .unwrap();
705        let producer = producer_endpoint.create_producer(rt(), &ctx).unwrap();
706
707        let exchange = Exchange::new(Message::new("hello direct"));
708        let result = producer.oneshot(exchange).await;
709
710        assert!(result.is_ok());
711        let reply = result.unwrap();
712        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
713    }
714
715    #[tokio::test]
716    async fn test_direct_propagates_error_when_no_handler() {
717        let component = DirectComponent::new();
718
719        let consumer_endpoint = component
720            .create_endpoint("direct:err-test", &NoOpComponentContext)
721            .unwrap();
722        let mut consumer = consumer_endpoint.create_consumer(rt()).unwrap();
723
724        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
725        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
726
727        tokio::spawn(async move {
728            consumer.start(ctx).await.unwrap();
729        });
730
731        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
732
733        // Pipeline simulator that replies with Err (simulates no error handler).
734        tokio::spawn(async move {
735            while let Some(envelope) = route_rx.recv().await {
736                if let Some(tx) = envelope.reply_tx {
737                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
738                }
739            }
740        });
741
742        let ctx = test_producer_ctx();
743        let producer_endpoint = component
744            .create_endpoint("direct:err-test", &NoOpComponentContext)
745            .unwrap();
746        let producer = producer_endpoint.create_producer(rt(), &ctx).unwrap();
747
748        let exchange = Exchange::new(Message::new("test"));
749        let result = producer.oneshot(exchange).await;
750        assert!(result.is_err());
751        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
752    }
753
754    #[tokio::test]
755    async fn test_direct_consumer_stop_unregisters() {
756        let component = DirectComponent::new();
757        let endpoint = component
758            .create_endpoint("direct:cleanup", &NoOpComponentContext)
759            .unwrap();
760
761        // We need a consumer to register
762        let mut consumer = endpoint.create_consumer(rt()).unwrap();
763
764        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
765        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
766
767        // Start consumer in background
768        let handle = tokio::spawn(async move {
769            consumer.start(ctx).await.unwrap();
770        });
771
772        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
773
774        // Verify the name is registered
775        {
776            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
777            assert!(reg.contains_key("cleanup"));
778        }
779
780        // Create a new consumer just to call stop (stop removes from registry)
781        let mut stop_consumer = DirectConsumer {
782            name: "cleanup".to_string(),
783            registry: Arc::clone(&component.registry),
784            cancel: None,
785            handle: None,
786            runtime: rt(),
787        };
788        stop_consumer.stop().await.unwrap();
789
790        // Verify removed from registry
791        {
792            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
793            assert!(!reg.contains_key("cleanup"));
794        }
795
796        handle.abort();
797    }
798
799    #[tokio::test]
800    async fn test_direct_consumer_respects_cancellation() {
801        use tokio_util::sync::CancellationToken;
802
803        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
804        let token = CancellationToken::new();
805        let (tx, _rx) = mpsc::channel(16);
806        let ctx = ConsumerContext::new(tx, token.clone());
807
808        let mut consumer = DirectConsumer {
809            name: "cancel-test".to_string(),
810            registry: registry.clone(),
811            cancel: None,
812            handle: None,
813            runtime: rt(),
814        };
815
816        let handle = tokio::spawn(async move {
817            consumer.start(ctx).await.unwrap();
818        });
819
820        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
821        assert!(
822            registry
823                .lock()
824                .unwrap_or_else(|e| e.into_inner())
825                .contains_key("cancel-test")
826        );
827
828        token.cancel();
829        // start() now spawns the loop internally and returns immediately,
830        // so the outer handle completes right away. Give the inner task time
831        // to react to the cancellation and clean up the registry.
832        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
833
834        // After cancellation, the consumer should have cleaned up the registry
835        assert!(
836            !registry
837                .lock()
838                .unwrap_or_else(|e| e.into_inner())
839                .contains_key("cancel-test")
840        );
841
842        let _ = handle.await;
843    }
844
845    #[tokio::test]
846    async fn test_direct_consumer_stop_missing_entry_is_ok() {
847        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
848        let mut consumer = DirectConsumer {
849            name: "never-registered".to_string(),
850            registry,
851            cancel: None,
852            handle: None,
853            runtime: rt(),
854        };
855        let result = consumer.stop().await;
856        assert!(result.is_ok());
857    }
858
/// With an empty registry and `fail_if_no_consumers` left unset,
/// `poll_ready` must report `EndpointCreationFailed`.
#[test]
fn test_poll_ready_endpoint_not_registered() {
    let endpoint_name = "missing";
    let empty_registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));

    let mut producer = DirectProducer {
        name: endpoint_name.to_string(),
        registry: empty_registry,
        config: DirectConfig {
            name: endpoint_name.to_string(),
            timeout_ms: None,
            block: None,
            fail_if_no_consumers: None,
            exchange_pattern: None,
        },
        semaphore: Arc::new(Semaphore::new(1)),
        pending_permit: None,
        acquire_fut: None,
        fail_if_no_consumers: None,
    };

    // A no-op waker is enough: the test only inspects the first poll result.
    let waker = noop_waker();
    let readiness = producer.poll_ready(&mut Context::from_waker(&waker));

    assert!(matches!(
        readiness,
        Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
    ));
}
886
/// With a sender registered under the producer's name (and its receiver
/// still alive), `poll_ready` must report `Ready(Ok(()))`.
#[test]
fn test_poll_ready_endpoint_registered() {
    let endpoint_name = "active";
    let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));

    // The channel's item type is inferred from the registry insert below.
    // `_receiver` stays bound for the whole test so the channel is not closed.
    let (sender, _receiver) = mpsc::channel(1);
    registry
        .lock()
        .unwrap()
        .insert(endpoint_name.to_string(), sender);

    let mut producer = DirectProducer {
        name: endpoint_name.to_string(),
        registry,
        config: DirectConfig {
            name: endpoint_name.to_string(),
            timeout_ms: None,
            block: None,
            fail_if_no_consumers: None,
            exchange_pattern: None,
        },
        semaphore: Arc::new(Semaphore::new(1)),
        pending_permit: None,
        acquire_fut: None,
        fail_if_no_consumers: None,
    };

    let waker = noop_waker();
    let readiness = producer.poll_ready(&mut Context::from_waker(&waker));

    assert!(matches!(readiness, Poll::Ready(Ok(()))));
}
914
/// With `fail_if_no_consumers` set to `Some(false)`, `poll_ready` must
/// report `Ready(Ok(()))` even though nothing is registered under the name.
#[test]
fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
    let endpoint_name = "missing-ok";
    let empty_registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));

    let mut producer = DirectProducer {
        name: endpoint_name.to_string(),
        registry: empty_registry,
        config: DirectConfig {
            name: endpoint_name.to_string(),
            timeout_ms: None,
            block: None,
            fail_if_no_consumers: Some(false),
            exchange_pattern: None,
        },
        semaphore: Arc::new(Semaphore::new(1)),
        pending_permit: None,
        acquire_fut: None,
        fail_if_no_consumers: Some(false),
    };

    let waker = noop_waker();
    let readiness = producer.poll_ready(&mut Context::from_waker(&waker));

    assert!(matches!(readiness, Poll::Ready(Ok(()))));
}
940
/// When the registered sender's receiver has already been dropped,
/// `poll_ready` must report `EndpointCreationFailed`.
#[test]
fn test_poll_ready_channel_closed() {
    let endpoint_name = "closed";
    let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));

    // Build a sender whose receiving half is gone before it is registered.
    // The channel's item type is inferred from the registry insert below.
    let orphaned_sender = {
        let (sender, receiver) = mpsc::channel(1);
        drop(receiver);
        sender
    };
    registry
        .lock()
        .unwrap()
        .insert(endpoint_name.to_string(), orphaned_sender);

    let mut producer = DirectProducer {
        name: endpoint_name.to_string(),
        registry,
        config: DirectConfig {
            name: endpoint_name.to_string(),
            timeout_ms: None,
            block: None,
            fail_if_no_consumers: None,
            exchange_pattern: None,
        },
        semaphore: Arc::new(Semaphore::new(1)),
        pending_permit: None,
        acquire_fut: None,
        fail_if_no_consumers: None,
    };

    let waker = noop_waker();
    let readiness = producer.poll_ready(&mut Context::from_waker(&waker));

    assert!(matches!(
        readiness,
        Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
    ));
}
972
973    #[tokio::test]
974    async fn test_direct_stop_cancels_loop() {
975        use tokio_util::sync::CancellationToken;
976
977        let component = DirectComponent::new();
978        let endpoint = component
979            .create_endpoint("direct:stop-test", &NoOpComponentContext)
980            .unwrap();
981        let mut consumer = endpoint.create_consumer(rt()).unwrap();
982
983        let token = CancellationToken::new();
984        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
985        let ctx = ConsumerContext::new(route_tx, token.clone());
986
987        // start() blocks — it should run the consumer loop on a JoinHandle
988        // and return immediately. But currently start() IS the loop.
989        // We test stop() cancels the loop.
990        let handle = tokio::spawn(async move {
991            consumer.start(ctx).await.unwrap();
992        });
993
994        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
995        assert!(
996            component
997                .registry
998                .lock()
999                .unwrap_or_else(|e| e.into_inner())
1000                .contains_key("stop-test")
1001        );
1002
1003        // Create a new consumer just for stop
1004        let mut stop_consumer = DirectConsumer {
1005            name: "stop-test".to_string(),
1006            registry: Arc::clone(&component.registry),
1007            cancel: Some(token.clone()),
1008            handle: None,
1009            runtime: rt(),
1010        };
1011        stop_consumer.stop().await.unwrap();
1012
1013        // The consumer loop should finish within 2s after stop
1014        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
1015        assert!(result.is_ok(), "Consumer loop did not stop within 2s");
1016
1017        // Registry should be cleaned up
1018        assert!(
1019            !component
1020                .registry
1021                .lock()
1022                .unwrap_or_else(|e| e.into_inner())
1023                .contains_key("stop-test")
1024        );
1025    }
1026
1027    #[tokio::test]
1028    async fn test_direct_producer_timeout() {
1029        let component = DirectComponent::new();
1030        let endpoint = component
1031            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1032            .unwrap();
1033        let mut consumer = endpoint.create_consumer(rt()).unwrap();
1034
1035        // Consumer that never replies (simulates a stuck pipeline)
1036        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
1037        let token = tokio_util::sync::CancellationToken::new();
1038        let ctx = ConsumerContext::new(route_tx, token.clone());
1039        tokio::spawn(async move {
1040            consumer.start(ctx).await.unwrap();
1041        });
1042
1043        // Drain envelopes but hold onto reply_tx so the producer never gets a reply
1044        tokio::spawn(async move {
1045            let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
1046            while let Some(envelope) = route_rx.recv().await {
1047                held_reply.push(envelope.reply_tx.unwrap());
1048            }
1049            drop(held_reply);
1050        });
1051
1052        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1053
1054        // Create producer with a short timeout
1055        let _ = test_producer_ctx();
1056        let _producer_endpoint = component
1057            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1058            .unwrap();
1059        let producer = DirectProducer {
1060            name: "timeout-test".to_string(),
1061            registry: Arc::clone(&component.registry),
1062            config: DirectConfig {
1063                name: "timeout-test".to_string(),
1064                timeout_ms: Some(100), // 100ms timeout
1065                block: None,
1066                fail_if_no_consumers: None,
1067                exchange_pattern: None,
1068            },
1069            semaphore: Arc::new(Semaphore::new(1)),
1070            pending_permit: None,
1071            acquire_fut: None,
1072            fail_if_no_consumers: None,
1073        };
1074
1075        let exchange = Exchange::new(Message::new("test"));
1076        let mut svc = producer;
1077        let _ = svc.poll_ready(&mut Context::from_waker(&noop_waker()));
1078        let result = svc.call(exchange).await;
1079        assert!(result.is_err(), "Expected timeout error");
1080        assert!(
1081            result.unwrap_err().to_string().contains("timed out"),
1082            "Expected timeout message"
1083        );
1084
1085        token.cancel();
1086    }
1087
/// An empty endpoint name (`direct:`) must be rejected, both by
/// `validate_name` and by `DirectComponent::create_endpoint`.
#[test]
fn test_empty_endpoint_name_rejected() {
    const URI: &str = "direct:";

    // `from_uri` may accept the empty name; if it does, validation must
    // catch it. A parse error is also acceptable and is not checked further.
    if let Ok(parsed) = DirectConfig::from_uri(URI) {
        let verdict = validate_name(&parsed.name);
        assert!(verdict.is_err(), "expected validation error for empty name");
    }

    // The component is the main entry point, so check it as well.
    let component = DirectComponent::new();
    let created = component.create_endpoint(URI, &NoOpComponentContext);
    assert!(created.is_err(), "empty endpoint name must be rejected");
}
1103
/// An endpoint name containing whitespace must be rejected, both by
/// `validate_name` and by `DirectComponent::create_endpoint`.
#[test]
fn test_whitespace_endpoint_name_rejected() {
    const URI: &str = "direct:my endpoint";

    // If URI parsing succeeds, the name itself must fail validation.
    // A parse error is also acceptable and is not checked further.
    if let Ok(parsed) = DirectConfig::from_uri(URI) {
        let verdict = validate_name(&parsed.name);
        assert!(
            verdict.is_err(),
            "expected validation error for whitespace in name"
        );
    }

    let component = DirectComponent::new();
    let created = component.create_endpoint(URI, &NoOpComponentContext);
    assert!(created.is_err(), "whitespace endpoint name must be rejected");
}
1117
/// A well-formed name such as `my-endpoint` must be accepted by
/// `DirectComponent::create_endpoint`.
#[test]
fn test_valid_endpoint_name_accepted() {
    let component = DirectComponent::new();
    let created = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);

    assert!(created.is_ok(), "valid endpoint name should be accepted");
}
1124}