nix_doc/
threadpool.rs

//! Thread pool implementation
2use std::sync::mpsc;
3use std::sync::{Arc, Mutex};
4use std::thread;
5use std::thread::JoinHandle;
6
/// A heap-allocated closure that runs at most once and may be moved to another thread.
type Job = Box<dyn FnOnce() + Send + 'static>;
8
9enum Message {
10    /// Workers receiving this should finish work and terminate.
11    EndYourselfMortal,
12    /// Workers receiving this should run this task.
13    DoThis(Job),
14}
15
16/// A basic thread pool implementation.
17pub struct ThreadPool {
18    workers: Vec<Worker>,
19    sender: mpsc::Sender<Message>,
20}
21
/// One worker of the pool: an identifier plus the handle of its thread.
struct Worker {
    /// Identifier of this worker. Kept only as a debugging aid, hence the
    /// `dead_code` allowance.
    #[allow(dead_code)]
    id: usize,
    /// Join handle of the worker's thread; an `Option` so that it can be taken
    /// out and joined when the worker is cleaned up.
    thread: Option<JoinHandle<()>>,
}
29
30impl Default for ThreadPool {
31    /// Creates a new ThreadPool with the number of physical+logical threads the computer has.
32    fn default() -> ThreadPool {
33        Self::with_threads(num_cpus::get())
34    }
35}
36
37impl ThreadPool {
38    /// Makes a new ThreadPool with `nthreads` threads.
39    pub fn with_threads(nthreads: usize) -> ThreadPool {
40        assert!(nthreads > 0);
41
42        let (sender, receiver) = mpsc::channel();
43        let receiver = Arc::new(Mutex::new(receiver));
44
45        let mut workers = Vec::new();
46        for tid in 0..nthreads {
47            workers.push(Worker::new(tid, receiver.clone()));
48        }
49        ThreadPool { workers, sender }
50    }
51
52    /// Pushes a closure onto the task queue for the pool.
53    pub fn push<F>(&self, f: F)
54    where
55        F: FnOnce() + Send + 'static,
56    {
57        let job = Box::new(f);
58        self.sender.send(Message::DoThis(job)).unwrap();
59    }
60
61    /// Call when done putting work into the queue.
62    pub fn done(&self) {
63        for _ in &self.workers {
64            self.sender.send(Message::EndYourselfMortal).unwrap();
65        }
66    }
67}
68
69impl Drop for ThreadPool {
70    fn drop(&mut self) {
71        for worker in self.workers.iter_mut() {
72            if let Some(thread) = worker.thread.take() {
73                thread.join().unwrap();
74            }
75        }
76    }
77}
78
79impl Worker {
80    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
81        Worker {
82            id,
83            thread: Some(thread::spawn(move || loop {
84                let message = receiver.lock().unwrap().recv().unwrap();
85
86                match message {
87                    Message::EndYourselfMortal => break,
88                    Message::DoThis(job) => job(),
89                }
90            })),
91        }
92    }
93}