1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
/*
* Copyright (C) 2024 Open Source Robotics Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
use bevy_derive::{Deref, DerefMut};
use bevy_ecs::{
prelude::*,
schedule::{IntoScheduleConfigs, ScheduleConfigs},
system::{ScheduleSystem, SystemState},
};
use smallvec::SmallVec;
use anyhow::anyhow;
use backtrace::Backtrace;
use std::sync::Arc;
use crate::{
AddExecution, ChannelQueue, Detached, DisposalNotice, Finished, FlushWarning,
MiscellaneousFailure, OperationError, OperationRequest, OperationRoster,
SeriesLifecycleChannel, ServiceHook, ServiceLifecycle, ServiceLifecycleChannel,
UnhandledErrors, UnusedTarget, UnusedTargetDrop, ValidateScopeReachability, ValidationRequest,
WakeQueue, awaken_task, dispose_for_despawned_service, execute_operation,
};
#[cfg(feature = "single_threaded_async")]
use crate::async_execution::SingleThreadedExecution;
/// Resource that bounds how much work a single execution flush is allowed to
/// perform. If this resource is absent from the world it gets lazily inserted
/// with its default values, which leave every limit disabled.
#[derive(Resource, Default, Clone, Copy)]
pub struct FlushParameters {
    /// By default, a flush will loop until the whole [`OperationRoster`] is empty.
    /// If there are loops of blocking services then it is possible for the flush
    /// to loop indefinitely, creating the appearance of a blockup in the
    /// application, or delaying other systems from running for a prolonged amount
    /// of time.
    ///
    /// Use this limit to prevent the flush from blocking for too long in those
    /// scenarios. If the flush loops beyond this limit, anything remaining in
    /// the roster will be moved into a deferred roster which will be processed
    /// during the next flush.
    ///
    /// A value of `None` means the flush can loop indefinitely (this is the default).
    pub flush_loop_limit: Option<usize>,
    /// When using the single_threaded_async feature, async futures get polled
    /// during the execution flush. If async futures repeatedly spawn more async
    /// tasks then the flush could get stuck in a loop. This parameter lets you
    /// put a limit on how many futures get polled so that the flush cannot get
    /// stuck.
    ///
    /// A value of `None` means the futures can be polled indefinitely (this is the default).
    pub single_threaded_poll_limit: Option<usize>,
    /// How many items can be received from the channel in one flush. The channel
    /// is implemented as unbounded, so if something is pushing commands to the
    /// channel faster than they can be processed, the flush will appear to be
    /// hanging.
    ///
    /// A value of `None` means there is no limit on how many items can be
    /// received in one flush (this is the default).
    pub channel_received_limit: Option<usize>,
}
impl FlushParameters {
    /// A set of options to prevent the activity flush system from getting hung
    /// up on a flood of commands.
    ///
    /// Caps the flush loop at 10 iterations, and both single-threaded async
    /// polling and channel receiving at 100 items per flush.
    pub fn avoid_hanging() -> Self {
        const LOOP_LIMIT: usize = 10;
        const POLL_LIMIT: usize = 100;
        const RECEIVE_LIMIT: usize = 100;
        Self {
            flush_loop_limit: Some(LOOP_LIMIT),
            single_threaded_poll_limit: Some(POLL_LIMIT),
            channel_received_limit: Some(RECEIVE_LIMIT),
        }
    }
}
/// Returns the system configuration that drives execution flushing. Add this
/// to a schedule so pending operations get processed on each run of that
/// schedule.
pub fn flush_execution() -> ScheduleConfigs<ScheduleSystem> {
    flush_execution_impl.into_configs()
}
/// Exclusive system that drains the [`OperationRoster`]: despawns deferred
/// entities, serves unblocked services, executes queued operations, and wakes
/// async tasks, looping until the roster is empty or the configured
/// [`FlushParameters::flush_loop_limit`] is reached.
fn flush_execution_impl(
    world: &mut World,
    new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
) {
    let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
    let mut roster = OperationRoster::new();
    // Seed the roster with anything that arrived through channels since the
    // last flush.
    collect_from_channels(&parameters, new_service_query, world, &mut roster);
    let mut loop_count = 0;
    while !roster.is_empty() {
        // Despawn entities whose removal had to be deferred until this flush.
        for e in roster.deferred_despawn.drain(..) {
            if let Ok(e_mut) = world.get_entity_mut(e) {
                e_mut.despawn();
            }
        }
        // Re-read the parameters each pass in case a flushed operation
        // modified the resource.
        let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
        let flush_loop_limit = parameters.flush_loop_limit;
        if let Some(limit) = flush_loop_limit {
            if limit <= loop_count {
                // We have looped beyond the limit, so we will defer anything that
                // remains in the roster and stop looping from here.
                world
                    .get_resource_or_insert_with(DeferredRoster::default)
                    .append(&mut roster);
                world
                    .get_resource_or_insert_with(UnhandledErrors::default)
                    .flush_warnings
                    .push(FlushWarning::ExceededFlushLoopLimit {
                        limit,
                        reached: loop_count,
                    });
                break;
            }
        }
        loop_count += 1;
        garbage_cleanup(world, &mut roster);
        // Serve any services that were blocked and are now free to proceed.
        while let Some(unblock) = roster.unblock.pop_front() {
            let serve_next = unblock.serve_next;
            serve_next(unblock, world, &mut roster);
            garbage_cleanup(world, &mut roster);
        }
        // Execute all queued operations. Each executed operation also counts
        // against the flush loop limit. Hitting the limit here only breaks
        // out of this inner queue loop; the check at the top of the outer
        // loop then defers whatever remains.
        while let Some(source) = roster.queue.pop_front() {
            execute_operation(OperationRequest {
                source,
                world,
                roster: &mut roster,
            });
            garbage_cleanup(world, &mut roster);
            loop_count += 1;
            if let Some(limit) = flush_loop_limit {
                if limit <= loop_count {
                    world
                        .get_resource_or_insert_with(UnhandledErrors::default)
                        .flush_warnings
                        .push(FlushWarning::ExceededFlushLoopLimit {
                            limit,
                            reached: loop_count,
                        });
                    break;
                }
            }
        }
        // Wake any async tasks that are ready to make progress.
        while let Some(source) = roster.awake.pop_front() {
            awaken_task(OperationRequest {
                source,
                world,
                roster: &mut roster,
            });
            garbage_cleanup(world, &mut roster);
        }
        // Pull in anything that the operations above pushed through channels
        // before re-checking whether the roster is empty.
        collect_from_channels(&parameters, new_service_query, world, &mut roster);
    }
}
/// Drain all pending cleanup notices and then all pending cancellations from
/// the roster, triggering each one. Every trigger receives the roster
/// mutably, so it may enqueue further cleanup or cancellation work, which is
/// drained within these same loops.
fn garbage_cleanup(world: &mut World, roster: &mut OperationRoster) {
    loop {
        let Some(cleanup) = roster.cleanup_finished.pop() else {
            break;
        };
        cleanup.trigger(world, roster);
    }
    loop {
        let Some(cancel) = roster.cancel.pop_front() else {
            break;
        };
        cancel.trigger(world, roster);
    }
}
/// Gather all pending work into the roster: commands from the async task
/// channel, service lifecycle bookkeeping, previously deferred operations,
/// tasks ready to be woken, unused-target handling, and disposal
/// reachability validation. Called at the start of each flush and again after
/// each pass of the flush loop.
fn collect_from_channels(
    parameters: &FlushParameters,
    new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
    world: &mut World,
    roster: &mut OperationRoster,
) {
    // Get the receiver for async task commands
    let mut received_count = 0;
    while let Ok(item) = world
        .get_resource_or_insert_with(ChannelQueue::new)
        .receiver
        .try_recv()
    {
        (item)(world, roster);
        // NOTE(review): the limit is checked before received_count is
        // incremented for the current item, so up to limit + 1 items are
        // processed and `reached` reports one less than the number actually
        // received — confirm whether this off-by-one is intentional.
        if let Some(limit) = parameters.channel_received_limit {
            if limit <= received_count {
                world
                    .get_resource_or_insert_with(UnhandledErrors::default)
                    .flush_warnings
                    .push(FlushWarning::ExceededChannelReceivedLimit {
                        limit,
                        reached: received_count,
                    });
                break;
            }
        }
        received_count += 1;
    }
    // Move any deferred entries inside the roster into its active queues.
    roster.process_deferals();
    let mut removed_services: SmallVec<[Entity; 8]> = SmallVec::new();
    // Make sure the lifecycle channel resource exists before we scope it.
    world.get_resource_or_insert_with(ServiceLifecycleChannel::new);
    world.resource_scope::<ServiceLifecycleChannel, ()>(|world, mut lifecycles| {
        // Clean up the dangling requests of any services that have been despawned.
        while let Ok(removed_service) = lifecycles.receiver.try_recv() {
            removed_services.push(removed_service);
        }
        // Add a lifecycle tracker to any new services that might have shown up
        // TODO(@mxgrey): Make sure this works for services which are spawned by
        // providers that are being flushed.
        for (e, mut hook) in new_service_query.iter_mut(world) {
            if hook.lifecycle.is_none() {
                // Check if the lifecycle is none, because collect_from_channels
                // can be run multiple times per flush, in which case we will
                // iterate over the query again, and end up dropping the lifecycle
                // managers that we just created. When that happens, the service
                // gets treated as despawned prematurely.
                hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
            }
        }
    });
    for removed_service in removed_services {
        dispose_for_despawned_service(removed_service, world, roster);
    }
    // Queue any operations that needed to be deferred
    let mut deferred = world.get_resource_or_insert_with(DeferredRoster::default);
    roster.append(&mut deferred);
    // Collect any tasks that are ready to be woken
    let mut wake_queue = world.get_resource_or_insert_with(WakeQueue::new);
    while let Ok(wakeable) = wake_queue.receiver.try_recv() {
        roster.awake(wakeable);
    }
    // Sort the unused targets: detached series get a Finished execution
    // appended, while attached ones are dropped along with their series.
    let mut unused_targets_state: SystemState<Query<(Entity, &Detached), With<UnusedTarget>>> =
        SystemState::new(world);
    let mut add_finish: SmallVec<[_; 8]> = SmallVec::new();
    let mut drop_targets: SmallVec<[_; 8]> = SmallVec::new();
    for (e, detached) in unused_targets_state.get(world).iter() {
        if detached.is_detached() {
            add_finish.push(e);
        } else {
            drop_targets.push(e);
        }
    }
    for e in add_finish {
        // Add a Finished execution to the unused target of a detached series.
        AddExecution::new(None, e, Finished).apply(world);
    }
    for target in drop_targets.drain(..) {
        drop_target(target, world, roster, true);
    }
    // Targets reported through the series lifecycle channel are dropped with
    // unused = false, so they are not recorded as UnusedTargetDrop errors.
    let mut lifecycles = world.get_resource_or_insert_with(SeriesLifecycleChannel::default);
    while let Ok(dropped_target) = lifecycles.receiver.try_recv() {
        drop_targets.push(dropped_target);
    }
    for target in drop_targets {
        drop_target(target, world, roster, false);
    }
    // For each disposal notice, run the scope's reachability validation —
    // presumably so that sessions made unreachable by the disposal can be
    // handled; the validation behavior itself lives behind the
    // ValidateScopeReachability component.
    while let Some(DisposalNotice {
        source,
        origin,
        session,
    }) = roster.disposed.pop()
    {
        let Some(validate) = world.get::<ValidateScopeReachability>(source) else {
            // A scope without the validation component is a broken invariant;
            // record it and move on rather than panicking mid-flush.
            world
                .get_resource_or_insert_with(UnhandledErrors::default)
                .miscellaneous
                .push(MiscellaneousFailure {
                    error: Arc::new(anyhow!(
                        "Scope {source:?} for disposal notification does not \
                        have validation component",
                    )),
                    backtrace: Some(Backtrace::new()),
                });
            continue;
        };
        let validate = validate.0;
        let req = ValidationRequest {
            source,
            origin,
            session,
            world,
            roster,
        };
        if let Err(OperationError::Broken(backtrace)) = validate(req) {
            world
                .get_resource_or_insert_with(UnhandledErrors::default)
                .miscellaneous
                .push(MiscellaneousFailure {
                    error: Arc::new(anyhow!(
                        "Scope {source:?} broken while validating a disposal"
                    )),
                    backtrace,
                });
        }
    }
    // With the single-threaded async feature, futures are polled inline
    // during the flush, bounded by the configured poll limit.
    #[cfg(feature = "single_threaded_async")]
    SingleThreadedExecution::world_poll(world, parameters.single_threaded_poll_limit);
}
/// Purge and despawn a target along with the chain of executions beneath it,
/// stopping at (and de-parenting) any detached child so that detached work
/// survives the despawn of the rest of the series.
///
/// `unused` is true when the target is being dropped because it was never
/// used; in that case the dropped series is recorded in [`UnhandledErrors`].
fn drop_target(target: Entity, world: &mut World, roster: &mut OperationRoster, unused: bool) {
    roster.purge(target);
    let mut dropped_series = Vec::new();
    let mut detached_series = None;
    // Crawl downward through the chain of child executions, starting at the
    // target itself.
    let mut execution = target;
    let mut search_state: SystemState<(Query<&Children>, Query<&Detached>)> =
        SystemState::new(world);
    let (q_children, q_detached) = search_state.get(world);
    loop {
        let mut move_up_chain = false;
        if let Ok(children) = q_children.get(execution) {
            for child in children {
                // Children without a Detached component are not part of the
                // series, so skip over them.
                let Ok(detached) = q_detached.get(*child) else {
                    continue;
                };
                if detached.is_detached() {
                    // This child is detached so we will not include it in the
                    // dropped series. We need to de-parent it so that it does
                    // not get despawned with the rest of the series that we
                    // are dropping.
                    detached_series = Some(*child);
                    break;
                } else {
                    // This child is not detached, so we will include it in our
                    // dropped series, and crawl towards one of it children.
                    if unused {
                        dropped_series.push(execution);
                    }
                    roster.purge(execution);
                    move_up_chain = true;
                    execution = *child;
                    // NOTE(review): this `continue` advances the inner `for`
                    // over the current node's remaining children even though
                    // `execution` was just reassigned to this child, so any
                    // later sibling would be processed against the reassigned
                    // `execution` — confirm whether `break` was intended here.
                    continue;
                }
            }
        }
        if !move_up_chain {
            // There is nothing further to include in the drop
            break;
        }
    }
    // De-parent the detached child before despawning, so the recursive
    // despawn of the target's hierarchy does not take it down too.
    if let Some(detached_series) = detached_series {
        if let Ok(mut detached_series_mut) = world.get_entity_mut(detached_series) {
            detached_series_mut.remove::<ChildOf>();
        }
    }
    if let Ok(unused_target_mut) = world.get_entity_mut(target) {
        unused_target_mut.despawn();
    }
    if unused {
        // Record which series was dropped so users can diagnose why their
        // unused target went away.
        world
            .get_resource_or_insert_with(UnhandledErrors::default)
            .unused_targets
            .push(UnusedTargetDrop {
                unused_target: target,
                dropped_series,
            });
    }
}
/// This resource is used to queue up operations in the roster in situations
/// where the regular roster is not available.
///
/// Deferred operations are merged back into the active roster by
/// `collect_from_channels`, so they get processed during the next flush.
#[derive(Resource, Default, Deref, DerefMut)]
pub(crate) struct DeferredRoster(pub OperationRoster);