1use std::thread;
6use std::time::Instant;
7
8use crossbeam::channel::{
9 Sender,
10 Receiver,
11 unbounded
12};
13use uuid::Uuid;
14use std::ops::Sub;
15
/// A unit of work handed to the pool: a heap-allocated closure that is called
/// exactly once and may be moved to another thread.
///
/// The `'static` bound is not spelled out because it is the default lifetime
/// bound of a boxed trait object.
type Job = Box<dyn FnOnce() + Send>;
17
18enum Message {
19 NewJob(Job),
20 Terminate,
21}
22
23#[allow(dead_code)]
24struct Worker {
25 id: Uuid,
26 thread: Option<thread::JoinHandle<()>>,
27 created: Instant
28}
29
30impl Worker {
31 fn new(receiver: Receiver<Message>) -> Worker {
32 let thread = thread::spawn(move || loop {
33 let message = receiver.recv().unwrap();
34
35 match message {
36 Message::NewJob(job) => {
37 job();
38 }
39 Message::Terminate => {
40 break;
41 }
42 }
43 });
44
45 Worker {
46 id: Uuid::new_v4(),
47 thread: Some(thread),
48 created: Instant::now()
49 }
50 }
51}
52
53pub struct ThreadPool {
55 capacity: usize,
56 workers: Vec<Worker>,
57 sender: Sender<Message>,
58 receiver: Receiver<Message>
59}
60
61impl ThreadPool {
62 pub fn new(capacity: usize) -> ThreadPool {
95 assert!(capacity > 0);
96
97 let (sender, receiver) = unbounded();
99
100 let mut workers = Vec::with_capacity(capacity);
101 for _ in 0..capacity {
102 workers.push(Worker::new(receiver.clone()));
103 }
104
105 ThreadPool { capacity, workers, sender, receiver }
106 }
107
108 pub fn capacity(&self) -> usize {
110 self.capacity
111 }
112
113 pub fn resize(&mut self, capacity: usize) {
122 assert!(capacity > 0);
123
124 let current_capacity = self.capacity() as isize;
125 let delta = current_capacity.sub(capacity as isize);
126
127 if delta.is_positive() {
128 for _ in 0..delta {
130 self.sender.send(Message::Terminate).unwrap();
131 }
132 } else {
133 for _ in 0..delta.abs() {
135 self.workers.push(Worker::new(self.receiver.clone()));
136 }
137 }
138
139 self.capacity = capacity;
140 }
141
142 pub fn execute<F>(&self, f: F)
144 where
145 F: FnOnce() + Send + 'static,
146 {
147 let job = Box::new(f);
148
149 self.sender.send(Message::NewJob(job)).unwrap();
150 }
151}
152
153impl Drop for ThreadPool {
154 fn drop(&mut self) {
155 for _ in &self.workers {
157 self.sender.send(Message::Terminate).unwrap();
158 }
159
160 for worker in &mut self.workers {
162 if let Some(thread) = worker.thread.take() {
163 thread.join().unwrap();
164 }
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    /// Submits a job that raises a fresh flag and hands the flag back so the
    /// caller can check that the job ran.
    fn submit_flag_job(pool: &ThreadPool) -> Arc<AtomicBool> {
        let flag = Arc::new(AtomicBool::new(false));
        let job_flag = Arc::clone(&flag);
        pool.execute(move || job_flag.store(true, Ordering::SeqCst));
        flag
    }

    #[test]
    #[should_panic]
    fn invalid_capacity_size() {
        let _ = ThreadPool::new(0);
    }

    #[test]
    fn executes_spsc_job() {
        let tp = ThreadPool::new(1);
        let executed = submit_flag_job(&tp);

        // Dropping the pool joins its workers, so the job has finished by the
        // time the flag is inspected.
        drop(tp);
        assert!(executed.load(Ordering::SeqCst));
    }

    #[test]
    fn executes_spmc_jobs() {
        let tp = ThreadPool::new(2);
        let job1_executed = submit_flag_job(&tp);
        let job2_executed = submit_flag_job(&tp);

        drop(tp);
        assert!(job1_executed.load(Ordering::SeqCst));
        assert!(job2_executed.load(Ordering::SeqCst));
    }

    #[test]
    fn thread_pool_capacity_eq_num_of_workers() {
        let capacity = 2;
        let tp = ThreadPool::new(capacity);

        assert_eq!(tp.capacity(), capacity);
        assert_eq!(tp.capacity(), tp.workers.len());
    }

    #[test]
    fn thread_pool_resize_to_bigger_capacity() {
        let capacity = 2;
        let resize_capacity = 4;

        let mut tp = ThreadPool::new(capacity);
        assert_eq!(tp.capacity(), capacity);

        // `resize` updates the capacity and spawns the new workers before it
        // returns, so no waiting is needed before checking either.
        tp.resize(resize_capacity);
        assert_eq!(tp.capacity(), resize_capacity);
        assert_eq!(tp.workers.len(), resize_capacity);

        // The resized pool must still run jobs.
        let executed = submit_flag_job(&tp);
        drop(tp);
        assert!(executed.load(Ordering::SeqCst));
    }

    #[test]
    fn thread_pool_resize_to_smaller_capacity() {
        let capacity = 4;
        let resize_capacity = 2;

        let mut tp = ThreadPool::new(capacity);
        assert_eq!(tp.capacity(), capacity);

        tp.resize(resize_capacity);
        assert_eq!(tp.capacity(), resize_capacity);

        // The job is queued behind the two stop requests and must still be
        // picked up by one of the remaining workers.
        let executed = submit_flag_job(&tp);
        drop(tp);
        assert!(executed.load(Ordering::SeqCst));
    }
}