Skip to main content

yaks/
executor_parallel.rs

1use crossbeam_channel::{Receiver, Sender};
2use hecs::ArchetypesGeneration;
3use hecs::World;
4use parking_lot::Mutex;
5use rayon::prelude::*;
6use std::{
7    collections::{HashMap, HashSet},
8    sync::Arc,
9};
10
11use crate::{
12    ArchetypeSet, ComponentSet, ExecutorBuilder, ResourceSet, ResourceTuple, ResourceWrap,
13    SystemClosure, SystemContext, SystemId,
14};
15
/// Panic message used when sending or receiving on the scheduler's channel fails.
static DISCONNECTED: &str = "channel should not be disconnected at this point";
/// Panic message used when a `SystemId` is missing from the executor's system map.
static INVALID_ID: &str = "system IDs should always be valid";
18
19pub enum ExecutorParallel<'closures, Resources>
20where
21    Resources: ResourceTuple,
22{
23    // TODO consider more granularity:
24    // scheduler, disjoint scheduler, dispatcher, disjoint dispatcher
25    Dispatching(Dispatcher<'closures, Resources>),
26    Scheduling(Scheduler<'closures, Resources>),
27}
28
29impl<'closures, Resources> ExecutorParallel<'closures, Resources>
30where
31    Resources: ResourceTuple,
32{
33    pub fn build<Handle>(builder: ExecutorBuilder<'closures, Resources, Handle>) -> Self {
34        // This will cache dependencies for later conversion into dependants.
35        let mut all_dependencies = Vec::new();
36        let mut systems_without_dependencies = Vec::new();
37        let ExecutorBuilder {
38            mut systems,
39            mut all_component_types,
40            ..
41        } = builder;
42        // This guarantees iteration order; TODO probably not necessary?..
43        let all_component_types = all_component_types.drain().collect::<Vec<_>>();
44        let mut systems: HashMap<SystemId, System<'closures, Resources>> = systems
45            .drain()
46            .map(|(id, system)| {
47                let dependencies = system.dependencies.len();
48                // Remember systems with no dependencies, these will be queued first on run.
49                if dependencies == 0 {
50                    systems_without_dependencies.push(id);
51                }
52                all_dependencies.push((id, system.dependencies));
53                (
54                    id,
55                    System {
56                        closure: Arc::new(Mutex::new(system.closure)),
57                        resource_set: system.resource_set,
58                        component_set: system.component_type_set.condense(&all_component_types),
59                        archetype_set: ArchetypeSet::default(),
60                        archetype_writer: system.archetype_writer,
61                        dependants: vec![],
62                        dependencies,
63                        unsatisfied_dependencies: 0,
64                    },
65                )
66            })
67            .collect();
68        // If all systems are independent, it might be possible to use dispatching heuristic.
69        if systems.len() == systems_without_dependencies.len() {
70            let mut tested_ids = Vec::new();
71            let mut all_disjoint = true;
72            'outer: for (id, system) in &systems {
73                tested_ids.push(*id);
74                for (id, other) in &systems {
75                    if !tested_ids.contains(id)
76                        && (!system.resource_set.is_compatible(&other.resource_set)
77                            || !system.component_set.is_compatible(&other.component_set))
78                    {
79                        all_disjoint = false;
80                        break 'outer;
81                    }
82                }
83            }
84            if all_disjoint {
85                return ExecutorParallel::Dispatching(Dispatcher {
86                    borrows: Resources::instantiate_borrows(),
87                    systems: systems
88                        .drain()
89                        .map(|(id, system)| (id, system.closure))
90                        .collect(),
91                });
92            }
93        }
94        // Convert system-dependencies mapping to system-dependants mapping.
95        for (dependant_id, mut dependencies) in all_dependencies.drain(..) {
96            for dependee_id in dependencies.drain(..) {
97                systems
98                    .get_mut(&dependee_id)
99                    .expect(INVALID_ID)
100                    .dependants
101                    .push(dependant_id);
102            }
103        }
104        // Cache amount of dependants the system has.
105        let mut systems_without_dependencies: Vec<_> = systems_without_dependencies
106            .drain(..)
107            .map(|id| {
108                (
109                    id,
110                    DependantsLength(systems.get(&id).expect(INVALID_ID).dependants.len()),
111                )
112            })
113            .collect();
114        // Sort independent systems so that those with most dependants are queued first.
115        systems_without_dependencies.sort_by(|(_, a), (_, b)| b.cmp(a));
116        // This should be guaranteed by the builder's logic anyway.
117        debug_assert!(!systems_without_dependencies.is_empty());
118        let (sender, receiver) = crossbeam_channel::unbounded();
119        ExecutorParallel::Scheduling(Scheduler {
120            borrows: Resources::instantiate_borrows(),
121            systems,
122            archetypes_generation: None,
123            systems_without_dependencies,
124            systems_to_run_now: Vec::new(),
125            systems_running: HashSet::new(),
126            systems_just_finished: Vec::new(),
127            systems_to_decrement_dependencies: Vec::new(),
128            sender,
129            receiver,
130        })
131    }
132
133    pub fn force_archetype_recalculation(&mut self) {
134        match self {
135            ExecutorParallel::Dispatching(_) => (),
136            ExecutorParallel::Scheduling(scheduler) => scheduler.archetypes_generation = None,
137        }
138    }
139
140    pub fn run<ResourceTuple>(&mut self, world: &World, resources: ResourceTuple)
141    where
142        ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
143        Resources::Borrows: Send,
144        Resources::Cells: Send + Sync,
145    {
146        match self {
147            ExecutorParallel::Dispatching(dispatcher) => dispatcher.run(world, resources),
148            ExecutorParallel::Scheduling(scheduler) => scheduler.run(world, resources),
149        }
150    }
151}
152
153pub struct Dispatcher<'closures, Resources>
154where
155    Resources: ResourceTuple,
156{
157    borrows: Resources::Borrows,
158    systems: HashMap<SystemId, Arc<Mutex<SystemClosure<'closures, Resources::Cells>>>>,
159}
160
161impl<'closures, Resources> Dispatcher<'closures, Resources>
162where
163    Resources: ResourceTuple,
164{
165    fn run<ResourceTuple>(&mut self, world: &World, mut resources: ResourceTuple)
166    where
167        ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
168        Resources::Borrows: Send,
169        Resources::Cells: Send + Sync,
170    {
171        let wrapped = resources.wrap(&mut self.borrows);
172        self.systems.par_iter().for_each(|(id, system)| {
173            let system = &mut *system
174                .try_lock() // TODO should this be .lock() instead?
175                .expect("systems should only be ran once per execution");
176            system(
177                SystemContext {
178                    system_id: Some(*id),
179                    world,
180                },
181                &wrapped,
182            );
183        });
184    }
185}
186
187struct System<'closure, Resources>
188where
189    Resources: ResourceTuple,
190{
191    closure: Arc<Mutex<SystemClosure<'closure, Resources::Cells>>>,
192    resource_set: ResourceSet,
193    component_set: ComponentSet,
194    archetype_set: ArchetypeSet,
195    archetype_writer: Box<dyn Fn(&World, &mut ArchetypeSet) + Send>,
196    dependants: Vec<SystemId>,
197    dependencies: usize,
198    unsatisfied_dependencies: usize,
199}
200
/// Number of dependants a system has; used as the sorting key when ordering queued systems.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DependantsLength(usize);
203
204pub struct Scheduler<'closures, Resources>
205where
206    Resources: ResourceTuple,
207{
208    borrows: Resources::Borrows,
209    systems: HashMap<SystemId, System<'closures, Resources>>,
210    archetypes_generation: Option<ArchetypesGeneration>,
211    systems_without_dependencies: Vec<(SystemId, DependantsLength)>,
212    systems_to_run_now: Vec<(SystemId, DependantsLength)>,
213    systems_running: HashSet<SystemId>,
214    systems_just_finished: Vec<SystemId>,
215    systems_to_decrement_dependencies: Vec<SystemId>,
216    sender: Sender<SystemId>,
217    receiver: Receiver<SystemId>,
218}
219
220impl<'closures, Resources> Scheduler<'closures, Resources>
221where
222    Resources: ResourceTuple,
223{
224    fn run<ResourceTuple>(&mut self, world: &World, mut resources: ResourceTuple)
225    where
226        ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
227        Resources::Borrows: Send,
228        Resources::Cells: Send + Sync,
229    {
230        if Some(world.archetypes_generation()) == self.archetypes_generation {
231            // If archetypes haven't changed since last run, reset dependency counters.
232            for system in self.systems.values_mut() {
233                debug_assert!(system.unsatisfied_dependencies == 0);
234                system.unsatisfied_dependencies = system.dependencies;
235            }
236        } else {
237            // If archetypes have changed, recalculate archetype sets for all systems,
238            // and reset dependency counters.
239            self.archetypes_generation = Some(world.archetypes_generation());
240            for system in self.systems.values_mut() {
241                (system.archetype_writer)(world, &mut system.archetype_set);
242                debug_assert!(system.unsatisfied_dependencies == 0);
243                system.unsatisfied_dependencies = system.dependencies;
244            }
245        }
246        // Queue systems that don't have any dependencies to run first.
247        self.systems_to_run_now
248            .extend(&self.systems_without_dependencies);
249        // Wrap resources for disjoint fetching.
250        let wrapped = resources.wrap(&mut self.borrows);
251        let wrapped = &wrapped;
252        rayon::scope(|scope| {
253            // All systems have been ran if there are no queued or currently running systems.
254            while !(self.systems_to_run_now.is_empty() && self.systems_running.is_empty()) {
255                for (id, _) in &self.systems_to_run_now {
256                    // Check if a queued system can run concurrently with
257                    // other systems already running.
258                    if self.can_run_now(*id) {
259                        // Add it to the currently running systems set.
260                        self.systems_running.insert(*id);
261                        // Pointers and data sent over to a worker thread.
262                        let system = self.systems.get_mut(id).expect(INVALID_ID).closure.clone();
263                        let sender = self.sender.clone();
264                        let id = *id;
265                        scope.spawn(move |_| {
266                            let system = &mut *system
267                                .try_lock() // TODO should this be .lock() instead?
268                                .expect("systems should only be ran once per execution");
269                            system(
270                                SystemContext {
271                                    system_id: Some(id),
272                                    world,
273                                },
274                                wrapped,
275                            );
276                            // Notify dispatching thread than this system has finished running.
277                            sender.send(id).expect(DISCONNECTED);
278                        });
279                    }
280                }
281                {
282                    // Remove newly running systems from systems-to-run-now set.
283                    // TODO replace with `.drain_filter()` once stable
284                    //  https://github.com/rust-lang/rust/issues/43244
285                    let mut i = 0;
286                    while i != self.systems_to_run_now.len() {
287                        if self.systems_running.contains(&self.systems_to_run_now[i].0) {
288                            self.systems_to_run_now.remove(i);
289                        } else {
290                            i += 1;
291                        }
292                    }
293                }
294                // Wait until at least one system is finished.
295                let id = self.receiver.recv().expect(DISCONNECTED);
296                self.systems_just_finished.push(id);
297                // Handle any other systems that may have finished.
298                self.systems_just_finished.extend(self.receiver.try_iter());
299                // Remove finished systems from set of running systems.
300                for id in &self.systems_just_finished {
301                    self.systems_running.remove(id);
302                }
303                // Gather dependants of finished systems.
304                for finished in &self.systems_just_finished {
305                    for dependant in &self.systems.get(finished).expect(INVALID_ID).dependants {
306                        self.systems_to_decrement_dependencies.push(*dependant);
307                    }
308                }
309                self.systems_just_finished.clear();
310                // Figure out which of the gathered dependants have had all their dependencies
311                // satisfied and queue them to run.
312                for id in &self.systems_to_decrement_dependencies {
313                    let system = &mut self.systems.get_mut(id).expect(INVALID_ID);
314                    let dependants = DependantsLength(system.dependants.len());
315                    let unsatisfied_dependencies = &mut system.unsatisfied_dependencies;
316                    *unsatisfied_dependencies -= 1;
317                    if *unsatisfied_dependencies == 0 {
318                        self.systems_to_run_now.push((*id, dependants));
319                    }
320                }
321                self.systems_to_decrement_dependencies.clear();
322                // Sort queued systems so that those with most dependants run first.
323                self.systems_to_run_now.sort_by(|(_, a), (_, b)| b.cmp(a));
324            }
325        });
326        debug_assert!(self.systems_to_run_now.is_empty());
327        debug_assert!(self.systems_running.is_empty());
328        debug_assert!(self.systems_just_finished.is_empty());
329        debug_assert!(self.systems_to_decrement_dependencies.is_empty());
330    }
331
332    fn can_run_now(&self, id: SystemId) -> bool {
333        let system = self.systems.get(&id).expect(INVALID_ID);
334        for id in &self.systems_running {
335            let running_system = self.systems.get(id).expect(INVALID_ID);
336            // A system can't run if the resources it needs are already borrowed incompatibly.
337            if !system
338                .resource_set
339                .is_compatible(&running_system.resource_set)
340            {
341                return false;
342            }
343            // A system can't run if it could borrow incompatibly any components.
344            // This can only happen if the system could incompatibly access the same components
345            // from the same archetype that another system may be using.
346            if !system
347                .component_set
348                .is_compatible(&running_system.component_set)
349                && !system
350                    .archetype_set
351                    .is_compatible(&running_system.archetype_set)
352            {
353                return false;
354            }
355        }
356        true
357    }
358}
359
/// Two systems with no resources and no queries must produce a dispatching executor.
#[test]
fn dispatch_heuristic_trivial() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: ()| {})
            .system(|_, _: (), _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
370
/// Resources that no system accesses must not prevent a dispatching executor.
#[test]
fn dispatch_heuristic_trivial_with_resources() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: ()| {})
            .system(|_, _: (), _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
381
/// A shared and an exclusive access to the same resource must produce a scheduling executor.
#[test]
fn dispatch_heuristic_resources_incompatible() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &f32, _: ()| {})
            .system(|_, _: &mut f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => panic!(),
        ExecutorParallel::Scheduling(_) => (),
    }
}
392
/// Exclusive accesses to different resources must produce a dispatching executor.
#[test]
fn dispatch_heuristic_resources_disjoint() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &mut usize, _: ()| {})
            .system(|_, _: &mut f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
403
/// Shared accesses to the same resource must produce a dispatching executor.
#[test]
fn dispatch_heuristic_resources_immutable() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &f32, _: ()| {})
            .system(|_, _: &f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
414
/// An exclusive and a shared query of the same component must produce a scheduling executor.
#[test]
fn dispatch_heuristic_queries_incompatible() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&mut f32>| {})
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => panic!(),
        ExecutorParallel::Scheduling(_) => (),
    }
}
425
/// Exclusive queries of different components must produce a dispatching executor.
#[test]
fn dispatch_heuristic_queries_disjoint() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&mut usize>| {})
            .system(|_, _: (), _: crate::QueryMarker<&mut f32>| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
436
/// Shared queries of the same component must produce a dispatching executor.
#[test]
fn dispatch_heuristic_queries_immutable() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {})
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}