1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::{Arc, Mutex};
10use std::task::{Context, Poll};
11
12use async_trait::async_trait;
13use tokio::sync::{mpsc, oneshot};
14use tower::Service;
15
16use camel_component_api::UriConfig;
17use camel_component_api::{BoxProcessor, CamelError, Exchange};
18use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
19use tracing::{debug, error, info};
20
/// Sending half of a direct endpoint's channel.
///
/// Each message pairs the [`Exchange`] to process with a one-shot sender on
/// which the processing result is handed back to the caller.
type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
/// Shared, mutex-guarded map from endpoint name to the sender registered for
/// that name by a started consumer.
type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
30
/// Configuration parsed from a `direct:` URI via the `UriConfig` derive.
#[derive(Debug, Clone, UriConfig)]
#[uri_scheme = "direct"]
#[uri_config(crate = "camel_component_api")]
pub struct DirectConfig {
    /// Endpoint name; the tests in this file show `direct:orders` parsing to `"orders"`.
    pub name: String,
}
47
/// Component for the `direct` scheme.
///
/// Holds the registry that every endpoint created by this component shares;
/// producers use it to find the sender registered under the same name.
pub struct DirectComponent {
    /// Name-to-sender map; each created endpoint receives an `Arc` clone of it.
    registry: DirectRegistry,
}
62
63impl DirectComponent {
64 pub fn new() -> Self {
65 Self {
66 registry: Arc::new(Mutex::new(HashMap::new())),
67 }
68 }
69}
70
71impl Default for DirectComponent {
72 fn default() -> Self {
73 Self::new()
74 }
75}
76
77impl Component for DirectComponent {
78 fn scheme(&self) -> &str {
79 "direct"
80 }
81
82 fn create_endpoint(
83 &self,
84 uri: &str,
85 _ctx: &dyn camel_component_api::ComponentContext,
86 ) -> Result<Box<dyn Endpoint>, CamelError> {
87 let config = DirectConfig::from_uri(uri)?;
88 if config.name.trim().is_empty() {
89 return Err(CamelError::InvalidUri(
90 "direct: endpoint name must not be empty".to_string(),
91 ));
92 }
93 let name = config.name.clone();
94 debug!(endpoint_name = %name, "direct endpoint created");
95 Ok(Box::new(DirectEndpoint {
96 uri: uri.to_string(),
97 name: config.name,
98 registry: Arc::clone(&self.registry),
99 }))
100 }
101}
102
/// Endpoint for one `direct:<name>` URI; a factory for the matching consumer
/// and producer, both of which share the component's registry.
struct DirectEndpoint {
    /// The URI string this endpoint was created from, returned by `uri()`.
    uri: String,
    /// Endpoint name used as the registry key.
    name: String,
    /// Registry shared with the owning component.
    registry: DirectRegistry,
}
112
113impl Endpoint for DirectEndpoint {
114 fn uri(&self) -> &str {
115 &self.uri
116 }
117
118 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
119 Ok(Box::new(DirectConsumer {
120 name: self.name.clone(),
121 registry: Arc::clone(&self.registry),
122 }))
123 }
124
125 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
126 Ok(BoxProcessor::new(DirectProducer {
127 name: self.name.clone(),
128 registry: Arc::clone(&self.registry),
129 }))
130 }
131}
132
/// Consumer side of a direct endpoint: while started, it owns the receiving
/// half of the channel registered under `name`.
struct DirectConsumer {
    /// Registry key under which this consumer publishes its sender.
    name: String,
    /// Registry shared with producers of the same component.
    registry: DirectRegistry,
}
143
144#[async_trait]
145impl Consumer for DirectConsumer {
146 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
147 let (tx, mut rx) =
149 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
150
151 {
153 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
154 reg.insert(self.name.clone(), tx);
155 }
156
157 info!(endpoint_name = %self.name, "direct consumer started");
158
159 loop {
161 tokio::select! {
162 _ = context.cancelled() => {
163 debug!(endpoint_name = %self.name, "direct consumer received cancellation");
164 break;
165 }
166 msg = rx.recv() => {
167 match msg {
168 Some((exchange, reply_tx)) => {
169 let result = context.send_and_wait(exchange).await;
170 let _ = reply_tx.send(result);
171 }
172 None => break,
173 }
174 }
175 }
176 }
177
178 {
180 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
181 reg.remove(&self.name);
182 }
183
184 debug!(endpoint_name = %self.name, "direct consumer stopped");
185
186 Ok(())
187 }
188
189 async fn stop(&mut self) -> Result<(), CamelError> {
190 let mut reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
192 reg.remove(&self.name);
193 debug!(endpoint_name = %self.name, "direct consumer stopped");
194 Ok(())
195 }
196}
197
/// Producer side of a direct endpoint: a tower `Service` that looks up the
/// sender registered under `name` on every call.
#[derive(Clone)]
struct DirectProducer {
    /// Registry key of the target endpoint.
    name: String,
    /// Registry shared with consumers of the same component.
    registry: DirectRegistry,
}
209
210impl Service<Exchange> for DirectProducer {
211 type Response = Exchange;
212 type Error = CamelError;
213 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
214
215 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216 let reg = self.registry.lock().unwrap_or_else(|e| e.into_inner());
217 match reg.get(&self.name) {
218 None => Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
219 "direct endpoint '{}' not registered",
220 self.name
221 )))),
222 Some(sender) if sender.is_closed() => {
223 Poll::Ready(Err(CamelError::EndpointCreationFailed(format!(
224 "direct endpoint '{}' channel closed",
225 self.name
226 ))))
227 }
228 Some(_) => Poll::Ready(Ok(())),
229 }
230 }
231
232 fn call(&mut self, exchange: Exchange) -> Self::Future {
233 let name = self.name.clone();
234 let registry = Arc::clone(&self.registry);
235
236 Box::pin(async move {
237 let sender = {
238 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
239 reg.get(&name)
240 .ok_or_else(|| {
241 let err = CamelError::EndpointCreationFailed(format!(
242 "no consumer registered for direct:{name}"
243 ));
244 error!(endpoint_name = %name, error = %err, "direct send failed");
245 err
246 })?
247 .clone()
248 };
249
250 let (reply_tx, reply_rx) = oneshot::channel();
251 sender.send((exchange, reply_tx)).await.map_err(|err| {
252 error!(endpoint_name = %name, error = %err, "direct send failed");
253 CamelError::ChannelClosed
254 })?;
255
256 let result = reply_rx.await.map_err(|err| {
257 error!(endpoint_name = %name, error = %err, "direct send failed");
258 CamelError::ChannelClosed
259 })?;
260
261 debug!(endpoint_name = %name, "direct message sent");
262 result
263 })
264 }
265}
266
// Unit tests for the direct component: URI parsing, endpoint factories,
// producer/consumer round trips, registry cleanup, and `poll_ready` states.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_component_api::ExchangeEnvelope;
    use camel_component_api::Message;
    use camel_component_api::NoOpComponentContext;
    use std::task::RawWakerVTable;
    use tower::ServiceExt;

    /// Builds a waker that does nothing, for polling `poll_ready` by hand.
    fn noop_waker() -> std::task::Waker {
        const VTABLE: RawWakerVTable = RawWakerVTable::new(|_| RAW, |_| {}, |_| {}, |_| {});
        const RAW: std::task::RawWaker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
        // SAFETY: the data pointer is null and none of the vtable functions
        // dereference it: clone returns the same constant raw waker, and
        // wake / wake_by_ref / drop are no-ops.
        unsafe { std::task::Waker::from_raw(RAW) }
    }

    /// Producer context used by tests that need to call `create_producer`.
    fn test_producer_ctx() -> ProducerContext {
        ProducerContext::new()
    }

    #[test]
    fn test_direct_component_scheme() {
        let component = DirectComponent::new();
        assert_eq!(component.scheme(), "direct");
    }

    #[test]
    fn test_direct_component_default() {
        let component = DirectComponent::default();
        assert_eq!(component.scheme(), "direct");
    }

    #[test]
    fn test_direct_config_from_uri() {
        let config = DirectConfig::from_uri("direct:orders").unwrap();
        assert_eq!(config.name, "orders");
    }

    #[test]
    fn test_direct_endpoint_uri() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:uri-check", &NoOpComponentContext)
            .unwrap();
        assert_eq!(endpoint.uri(), "direct:uri-check");
    }

    #[test]
    fn test_direct_creates_endpoint() {
        let component = DirectComponent::new();
        let endpoint = component.create_endpoint("direct:foo", &NoOpComponentContext);
        assert!(endpoint.is_ok());
    }

    #[test]
    fn test_direct_wrong_scheme() {
        let component = DirectComponent::new();
        let result = component.create_endpoint("timer:tick", &NoOpComponentContext);
        assert!(result.is_err());
    }

    #[test]
    fn test_direct_endpoint_creates_consumer() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_consumer().is_ok());
    }

    #[test]
    fn test_direct_endpoint_creates_producer() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:foo", &NoOpComponentContext)
            .unwrap();
        assert!(endpoint.create_producer(&ctx).is_ok());
    }

    #[test]
    fn test_direct_empty_name_rejected() {
        let component = DirectComponent::new();
        match component.create_endpoint("direct:", &NoOpComponentContext) {
            Err(e) => assert!(
                e.to_string().contains("must not be empty"),
                "unexpected error: {e}"
            ),
            Ok(_) => panic!("expected error for empty name"),
        }
    }

    #[tokio::test]
    async fn test_direct_producer_no_consumer_registered() {
        let ctx = test_producer_ctx();
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:missing", &NoOpComponentContext)
            .unwrap();
        let producer = endpoint.create_producer(&ctx).unwrap();

        // No consumer was ever started, so the registry has no "missing" entry.
        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_direct_producer_consumer_roundtrip() {
        let component = DirectComponent::new();

        // Consumer side of "direct:test".
        let consumer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        // Channel standing in for the route the consumer feeds.
        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());

        // `start` runs until shutdown, so it needs its own task.
        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Give the spawned task time to run and register its sender.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Fake route: reply with the exchange unchanged.
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                let ExchangeEnvelope { exchange, reply_tx } = envelope;
                if let Some(tx) = reply_tx {
                    let _ = tx.send(Ok(exchange));
                }
            }
        });

        // Producer side: a second endpoint for the same name on the same component.
        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        let exchange = Exchange::new(Message::new("hello direct"));
        let result = producer.oneshot(exchange).await;

        assert!(result.is_ok());
        let reply = result.unwrap();
        assert_eq!(reply.input.body.as_text(), Some("hello direct"));
    }

    #[tokio::test]
    async fn test_direct_propagates_error_when_no_handler() {
        let component = DirectComponent::new();

        let consumer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let mut consumer = consumer_endpoint.create_consumer().unwrap();

        let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());

        tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        // Give the spawned task time to run and register its sender.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        // Fake route: always reply with a processor error.
        tokio::spawn(async move {
            while let Some(envelope) = route_rx.recv().await {
                if let Some(tx) = envelope.reply_tx {
                    let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
                }
            }
        });

        let ctx = test_producer_ctx();
        let producer_endpoint = component
            .create_endpoint("direct:err-test", &NoOpComponentContext)
            .unwrap();
        let producer = producer_endpoint.create_producer(&ctx).unwrap();

        // The route's error must reach the producer's caller unchanged in kind.
        let exchange = Exchange::new(Message::new("test"));
        let result = producer.oneshot(exchange).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
    }

    #[tokio::test]
    async fn test_direct_consumer_stop_unregisters() {
        let component = DirectComponent::new();
        let endpoint = component
            .create_endpoint("direct:cleanup", &NoOpComponentContext)
            .unwrap();

        let mut consumer = endpoint.create_consumer().unwrap();

        // The route receiver is kept alive but never read in this test.
        let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        {
            // Started consumer must have registered itself.
            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
            assert!(reg.contains_key("cleanup"));
        }

        // The original consumer was moved into the task, so call `stop` on a
        // second consumer value that has the same name and registry.
        let mut stop_consumer = DirectConsumer {
            name: "cleanup".to_string(),
            registry: Arc::clone(&component.registry),
        };
        stop_consumer.stop().await.unwrap();

        {
            // `stop` must have removed the entry.
            let reg = component.registry.lock().unwrap_or_else(|e| e.into_inner());
            assert!(!reg.contains_key("cleanup"));
        }

        handle.abort();
    }

    #[tokio::test]
    async fn test_direct_consumer_respects_cancellation() {
        use tokio_util::sync::CancellationToken;

        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let token = CancellationToken::new();
        let (tx, _rx) = mpsc::channel(16);
        let ctx = ConsumerContext::new(tx, token.clone());

        let mut consumer = DirectConsumer {
            name: "cancel-test".to_string(),
            registry: registry.clone(),
        };

        let handle = tokio::spawn(async move {
            consumer.start(ctx).await.unwrap();
        });

        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        assert!(
            registry
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .contains_key("cancel-test")
        );

        // Cancelling the token must make `start` return within the timeout.
        token.cancel();
        let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
        assert!(
            result.is_ok(),
            "Consumer should have stopped after cancellation"
        );

        // `start` unregisters the consumer on its way out.
        assert!(
            !registry
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .contains_key("cancel-test")
        );
    }

    #[tokio::test]
    async fn test_direct_consumer_stop_missing_entry_is_ok() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let mut consumer = DirectConsumer {
            name: "never-registered".to_string(),
            registry,
        };
        let result = consumer.stop().await;
        assert!(result.is_ok());
    }

    #[test]
    fn test_poll_ready_endpoint_not_registered() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let producer = DirectProducer {
            name: "missing".to_string(),
            registry,
        };
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut producer = producer;
        let result = producer.poll_ready(&mut cx);
        assert!(matches!(
            result,
            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
        ));
    }

    #[test]
    fn test_poll_ready_endpoint_registered() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        // `_rx` stays alive for the whole test, so the channel is open.
        let (tx, _rx) =
            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
        registry.lock().unwrap().insert("active".to_string(), tx);
        let producer = DirectProducer {
            name: "active".to_string(),
            registry,
        };
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut producer = producer;
        let result = producer.poll_ready(&mut cx);
        assert!(matches!(result, Poll::Ready(Ok(()))));
    }

    #[test]
    fn test_poll_ready_channel_closed() {
        let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
        let (tx, rx) =
            mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(1);
        // Dropping the receiver closes the channel while the sender stays registered.
        drop(rx);
        registry.lock().unwrap().insert("closed".to_string(), tx);
        let producer = DirectProducer {
            name: "closed".to_string(),
            registry,
        };
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);
        let mut producer = producer;
        let result = producer.poll_ready(&mut cx);
        assert!(matches!(
            result,
            Poll::Ready(Err(CamelError::EndpointCreationFailed(_)))
        ));
    }
}