// threaded/lib.rs
//! Minimalist Thread Pool in Rust
//!
//! Glanceable source code for prototypes seeking brevity with transparency.

use std::ops::Sub;
use std::thread;
use std::time::Instant;

use crossbeam::channel::{unbounded, Receiver, Sender};
use uuid::Uuid;
15
/// A type-erased, one-shot unit of work that can be sent across threads.
type Job = Box<dyn FnOnce() + Send + 'static>;
17
/// Control messages delivered to workers over the shared channel.
enum Message {
    /// Run the contained job on the receiving worker.
    NewJob(Job),
    /// Tell the receiving worker to exit its receive loop.
    Terminate,
}
22
/// Handle to one pool thread plus bookkeeping metadata.
#[allow(dead_code)]
struct Worker {
    // Unique identifier; useful for debugging/tracing.
    id: Uuid,
    // Join handle; `Option` so `ThreadPool::drop` can `take()` and join it.
    thread: Option<thread::JoinHandle<()>>,
    // Spawn timestamp of this worker.
    created: Instant
}
29
30impl Worker {
31    fn new(receiver: Receiver<Message>) -> Worker {
32        let thread = thread::spawn(move || loop {
33            let message = receiver.recv().unwrap();
34
35            match message {
36                Message::NewJob(job) => {
37                    job();
38                }
39                Message::Terminate => {
40                    break;
41                }
42            }
43        });
44
45        Worker {
46            id: Uuid::new_v4(),
47            thread: Some(thread),
48            created: Instant::now()
49        }
50    }
51}
52
/// Thread pool of workers awaiting execution orders.
pub struct ThreadPool {
    // Desired worker count; may differ from `workers.len()` after `resize`
    // (terminated workers are never removed from the vector).
    capacity: usize,
    workers: Vec<Worker>,
    // Sending half: used by `execute` and for shutdown messages.
    sender: Sender<Message>,
    // Receiving half kept so `resize` can hand clones to new workers.
    receiver: Receiver<Message>
}
60
61impl ThreadPool {
62    /// Create a new ThreadPool.
63    ///
64    /// The capacity is the number of desired threads in the pool.
65    ///
66    /// # Panics
67    ///
68    /// The `new` function will panic if the capacity is zero.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use std::sync::Arc;
74    /// use std::sync::atomic::{ Ordering, AtomicBool };
75    /// use threaded::ThreadPool;
76    ///
77    /// let num_workers = 2;
78    /// let tp = ThreadPool::new(num_workers);
79    ///
80    /// assert_eq!(tp.capacity(), num_workers);
81    ///
82    /// let has_executed = Arc::new(AtomicBool::new(false));
83    /// {
84    ///     let has_executed = has_executed.clone();
85    ///     tp.execute(move || {
86    ///         has_executed.swap(true, Ordering::SeqCst);
87    ///     });
88    /// }
89    ///
90    /// drop(tp); // block main thread until execute finishes (uses handle.join())
91    ///
92    /// assert_eq!(has_executed.load(Ordering::SeqCst), true);
93    /// ```
94    pub fn new(capacity: usize) -> ThreadPool {
95        assert!(capacity > 0);
96
97        // create crossbeam crate channel of unbounded capacity
98        let (sender, receiver) = unbounded();
99
100        let mut workers = Vec::with_capacity(capacity);
101        for _ in 0..capacity {
102            workers.push(Worker::new(receiver.clone()));
103        }
104
105        ThreadPool { capacity, workers, sender, receiver }
106    }
107
108    /// Capacity of thread pool (number of workers).
109    pub fn capacity(&self) -> usize {
110        self.capacity
111    }
112
113    /// Resize thread pool to new capacity
114    ///
115    /// # Panics
116    ///
117    /// The `resize` function will panic if the capacity is zero.
118    ///
119    /// Caution:
120    ///     Dead workers aren't removed from pool wasting memory but is fault tolerant.
121    pub fn resize(&mut self, capacity: usize) {
122        assert!(capacity > 0);
123
124        let current_capacity = self.capacity() as isize;
125        let delta = current_capacity.sub(capacity as isize);
126
127        if delta.is_positive() {
128            // reduce size
129            for _ in 0..delta {
130                self.sender.send(Message::Terminate).unwrap();
131            }
132        } else {
133            // increase size
134            for _ in 0..delta.abs() {
135                self.workers.push(Worker::new(self.receiver.clone()));
136            }
137        }
138
139        self.capacity = capacity;
140    }
141
142    /// Execute function/closure using worker from thread pool.
143    pub fn execute<F>(&self, f: F)
144        where
145            F: FnOnce() + Send + 'static,
146    {
147        let job = Box::new(f);
148
149        self.sender.send(Message::NewJob(job)).unwrap();
150    }
151}
152
153impl Drop for ThreadPool {
154    fn drop(&mut self) {
155        // sending terminate to all workers
156        for _ in &self.workers {
157            self.sender.send(Message::Terminate).unwrap();
158        }
159
160        // joining worker threads
161        for worker in &mut self.workers {
162            if let Some(thread) = worker.thread.take() {
163                thread.join().unwrap();
164            }
165        }
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;
    use std::sync::atomic::{
        Ordering,
        AtomicBool
    };
    use std::time::Duration;

    // `new(0)` must panic: a pool with no workers can never run a job.
    #[test]
    #[should_panic]
    fn invalid_capacity_size() {
        let _ = ThreadPool::new(0);
    }

    #[test]
    fn executes_spsc_job() {
        let tp = ThreadPool::new(1);
        let executed = Arc::new(AtomicBool::new(false));
        {
            let executed = executed.clone();
            tp.execute(move || {
                executed.swap(true, Ordering::SeqCst);
            });
        }
        // Dropping the pool joins the worker, so the job has completed.
        drop(tp);
        assert!(executed.load(Ordering::SeqCst));
    }

    #[test]
    fn executes_spmc_jobs() {
        // fixme - verify jobs run in parallel (worker id, overlap, etc)
        let tp = ThreadPool::new(2);
        let job1_executed = Arc::new(AtomicBool::new(false));
        let job2_executed = Arc::new(AtomicBool::new(false));
        {
            let job1_executed = job1_executed.clone();
            tp.execute(move || {
                job1_executed.swap(true, Ordering::SeqCst);
            });
        }
        {
            let job2_executed = job2_executed.clone();
            tp.execute(move || {
                job2_executed.swap(true, Ordering::SeqCst);
            });
        }
        drop(tp);
        assert!(job1_executed.load(Ordering::SeqCst));
        assert!(job2_executed.load(Ordering::SeqCst));
    }

    #[test]
    fn thread_pool_capacity_eq_num_of_workers() {
        let capacity = 2;
        let tp = ThreadPool::new(capacity);
        let expected = capacity;
        assert_eq!(tp.capacity(), expected);
        assert_eq!(tp.capacity(), tp.workers.len());
    }

    #[test]
    fn thread_pool_resize_to_bigger_capacity() {
        let capacity = 2;
        let resize_capacity = 4;

        let mut tp = ThreadPool::new(capacity);
        assert_eq!(tp.capacity(), capacity);

        tp.resize(resize_capacity);

        // Give the new worker threads a moment to spin up.
        thread::sleep(Duration::from_millis(5));
        assert_eq!(tp.capacity(), resize_capacity);
    }

    #[test]
    fn thread_pool_resize_to_smaller_capacity() {
        let capacity = 4;
        let resize_capacity = 2;

        let mut tp = ThreadPool::new(capacity);
        assert_eq!(tp.capacity(), capacity);

        tp.resize(resize_capacity);

        // Give the Terminate messages a moment to be consumed.
        thread::sleep(Duration::from_millis(5));
        assert_eq!(tp.capacity(), resize_capacity);
    }
}
259}