1use std::sync::mpsc;
2use std::sync::mpsc::Receiver;
3use std::sync::Arc;
4use std::sync::Mutex;
5use std::thread;
6
/// A task that can be run exactly once while it is owned by a `Box`.
///
/// `call` takes `self: Box<Self>`, which keeps the method usable on a
/// `Box<dyn FnBox>` trait object and consumes the box when it is invoked.
trait FnBox {
    /// Consumes the boxed value and runs it.
    fn call(self: Box<Self>);
}
10
11impl<F: FnOnce()> FnBox for F {
12 fn call(self: Box<Self>) {
13 (*self)()
14 }
15}
16
17type Job = Box<dyn FnBox + Send>;
19
/// One pool thread together with the id used in its log messages.
struct Worker {
    /// Identifier printed in this worker's log lines.
    id: usize,
    /// Handle of the spawned thread. It is an `Option` so the handle can be
    /// moved out with `take()` and joined through a `&mut Worker`.
    thread: Option<thread::JoinHandle<()>>,
}
24
25impl Worker {
26 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
27 let thread: thread::JoinHandle<()> = thread::spawn(move || loop {
32 let message: Message = receiver.lock().unwrap().recv().unwrap();
33 match message {
34 Message::NewJob(job) => {
35 println!("Worker {} got a new job..", id);
36 job.call();
37 }
38 Message::Terminate => {
39 println!("Worker {} was told to terminate.", id);
40 break;
41 }
42 }
43 });
44 Worker {
45 id,
46 thread: Some(thread),
47 }
48 }
49}
50
51enum Message {
52 NewJob(Job),
53 Terminate,
54}
55
56pub struct ThreadPool {
57 workers: Vec<Worker>,
59 sender: mpsc::Sender<Message>,
60}
61
62impl ThreadPool {
63 pub fn new(size: usize) -> ThreadPool {
71 assert!(size > 0);
72 let (sender, receiver) = mpsc::channel();
73 let receiver: Arc<Mutex<Receiver<Message>>> = Arc::new(Mutex::new(receiver));
74 let mut workers = Vec::with_capacity(size);
75 for i in 0..size {
76 workers.push(Worker::new(i, Arc::clone(&receiver)));
78 }
79 ThreadPool { workers, sender }
80 }
81
82 pub fn execute<F>(&self, f: F)
83 where
84 F: FnOnce() + Send + 'static,
85 {
86 let job: Box<F> = Box::new(f);
87 self.sender.send(Message::NewJob(job)).unwrap();
88 }
89}
90
91impl Drop for ThreadPool {
92 fn drop(&mut self) {
93 println!("Sending terminate message to all workers.");
94
95 for _ in &mut self.workers {
96 self.sender.send(Message::Terminate).unwrap();
97 }
98 println!("Shutting down all workers");
99
100 for worker in &mut self.workers {
101 println!("Shutting down work {}", worker.id);
102 if let Some(thread) = worker.thread.take() {
103 thread.join().unwrap();
104 }
105 }
106 }
107}
108
#[cfg(test)]
mod tests {
    /// Placeholder check: it only confirms that the test harness builds and
    /// runs. It does not exercise the thread pool.
    #[test]
    fn test_eq() {
        assert_eq!(1, 1);
    }
}