use crate::{
resource::{ResourceTypeId, Resources},
system::SystemId,
};
use legion_core::{
borrow::RefMut,
command::CommandBuffer,
storage::ComponentTypeId,
subworld::ArchetypeAccess,
world::{World, WorldId},
};
use std::cell::UnsafeCell;
#[cfg(feature = "par-schedule")]
use tracing::{span, trace, Level};
#[cfg(feature = "par-schedule")]
use std::sync::atomic::{AtomicUsize, Ordering};
#[cfg(feature = "par-schedule")]
use fxhash::{FxHashMap, FxHashSet};
#[cfg(feature = "par-schedule")]
use rayon::prelude::*;
#[cfg(feature = "par-schedule")]
use itertools::izip;
#[cfg(feature = "par-schedule")]
use std::iter::repeat;
/// Marker trait for systems that can be handed to an [`Executor`].
///
/// It declares no methods of its own; it only bundles [`Runnable`] with the
/// `Send + Sync` bounds. The blanket impl below means no type ever has to
/// implement it by hand.
pub trait Schedulable: Runnable + Send + Sync {}

/// Every `Runnable + Send + Sync` type is automatically `Schedulable`.
impl<S: Runnable + Send + Sync> Schedulable for S {}
/// Interface the scheduler uses to inspect and run a system.
pub trait Runnable {
/// Returns the identifier of this system.
fn name(&self) -> &SystemId;
/// Returns the resource types and component types this system declares as read.
fn reads(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
/// Returns the resource types and component types this system declares as written.
fn writes(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
/// Called by the scheduler with the world before the system is run.
fn prepare(&mut self, world: &World);
/// Returns the archetype access of this system. The parallel executor compares these
/// between systems (via `is_disjoint`) to decide whether two systems have to be ordered.
fn accesses_archetypes(&self) -> &ArchetypeAccess;
/// Runs the system with only shared references to the world and resources.
///
/// # Safety
/// NOTE(review): the precise contract is not visible in this file; presumably the caller
/// must guarantee that nothing else conflicts with the accesses declared by
/// `reads`/`writes` while this runs — confirm against the implementors.
unsafe fn run_unsafe(&mut self, world: &World, resources: &Resources);
/// Returns the command buffer this system holds for the given world, if there is one.
fn command_buffer_mut(&self, world: WorldId) -> Option<RefMut<CommandBuffer>>;
/// Runs the system with exclusive access to the world and resources.
///
/// The default implementation simply forwards to [`Runnable::run_unsafe`].
fn run(&mut self, world: &mut World, resources: &mut Resources) {
unsafe { self.run_unsafe(world, resources) };
}
}
/// Runs a set of systems; with the `par-schedule` feature it also stores the dependency
/// tables used to order them when they are run in parallel.
pub struct Executor {
// The systems, in the order they were supplied to `Executor::new`.
systems: Vec<SystemBox>,
// For each system index, the indices of the systems that must wait for it to finish.
#[cfg(feature = "par-schedule")]
static_dependants: Vec<Vec<usize>>,
// For each system index, later systems with a component-level dependency on it. An entry
// is moved into `static_dependants` by `run_systems` once the two systems' archetype
// accesses are found to overlap.
#[cfg(feature = "par-schedule")]
dynamic_dependants: Vec<Vec<usize>>,
// For each system index, how many static dependencies it has.
#[cfg(feature = "par-schedule")]
static_dependency_counts: Vec<AtomicUsize>,
// Per-run countdown, reset from `static_dependency_counts` at the start of `run_systems`.
#[cfg(feature = "par-schedule")]
awaiting: Vec<AtomicUsize>,
}
/// A boxed system stored in an `UnsafeCell`, so the executor can obtain a mutable
/// reference to it through a shared reference (see `SystemBox::get_mut`).
struct SystemBox(UnsafeCell<Box<dyn Schedulable>>);
// NOTE(review): `UnsafeCell` is never `Sync`, so these impls are asserted by hand. The
// wrapped `dyn Schedulable` is itself `Send + Sync`; soundness therefore rests on the
// executor never handing out conflicting references to the same system — confirm that the
// dependency tracking guarantees this.
unsafe impl Send for SystemBox {}
unsafe impl Sync for SystemBox {}
impl SystemBox {
    /// Returns a shared reference to the wrapped system.
    ///
    /// # Safety
    /// The reference is produced from the cell's raw pointer, so the compiler cannot check
    /// aliasing: the caller must ensure no mutable reference to this system is live.
    #[cfg(feature = "par-schedule")]
    unsafe fn get(&self) -> &dyn Schedulable {
        &**self.0.get()
    }

    /// Returns a mutable reference to the wrapped system through a shared `&self`.
    ///
    /// # Safety
    /// The reference is produced from the cell's raw pointer, so the compiler cannot check
    /// aliasing: the caller must ensure no other reference to this system is live.
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut(&self) -> &mut dyn Schedulable {
        &mut **self.0.get()
    }
}
impl Executor {
/// Creates an executor that runs `systems` one after another, in the given order.
///
/// Without the `par-schedule` feature there is no dependency analysis; each system is
/// only wrapped in a `SystemBox`.
#[cfg(not(feature = "par-schedule"))]
pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
    let mut wrapped = Vec::with_capacity(systems.len());
    for system in systems {
        wrapped.push(SystemBox(UnsafeCell::new(system)));
    }
    Self { systems: wrapped }
}
/// Creates an executor and precomputes the dependency tables between `systems`.
///
/// Systems are visited in the order given, and a system can only depend on systems
/// that come before it:
///
/// * **Static dependencies** come from resource access. A reader depends on the most
///   recent writer of the resource. A writer depends on the most recent writer *and on
///   every reader since that writer*, so it cannot start while any of them may still be
///   running.
/// * **Dynamic dependencies** come from component access. They are recorded separately
///   and only enforced once `run_systems` finds that the two systems' archetype accesses
///   overlap.
///
/// A system never depends on itself, even if it lists the same resource or component
/// as both read and written.
///
/// With zero or one system there is nothing to order, so all tables are left empty.
#[cfg(feature = "par-schedule")]
#[allow(clippy::cognitive_complexity)]
pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
    if systems.len() > 1 {
        let mut static_dependency_counts = Vec::with_capacity(systems.len());

        let mut static_dependants: Vec<Vec<_>> =
            repeat(Vec::with_capacity(64)).take(systems.len()).collect();
        let mut dynamic_dependants: Vec<Vec<_>> =
            repeat(Vec::with_capacity(64)).take(systems.len()).collect();

        let mut resource_last_mutated =
            FxHashMap::<ResourceTypeId, usize>::with_capacity_and_hasher(
                64,
                Default::default(),
            );
        // All readers of a resource since its most recent writer. Readers may run in
        // parallel with each other, so the next writer has to wait for every one of
        // them, not just the one that happened to be registered last.
        let mut resource_readers =
            FxHashMap::<ResourceTypeId, Vec<usize>>::with_capacity_and_hasher(
                64,
                Default::default(),
            );
        let mut component_last_mutated =
            FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
                64,
                Default::default(),
            );
        let mut component_last_read =
            FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
                64,
                Default::default(),
            );

        for (i, system) in systems.iter().enumerate() {
            let span = span!(
                Level::TRACE,
                "Building system dependencies",
                system = %system.name(),
                index = i,
            );
            let _guard = span.enter();

            let (read_res, read_comp) = system.reads();
            let (write_res, write_comp) = system.writes();

            // find resource access dependencies
            let mut dependencies = FxHashSet::with_capacity_and_hasher(64, Default::default());
            for res in read_res {
                trace!(resource = ?res, "Read resource");
                if let Some(n) = resource_last_mutated.get(res) {
                    trace!(system_index = n, "Added write dependency");
                    dependencies.insert(*n);
                }
                resource_readers
                    .entry(*res)
                    .or_insert_with(Vec::new)
                    .push(i);
            }
            for res in write_res {
                trace!(resource = ?res, "Write resource");
                // Writes have to be exclusive, so we depend on every reader since the
                // previous write. Taking the list out also resets it: later writers are
                // ordered after those readers transitively, through this system.
                if let Some(readers) = resource_readers.remove(res) {
                    for n in readers {
                        trace!(system_index = n, "Added read dependency");
                        dependencies.insert(n);
                    }
                }
                if let Some(n) = resource_last_mutated.get(res) {
                    trace!(system_index = n, "Added write dependency");
                    dependencies.insert(*n);
                }
                resource_last_mutated.insert(*res, i);
            }
            // A system that lists the same resource as read and written (or lists it
            // twice) would otherwise be recorded as waiting on itself and could never
            // become runnable.
            dependencies.remove(&i);

            static_dependency_counts.push(AtomicUsize::from(dependencies.len()));
            trace!(dependants = ?dependencies, "Computed static dependants");
            for dep in dependencies {
                static_dependants[dep].push(i);
            }

            // find component access dependencies
            // NOTE(review): only the most recent reader and writer of each component are
            // tracked here. Because these edges are filtered pairwise by archetype
            // overlap at run time, earlier accessors may deserve an edge too — confirm.
            let mut comp_dependencies = FxHashSet::default();
            for comp in write_comp {
                trace!(component = ?comp, "Write component");
                // Writes have to be exclusive, so we are dependent on reads too
                if let Some(n) = component_last_read.get(comp) {
                    trace!(system_index = n, "Added read dependency");
                    comp_dependencies.insert(*n);
                }
                if let Some(n) = component_last_mutated.get(comp) {
                    trace!(system_index = n, "Added write dependency");
                    comp_dependencies.insert(*n);
                }
                component_last_mutated.insert(*comp, i);
            }
            for comp in read_comp {
                trace!(component = ?comp, "Read component");
                if let Some(n) = component_last_mutated.get(comp) {
                    trace!(system_index = n, "Added write dependency");
                    comp_dependencies.insert(*n);
                }
                component_last_read.insert(*comp, i);
            }
            trace!(dependants = ?comp_dependencies, "Computed dynamic dependants");
            for dep in comp_dependencies {
                // a component that is both written and read by this system shows up as
                // a dependency on itself; skip it
                if dep != i {
                    dynamic_dependants[dep].push(i);
                }
            }
        }

        trace!(
            ?static_dependants,
            ?dynamic_dependants,
            "Computed system dependencies"
        );

        // one countdown slot per system; filled in at the start of every run
        let awaiting = systems.iter().map(|_| AtomicUsize::new(0)).collect();

        Executor {
            awaiting,
            static_dependants,
            dynamic_dependants,
            static_dependency_counts,
            systems: systems
                .into_iter()
                .map(|s| SystemBox(UnsafeCell::new(s)))
                .collect(),
        }
    } else {
        Executor {
            awaiting: Vec::with_capacity(0),
            static_dependants: Vec::with_capacity(0),
            dynamic_dependants: Vec::with_capacity(0),
            static_dependency_counts: Vec::with_capacity(0),
            systems: systems
                .into_iter()
                .map(|s| SystemBox(UnsafeCell::new(s)))
                .collect(),
        }
    }
}
pub fn into_vec(self) -> Vec<Box<dyn Schedulable>> {
self.systems.into_iter().map(|s| s.0.into_inner()).collect()
}
/// Runs all systems and then writes their command buffers to the world.
///
/// Equivalent to calling `run_systems` followed by `flush_command_buffers`.
pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
self.run_systems(world, resources);
self.flush_command_buffers(world);
}
/// Prepares and runs every system in order on the calling thread.
///
/// Command buffers are not written here; see `flush_command_buffers`.
#[cfg(not(feature = "par-schedule"))]
pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
    for cell in self.systems.iter_mut() {
        // `self` is borrowed mutably and systems are visited one at a time, so this is
        // the only reference to the system.
        let system = unsafe { cell.get_mut() };
        system.prepare(world);
        system.run(world, resources);
    }
}
/// Prepares and runs every system, running systems in parallel where the dependency
/// tables allow it.
///
/// Command buffers are not written here; see `flush_command_buffers`.
#[cfg(feature = "par-schedule")]
pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
// NOTE(review): the first closure is empty; presumably `join` is used only to move the
// work onto rayon's thread pool — confirm.
rayon::join(
|| {},
|| {
match self.systems.len() {
// A single system has nothing to be ordered against: prepare and run it directly.
1 => {
unsafe {
let system = self.systems[0].get_mut();
system.prepare(world);
system.run(world, resources);
};
}
_ => {
let systems = &mut self.systems;
let static_dependency_counts = &self.static_dependency_counts;
let awaiting = &mut self.awaiting;
// Prepare all systems in parallel.
systems
.par_iter_mut()
.for_each(|sys| unsafe { sys.get_mut() }.prepare(world));
// Promote dynamic dependants to static ones wherever the archetype accesses of the
// two systems overlap. The promotion is kept for later runs: the entry is removed
// from `dynamic_dependants` and the dependant's static count is incremented.
izip!(
systems.iter(),
self.static_dependants.iter_mut(),
self.dynamic_dependants.iter_mut()
)
.par_bridge()
.for_each(|(sys, static_dep, dyn_dep)| {
let archetypes = unsafe { sys.get() }.accesses_archetypes();
// Iterate backwards so `swap_remove` never disturbs an index still to be visited.
for i in (0..dyn_dep.len()).rev() {
let dep = dyn_dep[i];
let other = unsafe { systems[dep].get() };
if !other.accesses_archetypes().is_disjoint(archetypes) {
static_dep.push(dep);
dyn_dep.swap_remove(i);
static_dependency_counts[dep].fetch_add(1, Ordering::Relaxed);
}
}
});
// Reset the per-run countdown of every system from its static dependency count.
for (i, count) in static_dependency_counts.iter().enumerate() {
awaiting[i].store(count.load(Ordering::Relaxed), Ordering::Relaxed);
}
let awaiting = &self.awaiting;
// Start every system whose countdown is zero. The filter is lazy, so the countdown is
// read only when the scan reaches that index; `run_recursive` marks the systems it
// starts with `usize::MAX`, which keeps this scan from starting them again.
(0..systems.len())
.filter(|i| awaiting[*i].load(Ordering::SeqCst) == 0)
.for_each(|i| {
unsafe { self.run_recursive(i, world, resources) };
});
}
}
},
);
}
/// Writes the command buffer of every system that has one for `world` into `world`.
pub fn flush_command_buffers(&mut self, world: &mut World) {
    for cell in &self.systems {
        // `self` is borrowed mutably and systems are visited one at a time, so this is
        // the only reference to the system.
        let system = unsafe { cell.get_mut() };
        if let Some(mut cmd) = system.command_buffer_mut(world.id()) {
            cmd.write(world);
        }
    }
}
/// Runs system `i`, then (in parallel) every dependant for which `i` was the last
/// outstanding dependency, recursively.
///
/// Each dependant's `awaiting` counter is decremented with a single atomic
/// read-modify-write. Exactly one finishing dependency therefore observes the counter
/// going from 1 to 0 and becomes responsible for running the dependant. The
/// `AcqRel` ordering makes the writes of *every* dependency visible to the thread that
/// ends up running the dependant, even when those dependencies ran on other threads.
///
/// # Safety
/// This obtains a mutable reference to system `i` through `SystemBox::get_mut` and calls
/// `Runnable::run_unsafe`. The caller must ensure system `i` is not being accessed
/// elsewhere and that all of its dependencies have finished.
#[cfg(feature = "par-schedule")]
unsafe fn run_recursive(&self, i: usize, world: &World, resources: &Resources) {
    self.systems[i].get_mut().run_unsafe(world, resources);

    self.static_dependants[i].par_iter().for_each(|dep| {
        let remaining = &self.awaiting[*dep];
        if remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            // We were the last dependency. Mark the system as started so the root scan
            // in `run_systems` (which looks for a counter of 0) cannot start it again.
            remaining.store(std::usize::MAX, Ordering::Release);
            self.run_recursive(*dep, world, resources);
        }
    });
}
}
/// Incrementally assembles the steps of a [`Schedule`].
pub struct Builder {
// Steps finalized so far, in execution order.
steps: Vec<Step>,
// Systems added since the last finalized step; they become one `Executor` together.
accumulator: Vec<Box<dyn Schedulable>>,
}
impl Builder {
    /// Adds a system to the schedule.
    ///
    /// Consecutively added systems are collected and placed in a single [`Executor`]
    /// when the next non-system step is added or the schedule is built.
    pub fn add_system<T: Into<Box<dyn Schedulable>>>(mut self, system: T) -> Self {
        self.accumulator.push(system.into());
        self
    }

    /// Adds a step that writes the command buffers of all systems run so far.
    ///
    /// Any pending systems are placed in an executor first, so the flush happens after
    /// them.
    pub fn flush(mut self) -> Self {
        self.finalize_executor();
        self.steps.push(Step::FlushCmdBuffers);
        self
    }

    /// Moves the pending systems, if any, into a new executor step.
    fn finalize_executor(&mut self) {
        if !self.accumulator.is_empty() {
            let mut systems = Vec::new();
            std::mem::swap(&mut self.accumulator, &mut systems);
            let executor = Executor::new(systems);
            self.steps.push(Step::Systems(executor));
        }
    }

    /// Adds a closure that is run on the thread executing the schedule, with exclusive
    /// access to the world and resources.
    ///
    /// Any pending systems are placed in an executor first, so the closure runs after
    /// them.
    pub fn add_thread_local_fn<F: FnMut(&mut World, &mut Resources) + 'static>(
        mut self,
        f: F,
    ) -> Self {
        self.finalize_executor();
        self.steps.push(Step::ThreadLocalFn(
            Box::new(f) as Box<dyn FnMut(&mut World, &mut Resources)>
        ));
        self
    }

    /// Adds a system that is run on the thread executing the schedule.
    ///
    /// Any pending systems are placed in an executor first. Without this, systems added
    /// *before* the thread-local system would only be turned into a step later and would
    /// therefore run *after* it.
    pub fn add_thread_local<S: Into<Box<dyn Runnable>>>(mut self, system: S) -> Self {
        self.finalize_executor();
        let system = system.into();
        self.steps.push(Step::ThreadLocalSystem(system));
        self
    }

    /// Finishes the builder and returns the schedule.
    pub fn build(self) -> Schedule {
        self.into()
    }
}
impl Default for Builder {
fn default() -> Self {
Self {
steps: Vec::new(),
accumulator: Vec::new(),
}
}
}
/// A single step of a [`Schedule`].
pub enum Step {
/// Run the systems of an executor.
Systems(Executor),
/// Write the command buffers of the executors and thread-local systems run since the
/// previous flush.
FlushCmdBuffers,
/// Call a closure with exclusive access to the world and resources.
ThreadLocalFn(Box<dyn FnMut(&mut World, &mut Resources)>),
/// Run a single system on the thread executing the schedule.
ThreadLocalSystem(Box<dyn Runnable>),
}
/// An ordered list of [`Step`]s that is executed against a world and its resources.
pub struct Schedule {
// The steps, in execution order.
steps: Vec<Step>,
}
impl Schedule {
    /// Creates a new, empty schedule builder.
    pub fn builder() -> Builder {
        Builder::default()
    }

    /// Executes all steps of the schedule, in order.
    ///
    /// Command buffers are only written to the world when a [`Step::FlushCmdBuffers`]
    /// step is reached; that step flushes every executor and thread-local system run
    /// since the previous flush.
    pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
        // executors and thread-local systems whose command buffers are not yet flushed
        let mut waiting_flush: Vec<&mut Executor> = Vec::new();
        let mut thread_local_systems: Vec<&mut Box<dyn Runnable>> = Vec::new();

        for step in &mut self.steps {
            match step {
                Step::Systems(executor) => {
                    executor.run_systems(world, resources);
                    waiting_flush.push(executor);
                }
                Step::FlushCmdBuffers => {
                    waiting_flush
                        .drain(..)
                        .for_each(|e| e.flush_command_buffers(world));
                    // only systems that reported a command buffer were queued, so the
                    // unwrap mirrors the `is_some` check done right after they ran
                    thread_local_systems.drain(..).for_each(|system| {
                        system.command_buffer_mut(world.id()).unwrap().write(world)
                    });
                }
                Step::ThreadLocalFn(function) => function(world, resources),
                Step::ThreadLocalSystem(system) => {
                    // Executors call `prepare` before every run; thread-local systems
                    // need the same treatment before they are run.
                    system.prepare(world);
                    system.run(world, resources);
                    if system.command_buffer_mut(world.id()).is_some() {
                        thread_local_systems.push(system)
                    }
                }
            }
        }
    }

    /// Consumes the schedule and returns its steps.
    pub fn into_vec(self) -> Vec<Step> {
        self.steps
    }
}
impl From<Builder> for Schedule {
fn from(builder: Builder) -> Self {
Self {
steps: builder.flush().steps,
}
}
}
impl From<Vec<Step>> for Schedule {
fn from(steps: Vec<Step>) -> Self { Self { steps } }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::prelude::*;
    use itertools::sorted;
    use legion_core::prelude::*;
    use std::sync::{Arc, Mutex};

    /// Systems that write the same resource must run in the order they were added.
    #[test]
    fn execute_in_order() {
        let universe = Universe::new();
        let mut world = universe.create_world();

        #[derive(Default)]
        struct Resource;

        let mut resources = Resources::default();
        resources.insert(Resource);

        let order = Arc::new(Mutex::new(Vec::new()));

        let order_clone = order.clone();
        let system_one = SystemBuilder::new("one")
            .write_resource::<Resource>()
            .build(move |_, _, _, _| order_clone.lock().unwrap().push(1usize));
        let order_clone = order.clone();
        let system_two = SystemBuilder::new("two")
            .write_resource::<Resource>()
            .build(move |_, _, _, _| order_clone.lock().unwrap().push(2usize));
        let order_clone = order.clone();
        let system_three = SystemBuilder::new("three")
            .write_resource::<Resource>()
            .build(move |_, _, _, _| order_clone.lock().unwrap().push(3usize));

        let mut schedule = Schedule::builder()
            .add_system(system_one)
            .add_system(system_two)
            .add_system(system_three)
            .build();

        schedule.execute(&mut world, &mut resources);

        let order = order.lock().unwrap();
        // An empty list is trivially sorted, so make sure every system actually ran.
        assert_eq!(order.len(), 3);
        let sorted: Vec<usize> = sorted(order.clone()).collect();
        assert_eq!(*order, sorted);
    }

    /// Entities inserted through a command buffer only become visible after a flush.
    #[test]
    fn flush() {
        let universe = Universe::new();
        let mut world = universe.create_world();
        let mut resources = Resources::default();

        #[derive(Clone, Copy, Debug, PartialEq)]
        struct TestComp(f32, f32, f32);

        let system_one = SystemBuilder::new("one").build(move |cmd, _, _, _| {
            cmd.insert((), vec![(TestComp(0., 0., 0.),)]);
        });
        let system_two = SystemBuilder::new("two")
            .with_query(Write::<TestComp>::query())
            .build(move |_, world, _, query| {
                assert_eq!(0, query.iter_mut(world).count());
            });
        let system_three = SystemBuilder::new("three")
            .with_query(Write::<TestComp>::query())
            .build(move |_, world, _, query| {
                assert_eq!(1, query.iter_mut(world).count());
            });

        let mut schedule = Schedule::builder()
            .add_system(system_one)
            .add_system(system_two)
            .flush()
            .add_system(system_three)
            .build();

        schedule.execute(&mut world, &mut resources);
    }

    /// The command buffer of a thread-local system is written by the final flush.
    #[test]
    fn flush_thread_local() {
        let universe = Universe::new();
        let mut world = universe.create_world();
        let mut resources = Resources::default();

        #[derive(Clone, Copy, Debug, PartialEq)]
        struct TestComp(f32, f32, f32);

        let entity = Arc::new(Mutex::new(None));

        {
            let entity = entity.clone();

            let system_one = SystemBuilder::new("one").build_thread_local(move |cmd, _, _, _| {
                let mut entity = entity.lock().unwrap();
                *entity = Some(cmd.insert((), vec![(TestComp(0.0, 0.0, 0.0),)])[0]);
            });

            let mut schedule = Schedule::builder().add_thread_local(system_one).build();

            schedule.execute(&mut world, &mut resources);
        }

        let entity = entity.lock().unwrap();
        assert!(entity.is_some());
        assert!(world.get_entity_location(entity.unwrap()).is_some());
    }
}