Skip to main content

camel_processor/
recipient_list.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tokio::task::JoinSet;
6use tower::Service;
7use tower::ServiceExt;
8
9use camel_api::endpoint_pipeline::{CAMEL_SLIP_ENDPOINT, EndpointPipelineConfig};
10use camel_api::recipient_list::RecipientListConfig;
11use camel_api::{Body, CamelError, Exchange, Value};
12
13use crate::endpoint_pipeline::EndpointPipelineService;
14
/// Tower service that forwards an exchange to a list of recipient endpoints.
///
/// The recipient URIs are computed per exchange by the expression stored in
/// `config`; each URI is resolved through `pipeline` before it is invoked.
#[derive(Clone)]
pub struct RecipientListService {
    /// Recipient-list settings read on every call: the URI expression, the
    /// delimiter, the parallel flags and the aggregation strategy.
    config: RecipientListConfig,
    /// Resolves endpoint URIs into callable endpoints.
    pipeline: EndpointPipelineService,
}
20
21impl RecipientListService {
22    pub fn new(
23        config: RecipientListConfig,
24        endpoint_resolver: camel_api::EndpointResolver,
25    ) -> Self {
26        let pipeline_config = EndpointPipelineConfig {
27            cache_size: EndpointPipelineConfig::from_signed(1000),
28            ignore_invalid_endpoints: false,
29        };
30        Self {
31            config,
32            pipeline: EndpointPipelineService::new(endpoint_resolver, pipeline_config),
33        }
34    }
35}
36
37impl Service<Exchange> for RecipientListService {
38    type Response = Exchange;
39    type Error = CamelError;
40    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
41
42    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
43        Poll::Ready(Ok(()))
44    }
45
46    fn call(&mut self, mut exchange: Exchange) -> Self::Future {
47        let config = self.config.clone();
48        let pipeline = self.pipeline.clone();
49
50        Box::pin(async move {
51            let uris_raw = (config.expression)(&exchange);
52            if uris_raw.is_empty() {
53                return Ok(exchange);
54            }
55
56            let uris: Vec<&str> = uris_raw
57                .split(&config.delimiter)
58                .map(|s| s.trim())
59                .filter(|s| !s.is_empty())
60                .collect();
61            if uris.is_empty() {
62                return Ok(exchange);
63            }
64
65            if config.parallel {
66                let original_for_aggregate = exchange.clone();
67                let mut endpoints_to_call = Vec::with_capacity(uris.len());
68                for uri in &uris {
69                    if let Some(endpoint) = pipeline.resolve(uri)? {
70                        endpoints_to_call.push((uri.to_string(), endpoint));
71                    }
72                }
73
74                let mut results: Vec<Exchange> = Vec::with_capacity(endpoints_to_call.len());
75                let mut join_set = JoinSet::new();
76                let mut iter = endpoints_to_call.into_iter();
77                let raw_limit = config.parallel_limit.unwrap_or(results.capacity());
78                let limit = raw_limit.max(1).min(results.capacity().max(1));
79
80                for _ in 0..limit {
81                    if let Some((uri, mut endpoint)) = iter.next() {
82                        let mut cloned = original_for_aggregate.clone();
83                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
84                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
85                    }
86                }
87
88                while let Some(result) = join_set.join_next().await {
89                    match result {
90                        Ok(Ok(ex)) => results.push(ex),
91                        Ok(Err(e)) if config.stop_on_exception => {
92                            join_set.abort_all();
93                            return Err(e);
94                        }
95                        _ => {}
96                    }
97
98                    if let Some((uri, mut endpoint)) = iter.next() {
99                        let mut cloned = original_for_aggregate.clone();
100                        cloned.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri));
101                        join_set.spawn(async move { endpoint.ready().await?.call(cloned).await });
102                    }
103                }
104
105                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
106            } else {
107                let mut results: Vec<Exchange> = Vec::new();
108                let original_for_aggregate = exchange.clone();
109                for uri in &uris {
110                    let endpoint = match pipeline.resolve(uri)? {
111                        Some(e) => e,
112                        None => continue,
113                    };
114                    exchange.set_property(CAMEL_SLIP_ENDPOINT, Value::String(uri.to_string()));
115                    let mut endpoint = endpoint;
116                    let result = endpoint.ready().await?.call(exchange.clone()).await;
117                    match result {
118                        Ok(ex) => {
119                            results.push(ex.clone());
120                            exchange = ex;
121                        }
122                        Err(e) if config.stop_on_exception => return Err(e),
123                        Err(_) => continue,
124                    }
125                }
126                exchange = aggregate_results(config.strategy, original_for_aggregate, results);
127            }
128
129            Ok(exchange)
130        })
131    }
132}
133
134fn aggregate_results(
135    strategy: camel_api::MulticastStrategy,
136    original: Exchange,
137    results: Vec<Exchange>,
138) -> Exchange {
139    match strategy {
140        camel_api::MulticastStrategy::LastWins => results.into_iter().last().unwrap_or(original),
141        camel_api::MulticastStrategy::Original => original,
142        camel_api::MulticastStrategy::CollectAll => {
143            let bodies: Vec<Value> = results
144                .iter()
145                .map(|ex| match &ex.input.body {
146                    Body::Text(s) => Value::String(s.clone()),
147                    Body::Json(v) => v.clone(),
148                    Body::Xml(s) => Value::String(s.clone()),
149                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
150                    Body::Empty => Value::Null,
151                    Body::Stream(s) => serde_json::json!({
152                        "_stream": {
153                            "origin": s.metadata.origin,
154                            "placeholder": true,
155                            "hint": "Materialize exchange body with .into_bytes() before recipient-list aggregation"
156                        }
157                    }),
158                })
159                .collect();
160            let mut result = results.into_iter().last().unwrap_or(original);
161            result.input.body = camel_api::Body::from(Value::Array(bodies));
162            result
163        }
164        camel_api::MulticastStrategy::Custom(fn_) => {
165            results.into_iter().fold(original, |acc, ex| fn_(acc, ex))
166        }
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use super::*;
173    use camel_api::MulticastStrategy;
174    use camel_api::{BoxProcessor, BoxProcessorExt, CamelError, Message};
175    use std::collections::HashMap;
176    use std::sync::Arc;
177    use std::sync::atomic::{AtomicUsize, Ordering};
178    use std::time::{Duration, Instant};
179    use tokio::sync::Mutex;
180    use tokio::time::sleep;
181
182    fn mock_resolver() -> camel_api::EndpointResolver {
183        Arc::new(|uri: &str| {
184            if uri.starts_with("mock:") {
185                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
186            } else {
187                None
188            }
189        })
190    }
191
192    #[tokio::test]
193    async fn recipient_list_single_destination() {
194        let call_count = Arc::new(AtomicUsize::new(0));
195        let count_clone = call_count.clone();
196
197        let resolver = Arc::new(move |uri: &str| {
198            if uri == "mock:a" {
199                let count = count_clone.clone();
200                Some(BoxProcessor::from_fn(move |ex| {
201                    count.fetch_add(1, Ordering::SeqCst);
202                    Box::pin(async move { Ok(ex) })
203                }))
204            } else {
205                None
206            }
207        });
208
209        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| "mock:a".to_string()));
210
211        let mut svc = RecipientListService::new(config, resolver);
212        let ex = Exchange::new(Message::new("test"));
213        let result = svc.ready().await.unwrap().call(ex).await;
214
215        assert!(result.is_ok());
216        assert_eq!(call_count.load(Ordering::SeqCst), 1);
217    }
218
219    #[tokio::test]
220    async fn recipient_list_multiple_destinations() {
221        let call_count = Arc::new(AtomicUsize::new(0));
222        let count_clone = call_count.clone();
223
224        let resolver = Arc::new(move |uri: &str| {
225            if uri.starts_with("mock:") {
226                let count = count_clone.clone();
227                Some(BoxProcessor::from_fn(move |ex| {
228                    count.fetch_add(1, Ordering::SeqCst);
229                    Box::pin(async move { Ok(ex) })
230                }))
231            } else {
232                None
233            }
234        });
235
236        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
237            "mock:a,mock:b,mock:c".to_string()
238        }));
239
240        let mut svc = RecipientListService::new(config, resolver);
241        let ex = Exchange::new(Message::new("test"));
242        let result = svc.ready().await.unwrap().call(ex).await;
243
244        assert!(result.is_ok());
245        assert_eq!(call_count.load(Ordering::SeqCst), 3);
246    }
247
248    #[tokio::test]
249    async fn recipient_list_empty_expression() {
250        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| String::new()));
251
252        let mut svc = RecipientListService::new(config, mock_resolver());
253        let ex = Exchange::new(Message::new("test"));
254        let result = svc.ready().await.unwrap().call(ex).await;
255
256        assert!(result.is_ok());
257    }
258
259    #[tokio::test]
260    async fn recipient_list_invalid_endpoint_error() {
261        let config =
262            RecipientListConfig::new(Arc::new(|_ex: &Exchange| "invalid:endpoint".to_string()));
263
264        let mut svc = RecipientListService::new(config, mock_resolver());
265        let ex = Exchange::new(Message::new("test"));
266        let result = svc.ready().await.unwrap().call(ex).await;
267
268        assert!(result.is_err());
269        assert!(result.unwrap_err().to_string().contains("Invalid endpoint"));
270    }
271
272    #[tokio::test]
273    async fn recipient_list_custom_delimiter() {
274        use std::sync::Mutex;
275
276        let order: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
277
278        let resolver = {
279            let order = order.clone();
280            Arc::new(move |uri: &str| {
281                let order = order.clone();
282                let uri = uri.to_string();
283                Some(BoxProcessor::from_fn(move |ex| {
284                    order.lock().unwrap().push(uri.clone());
285                    Box::pin(async move { Ok(ex) })
286                }))
287            })
288        };
289
290        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
291            "mock:x|mock:y|mock:z".to_string()
292        }))
293        .delimiter("|");
294
295        let mut svc = RecipientListService::new(config, resolver);
296        let ex = Exchange::new(Message::new("test"));
297        svc.ready().await.unwrap().call(ex).await.unwrap();
298
299        let order = order.lock().unwrap();
300        assert_eq!(*order, vec!["mock:x", "mock:y", "mock:z"]);
301    }
302
303    #[tokio::test]
304    async fn recipient_list_expression_evaluated_once() {
305        let expr_count = Arc::new(AtomicUsize::new(0));
306        let expr_count_clone = expr_count.clone();
307
308        let config = RecipientListConfig::new(Arc::new(move |_ex: &Exchange| {
309            expr_count_clone.fetch_add(1, Ordering::SeqCst);
310            "mock:a,mock:b".to_string()
311        }));
312
313        let mut svc = RecipientListService::new(config, mock_resolver());
314        let ex = Exchange::new(Message::new("test"));
315        svc.ready().await.unwrap().call(ex).await.unwrap();
316
317        assert_eq!(
318            expr_count.load(Ordering::SeqCst),
319            1,
320            "Expression must be evaluated exactly once"
321        );
322    }
323
324    #[tokio::test]
325    async fn recipient_list_ignores_empty_uri_tokens() {
326        let call_count = Arc::new(AtomicUsize::new(0));
327        let call_count_clone = call_count.clone();
328
329        let resolver = Arc::new(move |uri: &str| {
330            if uri.starts_with("mock:") {
331                let count = call_count_clone.clone();
332                Some(BoxProcessor::from_fn(move |ex| {
333                    count.fetch_add(1, Ordering::SeqCst);
334                    Box::pin(async move { Ok(ex) })
335                }))
336            } else {
337                None
338            }
339        });
340
341        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
342            " ,mock:a, ,mock:b,, ".to_string()
343        }));
344
345        let mut svc = RecipientListService::new(config, resolver);
346        let ex = Exchange::new(Message::new("test"));
347        let result = svc.ready().await.unwrap().call(ex).await;
348        assert!(result.is_ok());
349        assert_eq!(call_count.load(Ordering::SeqCst), 2);
350    }
351
352    #[tokio::test]
353    async fn recipient_list_mutation_between_steps() {
354        let resolver = Arc::new(|uri: &str| {
355            if uri == "mock:mutate" {
356                Some(BoxProcessor::from_fn(|mut ex| {
357                    ex.input.body = camel_api::Body::Text("mutated".to_string());
358                    Box::pin(async move { Ok(ex) })
359                }))
360            } else if uri == "mock:verify" {
361                Some(BoxProcessor::from_fn(|ex| {
362                    let body = ex.input.body.as_text().unwrap_or("").to_string();
363                    assert_eq!(body, "mutated");
364                    Box::pin(async move { Ok(ex) })
365                }))
366            } else {
367                None
368            }
369        });
370
371        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
372            "mock:mutate,mock:verify".to_string()
373        }));
374
375        let mut svc = RecipientListService::new(config, resolver);
376        let ex = Exchange::new(Message::new("original"));
377        let result = svc.ready().await.unwrap().call(ex).await;
378
379        assert!(result.is_ok());
380    }
381
382    #[tokio::test]
383    async fn recipient_list_parallel_executes_concurrently() {
384        let records: Arc<Mutex<Vec<(String, Instant, Instant)>>> = Arc::new(Mutex::new(Vec::new()));
385
386        let resolver = {
387            let records = records.clone();
388            Arc::new(move |uri: &str| {
389                if uri.starts_with("mock:") {
390                    let records = records.clone();
391                    let uri = uri.to_string();
392                    Some(BoxProcessor::from_fn(move |ex| {
393                        let records = records.clone();
394                        let uri = uri.clone();
395                        Box::pin(async move {
396                            let start = Instant::now();
397                            sleep(Duration::from_millis(100)).await;
398                            let end = Instant::now();
399                            records.lock().await.push((uri, start, end));
400                            Ok(ex)
401                        })
402                    }))
403                } else {
404                    None
405                }
406            })
407        };
408
409        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
410            "mock:a,mock:b,mock:c".to_string()
411        }))
412        .parallel(true);
413
414        let mut svc = RecipientListService::new(config, resolver);
415        let ex = Exchange::new(Message::new("test"));
416        svc.ready().await.unwrap().call(ex).await.unwrap();
417
418        let records = records.lock().await;
419        assert_eq!(records.len(), 3);
420
421        let mut overlap_found = false;
422        for i in 0..records.len() {
423            for j in (i + 1)..records.len() {
424                let (_, a_start, a_end) = records[i];
425                let (_, b_start, b_end) = records[j];
426                if a_start < b_end && b_start < a_end {
427                    overlap_found = true;
428                    break;
429                }
430            }
431            if overlap_found {
432                break;
433            }
434        }
435
436        assert!(overlap_found);
437    }
438
439    #[tokio::test]
440    async fn recipient_list_parallel_stop_on_exception_returns_error() {
441        let resolver = Arc::new(|uri: &str| {
442            if uri == "mock:err" {
443                Some(BoxProcessor::from_fn(|_ex| {
444                    Box::pin(async { Err(CamelError::ProcessorError("boom".to_string())) })
445                }))
446            } else if uri.starts_with("mock:") {
447                Some(BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) })))
448            } else {
449                None
450            }
451        });
452
453        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
454            "mock:a,mock:err,mock:c".to_string()
455        }))
456        .parallel(true)
457        .stop_on_exception(true);
458
459        let mut svc = RecipientListService::new(config, resolver);
460        let ex = Exchange::new(Message::new("test"));
461        let result = svc.ready().await.unwrap().call(ex).await;
462        assert!(matches!(result, Err(CamelError::ProcessorError(msg)) if msg == "boom"));
463    }
464
465    #[tokio::test]
466    async fn recipient_list_parallel_limit_respects_limit() {
467        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
468            "mock:a,mock:b,mock:c,mock:d".to_string()
469        }))
470        .parallel(true)
471        .parallel_limit(2);
472
473        let resolver = Arc::new(|uri: &str| {
474            if uri.starts_with("mock:") {
475                Some(BoxProcessor::from_fn(|ex| {
476                    Box::pin(async move {
477                        sleep(Duration::from_millis(100)).await;
478                        Ok(ex)
479                    })
480                }))
481            } else {
482                None
483            }
484        });
485
486        let mut svc = RecipientListService::new(config, resolver);
487        let ex = Exchange::new(Message::new("test"));
488        let start = Instant::now();
489        svc.ready().await.unwrap().call(ex).await.unwrap();
490        let elapsed = start.elapsed();
491
492        assert!(elapsed >= Duration::from_millis(180));
493        assert!(elapsed < Duration::from_millis(350));
494    }
495
496    #[tokio::test]
497    async fn recipient_list_collect_all_strategy() {
498        let resolver = Arc::new(|uri: &str| {
499            if uri == "mock:a" {
500                Some(BoxProcessor::from_fn(|mut ex| {
501                    ex.input.body = Body::Text("a".to_string());
502                    Box::pin(async move { Ok(ex) })
503                }))
504            } else if uri == "mock:b" {
505                Some(BoxProcessor::from_fn(|mut ex| {
506                    ex.input.body = Body::Text("b".to_string());
507                    Box::pin(async move { Ok(ex) })
508                }))
509            } else if uri == "mock:c" {
510                Some(BoxProcessor::from_fn(|mut ex| {
511                    ex.input.body = Body::Text("c".to_string());
512                    Box::pin(async move { Ok(ex) })
513                }))
514            } else {
515                None
516            }
517        });
518
519        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
520            "mock:a,mock:b,mock:c".to_string()
521        }))
522        .strategy(MulticastStrategy::CollectAll);
523
524        let mut svc = RecipientListService::new(config, resolver);
525        let ex = Exchange::new(Message::new("seed"));
526        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
527
528        assert_eq!(
529            result.input.body,
530            Body::from(Value::Array(vec![
531                Value::String("a".to_string()),
532                Value::String("b".to_string()),
533                Value::String("c".to_string()),
534            ]))
535        );
536    }
537
538    #[tokio::test]
539    async fn recipient_list_original_strategy() {
540        let resolver = Arc::new(|uri: &str| {
541            if uri.starts_with("mock:") {
542                let label = uri.to_string();
543                Some(BoxProcessor::from_fn(move |mut ex| {
544                    let label = label.clone();
545                    ex.input.body = Body::Text(format!("mutated-{label}"));
546                    Box::pin(async move { Ok(ex) })
547                }))
548            } else {
549                None
550            }
551        });
552
553        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
554            "mock:a,mock:b,mock:c".to_string()
555        }))
556        .strategy(MulticastStrategy::Original);
557
558        let mut svc = RecipientListService::new(config, resolver);
559        let ex = Exchange::new(Message::new("original"));
560        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
561
562        assert_eq!(result.input.body.as_text(), Some("original"));
563    }
564
565    #[tokio::test]
566    async fn recipient_list_last_wins_strategy() {
567        let payloads: Arc<HashMap<String, String>> = Arc::new(HashMap::from([
568            ("mock:a".to_string(), "first".to_string()),
569            ("mock:b".to_string(), "second".to_string()),
570            ("mock:c".to_string(), "third".to_string()),
571        ]));
572
573        let resolver = {
574            let payloads = payloads.clone();
575            Arc::new(move |uri: &str| {
576                if let Some(payload) = payloads.get(uri) {
577                    let payload = payload.clone();
578                    Some(BoxProcessor::from_fn(move |mut ex| {
579                        let payload = payload.clone();
580                        ex.input.body = Body::Text(payload);
581                        Box::pin(async move { Ok(ex) })
582                    }))
583                } else {
584                    None
585                }
586            })
587        };
588
589        let config = RecipientListConfig::new(Arc::new(|_ex: &Exchange| {
590            "mock:a,mock:b,mock:c".to_string()
591        }))
592        .strategy(MulticastStrategy::LastWins);
593
594        let mut svc = RecipientListService::new(config, resolver);
595        let ex = Exchange::new(Message::new("seed"));
596        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
597
598        assert_eq!(result.input.body.as_text(), Some("third"));
599    }
600}