1use crossbeam_channel::{Receiver, Sender};
2use hecs::ArchetypesGeneration;
3use hecs::World;
4use parking_lot::Mutex;
5use rayon::prelude::*;
6use std::{
7 collections::{HashMap, HashSet},
8 sync::Arc,
9};
10
11use crate::{
12 ArchetypeSet, ComponentSet, ExecutorBuilder, ResourceSet, ResourceTuple, ResourceWrap,
13 SystemClosure, SystemContext, SystemId,
14};
15
/// Panic message used when sending on or receiving from the completion channel fails.
static DISCONNECTED: &str = "channel should not be disconnected at this point";
/// Panic message used when a `SystemId` lookup in the systems map fails.
static INVALID_ID: &str = "system IDs should always be valid";
18
19pub enum ExecutorParallel<'closures, Resources>
20where
21 Resources: ResourceTuple,
22{
23 Dispatching(Dispatcher<'closures, Resources>),
26 Scheduling(Scheduler<'closures, Resources>),
27}
28
29impl<'closures, Resources> ExecutorParallel<'closures, Resources>
30where
31 Resources: ResourceTuple,
32{
33 pub fn build<Handle>(builder: ExecutorBuilder<'closures, Resources, Handle>) -> Self {
34 let mut all_dependencies = Vec::new();
36 let mut systems_without_dependencies = Vec::new();
37 let ExecutorBuilder {
38 mut systems,
39 mut all_component_types,
40 ..
41 } = builder;
42 let all_component_types = all_component_types.drain().collect::<Vec<_>>();
44 let mut systems: HashMap<SystemId, System<'closures, Resources>> = systems
45 .drain()
46 .map(|(id, system)| {
47 let dependencies = system.dependencies.len();
48 if dependencies == 0 {
50 systems_without_dependencies.push(id);
51 }
52 all_dependencies.push((id, system.dependencies));
53 (
54 id,
55 System {
56 closure: Arc::new(Mutex::new(system.closure)),
57 resource_set: system.resource_set,
58 component_set: system.component_type_set.condense(&all_component_types),
59 archetype_set: ArchetypeSet::default(),
60 archetype_writer: system.archetype_writer,
61 dependants: vec![],
62 dependencies,
63 unsatisfied_dependencies: 0,
64 },
65 )
66 })
67 .collect();
68 if systems.len() == systems_without_dependencies.len() {
70 let mut tested_ids = Vec::new();
71 let mut all_disjoint = true;
72 'outer: for (id, system) in &systems {
73 tested_ids.push(*id);
74 for (id, other) in &systems {
75 if !tested_ids.contains(id)
76 && (!system.resource_set.is_compatible(&other.resource_set)
77 || !system.component_set.is_compatible(&other.component_set))
78 {
79 all_disjoint = false;
80 break 'outer;
81 }
82 }
83 }
84 if all_disjoint {
85 return ExecutorParallel::Dispatching(Dispatcher {
86 borrows: Resources::instantiate_borrows(),
87 systems: systems
88 .drain()
89 .map(|(id, system)| (id, system.closure))
90 .collect(),
91 });
92 }
93 }
94 for (dependant_id, mut dependencies) in all_dependencies.drain(..) {
96 for dependee_id in dependencies.drain(..) {
97 systems
98 .get_mut(&dependee_id)
99 .expect(INVALID_ID)
100 .dependants
101 .push(dependant_id);
102 }
103 }
104 let mut systems_without_dependencies: Vec<_> = systems_without_dependencies
106 .drain(..)
107 .map(|id| {
108 (
109 id,
110 DependantsLength(systems.get(&id).expect(INVALID_ID).dependants.len()),
111 )
112 })
113 .collect();
114 systems_without_dependencies.sort_by(|(_, a), (_, b)| b.cmp(a));
116 debug_assert!(!systems_without_dependencies.is_empty());
118 let (sender, receiver) = crossbeam_channel::unbounded();
119 ExecutorParallel::Scheduling(Scheduler {
120 borrows: Resources::instantiate_borrows(),
121 systems,
122 archetypes_generation: None,
123 systems_without_dependencies,
124 systems_to_run_now: Vec::new(),
125 systems_running: HashSet::new(),
126 systems_just_finished: Vec::new(),
127 systems_to_decrement_dependencies: Vec::new(),
128 sender,
129 receiver,
130 })
131 }
132
133 pub fn force_archetype_recalculation(&mut self) {
134 match self {
135 ExecutorParallel::Dispatching(_) => (),
136 ExecutorParallel::Scheduling(scheduler) => scheduler.archetypes_generation = None,
137 }
138 }
139
140 pub fn run<ResourceTuple>(&mut self, world: &World, resources: ResourceTuple)
141 where
142 ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
143 Resources::Borrows: Send,
144 Resources::Cells: Send + Sync,
145 {
146 match self {
147 ExecutorParallel::Dispatching(dispatcher) => dispatcher.run(world, resources),
148 ExecutorParallel::Scheduling(scheduler) => scheduler.run(world, resources),
149 }
150 }
151}
152
153pub struct Dispatcher<'closures, Resources>
154where
155 Resources: ResourceTuple,
156{
157 borrows: Resources::Borrows,
158 systems: HashMap<SystemId, Arc<Mutex<SystemClosure<'closures, Resources::Cells>>>>,
159}
160
161impl<'closures, Resources> Dispatcher<'closures, Resources>
162where
163 Resources: ResourceTuple,
164{
165 fn run<ResourceTuple>(&mut self, world: &World, mut resources: ResourceTuple)
166 where
167 ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
168 Resources::Borrows: Send,
169 Resources::Cells: Send + Sync,
170 {
171 let wrapped = resources.wrap(&mut self.borrows);
172 self.systems.par_iter().for_each(|(id, system)| {
173 let system = &mut *system
174 .try_lock() .expect("systems should only be ran once per execution");
176 system(
177 SystemContext {
178 system_id: Some(*id),
179 world,
180 },
181 &wrapped,
182 );
183 });
184 }
185}
186
187struct System<'closure, Resources>
188where
189 Resources: ResourceTuple,
190{
191 closure: Arc<Mutex<SystemClosure<'closure, Resources::Cells>>>,
192 resource_set: ResourceSet,
193 component_set: ComponentSet,
194 archetype_set: ArchetypeSet,
195 archetype_writer: Box<dyn Fn(&World, &mut ArchetypeSet) + Send>,
196 dependants: Vec<SystemId>,
197 dependencies: usize,
198 unsatisfied_dependencies: usize,
199}
200
/// Number of dependants of a system; used as the sort key for queued systems.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct DependantsLength(usize);
203
204pub struct Scheduler<'closures, Resources>
205where
206 Resources: ResourceTuple,
207{
208 borrows: Resources::Borrows,
209 systems: HashMap<SystemId, System<'closures, Resources>>,
210 archetypes_generation: Option<ArchetypesGeneration>,
211 systems_without_dependencies: Vec<(SystemId, DependantsLength)>,
212 systems_to_run_now: Vec<(SystemId, DependantsLength)>,
213 systems_running: HashSet<SystemId>,
214 systems_just_finished: Vec<SystemId>,
215 systems_to_decrement_dependencies: Vec<SystemId>,
216 sender: Sender<SystemId>,
217 receiver: Receiver<SystemId>,
218}
219
220impl<'closures, Resources> Scheduler<'closures, Resources>
221where
222 Resources: ResourceTuple,
223{
224 fn run<ResourceTuple>(&mut self, world: &World, mut resources: ResourceTuple)
225 where
226 ResourceTuple: ResourceWrap<Cells = Resources::Cells, Borrows = Resources::Borrows> + Send,
227 Resources::Borrows: Send,
228 Resources::Cells: Send + Sync,
229 {
230 if Some(world.archetypes_generation()) == self.archetypes_generation {
231 for system in self.systems.values_mut() {
233 debug_assert!(system.unsatisfied_dependencies == 0);
234 system.unsatisfied_dependencies = system.dependencies;
235 }
236 } else {
237 self.archetypes_generation = Some(world.archetypes_generation());
240 for system in self.systems.values_mut() {
241 (system.archetype_writer)(world, &mut system.archetype_set);
242 debug_assert!(system.unsatisfied_dependencies == 0);
243 system.unsatisfied_dependencies = system.dependencies;
244 }
245 }
246 self.systems_to_run_now
248 .extend(&self.systems_without_dependencies);
249 let wrapped = resources.wrap(&mut self.borrows);
251 let wrapped = &wrapped;
252 rayon::scope(|scope| {
253 while !(self.systems_to_run_now.is_empty() && self.systems_running.is_empty()) {
255 for (id, _) in &self.systems_to_run_now {
256 if self.can_run_now(*id) {
259 self.systems_running.insert(*id);
261 let system = self.systems.get_mut(id).expect(INVALID_ID).closure.clone();
263 let sender = self.sender.clone();
264 let id = *id;
265 scope.spawn(move |_| {
266 let system = &mut *system
267 .try_lock() .expect("systems should only be ran once per execution");
269 system(
270 SystemContext {
271 system_id: Some(id),
272 world,
273 },
274 wrapped,
275 );
276 sender.send(id).expect(DISCONNECTED);
278 });
279 }
280 }
281 {
282 let mut i = 0;
286 while i != self.systems_to_run_now.len() {
287 if self.systems_running.contains(&self.systems_to_run_now[i].0) {
288 self.systems_to_run_now.remove(i);
289 } else {
290 i += 1;
291 }
292 }
293 }
294 let id = self.receiver.recv().expect(DISCONNECTED);
296 self.systems_just_finished.push(id);
297 self.systems_just_finished.extend(self.receiver.try_iter());
299 for id in &self.systems_just_finished {
301 self.systems_running.remove(id);
302 }
303 for finished in &self.systems_just_finished {
305 for dependant in &self.systems.get(finished).expect(INVALID_ID).dependants {
306 self.systems_to_decrement_dependencies.push(*dependant);
307 }
308 }
309 self.systems_just_finished.clear();
310 for id in &self.systems_to_decrement_dependencies {
313 let system = &mut self.systems.get_mut(id).expect(INVALID_ID);
314 let dependants = DependantsLength(system.dependants.len());
315 let unsatisfied_dependencies = &mut system.unsatisfied_dependencies;
316 *unsatisfied_dependencies -= 1;
317 if *unsatisfied_dependencies == 0 {
318 self.systems_to_run_now.push((*id, dependants));
319 }
320 }
321 self.systems_to_decrement_dependencies.clear();
322 self.systems_to_run_now.sort_by(|(_, a), (_, b)| b.cmp(a));
324 }
325 });
326 debug_assert!(self.systems_to_run_now.is_empty());
327 debug_assert!(self.systems_running.is_empty());
328 debug_assert!(self.systems_just_finished.is_empty());
329 debug_assert!(self.systems_to_decrement_dependencies.is_empty());
330 }
331
332 fn can_run_now(&self, id: SystemId) -> bool {
333 let system = self.systems.get(&id).expect(INVALID_ID);
334 for id in &self.systems_running {
335 let running_system = self.systems.get(id).expect(INVALID_ID);
336 if !system
338 .resource_set
339 .is_compatible(&running_system.resource_set)
340 {
341 return false;
342 }
343 if !system
347 .component_set
348 .is_compatible(&running_system.component_set)
349 && !system
350 .archetype_set
351 .is_compatible(&running_system.archetype_set)
352 {
353 return false;
354 }
355 }
356 true
357 }
358}
359
/// Two systems that access nothing must yield a `Dispatching` executor.
#[test]
fn dispatch_heuristic_trivial() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: ()| {})
            .system(|_, _: (), _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
370
/// Two systems that access nothing must yield a `Dispatching` executor even
/// when the executor declares resources.
#[test]
fn dispatch_heuristic_trivial_with_resources() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: ()| {})
            .system(|_, _: (), _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
381
/// A shared and an exclusive access to the same resource must yield a
/// `Scheduling` executor.
#[test]
fn dispatch_heuristic_resources_incompatible() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &f32, _: ()| {})
            .system(|_, _: &mut f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Scheduling(_) => (),
        ExecutorParallel::Dispatching(_) => panic!(),
    }
}
392
/// Exclusive accesses to two different resources must yield a `Dispatching` executor.
#[test]
fn dispatch_heuristic_resources_disjoint() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &mut usize, _: ()| {})
            .system(|_, _: &mut f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
403
/// Two shared accesses to the same resource must yield a `Dispatching` executor.
#[test]
fn dispatch_heuristic_resources_immutable() {
    let executor = ExecutorParallel::<(usize, f32)>::build(
        crate::Executor::builder()
            .system(|_, _: &f32, _: ()| {})
            .system(|_, _: &f32, _: ()| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
414
/// An exclusive and a shared query over the same component must yield a
/// `Scheduling` executor.
#[test]
fn dispatch_heuristic_queries_incompatible() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&mut f32>| {})
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {}),
    );
    match executor {
        ExecutorParallel::Scheduling(_) => (),
        ExecutorParallel::Dispatching(_) => panic!(),
    }
}
425
/// Exclusive queries over two different components must yield a `Dispatching` executor.
#[test]
fn dispatch_heuristic_queries_disjoint() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&mut usize>| {})
            .system(|_, _: (), _: crate::QueryMarker<&mut f32>| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}
436
/// Two shared queries over the same component must yield a `Dispatching` executor.
#[test]
fn dispatch_heuristic_queries_immutable() {
    let executor = ExecutorParallel::<()>::build(
        crate::Executor::builder()
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {})
            .system(|_, _: (), _: crate::QueryMarker<&f32>| {}),
    );
    match executor {
        ExecutorParallel::Dispatching(_) => (),
        ExecutorParallel::Scheduling(_) => panic!(),
    }
}