legion_task/
lib.rs

1//! # Fork-join multitasking for Legion ECS
2//!
3//! Instead of hand-rolling state machines to sequence the effects of various ECS systems, spawn
4//! tasks as entities and declare explicit temporal dependencies between them.
5//!
6//! ## Code Example: making task graphs and dispatching task runners
7//!
8//! ```
9//! use legion::prelude::*;
10//! use legion_task::*;
11//!
12//! #[derive(Clone)]
13//! struct SaySomething(&'static str);
14//!
15//! impl<'a> TaskComponent<'a> for SaySomething {
16//!     type Data = ();
17//!
18//!     fn run(&mut self, data: &mut Self::Data) -> bool {
19//!         println!("{}", self.0);
20//!
21//!         true
22//!     }
23//! }
24//!
25//! #[derive(Clone, Debug)]
26//! struct PushValue {
27//!     value: usize,
28//! }
29//!
30//! impl<'a> TaskComponent<'a> for PushValue {
31//!     type Data = Vec<usize>;
32//!
33//!     fn run(&mut self, data: &mut Self::Data) -> bool {
34//!         data.push(self.value);
35//!
36//!         true
37//!     }
38//! }
39//!
40//! fn make_static_task_graph(cmd: &mut CommandBuffer) {
41//!     // Any component that implements TaskComponent can be spawned.
42//!     let task_graph: TaskGraph = seq!(
43//!         @SaySomething("hello"),
44//!         fork!(
45//!             @PushValue { value: 1 },
46//!             @PushValue { value: 2 },
47//!             @PushValue { value: 3 }
48//!         ),
49//!         @SaySomething("goodbye")
50//!     );
51//!     task_graph.assemble(OnCompletion::Delete, cmd);
52//! }
53//!
54//! fn make_dynamic_task_graph(cmd: &mut CommandBuffer) {
55//!     let first: TaskGraph = task!(@SaySomething("hello"));
56//!     let mut middle: TaskGraph = empty_graph!();
57//!     for i in 0..10 {
58//!         middle = fork!(middle, @PushValue { value: i });
59//!     }
60//!     let last: TaskGraph = task!(@SaySomething("goodbye"));
61//!     let task_graph: TaskGraph = seq!(first, middle, last);
62//!     task_graph.assemble(OnCompletion::Delete, cmd);
63//! }
64//!
65//! fn build_say_something_task_runner_system() -> Box<dyn Schedulable> {
66//!     SystemBuilder::new("say_something_task_runner")
67//!         .with_query(task_runner_query::<SaySomething>())
68//!         .build(|_, mut world, _, task_query| {
69//!             run_tasks(&mut world, &mut (), task_query)
70//!         })
71//! }
72//!
73//! fn build_push_value_task_runner_system() -> Box<dyn Schedulable> {
74//!     SystemBuilder::new("push_value_task_runner")
75//!         .write_resource::<Vec<usize>>()
76//!         .with_query(task_runner_query::<PushValue>())
77//!         .build(|_, mut world, value, task_query| {
78//!             run_tasks(&mut world, &mut **value, task_query)
79//!         })
80//! }
81//!
82//! fn make_schedule() -> Schedule {
83//!     Schedule::builder()
84//!         .add_system(build_say_something_task_runner_system())
85//!         .add_system(build_push_value_task_runner_system())
86//!         .add_system(build_task_manager_system("task_manager"))
87//!         .build()
88//! }
89//! ```
90//!
91//! ## Data Model
92//!
93//! Here we expound on the technical details of this module's implementation. For basic usage, see
94//! the tests.
95//!
96//! In this model, every task is some entity. The entity is allowed to have exactly one component
97//! that implements `TaskComponent` (it may have other components that don't implement
98//! `TaskComponent`). The task will be run to completion by running a system that calls `run_tasks`
99//! with the proper `TaskComponent::Data` and `task_query`.
100//!
101//! Every task entity is also a node in a (hopefully acyclic) directed graph. An edge `t2 --> t1`
102//! means that `t2` cannot start until `t1` has completed.
103//!
//! In order for tasks to become unblocked, the system created with `build_task_manager_system` must
//! run. When it does, it traverses the graph, starting at the "final entities", and checks for
//! entities that have completed, potentially unblocking their parents. In order for a task to be
//! run, it must be the descendant of a final entity. Entity component tuples become final by
//! calling `finalize` (which adds a `FinalTag` component).
109//!
110//! Edges can either come from `SingleEdge` or `MultiEdge` components, but you should not use these
111//! types directly. You might wonder why we need both types of edges. It's a fair question, because
112//! adding the `SingleEdge` concept does not actually make the model capable of representing any
113//! semantically new graphs. The reason is efficiency.
114//!
115//! If you want to implement a fork join like this (note: time is going left to right but the
116//! directed edges are going right to left):
117//!
118//!```
119//! r#"       ----- t1.1 <---   ----- t2.1 <---
120//!          /               \ /               \
121//!      t0 <------ t1.2 <----<------ t2.2 <---- t3
122//!          \               / \               /
123//!           ----- t1.3 <---   ----- t2.3 <---      "#;
124//!```
125//!
126//! You would actually do this by calling `make_fork` to create two "fork" entities `F1` and `F2`
127//! that don't have `TaskComponent`s, but they can have both a `SingleEdge` and a `MultiEdge`. Note
128//! that the children on the `MultiEdge` are called "prongs" of the fork.
129//!
130//!```
131//! r#"      single          single          single
132//!     t0 <-------- F1 <-------------- F2 <-------- t3
133//!                   |                  |
134//!          t1.1 <---|          t2.1 <--|
135//!          t1.2 <---| multi    t2.2 <--| multi
136//!          t1.3 <---|          t2.3 <--|            "#;
137//!```
138//!
139//! The semantics would be such that this graph is equivalent to the one above. Before any of the
140//! tasks connected to `F2` by the `MultiEdge` could run, the tasks connected by the `SingleEdge`
141//! (`{ t0, t1.1, t1.2, t1.3 }`) would have to be complete. `t3` could only run once all of the
//! descendants of `F2` had completed.
143//!
144//! The advantages of this scheme are:
145//!   - a traversal of the graph starting from `t3` does not visit the same node twice
146//!   - it is a bit easier to create fork-join graphs with larger numbers of concurrent tasks
147//!   - there are fewer edges for the most common use cases
148//!
149//! Here's another example with "nested forks" to test your understanding:
150//!
151//! ```
152//! r#"   With fork entities:
153//!
154//!           t0 <-------------- FA <----- t2
155//!                              |
156//!                       tx <---|
157//!               t1 <--- FB <---|
158//!                        |
159//!               ty <-----|
160//!               tz <-----|
161//!
162//!       As time orderings:
163//!
164//!           t0   < { t1, tx, ty, tz } < t2
165//!           t1   < { ty, tz }
166//!
167//!       Induced graph:
168//!
169//!           t0 <------- tx <------- t2
170//!            ^                      |
171//!            |      /------ ty <----|
172//!            |     v                |
173//!            ----- t1 <---- tz <-----          "#;
174//! ```
175//!
176//! ## Macro Usage
177//!
178//! Every user of this module should create task graphs via the `empty_graph!`, `seq!`, `fork!`, and
179//! `task!` macros, which make it easy to construct task graphs correctly. Once a graph is ready,
180//! call `assemble` on it to mark the task entities for execution (by finalizing the root of the
181//! graph).
182//!
183//! These systems must be scheduled for tasks to make progress:
184//!   - a system created with `build_task_manager_system`
185//!   - a system that calls `run_tasks` on each `TaskComponent` used
186//!
187//! ## Advanced Usage
188//!
189//! If you find the `TaskGraph` macros limiting, you can use the `make_task`, `join`, `make_fork`,
190//! and `add_prong` functions; these are the building blocks for creating all task graphs, including
191//! buggy ones. These functions are totally dynamic in that they deal directly with entities of
192//! various archetypes, assuming that the programmer passed in the correct archetypes for the given
193//! function.
194//!
195//! Potential bugs that won't be detected for you:
196//!   - leaked orphan entities
197//!   - graph cycles
198//!   - finalizing an entity that has children
199//!   - users manually tampering with the `TaskProgress`, `SingleEdge`, `MultiEdge`, or `FinalTag`
200//!     components; these should only be used inside this module
201//!
202
203mod components;
204mod graph_builder;
205mod manager;
206mod runner;
207
208pub use components::{
209    add_prong, finalize, join, make_fork, make_task, with_task_components, FinalTag, OnCompletion,
210    TaskComponent, TaskProgress,
211};
212pub use graph_builder::{Cons, TaskFactory, TaskGraph};
213pub use manager::{build_task_manager_system, entity_is_complete};
214pub use runner::{run_tasks, task_runner_query, TaskEntityFilter, TaskQuery, TaskSystemQuery};
215
216#[cfg(test)]
217mod tests {
218    use super::*;
219
220    use legion::prelude::*;
221
222    #[derive(Clone, Debug, Default, Eq, PartialEq)]
223    struct Noop {
224        was_run: bool,
225    }
226
227    impl<'a> TaskComponent<'a> for Noop {
228        type Data = ();
229
230        fn run(&mut self, _data: &mut Self::Data) -> bool {
231            self.was_run = true;
232
233            true
234        }
235    }
236
237    fn build_noop_task_runner_system() -> Box<dyn Schedulable> {
238        SystemBuilder::new("noop_task_runner")
239            .with_query(task_runner_query::<Noop>())
240            .build(|_, mut world, _, task_query| run_tasks(&mut world, &mut (), task_query))
241    }
242
243    #[derive(Clone, Debug)]
244    struct PushValue {
245        value: usize,
246    }
247
248    impl<'a> TaskComponent<'a> for PushValue {
249        type Data = Vec<usize>;
250
251        fn run(&mut self, data: &mut Self::Data) -> bool {
252            log::debug!("Task pushing value {}", self.value);
253            data.push(self.value);
254
255            true
256        }
257    }
258
259    fn build_push_value_task_runner_system() -> Box<dyn Schedulable> {
260        SystemBuilder::new("example_task_runner")
261            .write_resource::<Vec<usize>>()
262            .with_query(task_runner_query::<PushValue>())
263            .build(|_, mut world, value, task_query| {
264                run_tasks(&mut world, &mut **value, task_query)
265            })
266    }
267
268    fn set_up<'a, 'b>() -> (World, Resources, Schedule) {
269        let mut resources = Resources::default();
270        resources.insert::<Vec<usize>>(Vec::new());
271
272        let world = World::new();
273
274        let schedule = Schedule::builder()
275            .add_system(build_noop_task_runner_system())
276            .add_system(build_push_value_task_runner_system())
277            // For sake of reproducible tests, assume the manager system is the last to run.
278            .add_system(build_task_manager_system("task_manager"))
279            .build();
280
281        (world, resources, schedule)
282    }
283
284    fn assemble_task_graph(
285        make_task_graph: fn() -> TaskGraph,
286        on_completion: OnCompletion,
287        world: &mut World,
288        resources: &mut Resources,
289    ) -> Entity {
290        resources.insert::<Option<Entity>>(None);
291        let assemble_system = SystemBuilder::new("assembler")
292            .write_resource::<Option<Entity>>()
293            .build(move |mut cmd, _subworld, final_task, _| {
294                **final_task = Some(make_task_graph().assemble(on_completion, &mut cmd));
295            });
296        let mut assemble_schedule = Schedule::builder()
297            .add_system(assemble_system)
298            .flush()
299            .build();
300        assemble_schedule.execute(world, resources);
301
302        resources.get::<Option<Entity>>().unwrap().unwrap()
303    }
304
305    fn assert_task_is_complete(
306        task: Entity,
307        is_alive: bool,
308        world: &mut World,
309        resources: &mut Resources,
310    ) {
311        let assert_system =
312            with_task_components(SystemBuilder::new("asserter")).build(move |_, subworld, _, _| {
313                if is_alive {
314                    assert!(entity_is_complete(&subworld, task));
315                }
316                assert_eq!(subworld.is_alive(task), is_alive);
317            });
318        let mut assert_schedule = Schedule::builder().add_system(assert_system).build();
319        assert_schedule.execute(world, resources);
320    }
321
322    #[test]
323    fn run_single_task() {
324        let (mut world, mut resources, mut schedule) = set_up();
325
326        fn make_task_graph() -> TaskGraph {
327            task!(@Noop::default())
328        }
329        let root = assemble_task_graph(
330            make_task_graph,
331            OnCompletion::None,
332            &mut world,
333            &mut resources,
334        );
335
336        schedule.execute(&mut world, &mut resources);
337        schedule.execute(&mut world, &mut resources);
338
339        assert_eq!(
340            *world.get_component::<Noop>(root).unwrap(),
341            Noop { was_run: true }
342        );
343        assert_task_is_complete(root, true, &mut world, &mut resources);
344    }
345
346    #[test]
347    fn single_task_deleted_on_completion() {
348        let (mut world, mut resources, mut schedule) = set_up();
349
350        fn make_task_graph() -> TaskGraph {
351            task!(@Noop::default())
352        }
353        let root = assemble_task_graph(
354            make_task_graph,
355            OnCompletion::Delete,
356            &mut world,
357            &mut resources,
358        );
359
360        schedule.execute(&mut world, &mut resources);
361        schedule.execute(&mut world, &mut resources);
362
363        assert_task_is_complete(root, false, &mut world, &mut resources);
364    }
365
366    #[test]
367    fn joined_tasks_run_in_order_and_deleted_on_completion() {
368        let (mut world, mut resources, mut schedule) = set_up();
369
370        fn make_task_graph() -> TaskGraph {
371            seq!(
372                @PushValue { value: 1 },
373                @PushValue { value: 2 },
374                @PushValue { value: 3 }
375            )
376        }
377        let root = assemble_task_graph(
378            make_task_graph,
379            OnCompletion::Delete,
380            &mut world,
381            &mut resources,
382        );
383
384        schedule.execute(&mut world, &mut resources);
385        schedule.execute(&mut world, &mut resources);
386        schedule.execute(&mut world, &mut resources);
387        schedule.execute(&mut world, &mut resources);
388
389        assert_eq!(*resources.get::<Vec<usize>>().unwrap(), vec![1, 2, 3]);
390        assert_task_is_complete(root, false, &mut world, &mut resources);
391    }
392
393    #[test]
394    fn all_prongs_of_fork_run_before_join_and_deleted_on_completion() {
395        let (mut world, mut resources, mut schedule) = set_up();
396
397        //         ---> t1.1 ---
398        //       /               \
399        //     t2 ----> t1.2 -----> t0
400
401        fn make_task_graph() -> TaskGraph {
402            seq!(
403                @PushValue { value: 1 },
404                fork!(@PushValue { value: 2 }, @PushValue { value: 3 }),
405                @PushValue { value: 4 }
406            )
407        }
408        let root = assemble_task_graph(
409            make_task_graph,
410            OnCompletion::Delete,
411            &mut world,
412            &mut resources,
413        );
414
415        schedule.execute(&mut world, &mut resources);
416        schedule.execute(&mut world, &mut resources);
417        schedule.execute(&mut world, &mut resources);
418        schedule.execute(&mut world, &mut resources);
419
420        let pushed_values: Vec<usize> = (*resources.get::<Vec<usize>>().unwrap()).clone();
421        assert!(pushed_values == vec![1, 2, 3, 4] || pushed_values == vec![1, 3, 2, 4]);
422
423        assert_task_is_complete(root, false, &mut world, &mut resources);
424    }
425
426    #[test]
427    fn join_fork_with_nested_fork() {
428        let (mut world, mut resources, mut schedule) = set_up();
429
430        fn make_task_graph() -> TaskGraph {
431            seq!(
432                @PushValue { value: 1 },
433                fork!(
434                    @PushValue { value: 2 },
435                    fork!(@PushValue { value: 3 }, @PushValue { value: 4 })
436                ),
437                @PushValue { value: 5 }
438            )
439        }
440        let root = assemble_task_graph(
441            make_task_graph,
442            OnCompletion::Delete,
443            &mut world,
444            &mut resources,
445        );
446
447        schedule.execute(&mut world, &mut resources);
448        schedule.execute(&mut world, &mut resources);
449        schedule.execute(&mut world, &mut resources);
450        schedule.execute(&mut world, &mut resources);
451
452        let pushed_values: Vec<usize> = (*resources.get::<Vec<usize>>().unwrap()).clone();
453        assert!(
454            pushed_values == vec![1, 2, 3, 4, 5]
455                || pushed_values == vec![1, 2, 4, 3, 5]
456                || pushed_values == vec![1, 3, 2, 4, 5]
457                || pushed_values == vec![1, 3, 4, 2, 5]
458                || pushed_values == vec![1, 4, 2, 3, 5]
459                || pushed_values == vec![1, 4, 3, 2, 5]
460        );
461
462        assert_task_is_complete(root, false, &mut world, &mut resources);
463    }
464}