job_pool/pool.rs
1use crate::scope::Scope;
2use crate::worker::{Job, Worker, Message};
3use crate::{channel, Counter, PoolConfig, Result};
4use crate::channel::SenderWrapper;
5
6/// Thread Pool
7///
8/// A thread pool coordinates a group of threads to run
9/// taks in parallel.
10///
11/// # Example
12/// ```
13/// use job_pool::ThreadPool;
14///
15/// let pool = ThreadPool::with_size(32).expect("Error creating pool");
16/// pool.execute(|| println!("Hello world!"));
17/// ```
18pub struct ThreadPool {
19 workers: Vec<Worker>,
20 sender: SenderWrapper<Message>,
21 job_count: Counter,
22 max_jobs: Option<u16>,
23}
24
25impl ThreadPool {
26 /// Creates a new `ThreadPool`
27 ///
28 /// # Errors
29 /// If the [PoolConfig] is not valid
30 pub fn new(config: PoolConfig) -> Result<ThreadPool> {
31 config.validate()?;
32
33 let size = config.n_workers as usize;
34 let (sender,receiver) =
35 if let Some(max) = config.incoming_buf_size {
36 channel::sync_channel(max as usize)
37 } else {
38 channel::channel()
39 };
40 let mut workers = Vec::with_capacity(size);
41 for _ in 0..size-1 {
42 let worker = Worker::new(receiver.clone());
43 workers.push(worker);
44 }
45 let worker = Worker::new(receiver);
46 workers.push(worker);
47
48 let global = Counter::new();
49 Ok(ThreadPool {
50 workers,
51 job_count: global,
52 max_jobs: config.max_jobs,
53 sender,
54 })
55 }
56 /// Create a [ThreadPool] with the default [configuration](PoolConfig)
57 #[inline]
58 pub fn with_default_config() -> Self {
59 let conf = PoolConfig::default();
60 Self::new(conf).expect("The default config is valid")
61 }
62 /// Create a [ThreadPool] with a given size
63 #[inline]
64 pub fn with_size(size: u16) -> Result<Self> {
65 let conf = PoolConfig::builder()
66 .n_workers(size)
67 .build();
68 Self::new(conf)
69 }
70
71 /// Returns the number of pending jobs
72 pub fn pending_jobs(&self) -> usize {
73 self.job_count.count() as usize
74 }
75
76 pub(crate) fn execute_inside_scope(&self, job: Box<dyn Job<'static>>, scope_counter: Counter) {
77 self.job_count.inc(self.max_jobs);
78 scope_counter.inc(None);
79
80 let msg = Message::Job {
81 job: Box::new(job),
82 global_counter: self.job_count.clone(),
83 scope_counter: Some(scope_counter),
84 };
85 self.sender.send(msg).unwrap()
86 }
87
88 /// Executes the given job inside this pool.
89 ///
90 /// # Example
91 /// ```
92 /// use job_pool::ThreadPool;
93 ///
94 /// fn heavy_computation(n: u64) -> u64 {
95 /// // ....
96 /// n
97 /// }
98 ///
99 /// let pool = ThreadPool::default();
100 /// pool.execute(|| {
101 /// println!("JOB1: {}", heavy_computation(1));
102 /// });
103 ///
104 /// pool.execute(|| {
105 /// println!("JOB2: {}", heavy_computation(2));
106 /// });
107 /// ```
108 pub fn execute(&self, job: impl Job<'static>) {
109 self.job_count.inc(self.max_jobs);
110 let msg = Message::Job {
111 job: Box::new(job),
112 global_counter: self.job_count.clone(),
113 scope_counter: None
114 };
115 self.sender.send(msg).unwrap();
116 }
117
118 /// Creates a new [Scope] to spawn jobs.
119 ///
120 /// All the jobs spawned via [Scope::execute], will be joined
121 /// when the scope drops.
122 ///
123 /// # Example
124 /// ```
125 /// use job_pool::ThreadPool;
126 ///
127 /// let pool = ThreadPool::default();
128 ///
129 /// let msg = String::from("Helloo :)");
130 /// pool.scope(|scope| {
131 /// scope.execute(|| {
132 /// println!("I'm job1, borrowing {msg:?}");
133 /// });
134 /// scope.execute(|| {
135 /// println!("I'm job2, borrowing {msg:?}");
136 /// });
137 /// });
138 ///
139 /// // At this point, all the jobs spawned inside the scope above
140 /// // are done. That's wy it is ok to borrow msg, because we make
141 /// // sure that the jobs don't outlive the scope's lifetime.
142 /// ```
143 pub fn scope<'scope, 'pool, F, R>(&'pool self, f: F) -> R
144 where
145 F: FnOnce(&Scope<'scope, 'pool>) -> R,
146 'pool: 'scope
147 {
148 let scope = Scope::new(self);
149 f(&scope)
150 }
151
152 /// Waits for all the jobs in the pool to finish
153 pub fn join(&self) {
154 self.job_count.join();
155 }
156}
157
158impl Drop for ThreadPool {
159 fn drop(&mut self) {
160 for _ in 0..self.workers.len() {
161 self.sender.send(Message::Shutdown).unwrap();
162 }
163
164 for worker in &mut self.workers {
165 worker.shutdown();
166 }
167 }
168}
169
170impl Default for ThreadPool {
171 fn default() -> Self {
172 Self::with_default_config()
173 }
174}