specs_task/
lib.rs

1//! # Fork-join multitasking for SPECS ECS
2//!
3//! Instead of hand-rolling state machines to sequence the effects of various ECS systems, spawn
4//! tasks as entities and declare explicit temporal dependencies between them.
5//!
6//! ## Code Examples
7//!
8//! ### Making task graphs
9//!
10//! ```compile_fail
11//! fn make_static_task_graph(user: &TaskUser) {
12//!     // Any component that implements TaskComponent can be spawned.
13//!     let task_graph: TaskGraph = seq!(
14//!         @TaskFoo("hello"),
15//!         fork!(
16//!             @TaskBar { value: 1 },
17//!             @TaskBar { value: 2 },
18//!             @TaskBar { value: 3 }
19//!         ),
20//!         @TaskZing("goodbye")
21//!     );
22//!     task_graph.assemble(user, OnCompletion::Delete);
23//! }
24//!
25//! fn make_dynamic_task_graph(user: &TaskUser) {
26//!     let first: TaskGraph = task!(@TaskFoo("hello"));
27//!     let mut middle: TaskGraph = empty_graph!();
28//!     for i in 0..10 {
29//!         middle = fork!(middle, @TaskBar { value: i });
30//!     }
31//!     let last: TaskGraph = task!(@TaskZing("goodbye"));
32//!     let task_graph: TaskGraph = seq!(first, middle, last);
33//!     task_graph.assemble(user, OnCompletion::Delete);
34//! }
35//! ```
36//!
37//! ### Building a dispatcher with a `TaskRunnerSystem`
38//!
39//! ```compile_fail
40//! #[derive(Clone, Debug)]
41//! struct PushValue {
42//!     value: usize,
43//! }
44//!
45//! impl Component for PushValue {
46//!     type Storage = VecStorage<Self>;
47//! }
48//!
49//! impl<'a> TaskComponent<'a> for PushValue {
50//!     type Data = Write<'a, Vec<usize>>;
51//!
52//!     fn run(&mut self, data: &mut Self::Data) -> bool {
53//!         data.push(self.value);
54//!
55//!         true
56//!     }
57//! }
58//!
59//! fn make_dispatcher() -> Dispatcher {
60//!     DispatcherBuilder::new()
61//!         .with(
62//!             TaskRunnerSystem::<PushValue>::default(),
63//!             "push_value",
64//!             &[],
65//!         )
66//!         .with(
67//!             TaskManagerSystem,
68//!             "task_manager",
69//!             &[],
70//!         )
71//!         .build()
72//! }
73//! ```
74//!
75//! ## Data Model
76//!
77//! Here we expound on the technical details of this module's implementation. For basic usage, see
78//! the tests.
79//!
80//! In this model, every task is some entity. The entity is allowed to have exactly one component
81//! that implements `TaskComponent` (it may have other components that don't implement
82//! `TaskComponent`). The task will be run to completion by the corresponding `TaskRunnerSystem`.
83//!
84//! Every task entity is also a node in a (hopefully acyclic) directed graph. An edge `t2 --> t1`
85//! means that `t2` cannot start until `t1` has completed.
86//!
87//! In order for tasks to become unblocked, the `TaskManagerSystem` must run, whence it will
88//! traverse the graph, starting at the "final entities", and check for entities that have
89//! completed, potentially unblocking their parents. In order for a task to be run, it must be the
90//! descendent of a final entity. Entity component tuples become final by calling `finalize` (which
91//! adds a `FinalTag` component).
92//!
93//! Edges can either come from `SingleEdge` or `MultiEdge` components, but you should not use these
94//! types directly. You might wonder why we need both types of edges. It's a fair question, because
95//! adding the `SingleEdge` concept does not actually make the model capable of representing any
96//! semantically new graphs. The reason is efficiency.
97//!
98//! If you want to implement a fork join like this (note: time is going left to right but the
99//! directed edges are going right to left):
100//!
101//!```
102//! r#"       ----- t1.1 <---   ----- t2.1 <---
103//!          /               \ /               \
104//!      t0 <------ t1.2 <----<------ t2.2 <---- t3
105//!          \               / \               /
106//!           ----- t1.3 <---   ----- t2.3 <---      "#;
107//!```
108//!
109//! You would actually do this by calling `make_fork` to create two "fork" entities `F1` and `F2`
110//! that don't have `TaskComponent`s, but they can have both a `SingleEdge` and a `MultiEdge`. Note
111//! that the children on the `MultiEdge` are called "prongs" of the fork.
112//!
113//!```
114//! r#"      single          single          single
115//!     t0 <-------- F1 <-------------- F2 <-------- t3
116//!                   |                  |
117//!          t1.1 <---|          t2.1 <--|
118//!          t1.2 <---| multi    t2.2 <--| multi
119//!          t1.3 <---|          t2.3 <--|            "#;
120//!```
121//!
122//! The semantics would be such that this graph is equivalent to the one above. Before any of the
123//! tasks connected to `F2` by the `MultiEdge` could run, the tasks connected by the `SingleEdge`
124//! (`{ t0, t1.1, t1.2, t1.3 }`) would have to be complete. `t3` could only run once all of the
125//! descendents of `F2` had completed.
126//!
127//! The advantages of this scheme are:
128//!   - a traversal of the graph starting from `t3` does not visit the same node twice
129//!   - it is a bit easier to create fork-join graphs with larger numbers of concurrent tasks
130//!   - there are fewer edges for the most common use cases
131//!
132//! Here's another example with "nested forks" to test your understanding:
133//!
134//! ```
135//! r#"   With fork entities:
136//!
137//!           t0 <-------------- FA <----- t2
138//!                              |
139//!                       tx <---|
140//!               t1 <--- FB <---|
141//!                        |
142//!               ty <-----|
143//!               tz <-----|
144//!
145//!       As time orderings:
146//!
147//!           t0   < { t1, tx, ty, tz } < t2
148//!           t1   < { ty, tz }
149//!
150//!       Induced graph:
151//!
152//!           t0 <------- tx <------- t2
153//!            ^                      |
154//!            |      /------ ty <----|
155//!            |     v                |
156//!            ----- t1 <---- tz <-----          "#;
157//! ```
158//!
159//! ## Macro Usage
160//!
161//! Every user of this module should create task graphs via the `empty_graph!`, `seq!`, `fork!`, and
162//! `task!` macros, which make it easy to construct task graphs correctly. Once a graph is ready,
163//! call `assemble` on it to mark the task entities for execution.
164//!
165//! These systems must be scheduled for tasks to make progress:
166//!   - `TaskManagerSystem`
167//!   - `TaskRunnerSystem`
168//!
169//! ## Advanced Usage
170//!
171//! If you find the `TaskGraph` macros limiting, you can use the `TaskUser` methods; these are the
172//! building blocks for creating all task graphs, including buggy ones. These functions are totally
173//! dynamic in that they deal directly with entities of various archetypes, assuming that the
174//! programmer passed in the correct archetypes for the given function.
175//!
176//! Potential bugs that won't be detected for you:
177//!   - leaked orphan entities
178//!   - graph cycles
179//!   - finalizing an entity that has children
180//!   - users manually tampering with the `TaskProgress`, `SingleEdge`, `MultiEdge`, or `FinalTag`
181//!     components; these should only be used inside this module
182//!
183
184mod components;
185mod cons;
186mod manager;
187mod runner;
188mod user;
189mod writer;
190
191pub use components::{
192    FinalTag, MultiEdge, OnCompletion, SingleEdge, TaskComponent, TaskProgress,
193};
194pub use cons::{Cons, TaskFactory, TaskGraph};
195pub use user::TaskUser;
196pub use manager::TaskManagerSystem;
197pub use runner::TaskRunnerSystem;
198pub use writer::TaskWriter;
199
200use specs::prelude::*;
201
202#[derive(SystemData)]
203pub struct TaskData<'a, P, S, M, F> {
204    entities: Entities<'a>,
205    lazy: Read<'a, LazyUpdate>,
206    progress: P,
207    single_edges: S,
208    multi_edges: M,
209    final_tags: F,
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215
216    #[derive(Clone, Debug, Default, Eq, PartialEq)]
217    struct AlreadyComplete {
218        was_run: bool,
219    }
220
221    impl Component for AlreadyComplete {
222        type Storage = VecStorage<Self>;
223    }
224
225    impl<'a> TaskComponent<'a> for AlreadyComplete {
226        type Data = ();
227
228        fn run(&mut self, _data: &mut Self::Data) -> bool {
229            self.was_run = true;
230
231            true
232        }
233    }
234
235    #[derive(Clone)]
236    struct WriteValue {
237        value: usize,
238    }
239
240    impl Component for WriteValue {
241        type Storage = VecStorage<Self>;
242    }
243
244    impl<'a> TaskComponent<'a> for WriteValue {
245        type Data = Write<'a, usize>;
246
247        fn run(&mut self, data: &mut Self::Data) -> bool {
248            **data = self.value;
249
250            true
251        }
252    }
253
254    fn set_up<'a, 'b>() -> (World, Dispatcher<'a, 'b>) {
255        let mut world = World::new();
256        let mut dispatcher = DispatcherBuilder::new()
257            .with(
258                TaskRunnerSystem::<AlreadyComplete>::default(),
259                "already_complete",
260                &[],
261            )
262            .with(
263                TaskRunnerSystem::<WriteValue>::default(),
264                "write_value",
265                &[],
266            )
267            // For sake of reproducible tests, assume the manager system is the last to run.
268            .with(
269                TaskManagerSystem,
270                "task_manager",
271                &["already_complete", "write_value"],
272            )
273            .build();
274        dispatcher.setup(&mut world);
275
276        (world, dispatcher)
277    }
278
279    enum MakeSingleTask {
280        Finalize(OnCompletion),
281        DontFinalize,
282    }
283
284    fn make_single_task<'a, T: TaskComponent<'a> + Send + Sync>(
285        world: &mut World,
286        task: T,
287        option: MakeSingleTask,
288    ) -> Entity {
289        let entity = world.exec(|user: TaskUser| {
290            let task = user.make_task_lazy(task);
291            if let MakeSingleTask::Finalize(on_completion) = option {
292                user.finalize_lazy(task, on_completion);
293            }
294
295            task
296        });
297        world.maintain();
298
299        entity
300    }
301
302    fn entity_is_complete(world: &mut World, entity: Entity) -> bool {
303        world.exec(|user: TaskUser| user.entity_is_complete(entity))
304    }
305
306    #[test]
307    fn single_task_not_run_until_finalized() {
308        let (mut world, mut dispatcher) = set_up();
309
310        let task = make_single_task(
311            &mut world,
312            AlreadyComplete::default(),
313            MakeSingleTask::DontFinalize,
314        );
315
316        // Give the task a chance to get unblocked if there was a bug.
317        dispatcher.dispatch(&world);
318        dispatcher.dispatch(&world);
319
320        assert_eq!(
321            world.read_storage::<AlreadyComplete>().get(task),
322            Some(&AlreadyComplete { was_run: false })
323        );
324
325        world.exec(|user: TaskUser| user.finalize_lazy(task, OnCompletion::None));
326        world.maintain();
327
328        // Unblock the task.
329        dispatcher.dispatch(&world);
330        // Run the task.
331        dispatcher.dispatch(&world);
332        // If there was a bug that deleted our entity, this would be necessary to see it.
333        world.maintain();
334
335        assert!(entity_is_complete(&mut world, task));
336        assert_eq!(
337            world.read_storage::<AlreadyComplete>().get(task),
338            Some(&AlreadyComplete { was_run: true }),
339        );
340    }
341
342    #[test]
343    fn single_task_deleted_on_completion() {
344        let (mut world, mut dispatcher) = set_up();
345
346        let task = make_single_task(
347            &mut world,
348            AlreadyComplete::default(),
349            MakeSingleTask::Finalize(OnCompletion::Delete),
350        );
351
352        // Unblock the task.
353        dispatcher.dispatch(&world);
354        // Run the task, after which it should be deleted.
355        dispatcher.dispatch(&world);
356        // This needs to be done for the entity deletion to be visible.
357        world.maintain();
358
359        assert_eq!(world.entities().is_alive(task), false);
360    }
361
362    #[test]
363    fn joined_tasks_run_in_order_and_deleted_on_completion() {
364        let (mut world, mut dispatcher) = set_up();
365
366        let root = world.exec(|user: TaskUser| {
367            let task_graph: TaskGraph = seq!(
368                @WriteValue { value: 1 },
369                @WriteValue { value: 2 },
370                @WriteValue { value: 3 }
371            );
372
373            task_graph.assemble(OnCompletion::Delete, &user)
374        });
375        world.maintain();
376
377        dispatcher.dispatch(&world);
378        dispatcher.dispatch(&world);
379        assert_eq!(*world.fetch::<usize>(), 1);
380        dispatcher.dispatch(&world);
381        assert_eq!(*world.fetch::<usize>(), 2);
382        dispatcher.dispatch(&world);
383        assert_eq!(*world.fetch::<usize>(), 3);
384
385        world.maintain();
386        assert_eq!(world.entities().is_alive(root.unwrap()), false);
387    }
388
389    #[test]
390    fn all_prongs_of_fork_run_before_join_and_deleted_on_completion() {
391        let (mut world, mut dispatcher) = set_up();
392
393        //         ---> t1.1 ---
394        //       /               \
395        //     t2 ----> t1.2 -----> t0
396
397        let root = world.exec(|user: TaskUser| {
398            let task_graph: TaskGraph = seq!(
399                @WriteValue { value: 1 },
400                fork!(
401                    @WriteValue { value: 2 },
402                    @WriteValue { value: 3 }
403                ),
404                @WriteValue { value: 4 }
405            );
406
407            task_graph.assemble(OnCompletion::Delete, &user)
408        });
409        world.maintain();
410
411        dispatcher.dispatch(&world);
412        dispatcher.dispatch(&world);
413        assert_eq!(*world.fetch::<usize>(), 1);
414        dispatcher.dispatch(&world);
415        let cur_value = *world.fetch::<usize>();
416        assert!(cur_value == 2 || cur_value == 3);
417        dispatcher.dispatch(&world);
418        assert_eq!(*world.fetch::<usize>(), 4);
419
420        world.maintain();
421        assert_eq!(world.entities().is_alive(root.unwrap()), false);
422    }
423}