1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
/*
* Copyright (C) 2024 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
use bevy_ecs::{
prelude::{Bundle, Component, Entity},
world::{EntityRef, EntityWorldMut, World, Command},
};
use smallvec::SmallVec;
use backtrace::Backtrace;
use crate::{
Broken, BufferStorage, Cancel, Cancellation, CancellationCause, DeferredRoster, OperationError,
OperationRoster, OrBroken, SessionStatus, UnusedTarget,
};
/// A single piece of data that has been provided as input into an operation,
/// along with an indication of what session the data belongs to.
pub struct Input<T> {
/// Entity identifying the session that this input belongs to.
pub session: Entity,
/// The payload that was delivered as input.
pub data: T,
}
/// General purpose input storage used by most [operations](crate::Operation).
/// This component is inserted on the source entity of the operation and will
/// queue up inputs that have arrived for the source.
///
/// Within this file new inputs are always inserted at index 0 and taken by
/// popping from the back, so inputs are handed out oldest-first.
#[derive(Component)]
pub(crate) struct InputStorage<T> {
// Items will be inserted into this queue from the front, so we pop off the
// back to get the oldest items out.
// TODO(@mxgrey): Consider if it's worth implementing a Deque on top of
// the SmallVec data structure.
//
// Index 0 holds the newest input and the last index holds the oldest one.
reverse_queue: SmallVec<[Input<T>; 16]>,
}
impl<T> InputStorage<T> {
pub fn new() -> Self {
Self {
reverse_queue: Default::default(),
}
}
pub fn contains_session(&self, session: Entity) -> bool {
self.reverse_queue
.iter()
.any(|input| input.session == session)
}
}
impl<T> Default for InputStorage<T> {
fn default() -> Self {
Self::new()
}
}
/// Bundle that carries the `InputStorage<T>` component, which queues inputs of
/// type `T`.
#[derive(Bundle)]
pub struct InputBundle<T: 'static + Send + Sync> {
// Queue of inputs of type `T`.
storage: InputStorage<T>,
}
impl<T: 'static + Send + Sync> InputBundle<T> {
pub fn new() -> Self {
Self {
storage: Default::default(),
}
}
}
impl<T: 'static + Send + Sync> Default for InputBundle<T> {
fn default() -> Self {
Self::new()
}
}
/// Operations for delivering inputs to a node, taking them back out, and
/// cleaning them up for a session. This file implements the trait for
/// `EntityWorldMut`.
pub trait ManageInput {
/// Give an input to this node. The node will be queued up to immediately
/// process the input.
///
/// In the `EntityWorldMut` implementation in this file, the input is
/// discarded (and the node is not queued) when `session` is not active.
fn give_input<T: 'static + Send + Sync>(
&mut self,
session: Entity,
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError>;
/// Same as [`Self::give_input`], but the wakeup for this node will be
/// deferred until after the async updates are flushed. This is used for
/// async task output to ensure that all async operations, such as streams,
/// are finished being processed before the final output gets processed.
fn defer_input<T: 'static + Send + Sync>(
&mut self,
session: Entity,
data: T,
roster: &mut OperationRoster,
) -> Result<(), OperationError>;
/// Give an input to this node without flagging it in the roster. This
/// should not generally be used. It's only for special cases where we know
/// the node will be manually run after giving this input. It's marked
/// unsafe to bring attention to this requirement.
///
/// When `only_if_active` is true, the `EntityWorldMut` implementation in
/// this file first checks the status of `session` and returns `Ok(false)`
/// without storing anything if the session is not active. In every other
/// non-error case it returns `Ok(true)`.
///
/// # Safety
///
/// After calling this function you must make sure to either add the target
/// operation to the queue or run the operation explicitly. Failing to do
/// one of these could mean that this input (or one that follows it) will
/// never be processed, which could cause a workflow to hang forever.
unsafe fn sneak_input<T: 'static + Send + Sync>(
&mut self,
session: Entity,
data: T,
only_if_active: bool,
) -> Result<bool, OperationError>;
/// Get an input that is ready to be taken, or else produce an error.
fn take_input<T: 'static + Send + Sync>(&mut self) -> Result<Input<T>, OperationError>;
/// Try to take an input if one is ready. If no input is ready this will
/// return Ok(None). It only returns an error if the node is broken.
fn try_take_input<T: 'static + Send + Sync>(
&mut self,
) -> Result<Option<Input<T>>, OperationError>;
/// Remove the inputs of type `T` that are queued for `session`.
///
/// The `EntityWorldMut` implementation in this file moves those inputs
/// into the entity's `BufferStorage<T>` instead of dropping them when that
/// component is present.
fn cleanup_inputs<T: 'static + Send + Sync>(&mut self, session: Entity);
}
/// Read-only queries about the inputs that are queued for a node.
pub trait InspectInput {
/// Check whether an input of type `T` belonging to `session` is queued.
///
/// The implementations in this file return an error (via `or_broken`) when
/// the entity has no `InputStorage<T>` component.
fn has_input<T: 'static + Send + Sync>(&self, session: Entity) -> Result<bool, OperationError>;
}
impl<'w> ManageInput for EntityWorldMut<'w> {
    /// Store the input for an active session and, if it was accepted, queue
    /// this entity in the roster.
    fn give_input<T: 'static + Send + Sync>(
        &mut self,
        session: Entity,
        data: T,
        roster: &mut OperationRoster,
    ) -> Result<(), OperationError> {
        // SAFETY: Whenever the input is accepted we immediately add this
        // entity to the roster queue, which is what `sneak_input` requires.
        // A rejected input was never stored, so nothing needs to be queued.
        let accepted = unsafe { self.sneak_input(session, data, true)? };
        if accepted {
            roster.queue(self.id());
        }

        Ok(())
    }

    /// Store the input for an active session and, if it was accepted, add
    /// this entity to the roster's deferred wakeups.
    fn defer_input<T: 'static + Send + Sync>(
        &mut self,
        session: Entity,
        data: T,
        roster: &mut OperationRoster,
    ) -> Result<(), OperationError> {
        // SAFETY: Whenever the input is accepted we immediately defer this
        // entity in the roster, so it is not left waiting without a wakeup.
        // A rejected input was never stored, so nothing needs to be deferred.
        let accepted = unsafe { self.sneak_input(session, data, true)? };
        if accepted {
            roster.defer(self.id());
        }

        Ok(())
    }

    /// Push the input onto the front of this entity's `InputStorage<T>`
    /// without touching any roster.
    ///
    /// Returns `Ok(false)` only when `only_if_active` is set and `session` has
    /// no `SessionStatus::Active` status. Returns an error when the entity has
    /// neither an `InputStorage<T>` nor an `UnusedTarget` component.
    unsafe fn sneak_input<T: 'static + Send + Sync>(
        &mut self,
        session: Entity,
        data: T,
        only_if_active: bool,
    ) -> Result<bool, OperationError> {
        if only_if_active {
            let is_active = matches!(
                self.world().get::<SessionStatus>(session),
                Some(SessionStatus::Active)
            );

            if !is_active {
                // The session being sent is not active, either it is being
                // cleaned or already despawned. Therefore we should not
                // propagate any inputs related to it.
                return Ok(false);
            }
        }

        match self.get_mut::<InputStorage<T>>() {
            // The newest input always goes to the front of the reverse queue.
            Some(mut storage) => storage.reverse_queue.insert(0, Input { session, data }),
            None => {
                // If the input is being fed to an unused target then we can
                // generally ignore it, although it may indicate a bug in the
                // user's workflow because workflow branches that end in an
                // unused target will be spuriously dropped when the scope
                // terminates.
                if !self.contains::<UnusedTarget>() {
                    // In this case the target is not unused but also does not
                    // have the correct input storage type, so we report the
                    // operation as broken.
                    None.or_broken()?;
                }
            }
        }

        Ok(true)
    }

    /// Take the oldest queued input, producing the `or_not_ready` error when
    /// the queue is empty.
    fn take_input<T: 'static + Send + Sync>(&mut self) -> Result<Input<T>, OperationError> {
        let maybe_input = self.try_take_input::<T>()?;
        maybe_input.or_not_ready()
    }

    /// Pop the oldest queued input if there is one. The only error is the
    /// `or_broken` error produced when the entity has no `InputStorage<T>`.
    fn try_take_input<T: 'static + Send + Sync>(
        &mut self,
    ) -> Result<Option<Input<T>>, OperationError> {
        // The oldest input sits at the back of the reverse queue.
        let oldest = self
            .get_mut::<InputStorage<T>>()
            .or_broken()?
            .reverse_queue
            .pop();

        Ok(oldest)
    }

    /// Remove every queued input of type `T` that belongs to `session`. When
    /// the entity also has a `BufferStorage<T>`, the removed data is moved into
    /// that buffer instead of being dropped.
    fn cleanup_inputs<T: 'static + Send + Sync>(&mut self, session: Entity) {
        if !self.contains::<BufferStorage<T>>() {
            // Ordinary operation: simply drop whatever is queued for the session.
            if let Some(mut inputs) = self.get_mut::<InputStorage<T>>() {
                inputs
                    .reverse_queue
                    .retain(|queued| queued.session != session);
            }
            return;
        }

        // Buffers are handled in a special way because the data of some
        // buffers will be used during cancellation. Therefore we do not
        // want to just delete their contents, but instead store them in the
        // buffer storage until the scope gives the signal to clear all
        // buffer data after all the cancellation workflows are finished.
        let mut inputs = match self.get_mut::<InputStorage<T>>() {
            Some(inputs) => inputs,
            None => return,
        };

        // Pull out only the data that belongs to the session being cleaned up.
        let session_indices: SmallVec<[usize; 16]> = inputs
            .reverse_queue
            .iter()
            .enumerate()
            .filter(|(_, queued)| queued.session == session)
            .map(|(index, _)| index)
            .collect();

        // Remove from the highest index down so that each removal leaves the
        // indices that are still pending untouched.
        let mut extracted: SmallVec<[T; 16]> = SmallVec::new();
        for index in session_indices.into_iter().rev() {
            extracted.push(inputs.reverse_queue.remove(index).data);
        }

        // INVARIANT: At the top of this function we checked that the entity
        // contains this component, and we have not removed it since then.
        let mut buffer = self.get_mut::<BufferStorage<T>>().unwrap();

        // Reversing again hands the data over in ascending queue-index order,
        // i.e. starting from the most recently inserted input.
        // NOTE(review): confirm this is the order `force_push` expects.
        for data in extracted.into_iter().rev() {
            buffer.force_push(session, data);
        }
    }
}
impl<'a> InspectInput for EntityWorldMut<'a> {
    /// Check whether this entity's `InputStorage<T>` holds an input for
    /// `session`. Produces the `or_broken` error if the storage is missing.
    fn has_input<T: 'static + Send + Sync>(&self, session: Entity) -> Result<bool, OperationError> {
        Ok(self
            .get::<InputStorage<T>>()
            .or_broken()?
            .contains_session(session))
    }
}
impl<'a> InspectInput for EntityRef<'a> {
    /// Check whether this entity's `InputStorage<T>` holds an input for
    /// `session`. Produces the `or_broken` error if the storage is missing.
    fn has_input<T: 'static + Send + Sync>(&self, session: Entity) -> Result<bool, OperationError> {
        Ok(self
            .get::<InputStorage<T>>()
            .or_broken()?
            .contains_session(session))
    }
}
/// Command that delivers an input to a target entity when it is applied to the
/// world. See the `Command` implementation in this file for what happens when
/// the target has no `InputStorage<T>`.
pub(crate) struct InputCommand<T> {
/// Entity whose `InputStorage<T>` should receive the input.
pub(crate) target: Entity,
/// Session that the input belongs to.
pub(crate) session: Entity,
/// The payload to deliver.
pub(crate) data: T,
}
impl<T: 'static + Send + Sync> Command for InputCommand<T> {
fn apply(self, world: &mut World) {
match world.get_mut::<InputStorage<T>>(self.target) {
Some(mut storage) => {
storage.reverse_queue.insert(
0,
Input {
session: self.session,
data: self.data,
},
);
world
.get_resource_or_insert_with(DeferredRoster::default)
.queue(self.target);
}
None => {
let cause = CancellationCause::Broken(Broken {
node: self.target,
backtrace: Some(Backtrace::new()),
});
let cancel = Cancel {
origin: self.target,
target: self.session,
session: Some(self.session),
cancellation: Cancellation::from_cause(cause),
};
world
.get_resource_or_insert_with(DeferredRoster::default)
.cancel(cancel);
}
}
}
}