1use std::{
2 sync::{
3 mpsc::{self, Receiver, Sender},
4 Arc, Mutex,
5 },
6 thread,
7};
8
9pub struct ThreadPool {
10 workers: Vec<Worker>,
11 sender: Sender<Message>,
12}
13
14type Job = Box<dyn FnOnce() + Send + 'static>;
15enum Message {
16 NewJob(Job),
17 Terminate,
18}
19
20impl ThreadPool {
21 pub fn new(size: usize) -> ThreadPool {
22 assert!(size > 0);
23 let mut workers = Vec::with_capacity(size);
24 let (sender, receiver) = mpsc::channel();
25 let receiver = Arc::new(Mutex::new(receiver));
26 for id in 0..size {
27 workers.push(Worker::new(id, Arc::clone(&receiver)))
28 }
29 ThreadPool { workers, sender }
30 }
31 pub fn execute<F>(&self, f: F)
32 where
33 F: FnOnce() + Send + 'static,
34 {
35 let job = Box::new(f);
36 self.sender.send(Message::NewJob(job)).unwrap();
37 }
38}
39
40impl Drop for ThreadPool {
41 fn drop(&mut self) {
42 println!("Sending terminate message to all workers.");
43
44 for _ in &mut self.workers {
45 self.sender.send(Message::Terminate).unwrap();
46 }
47 for worker in &mut self.workers {
48 println!("Shutting down worker {}", worker.id);
49 if let Some(thread) = worker.thread.take() {
50 thread.join().unwrap();
51 }
52 }
53 }
54}
55
56type WorkerReceiver = Arc<Mutex<Receiver<Message>>>;
57
58struct Worker {
59 id: usize,
60 thread: Option<thread::JoinHandle<()>>,
61}
62
63impl Worker {
64 fn new(id: usize, receiver: WorkerReceiver) -> Worker {
65 let thread = thread::spawn(move || loop {
66 let message = receiver.lock().unwrap().recv().unwrap();
67 match message {
68 Message::NewJob(job) => {
69 println!("thread {} consume a job", id);
70 job()
71 }
72 Message::Terminate => {
73 println!("Worker {} was told to terminate.", id);
74 break;
75 }
76 }
77 });
78 Worker {
79 id,
80 thread: Some(thread),
81 }
82 }
83}