1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
mod worker;
use std::sync::{
mpsc::{channel, Sender},
Arc, Mutex,
};
use crate::utils::optimal_number_of_threads;
use self::worker::{Job, Task, Worker};
pub struct ThreadPool {
_workers: Vec<Worker>,
sender: Sender<Task>,
capacity: u16,
}
impl ThreadPool {
pub fn new() -> Self {
Self::default()
}
pub fn with_capacity(capacity: u16) -> Self {
let mut _workers = Vec::with_capacity(capacity as usize);
let (sender, receiver) = channel();
let receiver = Arc::new(Mutex::new(receiver));
for id in 0..capacity {
_workers.push(Worker::new(id, receiver.clone()));
}
Self {
_workers,
sender,
capacity,
}
}
pub fn capacity(&self) -> u16 {
self.capacity
}
pub fn execute<T>(&self, f: T)
where
T: Job,
{
self.sender.send(Task::New(Box::new(f))).unwrap()
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for _ in 0..self.capacity() {
self.sender.send(Task::Exit).unwrap()
}
}
}
impl Default for ThreadPool {
fn default() -> Self {
Self::with_capacity(optimal_number_of_threads(u16::MAX))
}
}
#[cfg(test)]
mod test {
use super::ThreadPool;
#[test]
fn graceful_shutdown() {
let thread_pool = ThreadPool::with_capacity(4);
for i in 0..thread_pool.capacity() {
thread_pool.execute(move || assert_eq!(i, i));
}
}
#[test]
fn loops() {
let thread_pool = ThreadPool::new();
let mut i = 0;
let work = move || loop {
if i >= 1e6 as u32 {
break;
}
i += 1;
};
for _ in 0..thread_pool.capacity() {
thread_pool.execute(work)
}
thread_pool.execute(|| assert!(true))
}
}