legion_systems/
schedule.rs

1use crate::{
2    resource::{ResourceTypeId, Resources},
3    system::SystemId,
4};
5use legion_core::{
6    borrow::RefMut,
7    command::CommandBuffer,
8    storage::ComponentTypeId,
9    subworld::ArchetypeAccess,
10    world::{World, WorldId},
11};
12use std::cell::UnsafeCell;
13
14#[cfg(feature = "par-schedule")]
15use tracing::{span, trace, Level};
16
17#[cfg(feature = "par-schedule")]
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20#[cfg(feature = "par-schedule")]
21use fxhash::{FxHashMap, FxHashSet};
22
23#[cfg(feature = "par-schedule")]
24use rayon::prelude::*;
25
26#[cfg(feature = "par-schedule")]
27use itertools::izip;
28
29#[cfg(feature = "par-schedule")]
30use std::iter::repeat;
31
32/// Empty trait which defines a `System` as schedulable by the dispatcher - this requires that the
33/// type is both `Send` and `Sync`.
34///
35/// This is automatically implemented for all types that implement `Runnable` which meet the requirements.
36pub trait Schedulable: Runnable + Send + Sync {}
37impl<T> Schedulable for T where T: Runnable + Send + Sync {}
38
39/// Trait describing a schedulable type. This is implemented by `System`
40pub trait Runnable {
41    /// Gets the name of the system.
42    fn name(&self) -> &SystemId;
43
44    /// Gets the resources and component types read by the system.
45    fn reads(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
46
47    /// Gets the resources and component types written by the system.
48    fn writes(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
49
50    /// Prepares the system for execution against a world.
51    fn prepare(&mut self, world: &World);
52
53    /// Gets the set of archetypes the system will access when run,
54    /// as determined when the system was last prepared.
55    fn accesses_archetypes(&self) -> &ArchetypeAccess;
56
57    /// Runs the system.
58    ///
59    /// # Safety
60    ///
61    /// The shared references to world and resources may result in
62    /// unsound mutable aliasing if other code is accessing the same components or
63    /// resources as this system. Prefer to use `run` when possible.
64    unsafe fn run_unsafe(&mut self, world: &World, resources: &Resources);
65
66    /// Gets the system's command buffer.
67    fn command_buffer_mut(&self, world: WorldId) -> Option<RefMut<CommandBuffer>>;
68
69    /// Runs the system.
70    fn run(&mut self, world: &mut World, resources: &mut Resources) {
71        unsafe { self.run_unsafe(world, resources) };
72    }
73}
74
75/// Executes a sequence of systems, potentially in parallel, and then commits their command buffers.
76///
77/// Systems are provided in execution order. When the `par-schedule` feature is enabled, the `Executor`
78/// may run some systems in parallel. The order in which side-effects (e.g. writes to resources
79/// or entities) are observed is maintained.
80pub struct Executor {
81    systems: Vec<SystemBox>,
82    #[cfg(feature = "par-schedule")]
83    static_dependants: Vec<Vec<usize>>,
84    #[cfg(feature = "par-schedule")]
85    dynamic_dependants: Vec<Vec<usize>>,
86    #[cfg(feature = "par-schedule")]
87    static_dependency_counts: Vec<AtomicUsize>,
88    #[cfg(feature = "par-schedule")]
89    awaiting: Vec<AtomicUsize>,
90}
91
92struct SystemBox(UnsafeCell<Box<dyn Schedulable>>);
93
94// NOT SAFE:
95// This type is only safe to use as Send and Sync within
96// the constraints of how it is used inside Executor
97unsafe impl Send for SystemBox {}
98unsafe impl Sync for SystemBox {}
99
100impl SystemBox {
101    #[cfg(feature = "par-schedule")]
102    unsafe fn get(&self) -> &dyn Schedulable { std::ops::Deref::deref(&*self.0.get()) }
103
104    #[allow(clippy::mut_from_ref)]
105    unsafe fn get_mut(&self) -> &mut dyn Schedulable {
106        std::ops::DerefMut::deref_mut(&mut *self.0.get())
107    }
108}
109
110impl Executor {
111    /// Constructs a new executor for all systems to be run in a single stage.
112    ///
113    /// Systems are provided in the order in which side-effects (e.g. writes to resources or entities)
114    /// are to be observed.
115    #[cfg(not(feature = "par-schedule"))]
116    pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
117        Self {
118            systems: systems
119                .into_iter()
120                .map(|s| SystemBox(UnsafeCell::new(s)))
121                .collect(),
122        }
123    }
124
125    /// Constructs a new executor for all systems to be run in a single stage.
126    ///
127    /// Systems are provided in the order in which side-effects (e.g. writes to resources or entities)
128    /// are to be observed.
129    #[cfg(feature = "par-schedule")]
130    #[allow(clippy::cognitive_complexity)]
131    // TODO: we should break this up
132    pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
133        if systems.len() > 1 {
134            let mut static_dependency_counts = Vec::with_capacity(systems.len());
135
136            let mut static_dependants: Vec<Vec<_>> =
137                repeat(Vec::with_capacity(64)).take(systems.len()).collect();
138            let mut dynamic_dependants: Vec<Vec<_>> =
139                repeat(Vec::with_capacity(64)).take(systems.len()).collect();
140
141            let mut resource_last_mutated =
142                FxHashMap::<ResourceTypeId, usize>::with_capacity_and_hasher(
143                    64,
144                    Default::default(),
145                );
146            let mut resource_last_read =
147                FxHashMap::<ResourceTypeId, usize>::with_capacity_and_hasher(
148                    64,
149                    Default::default(),
150                );
151            let mut component_last_mutated =
152                FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
153                    64,
154                    Default::default(),
155                );
156            let mut component_last_read =
157                FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
158                    64,
159                    Default::default(),
160                );
161
162            for (i, system) in systems.iter().enumerate() {
163                let span = span!(
164                    Level::TRACE,
165                    "Building system dependencies",
166                    system = %system.name(),
167                    index = i,
168                );
169                let _guard = span.enter();
170
171                let (read_res, read_comp) = system.reads();
172                let (write_res, write_comp) = system.writes();
173
174                // find resource access dependencies
175                let mut dependencies = FxHashSet::with_capacity_and_hasher(64, Default::default());
176                for res in read_res {
177                    trace!(resource = ?res, "Read resource");
178                    if let Some(n) = resource_last_mutated.get(res) {
179                        trace!(system_index = n, "Added write dependency");
180                        dependencies.insert(*n);
181                    }
182                }
183                for res in write_res {
184                    trace!(resource = ?res, "Write resource");
185                    // Writes have to be exclusive, so we are dependent on reads too
186                    if let Some(n) = resource_last_read.get(res) {
187                        trace!(system_index = n, "Added read dependency");
188                        dependencies.insert(*n);
189                    }
190
191                    if let Some(n) = resource_last_mutated.get(res) {
192                        trace!(system_index = n, "Added write dependency");
193                        dependencies.insert(*n);
194                    }
195                }
196
197                // update access tracking
198                for res in read_res {
199                    resource_last_read.insert(*res, i);
200                }
201                for res in write_res {
202                    resource_last_read.insert(*res, i);
203                    resource_last_mutated.insert(*res, i);
204                }
205
206                static_dependency_counts.push(AtomicUsize::from(dependencies.len()));
207                trace!(dependants = ?dependencies, dependency_counts = ?static_dependency_counts, "Computed static dependants");
208                for dep in &dependencies {
209                    static_dependants[*dep].push(i);
210                }
211
212                // find component access dependencies
213                let mut comp_dependencies = FxHashSet::default();
214                for comp in read_comp {
215                    trace!(component = ?comp, "Read component");
216                    if let Some(n) = component_last_mutated.get(comp) {
217                        trace!(system_index = n, "Added write dependency");
218                        comp_dependencies.insert(*n);
219                    }
220                }
221                for comp in write_comp {
222                    // writes have to be exclusive, so we are dependent on reads too
223                    trace!(component = ?comp, "Write component");
224                    if let Some(n) = component_last_read.get(comp) {
225                        trace!(system_index = n, "Added read dependency");
226                        comp_dependencies.insert(*n);
227                    }
228                    if let Some(n) = component_last_mutated.get(comp) {
229                        trace!(system_index = n, "Added write dependency");
230                        comp_dependencies.insert(*n);
231                    }
232                }
233
234                // update access tracking
235                for comp in read_comp {
236                    component_last_read.insert(*comp, i);
237                }
238                for comp in write_comp {
239                    component_last_read.insert(*comp, i);
240                    component_last_mutated.insert(*comp, i);
241                }
242
243                // remove dependencies which are already static from dynamic dependencies
244                for static_dep in &dependencies {
245                    comp_dependencies.remove(static_dep);
246                }
247
248                trace!(depentants = ?comp_dependencies, "Computed dynamic dependants");
249                for dep in comp_dependencies {
250                    if dep != i {
251                        // dont be dependent on ourselves
252                        dynamic_dependants[dep].push(i);
253                    }
254                }
255            }
256
257            trace!(
258                ?static_dependants,
259                ?dynamic_dependants,
260                "Computed system dependencies"
261            );
262
263            let mut awaiting = Vec::with_capacity(systems.len());
264            systems
265                .iter()
266                .for_each(|_| awaiting.push(AtomicUsize::new(0)));
267
268            Executor {
269                awaiting,
270                static_dependants,
271                dynamic_dependants,
272                static_dependency_counts,
273                systems: systems
274                    .into_iter()
275                    .map(|s| SystemBox(UnsafeCell::new(s)))
276                    .collect(),
277            }
278        } else {
279            Executor {
280                awaiting: Vec::with_capacity(0),
281                static_dependants: Vec::with_capacity(0),
282                dynamic_dependants: Vec::with_capacity(0),
283                static_dependency_counts: Vec::with_capacity(0),
284                systems: systems
285                    .into_iter()
286                    .map(|s| SystemBox(UnsafeCell::new(s)))
287                    .collect(),
288            }
289        }
290    }
291
292    /// Converts this executor into a vector of its component systems.
293    pub fn into_vec(self) -> Vec<Box<dyn Schedulable>> {
294        self.systems.into_iter().map(|s| s.0.into_inner()).collect()
295    }
296
297    /// Executes all systems and then flushes their command buffers.
298    pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
299        self.run_systems(world, resources);
300        self.flush_command_buffers(world);
301    }
302
303    /// Executes all systems sequentially.
304    ///
305    /// Only enabled with par-schedule is disabled
306    #[cfg(not(feature = "par-schedule"))]
307    pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
308        self.systems.iter_mut().for_each(|system| {
309            let system = unsafe { system.get_mut() };
310            system.prepare(world);
311            system.run(world, resources);
312        });
313    }
314
315    /// Executes all systems, potentially in parallel.
316    ///
317    /// Ordering is retained in so far as the order of observed resource and component
318    /// accesses is maintained.
319    ///
320    /// Call from within `rayon::ThreadPool::install()` to execute within a specific thread pool.
321    #[cfg(feature = "par-schedule")]
322    pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
323        rayon::join(
324            || {},
325            || {
326                match self.systems.len() {
327                    1 => {
328                        // safety: we have exlusive access to all systems, world and resources here
329                        unsafe {
330                            let system = self.systems[0].get_mut();
331                            system.prepare(world);
332                            system.run(world, resources);
333                        };
334                    }
335                    _ => {
336                        let systems = &mut self.systems;
337                        let static_dependency_counts = &self.static_dependency_counts;
338                        let awaiting = &mut self.awaiting;
339
340                        // prepare all systems - archetype filters are pre-executed here
341                        systems
342                            .par_iter_mut()
343                            .for_each(|sys| unsafe { sys.get_mut() }.prepare(world));
344
345                        // determine dynamic dependencies
346                        izip!(
347                            systems.iter(),
348                            self.static_dependants.iter_mut(),
349                            self.dynamic_dependants.iter_mut()
350                        )
351                        .par_bridge()
352                        .for_each(|(sys, static_dep, dyn_dep)| {
353                            // safety: systems is held exclusively, and we are only reading each system
354                            let archetypes = unsafe { sys.get() }.accesses_archetypes();
355                            for i in (0..dyn_dep.len()).rev() {
356                                let dep = dyn_dep[i];
357                                let other = unsafe { systems[dep].get() };
358
359                                // if the archetype sets intersect,
360                                // then we can move the dynamic dependant into the static dependants set
361                                if !other.accesses_archetypes().is_disjoint(archetypes) {
362                                    static_dep.push(dep);
363                                    dyn_dep.swap_remove(i);
364                                    static_dependency_counts[dep].fetch_add(1, Ordering::Relaxed);
365                                }
366                            }
367                        });
368
369                        // initialize dependency tracking
370                        for (i, count) in static_dependency_counts.iter().enumerate() {
371                            awaiting[i].store(count.load(Ordering::Relaxed), Ordering::Relaxed);
372                        }
373
374                        let awaiting = &self.awaiting;
375
376                        trace!(?awaiting, "Initialized await counts");
377
378                        // execute all systems with no outstanding dependencies
379                        (0..systems.len())
380                            .into_par_iter()
381                            .filter(|i| static_dependency_counts[*i].load(Ordering::SeqCst) == 0)
382                            .for_each(|i| {
383                                // safety: we are at the root of the execution tree, so we know each
384                                // index is exclusive here
385                                unsafe { self.run_recursive(i, world, resources) };
386                            });
387
388                        debug_assert!(
389                            awaiting.iter().all(|x| x.load(Ordering::SeqCst) == 0),
390                            "not all systems run: {:?}",
391                            awaiting
392                        );
393                    }
394                }
395            },
396        );
397    }
398
399    /// Flushes the recorded command buffers for all systems.
400    pub fn flush_command_buffers(&mut self, world: &mut World) {
401        self.systems.iter().for_each(|system| {
402            // safety: systems are exlcusive due to &mut self
403            let system = unsafe { system.get_mut() };
404            if let Some(mut cmd) = system.command_buffer_mut(world.id()) {
405                cmd.write(world);
406            }
407        });
408    }
409
410    /// Recursively execute through the generated depedency cascade and exhaust it.
411    ///
412    /// # Safety
413    ///
414    /// Ensure the system indexed by `i` is only accessed once.
415    #[cfg(feature = "par-schedule")]
416    unsafe fn run_recursive(&self, i: usize, world: &World, resources: &Resources) {
417        // safety: the caller ensures nothing else is accessing systems[i]
418        self.systems[i].get_mut().run_unsafe(world, resources);
419
420        self.static_dependants[i].par_iter().for_each(|dep| {
421            if self.awaiting[*dep].fetch_sub(1, Ordering::Relaxed) == 1 {
422                // safety: each dependency is unique, so run_recursive is safe to call
423                self.run_recursive(*dep, world, resources);
424            }
425        });
426    }
427}
428
429/// A factory for `Schedule`.
430pub struct Builder {
431    steps: Vec<Step>,
432    accumulator: Vec<Box<dyn Schedulable>>,
433}
434
435impl Builder {
436    /// Adds a system to the schedule.
437    pub fn add_system<T: Into<Box<dyn Schedulable>>>(mut self, system: T) -> Self {
438        self.accumulator.push(system.into());
439        self
440    }
441
442    /// Waits for executing systems to complete, and the flushes all outstanding system
443    /// command buffers.
444    pub fn flush(mut self) -> Self {
445        self.finalize_executor();
446        self.steps.push(Step::FlushCmdBuffers);
447        self
448    }
449
450    fn finalize_executor(&mut self) {
451        if !self.accumulator.is_empty() {
452            let mut systems = Vec::new();
453            std::mem::swap(&mut self.accumulator, &mut systems);
454            let executor = Executor::new(systems);
455            self.steps.push(Step::Systems(executor));
456        }
457    }
458
459    /// Adds a thread local function to the schedule. This function will be executed on the main thread.
460    pub fn add_thread_local_fn<F: FnMut(&mut World, &mut Resources) + 'static>(
461        mut self,
462        f: F,
463    ) -> Self {
464        self.finalize_executor();
465        self.steps.push(Step::ThreadLocalFn(
466            Box::new(f) as Box<dyn FnMut(&mut World, &mut Resources)>
467        ));
468        self
469    }
470
471    /// Adds a thread local system to the schedule. This system will be executed on the main thread.
472    pub fn add_thread_local<S: Into<Box<dyn Runnable>>>(mut self, system: S) -> Self {
473        self.finalize_executor();
474        let system = system.into();
475        self.steps.push(Step::ThreadLocalSystem(system));
476        self
477    }
478
479    /// Finalizes the builder into a `Schedule`.
480    pub fn build(self) -> Schedule { self.into() }
481}
482
483impl Default for Builder {
484    fn default() -> Self {
485        Self {
486            steps: Vec::new(),
487            accumulator: Vec::new(),
488        }
489    }
490}
491
492/// A step in a schedule.
493pub enum Step {
494    /// A batch of systems.
495    Systems(Executor),
496    /// Flush system command buffers.
497    FlushCmdBuffers,
498    /// A thread local function.
499    ThreadLocalFn(Box<dyn FnMut(&mut World, &mut Resources)>),
500    /// A thread local system
501    ThreadLocalSystem(Box<dyn Runnable>),
502}
503
504/// A schedule of systems for execution.
505///
506/// # Examples
507///
508/// ```rust
509/// # use legion_core::prelude::*;
510/// # use legion_systems::prelude::*;
511/// # let find_collisions = SystemBuilder::new("find_collisions").build(|_,_,_,_| {});
512/// # let calculate_acceleration = SystemBuilder::new("calculate_acceleration").build(|_,_,_,_| {});
513/// # let update_positions = SystemBuilder::new("update_positions").build(|_,_,_,_| {});
514/// let mut world = World::new();
515/// let mut resources = Resources::default();
516/// let mut schedule = Schedule::builder()
517///     .add_system(find_collisions)
518///     .flush()
519///     .add_system(calculate_acceleration)
520///     .add_system(update_positions)
521///     .build();
522///
523/// schedule.execute(&mut world, &mut resources);
524/// ```
525pub struct Schedule {
526    steps: Vec<Step>,
527}
528
529impl Schedule {
530    /// Creates a new schedule builder.
531    pub fn builder() -> Builder { Builder::default() }
532
533    /// Executes all of the steps in the schedule.
534    pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
535        enum ToFlush<'a> {
536            Executor(&'a mut Executor),
537            System(RefMut<'a, CommandBuffer>),
538        }
539
540        let mut waiting_flush: Vec<ToFlush> = Vec::new();
541        for step in &mut self.steps {
542            match step {
543                Step::Systems(executor) => {
544                    executor.run_systems(world, resources);
545                    waiting_flush.push(ToFlush::Executor(executor));
546                }
547                Step::FlushCmdBuffers => {
548                    waiting_flush.drain(..).for_each(|e| match e {
549                        ToFlush::Executor(exec) => exec.flush_command_buffers(world),
550                        ToFlush::System(mut cmd) => cmd.write(world),
551                    });
552                }
553                Step::ThreadLocalFn(function) => function(world, resources),
554                Step::ThreadLocalSystem(system) => {
555                    system.run(world, resources);
556                    if let Some(cmd) = system.command_buffer_mut(world.id()) {
557                        waiting_flush.push(ToFlush::System(cmd));
558                    }
559                }
560            }
561        }
562    }
563
564    /// Converts the schedule into a vector of steps.
565    pub fn into_vec(self) -> Vec<Step> { self.steps }
566}
567
568impl From<Builder> for Schedule {
569    fn from(builder: Builder) -> Self {
570        Self {
571            steps: builder.flush().steps,
572        }
573    }
574}
575
576impl From<Vec<Step>> for Schedule {
577    fn from(steps: Vec<Step>) -> Self { Self { steps } }
578}
579
580#[cfg(test)]
581mod tests {
582    use super::*;
583    use crate::prelude::*;
584    use itertools::sorted;
585    use legion_core::prelude::*;
586    use std::sync::{Arc, Mutex};
587
588    #[test]
589    fn execute_in_order() {
590        let universe = Universe::new();
591        let mut world = universe.create_world();
592
593        #[derive(Default)]
594        struct Resource;
595
596        let mut resources = Resources::default();
597        resources.insert(Resource);
598
599        let order = Arc::new(Mutex::new(Vec::new()));
600
601        let order_clone = order.clone();
602        let system_one = SystemBuilder::new("one")
603            .write_resource::<Resource>()
604            .build(move |_, _, _, _| order_clone.lock().unwrap().push(1usize));
605        let order_clone = order.clone();
606        let system_two = SystemBuilder::new("two")
607            .write_resource::<Resource>()
608            .build(move |_, _, _, _| order_clone.lock().unwrap().push(2usize));
609        let order_clone = order.clone();
610        let system_three = SystemBuilder::new("three")
611            .write_resource::<Resource>()
612            .build(move |_, _, _, _| order_clone.lock().unwrap().push(3usize));
613
614        let mut schedule = Schedule::builder()
615            .add_system(system_one)
616            .add_system(system_two)
617            .add_system(system_three)
618            .build();
619
620        schedule.execute(&mut world, &mut resources);
621
622        let order = order.lock().unwrap();
623        let sorted: Vec<usize> = sorted(order.clone()).collect();
624        assert_eq!(*order, sorted);
625    }
626
627    #[test]
628    fn flush() {
629        let universe = Universe::new();
630        let mut world = universe.create_world();
631        let mut resources = Resources::default();
632
633        #[derive(Clone, Copy, Debug, PartialEq)]
634        struct TestComp(f32, f32, f32);
635
636        let system_one = SystemBuilder::new("one").build(move |cmd, _, _, _| {
637            cmd.insert((), vec![(TestComp(0., 0., 0.),)]);
638        });
639        let system_two = SystemBuilder::new("two")
640            .with_query(Write::<TestComp>::query())
641            .build(move |_, world, _, query| {
642                assert_eq!(0, query.iter_mut(world).count());
643            });
644        let system_three = SystemBuilder::new("three")
645            .with_query(Write::<TestComp>::query())
646            .build(move |_, world, _, query| {
647                assert_eq!(1, query.iter_mut(world).count());
648            });
649
650        let mut schedule = Schedule::builder()
651            .add_system(system_one)
652            .add_system(system_two)
653            .flush()
654            .add_system(system_three)
655            .build();
656
657        schedule.execute(&mut world, &mut resources);
658    }
659
660    #[test]
661    fn flush_thread_local() {
662        let universe = Universe::new();
663        let mut world = universe.create_world();
664        let mut resources = Resources::default();
665
666        #[derive(Clone, Copy, Debug, PartialEq)]
667        struct TestComp(f32, f32, f32);
668
669        let entity = Arc::new(Mutex::new(None));
670
671        {
672            let entity = entity.clone();
673
674            let system_one = SystemBuilder::new("one").build_thread_local(move |cmd, _, _, _| {
675                let mut entity = entity.lock().unwrap();
676                *entity = Some(cmd.insert((), vec![(TestComp(0.0, 0.0, 0.0),)])[0]);
677            });
678
679            let mut schedule = Schedule::builder().add_thread_local(system_one).build();
680
681            schedule.execute(&mut world, &mut resources);
682        }
683
684        let entity = entity.lock().unwrap();
685
686        assert!(entity.is_some());
687        assert!(world.get_entity_location(entity.unwrap()).is_some());
688    }
689}