poolio/
lib.rs

//! poolio is a thread-pool implementation using only channels for concurrency.
//!
//! ## Design
//!
//! A poolio thread-pool is essentially made up of a 'supervisor'-thread and a specified number of 'worker'-threads.
//! A worker's only purpose is executing jobs (in the guise of closures) while the supervisor is responsible for anything else, like - most importantly - assigning the jobs it gets from outside the pool via the public API to workers.
//! To this end, the thread-pool is set up in such a way that the supervisor can communicate with each worker separately but concurrently.
//! This, in particular, ensures that each worker is equally busy.
//! A single supervisor-worker-communication is roughly as follows:
//! 1. worker tells the supervisor its current status
//! 2. supervisor decides what to tell the worker to do on the basis of the current order-message from outside the pool and the worker-status
//! 3. supervisor tells the worker what to do
//! 4. worker tries to do what it was told by the supervisor
//! 5. worker tells the supervisor its current status
//!
//! The following graphic illustrates the aforementioned communication-model of a supervisor-thread S and a worker-thread W:
//!
//! <pre>
//!    W
//!    _
//!    .
//!    .
//!    send-status
//!    .   O
//!    .     O
//!    .       O                 send-message
//!    .         O                   O
//!    .           O               O
//!    recv         recv         O
//!   * .  O       O  . .      O
//!  .   .   O   O   .   .   O
//! .     e    O    m     recv . . | S
//!  .   .   O   O   .   *
//!   . .  O       O  . .
//!    send-status  send-message
//!
//! X | . . * : arrow starting at | and ending at * representing the control-flow of thread X
//! O O O O O : channel
//! e : execute job
//! m : manage workers
//! </pre>
//!
//! ## Usage
//!
//! To use a poolio-[`ThreadPool`] you simply have to set one up using the [`ThreadPool::new`]-method and task the pool to run jobs using the [`ThreadPool::execute`]-method.
//!
//! # Examples
//!
//! Setting up a pool to make some server multi-threaded:
//!
//! ```
//! fn handle(req: usize) {
//!     println!("Handled!")
//! }
//!
//! let server_requests = [1, 2, 3, 4, 5, 6, 7, 8, 9];
//!
//! let pool = poolio::ThreadPool::new(3, poolio::PanicSwitch::Kill).unwrap();
//!
//! for req in server_requests {
//!     pool.execute(move || {
//!         handle(req);
//!     });
//! }
//! ```
66
mod thread {
    //! A thin layer over parts of [`std::thread`] whose join handles can be joined through a mutable reference.
    //!
    //! [`std::thread::JoinHandle::join`] consumes the handle, which is awkward when the handle is a field of a larger data structure.
    //! Here the handle is kept in an [`Option`], so [`join`] can move it out and leave `None` behind.

    use std::thread;

    /// A handle to a spawned thread; it becomes `None` once [`join`] has taken the handle out.
    pub type JoinHandle = Option<thread::JoinHandle<()>>;

    /// Spawns a thread running `f` via [`std::thread::spawn`] and returns its handle wrapped in [`Option::Some`].
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle
    where
        F: FnOnce() + Send + 'static,
    {
        let handle = thread::spawn(f);
        Some(handle)
    }

    /// Takes the handle out of `thread` (leaving `None` in its place) and joins it.
    /// - `thread` is a mutable reference to the handle to be joined.
    ///
    /// # Panics
    ///
    /// A panic is caused if `thread` is `None` or if joining fails, that is, if the joined thread has panicked.
    pub fn join(thread: &mut JoinHandle) {
        thread
            .take()
            .expect("Cannot join: no thread has been provided.")
            .join()
            // re-raise the panic of the joined thread in the joining thread
            .unwrap_or_else(|payload| panic!("{:?}", payload));
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_spawn() {
            let handle = spawn(|| {});
            assert!(handle.is_some());
        }

        #[test]
        fn test_join() {
            let mut handle = spawn(|| {});
            join(&mut handle);
            assert!(handle.is_none());
        }

        #[test]
        #[should_panic]
        fn test_join_panic_some() {
            let mut handle = spawn(|| panic!("Oh no!"));
            join(&mut handle);
        }

        #[test]
        #[should_panic]
        fn test_join_panic_none() {
            let mut handle: JoinHandle = None;
            join(&mut handle);
        }
    }
}
133
134use thread::JoinHandle;
135
136use std::fmt;
137use std::panic::UnwindSafe;
138
139use crossbeam::channel::unbounded as channel;
140use crossbeam::channel::Sender;
141
/// The kind of job a [`ThreadPool`] can run: a boxed, unwind-safe closure that can be sent to another thread.
type Job = Box<dyn FnOnce() + Send + UnwindSafe + 'static>;

/// The orders a [`ThreadPool`] understands.
enum Message {
    /// Execute the enclosed job.
    NewJob(Job),
    /// Finish the remaining jobs and shut down afterwards.
    Terminate,
}

impl fmt::Display for Message {
    /// Writes a short bracketed tag naming the variant; the job itself is not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let tag = match self {
            Message::NewJob(_) => "[NewJob]",
            Message::Terminate => "[Terminate]",
        };
        f.write_str(tag)
    }
}
161
/// Configures what the [`ThreadPool`] is supposed to do in case of a 'panicking job', that is, a job which panics while running in a thread.
///
/// The switch is a plain, field-less setting, so it can be copied, compared and printed for diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanicSwitch {
    /// Configure the pool to finish the jobs running concurrently and then kill the whole process in case of a panicked job.
    Kill,
    /// Configure the pool to ignore panicked jobs and just respawn the polluted threads.
    Respawn,
}
169
170/// Abstracts the thread-pools.
171pub struct ThreadPool {
172    /// interface to the pool-controlling thread
173    supervisor: Supervisor,
174}
175
176impl ThreadPool {
177    /// Sets up a new pool.
178    /// - `size` is the (non-zero) number of worker-threads in the pool.
179    /// - `mode` is the setting of the panic switch.
180    ///
181    /// # Errors
182    ///
183    /// An error is returned if 0 was passed as `size` (since a pool without worker-threads does not make sense).
184    ///
185    /// # Examples
186    ///
187    /// Setting up a pool with three worker-threads in kill-mode:
188    ///
189    /// ```
190    /// let pool = poolio::ThreadPool::new(3, poolio::PanicSwitch::Kill).unwrap();
191    /// ```
192    pub fn new<'a>(size: usize, mode: PanicSwitch) -> Result<Self, &'a str> {
193        if size == 0 {
194            return Err("Setting up a pool with no workers is not allowed.");
195        };
196
197        let pool = Self {
198            supervisor: Supervisor::new(size, mode),
199        };
200        Ok(pool)
201    }
202
203    /// Runs a job in `self`.
204    /// - `f` is the job to be run and has to be provided as a certain closure.
205    ///
206    /// Note that if `f` panics, the behavior is according to the setting of the [`PanicSwitch`] of `self`.
207    ///
208    /// # Panics
209    ///
210    /// A panic is caused if the pool is unreachable.
211    ///
212    /// # Examples
213    ///
214    /// Setting up a pool and printing two strings concurrently:
215    ///
216    /// ```
217    /// let pool = poolio::ThreadPool::new(2, poolio::PanicSwitch::Kill).unwrap();
218    /// pool.execute(|| println!{"house"});
219    /// pool.execute(|| println!{"cat"});
220    /// ```
221    pub fn execute<F>(&self, f: F)
222    where
223        F: FnOnce() + UnwindSafe + Send + 'static,
224    {
225        let job = Box::new(f);
226
227        self.send(Message::NewJob(job));
228    }
229
230    /// Tries to shut down `self` gracefully.
231    ///
232    /// In particular, one has to assume that all remaining jobs will be finished (modulo panics in [`PanicSwitch::Kill`]-mode).
233    ///
234    /// # Panics
235    ///
236    /// A panic occurs if
237    /// 1. the pool is unreachable.
238    /// 2. joining the threads panics.
239    fn terminate(&mut self) {
240        self.send(Message::Terminate);
241
242        thread::join(&mut self.supervisor.thread);
243    }
244
245    /// Wraps sending a [`Message`] to the pool.
246    ///
247    /// # Panics
248    ///
249    /// A panic is caused if the receiver has already been deallocated.
250    fn send(&self, msg: Message) {
251        let panic_message = format!("Ordering {} failed. Pool is unreachable.", msg);
252
253        self.supervisor.orders_s.send(msg).expect(&panic_message);
254    }
255}
256
257impl Drop for ThreadPool {
258    /// Tries to shut down `self` gracefully.
259    ///
260    /// In particular, one has to assume that all remaining jobs will be finished (modulo panics in [`PanicSwitch::Kill`]-mode).
261    ///
262    /// # Panics
263    ///
264    /// A panic occurs if
265    /// 1. the pool is unreachable
266    /// 2. joining the threads panics.
267    ///
268    /// Remember that a panic while dropping aborts the whole process.
269    fn drop(&mut self) {
270        self.terminate();
271    }
272}
273
/// A [`StaffNumber`] identifies a worker; it is carried inside every [`Status`] report.
type StaffNumber = usize;

/// A [`Status`] is what a worker reports about itself, tagged with the worker's [`StaffNumber`].
enum Status {
    /// The worker with this staff number is idle.
    Idle(StaffNumber),
    /// The worker with this staff number had a job that panicked.
    Panic(StaffNumber),
}

impl fmt::Display for Status {
    /// Writes a short bracketed tag naming the variant; the staff number is not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let tag = match self {
            Status::Idle(_) => "[idle]",
            Status::Panic(_) => "[panic]",
        };
        f.write_str(tag)
    }
}
293
294/// [`Supervisor`] abstracts the supervisors.
295struct Supervisor {
296    /// place to put orders
297    orders_s: Sender<Message>,
298    /// handle to join
299    thread: JoinHandle,
300}
301
302impl Supervisor {
303    /// Sets up a supervisor.
304    /// - `number_of_workers` is how many workers are employed.
305    /// - `mode` configures what happens when workers report panicking jobs.
306    ///
307    /// In particular, it spawns a thread and sets up a way to communicate to the thread.
308    /// Moreover, it creates the workers controlled by the just spawned supervisor-thread.
309    fn new(mut number_of_workers: usize, mode: PanicSwitch) -> Self {
310        // this channel is used by the pool to contact the supervisor
311        let (orders_s, orders_r) = channel();
312
313        let thread = thread::spawn(move || {
314            // this channel is used by the workers to contact the supervisor
315            let (statuses_s, statuses_r) = channel();
316
317            // construct `number_of_workers` worker-threads
318            let mut workers = Vec::with_capacity(number_of_workers);
319            for id in 0..number_of_workers {
320                workers.push(Worker::new(id, statuses_s.clone()));
321            }
322
323            // track how many jobs have panicked
324            let mut panicked_jobs = 0;
325
326            // keepin' running to distribute jobs among idle workers
327            'distribute_jobs: while let Message::NewJob(job) = orders_r.recv().unwrap() {
328                'query_status: loop {
329                    match statuses_r.recv().unwrap() {
330                        Status::Idle(id) => {
331                            workers[id]
332                                .instructions_s
333                                .send(Message::NewJob(job))
334                                .unwrap();
335                            break 'query_status;
336                        }
337                        Status::Panic(id) => {
338                            thread::join(&mut workers[id].thread);
339                            match mode {
340                                PanicSwitch::Kill => {
341                                    panicked_jobs += 1;
342                                    number_of_workers -= 1;
343                                    break 'distribute_jobs;
344                                }
345                                PanicSwitch::Respawn => {
346                                    workers[id] = Worker::new(id, statuses_s.clone());
347                                }
348                            };
349                        }
350                    }
351                }
352            }
353
354            // destruct all remaining worker-threads
355            while number_of_workers != 0 {
356                match statuses_r.recv().unwrap() {
357                    Status::Idle(id) => {
358                        workers[id].instructions_s.send(Message::Terminate).unwrap();
359                        thread::join(&mut workers[id].thread);
360                    }
361                    Status::Panic(id) => {
362                        thread::join(&mut workers[id].thread);
363                        if let PanicSwitch::Kill = mode {
364                            panicked_jobs += 1;
365                        };
366                    }
367                };
368                number_of_workers -= 1;
369            }
370
371            if panicked_jobs > 0 {
372                eprintln!("Aborting process: {} panicked jobs.", panicked_jobs);
373                std::process::abort();
374            }
375
376            // ensure that `orders_r` lives as long as the thread to prevent reachability-errors
377            drop(orders_r);
378        });
379
380        Self { orders_s, thread }
381    }
382}
383
384/// [`Worker`] abstracts workers.
385struct Worker {
386    /// place to put instructions
387    instructions_s: Sender<Message>,
388    /// handle to join
389    thread: JoinHandle,
390}
391
392impl Worker {
393    /// Sets up a new worker.
394    /// - `id` is the worker's staff number.
395    /// - `statuses_s` is where the worker puts its current status.
396    ///
397    /// In particular, it spawns a thread and sets up a way to communicate to the thread.
398    fn new(id: StaffNumber, statuses_s: Sender<Status>) -> Self {
399        // this channel is used by the supervisor to contact this worker
400        let (instructions_s, instructions_r) = channel();
401
402        let thread = thread::spawn(move || {
403            // report for duty
404            statuses_s.send(Status::Idle(id)).unwrap();
405
406            // keepin' running to execute jobs
407            loop {
408                let message = instructions_r.recv().unwrap();
409
410                match message {
411                    Message::NewJob(job) => match std::panic::catch_unwind(job) {
412                        Ok(_) => {
413                            statuses_s.send(Status::Idle(id)).unwrap();
414                        }
415                        Err(_) => {
416                            statuses_s.send(Status::Panic(id)).unwrap();
417                            break;
418                        }
419                    },
420                    Message::Terminate => break,
421                }
422            }
423        });
424
425        Self {
426            instructions_s,
427            thread,
428        }
429    }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    // settings
    const SIZE: usize = 2; //= 6; && = 12; && = 36;
    const MODE: PanicSwitch = PanicSwitch::Respawn; //= PanicSwitch::Kill;
    const ID: StaffNumber = 0;

    /// A pool with a non-zero number of workers can be set up.
    #[test]
    fn test_threadpool_new_ok() {
        assert!(ThreadPool::new(SIZE, MODE).is_ok());
    }

    /// A pool without workers is rejected.
    #[test]
    fn test_threadpool_new_err() {
        assert!(ThreadPool::new(0, MODE).is_err());
    }

    /// All non-panicking jobs are run by the time the pool has been dropped;
    /// in respawn-mode panicking jobs are interspersed.
    #[test]
    fn test_threadpool_execute() {
        const N: usize = 5;

        let pool = ThreadPool::new(SIZE, MODE).unwrap();
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..N {
            // `SIZE` counting jobs per round ...
            for _ in 0..SIZE {
                let counter = Arc::clone(&counter);
                pool.execute(move || {
                    counter.fetch_add(1, Ordering::SeqCst);
                });
            }
            // ... followed by a panicking job if the pool is set to survive it
            if matches!(MODE, PanicSwitch::Respawn) {
                pool.execute(|| panic!("Oh no!"));
            }
        }

        // dropping the pool waits for the remaining jobs
        drop(pool);

        assert_eq!(N * SIZE, counter.load(Ordering::SeqCst));
    }

    /// A worker reports idle at start and after a successful job, and reports a panic after a panicking job.
    #[test]
    fn test_worker_thread_newjob() {
        let (statuses_s, statuses_r) = channel();
        let mut worker = Worker::new(ID, statuses_s);

        let report = statuses_r.recv().unwrap();
        assert!(matches!(report, Status::Idle(ID)));

        let flag = Arc::new(AtomicBool::new(false));
        let flag_ref = Arc::clone(&flag);
        let succeeding = Box::new(move || {
            flag_ref.store(true, Ordering::SeqCst);
        });
        worker
            .instructions_s
            .send(Message::NewJob(succeeding))
            .unwrap();
        let report = statuses_r.recv().unwrap();
        assert!(matches!(report, Status::Idle(ID)));
        assert!(flag.load(Ordering::SeqCst));

        let panicking = Box::new(|| panic!("Oh no!"));
        worker
            .instructions_s
            .send(Message::NewJob(panicking))
            .unwrap();
        let report = statuses_r.recv().unwrap();
        assert!(matches!(report, Status::Panic(ID)));

        thread::join(&mut worker.thread);
    }

    /// A worker told to terminate stops running and can be joined.
    #[test]
    fn test_worker_thread_terminate() {
        let (statuses_s, statuses_r) = channel();
        let mut worker = Worker::new(ID, statuses_s);

        let report = statuses_r.recv().unwrap();
        assert!(matches!(report, Status::Idle(ID)));

        worker.instructions_s.send(Message::Terminate).unwrap();

        thread::join(&mut worker.thread);
    }
}