use std::process;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
/// Guard value that terminates the whole process when it is dropped while the
/// current thread is panicking.
///
/// Dropping it outside of a panic has no effect.
pub struct PoisonPill;

impl Drop for PoisonPill {
    /// Exits the process with status code 1 if the current thread is unwinding
    /// from a panic; otherwise returns without doing anything.
    fn drop(&mut self) {
        // Ordinary scope exit: nothing to do.
        if !thread::panicking() {
            return;
        }
        process::exit(1);
    }
}
/// A unit of work for the pool: a boxed, one-shot closure that takes no
/// arguments, returns nothing, and can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
/// Messages delivered to worker threads over the shared channel.
enum Message {
/// Carries a job for the receiving worker to execute.
NewJob(Job),
/// Tells the receiving worker to leave its receive loop and finish.
Terminate,
}
/// A single pool worker: a spawned OS thread plus its identifying metadata.
#[derive(Debug)]
struct Thread {
/// Numeric identifier passed in when the worker was created.
pub id: usize,
/// Name given to the underlying OS thread, formatted as `"Thread {id}"`.
pub name: String,
/// Join handle of the spawned thread; taken out (leaving `None`) when the
/// worker is dropped so the thread can be joined.
handle: Option<JoinHandle<()>>,
}
impl Thread {
    /// Spawns a worker thread named `"Thread {id}"` that pulls messages from
    /// the shared `receiver`.
    ///
    /// The worker keeps running jobs until it receives `Message::Terminate` or
    /// the channel reports an error. The receiver mutex is held while the
    /// worker waits for a message (so an idle worker blocks while owning the
    /// lock) and is released before the received job runs.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned. Inside the worker, locking a
    /// poisoned mutex panics; any panic on the worker thread, including one
    /// raised by a job, makes the `PoisonPill` guard exit the process with
    /// status 1.
    fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Self {
        let name = format!("Thread {}", id);

        let worker_loop = move || {
            // Dropped during unwinding if anything below panics.
            let _guard = PoisonPill;
            loop {
                // The mutex guard is a temporary of this statement, so the lock
                // is already released when the message is handled.
                let incoming = receiver.lock().unwrap().recv();
                match incoming {
                    Ok(Message::NewJob(task)) => task(),
                    Ok(Message::Terminate) | Err(_) => break,
                }
            }
        };

        let builder = thread::Builder::new().name(name.clone());
        let handle = Some(builder.spawn(worker_loop).unwrap());

        Self { id, name, handle }
    }
}
impl Drop for Thread {
    /// Waits for the worker's OS thread to finish, unless its handle has
    /// already been taken.
    fn drop(&mut self) {
        if let Some(worker) = self.handle.take() {
            // The join result (including a worker panic) is deliberately
            // ignored; there is nothing useful to do with it in a destructor.
            let _ = worker.join();
        }
    }
}
/// A fixed-size pool of worker threads fed through one shared channel.
#[derive(Debug)]
pub struct ThreadPool {
/// Number of workers requested at construction; also the number of
/// `Terminate` messages sent when the pool is dropped.
num_threads: usize,
/// The workers. Dropping each one joins its OS thread.
threads: Vec<Thread>,
/// Sending half of the channel, used to submit jobs and termination requests.
sender: Sender<Message>,
/// Receiving half of the channel, shared with every worker behind a mutex.
receiver: Arc<Mutex<Receiver<Message>>>,
}
impl ThreadPool {
pub fn new(num_threads: usize) -> Self {
let (sender, receiver) = mpsc::channel::<Message>();
let receiver = Arc::new(Mutex::new(receiver));
let mut threads = Vec::with_capacity(num_threads);
for id in 0..num_threads {
threads.push(Thread::new(id, Arc::clone(&receiver)));
}
Self {
num_threads,
threads,
sender,
receiver,
}
}
pub fn run<J: Into<Job>>(&self, job: J) {
self.sender.send(Message::NewJob(job.into())).unwrap()
}
}
impl Drop for ThreadPool {
    /// Shuts the pool down: discards jobs that are still queued (best effort)
    /// and then sends one `Message::Terminate` per worker.
    ///
    /// The workers are joined afterwards, when the `threads` field is dropped.
    ///
    /// Queued jobs are only discarded when the receiver mutex is free at this
    /// moment. If a worker currently owns it, jobs still in the queue are
    /// executed before the workers see their `Terminate` message.
    fn drop(&mut self) {
        // An idle worker blocks in `recv()` while holding the receiver mutex.
        // A blocking `lock()` here would therefore wait forever, because the
        // message that would wake that worker is only sent further down.
        // `try_lock` never blocks, so shutdown always makes progress.
        let free_queue = match self.receiver.try_lock() {
            Ok(guard) => Some(guard),
            // The channel is still usable after a poisoning panic; recover the
            // guard instead of panicking inside a destructor.
            Err(std::sync::TryLockError::Poisoned(poisoned)) => Some(poisoned.into_inner()),
            // A worker owns the lock: either it is idle (nothing to drain) or
            // it is about to take the next message itself.
            Err(std::sync::TryLockError::WouldBlock) => None,
        };
        if let Some(queue) = free_queue {
            while queue.try_recv().is_ok() {}
            // `queue` is dropped here, releasing the lock before any worker
            // needs it to receive its termination message.
        }

        for _ in 0..self.num_threads {
            // Sending fails only when the receiving end is gone, in which case
            // no worker is left to notify.
            let _ = self.sender.send(Message::Terminate);
        }
    }
}