//! Multicast EIP processor (`camel_processor/multicast.rs`).
1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8    Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
// ── Metadata property keys ─────────────────────────────────────────────

/// Property key for the zero-based index of the endpoint being invoked.
/// Set on each per-endpoint copy of the exchange before it is dispatched.
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Property key indicating whether this is the last endpoint invocation
/// (i.e. the index equals endpoint count - 1).
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18// ── MulticastService ───────────────────────────────────────────────────
19
20/// Tower Service implementing the Multicast EIP.
21///
22/// Sends a message to multiple endpoints, processing each independently,
23/// and then aggregating the results.
24///
25/// Supports both sequential and parallel processing modes, configurable
26/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
27/// all endpoints are invoked concurrently with optional concurrency
28/// limiting via [`MulticastConfig::parallel_limit`].
#[derive(Clone)]
pub struct MulticastService {
    /// Processors to invoke for each incoming exchange, in declaration order.
    endpoints: Vec<BoxProcessor>,
    /// Parallelism, aggregation, and error-handling configuration.
    config: MulticastConfig,
}
34
35impl MulticastService {
36    /// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
37    pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38        Self { endpoints, config }
39    }
40}
41
impl Service<Exchange> for MulticastService {
    type Response = Exchange;
    type Error = CamelError;
    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Ready only when every endpoint is ready; the first Pending or Err
        // short-circuits.
        // NOTE(review): if a later endpoint returns Pending, readiness already
        // granted by earlier endpoints may lapse before `call` runs — confirm
        // this is acceptable for the processors used here.
        for endpoint in &mut self.endpoints {
            match endpoint.poll_ready(cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Ready(Ok(())) => {}
            }
        }
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, exchange: Exchange) -> Self::Future {
        // Keep an untouched copy for the `Original` strategy, the
        // empty-endpoint fast path, and aggregation fallbacks. Endpoints and
        // config are cloned so the returned future is 'static.
        let original = exchange.clone();
        let endpoints = self.endpoints.clone();
        let config = self.config.clone();

        Box::pin(async move {
            // If no endpoints, return original exchange unchanged
            if endpoints.is_empty() {
                return Ok(original);
            }

            let total = endpoints.len();

            let results = if config.parallel {
                // Process endpoints in parallel.
                // NOTE(review): `stop_on_exception` is not passed to the
                // parallel path, so all endpoints run even if one fails —
                // confirm this asymmetry with the sequential path is intended.
                process_parallel(exchange, endpoints, config.parallel_limit, total).await
            } else {
                // Process each endpoint sequentially
                process_sequential(exchange, endpoints, config.stop_on_exception, total).await
            };

            // Aggregate results per strategy
            aggregate(results, original, config.aggregation)
        })
    }
}
85
86// ── Sequential processing ──────────────────────────────────────────────
87
88async fn process_sequential(
89    exchange: Exchange,
90    endpoints: Vec<BoxProcessor>,
91    stop_on_exception: bool,
92    total: usize,
93) -> Vec<Result<Exchange, CamelError>> {
94    let mut results = Vec::with_capacity(endpoints.len());
95
96    for (i, endpoint) in endpoints.into_iter().enumerate() {
97        // Clone the exchange for each endpoint
98        let mut cloned_exchange = exchange.clone();
99
100        // Set multicast metadata properties
101        cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
102        cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
103
104        let mut endpoint = endpoint;
105        match tower::ServiceExt::ready(&mut endpoint).await {
106            Err(e) => {
107                results.push(Err(e));
108                if stop_on_exception {
109                    break;
110                }
111            }
112            Ok(svc) => {
113                let result = svc.call(cloned_exchange).await;
114                let is_err = result.is_err();
115                results.push(result);
116                if stop_on_exception && is_err {
117                    break;
118                }
119            }
120        }
121    }
122
123    results
124}
125
126// ── Parallel processing ────────────────────────────────────────────────
127
/// Invoke all endpoints concurrently, each with its own tagged copy of
/// `exchange`, optionally bounded by a semaphore with `parallel_limit`
/// permits.
///
/// Results are returned in endpoint order (`join_all` preserves input
/// order), regardless of completion order. `stop_on_exception` does not
/// apply on this path — every endpoint runs even if another fails.
async fn process_parallel(
    exchange: Exchange,
    endpoints: Vec<BoxProcessor>,
    parallel_limit: Option<usize>,
    total: usize,
) -> Vec<Result<Exchange, CamelError>> {
    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // One shared semaphore when a concurrency cap is configured; None means
    // unbounded concurrency.
    let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));

    // Build futures for each endpoint
    let futures: Vec<_> = endpoints
        .into_iter()
        .enumerate()
        .map(|(i, mut endpoint)| {
            let mut ex = exchange.clone();
            ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
            ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
            let sem = semaphore.clone();
            async move {
                // Acquire semaphore permit if limit is set. Binding the permit
                // to `_permit` holds it until this future completes, which is
                // what bounds the number of in-flight endpoint calls.
                let _permit = match &sem {
                    Some(s) => match s.acquire().await {
                        Ok(p) => Some(p),
                        Err(_) => {
                            // The semaphore is never closed here; handled defensively.
                            return Err(CamelError::ProcessorError("semaphore closed".to_string()));
                        }
                    },
                    None => None,
                };

                // Wait for endpoint to be ready, then call it
                tower::ServiceExt::ready(&mut endpoint).await?;
                endpoint.call(ex).await
            }
        })
        .collect();

    // Execute all futures concurrently and collect results
    futures::future::join_all(futures).await
}
170
171// ── Aggregation ────────────────────────────────────────────────────────
172
173fn aggregate(
174    results: Vec<Result<Exchange, CamelError>>,
175    original: Exchange,
176    strategy: MulticastStrategy,
177) -> Result<Exchange, CamelError> {
178    match strategy {
179        MulticastStrategy::LastWins => {
180            // Return the last result (error or success).
181            // If last result is Err and stop_on_exception=false, return that error.
182            results.into_iter().last().unwrap_or_else(|| Ok(original))
183        }
184        MulticastStrategy::CollectAll => {
185            // Collect all bodies into a JSON array. Errors propagate.
186            let mut bodies = Vec::new();
187            for result in results {
188                let ex = result?;
189                let value = match &ex.input.body {
190                    Body::Text(s) => Value::String(s.clone()),
191                    Body::Json(v) => v.clone(),
192                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
193                    Body::Empty => Value::Null,
194                    Body::Stream(s) => serde_json::json!({
195                        "_stream": {
196                            "origin": s.metadata.origin,
197                            "placeholder": true,
198                            "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
199                        }
200                    }),
201                };
202                bodies.push(value);
203            }
204            let mut out = original;
205            out.input.body = Body::Json(Value::Array(bodies));
206            Ok(out)
207        }
208        MulticastStrategy::Original => Ok(original),
209        MulticastStrategy::Custom(fold_fn) => {
210            // Fold using the custom function, starting from the first result.
211            let mut iter = results.into_iter();
212            let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
213            iter.try_fold(first, |acc, next_result| {
214                let next = next_result?;
215                Ok(fold_fn(acc, next))
216            })
217        }
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use camel_api::{BoxProcessorExt, Message};
    use std::sync::Arc;
    use std::sync::atomic::Ordering;
    use tower::ServiceExt;

    // ── Test helpers ───────────────────────────────────────────────────

    /// Build an exchange whose input message carries `body` as text.
    fn make_exchange(body: &str) -> Exchange {
        Exchange::new(Message::new(body))
    }

    /// Processor that uppercases text bodies and passes other bodies through.
    fn uppercase_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|mut ex: Exchange| {
            Box::pin(async move {
                if let Body::Text(s) = &ex.input.body {
                    ex.input.body = Body::Text(s.to_uppercase());
                }
                Ok(ex)
            })
        })
    }

    /// Processor that always fails with `ProcessorError("boom")`.
    fn failing_processor() -> BoxProcessor {
        BoxProcessor::from_fn(|_ex| {
            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
        })
    }

    // ── 1. Sequential + LastWins ───────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_sequential_last_wins() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new(); // LastWins by default
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("HELLO"));
    }

    // ── 2. Sequential + CollectAll ─────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_sequential_collect_all() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
        match &result.input.body {
            Body::Json(v) => assert_eq!(*v, expected),
            other => panic!("expected JSON body, got {other:?}"),
        }
    }

    // ── 3. Sequential + Original ───────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_sequential_original() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await
            .unwrap();

        // Original body should be unchanged
        assert_eq!(result.input.body.as_text(), Some("hello"));
    }

    // ── 4. Sequential + Custom aggregation ─────────────────────────────

    #[tokio::test]
    async fn test_multicast_sequential_custom_aggregation() {
        // Custom folder that joins the text bodies with '+'.
        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
            Arc::new(|mut acc: Exchange, next: Exchange| {
                let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
                let next_text = next.input.body.as_text().unwrap_or("").to_string();
                acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
                acc
            });

        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("a"))
            .await
            .unwrap();

        assert_eq!(result.input.body.as_text(), Some("A+A+A"));
    }

    // ── 5. Stop on exception ───────────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_stop_on_exception() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new().stop_on_exception(true);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        assert!(result.is_err(), "expected error due to stop_on_exception");
    }

    // ── 6. Continue on exception ───────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_continue_on_exception() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        // LastWins: last endpoint succeeded, so result should be OK
        assert!(result.is_ok(), "last endpoint should succeed");
        assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
    }

    // ── 7. Stop on exception with fail_on_nth ─────────────────────────────

    #[tokio::test]
    async fn test_multicast_stop_on_exception_halts_early() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Track which endpoints actually execute
        let executed = Arc::new(AtomicUsize::new(0));

        let exec_clone1 = Arc::clone(&executed);
        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone1);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let exec_clone2 = Arc::clone(&executed);
        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
            let e = Arc::clone(&exec_clone2);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Err(CamelError::ProcessorError("fail on 1".into()))
            })
        });

        let exec_clone3 = Arc::clone(&executed);
        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone3);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let endpoints = vec![endpoint0, endpoint1, endpoint2];
        let config = MulticastConfig::new().stop_on_exception(true);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
        assert!(result.is_err(), "should fail at endpoint 1");

        // Only endpoints 0 and 1 should have executed (2 should be skipped)
        let count = executed.load(AtomicOrdering::SeqCst);
        assert_eq!(
            count, 2,
            "endpoint 2 should not have executed due to stop_on_exception"
        );
    }

    // ── 8. Continue on exception with fail_on_nth ─────────────────────────

    #[tokio::test]
    async fn test_multicast_continue_on_exception_executes_all() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Track which endpoints actually execute
        let executed = Arc::new(AtomicUsize::new(0));

        let exec_clone1 = Arc::clone(&executed);
        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone1);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let exec_clone2 = Arc::clone(&executed);
        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
            let e = Arc::clone(&exec_clone2);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Err(CamelError::ProcessorError("fail on 1".into()))
            })
        });

        let exec_clone3 = Arc::clone(&executed);
        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
            let e = Arc::clone(&exec_clone3);
            Box::pin(async move {
                e.fetch_add(1, AtomicOrdering::SeqCst);
                Ok(ex)
            })
        });

        let endpoints = vec![endpoint0, endpoint1, endpoint2];
        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
        assert!(result.is_ok(), "last endpoint should succeed");

        // All 3 endpoints should have executed
        let count = executed.load(AtomicOrdering::SeqCst);
        assert_eq!(
            count, 3,
            "all endpoints should have executed despite error in endpoint 1"
        );
    }

    // ── 9. Empty endpoints ─────────────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_empty_endpoints() {
        let endpoints: Vec<BoxProcessor> = vec![];

        let config = MulticastConfig::new();
        let mut svc = MulticastService::new(endpoints, config);

        let mut ex = make_exchange("hello");
        ex.set_property("marker", Value::Bool(true));

        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
        assert_eq!(result.input.body.as_text(), Some("hello"));
        assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
    }

    // ── 10. Metadata properties ─────────────────────────────────────────

    #[tokio::test]
    async fn test_multicast_metadata_properties() {
        // Use a pipeline that records the metadata into the body as JSON
        let recorder = BoxProcessor::from_fn(|ex: Exchange| {
            Box::pin(async move {
                let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
                let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
                let body = serde_json::json!({
                    "index": idx,
                    "complete": complete,
                });
                let mut out = ex;
                out.input.body = Body::Json(body);
                Ok(out)
            })
        });

        let endpoints = vec![recorder.clone(), recorder.clone(), recorder];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("x"))
            .await
            .unwrap();

        let expected = serde_json::json!([
            {"index": 0, "complete": false},
            {"index": 1, "complete": false},
            {"index": 2, "complete": true},
        ]);
        match &result.input.body {
            Body::Json(v) => assert_eq!(*v, expected),
            other => panic!("expected JSON body, got {other:?}"),
        }
    }

    // ── 11. poll_ready delegates to endpoints ────────────────────────────

    #[tokio::test]
    async fn test_poll_ready_delegates_to_endpoints() {
        use std::sync::atomic::AtomicBool;

        // A service that is initially not ready, then becomes ready.
        #[derive(Clone)]
        struct DelayedReady {
            ready: Arc<AtomicBool>,
        }

        impl Service<Exchange> for DelayedReady {
            type Response = Exchange;
            type Error = CamelError;
            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;

            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                if self.ready.load(Ordering::SeqCst) {
                    Poll::Ready(Ok(()))
                } else {
                    cx.waker().wake_by_ref();
                    Poll::Pending
                }
            }

            fn call(&mut self, exchange: Exchange) -> Self::Future {
                Box::pin(async move { Ok(exchange) })
            }
        }

        let ready_flag = Arc::new(AtomicBool::new(false));
        let inner = DelayedReady {
            ready: Arc::clone(&ready_flag),
        };
        let boxed: BoxProcessor = BoxProcessor::new(inner);

        let config = MulticastConfig::new();
        let mut svc = MulticastService::new(vec![boxed], config);

        // First poll should be Pending.
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        assert!(
            poll.is_pending(),
            "expected Pending when endpoint not ready"
        );

        // Mark endpoint as ready.
        ready_flag.store(true, Ordering::SeqCst);

        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
        assert!(
            matches!(poll, Poll::Ready(Ok(()))),
            "expected Ready after endpoint becomes ready"
        );
    }

    // ── 12. CollectAll with error propagates ────────────────────────────

    #[tokio::test]
    async fn test_multicast_collect_all_error_propagates() {
        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        assert!(result.is_err(), "CollectAll should propagate first error");
    }

    // ── 13. LastWins with error last returns error ──────────────────────

    #[tokio::test]
    async fn test_multicast_last_wins_error_last() {
        let endpoints = vec![
            uppercase_processor(),
            uppercase_processor(),
            failing_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::LastWins);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        assert!(result.is_err(), "LastWins should return last error");
    }

    // ── 14. Custom aggregation with error propagates ────────────────────

    #[tokio::test]
    async fn test_multicast_custom_error_propagates() {
        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
            Arc::new(|acc: Exchange, _next: Exchange| acc);

        let endpoints = vec![
            uppercase_processor(),
            failing_processor(),
            uppercase_processor(),
        ];

        let config = MulticastConfig::new()
            .stop_on_exception(false)
            .aggregation(MulticastStrategy::Custom(joiner));
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("hello"))
            .await;

        assert!(
            result.is_err(),
            "Custom aggregation should propagate errors"
        );
    }

    // ── 15. Parallel + CollectAll basic ─────────────────────────────────

    #[tokio::test]
    async fn test_multicast_parallel_basic() {
        let endpoints = vec![uppercase_processor(), uppercase_processor()];

        let config = MulticastConfig::new()
            .parallel(true)
            .aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        let result = svc
            .ready()
            .await
            .unwrap()
            .call(make_exchange("test"))
            .await
            .unwrap();

        // Both endpoints uppercase "test" → ["TEST", "TEST"]
        // Note: parallel order is not guaranteed for CollectAll, but with identical processors it doesn't matter
        match &result.input.body {
            Body::Json(v) => {
                let arr = v.as_array().expect("expected array");
                assert_eq!(arr.len(), 2);
                assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
            }
            other => panic!("expected JSON body, got {:?}", other),
        }
    }

    // ── 16. Parallel with concurrency limit ─────────────────────────────

    #[tokio::test]
    async fn test_multicast_parallel_with_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};

        // Track the peak number of endpoints running concurrently.
        let concurrent = Arc::new(AtomicUsize::new(0));
        let max_concurrent = Arc::new(AtomicUsize::new(0));

        let endpoints: Vec<BoxProcessor> = (0..4)
            .map(|_| {
                let c = Arc::clone(&concurrent);
                let mc = Arc::clone(&max_concurrent);
                BoxProcessor::from_fn(move |ex: Exchange| {
                    let c = Arc::clone(&c);
                    let mc = Arc::clone(&mc);
                    Box::pin(async move {
                        let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
                        mc.fetch_max(current, AtomicOrdering::SeqCst);
                        tokio::task::yield_now().await;
                        c.fetch_sub(1, AtomicOrdering::SeqCst);
                        Ok(ex)
                    })
                })
            })
            .collect();

        let config = MulticastConfig::new().parallel(true).parallel_limit(2);
        let mut svc = MulticastService::new(endpoints, config);

        let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;

        let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
        assert!(
            observed_max <= 2,
            "max concurrency was {}, expected <= 2",
            observed_max
        );
    }

    // ── 17. Stream body creates valid JSON placeholder ────────────────────

    /// Run a one-endpoint multicast whose endpoint replaces the body with a
    /// stream (tagged with `origin`), aggregated via CollectAll.
    async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
        use bytes::Bytes;
        use camel_api::{Body, StreamBody, StreamMetadata};
        use futures::stream;
        use std::sync::Arc;
        use tokio::sync::Mutex;

        let chunks = vec![Ok(Bytes::from("test"))];
        let stream_body = StreamBody {
            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
            metadata: StreamMetadata {
                origin,
                ..Default::default()
            },
        };

        let stream_body_clone = stream_body.clone();
        let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
            let body_clone = stream_body_clone.clone();
            Box::pin(async move {
                let mut out = ex;
                out.input.body = Body::Stream(body_clone);
                Ok(out)
            })
        })];

        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
        let mut svc = MulticastService::new(endpoints, config);

        svc.ready()
            .await
            .unwrap()
            .call(Exchange::new(Message::new("")))
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_multicast_stream_bodies_creates_valid_json() {
        use camel_api::Body;

        let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;

        let Body::Json(value) = &result.input.body else {
            panic!("Expected Json body, got {:?}", result.input.body);
        };

        let json_str = serde_json::to_string(&value).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();

        assert!(parsed.is_array());
        let arr = parsed.as_array().unwrap();
        assert_eq!(arr.len(), 1);
        assert!(arr[0]["_stream"].is_object());
        assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
        assert_eq!(arr[0]["_stream"]["placeholder"], true);
    }

    // ── 18. Stream body with None origin creates valid JSON ──────────────

    #[tokio::test]
    async fn test_multicast_stream_with_none_origin_creates_valid_json() {
        use camel_api::Body;

        let result = setup_multicast_stream_test(None).await;

        let Body::Json(value) = &result.input.body else {
            panic!("Expected Json body, got {:?}", result.input.body);
        };

        let json_str = serde_json::to_string(&value).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();

        assert!(parsed.is_array());
        let arr = parsed.as_array().unwrap();
        assert_eq!(arr.len(), 1);
        assert!(arr[0]["_stream"].is_object());
        assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
        assert_eq!(arr[0]["_stream"]["placeholder"], true);
    }
}