1use bevy_derive::{Deref, DerefMut};
19use bevy_ecs::{
20 prelude::*,
21 schedule::{IntoScheduleConfigs, ScheduleConfigs},
22 system::{ScheduleSystem, SystemState},
23};
24
25use smallvec::SmallVec;
26
27use anyhow::anyhow;
28
29use backtrace::Backtrace;
30
31use std::sync::Arc;
32
33use crate::{
34 AddExecution, ChannelQueue, Detached, DisposalNotice, Finished, FlushWarning,
35 MiscellaneousFailure, OperationError, OperationRequest, OperationRoster,
36 SeriesLifecycleChannel, ServiceHook, ServiceLifecycle, ServiceLifecycleChannel,
37 UnhandledErrors, UnusedTarget, UnusedTargetDrop, ValidateScopeReachability, ValidationRequest,
38 WakeQueue, awaken_task, dispose_for_despawned_service, execute_operation,
39};
40
41#[cfg(feature = "single_threaded_async")]
42use crate::async_execution::SingleThreadedExecution;
43
/// Limits applied while flushing. It is read from the world as a resource
/// (inserted with its `Default` value if absent) each time the flush needs it.
///
/// Every field is optional; `None` (the derived default) means that the
/// corresponding limit is never checked.
#[derive(Resource, Default, Clone, Copy)]
pub struct FlushParameters {
    /// Upper bound for the loop counter kept by the flush. The counter is
    /// advanced once per pass over the roster and once per executed queued
    /// operation. When the counter reaches this value at the start of a pass,
    /// whatever remains in the roster is moved into the deferred roster and a
    /// [`FlushWarning::ExceededFlushLoopLimit`] is recorded in
    /// [`UnhandledErrors`].
    pub flush_loop_limit: Option<usize>,
    /// Forwarded to `SingleThreadedExecution::world_poll` at the end of each
    /// channel collection. Only used when the `single_threaded_async` feature
    /// is enabled.
    pub single_threaded_poll_limit: Option<usize>,
    /// Bounds how many items are taken from the [`ChannelQueue`] receiver in
    /// one channel collection. When the bound is hit, a
    /// [`FlushWarning::ExceededChannelReceivedLimit`] is recorded and the
    /// remaining items stay in the channel.
    pub channel_received_limit: Option<usize>,
}
73
74impl FlushParameters {
75 pub fn avoid_hanging() -> Self {
78 Self {
79 flush_loop_limit: Some(10),
80 single_threaded_poll_limit: Some(100),
81 channel_received_limit: Some(100),
82 }
83 }
84}
85
86pub fn flush_execution() -> ScheduleConfigs<ScheduleSystem> {
87 flush_execution_impl.into_configs()
88}
89
90fn flush_execution_impl(
91 world: &mut World,
92 new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
93) {
94 let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
95 let mut roster = OperationRoster::new();
96 collect_from_channels(¶meters, new_service_query, world, &mut roster);
97
98 let mut loop_count = 0;
99 while !roster.is_empty() {
100 for e in roster.deferred_despawn.drain(..) {
101 if let Ok(e_mut) = world.get_entity_mut(e) {
102 e_mut.despawn();
103 }
104 }
105
106 let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
107 let flush_loop_limit = parameters.flush_loop_limit;
108 if let Some(limit) = flush_loop_limit {
109 if limit <= loop_count {
110 world
113 .get_resource_or_insert_with(DeferredRoster::default)
114 .append(&mut roster);
115
116 world
117 .get_resource_or_insert_with(UnhandledErrors::default)
118 .flush_warnings
119 .push(FlushWarning::ExceededFlushLoopLimit {
120 limit,
121 reached: loop_count,
122 });
123
124 break;
125 }
126 }
127
128 loop_count += 1;
129
130 garbage_cleanup(world, &mut roster);
131
132 while let Some(unblock) = roster.unblock.pop_front() {
133 let serve_next = unblock.serve_next;
134 serve_next(unblock, world, &mut roster);
135 garbage_cleanup(world, &mut roster);
136 }
137
138 while let Some(source) = roster.queue.pop_front() {
139 execute_operation(OperationRequest {
140 source,
141 world,
142 roster: &mut roster,
143 });
144 garbage_cleanup(world, &mut roster);
145 loop_count += 1;
146 if let Some(limit) = flush_loop_limit {
147 if limit <= loop_count {
148 world
149 .get_resource_or_insert_with(UnhandledErrors::default)
150 .flush_warnings
151 .push(FlushWarning::ExceededFlushLoopLimit {
152 limit,
153 reached: loop_count,
154 });
155 break;
156 }
157 }
158 }
159
160 while let Some(source) = roster.awake.pop_front() {
161 awaken_task(OperationRequest {
162 source,
163 world,
164 roster: &mut roster,
165 });
166 garbage_cleanup(world, &mut roster);
167 }
168
169 collect_from_channels(¶meters, new_service_query, world, &mut roster);
170 }
171}
172
173fn garbage_cleanup(world: &mut World, roster: &mut OperationRoster) {
174 while let Some(cleanup) = roster.cleanup_finished.pop() {
175 cleanup.trigger(world, roster);
176 }
177
178 while let Some(cancel) = roster.cancel.pop_front() {
179 cancel.trigger(world, roster);
180 }
181}
182
183fn collect_from_channels(
184 parameters: &FlushParameters,
185 new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
186 world: &mut World,
187 roster: &mut OperationRoster,
188) {
189 let mut received_count = 0;
191 while let Ok(item) = world
192 .get_resource_or_insert_with(ChannelQueue::new)
193 .receiver
194 .try_recv()
195 {
196 (item)(world, roster);
197 if let Some(limit) = parameters.channel_received_limit {
198 if limit <= received_count {
199 world
200 .get_resource_or_insert_with(UnhandledErrors::default)
201 .flush_warnings
202 .push(FlushWarning::ExceededChannelReceivedLimit {
203 limit,
204 reached: received_count,
205 });
206 break;
207 }
208 }
209 received_count += 1;
210 }
211
212 roster.process_deferals();
213
214 let mut removed_services: SmallVec<[Entity; 8]> = SmallVec::new();
215 world.get_resource_or_insert_with(ServiceLifecycleChannel::new);
216 world.resource_scope::<ServiceLifecycleChannel, ()>(|world, mut lifecycles| {
217 while let Ok(removed_service) = lifecycles.receiver.try_recv() {
219 removed_services.push(removed_service);
220 }
221
222 for (e, mut hook) in new_service_query.iter_mut(world) {
226 if hook.lifecycle.is_none() {
227 hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
233 }
234 }
235 });
236
237 for removed_service in removed_services {
238 dispose_for_despawned_service(removed_service, world, roster);
239 }
240
241 let mut deferred = world.get_resource_or_insert_with(DeferredRoster::default);
243 roster.append(&mut deferred);
244
245 let mut wake_queue = world.get_resource_or_insert_with(WakeQueue::new);
247 while let Ok(wakeable) = wake_queue.receiver.try_recv() {
248 roster.awake(wakeable);
249 }
250
251 let mut unused_targets_state: SystemState<Query<(Entity, &Detached), With<UnusedTarget>>> =
252 SystemState::new(world);
253
254 let mut add_finish: SmallVec<[_; 8]> = SmallVec::new();
255 let mut drop_targets: SmallVec<[_; 8]> = SmallVec::new();
256 for (e, detached) in unused_targets_state.get(world).iter() {
257 if detached.is_detached() {
258 add_finish.push(e);
259 } else {
260 drop_targets.push(e);
261 }
262 }
263
264 for e in add_finish {
265 AddExecution::new(None, e, Finished).apply(world);
267 }
268
269 for target in drop_targets.drain(..) {
270 drop_target(target, world, roster, true);
271 }
272
273 let mut lifecycles = world.get_resource_or_insert_with(SeriesLifecycleChannel::default);
274 while let Ok(dropped_target) = lifecycles.receiver.try_recv() {
275 drop_targets.push(dropped_target);
276 }
277
278 for target in drop_targets {
279 drop_target(target, world, roster, false);
280 }
281
282 while let Some(DisposalNotice {
283 source,
284 origin,
285 session,
286 }) = roster.disposed.pop()
287 {
288 let Some(validate) = world.get::<ValidateScopeReachability>(source) else {
289 world
290 .get_resource_or_insert_with(UnhandledErrors::default)
291 .miscellaneous
292 .push(MiscellaneousFailure {
293 error: Arc::new(anyhow!(
294 "Scope {source:?} for disposal notification does not \
295 have validation component",
296 )),
297 backtrace: Some(Backtrace::new()),
298 });
299 continue;
300 };
301
302 let validate = validate.0;
303 let req = ValidationRequest {
304 source,
305 origin,
306 session,
307 world,
308 roster,
309 };
310 if let Err(OperationError::Broken(backtrace)) = validate(req) {
311 world
312 .get_resource_or_insert_with(UnhandledErrors::default)
313 .miscellaneous
314 .push(MiscellaneousFailure {
315 error: Arc::new(anyhow!(
316 "Scope {source:?} broken while validating a disposal"
317 )),
318 backtrace,
319 });
320 }
321 }
322
323 #[cfg(feature = "single_threaded_async")]
324 SingleThreadedExecution::world_poll(world, parameters.single_threaded_poll_limit);
325}
326
327fn drop_target(target: Entity, world: &mut World, roster: &mut OperationRoster, unused: bool) {
328 roster.purge(target);
329 let mut dropped_series = Vec::new();
330 let mut detached_series = None;
331
332 let mut execution = target;
333 let mut search_state: SystemState<(Query<&Children>, Query<&Detached>)> =
334 SystemState::new(world);
335
336 let (q_children, q_detached) = search_state.get(world);
337 loop {
338 let mut move_up_chain = false;
339 if let Ok(children) = q_children.get(execution) {
340 for child in children {
341 let Ok(detached) = q_detached.get(*child) else {
342 continue;
343 };
344 if detached.is_detached() {
345 detached_series = Some(*child);
350 break;
351 } else {
352 if unused {
355 dropped_series.push(execution);
356 }
357 roster.purge(execution);
358 move_up_chain = true;
359 execution = *child;
360 continue;
361 }
362 }
363 }
364
365 if !move_up_chain {
366 break;
368 }
369 }
370
371 if let Some(detached_series) = detached_series {
372 if let Ok(mut detached_series_mut) = world.get_entity_mut(detached_series) {
373 detached_series_mut.remove::<ChildOf>();
374 }
375 }
376
377 if let Ok(unused_target_mut) = world.get_entity_mut(target) {
378 unused_target_mut.despawn();
379 }
380
381 if unused {
382 world
383 .get_resource_or_insert_with(UnhandledErrors::default)
384 .unused_targets
385 .push(UnusedTargetDrop {
386 unused_target: target,
387 dropped_series,
388 });
389 }
390}
391
/// Resource holding the operations that a flush could not finish. The flush
/// moves the remainder of its roster in here when the flush loop limit is
/// reached, and channel collection moves the contents back into the active
/// roster. Dereferences to the wrapped [`OperationRoster`].
#[derive(Resource, Default, Deref, DerefMut)]
pub(crate) struct DeferredRoster(pub OperationRoster);