1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363
//! A multitasking module that supports the fork-join model. Implemented on top of SPECS ECS. //! //! Here we expound on the technical details of this module's implementation. For basic usage, see //! the tests. //! //! In this model, every task is some entity. The entity is allowed to have exactly one component //! that implements `TaskComponent` (it may have other components that don't implement //! `TaskComponent`). The task will be run to completion by the corresponding `TaskRunnerSystem`. //! //! Every task entity is also a node in a (hopefully acyclic) directed graph. An edge `t2 --> t1` //! means that `t2` cannot start until `t1` has completed. //! //! In order for tasks to become unblocked, the `TaskManagerSystem` must run, whence it will //! traverse the graph, starting at the "final entities", and check for entities that have //! completed, potentially unblocking their parents. In order for a task to be run, it must be the //! descendent of a final entity. Entities become final by calling `TaskManager::finalize`. //! //! Edges can either come from `SingleEdge` or `MultiEdge` components, but you should not use these //! types directly. You might wonder why we need both. It's a fair question, because adding the //! `SingleEdge` concept does not actually make the model capable of representing any semantically //! new graphs. The reason is efficiency. //! //! If you want to implement a fork join like this: //! //!``` //! r#" ---> t1.1 --- //! / \ //! t2 ----> t1.2 -----> t0 //! \ / //! ---> t1.3 --- "#; //!``` //! //! You would actually do this by calling `TaskManager::make_fork` to create a "fork" entity called //! `F` that doesn't have a `TaskComponent`, but it has a `SingleEdge` from `t2` to `t0`, and a //! `MultiEdge` from `t2` to `{ t1.1, t1.2, t1.3 }`. Note that the children on the `MultiEdge` are //! called "prongs" of the fork. //! //!``` //! r#" t2 --> F --> t0 //! | //! | --> t1.1 //! | --> t1.2 //! | --> t1.3 "#; //!``` //! //! The semantics would be such that this graph is equivalent to the one above. Before any of the //! tasks connected to `F` by the `MultiEdge` could run, the task connected by the `SingleEdge` //! (`t0`) would have to be complete. `t2` could only run once all of the children of `F` had //! completed. //! //! The advantages of this scheme are: //! - a traversal of the graph starting from `t2` does not visit the same node twice //! - it is a bit easier to create fork-join graphs with larger numbers of concurrent tasks //! - there are fewer edges for the most common use cases //! //! Every user of this module should use it via the `TaskManager`. It will enforce certain //! invariants about the kinds of entities that can be constructed. For example, any entity with a //! `MultiEdge` component is considered a "fork entity", and it is not allowed to have a //! `TaskComponent` or a `TaskProgress`. Therefore, if you want a task to have multiple children, it //! must do so via a fork entity. //! //! These systems must be dispatched for tasks to make progress: //! - `TaskManagerSystem` //! - `TaskRunnerSystem` for every `T: TaskRunner` used //! //! This module can be dangerous when used improperly due to the dynamic nature of SPECS. Potential //! bugs not handled by this module: //! - leaked orphan entities //! - graph cycles //! - DO NOT manually touch the storages for task module components! Always go through the //! `TaskManager`. mod task_manager; mod task_runner; pub use task_manager::{ FinalTag, MultiEdge, SingleEdge, TaskManager, TaskManagerSystem, UnexpectedEntity, }; pub use task_runner::TaskRunnerSystem; use specs::prelude::*; use std::sync::atomic::{AtomicBool, Ordering}; /// An ephemeral component that needs access to `SystemData` to run some task. Will be run until /// `is_complete` by the `TaskRunnerSystem<T>`. /// /// Note: `TaskComponent::Data` isn't allowed to contain `Storage<TaskComponent>`, since the /// `TaskRunnerSystem` already uses that resource and borrows it mutably while calling /// `TaskComponent::run`. pub trait TaskComponent<'a>: Component { type Data: SystemData<'a>; /// Returns `true` iff the task is complete. fn run(&mut self, data: &mut Self::Data) -> bool; } // As long as an entity has this component, it will be considered by the `TaskRunnerSystem`. /// WARNING: only public because `TaskManager` is public. DO NOT USE. #[derive(Default)] pub struct TaskProgress { is_complete: AtomicBool, is_unblocked: bool, } impl Component for TaskProgress { type Storage = VecStorage<Self>; } impl TaskProgress { fn is_complete(&self) -> bool { self.is_complete.load(Ordering::Relaxed) } fn complete(&self) { self.is_complete.store(true, Ordering::Relaxed); } fn unblock(&mut self) { self.is_unblocked = true; } } #[cfg(test)] mod tests { use super::*; #[derive(Debug, Default, Eq, PartialEq)] struct AlreadyComplete { was_run: bool, } impl Component for AlreadyComplete { type Storage = VecStorage<Self>; } impl<'a> TaskComponent<'a> for AlreadyComplete { type Data = (); fn run(&mut self, _data: &mut Self::Data) -> bool { self.was_run = true; true } } struct WriteValue { value: usize, } impl Component for WriteValue { type Storage = VecStorage<Self>; } impl<'a> TaskComponent<'a> for WriteValue { type Data = Write<'a, usize>; fn run(&mut self, data: &mut Self::Data) -> bool { **data = self.value; true } } fn set_up<'a, 'b>() -> (World, Dispatcher<'a, 'b>) { let mut world = World::new(); let mut dispatcher = DispatcherBuilder::new() .with( TaskRunnerSystem::<AlreadyComplete>::default(), "already_complete", &[], ) .with( TaskRunnerSystem::<WriteValue>::default(), "write_value", &[], ) // For sake of reproducible tests, assume the manager system is the last to run. .with( TaskManagerSystem, "task_manager", &["already_complete", "write_value"], ) .build(); dispatcher.setup(&mut world); (world, dispatcher) } enum MakeSingleTask { Finalize(bool), DontFinalize, } fn make_single_task<'a, T: TaskComponent<'a>>( world: &mut World, task: T, option: MakeSingleTask, ) -> Entity { world.exec( |(entities, mut task_man, mut tasks): (Entities, TaskManager, WriteStorage<T>)| { let task = task_man.make_task(task, &entities, &mut tasks); if let MakeSingleTask::Finalize(delete_on_completion) = option { task_man.finalize(task, delete_on_completion); } task }, ) } fn make_fork(world: &mut World) -> Entity { world .exec(|(entities, mut task_man): (Entities, TaskManager)| task_man.make_fork(&entities)) } #[test] fn single_task_not_run_until_finalized() { let (mut world, mut dispatcher) = set_up(); let task = make_single_task( &mut world, AlreadyComplete::default(), MakeSingleTask::DontFinalize, ); // Give the task a chance to get unblocked if there was a bug. dispatcher.dispatch(&world); dispatcher.dispatch(&world); assert_eq!( world.read_storage::<AlreadyComplete>().get(task), Some(&AlreadyComplete { was_run: false }) ); world.exec(|mut task_man: TaskManager| task_man.finalize(task, false)); // Unblock the task. dispatcher.dispatch(&world); // Run the task. dispatcher.dispatch(&world); // If there was a bug that deleted our entity, this would be necessary to see it. world.maintain(); assert_eq!( world.read_storage::<AlreadyComplete>().get(task), Some(&AlreadyComplete { was_run: true }), ); } #[test] fn single_task_deleted_on_completion() { let (mut world, mut dispatcher) = set_up(); let task = make_single_task( &mut world, AlreadyComplete::default(), MakeSingleTask::Finalize(true), ); // Unblock the task. dispatcher.dispatch(&world); // Run the task, after which it should be deleted. dispatcher.dispatch(&world); // This needs to be done for the entity deletion to be visible. world.maintain(); assert_eq!(world.entities().is_alive(task), false); } #[test] fn joined_tasks_run_in_order_and_deleted_on_completion() { let (mut world, mut dispatcher) = set_up(); let task1 = make_single_task( &mut world, WriteValue { value: 1 }, MakeSingleTask::DontFinalize, ); let task2 = make_single_task( &mut world, WriteValue { value: 2 }, MakeSingleTask::DontFinalize, ); let task3 = make_single_task( &mut world, WriteValue { value: 3 }, MakeSingleTask::DontFinalize, ); world.exec(|mut task_man: TaskManager| { task_man.join(task3, task2); task_man.join(task2, task1); task_man.finalize(task3, true); }); dispatcher.dispatch(&world); dispatcher.dispatch(&world); assert_eq!(*world.fetch::<usize>(), 1); dispatcher.dispatch(&world); assert_eq!(*world.fetch::<usize>(), 2); dispatcher.dispatch(&world); assert_eq!(*world.fetch::<usize>(), 3); world.maintain(); assert_eq!(world.entities().is_alive(task1), false); assert_eq!(world.entities().is_alive(task2), false); assert_eq!(world.entities().is_alive(task3), false); } #[test] fn all_prongs_of_fork_run_before_join_and_deleted_on_completion() { let (mut world, mut dispatcher) = set_up(); // ---> t1.1 --- // / \ // t2 ----> t1.2 -----> t0 let fork = make_fork(&mut world); let initial_task = make_single_task( &mut world, WriteValue { value: 1 }, MakeSingleTask::DontFinalize, ); let prong1_task = make_single_task( &mut world, WriteValue { value: 2 }, MakeSingleTask::DontFinalize, ); let prong2_task = make_single_task( &mut world, WriteValue { value: 3 }, MakeSingleTask::DontFinalize, ); let join_task = make_single_task( &mut world, WriteValue { value: 4 }, MakeSingleTask::DontFinalize, ); world.exec(|mut task_man: TaskManager| { task_man.join(fork, initial_task); task_man.add_prong(fork, prong1_task).unwrap(); task_man.add_prong(fork, prong2_task).unwrap(); task_man.join(join_task, fork); task_man.finalize(join_task, true); }); dispatcher.dispatch(&world); dispatcher.dispatch(&world); assert_eq!(*world.fetch::<usize>(), 1); dispatcher.dispatch(&world); let cur_value = *world.fetch::<usize>(); assert!(cur_value == 2 || cur_value == 3); dispatcher.dispatch(&world); assert_eq!(*world.fetch::<usize>(), 4); world.maintain(); assert_eq!(world.entities().is_alive(initial_task), false); assert_eq!(world.entities().is_alive(prong1_task), false); assert_eq!(world.entities().is_alive(prong2_task), false); assert_eq!(world.entities().is_alive(join_task), false); } }