1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8 Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
/// Exchange property key under which each multicast branch receives the
/// zero-based position of its endpoint in the endpoint list.
pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
/// Exchange property key holding a boolean that is `true` only on the branch
/// sent to the last endpoint in the list.
pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
/// Tower service that sends a copy of one exchange to several endpoint
/// processors and combines their results into a single exchange.
#[derive(Clone)]
pub struct MulticastService {
    /// Processors that each receive their own clone of the incoming exchange.
    endpoints: Vec<BoxProcessor>,
    /// Settings read at call time: parallel vs. sequential execution, the
    /// parallel limit, stop-on-exception, and the aggregation strategy.
    config: MulticastConfig,
}
34
35impl MulticastService {
36 pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Result<Self, CamelError> {
38 config.validate()?;
39 Ok(Self { endpoints, config })
40 }
41}
42
43impl Service<Exchange> for MulticastService {
44 type Response = Exchange;
45 type Error = CamelError;
46 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 Poll::Ready(Ok(()))
54 }
55
56 fn call(&mut self, exchange: Exchange) -> Self::Future {
57 let original = exchange.clone();
58 let endpoints = self.endpoints.clone();
59 let config = self.config.clone();
60
61 Box::pin(async move {
62 if endpoints.is_empty() {
64 return Ok(original);
65 }
66
67 let total = endpoints.len();
68
69 let results = if config.parallel {
70 process_parallel(exchange, endpoints, config.parallel_limit, total).await
72 } else {
73 process_sequential(exchange, endpoints, config.stop_on_exception, total).await
75 };
76
77 aggregate(results, original, config.aggregation)
79 })
80 }
81}
82
83async fn process_sequential(
86 exchange: Exchange,
87 endpoints: Vec<BoxProcessor>,
88 stop_on_exception: bool,
89 total: usize,
90) -> Vec<Result<Exchange, CamelError>> {
91 let mut results = Vec::with_capacity(endpoints.len());
92
93 for (i, endpoint) in endpoints.into_iter().enumerate() {
94 let mut cloned_exchange = exchange.clone();
96
97 cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
99 cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
100
101 let mut endpoint = endpoint;
102 match tower::ServiceExt::ready(&mut endpoint).await {
103 Err(e) => {
104 results.push(Err(e));
105 if stop_on_exception {
106 break;
107 }
108 }
109 Ok(svc) => {
110 let result = svc.call(cloned_exchange).await;
111 let is_err = result.is_err();
112 results.push(result);
113 if stop_on_exception && is_err {
114 break;
115 }
116 }
117 }
118 }
119
120 results
121}
122
123async fn process_parallel(
126 exchange: Exchange,
127 endpoints: Vec<BoxProcessor>,
128 parallel_limit: Option<usize>,
129 total: usize,
130) -> Vec<Result<Exchange, CamelError>> {
131 use std::sync::Arc;
132 use tokio::sync::Semaphore;
133
134 let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
135
136 let futures: Vec<_> = endpoints
138 .into_iter()
139 .enumerate()
140 .map(|(i, mut endpoint)| {
141 let mut ex = exchange.clone();
142 ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
143 ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
144 let sem = semaphore.clone();
145 async move {
146 let _permit = match &sem {
148 Some(s) => match s.acquire().await {
149 Ok(p) => Some(p),
150 Err(_) => {
151 return Err(CamelError::ProcessorError("semaphore closed".to_string()));
152 }
153 },
154 None => None,
155 };
156
157 tower::ServiceExt::ready(&mut endpoint).await?;
160 endpoint.call(ex).await
161 }
162 })
163 .collect();
164
165 futures::future::join_all(futures).await
167}
168
169fn aggregate(
172 results: Vec<Result<Exchange, CamelError>>,
173 original: Exchange,
174 strategy: MulticastStrategy,
175) -> Result<Exchange, CamelError> {
176 match strategy {
177 MulticastStrategy::LastWins => {
178 results.into_iter().last().unwrap_or_else(|| Ok(original))
181 }
182 MulticastStrategy::CollectAll => {
183 let mut bodies = Vec::new();
185 for result in results {
186 let ex = result?;
187 let value = match &ex.input.body {
188 Body::Text(s) => Value::String(s.clone()),
189 Body::Json(v) => v.clone(),
190 Body::Xml(s) => Value::String(s.clone()),
191 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
192 Body::Empty => Value::Null,
193 Body::Stream(s) => serde_json::json!({
194 "_stream": {
195 "origin": s.metadata.origin,
196 "placeholder": true,
197 "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
198 }
199 }),
200 };
201 bodies.push(value);
202 }
203 let mut out = original;
204 out.input.body = Body::Json(Value::Array(bodies));
205 Ok(out)
206 }
207 MulticastStrategy::Original => Ok(original),
208 MulticastStrategy::Custom(fold_fn) => {
209 let mut iter = results.into_iter();
211 let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
212 iter.try_fold(first, |acc, next_result| {
213 let next = next_result?;
214 Ok(fold_fn(acc, next))
215 })
216 }
217 }
218}
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223 use camel_api::{BoxProcessorExt, Message};
224 use std::sync::Arc;
225 use std::sync::atomic::Ordering;
226 use tower::ServiceExt;
227
228 fn make_exchange(body: &str) -> Exchange {
231 Exchange::new(Message::new(body))
232 }
233
234 fn uppercase_processor() -> BoxProcessor {
235 BoxProcessor::from_fn(|mut ex: Exchange| {
236 Box::pin(async move {
237 if let Body::Text(s) = &ex.input.body {
238 ex.input.body = Body::Text(s.to_uppercase());
239 }
240 Ok(ex)
241 })
242 })
243 }
244
245 fn failing_processor() -> BoxProcessor {
246 BoxProcessor::from_fn(|_ex| {
247 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
248 })
249 }
250
251 #[test]
252 fn test_multicast_zero_parallel_limit_rejected() {
253 let config = MulticastConfig::new().parallel(true).parallel_limit(0);
254 let result = MulticastService::new(vec![passthrough_processor()], config);
255 assert!(result.is_err(), "zero parallel_limit should return Err");
256 }
257
258 fn passthrough_processor() -> BoxProcessor {
259 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
260 }
261
262 #[tokio::test]
265 async fn test_multicast_sequential_last_wins() {
266 let endpoints = vec![
267 uppercase_processor(),
268 uppercase_processor(),
269 uppercase_processor(),
270 ];
271
272 let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config).unwrap();
274
275 let result = svc
276 .ready()
277 .await
278 .unwrap()
279 .call(make_exchange("hello"))
280 .await
281 .unwrap();
282
283 assert_eq!(result.input.body.as_text(), Some("HELLO"));
284 }
285
286 #[tokio::test]
289 async fn test_multicast_sequential_collect_all() {
290 let endpoints = vec![
291 uppercase_processor(),
292 uppercase_processor(),
293 uppercase_processor(),
294 ];
295
296 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
297 let mut svc = MulticastService::new(endpoints, config).unwrap();
298
299 let result = svc
300 .ready()
301 .await
302 .unwrap()
303 .call(make_exchange("hello"))
304 .await
305 .unwrap();
306
307 let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
308 match &result.input.body {
309 Body::Json(v) => assert_eq!(*v, expected),
310 other => panic!("expected JSON body, got {other:?}"),
311 }
312 }
313
314 #[tokio::test]
317 async fn test_multicast_sequential_original() {
318 let endpoints = vec![
319 uppercase_processor(),
320 uppercase_processor(),
321 uppercase_processor(),
322 ];
323
324 let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
325 let mut svc = MulticastService::new(endpoints, config).unwrap();
326
327 let result = svc
328 .ready()
329 .await
330 .unwrap()
331 .call(make_exchange("hello"))
332 .await
333 .unwrap();
334
335 assert_eq!(result.input.body.as_text(), Some("hello"));
337 }
338
339 #[tokio::test]
342 async fn test_multicast_sequential_custom_aggregation() {
343 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
344 Arc::new(|mut acc: Exchange, next: Exchange| {
345 let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
346 let next_text = next.input.body.as_text().unwrap_or("").to_string();
347 acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
348 acc
349 });
350
351 let endpoints = vec![
352 uppercase_processor(),
353 uppercase_processor(),
354 uppercase_processor(),
355 ];
356
357 let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
358 let mut svc = MulticastService::new(endpoints, config).unwrap();
359
360 let result = svc
361 .ready()
362 .await
363 .unwrap()
364 .call(make_exchange("a"))
365 .await
366 .unwrap();
367
368 assert_eq!(result.input.body.as_text(), Some("A+A+A"));
369 }
370
371 #[tokio::test]
374 async fn test_multicast_stop_on_exception() {
375 let endpoints = vec![
376 uppercase_processor(),
377 failing_processor(),
378 uppercase_processor(),
379 ];
380
381 let config = MulticastConfig::new().stop_on_exception(true);
382 let mut svc = MulticastService::new(endpoints, config).unwrap();
383
384 let result = svc
385 .ready()
386 .await
387 .unwrap()
388 .call(make_exchange("hello"))
389 .await;
390
391 assert!(result.is_err(), "expected error due to stop_on_exception");
392 }
393
394 #[tokio::test]
397 async fn test_multicast_continue_on_exception() {
398 let endpoints = vec![
399 uppercase_processor(),
400 failing_processor(),
401 uppercase_processor(),
402 ];
403
404 let config = MulticastConfig::new()
405 .stop_on_exception(false)
406 .aggregation(MulticastStrategy::LastWins);
407 let mut svc = MulticastService::new(endpoints, config).unwrap();
408
409 let result = svc
410 .ready()
411 .await
412 .unwrap()
413 .call(make_exchange("hello"))
414 .await;
415
416 assert!(result.is_ok(), "last endpoint should succeed");
418 assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
419 }
420
421 #[tokio::test]
424 async fn test_multicast_stop_on_exception_halts_early() {
425 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
426
427 let executed = Arc::new(AtomicUsize::new(0));
429
430 let exec_clone1 = Arc::clone(&executed);
431 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
432 let e = Arc::clone(&exec_clone1);
433 Box::pin(async move {
434 e.fetch_add(1, AtomicOrdering::SeqCst);
435 Ok(ex)
436 })
437 });
438
439 let exec_clone2 = Arc::clone(&executed);
440 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
441 let e = Arc::clone(&exec_clone2);
442 Box::pin(async move {
443 e.fetch_add(1, AtomicOrdering::SeqCst);
444 Err(CamelError::ProcessorError("fail on 1".into()))
445 })
446 });
447
448 let exec_clone3 = Arc::clone(&executed);
449 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
450 let e = Arc::clone(&exec_clone3);
451 Box::pin(async move {
452 e.fetch_add(1, AtomicOrdering::SeqCst);
453 Ok(ex)
454 })
455 });
456
457 let endpoints = vec![endpoint0, endpoint1, endpoint2];
458 let config = MulticastConfig::new().stop_on_exception(true);
459 let mut svc = MulticastService::new(endpoints, config).unwrap();
460
461 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
462 assert!(result.is_err(), "should fail at endpoint 1");
463
464 let count = executed.load(AtomicOrdering::SeqCst);
466 assert_eq!(
467 count, 2,
468 "endpoint 2 should not have executed due to stop_on_exception"
469 );
470 }
471
472 #[tokio::test]
475 async fn test_multicast_continue_on_exception_executes_all() {
476 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
477
478 let executed = Arc::new(AtomicUsize::new(0));
480
481 let exec_clone1 = Arc::clone(&executed);
482 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
483 let e = Arc::clone(&exec_clone1);
484 Box::pin(async move {
485 e.fetch_add(1, AtomicOrdering::SeqCst);
486 Ok(ex)
487 })
488 });
489
490 let exec_clone2 = Arc::clone(&executed);
491 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
492 let e = Arc::clone(&exec_clone2);
493 Box::pin(async move {
494 e.fetch_add(1, AtomicOrdering::SeqCst);
495 Err(CamelError::ProcessorError("fail on 1".into()))
496 })
497 });
498
499 let exec_clone3 = Arc::clone(&executed);
500 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
501 let e = Arc::clone(&exec_clone3);
502 Box::pin(async move {
503 e.fetch_add(1, AtomicOrdering::SeqCst);
504 Ok(ex)
505 })
506 });
507
508 let endpoints = vec![endpoint0, endpoint1, endpoint2];
509 let config = MulticastConfig::new()
510 .stop_on_exception(false)
511 .aggregation(MulticastStrategy::LastWins);
512 let mut svc = MulticastService::new(endpoints, config).unwrap();
513
514 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
515 assert!(result.is_ok(), "last endpoint should succeed");
516
517 let count = executed.load(AtomicOrdering::SeqCst);
519 assert_eq!(
520 count, 3,
521 "all endpoints should have executed despite error in endpoint 1"
522 );
523 }
524
525 #[tokio::test]
528 async fn test_multicast_empty_endpoints() {
529 let endpoints: Vec<BoxProcessor> = vec![];
530
531 let config = MulticastConfig::new();
532 let mut svc = MulticastService::new(endpoints, config).unwrap();
533
534 let mut ex = make_exchange("hello");
535 ex.set_property("marker", Value::Bool(true));
536
537 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
538 assert_eq!(result.input.body.as_text(), Some("hello"));
539 assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
540 }
541
542 #[tokio::test]
545 async fn test_multicast_metadata_properties() {
546 let recorder = BoxProcessor::from_fn(|ex: Exchange| {
548 Box::pin(async move {
549 let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
550 let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
551 let body = serde_json::json!({
552 "index": idx,
553 "complete": complete,
554 });
555 let mut out = ex;
556 out.input.body = Body::Json(body);
557 Ok(out)
558 })
559 });
560
561 let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
562
563 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
564 let mut svc = MulticastService::new(endpoints, config).unwrap();
565
566 let result = svc
567 .ready()
568 .await
569 .unwrap()
570 .call(make_exchange("x"))
571 .await
572 .unwrap();
573
574 let expected = serde_json::json!([
575 {"index": 0, "complete": false},
576 {"index": 1, "complete": false},
577 {"index": 2, "complete": true},
578 ]);
579 match &result.input.body {
580 Body::Json(v) => assert_eq!(*v, expected),
581 other => panic!("expected JSON body, got {other:?}"),
582 }
583 }
584
585 #[tokio::test]
588 async fn test_poll_ready_returns_ready_immediately() {
589 use std::sync::atomic::AtomicBool;
590
591 #[derive(Clone)]
593 struct NeverReady {
594 _ready: Arc<AtomicBool>,
595 }
596
597 impl Service<Exchange> for NeverReady {
598 type Response = Exchange;
599 type Error = CamelError;
600 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
601
602 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
603 cx.waker().wake_by_ref();
604 Poll::Pending
605 }
606
607 fn call(&mut self, exchange: Exchange) -> Self::Future {
608 Box::pin(async move { Ok(exchange) })
609 }
610 }
611
612 let inner = NeverReady {
613 _ready: Arc::new(AtomicBool::new(false)),
614 };
615 let boxed: BoxProcessor = BoxProcessor::new(inner);
616
617 let config = MulticastConfig::new();
618 let mut svc = MulticastService::new(vec![boxed], config).unwrap();
619
620 let waker = futures::task::noop_waker();
624 let mut cx = Context::from_waker(&waker);
625 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
626 assert!(
627 matches!(poll, Poll::Ready(Ok(()))),
628 "expected Ready(Ok(())) even when endpoint is not ready"
629 );
630 }
631
632 #[tokio::test]
635 async fn test_multicast_collect_all_error_propagates() {
636 let endpoints = vec![
637 uppercase_processor(),
638 failing_processor(),
639 uppercase_processor(),
640 ];
641
642 let config = MulticastConfig::new()
643 .stop_on_exception(false)
644 .aggregation(MulticastStrategy::CollectAll);
645 let mut svc = MulticastService::new(endpoints, config).unwrap();
646
647 let result = svc
648 .ready()
649 .await
650 .unwrap()
651 .call(make_exchange("hello"))
652 .await;
653
654 assert!(result.is_err(), "CollectAll should propagate first error");
655 }
656
657 #[tokio::test]
660 async fn test_multicast_last_wins_error_last() {
661 let endpoints = vec![
662 uppercase_processor(),
663 uppercase_processor(),
664 failing_processor(),
665 ];
666
667 let config = MulticastConfig::new()
668 .stop_on_exception(false)
669 .aggregation(MulticastStrategy::LastWins);
670 let mut svc = MulticastService::new(endpoints, config).unwrap();
671
672 let result = svc
673 .ready()
674 .await
675 .unwrap()
676 .call(make_exchange("hello"))
677 .await;
678
679 assert!(result.is_err(), "LastWins should return last error");
680 }
681
682 #[tokio::test]
685 async fn test_multicast_custom_error_propagates() {
686 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
687 Arc::new(|acc: Exchange, _next: Exchange| acc);
688
689 let endpoints = vec![
690 uppercase_processor(),
691 failing_processor(),
692 uppercase_processor(),
693 ];
694
695 let config = MulticastConfig::new()
696 .stop_on_exception(false)
697 .aggregation(MulticastStrategy::Custom(joiner));
698 let mut svc = MulticastService::new(endpoints, config).unwrap();
699
700 let result = svc
701 .ready()
702 .await
703 .unwrap()
704 .call(make_exchange("hello"))
705 .await;
706
707 assert!(
708 result.is_err(),
709 "Custom aggregation should propagate errors"
710 );
711 }
712
713 #[tokio::test]
716 async fn test_multicast_parallel_basic() {
717 let endpoints = vec![uppercase_processor(), uppercase_processor()];
718
719 let config = MulticastConfig::new()
720 .parallel(true)
721 .aggregation(MulticastStrategy::CollectAll);
722 let mut svc = MulticastService::new(endpoints, config).unwrap();
723
724 let result = svc
725 .ready()
726 .await
727 .unwrap()
728 .call(make_exchange("test"))
729 .await
730 .unwrap();
731
732 match &result.input.body {
735 Body::Json(v) => {
736 let arr = v.as_array().expect("expected array");
737 assert_eq!(arr.len(), 2);
738 assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
739 }
740 other => panic!("expected JSON body, got {:?}", other),
741 }
742 }
743
744 #[tokio::test]
747 async fn test_multicast_parallel_with_limit() {
748 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
749
750 let concurrent = Arc::new(AtomicUsize::new(0));
751 let max_concurrent = Arc::new(AtomicUsize::new(0));
752
753 let endpoints: Vec<BoxProcessor> = (0..4)
754 .map(|_| {
755 let c = Arc::clone(&concurrent);
756 let mc = Arc::clone(&max_concurrent);
757 BoxProcessor::from_fn(move |ex: Exchange| {
758 let c = Arc::clone(&c);
759 let mc = Arc::clone(&mc);
760 Box::pin(async move {
761 let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
762 mc.fetch_max(current, AtomicOrdering::SeqCst);
763 tokio::task::yield_now().await;
764 c.fetch_sub(1, AtomicOrdering::SeqCst);
765 Ok(ex)
766 })
767 })
768 })
769 .collect();
770
771 let config = MulticastConfig::new().parallel(true).parallel_limit(2);
772 let mut svc = MulticastService::new(endpoints, config).unwrap();
773
774 let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
775
776 let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
777 assert!(
778 observed_max <= 2,
779 "max concurrency was {}, expected <= 2",
780 observed_max
781 );
782 }
783
784 async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
787 use bytes::Bytes;
788 use camel_api::{Body, StreamBody, StreamMetadata};
789 use futures::stream;
790 use std::sync::Arc;
791 use tokio::sync::Mutex;
792
793 let chunks = vec![Ok(Bytes::from("test"))];
794 let stream_body = StreamBody {
795 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
796 metadata: StreamMetadata {
797 origin,
798 ..Default::default()
799 },
800 };
801
802 let stream_body_clone = stream_body.clone();
803 let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
804 let body_clone = stream_body_clone.clone();
805 Box::pin(async move {
806 let mut out = ex;
807 out.input.body = Body::Stream(body_clone);
808 Ok(out)
809 })
810 })];
811
812 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
813 let mut svc = MulticastService::new(endpoints, config).unwrap();
814
815 svc.ready()
816 .await
817 .unwrap()
818 .call(Exchange::new(Message::new("")))
819 .await
820 .unwrap()
821 }
822
823 #[tokio::test]
824 async fn test_multicast_stream_bodies_creates_valid_json() {
825 use camel_api::Body;
826
827 let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
828
829 let Body::Json(value) = &result.input.body else {
830 panic!("Expected Json body, got {:?}", result.input.body);
831 };
832
833 let json_str = serde_json::to_string(&value).unwrap();
834 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
835
836 assert!(parsed.is_array());
837 let arr = parsed.as_array().unwrap();
838 assert_eq!(arr.len(), 1);
839 assert!(arr[0]["_stream"].is_object());
840 assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
841 assert_eq!(arr[0]["_stream"]["placeholder"], true);
842 }
843
844 #[tokio::test]
847 async fn test_multicast_stream_with_none_origin_creates_valid_json() {
848 use camel_api::Body;
849
850 let result = setup_multicast_stream_test(None).await;
851
852 let Body::Json(value) = &result.input.body else {
853 panic!("Expected Json body, got {:?}", result.input.body);
854 };
855
856 let json_str = serde_json::to_string(&value).unwrap();
857 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
858
859 assert!(parsed.is_array());
860 let arr = parsed.as_array().unwrap();
861 assert_eq!(arr.len(), 1);
862 assert!(arr[0]["_stream"].is_object());
863 assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
864 assert_eq!(arr[0]["_stream"]["placeholder"], true);
865 }
866
867 #[tokio::test]
870 async fn test_poll_ready_error_does_not_poison_multicast() {
871 use std::sync::atomic::AtomicUsize;
872
873 #[derive(Clone)]
875 struct FailingReadyService {
876 call_count: Arc<AtomicUsize>,
877 }
878
879 impl Service<Exchange> for FailingReadyService {
880 type Response = Exchange;
881 type Error = CamelError;
882 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
883
884 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
885 Poll::Ready(Err(CamelError::ProcessorError("ready-fail".into())))
886 }
887
888 fn call(&mut self, exchange: Exchange) -> Self::Future {
889 self.call_count.fetch_add(1, Ordering::SeqCst);
890 Box::pin(async move { Ok(exchange) })
891 }
892 }
893
894 let call_count = Arc::new(AtomicUsize::new(0));
895 let failing_svc = FailingReadyService {
896 call_count: Arc::clone(&call_count),
897 };
898 let failing_boxed: BoxProcessor = BoxProcessor::new(failing_svc);
899
900 let endpoints = vec![uppercase_processor(), failing_boxed, uppercase_processor()];
901
902 let config = MulticastConfig::new()
903 .stop_on_exception(false)
904 .aggregation(MulticastStrategy::LastWins);
905 let mut svc = MulticastService::new(endpoints, config).unwrap();
906
907 let result = svc.ready().await;
911 assert!(
912 result.is_ok(),
913 "poll_ready should not fail-fast; got error: {:?}",
914 result.err()
915 );
916
917 let result = result.unwrap().call(make_exchange("hello")).await;
918 assert!(
919 result.is_ok(),
920 "LastWins with last endpoint succeeding should return Ok; got: {:?}",
921 result.err()
922 );
923
924 assert_eq!(
928 result.unwrap().input.body.as_text(),
929 Some("HELLO"),
930 "last successful endpoint should have uppercased"
931 );
932
933 assert_eq!(call_count.load(Ordering::SeqCst), 0);
935 }
936
937 #[tokio::test]
938 async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
939 let endpoints = vec![
940 BoxProcessor::from_fn(|mut ex: Exchange| {
941 Box::pin(async move {
942 ex.input.body = Body::Bytes(vec![65, 66].into());
943 Ok(ex)
944 })
945 }),
946 BoxProcessor::from_fn(|mut ex: Exchange| {
947 Box::pin(async move {
948 ex.input.body = Body::Xml("<a/>".to_string());
949 Ok(ex)
950 })
951 }),
952 BoxProcessor::from_fn(|mut ex: Exchange| {
953 Box::pin(async move {
954 ex.input.body = Body::Empty;
955 Ok(ex)
956 })
957 }),
958 ];
959
960 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
961 let mut svc = MulticastService::new(endpoints, config).unwrap();
962 let result = svc
963 .ready()
964 .await
965 .unwrap()
966 .call(make_exchange("x"))
967 .await
968 .unwrap();
969
970 match result.input.body {
971 Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
972 _ => panic!("expected json"),
973 }
974 }
975}