crossflow/
flush.rs

1/*
2 * Copyright (C) 2024 Open Source Robotics Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 *     http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16*/
17
18use bevy_derive::{Deref, DerefMut};
19use bevy_ecs::{
20    prelude::*,
21    schedule::{IntoScheduleConfigs, ScheduleConfigs},
22    system::{ScheduleSystem, SystemState},
23};
24
25use smallvec::SmallVec;
26
27use anyhow::anyhow;
28
29use backtrace::Backtrace;
30
31use std::sync::Arc;
32
33use crate::{
34    AddExecution, ChannelQueue, Detached, DisposalNotice, Finished, FlushWarning,
35    MiscellaneousFailure, OperationError, OperationRequest, OperationRoster,
36    SeriesLifecycleChannel, ServiceHook, ServiceLifecycle, ServiceLifecycleChannel,
37    UnhandledErrors, UnusedTarget, UnusedTargetDrop, ValidateScopeReachability, ValidationRequest,
38    WakeQueue, awaken_task, dispose_for_despawned_service, execute_operation,
39};
40
41#[cfg(feature = "single_threaded_async")]
42use crate::async_execution::SingleThreadedExecution;
43
44#[derive(Resource, Default, Clone, Copy)]
45pub struct FlushParameters {
46    /// By default, a flush will loop until the whole [`OperationRoster`] is empty.
47    /// If there are loops of blocking services then it is possible for the flush
48    /// to loop indefinitely, creating the appearance of a blockup in the
49    /// application, or delaying other systems from running for a prolonged amount
50    /// of time.
51    ///
52    /// Use this limit to prevent the flush from blocking for too long in those
53    /// scenarios. If the flush loops beyond this limit, anything remaining in
54    /// the roster will be moved into a deferred roster which will be processed
55    /// during the next flush.
56    ///
57    /// A value of `None` means the flush can loop indefinitely (this is the default).
58    pub flush_loop_limit: Option<usize>,
59    /// When using the single_threaded_async feature, async futures get polled
60    /// during the execution flush. If async futures repeatedly spawn more async
61    /// tasks then the flush could get stuck in a loop. This parameter lets you
62    /// put a limit on how many futures get polled so that the flush cannot get
63    /// stuck.
64    ///
65    /// A value of `None` means the futures can be polled indefinitely (this is the default).
66    pub single_threaded_poll_limit: Option<usize>,
67    /// How many items can be received from the channel in one flush. The channel
68    /// is implemented as unbounded, so if something is pushing commands to the
69    /// channel faster than they can be processed, the flush will appear to be
70    /// hanging.
71    pub channel_received_limit: Option<usize>,
72}
73
74impl FlushParameters {
75    /// A set of options to prevent the activity flush system from getting hung
76    /// up on a flood of commands.
77    pub fn avoid_hanging() -> Self {
78        Self {
79            flush_loop_limit: Some(10),
80            single_threaded_poll_limit: Some(100),
81            channel_received_limit: Some(100),
82        }
83    }
84}
85
86pub fn flush_execution() -> ScheduleConfigs<ScheduleSystem> {
87    flush_execution_impl.into_configs()
88}
89
90fn flush_execution_impl(
91    world: &mut World,
92    new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
93) {
94    let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
95    let mut roster = OperationRoster::new();
96    collect_from_channels(&parameters, new_service_query, world, &mut roster);
97
98    let mut loop_count = 0;
99    while !roster.is_empty() {
100        for e in roster.deferred_despawn.drain(..) {
101            if let Ok(e_mut) = world.get_entity_mut(e) {
102                e_mut.despawn();
103            }
104        }
105
106        let parameters = *world.get_resource_or_insert_with(FlushParameters::default);
107        let flush_loop_limit = parameters.flush_loop_limit;
108        if let Some(limit) = flush_loop_limit {
109            if limit <= loop_count {
110                // We have looped beyoond the limit, so we will defer anything that
111                // remains in the roster and stop looping from here.
112                world
113                    .get_resource_or_insert_with(DeferredRoster::default)
114                    .append(&mut roster);
115
116                world
117                    .get_resource_or_insert_with(UnhandledErrors::default)
118                    .flush_warnings
119                    .push(FlushWarning::ExceededFlushLoopLimit {
120                        limit,
121                        reached: loop_count,
122                    });
123
124                break;
125            }
126        }
127
128        loop_count += 1;
129
130        garbage_cleanup(world, &mut roster);
131
132        while let Some(unblock) = roster.unblock.pop_front() {
133            let serve_next = unblock.serve_next;
134            serve_next(unblock, world, &mut roster);
135            garbage_cleanup(world, &mut roster);
136        }
137
138        while let Some(source) = roster.queue.pop_front() {
139            execute_operation(OperationRequest {
140                source,
141                world,
142                roster: &mut roster,
143            });
144            garbage_cleanup(world, &mut roster);
145            loop_count += 1;
146            if let Some(limit) = flush_loop_limit {
147                if limit <= loop_count {
148                    world
149                        .get_resource_or_insert_with(UnhandledErrors::default)
150                        .flush_warnings
151                        .push(FlushWarning::ExceededFlushLoopLimit {
152                            limit,
153                            reached: loop_count,
154                        });
155                    break;
156                }
157            }
158        }
159
160        while let Some(source) = roster.awake.pop_front() {
161            awaken_task(OperationRequest {
162                source,
163                world,
164                roster: &mut roster,
165            });
166            garbage_cleanup(world, &mut roster);
167        }
168
169        collect_from_channels(&parameters, new_service_query, world, &mut roster);
170    }
171}
172
173fn garbage_cleanup(world: &mut World, roster: &mut OperationRoster) {
174    while let Some(cleanup) = roster.cleanup_finished.pop() {
175        cleanup.trigger(world, roster);
176    }
177
178    while let Some(cancel) = roster.cancel.pop_front() {
179        cancel.trigger(world, roster);
180    }
181}
182
183fn collect_from_channels(
184    parameters: &FlushParameters,
185    new_service_query: &mut QueryState<(Entity, &mut ServiceHook), Added<ServiceHook>>,
186    world: &mut World,
187    roster: &mut OperationRoster,
188) {
189    // Get the receiver for async task commands
190    let mut received_count = 0;
191    while let Ok(item) = world
192        .get_resource_or_insert_with(ChannelQueue::new)
193        .receiver
194        .try_recv()
195    {
196        (item)(world, roster);
197        if let Some(limit) = parameters.channel_received_limit {
198            if limit <= received_count {
199                world
200                    .get_resource_or_insert_with(UnhandledErrors::default)
201                    .flush_warnings
202                    .push(FlushWarning::ExceededChannelReceivedLimit {
203                        limit,
204                        reached: received_count,
205                    });
206                break;
207            }
208        }
209        received_count += 1;
210    }
211
212    roster.process_deferals();
213
214    let mut removed_services: SmallVec<[Entity; 8]> = SmallVec::new();
215    world.get_resource_or_insert_with(ServiceLifecycleChannel::new);
216    world.resource_scope::<ServiceLifecycleChannel, ()>(|world, mut lifecycles| {
217        // Clean up the dangling requests of any services that have been despawned.
218        while let Ok(removed_service) = lifecycles.receiver.try_recv() {
219            removed_services.push(removed_service);
220        }
221
222        // Add a lifecycle tracker to any new services that might have shown up
223        // TODO(@mxgrey): Make sure this works for services which are spawned by
224        // providers that are being flushed.
225        for (e, mut hook) in new_service_query.iter_mut(world) {
226            if hook.lifecycle.is_none() {
227                // Check if the lifecycle is none, because collect_from_channels
228                // can be run multiple times per flush, in which case we will
229                // iterate over the query again, and end up dropping the lifecycle
230                // managers that we just created. When that happens, the service
231                // gets treated as despawned prematurely.
232                hook.lifecycle = Some(ServiceLifecycle::new(e, lifecycles.sender.clone()));
233            }
234        }
235    });
236
237    for removed_service in removed_services {
238        dispose_for_despawned_service(removed_service, world, roster);
239    }
240
241    // Queue any operations that needed to be deferred
242    let mut deferred = world.get_resource_or_insert_with(DeferredRoster::default);
243    roster.append(&mut deferred);
244
245    // Collect any tasks that are ready to be woken
246    let mut wake_queue = world.get_resource_or_insert_with(WakeQueue::new);
247    while let Ok(wakeable) = wake_queue.receiver.try_recv() {
248        roster.awake(wakeable);
249    }
250
251    let mut unused_targets_state: SystemState<Query<(Entity, &Detached), With<UnusedTarget>>> =
252        SystemState::new(world);
253
254    let mut add_finish: SmallVec<[_; 8]> = SmallVec::new();
255    let mut drop_targets: SmallVec<[_; 8]> = SmallVec::new();
256    for (e, detached) in unused_targets_state.get(world).iter() {
257        if detached.is_detached() {
258            add_finish.push(e);
259        } else {
260            drop_targets.push(e);
261        }
262    }
263
264    for e in add_finish {
265        // Add a Finished execution to the unused target of a detached series.
266        AddExecution::new(None, e, Finished).apply(world);
267    }
268
269    for target in drop_targets.drain(..) {
270        drop_target(target, world, roster, true);
271    }
272
273    let mut lifecycles = world.get_resource_or_insert_with(SeriesLifecycleChannel::default);
274    while let Ok(dropped_target) = lifecycles.receiver.try_recv() {
275        drop_targets.push(dropped_target);
276    }
277
278    for target in drop_targets {
279        drop_target(target, world, roster, false);
280    }
281
282    while let Some(DisposalNotice {
283        source,
284        origin,
285        session,
286    }) = roster.disposed.pop()
287    {
288        let Some(validate) = world.get::<ValidateScopeReachability>(source) else {
289            world
290                .get_resource_or_insert_with(UnhandledErrors::default)
291                .miscellaneous
292                .push(MiscellaneousFailure {
293                    error: Arc::new(anyhow!(
294                        "Scope {source:?} for disposal notification does not \
295                        have validation component",
296                    )),
297                    backtrace: Some(Backtrace::new()),
298                });
299            continue;
300        };
301
302        let validate = validate.0;
303        let req = ValidationRequest {
304            source,
305            origin,
306            session,
307            world,
308            roster,
309        };
310        if let Err(OperationError::Broken(backtrace)) = validate(req) {
311            world
312                .get_resource_or_insert_with(UnhandledErrors::default)
313                .miscellaneous
314                .push(MiscellaneousFailure {
315                    error: Arc::new(anyhow!(
316                        "Scope {source:?} broken while validating a disposal"
317                    )),
318                    backtrace,
319                });
320        }
321    }
322
323    #[cfg(feature = "single_threaded_async")]
324    SingleThreadedExecution::world_poll(world, parameters.single_threaded_poll_limit);
325}
326
327fn drop_target(target: Entity, world: &mut World, roster: &mut OperationRoster, unused: bool) {
328    roster.purge(target);
329    let mut dropped_series = Vec::new();
330    let mut detached_series = None;
331
332    let mut execution = target;
333    let mut search_state: SystemState<(Query<&Children>, Query<&Detached>)> =
334        SystemState::new(world);
335
336    let (q_children, q_detached) = search_state.get(world);
337    loop {
338        let mut move_up_chain = false;
339        if let Ok(children) = q_children.get(execution) {
340            for child in children {
341                let Ok(detached) = q_detached.get(*child) else {
342                    continue;
343                };
344                if detached.is_detached() {
345                    // This child is detached so we will not include it in the
346                    // dropped series. We need to de-parent it so that it does
347                    // not get despawned with the rest of the series that we
348                    // are dropping.
349                    detached_series = Some(*child);
350                    break;
351                } else {
352                    // This child is not detached, so we will include it in our
353                    // dropped series, and crawl towards one of it children.
354                    if unused {
355                        dropped_series.push(execution);
356                    }
357                    roster.purge(execution);
358                    move_up_chain = true;
359                    execution = *child;
360                    continue;
361                }
362            }
363        }
364
365        if !move_up_chain {
366            // There is nothing further to include in the drop
367            break;
368        }
369    }
370
371    if let Some(detached_series) = detached_series {
372        if let Ok(mut detached_series_mut) = world.get_entity_mut(detached_series) {
373            detached_series_mut.remove::<ChildOf>();
374        }
375    }
376
377    if let Ok(unused_target_mut) = world.get_entity_mut(target) {
378        unused_target_mut.despawn();
379    }
380
381    if unused {
382        world
383            .get_resource_or_insert_with(UnhandledErrors::default)
384            .unused_targets
385            .push(UnusedTargetDrop {
386                unused_target: target,
387                dropped_series,
388            });
389    }
390}
391
392/// This resource is used to queue up operations in the roster in situations
393/// where the regular roster is not available.
394#[derive(Resource, Default, Deref, DerefMut)]
395pub(crate) struct DeferredRoster(pub OperationRoster);