Skip to main content

camel_processor/
recipient_list.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
15#[derive(Clone)]
16pub struct RecipientListService {
17    config: RecipientListConfig,
18    pipeline: EndpointPipelineService,
19}
20
21impl RecipientListService {
22    pub fn new(
23        config: RecipientListConfig,
24        endpoint_resolver: camel_api::EndpointResolver,
25    ) -> Result<Self, CamelError> {
26        config.validate()?;
27        let pipeline_config = EndpointPipelineConfig {
28            cache_size: EndpointPipelineConfig::from_signed(1000),
29            ignore_invalid_endpoints: false,
30        };
31        Ok(Self {
32            config,
33            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
34        })
35    }
36}
37
38impl Service<Exchange> for RecipientListService {
39    type Response = Exchange;
40    type Error = CamelError;
41    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
42
43    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
44        Poll::Ready(Ok(()))
45    }
46
47    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
48        let config = self.config.clone();
49        let pipeline = self.pipeline.clone();
50
51        Box::pin(async move {
52            let uris_raw = (config.expression)(&exchange);
53            if uris_raw.is_empty() {
54                return Ok(exchange);
55            }
56
57            let uris: Vec<&str> = uris_raw
58                .split(&config.delimiter)
59                .map(|s| s.trim())
60                .filter(|s| !s.is_empty())
61                .collect();
62            if uris.is_empty() {
63                return Ok(exchange);
64            }
65
66            if config.parallel {
67                let original_for_aggregate = exchange.clone();
68                let mut endpoints_to_call = Vec::with_capacity(uris.len());
69                for uri in &uris {
70                    if let Some(endpoint) = pipeline.resolve(uri)? {
71                        endpoints_to_call.push((uri.to_string(), endpoint));
72                    }
73                }
74
75                let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
76                let mut join_set = JoinSet::new();
77                let mut iter = endpoints_to_call.into_iter();
78                let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
79                let limit = raw_limit.max(1).min(results.capacity().max(1));
80
81                for _ in 0..limit {
82                    if let Some((uri, mut endpoint)) = iter.next() {
83                        let mut cloned = original_for_aggregate.clone();
84                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
85                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
86                    }
87                }
88
89                while let Some(result) = join_set.join_next().await {
90                    match result {
91                        Ok(Ok(ex)) => results.push(ex),
92                        Ok(Err(e)) if config.stop_on_exception => {
93                            join_set.abort_all();
94                            return Err(e);
95                        }
96                        _ => {}
97                    }
98
99                    if let Some((uri, mut endpoint)) = iter.next() {
100                        let mut cloned = original_for_aggregate.clone();
101                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
102                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
103                    }
104                }
105
106                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
107            } else {
108                let mut results: Vec<Exchange> = Vec::new();
109                let original_for_aggregate = exchange.clone();
110                for uri in &uris {
111                    let endpoint = match pipeline.resolve(uri)? {
112                        Some(e) => e,
113                        None => continue,
114                    };
115                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
116                    let mut endpoint = endpoint;
117                    let result = endpoint.ready().await?.call(exchange.clone()).await;
118                    match result {
119                        Ok(ex) => {
120                            results.push(ex.clone());
121                            exchange = ex;
122                        }
123                        Err(e) if config.stop_on_exception => return Err(e),
124                        Err(_) => continue,
125                    }
126                }
127                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
128            }
129
130            Ok(exchange)
131        })
132    }
133}
134
135fn aggregate_results(
136    strategy: camel_api::MulticastStrategy,
137    original: Exchange,
138    results: Vec<Exchange>,
139) -> Exchange {
140    match strategy {
141        camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
142        camel_api::MulticastStrategy::Original => original,
143        camel_api::MulticastStrategy::CollectAll => {
144            let bodies: Vec<Value> = results
145                .iter()
146                .map(|ex| match &ex.input.body {
147                    Body::Text(s) => Value::String(s.clone()),
148                    Body::Json(v) => v.clone(),
149                    Body::Xml(s) => Value::String(s.clone()),
150                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
151                    Body::Empty => Value::Null,
152                    Body::Stream(s) => serde_json::json!({
153                        "_stream": {
154                            "origin": s.metadata.origin,
155                            "placeholder": true,
156                            "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
157                        }
158                    }),
159                })
160                .collect();
161            let mut result = results.into_iter().last().unwrap_or(original);
162            result.input.body = camel_api::Body::from(Value::Array(bodies));
163            result
164        }
165        camel_api::MulticastStrategy::Custom(fn_) => {
166            results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
167        }
168    }
169}
170
171#[cfg(test)]
172mod tests {
173    use super::*;
174    use camel_api::MulticastStrategy;
175    use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Message};
176    use std::collections::HashMap;
177    use std::sync::Arc;
178    use std::sync::atomic::{AtomicUsize, Ordering};
179    use std::time::{Duration, Instant};
180    use tokio::sync::Mutex;
181    use tokio::time::sleep;
182
183    fn mock_resolver() -> camel_api::EndpointResolver {
184        Arc::new(|uri: &str| {
185            if uri.starts_with("mock:") {
186                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
187            } else {
188                None
189            }
190        })
191    }
192
193    #[tokio::test]
194    async fn recipient_list_single_destination() {
195        let call_count = Arc::new(AtomicUsize::new(0));
196        let count_clone = call_count.clone();
197
198        let resolver = Arc::new(move |uri: &str| {
199            if uri == "mock:a" {
200                let count = count_clone.clone();
201                Some(BoxProcessor::from_fn(move |ex| {
202                    count.fetch_add(1, Ordering::SeqCst);
203                    Box::pin(async move { Ok(ex) })
204                }))
205            } else {
206                None
207            }
208        });
209
210        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));
211
212        let mut svc = RecipientListService::new(config, resolver).unwrap();
213        let ex = Exchange::new(Message::new("test"));
214        let result = svc.ready().await.unwrap().call(ex).await;
215
216        assert!(result.is_ok());
217        assert_eq!(call_count.load(Ordering::SeqCst), 1);
218    }
219
220    #[tokio::test]
221    async fn recipient_list_multiple_destinations() {
222        let call_count = Arc::new(AtomicUsize::new(0));
223        let count_clone = call_count.clone();
224
225        let resolver = Arc::new(move |uri: &str| {
226            if uri.starts_with("mock:") {
227                let count = count_clone.clone();
228                Some(BoxProcessor::from_fn(move |ex| {
229                    count.fetch_add(1, Ordering::SeqCst);
230                    Box::pin(async move { Ok(ex) })
231                }))
232            } else {
233                None
234            }
235        });
236
237        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
238            "mock:a,mock:b,mock:c".to_string()
239        }));
240
241        let mut svc = RecipientListService::new(config, resolver).unwrap();
242        let ex = Exchange::new(Message::new("test"));
243        let result = svc.ready().await.unwrap().call(ex).await;
244
245        assert!(result.is_ok());
246        assert_eq!(call_count.load(Ordering::SeqCst), 3);
247    }
248
249    #[tokio::test]
250    async fn recipient_list_empty_expression() {
251        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));
252
253        let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
254        let ex = Exchange::new(Message::new("test"));
255        let result = svc.ready().await.unwrap().call(ex).await;
256
257        assert!(result.is_ok());
258    }
259
260    #[tokio::test]
261    async fn recipient_list_invalid_endpoint_error() {
262        let config =
263            RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));
264
265        let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
266        let ex = Exchange::new(Message::new("test"));
267        let result = svc.ready().await.unwrap().call(ex).await;
268
269        assert!(result.is_err());
270        assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
271    }
272
273    #[tokio::test]
274    async fn recipient_list_custom_delimiter() {
275        use std::sync::Mutex;
276
277        let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
278
279        let resolver = {
280            let order = order.clone();
281            Arc::new(move |uri: &str| {
282                let order = order.clone();
283                let uri = uri.to_string();
284                Some(BoxProcessor::from_fn(move |ex| {
285                    order.lock().unwrap().push(uri.clone());
286                    Box::pin(async move { Ok(ex) })
287                }))
288            })
289        };
290
291        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
292            "mock:x|mock:y|mock:z".to_string()
293        }))
294        .delimiter("|");
295
296        let mut svc = RecipientListService::new(config, resolver).unwrap();
297        let ex = Exchange::new(Message::new("test"));
298        svc.ready().await.unwrap().call(ex).await.unwrap();
299
300        let order = order.lock().unwrap();
301        assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
302    }
303
304    #[tokio::test]
305    async fn recipient_list_expression_evaluated_once() {
306        let expr_count = Arc::new(AtomicUsize::new(0));
307        let expr_count_clone = expr_count.clone();
308
309        let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
310            expr_count_clone.fetch_add(1, Ordering::SeqCst);
311            "mock:a,mock:b".to_string()
312        }));
313
314        let mut svc = RecipientListService::new(config, mock_resolver()).unwrap();
315        let ex = Exchange::new(Message::new("test"));
316        svc.ready().await.unwrap().call(ex).await.unwrap();
317
318        assert_eq!(
319            expr_count.load(Ordering::SeqCst),
320            1,
321            "Expression must be evaluated exactly once"
322        );
323    }
324
325    #[tokio::test]
326    async fn recipient_list_ignores_empty_uri_tokens() {
327        let call_count = Arc::new(AtomicUsize::new(0));
328        let call_count_clone = call_count.clone();
329
330        let resolver = Arc::new(move |uri: &str| {
331            if uri.starts_with("mock:") {
332                let count = call_count_clone.clone();
333                Some(BoxProcessor::from_fn(move |ex| {
334                    count.fetch_add(1, Ordering::SeqCst);
335                    Box::pin(async move { Ok(ex) })
336                }))
337            } else {
338                None
339            }
340        });
341
342        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
343            " ,mock:a, ,mock:b,, ".to_string()
344        }));
345
346        let mut svc = RecipientListService::new(config, resolver).unwrap();
347        let ex = Exchange::new(Message::new("test"));
348        let result = svc.ready().await.unwrap().call(ex).await;
349        assert!(result.is_ok());
350        assert_eq!(call_count.load(Ordering::SeqCst), 2);
351    }
352
353    #[tokio::test]
354    async fn recipient_list_mutation_between_steps() {
355        let resolver = Arc::new(|uri: &str| {
356            if uri == "mock:mutate" {
357                Some(BoxProcessor::from_fn(|mut ex| {
358                    ex.input.body = camel_api::Body::Text("mutated".to_string());
359                    Box::pin(async move { Ok(ex) })
360                }))
361            } else if uri == "mock:verify" {
362                Some(BoxProcessor::from_fn(|ex| {
363                    let body = ex.input.body.as_text().unwrap_or("").to_string();
364                    assert_eq!(body, "mutated");
365                    Box::pin(async move { Ok(ex) })
366                }))
367            } else {
368                None
369            }
370        });
371
372        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
373            "mock:mutate,mock:verify".to_string()
374        }));
375
376        let mut svc = RecipientListService::new(config, resolver).unwrap();
377        let ex = Exchange::new(Message::new("original"));
378        let result = svc.ready().await.unwrap().call(ex).await;
379
380        assert!(result.is_ok());
381    }
382
383    #[tokio::test]
384    async fn recipient_list_parallel_executes_concurrently() {
385        let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));
386
387        let resolver = {
388            let records = records.clone();
389            Arc::new(move |uri: &str| {
390                if uri.starts_with("mock:") {
391                    let records = records.clone();
392                    let uri = uri.to_string();
393                    Some(BoxProcessor::from_fn(move |ex| {
394                        let records = records.clone();
395                        let uri = uri.clone();
396                        Box::pin(async move {
397                            let start = Instant::now();
398                            sleep(Duration::from_millis(100)).await;
399                            let end = Instant::now();
400                            records.lock().await.push((uri, start, end));
401                            Ok(ex)
402                        })
403                    }))
404                } else {
405                    None
406                }
407            })
408        };
409
410        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
411            "mock:a,mock:b,mock:c".to_string()
412        }))
413        .parallel(true);
414
415        let mut svc = RecipientListService::new(config, resolver).unwrap();
416        let ex = Exchange::new(Message::new("test"));
417        svc.ready().await.unwrap().call(ex).await.unwrap();
418
419        let records = records.lock().await;
420        assert_eq!(records.len(), 3);
421
422        let mut overlap_found = false;
423        for i in 0..records.len() {
424            for j in (i + 1)..records.len() {
425                let (_, a_start, a_end) = records[i];
426                let (_, b_start, b_end) = records[j];
427                if a_start < b_end && b_start < a_end {
428                    overlap_found = true;
429                    break;
430                }
431            }
432            if overlap_found {
433                break;
434            }
435        }
436
437        assert!(overlap_found);
438    }
439
440    #[tokio::test]
441    async fn recipient_list_parallel_stop_on_exception_returns_error() {
442        let resolver = Arc::new(|uri: &str| {
443            if uri == "mock:err" {
444                Some(BoxProcessor::from_fn(|_ex| {
445                    Box::pin(async { Err(CamelError::ProcessorError("boom".to_string())) })
446                }))
447            } else if uri.starts_with("mock:") {
448                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
449            } else {
450                None
451            }
452        });
453
454        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
455            "mock:a,mock:err,mock:c".to_string()
456        }))
457        .parallel(true)
458        .stop_on_exception(true);
459
460        let mut svc = RecipientListService::new(config, resolver).unwrap();
461        let ex = Exchange::new(Message::new("test"));
462        let result = svc.ready().await.unwrap().call(ex).await;
463        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
464    }
465
466    #[tokio::test]
467    async fn recipient_list_parallel_limit_respects_limit() {
468        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
469            "mock:a,mock:b,mock:c,mock:d".to_string()
470        }))
471        .parallel(true)
472        .parallel_limit(2);
473
474        let resolver = Arc::new(|uri: &str| {
475            if uri.starts_with("mock:") {
476                Some(BoxProcessor::from_fn(|ex| {
477                    Box::pin(async move {
478                        sleep(Duration::from_millis(100)).await;
479                        Ok(ex)
480                    })
481                }))
482            } else {
483                None
484            }
485        });
486
487        let mut svc = RecipientListService::new(config, resolver).unwrap();
488        let ex = Exchange::new(Message::new("test"));
489        let start = Instant::now();
490        svc.ready().await.unwrap().call(ex).await.unwrap();
491        let elapsed = start.elapsed();
492
493        assert!(elapsed >= Duration::from_millis(180));
494        assert!(elapsed < Duration::from_millis(350));
495    }
496
497    #[tokio::test]
498    async fn recipient_list_collect_all_strategy() {
499        let resolver = Arc::new(|uri: &str| {
500            if uri == "mock:a" {
501                Some(BoxProcessor::from_fn(|mut ex| {
502                    ex.input.body = Body::Text("a".to_string());
503                    Box::pin(async move { Ok(ex) })
504                }))
505            } else if uri == "mock:b" {
506                Some(BoxProcessor::from_fn(|mut ex| {
507                    ex.input.body = Body::Text("b".to_string());
508                    Box::pin(async move { Ok(ex) })
509                }))
510            } else if uri == "mock:c" {
511                Some(BoxProcessor::from_fn(|mut ex| {
512                    ex.input.body = Body::Text("c".to_string());
513                    Box::pin(async move { Ok(ex) })
514                }))
515            } else {
516                None
517            }
518        });
519
520        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
521            "mock:a,mock:b,mock:c".to_string()
522        }))
523        .strategy(MulticastStrategy::CollectAll);
524
525        let mut svc = RecipientListService::new(config, resolver).unwrap();
526        let ex = Exchange::new(Message::new("seed"));
527        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
528
529        assert_eq!(
530            result.input.body,
531            Body::from(Value::Array(vec![
532                Value::String("a".to_string()),
533                Value::String("b".to_string()),
534                Value::String("c".to_string()),
535            ]))
536        );
537    }
538
539    #[tokio::test]
540    async fn recipient_list_original_strategy() {
541        let resolver = Arc::new(|uri: &str| {
542            if uri.starts_with("mock:") {
543                let label = uri.to_string();
544                Some(BoxProcessor::from_fn(move |mut ex| {
545                    let label = label.clone();
546                    ex.input.body = Body::Text(format!("mutated-{label}"));
547                    Box::pin(async move { Ok(ex) })
548                }))
549            } else {
550                None
551            }
552        });
553
554        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
555            "mock:a,mock:b,mock:c".to_string()
556        }))
557        .strategy(MulticastStrategy::Original);
558
559        let mut svc = RecipientListService::new(config, resolver).unwrap();
560        let ex = Exchange::new(Message::new("original"));
561        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
562
563        assert_eq!(result.input.body.as_text(), Some("original"));
564    }
565
566    #[tokio::test]
567    async fn recipient_list_last_wins_strategy() {
568        let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
569            ("mock:a".to_string(), "first".to_string()),
570            ("mock:b".to_string(), "second".to_string()),
571            ("mock:c".to_string(), "third".to_string()),
572        ]));
573
574        let resolver = {
575            let payloads = payloads.clone();
576            Arc::new(move |uri: &str| {
577                if let Some(payload) = payloads.get(uri) {
578                    let payload = payload.clone();
579                    Some(BoxProcessor::from_fn(move |mut ex| {
580                        let payload = payload.clone();
581                        ex.input.body = Body::Text(payload);
582                        Box::pin(async move { Ok(ex) })
583                    }))
584                } else {
585                    None
586                }
587            })
588        };
589
590        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
591            "mock:a,mock:b,mock:c".to_string()
592        }))
593        .strategy(MulticastStrategy::LastWins);
594
595        let mut svc = RecipientListService::new(config, resolver).unwrap();
596        let ex = Exchange::new(Message::new("seed"));
597        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
598
599        assert_eq!(result.input.body.as_text(), Some("third"));
600    }
601}