rust_web_server/thread_pool/
mod.rs1#[cfg(test)]
2mod tests;
3
4use std::sync::atomic::Ordering;
5use std::sync::{Arc, mpsc, Mutex};
6use std::thread;
7
8use crate::metrics;
9
10pub struct ThreadPool {
11 _workers: Vec<Worker>,
12 sender: mpsc::Sender<Job>,
13}
14
15type Job = Box<dyn FnOnce() + Send + 'static>;
16
17impl ThreadPool {
18 pub fn new(size: usize) -> ThreadPool {
19 assert!(size > 0);
20
21 let (sender, receiver) = mpsc::channel();
22
23 let receiver = Arc::new(Mutex::new(receiver));
24
25 let mut workers = Vec::with_capacity(size);
26 for id in 0..size {
27 workers.push(Worker::new(id, Arc::clone(&receiver)));
28 }
29
30 ThreadPool {
31 _workers: workers,
32 sender,
33 }
34 }
35
36 pub fn execute<F>(&self, f: F)
37 where
38 F: FnOnce() + Send + 'static,
39 {
40 metrics::THREAD_POOL_QUEUED.fetch_add(1, Ordering::Relaxed);
41 let job = Box::new(move || {
42 metrics::THREAD_POOL_QUEUED.fetch_sub(1, Ordering::Relaxed);
43 f();
44 });
45 let boxed_send = self.sender.send(job);
46 if boxed_send.is_err() {
47 eprintln!("unable to send job: {}", boxed_send.err().unwrap());
48 } else {
49 boxed_send.unwrap()
50 }
51
52 }
53
54 pub fn join(mut self) {
57 drop(self.sender);
58 for worker in self._workers.drain(..) {
59 if let Err(e) = worker._thread.join() {
60 eprintln!("worker thread panicked: {:?}", e);
61 }
62 }
63 }
64}
65
66struct Worker {
67 _id: usize,
68 _thread: thread::JoinHandle<()>,
69}
70
71impl Worker {
72 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
73 let builder = thread::Builder::new().name(format!("{}", id));
74
75 let boxed_thread = builder.spawn(move || loop {
76
77 let boxed_lock = receiver.lock();
78 if boxed_lock.is_err() {
79 eprintln!("Worker {} -> unable to acquire lock {}", id, boxed_lock.err().unwrap());
80 } else {
81 let boxed_job = boxed_lock.unwrap().recv();
82 match boxed_job {
83 Ok(job) => job(),
84 Err(_) => break,
85 }
86 }
87
88 });
89
90 if boxed_thread.is_err() {
91 eprintln!("Failed while creating a thread id: {} error: {}", id, boxed_thread.as_ref().err().unwrap());
92 }
93
94 Worker { _id: id, _thread: boxed_thread.unwrap() }
95 }
96}