camel_component_direct/
lib.rs1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use async_trait::async_trait;
8use tokio::sync::{Mutex, mpsc, oneshot};
9use tower::Service;
10
11use camel_component_api::UriConfig;
12use camel_component_api::{BoxProcessor, CamelError, Exchange};
13use camel_component_api::{Component, Consumer, ConsumerContext, Endpoint, ProducerContext};
14
15type DirectSender = mpsc::Sender<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>;
23type DirectRegistry = Arc<Mutex<HashMap<String, DirectSender>>>;
24
25#[derive(Debug, Clone, UriConfig)]
35#[uri_scheme = "direct"]
36#[uri_config(crate = "camel_component_api")]
37pub struct DirectConfig {
38 pub name: String,
40}
41
42pub struct DirectComponent {
54 registry: DirectRegistry,
55}
56
57impl DirectComponent {
58 pub fn new() -> Self {
59 Self {
60 registry: Arc::new(Mutex::new(HashMap::new())),
61 }
62 }
63}
64
65impl Default for DirectComponent {
66 fn default() -> Self {
67 Self::new()
68 }
69}
70
71impl Component for DirectComponent {
72 fn scheme(&self) -> &str {
73 "direct"
74 }
75
76 fn create_endpoint(&self, uri: &str) -> Result<Box<dyn Endpoint>, CamelError> {
77 let config = DirectConfig::from_uri(uri)?;
78 Ok(Box::new(DirectEndpoint {
79 uri: uri.to_string(),
80 name: config.name,
81 registry: Arc::clone(&self.registry),
82 }))
83 }
84}
85
86struct DirectEndpoint {
91 uri: String,
92 name: String,
93 registry: DirectRegistry,
94}
95
96impl Endpoint for DirectEndpoint {
97 fn uri(&self) -> &str {
98 &self.uri
99 }
100
101 fn create_consumer(&self) -> Result<Box<dyn Consumer>, CamelError> {
102 Ok(Box::new(DirectConsumer {
103 name: self.name.clone(),
104 registry: Arc::clone(&self.registry),
105 }))
106 }
107
108 fn create_producer(&self, _ctx: &ProducerContext) -> Result<BoxProcessor, CamelError> {
109 Ok(BoxProcessor::new(DirectProducer {
110 name: self.name.clone(),
111 registry: Arc::clone(&self.registry),
112 }))
113 }
114}
115
116struct DirectConsumer {
123 name: String,
124 registry: DirectRegistry,
125}
126
127#[async_trait]
128impl Consumer for DirectConsumer {
129 async fn start(&mut self, context: ConsumerContext) -> Result<(), CamelError> {
130 let (tx, mut rx) =
132 mpsc::channel::<(Exchange, oneshot::Sender<Result<Exchange, CamelError>>)>(32);
133
134 {
136 let mut reg = self.registry.lock().await;
137 reg.insert(self.name.clone(), tx);
138 }
139
140 loop {
142 tokio::select! {
143 _ = context.cancelled() => {
144 tracing::debug!("Direct '{}' received cancellation, stopping", self.name);
145 break;
146 }
147 msg = rx.recv() => {
148 match msg {
149 Some((exchange, reply_tx)) => {
150 let result = context.send_and_wait(exchange).await;
151 let _ = reply_tx.send(result);
152 }
153 None => break,
154 }
155 }
156 }
157 }
158
159 {
161 let mut reg = self.registry.lock().await;
162 reg.remove(&self.name);
163 }
164
165 Ok(())
166 }
167
168 async fn stop(&mut self) -> Result<(), CamelError> {
169 let mut reg = self.registry.lock().await;
171 reg.remove(&self.name);
172 Ok(())
173 }
174}
175
176#[derive(Clone)]
183struct DirectProducer {
184 name: String,
185 registry: DirectRegistry,
186}
187
188impl Service<Exchange> for DirectProducer {
189 type Response = Exchange;
190 type Error = CamelError;
191 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
192
193 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
194 Poll::Ready(Ok(()))
195 }
196
197 fn call(&mut self, exchange: Exchange) -> Self::Future {
198 let name = self.name.clone();
199 let registry = Arc::clone(&self.registry);
200
201 Box::pin(async move {
202 let reg = registry.lock().await;
203 let sender = reg.get(&name).ok_or_else(|| {
204 CamelError::EndpointCreationFailed(format!(
205 "no consumer registered for direct:{name}"
206 ))
207 })?;
208
209 let (reply_tx, reply_rx) = oneshot::channel();
210 sender
211 .send((exchange, reply_tx))
212 .await
213 .map_err(|_| CamelError::ChannelClosed)?;
214
215 drop(reg);
217
218 reply_rx.await.map_err(|_| CamelError::ChannelClosed)?
220 })
221 }
222}
223
224#[cfg(test)]
229mod tests {
230 use super::*;
231 use camel_component_api::ExchangeEnvelope;
232 use camel_component_api::Message;
233 use tower::ServiceExt;
234
235 fn test_producer_ctx() -> ProducerContext {
236 ProducerContext::new()
237 }
238
239 #[test]
240 fn test_direct_component_scheme() {
241 let component = DirectComponent::new();
242 assert_eq!(component.scheme(), "direct");
243 }
244
245 #[test]
246 fn test_direct_creates_endpoint() {
247 let component = DirectComponent::new();
248 let endpoint = component.create_endpoint("direct:foo");
249 assert!(endpoint.is_ok());
250 }
251
252 #[test]
253 fn test_direct_wrong_scheme() {
254 let component = DirectComponent::new();
255 let result = component.create_endpoint("timer:tick");
256 assert!(result.is_err());
257 }
258
259 #[test]
260 fn test_direct_endpoint_creates_consumer() {
261 let component = DirectComponent::new();
262 let endpoint = component.create_endpoint("direct:foo").unwrap();
263 assert!(endpoint.create_consumer().is_ok());
264 }
265
266 #[test]
267 fn test_direct_endpoint_creates_producer() {
268 let ctx = test_producer_ctx();
269 let component = DirectComponent::new();
270 let endpoint = component.create_endpoint("direct:foo").unwrap();
271 assert!(endpoint.create_producer(&ctx).is_ok());
272 }
273
274 #[tokio::test]
275 async fn test_direct_producer_no_consumer_registered() {
276 let ctx = test_producer_ctx();
277 let component = DirectComponent::new();
278 let endpoint = component.create_endpoint("direct:missing").unwrap();
279 let producer = endpoint.create_producer(&ctx).unwrap();
280
281 let exchange = Exchange::new(Message::new("test"));
282 let result = producer.oneshot(exchange).await;
283 assert!(result.is_err());
284 }
285
286 #[tokio::test]
287 async fn test_direct_producer_consumer_roundtrip() {
288 let component = DirectComponent::new();
289
290 let consumer_endpoint = component.create_endpoint("direct:test").unwrap();
292 let mut consumer = consumer_endpoint.create_consumer().unwrap();
293
294 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
296 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
297
298 tokio::spawn(async move {
300 consumer.start(ctx).await.unwrap();
301 });
302
303 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
305
306 tokio::spawn(async move {
308 while let Some(envelope) = route_rx.recv().await {
309 let ExchangeEnvelope { exchange, reply_tx } = envelope;
310 if let Some(tx) = reply_tx {
311 let _ = tx.send(Ok(exchange));
312 }
313 }
314 });
315
316 let ctx = test_producer_ctx();
318 let producer_endpoint = component.create_endpoint("direct:test").unwrap();
319 let producer = producer_endpoint.create_producer(&ctx).unwrap();
320
321 let exchange = Exchange::new(Message::new("hello direct"));
322 let result = producer.oneshot(exchange).await;
323
324 assert!(result.is_ok());
325 let reply = result.unwrap();
326 assert_eq!(reply.input.body.as_text(), Some("hello direct"));
327 }
328
329 #[tokio::test]
330 async fn test_direct_propagates_error_when_no_handler() {
331 let component = DirectComponent::new();
332
333 let consumer_endpoint = component.create_endpoint("direct:err-test").unwrap();
334 let mut consumer = consumer_endpoint.create_consumer().unwrap();
335
336 let (route_tx, mut route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
337 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
338
339 tokio::spawn(async move {
340 consumer.start(ctx).await.unwrap();
341 });
342
343 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
344
345 tokio::spawn(async move {
347 while let Some(envelope) = route_rx.recv().await {
348 if let Some(tx) = envelope.reply_tx {
349 let _ = tx.send(Err(CamelError::ProcessorError("subroute failed".into())));
350 }
351 }
352 });
353
354 let ctx = test_producer_ctx();
355 let producer_endpoint = component.create_endpoint("direct:err-test").unwrap();
356 let producer = producer_endpoint.create_producer(&ctx).unwrap();
357
358 let exchange = Exchange::new(Message::new("test"));
359 let result = producer.oneshot(exchange).await;
360 assert!(result.is_err());
361 assert!(matches!(result.unwrap_err(), CamelError::ProcessorError(_)));
362 }
363
364 #[tokio::test]
365 async fn test_direct_consumer_stop_unregisters() {
366 let component = DirectComponent::new();
367 let endpoint = component.create_endpoint("direct:cleanup").unwrap();
368
369 let mut consumer = endpoint.create_consumer().unwrap();
371
372 let (route_tx, _route_rx) = mpsc::channel::<ExchangeEnvelope>(16);
373 let ctx = ConsumerContext::new(route_tx, tokio_util::sync::CancellationToken::new());
374
375 let handle = tokio::spawn(async move {
377 consumer.start(ctx).await.unwrap();
378 });
379
380 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
381
382 {
384 let reg = component.registry.lock().await;
385 assert!(reg.contains_key("cleanup"));
386 }
387
388 let mut stop_consumer = DirectConsumer {
390 name: "cleanup".to_string(),
391 registry: Arc::clone(&component.registry),
392 };
393 stop_consumer.stop().await.unwrap();
394
395 {
397 let reg = component.registry.lock().await;
398 assert!(!reg.contains_key("cleanup"));
399 }
400
401 handle.abort();
402 }
403
404 #[tokio::test]
405 async fn test_direct_consumer_respects_cancellation() {
406 use tokio_util::sync::CancellationToken;
407
408 let registry: DirectRegistry = Arc::new(Mutex::new(HashMap::new()));
409 let token = CancellationToken::new();
410 let (tx, _rx) = mpsc::channel(16);
411 let ctx = ConsumerContext::new(tx, token.clone());
412
413 let mut consumer = DirectConsumer {
414 name: "cancel-test".to_string(),
415 registry: registry.clone(),
416 };
417
418 let handle = tokio::spawn(async move {
419 consumer.start(ctx).await.unwrap();
420 });
421
422 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
423 assert!(registry.lock().await.contains_key("cancel-test"));
424
425 token.cancel();
426 let result = tokio::time::timeout(std::time::Duration::from_secs(1), handle).await;
427 assert!(
428 result.is_ok(),
429 "Consumer should have stopped after cancellation"
430 );
431
432 assert!(!registry.lock().await.contains_key("cancel-test"));
434 }
435}