use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Clone)]
pub struct RunNowExecutor {
active: Arc<AtomicBool>,
}
use super::*;
impl RunNowExecutor {
pub fn new() -> RunNowExecutor {
RunNowExecutor { active: Arc::new(AtomicBool::new(true)) }
}
}
impl Executor for RunNowExecutor {
fn execute<F>(&self, job: F)
where
F: FnOnce() + Send + 'static,
{
if self.active.load(Ordering::SeqCst) {
job();
} else {
warn!("Ignoring job as pool is shutting down.");
}
}
fn shutdown_async(&self) {
if self.active.compare_and_swap(true, false, Ordering::SeqCst) {
debug!("Shutting down executor.");
} else {
warn!("Executor was already shut down!");
}
}
fn shutdown(self) -> Result<(), String> {
if self.active.compare_and_swap(true, false, Ordering::SeqCst) {
debug!("Shutting down executor.");
Result::Ok(())
} else {
Result::Err(String::from("Pool was already shut down!"))
}
}
}
#[cfg(test)]
mod tests {
extern crate env_logger;
use super::*;
use std::time::Duration;
#[test]
fn run_tasks() {
env_logger::init();
let latch = Arc::new(CountdownEvent::new(2));
let exec = RunNowExecutor::new();
let latch2 = latch.clone();
let latch3 = latch.clone();
exec.execute(move || ignore(latch2.decrement()));
exec.execute(move || ignore(latch3.decrement()));
let res = latch.wait_timeout(Duration::from_secs(5));
assert_eq!(res, 0);
}
#[test]
fn shutdown_from_worker() {
env_logger::init();
let exec = RunNowExecutor::new();
let exec2 = exec.clone();
let latch = Arc::new(CountdownEvent::new(2));
let latch2 = latch.clone();
let latch3 = latch.clone();
let stop_latch = Arc::new(CountdownEvent::new(1));
let stop_latch2 = stop_latch.clone();
exec.execute(move || ignore(latch2.decrement()));
exec.execute(move || {
exec2.shutdown_async();
ignore(stop_latch2.decrement());
});
let res = stop_latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 0);
exec.execute(move || ignore(latch3.decrement()));
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 1);
}
#[test]
fn shutdown_external() {
env_logger::init();
let exec = RunNowExecutor::new();
let exec2 = exec.clone();
let latch = Arc::new(CountdownEvent::new(2));
let latch2 = latch.clone();
let latch3 = latch.clone();
exec.execute(move || ignore(latch2.decrement()));
exec.shutdown().expect("pool to shut down");
exec2.execute(move || ignore(latch3.decrement()));
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 1);
}
}