job_pool/
pool.rs

1use crate::scope::Scope;
2use crate::worker::{Job, Worker, Message};
3use crate::{channel, Counter, PoolConfig, Result};
4use crate::channel::SenderWrapper;
5
6/// Thread Pool
7///
8/// A thread pool coordinates a group of threads to run
9/// taks in parallel.
10///
11/// # Example
12/// ```
13/// use job_pool::ThreadPool;
14///
15/// let pool = ThreadPool::with_size(32).expect("Error creating pool");
16/// pool.execute(|| println!("Hello world!"));
17/// ```
18pub struct ThreadPool {
19    workers: Vec<Worker>,
20    sender: SenderWrapper<Message>,
21    job_count: Counter,
22    max_jobs: Option<u16>,
23}
24
25impl ThreadPool {
26    /// Creates a new `ThreadPool`
27    ///
28    /// # Errors
29    /// If the [PoolConfig] is not valid
30    pub fn new(config: PoolConfig) -> Result<ThreadPool> {
31        config.validate()?;
32
33        let size = config.n_workers as usize;
34        let (sender,receiver) =
35            if let Some(max) = config.incoming_buf_size {
36                channel::sync_channel(max as usize)
37            } else {
38                channel::channel()
39            };
40        let mut workers = Vec::with_capacity(size);
41        for _ in 0..size-1 {
42            let worker = Worker::new(receiver.clone());
43            workers.push(worker);
44        }
45        let worker = Worker::new(receiver);
46        workers.push(worker);
47
48        let global = Counter::new();
49        Ok(ThreadPool {
50            workers,
51            job_count: global,
52            max_jobs: config.max_jobs,
53            sender,
54        })
55    }
56    /// Create a [ThreadPool] with the default [configuration](PoolConfig)
57    #[inline]
58    pub fn with_default_config() -> Self {
59        let conf = PoolConfig::default();
60        Self::new(conf).expect("The default config is valid")
61    }
62    /// Create a [ThreadPool] with a given size
63    #[inline]
64    pub fn with_size(size: u16) -> Result<Self> {
65        let conf = PoolConfig::builder()
66                              .n_workers(size)
67                              .build();
68        Self::new(conf)
69    }
70
71    /// Returns the number of pending jobs
72    pub fn pending_jobs(&self) -> usize {
73        self.job_count.count() as usize
74    }
75
76    pub(crate) fn execute_inside_scope(&self, job: Box<dyn Job<'static>>, scope_counter: Counter) {
77        self.job_count.inc(self.max_jobs);
78        scope_counter.inc(None);
79
80        let msg = Message::Job {
81            job: Box::new(job),
82            global_counter: self.job_count.clone(),
83            scope_counter: Some(scope_counter),
84        };
85        self.sender.send(msg).unwrap()
86    }
87
88    /// Executes the given job inside this pool.
89    ///
90    /// # Example
91    /// ```
92    /// use job_pool::ThreadPool;
93    ///
94    /// fn heavy_computation(n: u64) -> u64 {
95    ///     // ....
96    ///     n
97    /// }
98    ///
99    /// let pool = ThreadPool::default();
100    /// pool.execute(|| {
101    ///     println!("JOB1: {}", heavy_computation(1));
102    /// });
103    ///
104    /// pool.execute(|| {
105    ///     println!("JOB2: {}", heavy_computation(2));
106    /// });
107    /// ```
108    pub fn execute(&self, job: impl Job<'static>) {
109        self.job_count.inc(self.max_jobs);
110        let msg = Message::Job {
111            job: Box::new(job),
112            global_counter: self.job_count.clone(),
113            scope_counter: None
114        };
115        self.sender.send(msg).unwrap();
116    }
117
118    /// Creates a new [Scope] to spawn jobs.
119    ///
120    /// All the jobs spawned via [Scope::execute], will be joined
121    /// when the scope drops.
122    ///
123    /// # Example
124    /// ```
125    /// use job_pool::ThreadPool;
126    ///
127    /// let pool = ThreadPool::default();
128    ///
129    /// let msg = String::from("Helloo :)");
130    /// pool.scope(|scope| {
131    ///     scope.execute(|| {
132    ///         println!("I'm job1, borrowing {msg:?}");
133    ///     });
134    ///     scope.execute(|| {
135    ///         println!("I'm job2, borrowing {msg:?}");
136    ///     });
137    /// });
138    ///
139    /// // At this point, all the jobs spawned inside the scope above
140    /// // are done. That's wy it is ok to borrow msg, because we make
141    /// // sure that the jobs don't outlive the scope's lifetime.
142    /// ```
143    pub fn scope<'scope, 'pool, F, R>(&'pool self, f: F) -> R
144    where
145        F: FnOnce(&Scope<'scope, 'pool>) -> R,
146        'pool: 'scope
147    {
148        let scope = Scope::new(self);
149        f(&scope)
150    }
151
152    /// Waits for all the jobs in the pool to finish
153    pub fn join(&self) {
154        self.job_count.join();
155    }
156}
157
158impl Drop for ThreadPool  {
159    fn drop(&mut self) {
160        for _ in 0..self.workers.len() {
161            self.sender.send(Message::Shutdown).unwrap();
162        }
163
164        for worker in &mut self.workers {
165            worker.shutdown();
166        }
167    }
168}
169
170impl Default for ThreadPool {
171    fn default() -> Self {
172        Self::with_default_config()
173    }
174}