async_ecs/dispatcher/
builder.rs

1use std::fmt::Debug;
2
3use hashbrown::hash_map::{Entry, HashMap};
4use tokio::{
5    sync::watch::channel,
6    task::{spawn as spawn_task, spawn_local},
7};
8
9use crate::{
10    access::Accessor,
11    resource::ResourceId,
12    system::{AsyncSystem, System},
13    world::World,
14};
15
16use super::{
17    task::{execute_local, execute_local_async, execute_thread, execute_thread_async},
18    Dispatcher, Error, LocalRun, LocalRunAsync, Receiver, Sender, SharedWorld, ThreadRun,
19    ThreadRunAsync,
20};
21
/// Identifier of a system registered with the `Builder`.
///
/// Ids are plain counters; comparing two ids compares the wrapped numbers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct SystemId(pub usize);
25
/// Builder for the [`Dispatcher`].
///
/// [`Dispatcher`]: super::Dispatcher
///
/// Systems are registered under unique names together with a list of names
/// they depend on. Further dependencies are derived from the resources each
/// system reads and writes. `build()` then spawns one task per system.
///
/// ## Barriers
///
/// Barriers are a way of sequentializing parts of
/// the system execution. See `add_barrier()`/`with_barrier()`.
///
/// NOTE(review): no `add_barrier()`/`with_barrier()` methods are defined in
/// this file; confirm whether they exist elsewhere or this section is stale.
///
/// ## Examples
///
/// This is how you create a dispatcher from systems
/// with named dependencies:
///
/// ```rust
/// # #![allow(unused)]
/// #
/// # use async_ecs::*;
/// #
/// # #[derive(Debug, Default)]
/// # struct Res;
/// #
/// # #[derive(SystemData)]
/// # struct Data<'a> { a: Read<'a, Res> }
/// #
/// # struct Dummy;
/// #
/// # impl<'a> System<'a> for Dummy {
/// #   type SystemData = Data<'a>;
/// #
/// #   fn run(&mut self, _: Data<'a>) {}
/// # }
/// #
/// # #[tokio::main]
/// # async fn main() {
/// # let system_a = Dummy;
/// # let system_b = Dummy;
/// # let system_c = Dummy;
/// # let system_d = Dummy;
/// # let system_e = Dummy;
/// #
/// let dispatcher = Dispatcher::builder()
///     .with(system_a, "a", &[])
///     .unwrap()
///     .with(system_b, "b", &["a"])
///     .unwrap() // b depends on a
///     .with(system_c, "c", &["a"])
///     .unwrap() // c also depends on a
///     .with(system_d, "d", &[])
///     .unwrap()
///     .with(system_e, "e", &["c", "d"])
///     .unwrap() // e executes after c and d are finished
///     .build();
/// # }
/// ```
///
/// Systems can be conditionally added by using the `add_` functions:
///
/// ```rust
/// # #![allow(unused)]
/// #
/// # use async_ecs::*;
/// #
/// # #[derive(Debug, Default)]
/// # struct Res;
/// #
/// # #[derive(SystemData)]
/// # struct Data<'a> { a: Read<'a, Res> }
/// #
/// # struct Dummy;
/// #
/// # impl<'a> System<'a> for Dummy {
/// #   type SystemData = Data<'a>;
/// #
/// #   fn run(&mut self, _: Data<'a>) {}
/// # }
/// #
/// # #[tokio::main]
/// # async fn main() {
/// # let b_enabled = true;
/// # let system_a = Dummy;
/// # let system_b = Dummy;
/// let mut builder = Dispatcher::builder().with(system_a, "a", &[]).unwrap();
///
/// if b_enabled {
///     builder.add(system_b, "b", &[]).unwrap();
/// }
///
/// let dispatcher = builder.build();
/// # }
/// ```
pub struct Builder<'a> {
    // Optional world; when present, each added system's `setup` is called with it.
    world: Option<&'a mut World>,
    // Most recently handed out id. `next_id()` increments before returning,
    // so the first system receives `SystemId(1)`.
    next_id: SystemId,
    // All registered systems, keyed by their id.
    items: HashMap<SystemId, Item>,
    // Name -> id lookup, used to detect duplicate names and to resolve
    // the dependency names passed to the `add_*` methods.
    names: HashMap<String, SystemId>,
}
123
124impl<'a> Builder<'a> {
125    pub fn new(world: Option<&'a mut World>) -> Self {
126        Self {
127            world,
128            next_id: Default::default(),
129            items: Default::default(),
130            names: Default::default(),
131        }
132    }
133
134    /// Builds the `Dispatcher`.
135    ///
136    /// This method will precompute useful information in order to speed up dispatching.
137    pub fn build(self) -> Dispatcher {
138        let receivers = self
139            .final_systems()
140            .into_iter()
141            .map(|id| self.items.get(&id).unwrap().receiver.clone())
142            .collect();
143
144        let world = SharedWorld::default();
145        let (sender, receiver) = channel(());
146
147        for (_, item) in self.items.into_iter() {
148            let run = item.run;
149            let name = item.name;
150            let sender = item.sender;
151            let receivers = if item.dependencies.is_empty() {
152                vec![receiver.clone()]
153            } else {
154                item.receivers
155            };
156
157            match run {
158                RunType::Thread(run) => {
159                    spawn_task(execute_thread(name, run, sender, receivers, world.clone()))
160                }
161                RunType::Local(run) => {
162                    spawn_local(execute_local(name, run, sender, receivers, world.clone()))
163                }
164                RunType::ThreadAsync(run) => spawn_task(execute_thread_async(
165                    name,
166                    run,
167                    sender,
168                    receivers,
169                    world.clone(),
170                )),
171                RunType::LocalAsync(run) => spawn_local(execute_local_async(
172                    name,
173                    run,
174                    sender,
175                    receivers,
176                    world.clone(),
177                )),
178            };
179        }
180
181        Dispatcher {
182            sender,
183            receivers,
184            world,
185        }
186    }
187
188    /// Adds a new system with a given name and a list of dependencies.
189    /// Please note that the dependency should be added before
190    /// you add the depending system.
191    ///
192    /// If you want to register systems which can not be specified as
193    /// dependencies, you can use `""` as their name, which will not panic
194    /// (using another name twice will).
195    ///
196    /// Same as [`add()`](struct.Dispatcher::builder().html#method.add), but
197    /// returns `self` to enable method chaining.
198    pub fn with<S>(mut self, system: S, name: &str, dependencies: &[&str]) -> Result<Self, Error>
199    where
200        S: for<'s> System<'s> + Send + 'static,
201    {
202        self.add(system, name, dependencies)?;
203
204        Ok(self)
205    }
206
207    /// Adds a new system with a given name and a list of dependencies.
208    /// Please note that the dependency should be added before
209    /// you add the depending system.
210    ///
211    /// If you want to register systems which can not be specified as
212    /// dependencies, you can use `""` as their name, which will not panic
213    /// (using another name twice will).
214    pub fn add<S>(
215        &mut self,
216        mut system: S,
217        name: &str,
218        dependencies: &[&str],
219    ) -> Result<&mut Self, Error>
220    where
221        S: for<'s> System<'s> + Send + 'static,
222    {
223        self.add_inner(
224            name,
225            dependencies,
226            system.accessor().reads(),
227            system.accessor().writes(),
228            |this, id| {
229                if let Some(ref mut w) = this.world {
230                    system.setup(w)
231                }
232
233                match this.items.entry(id) {
234                    Entry::Vacant(e) => e.insert(Item::thread(name.into(), system)),
235                    Entry::Occupied(_) => panic!("Item was already created!"),
236                }
237            },
238        )?;
239
240        Ok(self)
241    }
242
243    /// Adds a new asynchronous system with a given name and a list of dependencies.
244    /// Please note that the dependency should be added before
245    /// you add the depending system.
246    ///
247    /// If you want to register systems which can not be specified as
248    /// dependencies, you can use `""` as their name, which will not panic
249    /// (using another name twice will).
250    ///
251    /// Same as [`add()`](struct.Dispatcher::builder().html#method.add), but
252    /// returns `self` to enable method chaining.
253    pub fn with_async<S>(
254        mut self,
255        system: S,
256        name: &str,
257        dependencies: &[&str],
258    ) -> Result<Self, Error>
259    where
260        S: for<'s> AsyncSystem<'s> + Send + 'static,
261    {
262        self.add_async(system, name, dependencies)?;
263
264        Ok(self)
265    }
266
267    /// Adds a new asynchronous system with a given name and a list of dependencies.
268    /// Please note that the dependency should be added before
269    /// you add the depending system.
270    ///
271    /// If you want to register systems which can not be specified as
272    /// dependencies, you can use `""` as their name, which will not panic
273    /// (using another name twice will).
274    pub fn add_async<S>(
275        &mut self,
276        mut system: S,
277        name: &str,
278        dependencies: &[&str],
279    ) -> Result<&mut Self, Error>
280    where
281        S: for<'s> AsyncSystem<'s> + Send + 'static,
282    {
283        self.add_inner(
284            name,
285            dependencies,
286            system.accessor().reads(),
287            system.accessor().writes(),
288            |this, id| {
289                if let Some(ref mut w) = this.world {
290                    system.setup(w)
291                }
292
293                match this.items.entry(id) {
294                    Entry::Vacant(e) => e.insert(Item::thread_async(name.into(), system)),
295                    Entry::Occupied(_) => panic!("Item was already created!"),
296                }
297            },
298        )?;
299
300        Ok(self)
301    }
302
303    /// Adds a new thread local system.
304    ///
305    /// Please only use this if your struct is not `Send` and `Sync`.
306    ///
307    /// Thread-local systems are dispatched in-order.
308    ///
309    /// Same as [Dispatcher::builder()::add_local], but returns `self` to
310    /// enable method chaining.
311    pub fn with_local<S>(
312        mut self,
313        system: S,
314        name: &str,
315        dependencies: &[&str],
316    ) -> Result<Self, Error>
317    where
318        S: for<'s> System<'s> + 'static,
319    {
320        self.add_local(system, name, dependencies)?;
321
322        Ok(self)
323    }
324
325    /// Adds a new thread local system.
326    ///
327    /// Please only use this if your struct is not `Send` and `Sync`.
328    ///
329    /// Thread-local systems are dispatched in-order.
330    pub fn add_local<S>(
331        &mut self,
332        mut system: S,
333        name: &str,
334        dependencies: &[&str],
335    ) -> Result<&mut Self, Error>
336    where
337        S: for<'s> System<'s> + 'static,
338    {
339        self.add_inner(
340            name,
341            dependencies,
342            system.accessor().reads(),
343            system.accessor().writes(),
344            |this, id| {
345                if let Some(ref mut w) = this.world {
346                    system.setup(w)
347                }
348
349                match this.items.entry(id) {
350                    Entry::Vacant(e) => e.insert(Item::local(name.into(), system)),
351                    Entry::Occupied(_) => panic!("Item was already created!"),
352                }
353            },
354        )?;
355
356        Ok(self)
357    }
358
359    /// Adds a new thread local asynchronous system.
360    ///
361    /// Please only use this if your struct is not `Send` and `Sync`.
362    ///
363    /// Thread-local systems are dispatched in-order.
364    ///
365    /// Same as [Dispatcher::builder()::add_local], but returns `self` to
366    /// enable method chaining.
367    pub fn with_local_async<S>(
368        mut self,
369        system: S,
370        name: &str,
371        dependencies: &[&str],
372    ) -> Result<Self, Error>
373    where
374        S: for<'s> AsyncSystem<'s> + 'static,
375    {
376        self.add_local_async(system, name, dependencies)?;
377
378        Ok(self)
379    }
380
381    /// Adds a new thread local asynchronous system.
382    ///
383    /// Please only use this if your struct is not `Send` and `Sync`.
384    ///
385    /// Thread-local systems are dispatched in-order.
386    pub fn add_local_async<S>(
387        &mut self,
388        mut system: S,
389        name: &str,
390        dependencies: &[&str],
391    ) -> Result<&mut Self, Error>
392    where
393        S: for<'s> AsyncSystem<'s> + 'static,
394    {
395        self.add_inner(
396            name,
397            dependencies,
398            system.accessor().reads(),
399            system.accessor().writes(),
400            |this, id| {
401                if let Some(ref mut w) = this.world {
402                    system.setup(w)
403                }
404
405                match this.items.entry(id) {
406                    Entry::Vacant(e) => e.insert(Item::local_async(name.into(), system)),
407                    Entry::Occupied(_) => panic!("Item was already created!"),
408                }
409            },
410        )?;
411
412        Ok(self)
413    }
414
415    fn add_inner<F>(
416        &mut self,
417        name: &str,
418        dependencies: &[&str],
419        mut reads: Vec<ResourceId>,
420        mut writes: Vec<ResourceId>,
421        f: F,
422    ) -> Result<&mut Self, Error>
423    where
424        F: FnOnce(&mut Self, SystemId) -> &mut Item,
425    {
426        let name = name.to_owned();
427        let id = self.next_id();
428        let id = match self.names.entry(name) {
429            Entry::Vacant(e) => Ok(*e.insert(id)),
430            Entry::Occupied(e) => Err(Error::NameAlreadyRegistered(e.key().into())),
431        }?;
432
433        reads.sort();
434        writes.sort();
435
436        reads.dedup();
437        writes.dedup();
438
439        let mut dependencies = dependencies
440            .iter()
441            .map(|name| {
442                self.names
443                    .get(*name)
444                    .map(Clone::clone)
445                    .ok_or_else(|| Error::DependencyWasNotFound((*name).into()))
446            })
447            .collect::<Result<Vec<_>, _>>()?;
448
449        for read in &reads {
450            for (key, value) in &self.items {
451                if value.writes.contains(read) {
452                    dependencies.push(*key);
453                }
454            }
455        }
456
457        for write in &writes {
458            for (key, value) in &self.items {
459                if value.reads.contains(write) || value.writes.contains(write) {
460                    dependencies.push(*key);
461                }
462            }
463        }
464
465        self.reduce_dependencies(&mut dependencies);
466
467        let receivers = dependencies
468            .iter()
469            .map(|id| self.items.get(id).unwrap().receiver.clone())
470            .collect();
471
472        let item = f(self, id);
473
474        item.reads = reads;
475        item.writes = writes;
476        item.receivers = receivers;
477        item.dependencies = dependencies;
478
479        Ok(self)
480    }
481
482    fn final_systems(&self) -> Vec<SystemId> {
483        let mut ret = self.items.keys().map(Clone::clone).collect();
484
485        self.reduce_dependencies(&mut ret);
486
487        ret
488    }
489
490    fn reduce_dependencies(&self, dependencies: &mut Vec<SystemId>) {
491        dependencies.sort();
492        dependencies.dedup();
493
494        let mut remove_indices = Vec::new();
495        for (i, a) in dependencies.iter().enumerate() {
496            for (j, b) in dependencies.iter().enumerate() {
497                if self.depends_on(a, b) {
498                    remove_indices.push(j);
499                } else if self.depends_on(b, a) {
500                    remove_indices.push(i);
501                }
502            }
503        }
504
505        remove_indices.sort_unstable();
506        remove_indices.dedup();
507        remove_indices.reverse();
508
509        for i in remove_indices {
510            dependencies.remove(i);
511        }
512    }
513
514    fn depends_on(&self, a: &SystemId, b: &SystemId) -> bool {
515        let item = self.items.get(a).unwrap();
516
517        if item.dependencies.contains(b) {
518            return true;
519        }
520
521        for d in &item.dependencies {
522            if self.depends_on(d, b) {
523                return true;
524            }
525        }
526
527        false
528    }
529
530    fn next_id(&mut self) -> SystemId {
531        self.next_id.0 += 1;
532
533        self.next_id
534    }
535}
536
/// Defines how to execute the `System` with the `Dispatcher`.
///
/// `Builder::build` matches on this to choose the spawn function and the
/// task wrapper for each system.
enum RunType {
    /// Created by `Item::thread` from a `Send` system; run by
    /// `execute_thread` in a task spawned with `tokio::task::spawn`.
    Thread(ThreadRun),
    /// Created by `Item::local`; run by `execute_local` in a task spawned
    /// with `tokio::task::spawn_local`.
    Local(LocalRun),
    /// Created by `Item::thread_async` from a `Send` asynchronous system; run
    /// by `execute_thread_async` in a task spawned with `tokio::task::spawn`.
    ThreadAsync(ThreadRunAsync),
    /// Created by `Item::local_async`; run by `execute_local_async` in a task
    /// spawned with `tokio::task::spawn_local`.
    LocalAsync(LocalRunAsync),
}
544
/// Item that wraps all information of a `System` within the `Builder`.
struct Item {
    // Name the system was registered under; passed on to the spawned task.
    name: String,
    // The boxed system together with the way it has to be executed.
    run: RunType,

    // Sending half of this system's own channel; moved into the spawned task.
    sender: Sender,
    // Receiving half of this system's own channel. It is cloned for every
    // system that depends on this one and, for final systems, for the
    // dispatcher.
    receiver: Receiver,
    // Receivers of the systems this system depends on.
    receivers: Vec<Receiver>,

    // Resources read by the system, sorted and deduplicated.
    reads: Vec<ResourceId>,
    // Resources written by the system, sorted and deduplicated.
    writes: Vec<ResourceId>,
    // Reduced list of the systems this system depends on.
    dependencies: Vec<SystemId>,
}
558
559impl Item {
560    fn new(name: String, run: RunType) -> Self {
561        let (sender, receiver) = channel(());
562
563        Self {
564            name,
565            run,
566
567            sender,
568            receiver,
569            receivers: Vec::new(),
570
571            reads: Vec::new(),
572            writes: Vec::new(),
573            dependencies: Vec::new(),
574        }
575    }
576
577    fn thread<S>(name: String, system: S) -> Self
578    where
579        S: for<'s> System<'s> + Send + 'static,
580    {
581        Self::new(name, RunType::Thread(Box::new(system)))
582    }
583
584    fn local<S>(name: String, system: S) -> Self
585    where
586        S: for<'s> System<'s> + 'static,
587    {
588        Self::new(name, RunType::Local(Box::new(system)))
589    }
590
591    fn thread_async<S>(name: String, system: S) -> Self
592    where
593        S: for<'s> AsyncSystem<'s> + Send + 'static,
594    {
595        Self::new(name, RunType::ThreadAsync(Box::new(system)))
596    }
597
598    fn local_async<S>(name: String, system: S) -> Self
599    where
600        S: for<'s> AsyncSystem<'s> + 'static,
601    {
602        Self::new(name, RunType::LocalAsync(Box::new(system)))
603    }
604}
605
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        access::AccessorCow,
        system::{DynamicSystemData, System},
        world::World,
    };

    /// Registers five systems without explicit dependencies and checks that
    /// the dependencies derived from their resource access are reduced as
    /// sketched in the diagram below.
    #[test]
    fn dependencies_on_read_and_write() {
        /*
            - Systems ------------------------------------
                Id:     1       2       3       4       5

            - Resources ----------------------------------
                Read:   A       A       B       C       A
                Write:  B       C       D       D      BCD

            - Dependencies Total -------------------------
                        |       |       |       |       |
                        |<--------------|       |       |
                        |       |       |       |       |
                        |       |<--------------|       |
                        |       |       |<------|       |
                        |       |       |       |       |
                        |<------------------------------|
                        |       |<----------------------|
                        |       |       |<--------------|
                        |       |       |       |<------|
                        |       |       |       |       |

            - Dependencies Reduced -----------------------
                        |       |       |       |       |
                        |<--------------|       |       |
                        |       |       |       |       |
                        |       |<--------------|       |
                        |       |       |<------|       |
                        |       |       |       |       |
                        |       |       |       |<------|
                        |       |       |       |       |
        */

        struct ResA;
        struct ResB;
        struct ResC;
        struct ResD;

        let a = || ResourceId::new::<ResA>();
        let b = || ResourceId::new::<ResB>();
        let c = || ResourceId::new::<ResC>();
        let d = || ResourceId::new::<ResD>();

        // The value under test is the builder itself; it is never built.
        let builder = Dispatcher::builder()
            .with(TestSystem::new(vec![a()], vec![b()]), "sys1", &[])
            .unwrap()
            .with(TestSystem::new(vec![a()], vec![c()]), "sys2", &[])
            .unwrap()
            .with(TestSystem::new(vec![b()], vec![d()]), "sys3", &[])
            .unwrap()
            .with(TestSystem::new(vec![c()], vec![d()]), "sys4", &[])
            .unwrap()
            .with(TestSystem::new(vec![a()], vec![b(), c(), d()]), "sys5", &[])
            .unwrap();

        let dependencies_of = |index: usize| -> Vec<SystemId> {
            builder
                .items
                .get(&SystemId(index))
                .unwrap()
                .dependencies
                .clone()
        };

        assert_eq!(dependencies_of(1), Vec::<SystemId>::new());
        assert_eq!(dependencies_of(2), Vec::<SystemId>::new());
        assert_eq!(dependencies_of(3), vec![SystemId(1)]);
        assert_eq!(dependencies_of(4), vec![SystemId(2), SystemId(3)]);
        assert_eq!(dependencies_of(5), vec![SystemId(4)]);
        assert_eq!(builder.final_systems(), vec![SystemId(5)]);
    }

    /// System whose resource access is fully described by a `TestAccessor`.
    struct TestSystem {
        accessor: TestAccessor,
    }

    impl TestSystem {
        /// Creates a system that reports exactly the given reads and writes.
        fn new(reads: Vec<ResourceId>, writes: Vec<ResourceId>) -> Self {
            let accessor = TestAccessor { reads, writes };

            Self { accessor }
        }
    }

    impl<'a> System<'a> for TestSystem {
        type SystemData = TestData;

        // The test only inspects the builder, so the system is never run.
        fn run(&mut self, _data: Self::SystemData) {
            unimplemented!()
        }

        fn accessor<'b>(&'b self) -> AccessorCow<'a, 'b, Self::SystemData> {
            AccessorCow::Borrow(&self.accessor)
        }
    }

    /// Empty system data; it exists only to tie `TestAccessor` to a system.
    struct TestData;

    impl<'a> DynamicSystemData<'a> for TestData {
        type Accessor = TestAccessor;

        fn setup(_accessor: &Self::Accessor, _world: &mut World) {}

        fn fetch(_access: &Self::Accessor, _world: &'a World) -> Self {
            TestData
        }
    }

    /// Accessor that returns fixed lists of resource ids.
    struct TestAccessor {
        reads: Vec<ResourceId>,
        writes: Vec<ResourceId>,
    }

    impl Accessor for TestAccessor {
        fn reads(&self) -> Vec<ResourceId> {
            self.reads.clone()
        }

        fn writes(&self) -> Vec<ResourceId> {
            self.writes.clone()
        }
    }
}