shred/dispatch/
dispatcher.rs1use smallvec::SmallVec;
2
3use crate::{
4 dispatch::{stage::Stage, SendDispatcher},
5 system::RunNow,
6 world::World,
7};
8
9#[cfg(feature = "parallel")]
12pub type ThreadPoolWrapper = Option<::std::sync::Arc<::rayon::ThreadPool>>;
13
14pub struct Dispatcher<'a, 'b> {
17 inner: SendDispatcher<'a>,
18 thread_local: ThreadLocal<'b>,
19}
20
21impl<'a, 'b> Dispatcher<'a, 'b> {
22 pub fn setup(&mut self, world: &mut World) {
25 self.inner.setup(world);
26
27 for sys in &mut self.thread_local {
28 sys.setup(world);
29 }
30 }
31
32 pub fn dispose(self, world: &mut World) {
37 self.inner.dispose(world);
38
39 for sys in self.thread_local {
40 sys.dispose(world);
41 }
42 }
43
44 pub fn dispatch(&mut self, world: &World) {
57 self.inner.dispatch(world);
58 self.dispatch_thread_local(world);
59 }
60
61 #[cfg(feature = "parallel")]
72 pub fn dispatch_par(&mut self, world: &World) {
73 self.inner.dispatch_par(world);
74 }
75
76 pub fn dispatch_seq(&mut self, world: &World) {
84 self.inner.dispatch_seq(world);
85 }
86
87 pub fn dispatch_thread_local(&mut self, world: &World) {
92 for sys in &mut self.thread_local {
93 sys.run_now(world);
94 }
95 }
96
97 pub fn try_into_sendable(self) -> Result<SendDispatcher<'a>, Self> {
101 let Dispatcher {
102 inner: _,
103 thread_local,
104 } = &self;
105
106 if thread_local.is_empty() {
107 Ok(self.inner)
108 } else {
109 Err(self)
110 }
111 }
112
113 #[cfg(feature = "parallel")]
117 pub fn max_threads(&self) -> usize {
118 self.inner.max_threads()
119 }
120}
121
122impl<'a, 'b, 'c> RunNow<'a> for Dispatcher<'b, 'c> {
123 fn run_now(&mut self, world: &World) {
124 self.dispatch(world);
125 }
126
127 fn setup(&mut self, world: &mut World) {
128 self.setup(world);
129 }
130
131 fn dispose(self: Box<Self>, world: &mut World) {
132 (*self).dispose(world);
133 }
134}
135
136#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
137pub struct SystemId(pub usize);
138
139pub type SystemExecSend<'b> = Box<dyn for<'a> RunNow<'a> + Send + 'b>;
140pub type ThreadLocal<'a> = SmallVec<[Box<dyn for<'b> RunNow<'b> + 'a>; 4]>;
141
142#[cfg(feature = "parallel")]
143pub fn new_dispatcher<'a, 'b>(
144 stages: Vec<Stage<'a>>,
145 thread_local: ThreadLocal<'b>,
146 thread_pool: ::std::sync::Arc<::std::sync::RwLock<ThreadPoolWrapper>>,
147) -> Dispatcher<'a, 'b> {
148 Dispatcher {
149 inner: SendDispatcher {
150 stages,
151 thread_pool,
152 },
153 thread_local,
154 }
155}
156
157#[cfg(not(feature = "parallel"))]
158pub fn new_dispatcher<'a, 'b>(
159 stages: Vec<Stage<'a>>,
160 thread_local: ThreadLocal<'b>,
161) -> Dispatcher<'a, 'b> {
162 Dispatcher {
163 inner: SendDispatcher { stages },
164 thread_local,
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use crate::{dispatch::builder::DispatcherBuilder, system::*, world::*};
171
172 #[derive(Default)]
173 struct Res(i32);
174
175 struct Dummy(i32);
176
177 impl<'a> System<'a> for Dummy {
178 type SystemData = Write<'a, Res>;
179
180 fn run(&mut self, mut data: Self::SystemData) {
181 if self.0 == 4 {
182 assert_eq!(data.0, 6);
185 } else if self.0 == 5 {
186 assert_eq!(data.0, 10);
189 }
190
191 data.0 += self.0;
192 }
193 }
194
195 struct Panic;
196
197 impl<'a> System<'a> for Panic {
198 type SystemData = ();
199
200 fn run(&mut self, _: Self::SystemData) {
201 panic!("Propagated panic");
202 }
203 }
204
205 fn new_builder() -> DispatcherBuilder<'static, 'static> {
206 DispatcherBuilder::new()
207 .with(Dummy(0), "0", &[])
208 .with(Dummy(1), "1", &[])
209 .with(Dummy(2), "2", &[])
210 .with(Dummy(3), "3", &["1"])
211 .with_barrier()
212 .with(Dummy(4), "4", &[])
213 .with(Dummy(5), "5", &["4"])
214 }
215
216 fn new_world() -> World {
217 let mut world = World::empty();
218 world.insert(Res(0));
219
220 world
221 }
222
223 #[test]
224 #[should_panic(expected = "Propagated panic")]
225 fn dispatcher_panics() {
226 DispatcherBuilder::new()
227 .with(Panic, "p", &[])
228 .build()
229 .dispatch(&new_world())
230 }
231
232 #[test]
233 fn stages() {
234 let mut d = new_builder().build();
235
236 d.dispatch(&new_world());
237 }
238
239 #[test]
240 #[cfg(feature = "parallel")]
241 fn stages_async() {
242 let mut d = new_builder().build_async(new_world());
243
244 d.dispatch();
245 }
246}