crossflow/stream/
anonymous_stream.rs

1/*
2 * Copyright (C) 2025 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_ecs::{
19    prelude::{ChildOf, Commands, Entity, World},
20    system::Command,
21};
22
23pub use tokio::sync::mpsc::UnboundedReceiver as Receiver;
24use tokio::sync::mpsc::unbounded_channel;
25
26use std::{rc::Rc, sync::Arc};
27
28use crate::{
29    AddExecution, AddOperation, AnonymousStreamRedirect, Builder, DefaultStreamBufferContainer,
30    DeferredRoster, InnerChannel, InputSlot, OperationError, OperationResult, OperationRoster,
31    OrBroken, Output, Push, RedirectScopeStream, RedirectWorkflowStream, ReportUnhandled,
32    SingleInputStorage, StreamAvailability, StreamBuffer, StreamChannel, StreamEffect, StreamPack,
33    StreamRequest, StreamTargetMap, TakenStream, UnusedStreams, UnusedTarget,
34    dyn_node::{DynStreamInputPack, DynStreamOutputPack},
35};
36
37/// A wrapper to turn any [`StreamEffect`] into an anonymous (unnamed) stream.
38/// This should be used if you want a stream with the same behavior as [`StreamOf`][1]
39/// but with some additional side effect. The input and output data types of the
40/// stream may be different.
41///
42/// [1]: crate::StreamOf
43pub struct AnonymousStream<S: StreamEffect>(std::marker::PhantomData<fn(S)>);
44
45impl<S: StreamEffect> StreamEffect for AnonymousStream<S> {
46    type Input = S::Input;
47    type Output = S::Output;
48    fn side_effect(
49        input: Self::Input,
50        request: &mut StreamRequest,
51    ) -> Result<Self::Output, OperationError> {
52        S::side_effect(input, request)
53    }
54}
55
56impl<S: StreamEffect> StreamPack for AnonymousStream<S> {
57    type StreamInputPack = InputSlot<S::Input>;
58    type StreamOutputPack = Output<S::Output>;
59    type StreamReceivers = Receiver<S::Output>;
60    type StreamChannels = StreamChannel<S>;
61    type StreamBuffers = StreamBuffer<S::Input>;
62
63    fn spawn_scope_streams(
64        in_scope: Entity,
65        out_scope: Entity,
66        commands: &mut Commands,
67    ) -> (InputSlot<S::Input>, Output<S::Output>) {
68        let source = commands.spawn(()).id();
69        let target = commands.spawn(UnusedTarget).id();
70        commands.queue(AddOperation::new(
71            Some(in_scope),
72            source,
73            RedirectScopeStream::<Self>::new(target),
74        ));
75
76        (
77            InputSlot::new(in_scope, source),
78            Output::new(out_scope, target),
79        )
80    }
81
82    fn spawn_workflow_streams(builder: &mut Builder) -> InputSlot<S::Input> {
83        let source = builder.commands.spawn(()).id();
84        builder.commands.queue(AddOperation::new(
85            Some(builder.scope()),
86            source,
87            RedirectWorkflowStream::new(AnonymousStreamRedirect::<S>::new(None)),
88        ));
89        InputSlot::new(builder.scope(), source)
90    }
91
92    fn spawn_node_streams(
93        source: Entity,
94        map: &mut StreamTargetMap,
95        builder: &mut Builder,
96    ) -> Output<S::Output> {
97        let target = builder
98            .commands
99            .spawn((SingleInputStorage::new(source), UnusedTarget))
100            .id();
101
102        map.add_anonymous::<S::Output>(target, builder.commands());
103        Output::new(builder.scope(), target)
104    }
105
106    fn take_streams(
107        source: Entity,
108        map: &mut StreamTargetMap,
109        commands: &mut Commands,
110    ) -> Receiver<S::Output> {
111        let (sender, receiver) = unbounded_channel::<S::Output>();
112        let target = commands
113            .spawn(())
114            // Set the parent of this stream to be the series so it can be
115            // recursively despawned together.
116            .insert(ChildOf(source))
117            .id();
118
119        map.add_anonymous::<S::Output>(target, commands);
120        commands.queue(AddExecution::new(None, target, TakenStream::new(sender)));
121
122        receiver
123    }
124
125    fn collect_streams(
126        source: Entity,
127        target: Entity,
128        map: &mut StreamTargetMap,
129        commands: &mut Commands,
130    ) {
131        let redirect = commands.spawn(()).insert(ChildOf(source)).id();
132        commands.queue(AddExecution::new(
133            None,
134            redirect,
135            Push::<S::Output>::new(target, true),
136        ));
137        map.add_anonymous::<S::Output>(redirect, commands);
138    }
139
140    fn make_stream_channels(inner: &Arc<InnerChannel>, world: &World) -> Self::StreamChannels {
141        let target = world
142            .get::<StreamTargetMap>(inner.source())
143            .and_then(|t| t.get_anonymous::<S::Output>());
144        StreamChannel::new(target, Arc::clone(inner))
145    }
146
147    fn make_stream_buffers(target_map: Option<&StreamTargetMap>) -> StreamBuffer<S::Input> {
148        let target = target_map.and_then(|map| map.get_anonymous::<S::Output>());
149
150        StreamBuffer {
151            container: Default::default(),
152            target,
153        }
154    }
155
156    fn process_stream_buffers(
157        buffer: Self::StreamBuffers,
158        source: Entity,
159        session: Entity,
160        unused: &mut UnusedStreams,
161        world: &mut World,
162        roster: &mut OperationRoster,
163    ) -> OperationResult {
164        let target = buffer.target;
165        let mut was_unused = true;
166        for data in Rc::into_inner(buffer.container)
167            .or_broken()?
168            .into_inner()
169            .into_iter()
170        {
171            was_unused = false;
172            let mut request = StreamRequest {
173                source,
174                session,
175                target,
176                world,
177                roster,
178            };
179
180            Self::side_effect(data, &mut request)
181                .and_then(|output| request.send_output(output))
182                .report_unhandled(source, world);
183        }
184
185        if was_unused {
186            unused.streams.push(std::any::type_name::<Self>());
187        }
188
189        Ok(())
190    }
191
192    fn defer_buffers(
193        buffer: Self::StreamBuffers,
194        source: Entity,
195        session: Entity,
196        commands: &mut Commands,
197    ) {
198        commands.queue(SendAnonymousStreams::<
199            S,
200            DefaultStreamBufferContainer<S::Input>,
201        >::new(
202            buffer.container.take(), source, session, buffer.target
203        ));
204    }
205
206    fn set_stream_availability(availability: &mut StreamAvailability) {
207        availability.add_anonymous::<S::Output>();
208    }
209
210    fn are_streams_available(availability: &StreamAvailability) -> bool {
211        availability.has_anonymous::<S::Output>()
212    }
213
214    fn into_dyn_stream_input_pack(pack: &mut DynStreamInputPack, inputs: Self::StreamInputPack) {
215        pack.add_anonymous(inputs);
216    }
217
218    fn into_dyn_stream_output_pack(
219        pack: &mut DynStreamOutputPack,
220        outputs: Self::StreamOutputPack,
221    ) {
222        pack.add_anonymous(outputs);
223    }
224
225    fn has_streams() -> bool {
226        true
227    }
228}
229
230pub struct SendAnonymousStreams<S, Container> {
231    container: Container,
232    source: Entity,
233    session: Entity,
234    target: Option<Entity>,
235    _ignore: std::marker::PhantomData<fn(S)>,
236}
237
238impl<S, Container> SendAnonymousStreams<S, Container> {
239    pub fn new(
240        container: Container,
241        source: Entity,
242        session: Entity,
243        target: Option<Entity>,
244    ) -> Self {
245        Self {
246            container,
247            source,
248            session,
249            target,
250            _ignore: Default::default(),
251        }
252    }
253}
254
255impl<S, Container> Command for SendAnonymousStreams<S, Container>
256where
257    S: StreamEffect,
258    Container: 'static + Send + Sync + IntoIterator<Item = S::Input>,
259{
260    fn apply(self, world: &mut World) {
261        world.get_resource_or_insert_with(DeferredRoster::default);
262        world.resource_scope::<DeferredRoster, _>(|world, mut deferred| {
263            for data in self.container {
264                let mut request = StreamRequest {
265                    source: self.source,
266                    session: self.session,
267                    target: self.target,
268                    world,
269                    roster: &mut deferred,
270                };
271
272                S::side_effect(data, &mut request)
273                    .and_then(move |output| request.send_output(output))
274                    .report_unhandled(self.source, world);
275            }
276        });
277    }
278}