use std::sync::mpsc::Receiver;
/// A unit of work for the pool: a boxed closure that is called exactly once,
/// can be moved to another thread (`Send`) and holds no borrows shorter than
/// `'static`.
type Job = Box<dyn FnOnce() + Send + 'static>;
/// A pool of worker threads that run submitted closures.
///
/// Work is handed to the workers as `Msg` values over an `mpsc` channel.
pub struct ThreadPool {
/// Sending half of the channel the workers receive their messages from.
/// Declared first, so it is dropped before `workers`.
sender: std::sync::mpsc::Sender<Msg>,
/// One entry per spawned worker thread.
workers: Vec<Worker>,
}
impl ThreadPool {
    /// Creates a pool with `size` worker threads, all fed from one channel.
    ///
    /// # Panics
    ///
    /// Panics if `size` is zero.
    pub fn new(size: usize) -> Self {
        assert_ne!(size, 0, "Cannot create 0-sized thread pool");
        let (sender, receiver) = std::sync::mpsc::channel();
        // `Receiver` is `Send` but not `Sync`: calling `recv` from several
        // threads through a shared reference is outside its contract. The
        // mutex makes sure only one worker at a time touches the receiver.
        let receiver = Redex::new(std::sync::Mutex::new(receiver));
        let workers = (0..size)
            .map(|_| Worker::new(receiver.clone()))
            .collect();
        Self { sender, workers }
    }

    /// Queues `f` to be run by one of the worker threads.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be sent, which happens only when no worker
    /// is left to receive it.
    pub fn execute<F: Send + 'static + FnOnce()>(&self, f: F) {
        let msg = Msg::Task(Box::new(f));
        self.sender
            .send(msg)
            .expect("thread pool has no live worker left to receive the job")
    }

    /// Shuts the pool down and waits for every worker thread to finish.
    ///
    /// One `Msg::Terminate` is queued per worker. The channel is FIFO, so all
    /// jobs queued before this call are picked up before any worker sees its
    /// terminate message.
    ///
    /// # Panics
    ///
    /// Panics if a worker thread panicked (for example inside a job).
    pub fn join(self) {
        for _ in &self.workers {
            // A failed send means no worker is left to receive anything, so
            // further sends are pointless. The joins below report why.
            if self.sender.send(Msg::Terminate).is_err() {
                break;
            }
        }
        for Worker { thread } in self.workers {
            thread.join().expect("worker thread panicked");
        }
    }

    /// Unparks every worker thread.
    ///
    /// NOTE(review): workers wait on the channel and only leave their loop on
    /// `Msg::Terminate` or a closed channel, so unparking them does not stop
    /// any worker and no queued job is dropped. Confirm the intended
    /// semantics; use [`ThreadPool::join`] to actually shut the pool down.
    pub fn terminate(&self) {
        for worker in &self.workers {
            worker.thread.thread().unpark();
        }
    }
}

/// Handle to one spawned worker thread.
struct Worker {
    thread: std::thread::JoinHandle<()>,
}

impl Worker {
    /// Spawns a thread that keeps taking messages from `receiver` until it
    /// gets `Msg::Terminate` or the channel is closed.
    fn new(receiver: Redex<std::sync::Mutex<Receiver<Msg>>>) -> Self {
        let thread = std::thread::spawn(move || loop {
            // Keep this a separate `let`: the lock guard is a temporary that
            // is released at the end of the statement, i.e. BEFORE the job
            // runs. Putting the call in the `match` scrutinee would hold the
            // lock for the whole job and serialize the pool.
            let msg = receiver
                .lock()
                // Nothing panics while the lock is held, and a `Receiver`
                // has no state a panic could leave half-updated, so a
                // poisoned lock is safe to keep using.
                .unwrap_or_else(|poisoned| poisoned.into_inner())
                .recv();
            match msg {
                Ok(Msg::Task(job)) => job(),
                // `Err` means every sender is gone (the pool was dropped
                // without `join`): exit quietly instead of panicking.
                Ok(Msg::Terminate) | Err(_) => break,
            }
        });
        Self { thread }
    }
}
/// Message delivered to a worker thread over the pool's channel.
enum Msg {
/// Tells the worker that receives it to leave its loop.
Terminate,
/// Carries a job for the receiving worker to run.
Task(Job),
}
/// Cheaply clonable shared handle to a `T`, backed by a `std::sync::Arc`.
///
/// Cloning produces another handle to the same value; the value is read
/// through `Deref`.
pub struct Redex<T> {
    data: std::sync::Arc<T>,
}

// SAFETY: NOTE(review) — neither impl puts a bound on `T`, so `Redex<T>` is
// declared thread-safe even when `T` itself is not (a plain `Arc<T>` would
// require `T: Send + Sync`). Nothing in this type enforces that; every user
// must make sure the wrapped value is only accessed in a thread-safe way.
unsafe impl<T> Send for Redex<T> {}
unsafe impl<T> Sync for Redex<T> {}

impl<T> Redex<T> {
    /// Moves `data` into a new reference-counted allocation.
    pub fn new(data: T) -> Self {
        let data = std::sync::Arc::new(data);
        Self { data }
    }
}

impl<T> Clone for Redex<T> {
    /// Returns another handle to the same value; the `T` is not copied.
    fn clone(&self) -> Self {
        let data = std::sync::Arc::clone(&self.data);
        Self { data }
    }
}

impl<T> std::ops::Deref for Redex<T> {
    type Target = T;

    /// Borrows the shared value.
    fn deref(&self) -> &T {
        &self.data
    }
}