1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
32type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
33type AcquirePermitFut =
34 Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36fn validate_name(name: &str) -> Result<(), CamelError> {
42 if name.trim().is_empty() {
43 return Err(CamelError::InvalidUri(
44 "direct: endpoint name must not be empty".to_string(),
45 ));
46 }
47 if name.contains(char::is_whitespace) {
48 return Err(CamelError::InvalidUri(
49 "direct: endpoint name must not contain whitespace".to_string(),
50 ));
51 }
52 Ok(())
53}
54
55#[derive(Debug, Clone)]
65pub struct DirectConfig {
66 pub name: String,
68 pub timeout_ms: Option<u64>,
70 pub block: Option<bool>,
73 pub fail_if_no_consumers: Option<bool>,
75 pub exchange_pattern: Option<String>,
77}
78
79impl DirectConfig {
80 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
81 let parts = parse_uri(uri)?;
82 if parts.scheme != "direct" {
83 return Err(CamelError::InvalidUri(format!(
84 "invalid scheme '{}', expected 'direct'",
85 parts.scheme
86 )));
87 }
88
89 let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
90 match value.to_ascii_lowercase().as_str() {
91 "true" | "1" | "yes" => Ok(true),
92 "false" | "0" | "no" => Ok(false),
93 _ => Err(CamelError::InvalidUri(format!(
94 "invalid value for {}: invalid boolean value: '{}'",
95 name, value
96 ))),
97 }
98 };
99
100 let timeout_ms = parts
101 .params
102 .get("timeout_ms")
103 .map(|v| {
104 v.parse::<u64>().map_err(|e| {
105 CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
106 })
107 })
108 .transpose()?;
109
110 let block = parts
111 .params
112 .get("block")
113 .map(|v| parse_bool("block", v))
114 .transpose()?;
115
116 let fail_if_no_consumers = parts
117 .params
118 .get("fail_if_no_consumers")
119 .or_else(|| parts.params.get("failIfNoConsumers"))
120 .map(|v| parse_bool("fail_if_no_consumers", v))
121 .transpose()?;
122
123 let exchange_pattern = parts
124 .params
125 .get("exchange_pattern")
126 .or_else(|| parts.params.get("exchangePattern"))
127 .cloned();
128
129 Ok(Self {
130 name: parts.path,
131 timeout_ms,
132 block,
133 fail_if_no_consumers,
134 exchange_pattern,
135 })
136 }
137}
138
139pub struct DirectComponent {
151 registry: DirectRegistry,
152}
153
154impl DirectComponent {
155 pub fn new() -> Self {
156 Self {
157 registry: Arc::new(Mutex::new(HashMap::new())),
158 }
159 }
160}
161
162impl Default for DirectComponent {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168impl Component for DirectComponent {
169 fn scheme(&self) -> &str {
170 "direct"
171 }
172
173 fn create_endpoint(
174 &self,
175 uri: &str,
176 _ctx: &dyn camel_component_api::ComponentContext,
177 ) -> Result<Box<dyn Endpoint>, CamelError> {
178 let config = DirectConfig::from_uri(uri)?;
179 validate_name(&config.name)?;
180 let name = config.name.clone();
181 debug!(endpoint_name = %name, "direct endpoint created");
182 Ok(Box::new(DirectEndpoint {
183 uri: uri.to_string(),
184 config,
185 registry: Arc::clone(&self.registry),
186 }))
187 }
188}
189
190struct DirectEndpoint {
195 uri: String,
196 config: DirectConfig,
197 registry: DirectRegistry,
198}
199
200impl Endpoint for DirectEndpoint {
201 fn uri(&self) -> &str {
202 &self.uri
203 }
204
205 fn create_consumer(
206 &self,
207 rt: Arc<dyn camel_component_api::RuntimeObservability>,
208 ) -> Result<Box<dyn Consumer>, CamelError> {
209 Ok(Box::new(DirectConsumer::new(
210 self.config.name.clone(),
211 Arc::clone(&self.registry),
212 rt,
213 )))
214 }
215
216 fn create_producer(
217 &self,
218 _rt: Arc<dyn camel_component_api::RuntimeObservability>,
219 _ctx: &ProducerContext,
220 ) -> Result<BoxProcessor, CamelError> {
221 Ok(BoxProcessor::new(DirectProducer {
222 name: self.config.name.clone(),
223 registry: Arc::clone(&self.registry),
224 config: self.config.clone(),
225 semaphore: Arc::new(Semaphore::new(1)),
226 pending_permit: None,
227 acquire_fut: None,
228 fail_if_no_consumers: self.config.fail_if_no_consumers,
229 }))
230 }
231}
232
233struct DirectConsumer {
240 name: String,
241 registry: DirectRegistry,
242 cancel: Option<CancellationToken>,
243 handle: Option<JoinHandle<Result<(), CamelError>>>,
244 #[allow(dead_code)]
247 runtime: Arc<dyn camel_component_api::RuntimeObservability>,
248}
249
250impl DirectConsumer {
251 fn new(
252 name: String,
253 registry: DirectRegistry,
254 runtime: Arc<dyn camel_component_api::RuntimeObservability>,
255 ) -> Self {
256 Self {
257 name,
258 registry,
259 cancel: None,
260 handle: None,
261 runtime,
262 }
263 }
264}
265
266#[async_trait]
267impl Consumer for DirectConsumer {
268 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
269 let (tx, mut rx) =
271 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
272
273 {
275 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
276 if let Some(existing) = reg.get(&self.name)
277 && !existing.is_closed()
278 {
279 return Err(CamelError::EndpointCreationFailed(format!(
280 "direct endpoint '{}' already has a registered consumer",
281 self.name
282 )));
283 }
284 reg.insert(self.name.clone(), tx);
285 }
286
287 let name = self.name.clone();
288 let registry = Arc::clone(&self.registry);
289 let cancel = context.cancel_token();
290 let cancel_clone = cancel.clone();
291
292 info!(endpoint_name = %self.name, "direct consumer started");
293
294 let handle = tokio::spawn(async move {
296 loop {
297 tokio::select! {
298 _ = cancel_clone.cancelled() => {
299 debug!(endpoint_name = %name, "direct consumer received cancellation");
300 break;
301 }
302 msg = rx.recv() => {
303 match msg {
304 Some((exchange, reply_tx)) => {
305 debug!(
306 endpoint_name = %name,
307 exchange_id = %exchange.correlation_id,
308 "direct consumer received exchange"
309 );
310 let result = context.send_and_wait(exchange).await;
311 if let Err(ref err) = result {
312 error!( endpoint_name = %name,
320 error = %err,
321 "direct consumer pipeline error"
322 );
323 }
324 let _ = reply_tx.send(result);
325 }
326 None => break,
327 }
328 }
329 }
330 }
331
332 {
334 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
335 reg.remove(&name);
336 }
337
338 debug!(endpoint_name = %name, "direct consumer stopped");
339 Ok(())
340 });
341
342 self.cancel = Some(cancel);
343 self.handle = Some(handle);
344 Ok(())
345 }
346
347 async fn stop(&mut self) -> Result<(), CamelError> {
348 if let Some(cancel) = self.cancel.take() {
350 cancel.cancel();
351 }
352
353 if let Some(mut h) = self.handle.take() {
355 if tokio::time::timeout(Duration::from_secs(5), &mut h)
356 .await
357 .is_err()
358 {
359 h.abort();
360 let _ = h.await;
361 warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
362 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
364 reg.remove(&self.name);
365 }
366 } else {
367 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
369 reg.remove(&self.name);
370 }
371
372 debug!(endpoint_name = %self.name, "direct consumer stopped");
373 Ok(())
374 }
375
376 fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
377 self.handle.take()
382 }
383}
384
385struct DirectProducer {
392 name: String,
393 registry: DirectRegistry,
394 config: DirectConfig,
395 semaphore: Arc<Semaphore>,
396 pending_permit: Option<OwnedSemaphorePermit>,
397 acquire_fut: Option<AcquirePermitFut>,
398 fail_if_no_consumers: Option<bool>,
399}
400
401impl Clone for DirectProducer {
402 fn clone(&self) -> Self {
403 Self {
404 name: self.name.clone(),
405 registry: self.registry.clone(),
406 config: self.config.clone(),
407 semaphore: self.semaphore.clone(),
408 pending_permit: None,
409 acquire_fut: None,
410 fail_if_no_consumers: self.fail_if_no_consumers,
411 }
412 }
413}
414
415impl Service<Exchange> for DirectProducer {
416 type Response = Exchange;
417 type Error = CamelError;
418 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
419
420 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
421 if self.pending_permit.is_some() {
423 return Poll::Ready(Ok(()));
424 }
425
426 {
428 let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
429 match reg.get(&self.name) {
430 None => {
431 if self.fail_if_no_consumers != Some(false) {
432 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
433 "direct endpoint '{}' not registered",
434 self.name
435 ))));
436 }
437 }
438 Some(sender) if sender.is_closed() => {
439 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
440 "direct endpoint '{}' channel closed",
441 self.name
442 ))));
443 }
444 Some(_) => {}
445 }
446 }
447
448 let fut = self
450 .acquire_fut
451 .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
452 match fut.as_mut().poll(cx) {
453 Poll::Ready(Ok(permit)) => {
454 self.acquire_fut = None;
455 self.pending_permit = Some(permit);
456 Poll::Ready(Ok(()))
457 }
458 Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
459 Poll::Pending => Poll::Pending,
460 }
461 }
462
463 fn call(&mut self, exchange: Exchange) -> Self::Future {
464 let _permit = match self.pending_permit.take() {
465 Some(p) => p,
466 None => {
467 return Box::pin(async {
468 Err(CamelError::ProcessorError(
469 "call() invoked without poll_ready()".into(),
470 ))
471 });
472 }
473 };
474
475 let name = self.name.clone();
476 let registry = Arc::clone(&self.registry);
477 let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
478 let exchange_id = exchange.correlation_id.clone();
479
480 debug!(
481 endpoint_name = %name,
482 exchange_id = %exchange_id,
483 "direct producer call entry"
484 );
485
486 Box::pin(async move {
487 tokio::time::timeout(timeout, async {
488 let sender = {
489 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
490 reg.get(&name)
491 .ok_or_else(|| {
492 let err = CamelError::EndpointCreationFailed(format!(
493 "no consumer registered for direct:{name}"
494 ));
495 warn!(endpoint_name = %name, error = %err, "direct send failed");
498 err
499 })?
500 .clone()
501 };
502
503 let (reply_tx, reply_rx) = oneshot::channel();
504 sender.send((exchange, reply_tx)).await.map_err(|err| {
505 warn!(endpoint_name = %name, error = %err, "direct send failed");
508 CamelError::ChannelClosed
509 })?;
510
511 let result = reply_rx.await.map_err(|err| {
512 warn!(endpoint_name = %name, error = %err, "direct send failed");
515 CamelError::ChannelClosed
516 })?;
517
518 debug!(endpoint_name = %name, "direct message sent");
519 result
520 })
521 .await
522 .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
523 })
524 }
525}
526
527#[cfg(test)]
532mod tests {
533 use camel_component_api::test_support::PanicRuntimeObservability;
534 fn rt() -> std::sync::Arc<dyn camel_component_api::RuntimeObservability> {
535 std::sync::Arc::new(PanicRuntimeObservability)
536 }
537
538 use super::*;
539 use camel_component_api::ExchangeEnvelope;
540 use camel_component_api::Message;
541 use camel_component_api::NoOpComponentContext;
542 use std::task::RawWakerVTable;
543 use tower::ServiceExt;
544
545 fn noop_waker() -> std::task::Waker {
546 const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
547 const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
548 unsafe { std::task::Waker::from_raw(RAW) }
549 }
550
551 fn test_producer_ctx() -> ProducerContext {
552 ProducerContext::new()
553 }
554
555 #[test]
556 fn test_direct_component_scheme() {
557 let component = DirectComponent::new();
558 assert_eq!(component.scheme(), "direct");
559 }
560
561 #[test]
562 fn test_direct_component_default() {
563 let component = DirectComponent::default();
564 assert_eq!(component.scheme(), "direct");
565 }
566
567 #[test]
568 fn test_direct_config_from_uri() {
569 let config = DirectConfig::from_uri("direct:orders").unwrap();
570 assert_eq!(config.name, "orders");
571 }
572
573 #[test]
574 fn test_direct_endpoint_uri() {
575 let component = DirectComponent::new();
576 let endpoint = component
577 .create_endpoint("direct:uri-check", &NoOpComponentContext)
578 .unwrap();
579 assert_eq!(endpoint.uri(), "direct:uri-check");
580 }
581
582 #[test]
583 fn test_direct_creates_endpoint() {
584 let component = DirectComponent::new();
585 let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
586 assert!(endpoint.is_ok());
587 }
588
589 #[test]
590 fn test_direct_wrong_scheme() {
591 let component = DirectComponent::new();
592 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
593 assert!(result.is_err());
594 }
595
596 #[test]
597 fn test_direct_endpoint_creates_consumer() {
598 let component = DirectComponent::new();
599 let endpoint = component
600 .create_endpoint("direct:foo", &NoOpComponentContext)
601 .unwrap();
602 assert!(endpoint.create_consumer(rt()).is_ok());
603 }
604
605 #[test]
606 fn test_direct_endpoint_creates_producer() {
607 let ctx = test_producer_ctx();
608 let component = DirectComponent::new();
609 let endpoint = component
610 .create_endpoint("direct:foo", &NoOpComponentContext)
611 .unwrap();
612 assert!(endpoint.create_producer(rt(), &ctx).is_ok());
613 }
614
615 #[test]
616 fn test_direct_empty_name_rejected() {
617 let component = DirectComponent::new();
618 match component.create_endpoint("direct:", &NoOpComponentContext) {
619 Err(e) => assert!(
620 e.to_string().contains("must not be empty"),
621 "unexpected error: {e}"
622 ),
623 Ok(_) => panic!("expected error for empty name"),
624 }
625 }
626
627 #[tokio::test]
628 async fn test_direct_producer_no_consumer_registered() {
629 let ctx = test_producer_ctx();
630 let component = DirectComponent::new();
631 let endpoint = component
632 .create_endpoint("direct:missing", &NoOpComponentContext)
633 .unwrap();
634 let producer = endpoint.create_producer(rt(), &ctx).unwrap();
635
636 let exchange = Exchange::new(Message::new("test"));
637 let result = producer.oneshot(exchange).await;
638 assert!(result.is_err());
639 }
640
641 #[tokio::test]
642 async fn test_direct_duplicate_consumer_returns_error() {
643 let component = DirectComponent::new();
644 let endpoint = component
645 .create_endpoint("direct:dup", &NoOpComponentContext)
646 .unwrap();
647
648 let mut consumer_a = endpoint.create_consumer(rt()).unwrap();
649 let mut consumer_b = endpoint.create_consumer(rt()).unwrap();
650
651 let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
652 let ctx_a = ConsumerContext::new(route_tx_a, tokio_util::sync::CancellationToken::new());
653 consumer_a.start(ctx_a).await.unwrap();
654
655 let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
656 let ctx_b = ConsumerContext::new(route_tx_b, tokio_util::sync::CancellationToken::new());
657 let result = consumer_b.start(ctx_b).await;
658
659 assert!(matches!(
660 result,
661 Err(CamelError::EndpointCreationFailed(msg))
662 if msg.contains("already has a registered consumer")
663 ));
664
665 consumer_a.stop().await.unwrap();
666 }
667
668 #[tokio::test]
669 async fn test_direct_producer_consumer_roundtrip() {
670 let component = DirectComponent::new();
671
672 let consumer_endpoint = component
674 .create_endpoint("direct:test", &NoOpComponentContext)
675 .unwrap();
676 let mut consumer = consumer_endpoint.create_consumer(rt()).unwrap();
677
678 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
680 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
681
682 tokio::spawn(async move {
684 consumer.start(ctx).await.unwrap();
685 });
686
687 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
689
690 tokio::spawn(async move {
692 while let Some(envelope) = route_rx.recv().await {
693 let ExchangeEnvelope { exchange, reply_tx } = envelope;
694 if let Some(tx) = reply_tx {
695 let _ = tx.send(Ok(exchange));
696 }
697 }
698 });
699
700 let ctx = test_producer_ctx();
702 let producer_endpoint = component
703 .create_endpoint("direct:test", &NoOpComponentContext)
704 .unwrap();
705 let producer = producer_endpoint.create_producer(rt(), &ctx).unwrap();
706
707 let exchange = Exchange::new(Message::new("hello direct"));
708 let result = producer.oneshot(exchange).await;
709
710 assert!(result.is_ok());
711 let reply = result.unwrap();
712 assert_eq!(reply.input.body.as_text(), Some("hello direct"));
713 }
714
715 #[tokio::test]
716 async fn test_direct_propagates_error_when_no_handler() {
717 let component = DirectComponent::new();
718
719 let consumer_endpoint = component
720 .create_endpoint("direct:err-test", &NoOpComponentContext)
721 .unwrap();
722 let mut consumer = consumer_endpoint.create_consumer(rt()).unwrap();
723
724 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
725 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
726
727 tokio::spawn(async move {
728 consumer.start(ctx).await.unwrap();
729 });
730
731 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
732
733 tokio::spawn(async move {
735 while let Some(envelope) = route_rx.recv().await {
736 if let Some(tx) = envelope.reply_tx {
737 let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
738 }
739 }
740 });
741
742 let ctx = test_producer_ctx();
743 let producer_endpoint = component
744 .create_endpoint("direct:err-test", &NoOpComponentContext)
745 .unwrap();
746 let producer = producer_endpoint.create_producer(rt(), &ctx).unwrap();
747
748 let exchange = Exchange::new(Message::new("test"));
749 let result = producer.oneshot(exchange).await;
750 assert!(result.is_err());
751 assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
752 }
753
754 #[tokio::test]
755 async fn test_direct_consumer_stop_unregisters() {
756 let component = DirectComponent::new();
757 let endpoint = component
758 .create_endpoint("direct:cleanup", &NoOpComponentContext)
759 .unwrap();
760
761 let mut consumer = endpoint.create_consumer(rt()).unwrap();
763
764 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
765 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
766
767 let handle = tokio::spawn(async move {
769 consumer.start(ctx).await.unwrap();
770 });
771
772 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
773
774 {
776 let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
777 assert!(reg.contains_key("cleanup"));
778 }
779
780 let mut stop_consumer = DirectConsumer {
782 name: "cleanup".to_string(),
783 registry: Arc::clone(&component.registry),
784 cancel: None,
785 handle: None,
786 runtime: rt(),
787 };
788 stop_consumer.stop().await.unwrap();
789
790 {
792 let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
793 assert!(!reg.contains_key("cleanup"));
794 }
795
796 handle.abort();
797 }
798
799 #[tokio::test]
800 async fn test_direct_consumer_respects_cancellation() {
801 use tokio_util::sync::CancellationToken;
802
803 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
804 let token = CancellationToken::new();
805 let (tx, _rx) = mpsc::channel(16);
806 let ctx = ConsumerContext::new(tx, token.clone());
807
808 let mut consumer = DirectConsumer {
809 name: "cancel-test".to_string(),
810 registry: registry.clone(),
811 cancel: None,
812 handle: None,
813 runtime: rt(),
814 };
815
816 let handle = tokio::spawn(async move {
817 consumer.start(ctx).await.unwrap();
818 });
819
820 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
821 assert!(
822 registry
823 .lock()
824 .unwrap_or_else(|e| e.into_inner())
825 .contains_key("cancel-test")
826 );
827
828 token.cancel();
829 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
833
834 assert!(
836 !registry
837 .lock()
838 .unwrap_or_else(|e| e.into_inner())
839 .contains_key("cancel-test")
840 );
841
842 let _ = handle.await;
843 }
844
845 #[tokio::test]
846 async fn test_direct_consumer_stop_missing_entry_is_ok() {
847 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
848 let mut consumer = DirectConsumer {
849 name: "never-registered".to_string(),
850 registry,
851 cancel: None,
852 handle: None,
853 runtime: rt(),
854 };
855 let result = consumer.stop().await;
856 assert!(result.is_ok());
857 }
858
859 #[test]
860 fn test_poll_ready_endpoint_not_registered() {
861 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
862 let producer = DirectProducer {
863 name: "missing".to_string(),
864 registry,
865 config: DirectConfig {
866 name: "missing".to_string(),
867 timeout_ms: None,
868 block: None,
869 fail_if_no_consumers: None,
870 exchange_pattern: None,
871 },
872 semaphore: Arc::new(Semaphore::new(1)),
873 pending_permit: None,
874 acquire_fut: None,
875 fail_if_no_consumers: None,
876 };
877 let waker = noop_waker();
878 let mut cx = Context::from_waker(&waker);
879 let mut producer = producer;
880 let result = producer.poll_ready(&mut cx);
881 assert!(matches!(
882 result,
883 Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
884 ));
885 }
886
887 #[test]
888 fn test_poll_ready_endpoint_registered() {
889 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
890 let (tx, _rx) =
891 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
892 registry.lock().unwrap().insert("active".to_string(), tx);
893 let producer = DirectProducer {
894 name: "active".to_string(),
895 registry,
896 config: DirectConfig {
897 name: "active".to_string(),
898 timeout_ms: None,
899 block: None,
900 fail_if_no_consumers: None,
901 exchange_pattern: None,
902 },
903 semaphore: Arc::new(Semaphore::new(1)),
904 pending_permit: None,
905 acquire_fut: None,
906 fail_if_no_consumers: None,
907 };
908 let waker = noop_waker();
909 let mut cx = Context::from_waker(&waker);
910 let mut producer = producer;
911 let result = producer.poll_ready(&mut cx);
912 assert!(matches!(result, Poll::Ready(Ok(()))));
913 }
914
915 #[test]
916 fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
917 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
918 let producer = DirectProducer {
919 name: "missing-ok".to_string(),
920 registry,
921 config: DirectConfig {
922 name: "missing-ok".to_string(),
923 timeout_ms: None,
924 block: None,
925 fail_if_no_consumers: Some(false),
926 exchange_pattern: None,
927 },
928 semaphore: Arc::new(Semaphore::new(1)),
929 pending_permit: None,
930 acquire_fut: None,
931 fail_if_no_consumers: Some(false),
932 };
933
934 let waker = noop_waker();
935 let mut cx = Context::from_waker(&waker);
936 let mut producer = producer;
937 let result = producer.poll_ready(&mut cx);
938 assert!(matches!(result, Poll::Ready(Ok(()))));
939 }
940
941 #[test]
942 fn test_poll_ready_channel_closed() {
943 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
944 let (tx, rx) =
945 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
946 drop(rx);
947 registry.lock().unwrap().insert("closed".to_string(), tx);
948 let producer = DirectProducer {
949 name: "closed".to_string(),
950 registry,
951 config: DirectConfig {
952 name: "closed".to_string(),
953 timeout_ms: None,
954 block: None,
955 fail_if_no_consumers: None,
956 exchange_pattern: None,
957 },
958 semaphore: Arc::new(Semaphore::new(1)),
959 pending_permit: None,
960 acquire_fut: None,
961 fail_if_no_consumers: None,
962 };
963 let waker = noop_waker();
964 let mut cx = Context::from_waker(&waker);
965 let mut producer = producer;
966 let result = producer.poll_ready(&mut cx);
967 assert!(matches!(
968 result,
969 Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
970 ));
971 }
972
973 #[tokio::test]
974 async fn test_direct_stop_cancels_loop() {
975 use tokio_util::sync::CancellationToken;
976
977 let component = DirectComponent::new();
978 let endpoint = component
979 .create_endpoint("direct:stop-test", &NoOpComponentContext)
980 .unwrap();
981 let mut consumer = endpoint.create_consumer(rt()).unwrap();
982
983 let token = CancellationToken::new();
984 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
985 let ctx = ConsumerContext::new(route_tx, token.clone());
986
987 let handle = tokio::spawn(async move {
991 consumer.start(ctx).await.unwrap();
992 });
993
994 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
995 assert!(
996 component
997 .registry
998 .lock()
999 .unwrap_or_else(|e| e.into_inner())
1000 .contains_key("stop-test")
1001 );
1002
1003 let mut stop_consumer = DirectConsumer {
1005 name: "stop-test".to_string(),
1006 registry: Arc::clone(&component.registry),
1007 cancel: Some(token.clone()),
1008 handle: None,
1009 runtime: rt(),
1010 };
1011 stop_consumer.stop().await.unwrap();
1012
1013 let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
1015 assert!(result.is_ok(), "Consumer loop did not stop within 2s");
1016
1017 assert!(
1019 !component
1020 .registry
1021 .lock()
1022 .unwrap_or_else(|e| e.into_inner())
1023 .contains_key("stop-test")
1024 );
1025 }
1026
1027 #[tokio::test]
1028 async fn test_direct_producer_timeout() {
1029 let component = DirectComponent::new();
1030 let endpoint = component
1031 .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1032 .unwrap();
1033 let mut consumer = endpoint.create_consumer(rt()).unwrap();
1034
1035 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
1037 let token = tokio_util::sync::CancellationToken::new();
1038 let ctx = ConsumerContext::new(route_tx, token.clone());
1039 tokio::spawn(async move {
1040 consumer.start(ctx).await.unwrap();
1041 });
1042
1043 tokio::spawn(async move {
1045 let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
1046 while let Some(envelope) = route_rx.recv().await {
1047 held_reply.push(envelope.reply_tx.unwrap());
1048 }
1049 drop(held_reply);
1050 });
1051
1052 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1053
1054 let _ = test_producer_ctx();
1056 let _producer_endpoint = component
1057 .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1058 .unwrap();
1059 let producer = DirectProducer {
1060 name: "timeout-test".to_string(),
1061 registry: Arc::clone(&component.registry),
1062 config: DirectConfig {
1063 name: "timeout-test".to_string(),
1064 timeout_ms: Some(100), block: None,
1066 fail_if_no_consumers: None,
1067 exchange_pattern: None,
1068 },
1069 semaphore: Arc::new(Semaphore::new(1)),
1070 pending_permit: None,
1071 acquire_fut: None,
1072 fail_if_no_consumers: None,
1073 };
1074
1075 let exchange = Exchange::new(Message::new("test"));
1076 let mut svc = producer;
1077 let _ = svc.poll_ready(&mut Context::from_waker(&noop_waker()));
1078 let result = svc.call(exchange).await;
1079 assert!(result.is_err(), "Expected timeout error");
1080 assert!(
1081 result.unwrap_err().to_string().contains("timed out"),
1082 "Expected timeout message"
1083 );
1084
1085 token.cancel();
1086 }
1087
1088 #[test]
1089 fn test_empty_endpoint_name_rejected() {
1090 let result = DirectConfig::from_uri("direct:");
1091 if let Ok(config) = result {
1093 assert!(
1094 validate_name(&config.name).is_err(),
1095 "expected validation error for empty name"
1096 );
1097 }
1098 let component = DirectComponent::new();
1100 let result = component.create_endpoint("direct:", &NoOpComponentContext);
1101 assert!(result.is_err(), "empty endpoint name must be rejected");
1102 }
1103
1104 #[test]
1105 fn test_whitespace_endpoint_name_rejected() {
1106 let result = DirectConfig::from_uri("direct:my endpoint");
1107 if let Ok(config) = result {
1108 assert!(
1109 validate_name(&config.name).is_err(),
1110 "expected validation error for whitespace in name"
1111 );
1112 }
1113 let component = DirectComponent::new();
1114 let result = component.create_endpoint("direct:my endpoint", &NoOpComponentContext);
1115 assert!(result.is_err(), "whitespace endpoint name must be rejected");
1116 }
1117
1118 #[test]
1119 fn test_valid_endpoint_name_accepted() {
1120 let component = DirectComponent::new();
1121 let result = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);
1122 assert!(result.is_ok(), "valid endpoint name should be accepted");
1123 }
1124}