1use crate::{
2 resource::{ResourceTypeId, Resources},
3 system::SystemId,
4};
5use legion_core::{
6 borrow::RefMut,
7 command::CommandBuffer,
8 storage::ComponentTypeId,
9 subworld::ArchetypeAccess,
10 world::{World, WorldId},
11};
12use std::cell::UnsafeCell;
13
14#[cfg(feature = "par-schedule")]
15use tracing::{span, trace, Level};
16
17#[cfg(feature = "par-schedule")]
18use std::sync::atomic::{AtomicUsize, Ordering};
19
20#[cfg(feature = "par-schedule")]
21use fxhash::{FxHashMap, FxHashSet};
22
23#[cfg(feature = "par-schedule")]
24use rayon::prelude::*;
25
26#[cfg(feature = "par-schedule")]
27use itertools::izip;
28
29#[cfg(feature = "par-schedule")]
30use std::iter::repeat;
31
32pub trait Schedulable: Runnable + Send + Sync {}
37impl<T> Schedulable for T where T: Runnable + Send + Sync {}
38
39pub trait Runnable {
41 fn name(&self) -> &SystemId;
43
44 fn reads(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
46
47 fn writes(&self) -> (&[ResourceTypeId], &[ComponentTypeId]);
49
50 fn prepare(&mut self, world: &World);
52
53 fn accesses_archetypes(&self) -> &ArchetypeAccess;
56
57 unsafe fn run_unsafe(&mut self, world: &World, resources: &Resources);
65
66 fn command_buffer_mut(&self, world: WorldId) -> Option<RefMut<CommandBuffer>>;
68
69 fn run(&mut self, world: &mut World, resources: &mut Resources) {
71 unsafe { self.run_unsafe(world, resources) };
72 }
73}
74
75pub struct Executor {
81 systems: Vec<SystemBox>,
82 #[cfg(feature = "par-schedule")]
83 static_dependants: Vec<Vec<usize>>,
84 #[cfg(feature = "par-schedule")]
85 dynamic_dependants: Vec<Vec<usize>>,
86 #[cfg(feature = "par-schedule")]
87 static_dependency_counts: Vec<AtomicUsize>,
88 #[cfg(feature = "par-schedule")]
89 awaiting: Vec<AtomicUsize>,
90}
91
92struct SystemBox(UnsafeCell<Box<dyn Schedulable>>);
93
94unsafe impl Send for SystemBox {}
98unsafe impl Sync for SystemBox {}
99
100impl SystemBox {
101 #[cfg(feature = "par-schedule")]
102 unsafe fn get(&self) -> &dyn Schedulable { std::ops::Deref::deref(&*self.0.get()) }
103
104 #[allow(clippy::mut_from_ref)]
105 unsafe fn get_mut(&self) -> &mut dyn Schedulable {
106 std::ops::DerefMut::deref_mut(&mut *self.0.get())
107 }
108}
109
110impl Executor {
111 #[cfg(not(feature = "par-schedule"))]
116 pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
117 Self {
118 systems: systems
119 .into_iter()
120 .map(|s| SystemBox(UnsafeCell::new(s)))
121 .collect(),
122 }
123 }
124
125 #[cfg(feature = "par-schedule")]
130 #[allow(clippy::cognitive_complexity)]
131 pub fn new(systems: Vec<Box<dyn Schedulable>>) -> Self {
133 if systems.len() > 1 {
134 let mut static_dependency_counts = Vec::with_capacity(systems.len());
135
136 let mut static_dependants: Vec<Vec<_>> =
137 repeat(Vec::with_capacity(64)).take(systems.len()).collect();
138 let mut dynamic_dependants: Vec<Vec<_>> =
139 repeat(Vec::with_capacity(64)).take(systems.len()).collect();
140
141 let mut resource_last_mutated =
142 FxHashMap::<ResourceTypeId, usize>::with_capacity_and_hasher(
143 64,
144 Default::default(),
145 );
146 let mut resource_last_read =
147 FxHashMap::<ResourceTypeId, usize>::with_capacity_and_hasher(
148 64,
149 Default::default(),
150 );
151 let mut component_last_mutated =
152 FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
153 64,
154 Default::default(),
155 );
156 let mut component_last_read =
157 FxHashMap::<ComponentTypeId, usize>::with_capacity_and_hasher(
158 64,
159 Default::default(),
160 );
161
162 for (i, system) in systems.iter().enumerate() {
163 let span = span!(
164 Level::TRACE,
165 "Building system dependencies",
166 system = %system.name(),
167 index = i,
168 );
169 let _guard = span.enter();
170
171 let (read_res, read_comp) = system.reads();
172 let (write_res, write_comp) = system.writes();
173
174 let mut dependencies = FxHashSet::with_capacity_and_hasher(64, Default::default());
176 for res in read_res {
177 trace!(resource = ?res, "Read resource");
178 if let Some(n) = resource_last_mutated.get(res) {
179 trace!(system_index = n, "Added write dependency");
180 dependencies.insert(*n);
181 }
182 }
183 for res in write_res {
184 trace!(resource = ?res, "Write resource");
185 if let Some(n) = resource_last_read.get(res) {
187 trace!(system_index = n, "Added read dependency");
188 dependencies.insert(*n);
189 }
190
191 if let Some(n) = resource_last_mutated.get(res) {
192 trace!(system_index = n, "Added write dependency");
193 dependencies.insert(*n);
194 }
195 }
196
197 for res in read_res {
199 resource_last_read.insert(*res, i);
200 }
201 for res in write_res {
202 resource_last_read.insert(*res, i);
203 resource_last_mutated.insert(*res, i);
204 }
205
206 static_dependency_counts.push(AtomicUsize::from(dependencies.len()));
207 trace!(dependants = ?dependencies, dependency_counts = ?static_dependency_counts, "Computed static dependants");
208 for dep in &dependencies {
209 static_dependants[*dep].push(i);
210 }
211
212 let mut comp_dependencies = FxHashSet::default();
214 for comp in read_comp {
215 trace!(component = ?comp, "Read component");
216 if let Some(n) = component_last_mutated.get(comp) {
217 trace!(system_index = n, "Added write dependency");
218 comp_dependencies.insert(*n);
219 }
220 }
221 for comp in write_comp {
222 trace!(component = ?comp, "Write component");
224 if let Some(n) = component_last_read.get(comp) {
225 trace!(system_index = n, "Added read dependency");
226 comp_dependencies.insert(*n);
227 }
228 if let Some(n) = component_last_mutated.get(comp) {
229 trace!(system_index = n, "Added write dependency");
230 comp_dependencies.insert(*n);
231 }
232 }
233
234 for comp in read_comp {
236 component_last_read.insert(*comp, i);
237 }
238 for comp in write_comp {
239 component_last_read.insert(*comp, i);
240 component_last_mutated.insert(*comp, i);
241 }
242
243 for static_dep in &dependencies {
245 comp_dependencies.remove(static_dep);
246 }
247
248 trace!(depentants = ?comp_dependencies, "Computed dynamic dependants");
249 for dep in comp_dependencies {
250 if dep != i {
251 dynamic_dependants[dep].push(i);
253 }
254 }
255 }
256
257 trace!(
258 ?static_dependants,
259 ?dynamic_dependants,
260 "Computed system dependencies"
261 );
262
263 let mut awaiting = Vec::with_capacity(systems.len());
264 systems
265 .iter()
266 .for_each(|_| awaiting.push(AtomicUsize::new(0)));
267
268 Executor {
269 awaiting,
270 static_dependants,
271 dynamic_dependants,
272 static_dependency_counts,
273 systems: systems
274 .into_iter()
275 .map(|s| SystemBox(UnsafeCell::new(s)))
276 .collect(),
277 }
278 } else {
279 Executor {
280 awaiting: Vec::with_capacity(0),
281 static_dependants: Vec::with_capacity(0),
282 dynamic_dependants: Vec::with_capacity(0),
283 static_dependency_counts: Vec::with_capacity(0),
284 systems: systems
285 .into_iter()
286 .map(|s| SystemBox(UnsafeCell::new(s)))
287 .collect(),
288 }
289 }
290 }
291
292 pub fn into_vec(self) -> Vec<Box<dyn Schedulable>> {
294 self.systems.into_iter().map(|s| s.0.into_inner()).collect()
295 }
296
297 pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
299 self.run_systems(world, resources);
300 self.flush_command_buffers(world);
301 }
302
303 #[cfg(not(feature = "par-schedule"))]
307 pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
308 self.systems.iter_mut().for_each(|system| {
309 let system = unsafe { system.get_mut() };
310 system.prepare(world);
311 system.run(world, resources);
312 });
313 }
314
315 #[cfg(feature = "par-schedule")]
322 pub fn run_systems(&mut self, world: &mut World, resources: &mut Resources) {
323 rayon::join(
324 || {},
325 || {
326 match self.systems.len() {
327 1 => {
328 unsafe {
330 let system = self.systems[0].get_mut();
331 system.prepare(world);
332 system.run(world, resources);
333 };
334 }
335 _ => {
336 let systems = &mut self.systems;
337 let static_dependency_counts = &self.static_dependency_counts;
338 let awaiting = &mut self.awaiting;
339
340 systems
342 .par_iter_mut()
343 .for_each(|sys| unsafe { sys.get_mut() }.prepare(world));
344
345 izip!(
347 systems.iter(),
348 self.static_dependants.iter_mut(),
349 self.dynamic_dependants.iter_mut()
350 )
351 .par_bridge()
352 .for_each(|(sys, static_dep, dyn_dep)| {
353 let archetypes = unsafe { sys.get() }.accesses_archetypes();
355 for i in (0..dyn_dep.len()).rev() {
356 let dep = dyn_dep[i];
357 let other = unsafe { systems[dep].get() };
358
359 if !other.accesses_archetypes().is_disjoint(archetypes) {
362 static_dep.push(dep);
363 dyn_dep.swap_remove(i);
364 static_dependency_counts[dep].fetch_add(1, Ordering::Relaxed);
365 }
366 }
367 });
368
369 for (i, count) in static_dependency_counts.iter().enumerate() {
371 awaiting[i].store(count.load(Ordering::Relaxed), Ordering::Relaxed);
372 }
373
374 let awaiting = &self.awaiting;
375
376 trace!(?awaiting, "Initialized await counts");
377
378 (0..systems.len())
380 .into_par_iter()
381 .filter(|i| static_dependency_counts[*i].load(Ordering::SeqCst) == 0)
382 .for_each(|i| {
383 unsafe { self.run_recursive(i, world, resources) };
386 });
387
388 debug_assert!(
389 awaiting.iter().all(|x| x.load(Ordering::SeqCst) == 0),
390 "not all systems run: {:?}",
391 awaiting
392 );
393 }
394 }
395 },
396 );
397 }
398
399 pub fn flush_command_buffers(&mut self, world: &mut World) {
401 self.systems.iter().for_each(|system| {
402 let system = unsafe { system.get_mut() };
404 if let Some(mut cmd) = system.command_buffer_mut(world.id()) {
405 cmd.write(world);
406 }
407 });
408 }
409
410 #[cfg(feature = "par-schedule")]
416 unsafe fn run_recursive(&self, i: usize, world: &World, resources: &Resources) {
417 self.systems[i].get_mut().run_unsafe(world, resources);
419
420 self.static_dependants[i].par_iter().for_each(|dep| {
421 if self.awaiting[*dep].fetch_sub(1, Ordering::Relaxed) == 1 {
422 self.run_recursive(*dep, world, resources);
424 }
425 });
426 }
427}
428
429pub struct Builder {
431 steps: Vec<Step>,
432 accumulator: Vec<Box<dyn Schedulable>>,
433}
434
435impl Builder {
436 pub fn add_system<T: Into<Box<dyn Schedulable>>>(mut self, system: T) -> Self {
438 self.accumulator.push(system.into());
439 self
440 }
441
442 pub fn flush(mut self) -> Self {
445 self.finalize_executor();
446 self.steps.push(Step::FlushCmdBuffers);
447 self
448 }
449
450 fn finalize_executor(&mut self) {
451 if !self.accumulator.is_empty() {
452 let mut systems = Vec::new();
453 std::mem::swap(&mut self.accumulator, &mut systems);
454 let executor = Executor::new(systems);
455 self.steps.push(Step::Systems(executor));
456 }
457 }
458
459 pub fn add_thread_local_fn<F: FnMut(&mut World, &mut Resources) + 'static>(
461 mut self,
462 f: F,
463 ) -> Self {
464 self.finalize_executor();
465 self.steps.push(Step::ThreadLocalFn(
466 Box::new(f) as Box<dyn FnMut(&mut World, &mut Resources)>
467 ));
468 self
469 }
470
471 pub fn add_thread_local<S: Into<Box<dyn Runnable>>>(mut self, system: S) -> Self {
473 self.finalize_executor();
474 let system = system.into();
475 self.steps.push(Step::ThreadLocalSystem(system));
476 self
477 }
478
479 pub fn build(self) -> Schedule { self.into() }
481}
482
483impl Default for Builder {
484 fn default() -> Self {
485 Self {
486 steps: Vec::new(),
487 accumulator: Vec::new(),
488 }
489 }
490}
491
492pub enum Step {
494 Systems(Executor),
496 FlushCmdBuffers,
498 ThreadLocalFn(Box<dyn FnMut(&mut World, &mut Resources)>),
500 ThreadLocalSystem(Box<dyn Runnable>),
502}
503
504pub struct Schedule {
526 steps: Vec<Step>,
527}
528
529impl Schedule {
530 pub fn builder() -> Builder { Builder::default() }
532
533 pub fn execute(&mut self, world: &mut World, resources: &mut Resources) {
535 enum ToFlush<'a> {
536 Executor(&'a mut Executor),
537 System(RefMut<'a, CommandBuffer>),
538 }
539
540 let mut waiting_flush: Vec<ToFlush> = Vec::new();
541 for step in &mut self.steps {
542 match step {
543 Step::Systems(executor) => {
544 executor.run_systems(world, resources);
545 waiting_flush.push(ToFlush::Executor(executor));
546 }
547 Step::FlushCmdBuffers => {
548 waiting_flush.drain(..).for_each(|e| match e {
549 ToFlush::Executor(exec) => exec.flush_command_buffers(world),
550 ToFlush::System(mut cmd) => cmd.write(world),
551 });
552 }
553 Step::ThreadLocalFn(function) => function(world, resources),
554 Step::ThreadLocalSystem(system) => {
555 system.run(world, resources);
556 if let Some(cmd) = system.command_buffer_mut(world.id()) {
557 waiting_flush.push(ToFlush::System(cmd));
558 }
559 }
560 }
561 }
562 }
563
564 pub fn into_vec(self) -> Vec<Step> { self.steps }
566}
567
568impl From<Builder> for Schedule {
569 fn from(builder: Builder) -> Self {
570 Self {
571 steps: builder.flush().steps,
572 }
573 }
574}
575
576impl From<Vec<Step>> for Schedule {
577 fn from(steps: Vec<Step>) -> Self { Self { steps } }
578}
579
580#[cfg(test)]
581mod tests {
582 use super::*;
583 use crate::prelude::*;
584 use itertools::sorted;
585 use legion_core::prelude::*;
586 use std::sync::{Arc, Mutex};
587
588 #[test]
589 fn execute_in_order() {
590 let universe = Universe::new();
591 let mut world = universe.create_world();
592
593 #[derive(Default)]
594 struct Resource;
595
596 let mut resources = Resources::default();
597 resources.insert(Resource);
598
599 let order = Arc::new(Mutex::new(Vec::new()));
600
601 let order_clone = order.clone();
602 let system_one = SystemBuilder::new("one")
603 .write_resource::<Resource>()
604 .build(move |_, _, _, _| order_clone.lock().unwrap().push(1usize));
605 let order_clone = order.clone();
606 let system_two = SystemBuilder::new("two")
607 .write_resource::<Resource>()
608 .build(move |_, _, _, _| order_clone.lock().unwrap().push(2usize));
609 let order_clone = order.clone();
610 let system_three = SystemBuilder::new("three")
611 .write_resource::<Resource>()
612 .build(move |_, _, _, _| order_clone.lock().unwrap().push(3usize));
613
614 let mut schedule = Schedule::builder()
615 .add_system(system_one)
616 .add_system(system_two)
617 .add_system(system_three)
618 .build();
619
620 schedule.execute(&mut world, &mut resources);
621
622 let order = order.lock().unwrap();
623 let sorted: Vec<usize> = sorted(order.clone()).collect();
624 assert_eq!(*order, sorted);
625 }
626
627 #[test]
628 fn flush() {
629 let universe = Universe::new();
630 let mut world = universe.create_world();
631 let mut resources = Resources::default();
632
633 #[derive(Clone, Copy, Debug, PartialEq)]
634 struct TestComp(f32, f32, f32);
635
636 let system_one = SystemBuilder::new("one").build(move |cmd, _, _, _| {
637 cmd.insert((), vec![(TestComp(0., 0., 0.),)]);
638 });
639 let system_two = SystemBuilder::new("two")
640 .with_query(Write::<TestComp>::query())
641 .build(move |_, world, _, query| {
642 assert_eq!(0, query.iter_mut(world).count());
643 });
644 let system_three = SystemBuilder::new("three")
645 .with_query(Write::<TestComp>::query())
646 .build(move |_, world, _, query| {
647 assert_eq!(1, query.iter_mut(world).count());
648 });
649
650 let mut schedule = Schedule::builder()
651 .add_system(system_one)
652 .add_system(system_two)
653 .flush()
654 .add_system(system_three)
655 .build();
656
657 schedule.execute(&mut world, &mut resources);
658 }
659
660 #[test]
661 fn flush_thread_local() {
662 let universe = Universe::new();
663 let mut world = universe.create_world();
664 let mut resources = Resources::default();
665
666 #[derive(Clone, Copy, Debug, PartialEq)]
667 struct TestComp(f32, f32, f32);
668
669 let entity = Arc::new(Mutex::new(None));
670
671 {
672 let entity = entity.clone();
673
674 let system_one = SystemBuilder::new("one").build_thread_local(move |cmd, _, _, _| {
675 let mut entity = entity.lock().unwrap();
676 *entity = Some(cmd.insert((), vec![(TestComp(0.0, 0.0, 0.0),)])[0]);
677 });
678
679 let mut schedule = Schedule::builder().add_thread_local(system_one).build();
680
681 schedule.execute(&mut world, &mut resources);
682 }
683
684 let entity = entity.lock().unwrap();
685
686 assert!(entity.is_some());
687 assert!(world.get_entity_location(entity.unwrap()).is_some());
688 }
689}