// executors/lib.rs

// Copyright 2017-2020 Lars Kroll. See the LICENSE
// file at the top-level directory of this distribution.
//
// Licensed under the MIT license
// <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed
// except according to those terms.
8#![doc(html_root_url = "https://docs.rs/executors/0.10.0")]
9#![deny(missing_docs)]
10#![allow(unused_parens)]
11#![allow(clippy::unused_unit)]

//! This crate provides a number of task executors all implementing the
//! [`Executor`](common/trait.Executor.html) trait.
//!
//! General examples can be found in the [`Executor`](common/trait.Executor.html) trait
//! documentation, and implementation specific examples with each implementation module.
//!
//! # Crate Feature Flags
//!
//! The following crate feature flags are available. They are configured in your `Cargo.toml`.
//!
//! - `threadpool-exec` (default)
//!     - Provides a wrapper implementation around the [threadpool](https://crates.io/crates/threadpool) crate.
//!     - See [threadpool_executor](threadpool_executor).
//! - `cb-channel-exec` (default)
//!     - Provides a thread pool executor with a single global queue.
//!     - See [crossbeam_channel_pool](crossbeam_channel_pool).
//! - `workstealing-exec` (default)
//!     - Provides a thread pool executor with thread-local queues in addition to a global injector queue.
//!     - See [crossbeam_workstealing_pool](crossbeam_workstealing_pool).
//! - `defaults` (default)
//!     - Produces [Default](std::default::Default) implementations using [num_cpus](https://crates.io/crates/num_cpus) to determine pool sizes.
//! - `ws-timed-fairness` (default)
//!     - This feature flag determines the fairness mechanism between local and global queues in the [crossbeam_workstealing_pool](crossbeam_workstealing_pool).
//!     - If the flag is enabled the fairness is time-based. The global queue will be checked every 100ms.
//!     - If the flag is absent the fairness is count-based. The global queue will be checked every 100 local jobs.
//!     - Which one you should pick depends on your application.
//!     - Time-based fairness is a compromise between latency of externally scheduled jobs and overall throughput.
//!     - Count-based fairness is going to depend heavily on how long your jobs typically are, but counting is cheaper than checking time, so it can lead to higher throughput.
//! - `ws-no-park`
//!     - Disables thread parking for the [crossbeam_workstealing_pool](crossbeam_workstealing_pool).
//!     - This is generally detrimental to performance, as idle threads will unnecessarily hang on to CPU resources.
//!     - However, for very latency sensitive interactions with external resources (e.g., I/O), this can reduce overall job latency.
//! - `thread-pinning`
//!     - Allows pool threads to be pinned to specific cores.
//!     - This can reduce cache invalidation overhead when threads sleep and then are woken up later.
//!     - However, if your cores are needed by other processes, it can also introduce additional scheduling delay, if the pinned core isn't available immediately at wake time.
//!     - Use with care.
//! - `numa-aware`
//!     - Make memory architecture aware decisions.
//!     - Concretely this setting currently only affects [crossbeam_workstealing_pool](crossbeam_workstealing_pool).
//!     - When it is enabled, work-stealing will happen by memory proximity.
//!     - That is, threads with too little work will try to steal work from memory-close other threads first, before trying further away threads.
//! - `produce-metrics`
//!     - Every executor provided in this crate can produce metrics using the [metrics](https://crates.io/crates/metrics) crate.
//!     - The metrics are `executors.jobs_executed` (*"How many jobs were executed in total?"*) and `executors.jobs_queued` (*"How many jobs are currently waiting to be executed?"*).
//!     - Not all executors produce all metrics.
//!     - **WARNING**: Collecting these metrics typically has a serious performance impact. You should only consider using this in production if your jobs are fairly large anyway (say in the millisecond range).

61#[macro_use]
62extern crate log;
63
64pub mod bichannel;
65pub mod common;
66#[cfg(feature = "cb-channel-exec")]
67pub mod crossbeam_channel_pool;
68#[cfg(feature = "workstealing-exec")]
69pub mod crossbeam_workstealing_pool;
70#[cfg(feature = "numa-aware")]
71pub mod numa_utils;
72pub mod parker;
73pub mod run_now;
74#[cfg(feature = "threadpool-exec")]
75pub mod threadpool_executor;
76mod timeconstants;
77
78pub use crate::common::{CanExecute, Executor};
79
80#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
81pub mod futures_executor;
82#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
83pub use crate::futures_executor::{FuturesExecutor, JoinHandle};
84
85//use bichannel::*;
86use synchronoise::CountdownEvent;
87
88mod locals;
89pub use locals::*;
90
91#[cfg(feature = "produce-metrics")]
92use metrics::{counter, describe_counter, describe_gauge, gauge};
93
94// #[cfg(feature = "produce-metrics")]
95// pub mod metric_keys {
96//     /// Counts the total number of jobs that were executed.
97//     ///
98//     /// This is a metric key.
99//     pub const JOBS_EXECUTED: &str = "executors.jobs_executed";
100
101//     pub(crate) const JOBS_EXECUTED_DESCRIPTION: &str =
102//         "The total number of jobs that were executed";
103
104//     /// Counts the number of jobs that are currently waiting to be executed.
105//     ///
106//     /// This is a metric key.
107//     pub const JOBS_QUEUED: &str = "executors.jobs_queued";
108
109//     pub(crate) const JOBS_QUEUED_DESCRIPTION: &str =
110//         "The number of jobs that are currently waiting to be executed";
111
112//     /// The concrete [Executor](Executor) implementation that logged record.
113//     ///
114//     /// This is a label key.
115//     pub const EXECUTOR: &str = "executor";
116
117//     /// The id of the thread that logged record.
118//     ///
119//     /// This is only present on multi-threaded executors.
120//     ///
121//     /// This is a label key.
122//     pub const THREAD_ID: &str = "thread_id";
123// }
124
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use std::{
        fmt::Debug,
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    /// Recursion depth for the "small" stress tests.
    pub const N_DEPTH_SMALL: usize = 1024;
    /// Recursion depth for the full stress tests.
    /// run_now can't do this, but it's a good test for the others.
    pub const N_DEPTH: usize = 8192;
    /// Number of independent job chains scheduled in parallel.
    pub const N_WIDTH: usize = 128;
    /// Number of sleep/burst rounds in the sleepy test.
    pub const N_SLEEP_ROUNDS: usize = 11;

    /// Upper bound on how long any depth/width stress test may take.
    pub const TEST_TIMEOUT: Duration = Duration::from_secs(480);

    /// Prints the `Debug` representation of `exec`, tagged with `label`.
    pub fn test_debug<E>(exec: &E, label: &str)
    where
        E: Executor + Debug,
    {
        println!("Debug output for {}: {:?}", label, exec);
    }

    /// Initialises logging (and, when enabled, the metrics printer) for a test.
    ///
    /// `env_logger::try_init` returns an `Err` when a logger was already
    /// installed by an earlier test in the same process; that is expected
    /// and deliberately ignored.
    fn init_test_logging() {
        let _ = env_logger::try_init();
        #[cfg(feature = "produce-metrics")]
        metrics_printer::init();
    }

    /// Shared body of the depth/width stress tests.
    ///
    /// Schedules `N_WIDTH` independent chains of `depth` recursively
    /// re-scheduled jobs on `pool`, waits (up to `TEST_TIMEOUT`) for the
    /// countdown latch to reach zero, and then shuts the pool down.
    fn run_depth_test<E>(pool: E, depth: usize, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        let latch = Arc::new(CountdownEvent::new(depth * N_WIDTH));
        for _ in 0..N_WIDTH {
            let pool2 = pool.clone();
            let latch2 = latch.clone();
            pool.execute(move || {
                do_step(latch2, pool2, depth);
            });
        }
        let res = latch.wait_timeout(TEST_TIMEOUT);
        assert_eq!(res, 0);
        pool.shutdown()
            .unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
    }

    /// Shared body of the local-scheduling stress tests.
    ///
    /// Like [`run_depth_test`], but each chain re-schedules itself via
    /// `try_execute_locally` and records a failure flag if the executor
    /// does not support local scheduling.
    fn run_local_depth_test<E>(pool: E, depth: usize, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        let latch = Arc::new(CountdownEvent::new(depth * N_WIDTH));
        let failed = Arc::new(AtomicBool::new(false));
        for _ in 0..N_WIDTH {
            let latch2 = latch.clone();
            let failed2 = failed.clone();
            pool.execute(move || {
                do_step_local(latch2, failed2, depth);
            });
        }
        let res = latch.wait_timeout(TEST_TIMEOUT);
        assert_eq!(res, 0);
        assert!(
            !failed.load(Ordering::SeqCst),
            "Executor does not support local scheduling!"
        );
        pool.shutdown()
            .unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
    }

    /// Runs the small (depth `N_DEPTH_SMALL`) stress test on `E::default()`.
    pub fn test_small_defaults<E>(label: &str)
    where
        E: Executor + Debug + std::default::Default + 'static,
    {
        init_test_logging();
        run_depth_test(E::default(), N_DEPTH_SMALL, label);
    }

    /// Runs the full (depth `N_DEPTH`) stress test on `E::default()`.
    pub fn test_defaults<E>(label: &str)
    where
        E: Executor + Debug + std::default::Default + 'static,
    {
        init_test_logging();
        run_depth_test(E::default(), N_DEPTH, label);
    }

    /// Runs the full (depth `N_DEPTH`) stress test on a caller-supplied executor.
    pub fn test_custom<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_logging();
        run_depth_test(exec, N_DEPTH, label);
    }

    /// Repeatedly lets the pool's threads go idle (sleeping exponentially
    /// longer each round) and then hits the pool with a burst of `N_WIDTH`
    /// jobs, verifying that parked threads wake up and drain the queue.
    pub fn test_sleepy<E>(pool: E, label: &str)
    where
        E: Executor + 'static,
    {
        init_test_logging();

        #[cfg(feature = "produce-metrics")]
        pool.register_metrics();

        info!("Running sleepy test for {}", label);
        let latch = Arc::new(CountdownEvent::new(N_SLEEP_ROUNDS * N_WIDTH));
        for round in 1..=N_SLEEP_ROUNDS {
            // let threads go to sleep before the next burst
            let sleep_time = 1u64 << round;
            std::thread::sleep(Duration::from_millis(sleep_time));
            for _ in 0..N_WIDTH {
                let latch2 = latch.clone();
                pool.execute(move || {
                    latch2.decrement().expect("Latch didn't decrement!");
                });
            }
        }
        let res = latch.wait_timeout(Duration::from_secs((N_SLEEP_ROUNDS as u64) * 3));
        assert_eq!(res, 0);
        pool.shutdown()
            .unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
    }

    /// Runs the full (depth `N_DEPTH`) local-scheduling stress test.
    pub fn test_local<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_logging();
        run_local_depth_test(exec, N_DEPTH, label);
    }

    /// Runs the small (depth `N_DEPTH_SMALL`) local-scheduling stress test.
    pub fn test_small_local<E>(exec: E, label: &str)
    where
        E: Executor + Debug + 'static,
    {
        init_test_logging();
        run_local_depth_test(exec, N_DEPTH_SMALL, label);
    }

    /// One link in a job chain: decrements the latch and, while depth remains,
    /// schedules the next link on the pool's global queue.
    fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize)
    where
        E: Executor + Debug + 'static,
    {
        // saturating_sub guards against underflow if a caller ever passes depth == 0
        let new_depth = depth.saturating_sub(1);
        latch.decrement().expect("Latch didn't decrement!");
        if new_depth > 0 {
            let pool2 = pool.clone();
            pool.execute(move || do_step(latch, pool2, new_depth))
        }
    }

    /// One link in a local job chain: decrements the latch and re-schedules
    /// itself via `try_execute_locally`. If local scheduling is unavailable,
    /// the failure flag is set and the latch is drained so the waiting test
    /// thread wakes up instead of timing out.
    fn do_step_local(latch: Arc<CountdownEvent>, failed: Arc<AtomicBool>, depth: usize) {
        // saturating_sub guards against underflow if a caller ever passes depth == 0
        let new_depth = depth.saturating_sub(1);
        match latch.decrement() {
            Ok(_) => {
                if new_depth > 0 {
                    let failed2 = failed.clone();
                    let latch2 = latch.clone();
                    let res =
                        try_execute_locally(move || do_step_local(latch2, failed2, new_depth));
                    if res.is_err() {
                        error!("do_step_local should have executed locally!");
                        failed.store(true, Ordering::SeqCst);
                        while latch.decrement().is_ok() {
                            () // do nothing, just keep draining, so the main thread wakes up
                        }
                    }
                }
            }
            Err(e) => {
                if failed.load(Ordering::SeqCst) {
                    warn!("Aborting test as it failed");
                    // and simply return here
                } else {
                    panic!("Latch didn't decrement! Error: {:?}", e);
                }
            }
        }
    }
}