1use std::fmt::Debug;
2
3use hashbrown::hash_map::{Entry, HashMap};
4use tokio::{
5 sync::watch::channel,
6 task::{spawn as spawn_task, spawn_local},
7};
8
9use crate::{
10 access::Accessor,
11 resource::ResourceId,
12 system::{AsyncSystem, System},
13 world::World,
14};
15
16use super::{
17 task::{execute_local, execute_local_async, execute_thread, execute_thread_async},
18 Dispatcher, Error, LocalRun, LocalRunAsync, Receiver, Sender, SharedWorld, ThreadRun,
19 ThreadRunAsync,
20};
21
22#[derive(Default, Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
24struct SystemId(pub usize);
25
26pub struct Builder<'a> {
118 world: Option<&'a mut World>,
119 next_id: SystemId,
120 items: HashMap<SystemId, Item>,
121 names: HashMap<String, SystemId>,
122}
123
124impl<'a> Builder<'a> {
125 pub fn new(world: Option<&'a mut World>) -> Self {
126 Self {
127 world,
128 next_id: Default::default(),
129 items: Default::default(),
130 names: Default::default(),
131 }
132 }
133
134 pub fn build(self) -> Dispatcher {
138 let receivers = self
139 .final_systems()
140 .into_iter()
141 .map(|id| self.items.get(&id).unwrap().receiver.clone())
142 .collect();
143
144 let world = SharedWorld::default();
145 let (sender, receiver) = channel(());
146
147 for (_, item) in self.items.into_iter() {
148 let run = item.run;
149 let name = item.name;
150 let sender = item.sender;
151 let receivers = if item.dependencies.is_empty() {
152 vec![receiver.clone()]
153 } else {
154 item.receivers
155 };
156
157 match run {
158 RunType::Thread(run) => {
159 spawn_task(execute_thread(name, run, sender, receivers, world.clone()))
160 }
161 RunType::Local(run) => {
162 spawn_local(execute_local(name, run, sender, receivers, world.clone()))
163 }
164 RunType::ThreadAsync(run) => spawn_task(execute_thread_async(
165 name,
166 run,
167 sender,
168 receivers,
169 world.clone(),
170 )),
171 RunType::LocalAsync(run) => spawn_local(execute_local_async(
172 name,
173 run,
174 sender,
175 receivers,
176 world.clone(),
177 )),
178 };
179 }
180
181 Dispatcher {
182 sender,
183 receivers,
184 world,
185 }
186 }
187
188 pub fn with<S>(mut self, system: S, name: &str, dependencies: &[&str]) -> Result<Self, Error>
199 where
200 S: for<'s> System<'s> + Send + 'static,
201 {
202 self.add(system, name, dependencies)?;
203
204 Ok(self)
205 }
206
207 pub fn add<S>(
215 &mut self,
216 mut system: S,
217 name: &str,
218 dependencies: &[&str],
219 ) -> Result<&mut Self, Error>
220 where
221 S: for<'s> System<'s> + Send + 'static,
222 {
223 self.add_inner(
224 name,
225 dependencies,
226 system.accessor().reads(),
227 system.accessor().writes(),
228 |this, id| {
229 if let Some(ref mut w) = this.world {
230 system.setup(w)
231 }
232
233 match this.items.entry(id) {
234 Entry::Vacant(e) => e.insert(Item::thread(name.into(), system)),
235 Entry::Occupied(_) => panic!("Item was already created!"),
236 }
237 },
238 )?;
239
240 Ok(self)
241 }
242
243 pub fn with_async<S>(
254 mut self,
255 system: S,
256 name: &str,
257 dependencies: &[&str],
258 ) -> Result<Self, Error>
259 where
260 S: for<'s> AsyncSystem<'s> + Send + 'static,
261 {
262 self.add_async(system, name, dependencies)?;
263
264 Ok(self)
265 }
266
267 pub fn add_async<S>(
275 &mut self,
276 mut system: S,
277 name: &str,
278 dependencies: &[&str],
279 ) -> Result<&mut Self, Error>
280 where
281 S: for<'s> AsyncSystem<'s> + Send + 'static,
282 {
283 self.add_inner(
284 name,
285 dependencies,
286 system.accessor().reads(),
287 system.accessor().writes(),
288 |this, id| {
289 if let Some(ref mut w) = this.world {
290 system.setup(w)
291 }
292
293 match this.items.entry(id) {
294 Entry::Vacant(e) => e.insert(Item::thread_async(name.into(), system)),
295 Entry::Occupied(_) => panic!("Item was already created!"),
296 }
297 },
298 )?;
299
300 Ok(self)
301 }
302
303 pub fn with_local<S>(
312 mut self,
313 system: S,
314 name: &str,
315 dependencies: &[&str],
316 ) -> Result<Self, Error>
317 where
318 S: for<'s> System<'s> + 'static,
319 {
320 self.add_local(system, name, dependencies)?;
321
322 Ok(self)
323 }
324
325 pub fn add_local<S>(
331 &mut self,
332 mut system: S,
333 name: &str,
334 dependencies: &[&str],
335 ) -> Result<&mut Self, Error>
336 where
337 S: for<'s> System<'s> + 'static,
338 {
339 self.add_inner(
340 name,
341 dependencies,
342 system.accessor().reads(),
343 system.accessor().writes(),
344 |this, id| {
345 if let Some(ref mut w) = this.world {
346 system.setup(w)
347 }
348
349 match this.items.entry(id) {
350 Entry::Vacant(e) => e.insert(Item::local(name.into(), system)),
351 Entry::Occupied(_) => panic!("Item was already created!"),
352 }
353 },
354 )?;
355
356 Ok(self)
357 }
358
359 pub fn with_local_async<S>(
368 mut self,
369 system: S,
370 name: &str,
371 dependencies: &[&str],
372 ) -> Result<Self, Error>
373 where
374 S: for<'s> AsyncSystem<'s> + 'static,
375 {
376 self.add_local_async(system, name, dependencies)?;
377
378 Ok(self)
379 }
380
381 pub fn add_local_async<S>(
387 &mut self,
388 mut system: S,
389 name: &str,
390 dependencies: &[&str],
391 ) -> Result<&mut Self, Error>
392 where
393 S: for<'s> AsyncSystem<'s> + 'static,
394 {
395 self.add_inner(
396 name,
397 dependencies,
398 system.accessor().reads(),
399 system.accessor().writes(),
400 |this, id| {
401 if let Some(ref mut w) = this.world {
402 system.setup(w)
403 }
404
405 match this.items.entry(id) {
406 Entry::Vacant(e) => e.insert(Item::local_async(name.into(), system)),
407 Entry::Occupied(_) => panic!("Item was already created!"),
408 }
409 },
410 )?;
411
412 Ok(self)
413 }
414
415 fn add_inner<F>(
416 &mut self,
417 name: &str,
418 dependencies: &[&str],
419 mut reads: Vec<ResourceId>,
420 mut writes: Vec<ResourceId>,
421 f: F,
422 ) -> Result<&mut Self, Error>
423 where
424 F: FnOnce(&mut Self, SystemId) -> &mut Item,
425 {
426 let name = name.to_owned();
427 let id = self.next_id();
428 let id = match self.names.entry(name) {
429 Entry::Vacant(e) => Ok(*e.insert(id)),
430 Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())),
431 }?;
432
433 reads.sort();
434 writes.sort();
435
436 reads.dedup();
437 writes.dedup();
438
439 let mut dependencies = dependencies
440 .iter()
441 .map(|name| {
442 self.names
443 .get(*name)
444 .map(Clone::clone)
445 .ok_or_else(|| Error::DependencyWasNotFound((*name).into()))
446 })
447 .collect::<Result<Vec<_>, _>>()?;
448
449 for read in &reads {
450 for (key, value) in &self.items {
451 if value.writes.contains(read) {
452 dependencies.push(*key);
453 }
454 }
455 }
456
457 for write in &writes {
458 for (key, value) in &self.items {
459 if value.reads.contains(write) || value.writes.contains(write) {
460 dependencies.push(*key);
461 }
462 }
463 }
464
465 self.reduce_dependencies(&mut dependencies);
466
467 let receivers = dependencies
468 .iter()
469 .map(|id| self.items.get(id).unwrap().receiver.clone())
470 .collect();
471
472 let item = f(self, id);
473
474 item.reads = reads;
475 item.writes = writes;
476 item.receivers = receivers;
477 item.dependencies = dependencies;
478
479 Ok(self)
480 }
481
482 fn final_systems(&self) -> Vec<SystemId> {
483 let mut ret = self.items.keys().map(Clone::clone).collect();
484
485 self.reduce_dependencies(&mut ret);
486
487 ret
488 }
489
490 fn reduce_dependencies(&self, dependencies: &mut Vec<SystemId>) {
491 dependencies.sort();
492 dependencies.dedup();
493
494 let mut remove_indices = Vec::new();
495 for (i, a) in dependencies.iter().enumerate() {
496 for (j, b) in dependencies.iter().enumerate() {
497 if self.depends_on(a, b) {
498 remove_indices.push(j);
499 } else if self.depends_on(b, a) {
500 remove_indices.push(i);
501 }
502 }
503 }
504
505 remove_indices.sort_unstable();
506 remove_indices.dedup();
507 remove_indices.reverse();
508
509 for i in remove_indices {
510 dependencies.remove(i);
511 }
512 }
513
514 fn depends_on(&self, a: &SystemId, b: &SystemId) -> bool {
515 let item = self.items.get(a).unwrap();
516
517 if item.dependencies.contains(b) {
518 return true;
519 }
520
521 for d in &item.dependencies {
522 if self.depends_on(d, b) {
523 return true;
524 }
525 }
526
527 false
528 }
529
530 fn next_id(&mut self) -> SystemId {
531 self.next_id.0 += 1;
532
533 self.next_id
534 }
535}
536
537enum RunType {
539 Thread(ThreadRun),
540 Local(LocalRun),
541 ThreadAsync(ThreadRunAsync),
542 LocalAsync(LocalRunAsync),
543}
544
545struct Item {
547 name: String,
548 run: RunType,
549
550 sender: Sender,
551 receiver: Receiver,
552 receivers: Vec<Receiver>,
553
554 reads: Vec<ResourceId>,
555 writes: Vec<ResourceId>,
556 dependencies: Vec<SystemId>,
557}
558
559impl Item {
560 fn new(name: String, run: RunType) -> Self {
561 let (sender, receiver) = channel(());
562
563 Self {
564 name,
565 run,
566
567 sender,
568 receiver,
569 receivers: Vec::new(),
570
571 reads: Vec::new(),
572 writes: Vec::new(),
573 dependencies: Vec::new(),
574 }
575 }
576
577 fn thread<S>(name: String, system: S) -> Self
578 where
579 S: for<'s> System<'s> + Send + 'static,
580 {
581 Self::new(name, RunType::Thread(Box::new(system)))
582 }
583
584 fn local<S>(name: String, system: S) -> Self
585 where
586 S: for<'s> System<'s> + 'static,
587 {
588 Self::new(name, RunType::Local(Box::new(system)))
589 }
590
591 fn thread_async<S>(name: String, system: S) -> Self
592 where
593 S: for<'s> AsyncSystem<'s> + Send + 'static,
594 {
595 Self::new(name, RunType::ThreadAsync(Box::new(system)))
596 }
597
598 fn local_async<S>(name: String, system: S) -> Self
599 where
600 S: for<'s> AsyncSystem<'s> + 'static,
601 {
602 Self::new(name, RunType::LocalAsync(Box::new(system)))
603 }
604}
605
606#[cfg(test)]
607mod tests {
608 use super::*;
609
610 use crate::{
611 access::AccessorCow,
612 system::{DynamicSystemData, System},
613 world::World,
614 };
615
616 #[test]
617 fn dependencies_on_read_and_write() {
618 struct ResA;
651 struct ResB;
652 struct ResC;
653 struct ResD;
654
655 let sys1 = TestSystem::new(
656 vec![ResourceId::new::<ResA>()],
657 vec![ResourceId::new::<ResB>()],
658 );
659 let sys2 = TestSystem::new(
660 vec![ResourceId::new::<ResA>()],
661 vec![ResourceId::new::<ResC>()],
662 );
663 let sys3 = TestSystem::new(
664 vec![ResourceId::new::<ResB>()],
665 vec![ResourceId::new::<ResD>()],
666 );
667 let sys4 = TestSystem::new(
668 vec![ResourceId::new::<ResC>()],
669 vec![ResourceId::new::<ResD>()],
670 );
671 let sys5 = TestSystem::new(
672 vec![ResourceId::new::<ResA>()],
673 vec![
674 ResourceId::new::<ResB>(),
675 ResourceId::new::<ResC>(),
676 ResourceId::new::<ResD>(),
677 ],
678 );
679
680 let dispatcher = Dispatcher::builder()
681 .with(sys1, "sys1", &[])
682 .unwrap()
683 .with(sys2, "sys2", &[])
684 .unwrap()
685 .with(sys3, "sys3", &[])
686 .unwrap()
687 .with(sys4, "sys4", &[])
688 .unwrap()
689 .with(sys5, "sys5", &[])
690 .unwrap();
691
692 let sys1 = dispatcher.items.get(&SystemId(1)).unwrap();
693 let sys2 = dispatcher.items.get(&SystemId(2)).unwrap();
694 let sys3 = dispatcher.items.get(&SystemId(3)).unwrap();
695 let sys4 = dispatcher.items.get(&SystemId(4)).unwrap();
696 let sys5 = dispatcher.items.get(&SystemId(5)).unwrap();
697
698 assert_eq!(sys1.dependencies, vec![]);
699 assert_eq!(sys2.dependencies, vec![]);
700 assert_eq!(sys3.dependencies, vec![SystemId(1)]);
701 assert_eq!(sys4.dependencies, vec![SystemId(2), SystemId(3)]);
702 assert_eq!(sys5.dependencies, vec![SystemId(4)]);
703 assert_eq!(dispatcher.final_systems(), vec![SystemId(5)]);
704 }
705
706 struct TestSystem {
707 accessor: TestAccessor,
708 }
709
710 impl TestSystem {
711 fn new(reads: Vec<ResourceId>, writes: Vec<ResourceId>) -> Self {
712 Self {
713 accessor: TestAccessor { reads, writes },
714 }
715 }
716 }
717
718 impl<'a> System<'a> for TestSystem {
719 type SystemData = TestData;
720
721 fn run(&mut self, _data: Self::SystemData) {
722 unimplemented!()
723 }
724
725 fn accessor<'b>(&'b self) -> AccessorCow<'a, 'b, Self::SystemData> {
726 AccessorCow::Borrow(&self.accessor)
727 }
728 }
729
730 struct TestData;
731
732 impl<'a> DynamicSystemData<'a> for TestData {
733 type Accessor = TestAccessor;
734
735 fn setup(_accessor: &Self::Accessor, _world: &mut World) {}
736
737 fn fetch(_access: &Self::Accessor, _world: &'a World) -> Self {
738 TestData
739 }
740 }
741
742 struct TestAccessor {
743 reads: Vec<ResourceId>,
744 writes: Vec<ResourceId>,
745 }
746
747 impl Accessor for TestAccessor {
748 fn reads(&self) -> Vec<ResourceId> {
749 self.reads.clone()
750 }
751
752 fn writes(&self) -> Vec<ResourceId> {
753 self.writes.clone()
754 }
755 }
756}