use std::ops::Range;
use bevy_tasks::{ComputeTaskPool, CountdownEvent, TaskPool};
use bevy_utils::tracing::trace;
use downcast_rs::{impl_downcast, Downcast};
use fixedbitset::FixedBitSet;
use crate::{ArchetypesGeneration, Resources, System, ThreadLocalExecution, TypeAccess, World};
pub trait SystemStageExecutor: Downcast + Send + Sync {
fn execute_stage(
&mut self,
systems: &mut [Box<dyn System<In = (), Out = ()>>],
changed_systems: &[usize],
world: &mut World,
resources: &mut Resources,
);
}
impl_downcast!(SystemStageExecutor);
#[derive(Default)]
pub struct SerialSystemStageExecutor;
impl SystemStageExecutor for SerialSystemStageExecutor {
fn execute_stage(
&mut self,
systems: &mut [Box<dyn System<In = (), Out = ()>>],
_changed_systems: &[usize],
world: &mut World,
resources: &mut Resources,
) {
for system in systems.iter_mut() {
system.update(world);
match system.thread_local_execution() {
ThreadLocalExecution::NextFlush => {
system.run((), world, resources);
}
ThreadLocalExecution::Immediate => {
system.run((), world, resources);
system.run_thread_local(world, resources);
}
}
}
for system in systems.iter_mut() {
match system.thread_local_execution() {
ThreadLocalExecution::NextFlush => system.run_thread_local(world, resources),
ThreadLocalExecution::Immediate => { }
}
}
}
}
pub struct ParallelSystemStageExecutor {
system_dependencies: Vec<FixedBitSet>,
system_dependency_count: Vec<usize>,
ready_events: Vec<Option<CountdownEvent>>,
ready_events_of_dependents: Vec<Vec<CountdownEvent>>,
system_dependents: Vec<Vec<usize>>,
thread_local_system_indices: Vec<usize>,
last_archetypes_generation: ArchetypesGeneration,
}
impl Default for ParallelSystemStageExecutor {
fn default() -> Self {
Self {
system_dependents: Default::default(),
system_dependency_count: Default::default(),
ready_events: Default::default(),
ready_events_of_dependents: Default::default(),
system_dependencies: Default::default(),
thread_local_system_indices: Default::default(),
last_archetypes_generation: ArchetypesGeneration(u64::MAX),
}
}
}
impl ParallelSystemStageExecutor {
pub fn system_dependents(&self) -> &[Vec<usize>] {
&self.system_dependents
}
pub fn system_dependencies(&self) -> &[FixedBitSet] {
&self.system_dependencies
}
pub fn prepare_to_next_thread_local(
&mut self,
world: &World,
systems: &mut [Box<dyn System<In = (), Out = ()>>],
stage_changed: bool,
next_thread_local_index: usize,
) -> Range<usize> {
let (prepare_system_start_index, last_thread_local_index) = if next_thread_local_index == 0
{
(0, None)
} else {
(
self.thread_local_system_indices[next_thread_local_index - 1] + 1,
Some(self.thread_local_system_indices[next_thread_local_index - 1]),
)
};
let prepare_system_index_range = if let Some(index) = self
.thread_local_system_indices
.get(next_thread_local_index)
{
prepare_system_start_index..(*index + 1)
} else {
prepare_system_start_index..systems.len()
};
let archetypes_generation_changed =
self.last_archetypes_generation != world.archetypes_generation();
if stage_changed || archetypes_generation_changed {
for system_index in prepare_system_index_range.clone() {
systems[system_index].update(world);
self.system_dependents[system_index].clear();
self.system_dependencies[system_index].clear();
}
let mut current_archetype_access = TypeAccess::default();
let mut current_resource_access = TypeAccess::default();
for system_index in prepare_system_index_range.clone() {
let system = &systems[system_index];
let archetype_access = system.archetype_component_access();
match system.thread_local_execution() {
ThreadLocalExecution::NextFlush => {
let resource_access = system.resource_access();
if !current_archetype_access.is_compatible(archetype_access)
|| !current_resource_access.is_compatible(resource_access)
{
#[allow(clippy::needless_range_loop)]
for earlier_system_index in
prepare_system_index_range.start..system_index
{
let earlier_system = &systems[earlier_system_index];
debug_assert_eq!(
earlier_system.thread_local_execution(),
ThreadLocalExecution::NextFlush
);
if !earlier_system
.archetype_component_access()
.is_compatible(archetype_access)
|| !earlier_system
.resource_access()
.is_compatible(resource_access)
{
self.system_dependents[earlier_system_index].push(system_index);
self.system_dependencies[system_index]
.insert(earlier_system_index);
}
}
}
current_archetype_access.union(archetype_access);
current_resource_access.union(resource_access);
if let Some(last_thread_local_index) = last_thread_local_index {
self.system_dependents[last_thread_local_index].push(system_index);
self.system_dependencies[system_index].insert(last_thread_local_index);
}
}
ThreadLocalExecution::Immediate => {
for earlier_system_index in prepare_system_index_range.start..system_index {
self.system_dependents[earlier_system_index].push(system_index);
self.system_dependencies[system_index].insert(earlier_system_index);
}
}
}
}
#[cfg(debug_assertions)]
for system_index in prepare_system_index_range.clone() {
let mut system_dependents_set = std::collections::HashSet::new();
for dependent_system in &self.system_dependents[system_index] {
let inserted = system_dependents_set.insert(*dependent_system);
debug_assert!(inserted);
}
}
for ready_events_of_dependents in
&mut self.ready_events_of_dependents[prepare_system_index_range.clone()]
{
ready_events_of_dependents.clear();
}
for system_index in prepare_system_index_range.clone() {
assert!(!self.system_dependencies[system_index].contains(system_index));
let dependency_count = self.system_dependencies[system_index].count_ones(..);
self.system_dependency_count[system_index] = dependency_count;
self.ready_events[system_index] = match self.system_dependency_count[system_index] {
0 => None,
dependency_count => Some(CountdownEvent::new(dependency_count as isize)),
}
}
for system_index in prepare_system_index_range.clone() {
for dependent_system in &self.system_dependents[system_index] {
self.ready_events_of_dependents[system_index].push(
self.ready_events[*dependent_system]
.as_ref()
.expect("A dependent task should have a non-None ready event.")
.clone(),
);
}
}
} else {
self.reset_system_ready_events(prepare_system_index_range);
}
if let Some(index) = self
.thread_local_system_indices
.get(next_thread_local_index)
{
prepare_system_start_index..(*index)
} else {
prepare_system_start_index..systems.len()
}
}
fn reset_system_ready_events(&mut self, prepare_system_index_range: Range<usize>) {
for system_index in prepare_system_index_range {
let dependency_count = self.system_dependency_count[system_index];
if dependency_count > 0 {
self.ready_events[system_index]
.as_ref()
.expect("A system with >0 dependency count should have a non-None ready event.")
.reset(dependency_count as isize)
}
}
}
pub fn run_systems(
&self,
world: &World,
resources: &Resources,
systems: &mut [Box<dyn System<In = (), Out = ()>>],
prepared_system_range: Range<usize>,
compute_pool: &TaskPool,
) {
trace!("running systems {:?}", prepared_system_range);
compute_pool.scope(|scope| {
let start_system_index = prepared_system_range.start;
let mut system_index = start_system_index;
for system in &mut systems[prepared_system_range] {
trace!(
"prepare {} {} with {} dependents and {} dependencies",
system_index,
system.name(),
self.system_dependents[system_index].len(),
self.system_dependencies[system_index].count_ones(..)
);
let ready_event = &self.ready_events[system_index];
if start_system_index != 0 {
if let Some(ready_event) = ready_event.as_ref() {
for dependency in self.system_dependencies[system_index].ones() {
if dependency < start_system_index {
ready_event.decrement();
}
}
}
}
let world_ref = &*world;
let resources_ref = &*resources;
let trigger_events = &self.ready_events_of_dependents[system_index];
#[cfg(debug_assertions)]
{
let dependent_systems = &self.system_dependents[system_index];
debug_assert_eq!(trigger_events.len(), dependent_systems.len());
for (trigger_event, dependent_system_index) in
trigger_events.iter().zip(dependent_systems)
{
debug_assert!(
*dependent_system_index < start_system_index || trigger_event.get() > 0
);
}
}
scope.spawn(async move {
if let Some(ready_event) = ready_event {
ready_event.listen().await;
}
{
#[cfg(feature = "trace")]
let system_span = bevy_utils::tracing::info_span!(
"system",
name = system.name().as_ref()
);
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
unsafe {
system.run_unsafe((), world_ref, resources_ref);
}
}
for trigger_event in trigger_events {
trigger_event.decrement();
}
});
system_index += 1;
}
});
}
}
impl SystemStageExecutor for ParallelSystemStageExecutor {
fn execute_stage(
&mut self,
systems: &mut [Box<dyn System<In = (), Out = ()>>],
changed_systems: &[usize],
world: &mut World,
resources: &mut Resources,
) {
let start_archetypes_generation = world.archetypes_generation();
let compute_pool = resources
.get_or_insert_with(|| ComputeTaskPool(TaskPool::default()))
.clone();
let stage_changed = !changed_systems.is_empty();
if stage_changed {
self.system_dependencies.clear();
self.system_dependencies
.resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len()));
self.system_dependency_count.clear();
self.system_dependency_count.resize(systems.len(), 0);
self.thread_local_system_indices = Vec::new();
self.system_dependents.clear();
self.system_dependents.resize(systems.len(), Vec::new());
self.ready_events.resize(systems.len(), None);
self.ready_events_of_dependents
.resize(systems.len(), Vec::new());
for (system_index, system) in systems.iter().enumerate() {
if system.thread_local_execution() == ThreadLocalExecution::Immediate {
#[cfg(feature = "trace")]
let system_span =
bevy_utils::tracing::info_span!("system", name = system.name().as_ref());
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
self.thread_local_system_indices.push(system_index);
}
}
}
let mut next_thread_local_index = 0;
{
let prepared_system_range = self.prepare_to_next_thread_local(
world,
systems,
stage_changed,
next_thread_local_index,
);
self.run_systems(
world,
resources,
systems,
prepared_system_range,
&*compute_pool,
);
}
loop {
if next_thread_local_index >= self.thread_local_system_indices.len() {
break;
}
let thread_local_system_index =
self.thread_local_system_indices[next_thread_local_index];
{
let system = systems[thread_local_system_index].as_mut();
#[cfg(feature = "trace")]
let system_span = bevy_utils::tracing::info_span!(
"thread_local_system",
name = system.name().as_ref()
);
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
system.run((), world, resources);
system.run_thread_local(world, resources);
}
next_thread_local_index += 1;
let run_ready_system_index_range = self.prepare_to_next_thread_local(
world,
systems,
stage_changed,
next_thread_local_index,
);
self.run_systems(
world,
resources,
systems,
run_ready_system_index_range,
&*compute_pool,
);
}
for system in systems.iter_mut() {
match system.thread_local_execution() {
ThreadLocalExecution::NextFlush => {
#[cfg(feature = "trace")]
let system_span =
bevy_utils::tracing::info_span!("system", name = system.name().as_ref());
#[cfg(feature = "trace")]
let _system_guard = system_span.enter();
system.run_thread_local(world, resources);
}
ThreadLocalExecution::Immediate => { }
}
}
if start_archetypes_generation == world.archetypes_generation() {
self.last_archetypes_generation = world.archetypes_generation();
}
}
}