use super::Schedule;
use crate::{
resource::Resources,
system::{ArchetypeAccess, System, ThreadLocalExecution, TypeAccess},
};
use crossbeam_channel::{Receiver, Sender};
use fixedbitset::FixedBitSet;
use bevy_hecs::{ArchetypesGeneration, World};
use rayon::ScopeFifo;
use std::{
ops::Range,
sync::{Arc, Mutex},
};
/// Executes a `Schedule` by running its stages one after another, while running the
/// systems *within* each stage in parallel where their data accesses allow it
/// (see `ExecutorStage`).
#[derive(Debug)]
pub struct ParallelExecutor {
    // One `ExecutorStage` per entry in `schedule.stage_order`, rebuilt whenever the
    // schedule generation changes.
    stages: Vec<ExecutorStage>,
    // Schedule generation observed on the previous `run`; compared against the current
    // generation to detect schedule changes. Starts at `usize::MAX` so the first run
    // always counts as "changed".
    last_schedule_generation: usize,
    // When true, `World::clear_trackers` is called at the end of every `run`.
    clear_trackers: bool,
}
impl Default for ParallelExecutor {
fn default() -> Self {
Self {
stages: Default::default(),
last_schedule_generation: usize::MAX,
clear_trackers: true,
}
}
}
impl ParallelExecutor {
    /// Creates an executor that does *not* clear component change trackers after each run.
    pub fn without_tracker_clears() -> Self {
        Self {
            clear_trackers: false,
            ..Default::default()
        }
    }

    /// Runs every stage of `schedule` in order against `world` and `resources`.
    ///
    /// Per-stage executor state is rebuilt when the schedule generation has changed since
    /// the previous run; otherwise the cached dependency information is reused.
    pub fn run(&mut self, schedule: &mut Schedule, world: &mut World, resources: &mut Resources) {
        let schedule_generation = schedule.generation();
        // Reuse the generation captured above rather than querying the schedule twice.
        let schedule_changed = schedule_generation != self.last_schedule_generation;
        if schedule_changed {
            // The stage list may have changed shape entirely; start from scratch.
            self.stages.clear();
            self.stages
                .resize_with(schedule.stage_order.len(), ExecutorStage::default);
        }
        // Stages run strictly sequentially, in schedule order.
        for (stage_name, executor_stage) in schedule.stage_order.iter().zip(self.stages.iter_mut())
        {
            if let Some(stage_systems) = schedule.stages.get_mut(stage_name) {
                executor_stage.run(world, resources, stage_systems, schedule_changed);
            }
        }
        if self.clear_trackers {
            world.clear_trackers();
        }
        self.last_schedule_generation = schedule_generation;
    }
}
/// Per-stage execution state: which systems depend on which, which thread-local
/// ("immediate") systems split the stage into ranges, and channels used by spawned
/// tasks to report completion.
#[derive(Debug, Clone)]
pub struct ExecutorStage {
    // For each system index, a bitset of the system indices it must wait on before running.
    system_dependencies: Vec<FixedBitSet>,
    // For each system index, the list of system indices that depend on it.
    system_dependents: Vec<Vec<usize>>,
    // Indices of systems with `ThreadLocalExecution::Immediate`, in stage order.
    thread_local_system_indices: Vec<usize>,
    // Cursor into `thread_local_system_indices`: the next thread-local system to prepare for.
    next_thread_local_index: usize,
    // Bitset of systems that have completed during the current run.
    finished_systems: FixedBitSet,
    // Bitset of systems that have been started (spawned or run locally) this run.
    running_systems: FixedBitSet,
    // Spawned tasks send their system index here when they finish...
    sender: Sender<usize>,
    // ...and the stage's run loop receives those indices here.
    receiver: Receiver<usize>,
    // Archetype generation observed at the end of the previous run; a mismatch forces
    // dependency rebuilding. Starts at u64::MAX so the first run always rebuilds.
    last_archetypes_generation: ArchetypesGeneration,
}
impl Default for ExecutorStage {
fn default() -> Self {
let (sender, receiver) = crossbeam_channel::unbounded();
Self {
system_dependents: Default::default(),
system_dependencies: Default::default(),
thread_local_system_indices: Default::default(),
next_thread_local_index: 0,
finished_systems: Default::default(),
running_systems: Default::default(),
sender,
receiver,
last_archetypes_generation: ArchetypesGeneration(u64::MAX),
}
}
}
/// Outcome of `ExecutorStage::run_ready_systems`.
enum RunReadyResult {
    /// Every ready system was spawned (or nothing was ready).
    Ok,
    /// The thread-local system at this index is ready; it must be run exclusively on the
    /// calling thread rather than spawned on the pool.
    ThreadLocalReady(usize),
}
/// Selects which systems `ExecutorStage::run_ready_systems` should consider.
enum RunReadyType {
    /// Consider every system index in this range.
    Range(Range<usize>),
    /// Consider only the dependents of the system at this index.
    Dependents(usize),
}
impl ExecutorStage {
    /// Prepares the next "range" of systems: the systems after the previously handled
    /// thread-local system, up to and including the next thread-local system (or the end
    /// of the stage). Rebuilds dependency information for that range when the schedule or
    /// the world's archetypes have changed, then advances `next_thread_local_index`.
    pub fn prepare_to_next_thread_local(
        &mut self,
        world: &World,
        systems: &[Arc<Mutex<Box<dyn System>>>],
        schedule_changed: bool,
    ) {
        // Start of the range: either the beginning of the stage, or the system just after
        // the most recently handled thread-local system.
        let (prepare_system_start_index, last_thread_local_index) =
            if self.next_thread_local_index == 0 {
                (0, None)
            } else {
                (
                    self.thread_local_system_indices[self.next_thread_local_index - 1] + 1,
                    Some(self.thread_local_system_indices[self.next_thread_local_index - 1]),
                )
            };

        // End of the range: the next thread-local system (inclusive), or the stage's end.
        let prepare_system_index_range = if let Some(index) = self
            .thread_local_system_indices
            .get(self.next_thread_local_index)
        {
            prepare_system_start_index..(*index + 1)
        } else {
            prepare_system_start_index..systems.len()
        };

        let archetypes_generation_changed =
            self.last_archetypes_generation != world.archetypes_generation();

        // Dependencies only need rebuilding when the schedule or archetypes changed;
        // otherwise the cached bitsets from a previous run are still valid.
        if schedule_changed || archetypes_generation_changed {
            // Refresh each system's recorded archetype/component access against the world.
            for system_index in prepare_system_index_range.clone() {
                let mut system = systems[system_index].lock().unwrap();
                system.update_archetype_access(world);
            }

            // Union of the accesses of all systems seen so far in this range. A new system
            // that conflicts with this union must then be checked against each earlier
            // system individually to find its actual dependencies.
            let mut current_archetype_access = ArchetypeAccess::default();
            let mut current_resource_access = TypeAccess::default();
            for system_index in prepare_system_index_range.clone() {
                let system = systems[system_index].lock().unwrap();
                let archetype_access = system.archetype_access();
                match system.thread_local_execution() {
                    ThreadLocalExecution::NextFlush => {
                        let resource_access = system.resource_access();
                        // `!x.is_compatible(y)` instead of `x.is_compatible(y) == false`
                        // (clippy::bool_comparison); behavior unchanged.
                        if !current_archetype_access.is_compatible(archetype_access)
                            || !current_resource_access.is_compatible(resource_access)
                        {
                            for earlier_system_index in
                                prepare_system_index_range.start..system_index
                            {
                                let earlier_system = systems[earlier_system_index].lock().unwrap();
                                // Thread-local systems bound the range, so every earlier
                                // system inside it must be NextFlush.
                                debug_assert_eq!(
                                    earlier_system.thread_local_execution(),
                                    ThreadLocalExecution::NextFlush
                                );
                                if !earlier_system
                                    .archetype_access()
                                    .is_compatible(archetype_access)
                                    || !earlier_system
                                        .resource_access()
                                        .is_compatible(resource_access)
                                {
                                    // Record the dependency in both directions.
                                    self.system_dependents[earlier_system_index].push(system_index);
                                    self.system_dependencies[system_index]
                                        .insert(earlier_system_index);
                                }
                            }
                        }

                        current_archetype_access.union(archetype_access);
                        current_resource_access.union(resource_access);

                        // Every NextFlush system in this range also waits on the preceding
                        // thread-local system, if there was one.
                        if let Some(last_thread_local_index) = last_thread_local_index {
                            self.system_dependents[last_thread_local_index].push(system_index);
                            self.system_dependencies[system_index].insert(last_thread_local_index);
                        }
                    }
                    ThreadLocalExecution::Immediate => {
                        // An immediate (thread-local) system depends on *every* system
                        // before it in the range: it needs exclusive access.
                        for earlier_system_index in prepare_system_index_range.start..system_index {
                            self.system_dependents[earlier_system_index].push(system_index);
                            self.system_dependencies[system_index].insert(earlier_system_index);
                        }
                    }
                }
            }
        }

        self.next_thread_local_index += 1;
    }

    /// Spawns every considered system whose dependencies have all finished onto `scope`.
    ///
    /// If a thread-local system becomes ready and no other system was spawned by this
    /// call, returns `RunReadyResult::ThreadLocalReady` so the caller can run it
    /// exclusively; otherwise returns `RunReadyResult::Ok`.
    fn run_ready_systems<'run>(
        &mut self,
        systems: &[Arc<Mutex<Box<dyn System>>>],
        run_ready_type: RunReadyType,
        scope: &ScopeFifo<'run>,
        world: &'run World,
        resources: &'run Resources,
    ) -> RunReadyResult {
        // Both iterator sources are borrowed through one trait object so the loop below
        // is shared between the two `RunReadyType` variants.
        let mut all;
        let mut dependents;
        let system_index_iter: &mut dyn Iterator<Item = usize> = match run_ready_type {
            RunReadyType::Range(range) => {
                all = range;
                &mut all
            }
            RunReadyType::Dependents(system_index) => {
                dependents = self.system_dependents[system_index].iter().cloned();
                &mut dependents
            }
        };

        let mut systems_currently_running = false;
        for system_index in system_index_iter {
            // Skip systems that were already started.
            if self.running_systems.contains(system_index) {
                continue;
            }

            // A system is ready once every dependency has finished.
            if self.system_dependencies[system_index].is_subset(&self.finished_systems) {
                let system = systems[system_index].clone();

                // Thread-local systems cannot be spawned on the pool; they can only be
                // handed back to the caller if nothing was spawned by this call yet.
                {
                    let system = system.lock().unwrap();
                    if let ThreadLocalExecution::Immediate = system.thread_local_execution() {
                        if systems_currently_running {
                            // Something was just spawned; this system will be revisited later.
                            continue;
                        } else {
                            return RunReadyResult::ThreadLocalReady(system_index);
                        }
                    }
                }

                // Spawn the system; on completion the task reports its index via `sender`.
                let sender = self.sender.clone();
                self.running_systems.insert(system_index);
                scope.spawn_fifo(move |_| {
                    let mut system = system.lock().unwrap();
                    system.run(world, resources);
                    sender.send(system_index).unwrap();
                });

                systems_currently_running = true;
            }
        }

        RunReadyResult::Ok
    }

    /// Runs every system in this stage to completion: pooled systems in parallel via
    /// rayon, thread-local ("Immediate") systems exclusively on the calling thread, and
    /// finally the deferred (`NextFlush`) thread-local work of the remaining systems.
    pub fn run(
        &mut self,
        world: &mut World,
        resources: &mut Resources,
        systems: &[Arc<Mutex<Box<dyn System>>>],
        schedule_changed: bool,
    ) {
        // Rebuild per-system bookkeeping when the stage's system list changed.
        if schedule_changed {
            self.system_dependencies.clear();
            self.system_dependencies
                .resize_with(systems.len(), || FixedBitSet::with_capacity(systems.len()));
            self.thread_local_system_indices = Vec::new();
            self.system_dependents.clear();
            self.system_dependents.resize(systems.len(), Vec::new());
            self.finished_systems.grow(systems.len());
            self.running_systems.grow(systems.len());

            // Record where the thread-local (Immediate) systems live in this stage.
            for (system_index, system) in systems.iter().enumerate() {
                let system = system.lock().unwrap();
                if system.thread_local_execution() == ThreadLocalExecution::Immediate {
                    self.thread_local_system_indices.push(system_index);
                }
            }
        }

        self.next_thread_local_index = 0;
        // Prepare the first range: up to (and including) the first thread-local system.
        self.prepare_to_next_thread_local(world, systems, schedule_changed);

        self.finished_systems.clear();
        self.running_systems.clear();

        let mut run_ready_result = RunReadyResult::Ok;
        // `.first()` instead of `.get(0)` (clippy::get_first); same semantics.
        let run_ready_system_index_range =
            if let Some(index) = self.thread_local_system_indices.first() {
                0..(*index + 1)
            } else {
                0..systems.len()
            };
        rayon::scope_fifo(|scope| {
            run_ready_result = self.run_ready_systems(
                systems,
                RunReadyType::Range(run_ready_system_index_range),
                scope,
                world,
                resources,
            );
        });
        loop {
            // Done once every system in the stage has finished.
            if self.finished_systems.count_ones(..) == systems.len() {
                break;
            }

            if let RunReadyResult::ThreadLocalReady(thread_local_index) = run_ready_result {
                // A thread-local system is ready: run it on this thread with exclusive
                // world/resource access, then mark it finished through the same channel
                // used by pooled systems.
                let mut system = systems[thread_local_index].lock().unwrap();
                self.running_systems.insert(thread_local_index);
                system.run(world, resources);
                system.run_thread_local(world, resources);
                self.finished_systems.insert(thread_local_index);
                self.sender.send(thread_local_index).unwrap();

                // Prepare the range between this thread-local system and the next.
                self.prepare_to_next_thread_local(world, systems, schedule_changed);

                run_ready_result = RunReadyResult::Ok;
            } else {
                // Drain completions and spawn newly-ready dependents until the stage
                // finishes or a thread-local system becomes ready (which requires leaving
                // the rayon scope so it can run exclusively).
                rayon::scope_fifo(|scope| loop {
                    if self.finished_systems.count_ones(..) == systems.len() {
                        break;
                    }

                    let finished_system = self.receiver.recv().unwrap();
                    self.finished_systems.insert(finished_system);
                    run_ready_result = self.run_ready_systems(
                        systems,
                        RunReadyType::Dependents(finished_system),
                        scope,
                        world,
                        resources,
                    );

                    if let RunReadyResult::ThreadLocalReady(_) = run_ready_result {
                        break;
                    }
                });
            }
        }

        // Now that the stage has finished, run the deferred thread-local work of every
        // NextFlush system. Immediate systems already ran theirs above.
        for system in systems.iter() {
            let mut system = system.lock().unwrap();
            match system.thread_local_execution() {
                ThreadLocalExecution::NextFlush => system.run_thread_local(world, resources),
                ThreadLocalExecution::Immediate => {}
            }
        }

        self.last_archetypes_generation = world.archetypes_generation();
    }
}
#[cfg(test)]
mod tests {
    use super::ParallelExecutor;
    use crate::{
        resource::{Res, ResMut, Resources},
        schedule::Schedule,
        system::{IntoQuerySystem, IntoThreadLocalSystem, Query},
        Commands,
    };
    use fixedbitset::FixedBitSet;
    use bevy_hecs::{Entity, World};
    use std::sync::{Arc, Mutex};

    // Shared counter used by the `schedule` test's systems to assert execution order.
    #[derive(Default)]
    struct Counter {
        count: Arc<Mutex<usize>>,
    }

    // An entity spawned (via Commands) in one stage must be visible to queries in a
    // later stage: the executor re-prepares archetype access between stages.
    #[test]
    fn cross_stage_archetype_change_prepare() {
        let mut world = World::new();
        let mut resources = Resources::default();

        let mut schedule = Schedule::default();
        schedule.add_stage("PreArchetypeChange");
        schedule.add_stage("PostArchetypeChange");

        fn insert(mut commands: Commands) {
            commands.spawn((1u32,));
        }

        fn read(query: Query<&u32>, mut entities: Query<Entity>) {
            for entity in &mut entities.iter() {
                // query.get() panics if the executor's archetype access is stale
                query.get::<u32>(entity).unwrap();
            }
            assert_eq!(1, entities.iter().iter().count());
        }

        schedule.add_system_to_stage("PreArchetypeChange", insert.system());
        schedule.add_system_to_stage("PostArchetypeChange", read.system());

        let mut executor = ParallelExecutor::default();
        executor.run(&mut schedule, &mut world, &mut resources);
    }

    // Same as above, but the archetype change happens *within* a stage via a
    // thread-local system: preparation between thread-local ranges must pick it up.
    #[test]
    fn intra_stage_archetype_change_prepare() {
        let mut world = World::new();
        let mut resources = Resources::default();

        let mut schedule = Schedule::default();
        schedule.add_stage("update");

        fn insert(world: &mut World, _resources: &mut Resources) {
            world.spawn((1u32,));
        }

        fn read(query: Query<&u32>, mut entities: Query<Entity>) {
            for entity in &mut entities.iter() {
                query.get::<u32>(entity).unwrap();
            }
            assert_eq!(1, entities.iter().iter().count());
        }

        schedule.add_system_to_stage("update", insert.thread_local_system());
        schedule.add_system_to_stage("update", read.system());

        let mut executor = ParallelExecutor::default();
        executor.run(&mut schedule, &mut world, &mut resources);
    }

    // End-to-end check of dependency construction and execution order across three
    // stages with conflicting/compatible accesses and a thread-local system.
    #[test]
    fn schedule() {
        let mut world = World::new();
        let mut resources = Resources::default();
        resources.insert(Counter::default());
        resources.insert(1.0f64);
        resources.insert(2isize);

        world.spawn((1.0f32,));
        world.spawn((1u32, 1u64));
        world.spawn((2u32,));

        let mut schedule = Schedule::default();
        schedule.add_stage("A");
        schedule.add_stage("B");
        schedule.add_stage("C");

        // Stage A: read_u32 and write_float are compatible (run first, in parallel);
        // read_u32_write_u64 conflicts with read_u32; read_u64 conflicts with it in turn.
        fn read_u32(counter: Res<Counter>, _query: Query<&u32>) {
            let mut count = counter.count.lock().unwrap();
            assert!(*count < 2, "should be one of the first two systems to run");
            *count += 1;
        }

        fn write_float(counter: Res<Counter>, _query: Query<&f32>) {
            let mut count = counter.count.lock().unwrap();
            assert!(*count < 2, "should be one of the first two systems to run");
            *count += 1;
        }

        fn read_u32_write_u64(counter: Res<Counter>, _query: Query<(&u32, &mut u64)>) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 2, "should always be the 3rd system to run");
            *count += 1;
        }

        fn read_u64(counter: Res<Counter>, _query: Query<&u64>) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 3, "should always be the 4th system to run");
            *count += 1;
        }

        schedule.add_system_to_stage("A", read_u32.system());
        schedule.add_system_to_stage("A", write_float.system());
        schedule.add_system_to_stage("A", read_u32_write_u64.system());
        schedule.add_system_to_stage("A", read_u64.system());

        // Stage B: a thread-local system sits between two pooled systems, forcing
        // strictly sequential execution of all three.
        fn write_u64(counter: Res<Counter>, _query: Query<&mut u64>) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 4, "should always be the 5th system to run");
            *count += 1;
        }

        fn thread_local_system(_world: &mut World, resources: &mut Resources) {
            let counter = resources.get::<Counter>().unwrap();
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 5, "should always be the 6th system to run");
            *count += 1;
        }

        fn write_f32(counter: Res<Counter>, _query: Query<&mut f32>) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 6, "should always be the 7th system to run");
            *count += 1;
        }

        schedule.add_system_to_stage("B", write_u64.system());
        schedule.add_system_to_stage("B", thread_local_system.thread_local_system());
        schedule.add_system_to_stage("B", write_f32.system());

        // Stage C: resource-access conflicts (not component conflicts) drive the order.
        fn read_f64_res(counter: Res<Counter>, _f64_res: Res<f64>) {
            let mut count = counter.count.lock().unwrap();
            assert!(
                7 == *count || *count == 8,
                "should always be the 8th or 9th system to run"
            );
            *count += 1;
        }

        fn read_isize_res(counter: Res<Counter>, _isize_res: Res<isize>) {
            let mut count = counter.count.lock().unwrap();
            assert!(
                7 == *count || *count == 8,
                "should always be the 8th or 9th system to run"
            );
            *count += 1;
        }

        fn read_isize_write_f64_res(
            counter: Res<Counter>,
            _isize_res: Res<isize>,
            _f64_res: ResMut<f64>,
        ) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 9, "should always be the 10th system to run");
            *count += 1;
        }

        fn write_f64_res(counter: Res<Counter>, _f64_res: ResMut<f64>) {
            let mut count = counter.count.lock().unwrap();
            assert_eq!(*count, 10, "should always be the 11th system to run");
            *count += 1;
        }

        schedule.add_system_to_stage("C", read_f64_res.system());
        schedule.add_system_to_stage("C", read_isize_res.system());
        schedule.add_system_to_stage("C", read_isize_write_f64_res.system());
        schedule.add_system_to_stage("C", write_f64_res.system());

        // Runs the executor and verifies both the computed dependency graphs and the
        // observed execution order (via the counter). Called twice below to confirm the
        // cached state from the first run still produces correct results.
        fn run_executor_and_validate(
            executor: &mut ParallelExecutor,
            schedule: &mut Schedule,
            world: &mut World,
            resources: &mut Resources,
        ) {
            executor.run(schedule, world, resources);

            // Expected dependents per stage (indices are positions within the stage).
            assert_eq!(
                executor.stages[0].system_dependents,
                vec![vec![2], vec![], vec![3], vec![]]
            );
            assert_eq!(
                executor.stages[1].system_dependents,
                vec![vec![1], vec![2], vec![]]
            );
            assert_eq!(
                executor.stages[2].system_dependents,
                vec![vec![2, 3], vec![], vec![3], vec![]]
            );

            // Expected dependency bitsets, stage by stage.
            let stage_0_len = executor.stages[0].system_dependencies.len();
            let mut read_u32_write_u64_deps = FixedBitSet::with_capacity(stage_0_len);
            read_u32_write_u64_deps.insert(0);
            let mut read_u64_deps = FixedBitSet::with_capacity(stage_0_len);
            read_u64_deps.insert(2);

            assert_eq!(
                executor.stages[0].system_dependencies,
                vec![
                    FixedBitSet::with_capacity(stage_0_len),
                    FixedBitSet::with_capacity(stage_0_len),
                    read_u32_write_u64_deps,
                    read_u64_deps,
                ]
            );

            let stage_1_len = executor.stages[1].system_dependencies.len();
            let mut thread_local_deps = FixedBitSet::with_capacity(stage_1_len);
            thread_local_deps.insert(0);
            let mut write_f64_deps = FixedBitSet::with_capacity(stage_1_len);
            write_f64_deps.insert(1);
            assert_eq!(
                executor.stages[1].system_dependencies,
                vec![
                    FixedBitSet::with_capacity(stage_1_len),
                    thread_local_deps,
                    write_f64_deps
                ]
            );

            let stage_2_len = executor.stages[2].system_dependencies.len();
            let mut read_isize_write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len);
            read_isize_write_f64_res_deps.insert(0);
            let mut write_f64_res_deps = FixedBitSet::with_capacity(stage_2_len);
            write_f64_res_deps.insert(0);
            write_f64_res_deps.insert(2);
            assert_eq!(
                executor.stages[2].system_dependencies,
                vec![
                    FixedBitSet::with_capacity(stage_2_len),
                    FixedBitSet::with_capacity(stage_2_len),
                    read_isize_write_f64_res_deps,
                    write_f64_res_deps
                ]
            );

            // Every one of the 11 systems should have incremented the counter exactly once.
            let counter = resources.get::<Counter>().unwrap();
            assert_eq!(
                *counter.count.lock().unwrap(),
                11,
                "counter should have been incremented once for each system"
            );
        }

        let mut executor = ParallelExecutor::default();
        run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
        // Run again with an unchanged schedule to exercise the cached-dependency path.
        *resources.get::<Counter>().unwrap().count.lock().unwrap() = 0;
        run_executor_and_validate(&mut executor, &mut schedule, &mut world, &mut resources);
    }
}