//! nucleus_http/thread_pool.rs

1use std::{
2    thread::{self}, 
3    sync::{mpsc::{self}, Arc, Mutex}
4  };
5  
pub struct ThreadPool {
  /// Worker threads owned by the pool; each is joined in `Drop`.
  workers: Vec<Worker>,
  /// Sending half of the job channel; set to `None` during shutdown so
  /// workers see a disconnected channel and exit their loops.
  sender: Option<mpsc::Sender<Job>>
}
10  
11  type Job = Box<dyn FnOnce() + Send + 'static>;
12  
13  impl ThreadPool {
14    /// Create a new ThreadPool
15    /// 
16    /// size is number of threads to allocate to pool
17    /// 
18    /// # Panics
19    /// 
20    /// 'new' function will panic if size is zero
21    pub fn new(size: usize) -> ThreadPool {
22      assert!(size > 0);
23  
24      let (sender, receiver) = mpsc::channel();
25      let receiver = Arc::new(Mutex::new(receiver));
26  
27      let mut workers = Vec::with_capacity(size);
28  
29      for id in 0..size {
30        workers.push(Worker::new(id, Arc::clone(&receiver)));
31      }
32  
33      ThreadPool { workers, sender: Some(sender)}
34    }
35  
36    pub fn execute<F>(&self, f: F)
37    where
38      F: FnOnce() + Send + 'static,
39    {
40      let job = Box::new(f);
41  
42      self.sender.as_ref().unwrap().send(job).unwrap();
43    }
44  }
45  
46  impl Drop for ThreadPool {
47    fn drop(&mut self) {
48      drop(self.sender.take());
49  
50      for worker in &mut self.workers {
51        println!("Shutting Down Worker {}", worker.id);
52        if let Some(thread) = worker.thread.take() {
53          thread.join().unwrap();
54        }
55      }
56    }
57  }
58  
struct Worker {
  /// Identifier for this worker, used in shutdown log messages.
  id: usize,
  /// Join handle for the worker's thread; `take`n (left as `None`)
  /// once the thread has been joined during pool shutdown.
  thread: Option<thread::JoinHandle<()>>,
}
63  
impl Worker {
  /// Spawn a thread that loops, pulling jobs off the shared channel and
  /// running them. The loop ends — and the thread exits — once the channel
  /// disconnects, i.e. when the `ThreadPool` drops its `Sender`.
  fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
    let thread = thread::spawn(move || loop {
      // The `MutexGuard` from `lock()` is a temporary inside this `let`
      // statement, so it is dropped as soon as `recv()` returns. The lock
      // is therefore NOT held while the job runs, which lets other workers
      // pick up jobs concurrently. Rewriting this as `while let` would
      // keep the guard alive for the whole loop body — do not.
      let message = receiver.lock().unwrap().recv();

      match message {
        Ok(job) => {
          job();
        }
        Err(_) => {
          // `recv` only errors when the sending half is gone: shutdown.
          println!("Worker {id} disconnected; Shutting Down");
          break;
        }
      }
    });

    Worker {
      id,
      thread: Some(thread)
    }
  }
}