shred/dispatch/
dispatcher.rs

1use smallvec::SmallVec;
2
3use crate::{
4    dispatch::{stage::Stage, SendDispatcher},
5    system::RunNow,
6    world::World,
7};
8
/// Optional, reference-counted handle to a rayon [`ThreadPool`](::rayon::ThreadPool).
///
/// The wrapper exists so that a replaceable thread pool can be shared with
/// other dispatchers, which is useful with batch dispatchers.
///
/// Only available with the "parallel" feature enabled.
#[cfg(feature = "parallel")]
pub type ThreadPoolWrapper = Option<::std::sync::Arc<::rayon::ThreadPool>>;
13
14/// The dispatcher struct, allowing
15/// systems to be executed in parallel.
16pub struct Dispatcher<'a, 'b> {
17    inner: SendDispatcher<'a>,
18    thread_local: ThreadLocal<'b>,
19}
20
21impl<'a, 'b> Dispatcher<'a, 'b> {
22    /// Sets up all the systems which means they are gonna add default values
23    /// for the resources they need.
24    pub fn setup(&mut self, world: &mut World) {
25        self.inner.setup(world);
26
27        for sys in &mut self.thread_local {
28            sys.setup(world);
29        }
30    }
31
32    /// Calls the `dispose` method of all systems and allows them to release
33    /// external resources. It is common this method removes components and
34    /// / or resources from the `World` which are associated with external
35    /// resources.
36    pub fn dispose(self, world: &mut World) {
37        self.inner.dispose(world);
38
39        for sys in self.thread_local {
40            sys.dispose(world);
41        }
42    }
43
44    /// Dispatch all the systems with given resources and context
45    /// and then run thread local systems.
46    ///
47    /// This function automatically redirects to
48    ///
49    /// * [Dispatcher::dispatch_par] in case it is supported
50    /// * [Dispatcher::dispatch_seq] otherwise
51    ///
52    /// and runs `dispatch_thread_local` afterwards.
53    ///
54    /// Please note that this method assumes that no resource
55    /// is currently borrowed. If that's the case, it panics.
56    pub fn dispatch(&mut self, world: &World) {
57        self.inner.dispatch(world);
58        self.dispatch_thread_local(world);
59    }
60
61    /// Dispatches the systems (except thread local systems)
62    /// in parallel given the resources to operate on.
63    ///
64    /// This operation blocks the
65    /// executing thread.
66    ///
67    /// Only available with "parallel" feature enabled.
68    ///
69    /// Please note that this method assumes that no resource
70    /// is currently borrowed. If that's the case, it panics.
71    #[cfg(feature = "parallel")]
72    pub fn dispatch_par(&mut self, world: &World) {
73        self.inner.dispatch_par(world);
74    }
75
76    /// Dispatches the systems (except thread local systems) sequentially.
77    ///
78    /// This is useful if parallel overhead is
79    /// too big or the platform does not support multithreading.
80    ///
81    /// Please note that this method assumes that no resource
82    /// is currently borrowed. If that's the case, it panics.
83    pub fn dispatch_seq(&mut self, world: &World) {
84        self.inner.dispatch_seq(world);
85    }
86
87    /// Dispatch only thread local systems sequentially.
88    ///
89    /// Please note that this method assumes that no resource
90    /// is currently borrowed. If that's the case, it panics.
91    pub fn dispatch_thread_local(&mut self, world: &World) {
92        for sys in &mut self.thread_local {
93            sys.run_now(world);
94        }
95    }
96
97    /// Converts this to a [`SendDispatcher`].
98    ///
99    /// Fails and returns the original distpatcher if it contains thread local systems.
100    pub fn try_into_sendable(self) -> Result<SendDispatcher<'a>, Self> {
101        let Dispatcher {
102            inner: _,
103            thread_local,
104        } = &self;
105
106        if thread_local.is_empty() {
107            Ok(self.inner)
108        } else {
109            Err(self)
110        }
111    }
112
113    /// This method returns the largest amount of threads this dispatcher
114    /// can make use of. This is mainly for debugging purposes so you can see
115    /// how well your systems can make use of multi-threading.
116    #[cfg(feature = "parallel")]
117    pub fn max_threads(&self) -> usize {
118        self.inner.max_threads()
119    }
120}
121
122impl<'a, 'b, 'c> RunNow<'a> for Dispatcher<'b, 'c> {
123    fn run_now(&mut self, world: &World) {
124        self.dispatch(world);
125    }
126
127    fn setup(&mut self, world: &mut World) {
128        self.setup(world);
129    }
130
131    fn dispose(self: Box<Self>, world: &mut World) {
132        (*self).dispose(world);
133    }
134}
135
/// Newtype around a `usize` used to identify a system.
///
/// The identifier is cheap to copy and can be compared, ordered and hashed.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct SystemId(pub usize);
138
139pub type SystemExecSend<'b> = Box<dyn for<'a> RunNow<'a> + Send + 'b>;
140pub type ThreadLocal<'a> = SmallVec<[Box<dyn for<'b> RunNow<'b> + 'a>; 4]>;
141
/// Assembles a [`Dispatcher`] from its parts ("parallel" feature enabled).
///
/// * `stages` - the stages handed to the inner [`SendDispatcher`].
/// * `thread_local` - the thread local systems.
/// * `thread_pool` - the shared, lock-protected [`ThreadPoolWrapper`] stored
///   in the inner [`SendDispatcher`].
#[cfg(feature = "parallel")]
pub fn new_dispatcher<'a, 'b>(
    stages: Vec<Stage<'a>>,
    thread_local: ThreadLocal<'b>,
    thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
) -> Dispatcher<'a, 'b> {
    let inner = SendDispatcher {
        stages,
        thread_pool,
    };

    Dispatcher {
        inner,
        thread_local,
    }
}
156
157#[cfg(not(feature = "parallel"))]
158pub fn new_dispatcher<'a, 'b>(
159    stages: Vec<Stage<'a>>,
160    thread_local: ThreadLocal<'b>,
161) -> Dispatcher<'a, 'b> {
162    Dispatcher {
163        inner: SendDispatcher { stages },
164        thread_local,
165    }
166}
167
#[cfg(test)]
mod tests {
    use crate::{dispatch::builder::DispatcherBuilder, system::*, world::*};

    /// Counter resource every `Dummy` system adds its own number to.
    #[derive(Default)]
    struct Res(i32);

    /// Test system that adds its number to `Res`; the systems numbered 4 and
    /// 5 additionally check the sum accumulated before they run.
    struct Dummy(i32);

    impl<'a> System<'a> for Dummy {
        type SystemData = Write<'a, Res>;

        fn run(&mut self, mut data: Self::SystemData) {
            match self.0 {
                // Added after the barrier: systems 0..=3 have already
                // contributed 0 + 1 + 2 + 3 = 6.
                4 => assert_eq!(data.0, 6),
                // Depends on system 4, so the sum is 6 + 4 = 10 by now.
                5 => assert_eq!(data.0, 10),
                _ => {}
            }

            data.0 += self.0;
        }
    }

    /// Test system that always panics with a known message.
    struct Panic;

    impl<'a> System<'a> for Panic {
        type SystemData = ();

        fn run(&mut self, _: Self::SystemData) {
            panic!("Propagated panic");
        }
    }

    /// Builder with four systems, a barrier, and two more systems of which
    /// the last depends on the one before it.
    fn new_builder() -> DispatcherBuilder<'static, 'static> {
        let before_barrier = DispatcherBuilder::new()
            .with(Dummy(0), "0", &[])
            .with(Dummy(1), "1", &[])
            .with(Dummy(2), "2", &[])
            .with(Dummy(3), "3", &["1"]);

        before_barrier
            .with_barrier()
            .with(Dummy(4), "4", &[])
            .with(Dummy(5), "5", &["4"])
    }

    /// Empty world holding only a zeroed `Res`.
    fn new_world() -> World {
        let mut world = World::empty();
        world.insert(Res(0));

        world
    }

    /// A panic inside a system must surface from `dispatch`.
    #[test]
    #[should_panic(expected = "Propagated panic")]
    fn dispatcher_panics() {
        let world = new_world();
        let mut dispatcher = DispatcherBuilder::new().with(Panic, "p", &[]).build();

        dispatcher.dispatch(&world);
    }

    /// Runs the staged systems; the assertions live in `Dummy::run`.
    #[test]
    fn stages() {
        let world = new_world();
        let mut dispatcher = new_builder().build();

        dispatcher.dispatch(&world);
    }

    /// Same as `stages`, but through the async dispatcher.
    // NOTE(review): the test returns without explicitly waiting for the
    // dispatch to finish - confirm the async dispatcher blocks elsewhere
    // (e.g. on drop), otherwise the assertions may never be observed.
    #[test]
    #[cfg(feature = "parallel")]
    fn stages_async() {
        let mut dispatcher = new_builder().build_async(new_world());

        dispatcher.dispatch();
    }
}