1use std::sync::{Arc, Condvar, Mutex};
2use crate::worker::{Job, Worker};
3use crate::{channel, PoolConfig, Result, Semaphore};
4use crate::channel::SenderWrapper;
5
6pub struct ThreadPool {
19 workers: Vec<Worker>,
20 sender: Option<SenderWrapper<Box<dyn Job>>>,
21 semaphore: Semaphore,
22 max_jobs: Option<u16>,
23}
24
25impl ThreadPool {
26 pub fn new(config: PoolConfig) -> Result<ThreadPool> {
28 let size = config.n_workers as usize;
29 let (sender,receiver) =
30 if let Some(max) = config.incoming_buf_size {
31 channel::sync_channel(max as usize)
32 } else {
33 channel::channel()
34 };
35 let semaphore = Arc::new((Mutex::new(0),Condvar::new()));
36 let mut workers = Vec::with_capacity(size);
37 for _ in 0..size {
38 let worker = Worker::new(receiver.clone(),semaphore.clone());
39 workers.push(worker);
40 }
41 Ok(ThreadPool {
42 workers, semaphore,
43 sender:Some(sender),
44 max_jobs: config.max_jobs
45 })
46 }
47 #[inline]
49 pub fn with_default_config() -> Result<Self> {
50 let conf = PoolConfig::builder()
51 .build().map_err(|err| err.to_string())?;
52 Self::new(conf)
53 }
54 #[inline]
56 pub fn with_size(size: u16) -> Result<Self> {
57 let conf = PoolConfig::builder()
58 .n_workers(size)
59 .build().map_err(|err| err.to_string())?;
60 Self::new(conf)
61 }
62 pub fn execute(&self, job: impl Job) {
63 fn _execute(slf: &ThreadPool, job: Box<dyn Job>) {
64 {
65 let (lock,cvar) = &*slf.semaphore;
66 let mut counter = lock.lock().unwrap();
67 if let Some(max) = slf.max_jobs {
68 counter = cvar.wait_while(counter, |n| *n >= max).unwrap();
69 }
70 *counter += 1;
71 }
72 slf.sender
73 .as_ref()
74 .unwrap()
75 .send(job)
76 .unwrap();
77 }
78 _execute(self, Box::new(job));
79 }
80 pub fn join(&self) {
82 let (lock,condv) = &*self.semaphore;
83 let counter = lock.lock().unwrap();
84 let _guard = condv.wait_while(counter, |n| *n > 0).unwrap();
85 }
86}
87
88impl Drop for ThreadPool {
89 fn drop(&mut self) {
90 drop(self.sender.take());
91 self.workers
92 .iter_mut()
93 .for_each(Worker::shutdown);
94 }
95}