shred/dispatch/builder.rs
1use std::{collections::hash_map::Entry, fmt};
2
3use ahash::AHashMap as HashMap;
4
5#[cfg(feature = "parallel")]
6use crate::dispatch::dispatcher::ThreadPoolWrapper;
7use crate::{
8 dispatch::{
9 batch::BatchControllerSystem,
10 dispatcher::{SystemId, ThreadLocal},
11 stage::StagesBuilder,
12 BatchAccessor, BatchController, Dispatcher,
13 },
14 system::{RunNow, System, SystemData},
15};
16
17/// Builder for the [`Dispatcher`].
18///
19/// [`Dispatcher`]: struct.Dispatcher.html
20///
21/// ## Barriers
22///
23/// Barriers are a way of sequentializing parts of
24/// the system execution. See `add_barrier()`/`with_barrier()`.
25///
26/// ## Examples
27///
28/// This is how you create a dispatcher with
29/// a shared thread pool:
30///
31/// ```rust
32/// # #![allow(unused)]
33/// #
34/// # extern crate shred;
35/// # #[macro_use]
36/// # extern crate shred_derive;
37/// # use shred::{Dispatcher, DispatcherBuilder, Read, ResourceId, World, System, SystemData};
38/// # #[derive(Debug, Default)] struct Res;
39/// # #[derive(SystemData)] #[allow(unused)] struct Data<'a> { a: Read<'a, Res> }
40/// # struct Dummy;
41/// # impl<'a> System<'a> for Dummy {
42/// # type SystemData = Data<'a>;
43/// #
44/// # fn run(&mut self, _: Data<'a>) {}
45/// # }
46/// #
47/// # fn main() {
48/// # let system_a = Dummy;
49/// # let system_b = Dummy;
50/// # let system_c = Dummy;
51/// # let system_d = Dummy;
52/// # let system_e = Dummy;
53/// let dispatcher: Dispatcher = DispatcherBuilder::new()
54/// .with(system_a, "a", &[])
55/// .with(system_b, "b", &["a"]) // b depends on a
56/// .with(system_c, "c", &["a"]) // c also depends on a
57/// .with(system_d, "d", &[])
58/// .with(system_e, "e", &["c", "d"]) // e executes after c and d are finished
59/// .build();
60/// # }
61/// ```
62///
63/// Systems can be conditionally added by using the `add_` functions:
64///
65/// ```rust
66/// # #![allow(unused)]
67/// #
68/// # extern crate shred;
69/// # #[macro_use]
70/// # extern crate shred_derive;
71/// # use shred::{Dispatcher, DispatcherBuilder, Read, ResourceId, World, System, SystemData};
72/// # #[derive(Debug, Default)] struct Res;
73/// # #[derive(SystemData)] #[allow(unused)] struct Data<'a> { a: Read<'a, Res> }
74/// # struct Dummy;
75/// # impl<'a> System<'a> for Dummy {
76/// # type SystemData = Data<'a>;
77/// #
78/// # fn run(&mut self, _: Data<'a>) {}
79/// # }
80/// #
81/// # fn main() {
82/// # let b_enabled = true;
83/// # let system_a = Dummy;
84/// # let system_b = Dummy;
85/// let mut builder = DispatcherBuilder::new()
86/// .with(system_a, "a", &[]);
87///
88/// if b_enabled {
89/// builder.add(system_b, "b", &[]);
90/// }
91///
92/// let dispatcher = builder.build();
93/// # }
94/// ```
95#[derive(Default)]
96pub struct DispatcherBuilder<'a, 'b> {
97 current_id: usize,
98 map: HashMap<String, SystemId>,
99 pub(crate) stages_builder: StagesBuilder<'a>,
100 thread_local: ThreadLocal<'b>,
101 #[cfg(feature = "parallel")]
102 thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
103}
104
105impl<'a, 'b> DispatcherBuilder<'a, 'b> {
106 /// Creates a new `DispatcherBuilder` by using the `Default` implementation.
107 ///
108 /// The default behaviour is to create a thread pool on `finish`.
109 /// If you already have a rayon `ThreadPool`, it's highly recommended to
110 /// configure this builder to use it with `with_pool` instead.
111 pub fn new() -> Self {
112 Default::default()
113 }
114
115 /// Returns whether or not any system has been added to the builder
116 pub fn is_empty(&self) -> bool {
117 self.map.is_empty()
118 }
119
120 /// Returns the number of systems added to the builder
121 pub fn num_systems(&self) -> usize {
122 self.map.len()
123 }
124
125 /// Returns whether or not a specific system has been added to the builder
126 /// This is useful as [`add()`](struct.DispatcherBuilder.html#method.add)
127 /// will throw if a dependency does not exist So you can use this
128 /// function to check if dependencies are satisfied
129 pub fn has_system(&self, system: &str) -> bool {
130 self.map.contains_key(system)
131 }
132
133 /// Adds a new system with a given name and a list of dependencies.
134 /// Please note that the dependency should be added before
135 /// you add the depending system.
136 ///
137 /// If you want to register systems which can not be specified as
138 /// dependencies, you can use `""` as their name, which will not panic
139 /// (using another name twice will).
140 ///
141 /// Same as [`add()`](struct.DispatcherBuilder.html#method.add), but
142 /// returns `self` to enable method chaining.
143 ///
144 /// # Panics
145 ///
146 /// * if the specified dependency does not exist
147 /// * if a system with the same name was already registered.
148 pub fn with<T>(mut self, system: T, name: &str, dep: &[&str]) -> Self
149 where
150 T: for<'c> System<'c> + Send + 'a,
151 {
152 self.add(system, name, dep);
153
154 self
155 }
156
157 /// Adds a new system with a given name and a list of dependencies.
158 /// Please note that the dependency should be added before
159 /// you add the depending system.
160 ///
161 /// If you want to register systems which can not be specified as
162 /// dependencies, you can use `""` as their name, which will not panic
163 /// (using another name twice will).
164 ///
165 /// # Panics
166 ///
167 /// * if the specified dependency does not exist
168 /// * if a system with the same name was already registered.
169 pub fn add<T>(&mut self, system: T, name: &str, dep: &[&str])
170 where
171 T: for<'c> System<'c> + Send + 'a,
172 {
173 let id = self.next_id();
174
175 let dependencies = dep
176 .iter()
177 .map(|x| {
178 *self
179 .map
180 .get(*x)
181 .unwrap_or_else(|| panic!("No such system registered (\"{}\")", *x))
182 })
183 .collect();
184
185 if !name.is_empty() {
186 if let Entry::Vacant(e) = self.map.entry(name.to_owned()) {
187 e.insert(id);
188 } else {
189 panic!(
190 "Cannot insert multiple systems with the same name (\"{}\")",
191 name
192 );
193 }
194 }
195
196 self.stages_builder.insert(dependencies, id, system);
197 }
198
199 /// Returns `true` if a system with the given name has been added to the
200 /// `BispatcherBuilder`, otherwise, returns false.
201 pub fn contains(&self, name: &str) -> bool {
202 self.map.contains_key(name)
203 }
204
205 /// The `Batch` is a `System` which contains a `Dispatcher`.
206 /// By wrapping a `Dispatcher` inside a system, we can control the execution
207 /// of a whole group of system, without sacrificing parallelism or
208 /// conciseness.
209 ///
210 /// This function accepts the `DispatcherBuilder` as parameter, and the type
211 /// of the `System` that will drive the execution of the internal
212 /// dispatcher.
213 ///
214 /// Note that depending on the dependencies of the SubSystems the Batch
215 /// can run in parallel with other Systems.
216 /// In addition the Sub Systems can run in parallel within the Batch.
217 ///
218 /// The `Dispatcher` created for this `Batch` is completelly separate,
219 /// from the parent `Dispatcher`.
220 /// This mean that the dependencies, the `System` names, etc.. specified on
221 /// the `Batch` `Dispatcher` are not visible on the parent, and is not
222 /// allowed to specify cross dependencies.
223 pub fn with_batch<T>(
224 mut self,
225 controller: T,
226 dispatcher_builder: DispatcherBuilder<'a, 'b>,
227 name: &str,
228 dep: &[&str],
229 ) -> Self
230 where
231 T: for<'c> BatchController<'a, 'b, 'c> + Send + 'a,
232 'b: 'a,
233 {
234 self.add_batch::<T>(controller, dispatcher_builder, name, dep);
235
236 self
237 }
238
239 /// The `Batch` is a `System` which contains a `Dispatcher`.
240 /// By wrapping a `Dispatcher` inside a system, we can control the execution
241 /// of a whole group of system, without sacrificing parallelism or
242 /// conciseness.
243 ///
244 /// This function accepts the `DispatcherBuilder` as parameter, and the type
245 /// of the `System` that will drive the execution of the internal
246 /// dispatcher.
247 ///
248 /// Note that depending on the dependencies of the SubSystems the Batch
249 /// can run in parallel with other Systems.
250 /// In addition the Sub Systems can run in parallel within the Batch.
251 ///
252 /// The `Dispatcher` created for this `Batch` is completelly separate,
253 /// from the parent `Dispatcher`.
254 /// This mean that the dependencies, the `System` names, etc.. specified on
255 /// the `Batch` `Dispatcher` are not visible on the parent, and is not
256 /// allowed to specify cross dependencies.
257 pub fn add_batch<T>(
258 &mut self,
259 controller: T,
260 mut dispatcher_builder: DispatcherBuilder<'a, 'b>,
261 name: &str,
262 dep: &[&str],
263 ) where
264 T: for<'c> BatchController<'a, 'b, 'c> + Send + 'a,
265 'b: 'a,
266 {
267 #[cfg(feature = "parallel")]
268 {
269 dispatcher_builder.thread_pool = self.thread_pool.clone();
270 }
271
272 let mut reads = dispatcher_builder.stages_builder.fetch_all_reads();
273 reads.extend(<T::BatchSystemData as SystemData>::reads());
274 reads.sort();
275 reads.dedup();
276
277 let mut writes = dispatcher_builder.stages_builder.fetch_all_writes();
278 writes.extend(<T::BatchSystemData as SystemData>::writes());
279 writes.sort();
280 writes.dedup();
281
282 let accessor = BatchAccessor::new(reads, writes);
283 let dispatcher: Dispatcher<'a, 'b> = dispatcher_builder.build();
284
285 let batch_system =
286 unsafe { BatchControllerSystem::<'a, 'b, T>::create(accessor, controller, dispatcher) };
287
288 self.add(batch_system, name, dep);
289 }
290
291 /// Adds a new thread local system.
292 ///
293 /// Please only use this if your struct is not `Send` and `Sync`.
294 ///
295 /// Thread-local systems are dispatched in-order.
296 ///
297 /// Same as [DispatcherBuilder::add_thread_local], but returns `self` to
298 /// enable method chaining.
299 pub fn with_thread_local<T>(mut self, system: T) -> Self
300 where
301 T: for<'c> RunNow<'c> + 'b,
302 {
303 self.add_thread_local(system);
304
305 self
306 }
307
308 /// Adds a new thread local system.
309 ///
310 /// Please only use this if your struct is not `Send` and `Sync`.
311 ///
312 /// Thread-local systems are dispatched in-order.
313 pub fn add_thread_local<T>(&mut self, system: T)
314 where
315 T: for<'c> RunNow<'c> + 'b,
316 {
317 self.thread_local.push(Box::new(system));
318 }
319
320 /// Inserts a barrier which assures that all systems
321 /// added before the barrier are executed before the ones
322 /// after this barrier.
323 ///
324 /// Does nothing if there were no systems added
325 /// since the last call to `add_barrier()`/`with_barrier()`.
326 ///
327 /// Thread-local systems are not affected by barriers;
328 /// they're always executed at the end.
329 ///
330 /// Same as [DispatcherBuilder::add_barrier], but returns `self` to enable
331 /// method chaining.
332 pub fn with_barrier(mut self) -> Self {
333 self.add_barrier();
334
335 self
336 }
337
338 /// Inserts a barrier which assures that all systems
339 /// added before the barrier are executed before the ones
340 /// after this barrier.
341 ///
342 /// Does nothing if there were no systems added
343 /// since the last call to `add_barrier()`/`with_barrier()`.
344 ///
345 /// Thread-local systems are not affected by barriers;
346 /// they're always executed at the end.
347 pub fn add_barrier(&mut self) {
348 self.stages_builder.add_barrier();
349 }
350
351 /// Attach a rayon thread pool to the builder
352 /// and use that instead of creating one.
353 ///
354 /// Same as
355 /// [`add_pool()`](struct.DispatcherBuilder.html#method.add_pool),
356 /// but returns `self` to enable method chaining.
357 #[cfg(feature = "parallel")]
358 pub fn with_pool(mut self, pool: ::std::sync::Arc<::rayon::ThreadPool>) -> Self {
359 self.add_pool(pool);
360
361 self
362 }
363
364 /// Attach a rayon thread pool to the builder
365 /// and use that instead of creating one.
366 #[cfg(feature = "parallel")]
367 pub fn add_pool(&mut self, pool: ::std::sync::Arc<::rayon::ThreadPool>) {
368 *self.thread_pool.write().unwrap() = Some(pool);
369 }
370
371 /// Prints the equivalent system graph
372 /// that can be easily used to get the graph using the `seq!` and `par!`
373 /// macros. This is only recommended for advanced users.
374 pub fn print_par_seq(&self) {
375 println!("{:#?}", self);
376 }
377
378 /// Builds the `Dispatcher`.
379 ///
380 /// In the future, this method will
381 /// precompute useful information in
382 /// order to speed up dispatching.
383 pub fn build(self) -> Dispatcher<'a, 'b> {
384 use crate::dispatch::dispatcher::new_dispatcher;
385
386 #[cfg(feature = "parallel")]
387 self.thread_pool
388 .write()
389 .unwrap()
390 .get_or_insert_with(Self::create_thread_pool);
391
392 #[cfg(feature = "parallel")]
393 let d = new_dispatcher(
394 self.stages_builder.build(),
395 self.thread_local,
396 self.thread_pool,
397 );
398
399 #[cfg(not(feature = "parallel"))]
400 let d = new_dispatcher(self.stages_builder.build(), self.thread_local);
401
402 d
403 }
404
405 fn next_id(&mut self) -> SystemId {
406 let id = self.current_id;
407 self.current_id += 1;
408
409 SystemId(id)
410 }
411
412 #[cfg(feature = "parallel")]
413 fn create_thread_pool() -> ::std::sync::Arc<::rayon::ThreadPool> {
414 use rayon::ThreadPoolBuilder;
415 use std::sync::Arc;
416
417 Arc::new(
418 ThreadPoolBuilder::new()
419 .build()
420 .expect("Invalid configuration"),
421 )
422 }
423}
424
425#[cfg(feature = "parallel")]
426impl<'b> DispatcherBuilder<'static, 'b> {
427 /// Builds an async dispatcher.
428 ///
429 /// It does not allow non-static types and accepts a `World` struct or a
430 /// value that can be borrowed as `World`.
431 pub fn build_async<R>(
432 self,
433 world: R,
434 ) -> crate::dispatch::async_dispatcher::AsyncDispatcher<'b, R> {
435 use crate::dispatch::async_dispatcher::new_async;
436
437 self.thread_pool
438 .write()
439 .unwrap()
440 .get_or_insert_with(Self::create_thread_pool);
441
442 new_async(
443 world,
444 self.stages_builder.build(),
445 self.thread_local,
446 self.thread_pool,
447 )
448 }
449}
450
451impl<'a, 'b> fmt::Debug for DispatcherBuilder<'a, 'b> {
452 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
453 self.stages_builder.write_par_seq(f, &self.map)
454 }
455}