use std::{
collections::VecDeque,
fmt,
num::NonZero,
sync::{Arc, LazyLock, Mutex},
thread,
thread::Thread,
};
pub struct ThreadPool {
max_workers: usize,
inner: Arc<Mutex<Inner>>,
}
impl fmt::Debug for ThreadPool {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("ThreadPool").field("max_workers", &self.max_workers).finish_non_exhaustive()
}
}
type Task = Box<dyn FnOnce() + Send + 'static>;
#[derive(Default)]
struct Inner {
workers: Vec<Thread>,
tasks: VecDeque<Task>,
exit: bool,
idle: usize,
}
impl ThreadPool {
pub fn new(max_workers: NonZero<usize>) -> Self {
Self { max_workers: max_workers.get(), inner: Arc::new(Mutex::new(Inner::default())) }
}
pub fn exec(&self, f: impl FnOnce() + Send + 'static) -> Result<(), std::io::Error> {
let mut inner = self.inner.lock().unwrap();
inner.tasks.push_back(Box::new(f));
if inner.idle == 0 && inner.workers.len() < self.max_workers {
let id = inner.workers.len() + 1;
tracing::debug!("starting worker {id} / {}", self.max_workers);
let hnd = {
let inner = self.inner.clone();
thread::Builder::new().name(format!("remoc worker {id}")).spawn(move || Self::worker(inner))
}?;
inner.workers.push(hnd.thread().clone());
} else {
for worker in &inner.workers {
worker.unpark();
}
}
Ok(())
}
fn worker(inner: Arc<Mutex<Inner>>) {
let mut idle = false;
loop {
let mut inner = inner.lock().unwrap();
if let Some(task) = inner.tasks.pop_front() {
if idle {
inner.idle -= 1;
idle = false;
}
drop(inner);
task();
} else if inner.exit {
break;
} else {
if !idle {
inner.idle += 1;
idle = true;
}
drop(inner);
thread::park();
}
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
let mut inner = self.inner.lock().unwrap();
inner.exit = true;
for worker in &inner.workers {
worker.unpark();
}
}
}
pub fn default() -> &'static ThreadPool {
static INSTANCE: LazyLock<ThreadPool> = LazyLock::new(|| {
let parallelism = thread::available_parallelism().unwrap_or_else(|err| {
tracing::warn!("available parallelism is unknown: {err}");
NonZero::new(1).unwrap()
});
ThreadPool::new(parallelism)
});
&INSTANCE
}