1#![doc(html_root_url = "https://docs.rs/executors/0.10.0")]
9#![deny(missing_docs)]
10#![allow(unused_parens)]
11#![allow(clippy::unused_unit)]
12
13#[macro_use]
62extern crate log;
63
64pub mod bichannel;
65pub mod common;
66#[cfg(feature = "cb-channel-exec")]
67pub mod crossbeam_channel_pool;
68#[cfg(feature = "workstealing-exec")]
69pub mod crossbeam_workstealing_pool;
70#[cfg(feature = "numa-aware")]
71pub mod numa_utils;
72pub mod parker;
73pub mod run_now;
74#[cfg(feature = "threadpool-exec")]
75pub mod threadpool_executor;
76mod timeconstants;
77
78pub use crate::common::{CanExecute, Executor};
79
80#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
81pub mod futures_executor;
82#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
83pub use crate::futures_executor::{FuturesExecutor, JoinHandle};
84
85use synchronoise::CountdownEvent;
87
88mod locals;
89pub use locals::*;
90
91#[cfg(feature = "produce-metrics")]
92use metrics::{counter, describe_counter, describe_gauge, gauge};
93
/// Shared test drivers for [`Executor`] implementations.
///
/// Each public function drives one scenario against an executor and panics
/// if the scenario does not complete. Completion is tracked with a
/// `CountdownEvent` latch that every executed job decrements.
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::{
        fmt::Debug,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    /// Number of chained jobs per starting job in the `*_small_*` drivers.
    pub const N_DEPTH_SMALL: usize = 1024;
    /// Number of chained jobs per starting job in the full-size drivers.
    pub const N_DEPTH: usize = 8192;
    /// Number of jobs submitted directly by the test thread (per round in
    /// [`test_sleepy`]).
    pub const N_WIDTH: usize = 128;
    /// Number of submission rounds in [`test_sleepy`].
    pub const N_SLEEP_ROUNDS: usize = 11;

    /// Longest time the chained and local drivers wait for their latch.
    pub const TEST_TIMEOUT: Duration = Duration::from_secs(480);

    /// Prints the `Debug` representation of `exec`, prefixed with `label`.
    pub fn test_debug<E>(exec: &E, label: &str)
    where
        E: Executor + Debug,
    {
        println!("Debug output for {}: {:?}", label, exec);
    }

    /// Runs the chained-job scenario with depth [`N_DEPTH_SMALL`] on a
    /// default-constructed executor.
    ///
    /// # Panics
    ///
    /// Panics if the latch has not reached zero after [`TEST_TIMEOUT`].
    pub fn test_small_defaults<E>(label: &str)
    where
        E: Executor + Debug + std::default::Default + 'static,
    {
        // Set up logging/metrics before the executor is constructed.
        init_test_env();
        run_chained(E::default(), label, N_DEPTH_SMALL);
    }

    /// Runs the chained-job scenario with depth [`N_DEPTH`] on a
    /// default-constructed executor.
    ///
    /// # Panics
    ///
    /// Panics if the latch has not reached zero after [`TEST_TIMEOUT`].
    pub fn test_defaults<E>(label: &str)
    where
        E: Executor + Debug + std::default::Default + 'static,
    {
        // Set up logging/metrics before the executor is constructed.
        init_test_env();
        run_chained(E::default(), label, N_DEPTH);
    }

    /// Runs the chained-job scenario with depth [`N_DEPTH`] on the executor
    /// supplied by the caller.
    ///
    /// # Panics
    ///
    /// Panics if the latch has not reached zero after [`TEST_TIMEOUT`].
    pub fn test_custom<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_env();
        run_chained(exec, label, N_DEPTH);
    }

    /// Submits [`N_WIDTH`] trivial jobs in each of [`N_SLEEP_ROUNDS`] rounds,
    /// pausing the submitting thread for `2^round` milliseconds before each
    /// round.
    ///
    /// # Panics
    ///
    /// Panics if not all jobs have run within `3 * N_SLEEP_ROUNDS` seconds
    /// after the last round was submitted.
    pub fn test_sleepy<E>(pool: E, label: &str)
    where
        E: Executor + 'static,
    {
        init_test_env();
        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        info!("Running sleepy test for {}", label);
        let latch = Arc::new(CountdownEvent::new(N_SLEEP_ROUNDS * N_WIDTH));
        for round in 1..=N_SLEEP_ROUNDS {
            // Exponentially growing pause: 2^round milliseconds.
            // NOTE(review): presumably meant to let idle workers go to sleep
            // between bursts — confirm against the executor implementations.
            let pause_ms = 1u64 << round;
            std::thread::sleep(Duration::from_millis(pause_ms));
            for _ in 0..N_WIDTH {
                let job_latch = latch.clone();
                pool.execute(move || {
                    job_latch.decrement().expect("Latch didn't decrement!");
                });
            }
        }
        let remaining = latch.wait_timeout(Duration::from_secs((N_SLEEP_ROUNDS as u64) * 3));
        assert_eq!(
            remaining, 0,
            "{}: jobs were still outstanding when the latch wait ended",
            label
        );
        shutdown_and_log(pool, label);
    }

    /// Runs the local-scheduling scenario with depth [`N_DEPTH`].
    ///
    /// # Panics
    ///
    /// Panics if the latch has not reached zero after [`TEST_TIMEOUT`], or if
    /// any job could not schedule its follow-up via `try_execute_locally`.
    pub fn test_local<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_env();
        run_local(exec, label, N_DEPTH);
    }

    /// Runs the local-scheduling scenario with depth [`N_DEPTH_SMALL`].
    ///
    /// # Panics
    ///
    /// Panics if the latch has not reached zero after [`TEST_TIMEOUT`], or if
    /// any job could not schedule its follow-up via `try_execute_locally`.
    pub fn test_small_local<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_env();
        run_local(exec, label, N_DEPTH_SMALL);
    }

    /// Initialises logging and, with the `produce-metrics` feature, the
    /// metrics printer.
    fn init_test_env() {
        // The result is ignored on purpose: initialisation is attempted by
        // every driver, and a repeated attempt must not fail the test.
        let _ = env_logger::try_init();
        #[cfg(feature = "produce-metrics")]
        metrics_printer::init();
    }

    /// Shuts `pool` down and logs (rather than propagates) a shutdown error.
    fn shutdown_and_log<E>(pool: E, label: &str)
    where
        E: Executor,
    {
        if let Err(e) = pool.shutdown() {
            error!("Error during pool shutdown {:?} at {}", e, label);
        }
    }

    /// Starts [`N_WIDTH`] job chains of length `depth` on `pool`, waits for
    /// all `depth * N_WIDTH` jobs to run, then shuts the pool down.
    fn run_chained<E>(pool: E, label: &str, depth: usize)
    where
        E: Executor + Debug + 'static,
    {
        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        let latch = Arc::new(CountdownEvent::new(depth * N_WIDTH));
        for _ in 0..N_WIDTH {
            let job_pool = pool.clone();
            let job_latch = latch.clone();
            pool.execute(move || {
                do_step(job_latch, job_pool, depth);
            });
        }
        let remaining = latch.wait_timeout(TEST_TIMEOUT);
        assert_eq!(
            remaining, 0,
            "{}: jobs were still outstanding when the latch wait ended",
            label
        );
        shutdown_and_log(pool, label);
    }

    /// Starts [`N_WIDTH`] locally-rescheduling job chains of length `depth`
    /// on `pool`, waits for the latch, checks the failure flag, then shuts
    /// the pool down.
    fn run_local<E>(pool: E, label: &str, depth: usize)
    where
        E: Executor + Debug + 'static,
    {
        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        let latch = Arc::new(CountdownEvent::new(depth * N_WIDTH));
        let failed = Arc::new(AtomicBool::new(false));
        for _ in 0..N_WIDTH {
            let job_latch = latch.clone();
            let job_failed = failed.clone();
            pool.execute(move || {
                do_step_local(job_latch, job_failed, depth);
            });
        }
        let remaining = latch.wait_timeout(TEST_TIMEOUT);
        assert_eq!(
            remaining, 0,
            "{}: jobs were still outstanding when the latch wait ended",
            label
        );
        assert!(
            !failed.load(Ordering::SeqCst),
            "Executor does not support local scheduling!"
        );
        shutdown_and_log(pool, label);
    }

    /// One link of a job chain: decrements `latch` and, while more links
    /// remain, submits the next link to `pool`.
    ///
    /// `depth` is the number of links still to run, including this one.
    fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize)
    where
        E: Executor + Debug + 'static,
    {
        latch.decrement().expect("Latch didn't decrement!");
        // `depth > 1` instead of `depth - 1 > 0`, so a zero depth cannot
        // underflow.
        if depth > 1 {
            let next_pool = pool.clone();
            pool.execute(move || do_step(latch, next_pool, depth - 1));
        }
    }

    /// One link of a locally scheduled job chain.
    ///
    /// Decrements `latch` and, while more links remain, schedules the next
    /// link through `try_execute_locally`. If that is rejected, the `failed`
    /// flag is raised and the latch is drained so the waiting test thread
    /// wakes up immediately instead of running into its timeout.
    ///
    /// # Panics
    ///
    /// Panics if the latch cannot be decremented although no failure has
    /// been flagged.
    fn do_step_local(latch: Arc<CountdownEvent>, failed: Arc<AtomicBool>, depth: usize) {
        if let Err(e) = latch.decrement() {
            if failed.load(Ordering::SeqCst) {
                // Another job already flagged the failure and drained the
                // latch; this job simply stops.
                warn!("Aborting test as it failed");
                return;
            }
            // Without a flagged failure this is a genuine error.
            panic!("Latch didn't decrement! Error: {:?}", e);
        }

        // Last link of the chain: nothing left to schedule.
        if depth <= 1 {
            return;
        }

        let next_latch = latch.clone();
        let next_failed = failed.clone();
        let res = try_execute_locally(move || do_step_local(next_latch, next_failed, depth - 1));
        if res.is_err() {
            error!("do_step_local should have executed locally!");
            // Raise the flag BEFORE draining, so that any job whose decrement
            // fails because of the drain observes `failed == true`.
            failed.store(true, Ordering::SeqCst);
            while latch.decrement().is_ok() {
                // Intentionally empty: keep decrementing until the latch
                // refuses.
            }
        }
    }
}