1use std::sync::mpsc;
3use std::sync::{Arc, Mutex};
4use std::thread;
5use std::thread::JoinHandle;
6
7type Job = Box<dyn FnOnce() + Send + 'static>;
8
9enum Message {
10 EndYourselfMortal,
12 DoThis(Job),
14}
15
16pub struct ThreadPool {
18 workers: Vec<Worker>,
19 sender: mpsc::Sender<Message>,
20}
21
22struct Worker {
23 #[allow(dead_code)]
25 id: usize,
26 thread: Option<JoinHandle<()>>,
28}
29
30impl Default for ThreadPool {
31 fn default() -> ThreadPool {
33 Self::with_threads(num_cpus::get())
34 }
35}
36
37impl ThreadPool {
38 pub fn with_threads(nthreads: usize) -> ThreadPool {
40 assert!(nthreads > 0);
41
42 let (sender, receiver) = mpsc::channel();
43 let receiver = Arc::new(Mutex::new(receiver));
44
45 let mut workers = Vec::new();
46 for tid in 0..nthreads {
47 workers.push(Worker::new(tid, receiver.clone()));
48 }
49 ThreadPool { workers, sender }
50 }
51
52 pub fn push<F>(&self, f: F)
54 where
55 F: FnOnce() + Send + 'static,
56 {
57 let job = Box::new(f);
58 self.sender.send(Message::DoThis(job)).unwrap();
59 }
60
61 pub fn done(&self) {
63 for _ in &self.workers {
64 self.sender.send(Message::EndYourselfMortal).unwrap();
65 }
66 }
67}
68
69impl Drop for ThreadPool {
70 fn drop(&mut self) {
71 for worker in self.workers.iter_mut() {
72 if let Some(thread) = worker.thread.take() {
73 thread.join().unwrap();
74 }
75 }
76 }
77}
78
79impl Worker {
80 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
81 Worker {
82 id,
83 thread: Some(thread::spawn(move || loop {
84 let message = receiver.lock().unwrap().recv().unwrap();
85
86 match message {
87 Message::EndYourselfMortal => break,
88 Message::DoThis(job) => job(),
89 }
90 })),
91 }
92 }
93}