Skip to main content

rust_web_server/thread_pool/
mod.rs

1#[cfg(test)]
2mod tests;
3
4use std::sync::atomic::Ordering;
5use std::sync::{Arc, mpsc, Mutex};
6use std::thread;
7
8use crate::metrics;
9
/// A fixed-size pool of worker threads fed through a single job channel.
///
/// Jobs passed to `execute` are sent over `sender`; every worker created in
/// `new` holds a clone of the receiving end behind an `Arc<Mutex<_>>`.
pub struct ThreadPool {
    /// The spawned workers; their thread handles are joined in `join`.
    _workers: Vec<Worker>,
    /// Sending half of the job channel. `join` drops it before waiting on
    /// the workers.
    sender: mpsc::Sender<Job>,
}
14
/// A unit of work for the pool: a boxed closure that is called at most once
/// and may be moved to another thread (`Send + 'static`).
type Job = Box<dyn FnOnce() + Send + 'static>;
16
17impl ThreadPool {
18    pub fn new(size: usize) -> ThreadPool {
19        assert!(size > 0);
20
21        let (sender, receiver) = mpsc::channel();
22
23        let receiver = Arc::new(Mutex::new(receiver));
24
25        let mut workers = Vec::with_capacity(size);
26        for id in 0..size {
27            workers.push(Worker::new(id, Arc::clone(&receiver)));
28        }
29
30        ThreadPool {
31            _workers: workers,
32            sender,
33        }
34    }
35
36    pub fn execute<F>(&self, f: F)
37        where
38            F: FnOnce() + Send  + 'static,
39    {
40        metrics::THREAD_POOL_QUEUED.fetch_add(1, Ordering::Relaxed);
41        let job = Box::new(move || {
42            metrics::THREAD_POOL_QUEUED.fetch_sub(1, Ordering::Relaxed);
43            f();
44        });
45        let boxed_send = self.sender.send(job);
46        if boxed_send.is_err() {
47            eprintln!("unable to send job: {}", boxed_send.err().unwrap());
48        } else {
49            boxed_send.unwrap()
50        }
51
52    }
53
54    /// Drain the pool: stop accepting new jobs and wait for all in-flight
55    /// workers to finish. Consumes `self`.
56    pub fn join(mut self) {
57        drop(self.sender);
58        for worker in self._workers.drain(..) {
59            if let Err(e) = worker._thread.join() {
60                eprintln!("worker thread panicked: {:?}", e);
61            }
62        }
63    }
64}
65
/// One pool thread together with the id it was created with.
struct Worker {
    /// Index handed out by `ThreadPool::new`; also used as the thread's name.
    _id: usize,
    /// Handle of the spawned thread; `ThreadPool::join` waits on it.
    _thread: thread::JoinHandle<()>,
}
70
71impl Worker {
72    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
73        let builder = thread::Builder::new().name(format!("{}", id));
74
75        let boxed_thread = builder.spawn(move || loop {
76
77            let boxed_lock = receiver.lock();
78            if boxed_lock.is_err() {
79                eprintln!("Worker {} -> unable to acquire lock {}", id, boxed_lock.err().unwrap());
80            } else {
81                let boxed_job = boxed_lock.unwrap().recv();
82                match boxed_job {
83                    Ok(job) => job(),
84                    Err(_) => break,
85                }
86            }
87
88        });
89
90        if boxed_thread.is_err() {
91            eprintln!("Failed while creating a thread id: {} error: {}", id, boxed_thread.as_ref().err().unwrap());
92        }
93
94        Worker { _id: id, _thread: boxed_thread.unwrap() }
95    }
96}