1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
32type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
33type AcquirePermitFut =
34 Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36fn validate_name(name: &str) -> Result<(), CamelError> {
42 if name.trim().is_empty() {
43 return Err(CamelError::InvalidUri(
44 "direct: endpoint name must not be empty".to_string(),
45 ));
46 }
47 if name.contains(char::is_whitespace) {
48 return Err(CamelError::InvalidUri(
49 "direct: endpoint name must not contain whitespace".to_string(),
50 ));
51 }
52 Ok(())
53}
54
55#[derive(Debug, Clone)]
65pub struct DirectConfig {
66 pub name: String,
68 pub timeout_ms: Option<u64>,
70 pub block: Option<bool>,
73 pub fail_if_no_consumers: Option<bool>,
75 pub bridge_error_handler: Option<bool>,
77 pub exchange_pattern: Option<String>,
79}
80
81impl DirectConfig {
82 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
83 let parts = parse_uri(uri)?;
84 if parts.scheme != "direct" {
85 return Err(CamelError::InvalidUri(format!(
86 "invalid scheme '{}', expected 'direct'",
87 parts.scheme
88 )));
89 }
90
91 let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
92 match value.to_ascii_lowercase().as_str() {
93 "true" | "1" | "yes" => Ok(true),
94 "false" | "0" | "no" => Ok(false),
95 _ => Err(CamelError::InvalidUri(format!(
96 "invalid value for {}: invalid boolean value: '{}'",
97 name, value
98 ))),
99 }
100 };
101
102 let timeout_ms = parts
103 .params
104 .get("timeout_ms")
105 .map(|v| {
106 v.parse::<u64>().map_err(|e| {
107 CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
108 })
109 })
110 .transpose()?;
111
112 let block = parts
113 .params
114 .get("block")
115 .map(|v| parse_bool("block", v))
116 .transpose()?;
117
118 let fail_if_no_consumers = parts
119 .params
120 .get("fail_if_no_consumers")
121 .or_else(|| parts.params.get("failIfNoConsumers"))
122 .map(|v| parse_bool("fail_if_no_consumers", v))
123 .transpose()?;
124
125 let bridge_error_handler = parts
126 .params
127 .get("bridge_error_handler")
128 .or_else(|| parts.params.get("bridgeErrorHandler"))
129 .map(|v| parse_bool("bridge_error_handler", v))
130 .transpose()?;
131
132 let exchange_pattern = parts
133 .params
134 .get("exchange_pattern")
135 .or_else(|| parts.params.get("exchangePattern"))
136 .cloned();
137
138 Ok(Self {
139 name: parts.path,
140 timeout_ms,
141 block,
142 fail_if_no_consumers,
143 bridge_error_handler,
144 exchange_pattern,
145 })
146 }
147}
148
149pub struct DirectComponent {
161 registry: DirectRegistry,
162}
163
164impl DirectComponent {
165 pub fn new() -> Self {
166 Self {
167 registry: Arc::new(Mutex::new(HashMap::new())),
168 }
169 }
170}
171
172impl Default for DirectComponent {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl Component for DirectComponent {
179 fn scheme(&self) -> &str {
180 "direct"
181 }
182
183 fn create_endpoint(
184 &self,
185 uri: &str,
186 _ctx: &dyn camel_component_api::ComponentContext,
187 ) -> Result<Box<dyn Endpoint>, CamelError> {
188 let config = DirectConfig::from_uri(uri)?;
189 validate_name(&config.name)?;
190 let name = config.name.clone();
191 debug!(endpoint_name = %name, "direct endpoint created");
192 Ok(Box::new(DirectEndpoint {
193 uri: uri.to_string(),
194 config,
195 registry: Arc::clone(&self.registry),
196 }))
197 }
198}
199
200struct DirectEndpoint {
205 uri: String,
206 config: DirectConfig,
207 registry: DirectRegistry,
208}
209
210impl Endpoint for DirectEndpoint {
211 fn uri(&self) -> &str {
212 &self.uri
213 }
214
215 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
216 Ok(Box::new(DirectConsumer {
217 name: self.config.name.clone(),
218 registry: Arc::clone(&self.registry),
219 cancel: None,
220 handle: None,
221 }))
222 }
223
224 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
225 Ok(BoxProcessor::new(DirectProducer {
226 name: self.config.name.clone(),
227 registry: Arc::clone(&self.registry),
228 config: self.config.clone(),
229 semaphore: Arc::new(Semaphore::new(1)),
230 pending_permit: None,
231 acquire_fut: None,
232 fail_if_no_consumers: self.config.fail_if_no_consumers,
233 }))
234 }
235}
236
237struct DirectConsumer {
244 name: String,
245 registry: DirectRegistry,
246 cancel: Option<CancellationToken>,
247 handle: Option<JoinHandle<()>>,
248}
249
250#[async_trait]
251impl Consumer for DirectConsumer {
252 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
253 let (tx, mut rx) =
255 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
256
257 {
259 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
260 if let Some(existing) = reg.get(&self.name)
261 && !existing.is_closed()
262 {
263 return Err(CamelError::EndpointCreationFailed(format!(
264 "direct endpoint '{}' already has a registered consumer",
265 self.name
266 )));
267 }
268 reg.insert(self.name.clone(), tx);
269 }
270
271 let name = self.name.clone();
272 let registry = Arc::clone(&self.registry);
273 let cancel = context.cancel_token();
274 let cancel_clone = cancel.clone();
275
276 info!(endpoint_name = %self.name, "direct consumer started");
277
278 let handle = tokio::spawn(async move {
280 loop {
281 tokio::select! {
282 _ = cancel_clone.cancelled() => {
283 debug!(endpoint_name = %name, "direct consumer received cancellation");
284 break;
285 }
286 msg = rx.recv() => {
287 match msg {
288 Some((exchange, reply_tx)) => {
289 debug!(
290 endpoint_name = %name,
291 exchange_id = %exchange.correlation_id,
292 "direct consumer received exchange"
293 );
294 let result = context.send_and_wait(exchange).await;
295 if let Err(ref err) = result {
296 error!(
297 endpoint_name = %name,
298 error = %err,
299 "direct consumer pipeline error"
300 );
301 }
302 let _ = reply_tx.send(result);
303 }
304 None => break,
305 }
306 }
307 }
308 }
309
310 {
312 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
313 reg.remove(&name);
314 }
315
316 debug!(endpoint_name = %name, "direct consumer stopped");
317 });
318
319 self.cancel = Some(cancel);
320 self.handle = Some(handle);
321 Ok(())
322 }
323
324 async fn stop(&mut self) -> Result<(), CamelError> {
325 if let Some(cancel) = self.cancel.take() {
327 cancel.cancel();
328 }
329
330 if let Some(mut h) = self.handle.take() {
332 if tokio::time::timeout(Duration::from_secs(5), &mut h)
333 .await
334 .is_err()
335 {
336 h.abort();
337 let _ = h.await;
338 warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
339 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
341 reg.remove(&self.name);
342 }
343 } else {
344 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
346 reg.remove(&self.name);
347 }
348
349 debug!(endpoint_name = %self.name, "direct consumer stopped");
350 Ok(())
351 }
352}
353
354struct DirectProducer {
361 name: String,
362 registry: DirectRegistry,
363 config: DirectConfig,
364 semaphore: Arc<Semaphore>,
365 pending_permit: Option<OwnedSemaphorePermit>,
366 acquire_fut: Option<AcquirePermitFut>,
367 fail_if_no_consumers: Option<bool>,
368}
369
370impl Clone for DirectProducer {
371 fn clone(&self) -> Self {
372 Self {
373 name: self.name.clone(),
374 registry: self.registry.clone(),
375 config: self.config.clone(),
376 semaphore: self.semaphore.clone(),
377 pending_permit: None,
378 acquire_fut: None,
379 fail_if_no_consumers: self.fail_if_no_consumers,
380 }
381 }
382}
383
384impl Service<Exchange> for DirectProducer {
385 type Response = Exchange;
386 type Error = CamelError;
387 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
388
389 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
390 if self.pending_permit.is_some() {
392 return Poll::Ready(Ok(()));
393 }
394
395 {
397 let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
398 match reg.get(&self.name) {
399 None => {
400 if self.fail_if_no_consumers != Some(false) {
401 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
402 "direct endpoint '{}' not registered",
403 self.name
404 ))));
405 }
406 }
407 Some(sender) if sender.is_closed() => {
408 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
409 "direct endpoint '{}' channel closed",
410 self.name
411 ))));
412 }
413 Some(_) => {}
414 }
415 }
416
417 let fut = self
419 .acquire_fut
420 .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
421 match fut.as_mut().poll(cx) {
422 Poll::Ready(Ok(permit)) => {
423 self.acquire_fut = None;
424 self.pending_permit = Some(permit);
425 Poll::Ready(Ok(()))
426 }
427 Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
428 Poll::Pending => Poll::Pending,
429 }
430 }
431
432 fn call(&mut self, exchange: Exchange) -> Self::Future {
433 let _permit = match self.pending_permit.take() {
434 Some(p) => p,
435 None => {
436 return Box::pin(async {
437 Err(CamelError::ProcessorError(
438 "call() invoked without poll_ready()".into(),
439 ))
440 });
441 }
442 };
443
444 let name = self.name.clone();
445 let registry = Arc::clone(&self.registry);
446 let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
447 let exchange_id = exchange.correlation_id.clone();
448
449 debug!(
450 endpoint_name = %name,
451 exchange_id = %exchange_id,
452 "direct producer call entry"
453 );
454
455 Box::pin(async move {
456 tokio::time::timeout(timeout, async {
457 let sender = {
458 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
459 reg.get(&name)
460 .ok_or_else(|| {
461 let err = CamelError::EndpointCreationFailed(format!(
462 "no consumer registered for direct:{name}"
463 ));
464 error!(endpoint_name = %name, error = %err, "direct send failed");
465 err
466 })?
467 .clone()
468 };
469
470 let (reply_tx, reply_rx) = oneshot::channel();
471 sender.send((exchange, reply_tx)).await.map_err(|err| {
472 error!(endpoint_name = %name, error = %err, "direct send failed");
473 CamelError::ChannelClosed
474 })?;
475
476 let result = reply_rx.await.map_err(|err| {
477 error!(endpoint_name = %name, error = %err, "direct send failed");
478 CamelError::ChannelClosed
479 })?;
480
481 debug!(endpoint_name = %name, "direct message sent");
482 result
483 })
484 .await
485 .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
486 })
487 }
488}
489
490#[cfg(test)]
495mod tests {
496 use super::*;
497 use camel_component_api::ExchangeEnvelope;
498 use camel_component_api::Message;
499 use camel_component_api::NoOpComponentContext;
500 use std::task::RawWakerVTable;
501 use tower::ServiceExt;
502
503 fn noop_waker() -> std::task::Waker {
504 const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
505 const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
506 unsafe { std::task::Waker::from_raw(RAW) }
507 }
508
509 fn test_producer_ctx() -> ProducerContext {
510 ProducerContext::new()
511 }
512
513 #[test]
514 fn test_direct_component_scheme() {
515 let component = DirectComponent::new();
516 assert_eq!(component.scheme(), "direct");
517 }
518
519 #[test]
520 fn test_direct_component_default() {
521 let component = DirectComponent::default();
522 assert_eq!(component.scheme(), "direct");
523 }
524
525 #[test]
526 fn test_direct_config_from_uri() {
527 let config = DirectConfig::from_uri("direct:orders").unwrap();
528 assert_eq!(config.name, "orders");
529 }
530
531 #[test]
532 fn test_direct_endpoint_uri() {
533 let component = DirectComponent::new();
534 let endpoint = component
535 .create_endpoint("direct:uri-check", &NoOpComponentContext)
536 .unwrap();
537 assert_eq!(endpoint.uri(), "direct:uri-check");
538 }
539
540 #[test]
541 fn test_direct_creates_endpoint() {
542 let component = DirectComponent::new();
543 let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
544 assert!(endpoint.is_ok());
545 }
546
547 #[test]
548 fn test_direct_wrong_scheme() {
549 let component = DirectComponent::new();
550 let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
551 assert!(result.is_err());
552 }
553
554 #[test]
555 fn test_direct_endpoint_creates_consumer() {
556 let component = DirectComponent::new();
557 let endpoint = component
558 .create_endpoint("direct:foo", &NoOpComponentContext)
559 .unwrap();
560 assert!(endpoint.create_consumer().is_ok());
561 }
562
563 #[test]
564 fn test_direct_endpoint_creates_producer() {
565 let ctx = test_producer_ctx();
566 let component = DirectComponent::new();
567 let endpoint = component
568 .create_endpoint("direct:foo", &NoOpComponentContext)
569 .unwrap();
570 assert!(endpoint.create_producer(&ctx).is_ok());
571 }
572
573 #[test]
574 fn test_direct_empty_name_rejected() {
575 let component = DirectComponent::new();
576 match component.create_endpoint("direct:", &NoOpComponentContext) {
577 Err(e) => assert!(
578 e.to_string().contains("must not be empty"),
579 "unexpected error: {e}"
580 ),
581 Ok(_) => panic!("expected error for empty name"),
582 }
583 }
584
585 #[tokio::test]
586 async fn test_direct_producer_no_consumer_registered() {
587 let ctx = test_producer_ctx();
588 let component = DirectComponent::new();
589 let endpoint = component
590 .create_endpoint("direct:missing", &NoOpComponentContext)
591 .unwrap();
592 let producer = endpoint.create_producer(&ctx).unwrap();
593
594 let exchange = Exchange::new(Message::new("test"));
595 let result = producer.oneshot(exchange).await;
596 assert!(result.is_err());
597 }
598
599 #[tokio::test]
600 async fn test_direct_duplicate_consumer_returns_error() {
601 let component = DirectComponent::new();
602 let endpoint = component
603 .create_endpoint("direct:dup", &NoOpComponentContext)
604 .unwrap();
605
606 let mut consumer_a = endpoint.create_consumer().unwrap();
607 let mut consumer_b = endpoint.create_consumer().unwrap();
608
609 let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
610 let ctx_a = ConsumerContext::new(route_tx_a, tokio_util::sync::CancellationToken::new());
611 consumer_a.start(ctx_a).await.unwrap();
612
613 let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
614 let ctx_b = ConsumerContext::new(route_tx_b, tokio_util::sync::CancellationToken::new());
615 let result = consumer_b.start(ctx_b).await;
616
617 assert!(matches!(
618 result,
619 Err(CamelError::EndpointCreationFailed(msg))
620 if msg.contains("already has a registered consumer")
621 ));
622
623 consumer_a.stop().await.unwrap();
624 }
625
626 #[tokio::test]
627 async fn test_direct_producer_consumer_roundtrip() {
628 let component = DirectComponent::new();
629
630 let consumer_endpoint = component
632 .create_endpoint("direct:test", &NoOpComponentContext)
633 .unwrap();
634 let mut consumer = consumer_endpoint.create_consumer().unwrap();
635
636 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
638 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
639
640 tokio::spawn(async move {
642 consumer.start(ctx).await.unwrap();
643 });
644
645 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
647
648 tokio::spawn(async move {
650 while let Some(envelope) = route_rx.recv().await {
651 let ExchangeEnvelope { exchange, reply_tx } = envelope;
652 if let Some(tx) = reply_tx {
653 let _ = tx.send(Ok(exchange));
654 }
655 }
656 });
657
658 let ctx = test_producer_ctx();
660 let producer_endpoint = component
661 .create_endpoint("direct:test", &NoOpComponentContext)
662 .unwrap();
663 let producer = producer_endpoint.create_producer(&ctx).unwrap();
664
665 let exchange = Exchange::new(Message::new("hello direct"));
666 let result = producer.oneshot(exchange).await;
667
668 assert!(result.is_ok());
669 let reply = result.unwrap();
670 assert_eq!(reply.input.body.as_text(), Some("hello direct"));
671 }
672
673 #[tokio::test]
674 async fn test_direct_propagates_error_when_no_handler() {
675 let component = DirectComponent::new();
676
677 let consumer_endpoint = component
678 .create_endpoint("direct:err-test", &NoOpComponentContext)
679 .unwrap();
680 let mut consumer = consumer_endpoint.create_consumer().unwrap();
681
682 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
683 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
684
685 tokio::spawn(async move {
686 consumer.start(ctx).await.unwrap();
687 });
688
689 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
690
691 tokio::spawn(async move {
693 while let Some(envelope) = route_rx.recv().await {
694 if let Some(tx) = envelope.reply_tx {
695 let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
696 }
697 }
698 });
699
700 let ctx = test_producer_ctx();
701 let producer_endpoint = component
702 .create_endpoint("direct:err-test", &NoOpComponentContext)
703 .unwrap();
704 let producer = producer_endpoint.create_producer(&ctx).unwrap();
705
706 let exchange = Exchange::new(Message::new("test"));
707 let result = producer.oneshot(exchange).await;
708 assert!(result.is_err());
709 assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
710 }
711
712 #[tokio::test]
713 async fn test_direct_consumer_stop_unregisters() {
714 let component = DirectComponent::new();
715 let endpoint = component
716 .create_endpoint("direct:cleanup", &NoOpComponentContext)
717 .unwrap();
718
719 let mut consumer = endpoint.create_consumer().unwrap();
721
722 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
723 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
724
725 let handle = tokio::spawn(async move {
727 consumer.start(ctx).await.unwrap();
728 });
729
730 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
731
732 {
734 let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
735 assert!(reg.contains_key("cleanup"));
736 }
737
738 let mut stop_consumer = DirectConsumer {
740 name: "cleanup".to_string(),
741 registry: Arc::clone(&component.registry),
742 cancel: None,
743 handle: None,
744 };
745 stop_consumer.stop().await.unwrap();
746
747 {
749 let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
750 assert!(!reg.contains_key("cleanup"));
751 }
752
753 handle.abort();
754 }
755
756 #[tokio::test]
757 async fn test_direct_consumer_respects_cancellation() {
758 use tokio_util::sync::CancellationToken;
759
760 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
761 let token = CancellationToken::new();
762 let (tx, _rx) = mpsc::channel(16);
763 let ctx = ConsumerContext::new(tx, token.clone());
764
765 let mut consumer = DirectConsumer {
766 name: "cancel-test".to_string(),
767 registry: registry.clone(),
768 cancel: None,
769 handle: None,
770 };
771
772 let handle = tokio::spawn(async move {
773 consumer.start(ctx).await.unwrap();
774 });
775
776 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
777 assert!(
778 registry
779 .lock()
780 .unwrap_or_else(|e| e.into_inner())
781 .contains_key("cancel-test")
782 );
783
784 token.cancel();
785 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
789
790 assert!(
792 !registry
793 .lock()
794 .unwrap_or_else(|e| e.into_inner())
795 .contains_key("cancel-test")
796 );
797
798 let _ = handle.await;
799 }
800
801 #[tokio::test]
802 async fn test_direct_consumer_stop_missing_entry_is_ok() {
803 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
804 let mut consumer = DirectConsumer {
805 name: "never-registered".to_string(),
806 registry,
807 cancel: None,
808 handle: None,
809 };
810 let result = consumer.stop().await;
811 assert!(result.is_ok());
812 }
813
814 #[test]
815 fn test_poll_ready_endpoint_not_registered() {
816 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
817 let producer = DirectProducer {
818 name: "missing".to_string(),
819 registry,
820 config: DirectConfig {
821 name: "missing".to_string(),
822 timeout_ms: None,
823 block: None,
824 fail_if_no_consumers: None,
825 bridge_error_handler: None,
826 exchange_pattern: None,
827 },
828 semaphore: Arc::new(Semaphore::new(1)),
829 pending_permit: None,
830 acquire_fut: None,
831 fail_if_no_consumers: None,
832 };
833 let waker = noop_waker();
834 let mut cx = Context::from_waker(&waker);
835 let mut producer = producer;
836 let result = producer.poll_ready(&mut cx);
837 assert!(matches!(
838 result,
839 Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
840 ));
841 }
842
843 #[test]
844 fn test_poll_ready_endpoint_registered() {
845 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
846 let (tx, _rx) =
847 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
848 registry.lock().unwrap().insert("active".to_string(), tx);
849 let producer = DirectProducer {
850 name: "active".to_string(),
851 registry,
852 config: DirectConfig {
853 name: "active".to_string(),
854 timeout_ms: None,
855 block: None,
856 fail_if_no_consumers: None,
857 bridge_error_handler: None,
858 exchange_pattern: None,
859 },
860 semaphore: Arc::new(Semaphore::new(1)),
861 pending_permit: None,
862 acquire_fut: None,
863 fail_if_no_consumers: None,
864 };
865 let waker = noop_waker();
866 let mut cx = Context::from_waker(&waker);
867 let mut producer = producer;
868 let result = producer.poll_ready(&mut cx);
869 assert!(matches!(result, Poll::Ready(Ok(()))));
870 }
871
872 #[test]
873 fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
874 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
875 let producer = DirectProducer {
876 name: "missing-ok".to_string(),
877 registry,
878 config: DirectConfig {
879 name: "missing-ok".to_string(),
880 timeout_ms: None,
881 block: None,
882 fail_if_no_consumers: Some(false),
883 bridge_error_handler: None,
884 exchange_pattern: None,
885 },
886 semaphore: Arc::new(Semaphore::new(1)),
887 pending_permit: None,
888 acquire_fut: None,
889 fail_if_no_consumers: Some(false),
890 };
891
892 let waker = noop_waker();
893 let mut cx = Context::from_waker(&waker);
894 let mut producer = producer;
895 let result = producer.poll_ready(&mut cx);
896 assert!(matches!(result, Poll::Ready(Ok(()))));
897 }
898
899 #[test]
900 fn test_poll_ready_channel_closed() {
901 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
902 let (tx, rx) =
903 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
904 drop(rx);
905 registry.lock().unwrap().insert("closed".to_string(), tx);
906 let producer = DirectProducer {
907 name: "closed".to_string(),
908 registry,
909 config: DirectConfig {
910 name: "closed".to_string(),
911 timeout_ms: None,
912 block: None,
913 fail_if_no_consumers: None,
914 bridge_error_handler: None,
915 exchange_pattern: None,
916 },
917 semaphore: Arc::new(Semaphore::new(1)),
918 pending_permit: None,
919 acquire_fut: None,
920 fail_if_no_consumers: None,
921 };
922 let waker = noop_waker();
923 let mut cx = Context::from_waker(&waker);
924 let mut producer = producer;
925 let result = producer.poll_ready(&mut cx);
926 assert!(matches!(
927 result,
928 Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
929 ));
930 }
931
932 #[tokio::test]
933 async fn test_direct_stop_cancels_loop() {
934 use tokio_util::sync::CancellationToken;
935
936 let component = DirectComponent::new();
937 let endpoint = component
938 .create_endpoint("direct:stop-test", &NoOpComponentContext)
939 .unwrap();
940 let mut consumer = endpoint.create_consumer().unwrap();
941
942 let token = CancellationToken::new();
943 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
944 let ctx = ConsumerContext::new(route_tx, token.clone());
945
946 let handle = tokio::spawn(async move {
950 consumer.start(ctx).await.unwrap();
951 });
952
953 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
954 assert!(
955 component
956 .registry
957 .lock()
958 .unwrap_or_else(|e| e.into_inner())
959 .contains_key("stop-test")
960 );
961
962 let mut stop_consumer = DirectConsumer {
964 name: "stop-test".to_string(),
965 registry: Arc::clone(&component.registry),
966 cancel: Some(token.clone()),
967 handle: None,
968 };
969 stop_consumer.stop().await.unwrap();
970
971 let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await;
973 assert!(result.is_ok(), "Consumer loop did not stop within 2s");
974
975 assert!(
977 !component
978 .registry
979 .lock()
980 .unwrap_or_else(|e| e.into_inner())
981 .contains_key("stop-test")
982 );
983 }
984
985 #[tokio::test]
986 async fn test_direct_producer_timeout() {
987 let component = DirectComponent::new();
988 let endpoint = component
989 .create_endpoint("direct:timeout-test", &NoOpComponentContext)
990 .unwrap();
991 let mut consumer = endpoint.create_consumer().unwrap();
992
993 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
995 let token = tokio_util::sync::CancellationToken::new();
996 let ctx = ConsumerContext::new(route_tx, token.clone());
997 tokio::spawn(async move {
998 consumer.start(ctx).await.unwrap();
999 });
1000
1001 tokio::spawn(async move {
1003 let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
1004 while let Some(envelope) = route_rx.recv().await {
1005 held_reply.push(envelope.reply_tx.unwrap());
1006 }
1007 drop(held_reply);
1008 });
1009
1010 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1011
1012 let _ = test_producer_ctx();
1014 let _producer_endpoint = component
1015 .create_endpoint("direct:timeout-test", &NoOpComponentContext)
1016 .unwrap();
1017 let producer = DirectProducer {
1018 name: "timeout-test".to_string(),
1019 registry: Arc::clone(&component.registry),
1020 config: DirectConfig {
1021 name: "timeout-test".to_string(),
1022 timeout_ms: Some(100), block: None,
1024 fail_if_no_consumers: None,
1025 bridge_error_handler: None,
1026 exchange_pattern: None,
1027 },
1028 semaphore: Arc::new(Semaphore::new(1)),
1029 pending_permit: None,
1030 acquire_fut: None,
1031 fail_if_no_consumers: None,
1032 };
1033
1034 let exchange = Exchange::new(Message::new("test"));
1035 let mut svc = producer;
1036 let _ = svc.poll_ready(&mut Context::from_waker(&noop_waker()));
1037 let result = svc.call(exchange).await;
1038 assert!(result.is_err(), "Expected timeout error");
1039 assert!(
1040 result.unwrap_err().to_string().contains("timed out"),
1041 "Expected timeout message"
1042 );
1043
1044 token.cancel();
1045 }
1046
1047 #[test]
1048 fn test_empty_endpoint_name_rejected() {
1049 let result = DirectConfig::from_uri("direct:");
1050 if let Ok(config) = result {
1052 assert!(
1053 validate_name(&config.name).is_err(),
1054 "expected validation error for empty name"
1055 );
1056 }
1057 let component = DirectComponent::new();
1059 let result = component.create_endpoint("direct:", &NoOpComponentContext);
1060 assert!(result.is_err(), "empty endpoint name must be rejected");
1061 }
1062
1063 #[test]
1064 fn test_whitespace_endpoint_name_rejected() {
1065 let result = DirectConfig::from_uri("direct:my endpoint");
1066 if let Ok(config) = result {
1067 assert!(
1068 validate_name(&config.name).is_err(),
1069 "expected validation error for whitespace in name"
1070 );
1071 }
1072 let component = DirectComponent::new();
1073 let result = component.create_endpoint("direct:my endpoint", &NoOpComponentContext);
1074 assert!(result.is_err(), "whitespace endpoint name must be rejected");
1075 }
1076
1077 #[test]
1078 fn test_valid_endpoint_name_accepted() {
1079 let component = DirectComponent::new();
1080 let result = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);
1081 assert!(result.is_ok(), "valid endpoint name should be accepted");
1082 }
1083}