Skip to main content

camel_processor/
multicast.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8    Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11// ── Metadata property keys ─────────────────────────────────────────────
12
/// Exchange property key under which each branch receives the zero-based
/// index of the endpoint it is being sent to.
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Exchange property key holding a boolean that is `true` only on the branch
/// sent to the last endpoint in the list.
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18// ── MulticastService ───────────────────────────────────────────────────
19
/// Tower Service implementing the Multicast EIP.
///
/// Sends a message to multiple endpoints, processing each independently,
/// and then aggregating the results.
///
/// Supports both sequential and parallel processing modes, configurable
/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
/// all endpoints are invoked concurrently with optional concurrency
/// limiting via [`MulticastConfig::parallel_limit`].
#[derive(Clone)]
pub struct MulticastService {
    /// Processors that each receive their own clone of the incoming exchange.
    endpoints: Vec<BoxProcessor>,
    /// Processing mode, error-handling and aggregation settings.
    config: MulticastConfig,
}
34
35impl MulticastService {
36    /// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
37    pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Result<Self, CamelError> {
38        config.validate()?;
39        Ok(Self { endpoints, config })
40    }
41}
42
43impl Service<Exchange> for MulticastService {
44    type Response = Exchange;
45    type Error = CamelError;
46    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        // Check all endpoints for readiness
50        for endpoint in &mut self.endpoints {
51            match endpoint.poll_ready(cx) {
52                Poll::Pending => return Poll::Pending,
53                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
54                Poll::Ready(Ok(())) => {}
55            }
56        }
57        Poll::Ready(Ok(()))
58    }
59
60    fn call(&mut self, exchange: Exchange) -> Self::Future {
61        let original = exchange.clone();
62        let endpoints = self.endpoints.clone();
63        let config = self.config.clone();
64
65        Box::pin(async move {
66            // If no endpoints, return original exchange unchanged
67            if endpoints.is_empty() {
68                return Ok(original);
69            }
70
71            let total = endpoints.len();
72
73            let results = if config.parallel {
74                // Process endpoints in parallel
75                process_parallel(exchange, endpoints, config.parallel_limit, total).await
76            } else {
77                // Process each endpoint sequentially
78                process_sequential(exchange, endpoints, config.stop_on_exception, total).await
79            };
80
81            // Aggregate results per strategy
82            aggregate(results, original, config.aggregation)
83        })
84    }
85}
86
87// ── Sequential processing ──────────────────────────────────────────────
88
89async fn process_sequential(
90    exchange: Exchange,
91    endpoints: Vec<BoxProcessor>,
92    stop_on_exception: bool,
93    total: usize,
94) -> Vec<Result<Exchange, CamelError>> {
95    let mut results = Vec::with_capacity(endpoints.len());
96
97    for (i, endpoint) in endpoints.into_iter().enumerate() {
98        // Clone the exchange for each endpoint
99        let mut cloned_exchange = exchange.clone();
100
101        // Set multicast metadata properties
102        cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
103        cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
104
105        let mut endpoint = endpoint;
106        match tower::ServiceExt::ready(&mut endpoint).await {
107            Err(e) => {
108                results.push(Err(e));
109                if stop_on_exception {
110                    break;
111                }
112            }
113            Ok(svc) => {
114                let result = svc.call(cloned_exchange).await;
115                let is_err = result.is_err();
116                results.push(result);
117                if stop_on_exception && is_err {
118                    break;
119                }
120            }
121        }
122    }
123
124    results
125}
126
127// ── Parallel processing ────────────────────────────────────────────────
128
129async fn process_parallel(
130    exchange: Exchange,
131    endpoints: Vec<BoxProcessor>,
132    parallel_limit: Option<usize>,
133    total: usize,
134) -> Vec<Result<Exchange, CamelError>> {
135    use std::sync::Arc;
136    use tokio::sync::Semaphore;
137
138    let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
139
140    // Build futures for each endpoint
141    let futures: Vec<_> = endpoints
142        .into_iter()
143        .enumerate()
144        .map(|(i, mut endpoint)| {
145            let mut ex = exchange.clone();
146            ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
147            ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
148            let sem = semaphore.clone();
149            async move {
150                // Acquire semaphore permit if limit is set
151                let _permit = match &sem {
152                    Some(s) => match s.acquire().await {
153                        Ok(p) => Some(p),
154                        Err(_) => {
155                            return Err(CamelError::ProcessorError("semaphore closed".to_string()));
156                        }
157                    },
158                    None => None,
159                };
160
161                // Wait for endpoint to be ready, then call it
162                tower::ServiceExt::ready(&mut endpoint).await?;
163                endpoint.call(ex).await
164            }
165        })
166        .collect();
167
168    // Execute all futures concurrently and collect results
169    futures::future::join_all(futures).await
170}
171
172// ── Aggregation ────────────────────────────────────────────────────────
173
174fn aggregate(
175    results: Vec<Result<Exchange, CamelError>>,
176    original: Exchange,
177    strategy: MulticastStrategy,
178) -> Result<Exchange, CamelError> {
179    match strategy {
180        MulticastStrategy::LastWins => {
181            // Return the last result (error or success).
182            // If last result is Err and stop_on_exception=false, return that error.
183            results.into_iter().last().unwrap_or_else(|| Ok(original))
184        }
185        MulticastStrategy::CollectAll => {
186            // Collect all bodies into a JSON array. Errors propagate.
187            let mut bodies = Vec::new();
188            for result in results {
189                let ex = result?;
190                let value = match &ex.input.body {
191                    Body::Text(s) => Value::String(s.clone()),
192                    Body::Json(v) => v.clone(),
193                    Body::Xml(s) => Value::String(s.clone()),
194                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
195                    Body::Empty => Value::Null,
196                    Body::Stream(s) => serde_json::json!({
197                        "_stream": {
198                            "origin": s.metadata.origin,
199                            "placeholder": true,
200                            "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
201                        }
202                    }),
203                };
204                bodies.push(value);
205            }
206            let mut out = original;
207            out.input.body = Body::Json(Value::Array(bodies));
208            Ok(out)
209        }
210        MulticastStrategy::Original => Ok(original),
211        MulticastStrategy::Custom(fold_fn) => {
212            // Fold using the custom function, starting from the first result.
213            let mut iter = results.into_iter();
214            let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
215            iter.try_fold(first, |acc, next_result| {
216                let next = next_result?;
217                Ok(fold_fn(acc, next))
218            })
219        }
220    }
221}
222
223#[cfg(test)]
224mod tests {
225    use super::*;
226    use camel_api::{BoxProcessorExt, Message};
227    use std::sync::Arc;
228    use std::sync::atomic::Ordering;
229    use tower::ServiceExt;
230
231    // ── Test helpers ───────────────────────────────────────────────────
232
233    fn make_exchange(body: &str) -> Exchange {
234        Exchange::new(Message::new(body))
235    }
236
237    fn uppercase_processor() -> BoxProcessor {
238        BoxProcessor::from_fn(|mut ex: Exchange| {
239            Box::pin(async move {
240                if let Body::Text(s) = &ex.input.body {
241                    ex.input.body = Body::Text(s.to_uppercase());
242                }
243                Ok(ex)
244            })
245        })
246    }
247
248    fn failing_processor() -> BoxProcessor {
249        BoxProcessor::from_fn(|_ex| {
250            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
251        })
252    }
253
254    #[test]
255    fn test_multicast_zero_parallel_limit_rejected() {
256        let config = MulticastConfig::new().parallel(true).parallel_limit(0);
257        let result = MulticastService::new(vec![passthrough_processor()], config);
258        assert!(result.is_err(), "zero parallel_limit should return Err");
259    }
260
261    fn passthrough_processor() -> BoxProcessor {
262        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
263    }
264
265    // ── 1. Sequential + LastWins ───────────────────────────────────────
266
267    #[tokio::test]
268    async fn test_multicast_sequential_last_wins() {
269        let endpoints = vec![
270            uppercase_processor(),
271            uppercase_processor(),
272            uppercase_processor(),
273        ];
274
275        let config = MulticastConfig::new(); // LastWins by default
276        let mut svc = MulticastService::new(endpoints, config).unwrap();
277
278        let result = svc
279            .ready()
280            .await
281            .unwrap()
282            .call(make_exchange("hello"))
283            .await
284            .unwrap();
285
286        assert_eq!(result.input.body.as_text(), Some("HELLO"));
287    }
288
289    // ── 2. Sequential + CollectAll ─────────────────────────────────────
290
291    #[tokio::test]
292    async fn test_multicast_sequential_collect_all() {
293        let endpoints = vec![
294            uppercase_processor(),
295            uppercase_processor(),
296            uppercase_processor(),
297        ];
298
299        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
300        let mut svc = MulticastService::new(endpoints, config).unwrap();
301
302        let result = svc
303            .ready()
304            .await
305            .unwrap()
306            .call(make_exchange("hello"))
307            .await
308            .unwrap();
309
310        let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
311        match &result.input.body {
312            Body::Json(v) => assert_eq!(*v, expected),
313            other => panic!("expected JSON body, got {other:?}"),
314        }
315    }
316
317    // ── 3. Sequential + Original ───────────────────────────────────────
318
319    #[tokio::test]
320    async fn test_multicast_sequential_original() {
321        let endpoints = vec![
322            uppercase_processor(),
323            uppercase_processor(),
324            uppercase_processor(),
325        ];
326
327        let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
328        let mut svc = MulticastService::new(endpoints, config).unwrap();
329
330        let result = svc
331            .ready()
332            .await
333            .unwrap()
334            .call(make_exchange("hello"))
335            .await
336            .unwrap();
337
338        // Original body should be unchanged
339        assert_eq!(result.input.body.as_text(), Some("hello"));
340    }
341
342    // ── 4. Sequential + Custom aggregation ─────────────────────────────
343
344    #[tokio::test]
345    async fn test_multicast_sequential_custom_aggregation() {
346        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
347            Arc::new(|mut acc: Exchange, next: Exchange| {
348                let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
349                let next_text = next.input.body.as_text().unwrap_or("").to_string();
350                acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
351                acc
352            });
353
354        let endpoints = vec![
355            uppercase_processor(),
356            uppercase_processor(),
357            uppercase_processor(),
358        ];
359
360        let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
361        let mut svc = MulticastService::new(endpoints, config).unwrap();
362
363        let result = svc
364            .ready()
365            .await
366            .unwrap()
367            .call(make_exchange("a"))
368            .await
369            .unwrap();
370
371        assert_eq!(result.input.body.as_text(), Some("A+A+A"));
372    }
373
374    // ── 5. Stop on exception ───────────────────────────────────────────
375
376    #[tokio::test]
377    async fn test_multicast_stop_on_exception() {
378        let endpoints = vec![
379            uppercase_processor(),
380            failing_processor(),
381            uppercase_processor(),
382        ];
383
384        let config = MulticastConfig::new().stop_on_exception(true);
385        let mut svc = MulticastService::new(endpoints, config).unwrap();
386
387        let result = svc
388            .ready()
389            .await
390            .unwrap()
391            .call(make_exchange("hello"))
392            .await;
393
394        assert!(result.is_err(), "expected error due to stop_on_exception");
395    }
396
397    // ── 6. Continue on exception ───────────────────────────────────────
398
399    #[tokio::test]
400    async fn test_multicast_continue_on_exception() {
401        let endpoints = vec![
402            uppercase_processor(),
403            failing_processor(),
404            uppercase_processor(),
405        ];
406
407        let config = MulticastConfig::new()
408            .stop_on_exception(false)
409            .aggregation(MulticastStrategy::LastWins);
410        let mut svc = MulticastService::new(endpoints, config).unwrap();
411
412        let result = svc
413            .ready()
414            .await
415            .unwrap()
416            .call(make_exchange("hello"))
417            .await;
418
419        // LastWins: last endpoint succeeded, so result should be OK
420        assert!(result.is_ok(), "last endpoint should succeed");
421        assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
422    }
423
424    // ── 7. Stop on exception with fail_on_nth ─────────────────────────────
425
426    #[tokio::test]
427    async fn test_multicast_stop_on_exception_halts_early() {
428        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
429
430        // Track which endpoints actually execute
431        let executed = Arc::new(AtomicUsize::new(0));
432
433        let exec_clone1 = Arc::clone(&executed);
434        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
435            let e = Arc::clone(&exec_clone1);
436            Box::pin(async move {
437                e.fetch_add(1, AtomicOrdering::SeqCst);
438                Ok(ex)
439            })
440        });
441
442        let exec_clone2 = Arc::clone(&executed);
443        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
444            let e = Arc::clone(&exec_clone2);
445            Box::pin(async move {
446                e.fetch_add(1, AtomicOrdering::SeqCst);
447                Err(CamelError::ProcessorError("fail on 1".into()))
448            })
449        });
450
451        let exec_clone3 = Arc::clone(&executed);
452        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
453            let e = Arc::clone(&exec_clone3);
454            Box::pin(async move {
455                e.fetch_add(1, AtomicOrdering::SeqCst);
456                Ok(ex)
457            })
458        });
459
460        let endpoints = vec![endpoint0, endpoint1, endpoint2];
461        let config = MulticastConfig::new().stop_on_exception(true);
462        let mut svc = MulticastService::new(endpoints, config).unwrap();
463
464        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
465        assert!(result.is_err(), "should fail at endpoint 1");
466
467        // Only endpoints 0 and 1 should have executed (2 should be skipped)
468        let count = executed.load(AtomicOrdering::SeqCst);
469        assert_eq!(
470            count, 2,
471            "endpoint 2 should not have executed due to stop_on_exception"
472        );
473    }
474
475    // ── 8. Continue on exception with fail_on_nth ─────────────────────────
476
477    #[tokio::test]
478    async fn test_multicast_continue_on_exception_executes_all() {
479        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
480
481        // Track which endpoints actually execute
482        let executed = Arc::new(AtomicUsize::new(0));
483
484        let exec_clone1 = Arc::clone(&executed);
485        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
486            let e = Arc::clone(&exec_clone1);
487            Box::pin(async move {
488                e.fetch_add(1, AtomicOrdering::SeqCst);
489                Ok(ex)
490            })
491        });
492
493        let exec_clone2 = Arc::clone(&executed);
494        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
495            let e = Arc::clone(&exec_clone2);
496            Box::pin(async move {
497                e.fetch_add(1, AtomicOrdering::SeqCst);
498                Err(CamelError::ProcessorError("fail on 1".into()))
499            })
500        });
501
502        let exec_clone3 = Arc::clone(&executed);
503        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
504            let e = Arc::clone(&exec_clone3);
505            Box::pin(async move {
506                e.fetch_add(1, AtomicOrdering::SeqCst);
507                Ok(ex)
508            })
509        });
510
511        let endpoints = vec![endpoint0, endpoint1, endpoint2];
512        let config = MulticastConfig::new()
513            .stop_on_exception(false)
514            .aggregation(MulticastStrategy::LastWins);
515        let mut svc = MulticastService::new(endpoints, config).unwrap();
516
517        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
518        assert!(result.is_ok(), "last endpoint should succeed");
519
520        // All 3 endpoints should have executed
521        let count = executed.load(AtomicOrdering::SeqCst);
522        assert_eq!(
523            count, 3,
524            "all endpoints should have executed despite error in endpoint 1"
525        );
526    }
527
528    // ── 9. Empty endpoints ─────────────────────────────────────────────
529
530    #[tokio::test]
531    async fn test_multicast_empty_endpoints() {
532        let endpoints: Vec<BoxProcessor> = vec![];
533
534        let config = MulticastConfig::new();
535        let mut svc = MulticastService::new(endpoints, config).unwrap();
536
537        let mut ex = make_exchange("hello");
538        ex.set_property("marker", Value::Bool(true));
539
540        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
541        assert_eq!(result.input.body.as_text(), Some("hello"));
542        assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
543    }
544
545    // ── 10. Metadata properties ─────────────────────────────────────────
546
547    #[tokio::test]
548    async fn test_multicast_metadata_properties() {
549        // Use a pipeline that records the metadata into the body as JSON
550        let recorder = BoxProcessor::from_fn(|ex: Exchange| {
551            Box::pin(async move {
552                let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
553                let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
554                let body = serde_json::json!({
555                    "index": idx,
556                    "complete": complete,
557                });
558                let mut out = ex;
559                out.input.body = Body::Json(body);
560                Ok(out)
561            })
562        });
563
564        let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
565
566        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
567        let mut svc = MulticastService::new(endpoints, config).unwrap();
568
569        let result = svc
570            .ready()
571            .await
572            .unwrap()
573            .call(make_exchange("x"))
574            .await
575            .unwrap();
576
577        let expected = serde_json::json!([
578            {"index": 0, "complete": false},
579            {"index": 1, "complete": false},
580            {"index": 2, "complete": true},
581        ]);
582        match &result.input.body {
583            Body::Json(v) => assert_eq!(*v, expected),
584            other => panic!("expected JSON body, got {other:?}"),
585        }
586    }
587
588    // ── 11. poll_ready delegates to endpoints ────────────────────────────
589
590    #[tokio::test]
591    async fn test_poll_ready_delegates_to_endpoints() {
592        use std::sync::atomic::AtomicBool;
593
594        // A service that is initially not ready, then becomes ready.
595        #[derive(Clone)]
596        struct DelayedReady {
597            ready: Arc<AtomicBool>,
598        }
599
600        impl Service<Exchange> for DelayedReady {
601            type Response = Exchange;
602            type Error = CamelError;
603            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
604
605            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
606                if self.ready.load(Ordering::SeqCst) {
607                    Poll::Ready(Ok(()))
608                } else {
609                    cx.waker().wake_by_ref();
610                    Poll::Pending
611                }
612            }
613
614            fn call(&mut self, exchange: Exchange) -> Self::Future {
615                Box::pin(async move { Ok(exchange) })
616            }
617        }
618
619        let ready_flag = Arc::new(AtomicBool::new(false));
620        let inner = DelayedReady {
621            ready: Arc::clone(&ready_flag),
622        };
623        let boxed: BoxProcessor = BoxProcessor::new(inner);
624
625        let config = MulticastConfig::new();
626        let mut svc = MulticastService::new(vec![boxed], config).unwrap();
627
628        // First poll should be Pending.
629        let waker = futures::task::noop_waker();
630        let mut cx = Context::from_waker(&waker);
631        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
632        assert!(
633            poll.is_pending(),
634            "expected Pending when endpoint not ready"
635        );
636
637        // Mark endpoint as ready.
638        ready_flag.store(true, Ordering::SeqCst);
639
640        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
641        assert!(
642            matches!(poll, Poll::Ready(Ok(()))),
643            "expected Ready after endpoint becomes ready"
644        );
645    }
646
647    // ── 12. CollectAll with error propagates ────────────────────────────
648
649    #[tokio::test]
650    async fn test_multicast_collect_all_error_propagates() {
651        let endpoints = vec![
652            uppercase_processor(),
653            failing_processor(),
654            uppercase_processor(),
655        ];
656
657        let config = MulticastConfig::new()
658            .stop_on_exception(false)
659            .aggregation(MulticastStrategy::CollectAll);
660        let mut svc = MulticastService::new(endpoints, config).unwrap();
661
662        let result = svc
663            .ready()
664            .await
665            .unwrap()
666            .call(make_exchange("hello"))
667            .await;
668
669        assert!(result.is_err(), "CollectAll should propagate first error");
670    }
671
672    // ── 13. LastWins with error last returns error ──────────────────────
673
674    #[tokio::test]
675    async fn test_multicast_last_wins_error_last() {
676        let endpoints = vec![
677            uppercase_processor(),
678            uppercase_processor(),
679            failing_processor(),
680        ];
681
682        let config = MulticastConfig::new()
683            .stop_on_exception(false)
684            .aggregation(MulticastStrategy::LastWins);
685        let mut svc = MulticastService::new(endpoints, config).unwrap();
686
687        let result = svc
688            .ready()
689            .await
690            .unwrap()
691            .call(make_exchange("hello"))
692            .await;
693
694        assert!(result.is_err(), "LastWins should return last error");
695    }
696
697    // ── 14. Custom aggregation with error propagates ────────────────────
698
699    #[tokio::test]
700    async fn test_multicast_custom_error_propagates() {
701        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
702            Arc::new(|acc: Exchange, _next: Exchange| acc);
703
704        let endpoints = vec![
705            uppercase_processor(),
706            failing_processor(),
707            uppercase_processor(),
708        ];
709
710        let config = MulticastConfig::new()
711            .stop_on_exception(false)
712            .aggregation(MulticastStrategy::Custom(joiner));
713        let mut svc = MulticastService::new(endpoints, config).unwrap();
714
715        let result = svc
716            .ready()
717            .await
718            .unwrap()
719            .call(make_exchange("hello"))
720            .await;
721
722        assert!(
723            result.is_err(),
724            "Custom aggregation should propagate errors"
725        );
726    }
727
728    // ── 15. Parallel + CollectAll basic ─────────────────────────────────
729
730    #[tokio::test]
731    async fn test_multicast_parallel_basic() {
732        let endpoints = vec![uppercase_processor(), uppercase_processor()];
733
734        let config = MulticastConfig::new()
735            .parallel(true)
736            .aggregation(MulticastStrategy::CollectAll);
737        let mut svc = MulticastService::new(endpoints, config).unwrap();
738
739        let result = svc
740            .ready()
741            .await
742            .unwrap()
743            .call(make_exchange("test"))
744            .await
745            .unwrap();
746
747        // Both endpoints uppercase "test" → ["TEST", "TEST"]
748        // Note: parallel order is not guaranteed for CollectAll, but with identical processors it doesn't matter
749        match &result.input.body {
750            Body::Json(v) => {
751                let arr = v.as_array().expect("expected array");
752                assert_eq!(arr.len(), 2);
753                assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
754            }
755            other => panic!("expected JSON body, got {:?}", other),
756        }
757    }
758
759    // ── 16. Parallel with concurrency limit ─────────────────────────────
760
761    #[tokio::test]
762    async fn test_multicast_parallel_with_limit() {
763        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
764
765        let concurrent = Arc::new(AtomicUsize::new(0));
766        let max_concurrent = Arc::new(AtomicUsize::new(0));
767
768        let endpoints: Vec<BoxProcessor> = (0..4)
769            .map(|_| {
770                let c = Arc::clone(&concurrent);
771                let mc = Arc::clone(&max_concurrent);
772                BoxProcessor::from_fn(move |ex: Exchange| {
773                    let c = Arc::clone(&c);
774                    let mc = Arc::clone(&mc);
775                    Box::pin(async move {
776                        let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
777                        mc.fetch_max(current, AtomicOrdering::SeqCst);
778                        tokio::task::yield_now().await;
779                        c.fetch_sub(1, AtomicOrdering::SeqCst);
780                        Ok(ex)
781                    })
782                })
783            })
784            .collect();
785
786        let config = MulticastConfig::new().parallel(true).parallel_limit(2);
787        let mut svc = MulticastService::new(endpoints, config).unwrap();
788
789        let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
790
791        let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
792        assert!(
793            observed_max <= 2,
794            "max concurrency was {}, expected <= 2",
795            observed_max
796        );
797    }
798
799    // ── 17. Stream body creates valid JSON placeholder ────────────────────
800
801    async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
802        use bytes::Bytes;
803        use camel_api::{Body, StreamBody, StreamMetadata};
804        use futures::stream;
805        use std::sync::Arc;
806        use tokio::sync::Mutex;
807
808        let chunks = vec![Ok(Bytes::from("test"))];
809        let stream_body = StreamBody {
810            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
811            metadata: StreamMetadata {
812                origin,
813                ..Default::default()
814            },
815        };
816
817        let stream_body_clone = stream_body.clone();
818        let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
819            let body_clone = stream_body_clone.clone();
820            Box::pin(async move {
821                let mut out = ex;
822                out.input.body = Body::Stream(body_clone);
823                Ok(out)
824            })
825        })];
826
827        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
828        let mut svc = MulticastService::new(endpoints, config).unwrap();
829
830        svc.ready()
831            .await
832            .unwrap()
833            .call(Exchange::new(Message::new("")))
834            .await
835            .unwrap()
836    }
837
838    #[tokio::test]
839    async fn test_multicast_stream_bodies_creates_valid_json() {
840        use camel_api::Body;
841
842        let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
843
844        let Body::Json(value) = &result.input.body else {
845            panic!("Expected Json body, got {:?}", result.input.body);
846        };
847
848        let json_str = serde_json::to_string(&value).unwrap();
849        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
850
851        assert!(parsed.is_array());
852        let arr = parsed.as_array().unwrap();
853        assert_eq!(arr.len(), 1);
854        assert!(arr[0]["_stream"].is_object());
855        assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
856        assert_eq!(arr[0]["_stream"]["placeholder"], true);
857    }
858
859    // ── 18. Stream body with None origin creates valid JSON ──────────────
860
861    #[tokio::test]
862    async fn test_multicast_stream_with_none_origin_creates_valid_json() {
863        use camel_api::Body;
864
865        let result = setup_multicast_stream_test(None).await;
866
867        let Body::Json(value) = &result.input.body else {
868            panic!("Expected Json body, got {:?}", result.input.body);
869        };
870
871        let json_str = serde_json::to_string(&value).unwrap();
872        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
873
874        assert!(parsed.is_array());
875        let arr = parsed.as_array().unwrap();
876        assert_eq!(arr.len(), 1);
877        assert!(arr[0]["_stream"].is_object());
878        assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
879        assert_eq!(arr[0]["_stream"]["placeholder"], true);
880    }
881
882    #[tokio::test]
883    async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
884        let endpoints = vec![
885            BoxProcessor::from_fn(|mut ex: Exchange| {
886                Box::pin(async move {
887                    ex.input.body = Body::Bytes(vec![65, 66].into());
888                    Ok(ex)
889                })
890            }),
891            BoxProcessor::from_fn(|mut ex: Exchange| {
892                Box::pin(async move {
893                    ex.input.body = Body::Xml("<a/>".to_string());
894                    Ok(ex)
895                })
896            }),
897            BoxProcessor::from_fn(|mut ex: Exchange| {
898                Box::pin(async move {
899                    ex.input.body = Body::Empty;
900                    Ok(ex)
901                })
902            }),
903        ];
904
905        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
906        let mut svc = MulticastService::new(endpoints, config).unwrap();
907        let result = svc
908            .ready()
909            .await
910            .unwrap()
911            .call(make_exchange("x"))
912            .await
913            .unwrap();
914
915        match result.input.body {
916            Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
917            _ => panic!("expected json"),
918        }
919    }
920}