1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
15#[derive(Clone)]
16pub struct RecipientListService {
17 config: RecipientListConfig,
18 pipeline: EndpointPipelineService,
19}
20
21impl RecipientListService {
22 pub fn new(
23 config: RecipientListConfig,
24 endpoint_resolver: camel_api::EndpointResolver,
25 ) -> Self {
26 let pipeline_config = EndpointPipelineConfig {
27 cache_size: EndpointPipelineConfig::from_signed(1000),
28 ignore_invalid_endpoints: false,
29 };
30 Self {
31 config,
32 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
33 }
34 }
35}
36
37impl Service<Exchange> for RecipientListService {
38 type Response = Exchange;
39 type Error = CamelError;
40 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
41
42 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43 Poll::Ready(Ok(()))
44 }
45
46 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
47 let config = self.config.clone();
48 let pipeline = self.pipeline.clone();
49
50 Box::pin(async move {
51 let uris_raw = (config.expression)(&exchange);
52 if uris_raw.is_empty() {
53 return Ok(exchange);
54 }
55
56 let uris: Vec<&str> = uris_raw
57 .split(&config.delimiter)
58 .map(|s| s.trim())
59 .filter(|s| !s.is_empty())
60 .collect();
61 if uris.is_empty() {
62 return Ok(exchange);
63 }
64
65 if config.parallel {
66 let original_for_aggregate = exchange.clone();
67 let mut endpoints_to_call = Vec::with_capacity(uris.len());
68 for uri in &uris {
69 if let Some(endpoint) = pipeline.resolve(uri)? {
70 endpoints_to_call.push((uri.to_string(), endpoint));
71 }
72 }
73
74 let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
75 let mut join_set = JoinSet::new();
76 let mut iter = endpoints_to_call.into_iter();
77 let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
78 let limit = raw_limit.max(1).min(results.capacity().max(1));
79
80 for _ in 0..limit {
81 if let Some((uri, mut endpoint)) = iter.next() {
82 let mut cloned = original_for_aggregate.clone();
83 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
84 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
85 }
86 }
87
88 while let Some(result) = join_set.join_next().await {
89 match result {
90 Ok(Ok(ex)) => results.push(ex),
91 Ok(Err(e)) if config.stop_on_exception => {
92 join_set.abort_all();
93 return Err(e);
94 }
95 _ => {}
96 }
97
98 if let Some((uri, mut endpoint)) = iter.next() {
99 let mut cloned = original_for_aggregate.clone();
100 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
101 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
102 }
103 }
104
105 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
106 } else {
107 let mut results: Vec<Exchange> = Vec::new();
108 let original_for_aggregate = exchange.clone();
109 for uri in &uris {
110 let endpoint = match pipeline.resolve(uri)? {
111 Some(e) => e,
112 None => continue,
113 };
114 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
115 let mut endpoint = endpoint;
116 let result = endpoint.ready().await?.call(exchange.clone()).await;
117 match result {
118 Ok(ex) => {
119 results.push(ex.clone());
120 exchange = ex;
121 }
122 Err(e) if config.stop_on_exception => return Err(e),
123 Err(_) => continue,
124 }
125 }
126 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
127 }
128
129 Ok(exchange)
130 })
131 }
132}
133
134fn aggregate_results(
135 strategy: camel_api::MulticastStrategy,
136 original: Exchange,
137 results: Vec<Exchange>,
138) -> Exchange {
139 match strategy {
140 camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
141 camel_api::MulticastStrategy::Original => original,
142 camel_api::MulticastStrategy::CollectAll => {
143 let bodies: Vec<Value> = results
144 .iter()
145 .map(|ex| match &ex.input.body {
146 Body::Text(s) => Value::String(s.clone()),
147 Body::Json(v) => v.clone(),
148 Body::Xml(s) => Value::String(s.clone()),
149 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
150 Body::Empty => Value::Null,
151 Body::Stream(s) => serde_json::json!({
152 "_stream": {
153 "origin": s.metadata.origin,
154 "placeholder": true,
155 "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
156 }
157 }),
158 })
159 .collect();
160 let mut result = results.into_iter().last().unwrap_or(original);
161 result.input.body = camel_api::Body::from(Value::Array(bodies));
162 result
163 }
164 camel_api::MulticastStrategy::Custom(fn_) => {
165 results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
166 }
167 }
168}
169
/// Unit tests for [`RecipientListService`], driven by in-memory `mock:` endpoints.
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::MulticastStrategy;
    use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Message};
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};
    use tokio::sync::Mutex;
    use tokio::time::sleep;

    /// Resolver that knows every `mock:` URI; its endpoints echo the exchange.
    fn mock_resolver() -> camel_api::EndpointResolver {
        Arc::new(|uri: &str| {
            if uri.starts_with("mock:") {
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        })
    }

    /// Like [`mock_resolver`], but every endpoint call increments `counter`.
    fn counting_resolver(counter: Arc<AtomicUsize>) -> camel_api::EndpointResolver {
        Arc::new(move |uri: &str| {
            if uri.starts_with("mock:") {
                let counter = counter.clone();
                Some(BoxProcessor::from_fn(move |ex| {
                    counter.fetch_add(1, Ordering::SeqCst);
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        })
    }

    #[tokio::test]
    async fn recipient_list_single_destination() {
        let call_count = Arc::new(AtomicUsize::new(0));

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));

        let mut svc = RecipientListService::new(config, counting_resolver(call_count.clone()));
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn recipient_list_multiple_destinations() {
        let call_count = Arc::new(AtomicUsize::new(0));

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c".to_string()
        }));

        let mut svc = RecipientListService::new(config, counting_resolver(call_count.clone()));
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn recipient_list_empty_expression() {
        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));

        let mut svc = RecipientListService::new(config, mock_resolver());
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await.unwrap();

        // With no recipients the exchange must come back untouched.
        assert_eq!(result.input.body.as_text(), Some("test"));
    }

    #[tokio::test]
    async fn recipient_list_invalid_endpoint_error() {
        let config =
            RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));

        let mut svc = RecipientListService::new(config, mock_resolver());
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
    }

    #[tokio::test]
    async fn recipient_list_custom_delimiter() {
        // A blocking mutex is enough here: it is never held across an await.
        use std::sync::Mutex;

        let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));

        let resolver = {
            let order = order.clone();
            Arc::new(move |uri: &str| {
                let order = order.clone();
                let uri = uri.to_string();
                Some(BoxProcessor::from_fn(move |ex| {
                    order.lock().unwrap().push(uri.clone());
                    Box::pin(async move { Ok(ex) })
                }))
            })
        };

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:x|mock:y|mock:z".to_string()
        }))
        .delimiter("|");

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        let order = order.lock().unwrap();
        assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
    }

    #[tokio::test]
    async fn recipient_list_expression_evaluated_once() {
        let expr_count = Arc::new(AtomicUsize::new(0));
        let expr_count_clone = expr_count.clone();

        let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
            expr_count_clone.fetch_add(1, Ordering::SeqCst);
            "mock:a,mock:b".to_string()
        }));

        let mut svc = RecipientListService::new(config, mock_resolver());
        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        assert_eq!(
            expr_count.load(Ordering::SeqCst),
            1,
            "Expression must be evaluated exactly once"
        );
    }

    #[tokio::test]
    async fn recipient_list_ignores_empty_uri_tokens() {
        let call_count = Arc::new(AtomicUsize::new(0));

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            " ,mock:a, ,mock:b,, ".to_string()
        }));

        let mut svc = RecipientListService::new(config, counting_resolver(call_count.clone()));
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(call_count.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn recipient_list_mutation_between_steps() {
        // Counts runs of the verifying endpoint so the in-processor assertion
        // cannot be skipped without the test noticing.
        let verified = Arc::new(AtomicUsize::new(0));

        let resolver = {
            let verified = verified.clone();
            Arc::new(move |uri: &str| {
                if uri == "mock:mutate" {
                    Some(BoxProcessor::from_fn(|mut ex| {
                        ex.input.body = camel_api::Body::Text("mutated".to_string());
                        Box::pin(async move { Ok(ex) })
                    }))
                } else if uri == "mock:verify" {
                    let verified = verified.clone();
                    Some(BoxProcessor::from_fn(move |ex| {
                        let body = ex.input.body.as_text().unwrap_or("").to_string();
                        assert_eq!(body, "mutated");
                        verified.fetch_add(1, Ordering::SeqCst);
                        Box::pin(async move { Ok(ex) })
                    }))
                } else {
                    None
                }
            })
        };

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:mutate,mock:verify".to_string()
        }));

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("original"));
        let result = svc.ready().await.unwrap().call(ex).await;

        assert!(result.is_ok());
        assert_eq!(
            verified.load(Ordering::SeqCst),
            1,
            "the verifying endpoint must have been invoked exactly once"
        );
    }

    #[tokio::test]
    async fn recipient_list_parallel_executes_concurrently() {
        let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));

        let resolver = {
            let records = records.clone();
            Arc::new(move |uri: &str| {
                if uri.starts_with("mock:") {
                    let records = records.clone();
                    let uri = uri.to_string();
                    Some(BoxProcessor::from_fn(move |ex| {
                        let records = records.clone();
                        let uri = uri.clone();
                        Box::pin(async move {
                            let start = Instant::now();
                            sleep(Duration::from_millis(100)).await;
                            let end = Instant::now();
                            records.lock().await.push((uri, start, end));
                            Ok(ex)
                        })
                    }))
                } else {
                    None
                }
            })
        };

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c".to_string()
        }))
        .parallel(true);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        let records = records.lock().await;
        assert_eq!(records.len(), 3);

        // Two calls ran concurrently if their [start, end) windows intersect.
        let overlap_found = records
            .iter()
            .enumerate()
            .any(|(i, (_, a_start, a_end))| {
                records[i + 1..]
                    .iter()
                    .any(|(_, b_start, b_end)| a_start < b_end && b_start < a_end)
            });

        assert!(overlap_found);
    }

    #[tokio::test]
    async fn recipient_list_parallel_stop_on_exception_returns_error() {
        let resolver = Arc::new(|uri: &str| {
            if uri == "mock:err" {
                Some(BoxProcessor::from_fn(|_ex| {
                    Box::pin(async { Err(CamelError::ProcessorError("boom".to_string())) })
                }))
            } else if uri.starts_with("mock:") {
                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
            } else {
                None
            }
        });

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:err,mock:c".to_string()
        }))
        .parallel(true)
        .stop_on_exception(true);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("test"));
        let result = svc.ready().await.unwrap().call(ex).await;
        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
    }

    #[tokio::test]
    async fn recipient_list_parallel_limit_respects_limit() {
        // Track the number of simultaneously running endpoint calls instead of
        // asserting wall-clock bounds, which are unreliable on a busy machine.
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));
        let calls = Arc::new(AtomicUsize::new(0));

        let resolver = {
            let peak = peak.clone();
            let calls = calls.clone();
            Arc::new(move |uri: &str| {
                if uri.starts_with("mock:") {
                    let in_flight = in_flight.clone();
                    let peak = peak.clone();
                    let calls = calls.clone();
                    Some(BoxProcessor::from_fn(move |ex| {
                        let in_flight = in_flight.clone();
                        let peak = peak.clone();
                        let calls = calls.clone();
                        Box::pin(async move {
                            calls.fetch_add(1, Ordering::SeqCst);
                            let running = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                            peak.fetch_max(running, Ordering::SeqCst);
                            // Stay "in flight" long enough for siblings to start.
                            sleep(Duration::from_millis(50)).await;
                            in_flight.fetch_sub(1, Ordering::SeqCst);
                            Ok(ex)
                        })
                    }))
                } else {
                    None
                }
            })
        };

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c,mock:d".to_string()
        }))
        .parallel(true)
        .parallel_limit(2);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("test"));
        svc.ready().await.unwrap().call(ex).await.unwrap();

        assert_eq!(calls.load(Ordering::SeqCst), 4, "every recipient must be called");
        assert_eq!(
            peak.load(Ordering::SeqCst),
            2,
            "exactly `parallel_limit` calls should run at the same time"
        );
    }

    #[tokio::test]
    async fn recipient_list_collect_all_strategy() {
        let resolver = Arc::new(|uri: &str| {
            if uri == "mock:a" {
                Some(BoxProcessor::from_fn(|mut ex| {
                    ex.input.body = Body::Text("a".to_string());
                    Box::pin(async move { Ok(ex) })
                }))
            } else if uri == "mock:b" {
                Some(BoxProcessor::from_fn(|mut ex| {
                    ex.input.body = Body::Text("b".to_string());
                    Box::pin(async move { Ok(ex) })
                }))
            } else if uri == "mock:c" {
                Some(BoxProcessor::from_fn(|mut ex| {
                    ex.input.body = Body::Text("c".to_string());
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c".to_string()
        }))
        .strategy(MulticastStrategy::CollectAll);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("seed"));
        let result = svc.ready().await.unwrap().call(ex).await.unwrap();

        assert_eq!(
            result.input.body,
            Body::from(Value::Array(vec![
                Value::String("a".to_string()),
                Value::String("b".to_string()),
                Value::String("c".to_string()),
            ]))
        );
    }

    #[tokio::test]
    async fn recipient_list_original_strategy() {
        let resolver = Arc::new(|uri: &str| {
            if uri.starts_with("mock:") {
                let label = uri.to_string();
                Some(BoxProcessor::from_fn(move |mut ex| {
                    let label = label.clone();
                    ex.input.body = Body::Text(format!("mutated-{label}"));
                    Box::pin(async move { Ok(ex) })
                }))
            } else {
                None
            }
        });

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c".to_string()
        }))
        .strategy(MulticastStrategy::Original);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("original"));
        let result = svc.ready().await.unwrap().call(ex).await.unwrap();

        assert_eq!(result.input.body.as_text(), Some("original"));
    }

    #[tokio::test]
    async fn recipient_list_last_wins_strategy() {
        let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
            ("mock:a".to_string(), "first".to_string()),
            ("mock:b".to_string(), "second".to_string()),
            ("mock:c".to_string(), "third".to_string()),
        ]));

        let resolver = {
            let payloads = payloads.clone();
            Arc::new(move |uri: &str| {
                if let Some(payload) = payloads.get(uri) {
                    let payload = payload.clone();
                    Some(BoxProcessor::from_fn(move |mut ex| {
                        let payload = payload.clone();
                        ex.input.body = Body::Text(payload);
                        Box::pin(async move { Ok(ex) })
                    }))
                } else {
                    None
                }
            })
        };

        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
            "mock:a,mock:b,mock:c".to_string()
        }))
        .strategy(MulticastStrategy::LastWins);

        let mut svc = RecipientListService::new(config, resolver);
        let ex = Exchange::new(Message::new("seed"));
        let result = svc.ready().await.unwrap().call(ex).await.unwrap();

        assert_eq!(result.input.body.as_text(), Some("third"));
    }
}