uvth 1.0.2

Compact and efficient threadpool implementation as an alternative to the threadpool crate.
Documentation
#[macro_use]
extern crate log;

use crossbeam_channel::{unbounded, Receiver, Sender};
use std::sync::Arc;
use std::thread;

type Task = Box<dyn FnOnce() + Send>;

enum Message {
    Task(Task),
    Exit,
}

#[derive(Clone)]
struct MessageQueue {
    sender: Arc<Sender<Message>>,
    receiver: Arc<Receiver<Message>>,
}

impl MessageQueue {
    fn new() -> Self {
        let (tx, rx) = unbounded();
        let (tx, rx) = (Arc::new(tx), Arc::new(rx));
        Self {
            sender: tx,
            receiver: rx,
        }
    }

    fn insert(&self, message: Message) {
        if self.sender.send(message).is_ok() {
            debug!("Successfully inserted message into queue.");
        } else {
            warn!("Failed to insert message into queue.");
        }
    }

    fn remove(&self) -> Option<Message> {
        if let Ok(message) = self.receiver.recv() {
            debug!("Successfully removed message from queue.");
            Some(message)
        } else {
            warn!("Failed to remove message from queue.");
            None
        }
    }
}

struct Worker {
    queue: MessageQueue,
    notify_exit: Arc<Sender<()>>,
    normal_exit: bool,
}

impl Worker {
    fn start(queue: &MessageQueue, notify_exit: &Arc<Sender<()>>) {
        let queue = queue.clone();
        let notify_exit = notify_exit.clone();
        let mut worker = Worker {
            queue,
            notify_exit,
            normal_exit: false,
        };
        thread::spawn(move || {
            worker.do_work();
        });
    }

    fn do_work(&mut self) {
        debug!("Worker thread started.");
        while let Some(message) = self.queue.remove() {
            match message {
                Message::Task(task) => task(),
                Message::Exit => break,
            }
        }
        let _ = self.notify_exit.send(());
        self.normal_exit = true;
        debug!("Worker thread exited.");
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        if !self.normal_exit {
            warn!("Panic in threadpool. Restarting worker.");
            Worker::start(&self.queue, &self.notify_exit);
        }
    }
}

/// A threadpool. Not much to say here. I will assume you know what a threadpool is.
#[derive(Clone)]
pub struct ThreadPool {
    worker_count: usize,
    queue: MessageQueue,
    notify_exit: Arc<Receiver<()>>,
}

impl ThreadPool {
    /// Create a new threadpool with a set number of threads.
    pub fn new(worker_count: usize) -> Self {
        debug!("Creating threadpool");
        let queue = MessageQueue::new();
        let (notify_exit_tx, notify_exit_rx) = unbounded();
        let (notify_exit_tx, notify_exit_rx) = (Arc::new(notify_exit_tx), Arc::new(notify_exit_rx));

        for _ in 0..worker_count {
            Worker::start(&queue, &notify_exit_tx);
        }

        Self {
            worker_count,
            queue,
            notify_exit: notify_exit_rx,
        }
    }

    /// Execute a task on the pool.
    #[inline]
    pub fn execute<F: 'static + FnOnce() + Send>(&self, f: F) {
        let task = Box::new(f);
        self.queue.insert(Message::Task(task));
    }

    /// Terminate all threads in the pool.
    pub fn terminate(&self) {
        for _ in 0..self.worker_count {
            self.queue.insert(Message::Exit);
        }
    }

    /// Wait for all workers to exit.
    pub fn join(&self) {
        for _ in 0..self.worker_count {
            self.notify_exit.recv().unwrap();
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        debug!("Dropping threadpool.");
        debug!("Terminating threads");
        self.terminate();
        debug!("Waiting for threads to exit.");
        self.join();
    }
}