#![doc(html_root_url = "https://docs.rs/executors/0.9.0")]
#![deny(missing_docs)]
#![allow(unused_parens)]
#![allow(clippy::unused_unit)]
#[macro_use]
extern crate log;
pub mod bichannel;
pub mod common;
#[cfg(feature = "cb-channel-exec")]
pub mod crossbeam_channel_pool;
#[cfg(feature = "workstealing-exec")]
pub mod crossbeam_workstealing_pool;
#[cfg(feature = "numa-aware")]
pub mod numa_utils;
pub mod parker;
pub mod run_now;
#[cfg(feature = "threadpool-exec")]
pub mod threadpool_executor;
mod timeconstants;
pub use crate::common::{CanExecute, Executor};
#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
pub mod futures_executor;
#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
pub use crate::futures_executor::{FuturesExecutor, JoinHandle};
use synchronoise::CountdownEvent;
mod locals;
pub use locals::*;
#[cfg(feature = "produce-metrics")]
use metrics::{
counter,
decrement_gauge,
increment_counter,
increment_gauge,
register_counter,
register_gauge,
};
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use std::{
fmt::Debug,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
pub const N_DEPTH_SMALL: usize = 1024;
pub const N_DEPTH: usize = 8192; pub const N_WIDTH: usize = 128;
pub const N_SLEEP_ROUNDS: usize = 11;
pub const TEST_TIMEOUT: Duration = Duration::from_secs(240);
pub fn test_debug<E>(exec: &E, label: &str)
where
E: Executor + Debug,
{
println!("Debug output for {}: {:?}", label, exec);
}
pub fn test_small_defaults<E>(label: &str)
where
E: Executor + Debug + std::default::Default + 'static,
{
let _ = env_logger::try_init();
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
let pool = E::default();
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
let latch = Arc::new(CountdownEvent::new(N_DEPTH_SMALL * N_WIDTH));
for _ in 0..N_WIDTH {
let pool2 = pool.clone();
let latch2 = latch.clone();
pool.execute(move || {
do_step(latch2, pool2, N_DEPTH_SMALL);
});
}
let res = latch.wait_timeout(TEST_TIMEOUT);
assert_eq!(res, 0);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
pub fn test_defaults<E>(label: &str)
where
E: Executor + Debug + std::default::Default + 'static,
{
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
let pool = E::default();
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH));
for _ in 0..N_WIDTH {
let pool2 = pool.clone();
let latch2 = latch.clone();
pool.execute(move || {
do_step(latch2, pool2, N_DEPTH);
});
}
let res = latch.wait_timeout(TEST_TIMEOUT);
assert_eq!(res, 0);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
pub fn test_custom<E>(exec: E, label: &str)
where
E: Executor + Debug + 'static,
{
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
let pool = exec;
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH));
for _ in 0..N_WIDTH {
let pool2 = pool.clone();
let latch2 = latch.clone();
pool.execute(move || {
do_step(latch2, pool2, N_DEPTH);
});
}
let res = latch.wait_timeout(TEST_TIMEOUT);
assert_eq!(res, 0);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
pub fn test_sleepy<E>(pool: E, label: &str)
where
E: Executor + 'static,
{
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
info!("Running sleepy test for {}", label);
let latch = Arc::new(CountdownEvent::new(N_SLEEP_ROUNDS * N_WIDTH));
for round in 1..=N_SLEEP_ROUNDS {
let sleep_time = 1u64 << round;
std::thread::sleep(Duration::from_millis(sleep_time));
for _ in 0..N_WIDTH {
let latch2 = latch.clone();
pool.execute(move || {
latch2.decrement().expect("Latch didn't decrement!");
});
}
}
let res = latch.wait_timeout(Duration::from_secs((N_SLEEP_ROUNDS as u64) * 3));
assert_eq!(res, 0);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
pub fn test_local<E>(exec: E, label: &str)
where
E: Executor + Debug + 'static,
{
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
let pool = exec;
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH));
let failed = Arc::new(AtomicBool::new(false));
for _ in 0..N_WIDTH {
let latch2 = latch.clone();
let failed2 = failed.clone();
pool.execute(move || {
do_step_local(latch2, failed2, N_DEPTH);
});
}
let res = latch.wait_timeout(TEST_TIMEOUT);
assert_eq!(res, 0);
assert!(
!failed.load(Ordering::SeqCst),
"Executor does not support local scheduling!"
);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
pub fn test_small_local<E>(exec: E, label: &str)
where
E: Executor + Debug + 'static,
{
#[cfg(feature = "produce-metrics")]
metrics_printer::init();
let pool = exec;
#[cfg(feature = "produce-metrics")]
pool.register_metrics();
let latch = Arc::new(CountdownEvent::new(N_DEPTH_SMALL * N_WIDTH));
let failed = Arc::new(AtomicBool::new(false));
for _ in 0..N_WIDTH {
let latch2 = latch.clone();
let failed2 = failed.clone();
pool.execute(move || {
do_step_local(latch2, failed2, N_DEPTH_SMALL);
});
}
let res = latch.wait_timeout(TEST_TIMEOUT);
assert_eq!(res, 0);
assert!(
!failed.load(Ordering::SeqCst),
"Executor does not support local scheduling!"
);
pool.shutdown()
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}
fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize)
where
E: Executor + Debug + 'static,
{
let new_depth = depth - 1;
latch.decrement().expect("Latch didn't decrement!");
if (new_depth > 0) {
let pool2 = pool.clone();
pool.execute(move || do_step(latch, pool2, new_depth))
}
}
fn do_step_local(latch: Arc<CountdownEvent>, failed: Arc<AtomicBool>, depth: usize) {
let new_depth = depth - 1;
match latch.decrement() {
Ok(_) => {
if (new_depth > 0) {
let failed2 = failed.clone();
let latch2 = latch.clone();
let res =
try_execute_locally(move || do_step_local(latch2, failed2, new_depth));
if res.is_err() {
error!("do_step_local should have executed locally!");
failed.store(true, Ordering::SeqCst);
while latch.decrement().is_ok() {
() }
}
}
}
Err(e) => {
if failed.load(Ordering::SeqCst) {
warn!("Aborting test as it failed");
} else {
panic!("Latch didn't decrement! Error: {:?}", e);
}
}
}
}
}