use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use log::debug;
/// Bookkeeping shared between a pool handle and the tasks it has spawned.
///
/// Tracks how many tasks are currently accounted for and whether the pool is
/// still accepting work.
#[derive(Debug)]
struct ThreadPoolState {
    /// Number of tasks that have been registered and have not finished yet.
    active_workers: u32,
    /// `true` until the pool has been switched to inactive.
    active: bool,
}

impl ThreadPoolState {
    /// Creates a state with no registered workers that accepts new work.
    pub fn new() -> Self {
        Self {
            active_workers: 0,
            active: true,
        }
    }

    /// Returns `true` while the pool has not been switched to inactive.
    pub fn is_active(&self) -> bool {
        self.active
    }

    /// Returns the number of workers currently registered.
    pub fn active_workers(&self) -> u32 {
        self.active_workers
    }

    /// Marks the pool as inactive; this flag is never reset by this type.
    pub fn switch_to_inactive(&mut self) {
        self.active = false;
    }

    /// Registers one more worker.
    pub fn increment_active(&mut self) {
        self.active_workers += 1;
    }

    /// Unregisters one worker.
    ///
    /// The counter saturates at zero: an unmatched decrement must neither
    /// panic (debug builds) nor wrap around to `u32::MAX` (release builds),
    /// since either outcome would corrupt the count that callers poll on.
    pub fn decrement_active(&mut self) {
        self.active_workers = self.active_workers.saturating_sub(1);
    }
}
/// A rayon thread pool paired with shared state that tracks in-flight tasks
/// and whether the pool still accepts new work.
pub struct ThreadPool {
// Underlying rayon pool that actually executes the spawned closures.
pool: rayon::ThreadPool,
// Shared with every spawned task so each one can check the active flag and
// update the worker count.
state: Arc<Mutex<ThreadPoolState>>,
}
impl ThreadPool {
pub fn new(num_threads: usize, pool_name: String) -> Self {
debug!("Creating new ThreadPool with {num_threads} threads!");
let pool = rayon::ThreadPoolBuilder::new()
.thread_name(move |i| format!("{pool_name}#{i}"))
.num_threads(num_threads)
.build()
.unwrap();
let state = Arc::new(Mutex::new(ThreadPoolState::new()));
Self { pool, state }
}
pub fn spawn<OP>(&self, work: OP)
where
OP: FnOnce() + Send + 'static,
{
{
let mut state = self.state.lock().unwrap();
if state.is_active() {
state.increment_active();
} else {
return;
}
}
let state = self.state.clone();
self.pool.spawn(move || {
{
let mut state = state.lock().unwrap();
if !state.is_active() {
return state.decrement_active();
}
}
work();
{
let mut state = state.lock().unwrap();
state.decrement_active();
}
});
}
pub fn exit(&self) {
{
let mut state = self.state.lock().unwrap();
state.switch_to_inactive();
}
let mut rounds = 0;
loop {
rounds += 1;
{
let state = self.state.lock().unwrap();
let still_active = state.active_workers();
if still_active == 0 || rounds == 10 {
if still_active > 0 {
debug!(
"Exiting ThreadPool with {:?} still working(should be zero)",
still_active
);
}
break;
}
}
thread::sleep(Duration::from_millis(100));
}
}
}