1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
15#[derive(Clone)]
16pub struct RecipientListService {
17 config: RecipientListConfig,
18 pipeline: EndpointPipelineService,
19}
20
21impl RecipientListService {
22 pub fn new(
23 config: RecipientListConfig,
24 endpoint_resolver: camel_api::EndpointResolver,
25 ) -> Self {
26 let pipeline_config = EndpointPipelineConfig {
27 cache_size: EndpointPipelineConfig::from_signed(1000),
28 ignore_invalid_endpoints: false,
29 };
30 Self {
31 config,
32 pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
33 }
34 }
35}
36
37impl Service<Exchange> for RecipientListService {
38 type Response = Exchange;
39 type Error = CamelError;
40 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
41
42 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43 Poll::Ready(Ok(()))
44 }
45
46 fn call(&mut self, mut exchange: Exchange) -> Self::Future {
47 let config = self.config.clone();
48 let pipeline = self.pipeline.clone();
49
50 Box::pin(async move {
51 let uris_raw = (config.expression)(&exchange);
52 if uris_raw.is_empty() {
53 return Ok(exchange);
54 }
55
56 let uris: Vec<&str> = uris_raw
57 .split(&config.delimiter)
58 .map(|s| s.trim())
59 .filter(|s| !s.is_empty())
60 .collect();
61 if uris.is_empty() {
62 return Ok(exchange);
63 }
64
65 if config.parallel {
66 let original_for_aggregate = exchange.clone();
67 let mut endpoints_to_call = Vec::with_capacity(uris.len());
68 for uri in &uris {
69 if let Some(endpoint) = pipeline.resolve(uri)? {
70 endpoints_to_call.push((uri.to_string(), endpoint));
71 }
72 }
73
74 let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
75 let mut join_set = JoinSet::new();
76 let mut iter = endpoints_to_call.into_iter();
77 let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
78 let limit = raw_limit.max(1).min(results.capacity().max(1));
79
80 for _ in 0..limit {
81 if let Some((uri, mut endpoint)) = iter.next() {
82 let mut cloned = original_for_aggregate.clone();
83 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
84 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
85 }
86 }
87
88 while let Some(result) = join_set.join_next().await {
89 match result {
90 Ok(Ok(ex)) => results.push(ex),
91 Ok(Err(e)) if config.stop_on_exception => {
92 join_set.abort_all();
93 return Err(e);
94 }
95 _ => {}
96 }
97
98 if let Some((uri, mut endpoint)) = iter.next() {
99 let mut cloned = original_for_aggregate.clone();
100 cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
101 join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
102 }
103 }
104
105 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
106 } else {
107 let mut results: Vec<Exchange> = Vec::new();
108 let original_for_aggregate = exchange.clone();
109 for uri in &uris {
110 let endpoint = match pipeline.resolve(uri)? {
111 Some(e) => e,
112 None => continue,
113 };
114 exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
115 let mut endpoint = endpoint;
116 let result = endpoint.ready().await?.call(exchange.clone()).await;
117 match result {
118 Ok(ex) => {
119 results.push(ex.clone());
120 exchange = ex;
121 }
122 Err(e) if config.stop_on_exception => return Err(e),
123 Err(_) => continue,
124 }
125 }
126 exchange = aggregate_results(config.strategy, original_for_aggregate, results);
127 }
128
129 Ok(exchange)
130 })
131 }
132}
133
134fn aggregate_results(
135 strategy: camel_api::MulticastStrategy,
136 original: Exchange,
137 results: Vec<Exchange>,
138) -> Exchange {
139 match strategy {
140 camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
141 camel_api::MulticastStrategy::Original => original,
142 camel_api::MulticastStrategy::CollectAll => {
143 let bodies: Vec<Value> = results
144 .iter()
145 .map(|ex| match &ex.input.body {
146 Body::Text(s) => Value::String(s.clone()),
147 Body::Json(v) => v.clone(),
148 Body::Xml(s) => Value::String(s.clone()),
149 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
150 Body::Empty => Value::Null,
151 Body::Stream(s) => serde_json::json!({
152 "_stream": {
153 "origin": s.metadata.origin,
154 "placeholder": true,
155 "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
156 }
157 }),
158 })
159 .collect();
160 let mut result = results.into_iter().last().unwrap_or(original);
161 result.input.body = camel_api::Body::from(Value::Array(bodies));
162 result
163 }
164 camel_api::MulticastStrategy::Custom(fn_) => {
165 results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
166 }
167 }
168}
169
170#[cfg(test)]
171mod tests {
172 use super::*;
173 use camel_api::MulticastStrategy;
174 use camel_api::{BoxProcessor, BoxProcessorExt, Message};
175 use std::collections::HashMap;
176 use std::sync::Arc;
177 use std::sync::atomic::{AtomicUsize, Ordering};
178 use std::time::{Duration, Instant};
179 use tokio::sync::Mutex;
180 use tokio::time::sleep;
181
182 fn mock_resolver() -> camel_api::EndpointResolver {
183 Arc::new(|uri: &str| {
184 if uri.starts_with("mock:") {
185 Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
186 } else {
187 None
188 }
189 })
190 }
191
192 #[tokio::test]
193 async fn recipient_list_single_destination() {
194 let call_count = Arc::new(AtomicUsize::new(0));
195 let count_clone = call_count.clone();
196
197 let resolver = Arc::new(move |uri: &str| {
198 if uri == "mock:a" {
199 let count = count_clone.clone();
200 Some(BoxProcessor::from_fn(move |ex| {
201 count.fetch_add(1, Ordering::SeqCst);
202 Box::pin(async move { Ok(ex) })
203 }))
204 } else {
205 None
206 }
207 });
208
209 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));
210
211 let mut svc = RecipientListService::new(config, resolver);
212 let ex = Exchange::new(Message::new("test"));
213 let result = svc.ready().await.unwrap().call(ex).await;
214
215 assert!(result.is_ok());
216 assert_eq!(call_count.load(Ordering::SeqCst), 1);
217 }
218
219 #[tokio::test]
220 async fn recipient_list_multiple_destinations() {
221 let call_count = Arc::new(AtomicUsize::new(0));
222 let count_clone = call_count.clone();
223
224 let resolver = Arc::new(move |uri: &str| {
225 if uri.starts_with("mock:") {
226 let count = count_clone.clone();
227 Some(BoxProcessor::from_fn(move |ex| {
228 count.fetch_add(1, Ordering::SeqCst);
229 Box::pin(async move { Ok(ex) })
230 }))
231 } else {
232 None
233 }
234 });
235
236 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
237 "mock:a,mock:b,mock:c".to_string()
238 }));
239
240 let mut svc = RecipientListService::new(config, resolver);
241 let ex = Exchange::new(Message::new("test"));
242 let result = svc.ready().await.unwrap().call(ex).await;
243
244 assert!(result.is_ok());
245 assert_eq!(call_count.load(Ordering::SeqCst), 3);
246 }
247
248 #[tokio::test]
249 async fn recipient_list_empty_expression() {
250 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));
251
252 let mut svc = RecipientListService::new(config, mock_resolver());
253 let ex = Exchange::new(Message::new("test"));
254 let result = svc.ready().await.unwrap().call(ex).await;
255
256 assert!(result.is_ok());
257 }
258
259 #[tokio::test]
260 async fn recipient_list_invalid_endpoint_error() {
261 let config =
262 RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));
263
264 let mut svc = RecipientListService::new(config, mock_resolver());
265 let ex = Exchange::new(Message::new("test"));
266 let result = svc.ready().await.unwrap().call(ex).await;
267
268 assert!(result.is_err());
269 assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
270 }
271
272 #[tokio::test]
273 async fn recipient_list_custom_delimiter() {
274 use std::sync::Mutex;
275
276 let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
277
278 let resolver = {
279 let order = order.clone();
280 Arc::new(move |uri: &str| {
281 let order = order.clone();
282 let uri = uri.to_string();
283 Some(BoxProcessor::from_fn(move |ex| {
284 order.lock().unwrap().push(uri.clone());
285 Box::pin(async move { Ok(ex) })
286 }))
287 })
288 };
289
290 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
291 "mock:x|mock:y|mock:z".to_string()
292 }))
293 .delimiter("|");
294
295 let mut svc = RecipientListService::new(config, resolver);
296 let ex = Exchange::new(Message::new("test"));
297 svc.ready().await.unwrap().call(ex).await.unwrap();
298
299 let order = order.lock().unwrap();
300 assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
301 }
302
303 #[tokio::test]
304 async fn recipient_list_expression_evaluated_once() {
305 let expr_count = Arc::new(AtomicUsize::new(0));
306 let expr_count_clone = expr_count.clone();
307
308 let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
309 expr_count_clone.fetch_add(1, Ordering::SeqCst);
310 "mock:a,mock:b".to_string()
311 }));
312
313 let mut svc = RecipientListService::new(config, mock_resolver());
314 let ex = Exchange::new(Message::new("test"));
315 svc.ready().await.unwrap().call(ex).await.unwrap();
316
317 assert_eq!(
318 expr_count.load(Ordering::SeqCst),
319 1,
320 "Expression must be evaluated exactly once"
321 );
322 }
323
324 #[tokio::test]
325 async fn recipient_list_mutation_between_steps() {
326 let resolver = Arc::new(|uri: &str| {
327 if uri == "mock:mutate" {
328 Some(BoxProcessor::from_fn(|mut ex| {
329 ex.input.body = camel_api::Body::Text("mutated".to_string());
330 Box::pin(async move { Ok(ex) })
331 }))
332 } else if uri == "mock:verify" {
333 Some(BoxProcessor::from_fn(|ex| {
334 let body = ex.input.body.as_text().unwrap_or("").to_string();
335 assert_eq!(body, "mutated");
336 Box::pin(async move { Ok(ex) })
337 }))
338 } else {
339 None
340 }
341 });
342
343 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
344 "mock:mutate,mock:verify".to_string()
345 }));
346
347 let mut svc = RecipientListService::new(config, resolver);
348 let ex = Exchange::new(Message::new("original"));
349 let result = svc.ready().await.unwrap().call(ex).await;
350
351 assert!(result.is_ok());
352 }
353
354 #[tokio::test]
355 async fn recipient_list_parallel_executes_concurrently() {
356 let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));
357
358 let resolver = {
359 let records = records.clone();
360 Arc::new(move |uri: &str| {
361 if uri.starts_with("mock:") {
362 let records = records.clone();
363 let uri = uri.to_string();
364 Some(BoxProcessor::from_fn(move |ex| {
365 let records = records.clone();
366 let uri = uri.clone();
367 Box::pin(async move {
368 let start = Instant::now();
369 sleep(Duration::from_millis(100)).await;
370 let end = Instant::now();
371 records.lock().await.push((uri, start, end));
372 Ok(ex)
373 })
374 }))
375 } else {
376 None
377 }
378 })
379 };
380
381 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
382 "mock:a,mock:b,mock:c".to_string()
383 }))
384 .parallel(true);
385
386 let mut svc = RecipientListService::new(config, resolver);
387 let ex = Exchange::new(Message::new("test"));
388 svc.ready().await.unwrap().call(ex).await.unwrap();
389
390 let records = records.lock().await;
391 assert_eq!(records.len(), 3);
392
393 let mut overlap_found = false;
394 for i in 0..records.len() {
395 for j in (i + 1)..records.len() {
396 let (_, a_start, a_end) = records[i];
397 let (_, b_start, b_end) = records[j];
398 if a_start < b_end && b_start < a_end {
399 overlap_found = true;
400 break;
401 }
402 }
403 if overlap_found {
404 break;
405 }
406 }
407
408 assert!(overlap_found);
409 }
410
411 #[tokio::test]
412 async fn recipient_list_parallel_limit_respects_limit() {
413 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
414 "mock:a,mock:b,mock:c,mock:d".to_string()
415 }))
416 .parallel(true)
417 .parallel_limit(2);
418
419 let resolver = Arc::new(|uri: &str| {
420 if uri.starts_with("mock:") {
421 Some(BoxProcessor::from_fn(|ex| {
422 Box::pin(async move {
423 sleep(Duration::from_millis(100)).await;
424 Ok(ex)
425 })
426 }))
427 } else {
428 None
429 }
430 });
431
432 let mut svc = RecipientListService::new(config, resolver);
433 let ex = Exchange::new(Message::new("test"));
434 let start = Instant::now();
435 svc.ready().await.unwrap().call(ex).await.unwrap();
436 let elapsed = start.elapsed();
437
438 assert!(elapsed >= Duration::from_millis(180));
439 assert!(elapsed < Duration::from_millis(350));
440 }
441
442 #[tokio::test]
443 async fn recipient_list_collect_all_strategy() {
444 let resolver = Arc::new(|uri: &str| {
445 if uri == "mock:a" {
446 Some(BoxProcessor::from_fn(|mut ex| {
447 ex.input.body = Body::Text("a".to_string());
448 Box::pin(async move { Ok(ex) })
449 }))
450 } else if uri == "mock:b" {
451 Some(BoxProcessor::from_fn(|mut ex| {
452 ex.input.body = Body::Text("b".to_string());
453 Box::pin(async move { Ok(ex) })
454 }))
455 } else if uri == "mock:c" {
456 Some(BoxProcessor::from_fn(|mut ex| {
457 ex.input.body = Body::Text("c".to_string());
458 Box::pin(async move { Ok(ex) })
459 }))
460 } else {
461 None
462 }
463 });
464
465 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
466 "mock:a,mock:b,mock:c".to_string()
467 }))
468 .strategy(MulticastStrategy::CollectAll);
469
470 let mut svc = RecipientListService::new(config, resolver);
471 let ex = Exchange::new(Message::new("seed"));
472 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
473
474 assert_eq!(
475 result.input.body,
476 Body::from(Value::Array(vec![
477 Value::String("a".to_string()),
478 Value::String("b".to_string()),
479 Value::String("c".to_string()),
480 ]))
481 );
482 }
483
484 #[tokio::test]
485 async fn recipient_list_original_strategy() {
486 let resolver = Arc::new(|uri: &str| {
487 if uri.starts_with("mock:") {
488 let label = uri.to_string();
489 Some(BoxProcessor::from_fn(move |mut ex| {
490 let label = label.clone();
491 ex.input.body = Body::Text(format!("mutated-{label}"));
492 Box::pin(async move { Ok(ex) })
493 }))
494 } else {
495 None
496 }
497 });
498
499 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
500 "mock:a,mock:b,mock:c".to_string()
501 }))
502 .strategy(MulticastStrategy::Original);
503
504 let mut svc = RecipientListService::new(config, resolver);
505 let ex = Exchange::new(Message::new("original"));
506 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
507
508 assert_eq!(result.input.body.as_text(), Some("original"));
509 }
510
511 #[tokio::test]
512 async fn recipient_list_last_wins_strategy() {
513 let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
514 ("mock:a".to_string(), "first".to_string()),
515 ("mock:b".to_string(), "second".to_string()),
516 ("mock:c".to_string(), "third".to_string()),
517 ]));
518
519 let resolver = {
520 let payloads = payloads.clone();
521 Arc::new(move |uri: &str| {
522 if let Some(payload) = payloads.get(uri) {
523 let payload = payload.clone();
524 Some(BoxProcessor::from_fn(move |mut ex| {
525 let payload = payload.clone();
526 ex.input.body = Body::Text(payload);
527 Box::pin(async move { Ok(ex) })
528 }))
529 } else {
530 None
531 }
532 })
533 };
534
535 let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
536 "mock:a,mock:b,mock:c".to_string()
537 }))
538 .strategy(MulticastStrategy::LastWins);
539
540 let mut svc = RecipientListService::new(config, resolver);
541 let ex = Exchange::new(Message::new("seed"));
542 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
543
544 assert_eq!(result.input.body.as_text(), Some("third"));
545 }
546}