use std::thread::{self, JoinHandle};
mod work_queue;
use work_queue::WorkQueue;
#[derive(Clone, Debug)]
enum Work<T> {
Job(T),
Quit,
}
#[derive(Debug)]
pub struct WorkPool<T> {
queue: WorkQueue<Work<T>>,
threads: Vec<JoinHandle<()>>,
}
impl<T: Clone + Send> WorkPool<T> {
pub fn new(num_threads: usize, buf_len: Option<usize>) -> Result<WorkPool<T>, ()> {
let queue = WorkQueue::new(buf_len.unwrap_or(64usize));
let num_threads = if num_threads == 0 {
usize::from(std::thread::available_parallelism().unwrap())
} else {
num_threads
};
let threads = Vec::with_capacity(num_threads);
Ok(WorkPool {
queue,
threads,
})
}
pub fn dispatch(&mut self, work: T) {
self.queue.dispatch(Work::Job(work));
}
pub fn dispatch_many(&mut self, work: Vec<T>) {
let work = work.iter()
.map(|w| { Work::Job(w.to_owned()) })
.collect();
self.queue.dispatch_many(work);
}
pub fn set_executor_and_start<F>(&mut self, executor: F)
where
F: FnOnce(T) + Copy + Send + 'static,
T: Send + 'static
{
for _ in 0..self.threads.capacity() {
let queue = self.queue.clone();
self.threads.push(thread::spawn(move || {
for work in queue {
match work {
Work::Job(w) => executor(w),
Work::Quit => break,
}
}
}));
}
}
pub fn close(&mut self) {
let mut quits = Vec::with_capacity(self.threads.len());
for _ in 0..self.threads.len() {
quits.push(Work::Quit);
}
self.queue.dispatch_many(quits);
for _ in 0..self.threads.len() {
let thread = self.threads.pop().unwrap();
let _ = thread.join();
}
}
}
impl<T> Drop for WorkPool<T> {
fn drop(&mut self) {
for t in self.threads.iter_mut() {
self.queue.dispatch(Work::Quit);
drop(t)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_create_pool() {
let pool: WorkPool<()> = WorkPool::new(8, None).unwrap();
assert_eq!(pool.threads.capacity(), 8);
}
}