surff/lib.rs
1use std::thread;
2use std::sync::{mpsc, Arc, Mutex};
3
4/* The std lib provides thread::spawn
5that expects to get some code the thread should run as soon as the thread is created.
6It doesn't provide a way to create the threads and have them wait for code sent later.
7
8=> Worker data structure between the ThreadPool and the threads:
91. Define a Worker struct that holds an id and a JoinHandle<()>.
102. ThreadPool to hold a vector of Worker instances.
113. Define a Worker::new that takes an id number and returns a Worker instance
12that holds the id and a thread spawned with an empty closure.
134. In ThreadPool::new, use the for loop counter to generate an id,
14create a new Worker with that id, and store the worker in the vector.
15
16# Sending requests to threads via channels:
1. The ThreadPool creates a channel and holds on to the sending side of the channel.
182. Each Worker holds on to the receiving side of the channel.
193. A Job type alias for a trait object
20that holds the type of closure that .execute receives.
214. The .execute method sends the job it wants to execute
22down the sending side of the channel.
235. In its thread, the Worker loops over its receiving side of the channel
24and executes the closures of any jobs received. */
25
26pub struct ThreadPool {
27 workers: Vec<Worker>,
28 sender: mpsc::Sender<Message>,
29}
30
/// One pool thread, paired with the id that labels its log output.
struct Worker {
    /// Identifier handed to the worker when it is created.
    id: usize,
    /// Handle of the spawned thread. It is wrapped in `Option` so the handle
    /// can be moved out with `.take()` when the thread has to be joined.
    thread: Option<thread::JoinHandle<()>>,
}
35
36// Make threads listen for either a Job to run or a signal to stop listening.
37enum Message {
38 NewJob(Job),
39 Terminate,
40}
41
42// Job type alias is a Box of anything that implements the FnBox trait, etc
43// => it's a trait object:
44type Job = Box<dyn FnBox + Send + 'static>;
45
/// Lets a boxed value be invoked by value.
///
/// Declaring the receiver as `self: Box<Self>` makes the method take
/// ownership of the box itself, which is what allows the value inside to be
/// moved out and consumed.
///
/// NOTE(review): from Rust 1.35 onward `Box<dyn FnOnce()>` can be called
/// directly, so this helper may be unnecessary on current toolchains.
/// Confirm the minimum supported compiler before removing it.
trait FnBox {
    /// Consumes the box and runs whatever it holds.
    fn call_box(self: Box<Self>);
}
50
51// Implement the FnBox trait for any type F that implements the FnOnce() trait.
52impl<F: FnOnce()> FnBox for F {
53 fn call_box(self: Box<F>) {
54 (*self)()
55 // to take ownership of self and move the value out of the Box.
56 // any FnOnce() closure can use .call_box() to move the closure out of the Box
57 // and call the closure.
58 }
59}
60
61impl ThreadPool {
62 // # Create a new ThreadPool!
63 pub fn new (size: usize) -> ThreadPool {
64 // size is the number of the threads in the pool.
65
66 assert!(size > 0);
67 // panics if the size is zero.
68
69 let (sender, receiver) = mpsc::channel();
70
71 let receiver = Arc::new(Mutex::new(receiver));
72
73 let mut workers = Vec::with_capacity(size);
74
75 for id in 0..size {
76 workers.push(Worker::new(id, Arc::clone(&receiver)));
77 // the workers can share ownership of the receiving end.
78 }
79
80 ThreadPool {
81 workers,
82 sender,
83 }
84 }
85
86 // Using the thread::spawn impl as a reference:
87 pub fn execute<F>(&self, f: F)
88 where
89 F: FnOnce() + Send + 'static
90 // () after FnOnce because the closure takes no parameters.
91 {
92 let job = Box::new(f);
93
94 self.sender.send(Message::NewJob(job)).unwrap();
95 }
96}
97
98impl Worker {
99 fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
100 // Mutex<T> ensures that only one Worker thread at a time is trying to request a job.
101
102 let thread = thread::spawn(move|| {
103 // closure loops forever,
104 // asking the receiving end of the channel for a job and running it.
105 loop {
106 let message = receiver.lock().unwrap().recv().unwrap();
107 // .lock() on the receiver to acquire the mutex.
108 // (can fail if the mutex is in a poisoned state
109 // (other thread panicked while holding the lock).
110 // .recv() to receive a Job from the channel
111 // (blocks if there's no job yet, and waits. It can fail if thread
112 // holding the sending side has shut down, idem viceversa).
113
114 match message {
115 Message::NewJob(job) => {
116 println! ("Worker {} got a job; executing.", id);
117 job.call_box();
118 },
119 Message::Terminate => {
120 println!("Worker {} was told to terminate.", id);
121 break;
122 },
123 }
124 }
125 });
126
127 Worker {
128 id: id,
129 thread: Some(thread),
130 }
131 }
132}
133
134// # Graceful shutdown!
135// Impl Drop trait to call .join() on each thread in the pool and clean it up.
136// Threads can finish the requests they're working on before closing.
137
138impl Drop for ThreadPool {
139 fn drop(&mut self) {
140 println!("Sending terminate message to all workers.");
141 for _ in &mut self.workers {
142 self.sender.send(Message::Terminate).unwrap();
143 }
144
145 println!("Shutting down all workers.");
146 for worker in &mut self.workers {
147 println!("Shutting down worker {}", worker.id);
148
149 if let Some(thread) = worker.thread.take() {
150 // .take() on Option to move thread out of worker.
151 thread.join().unwrap();
152 // .join() takes ownership / consumes the thread.
153 }
154 }
155 }
156}