1use bevy_ecs::prelude::{Entity, World};
19
20use std::sync::OnceLock;
21
22use crate::{ManageInput, OperationError, OperationResult, OperationRoster, OrBroken};
23
24mod anonymous_stream;
25pub use anonymous_stream::*;
26
27mod dynamically_named_stream;
28pub use dynamically_named_stream::*;
29
30mod named_stream;
31pub use named_stream::*;
32
33mod stream_availability;
34pub use stream_availability::*;
35
36mod stream_buffer;
37pub use stream_buffer::*;
38
39mod stream_channel;
40pub use stream_channel::*;
41
42mod stream_filter;
43pub use stream_filter::*;
44
45mod stream_pack;
46pub use stream_pack::*;
47
48mod stream_target_map;
49pub use stream_target_map::*;
50
51pub use crossflow_derive::{Stream, StreamPack};
54pub trait StreamEffect: 'static + Send + Sync + Sized {
67 type Input: 'static + Send + Sync;
68 type Output: 'static + Send + Sync;
69
70 fn side_effect(
73 input: Self::Input,
74 request: &mut StreamRequest,
75 ) -> Result<Self::Output, OperationError>;
76}
77
78#[derive(Stream)]
82pub struct StreamOf<T: 'static + Send + Sync>(std::marker::PhantomData<fn(T)>);
83
84impl<T: 'static + Send + Sync> Clone for StreamOf<T> {
85 fn clone(&self) -> Self {
86 *self
87 }
88}
89
90impl<T: 'static + Send + Sync> Copy for StreamOf<T> {}
91
92impl<T: 'static + Send + Sync> std::fmt::Debug for StreamOf<T> {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 static NAME: OnceLock<String> = OnceLock::new();
95 let name = NAME.get_or_init(|| format!("StreamOf<{}>", std::any::type_name::<T>(),));
96
97 f.debug_struct(name.as_str()).finish()
98 }
99}
100
101impl<T: 'static + Send + Sync> StreamEffect for StreamOf<T> {
102 type Input = T;
103 type Output = T;
104
105 fn side_effect(
107 input: Self::Input,
108 _: &mut StreamRequest,
109 ) -> Result<Self::Output, OperationError> {
110 Ok(input)
111 }
112}
113
114pub struct StreamRequest<'a> {
119 pub source: Entity,
121 pub session: Entity,
123 pub target: Option<Entity>,
125 pub world: &'a mut World,
127 pub roster: &'a mut OperationRoster,
129}
130
131impl<'a> StreamRequest<'a> {
132 pub fn send_output<T: 'static + Send + Sync>(self, output: T) -> OperationResult {
133 let Self {
134 session,
135 target,
136 world,
137 roster,
138 ..
139 } = self;
140 if let Some(target) = target {
141 world
142 .get_entity_mut(target)
143 .or_broken()?
144 .give_input(session, output, roster)?;
145 }
146
147 Ok(())
148 }
149}
150
151#[cfg(test)]
152pub(crate) mod tests {
153 use crate::{dyn_node::*, prelude::*, testing::*};
154 use std::borrow::Cow;
155
156 #[test]
157 fn test_single_stream() {
158 let mut context = TestingContext::minimal_plugins();
159
160 let count_blocking_srv = context.command(|commands| {
161 commands.spawn_service(|In(input): BlockingServiceInput<u32, StreamOf<u32>>| {
162 for i in 0..input.request {
163 input.streams.send(i);
164 }
165 return input.request;
166 })
167 });
168
169 test_counting_stream(count_blocking_srv, &mut context);
170
171 let count_async_srv = context.command(|commands| {
172 commands.spawn_service(
173 |In(input): AsyncServiceInput<u32, StreamOf<u32>>| async move {
174 for i in 0..input.request {
175 input.streams.send(i);
176 }
177 return input.request;
178 },
179 )
180 });
181
182 test_counting_stream(count_async_srv, &mut context);
183
184 let count_blocking_callback = (|In(input): BlockingCallbackInput<u32, StreamOf<u32>>| {
185 for i in 0..input.request {
186 input.streams.send(i);
187 }
188 return input.request;
189 })
190 .as_callback();
191
192 test_counting_stream(count_blocking_callback, &mut context);
193
194 let count_async_callback =
195 (|In(input): AsyncCallbackInput<u32, StreamOf<u32>>| async move {
196 for i in 0..input.request {
197 input.streams.send(i);
198 }
199 return input.request;
200 })
201 .as_callback();
202
203 test_counting_stream(count_async_callback, &mut context);
204
205 let count_blocking_map = (|input: BlockingMap<u32, StreamOf<u32>>| {
206 for i in 0..input.request {
207 input.streams.send(i);
208 }
209 return input.request;
210 })
211 .as_map();
212
213 test_counting_stream(count_blocking_map, &mut context);
214
215 let count_async_map = (|input: AsyncMap<u32, StreamOf<u32>>| async move {
216 for i in 0..input.request {
217 input.streams.send(i);
218 }
219 return input.request;
220 })
221 .as_map();
222
223 test_counting_stream(count_async_map, &mut context);
224 }
225
226 fn test_counting_stream(
227 provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
228 context: &mut TestingContext,
229 ) {
230 let mut capture = context.command(|commands| commands.request(10, provider).capture());
231
232 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
233 let r = capture.outcome.try_recv().unwrap().unwrap();
234 assert_eq!(r, 10);
235
236 let mut stream: Vec<u32> = Vec::new();
237 while let Ok(r) = capture.streams.try_recv() {
238 stream.push(r);
239 }
240 assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
241 context.assert_no_errors();
242 }
243
244 type FormatStreams = (StreamOf<u32>, StreamOf<i32>, StreamOf<f32>);
245 #[test]
246 fn test_tuple_stream() {
247 let mut context = TestingContext::minimal_plugins();
248
249 let parse_blocking_srv = context.command(|commands| {
250 commands.spawn_service(|In(input): BlockingServiceInput<String, FormatStreams>| {
251 impl_formatting_streams_blocking(input.request, input.streams);
252 })
253 });
254
255 validate_formatting_stream(parse_blocking_srv, &mut context);
256
257 let parse_async_srv = context.command(|commands| {
258 commands.spawn_service(
259 |In(input): AsyncServiceInput<String, FormatStreams>| async move {
260 impl_formatting_streams_async(input.request, input.streams);
261 },
262 )
263 });
264
265 validate_formatting_stream(parse_async_srv, &mut context);
266
267 let parse_continuous_srv = context
268 .app
269 .spawn_continuous_service(Update, impl_formatting_streams_continuous);
270
271 validate_formatting_stream(parse_continuous_srv, &mut context);
272
273 let parse_blocking_callback =
274 (|In(input): BlockingCallbackInput<String, FormatStreams>| {
275 impl_formatting_streams_blocking(input.request, input.streams);
276 })
277 .as_callback();
278
279 validate_formatting_stream(parse_blocking_callback, &mut context);
280
281 let parse_async_callback =
282 (|In(input): AsyncCallbackInput<String, FormatStreams>| async move {
283 impl_formatting_streams_async(input.request, input.streams);
284 })
285 .as_callback();
286
287 validate_formatting_stream(parse_async_callback, &mut context);
288
289 let parse_blocking_map = (|input: BlockingMap<String, FormatStreams>| {
290 impl_formatting_streams_blocking(input.request, input.streams);
291 })
292 .as_map();
293
294 validate_formatting_stream(parse_blocking_map, &mut context);
295
296 let parse_async_map = (|input: AsyncMap<String, FormatStreams>| async move {
297 impl_formatting_streams_async(input.request, input.streams);
298 })
299 .as_map();
300
301 validate_formatting_stream(parse_async_map, &mut context);
302
303 let make_workflow = |service: Service<String, (), FormatStreams>| {
304 move |scope: Scope<String, (), FormatStreams>, builder: &mut Builder| {
305 let node = builder
306 .chain(scope.start)
307 .map_block(move |value| (value, service.into()))
308 .then_injection_node();
309
310 builder.connect(node.streams.0, scope.streams.0);
311 builder.connect(node.streams.1, scope.streams.1);
312 builder.connect(node.streams.2, scope.streams.2);
313
314 builder.connect(node.output, scope.terminate);
315 }
316 };
317
318 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
319 validate_formatting_stream(blocking_injection_workflow, &mut context);
320
321 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
322 validate_formatting_stream(async_injection_workflow, &mut context);
323
324 let continuous_injection_workflow =
325 context.spawn_workflow(make_workflow(parse_continuous_srv));
326 validate_formatting_stream(continuous_injection_workflow, &mut context);
327
328 let nested_workflow = context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
329 let inner_node = builder
330 .chain(scope.start)
331 .then_node(continuous_injection_workflow);
332 builder.connect(inner_node.streams.0, scope.streams.0);
333 builder.connect(inner_node.streams.1, scope.streams.1);
334 builder.connect(inner_node.streams.2, scope.streams.2);
335 builder.connect(inner_node.output, scope.terminate);
336 });
337 validate_formatting_stream(nested_workflow, &mut context);
338
339 let double_nested_workflow =
340 context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
341 let inner_node = builder.create_node(nested_workflow);
342 builder.connect(scope.start, inner_node.input);
343 builder.connect(inner_node.streams.0, scope.streams.0);
344 builder.connect(inner_node.streams.1, scope.streams.1);
345 builder.connect(inner_node.streams.2, scope.streams.2);
346 builder.connect(inner_node.output, scope.terminate);
347 });
348 validate_formatting_stream(double_nested_workflow, &mut context);
349 }
350
351 fn impl_formatting_streams_blocking(
352 request: String,
353 streams: <FormatStreams as StreamPack>::StreamBuffers,
354 ) {
355 if let Ok(value) = request.parse::<u32>() {
356 streams.0.send(value);
357 }
358
359 if let Ok(value) = request.parse::<i32>() {
360 streams.1.send(value);
361 }
362
363 if let Ok(value) = request.parse::<f32>() {
364 streams.2.send(value);
365 }
366 }
367
368 fn impl_formatting_streams_async(
369 request: String,
370 streams: <FormatStreams as StreamPack>::StreamChannels,
371 ) {
372 if let Ok(value) = request.parse::<u32>() {
373 streams.0.send(value);
374 }
375
376 if let Ok(value) = request.parse::<i32>() {
377 streams.1.send(value);
378 }
379
380 if let Ok(value) = request.parse::<f32>() {
381 streams.2.send(value);
382 }
383 }
384
385 fn impl_formatting_streams_continuous(
386 In(ContinuousService { key }): In<ContinuousService<String, (), FormatStreams>>,
387 mut param: ContinuousQuery<String, (), FormatStreams>,
388 ) {
389 param.get_mut(&key).unwrap().for_each(|order| {
390 if let Ok(value) = order.request().parse::<u32>() {
391 order.streams().0.send(value);
392 }
393
394 if let Ok(value) = order.request().parse::<i32>() {
395 order.streams().1.send(value);
396 }
397
398 if let Ok(value) = order.request().parse::<f32>() {
399 order.streams().2.send(value);
400 }
401
402 order.respond(());
403 });
404 }
405
406 fn validate_formatting_stream(
407 provider: impl Provider<Request = String, Response = (), Streams = FormatStreams> + Clone,
408 context: &mut TestingContext,
409 ) {
410 let mut capture = context
411 .command(|commands| commands.request("5".to_owned(), provider.clone()).capture());
412
413 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
414 assert!(capture.outcome.is_available());
415 context.assert_no_errors();
416
417 let outcome: FormatOutcome = capture.into();
418 assert_eq!(outcome.stream_u32, [5]);
419 assert_eq!(outcome.stream_i32, [5]);
420 assert_eq!(outcome.stream_f32, [5.0]);
421
422 let mut capture = context.command(|commands| {
423 commands
424 .request("-2".to_owned(), provider.clone())
425 .capture()
426 });
427
428 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
429 assert!(capture.outcome.is_available());
430 assert!(context.no_unhandled_errors());
431
432 let outcome: FormatOutcome = capture.into();
433 assert!(outcome.stream_u32.is_empty());
434 assert_eq!(outcome.stream_i32, [-2]);
435 assert_eq!(outcome.stream_f32, [-2.0]);
436
437 let mut capture = context.command(|commands| {
438 commands
439 .request("6.7".to_owned(), provider.clone())
440 .capture()
441 });
442
443 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
444 assert!(capture.outcome.is_available());
445 context.assert_no_errors();
446
447 let outcome: FormatOutcome = capture.into();
448 assert!(outcome.stream_u32.is_empty());
449 assert!(outcome.stream_i32.is_empty());
450 assert_eq!(outcome.stream_f32, [6.7]);
451
452 let mut capture = context.command(|commands| {
453 commands
454 .request("hello".to_owned(), provider.clone())
455 .capture()
456 });
457
458 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
459 assert!(capture.outcome.is_available());
460 context.assert_no_errors();
461
462 let outcome: FormatOutcome = capture.into();
463 assert!(outcome.stream_u32.is_empty());
464 assert!(outcome.stream_i32.is_empty());
465 assert!(outcome.stream_f32.is_empty());
466 }
467
468 #[derive(Default)]
469 struct FormatOutcome {
470 stream_u32: Vec<u32>,
471 stream_i32: Vec<i32>,
472 stream_f32: Vec<f32>,
473 }
474
475 impl From<Capture<(), FormatStreams>> for FormatOutcome {
476 fn from(mut capture: Capture<(), FormatStreams>) -> Self {
477 let mut result = Self::default();
478 while let Ok(r) = capture.streams.0.try_recv() {
479 result.stream_u32.push(r);
480 }
481
482 while let Ok(r) = capture.streams.1.try_recv() {
483 result.stream_i32.push(r);
484 }
485
486 while let Ok(r) = capture.streams.2.try_recv() {
487 result.stream_f32.push(r);
488 }
489
490 result
491 }
492 }
493
494 #[test]
495 fn test_stream_pack() {
496 let mut context = TestingContext::minimal_plugins();
497
498 let parse_blocking_srv = context.command(|commands| {
499 commands.spawn_service(
500 |In(input): BlockingServiceInput<Vec<String>, TestStreamPack>| {
501 impl_stream_pack_test_blocking(input.request, input.streams);
502 },
503 )
504 });
505
506 validate_stream_pack(parse_blocking_srv, &mut context);
507
508 let parse_async_srv = context.command(|commands| {
509 commands.spawn_service(
510 |In(input): AsyncServiceInput<Vec<String>, TestStreamPack>| async move {
511 impl_stream_pack_test_async(input.request, input.streams);
512 },
513 )
514 });
515
516 validate_stream_pack(parse_async_srv, &mut context);
517
518 let parse_continuous_srv = context
519 .app
520 .spawn_continuous_service(Update, impl_stream_pack_test_continuous);
521
522 validate_stream_pack(parse_continuous_srv, &mut context);
523
524 let parse_blocking_callback =
525 (|In(input): BlockingCallbackInput<Vec<String>, TestStreamPack>| {
526 impl_stream_pack_test_blocking(input.request, input.streams);
527 })
528 .as_callback();
529
530 validate_stream_pack(parse_blocking_callback, &mut context);
531
532 let parse_async_callback =
533 (|In(input): AsyncCallbackInput<Vec<String>, TestStreamPack>| async move {
534 impl_stream_pack_test_async(input.request, input.streams);
535 })
536 .as_callback();
537
538 validate_stream_pack(parse_async_callback, &mut context);
539
540 let parse_blocking_map = (|input: BlockingMap<Vec<String>, TestStreamPack>| {
541 impl_stream_pack_test_blocking(input.request, input.streams);
542 })
543 .as_map();
544
545 validate_stream_pack(parse_blocking_map, &mut context);
546
547 let parse_async_map = (|input: AsyncMap<Vec<String>, TestStreamPack>| async move {
548 impl_stream_pack_test_async(input.request, input.streams);
549 })
550 .as_map();
551
552 validate_stream_pack(parse_async_map, &mut context);
553
554 let make_workflow = |service: Service<Vec<String>, (), TestStreamPack>| {
555 move |scope: Scope<Vec<String>, (), TestStreamPack>, builder: &mut Builder| {
556 let node = builder
557 .chain(scope.start)
558 .map_block(move |value| (value, service.into()))
559 .then_injection_node();
560
561 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
562 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
563 builder.connect(node.streams.stream_string, scope.streams.stream_string);
564
565 builder.connect(node.output, scope.terminate);
566 }
567 };
568
569 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
570 validate_stream_pack(blocking_injection_workflow, &mut context);
571
572 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
573 validate_stream_pack(async_injection_workflow, &mut context);
574
575 let continuous_injection_workflow =
576 context.spawn_workflow(make_workflow(parse_continuous_srv));
577 validate_stream_pack(continuous_injection_workflow, &mut context);
578
579 let nested_workflow =
580 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
581 let node = builder.chain(scope.start).then_node(parse_continuous_srv);
582
583 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
584 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
585 builder.connect(node.streams.stream_string, scope.streams.stream_string);
586
587 builder.connect(node.output, scope.terminate);
588 });
589 validate_stream_pack(nested_workflow, &mut context);
590
591 let double_nested_workflow =
592 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
593 let node = builder.chain(scope.start).then_node(nested_workflow);
594
595 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
596 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
597 builder.connect(node.streams.stream_string, scope.streams.stream_string);
598
599 builder.connect(node.output, scope.terminate);
600 });
601 validate_stream_pack(double_nested_workflow, &mut context);
602
603 let scoped_workflow =
604 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
605 let inner_scope =
606 builder.create_scope::<_, _, TestStreamPack, _>(|scope, builder| {
607 let node = builder.chain(scope.start).then_node(parse_continuous_srv);
608
609 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
610 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
611 builder.connect(node.streams.stream_string, scope.streams.stream_string);
612
613 builder.connect(node.output, scope.terminate);
614 });
615
616 builder.connect(scope.start, inner_scope.input);
617
618 builder.connect(inner_scope.streams.stream_u32, scope.streams.stream_u32);
619 builder.connect(inner_scope.streams.stream_i32, scope.streams.stream_i32);
620 builder.connect(
621 inner_scope.streams.stream_string,
622 scope.streams.stream_string,
623 );
624
625 builder.connect(inner_scope.output, scope.terminate);
626 });
627 validate_stream_pack(scoped_workflow, &mut context);
628
629 let dyn_stream_workflow =
630 context.spawn_workflow::<Vec<String>, (), TestStreamPack, _>(|scope, builder| {
631 let dyn_scope_input: DynOutput = scope.start.into();
632
633 let node = builder.create_node(parse_continuous_srv);
634 let mut dyn_node: DynNode = node.into();
635
636 dyn_scope_input
637 .connect_to(&dyn_node.input, builder)
638 .unwrap();
639
640 let dyn_scope_stream_u32: DynInputSlot = scope.streams.stream_u32.into();
641 let dyn_node_stream_u32 = dyn_node.streams.take_named("stream_u32").unwrap();
642 dyn_node_stream_u32
643 .connect_to(&dyn_scope_stream_u32, builder)
644 .unwrap();
645
646 let dyn_scope_stream_i32: DynInputSlot = scope.streams.stream_i32.into();
647 let dyn_node_stream_i32 = dyn_node.streams.take_named("stream_i32").unwrap();
648 dyn_node_stream_i32
649 .connect_to(&dyn_scope_stream_i32, builder)
650 .unwrap();
651
652 let dyn_scope_stream_string: DynInputSlot = scope.streams.stream_string.into();
653 let dyn_node_stream_string = dyn_node.streams.take_named("stream_string").unwrap();
654 dyn_node_stream_string
655 .connect_to(&dyn_scope_stream_string, builder)
656 .unwrap();
657
658 let terminate: DynInputSlot = scope.terminate.into();
659 dyn_node.output.connect_to(&terminate, builder).unwrap();
660 });
661 validate_stream_pack(dyn_stream_workflow, &mut context);
662
663 validate_dynamically_named_stream_receiver(parse_blocking_srv, &mut context);
666 validate_dynamically_named_stream_receiver(parse_async_srv, &mut context);
667 validate_dynamically_named_stream_receiver(parse_continuous_srv, &mut context);
668 validate_dynamically_named_stream_receiver(blocking_injection_workflow, &mut context);
669 validate_dynamically_named_stream_receiver(async_injection_workflow, &mut context);
670 validate_dynamically_named_stream_receiver(continuous_injection_workflow, &mut context);
671 validate_dynamically_named_stream_receiver(nested_workflow, &mut context);
672 validate_dynamically_named_stream_receiver(double_nested_workflow, &mut context);
673 }
674
675 fn validate_stream_pack(
676 provider: impl Provider<Request = Vec<String>, Response = (), Streams = TestStreamPack> + Clone,
677 context: &mut TestingContext,
678 ) {
679 let request = vec![
680 "5".to_owned(),
681 "10".to_owned(),
682 "-3".to_owned(),
683 "-27".to_owned(),
684 "hello".to_owned(),
685 ];
686
687 let mut capture =
688 context.command(|commands| commands.request(request, provider.clone()).capture());
689
690 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
691 assert!(capture.outcome.is_available());
692 context.assert_no_errors();
693
694 let outcome: StreamMapOutcome = capture.into();
695 assert_eq!(outcome.stream_u32, [5, 10]);
696 assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
697 assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
698
699 let request = vec![];
700
701 let mut capture =
702 context.command(|commands| commands.request(request, provider.clone()).capture());
703
704 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
705 assert!(capture.outcome.is_available());
706 context.assert_no_errors();
707
708 let outcome: StreamMapOutcome = capture.into();
709 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
710 assert_eq!(outcome.stream_i32, Vec::<i32>::new());
711 assert_eq!(outcome.stream_string, Vec::<String>::new());
712
713 let request = vec![
714 "foo".to_string(),
715 "bar".to_string(),
716 "1.32".to_string(),
717 "-8".to_string(),
718 ];
719
720 let mut capture =
721 context.command(|commands| commands.request(request, provider.clone()).capture());
722
723 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
724 assert!(capture.outcome.is_available());
725
726 let outcome: StreamMapOutcome = capture.into();
727 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
728 assert_eq!(outcome.stream_i32, [-8]);
729 assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
730 }
731
732 fn validate_dynamically_named_stream_receiver(
733 provider: Service<Vec<String>, (), TestStreamPack>,
734 context: &mut TestingContext,
735 ) {
736 let provider: Service<Vec<String>, (), TestDynamicNamedStreams> =
737 provider.optional_stream_cast();
738
739 let request = vec![
740 "5".to_owned(),
741 "10".to_owned(),
742 "-3".to_owned(),
743 "-27".to_owned(),
744 "hello".to_owned(),
745 ];
746
747 let mut capture =
748 context.command(|commands| commands.request(request, provider.clone()).capture());
749
750 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
751 assert!(capture.outcome.is_available());
752 assert!(
753 context.no_unhandled_errors(),
754 "{:#?}",
755 context.get_unhandled_errors()
756 );
757
758 let outcome: StreamMapOutcome = capture.try_into().unwrap();
759 assert_eq!(outcome.stream_u32, [5, 10]);
760 assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
761 assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
762
763 let request = vec![];
764
765 let mut capture =
766 context.command(|commands| commands.request(request, provider.clone()).capture());
767
768 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
769 assert!(capture.outcome.is_available());
770 assert!(
771 context.no_unhandled_errors(),
772 "{:#?}",
773 context.get_unhandled_errors()
774 );
775
776 let outcome: StreamMapOutcome = capture.try_into().unwrap();
777 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
778 assert_eq!(outcome.stream_i32, Vec::<i32>::new());
779 assert_eq!(outcome.stream_string, Vec::<String>::new());
780
781 let request = vec![
782 "foo".to_string(),
783 "bar".to_string(),
784 "1.32".to_string(),
785 "-8".to_string(),
786 ];
787
788 let mut capture =
789 context.command(|commands| commands.request(request, provider.clone()).capture());
790
791 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
792 assert!(capture.outcome.is_available());
793
794 let outcome: StreamMapOutcome = capture.try_into().unwrap();
795 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
796 assert_eq!(outcome.stream_i32, [-8]);
797 assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
798 }
799
800 fn impl_stream_pack_test_blocking(
801 request: Vec<String>,
802 streams: <TestStreamPack as StreamPack>::StreamBuffers,
803 ) {
804 for r in request {
805 if let Ok(value) = r.parse::<u32>() {
806 streams.stream_u32.send(value);
807 }
808
809 if let Ok(value) = r.parse::<i32>() {
810 streams.stream_i32.send(value);
811 }
812
813 streams.stream_string.send(r);
814 }
815 }
816
817 fn impl_stream_pack_test_async(
818 request: Vec<String>,
819 streams: <TestStreamPack as StreamPack>::StreamChannels,
820 ) {
821 for r in request {
822 if let Ok(value) = r.parse::<u32>() {
823 streams.stream_u32.send(value);
824 }
825
826 if let Ok(value) = r.parse::<i32>() {
827 streams.stream_i32.send(value);
828 }
829
830 streams.stream_string.send(r);
831 }
832 }
833
834 fn impl_stream_pack_test_continuous(
835 In(ContinuousService { key }): In<ContinuousService<Vec<String>, (), TestStreamPack>>,
836 mut param: ContinuousQuery<Vec<String>, (), TestStreamPack>,
837 ) {
838 param.get_mut(&key).unwrap().for_each(|order| {
839 for r in order.request().clone() {
840 if let Ok(value) = r.parse::<u32>() {
841 order.streams().stream_u32.send(value);
842 }
843
844 if let Ok(value) = r.parse::<i32>() {
845 order.streams().stream_i32.send(value);
846 }
847
848 order.streams().stream_string.send(r);
849 }
850
851 order.respond(());
852 });
853 }
854
855 #[derive(Default)]
856 struct StreamMapOutcome {
857 stream_u32: Vec<u32>,
858 stream_i32: Vec<i32>,
859 stream_string: Vec<String>,
860 }
861
862 impl From<Capture<(), TestStreamPack>> for StreamMapOutcome {
863 fn from(mut capture: Capture<(), TestStreamPack>) -> Self {
864 let mut result = Self::default();
865 while let Ok(r) = capture.streams.stream_u32.try_recv() {
866 result.stream_u32.push(r);
867 }
868
869 while let Ok(r) = capture.streams.stream_i32.try_recv() {
870 result.stream_i32.push(r);
871 }
872
873 while let Ok(r) = capture.streams.stream_string.try_recv() {
874 result.stream_string.push(r);
875 }
876
877 result
878 }
879 }
880
881 type TestDynamicNamedStreams = (
882 DynamicallyNamedStream<StreamOf<u32>>,
883 DynamicallyNamedStream<StreamOf<i32>>,
884 DynamicallyNamedStream<StreamOf<String>>,
885 );
886
887 impl TryFrom<Capture<(), TestDynamicNamedStreams>> for StreamMapOutcome {
888 type Error = UnknownName;
889 fn try_from(
890 mut capture: Capture<(), TestDynamicNamedStreams>,
891 ) -> Result<Self, Self::Error> {
892 let mut result = Self::default();
893 while let Ok(NamedValue { name, value }) = capture.streams.0.try_recv() {
894 if name == "stream_u32" {
895 result.stream_u32.push(value);
896 } else {
897 return Err(UnknownName { name });
898 }
899 }
900
901 while let Ok(NamedValue { name, value }) = capture.streams.1.try_recv() {
902 if name == "stream_i32" {
903 result.stream_i32.push(value);
904 } else {
905 return Err(UnknownName { name });
906 }
907 }
908
909 while let Ok(NamedValue { name, value }) = capture.streams.2.try_recv() {
910 if name == "stream_string" {
911 result.stream_string.push(value);
912 } else {
913 return Err(UnknownName { name });
914 }
915 }
916
917 Ok(result)
918 }
919 }
920
921 #[test]
922 fn test_dynamically_named_streams() {
923 let mut context = TestingContext::minimal_plugins();
924
925 let parse_blocking_srv = context.command(|commands| {
926 commands.spawn_service(
927 |In(input): BlockingServiceInput<NamedInputs, TestDynamicNamedStreams>| {
928 impl_dynamically_named_streams_blocking(input.request, input.streams);
929 },
930 )
931 });
932
933 validate_dynamically_named_streams(parse_blocking_srv, &mut context);
934
935 let parse_async_srv = context.command(|commands| {
936 commands.spawn_service(
937 |In(input): AsyncServiceInput<NamedInputs, TestDynamicNamedStreams>| async move {
938 impl_dynamically_named_streams_async(input.request, input.streams);
939 },
940 )
941 });
942
943 validate_dynamically_named_streams(parse_async_srv, &mut context);
944
945 let parse_continuous_srv = context
946 .app
947 .spawn_continuous_service(Update, impl_dynamically_named_streams_continuous);
948
949 validate_dynamically_named_streams(parse_continuous_srv, &mut context);
950
951 let parse_blocking_callback =
952 (|In(input): BlockingCallbackInput<NamedInputs, TestDynamicNamedStreams>| {
953 impl_dynamically_named_streams_blocking(input.request, input.streams);
954 })
955 .as_callback();
956
957 validate_dynamically_named_streams(parse_blocking_callback, &mut context);
958
959 let parse_async_callback =
960 (|In(input): AsyncCallbackInput<NamedInputs, TestDynamicNamedStreams>| async move {
961 impl_dynamically_named_streams_async(input.request, input.streams);
962 })
963 .as_callback();
964
965 validate_dynamically_named_streams(parse_async_callback, &mut context);
966
967 let parse_blocking_map = (|input: BlockingMap<NamedInputs, TestDynamicNamedStreams>| {
968 impl_dynamically_named_streams_blocking(input.request, input.streams);
969 })
970 .as_map();
971
972 validate_dynamically_named_streams(parse_blocking_map, &mut context);
973
974 let parse_async_map = (|input: AsyncMap<NamedInputs, TestDynamicNamedStreams>| async move {
975 impl_dynamically_named_streams_async(input.request, input.streams);
976 })
977 .as_map();
978
979 validate_dynamically_named_streams(parse_async_map, &mut context);
980
981 let make_workflow = |service: Service<NamedInputs, (), TestDynamicNamedStreams>| {
982 move |scope: Scope<NamedInputs, (), TestDynamicNamedStreams>, builder: &mut Builder| {
983 let node = builder
984 .chain(scope.start)
985 .map_block(move |value| (value, service.into()))
986 .then_injection_node();
987
988 builder.connect(node.streams.0, scope.streams.0);
989 builder.connect(node.streams.1, scope.streams.1);
990 builder.connect(node.streams.2, scope.streams.2);
991
992 builder.connect(node.output, scope.terminate);
993 }
994 };
995
996 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
997 validate_dynamically_named_streams(blocking_injection_workflow, &mut context);
998
999 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
1000 validate_dynamically_named_streams(async_injection_workflow, &mut context);
1001
1002 let continuous_injection_workflow =
1003 context.spawn_workflow(make_workflow(parse_continuous_srv));
1004 validate_dynamically_named_streams(continuous_injection_workflow, &mut context);
1005
1006 let nested_workflow =
1007 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1008 let node = builder.chain(scope.start).then_node(parse_continuous_srv);
1009
1010 builder.connect(node.streams.0, scope.streams.0);
1011 builder.connect(node.streams.1, scope.streams.1);
1012 builder.connect(node.streams.2, scope.streams.2);
1013
1014 builder.connect(node.output, scope.terminate);
1015 });
1016 validate_dynamically_named_streams(nested_workflow, &mut context);
1017
1018 let double_nested_workflow =
1019 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1020 let node = builder.chain(scope.start).then_node(nested_workflow);
1021
1022 builder.connect(node.streams.0, scope.streams.0);
1023 builder.connect(node.streams.1, scope.streams.1);
1024 builder.connect(node.streams.2, scope.streams.2);
1025
1026 builder.connect(node.output, scope.terminate);
1027 });
1028 validate_dynamically_named_streams(double_nested_workflow, &mut context);
1029
1030 let scoped_workflow =
1031 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1032 let inner_scope =
1033 builder.create_scope::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1034 let node = builder.chain(scope.start).then_node(parse_continuous_srv);
1035
1036 builder.connect(node.streams.0, scope.streams.0);
1037 builder.connect(node.streams.1, scope.streams.1);
1038 builder.connect(node.streams.2, scope.streams.2);
1039
1040 builder.connect(node.output, scope.terminate);
1041 });
1042
1043 builder.connect(scope.start, inner_scope.input);
1044
1045 builder.connect(inner_scope.streams.0, scope.streams.0);
1046 builder.connect(inner_scope.streams.1, scope.streams.1);
1047 builder.connect(inner_scope.streams.2, scope.streams.2);
1048
1049 builder.connect(inner_scope.output, scope.terminate);
1050 });
1051 validate_dynamically_named_streams(scoped_workflow, &mut context);
1052
1053 validate_dynamically_named_streams_into_stream_pack(parse_blocking_srv, &mut context);
1056 validate_dynamically_named_streams_into_stream_pack(parse_async_srv, &mut context);
1057 validate_dynamically_named_streams_into_stream_pack(parse_continuous_srv, &mut context);
1058 validate_dynamically_named_streams_into_stream_pack(
1059 blocking_injection_workflow,
1060 &mut context,
1061 );
1062 validate_dynamically_named_streams_into_stream_pack(async_injection_workflow, &mut context);
1063 validate_dynamically_named_streams_into_stream_pack(
1064 continuous_injection_workflow,
1065 &mut context,
1066 );
1067 validate_dynamically_named_streams_into_stream_pack(nested_workflow, &mut context);
1068 validate_dynamically_named_streams_into_stream_pack(double_nested_workflow, &mut context);
1069 }
1070
1071 fn validate_dynamically_named_streams(
1072 provider: impl Provider<Request = NamedInputs, Response = (), Streams = TestDynamicNamedStreams>
1073 + Clone,
1074 context: &mut TestingContext,
1075 ) {
1076 let expected_values_u32 = vec![
1077 NamedValue::new("stream_u32", 5),
1078 NamedValue::new("stream_u32", 10),
1079 NamedValue::new("stream_i32", 12),
1080 ];
1081
1082 let expected_values_i32 = vec![
1083 NamedValue::new("stream_i32", 2),
1084 NamedValue::new("stream_i32", -5),
1085 NamedValue::new("stream_u32", 7),
1086 ];
1087
1088 let expected_values_string = vec![
1089 NamedValue::new("stream_string", "hello".to_owned()),
1090 NamedValue::new("stream_string", "8".to_owned()),
1091 NamedValue::new("stream_u32", "22".to_owned()),
1092 ];
1093
1094 let request = NamedInputs {
1095 values_u32: expected_values_u32.clone(),
1096 values_i32: expected_values_i32.clone(),
1097 values_string: expected_values_string.clone(),
1098 };
1099
1100 let mut capture = context.command(|commands| commands.request(request, provider).capture());
1101
1102 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
1103 assert!(capture.outcome.is_available());
1104 context.assert_no_errors();
1105
1106 let received_values_u32 = collect_received_values(capture.streams.0);
1107 assert_eq!(expected_values_u32, received_values_u32);
1108
1109 let received_values_i32 = collect_received_values(capture.streams.1);
1110 assert_eq!(expected_values_i32, received_values_i32);
1111
1112 let received_values_string = collect_received_values(capture.streams.2);
1113 assert_eq!(expected_values_string, received_values_string);
1114 }
1115
1116 pub fn collect_received_values<T>(mut receiver: crate::Receiver<T>) -> Vec<T> {
1117 let mut result = Vec::new();
1118 while let Ok(value) = receiver.try_recv() {
1119 result.push(value);
1120 }
1121 result
1122 }
1123
1124 fn validate_dynamically_named_streams_into_stream_pack(
1125 provider: Service<NamedInputs, (), TestDynamicNamedStreams>,
1126 context: &mut TestingContext,
1127 ) {
1128 let provider: Service<NamedInputs, (), TestStreamPack> = provider.optional_stream_cast();
1129
1130 let request = NamedInputs {
1131 values_u32: vec![
1132 NamedValue::new("stream_u32", 5),
1133 NamedValue::new("stream_u32", 10),
1134 NamedValue::new("stream_i32", 12),
1137 ],
1138 values_i32: vec![
1139 NamedValue::new("stream_i32", 2),
1140 NamedValue::new("stream_i32", -5),
1141 NamedValue::new("stream_u32", 7),
1144 ],
1145 values_string: vec![
1146 NamedValue::new("stream_string", "hello".to_owned()),
1147 NamedValue::new("stream_string", "8".to_owned()),
1148 NamedValue::new("stream_u32", "22".to_owned()),
1151 ],
1152 };
1153
1154 let mut capture = context.command(|commands| commands.request(request, provider).capture());
1155
1156 context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
1157 assert!(capture.outcome.is_available());
1158 assert!(context.no_unhandled_errors());
1159
1160 let outcome: StreamMapOutcome = capture.try_into().unwrap();
1161 assert_eq!(outcome.stream_u32, [5, 10]);
1162 assert_eq!(outcome.stream_i32, [2, -5]);
1163 assert_eq!(outcome.stream_string, ["hello", "8"]);
1164 }
1165
1166 fn impl_dynamically_named_streams_blocking(
1167 request: NamedInputs,
1168 streams: <TestDynamicNamedStreams as StreamPack>::StreamBuffers,
1169 ) {
1170 for nv in request.values_u32 {
1171 streams.0.send(nv);
1172 }
1173
1174 for nv in request.values_i32 {
1175 streams.1.send(nv);
1176 }
1177
1178 for nv in request.values_string {
1179 streams.2.send(nv);
1180 }
1181 }
1182
1183 fn impl_dynamically_named_streams_async(
1184 request: NamedInputs,
1185 streams: <TestDynamicNamedStreams as StreamPack>::StreamChannels,
1186 ) {
1187 for nv in request.values_u32 {
1188 streams.0.send(nv);
1189 }
1190
1191 for nv in request.values_i32 {
1192 streams.1.send(nv);
1193 }
1194
1195 for nv in request.values_string {
1196 streams.2.send(nv);
1197 }
1198 }
1199
1200 fn impl_dynamically_named_streams_continuous(
1201 In(ContinuousService { key }): In<
1202 ContinuousService<NamedInputs, (), TestDynamicNamedStreams>,
1203 >,
1204 mut param: ContinuousQuery<NamedInputs, (), TestDynamicNamedStreams>,
1205 ) {
1206 param.get_mut(&key).unwrap().for_each(|order| {
1207 for nv in order.request().values_u32.iter() {
1208 order.streams().0.send(nv.clone());
1209 }
1210
1211 for nv in order.request().values_i32.iter() {
1212 order.streams().1.send(nv.clone());
1213 }
1214
1215 for nv in order.request().values_string.iter() {
1216 order.streams().2.send(nv.clone());
1217 }
1218
1219 order.respond(());
1220 });
1221 }
1222
/// Request payload used by the dynamically-named-stream tests: one list of
/// named values per value type. The test providers in this module send each
/// list into a separate stream.
struct NamedInputs {
    /// Named `u32` values; the providers send these into stream `.0`.
    values_u32: Vec<NamedValue<u32>>,
    /// Named `i32` values; the providers send these into stream `.1`.
    values_i32: Vec<NamedValue<i32>>,
    /// Named `String` values; the providers send these into stream `.2`.
    values_string: Vec<NamedValue<String>>,
}
1228
/// Error carrying a name that was not recognized.
///
/// Its `Display` output is `received unknown name: {name}`, as configured by
/// the `#[error]` attribute below.
#[derive(thiserror::Error, Debug)]
#[error("received unknown name: {name}")]
struct UnknownName {
    /// The offending name, either borrowed from a static string or owned.
    name: Cow<'static, str>,
}
1234
// Stream pack used as the cast target in
// `validate_dynamically_named_streams_into_stream_pack`.
//
// NOTE(review): the `StreamPack` derive presumably exposes one stream per
// field, named after the field and carrying the field's type -- the field
// names match the stream names used by the tests ("stream_u32", "stream_i32",
// "stream_string"). Confirm against the derive macro.
#[derive(StreamPack)]
pub(crate) struct TestStreamPack {
    // Stream of `u32` values.
    stream_u32: u32,
    // Stream of `i32` values.
    stream_i32: i32,
    // Stream of `String` values.
    stream_string: String,
}
1241}