Skip to main content

camel_processor/
multicast.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8    Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11// ── Metadata property keys ─────────────────────────────────────────────
12
13/// Property key for the zero-based index of the endpoint being invoked.
14pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
15/// Property key indicating whether this is the last endpoint invocation.
16pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18// ── MulticastService ───────────────────────────────────────────────────
19
20/// Tower Service implementing the Multicast EIP.
21///
22/// Sends a message to multiple endpoints, processing each independently,
23/// and then aggregating the results.
24///
25/// Supports both sequential and parallel processing modes, configurable
26/// via [`MulticastConfig::parallel`]. When parallel mode is enabled,
27/// all endpoints are invoked concurrently with optional concurrency
28/// limiting via [`MulticastConfig::parallel_limit`].
29#[derive(Clone)]
30pub struct MulticastService {
31    endpoints: Vec<BoxProcessor>,
32    config: MulticastConfig,
33}
34
35impl MulticastService {
36    /// Create a new `MulticastService` from a list of endpoints and a [`MulticastConfig`].
37    pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Result<Self, CamelError> {
38        config.validate()?;
39        Ok(Self { endpoints, config })
40    }
41}
42
43impl Service<Exchange> for MulticastService {
44    type Response = Exchange;
45    type Error = CamelError;
46    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49        // Do NOT aggregate endpoint readiness here.
50        // Each endpoint's readiness is checked per-endpoint inside
51        // process_sequential / process_parallel where stop_on_exception
52        // is respected. Fail-fast here would bypass that logic entirely.
53        Poll::Ready(Ok(()))
54    }
55
56    fn call(&mut self, exchange: Exchange) -> Self::Future {
57        let original = exchange.clone();
58        let endpoints = self.endpoints.clone();
59        let config = self.config.clone();
60
61        Box::pin(async move {
62            // If no endpoints, return original exchange unchanged
63            if endpoints.is_empty() {
64                return Ok(original);
65            }
66
67            let total = endpoints.len();
68
69            let results = if config.parallel {
70                // Process endpoints in parallel
71                process_parallel(exchange, endpoints, config.parallel_limit, total).await
72            } else {
73                // Process each endpoint sequentially
74                process_sequential(exchange, endpoints, config.stop_on_exception, total).await
75            };
76
77            // Aggregate results per strategy
78            aggregate(results, original, config.aggregation)
79        })
80    }
81}
82
83// ── Sequential processing ──────────────────────────────────────────────
84
85async fn process_sequential(
86    exchange: Exchange,
87    endpoints: Vec<BoxProcessor>,
88    stop_on_exception: bool,
89    total: usize,
90) -> Vec<Result<Exchange, CamelError>> {
91    let mut results = Vec::with_capacity(endpoints.len());
92
93    for (i, endpoint) in endpoints.into_iter().enumerate() {
94        // Clone the exchange for each endpoint
95        let mut cloned_exchange = exchange.clone();
96
97        // Set multicast metadata properties
98        cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
99        cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
100
101        let mut endpoint = endpoint;
102        match tower::ServiceExt::ready(&mut endpoint).await {
103            Err(e) => {
104                results.push(Err(e));
105                if stop_on_exception {
106                    break;
107                }
108            }
109            Ok(svc) => {
110                let result = svc.call(cloned_exchange).await;
111                let is_err = result.is_err();
112                results.push(result);
113                if stop_on_exception && is_err {
114                    break;
115                }
116            }
117        }
118    }
119
120    results
121}
122
123// ── Parallel processing ────────────────────────────────────────────────
124
125async fn process_parallel(
126    exchange: Exchange,
127    endpoints: Vec<BoxProcessor>,
128    parallel_limit: Option<usize>,
129    total: usize,
130) -> Vec<Result<Exchange, CamelError>> {
131    use std::sync::Arc;
132    use tokio::sync::Semaphore;
133
134    let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
135
136    // Build futures for each endpoint
137    let futures: Vec<_> = endpoints
138        .into_iter()
139        .enumerate()
140        .map(|(i, mut endpoint)| {
141            let mut ex = exchange.clone();
142            ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
143            ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
144            let sem = semaphore.clone();
145            async move {
146                // Acquire semaphore permit if limit is set
147                let _permit = match &sem {
148                    Some(s) => match s.acquire().await {
149                        Ok(p) => Some(p),
150                        Err(_) => {
151                            return Err(CamelError::ProcessorError("semaphore closed".to_string()));
152                        }
153                    },
154                    None => None,
155                };
156
157                // Readiness errors propagate via `?` into the per-endpoint result;
158                // join_all ensures all endpoints run independently (no early abort).
159                tower::ServiceExt::ready(&mut endpoint).await?;
160                endpoint.call(ex).await
161            }
162        })
163        .collect();
164
165    // Execute all futures concurrently and collect results
166    futures::future::join_all(futures).await
167}
168
169// ── Aggregation ────────────────────────────────────────────────────────
170
171fn aggregate(
172    results: Vec<Result<Exchange, CamelError>>,
173    original: Exchange,
174    strategy: MulticastStrategy,
175) -> Result<Exchange, CamelError> {
176    match strategy {
177        MulticastStrategy::LastWins => {
178            // Return the last result (error or success).
179            // If last result is Err and stop_on_exception=false, return that error.
180            results.into_iter().last().unwrap_or_else(|| Ok(original))
181        }
182        MulticastStrategy::CollectAll => {
183            // Collect all bodies into a JSON array. Errors propagate.
184            let mut bodies = Vec::new();
185            for result in results {
186                let ex = result?;
187                let value = match &ex.input.body {
188                    Body::Text(s) => Value::String(s.clone()),
189                    Body::Json(v) => v.clone(),
190                    Body::Xml(s) => Value::String(s.clone()),
191                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
192                    Body::Empty => Value::Null,
193                    Body::Stream(s) => serde_json::json!({
194                        "_stream": {
195                            "origin": s.metadata.origin,
196                            "placeholder": true,
197                            "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
198                        }
199                    }),
200                };
201                bodies.push(value);
202            }
203            let mut out = original;
204            out.input.body = Body::Json(Value::Array(bodies));
205            Ok(out)
206        }
207        MulticastStrategy::Original => Ok(original),
208        MulticastStrategy::Custom(fold_fn) => {
209            // Fold using the custom function, starting from the first result.
210            let mut iter = results.into_iter();
211            let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
212            iter.try_fold(first, |acc, next_result| {
213                let next = next_result?;
214                Ok(fold_fn(acc, next))
215            })
216        }
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use camel_api::{BoxProcessorExt, Message};
224    use std::sync::Arc;
225    use std::sync::atomic::Ordering;
226    use tower::ServiceExt;
227
228    // ── Test helpers ───────────────────────────────────────────────────
229
230    fn make_exchange(body: &str) -> Exchange {
231        Exchange::new(Message::new(body))
232    }
233
234    fn uppercase_processor() -> BoxProcessor {
235        BoxProcessor::from_fn(|mut ex: Exchange| {
236            Box::pin(async move {
237                if let Body::Text(s) = &ex.input.body {
238                    ex.input.body = Body::Text(s.to_uppercase());
239                }
240                Ok(ex)
241            })
242        })
243    }
244
245    fn failing_processor() -> BoxProcessor {
246        BoxProcessor::from_fn(|_ex| {
247            Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
248        })
249    }
250
251    #[test]
252    fn test_multicast_zero_parallel_limit_rejected() {
253        let config = MulticastConfig::new().parallel(true).parallel_limit(0);
254        let result = MulticastService::new(vec![passthrough_processor()], config);
255        assert!(result.is_err(), "zero parallel_limit should return Err");
256    }
257
258    fn passthrough_processor() -> BoxProcessor {
259        BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
260    }
261
262    // ── 1. Sequential + LastWins ───────────────────────────────────────
263
264    #[tokio::test]
265    async fn test_multicast_sequential_last_wins() {
266        let endpoints = vec![
267            uppercase_processor(),
268            uppercase_processor(),
269            uppercase_processor(),
270        ];
271
272        let config = MulticastConfig::new(); // LastWins by default
273        let mut svc = MulticastService::new(endpoints, config).unwrap();
274
275        let result = svc
276            .ready()
277            .await
278            .unwrap()
279            .call(make_exchange("hello"))
280            .await
281            .unwrap();
282
283        assert_eq!(result.input.body.as_text(), Some("HELLO"));
284    }
285
286    // ── 2. Sequential + CollectAll ─────────────────────────────────────
287
288    #[tokio::test]
289    async fn test_multicast_sequential_collect_all() {
290        let endpoints = vec![
291            uppercase_processor(),
292            uppercase_processor(),
293            uppercase_processor(),
294        ];
295
296        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
297        let mut svc = MulticastService::new(endpoints, config).unwrap();
298
299        let result = svc
300            .ready()
301            .await
302            .unwrap()
303            .call(make_exchange("hello"))
304            .await
305            .unwrap();
306
307        let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
308        match &result.input.body {
309            Body::Json(v) => assert_eq!(*v, expected),
310            other => panic!("expected JSON body, got {other:?}"),
311        }
312    }
313
314    // ── 3. Sequential + Original ───────────────────────────────────────
315
316    #[tokio::test]
317    async fn test_multicast_sequential_original() {
318        let endpoints = vec![
319            uppercase_processor(),
320            uppercase_processor(),
321            uppercase_processor(),
322        ];
323
324        let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
325        let mut svc = MulticastService::new(endpoints, config).unwrap();
326
327        let result = svc
328            .ready()
329            .await
330            .unwrap()
331            .call(make_exchange("hello"))
332            .await
333            .unwrap();
334
335        // Original body should be unchanged
336        assert_eq!(result.input.body.as_text(), Some("hello"));
337    }
338
339    // ── 4. Sequential + Custom aggregation ─────────────────────────────
340
341    #[tokio::test]
342    async fn test_multicast_sequential_custom_aggregation() {
343        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
344            Arc::new(|mut acc: Exchange, next: Exchange| {
345                let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
346                let next_text = next.input.body.as_text().unwrap_or("").to_string();
347                acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
348                acc
349            });
350
351        let endpoints = vec![
352            uppercase_processor(),
353            uppercase_processor(),
354            uppercase_processor(),
355        ];
356
357        let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
358        let mut svc = MulticastService::new(endpoints, config).unwrap();
359
360        let result = svc
361            .ready()
362            .await
363            .unwrap()
364            .call(make_exchange("a"))
365            .await
366            .unwrap();
367
368        assert_eq!(result.input.body.as_text(), Some("A+A+A"));
369    }
370
371    // ── 5. Stop on exception ───────────────────────────────────────────
372
373    #[tokio::test]
374    async fn test_multicast_stop_on_exception() {
375        let endpoints = vec![
376            uppercase_processor(),
377            failing_processor(),
378            uppercase_processor(),
379        ];
380
381        let config = MulticastConfig::new().stop_on_exception(true);
382        let mut svc = MulticastService::new(endpoints, config).unwrap();
383
384        let result = svc
385            .ready()
386            .await
387            .unwrap()
388            .call(make_exchange("hello"))
389            .await;
390
391        assert!(result.is_err(), "expected error due to stop_on_exception");
392    }
393
394    // ── 6. Continue on exception ───────────────────────────────────────
395
396    #[tokio::test]
397    async fn test_multicast_continue_on_exception() {
398        let endpoints = vec![
399            uppercase_processor(),
400            failing_processor(),
401            uppercase_processor(),
402        ];
403
404        let config = MulticastConfig::new()
405            .stop_on_exception(false)
406            .aggregation(MulticastStrategy::LastWins);
407        let mut svc = MulticastService::new(endpoints, config).unwrap();
408
409        let result = svc
410            .ready()
411            .await
412            .unwrap()
413            .call(make_exchange("hello"))
414            .await;
415
416        // LastWins: last endpoint succeeded, so result should be OK
417        assert!(result.is_ok(), "last endpoint should succeed");
418        assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
419    }
420
421    // ── 7. Stop on exception with fail_on_nth ─────────────────────────────
422
423    #[tokio::test]
424    async fn test_multicast_stop_on_exception_halts_early() {
425        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
426
427        // Track which endpoints actually execute
428        let executed = Arc::new(AtomicUsize::new(0));
429
430        let exec_clone1 = Arc::clone(&executed);
431        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
432            let e = Arc::clone(&exec_clone1);
433            Box::pin(async move {
434                e.fetch_add(1, AtomicOrdering::SeqCst);
435                Ok(ex)
436            })
437        });
438
439        let exec_clone2 = Arc::clone(&executed);
440        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
441            let e = Arc::clone(&exec_clone2);
442            Box::pin(async move {
443                e.fetch_add(1, AtomicOrdering::SeqCst);
444                Err(CamelError::ProcessorError("fail on 1".into()))
445            })
446        });
447
448        let exec_clone3 = Arc::clone(&executed);
449        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
450            let e = Arc::clone(&exec_clone3);
451            Box::pin(async move {
452                e.fetch_add(1, AtomicOrdering::SeqCst);
453                Ok(ex)
454            })
455        });
456
457        let endpoints = vec![endpoint0, endpoint1, endpoint2];
458        let config = MulticastConfig::new().stop_on_exception(true);
459        let mut svc = MulticastService::new(endpoints, config).unwrap();
460
461        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
462        assert!(result.is_err(), "should fail at endpoint 1");
463
464        // Only endpoints 0 and 1 should have executed (2 should be skipped)
465        let count = executed.load(AtomicOrdering::SeqCst);
466        assert_eq!(
467            count, 2,
468            "endpoint 2 should not have executed due to stop_on_exception"
469        );
470    }
471
472    // ── 8. Continue on exception with fail_on_nth ─────────────────────────
473
474    #[tokio::test]
475    async fn test_multicast_continue_on_exception_executes_all() {
476        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
477
478        // Track which endpoints actually execute
479        let executed = Arc::new(AtomicUsize::new(0));
480
481        let exec_clone1 = Arc::clone(&executed);
482        let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
483            let e = Arc::clone(&exec_clone1);
484            Box::pin(async move {
485                e.fetch_add(1, AtomicOrdering::SeqCst);
486                Ok(ex)
487            })
488        });
489
490        let exec_clone2 = Arc::clone(&executed);
491        let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
492            let e = Arc::clone(&exec_clone2);
493            Box::pin(async move {
494                e.fetch_add(1, AtomicOrdering::SeqCst);
495                Err(CamelError::ProcessorError("fail on 1".into()))
496            })
497        });
498
499        let exec_clone3 = Arc::clone(&executed);
500        let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
501            let e = Arc::clone(&exec_clone3);
502            Box::pin(async move {
503                e.fetch_add(1, AtomicOrdering::SeqCst);
504                Ok(ex)
505            })
506        });
507
508        let endpoints = vec![endpoint0, endpoint1, endpoint2];
509        let config = MulticastConfig::new()
510            .stop_on_exception(false)
511            .aggregation(MulticastStrategy::LastWins);
512        let mut svc = MulticastService::new(endpoints, config).unwrap();
513
514        let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
515        assert!(result.is_ok(), "last endpoint should succeed");
516
517        // All 3 endpoints should have executed
518        let count = executed.load(AtomicOrdering::SeqCst);
519        assert_eq!(
520            count, 3,
521            "all endpoints should have executed despite error in endpoint 1"
522        );
523    }
524
525    // ── 9. Empty endpoints ─────────────────────────────────────────────
526
527    #[tokio::test]
528    async fn test_multicast_empty_endpoints() {
529        let endpoints: Vec<BoxProcessor> = vec![];
530
531        let config = MulticastConfig::new();
532        let mut svc = MulticastService::new(endpoints, config).unwrap();
533
534        let mut ex = make_exchange("hello");
535        ex.set_property("marker", Value::Bool(true));
536
537        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
538        assert_eq!(result.input.body.as_text(), Some("hello"));
539        assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
540    }
541
542    // ── 10. Metadata properties ─────────────────────────────────────────
543
544    #[tokio::test]
545    async fn test_multicast_metadata_properties() {
546        // Use a pipeline that records the metadata into the body as JSON
547        let recorder = BoxProcessor::from_fn(|ex: Exchange| {
548            Box::pin(async move {
549                let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
550                let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
551                let body = serde_json::json!({
552                    "index": idx,
553                    "complete": complete,
554                });
555                let mut out = ex;
556                out.input.body = Body::Json(body);
557                Ok(out)
558            })
559        });
560
561        let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
562
563        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
564        let mut svc = MulticastService::new(endpoints, config).unwrap();
565
566        let result = svc
567            .ready()
568            .await
569            .unwrap()
570            .call(make_exchange("x"))
571            .await
572            .unwrap();
573
574        let expected = serde_json::json!([
575            {"index": 0, "complete": false},
576            {"index": 1, "complete": false},
577            {"index": 2, "complete": true},
578        ]);
579        match &result.input.body {
580            Body::Json(v) => assert_eq!(*v, expected),
581            other => panic!("expected JSON body, got {other:?}"),
582        }
583    }
584
585    // ── 11. poll_ready returns Ready immediately ───────────────────────
586
587    #[tokio::test]
588    async fn test_poll_ready_returns_ready_immediately() {
589        use std::sync::atomic::AtomicBool;
590
591        // A service that is never ready on its own.
592        #[derive(Clone)]
593        struct NeverReady {
594            _ready: Arc<AtomicBool>,
595        }
596
597        impl Service<Exchange> for NeverReady {
598            type Response = Exchange;
599            type Error = CamelError;
600            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
601
602            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
603                cx.waker().wake_by_ref();
604                Poll::Pending
605            }
606
607            fn call(&mut self, exchange: Exchange) -> Self::Future {
608                Box::pin(async move { Ok(exchange) })
609            }
610        }
611
612        let inner = NeverReady {
613            _ready: Arc::new(AtomicBool::new(false)),
614        };
615        let boxed: BoxProcessor = BoxProcessor::new(inner);
616
617        let config = MulticastConfig::new();
618        let mut svc = MulticastService::new(vec![boxed], config).unwrap();
619
620        // MulticastService::poll_ready should return Ready(Ok(())) immediately,
621        // even when the underlying endpoint is not ready. Readiness is deferred
622        // to inside call() where stop_on_exception is respected.
623        let waker = futures::task::noop_waker();
624        let mut cx = Context::from_waker(&waker);
625        let poll = Pin::new(&mut svc).poll_ready(&mut cx);
626        assert!(
627            matches!(poll, Poll::Ready(Ok(()))),
628            "expected Ready(Ok(())) even when endpoint is not ready"
629        );
630    }
631
632    // ── 12. CollectAll with error propagates ────────────────────────────
633
634    #[tokio::test]
635    async fn test_multicast_collect_all_error_propagates() {
636        let endpoints = vec![
637            uppercase_processor(),
638            failing_processor(),
639            uppercase_processor(),
640        ];
641
642        let config = MulticastConfig::new()
643            .stop_on_exception(false)
644            .aggregation(MulticastStrategy::CollectAll);
645        let mut svc = MulticastService::new(endpoints, config).unwrap();
646
647        let result = svc
648            .ready()
649            .await
650            .unwrap()
651            .call(make_exchange("hello"))
652            .await;
653
654        assert!(result.is_err(), "CollectAll should propagate first error");
655    }
656
657    // ── 13. LastWins with error last returns error ──────────────────────
658
659    #[tokio::test]
660    async fn test_multicast_last_wins_error_last() {
661        let endpoints = vec![
662            uppercase_processor(),
663            uppercase_processor(),
664            failing_processor(),
665        ];
666
667        let config = MulticastConfig::new()
668            .stop_on_exception(false)
669            .aggregation(MulticastStrategy::LastWins);
670        let mut svc = MulticastService::new(endpoints, config).unwrap();
671
672        let result = svc
673            .ready()
674            .await
675            .unwrap()
676            .call(make_exchange("hello"))
677            .await;
678
679        assert!(result.is_err(), "LastWins should return last error");
680    }
681
682    // ── 14. Custom aggregation with error propagates ────────────────────
683
684    #[tokio::test]
685    async fn test_multicast_custom_error_propagates() {
686        let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
687            Arc::new(|acc: Exchange, _next: Exchange| acc);
688
689        let endpoints = vec![
690            uppercase_processor(),
691            failing_processor(),
692            uppercase_processor(),
693        ];
694
695        let config = MulticastConfig::new()
696            .stop_on_exception(false)
697            .aggregation(MulticastStrategy::Custom(joiner));
698        let mut svc = MulticastService::new(endpoints, config).unwrap();
699
700        let result = svc
701            .ready()
702            .await
703            .unwrap()
704            .call(make_exchange("hello"))
705            .await;
706
707        assert!(
708            result.is_err(),
709            "Custom aggregation should propagate errors"
710        );
711    }
712
713    // ── 15. Parallel + CollectAll basic ─────────────────────────────────
714
715    #[tokio::test]
716    async fn test_multicast_parallel_basic() {
717        let endpoints = vec![uppercase_processor(), uppercase_processor()];
718
719        let config = MulticastConfig::new()
720            .parallel(true)
721            .aggregation(MulticastStrategy::CollectAll);
722        let mut svc = MulticastService::new(endpoints, config).unwrap();
723
724        let result = svc
725            .ready()
726            .await
727            .unwrap()
728            .call(make_exchange("test"))
729            .await
730            .unwrap();
731
732        // Both endpoints uppercase "test" → ["TEST", "TEST"]
733        // Note: parallel order is not guaranteed for CollectAll, but with identical processors it doesn't matter
734        match &result.input.body {
735            Body::Json(v) => {
736                let arr = v.as_array().expect("expected array");
737                assert_eq!(arr.len(), 2);
738                assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
739            }
740            other => panic!("expected JSON body, got {:?}", other),
741        }
742    }
743
744    // ── 16. Parallel with concurrency limit ─────────────────────────────
745
746    #[tokio::test]
747    async fn test_multicast_parallel_with_limit() {
748        use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
749
750        let concurrent = Arc::new(AtomicUsize::new(0));
751        let max_concurrent = Arc::new(AtomicUsize::new(0));
752
753        let endpoints: Vec<BoxProcessor> = (0..4)
754            .map(|_| {
755                let c = Arc::clone(&concurrent);
756                let mc = Arc::clone(&max_concurrent);
757                BoxProcessor::from_fn(move |ex: Exchange| {
758                    let c = Arc::clone(&c);
759                    let mc = Arc::clone(&mc);
760                    Box::pin(async move {
761                        let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
762                        mc.fetch_max(current, AtomicOrdering::SeqCst);
763                        tokio::task::yield_now().await;
764                        c.fetch_sub(1, AtomicOrdering::SeqCst);
765                        Ok(ex)
766                    })
767                })
768            })
769            .collect();
770
771        let config = MulticastConfig::new().parallel(true).parallel_limit(2);
772        let mut svc = MulticastService::new(endpoints, config).unwrap();
773
774        let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
775
776        let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
777        assert!(
778            observed_max <= 2,
779            "max concurrency was {}, expected <= 2",
780            observed_max
781        );
782    }
783
784    // ── 17. Stream body creates valid JSON placeholder ────────────────────
785
786    async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
787        use bytes::Bytes;
788        use camel_api::{Body, StreamBody, StreamMetadata};
789        use futures::stream;
790        use std::sync::Arc;
791        use tokio::sync::Mutex;
792
793        let chunks = vec![Ok(Bytes::from("test"))];
794        let stream_body = StreamBody {
795            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
796            metadata: StreamMetadata {
797                origin,
798                ..Default::default()
799            },
800        };
801
802        let stream_body_clone = stream_body.clone();
803        let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
804            let body_clone = stream_body_clone.clone();
805            Box::pin(async move {
806                let mut out = ex;
807                out.input.body = Body::Stream(body_clone);
808                Ok(out)
809            })
810        })];
811
812        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
813        let mut svc = MulticastService::new(endpoints, config).unwrap();
814
815        svc.ready()
816            .await
817            .unwrap()
818            .call(Exchange::new(Message::new("")))
819            .await
820            .unwrap()
821    }
822
823    #[tokio::test]
824    async fn test_multicast_stream_bodies_creates_valid_json() {
825        use camel_api::Body;
826
827        let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
828
829        let Body::Json(value) = &result.input.body else {
830            panic!("Expected Json body, got {:?}", result.input.body);
831        };
832
833        let json_str = serde_json::to_string(&value).unwrap();
834        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
835
836        assert!(parsed.is_array());
837        let arr = parsed.as_array().unwrap();
838        assert_eq!(arr.len(), 1);
839        assert!(arr[0]["_stream"].is_object());
840        assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
841        assert_eq!(arr[0]["_stream"]["placeholder"], true);
842    }
843
844    // ── 18. Stream body with None origin creates valid JSON ──────────────
845
846    #[tokio::test]
847    async fn test_multicast_stream_with_none_origin_creates_valid_json() {
848        use camel_api::Body;
849
850        let result = setup_multicast_stream_test(None).await;
851
852        let Body::Json(value) = &result.input.body else {
853            panic!("Expected Json body, got {:?}", result.input.body);
854        };
855
856        let json_str = serde_json::to_string(&value).unwrap();
857        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
858
859        assert!(parsed.is_array());
860        let arr = parsed.as_array().unwrap();
861        assert_eq!(arr.len(), 1);
862        assert!(arr[0]["_stream"].is_object());
863        assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
864        assert_eq!(arr[0]["_stream"]["placeholder"], true);
865    }
866
867    // ── 19. poll_ready error does not poison multicast ──────────────────
868
869    #[tokio::test]
870    async fn test_poll_ready_error_does_not_poison_multicast() {
871        use std::sync::atomic::AtomicUsize;
872
873        // A service whose poll_ready always returns Ready(Err(...)).
874        #[derive(Clone)]
875        struct FailingReadyService {
876            call_count: Arc<AtomicUsize>,
877        }
878
879        impl Service<Exchange> for FailingReadyService {
880            type Response = Exchange;
881            type Error = CamelError;
882            type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
883
884            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
885                Poll::Ready(Err(CamelError::ProcessorError("ready-fail".into())))
886            }
887
888            fn call(&mut self, exchange: Exchange) -> Self::Future {
889                self.call_count.fetch_add(1, Ordering::SeqCst);
890                Box::pin(async move { Ok(exchange) })
891            }
892        }
893
894        let call_count = Arc::new(AtomicUsize::new(0));
895        let failing_svc = FailingReadyService {
896            call_count: Arc::clone(&call_count),
897        };
898        let failing_boxed: BoxProcessor = BoxProcessor::new(failing_svc);
899
900        let endpoints = vec![uppercase_processor(), failing_boxed, uppercase_processor()];
901
902        let config = MulticastConfig::new()
903            .stop_on_exception(false)
904            .aggregation(MulticastStrategy::LastWins);
905        let mut svc = MulticastService::new(endpoints, config).unwrap();
906
907        // With the fix, poll_ready returns Ready(Ok(())) so ready() succeeds.
908        // The failing endpoint's readiness error is handled inside call(),
909        // where stop_on_exception=false means processing continues.
910        let result = svc.ready().await;
911        assert!(
912            result.is_ok(),
913            "poll_ready should not fail-fast; got error: {:?}",
914            result.err()
915        );
916
917        let result = result.unwrap().call(make_exchange("hello")).await;
918        assert!(
919            result.is_ok(),
920            "LastWins with last endpoint succeeding should return Ok; got: {:?}",
921            result.err()
922        );
923
924        // The successful endpoints (0 and 2) should have been processed.
925        // The failing endpoint (1) should have had its call bypassed due to
926        // readiness failure inside process_sequential, but the others continue.
927        assert_eq!(
928            result.unwrap().input.body.as_text(),
929            Some("HELLO"),
930            "last successful endpoint should have uppercased"
931        );
932
933        // Verify the failing endpoint's call() was never invoked
934        assert_eq!(call_count.load(Ordering::SeqCst), 0);
935    }
936
937    #[tokio::test]
938    async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
939        let endpoints = vec![
940            BoxProcessor::from_fn(|mut ex: Exchange| {
941                Box::pin(async move {
942                    ex.input.body = Body::Bytes(vec![65, 66].into());
943                    Ok(ex)
944                })
945            }),
946            BoxProcessor::from_fn(|mut ex: Exchange| {
947                Box::pin(async move {
948                    ex.input.body = Body::Xml("<a/>".to_string());
949                    Ok(ex)
950                })
951            }),
952            BoxProcessor::from_fn(|mut ex: Exchange| {
953                Box::pin(async move {
954                    ex.input.body = Body::Empty;
955                    Ok(ex)
956                })
957            }),
958        ];
959
960        let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
961        let mut svc = MulticastService::new(endpoints, config).unwrap();
962        let result = svc
963            .ready()
964            .await
965            .unwrap()
966            .call(make_exchange("x"))
967            .await
968            .unwrap();
969
970        match result.input.body {
971            Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
972            _ => panic!("expected json"),
973        }
974    }
975}