1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8 Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
15pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18#[derive(Clone)]
30pub struct MulticastService {
31 endpoints: Vec<BoxProcessor>,
32 config: MulticastConfig,
33}
34
35impl MulticastService {
36 pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Result<Self, CamelError> {
38 config.validate()?;
39 Ok(Self { endpoints, config })
40 }
41}
42
43impl Service<Exchange> for MulticastService {
44 type Response = Exchange;
45 type Error = CamelError;
46 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
47
48 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
49 for endpoint in &mut self.endpoints {
51 match endpoint.poll_ready(cx) {
52 Poll::Pending => return Poll::Pending,
53 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
54 Poll::Ready(Ok(())) => {}
55 }
56 }
57 Poll::Ready(Ok(()))
58 }
59
60 fn call(&mut self, exchange: Exchange) -> Self::Future {
61 let original = exchange.clone();
62 let endpoints = self.endpoints.clone();
63 let config = self.config.clone();
64
65 Box::pin(async move {
66 if endpoints.is_empty() {
68 return Ok(original);
69 }
70
71 let total = endpoints.len();
72
73 let results = if config.parallel {
74 process_parallel(exchange, endpoints, config.parallel_limit, total).await
76 } else {
77 process_sequential(exchange, endpoints, config.stop_on_exception, total).await
79 };
80
81 aggregate(results, original, config.aggregation)
83 })
84 }
85}
86
87async fn process_sequential(
90 exchange: Exchange,
91 endpoints: Vec<BoxProcessor>,
92 stop_on_exception: bool,
93 total: usize,
94) -> Vec<Result<Exchange, CamelError>> {
95 let mut results = Vec::with_capacity(endpoints.len());
96
97 for (i, endpoint) in endpoints.into_iter().enumerate() {
98 let mut cloned_exchange = exchange.clone();
100
101 cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
103 cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
104
105 let mut endpoint = endpoint;
106 match tower::ServiceExt::ready(&mut endpoint).await {
107 Err(e) => {
108 results.push(Err(e));
109 if stop_on_exception {
110 break;
111 }
112 }
113 Ok(svc) => {
114 let result = svc.call(cloned_exchange).await;
115 let is_err = result.is_err();
116 results.push(result);
117 if stop_on_exception && is_err {
118 break;
119 }
120 }
121 }
122 }
123
124 results
125}
126
127async fn process_parallel(
130 exchange: Exchange,
131 endpoints: Vec<BoxProcessor>,
132 parallel_limit: Option<usize>,
133 total: usize,
134) -> Vec<Result<Exchange, CamelError>> {
135 use std::sync::Arc;
136 use tokio::sync::Semaphore;
137
138 let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
139
140 let futures: Vec<_> = endpoints
142 .into_iter()
143 .enumerate()
144 .map(|(i, mut endpoint)| {
145 let mut ex = exchange.clone();
146 ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
147 ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
148 let sem = semaphore.clone();
149 async move {
150 let _permit = match &sem {
152 Some(s) => match s.acquire().await {
153 Ok(p) => Some(p),
154 Err(_) => {
155 return Err(CamelError::ProcessorError("semaphore closed".to_string()));
156 }
157 },
158 None => None,
159 };
160
161 tower::ServiceExt::ready(&mut endpoint).await?;
163 endpoint.call(ex).await
164 }
165 })
166 .collect();
167
168 futures::future::join_all(futures).await
170}
171
172fn aggregate(
175 results: Vec<Result<Exchange, CamelError>>,
176 original: Exchange,
177 strategy: MulticastStrategy,
178) -> Result<Exchange, CamelError> {
179 match strategy {
180 MulticastStrategy::LastWins => {
181 results.into_iter().last().unwrap_or_else(|| Ok(original))
184 }
185 MulticastStrategy::CollectAll => {
186 let mut bodies = Vec::new();
188 for result in results {
189 let ex = result?;
190 let value = match &ex.input.body {
191 Body::Text(s) => Value::String(s.clone()),
192 Body::Json(v) => v.clone(),
193 Body::Xml(s) => Value::String(s.clone()),
194 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
195 Body::Empty => Value::Null,
196 Body::Stream(s) => serde_json::json!({
197 "_stream": {
198 "origin": s.metadata.origin,
199 "placeholder": true,
200 "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
201 }
202 }),
203 };
204 bodies.push(value);
205 }
206 let mut out = original;
207 out.input.body = Body::Json(Value::Array(bodies));
208 Ok(out)
209 }
210 MulticastStrategy::Original => Ok(original),
211 MulticastStrategy::Custom(fold_fn) => {
212 let mut iter = results.into_iter();
214 let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
215 iter.try_fold(first, |acc, next_result| {
216 let next = next_result?;
217 Ok(fold_fn(acc, next))
218 })
219 }
220 }
221}
222
223#[cfg(test)]
224mod tests {
225 use super::*;
226 use camel_api::{BoxProcessorExt, Message};
227 use std::sync::Arc;
228 use std::sync::atomic::Ordering;
229 use tower::ServiceExt;
230
231 fn make_exchange(body: &str) -> Exchange {
234 Exchange::new(Message::new(body))
235 }
236
237 fn uppercase_processor() -> BoxProcessor {
238 BoxProcessor::from_fn(|mut ex: Exchange| {
239 Box::pin(async move {
240 if let Body::Text(s) = &ex.input.body {
241 ex.input.body = Body::Text(s.to_uppercase());
242 }
243 Ok(ex)
244 })
245 })
246 }
247
248 fn failing_processor() -> BoxProcessor {
249 BoxProcessor::from_fn(|_ex| {
250 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
251 })
252 }
253
254 #[test]
255 fn test_multicast_zero_parallel_limit_rejected() {
256 let config = MulticastConfig::new().parallel(true).parallel_limit(0);
257 let result = MulticastService::new(vec![passthrough_processor()], config);
258 assert!(result.is_err(), "zero parallel_limit should return Err");
259 }
260
261 fn passthrough_processor() -> BoxProcessor {
262 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
263 }
264
265 #[tokio::test]
268 async fn test_multicast_sequential_last_wins() {
269 let endpoints = vec![
270 uppercase_processor(),
271 uppercase_processor(),
272 uppercase_processor(),
273 ];
274
275 let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config).unwrap();
277
278 let result = svc
279 .ready()
280 .await
281 .unwrap()
282 .call(make_exchange("hello"))
283 .await
284 .unwrap();
285
286 assert_eq!(result.input.body.as_text(), Some("HELLO"));
287 }
288
289 #[tokio::test]
292 async fn test_multicast_sequential_collect_all() {
293 let endpoints = vec![
294 uppercase_processor(),
295 uppercase_processor(),
296 uppercase_processor(),
297 ];
298
299 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
300 let mut svc = MulticastService::new(endpoints, config).unwrap();
301
302 let result = svc
303 .ready()
304 .await
305 .unwrap()
306 .call(make_exchange("hello"))
307 .await
308 .unwrap();
309
310 let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
311 match &result.input.body {
312 Body::Json(v) => assert_eq!(*v, expected),
313 other => panic!("expected JSON body, got {other:?}"),
314 }
315 }
316
317 #[tokio::test]
320 async fn test_multicast_sequential_original() {
321 let endpoints = vec![
322 uppercase_processor(),
323 uppercase_processor(),
324 uppercase_processor(),
325 ];
326
327 let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
328 let mut svc = MulticastService::new(endpoints, config).unwrap();
329
330 let result = svc
331 .ready()
332 .await
333 .unwrap()
334 .call(make_exchange("hello"))
335 .await
336 .unwrap();
337
338 assert_eq!(result.input.body.as_text(), Some("hello"));
340 }
341
342 #[tokio::test]
345 async fn test_multicast_sequential_custom_aggregation() {
346 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
347 Arc::new(|mut acc: Exchange, next: Exchange| {
348 let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
349 let next_text = next.input.body.as_text().unwrap_or("").to_string();
350 acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
351 acc
352 });
353
354 let endpoints = vec![
355 uppercase_processor(),
356 uppercase_processor(),
357 uppercase_processor(),
358 ];
359
360 let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
361 let mut svc = MulticastService::new(endpoints, config).unwrap();
362
363 let result = svc
364 .ready()
365 .await
366 .unwrap()
367 .call(make_exchange("a"))
368 .await
369 .unwrap();
370
371 assert_eq!(result.input.body.as_text(), Some("A+A+A"));
372 }
373
374 #[tokio::test]
377 async fn test_multicast_stop_on_exception() {
378 let endpoints = vec![
379 uppercase_processor(),
380 failing_processor(),
381 uppercase_processor(),
382 ];
383
384 let config = MulticastConfig::new().stop_on_exception(true);
385 let mut svc = MulticastService::new(endpoints, config).unwrap();
386
387 let result = svc
388 .ready()
389 .await
390 .unwrap()
391 .call(make_exchange("hello"))
392 .await;
393
394 assert!(result.is_err(), "expected error due to stop_on_exception");
395 }
396
397 #[tokio::test]
400 async fn test_multicast_continue_on_exception() {
401 let endpoints = vec![
402 uppercase_processor(),
403 failing_processor(),
404 uppercase_processor(),
405 ];
406
407 let config = MulticastConfig::new()
408 .stop_on_exception(false)
409 .aggregation(MulticastStrategy::LastWins);
410 let mut svc = MulticastService::new(endpoints, config).unwrap();
411
412 let result = svc
413 .ready()
414 .await
415 .unwrap()
416 .call(make_exchange("hello"))
417 .await;
418
419 assert!(result.is_ok(), "last endpoint should succeed");
421 assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
422 }
423
424 #[tokio::test]
427 async fn test_multicast_stop_on_exception_halts_early() {
428 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
429
430 let executed = Arc::new(AtomicUsize::new(0));
432
433 let exec_clone1 = Arc::clone(&executed);
434 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
435 let e = Arc::clone(&exec_clone1);
436 Box::pin(async move {
437 e.fetch_add(1, AtomicOrdering::SeqCst);
438 Ok(ex)
439 })
440 });
441
442 let exec_clone2 = Arc::clone(&executed);
443 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
444 let e = Arc::clone(&exec_clone2);
445 Box::pin(async move {
446 e.fetch_add(1, AtomicOrdering::SeqCst);
447 Err(CamelError::ProcessorError("fail on 1".into()))
448 })
449 });
450
451 let exec_clone3 = Arc::clone(&executed);
452 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
453 let e = Arc::clone(&exec_clone3);
454 Box::pin(async move {
455 e.fetch_add(1, AtomicOrdering::SeqCst);
456 Ok(ex)
457 })
458 });
459
460 let endpoints = vec![endpoint0, endpoint1, endpoint2];
461 let config = MulticastConfig::new().stop_on_exception(true);
462 let mut svc = MulticastService::new(endpoints, config).unwrap();
463
464 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
465 assert!(result.is_err(), "should fail at endpoint 1");
466
467 let count = executed.load(AtomicOrdering::SeqCst);
469 assert_eq!(
470 count, 2,
471 "endpoint 2 should not have executed due to stop_on_exception"
472 );
473 }
474
475 #[tokio::test]
478 async fn test_multicast_continue_on_exception_executes_all() {
479 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
480
481 let executed = Arc::new(AtomicUsize::new(0));
483
484 let exec_clone1 = Arc::clone(&executed);
485 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
486 let e = Arc::clone(&exec_clone1);
487 Box::pin(async move {
488 e.fetch_add(1, AtomicOrdering::SeqCst);
489 Ok(ex)
490 })
491 });
492
493 let exec_clone2 = Arc::clone(&executed);
494 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
495 let e = Arc::clone(&exec_clone2);
496 Box::pin(async move {
497 e.fetch_add(1, AtomicOrdering::SeqCst);
498 Err(CamelError::ProcessorError("fail on 1".into()))
499 })
500 });
501
502 let exec_clone3 = Arc::clone(&executed);
503 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
504 let e = Arc::clone(&exec_clone3);
505 Box::pin(async move {
506 e.fetch_add(1, AtomicOrdering::SeqCst);
507 Ok(ex)
508 })
509 });
510
511 let endpoints = vec![endpoint0, endpoint1, endpoint2];
512 let config = MulticastConfig::new()
513 .stop_on_exception(false)
514 .aggregation(MulticastStrategy::LastWins);
515 let mut svc = MulticastService::new(endpoints, config).unwrap();
516
517 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
518 assert!(result.is_ok(), "last endpoint should succeed");
519
520 let count = executed.load(AtomicOrdering::SeqCst);
522 assert_eq!(
523 count, 3,
524 "all endpoints should have executed despite error in endpoint 1"
525 );
526 }
527
528 #[tokio::test]
531 async fn test_multicast_empty_endpoints() {
532 let endpoints: Vec<BoxProcessor> = vec![];
533
534 let config = MulticastConfig::new();
535 let mut svc = MulticastService::new(endpoints, config).unwrap();
536
537 let mut ex = make_exchange("hello");
538 ex.set_property("marker", Value::Bool(true));
539
540 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
541 assert_eq!(result.input.body.as_text(), Some("hello"));
542 assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
543 }
544
545 #[tokio::test]
548 async fn test_multicast_metadata_properties() {
549 let recorder = BoxProcessor::from_fn(|ex: Exchange| {
551 Box::pin(async move {
552 let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
553 let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
554 let body = serde_json::json!({
555 "index": idx,
556 "complete": complete,
557 });
558 let mut out = ex;
559 out.input.body = Body::Json(body);
560 Ok(out)
561 })
562 });
563
564 let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
565
566 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
567 let mut svc = MulticastService::new(endpoints, config).unwrap();
568
569 let result = svc
570 .ready()
571 .await
572 .unwrap()
573 .call(make_exchange("x"))
574 .await
575 .unwrap();
576
577 let expected = serde_json::json!([
578 {"index": 0, "complete": false},
579 {"index": 1, "complete": false},
580 {"index": 2, "complete": true},
581 ]);
582 match &result.input.body {
583 Body::Json(v) => assert_eq!(*v, expected),
584 other => panic!("expected JSON body, got {other:?}"),
585 }
586 }
587
588 #[tokio::test]
591 async fn test_poll_ready_delegates_to_endpoints() {
592 use std::sync::atomic::AtomicBool;
593
594 #[derive(Clone)]
596 struct DelayedReady {
597 ready: Arc<AtomicBool>,
598 }
599
600 impl Service<Exchange> for DelayedReady {
601 type Response = Exchange;
602 type Error = CamelError;
603 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
604
605 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
606 if self.ready.load(Ordering::SeqCst) {
607 Poll::Ready(Ok(()))
608 } else {
609 cx.waker().wake_by_ref();
610 Poll::Pending
611 }
612 }
613
614 fn call(&mut self, exchange: Exchange) -> Self::Future {
615 Box::pin(async move { Ok(exchange) })
616 }
617 }
618
619 let ready_flag = Arc::new(AtomicBool::new(false));
620 let inner = DelayedReady {
621 ready: Arc::clone(&ready_flag),
622 };
623 let boxed: BoxProcessor = BoxProcessor::new(inner);
624
625 let config = MulticastConfig::new();
626 let mut svc = MulticastService::new(vec![boxed], config).unwrap();
627
628 let waker = futures::task::noop_waker();
630 let mut cx = Context::from_waker(&waker);
631 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
632 assert!(
633 poll.is_pending(),
634 "expected Pending when endpoint not ready"
635 );
636
637 ready_flag.store(true, Ordering::SeqCst);
639
640 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
641 assert!(
642 matches!(poll, Poll::Ready(Ok(()))),
643 "expected Ready after endpoint becomes ready"
644 );
645 }
646
647 #[tokio::test]
650 async fn test_multicast_collect_all_error_propagates() {
651 let endpoints = vec![
652 uppercase_processor(),
653 failing_processor(),
654 uppercase_processor(),
655 ];
656
657 let config = MulticastConfig::new()
658 .stop_on_exception(false)
659 .aggregation(MulticastStrategy::CollectAll);
660 let mut svc = MulticastService::new(endpoints, config).unwrap();
661
662 let result = svc
663 .ready()
664 .await
665 .unwrap()
666 .call(make_exchange("hello"))
667 .await;
668
669 assert!(result.is_err(), "CollectAll should propagate first error");
670 }
671
672 #[tokio::test]
675 async fn test_multicast_last_wins_error_last() {
676 let endpoints = vec![
677 uppercase_processor(),
678 uppercase_processor(),
679 failing_processor(),
680 ];
681
682 let config = MulticastConfig::new()
683 .stop_on_exception(false)
684 .aggregation(MulticastStrategy::LastWins);
685 let mut svc = MulticastService::new(endpoints, config).unwrap();
686
687 let result = svc
688 .ready()
689 .await
690 .unwrap()
691 .call(make_exchange("hello"))
692 .await;
693
694 assert!(result.is_err(), "LastWins should return last error");
695 }
696
697 #[tokio::test]
700 async fn test_multicast_custom_error_propagates() {
701 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
702 Arc::new(|acc: Exchange, _next: Exchange| acc);
703
704 let endpoints = vec![
705 uppercase_processor(),
706 failing_processor(),
707 uppercase_processor(),
708 ];
709
710 let config = MulticastConfig::new()
711 .stop_on_exception(false)
712 .aggregation(MulticastStrategy::Custom(joiner));
713 let mut svc = MulticastService::new(endpoints, config).unwrap();
714
715 let result = svc
716 .ready()
717 .await
718 .unwrap()
719 .call(make_exchange("hello"))
720 .await;
721
722 assert!(
723 result.is_err(),
724 "Custom aggregation should propagate errors"
725 );
726 }
727
728 #[tokio::test]
731 async fn test_multicast_parallel_basic() {
732 let endpoints = vec![uppercase_processor(), uppercase_processor()];
733
734 let config = MulticastConfig::new()
735 .parallel(true)
736 .aggregation(MulticastStrategy::CollectAll);
737 let mut svc = MulticastService::new(endpoints, config).unwrap();
738
739 let result = svc
740 .ready()
741 .await
742 .unwrap()
743 .call(make_exchange("test"))
744 .await
745 .unwrap();
746
747 match &result.input.body {
750 Body::Json(v) => {
751 let arr = v.as_array().expect("expected array");
752 assert_eq!(arr.len(), 2);
753 assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
754 }
755 other => panic!("expected JSON body, got {:?}", other),
756 }
757 }
758
759 #[tokio::test]
762 async fn test_multicast_parallel_with_limit() {
763 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
764
765 let concurrent = Arc::new(AtomicUsize::new(0));
766 let max_concurrent = Arc::new(AtomicUsize::new(0));
767
768 let endpoints: Vec<BoxProcessor> = (0..4)
769 .map(|_| {
770 let c = Arc::clone(&concurrent);
771 let mc = Arc::clone(&max_concurrent);
772 BoxProcessor::from_fn(move |ex: Exchange| {
773 let c = Arc::clone(&c);
774 let mc = Arc::clone(&mc);
775 Box::pin(async move {
776 let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
777 mc.fetch_max(current, AtomicOrdering::SeqCst);
778 tokio::task::yield_now().await;
779 c.fetch_sub(1, AtomicOrdering::SeqCst);
780 Ok(ex)
781 })
782 })
783 })
784 .collect();
785
786 let config = MulticastConfig::new().parallel(true).parallel_limit(2);
787 let mut svc = MulticastService::new(endpoints, config).unwrap();
788
789 let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
790
791 let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
792 assert!(
793 observed_max <= 2,
794 "max concurrency was {}, expected <= 2",
795 observed_max
796 );
797 }
798
799 async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
802 use bytes::Bytes;
803 use camel_api::{Body, StreamBody, StreamMetadata};
804 use futures::stream;
805 use std::sync::Arc;
806 use tokio::sync::Mutex;
807
808 let chunks = vec![Ok(Bytes::from("test"))];
809 let stream_body = StreamBody {
810 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
811 metadata: StreamMetadata {
812 origin,
813 ..Default::default()
814 },
815 };
816
817 let stream_body_clone = stream_body.clone();
818 let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
819 let body_clone = stream_body_clone.clone();
820 Box::pin(async move {
821 let mut out = ex;
822 out.input.body = Body::Stream(body_clone);
823 Ok(out)
824 })
825 })];
826
827 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
828 let mut svc = MulticastService::new(endpoints, config).unwrap();
829
830 svc.ready()
831 .await
832 .unwrap()
833 .call(Exchange::new(Message::new("")))
834 .await
835 .unwrap()
836 }
837
838 #[tokio::test]
839 async fn test_multicast_stream_bodies_creates_valid_json() {
840 use camel_api::Body;
841
842 let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
843
844 let Body::Json(value) = &result.input.body else {
845 panic!("Expected Json body, got {:?}", result.input.body);
846 };
847
848 let json_str = serde_json::to_string(&value).unwrap();
849 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
850
851 assert!(parsed.is_array());
852 let arr = parsed.as_array().unwrap();
853 assert_eq!(arr.len(), 1);
854 assert!(arr[0]["_stream"].is_object());
855 assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
856 assert_eq!(arr[0]["_stream"]["placeholder"], true);
857 }
858
859 #[tokio::test]
862 async fn test_multicast_stream_with_none_origin_creates_valid_json() {
863 use camel_api::Body;
864
865 let result = setup_multicast_stream_test(None).await;
866
867 let Body::Json(value) = &result.input.body else {
868 panic!("Expected Json body, got {:?}", result.input.body);
869 };
870
871 let json_str = serde_json::to_string(&value).unwrap();
872 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
873
874 assert!(parsed.is_array());
875 let arr = parsed.as_array().unwrap();
876 assert_eq!(arr.len(), 1);
877 assert!(arr[0]["_stream"].is_object());
878 assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
879 assert_eq!(arr[0]["_stream"]["placeholder"], true);
880 }
881
882 #[tokio::test]
883 async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
884 let endpoints = vec![
885 BoxProcessor::from_fn(|mut ex: Exchange| {
886 Box::pin(async move {
887 ex.input.body = Body::Bytes(vec![65, 66].into());
888 Ok(ex)
889 })
890 }),
891 BoxProcessor::from_fn(|mut ex: Exchange| {
892 Box::pin(async move {
893 ex.input.body = Body::Xml("<a/>".to_string());
894 Ok(ex)
895 })
896 }),
897 BoxProcessor::from_fn(|mut ex: Exchange| {
898 Box::pin(async move {
899 ex.input.body = Body::Empty;
900 Ok(ex)
901 })
902 }),
903 ];
904
905 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
906 let mut svc = MulticastService::new(endpoints, config).unwrap();
907 let result = svc
908 .ready()
909 .await
910 .unwrap()
911 .call(make_exchange("x"))
912 .await
913 .unwrap();
914
915 match result.input.body {
916 Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
917 _ => panic!("expected json"),
918 }
919 }
920}