poolio/lib.rs
1//! poolio is a thread-pool implementation using only channels for concurrency.
2//!
3//! ## Design
4//!
5//! A poolio thread-pool is essentially made up of a 'supervisor'-thread and a specified number of 'worker'-threads.
6//! A worker's only purpose is executing jobs (in the guise of closures) while the supervisor is responsible for anything else, like - most importantly - assigning jobs to workers it gets from outside the pool via the public API.
7//! To this end, the thread-pool is set up in such a way that the supervisor can communicate with each worker separately but concurrently.
8//! This, in particular, ensures that each worker is equally busy.
9//! A single supervisor-worker-communication is roughly as follows:
10//! 1. worker tells the supervisor its current status
11//! 2. supervisor decides what to tell the worker to do on the basis of the current order-message from outside the pool and the worker-status
12//! 3. supervisor tells the worker what to do
13//! 4. worker tries to do what it was told by the supervisor
14//! 5. worker tells the supervisor its current status
15//!
16//! The following graphic illustrates the aforementioned communication-model of a supervisor-thread S and a worker-thread W:
17//!
18//! <pre>
19//! W
20//! _
21//! .
22//! .
23//! send-status
24//! . O
25//! . O
26//! . O send-message
27//! . O O
28//! . O O
29//! recv recv O
30//! * . O O . . O
31//! . . O O . . O
32//! . e O m recv . . | S
33//! . . O O . *
34//! . . O O . .
35//! send-status send-message
36//!
37//! X | . . * : arrow starting at | and ending at * representing the control-flow of thread X
38//! O O O O O : channel
39//! e : execute job
40//! m : manage workers
41//! </pre>
42//!
43//! ## Usage
44//!
45//! To use a poolio-[`ThreadPool`] you simply have to set one up using the [`ThreadPool::new`]-method and task the pool to run jobs using the [`ThreadPool::execute`]-method.
46//!
47//! # Examples
48//!
49//! Setting up a pool to make some server multi-threaded:
50//!
51//! ```
52//! fn handle(req: usize) {
53//! println!("Handled!")
54//! }
55//!
56//! let server_requests = [1, 2, 3, 4, 5, 6, 7, 8, 9];
57//!
58//! let pool = poolio::ThreadPool::new(3, poolio::PanicSwitch::Kill).unwrap();
59//!
60//! for req in server_requests {
61//! pool.execute(move || {
62//! handle(req);
63//! });
64//! }
65//! ```
66
mod thread {
    //! Thin wrapper around parts of [`std::thread`] for joining a thread whose handle is embedded
    //! in a larger data structure.
    //!
    //! [`std::thread::JoinHandle::join`] consumes the handle, which is awkward when the handle is
    //! a field that is only reachable through a reference. Keeping the handle in an [`Option`]
    //! lets [`join`] move it out through a `&mut` reference and leave `None` behind.

    use std::panic::resume_unwind;
    use std::thread;

    /// A [`std::thread::JoinHandle`] that can be moved out of its owner:
    /// `Some` as long as the thread has not been joined, `None` afterwards.
    pub type JoinHandle = Option<thread::JoinHandle<()>>;

    /// Spawns a thread with [`std::thread::spawn`] and wraps its handle in [`Option::Some`].
    /// - `f` is the closure the new thread runs.
    #[inline]
    pub fn spawn<F>(f: F) -> JoinHandle
    where
        F: FnOnce() + Send + 'static,
    {
        Some(thread::spawn(f))
    }

    /// Takes the handle out of `thread` (leaving `None` behind) and joins it.
    /// - `thread` is a reference to the handle to be joined.
    ///
    /// # Panics
    ///
    /// A panic is caused if `thread` is `None`, or if the joined thread has panicked.
    /// In the latter case the original panic payload of the joined thread is propagated
    /// unchanged, so its message is not lost.
    pub fn join(thread: &mut JoinHandle) {
        match thread.take() {
            Some(handle) => {
                if let Err(payload) = handle.join() {
                    // Re-raise the thread's own payload instead of formatting the opaque
                    // `Box<dyn Any>` (which would only print `Any { .. }`).
                    resume_unwind(payload);
                }
            }
            None => panic!("Cannot join: no thread has been provided."),
        }
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_spawn() {
            assert!(matches!(spawn(|| {}), Some(_)));
        }

        #[test]
        fn test_join() {
            let mut thread = spawn(|| {});
            join(&mut thread);
            assert!(matches!(thread, None));
        }

        #[test]
        #[should_panic]
        fn test_join_panic_some() {
            join(&mut spawn(|| panic!("Oh no!")));
        }

        #[test]
        #[should_panic]
        fn test_join_panic_none() {
            join(&mut None);
        }
    }
}
133
134use thread::JoinHandle;
135
136use std::fmt;
137use std::panic::UnwindSafe;
138
139use crossbeam::channel::unbounded as channel;
140use crossbeam::channel::Sender;
141
/// The kind of closure the [`ThreadPool`] accepts as a job: boxed, callable once,
/// unwind-safe and sendable to another thread.
type Job = Box<dyn FnOnce() + UnwindSafe + Send + 'static>;

/// The orders that can be given to the [`ThreadPool`].
enum Message {
    /// Execute the contained job.
    NewJob(Job),
    /// Finish the remaining jobs and shut down afterwards.
    Terminate,
}

impl fmt::Display for Message {
    /// Writes a short bracketed tag naming the variant; the job itself is not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let tag = match self {
            Self::Terminate => "[Terminate]",
            Self::NewJob(_) => "[NewJob]",
        };
        f.write_str(tag)
    }
}
161
/// Configures what the [`ThreadPool`] is supposed to do in case of a 'panicking job', that is, a job which panics while running in a thread.
///
/// The switch is a plain, field-less value, so it can be copied, compared and debug-printed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PanicSwitch {
    /// Configure the pool to finish the jobs running in parallel and then kill the whole process in case of a panicked job.
    Kill,
    /// Configure the pool to ignore panicked jobs and just respawn the polluted threads.
    Respawn,
}
169
170/// Abstracts the thread-pools.
171pub struct ThreadPool {
172 /// interface to the pool-controlling thread
173 supervisor: Supervisor,
174}
175
176impl ThreadPool {
177 /// Sets up a new pool.
178 /// - `size` is the (non-zero) number of worker-threads in the pool.
179 /// - `mode` is the setting of the panic switch.
180 ///
181 /// # Errors
182 ///
183 /// An error is returned if 0 was passed as `size` (since a pool without worker-threads does not make sense).
184 ///
185 /// # Examples
186 ///
187 /// Setting up a pool with three worker-threads in kill-mode:
188 ///
189 /// ```
190 /// let pool = poolio::ThreadPool::new(3, poolio::PanicSwitch::Kill).unwrap();
191 /// ```
192 pub fn new<'a>(size: usize, mode: PanicSwitch) -> Result<Self, &'a str> {
193 if size == 0 {
194 return Err("Setting up a pool with no workers is not allowed.");
195 };
196
197 let pool = Self {
198 supervisor: Supervisor::new(size, mode),
199 };
200 Ok(pool)
201 }
202
203 /// Runs a job in `self`.
204 /// - `f` is the job to be run and has to be provided as a certain closure.
205 ///
206 /// Note that if `f` panics, the behavior is according to the setting of the [`PanicSwitch`] of `self`.
207 ///
208 /// # Panics
209 ///
210 /// A panic is caused if the pool is unreachable.
211 ///
212 /// # Examples
213 ///
214 /// Setting up a pool and printing two strings concurrently:
215 ///
216 /// ```
217 /// let pool = poolio::ThreadPool::new(2, poolio::PanicSwitch::Kill).unwrap();
218 /// pool.execute(|| println!{"house"});
219 /// pool.execute(|| println!{"cat"});
220 /// ```
221 pub fn execute<F>(&self, f: F)
222 where
223 F: FnOnce() + UnwindSafe + Send + 'static,
224 {
225 let job = Box::new(f);
226
227 self.send(Message::NewJob(job));
228 }
229
230 /// Tries to shut down `self` gracefully.
231 ///
232 /// In particular, one has to assume that all remaining jobs will be finished (modulo panics in [`PanicSwitch::Kill`]-mode).
233 ///
234 /// # Panics
235 ///
236 /// A panic occurs if
237 /// 1. the pool is unreachable.
238 /// 2. joining the threads panics.
239 fn terminate(&mut self) {
240 self.send(Message::Terminate);
241
242 thread::join(&mut self.supervisor.thread);
243 }
244
245 /// Wraps sending a [`Message`] to the pool.
246 ///
247 /// # Panics
248 ///
249 /// A panic is caused if the receiver has already been deallocated.
250 fn send(&self, msg: Message) {
251 let panic_message = format!("Ordering {} failed. Pool is unreachable.", msg);
252
253 self.supervisor.orders_s.send(msg).expect(&panic_message);
254 }
255}
256
257impl Drop for ThreadPool {
258 /// Tries to shut down `self` gracefully.
259 ///
260 /// In particular, one has to assume that all remaining jobs will be finished (modulo panics in [`PanicSwitch::Kill`]-mode).
261 ///
262 /// # Panics
263 ///
264 /// A panic occurs if
265 /// 1. the pool is unreachable
266 /// 2. joining the threads panics.
267 ///
268 /// Remember that a panic while dropping aborts the whole process.
269 fn drop(&mut self) {
270 self.terminate();
271 }
272}
273
/// A [`StaffNumber`] identifies a worker.
type StaffNumber = usize;

/// A [`Status`] is what a worker reports about itself; every variant carries the
/// [`StaffNumber`] of the reporting worker.
enum Status {
    /// The worker with the given staff number is idle.
    Idle(StaffNumber),
    /// The worker with the given staff number has had a job panic.
    Panic(StaffNumber),
}

impl fmt::Display for Status {
    /// Writes a short bracketed tag naming the variant; the staff number is not shown.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let tag = match self {
            Self::Panic(_) => "[panic]",
            Self::Idle(_) => "[idle]",
        };
        f.write_str(tag)
    }
}
293
294/// [`Supervisor`] abstracts the supervisors.
295struct Supervisor {
296 /// place to put orders
297 orders_s: Sender<Message>,
298 /// handle to join
299 thread: JoinHandle,
300}
301
302impl Supervisor {
303 /// Sets up a supervisor.
304 /// - `number_of_workers` is how many workers are employed.
305 /// - `mode` configures what happens when workers report panicking jobs.
306 ///
307 /// In particular, it spawns a thread and sets up a way to communicate to the thread.
308 /// Moreover, it creates the workers controlled by the just spawned supervisor-thread.
309 fn new(mut number_of_workers: usize, mode: PanicSwitch) -> Self {
310 // this channel is used by the pool to contact the supervisor
311 let (orders_s, orders_r) = channel();
312
313 let thread = thread::spawn(move || {
314 // this channel is used by the workers to contact the supervisor
315 let (statuses_s, statuses_r) = channel();
316
317 // construct `number_of_workers` worker-threads
318 let mut workers = Vec::with_capacity(number_of_workers);
319 for id in 0..number_of_workers {
320 workers.push(Worker::new(id, statuses_s.clone()));
321 }
322
323 // track how many jobs have panicked
324 let mut panicked_jobs = 0;
325
326 // keepin' running to distribute jobs among idle workers
327 'distribute_jobs: while let Message::NewJob(job) = orders_r.recv().unwrap() {
328 'query_status: loop {
329 match statuses_r.recv().unwrap() {
330 Status::Idle(id) => {
331 workers[id]
332 .instructions_s
333 .send(Message::NewJob(job))
334 .unwrap();
335 break 'query_status;
336 }
337 Status::Panic(id) => {
338 thread::join(&mut workers[id].thread);
339 match mode {
340 PanicSwitch::Kill => {
341 panicked_jobs += 1;
342 number_of_workers -= 1;
343 break 'distribute_jobs;
344 }
345 PanicSwitch::Respawn => {
346 workers[id] = Worker::new(id, statuses_s.clone());
347 }
348 };
349 }
350 }
351 }
352 }
353
354 // destruct all remaining worker-threads
355 while number_of_workers != 0 {
356 match statuses_r.recv().unwrap() {
357 Status::Idle(id) => {
358 workers[id].instructions_s.send(Message::Terminate).unwrap();
359 thread::join(&mut workers[id].thread);
360 }
361 Status::Panic(id) => {
362 thread::join(&mut workers[id].thread);
363 if let PanicSwitch::Kill = mode {
364 panicked_jobs += 1;
365 };
366 }
367 };
368 number_of_workers -= 1;
369 }
370
371 if panicked_jobs > 0 {
372 eprintln!("Aborting process: {} panicked jobs.", panicked_jobs);
373 std::process::abort();
374 }
375
376 // ensure that `orders_r` lives as long as the thread to prevent reachability-errors
377 drop(orders_r);
378 });
379
380 Self { orders_s, thread }
381 }
382}
383
384/// [`Worker`] abstracts workers.
385struct Worker {
386 /// place to put instructions
387 instructions_s: Sender<Message>,
388 /// handle to join
389 thread: JoinHandle,
390}
391
392impl Worker {
393 /// Sets up a new worker.
394 /// - `id` is the worker's staff number.
395 /// - `statuses_s` is where the worker puts its current status.
396 ///
397 /// In particular, it spawns a thread and sets up a way to communicate to the thread.
398 fn new(id: StaffNumber, statuses_s: Sender<Status>) -> Self {
399 // this channel is used by the supervisor to contact this worker
400 let (instructions_s, instructions_r) = channel();
401
402 let thread = thread::spawn(move || {
403 // report for duty
404 statuses_s.send(Status::Idle(id)).unwrap();
405
406 // keepin' running to execute jobs
407 loop {
408 let message = instructions_r.recv().unwrap();
409
410 match message {
411 Message::NewJob(job) => match std::panic::catch_unwind(job) {
412 Ok(_) => {
413 statuses_s.send(Status::Idle(id)).unwrap();
414 }
415 Err(_) => {
416 statuses_s.send(Status::Panic(id)).unwrap();
417 break;
418 }
419 },
420 Message::Terminate => break,
421 }
422 }
423 });
424
425 Self {
426 instructions_s,
427 thread,
428 }
429 }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::Arc;

    // settings shared by all tests
    /// number of worker-threads of the pools under test (other values tried: 6, 12, 36)
    const SIZE: usize = 2;
    /// panic switch of the pools under test (alternative: `PanicSwitch::Kill`)
    const MODE: PanicSwitch = PanicSwitch::Respawn;
    /// staff number given to workers which are tested on their own
    const ID: StaffNumber = 0;

    /// A non-zero size yields a pool.
    #[test]
    fn test_threadpool_new_ok() {
        assert!(ThreadPool::new(SIZE, MODE).is_ok());
    }

    /// A size of zero is rejected.
    #[test]
    fn test_threadpool_new_err() {
        assert!(ThreadPool::new(0, MODE).is_err());
    }

    /// Every non-panicking job handed to the pool has run once the pool has been dropped.
    #[test]
    fn test_threadpool_execute() {
        const ROUNDS: usize = 5;

        let pool = ThreadPool::new(SIZE, MODE).unwrap();
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..ROUNDS {
            // `SIZE` counting jobs per round ...
            for _ in 0..SIZE {
                let counter = Arc::clone(&counter);
                pool.execute(move || {
                    counter.fetch_add(1, Ordering::SeqCst);
                });
            }
            // ... followed by a panicking job, which only respawn-mode survives
            if let PanicSwitch::Respawn = MODE {
                pool.execute(|| panic!("Oh no!"));
            }
        }

        // dropping waits for the pool to shut down
        drop(pool);

        assert_eq!(ROUNDS * SIZE, counter.load(Ordering::SeqCst));
    }

    /// A worker reports idle after a successful job and panic after a panicking one.
    #[test]
    fn test_worker_thread_newjob() {
        let (statuses_s, statuses_r) = channel();
        let mut worker = Worker::new(ID, statuses_s);

        // the worker reports for duty first
        assert!(matches!(statuses_r.recv().unwrap(), Status::Idle(ID)));

        // a well-behaved job runs and leaves the worker idle
        let executed = Arc::new(AtomicBool::new(false));
        let well_behaved = {
            let executed = Arc::clone(&executed);
            Box::new(move || {
                executed.store(true, Ordering::SeqCst);
            })
        };
        worker
            .instructions_s
            .send(Message::NewJob(well_behaved))
            .unwrap();
        assert!(matches!(statuses_r.recv().unwrap(), Status::Idle(ID)));
        assert!(executed.load(Ordering::SeqCst));

        // a panicking job is reported and ends the worker-thread
        let panicking = Box::new(|| panic!("Oh no!"));
        worker
            .instructions_s
            .send(Message::NewJob(panicking))
            .unwrap();
        assert!(matches!(statuses_r.recv().unwrap(), Status::Panic(ID)));

        thread::join(&mut worker.thread);
    }

    /// A worker told to terminate ends its thread so that it can be joined.
    #[test]
    fn test_worker_thread_terminate() {
        let (statuses_s, statuses_r) = channel();
        let mut worker = Worker::new(ID, statuses_s);

        // the worker reports for duty first
        assert!(matches!(statuses_r.recv().unwrap(), Status::Idle(ID)));

        worker.instructions_s.send(Message::Terminate).unwrap();

        thread::join(&mut worker.thread);
    }
}