job_pool/
pool.rs

1use std::sync::{Arc, Condvar, Mutex};
2use crate::worker::{Job, Worker};
3use crate::{channel, PoolConfig, Result, Semaphore};
4use crate::channel::SenderWrapper;
5
6/// Thread Pool
7///
8/// A thread pool coordinates a group of threads to run
9/// taks in parallel.
10///
11/// # Example
12/// ```
13/// use job_pool::ThreadPool;
14///
15/// let pool = ThreadPool::with_size(32).expect("Error creating pool");
16/// pool.execute(|| println!("Hello world!"));
17/// ```
18pub struct ThreadPool {
19    workers: Vec<Worker>,
20    sender: Option<SenderWrapper<Box<dyn Job>>>,
21    semaphore: Semaphore,
22    max_jobs: Option<u16>,
23}
24
25impl ThreadPool {
26    /// Create a new ThreadPool.
27    pub fn new(config: PoolConfig) -> Result<ThreadPool> {
28        let size = config.n_workers as usize;
29        let (sender,receiver) =
30            if let Some(max) = config.incoming_buf_size {
31                channel::sync_channel(max as usize)
32            } else {
33                channel::channel()
34            };
35        let semaphore = Arc::new((Mutex::new(0),Condvar::new()));
36        let mut workers = Vec::with_capacity(size);
37        for _ in 0..size {
38            let worker = Worker::new(receiver.clone(),semaphore.clone());
39            workers.push(worker);
40        }
41        Ok(ThreadPool {
42            workers, semaphore,
43            sender:Some(sender),
44            max_jobs: config.max_jobs
45        })
46    }
47    /// Create a [ThreadPool] with the default [configuration](PoolConfig)
48    #[inline]
49    pub fn with_default_config() -> Result<Self> {
50        let conf = PoolConfig::builder()
51                              .build().map_err(|err| err.to_string())?;
52        Self::new(conf)
53    }
54    /// Create a [ThreadPool] with a given size
55    #[inline]
56    pub fn with_size(size: u16) -> Result<Self> {
57        let conf = PoolConfig::builder()
58                              .n_workers(size)
59                              .build().map_err(|err| err.to_string())?;
60        Self::new(conf)
61    }
62    pub fn execute(&self, job: impl Job) {
63        fn _execute(slf: &ThreadPool, job: Box<dyn Job>) {
64            {
65                let (lock,cvar) = &*slf.semaphore;
66                let mut counter = lock.lock().unwrap();
67                if let Some(max) = slf.max_jobs {
68                    counter = cvar.wait_while(counter, |n| *n >= max).unwrap();
69                }
70                *counter += 1;
71            }
72            slf.sender
73                .as_ref()
74                .unwrap()
75                .send(job)
76                .unwrap();
77        }
78         _execute(self, Box::new(job));
79    }
80    /// Waits for all the jobs in the pool to finish
81    pub fn join(&self) {
82        let (lock,condv) = &*self.semaphore;
83        let counter = lock.lock().unwrap();
84        let _guard = condv.wait_while(counter, |n| *n > 0).unwrap();
85    }
86}
87
88impl Drop for ThreadPool  {
89    fn drop(&mut self) {
90        drop(self.sender.take());
91        self.workers
92            .iter_mut()
93            .for_each(Worker::shutdown);
94    }
95}