legion_task/lib.rs
1//! # Fork-join multitasking for Legion ECS
2//!
3//! Instead of hand-rolling state machines to sequence the effects of various ECS systems, spawn
4//! tasks as entities and declare explicit temporal dependencies between them.
5//!
6//! ## Code Example: making task graphs and dispatching task runners
7//!
8//! ```
9//! use legion::prelude::*;
10//! use legion_task::*;
11//!
12//! #[derive(Clone)]
13//! struct SaySomething(&'static str);
14//!
15//! impl<'a> TaskComponent<'a> for SaySomething {
16//! type Data = ();
17//!
18//! fn run(&mut self, data: &mut Self::Data) -> bool {
19//! println!("{}", self.0);
20//!
21//! true
22//! }
23//! }
24//!
25//! #[derive(Clone, Debug)]
26//! struct PushValue {
27//! value: usize,
28//! }
29//!
30//! impl<'a> TaskComponent<'a> for PushValue {
31//! type Data = Vec<usize>;
32//!
33//! fn run(&mut self, data: &mut Self::Data) -> bool {
34//! data.push(self.value);
35//!
36//! true
37//! }
38//! }
39//!
40//! fn make_static_task_graph(cmd: &mut CommandBuffer) {
41//! // Any component that implements TaskComponent can be spawned.
42//! let task_graph: TaskGraph = seq!(
43//! @SaySomething("hello"),
44//! fork!(
45//! @PushValue { value: 1 },
46//! @PushValue { value: 2 },
47//! @PushValue { value: 3 }
48//! ),
49//! @SaySomething("goodbye")
50//! );
51//! task_graph.assemble(OnCompletion::Delete, cmd);
52//! }
53//!
54//! fn make_dynamic_task_graph(cmd: &mut CommandBuffer) {
55//! let first: TaskGraph = task!(@SaySomething("hello"));
56//! let mut middle: TaskGraph = empty_graph!();
57//! for i in 0..10 {
58//! middle = fork!(middle, @PushValue { value: i });
59//! }
60//! let last: TaskGraph = task!(@SaySomething("goodbye"));
61//! let task_graph: TaskGraph = seq!(first, middle, last);
62//! task_graph.assemble(OnCompletion::Delete, cmd);
63//! }
64//!
65//! fn build_say_something_task_runner_system() -> Box<dyn Schedulable> {
66//! SystemBuilder::new("say_something_task_runner")
67//! .with_query(task_runner_query::<SaySomething>())
68//! .build(|_, mut world, _, task_query| {
69//! run_tasks(&mut world, &mut (), task_query)
70//! })
71//! }
72//!
73//! fn build_push_value_task_runner_system() -> Box<dyn Schedulable> {
74//! SystemBuilder::new("push_value_task_runner")
75//! .write_resource::<Vec<usize>>()
76//! .with_query(task_runner_query::<PushValue>())
77//! .build(|_, mut world, value, task_query| {
78//! run_tasks(&mut world, &mut **value, task_query)
79//! })
80//! }
81//!
82//! fn make_schedule() -> Schedule {
83//! Schedule::builder()
84//! .add_system(build_say_something_task_runner_system())
85//! .add_system(build_push_value_task_runner_system())
86//! .add_system(build_task_manager_system("task_manager"))
87//! .build()
88//! }
89//! ```
90//!
91//! ## Data Model
92//!
93//! Here we expound on the technical details of this module's implementation. For basic usage, see
94//! the tests.
95//!
96//! In this model, every task is some entity. The entity is allowed to have exactly one component
97//! that implements `TaskComponent` (it may have other components that don't implement
98//! `TaskComponent`). The task will be run to completion by running a system that calls `run_tasks`
99//! with the proper `TaskComponent::Data` and `task_query`.
100//!
101//! Every task entity is also a node in a (hopefully acyclic) directed graph. An edge `t2 --> t1`
102//! means that `t2` cannot start until `t1` has completed.
103//!
104//! In order for tasks to become unblocked, the system created with `build_task_manager_system` must
//! run; it then traverses the graph, starting at the "final entities", and checks for entities
//! that have completed, potentially unblocking their parents. In order for a task to be run, it
//! must be a descendant of a final entity. Entity component tuples become final by calling
108//! `finalize` (which adds a `FinalTag` component).
109//!
110//! Edges can either come from `SingleEdge` or `MultiEdge` components, but you should not use these
111//! types directly. You might wonder why we need both types of edges. It's a fair question, because
112//! adding the `SingleEdge` concept does not actually make the model capable of representing any
113//! semantically new graphs. The reason is efficiency.
114//!
115//! If you want to implement a fork join like this (note: time is going left to right but the
116//! directed edges are going right to left):
117//!
118//!```
119//! r#" ----- t1.1 <--- ----- t2.1 <---
120//! / \ / \
121//! t0 <------ t1.2 <----<------ t2.2 <---- t3
122//! \ / \ /
123//! ----- t1.3 <--- ----- t2.3 <--- "#;
124//!```
125//!
126//! You would actually do this by calling `make_fork` to create two "fork" entities `F1` and `F2`
127//! that don't have `TaskComponent`s, but they can have both a `SingleEdge` and a `MultiEdge`. Note
128//! that the children on the `MultiEdge` are called "prongs" of the fork.
129//!
130//!```
131//! r#" single single single
132//! t0 <-------- F1 <-------------- F2 <-------- t3
133//! | |
134//! t1.1 <---| t2.1 <--|
135//! t1.2 <---| multi t2.2 <--| multi
136//! t1.3 <---| t2.3 <--| "#;
137//!```
138//!
139//! The semantics would be such that this graph is equivalent to the one above. Before any of the
140//! tasks connected to `F2` by the `MultiEdge` could run, the tasks connected by the `SingleEdge`
141//! (`{ t0, t1.1, t1.2, t1.3 }`) would have to be complete. `t3` could only run once all of the
//! descendants of `F2` had completed.
143//!
144//! The advantages of this scheme are:
145//! - a traversal of the graph starting from `t3` does not visit the same node twice
146//! - it is a bit easier to create fork-join graphs with larger numbers of concurrent tasks
147//! - there are fewer edges for the most common use cases
148//!
149//! Here's another example with "nested forks" to test your understanding:
150//!
151//! ```
152//! r#" With fork entities:
153//!
154//! t0 <-------------- FA <----- t2
155//! |
156//! tx <---|
157//! t1 <--- FB <---|
158//! |
159//! ty <-----|
160//! tz <-----|
161//!
162//! As time orderings:
163//!
164//! t0 < { t1, tx, ty, tz } < t2
165//! t1 < { ty, tz }
166//!
167//! Induced graph:
168//!
169//! t0 <------- tx <------- t2
170//! ^ |
171//! | /------ ty <----|
172//! | v |
173//! ----- t1 <---- tz <----- "#;
174//! ```
175//!
176//! ## Macro Usage
177//!
178//! Every user of this module should create task graphs via the `empty_graph!`, `seq!`, `fork!`, and
179//! `task!` macros, which make it easy to construct task graphs correctly. Once a graph is ready,
180//! call `assemble` on it to mark the task entities for execution (by finalizing the root of the
181//! graph).
182//!
183//! These systems must be scheduled for tasks to make progress:
184//! - a system created with `build_task_manager_system`
185//! - a system that calls `run_tasks` on each `TaskComponent` used
186//!
187//! ## Advanced Usage
188//!
189//! If you find the `TaskGraph` macros limiting, you can use the `make_task`, `join`, `make_fork`,
190//! and `add_prong` functions; these are the building blocks for creating all task graphs, including
191//! buggy ones. These functions are totally dynamic in that they deal directly with entities of
192//! various archetypes, assuming that the programmer passed in the correct archetypes for the given
193//! function.
194//!
195//! Potential bugs that won't be detected for you:
196//! - leaked orphan entities
197//! - graph cycles
198//! - finalizing an entity that has children
199//! - users manually tampering with the `TaskProgress`, `SingleEdge`, `MultiEdge`, or `FinalTag`
200//! components; these should only be used inside this module
201//!
202
203mod components;
204mod graph_builder;
205mod manager;
206mod runner;
207
208pub use components::{
209 add_prong, finalize, join, make_fork, make_task, with_task_components, FinalTag, OnCompletion,
210 TaskComponent, TaskProgress,
211};
212pub use graph_builder::{Cons, TaskFactory, TaskGraph};
213pub use manager::{build_task_manager_system, entity_is_complete};
214pub use runner::{run_tasks, task_runner_query, TaskEntityFilter, TaskQuery, TaskSystemQuery};
215
216#[cfg(test)]
217mod tests {
218 use super::*;
219
220 use legion::prelude::*;
221
/// Test task that performs no work and only records whether it has been run.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
struct Noop {
    /// `false` for a default-constructed task; the tests expect `true` after the task has run.
    was_run: bool,
}
226
227 impl<'a> TaskComponent<'a> for Noop {
228 type Data = ();
229
230 fn run(&mut self, _data: &mut Self::Data) -> bool {
231 self.was_run = true;
232
233 true
234 }
235 }
236
237 fn build_noop_task_runner_system() -> Box<dyn Schedulable> {
238 SystemBuilder::new("noop_task_runner")
239 .with_query(task_runner_query::<Noop>())
240 .build(|_, mut world, _, task_query| run_tasks(&mut world, &mut (), task_query))
241 }
242
/// Test task that appends a fixed value to the shared `Vec<usize>` when it runs.
#[derive(Debug, Clone)]
struct PushValue {
    /// The number this task pushes.
    value: usize,
}
247
248 impl<'a> TaskComponent<'a> for PushValue {
249 type Data = Vec<usize>;
250
251 fn run(&mut self, data: &mut Self::Data) -> bool {
252 log::debug!("Task pushing value {}", self.value);
253 data.push(self.value);
254
255 true
256 }
257 }
258
259 fn build_push_value_task_runner_system() -> Box<dyn Schedulable> {
260 SystemBuilder::new("example_task_runner")
261 .write_resource::<Vec<usize>>()
262 .with_query(task_runner_query::<PushValue>())
263 .build(|_, mut world, value, task_query| {
264 run_tasks(&mut world, &mut **value, task_query)
265 })
266 }
267
268 fn set_up<'a, 'b>() -> (World, Resources, Schedule) {
269 let mut resources = Resources::default();
270 resources.insert::<Vec<usize>>(Vec::new());
271
272 let world = World::new();
273
274 let schedule = Schedule::builder()
275 .add_system(build_noop_task_runner_system())
276 .add_system(build_push_value_task_runner_system())
277 // For sake of reproducible tests, assume the manager system is the last to run.
278 .add_system(build_task_manager_system("task_manager"))
279 .build();
280
281 (world, resources, schedule)
282 }
283
284 fn assemble_task_graph(
285 make_task_graph: fn() -> TaskGraph,
286 on_completion: OnCompletion,
287 world: &mut World,
288 resources: &mut Resources,
289 ) -> Entity {
290 resources.insert::<Option<Entity>>(None);
291 let assemble_system = SystemBuilder::new("assembler")
292 .write_resource::<Option<Entity>>()
293 .build(move |mut cmd, _subworld, final_task, _| {
294 **final_task = Some(make_task_graph().assemble(on_completion, &mut cmd));
295 });
296 let mut assemble_schedule = Schedule::builder()
297 .add_system(assemble_system)
298 .flush()
299 .build();
300 assemble_schedule.execute(world, resources);
301
302 resources.get::<Option<Entity>>().unwrap().unwrap()
303 }
304
305 fn assert_task_is_complete(
306 task: Entity,
307 is_alive: bool,
308 world: &mut World,
309 resources: &mut Resources,
310 ) {
311 let assert_system =
312 with_task_components(SystemBuilder::new("asserter")).build(move |_, subworld, _, _| {
313 if is_alive {
314 assert!(entity_is_complete(&subworld, task));
315 }
316 assert_eq!(subworld.is_alive(task), is_alive);
317 });
318 let mut assert_schedule = Schedule::builder().add_system(assert_system).build();
319 assert_schedule.execute(world, resources);
320 }
321
/// A lone `Noop` task assembled with `OnCompletion::None` has run, is complete, and is still
/// alive after two passes of the schedule.
#[test]
fn run_single_task() {
    // Graph under test: a single `Noop` task.
    fn single_noop_graph() -> TaskGraph {
        task!(@Noop::default())
    }

    let (mut world, mut resources, mut schedule) = set_up();
    let root = assemble_task_graph(
        single_noop_graph,
        OnCompletion::None,
        &mut world,
        &mut resources,
    );

    for _ in 0..2 {
        schedule.execute(&mut world, &mut resources);
    }

    // Copy the component out so the borrow of `world` ends before it is borrowed mutably below.
    let observed: Noop = (*world.get_component::<Noop>(root).unwrap()).clone();
    assert_eq!(observed, Noop { was_run: true });
    assert_task_is_complete(root, true, &mut world, &mut resources);
}
345
/// A lone `Noop` task assembled with `OnCompletion::Delete` is no longer alive after two passes
/// of the schedule.
#[test]
fn single_task_deleted_on_completion() {
    // Graph under test: a single `Noop` task.
    fn single_noop_graph() -> TaskGraph {
        task!(@Noop::default())
    }

    let (mut world, mut resources, mut schedule) = set_up();
    let root = assemble_task_graph(
        single_noop_graph,
        OnCompletion::Delete,
        &mut world,
        &mut resources,
    );

    for _ in 0..2 {
        schedule.execute(&mut world, &mut resources);
    }

    assert_task_is_complete(root, false, &mut world, &mut resources);
}
365
/// Three sequenced `PushValue` tasks push their values in declaration order, and the root is
/// deleted once the graph has completed.
#[test]
fn joined_tasks_run_in_order_and_deleted_on_completion() {
    // Graph under test: 1, then 2, then 3.
    fn three_step_sequence() -> TaskGraph {
        seq!(
            @PushValue { value: 1 },
            @PushValue { value: 2 },
            @PushValue { value: 3 }
        )
    }

    let (mut world, mut resources, mut schedule) = set_up();
    let root = assemble_task_graph(
        three_step_sequence,
        OnCompletion::Delete,
        &mut world,
        &mut resources,
    );

    for _ in 0..4 {
        schedule.execute(&mut world, &mut resources);
    }

    let pushed: Vec<usize> = (*resources.get::<Vec<usize>>().unwrap()).clone();
    assert_eq!(pushed, vec![1, 2, 3]);
    assert_task_is_complete(root, false, &mut world, &mut resources);
}
392
/// Both prongs of a fork run after the task that precedes the fork and before the task that
/// follows it, and the root is deleted once the graph has completed.
#[test]
fn all_prongs_of_fork_run_before_join_and_deleted_on_completion() {
    let (mut world, mut resources, mut schedule) = set_up();

    //         ---> t1.1 ---
    //       /               \
    //     t2 ----> t1.2 -----> t0

    fn make_task_graph() -> TaskGraph {
        seq!(
            @PushValue { value: 1 },
            fork!(@PushValue { value: 2 }, @PushValue { value: 3 }),
            @PushValue { value: 4 }
        )
    }
    let root = assemble_task_graph(
        make_task_graph,
        OnCompletion::Delete,
        &mut world,
        &mut resources,
    );

    for _ in 0..4 {
        schedule.execute(&mut world, &mut resources);
    }

    let pushed_values: Vec<usize> = (*resources.get::<Vec<usize>>().unwrap()).clone();
    // The two prongs may run in either order; report the observed order if neither matches.
    assert!(
        pushed_values == [1, 2, 3, 4] || pushed_values == [1, 3, 2, 4],
        "unexpected execution order: {:?}",
        pushed_values
    );

    assert_task_is_complete(root, false, &mut world, &mut resources);
}
425
/// A fork containing a nested fork runs all of its tasks (in any order) after the task that
/// precedes it and before the task that follows it; the root is deleted on completion.
#[test]
fn join_fork_with_nested_fork() {
    let (mut world, mut resources, mut schedule) = set_up();

    fn make_task_graph() -> TaskGraph {
        seq!(
            @PushValue { value: 1 },
            fork!(
                @PushValue { value: 2 },
                fork!(@PushValue { value: 3 }, @PushValue { value: 4 })
            ),
            @PushValue { value: 5 }
        )
    }
    let root = assemble_task_graph(
        make_task_graph,
        OnCompletion::Delete,
        &mut world,
        &mut resources,
    );

    for _ in 0..4 {
        schedule.execute(&mut world, &mut resources);
    }

    let pushed_values: Vec<usize> = (*resources.get::<Vec<usize>>().unwrap()).clone();

    // Accept exactly the sequences `1, <any permutation of 2, 3, 4>, 5`, and report the observed
    // order on failure. The length is checked first so the indexing below cannot panic.
    assert_eq!(
        pushed_values.len(),
        5,
        "unexpected execution order: {:?}",
        pushed_values
    );
    assert_eq!(
        pushed_values[0], 1,
        "unexpected execution order: {:?}",
        pushed_values
    );
    assert_eq!(
        pushed_values[4], 5,
        "unexpected execution order: {:?}",
        pushed_values
    );
    let mut forked = pushed_values[1..4].to_vec();
    forked.sort_unstable();
    assert_eq!(
        forked,
        [2, 3, 4],
        "unexpected execution order: {:?}",
        pushed_values
    );

    assert_task_is_complete(root, false, &mut world, &mut resources);
}
463 }
464}