nucleus_http/
thread_pool.rs1use std::{
2 thread::{self},
3 sync::{mpsc::{self}, Arc, Mutex}
4 };
5
6 pub struct ThreadPool {
7 workers: Vec<Worker>,
8 sender: Option<mpsc::Sender<Job>>
9 }
10
11 type Job = Box<dyn FnOnce() + Send + 'static>;
12
13 impl ThreadPool {
14 pub fn new(size: usize) -> ThreadPool {
22 assert!(size > 0);
23
24 let (sender, receiver) = mpsc::channel();
25 let receiver = Arc::new(Mutex::new(receiver));
26
27 let mut workers = Vec::with_capacity(size);
28
29 for id in 0..size {
30 workers.push(Worker::new(id, Arc::clone(&receiver)));
31 }
32
33 ThreadPool { workers, sender: Some(sender)}
34 }
35
36 pub fn execute<F>(&self, f: F)
37 where
38 F: FnOnce() + Send + 'static,
39 {
40 let job = Box::new(f);
41
42 self.sender.as_ref().unwrap().send(job).unwrap();
43 }
44 }
45
46 impl Drop for ThreadPool {
47 fn drop(&mut self) {
48 drop(self.sender.take());
49
50 for worker in &mut self.workers {
51 println!("Shutting Down Worker {}", worker.id);
52 if let Some(thread) = worker.thread.take() {
53 thread.join().unwrap();
54 }
55 }
56 }
57 }
58
59 struct Worker {
60 id: usize,
61 thread: Option<thread::JoinHandle<()>>,
62 }
63
64 impl Worker {
65 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
66 let thread = thread::spawn(move || loop {
67 let message = receiver.lock().unwrap().recv();
68
69 match message {
70 Ok(job) => {
71 job();
72 }
73 Err(_) => {
74 println!("Worker {id} disconnected; Shutting Down");
75 break;
76 }
77 }
78 });
79
80 Worker {
81 id,
82 thread: Some(thread)
83 }
84 }
85 }