work_pool/
lib.rs

1use std::thread::{self, JoinHandle};
2
3mod work_queue;
4use work_queue::WorkQueue;
5
/// Message carried by the pool's queue to its worker threads.
#[derive(Debug, Clone)]
enum Work<T> {
    /// A unit of work; the payload is handed to the pool's executor.
    Job(T),
    /// Tells the worker that receives it to leave its processing loop.
    Quit,
}
11
/// A pool of worker threads that pull `Work<T>` items from a `WorkQueue`.
///
/// Construction does not spawn anything; the workers are started by
/// `set_executor_and_start`, and stopped by `close` or when the pool is dropped.
#[derive(Debug)]
pub struct WorkPool<T> {
    /// Queue that jobs and quit messages are dispatched into; every worker
    /// thread is given its own clone of it.
    queue: WorkQueue<Work<T>>,
    /// Join handles of the spawned workers. The vector is pre-sized in `new`,
    /// and its capacity is later read as the number of workers to spawn.
    threads: Vec<JoinHandle<()>>,
}
17
18impl<T: Clone + Send> WorkPool<T> {
19    /// Create a new WorkPool
20    pub fn new(num_threads: usize, buf_len: Option<usize>) -> Result<WorkPool<T>, ()> {
21        let queue = WorkQueue::new(buf_len.unwrap_or(64usize));
22
23        let num_threads = if num_threads == 0 {
24            usize::from(std::thread::available_parallelism().unwrap())
25        } else {
26            num_threads
27        };
28
29        let threads = Vec::with_capacity(num_threads);
30        Ok(WorkPool {
31            queue,
32            threads,
33        })
34    }
35
36    /// Send a job to the pool
37    pub fn dispatch(&mut self, work: T) {
38        self.queue.dispatch(Work::Job(work));
39    }
40
41    /// Send a list of jobs to the pool
42    pub fn dispatch_many(&mut self, work: Vec<T>) {
43        let work = work.iter()
44            .map(|w| { Work::Job(w.to_owned()) })
45            .collect();
46        self.queue.dispatch_many(work);
47    }
48
49    /// Setup the job executor function and start threads
50    pub fn set_executor_and_start<F>(&mut self, executor: F)
51    where
52        F: FnOnce(T) + Copy + Send + 'static,
53        T: Send + 'static
54    {
55        for _ in 0..self.threads.capacity() {
56            let queue = self.queue.clone();
57            self.threads.push(thread::spawn(move || {
58                // steal work -> Pass to executor
59                // Executor should accept parameter of type T
60                for work in queue {
61                    match work {
62                        Work::Job(w) => executor(w),
63                        Work::Quit => break,
64                    }
65                }
66            }));
67        }
68    }
69
70    /// Send a quit message to all threads and wait for them to join.
71    pub fn close(&mut self) {
72        let mut quits = Vec::with_capacity(self.threads.len());
73        for _ in 0..self.threads.len() {
74            quits.push(Work::Quit);
75        }
76
77        self.queue.dispatch_many(quits);
78
79        for _ in 0..self.threads.len() {
80            let thread = self.threads.pop().unwrap();
81            let _ = thread.join();
82        }
83    }
84}
85
86impl<T> Drop for WorkPool<T> {
87    /// When dropping this struct, threads will be detached
88    fn drop(&mut self) {
89        for t in self.threads.iter_mut() {
90            self.queue.dispatch(Work::Quit);
91            drop(t)
92        }
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    /// A pool built with an explicit thread count reserves exactly that many
    /// handle slots.
    #[test]
    fn should_create_pool() {
        let requested_threads = 8;
        let pool = WorkPool::<()>::new(requested_threads, None).unwrap();
        assert_eq!(pool.threads.capacity(), requested_threads);
    }
}