1use std::thread::{self, JoinHandle};
2
3mod work_queue;
4use work_queue::WorkQueue;
5
6#[derive(Clone, Debug)]
7enum Work<T> {
8 Job(T),
9 Quit,
10}
11
12#[derive(Debug)]
13pub struct WorkPool<T> {
14 queue: WorkQueue<Work<T>>,
15 threads: Vec<JoinHandle<()>>,
16}
17
18impl<T: Clone + Send> WorkPool<T> {
19 pub fn new(num_threads: usize, buf_len: Option<usize>) -> Result<WorkPool<T>, ()> {
21 let queue = WorkQueue::new(buf_len.unwrap_or(64usize));
22
23 let num_threads = if num_threads == 0 {
24 usize::from(std::thread::available_parallelism().unwrap())
25 } else {
26 num_threads
27 };
28
29 let threads = Vec::with_capacity(num_threads);
30 Ok(WorkPool {
31 queue,
32 threads,
33 })
34 }
35
36 pub fn dispatch(&mut self, work: T) {
38 self.queue.dispatch(Work::Job(work));
39 }
40
41 pub fn dispatch_many(&mut self, work: Vec<T>) {
43 let work = work.iter()
44 .map(|w| { Work::Job(w.to_owned()) })
45 .collect();
46 self.queue.dispatch_many(work);
47 }
48
49 pub fn set_executor_and_start<F>(&mut self, executor: F)
51 where
52 F: FnOnce(T) + Copy + Send + 'static,
53 T: Send + 'static
54 {
55 for _ in 0..self.threads.capacity() {
56 let queue = self.queue.clone();
57 self.threads.push(thread::spawn(move || {
58 for work in queue {
61 match work {
62 Work::Job(w) => executor(w),
63 Work::Quit => break,
64 }
65 }
66 }));
67 }
68 }
69
70 pub fn close(&mut self) {
72 let mut quits = Vec::with_capacity(self.threads.len());
73 for _ in 0..self.threads.len() {
74 quits.push(Work::Quit);
75 }
76
77 self.queue.dispatch_many(quits);
78
79 for _ in 0..self.threads.len() {
80 let thread = self.threads.pop().unwrap();
81 let _ = thread.join();
82 }
83 }
84}
85
86impl<T> Drop for WorkPool<T> {
87 fn drop(&mut self) {
89 for t in self.threads.iter_mut() {
90 self.queue.dispatch(Work::Quit);
91 drop(t)
92 }
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99
100 #[test]
101 fn should_create_pool() {
102 let pool: WorkPool<()> = WorkPool::new(8, None).unwrap();
103 assert_eq!(pool.threads.capacity(), 8);
104 }
105}