server_lib/multithreading/
thread_pool.rs

1use crate::multithreading::message::Message;
2use crate::multithreading::thread_worker::ThreadWorker;
3
4use std::sync::mpsc;
5use std::sync::Arc;
6use std::sync::Mutex;
7
8pub struct ThreadPool {
9    workers: Vec<ThreadWorker>,
10    sender: mpsc::Sender<Message>,
11}
12
13impl ThreadPool {
14    /// Create a new ThreadPool.
15    ///
16    /// The size is the number of threads in the pool.
17    ///
18    /// # Panics
19    ///
20    /// The `new` function will panic if the size is zero.
21    pub fn new(size: usize) -> ThreadPool {
22        assert!(size > 0);
23
24        let (sender, receiver) = mpsc::channel();
25
26        let receiver = Arc::new(Mutex::new(receiver));
27
28        let mut workers = Vec::with_capacity(size);
29
30        for id in 0..size {
31            workers.push(ThreadWorker::new(id, Arc::clone(&receiver)));
32        }
33
34        ThreadPool { workers, sender }
35    }
36    pub fn execute<F>(&self, f: F)
37    where
38        F: FnOnce() + Send + 'static,
39    {
40        let job = Box::new(f);
41
42        self.sender.send(Message::NewJob(job)).unwrap();
43    }
44}
45
46impl Drop for ThreadPool {
47    fn drop(&mut self) {
48        println!("Sending terminate message to all workers.");
49
50        for _ in &self.workers {
51            self.sender.send(Message::Terminate).unwrap();
52        }
53
54        println!("Shutting down all workers.");
55
56        for worker in &mut self.workers {
57            println!("Shutting down worker {}", worker.id());
58
59            if let Some(thread) = worker.thread.take() {
60                thread.join().unwrap();
61            }
62        }
63    }
64}