crossflow/
stream.rs

1/*
2 * Copyright (C) 2023 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::prelude::{Entity, World};
19
20use std::sync::OnceLock;
21
22use crate::{ManageInput, OperationError, OperationResult, OperationRoster, OrBroken};
23
24mod anonymous_stream;
25pub use anonymous_stream::*;
26
27mod dynamically_named_stream;
28pub use dynamically_named_stream::*;
29
30mod named_stream;
31pub use named_stream::*;
32
33mod stream_availability;
34pub use stream_availability::*;
35
36mod stream_buffer;
37pub use stream_buffer::*;
38
39mod stream_channel;
40pub use stream_channel::*;
41
42mod stream_filter;
43pub use stream_filter::*;
44
45mod stream_pack;
46pub use stream_pack::*;
47
48mod stream_target_map;
49pub use stream_target_map::*;
50
51// TODO(@mxgrey): Add module-level documentation for stream.rs
52
53pub use crossflow_derive::{Stream, StreamPack};
54/// You can create custom stream types that have side-effects, such as:
55/// - applying a transformation to the stream input to produce a different type of output
56/// - logging the message data
57/// - triggering an event or modifying a resource/component in the [`World`]
58///
59/// After you have implemented `StreamEffect` for your struct, you can apply
60/// `#[derive(Stream)]` to the struct and then use as a [`StreamPack`], either
61/// on its own or in a tuple.
62///
63/// If you just want to stream a message with no side-effects, you can simply
64/// wrap your message type in [`StreamOf`] to get a [`StreamPack`]. Users only
65/// need to use `StreamEffect` if you want custom stream side-effects.
66pub trait StreamEffect: 'static + Send + Sync + Sized {
67    type Input: 'static + Send + Sync;
68    type Output: 'static + Send + Sync;
69
70    /// Specify a side effect that is meant to happen whenever a stream value is
71    /// sent.
72    fn side_effect(
73        input: Self::Input,
74        request: &mut StreamRequest,
75    ) -> Result<Self::Output, OperationError>;
76}
77
78/// A wrapper to make an anonymous (unnamed) stream for any type `T` that
79/// implements `'static + Send + Sync`. This simply transmits the `T` with no
80/// side-effects.
81#[derive(Stream)]
82pub struct StreamOf<T: 'static + Send + Sync>(std::marker::PhantomData<fn(T)>);
83
84impl<T: 'static + Send + Sync> Clone for StreamOf<T> {
85    fn clone(&self) -> Self {
86        *self
87    }
88}
89
90impl<T: 'static + Send + Sync> Copy for StreamOf<T> {}
91
92impl<T: 'static + Send + Sync> std::fmt::Debug for StreamOf<T> {
93    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
94        static NAME: OnceLock<String> = OnceLock::new();
95        let name = NAME.get_or_init(|| format!("StreamOf<{}>", std::any::type_name::<T>(),));
96
97        f.debug_struct(name.as_str()).finish()
98    }
99}
100
101impl<T: 'static + Send + Sync> StreamEffect for StreamOf<T> {
102    type Input = T;
103    type Output = T;
104
105    /// `StreamOf` has no side-effect
106    fn side_effect(
107        input: Self::Input,
108        _: &mut StreamRequest,
109    ) -> Result<Self::Output, OperationError> {
110        Ok(input)
111    }
112}
113
114/// `StreamRequest` is provided to types that implement [`StreamEffect`] so they
115/// can process a stream input and apply any side effects to it. Note that your
116/// implementation of [`StreamEffect::side_effect`] should return the output
117/// data; it should *not* use the `StreamRequest` send it the output to the target.
118pub struct StreamRequest<'a> {
119    /// The node that emitted the stream
120    pub source: Entity,
121    /// The session of the stream
122    pub session: Entity,
123    /// The target of the stream, if a specific target exists.
124    pub target: Option<Entity>,
125    /// The world that the stream exists inside
126    pub world: &'a mut World,
127    /// The roster of the stream
128    pub roster: &'a mut OperationRoster,
129}
130
131impl<'a> StreamRequest<'a> {
132    pub fn send_output<T: 'static + Send + Sync>(self, output: T) -> OperationResult {
133        let Self {
134            session,
135            target,
136            world,
137            roster,
138            ..
139        } = self;
140        if let Some(target) = target {
141            world
142                .get_entity_mut(target)
143                .or_broken()?
144                .give_input(session, output, roster)?;
145        }
146
147        Ok(())
148    }
149}
150
151#[cfg(test)]
152pub(crate) mod tests {
153    use crate::{dyn_node::*, prelude::*, testing::*};
154    use std::borrow::Cow;
155
156    #[test]
157    fn test_single_stream() {
158        let mut context = TestingContext::minimal_plugins();
159
160        let count_blocking_srv = context.command(|commands| {
161            commands.spawn_service(|In(input): BlockingServiceInput<u32, StreamOf<u32>>| {
162                for i in 0..input.request {
163                    input.streams.send(i);
164                }
165                return input.request;
166            })
167        });
168
169        test_counting_stream(count_blocking_srv, &mut context);
170
171        let count_async_srv = context.command(|commands| {
172            commands.spawn_service(
173                |In(input): AsyncServiceInput<u32, StreamOf<u32>>| async move {
174                    for i in 0..input.request {
175                        input.streams.send(i);
176                    }
177                    return input.request;
178                },
179            )
180        });
181
182        test_counting_stream(count_async_srv, &mut context);
183
184        let count_blocking_callback = (|In(input): BlockingCallbackInput<u32, StreamOf<u32>>| {
185            for i in 0..input.request {
186                input.streams.send(i);
187            }
188            return input.request;
189        })
190        .as_callback();
191
192        test_counting_stream(count_blocking_callback, &mut context);
193
194        let count_async_callback =
195            (|In(input): AsyncCallbackInput<u32, StreamOf<u32>>| async move {
196                for i in 0..input.request {
197                    input.streams.send(i);
198                }
199                return input.request;
200            })
201            .as_callback();
202
203        test_counting_stream(count_async_callback, &mut context);
204
205        let count_blocking_map = (|input: BlockingMap<u32, StreamOf<u32>>| {
206            for i in 0..input.request {
207                input.streams.send(i);
208            }
209            return input.request;
210        })
211        .as_map();
212
213        test_counting_stream(count_blocking_map, &mut context);
214
215        let count_async_map = (|input: AsyncMap<u32, StreamOf<u32>>| async move {
216            for i in 0..input.request {
217                input.streams.send(i);
218            }
219            return input.request;
220        })
221        .as_map();
222
223        test_counting_stream(count_async_map, &mut context);
224    }
225
226    fn test_counting_stream(
227        provider: impl Provider<Request = u32, Response = u32, Streams = StreamOf<u32>>,
228        context: &mut TestingContext,
229    ) {
230        let mut capture = context.command(|commands| commands.request(10, provider).capture());
231
232        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
233        let r = capture.outcome.try_recv().unwrap().unwrap();
234        assert_eq!(r, 10);
235
236        let mut stream: Vec<u32> = Vec::new();
237        while let Ok(r) = capture.streams.try_recv() {
238            stream.push(r);
239        }
240        assert_eq!(stream, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
241        context.assert_no_errors();
242    }
243
244    type FormatStreams = (StreamOf<u32>, StreamOf<i32>, StreamOf<f32>);
245    #[test]
246    fn test_tuple_stream() {
247        let mut context = TestingContext::minimal_plugins();
248
249        let parse_blocking_srv = context.command(|commands| {
250            commands.spawn_service(|In(input): BlockingServiceInput<String, FormatStreams>| {
251                impl_formatting_streams_blocking(input.request, input.streams);
252            })
253        });
254
255        validate_formatting_stream(parse_blocking_srv, &mut context);
256
257        let parse_async_srv = context.command(|commands| {
258            commands.spawn_service(
259                |In(input): AsyncServiceInput<String, FormatStreams>| async move {
260                    impl_formatting_streams_async(input.request, input.streams);
261                },
262            )
263        });
264
265        validate_formatting_stream(parse_async_srv, &mut context);
266
267        let parse_continuous_srv = context
268            .app
269            .spawn_continuous_service(Update, impl_formatting_streams_continuous);
270
271        validate_formatting_stream(parse_continuous_srv, &mut context);
272
273        let parse_blocking_callback =
274            (|In(input): BlockingCallbackInput<String, FormatStreams>| {
275                impl_formatting_streams_blocking(input.request, input.streams);
276            })
277            .as_callback();
278
279        validate_formatting_stream(parse_blocking_callback, &mut context);
280
281        let parse_async_callback =
282            (|In(input): AsyncCallbackInput<String, FormatStreams>| async move {
283                impl_formatting_streams_async(input.request, input.streams);
284            })
285            .as_callback();
286
287        validate_formatting_stream(parse_async_callback, &mut context);
288
289        let parse_blocking_map = (|input: BlockingMap<String, FormatStreams>| {
290            impl_formatting_streams_blocking(input.request, input.streams);
291        })
292        .as_map();
293
294        validate_formatting_stream(parse_blocking_map, &mut context);
295
296        let parse_async_map = (|input: AsyncMap<String, FormatStreams>| async move {
297            impl_formatting_streams_async(input.request, input.streams);
298        })
299        .as_map();
300
301        validate_formatting_stream(parse_async_map, &mut context);
302
303        let make_workflow = |service: Service<String, (), FormatStreams>| {
304            move |scope: Scope<String, (), FormatStreams>, builder: &mut Builder| {
305                let node = builder
306                    .chain(scope.start)
307                    .map_block(move |value| (value, service.into()))
308                    .then_injection_node();
309
310                builder.connect(node.streams.0, scope.streams.0);
311                builder.connect(node.streams.1, scope.streams.1);
312                builder.connect(node.streams.2, scope.streams.2);
313
314                builder.connect(node.output, scope.terminate);
315            }
316        };
317
318        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
319        validate_formatting_stream(blocking_injection_workflow, &mut context);
320
321        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
322        validate_formatting_stream(async_injection_workflow, &mut context);
323
324        let continuous_injection_workflow =
325            context.spawn_workflow(make_workflow(parse_continuous_srv));
326        validate_formatting_stream(continuous_injection_workflow, &mut context);
327
328        let nested_workflow = context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
329            let inner_node = builder
330                .chain(scope.start)
331                .then_node(continuous_injection_workflow);
332            builder.connect(inner_node.streams.0, scope.streams.0);
333            builder.connect(inner_node.streams.1, scope.streams.1);
334            builder.connect(inner_node.streams.2, scope.streams.2);
335            builder.connect(inner_node.output, scope.terminate);
336        });
337        validate_formatting_stream(nested_workflow, &mut context);
338
339        let double_nested_workflow =
340            context.spawn_workflow::<_, _, FormatStreams, _>(|scope, builder| {
341                let inner_node = builder.create_node(nested_workflow);
342                builder.connect(scope.start, inner_node.input);
343                builder.connect(inner_node.streams.0, scope.streams.0);
344                builder.connect(inner_node.streams.1, scope.streams.1);
345                builder.connect(inner_node.streams.2, scope.streams.2);
346                builder.connect(inner_node.output, scope.terminate);
347            });
348        validate_formatting_stream(double_nested_workflow, &mut context);
349    }
350
351    fn impl_formatting_streams_blocking(
352        request: String,
353        streams: <FormatStreams as StreamPack>::StreamBuffers,
354    ) {
355        if let Ok(value) = request.parse::<u32>() {
356            streams.0.send(value);
357        }
358
359        if let Ok(value) = request.parse::<i32>() {
360            streams.1.send(value);
361        }
362
363        if let Ok(value) = request.parse::<f32>() {
364            streams.2.send(value);
365        }
366    }
367
368    fn impl_formatting_streams_async(
369        request: String,
370        streams: <FormatStreams as StreamPack>::StreamChannels,
371    ) {
372        if let Ok(value) = request.parse::<u32>() {
373            streams.0.send(value);
374        }
375
376        if let Ok(value) = request.parse::<i32>() {
377            streams.1.send(value);
378        }
379
380        if let Ok(value) = request.parse::<f32>() {
381            streams.2.send(value);
382        }
383    }
384
385    fn impl_formatting_streams_continuous(
386        In(ContinuousService { key }): In<ContinuousService<String, (), FormatStreams>>,
387        mut param: ContinuousQuery<String, (), FormatStreams>,
388    ) {
389        param.get_mut(&key).unwrap().for_each(|order| {
390            if let Ok(value) = order.request().parse::<u32>() {
391                order.streams().0.send(value);
392            }
393
394            if let Ok(value) = order.request().parse::<i32>() {
395                order.streams().1.send(value);
396            }
397
398            if let Ok(value) = order.request().parse::<f32>() {
399                order.streams().2.send(value);
400            }
401
402            order.respond(());
403        });
404    }
405
406    fn validate_formatting_stream(
407        provider: impl Provider<Request = String, Response = (), Streams = FormatStreams> + Clone,
408        context: &mut TestingContext,
409    ) {
410        let mut capture = context
411            .command(|commands| commands.request("5".to_owned(), provider.clone()).capture());
412
413        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
414        assert!(capture.outcome.is_available());
415        context.assert_no_errors();
416
417        let outcome: FormatOutcome = capture.into();
418        assert_eq!(outcome.stream_u32, [5]);
419        assert_eq!(outcome.stream_i32, [5]);
420        assert_eq!(outcome.stream_f32, [5.0]);
421
422        let mut capture = context.command(|commands| {
423            commands
424                .request("-2".to_owned(), provider.clone())
425                .capture()
426        });
427
428        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
429        assert!(capture.outcome.is_available());
430        assert!(context.no_unhandled_errors());
431
432        let outcome: FormatOutcome = capture.into();
433        assert!(outcome.stream_u32.is_empty());
434        assert_eq!(outcome.stream_i32, [-2]);
435        assert_eq!(outcome.stream_f32, [-2.0]);
436
437        let mut capture = context.command(|commands| {
438            commands
439                .request("6.7".to_owned(), provider.clone())
440                .capture()
441        });
442
443        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
444        assert!(capture.outcome.is_available());
445        context.assert_no_errors();
446
447        let outcome: FormatOutcome = capture.into();
448        assert!(outcome.stream_u32.is_empty());
449        assert!(outcome.stream_i32.is_empty());
450        assert_eq!(outcome.stream_f32, [6.7]);
451
452        let mut capture = context.command(|commands| {
453            commands
454                .request("hello".to_owned(), provider.clone())
455                .capture()
456        });
457
458        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
459        assert!(capture.outcome.is_available());
460        context.assert_no_errors();
461
462        let outcome: FormatOutcome = capture.into();
463        assert!(outcome.stream_u32.is_empty());
464        assert!(outcome.stream_i32.is_empty());
465        assert!(outcome.stream_f32.is_empty());
466    }
467
468    #[derive(Default)]
469    struct FormatOutcome {
470        stream_u32: Vec<u32>,
471        stream_i32: Vec<i32>,
472        stream_f32: Vec<f32>,
473    }
474
475    impl From<Capture<(), FormatStreams>> for FormatOutcome {
476        fn from(mut capture: Capture<(), FormatStreams>) -> Self {
477            let mut result = Self::default();
478            while let Ok(r) = capture.streams.0.try_recv() {
479                result.stream_u32.push(r);
480            }
481
482            while let Ok(r) = capture.streams.1.try_recv() {
483                result.stream_i32.push(r);
484            }
485
486            while let Ok(r) = capture.streams.2.try_recv() {
487                result.stream_f32.push(r);
488            }
489
490            result
491        }
492    }
493
494    #[test]
495    fn test_stream_pack() {
496        let mut context = TestingContext::minimal_plugins();
497
498        let parse_blocking_srv = context.command(|commands| {
499            commands.spawn_service(
500                |In(input): BlockingServiceInput<Vec<String>, TestStreamPack>| {
501                    impl_stream_pack_test_blocking(input.request, input.streams);
502                },
503            )
504        });
505
506        validate_stream_pack(parse_blocking_srv, &mut context);
507
508        let parse_async_srv = context.command(|commands| {
509            commands.spawn_service(
510                |In(input): AsyncServiceInput<Vec<String>, TestStreamPack>| async move {
511                    impl_stream_pack_test_async(input.request, input.streams);
512                },
513            )
514        });
515
516        validate_stream_pack(parse_async_srv, &mut context);
517
518        let parse_continuous_srv = context
519            .app
520            .spawn_continuous_service(Update, impl_stream_pack_test_continuous);
521
522        validate_stream_pack(parse_continuous_srv, &mut context);
523
524        let parse_blocking_callback =
525            (|In(input): BlockingCallbackInput<Vec<String>, TestStreamPack>| {
526                impl_stream_pack_test_blocking(input.request, input.streams);
527            })
528            .as_callback();
529
530        validate_stream_pack(parse_blocking_callback, &mut context);
531
532        let parse_async_callback =
533            (|In(input): AsyncCallbackInput<Vec<String>, TestStreamPack>| async move {
534                impl_stream_pack_test_async(input.request, input.streams);
535            })
536            .as_callback();
537
538        validate_stream_pack(parse_async_callback, &mut context);
539
540        let parse_blocking_map = (|input: BlockingMap<Vec<String>, TestStreamPack>| {
541            impl_stream_pack_test_blocking(input.request, input.streams);
542        })
543        .as_map();
544
545        validate_stream_pack(parse_blocking_map, &mut context);
546
547        let parse_async_map = (|input: AsyncMap<Vec<String>, TestStreamPack>| async move {
548            impl_stream_pack_test_async(input.request, input.streams);
549        })
550        .as_map();
551
552        validate_stream_pack(parse_async_map, &mut context);
553
554        let make_workflow = |service: Service<Vec<String>, (), TestStreamPack>| {
555            move |scope: Scope<Vec<String>, (), TestStreamPack>, builder: &mut Builder| {
556                let node = builder
557                    .chain(scope.start)
558                    .map_block(move |value| (value, service.into()))
559                    .then_injection_node();
560
561                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
562                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
563                builder.connect(node.streams.stream_string, scope.streams.stream_string);
564
565                builder.connect(node.output, scope.terminate);
566            }
567        };
568
569        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
570        validate_stream_pack(blocking_injection_workflow, &mut context);
571
572        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
573        validate_stream_pack(async_injection_workflow, &mut context);
574
575        let continuous_injection_workflow =
576            context.spawn_workflow(make_workflow(parse_continuous_srv));
577        validate_stream_pack(continuous_injection_workflow, &mut context);
578
579        let nested_workflow =
580            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
581                let node = builder.chain(scope.start).then_node(parse_continuous_srv);
582
583                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
584                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
585                builder.connect(node.streams.stream_string, scope.streams.stream_string);
586
587                builder.connect(node.output, scope.terminate);
588            });
589        validate_stream_pack(nested_workflow, &mut context);
590
591        let double_nested_workflow =
592            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
593                let node = builder.chain(scope.start).then_node(nested_workflow);
594
595                builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
596                builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
597                builder.connect(node.streams.stream_string, scope.streams.stream_string);
598
599                builder.connect(node.output, scope.terminate);
600            });
601        validate_stream_pack(double_nested_workflow, &mut context);
602
603        let scoped_workflow =
604            context.spawn_workflow::<_, _, TestStreamPack, _>(|scope, builder| {
605                let inner_scope =
606                    builder.create_scope::<_, _, TestStreamPack, _>(|scope, builder| {
607                        let node = builder.chain(scope.start).then_node(parse_continuous_srv);
608
609                        builder.connect(node.streams.stream_u32, scope.streams.stream_u32);
610                        builder.connect(node.streams.stream_i32, scope.streams.stream_i32);
611                        builder.connect(node.streams.stream_string, scope.streams.stream_string);
612
613                        builder.connect(node.output, scope.terminate);
614                    });
615
616                builder.connect(scope.start, inner_scope.input);
617
618                builder.connect(inner_scope.streams.stream_u32, scope.streams.stream_u32);
619                builder.connect(inner_scope.streams.stream_i32, scope.streams.stream_i32);
620                builder.connect(
621                    inner_scope.streams.stream_string,
622                    scope.streams.stream_string,
623                );
624
625                builder.connect(inner_scope.output, scope.terminate);
626            });
627        validate_stream_pack(scoped_workflow, &mut context);
628
629        let dyn_stream_workflow =
630            context.spawn_workflow::<Vec<String>, (), TestStreamPack, _>(|scope, builder| {
631                let dyn_scope_input: DynOutput = scope.start.into();
632
633                let node = builder.create_node(parse_continuous_srv);
634                let mut dyn_node: DynNode = node.into();
635
636                dyn_scope_input
637                    .connect_to(&dyn_node.input, builder)
638                    .unwrap();
639
640                let dyn_scope_stream_u32: DynInputSlot = scope.streams.stream_u32.into();
641                let dyn_node_stream_u32 = dyn_node.streams.take_named("stream_u32").unwrap();
642                dyn_node_stream_u32
643                    .connect_to(&dyn_scope_stream_u32, builder)
644                    .unwrap();
645
646                let dyn_scope_stream_i32: DynInputSlot = scope.streams.stream_i32.into();
647                let dyn_node_stream_i32 = dyn_node.streams.take_named("stream_i32").unwrap();
648                dyn_node_stream_i32
649                    .connect_to(&dyn_scope_stream_i32, builder)
650                    .unwrap();
651
652                let dyn_scope_stream_string: DynInputSlot = scope.streams.stream_string.into();
653                let dyn_node_stream_string = dyn_node.streams.take_named("stream_string").unwrap();
654                dyn_node_stream_string
655                    .connect_to(&dyn_scope_stream_string, builder)
656                    .unwrap();
657
658                let terminate: DynInputSlot = scope.terminate.into();
659                dyn_node.output.connect_to(&terminate, builder).unwrap();
660            });
661        validate_stream_pack(dyn_stream_workflow, &mut context);
662
663        // We can do a stream cast for the service-type providers but not for
664        // the callbacks or maps.
665        validate_dynamically_named_stream_receiver(parse_blocking_srv, &mut context);
666        validate_dynamically_named_stream_receiver(parse_async_srv, &mut context);
667        validate_dynamically_named_stream_receiver(parse_continuous_srv, &mut context);
668        validate_dynamically_named_stream_receiver(blocking_injection_workflow, &mut context);
669        validate_dynamically_named_stream_receiver(async_injection_workflow, &mut context);
670        validate_dynamically_named_stream_receiver(continuous_injection_workflow, &mut context);
671        validate_dynamically_named_stream_receiver(nested_workflow, &mut context);
672        validate_dynamically_named_stream_receiver(double_nested_workflow, &mut context);
673    }
674
675    fn validate_stream_pack(
676        provider: impl Provider<Request = Vec<String>, Response = (), Streams = TestStreamPack> + Clone,
677        context: &mut TestingContext,
678    ) {
679        let request = vec![
680            "5".to_owned(),
681            "10".to_owned(),
682            "-3".to_owned(),
683            "-27".to_owned(),
684            "hello".to_owned(),
685        ];
686
687        let mut capture =
688            context.command(|commands| commands.request(request, provider.clone()).capture());
689
690        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
691        assert!(capture.outcome.is_available());
692        context.assert_no_errors();
693
694        let outcome: StreamMapOutcome = capture.into();
695        assert_eq!(outcome.stream_u32, [5, 10]);
696        assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
697        assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
698
699        let request = vec![];
700
701        let mut capture =
702            context.command(|commands| commands.request(request, provider.clone()).capture());
703
704        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
705        assert!(capture.outcome.is_available());
706        context.assert_no_errors();
707
708        let outcome: StreamMapOutcome = capture.into();
709        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
710        assert_eq!(outcome.stream_i32, Vec::<i32>::new());
711        assert_eq!(outcome.stream_string, Vec::<String>::new());
712
713        let request = vec![
714            "foo".to_string(),
715            "bar".to_string(),
716            "1.32".to_string(),
717            "-8".to_string(),
718        ];
719
720        let mut capture =
721            context.command(|commands| commands.request(request, provider.clone()).capture());
722
723        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
724        assert!(capture.outcome.is_available());
725
726        let outcome: StreamMapOutcome = capture.into();
727        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
728        assert_eq!(outcome.stream_i32, [-8]);
729        assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
730    }
731
732    fn validate_dynamically_named_stream_receiver(
733        provider: Service<Vec<String>, (), TestStreamPack>,
734        context: &mut TestingContext,
735    ) {
736        let provider: Service<Vec<String>, (), TestDynamicNamedStreams> =
737            provider.optional_stream_cast();
738
739        let request = vec![
740            "5".to_owned(),
741            "10".to_owned(),
742            "-3".to_owned(),
743            "-27".to_owned(),
744            "hello".to_owned(),
745        ];
746
747        let mut capture =
748            context.command(|commands| commands.request(request, provider.clone()).capture());
749
750        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
751        assert!(capture.outcome.is_available());
752        assert!(
753            context.no_unhandled_errors(),
754            "{:#?}",
755            context.get_unhandled_errors()
756        );
757
758        let outcome: StreamMapOutcome = capture.try_into().unwrap();
759        assert_eq!(outcome.stream_u32, [5, 10]);
760        assert_eq!(outcome.stream_i32, [5, 10, -3, -27]);
761        assert_eq!(outcome.stream_string, ["5", "10", "-3", "-27", "hello"]);
762
763        let request = vec![];
764
765        let mut capture =
766            context.command(|commands| commands.request(request, provider.clone()).capture());
767
768        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
769        assert!(capture.outcome.is_available());
770        assert!(
771            context.no_unhandled_errors(),
772            "{:#?}",
773            context.get_unhandled_errors()
774        );
775
776        let outcome: StreamMapOutcome = capture.try_into().unwrap();
777        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
778        assert_eq!(outcome.stream_i32, Vec::<i32>::new());
779        assert_eq!(outcome.stream_string, Vec::<String>::new());
780
781        let request = vec![
782            "foo".to_string(),
783            "bar".to_string(),
784            "1.32".to_string(),
785            "-8".to_string(),
786        ];
787
788        let mut capture =
789            context.command(|commands| commands.request(request, provider.clone()).capture());
790
791        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
792        assert!(capture.outcome.is_available());
793
794        let outcome: StreamMapOutcome = capture.try_into().unwrap();
795        assert_eq!(outcome.stream_u32, Vec::<u32>::new());
796        assert_eq!(outcome.stream_i32, [-8]);
797        assert_eq!(outcome.stream_string, ["foo", "bar", "1.32", "-8"]);
798    }
799
800    fn impl_stream_pack_test_blocking(
801        request: Vec<String>,
802        streams: <TestStreamPack as StreamPack>::StreamBuffers,
803    ) {
804        for r in request {
805            if let Ok(value) = r.parse::<u32>() {
806                streams.stream_u32.send(value);
807            }
808
809            if let Ok(value) = r.parse::<i32>() {
810                streams.stream_i32.send(value);
811            }
812
813            streams.stream_string.send(r);
814        }
815    }
816
817    fn impl_stream_pack_test_async(
818        request: Vec<String>,
819        streams: <TestStreamPack as StreamPack>::StreamChannels,
820    ) {
821        for r in request {
822            if let Ok(value) = r.parse::<u32>() {
823                streams.stream_u32.send(value);
824            }
825
826            if let Ok(value) = r.parse::<i32>() {
827                streams.stream_i32.send(value);
828            }
829
830            streams.stream_string.send(r);
831        }
832    }
833
834    fn impl_stream_pack_test_continuous(
835        In(ContinuousService { key }): In<ContinuousService<Vec<String>, (), TestStreamPack>>,
836        mut param: ContinuousQuery<Vec<String>, (), TestStreamPack>,
837    ) {
838        param.get_mut(&key).unwrap().for_each(|order| {
839            for r in order.request().clone() {
840                if let Ok(value) = r.parse::<u32>() {
841                    order.streams().stream_u32.send(value);
842                }
843
844                if let Ok(value) = r.parse::<i32>() {
845                    order.streams().stream_i32.send(value);
846                }
847
848                order.streams().stream_string.send(r);
849            }
850
851            order.respond(());
852        });
853    }
854
855    #[derive(Default)]
856    struct StreamMapOutcome {
857        stream_u32: Vec<u32>,
858        stream_i32: Vec<i32>,
859        stream_string: Vec<String>,
860    }
861
862    impl From<Capture<(), TestStreamPack>> for StreamMapOutcome {
863        fn from(mut capture: Capture<(), TestStreamPack>) -> Self {
864            let mut result = Self::default();
865            while let Ok(r) = capture.streams.stream_u32.try_recv() {
866                result.stream_u32.push(r);
867            }
868
869            while let Ok(r) = capture.streams.stream_i32.try_recv() {
870                result.stream_i32.push(r);
871            }
872
873            while let Ok(r) = capture.streams.stream_string.try_recv() {
874                result.stream_string.push(r);
875            }
876
877            result
878        }
879    }
880
881    type TestDynamicNamedStreams = (
882        DynamicallyNamedStream<StreamOf<u32>>,
883        DynamicallyNamedStream<StreamOf<i32>>,
884        DynamicallyNamedStream<StreamOf<String>>,
885    );
886
887    impl TryFrom<Capture<(), TestDynamicNamedStreams>> for StreamMapOutcome {
888        type Error = UnknownName;
889        fn try_from(
890            mut capture: Capture<(), TestDynamicNamedStreams>,
891        ) -> Result<Self, Self::Error> {
892            let mut result = Self::default();
893            while let Ok(NamedValue { name, value }) = capture.streams.0.try_recv() {
894                if name == "stream_u32" {
895                    result.stream_u32.push(value);
896                } else {
897                    return Err(UnknownName { name });
898                }
899            }
900
901            while let Ok(NamedValue { name, value }) = capture.streams.1.try_recv() {
902                if name == "stream_i32" {
903                    result.stream_i32.push(value);
904                } else {
905                    return Err(UnknownName { name });
906                }
907            }
908
909            while let Ok(NamedValue { name, value }) = capture.streams.2.try_recv() {
910                if name == "stream_string" {
911                    result.stream_string.push(value);
912                } else {
913                    return Err(UnknownName { name });
914                }
915            }
916
917            Ok(result)
918        }
919    }
920
921    #[test]
922    fn test_dynamically_named_streams() {
923        let mut context = TestingContext::minimal_plugins();
924
925        let parse_blocking_srv = context.command(|commands| {
926            commands.spawn_service(
927                |In(input): BlockingServiceInput<NamedInputs, TestDynamicNamedStreams>| {
928                    impl_dynamically_named_streams_blocking(input.request, input.streams);
929                },
930            )
931        });
932
933        validate_dynamically_named_streams(parse_blocking_srv, &mut context);
934
935        let parse_async_srv = context.command(|commands| {
936            commands.spawn_service(
937                |In(input): AsyncServiceInput<NamedInputs, TestDynamicNamedStreams>| async move {
938                    impl_dynamically_named_streams_async(input.request, input.streams);
939                },
940            )
941        });
942
943        validate_dynamically_named_streams(parse_async_srv, &mut context);
944
945        let parse_continuous_srv = context
946            .app
947            .spawn_continuous_service(Update, impl_dynamically_named_streams_continuous);
948
949        validate_dynamically_named_streams(parse_continuous_srv, &mut context);
950
951        let parse_blocking_callback =
952            (|In(input): BlockingCallbackInput<NamedInputs, TestDynamicNamedStreams>| {
953                impl_dynamically_named_streams_blocking(input.request, input.streams);
954            })
955            .as_callback();
956
957        validate_dynamically_named_streams(parse_blocking_callback, &mut context);
958
959        let parse_async_callback =
960            (|In(input): AsyncCallbackInput<NamedInputs, TestDynamicNamedStreams>| async move {
961                impl_dynamically_named_streams_async(input.request, input.streams);
962            })
963            .as_callback();
964
965        validate_dynamically_named_streams(parse_async_callback, &mut context);
966
967        let parse_blocking_map = (|input: BlockingMap<NamedInputs, TestDynamicNamedStreams>| {
968            impl_dynamically_named_streams_blocking(input.request, input.streams);
969        })
970        .as_map();
971
972        validate_dynamically_named_streams(parse_blocking_map, &mut context);
973
974        let parse_async_map = (|input: AsyncMap<NamedInputs, TestDynamicNamedStreams>| async move {
975            impl_dynamically_named_streams_async(input.request, input.streams);
976        })
977        .as_map();
978
979        validate_dynamically_named_streams(parse_async_map, &mut context);
980
981        let make_workflow = |service: Service<NamedInputs, (), TestDynamicNamedStreams>| {
982            move |scope: Scope<NamedInputs, (), TestDynamicNamedStreams>, builder: &mut Builder| {
983                let node = builder
984                    .chain(scope.start)
985                    .map_block(move |value| (value, service.into()))
986                    .then_injection_node();
987
988                builder.connect(node.streams.0, scope.streams.0);
989                builder.connect(node.streams.1, scope.streams.1);
990                builder.connect(node.streams.2, scope.streams.2);
991
992                builder.connect(node.output, scope.terminate);
993            }
994        };
995
996        let blocking_injection_workflow = context.spawn_workflow(make_workflow(parse_blocking_srv));
997        validate_dynamically_named_streams(blocking_injection_workflow, &mut context);
998
999        let async_injection_workflow = context.spawn_workflow(make_workflow(parse_async_srv));
1000        validate_dynamically_named_streams(async_injection_workflow, &mut context);
1001
1002        let continuous_injection_workflow =
1003            context.spawn_workflow(make_workflow(parse_continuous_srv));
1004        validate_dynamically_named_streams(continuous_injection_workflow, &mut context);
1005
1006        let nested_workflow =
1007            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1008                let node = builder.chain(scope.start).then_node(parse_continuous_srv);
1009
1010                builder.connect(node.streams.0, scope.streams.0);
1011                builder.connect(node.streams.1, scope.streams.1);
1012                builder.connect(node.streams.2, scope.streams.2);
1013
1014                builder.connect(node.output, scope.terminate);
1015            });
1016        validate_dynamically_named_streams(nested_workflow, &mut context);
1017
1018        let double_nested_workflow =
1019            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1020                let node = builder.chain(scope.start).then_node(nested_workflow);
1021
1022                builder.connect(node.streams.0, scope.streams.0);
1023                builder.connect(node.streams.1, scope.streams.1);
1024                builder.connect(node.streams.2, scope.streams.2);
1025
1026                builder.connect(node.output, scope.terminate);
1027            });
1028        validate_dynamically_named_streams(double_nested_workflow, &mut context);
1029
1030        let scoped_workflow =
1031            context.spawn_workflow::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1032                let inner_scope =
1033                    builder.create_scope::<_, _, TestDynamicNamedStreams, _>(|scope, builder| {
1034                        let node = builder.chain(scope.start).then_node(parse_continuous_srv);
1035
1036                        builder.connect(node.streams.0, scope.streams.0);
1037                        builder.connect(node.streams.1, scope.streams.1);
1038                        builder.connect(node.streams.2, scope.streams.2);
1039
1040                        builder.connect(node.output, scope.terminate);
1041                    });
1042
1043                builder.connect(scope.start, inner_scope.input);
1044
1045                builder.connect(inner_scope.streams.0, scope.streams.0);
1046                builder.connect(inner_scope.streams.1, scope.streams.1);
1047                builder.connect(inner_scope.streams.2, scope.streams.2);
1048
1049                builder.connect(inner_scope.output, scope.terminate);
1050            });
1051        validate_dynamically_named_streams(scoped_workflow, &mut context);
1052
1053        // We can do a stream cast for the service-type providers but not for
1054        // the callbacks or maps.
1055        validate_dynamically_named_streams_into_stream_pack(parse_blocking_srv, &mut context);
1056        validate_dynamically_named_streams_into_stream_pack(parse_async_srv, &mut context);
1057        validate_dynamically_named_streams_into_stream_pack(parse_continuous_srv, &mut context);
1058        validate_dynamically_named_streams_into_stream_pack(
1059            blocking_injection_workflow,
1060            &mut context,
1061        );
1062        validate_dynamically_named_streams_into_stream_pack(async_injection_workflow, &mut context);
1063        validate_dynamically_named_streams_into_stream_pack(
1064            continuous_injection_workflow,
1065            &mut context,
1066        );
1067        validate_dynamically_named_streams_into_stream_pack(nested_workflow, &mut context);
1068        validate_dynamically_named_streams_into_stream_pack(double_nested_workflow, &mut context);
1069    }
1070
1071    fn validate_dynamically_named_streams(
1072        provider: impl Provider<Request = NamedInputs, Response = (), Streams = TestDynamicNamedStreams>
1073        + Clone,
1074        context: &mut TestingContext,
1075    ) {
1076        let expected_values_u32 = vec![
1077            NamedValue::new("stream_u32", 5),
1078            NamedValue::new("stream_u32", 10),
1079            NamedValue::new("stream_i32", 12),
1080        ];
1081
1082        let expected_values_i32 = vec![
1083            NamedValue::new("stream_i32", 2),
1084            NamedValue::new("stream_i32", -5),
1085            NamedValue::new("stream_u32", 7),
1086        ];
1087
1088        let expected_values_string = vec![
1089            NamedValue::new("stream_string", "hello".to_owned()),
1090            NamedValue::new("stream_string", "8".to_owned()),
1091            NamedValue::new("stream_u32", "22".to_owned()),
1092        ];
1093
1094        let request = NamedInputs {
1095            values_u32: expected_values_u32.clone(),
1096            values_i32: expected_values_i32.clone(),
1097            values_string: expected_values_string.clone(),
1098        };
1099
1100        let mut capture = context.command(|commands| commands.request(request, provider).capture());
1101
1102        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
1103        assert!(capture.outcome.is_available());
1104        context.assert_no_errors();
1105
1106        let received_values_u32 = collect_received_values(capture.streams.0);
1107        assert_eq!(expected_values_u32, received_values_u32);
1108
1109        let received_values_i32 = collect_received_values(capture.streams.1);
1110        assert_eq!(expected_values_i32, received_values_i32);
1111
1112        let received_values_string = collect_received_values(capture.streams.2);
1113        assert_eq!(expected_values_string, received_values_string);
1114    }
1115
1116    pub fn collect_received_values<T>(mut receiver: crate::Receiver<T>) -> Vec<T> {
1117        let mut result = Vec::new();
1118        while let Ok(value) = receiver.try_recv() {
1119            result.push(value);
1120        }
1121        result
1122    }
1123
1124    fn validate_dynamically_named_streams_into_stream_pack(
1125        provider: Service<NamedInputs, (), TestDynamicNamedStreams>,
1126        context: &mut TestingContext,
1127    ) {
1128        let provider: Service<NamedInputs, (), TestStreamPack> = provider.optional_stream_cast();
1129
1130        let request = NamedInputs {
1131            values_u32: vec![
1132                NamedValue::new("stream_u32", 5),
1133                NamedValue::new("stream_u32", 10),
1134                // This won't appear because its name isn't being listened for
1135                // for this value type
1136                NamedValue::new("stream_i32", 12),
1137            ],
1138            values_i32: vec![
1139                NamedValue::new("stream_i32", 2),
1140                NamedValue::new("stream_i32", -5),
1141                // This won't appear because its name isn't being listened for
1142                // for this value type
1143                NamedValue::new("stream_u32", 7),
1144            ],
1145            values_string: vec![
1146                NamedValue::new("stream_string", "hello".to_owned()),
1147                NamedValue::new("stream_string", "8".to_owned()),
1148                // This won't appear because its named isn't being listened for
1149                // for this value type
1150                NamedValue::new("stream_u32", "22".to_owned()),
1151            ],
1152        };
1153
1154        let mut capture = context.command(|commands| commands.request(request, provider).capture());
1155
1156        context.run_with_conditions(&mut capture.outcome, Duration::from_secs(2));
1157        assert!(capture.outcome.is_available());
1158        assert!(context.no_unhandled_errors());
1159
1160        let outcome: StreamMapOutcome = capture.try_into().unwrap();
1161        assert_eq!(outcome.stream_u32, [5, 10]);
1162        assert_eq!(outcome.stream_i32, [2, -5]);
1163        assert_eq!(outcome.stream_string, ["hello", "8"]);
1164    }
1165
1166    fn impl_dynamically_named_streams_blocking(
1167        request: NamedInputs,
1168        streams: <TestDynamicNamedStreams as StreamPack>::StreamBuffers,
1169    ) {
1170        for nv in request.values_u32 {
1171            streams.0.send(nv);
1172        }
1173
1174        for nv in request.values_i32 {
1175            streams.1.send(nv);
1176        }
1177
1178        for nv in request.values_string {
1179            streams.2.send(nv);
1180        }
1181    }
1182
1183    fn impl_dynamically_named_streams_async(
1184        request: NamedInputs,
1185        streams: <TestDynamicNamedStreams as StreamPack>::StreamChannels,
1186    ) {
1187        for nv in request.values_u32 {
1188            streams.0.send(nv);
1189        }
1190
1191        for nv in request.values_i32 {
1192            streams.1.send(nv);
1193        }
1194
1195        for nv in request.values_string {
1196            streams.2.send(nv);
1197        }
1198    }
1199
1200    fn impl_dynamically_named_streams_continuous(
1201        In(ContinuousService { key }): In<
1202            ContinuousService<NamedInputs, (), TestDynamicNamedStreams>,
1203        >,
1204        mut param: ContinuousQuery<NamedInputs, (), TestDynamicNamedStreams>,
1205    ) {
1206        param.get_mut(&key).unwrap().for_each(|order| {
1207            for nv in order.request().values_u32.iter() {
1208                order.streams().0.send(nv.clone());
1209            }
1210
1211            for nv in order.request().values_i32.iter() {
1212                order.streams().1.send(nv.clone());
1213            }
1214
1215            for nv in order.request().values_string.iter() {
1216                order.streams().2.send(nv.clone());
1217            }
1218
1219            order.respond(());
1220        });
1221    }
1222
1223    struct NamedInputs {
1224        values_u32: Vec<NamedValue<u32>>,
1225        values_i32: Vec<NamedValue<i32>>,
1226        values_string: Vec<NamedValue<String>>,
1227    }
1228
1229    #[derive(thiserror::Error, Debug)]
1230    #[error("received unknown name: {name}")]
1231    struct UnknownName {
1232        name: Cow<'static, str>,
1233    }
1234
1235    #[derive(StreamPack)]
1236    pub(crate) struct TestStreamPack {
1237        stream_u32: u32,
1238        stream_i32: i32,
1239        stream_string: String,
1240    }
1241}