swimming_pool/
threadpool.rs

1use std::sync::mpsc;
2use std::sync::{Arc, Mutex};
3
4use crate::worker::Worker;
5
6pub(crate) type Job = Box<dyn FnOnce() + Send + 'static>;
7
8/// Thread pool struct.
9///
10/// # Panics
11/// Panics when the generic parameter (POOL_SIZE)
12/// is zero.
13///
14/// # Example
15/// ```rust
16/// use std::thread;
17/// use std::time;
18///
19/// use swimming_pool::ThreadPool;
20///
21/// fn main() {
22///     // Initialize a thread pool with 5 threads
23///     let pool = ThreadPool::new(5);
24///
25///     pool.execute(|| {
26///         // Send job to the pool
27///         thread::sleep(time::Duration::from_secs(5));
28///     });
29///
30///     pool.join_all();
31/// }
32/// ```
33pub struct ThreadPool {
34    workers: Vec<Worker>,
35    tx: Option<mpsc::Sender<Job>>,
36}
37
38impl ThreadPool {
39    /// Initialize a new thread pool
40    ///
41    /// # Panics
42    /// Panics when the generic parameter (POOL_SIZE)
43    /// is zero.
44    pub fn new(size: usize) -> Self {
45        assert!(size != 0, "Minimum of one thread is required in the pool.");
46
47        let (tx, rx) = mpsc::channel();
48        let rx = Arc::new(Mutex::new(rx));
49
50        let mut workers = Vec::with_capacity(size);
51
52        for _ in 0..size {
53            workers.push(Worker::new(Arc::clone(&rx)));
54        }
55
56        Self {
57            workers,
58            tx: Some(tx),
59        }
60    }
61
62    /// Send a job to the pool.
63    /// This job will be picked and executed
64    /// by any free woker thread.
65    ///
66    /// # Example
67    /// ```rust
68    /// use threadpool::Threadpool;
69    ///
70    /// let pool = ThreadPool::new(10);
71    /// pool.execute(|| {
72    ///     // Send job to the pool
73    /// });
74    ///```
75    pub fn execute<F: FnOnce() + Send + 'static>(&self, f: F) {
76        self.tx
77            .as_ref()
78            .unwrap()
79            .send(Box::new(f))
80            .expect("Failed to execute the function");
81    }
82
83    /// Join all the worker threads (wait for
84    /// them to finish executing their job).
85    pub fn join_all(mut self) {
86        drop(self.tx.take().unwrap());
87        self.workers.into_iter().for_each(|worker| worker.join());
88    }
89
90    /// Returns the number of worker threads in the
91    /// thread pool.
92    pub fn get_pool_size(&self) -> usize {
93        self.workers.len()
94    }
95
96    /// Returns the number of woker threads
97    /// which are currently executing a job.
98    pub fn get_working_threads(&self) -> usize {
99        self.workers
100            .iter()
101            .filter(|worker| worker.is_working())
102            .count()
103    }
104}