1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use tower::Service;
6
7use camel_api::{
8 Body, BoxProcessor, CamelError, Exchange, MulticastConfig, MulticastStrategy, Value,
9};
10
11pub const CAMEL_MULTICAST_INDEX: &str = "CamelMulticastIndex";
15pub const CAMEL_MULTICAST_COMPLETE: &str = "CamelMulticastComplete";
17
18#[derive(Clone)]
30pub struct MulticastService {
31 endpoints: Vec<BoxProcessor>,
32 config: MulticastConfig,
33}
34
35impl MulticastService {
36 pub fn new(endpoints: Vec<BoxProcessor>, config: MulticastConfig) -> Self {
38 Self { endpoints, config }
39 }
40}
41
42impl Service<Exchange> for MulticastService {
43 type Response = Exchange;
44 type Error = CamelError;
45 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
46
47 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
48 for endpoint in &mut self.endpoints {
50 match endpoint.poll_ready(cx) {
51 Poll::Pending => return Poll::Pending,
52 Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
53 Poll::Ready(Ok(())) => {}
54 }
55 }
56 Poll::Ready(Ok(()))
57 }
58
59 fn call(&mut self, exchange: Exchange) -> Self::Future {
60 let original = exchange.clone();
61 let endpoints = self.endpoints.clone();
62 let config = self.config.clone();
63
64 Box::pin(async move {
65 if endpoints.is_empty() {
67 return Ok(original);
68 }
69
70 let total = endpoints.len();
71
72 let results = if config.parallel {
73 process_parallel(exchange, endpoints, config.parallel_limit, total).await
75 } else {
76 process_sequential(exchange, endpoints, config.stop_on_exception, total).await
78 };
79
80 aggregate(results, original, config.aggregation)
82 })
83 }
84}
85
86async fn process_sequential(
89 exchange: Exchange,
90 endpoints: Vec<BoxProcessor>,
91 stop_on_exception: bool,
92 total: usize,
93) -> Vec<Result<Exchange, CamelError>> {
94 let mut results = Vec::with_capacity(endpoints.len());
95
96 for (i, endpoint) in endpoints.into_iter().enumerate() {
97 let mut cloned_exchange = exchange.clone();
99
100 cloned_exchange.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
102 cloned_exchange.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
103
104 let mut endpoint = endpoint;
105 match tower::ServiceExt::ready(&mut endpoint).await {
106 Err(e) => {
107 results.push(Err(e));
108 if stop_on_exception {
109 break;
110 }
111 }
112 Ok(svc) => {
113 let result = svc.call(cloned_exchange).await;
114 let is_err = result.is_err();
115 results.push(result);
116 if stop_on_exception && is_err {
117 break;
118 }
119 }
120 }
121 }
122
123 results
124}
125
126async fn process_parallel(
129 exchange: Exchange,
130 endpoints: Vec<BoxProcessor>,
131 parallel_limit: Option<usize>,
132 total: usize,
133) -> Vec<Result<Exchange, CamelError>> {
134 use std::sync::Arc;
135 use tokio::sync::Semaphore;
136
137 let semaphore = parallel_limit.map(|limit| Arc::new(Semaphore::new(limit)));
138
139 let futures: Vec<_> = endpoints
141 .into_iter()
142 .enumerate()
143 .map(|(i, mut endpoint)| {
144 let mut ex = exchange.clone();
145 ex.set_property(CAMEL_MULTICAST_INDEX, Value::from(i as i64));
146 ex.set_property(CAMEL_MULTICAST_COMPLETE, Value::Bool(i == total - 1));
147 let sem = semaphore.clone();
148 async move {
149 let _permit = match &sem {
151 Some(s) => match s.acquire().await {
152 Ok(p) => Some(p),
153 Err(_) => {
154 return Err(CamelError::ProcessorError("semaphore closed".to_string()));
155 }
156 },
157 None => None,
158 };
159
160 tower::ServiceExt::ready(&mut endpoint).await?;
162 endpoint.call(ex).await
163 }
164 })
165 .collect();
166
167 futures::future::join_all(futures).await
169}
170
171fn aggregate(
174 results: Vec<Result<Exchange, CamelError>>,
175 original: Exchange,
176 strategy: MulticastStrategy,
177) -> Result<Exchange, CamelError> {
178 match strategy {
179 MulticastStrategy::LastWins => {
180 results.into_iter().last().unwrap_or_else(|| Ok(original))
183 }
184 MulticastStrategy::CollectAll => {
185 let mut bodies = Vec::new();
187 for result in results {
188 let ex = result?;
189 let value = match &ex.input.body {
190 Body::Text(s) => Value::String(s.clone()),
191 Body::Json(v) => v.clone(),
192 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
193 Body::Empty => Value::Null,
194 Body::Stream(s) => serde_json::json!({
195 "_stream": {
196 "origin": s.metadata.origin,
197 "placeholder": true,
198 "hint": "Materialize exchange body with .into_bytes() before multicast aggregation"
199 }
200 }),
201 };
202 bodies.push(value);
203 }
204 let mut out = original;
205 out.input.body = Body::Json(Value::Array(bodies));
206 Ok(out)
207 }
208 MulticastStrategy::Original => Ok(original),
209 MulticastStrategy::Custom(fold_fn) => {
210 let mut iter = results.into_iter();
212 let first = iter.next().unwrap_or_else(|| Ok(original.clone()))?;
213 iter.try_fold(first, |acc, next_result| {
214 let next = next_result?;
215 Ok(fold_fn(acc, next))
216 })
217 }
218 }
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use camel_api::{BoxProcessorExt, Message};
225 use std::sync::Arc;
226 use std::sync::atomic::Ordering;
227 use tower::ServiceExt;
228
229 fn make_exchange(body: &str) -> Exchange {
232 Exchange::new(Message::new(body))
233 }
234
235 fn uppercase_processor() -> BoxProcessor {
236 BoxProcessor::from_fn(|mut ex: Exchange| {
237 Box::pin(async move {
238 if let Body::Text(s) = &ex.input.body {
239 ex.input.body = Body::Text(s.to_uppercase());
240 }
241 Ok(ex)
242 })
243 })
244 }
245
246 fn failing_processor() -> BoxProcessor {
247 BoxProcessor::from_fn(|_ex| {
248 Box::pin(async { Err(CamelError::ProcessorError("boom".into())) })
249 })
250 }
251
252 #[tokio::test]
255 async fn test_multicast_sequential_last_wins() {
256 let endpoints = vec![
257 uppercase_processor(),
258 uppercase_processor(),
259 uppercase_processor(),
260 ];
261
262 let config = MulticastConfig::new(); let mut svc = MulticastService::new(endpoints, config);
264
265 let result = svc
266 .ready()
267 .await
268 .unwrap()
269 .call(make_exchange("hello"))
270 .await
271 .unwrap();
272
273 assert_eq!(result.input.body.as_text(), Some("HELLO"));
274 }
275
276 #[tokio::test]
279 async fn test_multicast_sequential_collect_all() {
280 let endpoints = vec![
281 uppercase_processor(),
282 uppercase_processor(),
283 uppercase_processor(),
284 ];
285
286 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
287 let mut svc = MulticastService::new(endpoints, config);
288
289 let result = svc
290 .ready()
291 .await
292 .unwrap()
293 .call(make_exchange("hello"))
294 .await
295 .unwrap();
296
297 let expected = serde_json::json!(["HELLO", "HELLO", "HELLO"]);
298 match &result.input.body {
299 Body::Json(v) => assert_eq!(*v, expected),
300 other => panic!("expected JSON body, got {other:?}"),
301 }
302 }
303
304 #[tokio::test]
307 async fn test_multicast_sequential_original() {
308 let endpoints = vec![
309 uppercase_processor(),
310 uppercase_processor(),
311 uppercase_processor(),
312 ];
313
314 let config = MulticastConfig::new().aggregation(MulticastStrategy::Original);
315 let mut svc = MulticastService::new(endpoints, config);
316
317 let result = svc
318 .ready()
319 .await
320 .unwrap()
321 .call(make_exchange("hello"))
322 .await
323 .unwrap();
324
325 assert_eq!(result.input.body.as_text(), Some("hello"));
327 }
328
329 #[tokio::test]
332 async fn test_multicast_sequential_custom_aggregation() {
333 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
334 Arc::new(|mut acc: Exchange, next: Exchange| {
335 let acc_text = acc.input.body.as_text().unwrap_or("").to_string();
336 let next_text = next.input.body.as_text().unwrap_or("").to_string();
337 acc.input.body = Body::Text(format!("{acc_text}+{next_text}"));
338 acc
339 });
340
341 let endpoints = vec![
342 uppercase_processor(),
343 uppercase_processor(),
344 uppercase_processor(),
345 ];
346
347 let config = MulticastConfig::new().aggregation(MulticastStrategy::Custom(joiner));
348 let mut svc = MulticastService::new(endpoints, config);
349
350 let result = svc
351 .ready()
352 .await
353 .unwrap()
354 .call(make_exchange("a"))
355 .await
356 .unwrap();
357
358 assert_eq!(result.input.body.as_text(), Some("A+A+A"));
359 }
360
361 #[tokio::test]
364 async fn test_multicast_stop_on_exception() {
365 let endpoints = vec![
366 uppercase_processor(),
367 failing_processor(),
368 uppercase_processor(),
369 ];
370
371 let config = MulticastConfig::new().stop_on_exception(true);
372 let mut svc = MulticastService::new(endpoints, config);
373
374 let result = svc
375 .ready()
376 .await
377 .unwrap()
378 .call(make_exchange("hello"))
379 .await;
380
381 assert!(result.is_err(), "expected error due to stop_on_exception");
382 }
383
384 #[tokio::test]
387 async fn test_multicast_continue_on_exception() {
388 let endpoints = vec![
389 uppercase_processor(),
390 failing_processor(),
391 uppercase_processor(),
392 ];
393
394 let config = MulticastConfig::new()
395 .stop_on_exception(false)
396 .aggregation(MulticastStrategy::LastWins);
397 let mut svc = MulticastService::new(endpoints, config);
398
399 let result = svc
400 .ready()
401 .await
402 .unwrap()
403 .call(make_exchange("hello"))
404 .await;
405
406 assert!(result.is_ok(), "last endpoint should succeed");
408 assert_eq!(result.unwrap().input.body.as_text(), Some("HELLO"));
409 }
410
411 #[tokio::test]
414 async fn test_multicast_stop_on_exception_halts_early() {
415 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
416
417 let executed = Arc::new(AtomicUsize::new(0));
419
420 let exec_clone1 = Arc::clone(&executed);
421 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
422 let e = Arc::clone(&exec_clone1);
423 Box::pin(async move {
424 e.fetch_add(1, AtomicOrdering::SeqCst);
425 Ok(ex)
426 })
427 });
428
429 let exec_clone2 = Arc::clone(&executed);
430 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
431 let e = Arc::clone(&exec_clone2);
432 Box::pin(async move {
433 e.fetch_add(1, AtomicOrdering::SeqCst);
434 Err(CamelError::ProcessorError("fail on 1".into()))
435 })
436 });
437
438 let exec_clone3 = Arc::clone(&executed);
439 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
440 let e = Arc::clone(&exec_clone3);
441 Box::pin(async move {
442 e.fetch_add(1, AtomicOrdering::SeqCst);
443 Ok(ex)
444 })
445 });
446
447 let endpoints = vec![endpoint0, endpoint1, endpoint2];
448 let config = MulticastConfig::new().stop_on_exception(true);
449 let mut svc = MulticastService::new(endpoints, config);
450
451 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
452 assert!(result.is_err(), "should fail at endpoint 1");
453
454 let count = executed.load(AtomicOrdering::SeqCst);
456 assert_eq!(
457 count, 2,
458 "endpoint 2 should not have executed due to stop_on_exception"
459 );
460 }
461
462 #[tokio::test]
465 async fn test_multicast_continue_on_exception_executes_all() {
466 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
467
468 let executed = Arc::new(AtomicUsize::new(0));
470
471 let exec_clone1 = Arc::clone(&executed);
472 let endpoint0 = BoxProcessor::from_fn(move |ex: Exchange| {
473 let e = Arc::clone(&exec_clone1);
474 Box::pin(async move {
475 e.fetch_add(1, AtomicOrdering::SeqCst);
476 Ok(ex)
477 })
478 });
479
480 let exec_clone2 = Arc::clone(&executed);
481 let endpoint1 = BoxProcessor::from_fn(move |_ex: Exchange| {
482 let e = Arc::clone(&exec_clone2);
483 Box::pin(async move {
484 e.fetch_add(1, AtomicOrdering::SeqCst);
485 Err(CamelError::ProcessorError("fail on 1".into()))
486 })
487 });
488
489 let exec_clone3 = Arc::clone(&executed);
490 let endpoint2 = BoxProcessor::from_fn(move |ex: Exchange| {
491 let e = Arc::clone(&exec_clone3);
492 Box::pin(async move {
493 e.fetch_add(1, AtomicOrdering::SeqCst);
494 Ok(ex)
495 })
496 });
497
498 let endpoints = vec![endpoint0, endpoint1, endpoint2];
499 let config = MulticastConfig::new()
500 .stop_on_exception(false)
501 .aggregation(MulticastStrategy::LastWins);
502 let mut svc = MulticastService::new(endpoints, config);
503
504 let result = svc.ready().await.unwrap().call(make_exchange("x")).await;
505 assert!(result.is_ok(), "last endpoint should succeed");
506
507 let count = executed.load(AtomicOrdering::SeqCst);
509 assert_eq!(
510 count, 3,
511 "all endpoints should have executed despite error in endpoint 1"
512 );
513 }
514
515 #[tokio::test]
518 async fn test_multicast_empty_endpoints() {
519 let endpoints: Vec<BoxProcessor> = vec![];
520
521 let config = MulticastConfig::new();
522 let mut svc = MulticastService::new(endpoints, config);
523
524 let mut ex = make_exchange("hello");
525 ex.set_property("marker", Value::Bool(true));
526
527 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
528 assert_eq!(result.input.body.as_text(), Some("hello"));
529 assert_eq!(result.property("marker"), Some(&Value::Bool(true)));
530 }
531
532 #[tokio::test]
535 async fn test_multicast_metadata_properties() {
536 let recorder = BoxProcessor::from_fn(|ex: Exchange| {
538 Box::pin(async move {
539 let idx = ex.property(CAMEL_MULTICAST_INDEX).cloned();
540 let complete = ex.property(CAMEL_MULTICAST_COMPLETE).cloned();
541 let body = serde_json::json!({
542 "index": idx,
543 "complete": complete,
544 });
545 let mut out = ex;
546 out.input.body = Body::Json(body);
547 Ok(out)
548 })
549 });
550
551 let endpoints = vec![recorder.clone(), recorder.clone(), recorder];
552
553 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
554 let mut svc = MulticastService::new(endpoints, config);
555
556 let result = svc
557 .ready()
558 .await
559 .unwrap()
560 .call(make_exchange("x"))
561 .await
562 .unwrap();
563
564 let expected = serde_json::json!([
565 {"index": 0, "complete": false},
566 {"index": 1, "complete": false},
567 {"index": 2, "complete": true},
568 ]);
569 match &result.input.body {
570 Body::Json(v) => assert_eq!(*v, expected),
571 other => panic!("expected JSON body, got {other:?}"),
572 }
573 }
574
575 #[tokio::test]
578 async fn test_poll_ready_delegates_to_endpoints() {
579 use std::sync::atomic::AtomicBool;
580
581 #[derive(Clone)]
583 struct DelayedReady {
584 ready: Arc<AtomicBool>,
585 }
586
587 impl Service<Exchange> for DelayedReady {
588 type Response = Exchange;
589 type Error = CamelError;
590 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
591
592 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
593 if self.ready.load(Ordering::SeqCst) {
594 Poll::Ready(Ok(()))
595 } else {
596 cx.waker().wake_by_ref();
597 Poll::Pending
598 }
599 }
600
601 fn call(&mut self, exchange: Exchange) -> Self::Future {
602 Box::pin(async move { Ok(exchange) })
603 }
604 }
605
606 let ready_flag = Arc::new(AtomicBool::new(false));
607 let inner = DelayedReady {
608 ready: Arc::clone(&ready_flag),
609 };
610 let boxed: BoxProcessor = BoxProcessor::new(inner);
611
612 let config = MulticastConfig::new();
613 let mut svc = MulticastService::new(vec![boxed], config);
614
615 let waker = futures::task::noop_waker();
617 let mut cx = Context::from_waker(&waker);
618 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
619 assert!(
620 poll.is_pending(),
621 "expected Pending when endpoint not ready"
622 );
623
624 ready_flag.store(true, Ordering::SeqCst);
626
627 let poll = Pin::new(&mut svc).poll_ready(&mut cx);
628 assert!(
629 matches!(poll, Poll::Ready(Ok(()))),
630 "expected Ready after endpoint becomes ready"
631 );
632 }
633
634 #[tokio::test]
637 async fn test_multicast_collect_all_error_propagates() {
638 let endpoints = vec![
639 uppercase_processor(),
640 failing_processor(),
641 uppercase_processor(),
642 ];
643
644 let config = MulticastConfig::new()
645 .stop_on_exception(false)
646 .aggregation(MulticastStrategy::CollectAll);
647 let mut svc = MulticastService::new(endpoints, config);
648
649 let result = svc
650 .ready()
651 .await
652 .unwrap()
653 .call(make_exchange("hello"))
654 .await;
655
656 assert!(result.is_err(), "CollectAll should propagate first error");
657 }
658
659 #[tokio::test]
662 async fn test_multicast_last_wins_error_last() {
663 let endpoints = vec![
664 uppercase_processor(),
665 uppercase_processor(),
666 failing_processor(),
667 ];
668
669 let config = MulticastConfig::new()
670 .stop_on_exception(false)
671 .aggregation(MulticastStrategy::LastWins);
672 let mut svc = MulticastService::new(endpoints, config);
673
674 let result = svc
675 .ready()
676 .await
677 .unwrap()
678 .call(make_exchange("hello"))
679 .await;
680
681 assert!(result.is_err(), "LastWins should return last error");
682 }
683
684 #[tokio::test]
687 async fn test_multicast_custom_error_propagates() {
688 let joiner: Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync> =
689 Arc::new(|acc: Exchange, _next: Exchange| acc);
690
691 let endpoints = vec![
692 uppercase_processor(),
693 failing_processor(),
694 uppercase_processor(),
695 ];
696
697 let config = MulticastConfig::new()
698 .stop_on_exception(false)
699 .aggregation(MulticastStrategy::Custom(joiner));
700 let mut svc = MulticastService::new(endpoints, config);
701
702 let result = svc
703 .ready()
704 .await
705 .unwrap()
706 .call(make_exchange("hello"))
707 .await;
708
709 assert!(
710 result.is_err(),
711 "Custom aggregation should propagate errors"
712 );
713 }
714
715 #[tokio::test]
718 async fn test_multicast_parallel_basic() {
719 let endpoints = vec![uppercase_processor(), uppercase_processor()];
720
721 let config = MulticastConfig::new()
722 .parallel(true)
723 .aggregation(MulticastStrategy::CollectAll);
724 let mut svc = MulticastService::new(endpoints, config);
725
726 let result = svc
727 .ready()
728 .await
729 .unwrap()
730 .call(make_exchange("test"))
731 .await
732 .unwrap();
733
734 match &result.input.body {
737 Body::Json(v) => {
738 let arr = v.as_array().expect("expected array");
739 assert_eq!(arr.len(), 2);
740 assert!(arr.iter().all(|v| v.as_str() == Some("TEST")));
741 }
742 other => panic!("expected JSON body, got {:?}", other),
743 }
744 }
745
746 #[tokio::test]
749 async fn test_multicast_parallel_with_limit() {
750 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
751
752 let concurrent = Arc::new(AtomicUsize::new(0));
753 let max_concurrent = Arc::new(AtomicUsize::new(0));
754
755 let endpoints: Vec<BoxProcessor> = (0..4)
756 .map(|_| {
757 let c = Arc::clone(&concurrent);
758 let mc = Arc::clone(&max_concurrent);
759 BoxProcessor::from_fn(move |ex: Exchange| {
760 let c = Arc::clone(&c);
761 let mc = Arc::clone(&mc);
762 Box::pin(async move {
763 let current = c.fetch_add(1, AtomicOrdering::SeqCst) + 1;
764 mc.fetch_max(current, AtomicOrdering::SeqCst);
765 tokio::task::yield_now().await;
766 c.fetch_sub(1, AtomicOrdering::SeqCst);
767 Ok(ex)
768 })
769 })
770 })
771 .collect();
772
773 let config = MulticastConfig::new().parallel(true).parallel_limit(2);
774 let mut svc = MulticastService::new(endpoints, config);
775
776 let _ = svc.ready().await.unwrap().call(make_exchange("x")).await;
777
778 let observed_max = max_concurrent.load(std::sync::atomic::Ordering::SeqCst);
779 assert!(
780 observed_max <= 2,
781 "max concurrency was {}, expected <= 2",
782 observed_max
783 );
784 }
785
786 async fn setup_multicast_stream_test(origin: Option<String>) -> Exchange {
789 use bytes::Bytes;
790 use camel_api::{Body, StreamBody, StreamMetadata};
791 use futures::stream;
792 use std::sync::Arc;
793 use tokio::sync::Mutex;
794
795 let chunks = vec![Ok(Bytes::from("test"))];
796 let stream_body = StreamBody {
797 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
798 metadata: StreamMetadata {
799 origin,
800 ..Default::default()
801 },
802 };
803
804 let stream_body_clone = stream_body.clone();
805 let endpoints = vec![BoxProcessor::from_fn(move |ex: Exchange| {
806 let body_clone = stream_body_clone.clone();
807 Box::pin(async move {
808 let mut out = ex;
809 out.input.body = Body::Stream(body_clone);
810 Ok(out)
811 })
812 })];
813
814 let config = MulticastConfig::new().aggregation(MulticastStrategy::CollectAll);
815 let mut svc = MulticastService::new(endpoints, config);
816
817 svc.ready()
818 .await
819 .unwrap()
820 .call(Exchange::new(Message::new("")))
821 .await
822 .unwrap()
823 }
824
825 #[tokio::test]
826 async fn test_multicast_stream_bodies_creates_valid_json() {
827 use camel_api::Body;
828
829 let result = setup_multicast_stream_test(Some("http://example.com/data".to_string())).await;
830
831 let Body::Json(value) = &result.input.body else {
832 panic!("Expected Json body, got {:?}", result.input.body);
833 };
834
835 let json_str = serde_json::to_string(&value).unwrap();
836 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
837
838 assert!(parsed.is_array());
839 let arr = parsed.as_array().unwrap();
840 assert_eq!(arr.len(), 1);
841 assert!(arr[0]["_stream"].is_object());
842 assert_eq!(arr[0]["_stream"]["origin"], "http://example.com/data");
843 assert_eq!(arr[0]["_stream"]["placeholder"], true);
844 }
845
846 #[tokio::test]
849 async fn test_multicast_stream_with_none_origin_creates_valid_json() {
850 use camel_api::Body;
851
852 let result = setup_multicast_stream_test(None).await;
853
854 let Body::Json(value) = &result.input.body else {
855 panic!("Expected Json body, got {:?}", result.input.body);
856 };
857
858 let json_str = serde_json::to_string(&value).unwrap();
859 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
860
861 assert!(parsed.is_array());
862 let arr = parsed.as_array().unwrap();
863 assert_eq!(arr.len(), 1);
864 assert!(arr[0]["_stream"].is_object());
865 assert_eq!(arr[0]["_stream"]["origin"], serde_json::Value::Null);
866 assert_eq!(arr[0]["_stream"]["placeholder"], true);
867 }
868}