1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
15#[derive(Clone)]
16pub struct RecipientListService {
17 config: RecipientListConfig,
18 pipeline: EndpointPipelineService,
19}
20
21impl RecipientListService {
22 pub fn new(
23 config: RecipientListConfig,
24 endpoint_resolver: camel_api::EndpointResolver,
25 ) -> Result<Self, CamelError> {
26 config.validate()?;
27 let pipeline_config = EndpointPipelineConfig {
28 cache_size: EndpointPipelineConfig::from_signed(1000),
29 ignore_invalid_endpoints: false,
30 };
31 Ok(Self {
32 config,
33 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
34 })
35 }
36}
37
38impl Service<Exchange> for RecipientListService {
39 type Response = Exchange;
40 type Error = CamelError;
41 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44 Poll::Ready(Ok(()))
45 }
46
47 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
48 let config = self.config.clone();
49 let pipeline = self.pipeline.clone();
50
51 Box::pin(async move {
52 let uris_raw = (config.expression)(&exchange);
53 if uris_raw.is_empty() {
54 return Ok(exchange);
55 }
56
57 let uris: Vec<&str> = uris_raw
58 .split(&config.delimiter)
59 .map(|s| s.trim())
60 .filter(|s| !s.is_empty())
61 .collect();
62 if uris.is_empty() {
63 return Ok(exchange);
64 }
65
66 if config.parallel {
67 let original_for_aggregate = exchange.clone();
68 let mut endpoints_to_call = Vec::with_capacity(uris.len());
69 for uri in &uris {
70 if let Some(endpoint) = pipeline.resolve(uri)? {
71 endpoints_to_call.push((uri.to_string(), endpoint));
72 }
73 }
74
75 let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
76 let mut join_set = JoinSet::new();
77 let mut iter = endpoints_to_call.into_iter();
78 let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
79 let limit = raw_limit.max(1).min(results.capacity().max(1));
80
81 for _ in 0..limit {
82 if let Some((uri, mut endpoint)) = iter.next() {
83 let mut cloned = original_for_aggregate.clone();
84 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
85 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
86 }
87 }
88
89 while let Some(result) = join_set.join_next().await {
90 match result {
91 Ok(Ok(ex)) => results.push(ex),
92 Ok(Err(e)) if config.stop_on_exception => {
93 join_set.abort_all();
94 return Err(e);
95 }
96 _ => {}
97 }
98
99 if let Some((uri, mut endpoint)) = iter.next() {
100 let mut cloned = original_for_aggregate.clone();
101 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
102 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
103 }
104 }
105
106 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
107 } else {
108 let mut results: Vec<Exchange> = Vec::new();
109 let original_for_aggregate = exchange.clone();
110 for uri in &uris {
111 let endpoint = match pipeline.resolve(uri)? {
112 Some(e) => e,
113 None => continue,
114 };
115 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
116 let mut endpoint = endpoint;
117 let result = endpoint.ready().await?.call(exchange.clone()).await;
118 match result {
119 Ok(ex) => {
120 results.push(ex.clone());
121 exchange = ex;
122 }
123 Err(e) if config.stop_on_exception => return Err(e),
124 Err(_) => continue,
125 }
126 }
127 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
128 }
129
130 Ok(exchange)
131 })
132 }
133}
134
135fn aggregate_results(
136 strategy: camel_api::MulticastStrategy,
137 original: Exchange,
138 results: Vec<Exchange>,
139) -> Exchange {
140 match strategy {
141 camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
142 camel_api::MulticastStrategy::Original => original,
143 camel_api::MulticastStrategy::CollectAll => {
144 let bodies: Vec<Value> = results
145 .iter()
146 .map(|ex| match &ex.input.body {
147 Body::Text(s) => Value::String(s.clone()),
148 Body::Json(v) => v.clone(),
149 Body::Xml(s) => Value::String(s.clone()),
150 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
151 Body::Empty => Value::Null,
152 Body::Stream(s) => serde_json::json!({
153 "_stream": {
154 "origin": s.metadata.origin,
155 "placeholder": true,
156 "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
157 }
158 }),
159 })
160 .collect();
161 let mut result = results.into_iter().last().unwrap_or(original);
162 result.input.body = camel_api::Body::from(Value::Array(bodies));
163 result
164 }
165 camel_api::MulticastStrategy::Custom(fn_) => {
166 results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
167 }
168 }
169}
170
171#[cfg(test)]
172mod tests {
173 use super::*;
174 use camel_api::MulticastStrategy;
175 use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Message};
176 use std::collections::HashMap;
177 use std::sync::Arc;
178 use std::sync::atomic::{AtomicUsize, Ordering};
179 use std::time::{Duration, Instant};
180 use tokio::sync::Mutex;
181 use tokio::time::sleep;
182
183 fn mock_resolver() -> camel_api::EndpointResolver {
184 Arc::new(|uri: &str| {
185 if uri.starts_with("mock:") {
186 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
187 } else {
188 None
189 }
190 })
191 }
192
193 #[tokio::test]
194 async fn recipient_list_single_destination() {
195 let call_count = Arc::new(AtomicUsize::new(0));
196 let count_clone = call_count.clone();
197
198 let resolver = Arc::new(move |uri: &str| {
199 if uri == "mock:a" {
200 let count = count_clone.clone();
201 Some(BoxProcessor::from_fn(move |ex| {
202 count.fetch_add(1, Ordering::SeqCst);
203 Box::pin(async move { Ok(ex) })
204 }))
205 } else {
206 None
207 }
208 });
209
210 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));
211
212 let mut svc = RecipientListService::new(config, resolver).unwrap();
213 let ex = Exchange::new(Message::new("test"));
214 let result = svc.ready().await.unwrap().call(ex).await;
215
216 assert!(result.is_ok());
217 assert_eq!(call_count.load(Ordering::SeqCst), 1);
218 }
219
220 #[tokio::test]
221 async fn recipient_list_multiple_destinations() {
222 let call_count = Arc::new(AtomicUsize::new(0));
223 let count_clone = call_count.clone();
224
225 let resolver = Arc::new(move |uri: &str| {
226 if uri.starts_with("mock:") {
227 let count = count_clone.clone();
228 Some(BoxProcessor::from_fn(move |ex| {
229 count.fetch_add(1, Ordering::SeqCst);
230 Box::pin(async move { Ok(ex) })
231 }))
232 } else {
233 None
234 }
235 });
236
237 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
238 "mock:a,mock:b,mock:c".to_string()
239 }));
240
241 let mut svc = RecipientListService::new(config, resolver).unwrap();
242 let ex = Exchange::new(Message::new("test"));
243 let result = svc.ready().await.unwrap().call(ex).await;
244
245 assert!(result.is_ok());
246 assert_eq!(call_count.load(Ordering::SeqCst), 3);
247 }
248
249 #[tokio::test]
250 async fn recipient_list_empty_expression() {
251 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));
252
253 let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
254 let ex = Exchange::new(Message::new("test"));
255 let result = svc.ready().await.unwrap().call(ex).await;
256
257 assert!(result.is_ok());
258 }
259
260 #[tokio::test]
261 async fn recipient_list_invalid_endpoint_error() {
262 let config =
263 RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));
264
265 let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
266 let ex = Exchange::new(Message::new("test"));
267 let result = svc.ready().await.unwrap().call(ex).await;
268
269 assert!(result.is_err());
270 assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
271 }
272
273 #[tokio::test]
274 async fn recipient_list_custom_delimiter() {
275 use std::sync::Mutex;
276
277 let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
278
279 let resolver = {
280 let order = order.clone();
281 Arc::new(move |uri: &str| {
282 let order = order.clone();
283 let uri = uri.to_string();
284 Some(BoxProcessor::from_fn(move |ex| {
285 order.lock().unwrap().push(uri.clone());
286 Box::pin(async move { Ok(ex) })
287 }))
288 })
289 };
290
291 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
292 "mock:x|mock:y|mock:z".to_string()
293 }))
294 .delimiter("|");
295
296 let mut svc = RecipientListService::new(config, resolver).unwrap();
297 let ex = Exchange::new(Message::new("test"));
298 svc.ready().await.unwrap().call(ex).await.unwrap();
299
300 let order = order.lock().unwrap();
301 assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
302 }
303
304 #[tokio::test]
305 async fn recipient_list_expression_evaluated_once() {
306 let expr_count = Arc::new(AtomicUsize::new(0));
307 let expr_count_clone = expr_count.clone();
308
309 let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
310 expr_count_clone.fetch_add(1, Ordering::SeqCst);
311 "mock:a,mock:b".to_string()
312 }));
313
314 let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
315 let ex = Exchange::new(Message::new("test"));
316 svc.ready().await.unwrap().call(ex).await.unwrap();
317
318 assert_eq!(
319 expr_count.load(Ordering::SeqCst),
320 1,
321 "Expression must be evaluated exactly once"
322 );
323 }
324
325 #[tokio::test]
326 async fn recipient_list_ignores_empty_uri_tokens() {
327 let call_count = Arc::new(AtomicUsize::new(0));
328 let call_count_clone = call_count.clone();
329
330 let resolver = Arc::new(move |uri: &str| {
331 if uri.starts_with("mock:") {
332 let count = call_count_clone.clone();
333 Some(BoxProcessor::from_fn(move |ex| {
334 count.fetch_add(1, Ordering::SeqCst);
335 Box::pin(async move { Ok(ex) })
336 }))
337 } else {
338 None
339 }
340 });
341
342 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
343 " ,mock:a, ,mock:b,, ".to_string()
344 }));
345
346 let mut svc = RecipientListService::new(config, resolver).unwrap();
347 let ex = Exchange::new(Message::new("test"));
348 let result = svc.ready().await.unwrap().call(ex).await;
349 assert!(result.is_ok());
350 assert_eq!(call_count.load(Ordering::SeqCst), 2);
351 }
352
353 #[tokio::test]
354 async fn recipient_list_mutation_between_steps() {
355 let resolver = Arc::new(|uri: &str| {
356 if uri == "mock:mutate" {
357 Some(BoxProcessor::from_fn(|mut ex| {
358 ex.input.body = camel_api::Body::Text("mutated".to_string());
359 Box::pin(async move { Ok(ex) })
360 }))
361 } else if uri == "mock:verify" {
362 Some(BoxProcessor::from_fn(|ex| {
363 let body = ex.input.body.as_text().unwrap_or("").to_string();
364 assert_eq!(body, "mutated");
365 Box::pin(async move { Ok(ex) })
366 }))
367 } else {
368 None
369 }
370 });
371
372 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
373 "mock:mutate,mock:verify".to_string()
374 }));
375
376 let mut svc = RecipientListService::new(config, resolver).unwrap();
377 let ex = Exchange::new(Message::new("original"));
378 let result = svc.ready().await.unwrap().call(ex).await;
379
380 assert!(result.is_ok());
381 }
382
383 #[tokio::test]
384 async fn recipient_list_parallel_executes_concurrently() {
385 let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));
386
387 let resolver = {
388 let records = records.clone();
389 Arc::new(move |uri: &str| {
390 if uri.starts_with("mock:") {
391 let records = records.clone();
392 let uri = uri.to_string();
393 Some(BoxProcessor::from_fn(move |ex| {
394 let records = records.clone();
395 let uri = uri.clone();
396 Box::pin(async move {
397 let start = Instant::now();
398 sleep(Duration::from_millis(100)).await;
399 let end = Instant::now();
400 records.lock().await.push((uri, start, end));
401 Ok(ex)
402 })
403 }))
404 } else {
405 None
406 }
407 })
408 };
409
410 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
411 "mock:a,mock:b,mock:c".to_string()
412 }))
413 .parallel(true);
414
415 let mut svc = RecipientListService::new(config, resolver).unwrap();
416 let ex = Exchange::new(Message::new("test"));
417 svc.ready().await.unwrap().call(ex).await.unwrap();
418
419 let records = records.lock().await;
420 assert_eq!(records.len(), 3);
421
422 let mut overlap_found = false;
423 for i in 0..records.len() {
424 for j in (i + 1)..records.len() {
425 let (_, a_start, a_end) = records[i];
426 let (_, b_start, b_end) = records[j];
427 if a_start < b_end && b_start < a_end {
428 overlap_found = true;
429 break;
430 }
431 }
432 if overlap_found {
433 break;
434 }
435 }
436
437 assert!(overlap_found);
438 }
439
440 #[tokio::test]
441 async fn recipient_list_parallel_stop_on_exception_returns_error() {
442 let resolver = Arc::new(|uri: &str| {
443 if uri == "mock:err" {
444 Some(BoxProcessor::from_fn(|_ex| {
445 Box::pin(async { Err(CamelError::ProcessorError("boom".to_string())) })
446 }))
447 } else if uri.starts_with("mock:") {
448 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
449 } else {
450 None
451 }
452 });
453
454 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
455 "mock:a,mock:err,mock:c".to_string()
456 }))
457 .parallel(true)
458 .stop_on_exception(true);
459
460 let mut svc = RecipientListService::new(config, resolver).unwrap();
461 let ex = Exchange::new(Message::new("test"));
462 let result = svc.ready().await.unwrap().call(ex).await;
463 assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
464 }
465
466 #[tokio::test]
467 async fn recipient_list_parallel_limit_respects_limit() {
468 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
469 "mock:a,mock:b,mock:c,mock:d".to_string()
470 }))
471 .parallel(true)
472 .parallel_limit(2);
473
474 let resolver = Arc::new(|uri: &str| {
475 if uri.starts_with("mock:") {
476 Some(BoxProcessor::from_fn(|ex| {
477 Box::pin(async move {
478 sleep(Duration::from_millis(100)).await;
479 Ok(ex)
480 })
481 }))
482 } else {
483 None
484 }
485 });
486
487 let mut svc = RecipientListService::new(config, resolver).unwrap();
488 let ex = Exchange::new(Message::new("test"));
489 let start = Instant::now();
490 svc.ready().await.unwrap().call(ex).await.unwrap();
491 let elapsed = start.elapsed();
492
493 assert!(elapsed >= Duration::from_millis(180));
494 assert!(elapsed < Duration::from_millis(350));
495 }
496
497 #[tokio::test]
498 async fn recipient_list_collect_all_strategy() {
499 let resolver = Arc::new(|uri: &str| {
500 if uri == "mock:a" {
501 Some(BoxProcessor::from_fn(|mut ex| {
502 ex.input.body = Body::Text("a".to_string());
503 Box::pin(async move { Ok(ex) })
504 }))
505 } else if uri == "mock:b" {
506 Some(BoxProcessor::from_fn(|mut ex| {
507 ex.input.body = Body::Text("b".to_string());
508 Box::pin(async move { Ok(ex) })
509 }))
510 } else if uri == "mock:c" {
511 Some(BoxProcessor::from_fn(|mut ex| {
512 ex.input.body = Body::Text("c".to_string());
513 Box::pin(async move { Ok(ex) })
514 }))
515 } else {
516 None
517 }
518 });
519
520 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
521 "mock:a,mock:b,mock:c".to_string()
522 }))
523 .strategy(MulticastStrategy::CollectAll);
524
525 let mut svc = RecipientListService::new(config, resolver).unwrap();
526 let ex = Exchange::new(Message::new("seed"));
527 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
528
529 assert_eq!(
530 result.input.body,
531 Body::from(Value::Array(vec![
532 Value::String("a".to_string()),
533 Value::String("b".to_string()),
534 Value::String("c".to_string()),
535 ]))
536 );
537 }
538
539 #[tokio::test]
540 async fn recipient_list_original_strategy() {
541 let resolver = Arc::new(|uri: &str| {
542 if uri.starts_with("mock:") {
543 let label = uri.to_string();
544 Some(BoxProcessor::from_fn(move |mut ex| {
545 let label = label.clone();
546 ex.input.body = Body::Text(format!("mutated-{label}"));
547 Box::pin(async move { Ok(ex) })
548 }))
549 } else {
550 None
551 }
552 });
553
554 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
555 "mock:a,mock:b,mock:c".to_string()
556 }))
557 .strategy(MulticastStrategy::Original);
558
559 let mut svc = RecipientListService::new(config, resolver).unwrap();
560 let ex = Exchange::new(Message::new("original"));
561 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
562
563 assert_eq!(result.input.body.as_text(), Some("original"));
564 }
565
566 #[tokio::test]
567 async fn recipient_list_last_wins_strategy() {
568 let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
569 ("mock:a".to_string(), "first".to_string()),
570 ("mock:b".to_string(), "second".to_string()),
571 ("mock:c".to_string(), "third".to_string()),
572 ]));
573
574 let resolver = {
575 let payloads = payloads.clone();
576 Arc::new(move |uri: &str| {
577 if let Some(payload) = payloads.get(uri) {
578 let payload = payload.clone();
579 Some(BoxProcessor::from_fn(move |mut ex| {
580 let payload = payload.clone();
581 ex.input.body = Body::Text(payload);
582 Box::pin(async move { Ok(ex) })
583 }))
584 } else {
585 None
586 }
587 })
588 };
589
590 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
591 "mock:a,mock:b,mock:c".to_string()
592 }))
593 .strategy(MulticastStrategy::LastWins);
594
595 let mut svc = RecipientListService::new(config, resolver).unwrap();
596 let ex = Exchange::new(Message::new("seed"));
597 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
598
599 assert_eq!(result.input.body.as_text(), Some("third"));
600 }
601}