1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;
/// A unit of work for the pool: a boxed closure that is called exactly once
/// and may be handed to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;

/// What travels over the channel from the pool to its worker threads.
enum Message {
    /// Tells the receiving worker to leave its loop and finish.
    EndYourselfMortal,
    /// Carries a job for the receiving worker to execute.
    DoThis(Job),
}
/// A fixed-size set of worker threads that all pull work from one shared
/// channel.
///
/// Work is submitted with `push`, `done` queues the shutdown messages, and
/// dropping the pool joins every worker thread.
pub struct ThreadPool {
    /// The workers owned by this pool; their threads are joined on drop.
    workers: Vec<Worker>,
    /// Sending half of the channel whose receiving half the workers share.
    sender: mpsc::Sender<Message>,
}
/// One worker of the pool: an id paired with the handle of its thread.
struct Worker {
    /// Index handed to the worker when it is created. It is kept for
    /// identification only, so the lint allowance covers builds in which
    /// nothing reads it.
    #[allow(dead_code)]
    id: usize,
    /// Join handle of the worker's thread. It is wrapped in `Option` so the
    /// handle can be moved out with `take()` and joined through a `&mut`
    /// borrow.
    thread: Option<JoinHandle<()>>,
}
impl ThreadPool {
    /// Creates a pool with as many worker threads as `num_cpus::get()`
    /// reports.
    ///
    /// # Panics
    ///
    /// Panics if `num_cpus::get()` returns zero (see
    /// [`ThreadPool::with_threads`]).
    pub fn new() -> ThreadPool {
        Self::with_threads(num_cpus::get())
    }

    /// Creates a pool with exactly `nthreads` worker threads.
    ///
    /// All workers are spawned immediately and share the receiving half of a
    /// single channel, so each queued message is picked up by exactly one
    /// worker.
    ///
    /// # Panics
    ///
    /// Panics if `nthreads` is zero.
    pub fn with_threads(nthreads: usize) -> ThreadPool {
        assert!(
            nthreads > 0,
            "a thread pool needs at least one worker thread"
        );
        let (sender, receiver) = mpsc::channel();
        // `mpsc::Receiver` has a single consumer; the mutex lets the workers
        // take turns receiving from it.
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..nthreads)
            .map(|tid| Worker::new(tid, Arc::clone(&receiver)))
            .collect();
        ThreadPool { workers, sender }
    }

    /// Queues the closure `f` to be run on one of the worker threads.
    ///
    /// Jobs queued after [`ThreadPool::done`] sit behind the shutdown
    /// messages and are therefore not guaranteed to run.
    ///
    /// # Panics
    ///
    /// Panics if the job cannot be queued because every worker thread has
    /// already exited and the receiving half of the channel is gone.
    pub fn push<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        self.sender
            .send(Message::DoThis(Box::new(f)))
            .expect("ThreadPool::push: all worker threads have exited");
    }

    /// Asks the workers to stop by queueing one shutdown message per worker.
    ///
    /// The channel is first-in-first-out, so every job queued before this
    /// call is dequeued before any shutdown message. This method does not
    /// wait for the workers; joining happens when the pool is dropped.
    /// Calling it again after the workers have exited is a no-op.
    pub fn done(&self) {
        for _ in &self.workers {
            // `send` fails only when the receiver has been dropped, which
            // happens once every worker thread has finished. There is then
            // nobody left to notify, so stop instead of panicking.
            if self.sender.send(Message::EndYourselfMortal).is_err() {
                break;
            }
        }
    }
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in self.workers.iter_mut() {
worker.thread.take().map(|thread| thread.join().unwrap());
}
}
}
impl Worker {
    /// Spawns a worker thread that serves messages from `receiver`.
    ///
    /// The thread repeatedly locks the shared receiver, waits for the next
    /// message, releases the lock, and then either runs the job or exits.
    /// It exits on a shutdown message and also when the channel is closed
    /// (all senders dropped), so a vanished pool never leaves the worker
    /// panicking with the lock held.
    ///
    /// `id` is stored in the returned `Worker`; `receiver` is the receiving
    /// half shared by all workers of the pool.
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
        let handle = thread::spawn(move || loop {
            // The mutex guard is a temporary that is dropped at the end of
            // this statement, so the lock is NOT held while the job runs and
            // other workers can receive in the meantime.
            let received = receiver
                .lock()
                .expect("thread pool receiver mutex poisoned")
                .recv();

            match received {
                Ok(Message::DoThis(job)) => job(),
                // A closed channel means no more work can ever arrive, so it
                // is treated exactly like an explicit shutdown request.
                Ok(Message::EndYourselfMortal) | Err(_) => break,
            }
        });

        Worker {
            id,
            thread: Some(handle),
        }
    }
}