shred/dispatch/builder.rs

1use std::{collections::hash_map::Entry, fmt};
2
3use ahash::AHashMap as HashMap;
4
5#[cfg(feature = "parallel")]
6use crate::dispatch::dispatcher::ThreadPoolWrapper;
7use crate::{
8    dispatch::{
9        batch::BatchControllerSystem,
10        dispatcher::{SystemId, ThreadLocal},
11        stage::StagesBuilder,
12        BatchAccessor, BatchController, Dispatcher,
13    },
14    system::{RunNow, System, SystemData},
15};
16
/// Builder for the [`Dispatcher`].
///
/// [`Dispatcher`]: struct.Dispatcher.html
///
/// ## Barriers
///
/// Barriers are a way of sequentializing parts of
/// the system execution. See `add_barrier()`/`with_barrier()`.
///
/// ## Examples
///
/// This is how you create a dispatcher with
/// a shared thread pool:
///
/// ```rust
/// # #![allow(unused)]
/// #
/// # extern crate shred;
/// # #[macro_use]
/// # extern crate shred_derive;
/// # use shred::{Dispatcher, DispatcherBuilder, Read, ResourceId, World, System, SystemData};
/// # #[derive(Debug, Default)] struct Res;
/// # #[derive(SystemData)] #[allow(unused)] struct Data<'a> { a: Read<'a, Res> }
/// # struct Dummy;
/// # impl<'a> System<'a> for Dummy {
/// #   type SystemData = Data<'a>;
/// #
/// #   fn run(&mut self, _: Data<'a>) {}
/// # }
/// #
/// # fn main() {
/// # let system_a = Dummy;
/// # let system_b = Dummy;
/// # let system_c = Dummy;
/// # let system_d = Dummy;
/// # let system_e = Dummy;
/// let dispatcher: Dispatcher = DispatcherBuilder::new()
///     .with(system_a, "a", &[])
///     .with(system_b, "b", &["a"]) // b depends on a
///     .with(system_c, "c", &["a"]) // c also depends on a
///     .with(system_d, "d", &[])
///     .with(system_e, "e", &["c", "d"]) // e executes after c and d are finished
///     .build();
/// # }
/// ```
///
/// Systems can be conditionally added by using the `add_` functions:
///
/// ```rust
/// # #![allow(unused)]
/// #
/// # extern crate shred;
/// # #[macro_use]
/// # extern crate shred_derive;
/// # use shred::{Dispatcher, DispatcherBuilder, Read, ResourceId, World, System, SystemData};
/// # #[derive(Debug, Default)] struct Res;
/// # #[derive(SystemData)] #[allow(unused)] struct Data<'a> { a: Read<'a, Res> }
/// # struct Dummy;
/// # impl<'a> System<'a> for Dummy {
/// #   type SystemData = Data<'a>;
/// #
/// #   fn run(&mut self, _: Data<'a>) {}
/// # }
/// #
/// # fn main() {
/// # let b_enabled = true;
/// # let system_a = Dummy;
/// # let system_b = Dummy;
/// let mut builder = DispatcherBuilder::new()
///     .with(system_a, "a", &[]);
///
/// if b_enabled {
///    builder.add(system_b, "b", &[]);
/// }
///
/// let dispatcher = builder.build();
/// # }
/// ```
#[derive(Default)]
pub struct DispatcherBuilder<'a, 'b> {
    // Monotonically increasing counter used by `next_id` to mint unique
    // `SystemId`s; incremented once per added system.
    current_id: usize,
    // Maps system names to their ids for dependency resolution. Only systems
    // registered with a non-empty name appear here (see `add`).
    map: HashMap<String, SystemId>,
    // Accumulates systems, their dependencies and barriers; turned into the
    // final execution stages by `build`.
    pub(crate) stages_builder: StagesBuilder<'a>,
    // Non-`Send` systems, dispatched in insertion order after the stages.
    thread_local: ThreadLocal<'b>,
    // Shared handle to the rayon pool; filled lazily in `build` if no pool
    // was attached via `add_pool`/`with_pool`.
    #[cfg(feature = "parallel")]
    thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
}
104
105impl<'a, 'b> DispatcherBuilder<'a, 'b> {
106    /// Creates a new `DispatcherBuilder` by using the `Default` implementation.
107    ///
108    /// The default behaviour is to create a thread pool on `finish`.
109    /// If you already have a rayon `ThreadPool`, it's highly recommended to
110    /// configure this builder to use it with `with_pool` instead.
111    pub fn new() -> Self {
112        Default::default()
113    }
114
115    /// Returns whether or not any system has been added to the builder
116    pub fn is_empty(&self) -> bool {
117        self.map.is_empty()
118    }
119
120    /// Returns the number of systems added to the builder
121    pub fn num_systems(&self) -> usize {
122        self.map.len()
123    }
124
125    /// Returns whether or not a specific system has been added to the builder
126    /// This is useful as [`add()`](struct.DispatcherBuilder.html#method.add)
127    /// will throw if a dependency does not exist So you can use this
128    /// function to check if dependencies are satisfied
129    pub fn has_system(&self, system: &str) -> bool {
130        self.map.contains_key(system)
131    }
132
133    /// Adds a new system with a given name and a list of dependencies.
134    /// Please note that the dependency should be added before
135    /// you add the depending system.
136    ///
137    /// If you want to register systems which can not be specified as
138    /// dependencies, you can use `""` as their name, which will not panic
139    /// (using another name twice will).
140    ///
141    /// Same as [`add()`](struct.DispatcherBuilder.html#method.add), but
142    /// returns `self` to enable method chaining.
143    ///
144    /// # Panics
145    ///
146    /// * if the specified dependency does not exist
147    /// * if a system with the same name was already registered.
148    pub fn with<T>(mut self, system: T, name: &str, dep: &[&str]) -> Self
149    where
150        T: for<'c> System<'c> + Send + 'a,
151    {
152        self.add(system, name, dep);
153
154        self
155    }
156
157    /// Adds a new system with a given name and a list of dependencies.
158    /// Please note that the dependency should be added before
159    /// you add the depending system.
160    ///
161    /// If you want to register systems which can not be specified as
162    /// dependencies, you can use `""` as their name, which will not panic
163    /// (using another name twice will).
164    ///
165    /// # Panics
166    ///
167    /// * if the specified dependency does not exist
168    /// * if a system with the same name was already registered.
169    pub fn add<T>(&mut self, system: T, name: &str, dep: &[&str])
170    where
171        T: for<'c> System<'c> + Send + 'a,
172    {
173        let id = self.next_id();
174
175        let dependencies = dep
176            .iter()
177            .map(|x| {
178                *self
179                    .map
180                    .get(*x)
181                    .unwrap_or_else(|| panic!("No such system registered (\"{}\")", *x))
182            })
183            .collect();
184
185        if !name.is_empty() {
186            if let Entry::Vacant(e) = self.map.entry(name.to_owned()) {
187                e.insert(id);
188            } else {
189                panic!(
190                    "Cannot insert multiple systems with the same name (\"{}\")",
191                    name
192                );
193            }
194        }
195
196        self.stages_builder.insert(dependencies, id, system);
197    }
198
199    /// Returns `true` if a system with the given name has been added to the
200    /// `BispatcherBuilder`, otherwise, returns false.
201    pub fn contains(&self, name: &str) -> bool {
202        self.map.contains_key(name)
203    }
204
205    /// The `Batch` is a `System` which contains a `Dispatcher`.
206    /// By wrapping a `Dispatcher` inside a system, we can control the execution
207    /// of a whole group of system, without sacrificing parallelism or
208    /// conciseness.
209    ///
210    /// This function accepts the `DispatcherBuilder` as parameter, and the type
211    /// of the `System` that will drive the execution of the internal
212    /// dispatcher.
213    ///
214    /// Note that depending on the dependencies of the SubSystems the Batch
215    /// can run in parallel with other Systems.
216    /// In addition the Sub Systems can run in parallel within the Batch.
217    ///
218    /// The `Dispatcher` created for this `Batch` is completelly separate,
219    /// from the parent `Dispatcher`.
220    /// This mean that the dependencies, the `System` names, etc.. specified on
221    /// the `Batch` `Dispatcher` are not visible on the parent, and is not
222    /// allowed to specify cross dependencies.
223    pub fn with_batch<T>(
224        mut self,
225        controller: T,
226        dispatcher_builder: DispatcherBuilder<'a, 'b>,
227        name: &str,
228        dep: &[&str],
229    ) -> Self
230    where
231        T: for<'c> BatchController<'a, 'b, 'c> + Send + 'a,
232        'b: 'a,
233    {
234        self.add_batch::<T>(controller, dispatcher_builder, name, dep);
235
236        self
237    }
238
239    /// The `Batch` is a `System` which contains a `Dispatcher`.
240    /// By wrapping a `Dispatcher` inside a system, we can control the execution
241    /// of a whole group of system, without sacrificing parallelism or
242    /// conciseness.
243    ///
244    /// This function accepts the `DispatcherBuilder` as parameter, and the type
245    /// of the `System` that will drive the execution of the internal
246    /// dispatcher.
247    ///
248    /// Note that depending on the dependencies of the SubSystems the Batch
249    /// can run in parallel with other Systems.
250    /// In addition the Sub Systems can run in parallel within the Batch.
251    ///
252    /// The `Dispatcher` created for this `Batch` is completelly separate,
253    /// from the parent `Dispatcher`.
254    /// This mean that the dependencies, the `System` names, etc.. specified on
255    /// the `Batch` `Dispatcher` are not visible on the parent, and is not
256    /// allowed to specify cross dependencies.
257    pub fn add_batch<T>(
258        &mut self,
259        controller: T,
260        mut dispatcher_builder: DispatcherBuilder<'a, 'b>,
261        name: &str,
262        dep: &[&str],
263    ) where
264        T: for<'c> BatchController<'a, 'b, 'c> + Send + 'a,
265        'b: 'a,
266    {
267        #[cfg(feature = "parallel")]
268        {
269            dispatcher_builder.thread_pool = self.thread_pool.clone();
270        }
271
272        let mut reads = dispatcher_builder.stages_builder.fetch_all_reads();
273        reads.extend(<T::BatchSystemData as SystemData>::reads());
274        reads.sort();
275        reads.dedup();
276
277        let mut writes = dispatcher_builder.stages_builder.fetch_all_writes();
278        writes.extend(<T::BatchSystemData as SystemData>::writes());
279        writes.sort();
280        writes.dedup();
281
282        let accessor = BatchAccessor::new(reads, writes);
283        let dispatcher: Dispatcher<'a, 'b> = dispatcher_builder.build();
284
285        let batch_system =
286            unsafe { BatchControllerSystem::<'a, 'b, T>::create(accessor, controller, dispatcher) };
287
288        self.add(batch_system, name, dep);
289    }
290
291    /// Adds a new thread local system.
292    ///
293    /// Please only use this if your struct is not `Send` and `Sync`.
294    ///
295    /// Thread-local systems are dispatched in-order.
296    ///
297    /// Same as [DispatcherBuilder::add_thread_local], but returns `self` to
298    /// enable method chaining.
299    pub fn with_thread_local<T>(mut self, system: T) -> Self
300    where
301        T: for<'c> RunNow<'c> + 'b,
302    {
303        self.add_thread_local(system);
304
305        self
306    }
307
308    /// Adds a new thread local system.
309    ///
310    /// Please only use this if your struct is not `Send` and `Sync`.
311    ///
312    /// Thread-local systems are dispatched in-order.
313    pub fn add_thread_local<T>(&mut self, system: T)
314    where
315        T: for<'c> RunNow<'c> + 'b,
316    {
317        self.thread_local.push(Box::new(system));
318    }
319
320    /// Inserts a barrier which assures that all systems
321    /// added before the barrier are executed before the ones
322    /// after this barrier.
323    ///
324    /// Does nothing if there were no systems added
325    /// since the last call to `add_barrier()`/`with_barrier()`.
326    ///
327    /// Thread-local systems are not affected by barriers;
328    /// they're always executed at the end.
329    ///
330    /// Same as [DispatcherBuilder::add_barrier], but returns `self` to enable
331    /// method chaining.
332    pub fn with_barrier(mut self) -> Self {
333        self.add_barrier();
334
335        self
336    }
337
338    /// Inserts a barrier which assures that all systems
339    /// added before the barrier are executed before the ones
340    /// after this barrier.
341    ///
342    /// Does nothing if there were no systems added
343    /// since the last call to `add_barrier()`/`with_barrier()`.
344    ///
345    /// Thread-local systems are not affected by barriers;
346    /// they're always executed at the end.
347    pub fn add_barrier(&mut self) {
348        self.stages_builder.add_barrier();
349    }
350
351    /// Attach a rayon thread pool to the builder
352    /// and use that instead of creating one.
353    ///
354    /// Same as
355    /// [`add_pool()`](struct.DispatcherBuilder.html#method.add_pool),
356    /// but returns `self` to enable method chaining.
357    #[cfg(feature = "parallel")]
358    pub fn with_pool(mut self, pool: ::std::sync::Arc<::rayon::ThreadPool>) -> Self {
359        self.add_pool(pool);
360
361        self
362    }
363
364    /// Attach a rayon thread pool to the builder
365    /// and use that instead of creating one.
366    #[cfg(feature = "parallel")]
367    pub fn add_pool(&mut self, pool: ::std::sync::Arc<::rayon::ThreadPool>) {
368        *self.thread_pool.write().unwrap() = Some(pool);
369    }
370
371    /// Prints the equivalent system graph
372    /// that can be easily used to get the graph using the `seq!` and `par!`
373    /// macros. This is only recommended for advanced users.
374    pub fn print_par_seq(&self) {
375        println!("{:#?}", self);
376    }
377
378    /// Builds the `Dispatcher`.
379    ///
380    /// In the future, this method will
381    /// precompute useful information in
382    /// order to speed up dispatching.
383    pub fn build(self) -> Dispatcher<'a, 'b> {
384        use crate::dispatch::dispatcher::new_dispatcher;
385
386        #[cfg(feature = "parallel")]
387        self.thread_pool
388            .write()
389            .unwrap()
390            .get_or_insert_with(Self::create_thread_pool);
391
392        #[cfg(feature = "parallel")]
393        let d = new_dispatcher(
394            self.stages_builder.build(),
395            self.thread_local,
396            self.thread_pool,
397        );
398
399        #[cfg(not(feature = "parallel"))]
400        let d = new_dispatcher(self.stages_builder.build(), self.thread_local);
401
402        d
403    }
404
405    fn next_id(&mut self) -> SystemId {
406        let id = self.current_id;
407        self.current_id += 1;
408
409        SystemId(id)
410    }
411
412    #[cfg(feature = "parallel")]
413    fn create_thread_pool() -> ::std::sync::Arc<::rayon::ThreadPool> {
414        use rayon::ThreadPoolBuilder;
415        use std::sync::Arc;
416
417        Arc::new(
418            ThreadPoolBuilder::new()
419                .build()
420                .expect("Invalid configuration"),
421        )
422    }
423}
424
#[cfg(feature = "parallel")]
impl<'b> DispatcherBuilder<'static, 'b> {
    /// Builds an async dispatcher.
    ///
    /// It does not allow non-static types and accepts a `World` struct or a
    /// value that can be borrowed as `World`.
    pub fn build_async<R>(
        self,
        world: R,
    ) -> crate::dispatch::async_dispatcher::AsyncDispatcher<'b, R> {
        use crate::dispatch::async_dispatcher::new_async;

        // Create a default rayon pool lazily if none was attached via
        // `add_pool`/`with_pool`, mirroring the sync `build`.
        self.thread_pool
            .write()
            .unwrap()
            .get_or_insert_with(Self::create_thread_pool);

        new_async(
            world,
            self.stages_builder.build(),
            self.thread_local,
            self.thread_pool,
        )
    }
}
450
impl<'a, 'b> fmt::Debug for DispatcherBuilder<'a, 'b> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Delegates to the stages builder, which renders the equivalent
        // `par!`/`seq!` graph, using the name -> id map for labels.
        self.stages_builder.write_par_seq(f, &self.map)
    }
}