safina_threadpool/
lib.rs

1//! # ARCHIVED ARCHIVED ARCHIVED
2//! This crate is archived and will not be updated.
3//!
4//! The code is now at
5//! [`safina::threadpool`](https://docs.rs/safina/latest/safina/threadpool/index.html) in the
6//! [`safina`](https://crates.io/crates/safina) crate.
7//!
8//! ----
9//!
10//! # safina-threadpool
11//!
12//! A threadpool.
13//!
14//! You can use it alone or with [`safina`](https://crates.io/crates/safina),
15//! a safe async runtime.
16//!
17//! # Features
18//! - Add a closure or `FnOnce` to the pool and one of the threads will execute it
19//! - Automatically restarts panicked threads
20//! - Retries after failing to spawn a thread
21//! - Drop the `ThreadPool` struct to stop all idle threads.
22//! - Destroy the pool and wait for all threads to stop
23//! - `forbid(unsafe_code)`
24//! - Depends only on `std`
25//! - 100% test coverage
26//!
27//! # Limitations
28//! - Not optimized
29//!
30//! # Examples
31//! ```rust
32//! # type ProcessResult = ();
33//! # fn process_data(data: (), sender: std::sync::mpsc::Sender<ProcessResult>) -> ProcessResult {
34//! #    sender.send(()).unwrap();
35//! # }
36//! # fn f() {
37//! # let data_source = vec![(),()];
38//! let pool =
39//!     safina_threadpool::ThreadPool::new("worker", 2).unwrap();
40//! let receiver = {
41//!     let (sender, receiver) =
42//!         std::sync::mpsc::channel();
43//!     for data in data_source {
44//!         let sender_clone = sender.clone();
45//!         pool.schedule(
46//!             move || process_data(data, sender_clone));
47//!     }
48//!     receiver
49//! };
50//! let results: Vec<ProcessResult> =
51//!     receiver.iter().collect();
52//! // ...
53//! # }
54//! ```
55//!
56//! # Alternatives
57//! - [`blocking`](https://crates.io/crates/blocking)
58//!   - Popular
59//!   - A little `unsafe` code
60//!   - [blocking/issues/24: Recover from thread spawn failure](https://github.com/smol-rs/blocking/issues/24)
61//! - [`threadpool`](https://crates.io/crates/threadpool)
62//!   - Popular
63//!   - Well maintained
64//!   - Dependencies have `unsafe` code
65//!   - [rust-threadpool/issues/97: Feature: provide a way to shutdown the thread pool](https://github.com/rust-threadpool/rust-threadpool/issues/97)
66//!   - Panics when failing to spawn a thread.
67//! - [`scoped_threadpool`](https://crates.io/crates/scoped_threadpool)
68//!   - Popular
69//!   - Contains `unsafe` code
70//!   - Unmaintained
71//!   - Does not restart workers on panic.
72//! - [`scheduled-thread-pool`](https://crates.io/crates/scheduled-thread-pool)
73//!   - Used by a popular connection pool library
74//!   - Dependencies have `unsafe` code
75//!   - Schedule jobs to run immediately, periodically, or after a specified delay.
76//! - [`workerpool`](https://crates.io/crates/workerpool)
77//!   - Dependencies have `unsafe` code
78//! - [`threads_pool`](https://crates.io/crates/threads_pool)
79//!   - Full of `unsafe`
80//! - [`thread-pool`](https://crates.io/crates/thread-pool)
81//!   - Old
82//!   - Dependencies have `unsafe` code
83//! - [`tasque`](https://crates.io/crates/tasque)
84//!   - Dependencies have `unsafe` code
85//! - [`fast-threadpool`](https://crates.io/crates/fast-threadpool)
86//!   - Dependencies have `unsafe` code
87//! - [`blocking-permit`](https://crates.io/crates/blocking-permit)
88//!   - Full of `unsafe`
89//! - [`rayon-core`](https://crates.io/crates/rayon-core)
90//!   - Full of `unsafe`
91//!
92//! # Changelog
93//! <details>
94//! <summary>Changelog</summary>
95//!
96//! - v0.2.4 - Update docs.
97//! - v0.2.3 - Implement `From<NewThreadPoolError>` and `From<TryScheduleError>` for `std::io::Error`.
98//! - v0.2.2 - Add `ThreadPool::join` and `ThreadPool::try_join`.
99//! - v0.2.1 - Improve test coverage.
100//! - v0.2.0
101//!   - `ThreadPool::new` to return `Result`.
102//!   - `ThreadPool::try_schedule` to return an error when it fails to restart panicked threads.
103//!   - `ThreadPool::schedule` to handle failure starting replacement threads.
104//! - v0.1.4 - Stop threads on drop.
105//! - v0.1.3 - Support stable Rust!  Needs 1.51+.
106//! - v0.1.2 - Add another example
107//! - v0.1.1 - Simplified internals and improved documentation.
108//! - v0.1.0 - First release
109//!
110//! </details>
111//!
112//! # TO DO
113//! - Make `join` and `try_join` work with `Arc<ThreadPool>`.
114//! - Log a warning when all threads panicked.
115//! - Update test coverage.
116//! - Add a public `respawn_threads` function.
117//! - Add a stress test
118//! - Add a benchmark.  See benchmarks in <https://crates.io/crates/executors>
119//! - Add a way for a job to schedule another job on the same thread, with stealing.
120#![forbid(unsafe_code)]
121
122mod atomic_counter;
123
124use atomic_counter::AtomicCounter;
125use core::fmt::{Debug, Display, Formatter};
126use core::time::Duration;
127use std::error::Error;
128use std::io::ErrorKind;
129use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender, TrySendError};
130use std::sync::{Arc, Mutex};
131use std::time::Instant;
132
// Test-only knob read by `Inner::spawn_thread`: once the number of live
// threads reaches this value, spawning returns a simulated error instead of
// starting a thread.  The initial value `usize::MAX` disables the simulation.
#[cfg(feature = "testing")]
#[doc(hidden)]
pub static INTERNAL_MAX_THREADS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(usize::MAX);
137
/// Blocks the calling thread for at least `ms` milliseconds.
fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    std::thread::sleep(pause);
}
141
/// Reports whether two I/O errors have the same kind and render the same message.
///
/// `std::io::Error` does not implement `PartialEq`, so the error enums in this
/// file use this helper for their own equality.
fn err_eq(a: &std::io::Error, b: &std::io::Error) -> bool {
    if a.kind() != b.kind() {
        return false;
    }
    a.to_string() == b.to_string()
}
145
/// Error produced when the pool fails to start a worker thread.
#[derive(Debug)]
pub enum StartThreadsError {
    /// The pool has no threads and `std::thread::Builder::spawn` returned the included error.
    NoThreads(std::io::Error),
    /// The pool has at least one thread and `std::thread::Builder::spawn` returned the included error.
    Respawn(std::io::Error),
}
impl Display for StartThreadsError {
    /// Writes a sentence describing the situation, followed by the underlying spawn error.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        match self {
            Self::NoThreads(cause) => f.write_fmt(format_args!(
                "ThreadPool workers all panicked, failed starting replacement threads: {}",
                cause
            )),
            Self::Respawn(cause) => f.write_fmt(format_args!(
                "ThreadPool failed starting threads to replace panicked threads: {}",
                cause
            )),
        }
    }
}
impl Error for StartThreadsError {}
172impl PartialEq for StartThreadsError {
173    fn eq(&self, other: &Self) -> bool {
174        match (self, other) {
175            (StartThreadsError::NoThreads(a), StartThreadsError::NoThreads(b))
176            | (StartThreadsError::Respawn(a), StartThreadsError::Respawn(b)) => err_eq(a, b),
177            _ => false,
178        }
179    }
180}
181impl Eq for StartThreadsError {}
182
/// Error returned by `ThreadPool::new`.
#[derive(Debug)]
pub enum NewThreadPoolError {
    /// An argument was rejected.  The string describes the problem.
    Parameter(String),
    /// `std::thread::Builder::spawn` returned the included error.
    Spawn(std::io::Error),
}
impl Display for NewThreadPoolError {
    /// Writes the parameter message as-is, or a sentence wrapping the spawn error.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        match self {
            Self::Parameter(message) => f.write_str(message),
            Self::Spawn(cause) => f.write_fmt(format_args!(
                "ThreadPool failed starting threads: {}",
                cause
            )),
        }
    }
}
impl Error for NewThreadPoolError {}
200impl PartialEq for NewThreadPoolError {
201    fn eq(&self, other: &Self) -> bool {
202        match (self, other) {
203            (NewThreadPoolError::Parameter(a), NewThreadPoolError::Parameter(b)) => a == b,
204            (NewThreadPoolError::Spawn(a), NewThreadPoolError::Spawn(b)) => err_eq(a, b),
205            _ => false,
206        }
207    }
208}
209impl Eq for NewThreadPoolError {}
210impl From<StartThreadsError> for NewThreadPoolError {
211    fn from(err: StartThreadsError) -> Self {
212        match err {
213            StartThreadsError::NoThreads(e) | StartThreadsError::Respawn(e) => {
214                NewThreadPoolError::Spawn(e)
215            }
216        }
217    }
218}
219impl From<NewThreadPoolError> for std::io::Error {
220    fn from(new_thread_pool_error: NewThreadPoolError) -> Self {
221        match new_thread_pool_error {
222            NewThreadPoolError::Parameter(s) => std::io::Error::new(ErrorKind::InvalidInput, s),
223            NewThreadPoolError::Spawn(s) => {
224                std::io::Error::new(ErrorKind::Other, format!("failed to start threads: {}", s))
225            }
226        }
227    }
228}
229
/// Error returned by `ThreadPool::try_schedule`.
#[derive(Debug)]
pub enum TryScheduleError {
    /// The job queue has no free slot.
    QueueFull,
    /// The pool has no threads and `std::thread::Builder::spawn` returned the included error.
    NoThreads(std::io::Error),
    /// The pool has at least one thread and `std::thread::Builder::spawn` returned the included error.
    Respawn(std::io::Error),
}
impl Display for TryScheduleError {
    /// Writes a sentence describing the failure, followed by the spawn error when there is one.
    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
        match self {
            Self::QueueFull => f.write_fmt(format_args!("ThreadPool queue is full")),
            Self::NoThreads(cause) => f.write_fmt(format_args!(
                "ThreadPool workers all panicked, failed starting replacement threads: {}",
                cause
            )),
            Self::Respawn(cause) => f.write_fmt(format_args!(
                "ThreadPool failed starting threads to replace panicked threads: {}",
                cause
            )),
        }
    }
}
impl Error for TryScheduleError {}
258impl PartialEq for TryScheduleError {
259    fn eq(&self, other: &Self) -> bool {
260        match (self, other) {
261            (TryScheduleError::QueueFull, TryScheduleError::QueueFull) => true,
262            (TryScheduleError::NoThreads(a), TryScheduleError::NoThreads(b))
263            | (TryScheduleError::Respawn(a), TryScheduleError::Respawn(b)) => err_eq(a, b),
264            _ => false,
265        }
266    }
267}
268impl Eq for TryScheduleError {}
269impl From<StartThreadsError> for TryScheduleError {
270    fn from(err: StartThreadsError) -> Self {
271        match err {
272            StartThreadsError::NoThreads(e) => TryScheduleError::NoThreads(e),
273            StartThreadsError::Respawn(e) => TryScheduleError::Respawn(e),
274        }
275    }
276}
277impl From<TryScheduleError> for std::io::Error {
278    fn from(try_schedule_error: TryScheduleError) -> Self {
279        match try_schedule_error {
280            TryScheduleError::QueueFull => {
281                std::io::Error::new(ErrorKind::WouldBlock, "TryScheduleError::QueueFull")
282            }
283            TryScheduleError::NoThreads(e) => std::io::Error::new(
284                e.kind(),
285                format!(
286                    "ThreadPool workers all panicked, failed starting replacement threads: {}",
287                    e
288                ),
289            ),
290            TryScheduleError::Respawn(e) => std::io::Error::new(
291                e.kind(),
292                format!(
293                    "ThreadPool failed starting threads to replace panicked threads: {}",
294                    e
295                ),
296            ),
297        }
298    }
299}
300
301struct Inner {
302    name: &'static str,
303    next_name_num: AtomicCounter,
304    size: usize,
305    receiver: Mutex<Receiver<Box<dyn FnOnce() + Send>>>,
306}
307
308impl Inner {
309    fn num_live_threads(self: &Arc<Self>) -> usize {
310        Arc::strong_count(self) - 1
311    }
312
313    fn work(self: &Arc<Self>) {
314        loop {
315            let recv_result = self
316                .receiver
317                .lock()
318                .unwrap()
319                .recv_timeout(Duration::from_millis(500));
320            match recv_result {
321                Ok(f) => {
322                    let _ignored = self.start_threads();
323                    f();
324                }
325                Err(RecvTimeoutError::Timeout) => {}
326                // ThreadPool was dropped.
327                Err(RecvTimeoutError::Disconnected) => return,
328            };
329            let _ignored = self.start_threads();
330        }
331    }
332
333    #[allow(clippy::unused_self)]
334    #[allow(unused_variables)]
335    fn spawn_thread(
336        &self,
337        num_live_threads: usize,
338        name: String,
339        f: impl FnOnce() + Send + 'static,
340    ) -> Result<(), std::io::Error> {
341        // I found no way to make std::thread fail reliably on both macOS & Linux.  So we simulate it with this.
342        #[cfg(feature = "testing")]
343        if num_live_threads >= INTERNAL_MAX_THREADS.load(std::sync::atomic::Ordering::Acquire) {
344            return Err(std::io::Error::new(
345                std::io::ErrorKind::Other,
346                "err1".to_string(),
347            ));
348        }
349
350        std::thread::Builder::new().name(name).spawn(f)?;
351        Ok(())
352    }
353
354    fn start_thread(self: &Arc<Self>) -> Result<(), StartThreadsError> {
355        let self_clone = self.clone();
356        let num_live_threads = self.num_live_threads() - 1;
357        if num_live_threads < self.size {
358            self.spawn_thread(
359                num_live_threads,
360                format!("{}{}", self.name, self.next_name_num.next()),
361                move || self_clone.work(),
362            )
363            .map_err(|e| {
364                if num_live_threads == 0 {
365                    StartThreadsError::NoThreads(e)
366                } else {
367                    StartThreadsError::Respawn(e)
368                }
369            })?;
370        }
371        Ok(())
372    }
373
374    fn start_threads(self: &Arc<Self>) -> Result<(), StartThreadsError> {
375        while self.num_live_threads() < self.size {
376            self.start_thread()?;
377        }
378        Ok(())
379    }
380}
381
382/// A collection of threads and a queue for jobs (`FnOnce` structs) they execute.
383///
384/// Threads stop when they execute a job that panics.
385/// If one thread survives, it will recreate all the threads.
386/// The next call to [`schedule`](#method.schedule) or [`try_schedule`](#method.try_schedule)
387/// also recreates threads.
388///
389/// If your threadpool load is bursty and you want to automatically recover
390/// from an all-threads-panicked state, you could use
391/// [`safina_timer`](https://crates.io/crates/safina-timer) to periodically call
392/// [`schedule`](#method.schedule) or [`try_schedule`](#method.try_schedule).
393///
394/// After drop, threads stop as they become idle.
395///
396/// # Example
397/// ```rust
398/// # type ProcessResult = ();
399/// # fn process_data(data: (), sender: std::sync::mpsc::Sender<ProcessResult>) -> ProcessResult {
400/// #    sender.send(()).unwrap();
401/// # }
402/// # fn f() {
403/// # let data_source = vec![(),()];
404/// let pool =
405///     safina_threadpool::ThreadPool::new("worker", 2).unwrap();
406/// let receiver = {
407///     let (sender, receiver) =
408///         std::sync::mpsc::channel();
409///     for data in data_source {
410///         let sender_clone = sender.clone();
411///         pool.schedule(
412///             move || process_data(data, sender_clone));
413///     }
414///     receiver
415/// };
416/// let results: Vec<ProcessResult> =
417///     receiver.iter().collect();
418/// // ...
419/// # }
420/// ```
421///
422/// ```rust
423/// # use core::time::Duration;
424/// # use std::sync::Arc;
425/// let pool =
426///     Arc::new(
427///         safina_threadpool::ThreadPool::new("worker", 2).unwrap());
428/// let executor = safina_executor::Executor::default();
429/// safina_timer::start_timer_thread();
430/// let pool_clone = pool.clone();
431/// executor.spawn(async move {
432///     loop {
433///         safina_timer::sleep_for(Duration::from_millis(500)).await;
434///         pool_clone.schedule(|| {});
435///     }
436/// });
437/// # assert_eq!(2, pool.num_live_threads());
438/// # for _ in 0..2 {
439/// #     pool.schedule(|| {
440/// #         std::thread::sleep(Duration::from_millis(100));
441/// #         panic!("ignore this panic")
442/// #     });
443/// # }
444/// # std::thread::sleep(Duration::from_millis(200));
445/// # assert_eq!(0, pool.num_live_threads());
446/// # std::thread::sleep(Duration::from_millis(500));
447/// # assert_eq!(2, pool.num_live_threads());
448/// ```
449pub struct ThreadPool {
450    inner: Arc<Inner>,
451    sender: SyncSender<Box<dyn FnOnce() + Send>>,
452}
453impl ThreadPool {
454    /// Creates a new thread pool containing `size` threads.
455    /// The threads all start immediately.
456    ///
457    /// Threads are named with `name` with a number.
458    /// For example, `ThreadPool::new("worker", 2)`
459    /// creates threads named "worker-1" and "worker-2".
460    /// If one of those threads panics, the pool creates "worker-3".
461    ///
462    /// After the `ThreadPool` struct drops, the threads continue processing
463    /// jobs and stop when the queue is empty.
464    ///
465    /// # Errors
466    /// Returns an error when `name` is empty, `size` is zero, or it fails to start the threads.
467    pub fn new(name: &'static str, size: usize) -> Result<Self, NewThreadPoolError> {
468        if name.is_empty() {
469            return Err(NewThreadPoolError::Parameter(
470                "ThreadPool::new called with empty name".to_string(),
471            ));
472        }
473        if size < 1 {
474            return Err(NewThreadPoolError::Parameter(format!(
475                "ThreadPool::new called with invalid size value: {:?}",
476                size
477            )));
478        }
479        // Use a channel with bounded size.
480        // If the channel was unbounded, the process could OOM when throughput goes down.
481        let (sender, receiver) = std::sync::mpsc::sync_channel(size * 200);
482        let pool = ThreadPool {
483            inner: Arc::new(Inner {
484                name,
485                next_name_num: AtomicCounter::new(),
486                size,
487                receiver: Mutex::new(receiver),
488            }),
489            sender,
490        };
491        pool.inner.start_threads()?;
492        Ok(pool)
493    }
494
495    /// Returns the number of threads in the pool.
496    #[must_use]
497    pub fn size(&self) -> usize {
498        self.inner.size
499    }
500
501    /// Returns the number of threads currently alive.
502    #[must_use]
503    pub fn num_live_threads(&self) -> usize {
504        self.inner.num_live_threads()
505    }
506
507    #[cfg(feature = "testing")]
508    #[doc(hidden)]
509    #[must_use]
510    pub fn num_live_threads_fn(&self) -> Box<dyn Fn() -> usize> {
511        let inner_clone = self.inner.clone();
512        Box::new(move || inner_clone.num_live_threads())
513    }
514
515    /// Adds a job to the queue.  The next idle thread will execute it.
516    /// Jobs are started in FIFO order.
517    ///
518    /// Blocks when the queue is full or no threads are running.
519    /// See [`try_schedule`](#method.try_schedule).
520    ///
521    /// Recreates any threads that panicked.
522    /// Retries on failure to start a new thread.
523    ///
524    /// Puts `f` in a [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html) before
525    /// adding it to the queue.
526    #[allow(clippy::missing_panics_doc)]
527    pub fn schedule<F: FnOnce() + Send + 'static>(&self, f: F) {
528        let mut opt_box_f: Option<Box<dyn FnOnce() + Send + 'static>> = Some(Box::new(f));
529        loop {
530            match self.inner.start_threads() {
531                Ok(()) | Err(StartThreadsError::Respawn(_)) => {
532                    // At least one thread is running.
533                }
534                Err(StartThreadsError::NoThreads(_)) => {
535                    sleep_ms(10);
536                    continue;
537                }
538            }
539            opt_box_f = match self.sender.try_send(opt_box_f.take().unwrap()) {
540                Ok(()) => return,
541                Err(TrySendError::Disconnected(_)) => unreachable!(),
542                Err(TrySendError::Full(box_f)) => Some(box_f),
543            };
544            sleep_ms(10);
545        }
546    }
547
548    /// Adds a job to the queue and then starts threads to replace any panicked threads.
549    /// The next idle thread will execute the job.
550    /// Starts jobs in FIFO order.
551    ///
552    /// Puts `f` in a [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html) before
553    /// adding it to the queue.
554    ///
555    /// # Errors
556    /// Returns an error when the queue is full or it fails to start a thread.
557    /// If the return value is not `TryScheduleError::QueueFull` then it added the job to the queue.
558    #[allow(clippy::missing_panics_doc)]
559    pub fn try_schedule(&self, f: impl FnOnce() + Send + 'static) -> Result<(), TryScheduleError> {
560        match self.sender.try_send(Box::new(f)) {
561            Ok(_) => {}
562            Err(TrySendError::Disconnected(_)) => unreachable!(),
563            Err(TrySendError::Full(_)) => return Err(TryScheduleError::QueueFull),
564        };
565        self.inner.start_threads().map_err(std::convert::Into::into)
566    }
567
568    /// Consumes the thread pool and waits for all threads to stop.
569    pub fn join(self) {
570        let inner = self.inner.clone();
571        drop(self);
572        while inner.num_live_threads() > 0 {
573            sleep_ms(10);
574        }
575    }
576
577    /// Consumes the thread pool and waits for all threads to stop.
578    ///
579    /// # Errors
580    /// Returns an error if the threads do not stop within the timeout duration.
581    pub fn try_join(self, timeout: Duration) -> Result<(), String> {
582        let inner = self.inner.clone();
583        drop(self);
584        let deadline = Instant::now() + timeout;
585        loop {
586            if inner.num_live_threads() < 1 {
587                return Ok(());
588            }
589            if deadline < Instant::now() {
590                return Err("timed out waiting for ThreadPool workers to stop".to_string());
591            }
592            sleep_ms(10);
593        }
594    }
595}
596impl Debug for ThreadPool {
597    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
598        write!(
599            f,
600            "ThreadPool{{{:?},size={:?}}}",
601            self.inner.name, self.inner.size
602        )
603    }
604}