Skip to main content

camel_processor/
multicast.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8    Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11// ── Metadata property keys ─────────────────────────────────────────────
12
/// Property key for the zero-based index of the endpoint being invoked.
///
/// The value is written onto each endpoint's copy of the exchange as an
/// integer (`Value::from(i as i64)`).
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Property key indicating whether this is the last endpoint invocation.
///
/// The value is a boolean that is `true` only on the copy handed to the
/// endpoint at the final position of the endpoint list.
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18// ── MulticastService ───────────────────────────────────────────────────
19
/// Tower Service implementing the Multicast EIP.
///
/// Sends a message to multiple endpoints, processing each independently,
/// and then aggregating the results.
///
/// Supports both sequential and parallel processing modes, configurable
/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
/// all endpoints are invoked concurrently with optional concurrency
/// limiting via [`MulticastConfig::parallel_limit`].
///
/// `stop_on_exception` is only forwarded to the sequential path; the
/// parallel path always runs every endpoint.
#[derive(Clone)]
pub struct MulticastService {
    /// Endpoints to invoke; each one receives its own clone of the incoming exchange.
    endpoints: Vec<BoxProcessor>,
    /// Processing mode, error handling and aggregation settings.
    config: MulticastConfig,
}
34
35impl MulticastService {
36    /// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
37    pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38        if config.parallel
39            && let Some(limit) = config.parallel_limit
40        {
41            assert!(limit > 0, "parallel_limit must be > 0");
42        }
43        Self { endpoints, config }
44    }
45}
46
47impl Service<Exchange> for MulticastService {
48    type Response = Exchange;
49    type Error = CamelError;
50    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
51
52    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53        // Check all endpoints for readiness
54        for endpoint in &mut self.endpoints {
55            match endpoint.poll_ready(cx) {
56                Poll::Pending => return Poll::Pending,
57                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
58                Poll::Ready(Ok(())) => {}
59            }
60        }
61        Poll::Ready(Ok(()))
62    }
63
64    fn call(&mut self, exchange: Exchange) -> Self::Future {
65        let original = exchange.clone();
66        let endpoints = self.endpoints.clone();
67        let config = self.config.clone();
68
69        Box::pin(async move {
70            // If no endpoints, return original exchange unchanged
71            if endpoints.is_empty() {
72                return Ok(original);
73            }
74
75            let total = endpoints.len();
76
77            let results = if config.parallel {
78                // Process endpoints in parallel
79                process_parallel(exchange, endpoints, config.parallel_limit, total).await
80            } else {
81                // Process each endpoint sequentially
82                process_sequential(exchange, endpoints, config.stop_on_exception, total).await
83            };
84
85            // Aggregate results per strategy
86            aggregate(results, original, config.aggregation)
87        })
88    }
89}
90
91// ── Sequential processing ──────────────────────────────────────────────
92
93async fn process_sequential(
94    exchange: Exchange,
95    endpoints: Vec<BoxProcessor>,
96    stop_on_exception: bool,
97    total: usize,
98) -> Vec<Result<Exchange, CamelError>> {
99    let mut results = Vec::with_capacity(endpoints.len());
100
101    for (i, endpoint) in endpoints.into_iter().enumerate() {
102        // Clone the exchange for each endpoint
103        let mut cloned_exchange = exchange.clone();
104
105        // Set multicast metadata properties
106        cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
107        cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
108
109        let mut endpoint = endpoint;
110        match tower::ServiceExt::ready(&mut endpoint).await {
111            Err(e) => {
112                results.push(Err(e));
113                if stop_on_exception {
114                    break;
115                }
116            }
117            Ok(svc) => {
118                let result = svc.call(cloned_exchange).await;
119                let is_err = result.is_err();
120                results.push(result);
121                if stop_on_exception && is_err {
122                    break;
123                }
124            }
125        }
126    }
127
128    results
129}
130
131// ── Parallel processing ────────────────────────────────────────────────
132
133async fn process_parallel(
134    exchange: Exchange,
135    endpoints: Vec<BoxProcessor>,
136    parallel_limit: Option<usize>,
137    total: usize,
138) -> Vec<Result<Exchange, CamelError>> {
139    use std::sync::Arc;
140    use tokio::sync::Semaphore;
141
142    let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
143
144    // Build futures for each endpoint
145    let futures: Vec<_> = endpoints
146        .into_iter()
147        .enumerate()
148        .map(|(i, mut endpoint)| {
149            let mut ex = exchange.clone();
150            ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
151            ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
152            let sem = semaphore.clone();
153            async move {
154                // Acquire semaphore permit if limit is set
155                let _permit = match &sem {
156                    Some(s) => match s.acquire().await {
157                        Ok(p) => Some(p),
158                        Err(_) => {
159                            return Err(CamelError::ProcessorError("semaphore closed".to_string()));
160                        }
161                    },
162                    None => None,
163                };
164
165                // Wait for endpoint to be ready, then call it
166                tower::ServiceExt::ready(&mut endpoint).await?;
167                endpoint.call(ex).await
168            }
169        })
170        .collect();
171
172    // Execute all futures concurrently and collect results
173    futures::future::join_all(futures).await
174}
175
176// ── Aggregation ────────────────────────────────────────────────────────
177
178fn aggregate(
179    results: Vec<Result<Exchange, CamelError>>,
180    original: Exchange,
181    strategy: MulticastStrategy,
182) -> Result<Exchange, CamelError> {
183    match strategy {
184        MulticastStrategy::LastWins => {
185            // Return the last result (error or success).
186            // If last result is Err and stop_on_exception=false, return that error.
187            results.into_iter().last().unwrap_or_else(|| Ok(original))
188        }
189        MulticastStrategy::CollectAll => {
190            // Collect all bodies into a JSON array. Errors propagate.
191            let mut bodies = Vec::new();
192            for result in results {
193                let ex = result?;
194                let value = match &ex.input.body {
195                    Body::Text(s) => Value::String(s.clone()),
196                    Body::Json(v) => v.clone(),
197                    Body::Xml(s) => Value::String(s.clone()),
198                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
199                    Body::Empty => Value::Null,
200                    Body::Stream(s) => serde_json::json!({
201                        "_stream": {
202                            "origin": s.metadata.origin,
203                            "placeholder": true,
204                            "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
205                        }
206                    }),
207                };
208                bodies.push(value);
209            }
210            let mut out = original;
211            out.input.body = Body::Json(Value::Array(bodies));
212            Ok(out)
213        }
214        MulticastStrategy::Original => Ok(original),
215        MulticastStrategy::Custom(fold_fn) => {
216            // Fold using the custom function, starting from the first result.
217            let mut iter = results.into_iter();
218            let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
219            iter.try_fold(first, |acc, next_result| {
220                let next = next_result?;
221                Ok(fold_fn(acc, next))
222            })
223        }
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230    use camel_api::{BoxProcessorExt, Message};
231    use std::sync::Arc;
232    use std::sync::atomic::Ordering;
233    use tower::ServiceExt;
234
235    // ── Test helpers ───────────────────────────────────────────────────
236
237    fn make_exchange(body: &str) -> Exchange {
238        Exchange::new(Message::new(body))
239    }
240
241    fn uppercase_processor() -> BoxProcessor {
242        BoxProcessor::from_fn(|mut ex: Exchange| {
243            Box::pin(async move {
244                if let Body::Text(s) = &ex.input.body {
245                    ex.input.body = Body::Text(s.to_uppercase());
246                }
247                Ok(ex)
248            })
249        })
250    }
251
252    fn failing_processor() -> BoxProcessor {
253        BoxProcessor::from_fn(|_ex| {
254            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
255        })
256    }
257
258    #[test]
259    fn test_multicast_zero_parallel_limit_rejected() {
260        let config = MulticastConfig::new().parallel(true).parallel_limit(0);
261        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
262            MulticastService::new(vec![passthrough_processor()], config);
263        }));
264        assert!(result.is_err(), "zero parallel_limit should panic");
265    }
266
267    fn passthrough_processor() -> BoxProcessor {
268        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
269    }
270
271    // ── 1. Sequential + LastWins ───────────────────────────────────────
272
273    #[tokio::test]
274    async fn test_multicast_sequential_last_wins() {
275        let endpoints = vec![
276            uppercase_processor(),
277            uppercase_processor(),
278            uppercase_processor(),
279        ];
280
281        let config = MulticastConfig::new(); // LastWins by default
282        let mut svc = MulticastService::new(endpoints, config);
283
284        let result = svc
285            .ready()
286            .await
287            .unwrap()
288            .call(make_exchange("hello"))
289            .await
290            .unwrap();
291
292        assert_eq!(result.input.body.as_text(), Some("HELLO"));
293    }
294
295    // ── 2. Sequential + CollectAll ─────────────────────────────────────
296
297    #[tokio::test]
298    async fn test_multicast_sequential_collect_all() {
299        let endpoints = vec![
300            uppercase_processor(),
301            uppercase_processor(),
302            uppercase_processor(),
303        ];
304
305        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
306        let mut svc = MulticastService::new(endpoints, config);
307
308        let result = svc
309            .ready()
310            .await
311            .unwrap()
312            .call(make_exchange("hello"))
313            .await
314            .unwrap();
315
316        let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
317        match &result.input.body {
318            Body::Json(v) => assert_eq!(*v, expected),
319            other => panic!("expected JSON body, got {other:?}"),
320        }
321    }
322
323    // ── 3. Sequential + Original ───────────────────────────────────────
324
325    #[tokio::test]
326    async fn test_multicast_sequential_original() {
327        let endpoints = vec![
328            uppercase_processor(),
329            uppercase_processor(),
330            uppercase_processor(),
331        ];
332
333        let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
334        let mut svc = MulticastService::new(endpoints, config);
335
336        let result = svc
337            .ready()
338            .await
339            .unwrap()
340            .call(make_exchange("hello"))
341            .await
342            .unwrap();
343
344        // Original body should be unchanged
345        assert_eq!(result.input.body.as_text(), Some("hello"));
346    }
347
348    // ── 4. Sequential + Custom aggregation ─────────────────────────────
349
350    #[tokio::test]
351    async fn test_multicast_sequential_custom_aggregation() {
352        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
353            Arc::new(|mut acc: Exchange, next: Exchange| {
354                let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
355                let next_text = next.input.body.as_text().unwrap_or("").to_string();
356                acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
357                acc
358            });
359
360        let endpoints = vec![
361            uppercase_processor(),
362            uppercase_processor(),
363            uppercase_processor(),
364        ];
365
366        let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
367        let mut svc = MulticastService::new(endpoints, config);
368
369        let result = svc
370            .ready()
371            .await
372            .unwrap()
373            .call(make_exchange("a"))
374            .await
375            .unwrap();
376
377        assert_eq!(result.input.body.as_text(), Some("A+A+A"));
378    }
379
380    // ── 5. Stop on exception ───────────────────────────────────────────
381
382    #[tokio::test]
383    async fn test_multicast_stop_on_exception() {
384        let endpoints = vec![
385            uppercase_processor(),
386            failing_processor(),
387            uppercase_processor(),
388        ];
389
390        let config = MulticastConfig::new().stop_on_exception(true);
391        let mut svc = MulticastService::new(endpoints, config);
392
393        let result = svc
394            .ready()
395            .await
396            .unwrap()
397            .call(make_exchange("hello"))
398            .await;
399
400        assert!(result.is_err(), "expected error due to stop_on_exception");
401    }
402
403    // ── 6. Continue on exception ───────────────────────────────────────
404
405    #[tokio::test]
406    async fn test_multicast_continue_on_exception() {
407        let endpoints = vec![
408            uppercase_processor(),
409            failing_processor(),
410            uppercase_processor(),
411        ];
412
413        let config = MulticastConfig::new()
414            .stop_on_exception(false)
415            .aggregation(MulticastStrategy::LastWins);
416        let mut svc = MulticastService::new(endpoints, config);
417
418        let result = svc
419            .ready()
420            .await
421            .unwrap()
422            .call(make_exchange("hello"))
423            .await;
424
425        // LastWins: last endpoint succeeded, so result should be OK
426        assert!(result.is_ok(), "last endpoint should succeed");
427        assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
428    }
429
430    // ── 7. Stop on exception with fail_on_nth ─────────────────────────────
431
432    #[tokio::test]
433    async fn test_multicast_stop_on_exception_halts_early() {
434        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
435
436        // Track which endpoints actually execute
437        let executed = Arc::new(AtomicUsize::new(0));
438
439        let exec_clone1 = Arc::clone(&executed);
440        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
441            let e = Arc::clone(&exec_clone1);
442            Box::pin(async move {
443                e.fetch_add(1, AtomicOrdering::SeqCst);
444                Ok(ex)
445            })
446        });
447
448        let exec_clone2 = Arc::clone(&executed);
449        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
450            let e = Arc::clone(&exec_clone2);
451            Box::pin(async move {
452                e.fetch_add(1, AtomicOrdering::SeqCst);
453                Err(CamelError::ProcessorError("fail on 1".into()))
454            })
455        });
456
457        let exec_clone3 = Arc::clone(&executed);
458        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
459            let e = Arc::clone(&exec_clone3);
460            Box::pin(async move {
461                e.fetch_add(1, AtomicOrdering::SeqCst);
462                Ok(ex)
463            })
464        });
465
466        let endpoints = vec![endpoint0, endpoint1, endpoint2];
467        let config = MulticastConfig::new().stop_on_exception(true);
468        let mut svc = MulticastService::new(endpoints, config);
469
470        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
471        assert!(result.is_err(), "should fail at endpoint 1");
472
473        // Only endpoints 0 and 1 should have executed (2 should be skipped)
474        let count = executed.load(AtomicOrdering::SeqCst);
475        assert_eq!(
476            count, 2,
477            "endpoint 2 should not have executed due to stop_on_exception"
478        );
479    }
480
481    // ── 8. Continue on exception with fail_on_nth ─────────────────────────
482
483    #[tokio::test]
484    async fn test_multicast_continue_on_exception_executes_all() {
485        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
486
487        // Track which endpoints actually execute
488        let executed = Arc::new(AtomicUsize::new(0));
489
490        let exec_clone1 = Arc::clone(&executed);
491        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
492            let e = Arc::clone(&exec_clone1);
493            Box::pin(async move {
494                e.fetch_add(1, AtomicOrdering::SeqCst);
495                Ok(ex)
496            })
497        });
498
499        let exec_clone2 = Arc::clone(&executed);
500        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
501            let e = Arc::clone(&exec_clone2);
502            Box::pin(async move {
503                e.fetch_add(1, AtomicOrdering::SeqCst);
504                Err(CamelError::ProcessorError("fail on 1".into()))
505            })
506        });
507
508        let exec_clone3 = Arc::clone(&executed);
509        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
510            let e = Arc::clone(&exec_clone3);
511            Box::pin(async move {
512                e.fetch_add(1, AtomicOrdering::SeqCst);
513                Ok(ex)
514            })
515        });
516
517        let endpoints = vec![endpoint0, endpoint1, endpoint2];
518        let config = MulticastConfig::new()
519            .stop_on_exception(false)
520            .aggregation(MulticastStrategy::LastWins);
521        let mut svc = MulticastService::new(endpoints, config);
522
523        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
524        assert!(result.is_ok(), "last endpoint should succeed");
525
526        // All 3 endpoints should have executed
527        let count = executed.load(AtomicOrdering::SeqCst);
528        assert_eq!(
529            count, 3,
530            "all endpoints should have executed despite error in endpoint 1"
531        );
532    }
533
534    // ── 9. Empty endpoints ─────────────────────────────────────────────
535
536    #[tokio::test]
537    async fn test_multicast_empty_endpoints() {
538        let endpoints: Vec<BoxProcessor> = vec![];
539
540        let config = MulticastConfig::new();
541        let mut svc = MulticastService::new(endpoints, config);
542
543        let mut ex = make_exchange("hello");
544        ex.set_property("marker", Value::Bool(true));
545
546        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
547        assert_eq!(result.input.body.as_text(), Some("hello"));
548        assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
549    }
550
551    // ── 10. Metadata properties ─────────────────────────────────────────
552
553    #[tokio::test]
554    async fn test_multicast_metadata_properties() {
555        // Use a pipeline that records the metadata into the body as JSON
556        let recorder = BoxProcessor::from_fn(|ex: Exchange| {
557            Box::pin(async move {
558                let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
559                let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
560                let body = serde_json::json!({
561                    "index": idx,
562                    "complete": complete,
563                });
564                let mut out = ex;
565                out.input.body = Body::Json(body);
566                Ok(out)
567            })
568        });
569
570        let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
571
572        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
573        let mut svc = MulticastService::new(endpoints, config);
574
575        let result = svc
576            .ready()
577            .await
578            .unwrap()
579            .call(make_exchange("x"))
580            .await
581            .unwrap();
582
583        let expected = serde_json::json!([
584            {"index": 0, "complete": false},
585            {"index": 1, "complete": false},
586            {"index": 2, "complete": true},
587        ]);
588        match &result.input.body {
589            Body::Json(v) => assert_eq!(*v, expected),
590            other => panic!("expected JSON body, got {other:?}"),
591        }
592    }
593
594    // ── 11. poll_ready delegates to endpoints ────────────────────────────
595
596    #[tokio::test]
597    async fn test_poll_ready_delegates_to_endpoints() {
598        use std::sync::atomic::AtomicBool;
599
600        // A service that is initially not ready, then becomes ready.
601        #[derive(Clone)]
602        struct DelayedReady {
603            ready: Arc<AtomicBool>,
604        }
605
606        impl Service<Exchange> for DelayedReady {
607            type Response = Exchange;
608            type Error = CamelError;
609            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
610
611            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
612                if self.ready.load(Ordering::SeqCst) {
613                    Poll::Ready(Ok(()))
614                } else {
615                    cx.waker().wake_by_ref();
616                    Poll::Pending
617                }
618            }
619
620            fn call(&mut self, exchange: Exchange) -> Self::Future {
621                Box::pin(async move { Ok(exchange) })
622            }
623        }
624
625        let ready_flag = Arc::new(AtomicBool::new(false));
626        let inner = DelayedReady {
627            ready: Arc::clone(&ready_flag),
628        };
629        let boxed: BoxProcessor = BoxProcessor::new(inner);
630
631        let config = MulticastConfig::new();
632        let mut svc = MulticastService::new(vec![boxed], config);
633
634        // First poll should be Pending.
635        let waker = futures::task::noop_waker();
636        let mut cx = Context::from_waker(&waker);
637        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
638        assert!(
639            poll.is_pending(),
640            "expected Pending when endpoint not ready"
641        );
642
643        // Mark endpoint as ready.
644        ready_flag.store(true, Ordering::SeqCst);
645
646        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
647        assert!(
648            matches!(poll, Poll::Ready(Ok(()))),
649            "expected Ready after endpoint becomes ready"
650        );
651    }
652
653    // ── 12. CollectAll with error propagates ────────────────────────────
654
655    #[tokio::test]
656    async fn test_multicast_collect_all_error_propagates() {
657        let endpoints = vec![
658            uppercase_processor(),
659            failing_processor(),
660            uppercase_processor(),
661        ];
662
663        let config = MulticastConfig::new()
664            .stop_on_exception(false)
665            .aggregation(MulticastStrategy::CollectAll);
666        let mut svc = MulticastService::new(endpoints, config);
667
668        let result = svc
669            .ready()
670            .await
671            .unwrap()
672            .call(make_exchange("hello"))
673            .await;
674
675        assert!(result.is_err(), "CollectAll should propagate first error");
676    }
677
678    // ── 13. LastWins with error last returns error ──────────────────────
679
680    #[tokio::test]
681    async fn test_multicast_last_wins_error_last() {
682        let endpoints = vec![
683            uppercase_processor(),
684            uppercase_processor(),
685            failing_processor(),
686        ];
687
688        let config = MulticastConfig::new()
689            .stop_on_exception(false)
690            .aggregation(MulticastStrategy::LastWins);
691        let mut svc = MulticastService::new(endpoints, config);
692
693        let result = svc
694            .ready()
695            .await
696            .unwrap()
697            .call(make_exchange("hello"))
698            .await;
699
700        assert!(result.is_err(), "LastWins should return last error");
701    }
702
703    // ── 14. Custom aggregation with error propagates ────────────────────
704
705    #[tokio::test]
706    async fn test_multicast_custom_error_propagates() {
707        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
708            Arc::new(|acc: Exchange, _next: Exchange| acc);
709
710        let endpoints = vec![
711            uppercase_processor(),
712            failing_processor(),
713            uppercase_processor(),
714        ];
715
716        let config = MulticastConfig::new()
717            .stop_on_exception(false)
718            .aggregation(MulticastStrategy::Custom(joiner));
719        let mut svc = MulticastService::new(endpoints, config);
720
721        let result = svc
722            .ready()
723            .await
724            .unwrap()
725            .call(make_exchange("hello"))
726            .await;
727
728        assert!(
729            result.is_err(),
730            "Custom aggregation should propagate errors"
731        );
732    }
733
734    // ── 15. Parallel + CollectAll basic ─────────────────────────────────
735
736    #[tokio::test]
737    async fn test_multicast_parallel_basic() {
738        let endpoints = vec![uppercase_processor(), uppercase_processor()];
739
740        let config = MulticastConfig::new()
741            .parallel(true)
742            .aggregation(MulticastStrategy::CollectAll);
743        let mut svc = MulticastService::new(endpoints, config);
744
745        let result = svc
746            .ready()
747            .await
748            .unwrap()
749            .call(make_exchange("test"))
750            .await
751            .unwrap();
752
753        // Both endpoints uppercase "test" → ["TEST", "TEST"]
754        // Note: parallel order is not guaranteed for CollectAll, but with identical processors it doesn't matter
755        match &result.input.body {
756            Body::Json(v) => {
757                let arr = v.as_array().expect("expected array");
758                assert_eq!(arr.len(), 2);
759                assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
760            }
761            other => panic!("expected JSON body, got {:?}", other),
762        }
763    }
764
765    // ── 16. Parallel with concurrency limit ─────────────────────────────
766
767    #[tokio::test]
768    async fn test_multicast_parallel_with_limit() {
769        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
770
771        let concurrent = Arc::new(AtomicUsize::new(0));
772        let max_concurrent = Arc::new(AtomicUsize::new(0));
773
774        let endpoints: Vec<BoxProcessor> = (0..4)
775            .map(|_| {
776                let c = Arc::clone(&concurrent);
777                let mc = Arc::clone(&max_concurrent);
778                BoxProcessor::from_fn(move |ex: Exchange| {
779                    let c = Arc::clone(&c);
780                    let mc = Arc::clone(&mc);
781                    Box::pin(async move {
782                        let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
783                        mc.fetch_max(current, AtomicOrdering::SeqCst);
784                        tokio::task::yield_now().await;
785                        c.fetch_sub(1, AtomicOrdering::SeqCst);
786                        Ok(ex)
787                    })
788                })
789            })
790            .collect();
791
792        let config = MulticastConfig::new().parallel(true).parallel_limit(2);
793        let mut svc = MulticastService::new(endpoints, config);
794
795        let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
796
797        let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
798        assert!(
799            observed_max <= 2,
800            "max concurrency was {}, expected <= 2",
801            observed_max
802        );
803    }
804
805    // ── 17. Stream body creates valid JSON placeholder ────────────────────
806
807    async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
808        use bytes::Bytes;
809        use camel_api::{Body, StreamBody, StreamMetadata};
810        use futures::stream;
811        use std::sync::Arc;
812        use tokio::sync::Mutex;
813
814        let chunks = vec![Ok(Bytes::from("test"))];
815        let stream_body = StreamBody {
816            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
817            metadata: StreamMetadata {
818                origin,
819                ..Default::default()
820            },
821        };
822
823        let stream_body_clone = stream_body.clone();
824        let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
825            let body_clone = stream_body_clone.clone();
826            Box::pin(async move {
827                let mut out = ex;
828                out.input.body = Body::Stream(body_clone);
829                Ok(out)
830            })
831        })];
832
833        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
834        let mut svc = MulticastService::new(endpoints, config);
835
836        svc.ready()
837            .await
838            .unwrap()
839            .call(Exchange::new(Message::new("")))
840            .await
841            .unwrap()
842    }
843
844    #[tokio::test]
845    async fn test_multicast_stream_bodies_creates_valid_json() {
846        use camel_api::Body;
847
848        let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
849
850        let Body::Json(value) = &result.input.body else {
851            panic!("Expected Json body, got {:?}", result.input.body);
852        };
853
854        let json_str = serde_json::to_string(&value).unwrap();
855        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
856
857        assert!(parsed.is_array());
858        let arr = parsed.as_array().unwrap();
859        assert_eq!(arr.len(), 1);
860        assert!(arr[0]["_stream"].is_object());
861        assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
862        assert_eq!(arr[0]["_stream"]["placeholder"], true);
863    }
864
865    // ── 18. Stream body with None origin creates valid JSON ──────────────
866
867    #[tokio::test]
868    async fn test_multicast_stream_with_none_origin_creates_valid_json() {
869        use camel_api::Body;
870
871        let result = setup_multicast_stream_test(None).await;
872
873        let Body::Json(value) = &result.input.body else {
874            panic!("Expected Json body, got {:?}", result.input.body);
875        };
876
877        let json_str = serde_json::to_string(&value).unwrap();
878        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
879
880        assert!(parsed.is_array());
881        let arr = parsed.as_array().unwrap();
882        assert_eq!(arr.len(), 1);
883        assert!(arr[0]["_stream"].is_object());
884        assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
885        assert_eq!(arr[0]["_stream"]["placeholder"], true);
886    }
887
888    #[tokio::test]
889    async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
890        let endpoints = vec![
891            BoxProcessor::from_fn(|mut ex: Exchange| {
892                Box::pin(async move {
893                    ex.input.body = Body::Bytes(vec![65, 66].into());
894                    Ok(ex)
895                })
896            }),
897            BoxProcessor::from_fn(|mut ex: Exchange| {
898                Box::pin(async move {
899                    ex.input.body = Body::Xml("<a/>".to_string());
900                    Ok(ex)
901                })
902            }),
903            BoxProcessor::from_fn(|mut ex: Exchange| {
904                Box::pin(async move {
905                    ex.input.body = Body::Empty;
906                    Ok(ex)
907                })
908            }),
909        ];
910
911        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
912        let mut svc = MulticastService::new(endpoints, config);
913        let result = svc
914            .ready()
915            .await
916            .unwrap()
917            .call(make_exchange("x"))
918            .await
919            .unwrap();
920
921        match result.input.body {
922            Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
923            _ => panic!("expected json"),
924        }
925    }
926}