1use bevy_ecs::prelude::{Entity, World};
19
20use std::sync::OnceLock;
21
22use crate::{ManageInput, OperationError, OperationResult, OperationRoster, OrBroken};
23
24mod anonymous_stream;
25pub use anonymous_stream::*;
26
27mod dynamically_named_stream;
28pub use dynamically_named_stream::*;
29
30mod named_stream;
31pub use named_stream::*;
32
33mod stream_availability;
34pub use stream_availability::*;
35
36mod stream_buffer;
37pub use stream_buffer::*;
38
39mod stream_channel;
40pub use stream_channel::*;
41
42mod stream_filter;
43pub use stream_filter::*;
44
45mod stream_pack;
46pub use stream_pack::*;
47
48mod stream_target_map;
49pub use stream_target_map::*;
50
51pub use crossflow_derive::{Stream, StreamPack};
54pub trait StreamEffect: 'static + Send + Sync + Sized {
67 type Input: 'static + Send + Sync;
68 type Output: 'static + Send + Sync;
69
70 fn side_effect(
73 input: Self::Input,
74 request: &mut StreamRequest,
75 ) -> Result<Self::Output, OperationError>;
76}
77
78#[derive(Stream)]
82pub struct StreamOf<T: 'static + Send + Sync>(std::marker::PhantomData<fn(T)>);
83
84impl<T: 'static + Send + Sync> Clone for StreamOf<T> {
85 fn clone(&self) -> Self {
86 *self
87 }
88}
89
90impl<T: 'static + Send + Sync> Copy for StreamOf<T> {}
91
92impl<T: 'static + Send + Sync> std::fmt::Debug for StreamOf<T> {
93 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94 static NAME: OnceLock<String> = OnceLock::new();
95 let name = NAME.get_or_init(|| format!("StreamOf<{}>", std::any::type_name::<T>(),));
96
97 f.debug_struct(name.as_str()).finish()
98 }
99}
100
101impl<T: 'static + Send + Sync> StreamEffect for StreamOf<T> {
102 type Input = T;
103 type Output = T;
104
105 fn side_effect(
107 input: Self::Input,
108 _: &mut StreamRequest,
109 ) -> Result<Self::Output, OperationError> {
110 Ok(input)
111 }
112}
113
114pub struct StreamRequest<'a> {
119 pub source: Entity,
121 pub session: Entity,
123 pub target: Option<Entity>,
125 pub world: &'a mut World,
127 pub roster: &'a mut OperationRoster,
129}
130
131impl<'a> StreamRequest<'a> {
132 pub fn send_output<T: 'static + Send + Sync>(self, output: T) -> OperationResult {
133 let Self {
134 session,
135 target,
136 world,
137 roster,
138 ..
139 } = self;
140 if let Some(target) = target {
141 world
142 .get_entity_mut(target)
143 .or_broken()?
144 .give_input(session, output, roster)?;
145 }
146
147 Ok(())
148 }
149}
150
151#[cfg(test)]
152pub(crate) mod tests {
153 use crate::{dyn_node::*, prelude::*, testing::*};
154 use std::borrow::Cow;
155
156 #[test]
157 fn test_single_stream() {
158 let mut context = TestingContext::minimal_plugins();
159
160 let count_blocking_srv = context.command(|commands| {
161 commands.spawn_service(|In(input): BlockingServiceInput<u32, StreamOf<u32>>| {
162 for i in 0..input.request {
163 input.streams.send(i);
164 }
165 return input.request;
166 })
167 });
168
169 test_counting_stream(count_blocking_srv, &mut context);
170
171 let count_async_srv = context.command(|commands| {
172 commands.spawn_service(
173 |In(input): AsyncServiceInput<u32, StreamOf<u32>>| async move {
174 for i in 0..input.request {
175 input.streams.send(i);
176 }
177 return input.request;
178 },
179 )
180 });
181
182 test_counting_stream(count_async_srv, &mut context);
183
184 let count_blocking_callback = (|In(input): BlockingCallbackInput<u32, StreamOf<u32>>| {
185 for i in 0..input.request {
186 input.streams.send(i);
187 }
188 return input.request;
189 })
190 .as_callback();
191
192 test_counting_stream(count_blocking_callback, &mut context);
193
194 let count_async_callback =
195 (|In(input): AsyncCallbackInput<u32, StreamOf<u32>>| async move {
196 for i in 0..input.request {
197 input.streams.send(i);
198 }
199 return input.request;
200 })
201 .as_callback();
202
203 test_counting_stream(count_async_callback, &mut context);
204
205 let count_blocking_map = (|input: BlockingMap<u32, StreamOf<u32>>| {
206 for i in 0..input.request {
207 input.streams.send(i);
208 }
209 return input.request;
210 })
211 .as_map();
212
213 test_counting_stream(count_blocking_map, &mut context);
214
215 let count_async_map = (|input: AsyncMap<u32, StreamOf<u32>>| async move {
216 for i in 0..input.request {
217 input.streams.send(i);
218 }
219 return input.request;
220 })
221 .as_map();
222
223 test_counting_stream(count_async_map, &mut context);
224 }
225
226 fn test_counting_stream(
227 provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
228 context: &mut TestingContext,
229 ) {
230 let mut recipient = context.command(|commands| commands.request(10, provider).take());
231
232 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
233 assert!(recipient
234 .response
235 .take()
236 .available()
237 .is_some_and(|v| v == 10));
238
239 let mut stream: Vec<u32> = Vec::new();
240 while let Ok(r) = recipient.streams.try_recv() {
241 stream.push(r);
242 }
243 assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
244 assert!(context.no_unhandled_errors());
245 }
246
247 type FormatStreams = (StreamOf<u32>, StreamOf<i32>, StreamOf<f32>);
248 #[test]
249 fn test_tuple_stream() {
250 let mut context = TestingContext::minimal_plugins();
251
252 let parse_blocking_srv = context.command(|commands| {
253 commands.spawn_service(|In(input): BlockingServiceInput<String, FormatStreams>| {
254 impl_formatting_streams_blocking(input.request, input.streams);
255 })
256 });
257
258 validate_formatting_stream(parse_blocking_srv, &mut context);
259
260 let parse_async_srv = context.command(|commands| {
261 commands.spawn_service(
262 |In(input): AsyncServiceInput<String, FormatStreams>| async move {
263 impl_formatting_streams_async(input.request, input.streams);
264 },
265 )
266 });
267
268 validate_formatting_stream(parse_async_srv, &mut context);
269
270 let parse_continuous_srv = context
271 .app
272 .spawn_continuous_service(Update, impl_formatting_streams_continuous);
273
274 validate_formatting_stream(parse_continuous_srv, &mut context);
275
276 let parse_blocking_callback =
277 (|In(input): BlockingCallbackInput<String, FormatStreams>| {
278 impl_formatting_streams_blocking(input.request, input.streams);
279 })
280 .as_callback();
281
282 validate_formatting_stream(parse_blocking_callback, &mut context);
283
284 let parse_async_callback =
285 (|In(input): AsyncCallbackInput<String, FormatStreams>| async move {
286 impl_formatting_streams_async(input.request, input.streams);
287 })
288 .as_callback();
289
290 validate_formatting_stream(parse_async_callback, &mut context);
291
292 let parse_blocking_map = (|input: BlockingMap<String, FormatStreams>| {
293 impl_formatting_streams_blocking(input.request, input.streams);
294 })
295 .as_map();
296
297 validate_formatting_stream(parse_blocking_map, &mut context);
298
299 let parse_async_map = (|input: AsyncMap<String, FormatStreams>| async move {
300 impl_formatting_streams_async(input.request, input.streams);
301 })
302 .as_map();
303
304 validate_formatting_stream(parse_async_map, &mut context);
305
306 let make_workflow = |service: Service<String, (), FormatStreams>| {
307 move |scope: Scope<String, (), FormatStreams>, builder: &mut Builder| {
308 let node = scope
309 .input
310 .chain(builder)
311 .map_block(move |value| (value, service.into()))
312 .then_injection_node();
313
314 builder.connect(node.streams.0, scope.streams.0);
315 builder.connect(node.streams.1, scope.streams.1);
316 builder.connect(node.streams.2, scope.streams.2);
317
318 builder.connect(node.output, scope.terminate);
319 }
320 };
321
322 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
323 validate_formatting_stream(blocking_injection_workflow, &mut context);
324
325 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
326 validate_formatting_stream(async_injection_workflow, &mut context);
327
328 let continuous_injection_workflow =
329 context.spawn_workflow(make_workflow(parse_continuous_srv));
330 validate_formatting_stream(continuous_injection_workflow, &mut context);
331
332 let nested_workflow = context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
333 let inner_node = scope
334 .input
335 .chain(builder)
336 .then_node(continuous_injection_workflow);
337 builder.connect(inner_node.streams.0, scope.streams.0);
338 builder.connect(inner_node.streams.1, scope.streams.1);
339 builder.connect(inner_node.streams.2, scope.streams.2);
340 builder.connect(inner_node.output, scope.terminate);
341 });
342 validate_formatting_stream(nested_workflow, &mut context);
343
344 let double_nested_workflow =
345 context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
346 let inner_node = builder.create_node(nested_workflow);
347 builder.connect(scope.input, inner_node.input);
348 builder.connect(inner_node.streams.0, scope.streams.0);
349 builder.connect(inner_node.streams.1, scope.streams.1);
350 builder.connect(inner_node.streams.2, scope.streams.2);
351 builder.connect(inner_node.output, scope.terminate);
352 });
353 validate_formatting_stream(double_nested_workflow, &mut context);
354 }
355
356 fn impl_formatting_streams_blocking(
357 request: String,
358 streams: <FormatStreams as StreamPack>::StreamBuffers,
359 ) {
360 if let Ok(value) = request.parse::<u32>() {
361 streams.0.send(value);
362 }
363
364 if let Ok(value) = request.parse::<i32>() {
365 streams.1.send(value);
366 }
367
368 if let Ok(value) = request.parse::<f32>() {
369 streams.2.send(value);
370 }
371 }
372
373 fn impl_formatting_streams_async(
374 request: String,
375 streams: <FormatStreams as StreamPack>::StreamChannels,
376 ) {
377 if let Ok(value) = request.parse::<u32>() {
378 streams.0.send(value);
379 }
380
381 if let Ok(value) = request.parse::<i32>() {
382 streams.1.send(value);
383 }
384
385 if let Ok(value) = request.parse::<f32>() {
386 streams.2.send(value);
387 }
388 }
389
390 fn impl_formatting_streams_continuous(
391 In(ContinuousService { key }): In<ContinuousService<String, (), FormatStreams>>,
392 mut param: ContinuousQuery<String, (), FormatStreams>,
393 ) {
394 param.get_mut(&key).unwrap().for_each(|order| {
395 if let Ok(value) = order.request().parse::<u32>() {
396 order.streams().0.send(value);
397 }
398
399 if let Ok(value) = order.request().parse::<i32>() {
400 order.streams().1.send(value);
401 }
402
403 if let Ok(value) = order.request().parse::<f32>() {
404 order.streams().2.send(value);
405 }
406
407 order.respond(());
408 });
409 }
410
411 fn validate_formatting_stream(
412 provider: impl Provider<Request = String, Response = (), Streams = FormatStreams> + Clone,
413 context: &mut TestingContext,
414 ) {
415 let mut recipient =
416 context.command(|commands| commands.request("5".to_owned(), provider.clone()).take());
417
418 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
419 assert!(recipient.response.take().available().is_some());
420 assert!(context.no_unhandled_errors());
421
422 let outcome: FormatOutcome = recipient.into();
423 assert_eq!(outcome.stream_u32, [5]);
424 assert_eq!(outcome.stream_i32, [5]);
425 assert_eq!(outcome.stream_f32, [5.0]);
426
427 let mut recipient =
428 context.command(|commands| commands.request("-2".to_owned(), provider.clone()).take());
429
430 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
431 assert!(recipient.response.take().available().is_some());
432 assert!(context.no_unhandled_errors());
433
434 let outcome: FormatOutcome = recipient.into();
435 assert!(outcome.stream_u32.is_empty());
436 assert_eq!(outcome.stream_i32, [-2]);
437 assert_eq!(outcome.stream_f32, [-2.0]);
438
439 let mut recipient =
440 context.command(|commands| commands.request("6.7".to_owned(), provider.clone()).take());
441
442 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
443 assert!(recipient.response.take().available().is_some());
444 assert!(context.no_unhandled_errors());
445
446 let outcome: FormatOutcome = recipient.into();
447 assert!(outcome.stream_u32.is_empty());
448 assert!(outcome.stream_i32.is_empty());
449 assert_eq!(outcome.stream_f32, [6.7]);
450
451 let mut recipient = context.command(|commands| {
452 commands
453 .request("hello".to_owned(), provider.clone())
454 .take()
455 });
456
457 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
458 assert!(recipient.response.take().available().is_some());
459 assert!(context.no_unhandled_errors());
460
461 let outcome: FormatOutcome = recipient.into();
462 assert!(outcome.stream_u32.is_empty());
463 assert!(outcome.stream_i32.is_empty());
464 assert!(outcome.stream_f32.is_empty());
465 }
466
467 #[derive(Default)]
468 struct FormatOutcome {
469 stream_u32: Vec<u32>,
470 stream_i32: Vec<i32>,
471 stream_f32: Vec<f32>,
472 }
473
474 impl From<Recipient<(), FormatStreams>> for FormatOutcome {
475 fn from(mut recipient: Recipient<(), FormatStreams>) -> Self {
476 let mut result = Self::default();
477 while let Ok(r) = recipient.streams.0.try_recv() {
478 result.stream_u32.push(r);
479 }
480
481 while let Ok(r) = recipient.streams.1.try_recv() {
482 result.stream_i32.push(r);
483 }
484
485 while let Ok(r) = recipient.streams.2.try_recv() {
486 result.stream_f32.push(r);
487 }
488
489 result
490 }
491 }
492
493 #[test]
494 fn test_stream_pack() {
495 let mut context = TestingContext::minimal_plugins();
496
497 let parse_blocking_srv = context.command(|commands| {
498 commands.spawn_service(
499 |In(input): BlockingServiceInput<Vec<String>, TestStreamPack>| {
500 impl_stream_pack_test_blocking(input.request, input.streams);
501 },
502 )
503 });
504
505 validate_stream_pack(parse_blocking_srv, &mut context);
506
507 let parse_async_srv = context.command(|commands| {
508 commands.spawn_service(
509 |In(input): AsyncServiceInput<Vec<String>, TestStreamPack>| async move {
510 impl_stream_pack_test_async(input.request, input.streams);
511 },
512 )
513 });
514
515 validate_stream_pack(parse_async_srv, &mut context);
516
517 let parse_continuous_srv = context
518 .app
519 .spawn_continuous_service(Update, impl_stream_pack_test_continuous);
520
521 validate_stream_pack(parse_continuous_srv, &mut context);
522
523 let parse_blocking_callback =
524 (|In(input): BlockingCallbackInput<Vec<String>, TestStreamPack>| {
525 impl_stream_pack_test_blocking(input.request, input.streams);
526 })
527 .as_callback();
528
529 validate_stream_pack(parse_blocking_callback, &mut context);
530
531 let parse_async_callback =
532 (|In(input): AsyncCallbackInput<Vec<String>, TestStreamPack>| async move {
533 impl_stream_pack_test_async(input.request, input.streams);
534 })
535 .as_callback();
536
537 validate_stream_pack(parse_async_callback, &mut context);
538
539 let parse_blocking_map = (|input: BlockingMap<Vec<String>, TestStreamPack>| {
540 impl_stream_pack_test_blocking(input.request, input.streams);
541 })
542 .as_map();
543
544 validate_stream_pack(parse_blocking_map, &mut context);
545
546 let parse_async_map = (|input: AsyncMap<Vec<String>, TestStreamPack>| async move {
547 impl_stream_pack_test_async(input.request, input.streams);
548 })
549 .as_map();
550
551 validate_stream_pack(parse_async_map, &mut context);
552
553 let make_workflow = |service: Service<Vec<String>, (), TestStreamPack>| {
554 move |scope: Scope<Vec<String>, (), TestStreamPack>, builder: &mut Builder| {
555 let node = scope
556 .input
557 .chain(builder)
558 .map_block(move |value| (value, service.into()))
559 .then_injection_node();
560
561 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
562 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
563 builder.connect(node.streams.stream_string, scope.streams.stream_string);
564
565 builder.connect(node.output, scope.terminate);
566 }
567 };
568
569 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
570 validate_stream_pack(blocking_injection_workflow, &mut context);
571
572 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
573 validate_stream_pack(async_injection_workflow, &mut context);
574
575 let continuous_injection_workflow =
576 context.spawn_workflow(make_workflow(parse_continuous_srv));
577 validate_stream_pack(continuous_injection_workflow, &mut context);
578
579 let nested_workflow =
580 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
581 let node = scope.input.chain(builder).then_node(parse_continuous_srv);
582
583 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
584 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
585 builder.connect(node.streams.stream_string, scope.streams.stream_string);
586
587 builder.connect(node.output, scope.terminate);
588 });
589 validate_stream_pack(nested_workflow, &mut context);
590
591 let double_nested_workflow =
592 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
593 let node = scope.input.chain(builder).then_node(nested_workflow);
594
595 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
596 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
597 builder.connect(node.streams.stream_string, scope.streams.stream_string);
598
599 builder.connect(node.output, scope.terminate);
600 });
601 validate_stream_pack(double_nested_workflow, &mut context);
602
603 let scoped_workflow =
604 context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
605 let inner_scope =
606 builder.create_scope::<_, _, TestStreamPack, _>(|scope, builder| {
607 let node = scope.input.chain(builder).then_node(parse_continuous_srv);
608
609 builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
610 builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
611 builder.connect(node.streams.stream_string, scope.streams.stream_string);
612
613 builder.connect(node.output, scope.terminate);
614 });
615
616 builder.connect(scope.input, inner_scope.input);
617
618 builder.connect(inner_scope.streams.stream_u32, scope.streams.stream_u32);
619 builder.connect(inner_scope.streams.stream_i32, scope.streams.stream_i32);
620 builder.connect(
621 inner_scope.streams.stream_string,
622 scope.streams.stream_string,
623 );
624
625 builder.connect(inner_scope.output, scope.terminate);
626 });
627 validate_stream_pack(scoped_workflow, &mut context);
628
629 let dyn_stream_workflow =
630 context.spawn_workflow::<Vec<String>, (), TestStreamPack, _>(|scope, builder| {
631 let dyn_scope_input: DynOutput = scope.input.into();
632
633 let node = builder.create_node(parse_continuous_srv);
634 let mut dyn_node: DynNode = node.into();
635
636 dyn_scope_input
637 .connect_to(&dyn_node.input, builder)
638 .unwrap();
639
640 let dyn_scope_stream_u32: DynInputSlot = scope.streams.stream_u32.into();
641 let dyn_node_stream_u32 = dyn_node.streams.take_named("stream_u32").unwrap();
642 dyn_node_stream_u32
643 .connect_to(&dyn_scope_stream_u32, builder)
644 .unwrap();
645
646 let dyn_scope_stream_i32: DynInputSlot = scope.streams.stream_i32.into();
647 let dyn_node_stream_i32 = dyn_node.streams.take_named("stream_i32").unwrap();
648 dyn_node_stream_i32
649 .connect_to(&dyn_scope_stream_i32, builder)
650 .unwrap();
651
652 let dyn_scope_stream_string: DynInputSlot = scope.streams.stream_string.into();
653 let dyn_node_stream_string = dyn_node.streams.take_named("stream_string").unwrap();
654 dyn_node_stream_string
655 .connect_to(&dyn_scope_stream_string, builder)
656 .unwrap();
657
658 let terminate: DynInputSlot = scope.terminate.into();
659 dyn_node.output.connect_to(&terminate, builder).unwrap();
660 });
661 validate_stream_pack(dyn_stream_workflow, &mut context);
662
663 validate_dynamically_named_stream_receiver(parse_blocking_srv, &mut context);
666 validate_dynamically_named_stream_receiver(parse_async_srv, &mut context);
667 validate_dynamically_named_stream_receiver(parse_continuous_srv, &mut context);
668 validate_dynamically_named_stream_receiver(blocking_injection_workflow, &mut context);
669 validate_dynamically_named_stream_receiver(async_injection_workflow, &mut context);
670 validate_dynamically_named_stream_receiver(continuous_injection_workflow, &mut context);
671 validate_dynamically_named_stream_receiver(nested_workflow, &mut context);
672 validate_dynamically_named_stream_receiver(double_nested_workflow, &mut context);
673 }
674
675 fn validate_stream_pack(
676 provider: impl Provider<Request = Vec<String>, Response = (), Streams = TestStreamPack> + Clone,
677 context: &mut TestingContext,
678 ) {
679 let request = vec![
680 "5".to_owned(),
681 "10".to_owned(),
682 "-3".to_owned(),
683 "-27".to_owned(),
684 "hello".to_owned(),
685 ];
686
687 let mut recipient =
688 context.command(|commands| commands.request(request, provider.clone()).take());
689
690 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
691 assert!(recipient.response.take().available().is_some());
692 assert!(
693 context.no_unhandled_errors(),
694 "{:#?}",
695 context.get_unhandled_errors()
696 );
697
698 let outcome: StreamMapOutcome = recipient.into();
699 assert_eq!(outcome.stream_u32, [5, 10]);
700 assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
701 assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
702
703 let request = vec![];
704
705 let mut recipient =
706 context.command(|commands| commands.request(request, provider.clone()).take());
707
708 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
709 assert!(recipient.response.take().available().is_some());
710 assert!(
711 context.no_unhandled_errors(),
712 "{:#?}",
713 context.get_unhandled_errors()
714 );
715
716 let outcome: StreamMapOutcome = recipient.into();
717 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
718 assert_eq!(outcome.stream_i32, Vec::<i32>::new());
719 assert_eq!(outcome.stream_string, Vec::<String>::new());
720
721 let request = vec![
722 "foo".to_string(),
723 "bar".to_string(),
724 "1.32".to_string(),
725 "-8".to_string(),
726 ];
727
728 let mut recipient =
729 context.command(|commands| commands.request(request, provider.clone()).take());
730
731 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
732 assert!(recipient.response.take().available().is_some());
733
734 let outcome: StreamMapOutcome = recipient.into();
735 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
736 assert_eq!(outcome.stream_i32, [-8]);
737 assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
738 }
739
740 fn validate_dynamically_named_stream_receiver(
741 provider: Service<Vec<String>, (), TestStreamPack>,
742 context: &mut TestingContext,
743 ) {
744 let provider: Service<Vec<String>, (), TestDynamicNamedStreams> =
745 provider.optional_stream_cast();
746
747 let request = vec![
748 "5".to_owned(),
749 "10".to_owned(),
750 "-3".to_owned(),
751 "-27".to_owned(),
752 "hello".to_owned(),
753 ];
754
755 let mut recipient =
756 context.command(|commands| commands.request(request, provider.clone()).take());
757
758 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
759 assert!(recipient.response.take().available().is_some());
760 assert!(
761 context.no_unhandled_errors(),
762 "{:#?}",
763 context.get_unhandled_errors()
764 );
765
766 let outcome: StreamMapOutcome = recipient.try_into().unwrap();
767 assert_eq!(outcome.stream_u32, [5, 10]);
768 assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
769 assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
770
771 let request = vec![];
772
773 let mut recipient =
774 context.command(|commands| commands.request(request, provider.clone()).take());
775
776 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
777 assert!(recipient.response.take().available().is_some());
778 assert!(
779 context.no_unhandled_errors(),
780 "{:#?}",
781 context.get_unhandled_errors()
782 );
783
784 let outcome: StreamMapOutcome = recipient.try_into().unwrap();
785 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
786 assert_eq!(outcome.stream_i32, Vec::<i32>::new());
787 assert_eq!(outcome.stream_string, Vec::<String>::new());
788
789 let request = vec![
790 "foo".to_string(),
791 "bar".to_string(),
792 "1.32".to_string(),
793 "-8".to_string(),
794 ];
795
796 let mut recipient =
797 context.command(|commands| commands.request(request, provider.clone()).take());
798
799 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
800 assert!(recipient.response.take().available().is_some());
801
802 let outcome: StreamMapOutcome = recipient.try_into().unwrap();
803 assert_eq!(outcome.stream_u32, Vec::<u32>::new());
804 assert_eq!(outcome.stream_i32, [-8]);
805 assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
806 }
807
808 fn impl_stream_pack_test_blocking(
809 request: Vec<String>,
810 streams: <TestStreamPack as StreamPack>::StreamBuffers,
811 ) {
812 for r in request {
813 if let Ok(value) = r.parse::<u32>() {
814 streams.stream_u32.send(value);
815 }
816
817 if let Ok(value) = r.parse::<i32>() {
818 streams.stream_i32.send(value);
819 }
820
821 streams.stream_string.send(r);
822 }
823 }
824
825 fn impl_stream_pack_test_async(
826 request: Vec<String>,
827 streams: <TestStreamPack as StreamPack>::StreamChannels,
828 ) {
829 for r in request {
830 if let Ok(value) = r.parse::<u32>() {
831 streams.stream_u32.send(value);
832 }
833
834 if let Ok(value) = r.parse::<i32>() {
835 streams.stream_i32.send(value);
836 }
837
838 streams.stream_string.send(r);
839 }
840 }
841
842 fn impl_stream_pack_test_continuous(
843 In(ContinuousService { key }): In<ContinuousService<Vec<String>, (), TestStreamPack>>,
844 mut param: ContinuousQuery<Vec<String>, (), TestStreamPack>,
845 ) {
846 param.get_mut(&key).unwrap().for_each(|order| {
847 for r in order.request().clone() {
848 if let Ok(value) = r.parse::<u32>() {
849 order.streams().stream_u32.send(value);
850 }
851
852 if let Ok(value) = r.parse::<i32>() {
853 order.streams().stream_i32.send(value);
854 }
855
856 order.streams().stream_string.send(r);
857 }
858
859 order.respond(());
860 });
861 }
862
863 #[derive(Default)]
864 struct StreamMapOutcome {
865 stream_u32: Vec<u32>,
866 stream_i32: Vec<i32>,
867 stream_string: Vec<String>,
868 }
869
870 impl From<Recipient<(), TestStreamPack>> for StreamMapOutcome {
871 fn from(mut recipient: Recipient<(), TestStreamPack>) -> Self {
872 let mut result = Self::default();
873 while let Ok(r) = recipient.streams.stream_u32.try_recv() {
874 result.stream_u32.push(r);
875 }
876
877 while let Ok(r) = recipient.streams.stream_i32.try_recv() {
878 result.stream_i32.push(r);
879 }
880
881 while let Ok(r) = recipient.streams.stream_string.try_recv() {
882 result.stream_string.push(r);
883 }
884
885 result
886 }
887 }
888
889 type TestDynamicNamedStreams = (
890 DynamicallyNamedStream<StreamOf<u32>>,
891 DynamicallyNamedStream<StreamOf<i32>>,
892 DynamicallyNamedStream<StreamOf<String>>,
893 );
894
895 impl TryFrom<Recipient<(), TestDynamicNamedStreams>> for StreamMapOutcome {
896 type Error = UnknownName;
897 fn try_from(
898 mut recipient: Recipient<(), TestDynamicNamedStreams>,
899 ) -> Result<Self, Self::Error> {
900 let mut result = Self::default();
901 while let Ok(NamedValue { name, value }) = recipient.streams.0.try_recv() {
902 if name == "stream_u32" {
903 result.stream_u32.push(value);
904 } else {
905 return Err(UnknownName { name });
906 }
907 }
908
909 while let Ok(NamedValue { name, value }) = recipient.streams.1.try_recv() {
910 if name == "stream_i32" {
911 result.stream_i32.push(value);
912 } else {
913 return Err(UnknownName { name });
914 }
915 }
916
917 while let Ok(NamedValue { name, value }) = recipient.streams.2.try_recv() {
918 if name == "stream_string" {
919 result.stream_string.push(value);
920 } else {
921 return Err(UnknownName { name });
922 }
923 }
924
925 Ok(result)
926 }
927 }
928
929 #[test]
930 fn test_dynamically_named_streams() {
931 let mut context = TestingContext::minimal_plugins();
932
933 let parse_blocking_srv = context.command(|commands| {
934 commands.spawn_service(
935 |In(input): BlockingServiceInput<NamedInputs, TestDynamicNamedStreams>| {
936 impl_dynamically_named_streams_blocking(input.request, input.streams);
937 },
938 )
939 });
940
941 validate_dynamically_named_streams(parse_blocking_srv, &mut context);
942
943 let parse_async_srv = context.command(|commands| {
944 commands.spawn_service(
945 |In(input): AsyncServiceInput<NamedInputs, TestDynamicNamedStreams>| async move {
946 impl_dynamically_named_streams_async(input.request, input.streams);
947 },
948 )
949 });
950
951 validate_dynamically_named_streams(parse_async_srv, &mut context);
952
953 let parse_continuous_srv = context
954 .app
955 .spawn_continuous_service(Update, impl_dynamically_named_streams_continuous);
956
957 validate_dynamically_named_streams(parse_continuous_srv, &mut context);
958
959 let parse_blocking_callback =
960 (|In(input): BlockingCallbackInput<NamedInputs, TestDynamicNamedStreams>| {
961 impl_dynamically_named_streams_blocking(input.request, input.streams);
962 })
963 .as_callback();
964
965 validate_dynamically_named_streams(parse_blocking_callback, &mut context);
966
967 let parse_async_callback =
968 (|In(input): AsyncCallbackInput<NamedInputs, TestDynamicNamedStreams>| async move {
969 impl_dynamically_named_streams_async(input.request, input.streams);
970 })
971 .as_callback();
972
973 validate_dynamically_named_streams(parse_async_callback, &mut context);
974
975 let parse_blocking_map = (|input: BlockingMap<NamedInputs, TestDynamicNamedStreams>| {
976 impl_dynamically_named_streams_blocking(input.request, input.streams);
977 })
978 .as_map();
979
980 validate_dynamically_named_streams(parse_blocking_map, &mut context);
981
982 let parse_async_map = (|input: AsyncMap<NamedInputs, TestDynamicNamedStreams>| async move {
983 impl_dynamically_named_streams_async(input.request, input.streams);
984 })
985 .as_map();
986
987 validate_dynamically_named_streams(parse_async_map, &mut context);
988
989 let make_workflow = |service: Service<NamedInputs, (), TestDynamicNamedStreams>| {
990 move |scope: Scope<NamedInputs, (), TestDynamicNamedStreams>, builder: &mut Builder| {
991 let node = scope
992 .input
993 .chain(builder)
994 .map_block(move |value| (value, service.into()))
995 .then_injection_node();
996
997 builder.connect(node.streams.0, scope.streams.0);
998 builder.connect(node.streams.1, scope.streams.1);
999 builder.connect(node.streams.2, scope.streams.2);
1000
1001 builder.connect(node.output, scope.terminate);
1002 }
1003 };
1004
1005 let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
1006 validate_dynamically_named_streams(blocking_injection_workflow, &mut context);
1007
1008 let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
1009 validate_dynamically_named_streams(async_injection_workflow, &mut context);
1010
1011 let continuous_injection_workflow =
1012 context.spawn_workflow(make_workflow(parse_continuous_srv));
1013 validate_dynamically_named_streams(continuous_injection_workflow, &mut context);
1014
1015 let nested_workflow =
1016 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1017 let node = scope.input.chain(builder).then_node(parse_continuous_srv);
1018
1019 builder.connect(node.streams.0, scope.streams.0);
1020 builder.connect(node.streams.1, scope.streams.1);
1021 builder.connect(node.streams.2, scope.streams.2);
1022
1023 builder.connect(node.output, scope.terminate);
1024 });
1025 validate_dynamically_named_streams(nested_workflow, &mut context);
1026
1027 let double_nested_workflow =
1028 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1029 let node = scope.input.chain(builder).then_node(nested_workflow);
1030
1031 builder.connect(node.streams.0, scope.streams.0);
1032 builder.connect(node.streams.1, scope.streams.1);
1033 builder.connect(node.streams.2, scope.streams.2);
1034
1035 builder.connect(node.output, scope.terminate);
1036 });
1037 validate_dynamically_named_streams(double_nested_workflow, &mut context);
1038
1039 let scoped_workflow =
1040 context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1041 let inner_scope =
1042 builder.create_scope::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1043 let node = scope.input.chain(builder).then_node(parse_continuous_srv);
1044
1045 builder.connect(node.streams.0, scope.streams.0);
1046 builder.connect(node.streams.1, scope.streams.1);
1047 builder.connect(node.streams.2, scope.streams.2);
1048
1049 builder.connect(node.output, scope.terminate);
1050 });
1051
1052 builder.connect(scope.input, inner_scope.input);
1053
1054 builder.connect(inner_scope.streams.0, scope.streams.0);
1055 builder.connect(inner_scope.streams.1, scope.streams.1);
1056 builder.connect(inner_scope.streams.2, scope.streams.2);
1057
1058 builder.connect(inner_scope.output, scope.terminate);
1059 });
1060 validate_dynamically_named_streams(scoped_workflow, &mut context);
1061
1062 validate_dynamically_named_streams_into_stream_pack(parse_blocking_srv, &mut context);
1065 validate_dynamically_named_streams_into_stream_pack(parse_async_srv, &mut context);
1066 validate_dynamically_named_streams_into_stream_pack(parse_continuous_srv, &mut context);
1067 validate_dynamically_named_streams_into_stream_pack(
1068 blocking_injection_workflow,
1069 &mut context,
1070 );
1071 validate_dynamically_named_streams_into_stream_pack(async_injection_workflow, &mut context);
1072 validate_dynamically_named_streams_into_stream_pack(
1073 continuous_injection_workflow,
1074 &mut context,
1075 );
1076 validate_dynamically_named_streams_into_stream_pack(nested_workflow, &mut context);
1077 validate_dynamically_named_streams_into_stream_pack(double_nested_workflow, &mut context);
1078 }
1079
1080 fn validate_dynamically_named_streams(
1081 provider: impl Provider<Request = NamedInputs, Response = (), Streams = TestDynamicNamedStreams>
1082 + Clone,
1083 context: &mut TestingContext,
1084 ) {
1085 let expected_values_u32 = vec![
1086 NamedValue::new("stream_u32", 5),
1087 NamedValue::new("stream_u32", 10),
1088 NamedValue::new("stream_i32", 12),
1089 ];
1090
1091 let expected_values_i32 = vec![
1092 NamedValue::new("stream_i32", 2),
1093 NamedValue::new("stream_i32", -5),
1094 NamedValue::new("stream_u32", 7),
1095 ];
1096
1097 let expected_values_string = vec![
1098 NamedValue::new("stream_string", "hello".to_owned()),
1099 NamedValue::new("stream_string", "8".to_owned()),
1100 NamedValue::new("stream_u32", "22".to_owned()),
1101 ];
1102
1103 let request = NamedInputs {
1104 values_u32: expected_values_u32.clone(),
1105 values_i32: expected_values_i32.clone(),
1106 values_string: expected_values_string.clone(),
1107 };
1108
1109 let mut recipient = context.command(|commands| commands.request(request, provider).take());
1110
1111 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
1112 assert!(recipient.response.take().available().is_some());
1113 assert!(context.no_unhandled_errors());
1114
1115 let received_values_u32 = collect_received_values(recipient.streams.0);
1116 assert_eq!(expected_values_u32, received_values_u32);
1117
1118 let received_values_i32 = collect_received_values(recipient.streams.1);
1119 assert_eq!(expected_values_i32, received_values_i32);
1120
1121 let received_values_string = collect_received_values(recipient.streams.2);
1122 assert_eq!(expected_values_string, received_values_string);
1123 }
1124
1125 pub fn collect_received_values<T>(mut receiver: crate::Receiver<T>) -> Vec<T> {
1126 let mut result = Vec::new();
1127 while let Ok(value) = receiver.try_recv() {
1128 result.push(value);
1129 }
1130 result
1131 }
1132
1133 fn validate_dynamically_named_streams_into_stream_pack(
1134 provider: Service<NamedInputs, (), TestDynamicNamedStreams>,
1135 context: &mut TestingContext,
1136 ) {
1137 let provider: Service<NamedInputs, (), TestStreamPack> = provider.optional_stream_cast();
1138
1139 let request = NamedInputs {
1140 values_u32: vec![
1141 NamedValue::new("stream_u32", 5),
1142 NamedValue::new("stream_u32", 10),
1143 NamedValue::new("stream_i32", 12),
1146 ],
1147 values_i32: vec![
1148 NamedValue::new("stream_i32", 2),
1149 NamedValue::new("stream_i32", -5),
1150 NamedValue::new("stream_u32", 7),
1153 ],
1154 values_string: vec![
1155 NamedValue::new("stream_string", "hello".to_owned()),
1156 NamedValue::new("stream_string", "8".to_owned()),
1157 NamedValue::new("stream_u32", "22".to_owned()),
1160 ],
1161 };
1162
1163 let mut recipient = context.command(|commands| commands.request(request, provider).take());
1164
1165 context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
1166 assert!(recipient.response.take().available().is_some());
1167 assert!(context.no_unhandled_errors());
1168
1169 let outcome: StreamMapOutcome = recipient.try_into().unwrap();
1170 assert_eq!(outcome.stream_u32, [5, 10]);
1171 assert_eq!(outcome.stream_i32, [2, -5]);
1172 assert_eq!(outcome.stream_string, ["hello", "8"]);
1173 }
1174
1175 fn impl_dynamically_named_streams_blocking(
1176 request: NamedInputs,
1177 streams: <TestDynamicNamedStreams as StreamPack>::StreamBuffers,
1178 ) {
1179 for nv in request.values_u32 {
1180 streams.0.send(nv);
1181 }
1182
1183 for nv in request.values_i32 {
1184 streams.1.send(nv);
1185 }
1186
1187 for nv in request.values_string {
1188 streams.2.send(nv);
1189 }
1190 }
1191
1192 fn impl_dynamically_named_streams_async(
1193 request: NamedInputs,
1194 streams: <TestDynamicNamedStreams as StreamPack>::StreamChannels,
1195 ) {
1196 for nv in request.values_u32 {
1197 streams.0.send(nv);
1198 }
1199
1200 for nv in request.values_i32 {
1201 streams.1.send(nv);
1202 }
1203
1204 for nv in request.values_string {
1205 streams.2.send(nv);
1206 }
1207 }
1208
1209 fn impl_dynamically_named_streams_continuous(
1210 In(ContinuousService { key }): In<
1211 ContinuousService<NamedInputs, (), TestDynamicNamedStreams>,
1212 >,
1213 mut param: ContinuousQuery<NamedInputs, (), TestDynamicNamedStreams>,
1214 ) {
1215 param.get_mut(&key).unwrap().for_each(|order| {
1216 for nv in order.request().values_u32.iter() {
1217 order.streams().0.send(nv.clone());
1218 }
1219
1220 for nv in order.request().values_i32.iter() {
1221 order.streams().1.send(nv.clone());
1222 }
1223
1224 for nv in order.request().values_string.iter() {
1225 order.streams().2.send(nv.clone());
1226 }
1227
1228 order.respond(());
1229 });
1230 }
1231
1232 struct NamedInputs {
1233 values_u32: Vec<NamedValue<u32>>,
1234 values_i32: Vec<NamedValue<i32>>,
1235 values_string: Vec<NamedValue<String>>,
1236 }
1237
1238 #[derive(thiserror::Error, Debug)]
1239 #[error("received unknown name: {name}")]
1240 struct UnknownName {
1241 name: Cow<'static, str>,
1242 }
1243
1244 #[derive(StreamPack)]
1245 pub(crate) struct TestStreamPack {
1246 stream_u32: u32,
1247 stream_i32: i32,
1248 stream_string: String,
1249 }
1250}