use super::*;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use threadpool::ThreadPool;
/// An executor that hands submitted jobs to a `ThreadPool`.
///
/// The `active` flag sits behind an `Arc`, so every clone of this handle
/// observes the same accept/reject state: shutting down through one clone
/// makes all other clones stop accepting jobs as well.
#[derive(Clone, Debug)]
pub struct ThreadPoolExecutor {
// Pool that accepted jobs are forwarded to.
pool: ThreadPool,
// `true` while jobs are accepted; flipped to `false` by the shutdown methods.
active: Arc<AtomicBool>,
}
impl ThreadPoolExecutor {
    /// Creates an executor whose underlying pool is built with `threads` workers.
    ///
    /// The returned executor starts in the active state, i.e. it accepts jobs
    /// until one of the shutdown methods clears the shared flag.
    ///
    /// # Panics
    ///
    /// NOTE(review): `threadpool::ThreadPool::new` is documented to panic when
    /// `threads` is `0` — confirm against the crate version in use.
    pub fn new(threads: usize) -> ThreadPoolExecutor {
        Self {
            pool: ThreadPool::new(threads),
            active: Arc::new(AtomicBool::new(true)),
        }
    }
}
#[cfg(feature = "defaults")]
impl Default for ThreadPoolExecutor {
    /// Builds an executor sized by the value `num_cpus::get()` reports.
    fn default() -> Self {
        let worker_count = num_cpus::get();
        Self::new(worker_count)
    }
}
impl CanExecute for ThreadPoolExecutor {
    /// Forwards a boxed job to the pool while the executor is active.
    ///
    /// After a shutdown has been requested the job is not run; a warning is
    /// logged and the job is dropped.
    fn execute_job(&self, job: Box<dyn FnOnce() + Send + 'static>) {
        // Guard: reject work once the shared flag has been cleared.
        if !self.active.load(Ordering::SeqCst) {
            warn!("Ignoring job as pool is shutting down.");
            return;
        }

        // With metrics enabled, shadow `job` with a wrapper that bumps the
        // queued gauge on submission and, after the job has run, records the
        // execution and lowers the gauge again.
        // NOTE(review): if `job()` panics, the two updates after it are
        // skipped, so the gauge stays raised for that job.
        #[cfg(feature = "produce-metrics")]
        let job = {
            gauge!("executors.jobs_queued", "executor" => std::any::type_name::<ThreadPoolExecutor>()).increment(1.0);
            Box::new(move || {
                job();
                counter!("executors.jobs_executed", "executor" => std::any::type_name::<ThreadPoolExecutor>()).increment(1);
                gauge!("executors.jobs_queued", "executor" => std::any::type_name::<ThreadPoolExecutor>()).decrement(1.0);
            })
        };

        self.pool.execute(job);
    }
}
impl Executor for ThreadPoolExecutor {
    /// Submits `job` to the pool while the executor is active.
    ///
    /// After a shutdown has been requested the job is not run; a warning is
    /// logged and the job is dropped.
    fn execute<F>(&self, job: F)
    where
        F: FnOnce() + Send + 'static,
    {
        // Guard: reject work once the shared flag has been cleared.
        if !self.active.load(Ordering::SeqCst) {
            warn!("Ignoring job as pool is shutting down.");
            return;
        }

        // With metrics enabled, shadow `job` with a wrapper that bumps the
        // queued gauge on submission and, after the job has run, records the
        // execution and lowers the gauge again.
        // NOTE(review): if `job()` panics, the two updates after it are
        // skipped, so the gauge stays raised for that job.
        #[cfg(feature = "produce-metrics")]
        let job = {
            gauge!("executors.jobs_queued", "executor" => std::any::type_name::<ThreadPoolExecutor>()).increment(1.0);
            move || {
                job();
                counter!("executors.jobs_executed", "executor" => std::any::type_name::<ThreadPoolExecutor>()).increment(1);
                gauge!("executors.jobs_queued", "executor" => std::any::type_name::<ThreadPoolExecutor>()).decrement(1.0);
            }
        };

        self.pool.execute(job);
    }

    /// Marks the executor as shut down without waiting for the pool.
    ///
    /// Only the call that actually flips the flag from `true` to `false` logs
    /// the debug message; any later call logs a warning instead.
    fn shutdown_async(&self) {
        let flipped = self
            .active
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst);
        match flipped {
            Ok(_) => {
                debug!("Shutting down executor.");
            }
            Err(_) => {
                warn!("Executor was already shut down!");
            }
        }
    }

    /// Marks the executor as shut down and blocks on `ThreadPool::join`.
    ///
    /// # Errors
    ///
    /// Returns `Err` with a message if the flag had already been cleared by an
    /// earlier shutdown call; in that case the pool is not joined.
    fn shutdown_borrowed(&self) -> Result<(), String> {
        let flipped = self
            .active
            .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst);
        if flipped.is_err() {
            return Err(String::from("Pool was already shut down!"));
        }

        debug!("Waiting for pool to shut down.");
        self.pool.join();
        debug!("Pool was shut down.");
        Ok(())
    }

    /// Registers descriptions for the counter and gauge this executor emits.
    #[cfg(feature = "produce-metrics")]
    fn register_metrics(&self) {
        describe_counter!(
            "executors.jobs_executed",
            "The total number of jobs that were executed"
        );
        describe_gauge!(
            "executors.jobs_queued",
            "The number of jobs that are currently waiting to be executed"
        );
    }
}
// Unit tests for `ThreadPoolExecutor`.
//
// Several tests delegate to shared helpers in `crate::tests`; the rest drive
// the executor directly and synchronise on `CountdownEvent` latches.
// NOTE(review): the assertions below read as if `wait_timeout` returns the
// count still outstanding when it stops waiting — confirm against
// `CountdownEvent`.
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
// Label passed to the shared test helpers in `crate::tests`.
const LABEL: &str = "Threadpool";
// Runs the shared debug helper on a two-thread pool, then requires `shutdown` to succeed.
#[test]
fn test_debug() {
let exec = ThreadPoolExecutor::new(2);
crate::tests::test_debug(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}
// Runs the shared "sleepy" helper on a four-thread pool.
#[test]
fn test_sleepy() {
let exec = ThreadPoolExecutor::new(4);
crate::tests::test_sleepy(exec, LABEL);
}
// Runs the shared defaults helper, parameterised with this executor type.
#[test]
fn test_defaults() {
crate::tests::test_defaults::<ThreadPoolExecutor>(LABEL);
}
// Skipped by default (`#[ignore]`); when run explicitly it is expected to
// panic (`#[should_panic]`).
// NOTE(review): presumably because this executor offers no thread-local
// scheduling — confirm against `crate::tests::test_local`.
#[test]
#[ignore]
#[should_panic] fn test_local() {
let exec = ThreadPoolExecutor::default();
crate::tests::test_local(exec, LABEL);
}
// Two jobs on a two-thread pool each decrement the latch once; the latch is
// expected to reach 0 within the 5 s timeout.
#[test]
fn run_with_two_threads() {
// The result is discarded on purpose.
// NOTE(review): presumably because another test may already have
// installed the logger — confirm.
let _ = env_logger::try_init();
let latch = Arc::new(CountdownEvent::new(2));
let pool = ThreadPoolExecutor::new(2);
let latch2 = latch.clone();
let latch3 = latch.clone();
pool.execute(move || {
let _ = latch2.decrement();
});
pool.execute(move || {
let _ = latch3.decrement();
});
let res = latch.wait_timeout(Duration::from_secs(5));
assert_eq!(res, 0);
}
// Skipped by default (`#[ignore]`). On a single-thread pool a panicking job
// is queued between two latch jobs; both latch jobs are still expected to run,
// i.e. the latch must reach 0 within 5 s.
#[test]
#[ignore]
fn keep_pool_size() {
let _ = env_logger::try_init();
let latch = Arc::new(CountdownEvent::new(2));
let pool = ThreadPoolExecutor::new(1);
let latch2 = latch.clone();
let latch3 = latch.clone();
pool.execute(move || {
let _ = latch2.decrement();
});
pool.execute(move || panic!("test panic please ignore"));
pool.execute(move || {
let _ = latch3.decrement();
});
let res = latch.wait_timeout(Duration::from_secs(5));
assert_eq!(res, 0);
}
// A job running inside the pool calls `shutdown_async` on a clone of the
// executor. A job submitted after that must be rejected, so the main latch
// is expected to stay at 1.
#[test]
fn shutdown_from_worker() {
let _ = env_logger::try_init();
let pool = ThreadPoolExecutor::new(1);
let pool2 = pool.clone();
let latch = Arc::new(CountdownEvent::new(2));
let latch2 = latch.clone();
let latch3 = latch.clone();
// Signals that the shutdown request has been issued from the worker.
let stop_latch = Arc::new(CountdownEvent::new(1));
let stop_latch2 = stop_latch.clone();
pool.execute(move || {
let _ = latch2.decrement();
});
pool.execute(move || {
pool2.shutdown_async();
let _ = stop_latch2.decrement();
});
// Wait until the worker has requested the shutdown before submitting more work.
let res = stop_latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 0);
// Submitted after shutdown: expected to be ignored.
pool.execute(move || {
let _ = latch3.decrement();
});
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 1);
}
// The executor is shut down from the test thread; a job submitted afterwards
// through a clone must be rejected, so the latch is expected to stay at 1.
#[test]
fn shutdown_external() {
let _ = env_logger::try_init();
let pool = ThreadPoolExecutor::new(1);
let pool2 = pool.clone();
let latch = Arc::new(CountdownEvent::new(2));
let latch2 = latch.clone();
let latch3 = latch.clone();
pool.execute(move || {
let _ = latch2.decrement();
});
pool.shutdown().expect("pool to shut down");
// Submitted through the clone after shutdown: expected to be ignored.
pool2.execute(move || {
let _ = latch3.decrement();
});
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 1);
}
}