//! Multicast EIP processor (`camel_processor/multicast.rs`).
1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8    Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11// ── Metadata property keys ─────────────────────────────────────────────
12
/// Exchange property key: zero-based index of the endpoint currently being invoked.
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Exchange property key: `true` only on the invocation of the final endpoint.
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18// ── MulticastService ───────────────────────────────────────────────────
19
20/// Tower Service implementing the Multicast EIP.
21///
22/// Sends a message to multiple endpoints, processing each independently,
23/// and then aggregating the results.
24///
25/// Supports both sequential and parallel processing modes, configurable
26/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
27/// all endpoints are invoked concurrently with optional concurrency
28/// limiting via [`MulticastConfig::parallel_limit`].
#[derive(Clone)]
pub struct MulticastService {
    /// Downstream processors the exchange is fanned out to, in order.
    endpoints: Vec<BoxProcessor>,
    /// Controls sequential vs parallel mode, error handling, and aggregation.
    config: MulticastConfig,
}
34
35impl MulticastService {
36    /// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
37    pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38        Self { endpoints, config }
39    }
40}
41
impl Service<Exchange> for MulticastService {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    /// Ready only when every endpoint reports ready; the first error or
    /// `Pending` short-circuits.
    ///
    /// NOTE(review): readiness gained by earlier endpoints is not "given back"
    /// when a later one is pending; this is safe here because `call` re-drives
    /// each cloned endpoint with `ServiceExt::ready` before invoking it.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Check all endpoints for readiness
        for endpoint in &mut self.endpoints {
            match endpoint.poll_ready(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Ready(Ok(())) => {}
            }
        }
        Poll::Ready(Ok(()))
    }

    /// Fan the exchange out to all endpoints and aggregate the results.
    ///
    /// Endpoints and config are cloned into the returned future so it is
    /// self-contained (`'static`) and independent of `&mut self`.
    fn call(&mut self, exchange: Exchange) -> Self::Future {
        // Untouched copy kept for `Original` aggregation and the empty case.
        let original = exchange.clone();
        let endpoints = self.endpoints.clone();
        let config = self.config.clone();

        Box::pin(async move {
            // If no endpoints, return original exchange unchanged
            if endpoints.is_empty() {
                return Ok(original);
            }

            let total = endpoints.len();

            let results = if config.parallel {
                // Process endpoints in parallel
                // NOTE(review): `stop_on_exception` is not applied in parallel
                // mode; all in-flight endpoints run to completion — confirm
                // this matches the intended Camel semantics.
                process_parallel(exchange, endpoints, config.parallel_limit, total).await
            } else {
                // Process each endpoint sequentially
                process_sequential(exchange, endpoints, config.stop_on_exception, total).await
            };

            // Aggregate results per strategy
            aggregate(results, original, config.aggregation)
        })
    }
}
85
86// ── Sequential processing ──────────────────────────────────────────────
87
88async fn process_sequential(
89    exchange: Exchange,
90    endpoints: Vec<BoxProcessor>,
91    stop_on_exception: bool,
92    total: usize,
93) -> Vec<Result<Exchange, CamelError>> {
94    let mut results = Vec::with_capacity(endpoints.len());
95
96    for (i, endpoint) in endpoints.into_iter().enumerate() {
97        // Clone the exchange for each endpoint
98        let mut cloned_exchange = exchange.clone();
99
100        // Set multicast metadata properties
101        cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
102        cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
103
104        let mut endpoint = endpoint;
105        match tower::ServiceExt::ready(&mut endpoint).await {
106            Err(e) => {
107                results.push(Err(e));
108                if stop_on_exception {
109                    break;
110                }
111            }
112            Ok(svc) => {
113                let result = svc.call(cloned_exchange).await;
114                let is_err = result.is_err();
115                results.push(result);
116                if stop_on_exception && is_err {
117                    break;
118                }
119            }
120        }
121    }
122
123    results
124}
125
126// ── Parallel processing ────────────────────────────────────────────────
127
/// Invoke every endpoint concurrently, each on its own clone of `exchange`
/// tagged with the multicast metadata properties.
///
/// Results come back in endpoint order (`join_all` preserves ordering). When
/// `parallel_limit` is `Some(n)`, at most `n` endpoint calls are in flight at
/// once, enforced by a semaphore permit held for the duration of each call.
async fn process_parallel(
    exchange: Exchange,
    endpoints: Vec<BoxProcessor>,
    parallel_limit: Option<usize>,
    total: usize,
) -> Vec<Result<Exchange, CamelError>> {
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));

    // Build futures for each endpoint
    let futures: Vec<_> = endpoints
        .into_iter()
        .enumerate()
        .map(|(i, mut endpoint)| {
            let mut ex = exchange.clone();
            ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
            ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
            let sem = semaphore.clone();
            async move {
                // Acquire semaphore permit if limit is set
                // Binding to `_permit` keeps the permit alive (and the
                // concurrency slot occupied) until this future completes.
                let _permit = match &sem {
                    Some(s) => match s.acquire().await {
                        Ok(p) => Some(p),
                        // `acquire` fails only if the semaphore is closed,
                        // which never happens here; handled defensively.
                        Err(_) => {
                            return Err(CamelError::ProcessorError("semaphore closed".to_string()));
                        }
                    },
                    None => None,
                };

                // Wait for endpoint to be ready, then call it
                tower::ServiceExt::ready(&mut endpoint).await?;
                endpoint.call(ex).await
            }
        })
        .collect();

    // Execute all futures concurrently and collect results
    futures::future::join_all(futures).await
}
170
171// ── Aggregation ────────────────────────────────────────────────────────
172
173fn aggregate(
174    results: Vec<Result<Exchange, CamelError>>,
175    original: Exchange,
176    strategy: MulticastStrategy,
177) -> Result<Exchange, CamelError> {
178    match strategy {
179        MulticastStrategy::LastWins => {
180            // Return the last result (error or success).
181            // If last result is Err and stop_on_exception=false, return that error.
182            results.into_iter().last().unwrap_or_else(|| Ok(original))
183        }
184        MulticastStrategy::CollectAll => {
185            // Collect all bodies into a JSON array. Errors propagate.
186            let mut bodies = Vec::new();
187            for result in results {
188                let ex = result?;
189                let value = match &ex.input.body {
190                    Body::Text(s) => Value::String(s.clone()),
191                    Body::Json(v) => v.clone(),
192                    Body::Xml(s) => Value::String(s.clone()),
193                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
194                    Body::Empty => Value::Null,
195                    Body::Stream(s) => serde_json::json!({
196                        "_stream": {
197                            "origin": s.metadata.origin,
198                            "placeholder": true,
199                            "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
200                        }
201                    }),
202                };
203                bodies.push(value);
204            }
205            let mut out = original;
206            out.input.body = Body::Json(Value::Array(bodies));
207            Ok(out)
208        }
209        MulticastStrategy::Original => Ok(original),
210        MulticastStrategy::Custom(fold_fn) => {
211            // Fold using the custom function, starting from the first result.
212            let mut iter = results.into_iter();
213            let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
214            iter.try_fold(first, |acc, next_result| {
215                let next = next_result?;
216                Ok(fold_fn(acc, next))
217            })
218        }
219    }
220}
221
222#[cfg(test)]
223mod tests {
224    use super::*;
225    use camel_api::{BoxProcessorExt, Message};
226    use std::sync::Arc;
227    use std::sync::atomic::Ordering;
228    use tower::ServiceExt;
229
230    // ── Test helpers ───────────────────────────────────────────────────
231
    /// Build a simple exchange with a text body for use in tests.
    fn make_exchange(body: &str) -> Exchange {
        Exchange::new(Message::new(body))
    }
235
    /// Processor that uppercases `Text` bodies and leaves other bodies alone.
    fn uppercase_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &ex.input.body {
                    ex.input.body = Body::Text(s.to_uppercase());
                }
                Ok(ex)
            })
        })
    }
246
    /// Processor that always fails with `ProcessorError("boom")`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }
252
253    // ── 1. Sequential + LastWins ───────────────────────────────────────
254
    #[tokio::test]
    async fn test_multicast_sequential_last_wins() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new(); // LastWins by default
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        // LastWins: the body is whatever the final endpoint produced.
        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }
276
277    // ── 2. Sequential + CollectAll ─────────────────────────────────────
278
    #[tokio::test]
    async fn test_multicast_sequential_collect_all() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        // CollectAll gathers each endpoint's body into a JSON array, in order.
        let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
        match &result.input.body {
            Body::Json(v) => assert_eq!(*v, expected),
            other => panic!("expected JSON body, got {other:?}"),
        }
    }
304
305    // ── 3. Sequential + Original ───────────────────────────────────────
306
    #[tokio::test]
    async fn test_multicast_sequential_original() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        // Original body should be unchanged
        // (the endpoints still ran; only their outputs are discarded).
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }
329
330    // ── 4. Sequential + Custom aggregation ─────────────────────────────
331
    #[tokio::test]
    async fn test_multicast_sequential_custom_aggregation() {
        // Custom strategy: left-fold results, joining text bodies with '+'.
        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
            Arc::new(|mut acc: Exchange, next: Exchange| {
                let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
                let next_text = next.input.body.as_text().unwrap_or("").to_string();
                acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
                acc
            });

        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("a"))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("A+A+A"));
    }
361
362    // ── 5. Stop on exception ───────────────────────────────────────────
363
    #[tokio::test]
    async fn test_multicast_stop_on_exception() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().stop_on_exception(true);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        // Processing stops at the failing endpoint, so the last collected
        // result is the error, which LastWins (the default) surfaces.
        assert!(result.is_err(), "expected error due to stop_on_exception");
    }
384
385    // ── 6. Continue on exception ───────────────────────────────────────
386
    #[tokio::test]
    async fn test_multicast_continue_on_exception() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        // The middle endpoint fails, but processing continues to the last one.
        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        // LastWins: last endpoint succeeded, so result should be OK
        assert!(result.is_ok(), "last endpoint should succeed");
        assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
    }
411
412    // ── 7. Stop on exception with fail_on_nth ─────────────────────────────
413
    #[tokio::test]
    async fn test_multicast_stop_on_exception_halts_early() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Track which endpoints actually execute
        let executed = Arc::new(AtomicUsize::new(0));

        let exec_clone1 = Arc::clone(&executed);
        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone1);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let exec_clone2 = Arc::clone(&executed);
        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
            let e = Arc::clone(&exec_clone2);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Err(CamelError::ProcessorError("fail on 1".into()))
            })
        });

        let exec_clone3 = Arc::clone(&executed);
        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone3);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let endpoints = vec![endpoint0, endpoint1, endpoint2];
        let config = MulticastConfig::new().stop_on_exception(true);
        let mut svc = MulticastService::new(endpoints, config);

        // Sequential mode (the default) should halt after endpoint 1 fails.
        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
        assert!(result.is_err(), "should fail at endpoint 1");

        // Only endpoints 0 and 1 should have executed (2 should be skipped)
        let count = executed.load(AtomicOrdering::SeqCst);
        assert_eq!(
            count, 2,
            "endpoint 2 should not have executed due to stop_on_exception"
        );
    }
462
463    // ── 8. Continue on exception with fail_on_nth ─────────────────────────
464
    #[tokio::test]
    async fn test_multicast_continue_on_exception_executes_all() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Track which endpoints actually execute
        let executed = Arc::new(AtomicUsize::new(0));

        let exec_clone1 = Arc::clone(&executed);
        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone1);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let exec_clone2 = Arc::clone(&executed);
        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
            let e = Arc::clone(&exec_clone2);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Err(CamelError::ProcessorError("fail on 1".into()))
            })
        });

        let exec_clone3 = Arc::clone(&executed);
        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone3);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let endpoints = vec![endpoint0, endpoint1, endpoint2];
        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        // With stop_on_exception(false) the failure must not short-circuit.
        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
        assert!(result.is_ok(), "last endpoint should succeed");

        // All 3 endpoints should have executed
        let count = executed.load(AtomicOrdering::SeqCst);
        assert_eq!(
            count, 3,
            "all endpoints should have executed despite error in endpoint 1"
        );
    }
515
516    // ── 9. Empty endpoints ─────────────────────────────────────────────
517
    #[tokio::test]
    async fn test_multicast_empty_endpoints() {
        let endpoints: Vec<BoxProcessor> = vec![];

        let config = MulticastConfig::new();
        let mut svc = MulticastService::new(endpoints, config);

        let mut ex = make_exchange("hello");
        ex.set_property("marker", Value::Bool(true));

        // With no endpoints the exchange (body and properties) passes through.
        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello"));
        assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
    }
532
533    // ── 10. Metadata properties ─────────────────────────────────────────
534
    #[tokio::test]
    async fn test_multicast_metadata_properties() {
        // Use a pipeline that records the metadata into the body as JSON
        let recorder = BoxProcessor::from_fn(|ex: Exchange| {
            Box::pin(async move {
                let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
                let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
                let body = serde_json::json!({
                    "index": idx,
                    "complete": complete,
                });
                let mut out = ex;
                out.input.body = Body::Json(body);
                Ok(out)
            })
        });

        let endpoints = vec![recorder.clone(), recorder.clone(), recorder];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("x"))
            .await
            .unwrap();

        // `complete` must be true only for the final endpoint index.
        let expected = serde_json::json!([
            {"index": 0, "complete": false},
            {"index": 1, "complete": false},
            {"index": 2, "complete": true},
        ]);
        match &result.input.body {
            Body::Json(v) => assert_eq!(*v, expected),
            other => panic!("expected JSON body, got {other:?}"),
        }
    }
575
576    // ── 11. poll_ready delegates to endpoints ────────────────────────────
577
    #[tokio::test]
    async fn test_poll_ready_delegates_to_endpoints() {
        use std::sync::atomic::AtomicBool;

        // A service that is initially not ready, then becomes ready.
        #[derive(Clone)]
        struct DelayedReady {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReady {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                if self.ready.load(Ordering::SeqCst) {
                    Poll::Ready(Ok(()))
                } else {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }

            fn call(&mut self, exchange: Exchange) -> Self::Future {
                Box::pin(async move { Ok(exchange) })
            }
        }

        let ready_flag = Arc::new(AtomicBool::new(false));
        let inner = DelayedReady {
            ready: Arc::clone(&ready_flag),
        };
        let boxed: BoxProcessor = BoxProcessor::new(inner);

        let config = MulticastConfig::new();
        let mut svc = MulticastService::new(vec![boxed], config);

        // First poll should be Pending.
        // MulticastService::poll_ready must mirror its endpoint's readiness.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        assert!(
            poll.is_pending(),
            "expected Pending when endpoint not ready"
        );

        // Mark endpoint as ready.
        ready_flag.store(true, Ordering::SeqCst);

        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        assert!(
            matches!(poll, Poll::Ready(Ok(()))),
            "expected Ready after endpoint becomes ready"
        );
    }
634
635    // ── 12. CollectAll with error propagates ────────────────────────────
636
    #[tokio::test]
    async fn test_multicast_collect_all_error_propagates() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        // Even though processing continued, aggregation hits the `?` on the
        // failed result and returns the error.
        assert!(result.is_err(), "CollectAll should propagate first error");
    }
659
660    // ── 13. LastWins with error last returns error ──────────────────────
661
    #[tokio::test]
    async fn test_multicast_last_wins_error_last() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            failing_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        // LastWins returns the final result as-is — here, the error.
        assert!(result.is_err(), "LastWins should return last error");
    }
684
685    // ── 14. Custom aggregation with error propagates ────────────────────
686
    #[tokio::test]
    async fn test_multicast_custom_error_propagates() {
        // Trivial fold that always keeps the accumulator.
        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
            Arc::new(|acc: Exchange, _next: Exchange| acc);

        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::Custom(joiner));
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        assert!(
            result.is_err(),
            "Custom aggregation should propagate errors"
        );
    }
715
716    // ── 15. Parallel + CollectAll basic ─────────────────────────────────
717
    #[tokio::test]
    async fn test_multicast_parallel_basic() {
        let endpoints = vec![uppercase_processor(), uppercase_processor()];

        let config = MulticastConfig::new()
            .parallel(true)
            .aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("test"))
            .await
            .unwrap();

        // Both endpoints uppercase "test" → ["TEST", "TEST"]
        // Note: parallel order is not guaranteed for CollectAll, but with identical processors it doesn't matter
        match &result.input.body {
            Body::Json(v) => {
                let arr = v.as_array().expect("expected array");
                assert_eq!(arr.len(), 2);
                assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
            }
            other => panic!("expected JSON body, got {:?}", other),
        }
    }
746
747    // ── 16. Parallel with concurrency limit ─────────────────────────────
748
    #[tokio::test]
    async fn test_multicast_parallel_with_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Record the high-water mark of concurrently running endpoints.
        let concurrent = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let endpoints: Vec<BoxProcessor> = (0..4)
            .map(|_| {
                let c = Arc::clone(&concurrent);
                let mc = Arc::clone(&max_concurrent);
                BoxProcessor::from_fn(move |ex: Exchange| {
                    let c = Arc::clone(&c);
                    let mc = Arc::clone(&mc);
                    Box::pin(async move {
                        let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                        mc.fetch_max(current, AtomicOrdering::SeqCst);
                        // Yield so sibling endpoint futures get a chance to overlap.
                        tokio::task::yield_now().await;
                        c.fetch_sub(1, AtomicOrdering::SeqCst);
                        Ok(ex)
                    })
                })
            })
            .collect();

        let config = MulticastConfig::new().parallel(true).parallel_limit(2);
        let mut svc = MulticastService::new(endpoints, config);

        let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;

        // The semaphore must cap in-flight endpoints at the configured limit.
        let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
        assert!(
            observed_max <= 2,
            "max concurrency was {}, expected <= 2",
            observed_max
        );
    }
786
787    // ── 17. Stream body creates valid JSON placeholder ────────────────────
788
    /// Run a single stream-body endpoint through CollectAll aggregation with
    /// the given stream `origin`, returning the aggregated exchange.
    /// Shared helper for the stream-placeholder tests below.
    async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
        use bytes::Bytes;
        use camel_api::{Body, StreamBody, StreamMetadata};
        use futures::stream;
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let chunks = vec![Ok(Bytes::from("test"))];
        let stream_body = StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
            metadata: StreamMetadata {
                origin,
                ..Default::default()
            },
        };

        let stream_body_clone = stream_body.clone();
        let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
            let body_clone = stream_body_clone.clone();
            Box::pin(async move {
                let mut out = ex;
                out.input.body = Body::Stream(body_clone);
                Ok(out)
            })
        })];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        svc.ready()
            .await
            .unwrap()
            .call(Exchange::new(Message::new("")))
            .await
            .unwrap()
    }
825
    #[tokio::test]
    async fn test_multicast_stream_bodies_creates_valid_json() {
        use camel_api::Body;

        let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;

        let Body::Json(value) = &result.input.body else {
            panic!("Expected Json body, got {:?}", result.input.body);
        };

        // Round-trip through a string to prove the placeholder is valid JSON.
        let json_str = serde_json::to_string(&value).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();

        assert!(parsed.is_array());
        let arr = parsed.as_array().unwrap();
        assert_eq!(arr.len(), 1);
        assert!(arr[0]["_stream"].is_object());
        assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
        assert_eq!(arr[0]["_stream"]["placeholder"], true);
    }
846
847    // ── 18. Stream body with None origin creates valid JSON ──────────────
848
849    #[tokio::test]
850    async fn test_multicast_stream_with_none_origin_creates_valid_json() {
851        use camel_api::Body;
852
853        let result = setup_multicast_stream_test(None).await;
854
855        let Body::Json(value) = &result.input.body else {
856            panic!("Expected Json body, got {:?}", result.input.body);
857        };
858
859        let json_str = serde_json::to_string(&value).unwrap();
860        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
861
862        assert!(parsed.is_array());
863        let arr = parsed.as_array().unwrap();
864        assert_eq!(arr.len(), 1);
865        assert!(arr[0]["_stream"].is_object());
866        assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
867        assert_eq!(arr[0]["_stream"]["placeholder"], true);
868    }
869}