1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8 Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
15pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18#[derive(Clone)]
30pub struct MulticastService {
31 endpoints: Vec<BoxProcessor>,
32 config: MulticastConfig,
33}
34
35impl MulticastService {
36 pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38 if config.parallel
39 && let Some(limit) = config.parallel_limit
40 {
41 assert!(limit > 0, "parallel_limit must be > 0");
42 }
43 Self { endpoints, config }
44 }
45}
46
47impl Service<Exchange> for MulticastService {
48 type Response = Exchange;
49 type Error = CamelError;
50 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
51
52 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
53 for endpoint in &mut self.endpoints {
55 match endpoint.poll_ready(cx) {
56 Poll::Pending => return Poll::Pending,
57 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
58 Poll::Ready(Ok(())) => {}
59 }
60 }
61 Poll::Ready(Ok(()))
62 }
63
64 fn call(&mut self, exchange: Exchange) -> Self::Future {
65 let original = exchange.clone();
66 let endpoints = self.endpoints.clone();
67 let config = self.config.clone();
68
69 Box::pin(async move {
70 if endpoints.is_empty() {
72 return Ok(original);
73 }
74
75 let total = endpoints.len();
76
77 let results = if config.parallel {
78 process_parallel(exchange, endpoints, config.parallel_limit, total).await
80 } else {
81 process_sequential(exchange, endpoints, config.stop_on_exception, total).await
83 };
84
85 aggregate(results, original, config.aggregation)
87 })
88 }
89}
90
91async fn process_sequential(
94 exchange: Exchange,
95 endpoints: Vec<BoxProcessor>,
96 stop_on_exception: bool,
97 total: usize,
98) -> Vec<Result<Exchange, CamelError>> {
99 let mut results = Vec::with_capacity(endpoints.len());
100
101 for (i, endpoint) in endpoints.into_iter().enumerate() {
102 let mut cloned_exchange = exchange.clone();
104
105 cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
107 cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
108
109 let mut endpoint = endpoint;
110 match tower::ServiceExt::ready(&mut endpoint).await {
111 Err(e) => {
112 results.push(Err(e));
113 if stop_on_exception {
114 break;
115 }
116 }
117 Ok(svc) => {
118 let result = svc.call(cloned_exchange).await;
119 let is_err = result.is_err();
120 results.push(result);
121 if stop_on_exception && is_err {
122 break;
123 }
124 }
125 }
126 }
127
128 results
129}
130
131async fn process_parallel(
134 exchange: Exchange,
135 endpoints: Vec<BoxProcessor>,
136 parallel_limit: Option<usize>,
137 total: usize,
138) -> Vec<Result<Exchange, CamelError>> {
139 use std::sync::Arc;
140 use tokio::sync::Semaphore;
141
142 let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
143
144 let futures: Vec<_> = endpoints
146 .into_iter()
147 .enumerate()
148 .map(|(i, mut endpoint)| {
149 let mut ex = exchange.clone();
150 ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
151 ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
152 let sem = semaphore.clone();
153 async move {
154 let _permit = match &sem {
156 Some(s) => match s.acquire().await {
157 Ok(p) => Some(p),
158 Err(_) => {
159 return Err(CamelError::ProcessorError("semaphore closed".to_string()));
160 }
161 },
162 None => None,
163 };
164
165 tower::ServiceExt::ready(&mut endpoint).await?;
167 endpoint.call(ex).await
168 }
169 })
170 .collect();
171
172 futures::future::join_all(futures).await
174}
175
176fn aggregate(
179 results: Vec<Result<Exchange, CamelError>>,
180 original: Exchange,
181 strategy: MulticastStrategy,
182) -> Result<Exchange, CamelError> {
183 match strategy {
184 MulticastStrategy::LastWins => {
185 results.into_iter().last().unwrap_or_else(|| Ok(original))
188 }
189 MulticastStrategy::CollectAll => {
190 let mut bodies = Vec::new();
192 for result in results {
193 let ex = result?;
194 let value = match &ex.input.body {
195 Body::Text(s) => Value::String(s.clone()),
196 Body::Json(v) => v.clone(),
197 Body::Xml(s) => Value::String(s.clone()),
198 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
199 Body::Empty => Value::Null,
200 Body::Stream(s) => serde_json::json!({
201 "_stream": {
202 "origin": s.metadata.origin,
203 "placeholder": true,
204 "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
205 }
206 }),
207 };
208 bodies.push(value);
209 }
210 let mut out = original;
211 out.input.body = Body::Json(Value::Array(bodies));
212 Ok(out)
213 }
214 MulticastStrategy::Original => Ok(original),
215 MulticastStrategy::Custom(fold_fn) => {
216 let mut iter = results.into_iter();
218 let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
219 iter.try_fold(first, |acc, next_result| {
220 let next = next_result?;
221 Ok(fold_fn(acc, next))
222 })
223 }
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use super::*;
230 use camel_api::{BoxProcessorExt, Message};
231 use std::sync::Arc;
232 use std::sync::atomic::Ordering;
233 use tower::ServiceExt;
234
235 fn make_exchange(body: &str) -> Exchange {
238 Exchange::new(Message::new(body))
239 }
240
241 fn uppercase_processor() -> BoxProcessor {
242 BoxProcessor::from_fn(|mut ex: Exchange| {
243 Box::pin(async move {
244 if let Body::Text(s) = &ex.input.body {
245 ex.input.body = Body::Text(s.to_uppercase());
246 }
247 Ok(ex)
248 })
249 })
250 }
251
252 fn failing_processor() -> BoxProcessor {
253 BoxProcessor::from_fn(|_ex| {
254 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
255 })
256 }
257
258 #[test]
259 fn test_multicast_zero_parallel_limit_rejected() {
260 let config = MulticastConfig::new().parallel(true).parallel_limit(0);
261 let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
262 MulticastService::new(vec![passthrough_processor()], config);
263 }));
264 assert!(result.is_err(), "zero parallel_limit should panic");
265 }
266
267 fn passthrough_processor() -> BoxProcessor {
268 BoxProcessor::from_fn(|ex| Box::pin(async move { Ok(ex) }))
269 }
270
271 #[tokio::test]
274 async fn test_multicast_sequential_last_wins() {
275 let endpoints = vec![
276 uppercase_processor(),
277 uppercase_processor(),
278 uppercase_processor(),
279 ];
280
281 let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config);
283
284 let result = svc
285 .ready()
286 .await
287 .unwrap()
288 .call(make_exchange("hello"))
289 .await
290 .unwrap();
291
292 assert_eq!(result.input.body.as_text(), Some("HELLO"));
293 }
294
295 #[tokio::test]
298 async fn test_multicast_sequential_collect_all() {
299 let endpoints = vec![
300 uppercase_processor(),
301 uppercase_processor(),
302 uppercase_processor(),
303 ];
304
305 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
306 let mut svc = MulticastService::new(endpoints, config);
307
308 let result = svc
309 .ready()
310 .await
311 .unwrap()
312 .call(make_exchange("hello"))
313 .await
314 .unwrap();
315
316 let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
317 match &result.input.body {
318 Body::Json(v) => assert_eq!(*v, expected),
319 other => panic!("expected JSON body, got {other:?}"),
320 }
321 }
322
323 #[tokio::test]
326 async fn test_multicast_sequential_original() {
327 let endpoints = vec![
328 uppercase_processor(),
329 uppercase_processor(),
330 uppercase_processor(),
331 ];
332
333 let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
334 let mut svc = MulticastService::new(endpoints, config);
335
336 let result = svc
337 .ready()
338 .await
339 .unwrap()
340 .call(make_exchange("hello"))
341 .await
342 .unwrap();
343
344 assert_eq!(result.input.body.as_text(), Some("hello"));
346 }
347
348 #[tokio::test]
351 async fn test_multicast_sequential_custom_aggregation() {
352 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
353 Arc::new(|mut acc: Exchange, next: Exchange| {
354 let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
355 let next_text = next.input.body.as_text().unwrap_or("").to_string();
356 acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
357 acc
358 });
359
360 let endpoints = vec![
361 uppercase_processor(),
362 uppercase_processor(),
363 uppercase_processor(),
364 ];
365
366 let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
367 let mut svc = MulticastService::new(endpoints, config);
368
369 let result = svc
370 .ready()
371 .await
372 .unwrap()
373 .call(make_exchange("a"))
374 .await
375 .unwrap();
376
377 assert_eq!(result.input.body.as_text(), Some("A+A+A"));
378 }
379
380 #[tokio::test]
383 async fn test_multicast_stop_on_exception() {
384 let endpoints = vec![
385 uppercase_processor(),
386 failing_processor(),
387 uppercase_processor(),
388 ];
389
390 let config = MulticastConfig::new().stop_on_exception(true);
391 let mut svc = MulticastService::new(endpoints, config);
392
393 let result = svc
394 .ready()
395 .await
396 .unwrap()
397 .call(make_exchange("hello"))
398 .await;
399
400 assert!(result.is_err(), "expected error due to stop_on_exception");
401 }
402
403 #[tokio::test]
406 async fn test_multicast_continue_on_exception() {
407 let endpoints = vec![
408 uppercase_processor(),
409 failing_processor(),
410 uppercase_processor(),
411 ];
412
413 let config = MulticastConfig::new()
414 .stop_on_exception(false)
415 .aggregation(MulticastStrategy::LastWins);
416 let mut svc = MulticastService::new(endpoints, config);
417
418 let result = svc
419 .ready()
420 .await
421 .unwrap()
422 .call(make_exchange("hello"))
423 .await;
424
425 assert!(result.is_ok(), "last endpoint should succeed");
427 assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
428 }
429
430 #[tokio::test]
433 async fn test_multicast_stop_on_exception_halts_early() {
434 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
435
436 let executed = Arc::new(AtomicUsize::new(0));
438
439 let exec_clone1 = Arc::clone(&executed);
440 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
441 let e = Arc::clone(&exec_clone1);
442 Box::pin(async move {
443 e.fetch_add(1, AtomicOrdering::SeqCst);
444 Ok(ex)
445 })
446 });
447
448 let exec_clone2 = Arc::clone(&executed);
449 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
450 let e = Arc::clone(&exec_clone2);
451 Box::pin(async move {
452 e.fetch_add(1, AtomicOrdering::SeqCst);
453 Err(CamelError::ProcessorError("fail on 1".into()))
454 })
455 });
456
457 let exec_clone3 = Arc::clone(&executed);
458 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
459 let e = Arc::clone(&exec_clone3);
460 Box::pin(async move {
461 e.fetch_add(1, AtomicOrdering::SeqCst);
462 Ok(ex)
463 })
464 });
465
466 let endpoints = vec![endpoint0, endpoint1, endpoint2];
467 let config = MulticastConfig::new().stop_on_exception(true);
468 let mut svc = MulticastService::new(endpoints, config);
469
470 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
471 assert!(result.is_err(), "should fail at endpoint 1");
472
473 let count = executed.load(AtomicOrdering::SeqCst);
475 assert_eq!(
476 count, 2,
477 "endpoint 2 should not have executed due to stop_on_exception"
478 );
479 }
480
481 #[tokio::test]
484 async fn test_multicast_continue_on_exception_executes_all() {
485 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
486
487 let executed = Arc::new(AtomicUsize::new(0));
489
490 let exec_clone1 = Arc::clone(&executed);
491 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
492 let e = Arc::clone(&exec_clone1);
493 Box::pin(async move {
494 e.fetch_add(1, AtomicOrdering::SeqCst);
495 Ok(ex)
496 })
497 });
498
499 let exec_clone2 = Arc::clone(&executed);
500 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
501 let e = Arc::clone(&exec_clone2);
502 Box::pin(async move {
503 e.fetch_add(1, AtomicOrdering::SeqCst);
504 Err(CamelError::ProcessorError("fail on 1".into()))
505 })
506 });
507
508 let exec_clone3 = Arc::clone(&executed);
509 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
510 let e = Arc::clone(&exec_clone3);
511 Box::pin(async move {
512 e.fetch_add(1, AtomicOrdering::SeqCst);
513 Ok(ex)
514 })
515 });
516
517 let endpoints = vec![endpoint0, endpoint1, endpoint2];
518 let config = MulticastConfig::new()
519 .stop_on_exception(false)
520 .aggregation(MulticastStrategy::LastWins);
521 let mut svc = MulticastService::new(endpoints, config);
522
523 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
524 assert!(result.is_ok(), "last endpoint should succeed");
525
526 let count = executed.load(AtomicOrdering::SeqCst);
528 assert_eq!(
529 count, 3,
530 "all endpoints should have executed despite error in endpoint 1"
531 );
532 }
533
534 #[tokio::test]
537 async fn test_multicast_empty_endpoints() {
538 let endpoints: Vec<BoxProcessor> = vec![];
539
540 let config = MulticastConfig::new();
541 let mut svc = MulticastService::new(endpoints, config);
542
543 let mut ex = make_exchange("hello");
544 ex.set_property("marker", Value::Bool(true));
545
546 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
547 assert_eq!(result.input.body.as_text(), Some("hello"));
548 assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
549 }
550
551 #[tokio::test]
554 async fn test_multicast_metadata_properties() {
555 let recorder = BoxProcessor::from_fn(|ex: Exchange| {
557 Box::pin(async move {
558 let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
559 let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
560 let body = serde_json::json!({
561 "index": idx,
562 "complete": complete,
563 });
564 let mut out = ex;
565 out.input.body = Body::Json(body);
566 Ok(out)
567 })
568 });
569
570 let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
571
572 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
573 let mut svc = MulticastService::new(endpoints, config);
574
575 let result = svc
576 .ready()
577 .await
578 .unwrap()
579 .call(make_exchange("x"))
580 .await
581 .unwrap();
582
583 let expected = serde_json::json!([
584 {"index": 0, "complete": false},
585 {"index": 1, "complete": false},
586 {"index": 2, "complete": true},
587 ]);
588 match &result.input.body {
589 Body::Json(v) => assert_eq!(*v, expected),
590 other => panic!("expected JSON body, got {other:?}"),
591 }
592 }
593
594 #[tokio::test]
597 async fn test_poll_ready_delegates_to_endpoints() {
598 use std::sync::atomic::AtomicBool;
599
600 #[derive(Clone)]
602 struct DelayedReady {
603 ready: Arc<AtomicBool>,
604 }
605
606 impl Service<Exchange> for DelayedReady {
607 type Response = Exchange;
608 type Error = CamelError;
609 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
610
611 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
612 if self.ready.load(Ordering::SeqCst) {
613 Poll::Ready(Ok(()))
614 } else {
615 cx.waker().wake_by_ref();
616 Poll::Pending
617 }
618 }
619
620 fn call(&mut self, exchange: Exchange) -> Self::Future {
621 Box::pin(async move { Ok(exchange) })
622 }
623 }
624
625 let ready_flag = Arc::new(AtomicBool::new(false));
626 let inner = DelayedReady {
627 ready: Arc::clone(&ready_flag),
628 };
629 let boxed: BoxProcessor = BoxProcessor::new(inner);
630
631 let config = MulticastConfig::new();
632 let mut svc = MulticastService::new(vec![boxed], config);
633
634 let waker = futures::task::noop_waker();
636 let mut cx = Context::from_waker(&waker);
637 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
638 assert!(
639 poll.is_pending(),
640 "expected Pending when endpoint not ready"
641 );
642
643 ready_flag.store(true, Ordering::SeqCst);
645
646 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
647 assert!(
648 matches!(poll, Poll::Ready(Ok(()))),
649 "expected Ready after endpoint becomes ready"
650 );
651 }
652
653 #[tokio::test]
656 async fn test_multicast_collect_all_error_propagates() {
657 let endpoints = vec![
658 uppercase_processor(),
659 failing_processor(),
660 uppercase_processor(),
661 ];
662
663 let config = MulticastConfig::new()
664 .stop_on_exception(false)
665 .aggregation(MulticastStrategy::CollectAll);
666 let mut svc = MulticastService::new(endpoints, config);
667
668 let result = svc
669 .ready()
670 .await
671 .unwrap()
672 .call(make_exchange("hello"))
673 .await;
674
675 assert!(result.is_err(), "CollectAll should propagate first error");
676 }
677
678 #[tokio::test]
681 async fn test_multicast_last_wins_error_last() {
682 let endpoints = vec![
683 uppercase_processor(),
684 uppercase_processor(),
685 failing_processor(),
686 ];
687
688 let config = MulticastConfig::new()
689 .stop_on_exception(false)
690 .aggregation(MulticastStrategy::LastWins);
691 let mut svc = MulticastService::new(endpoints, config);
692
693 let result = svc
694 .ready()
695 .await
696 .unwrap()
697 .call(make_exchange("hello"))
698 .await;
699
700 assert!(result.is_err(), "LastWins should return last error");
701 }
702
703 #[tokio::test]
706 async fn test_multicast_custom_error_propagates() {
707 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
708 Arc::new(|acc: Exchange, _next: Exchange| acc);
709
710 let endpoints = vec![
711 uppercase_processor(),
712 failing_processor(),
713 uppercase_processor(),
714 ];
715
716 let config = MulticastConfig::new()
717 .stop_on_exception(false)
718 .aggregation(MulticastStrategy::Custom(joiner));
719 let mut svc = MulticastService::new(endpoints, config);
720
721 let result = svc
722 .ready()
723 .await
724 .unwrap()
725 .call(make_exchange("hello"))
726 .await;
727
728 assert!(
729 result.is_err(),
730 "Custom aggregation should propagate errors"
731 );
732 }
733
734 #[tokio::test]
737 async fn test_multicast_parallel_basic() {
738 let endpoints = vec![uppercase_processor(), uppercase_processor()];
739
740 let config = MulticastConfig::new()
741 .parallel(true)
742 .aggregation(MulticastStrategy::CollectAll);
743 let mut svc = MulticastService::new(endpoints, config);
744
745 let result = svc
746 .ready()
747 .await
748 .unwrap()
749 .call(make_exchange("test"))
750 .await
751 .unwrap();
752
753 match &result.input.body {
756 Body::Json(v) => {
757 let arr = v.as_array().expect("expected array");
758 assert_eq!(arr.len(), 2);
759 assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
760 }
761 other => panic!("expected JSON body, got {:?}", other),
762 }
763 }
764
765 #[tokio::test]
768 async fn test_multicast_parallel_with_limit() {
769 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
770
771 let concurrent = Arc::new(AtomicUsize::new(0));
772 let max_concurrent = Arc::new(AtomicUsize::new(0));
773
774 let endpoints: Vec<BoxProcessor> = (0..4)
775 .map(|_| {
776 let c = Arc::clone(&concurrent);
777 let mc = Arc::clone(&max_concurrent);
778 BoxProcessor::from_fn(move |ex: Exchange| {
779 let c = Arc::clone(&c);
780 let mc = Arc::clone(&mc);
781 Box::pin(async move {
782 let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
783 mc.fetch_max(current, AtomicOrdering::SeqCst);
784 tokio::task::yield_now().await;
785 c.fetch_sub(1, AtomicOrdering::SeqCst);
786 Ok(ex)
787 })
788 })
789 })
790 .collect();
791
792 let config = MulticastConfig::new().parallel(true).parallel_limit(2);
793 let mut svc = MulticastService::new(endpoints, config);
794
795 let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
796
797 let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
798 assert!(
799 observed_max <= 2,
800 "max concurrency was {}, expected <= 2",
801 observed_max
802 );
803 }
804
805 async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
808 use bytes::Bytes;
809 use camel_api::{Body, StreamBody, StreamMetadata};
810 use futures::stream;
811 use std::sync::Arc;
812 use tokio::sync::Mutex;
813
814 let chunks = vec![Ok(Bytes::from("test"))];
815 let stream_body = StreamBody {
816 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
817 metadata: StreamMetadata {
818 origin,
819 ..Default::default()
820 },
821 };
822
823 let stream_body_clone = stream_body.clone();
824 let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
825 let body_clone = stream_body_clone.clone();
826 Box::pin(async move {
827 let mut out = ex;
828 out.input.body = Body::Stream(body_clone);
829 Ok(out)
830 })
831 })];
832
833 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
834 let mut svc = MulticastService::new(endpoints, config);
835
836 svc.ready()
837 .await
838 .unwrap()
839 .call(Exchange::new(Message::new("")))
840 .await
841 .unwrap()
842 }
843
844 #[tokio::test]
845 async fn test_multicast_stream_bodies_creates_valid_json() {
846 use camel_api::Body;
847
848 let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
849
850 let Body::Json(value) = &result.input.body else {
851 panic!("Expected Json body, got {:?}", result.input.body);
852 };
853
854 let json_str = serde_json::to_string(&value).unwrap();
855 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
856
857 assert!(parsed.is_array());
858 let arr = parsed.as_array().unwrap();
859 assert_eq!(arr.len(), 1);
860 assert!(arr[0]["_stream"].is_object());
861 assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
862 assert_eq!(arr[0]["_stream"]["placeholder"], true);
863 }
864
865 #[tokio::test]
868 async fn test_multicast_stream_with_none_origin_creates_valid_json() {
869 use camel_api::Body;
870
871 let result = setup_multicast_stream_test(None).await;
872
873 let Body::Json(value) = &result.input.body else {
874 panic!("Expected Json body, got {:?}", result.input.body);
875 };
876
877 let json_str = serde_json::to_string(&value).unwrap();
878 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
879
880 assert!(parsed.is_array());
881 let arr = parsed.as_array().unwrap();
882 assert_eq!(arr.len(), 1);
883 assert!(arr[0]["_stream"].is_object());
884 assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
885 assert_eq!(arr[0]["_stream"]["placeholder"], true);
886 }
887
888 #[tokio::test]
889 async fn test_multicast_collect_all_converts_bytes_xml_and_empty() {
890 let endpoints = vec![
891 BoxProcessor::from_fn(|mut ex: Exchange| {
892 Box::pin(async move {
893 ex.input.body = Body::Bytes(vec![65, 66].into());
894 Ok(ex)
895 })
896 }),
897 BoxProcessor::from_fn(|mut ex: Exchange| {
898 Box::pin(async move {
899 ex.input.body = Body::Xml("<a/>".to_string());
900 Ok(ex)
901 })
902 }),
903 BoxProcessor::from_fn(|mut ex: Exchange| {
904 Box::pin(async move {
905 ex.input.body = Body::Empty;
906 Ok(ex)
907 })
908 }),
909 ];
910
911 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
912 let mut svc = MulticastService::new(endpoints, config);
913 let result = svc
914 .ready()
915 .await
916 .unwrap()
917 .call(make_exchange("x"))
918 .await
919 .unwrap();
920
921 match result.input.body {
922 Body::Json(v) => assert_eq!(v, serde_json::json!(["AB", "<a/>", null])),
923 _ => panic!("expected json"),
924 }
925 }
926}