blunders_engine/
threads.rs

1//! Functionality related to multi-threading.
2
3use std::process;
4use std::sync::mpsc::{self, Receiver, Sender};
5use std::sync::{Arc, Mutex};
6use std::thread::{self, JoinHandle};
7
8/// PoisonPill is used to cause the process to abort if there are
9/// any panics in any thread. This may lead to a resource leak,
10/// but also allows us to better handle bugs in threads.
11/// TODO: Remove after squashing bugs.
12pub struct PoisonPill;
13
14impl Drop for PoisonPill {
15    fn drop(&mut self) {
16        if thread::panicking() {
17            process::exit(1);
18        }
19    }
20}
21
22/// Type of function accepted as a runnable job for a Thread.
23type Job = Box<dyn FnOnce() + Send + 'static>;
24
25/// Message passed from ThreadPool to Threads to give jobs or signal termination.
26enum Message {
27    NewJob(Job),
28    Terminate,
29}
30
31/// Long lived Thread type. Each Thread receives commands through a receiver.
32#[derive(Debug)]
33struct Thread {
34    pub id: usize,
35    pub name: String,
36    handle: Option<JoinHandle<()>>,
37}
38
39impl Thread {
40    /// Spawn a new thread
41    fn new(id: usize, receiver: Arc<Mutex<Receiver<Message>>>) -> Self {
42        let runner = move || {
43            // Shutdown process on any panics.
44            let _poison = PoisonPill;
45
46            loop {
47                let recv_result = { receiver.lock().unwrap().recv() };
48
49                match recv_result {
50                    Ok(message) => match message {
51                        Message::NewJob(job) => {
52                            job();
53                        }
54                        Message::Terminate => break,
55                    },
56
57                    // Sender has closed, allow thread graceful exit.
58                    Err(_) => break,
59                }
60            }
61        };
62
63        let name = format!("Thread {}", id);
64        let handle = thread::Builder::new()
65            .name(name.clone())
66            .spawn(runner)
67            .unwrap();
68
69        Self {
70            id,
71            name,
72            handle: Some(handle),
73        }
74    }
75}
76
77impl Drop for Thread {
78    fn drop(&mut self) {
79        let handle_opt = self.handle.take();
80        if let Some(handle) = handle_opt {
81            let _ = handle.join();
82        }
83    }
84}
85
86/// Long-lived thread pool containing n threads for job processing.
87///
88/// Requirements:
89/// ThreadPool needs to know which threads are available at any given time.
90/// A ThreadPool is expected to live for the duration of the engine.
91/// Must be sharable b/t threads.
92/// The ThreadPool manages all threads within it, the threads may not outlive it.
93#[derive(Debug)]
94pub struct ThreadPool {
95    num_threads: usize,
96    threads: Vec<Thread>,
97    sender: Sender<Message>,
98    receiver: Arc<Mutex<Receiver<Message>>>,
99}
100
101impl ThreadPool {
102    /// Create a new ThreadPool with `num_threads` persistent worker threads.
103    pub fn new(num_threads: usize) -> Self {
104        let (sender, receiver) = mpsc::channel::<Message>();
105        let receiver = Arc::new(Mutex::new(receiver));
106
107        let mut threads = Vec::with_capacity(num_threads);
108
109        for id in 0..num_threads {
110            threads.push(Thread::new(id, Arc::clone(&receiver)));
111        }
112
113        Self {
114            num_threads,
115            threads,
116            sender,
117            receiver,
118        }
119    }
120
121    /// Send a runnable job to an available Thread in the ThreadPool to run.
122    pub fn run<J: Into<Job>>(&self, job: J) {
123        self.sender.send(Message::NewJob(job.into())).unwrap()
124    }
125}
126
127impl Drop for ThreadPool {
128    fn drop(&mut self) {
129        // Clear all pending jobs in queue.
130        {
131            let locked_receiver = self.receiver.lock().unwrap();
132            while let Ok(_) = locked_receiver.try_recv() {}
133        }
134
135        // Tell each thread to terminate.
136        for _ in 0..self.num_threads {
137            let _ = self.sender.send(Message::Terminate);
138        }
139    }
140}