crossflow/
stream.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::prelude::{Entity, World};
19
20use std::sync::OnceLock;
21
22use crate::{ManageInput, OperationError, OperationResult, OperationRoster, OrBroken};
23
24mod anonymous_stream;
25pub use anonymous_stream::*;
26
27mod dynamically_named_stream;
28pub use dynamically_named_stream::*;
29
30mod named_stream;
31pub use named_stream::*;
32
33mod stream_availability;
34pub use stream_availability::*;
35
36mod stream_buffer;
37pub use stream_buffer::*;
38
39mod stream_channel;
40pub use stream_channel::*;
41
42mod stream_filter;
43pub use stream_filter::*;
44
45mod stream_pack;
46pub use stream_pack::*;
47
48mod stream_target_map;
49pub use stream_target_map::*;
50
51// TODO(@mxgrey): Add module-level documentation for stream.rs
52
53pub use crossflow_derive::{Stream, StreamPack};
54/// You can create custom stream types that have side-effects, such as:
55/// - applying a transformation to the stream input to produce a different type of output
56/// - logging the message data
57/// - triggering an event or modifying a resource/component in the [`World`]
58///
59/// After you have implemented `StreamEffect` for your struct, you can apply
60/// `#[derive(Stream)]` to the struct and then use as a [`StreamPack`], either
61/// on its own or in a tuple.
62///
63/// If you just want to stream a message with no side-effects, you can simply
64/// wrap your message type in [`StreamOf`] to get a [`StreamPack`]. Users only
65/// need to use `StreamEffect` if you want custom stream side-effects.
66pub trait StreamEffect: 'static + Send + Sync + Sized {
67    type Input: 'static + Send + Sync;
68    type Output: 'static + Send + Sync;
69
70    /// Specify a side effect that is meant to happen whenever a stream value is
71    /// sent.
72    fn side_effect(
73        input: Self::Input,
74        request: &mut StreamRequest,
75    ) -> Result<Self::Output, OperationError>;
76}
77
/// A wrapper to make an anonymous (unnamed) stream for any type `T` that
/// implements `'static + Send + Sync`. This simply transmits the `T` with no
/// side-effects.
///
/// The struct only holds a [`PhantomData`](std::marker::PhantomData) marker, so
/// it names the message type without storing any `T` value.
#[derive(Stream)]
pub struct StreamOf<T: 'static + Send + Sync>(std::marker::PhantomData<fn(T)>);
83
// `Clone` and `Copy` are written by hand with only the `'static + Send + Sync`
// bounds on `T`. A derive would additionally require `T: Clone` / `T: Copy`,
// which is unnecessary because no `T` value is stored in `StreamOf`.
impl<T: 'static + Send + Sync> Clone for StreamOf<T> {
    fn clone(&self) -> Self {
        // `StreamOf<T>` is `Copy`, so a clone is just a copy of `self`.
        *self
    }
}

impl<T: 'static + Send + Sync> Copy for StreamOf<T> {}
91
92impl<T: 'static + Send + Sync> std::fmt::Debug for StreamOf<T> {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        static NAME: OnceLock<String> = OnceLock::new();
95        let name = NAME.get_or_init(|| format!("StreamOf<{}>", std::any::type_name::<T>(),));
96
97        f.debug_struct(name.as_str()).finish()
98    }
99}
100
101impl<T: 'static + Send + Sync> StreamEffect for StreamOf<T> {
102    type Input = T;
103    type Output = T;
104
105    /// `StreamOf` has no side-effect
106    fn side_effect(
107        input: Self::Input,
108        _: &mut StreamRequest,
109    ) -> Result<Self::Output, OperationError> {
110        Ok(input)
111    }
112}
113
/// `StreamRequest` is provided to types that implement [`StreamEffect`] so they
/// can process a stream input and apply any side effects to it. Note that your
/// implementation of [`StreamEffect::side_effect`] should return the output
/// data; it should *not* use the `StreamRequest` to send the output to the
/// target.
pub struct StreamRequest<'a> {
    /// The node that emitted the stream
    pub source: Entity,
    /// The session of the stream
    pub session: Entity,
    /// The target of the stream, if a specific target exists.
    pub target: Option<Entity>,
    /// The world that the stream exists inside
    pub world: &'a mut World,
    /// The roster of the stream
    pub roster: &'a mut OperationRoster,
}
130
131impl<'a> StreamRequest<'a> {
132    pub fn send_output<T: 'static + Send + Sync>(self, output: T) -> OperationResult {
133        let Self {
134            session,
135            target,
136            world,
137            roster,
138            ..
139        } = self;
140        if let Some(target) = target {
141            world
142                .get_entity_mut(target)
143                .or_broken()?
144                .give_input(session, output, roster)?;
145        }
146
147        Ok(())
148    }
149}
150
151#[cfg(test)]
152pub(crate) mod tests {
153    use crate::{dyn_node::*, prelude::*, testing::*};
154    use std::borrow::Cow;
155
156    #[test]
157    fn test_single_stream() {
158        let mut context = TestingContext::minimal_plugins();
159
160        let count_blocking_srv = context.command(|commands| {
161            commands.spawn_service(|In(input): BlockingServiceInput<u32, StreamOf<u32>>| {
162                for i in 0..input.request {
163                    input.streams.send(i);
164                }
165                return input.request;
166            })
167        });
168
169        test_counting_stream(count_blocking_srv, &mut context);
170
171        let count_async_srv = context.command(|commands| {
172            commands.spawn_service(
173                |In(input): AsyncServiceInput<u32, StreamOf<u32>>| async move {
174                    for i in 0..input.request {
175                        input.streams.send(i);
176                    }
177                    return input.request;
178                },
179            )
180        });
181
182        test_counting_stream(count_async_srv, &mut context);
183
184        let count_blocking_callback = (|In(input): BlockingCallbackInput<u32, StreamOf<u32>>| {
185            for i in 0..input.request {
186                input.streams.send(i);
187            }
188            return input.request;
189        })
190        .as_callback();
191
192        test_counting_stream(count_blocking_callback, &mut context);
193
194        let count_async_callback =
195            (|In(input): AsyncCallbackInput<u32, StreamOf<u32>>| async move {
196                for i in 0..input.request {
197                    input.streams.send(i);
198                }
199                return input.request;
200            })
201            .as_callback();
202
203        test_counting_stream(count_async_callback, &mut context);
204
205        let count_blocking_map = (|input: BlockingMap<u32, StreamOf<u32>>| {
206            for i in 0..input.request {
207                input.streams.send(i);
208            }
209            return input.request;
210        })
211        .as_map();
212
213        test_counting_stream(count_blocking_map, &mut context);
214
215        let count_async_map = (|input: AsyncMap<u32, StreamOf<u32>>| async move {
216            for i in 0..input.request {
217                input.streams.send(i);
218            }
219            return input.request;
220        })
221        .as_map();
222
223        test_counting_stream(count_async_map, &mut context);
224    }
225
226    fn test_counting_stream(
227        provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
228        context: &mut TestingContext,
229    ) {
230        let mut recipient = context.command(|commands| commands.request(10, provider).take());
231
232        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
233        assert!(recipient
234            .response
235            .take()
236            .available()
237            .is_some_and(|v| v == 10));
238
239        let mut stream: Vec<u32> = Vec::new();
240        while let Ok(r) = recipient.streams.try_recv() {
241            stream.push(r);
242        }
243        assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
244        assert!(context.no_unhandled_errors());
245    }
246
    /// A tuple of three anonymous streams, one per number format that the
    /// formatting tests try to parse the request into.
    type FormatStreams = (StreamOf<u32>, StreamOf<i32>, StreamOf<f32>);

    /// Run `validate_formatting_stream` against every kind of provider that
    /// can emit `FormatStreams`: services, callbacks, maps, and workflows that
    /// forward the streams of an inner provider.
    #[test]
    fn test_tuple_stream() {
        let mut context = TestingContext::minimal_plugins();

        // Blocking service
        let parse_blocking_srv = context.command(|commands| {
            commands.spawn_service(|In(input): BlockingServiceInput<String, FormatStreams>| {
                impl_formatting_streams_blocking(input.request, input.streams);
            })
        });

        validate_formatting_stream(parse_blocking_srv, &mut context);

        // Async service
        let parse_async_srv = context.command(|commands| {
            commands.spawn_service(
                |In(input): AsyncServiceInput<String, FormatStreams>| async move {
                    impl_formatting_streams_async(input.request, input.streams);
                },
            )
        });

        validate_formatting_stream(parse_async_srv, &mut context);

        // Continuous service
        let parse_continuous_srv = context
            .app
            .spawn_continuous_service(Update, impl_formatting_streams_continuous);

        validate_formatting_stream(parse_continuous_srv, &mut context);

        // Blocking callback
        let parse_blocking_callback =
            (|In(input): BlockingCallbackInput<String, FormatStreams>| {
                impl_formatting_streams_blocking(input.request, input.streams);
            })
            .as_callback();

        validate_formatting_stream(parse_blocking_callback, &mut context);

        // Async callback
        let parse_async_callback =
            (|In(input): AsyncCallbackInput<String, FormatStreams>| async move {
                impl_formatting_streams_async(input.request, input.streams);
            })
            .as_callback();

        validate_formatting_stream(parse_async_callback, &mut context);

        // Blocking map
        let parse_blocking_map = (|input: BlockingMap<String, FormatStreams>| {
            impl_formatting_streams_blocking(input.request, input.streams);
        })
        .as_map();

        validate_formatting_stream(parse_blocking_map, &mut context);

        // Async map
        let parse_async_map = (|input: AsyncMap<String, FormatStreams>| async move {
            impl_formatting_streams_async(input.request, input.streams);
        })
        .as_map();

        validate_formatting_stream(parse_async_map, &mut context);

        // Builds a workflow that pairs the scope input with `service`, passes
        // the pair to an injection node, and connects each of the node's
        // streams and its output to the matching slot of the scope.
        let make_workflow = |service: Service<String, (), FormatStreams>| {
            move |scope: Scope<String, (), FormatStreams>, builder: &mut Builder| {
                let node = scope
                    .input
                    .chain(builder)
                    .map_block(move |value| (value, service.into()))
                    .then_injection_node();

                builder.connect(node.streams.0, scope.streams.0);
                builder.connect(node.streams.1, scope.streams.1);
                builder.connect(node.streams.2, scope.streams.2);

                builder.connect(node.output, scope.terminate);
            }
        };

        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
        validate_formatting_stream(blocking_injection_workflow, &mut context);

        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
        validate_formatting_stream(async_injection_workflow, &mut context);

        let continuous_injection_workflow =
            context.spawn_workflow(make_workflow(parse_continuous_srv));
        validate_formatting_stream(continuous_injection_workflow, &mut context);

        // A workflow whose only node is another workflow.
        let nested_workflow = context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
            let inner_node = scope
                .input
                .chain(builder)
                .then_node(continuous_injection_workflow);
            builder.connect(inner_node.streams.0, scope.streams.0);
            builder.connect(inner_node.streams.1, scope.streams.1);
            builder.connect(inner_node.streams.2, scope.streams.2);
            builder.connect(inner_node.output, scope.terminate);
        });
        validate_formatting_stream(nested_workflow, &mut context);

        // One more layer: a workflow whose only node is the nested workflow.
        let double_nested_workflow =
            context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
                let inner_node = builder.create_node(nested_workflow);
                builder.connect(scope.input, inner_node.input);
                builder.connect(inner_node.streams.0, scope.streams.0);
                builder.connect(inner_node.streams.1, scope.streams.1);
                builder.connect(inner_node.streams.2, scope.streams.2);
                builder.connect(inner_node.output, scope.terminate);
            });
        validate_formatting_stream(double_nested_workflow, &mut context);
    }
355
356    fn impl_formatting_streams_blocking(
357        request: String,
358        streams: <FormatStreams as StreamPack>::StreamBuffers,
359    ) {
360        if let Ok(value) = request.parse::<u32>() {
361            streams.0.send(value);
362        }
363
364        if let Ok(value) = request.parse::<i32>() {
365            streams.1.send(value);
366        }
367
368        if let Ok(value) = request.parse::<f32>() {
369            streams.2.send(value);
370        }
371    }
372
373    fn impl_formatting_streams_async(
374        request: String,
375        streams: <FormatStreams as StreamPack>::StreamChannels,
376    ) {
377        if let Ok(value) = request.parse::<u32>() {
378            streams.0.send(value);
379        }
380
381        if let Ok(value) = request.parse::<i32>() {
382            streams.1.send(value);
383        }
384
385        if let Ok(value) = request.parse::<f32>() {
386            streams.2.send(value);
387        }
388    }
389
390    fn impl_formatting_streams_continuous(
391        In(ContinuousService { key }): In<ContinuousService<String, (), FormatStreams>>,
392        mut param: ContinuousQuery<String, (), FormatStreams>,
393    ) {
394        param.get_mut(&key).unwrap().for_each(|order| {
395            if let Ok(value) = order.request().parse::<u32>() {
396                order.streams().0.send(value);
397            }
398
399            if let Ok(value) = order.request().parse::<i32>() {
400                order.streams().1.send(value);
401            }
402
403            if let Ok(value) = order.request().parse::<f32>() {
404                order.streams().2.send(value);
405            }
406
407            order.respond(());
408        });
409    }
410
411    fn validate_formatting_stream(
412        provider: impl Provider<Request = String, Response = (), Streams = FormatStreams> + Clone,
413        context: &mut TestingContext,
414    ) {
415        let mut recipient =
416            context.command(|commands| commands.request("5".to_owned(), provider.clone()).take());
417
418        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
419        assert!(recipient.response.take().available().is_some());
420        assert!(context.no_unhandled_errors());
421
422        let outcome: FormatOutcome = recipient.into();
423        assert_eq!(outcome.stream_u32, [5]);
424        assert_eq!(outcome.stream_i32, [5]);
425        assert_eq!(outcome.stream_f32, [5.0]);
426
427        let mut recipient =
428            context.command(|commands| commands.request("-2".to_owned(), provider.clone()).take());
429
430        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
431        assert!(recipient.response.take().available().is_some());
432        assert!(context.no_unhandled_errors());
433
434        let outcome: FormatOutcome = recipient.into();
435        assert!(outcome.stream_u32.is_empty());
436        assert_eq!(outcome.stream_i32, [-2]);
437        assert_eq!(outcome.stream_f32, [-2.0]);
438
439        let mut recipient =
440            context.command(|commands| commands.request("6.7".to_owned(), provider.clone()).take());
441
442        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
443        assert!(recipient.response.take().available().is_some());
444        assert!(context.no_unhandled_errors());
445
446        let outcome: FormatOutcome = recipient.into();
447        assert!(outcome.stream_u32.is_empty());
448        assert!(outcome.stream_i32.is_empty());
449        assert_eq!(outcome.stream_f32, [6.7]);
450
451        let mut recipient = context.command(|commands| {
452            commands
453                .request("hello".to_owned(), provider.clone())
454                .take()
455        });
456
457        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
458        assert!(recipient.response.take().available().is_some());
459        assert!(context.no_unhandled_errors());
460
461        let outcome: FormatOutcome = recipient.into();
462        assert!(outcome.stream_u32.is_empty());
463        assert!(outcome.stream_i32.is_empty());
464        assert!(outcome.stream_f32.is_empty());
465    }
466
    /// Everything that was received on each of the three `FormatStreams`
    /// streams, in the order it was received.
    #[derive(Default)]
    struct FormatOutcome {
        /// Values received from the `StreamOf<u32>` stream.
        stream_u32: Vec<u32>,
        /// Values received from the `StreamOf<i32>` stream.
        stream_i32: Vec<i32>,
        /// Values received from the `StreamOf<f32>` stream.
        stream_f32: Vec<f32>,
    }
473
474    impl From<Recipient<(), FormatStreams>> for FormatOutcome {
475        fn from(mut recipient: Recipient<(), FormatStreams>) -> Self {
476            let mut result = Self::default();
477            while let Ok(r) = recipient.streams.0.try_recv() {
478                result.stream_u32.push(r);
479            }
480
481            while let Ok(r) = recipient.streams.1.try_recv() {
482                result.stream_i32.push(r);
483            }
484
485            while let Ok(r) = recipient.streams.2.try_recv() {
486                result.stream_f32.push(r);
487            }
488
489            result
490        }
491    }
492
    /// Run `validate_stream_pack` against every kind of provider that can emit
    /// `TestStreamPack` (services, callbacks, maps, and several workflow
    /// layouts), then run `validate_dynamically_named_stream_receiver` against
    /// the service-type providers.
    #[test]
    fn test_stream_pack() {
        let mut context = TestingContext::minimal_plugins();

        // Blocking service
        let parse_blocking_srv = context.command(|commands| {
            commands.spawn_service(
                |In(input): BlockingServiceInput<Vec<String>, TestStreamPack>| {
                    impl_stream_pack_test_blocking(input.request, input.streams);
                },
            )
        });

        validate_stream_pack(parse_blocking_srv, &mut context);

        // Async service
        let parse_async_srv = context.command(|commands| {
            commands.spawn_service(
                |In(input): AsyncServiceInput<Vec<String>, TestStreamPack>| async move {
                    impl_stream_pack_test_async(input.request, input.streams);
                },
            )
        });

        validate_stream_pack(parse_async_srv, &mut context);

        // Continuous service
        let parse_continuous_srv = context
            .app
            .spawn_continuous_service(Update, impl_stream_pack_test_continuous);

        validate_stream_pack(parse_continuous_srv, &mut context);

        // Blocking callback
        let parse_blocking_callback =
            (|In(input): BlockingCallbackInput<Vec<String>, TestStreamPack>| {
                impl_stream_pack_test_blocking(input.request, input.streams);
            })
            .as_callback();

        validate_stream_pack(parse_blocking_callback, &mut context);

        // Async callback
        let parse_async_callback =
            (|In(input): AsyncCallbackInput<Vec<String>, TestStreamPack>| async move {
                impl_stream_pack_test_async(input.request, input.streams);
            })
            .as_callback();

        validate_stream_pack(parse_async_callback, &mut context);

        // Blocking map
        let parse_blocking_map = (|input: BlockingMap<Vec<String>, TestStreamPack>| {
            impl_stream_pack_test_blocking(input.request, input.streams);
        })
        .as_map();

        validate_stream_pack(parse_blocking_map, &mut context);

        // Async map
        let parse_async_map = (|input: AsyncMap<Vec<String>, TestStreamPack>| async move {
            impl_stream_pack_test_async(input.request, input.streams);
        })
        .as_map();

        validate_stream_pack(parse_async_map, &mut context);

        // Builds a workflow that pairs the scope input with `service`, passes
        // the pair to an injection node, and connects each named stream of
        // the node and its output to the matching slot of the scope.
        let make_workflow = |service: Service<Vec<String>, (), TestStreamPack>| {
            move |scope: Scope<Vec<String>, (), TestStreamPack>, builder: &mut Builder| {
                let node = scope
                    .input
                    .chain(builder)
                    .map_block(move |value| (value, service.into()))
                    .then_injection_node();

                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
                builder.connect(node.streams.stream_string, scope.streams.stream_string);

                builder.connect(node.output, scope.terminate);
            }
        };

        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
        validate_stream_pack(blocking_injection_workflow, &mut context);

        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
        validate_stream_pack(async_injection_workflow, &mut context);

        let continuous_injection_workflow =
            context.spawn_workflow(make_workflow(parse_continuous_srv));
        validate_stream_pack(continuous_injection_workflow, &mut context);

        // A workflow whose only node is the continuous service.
        let nested_workflow =
            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
                let node = scope.input.chain(builder).then_node(parse_continuous_srv);

                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
                builder.connect(node.streams.stream_string, scope.streams.stream_string);

                builder.connect(node.output, scope.terminate);
            });
        validate_stream_pack(nested_workflow, &mut context);

        // A workflow whose only node is the workflow above.
        let double_nested_workflow =
            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
                let node = scope.input.chain(builder).then_node(nested_workflow);

                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
                builder.connect(node.streams.stream_string, scope.streams.stream_string);

                builder.connect(node.output, scope.terminate);
            });
        validate_stream_pack(double_nested_workflow, &mut context);

        // A workflow that runs the continuous service inside an inner scope
        // and forwards the inner scope's streams to the outer scope.
        let scoped_workflow =
            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
                let inner_scope =
                    builder.create_scope::<_, _, TestStreamPack, _>(|scope, builder| {
                        let node = scope.input.chain(builder).then_node(parse_continuous_srv);

                        builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
                        builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
                        builder.connect(node.streams.stream_string, scope.streams.stream_string);

                        builder.connect(node.output, scope.terminate);
                    });

                builder.connect(scope.input, inner_scope.input);

                builder.connect(inner_scope.streams.stream_u32, scope.streams.stream_u32);
                builder.connect(inner_scope.streams.stream_i32, scope.streams.stream_i32);
                builder.connect(
                    inner_scope.streams.stream_string,
                    scope.streams.stream_string,
                );

                builder.connect(inner_scope.output, scope.terminate);
            });
        validate_stream_pack(scoped_workflow, &mut context);

        // The same wiring as the nested workflow, but every connection is made
        // through the dynamic (`Dyn*`) types, looking the streams up by name.
        let dyn_stream_workflow =
            context.spawn_workflow::<Vec<String>, (), TestStreamPack, _>(|scope, builder| {
                let dyn_scope_input: DynOutput = scope.input.into();

                let node = builder.create_node(parse_continuous_srv);
                let mut dyn_node: DynNode = node.into();

                dyn_scope_input
                    .connect_to(&dyn_node.input, builder)
                    .unwrap();

                let dyn_scope_stream_u32: DynInputSlot = scope.streams.stream_u32.into();
                let dyn_node_stream_u32 = dyn_node.streams.take_named("stream_u32").unwrap();
                dyn_node_stream_u32
                    .connect_to(&dyn_scope_stream_u32, builder)
                    .unwrap();

                let dyn_scope_stream_i32: DynInputSlot = scope.streams.stream_i32.into();
                let dyn_node_stream_i32 = dyn_node.streams.take_named("stream_i32").unwrap();
                dyn_node_stream_i32
                    .connect_to(&dyn_scope_stream_i32, builder)
                    .unwrap();

                let dyn_scope_stream_string: DynInputSlot = scope.streams.stream_string.into();
                let dyn_node_stream_string = dyn_node.streams.take_named("stream_string").unwrap();
                dyn_node_stream_string
                    .connect_to(&dyn_scope_stream_string, builder)
                    .unwrap();

                let terminate: DynInputSlot = scope.terminate.into();
                dyn_node.output.connect_to(&terminate, builder).unwrap();
            });
        validate_stream_pack(dyn_stream_workflow, &mut context);

        // We can do a stream cast for the service-type providers but not for
        // the callbacks or maps.
        validate_dynamically_named_stream_receiver(parse_blocking_srv, &mut context);
        validate_dynamically_named_stream_receiver(parse_async_srv, &mut context);
        validate_dynamically_named_stream_receiver(parse_continuous_srv, &mut context);
        validate_dynamically_named_stream_receiver(blocking_injection_workflow, &mut context);
        validate_dynamically_named_stream_receiver(async_injection_workflow, &mut context);
        validate_dynamically_named_stream_receiver(continuous_injection_workflow, &mut context);
        validate_dynamically_named_stream_receiver(nested_workflow, &mut context);
        validate_dynamically_named_stream_receiver(double_nested_workflow, &mut context);
    }
674
675    fn validate_stream_pack(
676        provider: impl Provider<Request = Vec<String>, Response = (), Streams = TestStreamPack> + Clone,
677        context: &mut TestingContext,
678    ) {
679        let request = vec![
680            "5".to_owned(),
681            "10".to_owned(),
682            "-3".to_owned(),
683            "-27".to_owned(),
684            "hello".to_owned(),
685        ];
686
687        let mut recipient =
688            context.command(|commands| commands.request(request, provider.clone()).take());
689
690        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
691        assert!(recipient.response.take().available().is_some());
692        assert!(
693            context.no_unhandled_errors(),
694            "{:#?}",
695            context.get_unhandled_errors()
696        );
697
698        let outcome: StreamMapOutcome = recipient.into();
699        assert_eq!(outcome.stream_u32, [5, 10]);
700        assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
701        assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
702
703        let request = vec![];
704
705        let mut recipient =
706            context.command(|commands| commands.request(request, provider.clone()).take());
707
708        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
709        assert!(recipient.response.take().available().is_some());
710        assert!(
711            context.no_unhandled_errors(),
712            "{:#?}",
713            context.get_unhandled_errors()
714        );
715
716        let outcome: StreamMapOutcome = recipient.into();
717        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
718        assert_eq!(outcome.stream_i32, Vec::<i32>::new());
719        assert_eq!(outcome.stream_string, Vec::<String>::new());
720
721        let request = vec![
722            "foo".to_string(),
723            "bar".to_string(),
724            "1.32".to_string(),
725            "-8".to_string(),
726        ];
727
728        let mut recipient =
729            context.command(|commands| commands.request(request, provider.clone()).take());
730
731        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
732        assert!(recipient.response.take().available().is_some());
733
734        let outcome: StreamMapOutcome = recipient.into();
735        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
736        assert_eq!(outcome.stream_i32, [-8]);
737        assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
738    }
739
740    fn validate_dynamically_named_stream_receiver(
741        provider: Service<Vec<String>, (), TestStreamPack>,
742        context: &mut TestingContext,
743    ) {
744        let provider: Service<Vec<String>, (), TestDynamicNamedStreams> =
745            provider.optional_stream_cast();
746
747        let request = vec![
748            "5".to_owned(),
749            "10".to_owned(),
750            "-3".to_owned(),
751            "-27".to_owned(),
752            "hello".to_owned(),
753        ];
754
755        let mut recipient =
756            context.command(|commands| commands.request(request, provider.clone()).take());
757
758        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
759        assert!(recipient.response.take().available().is_some());
760        assert!(
761            context.no_unhandled_errors(),
762            "{:#?}",
763            context.get_unhandled_errors()
764        );
765
766        let outcome: StreamMapOutcome = recipient.try_into().unwrap();
767        assert_eq!(outcome.stream_u32, [5, 10]);
768        assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
769        assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
770
771        let request = vec![];
772
773        let mut recipient =
774            context.command(|commands| commands.request(request, provider.clone()).take());
775
776        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
777        assert!(recipient.response.take().available().is_some());
778        assert!(
779            context.no_unhandled_errors(),
780            "{:#?}",
781            context.get_unhandled_errors()
782        );
783
784        let outcome: StreamMapOutcome = recipient.try_into().unwrap();
785        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
786        assert_eq!(outcome.stream_i32, Vec::<i32>::new());
787        assert_eq!(outcome.stream_string, Vec::<String>::new());
788
789        let request = vec![
790            "foo".to_string(),
791            "bar".to_string(),
792            "1.32".to_string(),
793            "-8".to_string(),
794        ];
795
796        let mut recipient =
797            context.command(|commands| commands.request(request, provider.clone()).take());
798
799        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
800        assert!(recipient.response.take().available().is_some());
801
802        let outcome: StreamMapOutcome = recipient.try_into().unwrap();
803        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
804        assert_eq!(outcome.stream_i32, [-8]);
805        assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
806    }
807
808    fn impl_stream_pack_test_blocking(
809        request: Vec<String>,
810        streams: <TestStreamPack as StreamPack>::StreamBuffers,
811    ) {
812        for r in request {
813            if let Ok(value) = r.parse::<u32>() {
814                streams.stream_u32.send(value);
815            }
816
817            if let Ok(value) = r.parse::<i32>() {
818                streams.stream_i32.send(value);
819            }
820
821            streams.stream_string.send(r);
822        }
823    }
824
825    fn impl_stream_pack_test_async(
826        request: Vec<String>,
827        streams: <TestStreamPack as StreamPack>::StreamChannels,
828    ) {
829        for r in request {
830            if let Ok(value) = r.parse::<u32>() {
831                streams.stream_u32.send(value);
832            }
833
834            if let Ok(value) = r.parse::<i32>() {
835                streams.stream_i32.send(value);
836            }
837
838            streams.stream_string.send(r);
839        }
840    }
841
842    fn impl_stream_pack_test_continuous(
843        In(ContinuousService { key }): In<ContinuousService<Vec<String>, (), TestStreamPack>>,
844        mut param: ContinuousQuery<Vec<String>, (), TestStreamPack>,
845    ) {
846        param.get_mut(&key).unwrap().for_each(|order| {
847            for r in order.request().clone() {
848                if let Ok(value) = r.parse::<u32>() {
849                    order.streams().stream_u32.send(value);
850                }
851
852                if let Ok(value) = r.parse::<i32>() {
853                    order.streams().stream_i32.send(value);
854                }
855
856                order.streams().stream_string.send(r);
857            }
858
859            order.respond(());
860        });
861    }
862
863    #[derive(Default)]
864    struct StreamMapOutcome {
865        stream_u32: Vec<u32>,
866        stream_i32: Vec<i32>,
867        stream_string: Vec<String>,
868    }
869
870    impl From<Recipient<(), TestStreamPack>> for StreamMapOutcome {
871        fn from(mut recipient: Recipient<(), TestStreamPack>) -> Self {
872            let mut result = Self::default();
873            while let Ok(r) = recipient.streams.stream_u32.try_recv() {
874                result.stream_u32.push(r);
875            }
876
877            while let Ok(r) = recipient.streams.stream_i32.try_recv() {
878                result.stream_i32.push(r);
879            }
880
881            while let Ok(r) = recipient.streams.stream_string.try_recv() {
882                result.stream_string.push(r);
883            }
884
885            result
886        }
887    }
888
889    type TestDynamicNamedStreams = (
890        DynamicallyNamedStream<StreamOf<u32>>,
891        DynamicallyNamedStream<StreamOf<i32>>,
892        DynamicallyNamedStream<StreamOf<String>>,
893    );
894
895    impl TryFrom<Recipient<(), TestDynamicNamedStreams>> for StreamMapOutcome {
896        type Error = UnknownName;
897        fn try_from(
898            mut recipient: Recipient<(), TestDynamicNamedStreams>,
899        ) -> Result<Self, Self::Error> {
900            let mut result = Self::default();
901            while let Ok(NamedValue { name, value }) = recipient.streams.0.try_recv() {
902                if name == "stream_u32" {
903                    result.stream_u32.push(value);
904                } else {
905                    return Err(UnknownName { name });
906                }
907            }
908
909            while let Ok(NamedValue { name, value }) = recipient.streams.1.try_recv() {
910                if name == "stream_i32" {
911                    result.stream_i32.push(value);
912                } else {
913                    return Err(UnknownName { name });
914                }
915            }
916
917            while let Ok(NamedValue { name, value }) = recipient.streams.2.try_recv() {
918                if name == "stream_string" {
919                    result.stream_string.push(value);
920                } else {
921                    return Err(UnknownName { name });
922                }
923            }
924
925            Ok(result)
926        }
927    }
928
929    #[test]
930    fn test_dynamically_named_streams() {
931        let mut context = TestingContext::minimal_plugins();
932
933        let parse_blocking_srv = context.command(|commands| {
934            commands.spawn_service(
935                |In(input): BlockingServiceInput<NamedInputs, TestDynamicNamedStreams>| {
936                    impl_dynamically_named_streams_blocking(input.request, input.streams);
937                },
938            )
939        });
940
941        validate_dynamically_named_streams(parse_blocking_srv, &mut context);
942
943        let parse_async_srv = context.command(|commands| {
944            commands.spawn_service(
945                |In(input): AsyncServiceInput<NamedInputs, TestDynamicNamedStreams>| async move {
946                    impl_dynamically_named_streams_async(input.request, input.streams);
947                },
948            )
949        });
950
951        validate_dynamically_named_streams(parse_async_srv, &mut context);
952
953        let parse_continuous_srv = context
954            .app
955            .spawn_continuous_service(Update, impl_dynamically_named_streams_continuous);
956
957        validate_dynamically_named_streams(parse_continuous_srv, &mut context);
958
959        let parse_blocking_callback =
960            (|In(input): BlockingCallbackInput<NamedInputs, TestDynamicNamedStreams>| {
961                impl_dynamically_named_streams_blocking(input.request, input.streams);
962            })
963            .as_callback();
964
965        validate_dynamically_named_streams(parse_blocking_callback, &mut context);
966
967        let parse_async_callback =
968            (|In(input): AsyncCallbackInput<NamedInputs, TestDynamicNamedStreams>| async move {
969                impl_dynamically_named_streams_async(input.request, input.streams);
970            })
971            .as_callback();
972
973        validate_dynamically_named_streams(parse_async_callback, &mut context);
974
975        let parse_blocking_map = (|input: BlockingMap<NamedInputs, TestDynamicNamedStreams>| {
976            impl_dynamically_named_streams_blocking(input.request, input.streams);
977        })
978        .as_map();
979
980        validate_dynamically_named_streams(parse_blocking_map, &mut context);
981
982        let parse_async_map = (|input: AsyncMap<NamedInputs, TestDynamicNamedStreams>| async move {
983            impl_dynamically_named_streams_async(input.request, input.streams);
984        })
985        .as_map();
986
987        validate_dynamically_named_streams(parse_async_map, &mut context);
988
989        let make_workflow = |service: Service<NamedInputs, (), TestDynamicNamedStreams>| {
990            move |scope: Scope<NamedInputs, (), TestDynamicNamedStreams>, builder: &mut Builder| {
991                let node = scope
992                    .input
993                    .chain(builder)
994                    .map_block(move |value| (value, service.into()))
995                    .then_injection_node();
996
997                builder.connect(node.streams.0, scope.streams.0);
998                builder.connect(node.streams.1, scope.streams.1);
999                builder.connect(node.streams.2, scope.streams.2);
1000
1001                builder.connect(node.output, scope.terminate);
1002            }
1003        };
1004
1005        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
1006        validate_dynamically_named_streams(blocking_injection_workflow, &mut context);
1007
1008        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
1009        validate_dynamically_named_streams(async_injection_workflow, &mut context);
1010
1011        let continuous_injection_workflow =
1012            context.spawn_workflow(make_workflow(parse_continuous_srv));
1013        validate_dynamically_named_streams(continuous_injection_workflow, &mut context);
1014
1015        let nested_workflow =
1016            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1017                let node = scope.input.chain(builder).then_node(parse_continuous_srv);
1018
1019                builder.connect(node.streams.0, scope.streams.0);
1020                builder.connect(node.streams.1, scope.streams.1);
1021                builder.connect(node.streams.2, scope.streams.2);
1022
1023                builder.connect(node.output, scope.terminate);
1024            });
1025        validate_dynamically_named_streams(nested_workflow, &mut context);
1026
1027        let double_nested_workflow =
1028            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1029                let node = scope.input.chain(builder).then_node(nested_workflow);
1030
1031                builder.connect(node.streams.0, scope.streams.0);
1032                builder.connect(node.streams.1, scope.streams.1);
1033                builder.connect(node.streams.2, scope.streams.2);
1034
1035                builder.connect(node.output, scope.terminate);
1036            });
1037        validate_dynamically_named_streams(double_nested_workflow, &mut context);
1038
1039        let scoped_workflow =
1040            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1041                let inner_scope =
1042                    builder.create_scope::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1043                        let node = scope.input.chain(builder).then_node(parse_continuous_srv);
1044
1045                        builder.connect(node.streams.0, scope.streams.0);
1046                        builder.connect(node.streams.1, scope.streams.1);
1047                        builder.connect(node.streams.2, scope.streams.2);
1048
1049                        builder.connect(node.output, scope.terminate);
1050                    });
1051
1052                builder.connect(scope.input, inner_scope.input);
1053
1054                builder.connect(inner_scope.streams.0, scope.streams.0);
1055                builder.connect(inner_scope.streams.1, scope.streams.1);
1056                builder.connect(inner_scope.streams.2, scope.streams.2);
1057
1058                builder.connect(inner_scope.output, scope.terminate);
1059            });
1060        validate_dynamically_named_streams(scoped_workflow, &mut context);
1061
1062        // We can do a stream cast for the service-type providers but not for
1063        // the callbacks or maps.
1064        validate_dynamically_named_streams_into_stream_pack(parse_blocking_srv, &mut context);
1065        validate_dynamically_named_streams_into_stream_pack(parse_async_srv, &mut context);
1066        validate_dynamically_named_streams_into_stream_pack(parse_continuous_srv, &mut context);
1067        validate_dynamically_named_streams_into_stream_pack(
1068            blocking_injection_workflow,
1069            &mut context,
1070        );
1071        validate_dynamically_named_streams_into_stream_pack(async_injection_workflow, &mut context);
1072        validate_dynamically_named_streams_into_stream_pack(
1073            continuous_injection_workflow,
1074            &mut context,
1075        );
1076        validate_dynamically_named_streams_into_stream_pack(nested_workflow, &mut context);
1077        validate_dynamically_named_streams_into_stream_pack(double_nested_workflow, &mut context);
1078    }
1079
1080    fn validate_dynamically_named_streams(
1081        provider: impl Provider<Request = NamedInputs, Response = (), Streams = TestDynamicNamedStreams>
1082            + Clone,
1083        context: &mut TestingContext,
1084    ) {
1085        let expected_values_u32 = vec![
1086            NamedValue::new("stream_u32", 5),
1087            NamedValue::new("stream_u32", 10),
1088            NamedValue::new("stream_i32", 12),
1089        ];
1090
1091        let expected_values_i32 = vec![
1092            NamedValue::new("stream_i32", 2),
1093            NamedValue::new("stream_i32", -5),
1094            NamedValue::new("stream_u32", 7),
1095        ];
1096
1097        let expected_values_string = vec![
1098            NamedValue::new("stream_string", "hello".to_owned()),
1099            NamedValue::new("stream_string", "8".to_owned()),
1100            NamedValue::new("stream_u32", "22".to_owned()),
1101        ];
1102
1103        let request = NamedInputs {
1104            values_u32: expected_values_u32.clone(),
1105            values_i32: expected_values_i32.clone(),
1106            values_string: expected_values_string.clone(),
1107        };
1108
1109        let mut recipient = context.command(|commands| commands.request(request, provider).take());
1110
1111        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
1112        assert!(recipient.response.take().available().is_some());
1113        assert!(context.no_unhandled_errors());
1114
1115        let received_values_u32 = collect_received_values(recipient.streams.0);
1116        assert_eq!(expected_values_u32, received_values_u32);
1117
1118        let received_values_i32 = collect_received_values(recipient.streams.1);
1119        assert_eq!(expected_values_i32, received_values_i32);
1120
1121        let received_values_string = collect_received_values(recipient.streams.2);
1122        assert_eq!(expected_values_string, received_values_string);
1123    }
1124
1125    pub fn collect_received_values<T>(mut receiver: crate::Receiver<T>) -> Vec<T> {
1126        let mut result = Vec::new();
1127        while let Ok(value) = receiver.try_recv() {
1128            result.push(value);
1129        }
1130        result
1131    }
1132
1133    fn validate_dynamically_named_streams_into_stream_pack(
1134        provider: Service<NamedInputs, (), TestDynamicNamedStreams>,
1135        context: &mut TestingContext,
1136    ) {
1137        let provider: Service<NamedInputs, (), TestStreamPack> = provider.optional_stream_cast();
1138
1139        let request = NamedInputs {
1140            values_u32: vec![
1141                NamedValue::new("stream_u32", 5),
1142                NamedValue::new("stream_u32", 10),
1143                // This won't appear because its name isn't being listened for
1144                // for this value type
1145                NamedValue::new("stream_i32", 12),
1146            ],
1147            values_i32: vec![
1148                NamedValue::new("stream_i32", 2),
1149                NamedValue::new("stream_i32", -5),
1150                // This won't appear because its name isn't being listened for
1151                // for this value type
1152                NamedValue::new("stream_u32", 7),
1153            ],
1154            values_string: vec![
1155                NamedValue::new("stream_string", "hello".to_owned()),
1156                NamedValue::new("stream_string", "8".to_owned()),
1157                // This won't appear because its named isn't being listened for
1158                // for this value type
1159                NamedValue::new("stream_u32", "22".to_owned()),
1160            ],
1161        };
1162
1163        let mut recipient = context.command(|commands| commands.request(request, provider).take());
1164
1165        context.run_with_conditions(&mut recipient.response, Duration::from_secs(2));
1166        assert!(recipient.response.take().available().is_some());
1167        assert!(context.no_unhandled_errors());
1168
1169        let outcome: StreamMapOutcome = recipient.try_into().unwrap();
1170        assert_eq!(outcome.stream_u32, [5, 10]);
1171        assert_eq!(outcome.stream_i32, [2, -5]);
1172        assert_eq!(outcome.stream_string, ["hello", "8"]);
1173    }
1174
1175    fn impl_dynamically_named_streams_blocking(
1176        request: NamedInputs,
1177        streams: <TestDynamicNamedStreams as StreamPack>::StreamBuffers,
1178    ) {
1179        for nv in request.values_u32 {
1180            streams.0.send(nv);
1181        }
1182
1183        for nv in request.values_i32 {
1184            streams.1.send(nv);
1185        }
1186
1187        for nv in request.values_string {
1188            streams.2.send(nv);
1189        }
1190    }
1191
1192    fn impl_dynamically_named_streams_async(
1193        request: NamedInputs,
1194        streams: <TestDynamicNamedStreams as StreamPack>::StreamChannels,
1195    ) {
1196        for nv in request.values_u32 {
1197            streams.0.send(nv);
1198        }
1199
1200        for nv in request.values_i32 {
1201            streams.1.send(nv);
1202        }
1203
1204        for nv in request.values_string {
1205            streams.2.send(nv);
1206        }
1207    }
1208
1209    fn impl_dynamically_named_streams_continuous(
1210        In(ContinuousService { key }): In<
1211            ContinuousService<NamedInputs, (), TestDynamicNamedStreams>,
1212        >,
1213        mut param: ContinuousQuery<NamedInputs, (), TestDynamicNamedStreams>,
1214    ) {
1215        param.get_mut(&key).unwrap().for_each(|order| {
1216            for nv in order.request().values_u32.iter() {
1217                order.streams().0.send(nv.clone());
1218            }
1219
1220            for nv in order.request().values_i32.iter() {
1221                order.streams().1.send(nv.clone());
1222            }
1223
1224            for nv in order.request().values_string.iter() {
1225                order.streams().2.send(nv.clone());
1226            }
1227
1228            order.respond(());
1229        });
1230    }
1231
1232    struct NamedInputs {
1233        values_u32: Vec<NamedValue<u32>>,
1234        values_i32: Vec<NamedValue<i32>>,
1235        values_string: Vec<NamedValue<String>>,
1236    }
1237
1238    #[derive(thiserror::Error, Debug)]
1239    #[error("received unknown name: {name}")]
1240    struct UnknownName {
1241        name: Cow<'static, str>,
1242    }
1243
1244    #[derive(StreamPack)]
1245    pub(crate) struct TestStreamPack {
1246        stream_u32: u32,
1247        stream_i32: i32,
1248        stream_string: String,
1249    }
1250}