my_threadpool/
lib.rs

1use std::thread;
2use std::sync::{mpsc, mpsc::Sender, Arc, Mutex};
3
4
5pub type Job = Box<dyn FnOnce() + Send + 'static>;
6
7/// A ThreadPool that runs the given task with the given threads.
8/// If all the threads are occupied then it waits until a new thread is available.
9pub struct ThreadPool {
10    workers: Vec<Worker>,
11    sender: Sender<Message>,
12}
13
14struct Worker {
15    thread: Option<thread::JoinHandle<()>>,
16    id: usize,
17}
18
19enum Message {
20    Terminate,
21    Job(Job)
22}
23
24impl ThreadPool {
25
26    /// Returns a new instce of ThreadPool.
27    /// The ThreadPool::new() method takes an argument which is the number of threads
28    /// the threadpool has.
29    /// 
30    /// # Examples
31    /// 
32    /// ```
33    /// let pool = ThreadPool::new(4);
34    /// ```
35    pub fn new(size: usize) -> Self {
36        let (sender, reciever) = mpsc::channel();
37        let reciever = Arc::new(Mutex::new(reciever));
38
39        let mut workers = Vec::with_capacity(size);
40
41 
42
43        for id in 0..size {
44            let reciever = Arc::clone(&reciever);
45
46            workers.push(Worker { thread: Some(thread::spawn(move || loop {
47                let msg: Message = match reciever.lock().unwrap().recv() {
48
49                    Ok(n) => n,
50
51                    Err(_) => continue,
52
53                };
54
55                match msg {
56                    Message::Job(msg) => {
57                        msg()
58                    },
59                    _ => {
60                        break;
61                    }
62                }
63            })), id: id + 1 });
64        }
65
66        ThreadPool {
67
68            sender,
69
70            workers,
71
72        }
73
74    }
75
76    /// Runs a FnOnce() with the number of threads it has.
77    /// 
78    /// # Examples
79    /// 
80    /// ```
81    /// let pool = ThreadPool::new();
82    /// ```
83    pub fn execute<F: FnOnce() + Send + 'static>(&self, job: F) {
84        self.sender.send(Message::Job(Box::new(job))).unwrap();
85    }
86
87    pub fn shutdown(&mut self) {
88        for _ in &self.workers {
89            self.sender.send(Message::Terminate).unwrap();
90        }
91
92        println!("ThreadPool 1.0.1: Shutting down all threads.");
93
94        for worker in &mut self.workers {
95            println!("ThreadPool 1.0.1: Shutting down worker {}", worker.id);
96
97            if let Some(thread) = worker.thread.take() {
98                thread.join().unwrap();
99            }
100        }
101    }
102}
103
104impl Drop for ThreadPool {
105    fn drop(&mut self) {
106        self.shutdown();
107    }
108}