specs_task/lib.rs
//! # Fork-join multitasking for SPECS ECS
//!
//! Instead of hand-rolling state machines to sequence the effects of various ECS systems, spawn
//! tasks as entities and declare explicit temporal dependencies between them.
//!
//! ## Code Examples
//!
//! ### Making task graphs
//!
//! ```compile_fail
//! fn make_static_task_graph(user: &TaskUser) {
//!     // Any component that implements TaskComponent can be spawned.
//!     // Runs TaskFoo, then the three TaskBars concurrently, then TaskZing.
//!     let task_graph: TaskGraph = seq!(
//!         @TaskFoo("hello"),
//!         fork!(
//!             @TaskBar { value: 1 },
//!             @TaskBar { value: 2 },
//!             @TaskBar { value: 3 }
//!         ),
//!         @TaskZing("goodbye")
//!     );
//!     task_graph.assemble(OnCompletion::Delete, user);
//! }
//!
//! fn make_dynamic_task_graph(user: &TaskUser) {
//!     let first: TaskGraph = task!(@TaskFoo("hello"));
//!     // Grow the fork one prong at a time.
//!     let mut middle: TaskGraph = empty_graph!();
//!     for i in 0..10 {
//!         middle = fork!(middle, @TaskBar { value: i });
//!     }
//!     let last: TaskGraph = task!(@TaskZing("goodbye"));
//!     let task_graph: TaskGraph = seq!(first, middle, last);
//!     task_graph.assemble(OnCompletion::Delete, user);
//! }
//! ```
//!
//! ### Building a dispatcher with a `TaskRunnerSystem`
//!
//! ```compile_fail
//! #[derive(Clone, Debug)]
//! struct PushValue {
//!     value: usize,
//! }
//!
//! impl Component for PushValue {
//!     type Storage = VecStorage<Self>;
//! }
//!
//! impl<'a> TaskComponent<'a> for PushValue {
//!     type Data = Write<'a, Vec<usize>>;
//!
//!     fn run(&mut self, data: &mut Self::Data) -> bool {
//!         data.push(self.value);
//!
//!         // Returning `true` reports the task as complete.
//!         true
//!     }
//! }
//!
//! fn make_dispatcher() -> Dispatcher<'static, 'static> {
//!     DispatcherBuilder::new()
//!         // One runner system per `TaskComponent` type.
//!         .with(
//!             TaskRunnerSystem::<PushValue>::default(),
//!             "push_value",
//!             &[],
//!         )
//!         .with(
//!             TaskManagerSystem,
//!             "task_manager",
//!             &[],
//!         )
//!         .build()
//! }
//! ```
//!
//! ## Data Model
//!
//! Here we expound on the technical details of this module's implementation. For basic usage, see
//! the tests.
//!
//! In this model, every task is an entity. The entity is allowed to have exactly one component
//! that implements `TaskComponent` (it may have other components that don't implement
//! `TaskComponent`). The task will be run to completion by the corresponding `TaskRunnerSystem`.
//!
//! Every task entity is also a node in a (hopefully acyclic) directed graph. An edge `t2 --> t1`
//! means that `t2` cannot start until `t1` has completed.
//!
//! In order for tasks to become unblocked, the `TaskManagerSystem` must run. It traverses the
//! graph, starting at the "final entities", and checks for entities that have completed,
//! potentially unblocking their parents. In order for a task to be run, it must be a final entity
//! or the descendant of one. An entity becomes final when `finalize` is called on it (which adds a
//! `FinalTag` component).
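//!
//! The traversal can be modelled without SPECS at all. What follows is a minimal sketch under
//! simplifying assumptions, not the code inside `TaskManagerSystem`: `Node` and `runnable` are
//! hypothetical names, nodes live in a slice and refer to each other by index, and there are no
//! fork entities. It walks down from the final nodes and reports the incomplete tasks whose
//! children have all completed.
//!
//! ```rust
//! /// Hypothetical stand-in for a task entity (not a type from this crate).
//! struct Node {
//!     complete: bool,
//!     /// Indices of the nodes that must complete before this one may start.
//!     children: Vec<usize>,
//! }
//!
//! /// Returns the sorted indices of unblocked tasks, visiting only nodes reachable from `finals`.
//! fn runnable(nodes: &[Node], finals: &[usize]) -> Vec<usize> {
//!     let mut visited = vec![false; nodes.len()];
//!     let mut stack: Vec<usize> = finals.to_vec();
//!     let mut out = Vec::new();
//!     while let Some(i) = stack.pop() {
//!         if visited[i] {
//!             continue;
//!         }
//!         visited[i] = true;
//!         if nodes[i].complete {
//!             // Assumption of this model: a node only completes after its children,
//!             // so nothing below a completed node still needs attention.
//!             continue;
//!         }
//!         if nodes[i].children.iter().all(|&c| nodes[c].complete) {
//!             // Every dependency is done (or there are none): this task may run.
//!             out.push(i);
//!         } else {
//!             // Still blocked: keep looking for work further down the graph.
//!             stack.extend(nodes[i].children.iter().copied());
//!         }
//!     }
//!     out.sort_unstable();
//!     out
//! }
//! ```
//!
//! With a chain `t2 --> t1 --> t0` in which only `t2` is final, `runnable` reports `t0` first and
//! `t1` once `t0` is marked complete. A node that is not reachable from a final node is never
//! reported, which mirrors the rule that unfinalized tasks do not run.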
//!
//! Edges can come from either `SingleEdge` or `MultiEdge` components, but you should not use these
//! types directly. You might wonder why we need both types of edges. It's a fair question, because
//! adding the `SingleEdge` concept does not actually make the model capable of representing any
//! semantically new graphs. The reason is efficiency.
//!
//! Suppose you want to implement a fork-join like this (note: time goes left to right, but the
//! directed edges go right to left):
//!
//!```
//! r#"      ----- t1.1 <---   ----- t2.1 <---
//!         /               \ /               \
//!     t0 <------ t1.2 <----<------ t2.2 <---- t3
//!         \               / \               /
//!          ----- t1.3 <---   ----- t2.3 <---      "#;
//!```
//!
//! You would actually do this by calling `make_fork` to create two "fork" entities `F1` and `F2`
//! that don't have `TaskComponent`s, but can have both a `SingleEdge` and a `MultiEdge`. Note
//! that the children on the `MultiEdge` are called "prongs" of the fork.
//!
//!```
//! r#"     single          single          single
//!     t0 <-------- F1 <-------------- F2 <-------- t3
//!                   |                  |
//!          t1.1 <---|          t2.1 <--|
//!          t1.2 <---| multi    t2.2 <--| multi
//!          t1.3 <---|          t2.3 <--|            "#;
//!```
//!
//! The semantics are such that this graph is equivalent to the one above. Before any of the
//! tasks connected to `F2` by the `MultiEdge` can run, the tasks connected by the `SingleEdge`
//! (`{ t0, t1.1, t1.2, t1.3 }`) have to be complete. `t3` can only run once all of the
//! descendants of `F2` have completed.
//!
//! The advantages of this scheme are:
//!   - a traversal of the graph starting from `t3` does not visit the same node twice
//!   - it is a bit easier to create fork-join graphs with larger numbers of concurrent tasks
//!   - there are fewer edges for the most common use cases
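//!
//! To put rough numbers on the edge-count claim, here is a back-of-the-envelope sketch. Both
//! functions are hypothetical helpers, not part of this crate. They count edges for a chain of
//! `stages` forks with `width` prongs each, bracketed by one task on either side, as in the two
//! diagrams above, and they count each prong of a `MultiEdge` as one edge.
//!
//! ```rust
//! /// Edge count when every task in a stage points directly at every task in the previous
//! /// stage. Assumes `stages >= 1`.
//! fn direct_edges(stages: usize, width: usize) -> usize {
//!     // `width` edges into the first task, `width` edges out of the last task,
//!     // and a complete bipartite join between each pair of adjacent stages.
//!     2 * width + (stages - 1) * width * width
//! }
//!
//! /// Edge count when each stage hangs off a fork entity: one single edge per link in the
//! /// chain `t_first <-- F1 <-- ... <-- Fn <-- t_last`, plus one prong per task.
//! fn fork_edges(stages: usize, width: usize) -> usize {
//!     (stages + 1) + stages * width
//! }
//! ```
//!
//! For the two three-pronged forks drawn above, `direct_edges(2, 3)` is 15 while
//! `fork_edges(2, 3)` is 9. At ten prongs per stage the gap widens to 120 versus 23, because the
//! direct encoding grows quadratically in the fork width.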
//!
//! Here's another example with "nested forks" to test your understanding:
//!
//! ```
//! r#"     With fork entities:
//!
//!           t0 <-------------- FA <----- t2
//!                               |
//!                        tx <---|
//!                t1 <--- FB <---|
//!                         |
//!                ty <-----|
//!                tz <-----|
//!
//!         As time orderings:
//!
//!             t0 < { t1, tx, ty, tz } < t2
//!             t1 < { ty, tz }
//!
//!         Induced graph:
//!
//!             t0 <------- tx <------- t2
//!              ^                      |
//!              |      /------ ty <----|
//!              |     v                |
//!              ----- t1 <---- tz <-----          "#;
//! ```
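//!
//! The induced graph can also be derived mechanically. The sketch below assumes a hypothetical
//! `Graph` enum that mirrors the shape of `seq!` and `fork!` expressions; it is not this crate's
//! `TaskGraph`, and `induce` is an illustrative name. For each sub-graph it returns the tasks that
//! can start first and the tasks that finish last, and it records one `(earlier, later)` pair for
//! every direct dependency created by sequencing.
//!
//! ```rust
//! /// Hypothetical model of a task graph expression (not this crate's `TaskGraph`).
//! enum Graph {
//!     Task(&'static str),
//!     Seq(Vec<Graph>),
//!     Fork(Vec<Graph>),
//! }
//!
//! type Names = Vec<&'static str>;
//!
//! /// Appends direct `(earlier, later)` pairs to `edges`; returns `(firsts, lasts)` of `g`.
//! fn induce(g: &Graph, edges: &mut Vec<(&'static str, &'static str)>) -> (Names, Names) {
//!     match g {
//!         Graph::Task(name) => (vec![*name], vec![*name]),
//!         Graph::Fork(prongs) => {
//!             // Prongs are unordered relative to each other: just merge their ends.
//!             let (mut firsts, mut lasts) = (Vec::new(), Vec::new());
//!             for prong in prongs {
//!                 let (f, l) = induce(prong, edges);
//!                 firsts.extend(f);
//!                 lasts.extend(l);
//!             }
//!             (firsts, lasts)
//!         }
//!         Graph::Seq(steps) => {
//!             let (mut firsts, mut lasts): (Names, Names) = (Vec::new(), Vec::new());
//!             for step in steps {
//!                 let (f, l) = induce(step, edges);
//!                 if f.is_empty() {
//!                     continue; // an empty sub-graph constrains nothing
//!                 }
//!                 if firsts.is_empty() {
//!                     firsts = f;
//!                 } else {
//!                     // Everything that ends the previous step precedes everything
//!                     // that starts this one.
//!                     for &a in &lasts {
//!                         for &b in &f {
//!                             edges.push((a, b));
//!                         }
//!                     }
//!                 }
//!                 lasts = l;
//!             }
//!             (firsts, lasts)
//!         }
//!     }
//! }
//! ```
//!
//! Feeding it the nested example, written as
//! `Seq[t0, Fork[tx, Seq[t1, Fork[ty, tz]]], t2]`, yields seven pairs: `t0` before `tx` and `t1`,
//! `t1` before `ty` and `tz`, and each of `tx`, `ty`, `tz` before `t2`. These are the seven edges
//! of the induced graph drawn above.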
//!
//! ## Macro Usage
//!
//! Every user of this module should create task graphs via the `empty_graph!`, `seq!`, `fork!`, and
//! `task!` macros, which make it easy to construct task graphs correctly. Once a graph is ready,
//! call `assemble` on it to mark the task entities for execution.
//!
//! These systems must be scheduled for tasks to make progress:
//!   - `TaskManagerSystem`
//!   - `TaskRunnerSystem<T>`, one for each `TaskComponent` type `T` that you spawn
//!
//! ## Advanced Usage
//!
//! If you find the `TaskGraph` macros limiting, you can use the `TaskUser` methods; these are the
//! building blocks for creating all task graphs, including buggy ones. These methods are totally
//! dynamic in that they deal directly with entities of various archetypes, assuming that the
//! programmer passed in the correct archetypes for the given method.
//!
//! Potential bugs that won't be detected for you:
//!   - leaked orphan entities
//!   - graph cycles
//!   - finalizing an entity that has children
//!   - users manually tampering with the `TaskProgress`, `SingleEdge`, `MultiEdge`, or `FinalTag`
//!     components; these should only be used inside this module
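//!
//! Since graph cycles are not detected for you, a debug-time check may be worth having when you
//! build graphs by hand. The sketch below assumes you can export your graph as a plain adjacency
//! list in which `children[i]` holds the indices of the nodes that node `i` waits on. That
//! representation and the name `has_cycle` are hypothetical and not part of this crate. The
//! check is an ordinary depth-first search that tracks three states per node.
//!
//! ```rust
//! /// Returns `true` if the dependency graph contains a cycle.
//! /// `children[i]` lists the nodes that node `i` must wait for (hypothetical representation).
//! fn has_cycle(children: &[Vec<usize>]) -> bool {
//!     // Node states: 0 = unvisited, 1 = on the current DFS path, 2 = fully explored.
//!     fn visit(n: usize, children: &[Vec<usize>], state: &mut [u8]) -> bool {
//!         match state[n] {
//!             1 => return true,  // reached a node that is still on the path: cycle
//!             2 => return false, // already proven cycle-free from here
//!             _ => {}
//!         }
//!         state[n] = 1;
//!         for &c in &children[n] {
//!             if visit(c, children, state) {
//!                 return true;
//!             }
//!         }
//!         state[n] = 2;
//!         false
//!     }
//!
//!     let mut state = vec![0u8; children.len()];
//!     (0..children.len()).any(|n| visit(n, children, &mut state))
//! }
//! ```
//!
//! A chain or a diamond (two prongs sharing a child) passes this check, while two tasks that wait
//! on each other, or a task that waits on itself, are reported as a cycle. The search is
//! recursive, so very deep graphs would need an explicit stack instead.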
//!

mod components;
mod cons;
mod manager;
mod runner;
mod user;
mod writer;

pub use components::{
    FinalTag, MultiEdge, OnCompletion, SingleEdge, TaskComponent, TaskProgress,
};
pub use cons::{Cons, TaskFactory, TaskGraph};
pub use user::TaskUser;
pub use manager::TaskManagerSystem;
pub use runner::TaskRunnerSystem;
pub use writer::TaskWriter;

use specs::prelude::*;

#[derive(SystemData)]
pub struct TaskData<'a, P, S, M, F> {
    entities: Entities<'a>,
    lazy: Read<'a, LazyUpdate>,
    progress: P,
    single_edges: S,
    multi_edges: M,
    final_tags: F,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[derive(Clone, Debug, Default, Eq, PartialEq)]
    struct AlreadyComplete {
        was_run: bool,
    }

    impl Component for AlreadyComplete {
        type Storage = VecStorage<Self>;
    }

    impl<'a> TaskComponent<'a> for AlreadyComplete {
        type Data = ();

        fn run(&mut self, _data: &mut Self::Data) -> bool {
            self.was_run = true;

            true
        }
    }

    #[derive(Clone)]
    struct WriteValue {
        value: usize,
    }

    impl Component for WriteValue {
        type Storage = VecStorage<Self>;
    }

    impl<'a> TaskComponent<'a> for WriteValue {
        type Data = Write<'a, usize>;

        fn run(&mut self, data: &mut Self::Data) -> bool {
            **data = self.value;

            true
        }
    }

    fn set_up<'a, 'b>() -> (World, Dispatcher<'a, 'b>) {
        let mut world = World::new();
        let mut dispatcher = DispatcherBuilder::new()
            .with(
                TaskRunnerSystem::<AlreadyComplete>::default(),
                "already_complete",
                &[],
            )
            .with(
                TaskRunnerSystem::<WriteValue>::default(),
                "write_value",
                &[],
            )
            // For sake of reproducible tests, assume the manager system is the last to run.
            .with(
                TaskManagerSystem,
                "task_manager",
                &["already_complete", "write_value"],
            )
            .build();
        dispatcher.setup(&mut world);

        (world, dispatcher)
    }

    enum MakeSingleTask {
        Finalize(OnCompletion),
        DontFinalize,
    }

    fn make_single_task<'a, T: TaskComponent<'a> + Send + Sync>(
        world: &mut World,
        task: T,
        option: MakeSingleTask,
    ) -> Entity {
        let entity = world.exec(|user: TaskUser| {
            let task = user.make_task_lazy(task);
            if let MakeSingleTask::Finalize(on_completion) = option {
                user.finalize_lazy(task, on_completion);
            }

            task
        });
        world.maintain();

        entity
    }

    fn entity_is_complete(world: &mut World, entity: Entity) -> bool {
        world.exec(|user: TaskUser| user.entity_is_complete(entity))
    }

    #[test]
    fn single_task_not_run_until_finalized() {
        let (mut world, mut dispatcher) = set_up();

        let task = make_single_task(
            &mut world,
            AlreadyComplete::default(),
            MakeSingleTask::DontFinalize,
        );

        // Give the task a chance to get unblocked if there was a bug.
        dispatcher.dispatch(&world);
        dispatcher.dispatch(&world);

        assert_eq!(
            world.read_storage::<AlreadyComplete>().get(task),
            Some(&AlreadyComplete { was_run: false })
        );

        world.exec(|user: TaskUser| user.finalize_lazy(task, OnCompletion::None));
        world.maintain();

        // Unblock the task.
        dispatcher.dispatch(&world);
        // Run the task.
        dispatcher.dispatch(&world);
        // If there was a bug that deleted our entity, this would be necessary to see it.
        world.maintain();

        assert!(entity_is_complete(&mut world, task));
        assert_eq!(
            world.read_storage::<AlreadyComplete>().get(task),
            Some(&AlreadyComplete { was_run: true }),
        );
    }

    #[test]
    fn single_task_deleted_on_completion() {
        let (mut world, mut dispatcher) = set_up();

        let task = make_single_task(
            &mut world,
            AlreadyComplete::default(),
            MakeSingleTask::Finalize(OnCompletion::Delete),
        );

        // Unblock the task.
        dispatcher.dispatch(&world);
        // Run the task, after which it should be deleted.
        dispatcher.dispatch(&world);
        // This needs to be done for the entity deletion to be visible.
        world.maintain();

        assert!(!world.entities().is_alive(task));
    }

    #[test]
    fn joined_tasks_run_in_order_and_deleted_on_completion() {
        let (mut world, mut dispatcher) = set_up();

        let root = world.exec(|user: TaskUser| {
            let task_graph: TaskGraph = seq!(
                @WriteValue { value: 1 },
                @WriteValue { value: 2 },
                @WriteValue { value: 3 }
            );

            task_graph.assemble(OnCompletion::Delete, &user)
        });
        world.maintain();

        dispatcher.dispatch(&world);
        dispatcher.dispatch(&world);
        assert_eq!(*world.fetch::<usize>(), 1);
        dispatcher.dispatch(&world);
        assert_eq!(*world.fetch::<usize>(), 2);
        dispatcher.dispatch(&world);
        assert_eq!(*world.fetch::<usize>(), 3);

        world.maintain();
        assert!(!world.entities().is_alive(root.unwrap()));
    }

    #[test]
    fn all_prongs_of_fork_run_before_join_and_deleted_on_completion() {
        let (mut world, mut dispatcher) = set_up();

        //         ---> t1.1 ---
        //       /               \
        //     t2 ----> t1.2 -----> t0

        let root = world.exec(|user: TaskUser| {
            let task_graph: TaskGraph = seq!(
                @WriteValue { value: 1 },
                fork!(
                    @WriteValue { value: 2 },
                    @WriteValue { value: 3 }
                ),
                @WriteValue { value: 4 }
            );

            task_graph.assemble(OnCompletion::Delete, &user)
        });
        world.maintain();

        dispatcher.dispatch(&world);
        dispatcher.dispatch(&world);
        assert_eq!(*world.fetch::<usize>(), 1);
        dispatcher.dispatch(&world);
        let cur_value = *world.fetch::<usize>();
        assert!(cur_value == 2 || cur_value == 3);
        dispatcher.dispatch(&world);
        assert_eq!(*world.fetch::<usize>(), 4);

        world.maintain();
        assert!(!world.entities().is_alive(root.unwrap()));
    }
}
423}