1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11use std::time::Duration;
12
13use async_trait::async_trait;
14use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore, mpsc, oneshot};
15use tokio::task::JoinHandle;
16use tokio_util::sync::CancellationToken;
17use tower::Service;
18
19use camel_component_api::parse_uri;
20use camel_component_api::{BoxProcessor, CamelError, Exchange};
21use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
22use tracing::{debug, error, info, warn};
23
24type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
32type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
33type AcquirePermitFut =
34 Pin<Box<dyn Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send>>;
35
36fn validate_name(name: &str) -> Result<(), CamelError> {
42 if name.trim().is_empty() {
43 return Err(CamelError::InvalidUri(
44 "direct: endpoint name must not be empty".to_string(),
45 ));
46 }
47 if name.contains(char::is_whitespace) {
48 return Err(CamelError::InvalidUri(
49 "direct: endpoint name must not contain whitespace".to_string(),
50 ));
51 }
52 Ok(())
53}
54
/// Options parsed from a `direct:` endpoint URI.
///
/// Every option other than `name` is `None` when the URI does not supply it.
/// `Default`, `PartialEq` and `Eq` are derived so configurations can be built
/// with struct-update syntax and compared directly.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DirectConfig {
    /// Endpoint name, taken from the URI path.
    pub name: String,
    /// Producer call timeout in milliseconds.
    pub timeout_ms: Option<u64>,
    /// Value of the `block` URI parameter, if given.
    pub block: Option<bool>,
    /// Value of the `fail_if_no_consumers` / `failIfNoConsumers` URI parameter, if given.
    pub fail_if_no_consumers: Option<bool>,
    /// Value of the `bridge_error_handler` / `bridgeErrorHandler` URI parameter, if given.
    pub bridge_error_handler: Option<bool>,
    /// Value of the `exchange_pattern` / `exchangePattern` URI parameter, if given.
    pub exchange_pattern: Option<String>,
}
80
81impl DirectConfig {
82 pub fn from_uri(uri: &str) -> Result<Self, CamelError> {
83 let parts = parse_uri(uri)?;
84 if parts.scheme != "direct" {
85 return Err(CamelError::InvalidUri(format!(
86 "invalid scheme '{}', expected 'direct'",
87 parts.scheme
88 )));
89 }
90
91 let parse_bool = |name: &str, value: &str| -> Result<bool, CamelError> {
92 match value.to_ascii_lowercase().as_str() {
93 "true" | "1" | "yes" => Ok(true),
94 "false" | "0" | "no" => Ok(false),
95 _ => Err(CamelError::InvalidUri(format!(
96 "invalid value for {}: invalid boolean value: '{}'",
97 name, value
98 ))),
99 }
100 };
101
102 let timeout_ms = parts
103 .params
104 .get("timeout_ms")
105 .map(|v| {
106 v.parse::<u64>().map_err(|e| {
107 CamelError::InvalidUri(format!("invalid value for timeout_ms: {}", e))
108 })
109 })
110 .transpose()?;
111
112 let block = parts
113 .params
114 .get("block")
115 .map(|v| parse_bool("block", v))
116 .transpose()?;
117
118 let fail_if_no_consumers = parts
119 .params
120 .get("fail_if_no_consumers")
121 .or_else(|| parts.params.get("failIfNoConsumers"))
122 .map(|v| parse_bool("fail_if_no_consumers", v))
123 .transpose()?;
124
125 let bridge_error_handler = parts
126 .params
127 .get("bridge_error_handler")
128 .or_else(|| parts.params.get("bridgeErrorHandler"))
129 .map(|v| parse_bool("bridge_error_handler", v))
130 .transpose()?;
131
132 let exchange_pattern = parts
133 .params
134 .get("exchange_pattern")
135 .or_else(|| parts.params.get("exchangePattern"))
136 .cloned();
137
138 Ok(Self {
139 name: parts.path,
140 timeout_ms,
141 block,
142 fail_if_no_consumers,
143 bridge_error_handler,
144 exchange_pattern,
145 })
146 }
147}
148
149pub struct DirectComponent {
161 registry: DirectRegistry,
162}
163
164impl DirectComponent {
165 pub fn new() -> Self {
166 Self {
167 registry: Arc::new(Mutex::new(HashMap::new())),
168 }
169 }
170}
171
172impl Default for DirectComponent {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl Component for DirectComponent {
179 fn scheme(&self) -> &str {
180 "direct"
181 }
182
183 fn create_endpoint(
184 &self,
185 uri: &str,
186 _ctx: &dyn camel_component_api::ComponentContext,
187 ) -> Result<Box<dyn Endpoint>, CamelError> {
188 let config = DirectConfig::from_uri(uri)?;
189 validate_name(&config.name)?;
190 let name = config.name.clone();
191 debug!(endpoint_name = %name, "direct endpoint created");
192 Ok(Box::new(DirectEndpoint {
193 uri: uri.to_string(),
194 config,
195 registry: Arc::clone(&self.registry),
196 }))
197 }
198}
199
200struct DirectEndpoint {
205 uri: String,
206 config: DirectConfig,
207 registry: DirectRegistry,
208}
209
210impl Endpoint for DirectEndpoint {
211 fn uri(&self) -> &str {
212 &self.uri
213 }
214
215 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
216 Ok(Box::new(DirectConsumer {
217 name: self.config.name.clone(),
218 registry: Arc::clone(&self.registry),
219 cancel: None,
220 handle: None,
221 }))
222 }
223
224 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
225 Ok(BoxProcessor::new(DirectProducer {
226 name: self.config.name.clone(),
227 registry: Arc::clone(&self.registry),
228 config: self.config.clone(),
229 semaphore: Arc::new(Semaphore::new(1)),
230 pending_permit: None,
231 acquire_fut: None,
232 fail_if_no_consumers: self.config.fail_if_no_consumers,
233 }))
234 }
235}
236
237struct DirectConsumer {
244 name: String,
245 registry: DirectRegistry,
246 cancel: Option<CancellationToken>,
247 handle: Option<JoinHandle<Result<(), CamelError>>>,
248}
249
250#[async_trait]
251impl Consumer for DirectConsumer {
252 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
253 let (tx, mut rx) =
255 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
256
257 {
259 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
260 if let Some(existing) = reg.get(&self.name)
261 && !existing.is_closed()
262 {
263 return Err(CamelError::EndpointCreationFailed(format!(
264 "direct endpoint '{}' already has a registered consumer",
265 self.name
266 )));
267 }
268 reg.insert(self.name.clone(), tx);
269 }
270
271 let name = self.name.clone();
272 let registry = Arc::clone(&self.registry);
273 let cancel = context.cancel_token();
274 let cancel_clone = cancel.clone();
275
276 info!(endpoint_name = %self.name, "direct consumer started");
277
278 let handle = tokio::spawn(async move {
280 loop {
281 tokio::select! {
282 _ = cancel_clone.cancelled() => {
283 debug!(endpoint_name = %name, "direct consumer received cancellation");
284 break;
285 }
286 msg = rx.recv() => {
287 match msg {
288 Some((exchange, reply_tx)) => {
289 debug!(
290 endpoint_name = %name,
291 exchange_id = %exchange.correlation_id,
292 "direct consumer received exchange"
293 );
294 let result = context.send_and_wait(exchange).await;
295 if let Err(ref err) = result {
296 error!(
297 endpoint_name = %name,
298 error = %err,
299 "direct consumer pipeline error"
300 );
301 }
302 let _ = reply_tx.send(result);
303 }
304 None => break,
305 }
306 }
307 }
308 }
309
310 {
312 let mut reg = registry.lock().unwrap_or_else(|e| e.into_inner());
313 reg.remove(&name);
314 }
315
316 debug!(endpoint_name = %name, "direct consumer stopped");
317 Ok(())
318 });
319
320 self.cancel = Some(cancel);
321 self.handle = Some(handle);
322 Ok(())
323 }
324
325 async fn stop(&mut self) -> Result<(), CamelError> {
326 if let Some(cancel) = self.cancel.take() {
328 cancel.cancel();
329 }
330
331 if let Some(mut h) = self.handle.take() {
333 if tokio::time::timeout(Duration::from_secs(5), &mut h)
334 .await
335 .is_err()
336 {
337 h.abort();
338 let _ = h.await;
339 warn!(endpoint_name = %self.name, "consumer task did not stop in 5s; aborted");
340 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
342 reg.remove(&self.name);
343 }
344 } else {
345 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
347 reg.remove(&self.name);
348 }
349
350 debug!(endpoint_name = %self.name, "direct consumer stopped");
351 Ok(())
352 }
353
354 fn background_task_handle(&mut self) -> Option<JoinHandle<Result<(), CamelError>>> {
355 self.handle.take()
360 }
361}
362
363struct DirectProducer {
370 name: String,
371 registry: DirectRegistry,
372 config: DirectConfig,
373 semaphore: Arc<Semaphore>,
374 pending_permit: Option<OwnedSemaphorePermit>,
375 acquire_fut: Option<AcquirePermitFut>,
376 fail_if_no_consumers: Option<bool>,
377}
378
379impl Clone for DirectProducer {
380 fn clone(&self) -> Self {
381 Self {
382 name: self.name.clone(),
383 registry: self.registry.clone(),
384 config: self.config.clone(),
385 semaphore: self.semaphore.clone(),
386 pending_permit: None,
387 acquire_fut: None,
388 fail_if_no_consumers: self.fail_if_no_consumers,
389 }
390 }
391}
392
393impl Service<Exchange> for DirectProducer {
394 type Response = Exchange;
395 type Error = CamelError;
396 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
397
398 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
399 if self.pending_permit.is_some() {
401 return Poll::Ready(Ok(()));
402 }
403
404 {
406 let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
407 match reg.get(&self.name) {
408 None => {
409 if self.fail_if_no_consumers != Some(false) {
410 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
411 "direct endpoint '{}' not registered",
412 self.name
413 ))));
414 }
415 }
416 Some(sender) if sender.is_closed() => {
417 return Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
418 "direct endpoint '{}' channel closed",
419 self.name
420 ))));
421 }
422 Some(_) => {}
423 }
424 }
425
426 let fut = self
428 .acquire_fut
429 .get_or_insert_with(|| Box::pin(Arc::clone(&self.semaphore).acquire_owned()));
430 match fut.as_mut().poll(cx) {
431 Poll::Ready(Ok(permit)) => {
432 self.acquire_fut = None;
433 self.pending_permit = Some(permit);
434 Poll::Ready(Ok(()))
435 }
436 Poll::Ready(Err(_)) => Poll::Ready(Err(CamelError::ChannelClosed)),
437 Poll::Pending => Poll::Pending,
438 }
439 }
440
441 fn call(&mut self, exchange: Exchange) -> Self::Future {
442 let _permit = match self.pending_permit.take() {
443 Some(p) => p,
444 None => {
445 return Box::pin(async {
446 Err(CamelError::ProcessorError(
447 "call() invoked without poll_ready()".into(),
448 ))
449 });
450 }
451 };
452
453 let name = self.name.clone();
454 let registry = Arc::clone(&self.registry);
455 let timeout = Duration::from_millis(self.config.timeout_ms.unwrap_or(30_000));
456 let exchange_id = exchange.correlation_id.clone();
457
458 debug!(
459 endpoint_name = %name,
460 exchange_id = %exchange_id,
461 "direct producer call entry"
462 );
463
464 Box::pin(async move {
465 tokio::time::timeout(timeout, async {
466 let sender = {
467 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
468 reg.get(&name)
469 .ok_or_else(|| {
470 let err = CamelError::EndpointCreationFailed(format!(
471 "no consumer registered for direct:{name}"
472 ));
473 error!(endpoint_name = %name, error = %err, "direct send failed");
474 err
475 })?
476 .clone()
477 };
478
479 let (reply_tx, reply_rx) = oneshot::channel();
480 sender.send((exchange, reply_tx)).await.map_err(|err| {
481 error!(endpoint_name = %name, error = %err, "direct send failed");
482 CamelError::ChannelClosed
483 })?;
484
485 let result = reply_rx.await.map_err(|err| {
486 error!(endpoint_name = %name, error = %err, "direct send failed");
487 CamelError::ChannelClosed
488 })?;
489
490 debug!(endpoint_name = %name, "direct message sent");
491 result
492 })
493 .await
494 .map_err(|_| CamelError::ProcessorError(format!("direct:{name} call timed out")))?
495 })
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::ExchangeEnvelope;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use tower::ServiceExt;

    /// Waker whose wake-ups do nothing, for polling futures by hand.
    /// Built on `std::task::Wake`, so no `unsafe` vtable is needed.
    fn noop_waker() -> std::task::Waker {
        struct NoopWake;

        impl std::task::Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }

        std::task::Waker::from(Arc::new(NoopWake))
    }

    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    /// Builds a producer directly against `registry`, bypassing URI parsing.
    fn bare_producer(
        registry: &DirectRegistry,
        name: &str,
        timeout_ms: Option<u64>,
        fail_if_no_consumers: Option<bool>,
    ) -> DirectProducer {
        DirectProducer {
            name: name.to_string(),
            registry: Arc::clone(registry),
            config: DirectConfig {
                name: name.to_string(),
                timeout_ms,
                block: None,
                fail_if_no_consumers,
                bridge_error_handler: None,
                exchange_pattern: None,
            },
            semaphore: Arc::new(Semaphore::new(1)),
            pending_permit: None,
            acquire_fut: None,
            fail_if_no_consumers,
        }
    }

    /// Polls `poll_ready` exactly once with a no-op waker.
    fn poll_ready_once(producer: &mut DirectProducer) -> Poll<Result<(), CamelError>> {
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        producer.poll_ready(&mut cx)
    }

    /// Whether `registry` currently holds an entry for `name`.
    fn is_registered(registry: &DirectRegistry, name: &str) -> bool {
        registry
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .contains_key(name)
    }

    #[test]
    fn test_direct_component_scheme() {
        let component = DirectComponent::new();
        assert_eq!(component.scheme(), "direct");
    }

    #[test]
    fn test_direct_component_default() {
        let component = DirectComponent::default();
        assert_eq!(component.scheme(), "direct");
    }

    #[test]
    fn test_direct_config_from_uri() {
        let config = DirectConfig::from_uri("direct:orders").unwrap();
        assert_eq!(config.name, "orders");
    }

    #[test]
    fn test_direct_endpoint_uri() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:uri-check", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "direct:uri-check");
    }

    #[test]
    fn test_direct_creates_endpoint() {
        let component = DirectComponent::new();
        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    #[test]
    fn test_direct_wrong_scheme() {
        let component = DirectComponent::new();
        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[test]
    fn test_direct_endpoint_creates_consumer() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_ok());
    }

    #[test]
    fn test_direct_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[test]
    fn test_direct_empty_name_rejected() {
        let component = DirectComponent::new();
        match component.create_endpoint("direct:", &NoOpComponentContext) {
            Err(e) => assert!(
                e.to_string().contains("must not be empty"),
                "unexpected error: {e}"
            ),
            Ok(_) => panic!("expected error for empty name"),
        }
    }

    #[tokio::test]
    async fn test_direct_producer_no_consumer_registered() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:missing", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_direct_duplicate_consumer_returns_error() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:dup", &NoOpComponentContext)
            .unwrap();

        let mut consumer_a = endpoint.create_consumer().unwrap();
        let mut consumer_b = endpoint.create_consumer().unwrap();

        let (route_tx_a, _route_rx_a) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx_a = ConsumerContext::new(route_tx_a, CancellationToken::new());
        consumer_a.start(ctx_a).await.unwrap();

        let (route_tx_b, _route_rx_b) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx_b = ConsumerContext::new(route_tx_b, CancellationToken::new());
        let result = consumer_b.start(ctx_b).await;

        assert!(matches!(
            result,
            Err(CamelError::EndpointCreationFailed(msg))
                if msg.contains("already has a registered consumer")
        ));

        consumer_a.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_direct_producer_consumer_roundtrip() {
        let component = DirectComponent::new();

        let consumer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());
        // `start` registers the consumer before returning, so no sleep is needed.
        consumer.start(ctx).await.unwrap();

        // Stand-in for the route: echo every exchange back unchanged.
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                let ExchangeEnvelope { exchange, reply_tx } = envelope;
                if let Some(tx) = reply_tx {
                    let _ = tx.send(Ok(exchange));
                }
            }
        });

        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("hello direct"));
        let result = producer.oneshot(exchange).await;

        assert!(result.is_ok());
        let reply = result.unwrap();
        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
    }

    #[tokio::test]
    async fn test_direct_propagates_error_when_no_handler() {
        let component = DirectComponent::new();

        let consumer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());
        consumer.start(ctx).await.unwrap();

        // Stand-in for the route: fail every exchange.
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                if let Some(tx) = envelope.reply_tx {
                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
                }
            }
        });

        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
    }

    #[tokio::test]
    async fn test_direct_consumer_stop_unregisters() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:cleanup", &NoOpComponentContext)
            .unwrap();

        let mut consumer = endpoint.create_consumer().unwrap();

        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, CancellationToken::new());
        consumer.start(ctx).await.unwrap();

        assert!(is_registered(&component.registry, "cleanup"));

        // A consumer without a task handle must remove the entry itself.
        let mut stop_consumer = DirectConsumer {
            name: "cleanup".to_string(),
            registry: Arc::clone(&component.registry),
            cancel: None,
            handle: None,
        };
        stop_consumer.stop().await.unwrap();

        assert!(!is_registered(&component.registry, "cleanup"));

        // Shut the real receive loop down instead of leaving it running.
        consumer.stop().await.unwrap();
    }

    #[tokio::test]
    async fn test_direct_consumer_respects_cancellation() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let token = CancellationToken::new();
        let (tx, _rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = DirectConsumer {
            name: "cancel-test".to_string(),
            registry: registry.clone(),
            cancel: None,
            handle: None,
        };
        consumer.start(ctx).await.unwrap();
        assert!(is_registered(&registry, "cancel-test"));

        token.cancel();

        // Wait for the loop itself rather than sleeping for a fixed time.
        let loop_task = consumer
            .background_task_handle()
            .expect("start() must spawn the consumer loop");
        let joined = tokio::time::timeout(Duration::from_secs(2), loop_task).await;
        assert!(
            matches!(joined, Ok(Ok(Ok(())))),
            "consumer loop did not exit cleanly after cancellation"
        );

        assert!(!is_registered(&registry, "cancel-test"));
    }

    #[tokio::test]
    async fn test_direct_consumer_stop_missing_entry_is_ok() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let mut consumer = DirectConsumer {
            name: "never-registered".to_string(),
            registry,
            cancel: None,
            handle: None,
        };
        let result = consumer.stop().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_poll_ready_endpoint_not_registered() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let mut producer = bare_producer(&registry, "missing", None, None);

        let result = poll_ready_once(&mut producer);
        assert!(matches!(
            result,
            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
        ));
    }

    #[test]
    fn test_poll_ready_endpoint_registered() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        // `_rx` stays alive so the channel counts as open.
        let (tx, _rx) =
            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
        registry.lock().unwrap().insert("active".to_string(), tx);
        let mut producer = bare_producer(&registry, "active", None, None);

        let result = poll_ready_once(&mut producer);
        assert!(matches!(result, Poll::Ready(Ok(()))));
    }

    #[test]
    fn test_poll_ready_allows_missing_consumer_when_fail_if_no_consumers_false() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let mut producer = bare_producer(&registry, "missing-ok", None, Some(false));

        let result = poll_ready_once(&mut producer);
        assert!(matches!(result, Poll::Ready(Ok(()))));
    }

    #[test]
    fn test_poll_ready_channel_closed() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let (tx, rx) =
            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
        // Dropping the receiver closes the channel while the sender stays registered.
        drop(rx);
        registry.lock().unwrap().insert("closed".to_string(), tx);
        let mut producer = bare_producer(&registry, "closed", None, None);

        let result = poll_ready_once(&mut producer);
        assert!(matches!(
            result,
            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
        ));
    }

    #[tokio::test]
    async fn test_direct_stop_cancels_loop() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:stop-test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let token = CancellationToken::new();
        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, token.clone());
        consumer.start(ctx).await.unwrap();

        assert!(is_registered(&component.registry, "stop-test"));

        // Take the handle of the actual receive loop so the timeout below
        // observes the loop, not the already-finished `start` call.
        let loop_task = consumer
            .background_task_handle()
            .expect("start() must spawn the consumer loop");

        let mut stop_consumer = DirectConsumer {
            name: "stop-test".to_string(),
            registry: Arc::clone(&component.registry),
            cancel: Some(token.clone()),
            handle: None,
        };
        stop_consumer.stop().await.unwrap();

        let result = tokio::time::timeout(Duration::from_secs(2), loop_task).await;
        assert!(result.is_ok(), "Consumer loop did not stop within 2s");

        assert!(!is_registered(&component.registry, "stop-test"));
    }

    #[tokio::test]
    async fn test_direct_producer_timeout() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:timeout-test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = endpoint.create_consumer().unwrap();

        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let token = CancellationToken::new();
        let ctx = ConsumerContext::new(route_tx, token.clone());
        consumer.start(ctx).await.unwrap();

        // Stand-in for the route: keep every reply channel open without ever
        // answering, so the producer can only finish by timing out.
        tokio::spawn(async move {
            let mut held_reply: Vec<oneshot::Sender<Result<Exchange, CamelError>>> = Vec::new();
            while let Some(envelope) = route_rx.recv().await {
                held_reply.push(envelope.reply_tx.unwrap());
            }
            drop(held_reply);
        });

        let mut svc = bare_producer(&component.registry, "timeout-test", Some(100), None);
        assert!(
            matches!(poll_ready_once(&mut svc), Poll::Ready(Ok(()))),
            "producer should be ready once the consumer is registered"
        );

        let exchange = Exchange::new(Message::new("test"));
        let result = svc.call(exchange).await;
        assert!(result.is_err(), "Expected timeout error");
        assert!(
            result.unwrap_err().to_string().contains("timed out"),
            "Expected timeout message"
        );

        token.cancel();
    }

    #[test]
    fn test_empty_endpoint_name_rejected() {
        // Parsing may accept the URI; validation must then reject the name.
        if let Ok(config) = DirectConfig::from_uri("direct:") {
            assert!(
                validate_name(&config.name).is_err(),
                "expected validation error for empty name"
            );
        }
        let component = DirectComponent::new();
        let result = component.create_endpoint("direct:", &NoOpComponentContext);
        assert!(result.is_err(), "empty endpoint name must be rejected");
    }

    #[test]
    fn test_whitespace_endpoint_name_rejected() {
        // Parsing may accept the URI; validation must then reject the name.
        if let Ok(config) = DirectConfig::from_uri("direct:my endpoint") {
            assert!(
                validate_name(&config.name).is_err(),
                "expected validation error for whitespace in name"
            );
        }
        let component = DirectComponent::new();
        let result = component.create_endpoint("direct:my endpoint", &NoOpComponentContext);
        assert!(result.is_err(), "whitespace endpoint name must be rejected");
    }

    #[test]
    fn test_valid_endpoint_name_accepted() {
        let component = DirectComponent::new();
        let result = component.create_endpoint("direct:my-endpoint", &NoOpComponentContext);
        assert!(result.is_ok(), "valid endpoint name should be accepted");
    }
}