1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8 Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
15pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18#[derive(Clone)]
30pub struct MulticastService {
31 endpoints: Vec<BoxProcessor>,
32 config: MulticastConfig,
33}
34
35impl MulticastService {
36 pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38 Self { endpoints, config }
39 }
40}
41
42impl Service<Exchange> for MulticastService {
43 type Response = Exchange;
44 type Error = CamelError;
45 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
46
47 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48 for endpoint in &mut self.endpoints {
50 match endpoint.poll_ready(cx) {
51 Poll::Pending => return Poll::Pending,
52 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
53 Poll::Ready(Ok(())) => {}
54 }
55 }
56 Poll::Ready(Ok(()))
57 }
58
59 fn call(&mut self, exchange: Exchange) -> Self::Future {
60 let original = exchange.clone();
61 let endpoints = self.endpoints.clone();
62 let config = self.config.clone();
63
64 Box::pin(async move {
65 if endpoints.is_empty() {
67 return Ok(original);
68 }
69
70 let total = endpoints.len();
71
72 let results = if config.parallel {
73 process_parallel(exchange, endpoints, config.parallel_limit, total).await
75 } else {
76 process_sequential(exchange, endpoints, config.stop_on_exception, total).await
78 };
79
80 aggregate(results, original, config.aggregation)
82 })
83 }
84}
85
86async fn process_sequential(
89 exchange: Exchange,
90 endpoints: Vec<BoxProcessor>,
91 stop_on_exception: bool,
92 total: usize,
93) -> Vec<Result<Exchange, CamelError>> {
94 let mut results = Vec::with_capacity(endpoints.len());
95
96 for (i, endpoint) in endpoints.into_iter().enumerate() {
97 let mut cloned_exchange = exchange.clone();
99
100 cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
102 cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
103
104 let mut endpoint = endpoint;
105 match tower::ServiceExt::ready(&mut endpoint).await {
106 Err(e) => {
107 results.push(Err(e));
108 if stop_on_exception {
109 break;
110 }
111 }
112 Ok(svc) => {
113 let result = svc.call(cloned_exchange).await;
114 let is_err = result.is_err();
115 results.push(result);
116 if stop_on_exception && is_err {
117 break;
118 }
119 }
120 }
121 }
122
123 results
124}
125
126async fn process_parallel(
129 exchange: Exchange,
130 endpoints: Vec<BoxProcessor>,
131 parallel_limit: Option<usize>,
132 total: usize,
133) -> Vec<Result<Exchange, CamelError>> {
134 use std::sync::Arc;
135 use tokio::sync::Semaphore;
136
137 let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
138
139 let futures: Vec<_> = endpoints
141 .into_iter()
142 .enumerate()
143 .map(|(i, mut endpoint)| {
144 let mut ex = exchange.clone();
145 ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
146 ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
147 let sem = semaphore.clone();
148 async move {
149 let _permit = match &sem {
151 Some(s) => match s.acquire().await {
152 Ok(p) => Some(p),
153 Err(_) => {
154 return Err(CamelError::ProcessorError("semaphore closed".to_string()));
155 }
156 },
157 None => None,
158 };
159
160 tower::ServiceExt::ready(&mut endpoint).await?;
162 endpoint.call(ex).await
163 }
164 })
165 .collect();
166
167 futures::future::join_all(futures).await
169}
170
171fn aggregate(
174 results: Vec<Result<Exchange, CamelError>>,
175 original: Exchange,
176 strategy: MulticastStrategy,
177) -> Result<Exchange, CamelError> {
178 match strategy {
179 MulticastStrategy::LastWins => {
180 results.into_iter().last().unwrap_or_else(|| Ok(original))
183 }
184 MulticastStrategy::CollectAll => {
185 let mut bodies = Vec::new();
187 for result in results {
188 let ex = result?;
189 let value = match &ex.input.body {
190 Body::Text(s) => Value::String(s.clone()),
191 Body::Json(v) => v.clone(),
192 Body::Xml(s) => Value::String(s.clone()),
193 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
194 Body::Empty => Value::Null,
195 Body::Stream(s) => serde_json::json!({
196 "_stream": {
197 "origin": s.metadata.origin,
198 "placeholder": true,
199 "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
200 }
201 }),
202 };
203 bodies.push(value);
204 }
205 let mut out = original;
206 out.input.body = Body::Json(Value::Array(bodies));
207 Ok(out)
208 }
209 MulticastStrategy::Original => Ok(original),
210 MulticastStrategy::Custom(fold_fn) => {
211 let mut iter = results.into_iter();
213 let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
214 iter.try_fold(first, |acc, next_result| {
215 let next = next_result?;
216 Ok(fold_fn(acc, next))
217 })
218 }
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use super::*;
225 use camel_api::{BoxProcessorExt, Message};
226 use std::sync::Arc;
227 use std::sync::atomic::Ordering;
228 use tower::ServiceExt;
229
230 fn make_exchange(body: &str) -> Exchange {
233 Exchange::new(Message::new(body))
234 }
235
236 fn uppercase_processor() -> BoxProcessor {
237 BoxProcessor::from_fn(|mut ex: Exchange| {
238 Box::pin(async move {
239 if let Body::Text(s) = &ex.input.body {
240 ex.input.body = Body::Text(s.to_uppercase());
241 }
242 Ok(ex)
243 })
244 })
245 }
246
247 fn failing_processor() -> BoxProcessor {
248 BoxProcessor::from_fn(|_ex| {
249 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
250 })
251 }
252
253 #[tokio::test]
256 async fn test_multicast_sequential_last_wins() {
257 let endpoints = vec![
258 uppercase_processor(),
259 uppercase_processor(),
260 uppercase_processor(),
261 ];
262
263 let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config);
265
266 let result = svc
267 .ready()
268 .await
269 .unwrap()
270 .call(make_exchange("hello"))
271 .await
272 .unwrap();
273
274 assert_eq!(result.input.body.as_text(), Some("HELLO"));
275 }
276
277 #[tokio::test]
280 async fn test_multicast_sequential_collect_all() {
281 let endpoints = vec![
282 uppercase_processor(),
283 uppercase_processor(),
284 uppercase_processor(),
285 ];
286
287 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
288 let mut svc = MulticastService::new(endpoints, config);
289
290 let result = svc
291 .ready()
292 .await
293 .unwrap()
294 .call(make_exchange("hello"))
295 .await
296 .unwrap();
297
298 let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
299 match &result.input.body {
300 Body::Json(v) => assert_eq!(*v, expected),
301 other => panic!("expected JSON body, got {other:?}"),
302 }
303 }
304
305 #[tokio::test]
308 async fn test_multicast_sequential_original() {
309 let endpoints = vec![
310 uppercase_processor(),
311 uppercase_processor(),
312 uppercase_processor(),
313 ];
314
315 let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
316 let mut svc = MulticastService::new(endpoints, config);
317
318 let result = svc
319 .ready()
320 .await
321 .unwrap()
322 .call(make_exchange("hello"))
323 .await
324 .unwrap();
325
326 assert_eq!(result.input.body.as_text(), Some("hello"));
328 }
329
330 #[tokio::test]
333 async fn test_multicast_sequential_custom_aggregation() {
334 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
335 Arc::new(|mut acc: Exchange, next: Exchange| {
336 let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
337 let next_text = next.input.body.as_text().unwrap_or("").to_string();
338 acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
339 acc
340 });
341
342 let endpoints = vec![
343 uppercase_processor(),
344 uppercase_processor(),
345 uppercase_processor(),
346 ];
347
348 let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
349 let mut svc = MulticastService::new(endpoints, config);
350
351 let result = svc
352 .ready()
353 .await
354 .unwrap()
355 .call(make_exchange("a"))
356 .await
357 .unwrap();
358
359 assert_eq!(result.input.body.as_text(), Some("A+A+A"));
360 }
361
362 #[tokio::test]
365 async fn test_multicast_stop_on_exception() {
366 let endpoints = vec![
367 uppercase_processor(),
368 failing_processor(),
369 uppercase_processor(),
370 ];
371
372 let config = MulticastConfig::new().stop_on_exception(true);
373 let mut svc = MulticastService::new(endpoints, config);
374
375 let result = svc
376 .ready()
377 .await
378 .unwrap()
379 .call(make_exchange("hello"))
380 .await;
381
382 assert!(result.is_err(), "expected error due to stop_on_exception");
383 }
384
385 #[tokio::test]
388 async fn test_multicast_continue_on_exception() {
389 let endpoints = vec![
390 uppercase_processor(),
391 failing_processor(),
392 uppercase_processor(),
393 ];
394
395 let config = MulticastConfig::new()
396 .stop_on_exception(false)
397 .aggregation(MulticastStrategy::LastWins);
398 let mut svc = MulticastService::new(endpoints, config);
399
400 let result = svc
401 .ready()
402 .await
403 .unwrap()
404 .call(make_exchange("hello"))
405 .await;
406
407 assert!(result.is_ok(), "last endpoint should succeed");
409 assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
410 }
411
412 #[tokio::test]
415 async fn test_multicast_stop_on_exception_halts_early() {
416 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
417
418 let executed = Arc::new(AtomicUsize::new(0));
420
421 let exec_clone1 = Arc::clone(&executed);
422 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
423 let e = Arc::clone(&exec_clone1);
424 Box::pin(async move {
425 e.fetch_add(1, AtomicOrdering::SeqCst);
426 Ok(ex)
427 })
428 });
429
430 let exec_clone2 = Arc::clone(&executed);
431 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
432 let e = Arc::clone(&exec_clone2);
433 Box::pin(async move {
434 e.fetch_add(1, AtomicOrdering::SeqCst);
435 Err(CamelError::ProcessorError("fail on 1".into()))
436 })
437 });
438
439 let exec_clone3 = Arc::clone(&executed);
440 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
441 let e = Arc::clone(&exec_clone3);
442 Box::pin(async move {
443 e.fetch_add(1, AtomicOrdering::SeqCst);
444 Ok(ex)
445 })
446 });
447
448 let endpoints = vec![endpoint0, endpoint1, endpoint2];
449 let config = MulticastConfig::new().stop_on_exception(true);
450 let mut svc = MulticastService::new(endpoints, config);
451
452 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
453 assert!(result.is_err(), "should fail at endpoint 1");
454
455 let count = executed.load(AtomicOrdering::SeqCst);
457 assert_eq!(
458 count, 2,
459 "endpoint 2 should not have executed due to stop_on_exception"
460 );
461 }
462
463 #[tokio::test]
466 async fn test_multicast_continue_on_exception_executes_all() {
467 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
468
469 let executed = Arc::new(AtomicUsize::new(0));
471
472 let exec_clone1 = Arc::clone(&executed);
473 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
474 let e = Arc::clone(&exec_clone1);
475 Box::pin(async move {
476 e.fetch_add(1, AtomicOrdering::SeqCst);
477 Ok(ex)
478 })
479 });
480
481 let exec_clone2 = Arc::clone(&executed);
482 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
483 let e = Arc::clone(&exec_clone2);
484 Box::pin(async move {
485 e.fetch_add(1, AtomicOrdering::SeqCst);
486 Err(CamelError::ProcessorError("fail on 1".into()))
487 })
488 });
489
490 let exec_clone3 = Arc::clone(&executed);
491 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
492 let e = Arc::clone(&exec_clone3);
493 Box::pin(async move {
494 e.fetch_add(1, AtomicOrdering::SeqCst);
495 Ok(ex)
496 })
497 });
498
499 let endpoints = vec![endpoint0, endpoint1, endpoint2];
500 let config = MulticastConfig::new()
501 .stop_on_exception(false)
502 .aggregation(MulticastStrategy::LastWins);
503 let mut svc = MulticastService::new(endpoints, config);
504
505 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
506 assert!(result.is_ok(), "last endpoint should succeed");
507
508 let count = executed.load(AtomicOrdering::SeqCst);
510 assert_eq!(
511 count, 3,
512 "all endpoints should have executed despite error in endpoint 1"
513 );
514 }
515
516 #[tokio::test]
519 async fn test_multicast_empty_endpoints() {
520 let endpoints: Vec<BoxProcessor> = vec![];
521
522 let config = MulticastConfig::new();
523 let mut svc = MulticastService::new(endpoints, config);
524
525 let mut ex = make_exchange("hello");
526 ex.set_property("marker", Value::Bool(true));
527
528 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
529 assert_eq!(result.input.body.as_text(), Some("hello"));
530 assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
531 }
532
533 #[tokio::test]
536 async fn test_multicast_metadata_properties() {
537 let recorder = BoxProcessor::from_fn(|ex: Exchange| {
539 Box::pin(async move {
540 let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
541 let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
542 let body = serde_json::json!({
543 "index": idx,
544 "complete": complete,
545 });
546 let mut out = ex;
547 out.input.body = Body::Json(body);
548 Ok(out)
549 })
550 });
551
552 let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
553
554 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
555 let mut svc = MulticastService::new(endpoints, config);
556
557 let result = svc
558 .ready()
559 .await
560 .unwrap()
561 .call(make_exchange("x"))
562 .await
563 .unwrap();
564
565 let expected = serde_json::json!([
566 {"index": 0, "complete": false},
567 {"index": 1, "complete": false},
568 {"index": 2, "complete": true},
569 ]);
570 match &result.input.body {
571 Body::Json(v) => assert_eq!(*v, expected),
572 other => panic!("expected JSON body, got {other:?}"),
573 }
574 }
575
576 #[tokio::test]
579 async fn test_poll_ready_delegates_to_endpoints() {
580 use std::sync::atomic::AtomicBool;
581
582 #[derive(Clone)]
584 struct DelayedReady {
585 ready: Arc<AtomicBool>,
586 }
587
588 impl Service<Exchange> for DelayedReady {
589 type Response = Exchange;
590 type Error = CamelError;
591 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
592
593 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
594 if self.ready.load(Ordering::SeqCst) {
595 Poll::Ready(Ok(()))
596 } else {
597 cx.waker().wake_by_ref();
598 Poll::Pending
599 }
600 }
601
602 fn call(&mut self, exchange: Exchange) -> Self::Future {
603 Box::pin(async move { Ok(exchange) })
604 }
605 }
606
607 let ready_flag = Arc::new(AtomicBool::new(false));
608 let inner = DelayedReady {
609 ready: Arc::clone(&ready_flag),
610 };
611 let boxed: BoxProcessor = BoxProcessor::new(inner);
612
613 let config = MulticastConfig::new();
614 let mut svc = MulticastService::new(vec![boxed], config);
615
616 let waker = futures::task::noop_waker();
618 let mut cx = Context::from_waker(&waker);
619 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
620 assert!(
621 poll.is_pending(),
622 "expected Pending when endpoint not ready"
623 );
624
625 ready_flag.store(true, Ordering::SeqCst);
627
628 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
629 assert!(
630 matches!(poll, Poll::Ready(Ok(()))),
631 "expected Ready after endpoint becomes ready"
632 );
633 }
634
635 #[tokio::test]
638 async fn test_multicast_collect_all_error_propagates() {
639 let endpoints = vec![
640 uppercase_processor(),
641 failing_processor(),
642 uppercase_processor(),
643 ];
644
645 let config = MulticastConfig::new()
646 .stop_on_exception(false)
647 .aggregation(MulticastStrategy::CollectAll);
648 let mut svc = MulticastService::new(endpoints, config);
649
650 let result = svc
651 .ready()
652 .await
653 .unwrap()
654 .call(make_exchange("hello"))
655 .await;
656
657 assert!(result.is_err(), "CollectAll should propagate first error");
658 }
659
660 #[tokio::test]
663 async fn test_multicast_last_wins_error_last() {
664 let endpoints = vec![
665 uppercase_processor(),
666 uppercase_processor(),
667 failing_processor(),
668 ];
669
670 let config = MulticastConfig::new()
671 .stop_on_exception(false)
672 .aggregation(MulticastStrategy::LastWins);
673 let mut svc = MulticastService::new(endpoints, config);
674
675 let result = svc
676 .ready()
677 .await
678 .unwrap()
679 .call(make_exchange("hello"))
680 .await;
681
682 assert!(result.is_err(), "LastWins should return last error");
683 }
684
685 #[tokio::test]
688 async fn test_multicast_custom_error_propagates() {
689 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
690 Arc::new(|acc: Exchange, _next: Exchange| acc);
691
692 let endpoints = vec![
693 uppercase_processor(),
694 failing_processor(),
695 uppercase_processor(),
696 ];
697
698 let config = MulticastConfig::new()
699 .stop_on_exception(false)
700 .aggregation(MulticastStrategy::Custom(joiner));
701 let mut svc = MulticastService::new(endpoints, config);
702
703 let result = svc
704 .ready()
705 .await
706 .unwrap()
707 .call(make_exchange("hello"))
708 .await;
709
710 assert!(
711 result.is_err(),
712 "Custom aggregation should propagate errors"
713 );
714 }
715
716 #[tokio::test]
719 async fn test_multicast_parallel_basic() {
720 let endpoints = vec![uppercase_processor(), uppercase_processor()];
721
722 let config = MulticastConfig::new()
723 .parallel(true)
724 .aggregation(MulticastStrategy::CollectAll);
725 let mut svc = MulticastService::new(endpoints, config);
726
727 let result = svc
728 .ready()
729 .await
730 .unwrap()
731 .call(make_exchange("test"))
732 .await
733 .unwrap();
734
735 match &result.input.body {
738 Body::Json(v) => {
739 let arr = v.as_array().expect("expected array");
740 assert_eq!(arr.len(), 2);
741 assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
742 }
743 other => panic!("expected JSON body, got {:?}", other),
744 }
745 }
746
747 #[tokio::test]
750 async fn test_multicast_parallel_with_limit() {
751 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
752
753 let concurrent = Arc::new(AtomicUsize::new(0));
754 let max_concurrent = Arc::new(AtomicUsize::new(0));
755
756 let endpoints: Vec<BoxProcessor> = (0..4)
757 .map(|_| {
758 let c = Arc::clone(&concurrent);
759 let mc = Arc::clone(&max_concurrent);
760 BoxProcessor::from_fn(move |ex: Exchange| {
761 let c = Arc::clone(&c);
762 let mc = Arc::clone(&mc);
763 Box::pin(async move {
764 let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
765 mc.fetch_max(current, AtomicOrdering::SeqCst);
766 tokio::task::yield_now().await;
767 c.fetch_sub(1, AtomicOrdering::SeqCst);
768 Ok(ex)
769 })
770 })
771 })
772 .collect();
773
774 let config = MulticastConfig::new().parallel(true).parallel_limit(2);
775 let mut svc = MulticastService::new(endpoints, config);
776
777 let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
778
779 let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
780 assert!(
781 observed_max <= 2,
782 "max concurrency was {}, expected <= 2",
783 observed_max
784 );
785 }
786
787 async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
790 use bytes::Bytes;
791 use camel_api::{Body, StreamBody, StreamMetadata};
792 use futures::stream;
793 use std::sync::Arc;
794 use tokio::sync::Mutex;
795
796 let chunks = vec![Ok(Bytes::from("test"))];
797 let stream_body = StreamBody {
798 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
799 metadata: StreamMetadata {
800 origin,
801 ..Default::default()
802 },
803 };
804
805 let stream_body_clone = stream_body.clone();
806 let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
807 let body_clone = stream_body_clone.clone();
808 Box::pin(async move {
809 let mut out = ex;
810 out.input.body = Body::Stream(body_clone);
811 Ok(out)
812 })
813 })];
814
815 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
816 let mut svc = MulticastService::new(endpoints, config);
817
818 svc.ready()
819 .await
820 .unwrap()
821 .call(Exchange::new(Message::new("")))
822 .await
823 .unwrap()
824 }
825
826 #[tokio::test]
827 async fn test_multicast_stream_bodies_creates_valid_json() {
828 use camel_api::Body;
829
830 let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
831
832 let Body::Json(value) = &result.input.body else {
833 panic!("Expected Json body, got {:?}", result.input.body);
834 };
835
836 let json_str = serde_json::to_string(&value).unwrap();
837 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
838
839 assert!(parsed.is_array());
840 let arr = parsed.as_array().unwrap();
841 assert_eq!(arr.len(), 1);
842 assert!(arr[0]["_stream"].is_object());
843 assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
844 assert_eq!(arr[0]["_stream"]["placeholder"], true);
845 }
846
847 #[tokio::test]
850 async fn test_multicast_stream_with_none_origin_creates_valid_json() {
851 use camel_api::Body;
852
853 let result = setup_multicast_stream_test(None).await;
854
855 let Body::Json(value) = &result.input.body else {
856 panic!("Expected Json body, got {:?}", result.input.body);
857 };
858
859 let json_str = serde_json::to_string(&value).unwrap();
860 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
861
862 assert!(parsed.is_array());
863 let arr = parsed.as_array().unwrap();
864 assert_eq!(arr.len(), 1);
865 assert!(arr[0]["_stream"].is_object());
866 assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
867 assert_eq!(arr[0]["_stream"]["placeholder"], true);
868 }
869}