Skip to main content

camel_processor/
recipient_list.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
15#[derive(Clone)]
16pub struct RecipientListService {
17    config: RecipientListConfig,
18    pipeline: EndpointPipelineService,
19}
20
21impl RecipientListService {
22    pub fn new(
23        config: RecipientListConfig,
24        endpoint_resolver: camel_api::EndpointResolver,
25    ) -> Self {
26        let pipeline_config = EndpointPipelineConfig {
27            cache_size: EndpointPipelineConfig::from_signed(1000),
28            ignore_invalid_endpoints: false,
29        };
30        Self {
31            config,
32            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
33        }
34    }
35}
36
37impl Service<Exchange> for RecipientListService {
38    type Response = Exchange;
39    type Error = CamelError;
40    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
41
42    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43        Poll::Ready(Ok(()))
44    }
45
46    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
47        let config = self.config.clone();
48        let pipeline = self.pipeline.clone();
49
50        Box::pin(async move {
51            let uris_raw = (config.expression)(&exchange);
52            if uris_raw.is_empty() {
53                return Ok(exchange);
54            }
55
56            let uris: Vec<&str> = uris_raw
57                .split(&config.delimiter)
58                .map(|s| s.trim())
59                .filter(|s| !s.is_empty())
60                .collect();
61            if uris.is_empty() {
62                return Ok(exchange);
63            }
64
65            if config.parallel {
66                let original_for_aggregate = exchange.clone();
67                let mut endpoints_to_call = Vec::with_capacity(uris.len());
68                for uri in &uris {
69                    if let Some(endpoint) = pipeline.resolve(uri)? {
70                        endpoints_to_call.push((uri.to_string(), endpoint));
71                    }
72                }
73
74                let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
75                let mut join_set = JoinSet::new();
76                let mut iter = endpoints_to_call.into_iter();
77                let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
78                let limit = raw_limit.max(1).min(results.capacity().max(1));
79
80                for _ in 0..limit {
81                    if let Some((uri, mut endpoint)) = iter.next() {
82                        let mut cloned = original_for_aggregate.clone();
83                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
84                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
85                    }
86                }
87
88                while let Some(result) = join_set.join_next().await {
89                    match result {
90                        Ok(Ok(ex)) => results.push(ex),
91                        Ok(Err(e)) if config.stop_on_exception => {
92                            join_set.abort_all();
93                            return Err(e);
94                        }
95                        _ => {}
96                    }
97
98                    if let Some((uri, mut endpoint)) = iter.next() {
99                        let mut cloned = original_for_aggregate.clone();
100                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
101                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
102                    }
103                }
104
105                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
106            } else {
107                let mut results: Vec<Exchange> = Vec::new();
108                let original_for_aggregate = exchange.clone();
109                for uri in &uris {
110                    let endpoint = match pipeline.resolve(uri)? {
111                        Some(e) => e,
112                        None => continue,
113                    };
114                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
115                    let mut endpoint = endpoint;
116                    let result = endpoint.ready().await?.call(exchange.clone()).await;
117                    match result {
118                        Ok(ex) => {
119                            results.push(ex.clone());
120                            exchange = ex;
121                        }
122                        Err(e) if config.stop_on_exception => return Err(e),
123                        Err(_) => continue,
124                    }
125                }
126                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
127            }
128
129            Ok(exchange)
130        })
131    }
132}
133
134fn aggregate_results(
135    strategy: camel_api::MulticastStrategy,
136    original: Exchange,
137    results: Vec<Exchange>,
138) -> Exchange {
139    match strategy {
140        camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
141        camel_api::MulticastStrategy::Original => original,
142        camel_api::MulticastStrategy::CollectAll => {
143            let bodies: Vec<Value> = results
144                .iter()
145                .map(|ex| match &ex.input.body {
146                    Body::Text(s) => Value::String(s.clone()),
147                    Body::Json(v) => v.clone(),
148                    Body::Xml(s) => Value::String(s.clone()),
149                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
150                    Body::Empty => Value::Null,
151                    Body::Stream(s) => serde_json::json!({
152                        "_stream": {
153                            "origin": s.metadata.origin,
154                            "placeholder": true,
155                            "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
156                        }
157                    }),
158                })
159                .collect();
160            let mut result = results.into_iter().last().unwrap_or(original);
161            result.input.body = camel_api::Body::from(Value::Array(bodies));
162            result
163        }
164        camel_api::MulticastStrategy::Custom(fn_) => {
165            results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
166        }
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use camel_api::MulticastStrategy;
174    use camel_api::{BoxProcessor, BoxProcessorExt, Message};
175    use std::collections::HashMap;
176    use std::sync::Arc;
177    use std::sync::atomic::{AtomicUsize, Ordering};
178    use std::time::{Duration, Instant};
179    use tokio::sync::Mutex;
180    use tokio::time::sleep;
181
182    fn mock_resolver() -> camel_api::EndpointResolver {
183        Arc::new(|uri: &str| {
184            if uri.starts_with("mock:") {
185                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
186            } else {
187                None
188            }
189        })
190    }
191
192    #[tokio::test]
193    async fn recipient_list_single_destination() {
194        let call_count = Arc::new(AtomicUsize::new(0));
195        let count_clone = call_count.clone();
196
197        let resolver = Arc::new(move |uri: &str| {
198            if uri == "mock:a" {
199                let count = count_clone.clone();
200                Some(BoxProcessor::from_fn(move |ex| {
201                    count.fetch_add(1, Ordering::SeqCst);
202                    Box::pin(async move { Ok(ex) })
203                }))
204            } else {
205                None
206            }
207        });
208
209        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));
210
211        let mut svc = RecipientListService::new(config, resolver);
212        let ex = Exchange::new(Message::new("test"));
213        let result = svc.ready().await.unwrap().call(ex).await;
214
215        assert!(result.is_ok());
216        assert_eq!(call_count.load(Ordering::SeqCst), 1);
217    }
218
219    #[tokio::test]
220    async fn recipient_list_multiple_destinations() {
221        let call_count = Arc::new(AtomicUsize::new(0));
222        let count_clone = call_count.clone();
223
224        let resolver = Arc::new(move |uri: &str| {
225            if uri.starts_with("mock:") {
226                let count = count_clone.clone();
227                Some(BoxProcessor::from_fn(move |ex| {
228                    count.fetch_add(1, Ordering::SeqCst);
229                    Box::pin(async move { Ok(ex) })
230                }))
231            } else {
232                None
233            }
234        });
235
236        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
237            "mock:a,mock:b,mock:c".to_string()
238        }));
239
240        let mut svc = RecipientListService::new(config, resolver);
241        let ex = Exchange::new(Message::new("test"));
242        let result = svc.ready().await.unwrap().call(ex).await;
243
244        assert!(result.is_ok());
245        assert_eq!(call_count.load(Ordering::SeqCst), 3);
246    }
247
248    #[tokio::test]
249    async fn recipient_list_empty_expression() {
250        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));
251
252        let mut svc = RecipientListService::new(config, mock_resolver());
253        let ex = Exchange::new(Message::new("test"));
254        let result = svc.ready().await.unwrap().call(ex).await;
255
256        assert!(result.is_ok());
257    }
258
259    #[tokio::test]
260    async fn recipient_list_invalid_endpoint_error() {
261        let config =
262            RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));
263
264        let mut svc = RecipientListService::new(config, mock_resolver());
265        let ex = Exchange::new(Message::new("test"));
266        let result = svc.ready().await.unwrap().call(ex).await;
267
268        assert!(result.is_err());
269        assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
270    }
271
272    #[tokio::test]
273    async fn recipient_list_custom_delimiter() {
274        use std::sync::Mutex;
275
276        let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
277
278        let resolver = {
279            let order = order.clone();
280            Arc::new(move |uri: &str| {
281                let order = order.clone();
282                let uri = uri.to_string();
283                Some(BoxProcessor::from_fn(move |ex| {
284                    order.lock().unwrap().push(uri.clone());
285                    Box::pin(async move { Ok(ex) })
286                }))
287            })
288        };
289
290        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
291            "mock:x|mock:y|mock:z".to_string()
292        }))
293        .delimiter("|");
294
295        let mut svc = RecipientListService::new(config, resolver);
296        let ex = Exchange::new(Message::new("test"));
297        svc.ready().await.unwrap().call(ex).await.unwrap();
298
299        let order = order.lock().unwrap();
300        assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
301    }
302
303    #[tokio::test]
304    async fn recipient_list_expression_evaluated_once() {
305        let expr_count = Arc::new(AtomicUsize::new(0));
306        let expr_count_clone = expr_count.clone();
307
308        let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
309            expr_count_clone.fetch_add(1, Ordering::SeqCst);
310            "mock:a,mock:b".to_string()
311        }));
312
313        let mut svc = RecipientListService::new(config, mock_resolver());
314        let ex = Exchange::new(Message::new("test"));
315        svc.ready().await.unwrap().call(ex).await.unwrap();
316
317        assert_eq!(
318            expr_count.load(Ordering::SeqCst),
319            1,
320            "Expression must be evaluated exactly once"
321        );
322    }
323
324    #[tokio::test]
325    async fn recipient_list_mutation_between_steps() {
326        let resolver = Arc::new(|uri: &str| {
327            if uri == "mock:mutate" {
328                Some(BoxProcessor::from_fn(|mut ex| {
329                    ex.input.body = camel_api::Body::Text("mutated".to_string());
330                    Box::pin(async move { Ok(ex) })
331                }))
332            } else if uri == "mock:verify" {
333                Some(BoxProcessor::from_fn(|ex| {
334                    let body = ex.input.body.as_text().unwrap_or("").to_string();
335                    assert_eq!(body, "mutated");
336                    Box::pin(async move { Ok(ex) })
337                }))
338            } else {
339                None
340            }
341        });
342
343        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
344            "mock:mutate,mock:verify".to_string()
345        }));
346
347        let mut svc = RecipientListService::new(config, resolver);
348        let ex = Exchange::new(Message::new("original"));
349        let result = svc.ready().await.unwrap().call(ex).await;
350
351        assert!(result.is_ok());
352    }
353
354    #[tokio::test]
355    async fn recipient_list_parallel_executes_concurrently() {
356        let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));
357
358        let resolver = {
359            let records = records.clone();
360            Arc::new(move |uri: &str| {
361                if uri.starts_with("mock:") {
362                    let records = records.clone();
363                    let uri = uri.to_string();
364                    Some(BoxProcessor::from_fn(move |ex| {
365                        let records = records.clone();
366                        let uri = uri.clone();
367                        Box::pin(async move {
368                            let start = Instant::now();
369                            sleep(Duration::from_millis(100)).await;
370                            let end = Instant::now();
371                            records.lock().await.push((uri, start, end));
372                            Ok(ex)
373                        })
374                    }))
375                } else {
376                    None
377                }
378            })
379        };
380
381        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
382            "mock:a,mock:b,mock:c".to_string()
383        }))
384        .parallel(true);
385
386        let mut svc = RecipientListService::new(config, resolver);
387        let ex = Exchange::new(Message::new("test"));
388        svc.ready().await.unwrap().call(ex).await.unwrap();
389
390        let records = records.lock().await;
391        assert_eq!(records.len(), 3);
392
393        let mut overlap_found = false;
394        for i in 0..records.len() {
395            for j in (i + 1)..records.len() {
396                let (_, a_start, a_end) = records[i];
397                let (_, b_start, b_end) = records[j];
398                if a_start < b_end && b_start < a_end {
399                    overlap_found = true;
400                    break;
401                }
402            }
403            if overlap_found {
404                break;
405            }
406        }
407
408        assert!(overlap_found);
409    }
410
411    #[tokio::test]
412    async fn recipient_list_parallel_limit_respects_limit() {
413        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
414            "mock:a,mock:b,mock:c,mock:d".to_string()
415        }))
416        .parallel(true)
417        .parallel_limit(2);
418
419        let resolver = Arc::new(|uri: &str| {
420            if uri.starts_with("mock:") {
421                Some(BoxProcessor::from_fn(|ex| {
422                    Box::pin(async move {
423                        sleep(Duration::from_millis(100)).await;
424                        Ok(ex)
425                    })
426                }))
427            } else {
428                None
429            }
430        });
431
432        let mut svc = RecipientListService::new(config, resolver);
433        let ex = Exchange::new(Message::new("test"));
434        let start = Instant::now();
435        svc.ready().await.unwrap().call(ex).await.unwrap();
436        let elapsed = start.elapsed();
437
438        assert!(elapsed >= Duration::from_millis(180));
439        assert!(elapsed < Duration::from_millis(350));
440    }
441
442    #[tokio::test]
443    async fn recipient_list_collect_all_strategy() {
444        let resolver = Arc::new(|uri: &str| {
445            if uri == "mock:a" {
446                Some(BoxProcessor::from_fn(|mut ex| {
447                    ex.input.body = Body::Text("a".to_string());
448                    Box::pin(async move { Ok(ex) })
449                }))
450            } else if uri == "mock:b" {
451                Some(BoxProcessor::from_fn(|mut ex| {
452                    ex.input.body = Body::Text("b".to_string());
453                    Box::pin(async move { Ok(ex) })
454                }))
455            } else if uri == "mock:c" {
456                Some(BoxProcessor::from_fn(|mut ex| {
457                    ex.input.body = Body::Text("c".to_string());
458                    Box::pin(async move { Ok(ex) })
459                }))
460            } else {
461                None
462            }
463        });
464
465        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
466            "mock:a,mock:b,mock:c".to_string()
467        }))
468        .strategy(MulticastStrategy::CollectAll);
469
470        let mut svc = RecipientListService::new(config, resolver);
471        let ex = Exchange::new(Message::new("seed"));
472        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
473
474        assert_eq!(
475            result.input.body,
476            Body::from(Value::Array(vec![
477                Value::String("a".to_string()),
478                Value::String("b".to_string()),
479                Value::String("c".to_string()),
480            ]))
481        );
482    }
483
484    #[tokio::test]
485    async fn recipient_list_original_strategy() {
486        let resolver = Arc::new(|uri: &str| {
487            if uri.starts_with("mock:") {
488                let label = uri.to_string();
489                Some(BoxProcessor::from_fn(move |mut ex| {
490                    let label = label.clone();
491                    ex.input.body = Body::Text(format!("mutated-{label}"));
492                    Box::pin(async move { Ok(ex) })
493                }))
494            } else {
495                None
496            }
497        });
498
499        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
500            "mock:a,mock:b,mock:c".to_string()
501        }))
502        .strategy(MulticastStrategy::Original);
503
504        let mut svc = RecipientListService::new(config, resolver);
505        let ex = Exchange::new(Message::new("original"));
506        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
507
508        assert_eq!(result.input.body.as_text(), Some("original"));
509    }
510
511    #[tokio::test]
512    async fn recipient_list_last_wins_strategy() {
513        let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
514            ("mock:a".to_string(), "first".to_string()),
515            ("mock:b".to_string(), "second".to_string()),
516            ("mock:c".to_string(), "third".to_string()),
517        ]));
518
519        let resolver = {
520            let payloads = payloads.clone();
521            Arc::new(move |uri: &str| {
522                if let Some(payload) = payloads.get(uri) {
523                    let payload = payload.clone();
524                    Some(BoxProcessor::from_fn(move |mut ex| {
525                        let payload = payload.clone();
526                        ex.input.body = Body::Text(payload);
527                        Box::pin(async move { Ok(ex) })
528                    }))
529                } else {
530                    None
531                }
532            })
533        };
534
535        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
536            "mock:a,mock:b,mock:c".to_string()
537        }))
538        .strategy(MulticastStrategy::LastWins);
539
540        let mut svc = RecipientListService::new(config, resolver);
541        let ex = Exchange::new(Message::new("seed"));
542        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
543
544        assert_eq!(result.input.body.as_text(), Some("third"));
545    }
546}