1use bevy_ecs::{
19 prelude::{ChildOf, Commands, Entity, World},
20 system::Command,
21};
22
23pub use tokio::sync::mpsc::UnboundedReceiver as Receiver;
24use tokio::sync::mpsc::unbounded_channel;
25
26use std::{rc::Rc, sync::Arc};
27
28use crate::{
29 AddExecution, AddOperation, AnonymousStreamRedirect, Builder, DefaultStreamBufferContainer,
30 DeferredRoster, InnerChannel, InputSlot, OperationError, OperationResult, OperationRoster,
31 OrBroken, Output, Push, RedirectScopeStream, RedirectWorkflowStream, ReportUnhandled,
32 SingleInputStorage, StreamAvailability, StreamBuffer, StreamChannel, StreamEffect, StreamPack,
33 StreamRequest, StreamTargetMap, TakenStream, UnusedStreams, UnusedTarget,
34 dyn_node::{DynStreamInputPack, DynStreamOutputPack},
35};
36
37pub struct AnonymousStream<S: StreamEffect>(std::marker::PhantomData<fn(S)>);
44
45impl<S: StreamEffect> StreamEffect for AnonymousStream<S> {
46 type Input = S::Input;
47 type Output = S::Output;
48 fn side_effect(
49 input: Self::Input,
50 request: &mut StreamRequest,
51 ) -> Result<Self::Output, OperationError> {
52 S::side_effect(input, request)
53 }
54}
55
56impl<S: StreamEffect> StreamPack for AnonymousStream<S> {
57 type StreamInputPack = InputSlot<S::Input>;
58 type StreamOutputPack = Output<S::Output>;
59 type StreamReceivers = Receiver<S::Output>;
60 type StreamChannels = StreamChannel<S>;
61 type StreamBuffers = StreamBuffer<S::Input>;
62
63 fn spawn_scope_streams(
64 in_scope: Entity,
65 out_scope: Entity,
66 commands: &mut Commands,
67 ) -> (InputSlot<S::Input>, Output<S::Output>) {
68 let source = commands.spawn(()).id();
69 let target = commands.spawn(UnusedTarget).id();
70 commands.queue(AddOperation::new(
71 Some(in_scope),
72 source,
73 RedirectScopeStream::<Self>::new(target),
74 ));
75
76 (
77 InputSlot::new(in_scope, source),
78 Output::new(out_scope, target),
79 )
80 }
81
82 fn spawn_workflow_streams(builder: &mut Builder) -> InputSlot<S::Input> {
83 let source = builder.commands.spawn(()).id();
84 builder.commands.queue(AddOperation::new(
85 Some(builder.scope()),
86 source,
87 RedirectWorkflowStream::new(AnonymousStreamRedirect::<S>::new(None)),
88 ));
89 InputSlot::new(builder.scope(), source)
90 }
91
92 fn spawn_node_streams(
93 source: Entity,
94 map: &mut StreamTargetMap,
95 builder: &mut Builder,
96 ) -> Output<S::Output> {
97 let target = builder
98 .commands
99 .spawn((SingleInputStorage::new(source), UnusedTarget))
100 .id();
101
102 map.add_anonymous::<S::Output>(target, builder.commands());
103 Output::new(builder.scope(), target)
104 }
105
106 fn take_streams(
107 source: Entity,
108 map: &mut StreamTargetMap,
109 commands: &mut Commands,
110 ) -> Receiver<S::Output> {
111 let (sender, receiver) = unbounded_channel::<S::Output>();
112 let target = commands
113 .spawn(())
114 .insert(ChildOf(source))
117 .id();
118
119 map.add_anonymous::<S::Output>(target, commands);
120 commands.queue(AddExecution::new(None, target, TakenStream::new(sender)));
121
122 receiver
123 }
124
125 fn collect_streams(
126 source: Entity,
127 target: Entity,
128 map: &mut StreamTargetMap,
129 commands: &mut Commands,
130 ) {
131 let redirect = commands.spawn(()).insert(ChildOf(source)).id();
132 commands.queue(AddExecution::new(
133 None,
134 redirect,
135 Push::<S::Output>::new(target, true),
136 ));
137 map.add_anonymous::<S::Output>(redirect, commands);
138 }
139
140 fn make_stream_channels(inner: &Arc<InnerChannel>, world: &World) -> Self::StreamChannels {
141 let target = world
142 .get::<StreamTargetMap>(inner.source())
143 .and_then(|t| t.get_anonymous::<S::Output>());
144 StreamChannel::new(target, Arc::clone(inner))
145 }
146
147 fn make_stream_buffers(target_map: Option<&StreamTargetMap>) -> StreamBuffer<S::Input> {
148 let target = target_map.and_then(|map| map.get_anonymous::<S::Output>());
149
150 StreamBuffer {
151 container: Default::default(),
152 target,
153 }
154 }
155
156 fn process_stream_buffers(
157 buffer: Self::StreamBuffers,
158 source: Entity,
159 session: Entity,
160 unused: &mut UnusedStreams,
161 world: &mut World,
162 roster: &mut OperationRoster,
163 ) -> OperationResult {
164 let target = buffer.target;
165 let mut was_unused = true;
166 for data in Rc::into_inner(buffer.container)
167 .or_broken()?
168 .into_inner()
169 .into_iter()
170 {
171 was_unused = false;
172 let mut request = StreamRequest {
173 source,
174 session,
175 target,
176 world,
177 roster,
178 };
179
180 Self::side_effect(data, &mut request)
181 .and_then(|output| request.send_output(output))
182 .report_unhandled(source, world);
183 }
184
185 if was_unused {
186 unused.streams.push(std::any::type_name::<Self>());
187 }
188
189 Ok(())
190 }
191
192 fn defer_buffers(
193 buffer: Self::StreamBuffers,
194 source: Entity,
195 session: Entity,
196 commands: &mut Commands,
197 ) {
198 commands.queue(SendAnonymousStreams::<
199 S,
200 DefaultStreamBufferContainer<S::Input>,
201 >::new(
202 buffer.container.take(), source, session, buffer.target
203 ));
204 }
205
206 fn set_stream_availability(availability: &mut StreamAvailability) {
207 availability.add_anonymous::<S::Output>();
208 }
209
210 fn are_streams_available(availability: &StreamAvailability) -> bool {
211 availability.has_anonymous::<S::Output>()
212 }
213
214 fn into_dyn_stream_input_pack(pack: &mut DynStreamInputPack, inputs: Self::StreamInputPack) {
215 pack.add_anonymous(inputs);
216 }
217
218 fn into_dyn_stream_output_pack(
219 pack: &mut DynStreamOutputPack,
220 outputs: Self::StreamOutputPack,
221 ) {
222 pack.add_anonymous(outputs);
223 }
224
225 fn has_streams() -> bool {
226 true
227 }
228}
229
230pub struct SendAnonymousStreams<S, Container> {
231 container: Container,
232 source: Entity,
233 session: Entity,
234 target: Option<Entity>,
235 _ignore: std::marker::PhantomData<fn(S)>,
236}
237
238impl<S, Container> SendAnonymousStreams<S, Container> {
239 pub fn new(
240 container: Container,
241 source: Entity,
242 session: Entity,
243 target: Option<Entity>,
244 ) -> Self {
245 Self {
246 container,
247 source,
248 session,
249 target,
250 _ignore: Default::default(),
251 }
252 }
253}
254
255impl<S, Container> Command for SendAnonymousStreams<S, Container>
256where
257 S: StreamEffect,
258 Container: 'static + Send + Sync + IntoIterator<Item = S::Input>,
259{
260 fn apply(self, world: &mut World) {
261 world.get_resource_or_insert_with(DeferredRoster::default);
262 world.resource_scope::<DeferredRoster, _>(|world, mut deferred| {
263 for data in self.container {
264 let mut request = StreamRequest {
265 source: self.source,
266 session: self.session,
267 target: self.target,
268 world,
269 roster: &mut deferred,
270 };
271
272 S::side_effect(data, &mut request)
273 .and_then(move |output| request.send_output(output))
274 .report_unhandled(self.source, world);
275 }
276 });
277 }
278}