safina_threadpool/lib.rs
1//! # ARCHIVED ARCHIVED ARCHIVED
2//! This crate is archived and will not be updated.
3//!
4//! The code is now at
5//! [`safina::threadpool`](https://docs.rs/safina/latest/safina/threadpool/index.html) in the
6//! [`safina`](https://crates.io/crates/safina) crate.
7//!
8//! ----
9//!
10//! # safina-threadpool
11//!
12//! A threadpool.
13//!
14//! You can use it alone or with [`safina`](https://crates.io/crates/safina),
15//! a safe async runtime.
16//!
17//! # Features
18//! - Add a closure or `FnOnce` to the pool and one of the threads will execute it
19//! - Automatically restarts panicked threads
20//! - Retries after failing to spawn a thread
21//! - Drop the `ThreadPool` struct to stop all idle threads.
22//! - Destroy the pool and wait for all threads to stop
23//! - `forbid(unsafe_code)`
24//! - Depends only on `std`
25//! - 100% test coverage
26//!
27//! # Limitations
28//! - Not optimized
29//!
30//! # Examples
31//! ```rust
32//! # type ProcessResult = ();
33//! # fn process_data(data: (), sender: std::sync::mpsc::Sender<ProcessResult>) -> ProcessResult {
34//! # sender.send(()).unwrap();
35//! # }
36//! # fn f() {
37//! # let data_source = vec![(),()];
38//! let pool =
39//! safina_threadpool::ThreadPool::new("worker", 2).unwrap();
40//! let receiver = {
41//! let (sender, receiver) =
42//! std::sync::mpsc::channel();
43//! for data in data_source {
44//! let sender_clone = sender.clone();
45//! pool.schedule(
46//! move || process_data(data, sender_clone));
47//! }
48//! receiver
49//! };
50//! let results: Vec<ProcessResult> =
51//! receiver.iter().collect();
52//! // ...
53//! # }
54//! ```
55//!
56//! # Alternatives
//! - [`blocking`](https://crates.io/crates/blocking)
//!   - Popular
//!   - A little `unsafe` code
//!   - [blocking/issues/24: Recover from thread spawn failure](https://github.com/smol-rs/blocking/issues/24)
//! - [`threadpool`](https://crates.io/crates/threadpool)
//!   - Popular
//!   - Well maintained
//!   - Dependencies have `unsafe` code
//!   - [rust-threadpool/issues/97: Feature: provide a way to shutdown the thread pool](https://github.com/rust-threadpool/rust-threadpool/issues/97)
//!   - Panics when failing to spawn a thread.
//! - [`scoped_threadpool`](https://crates.io/crates/scoped_threadpool)
//!   - Popular
//!   - Contains `unsafe` code
//!   - Unmaintained
//!   - Does not restart workers on panic.
//! - [`scheduled-thread-pool`](https://crates.io/crates/scheduled-thread-pool)
//!   - Used by a popular connection pool library
//!   - Dependencies have `unsafe` code
//!   - Schedule jobs to run immediately, periodically, or after a specified delay.
//! - [`workerpool`](https://crates.io/crates/workerpool)
//!   - Dependencies have `unsafe` code
//! - [`threads_pool`](https://crates.io/crates/threads_pool)
//!   - Full of `unsafe`
//! - [`thread-pool`](https://crates.io/crates/thread-pool)
//!   - Old
//!   - Dependencies have `unsafe` code
//! - [`tasque`](https://crates.io/crates/tasque)
//!   - Dependencies have `unsafe` code
//! - [`fast-threadpool`](https://crates.io/crates/fast-threadpool)
//!   - Dependencies have `unsafe` code
//! - [`blocking-permit`](https://crates.io/crates/blocking-permit)
//!   - Full of `unsafe`
//! - [`rayon-core`](https://crates.io/crates/rayon-core)
//!   - Full of `unsafe`
91//!
92//! # Changelog
93//! <details>
94//! <summary>Changelog</summary>
95//!
//! - v0.2.4 - Update docs.
//! - v0.2.3 - Implement `From<NewThreadPoolError>` and `From<TryScheduleError>` for `std::io::Error`.
//! - v0.2.2 - Add `ThreadPool::join` and `ThreadPool::try_join`.
//! - v0.2.1 - Improve test coverage.
//! - v0.2.0
//!   - Change `ThreadPool::new` to return `Result`.
//!   - Change `ThreadPool::try_schedule` to return an error when it fails to restart panicked threads.
//!   - Change `ThreadPool::schedule` to handle failure starting replacement threads.
//! - v0.1.4 - Stop threads on drop.
//! - v0.1.3 - Support stable Rust! Needs 1.51+.
//! - v0.1.2 - Add another example.
//! - v0.1.1 - Simplified internals and improved documentation.
//! - v0.1.0 - First release.
109//!
110//! </details>
111//!
112//! # TO DO
113//! - Make `join` and `try_join` work with `Arc<ThreadPool>`.
114//! - Log a warning when all threads panicked.
115//! - Update test coverage.
116//! - Add a public `respawn_threads` function.
117//! - Add a stress test
118//! - Add a benchmark. See benchmarks in <https://crates.io/crates/executors>
119//! - Add a way for a job to schedule another job on the same thread, with stealing.
120#![forbid(unsafe_code)]
121
122mod atomic_counter;
123
124use atomic_counter::AtomicCounter;
125use core::fmt::{Debug, Display, Formatter};
126use core::time::Duration;
127use std::error::Error;
128use std::io::ErrorKind;
129use std::sync::mpsc::{Receiver, RecvTimeoutError, SyncSender, TrySendError};
130use std::sync::{Arc, Mutex};
131use std::time::Instant;
132
/// Test-only cap on the number of live worker threads.
///
/// When the `testing` feature is enabled, `Inner::spawn_thread` returns an
/// error instead of spawning once the live-thread count reaches this value.
/// The initial value, `usize::MAX`, effectively disables the cap.
#[cfg(feature = "testing")]
#[doc(hidden)]
pub static INTERNAL_MAX_THREADS: std::sync::atomic::AtomicUsize =
    std::sync::atomic::AtomicUsize::new(usize::MAX);
137
/// Blocks the calling thread for `ms` milliseconds.
fn sleep_ms(ms: u64) {
    let pause = Duration::from_millis(ms);
    std::thread::sleep(pause);
}
141
/// Reports whether two I/O errors are equivalent.
///
/// `std::io::Error` does not implement `PartialEq`, so two errors are treated
/// as equal when they have the same `ErrorKind` and the same `Display` text.
fn err_eq(a: &std::io::Error, b: &std::io::Error) -> bool {
    // The cheap `kind` comparison runs first; the strings are only built when
    // the kinds already match.
    a.kind() == b.kind() && a.to_string() == b.to_string()
}
145
146#[derive(Debug)]
147pub enum StartThreadsError {
148 /// The pool has no threads and `std::thread::Builder::spawn` returned the included error.
149 NoThreads(std::io::Error),
150 /// The pool has at least one thread and `std::thread::Builder::spawn` returned the included error.
151 Respawn(std::io::Error),
152}
153impl Display for StartThreadsError {
154 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
155 match self {
156 StartThreadsError::NoThreads(e) => write!(
157 f,
158 "ThreadPool workers all panicked, failed starting replacement threads: {}",
159 e
160 ),
161 StartThreadsError::Respawn(e) => {
162 write!(
163 f,
164 "ThreadPool failed starting threads to replace panicked threads: {}",
165 e
166 )
167 }
168 }
169 }
170}
171impl Error for StartThreadsError {}
172impl PartialEq for StartThreadsError {
173 fn eq(&self, other: &Self) -> bool {
174 match (self, other) {
175 (StartThreadsError::NoThreads(a), StartThreadsError::NoThreads(b))
176 | (StartThreadsError::Respawn(a), StartThreadsError::Respawn(b)) => err_eq(a, b),
177 _ => false,
178 }
179 }
180}
181impl Eq for StartThreadsError {}
182
183#[derive(Debug)]
184pub enum NewThreadPoolError {
185 Parameter(String),
186 /// `std::thread::Builder::spawn` returned the included error.
187 Spawn(std::io::Error),
188}
189impl Display for NewThreadPoolError {
190 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
191 match self {
192 NewThreadPoolError::Parameter(s) => write!(f, "{}", s),
193 NewThreadPoolError::Spawn(e) => {
194 write!(f, "ThreadPool failed starting threads: {}", e)
195 }
196 }
197 }
198}
199impl Error for NewThreadPoolError {}
200impl PartialEq for NewThreadPoolError {
201 fn eq(&self, other: &Self) -> bool {
202 match (self, other) {
203 (NewThreadPoolError::Parameter(a), NewThreadPoolError::Parameter(b)) => a == b,
204 (NewThreadPoolError::Spawn(a), NewThreadPoolError::Spawn(b)) => err_eq(a, b),
205 _ => false,
206 }
207 }
208}
209impl Eq for NewThreadPoolError {}
210impl From<StartThreadsError> for NewThreadPoolError {
211 fn from(err: StartThreadsError) -> Self {
212 match err {
213 StartThreadsError::NoThreads(e) | StartThreadsError::Respawn(e) => {
214 NewThreadPoolError::Spawn(e)
215 }
216 }
217 }
218}
219impl From<NewThreadPoolError> for std::io::Error {
220 fn from(new_thread_pool_error: NewThreadPoolError) -> Self {
221 match new_thread_pool_error {
222 NewThreadPoolError::Parameter(s) => std::io::Error::new(ErrorKind::InvalidInput, s),
223 NewThreadPoolError::Spawn(s) => {
224 std::io::Error::new(ErrorKind::Other, format!("failed to start threads: {}", s))
225 }
226 }
227 }
228}
229
230#[derive(Debug)]
231pub enum TryScheduleError {
232 QueueFull,
233 /// The pool has no threads and `std::thread::Builder::spawn` returned the included error.
234 NoThreads(std::io::Error),
235 /// The pool has at least one thread and `std::thread::Builder::spawn` returned the included error.
236 Respawn(std::io::Error),
237}
238impl Display for TryScheduleError {
239 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
240 match self {
241 TryScheduleError::QueueFull => write!(f, "ThreadPool queue is full"),
242 TryScheduleError::NoThreads(e) => write!(
243 f,
244 "ThreadPool workers all panicked, failed starting replacement threads: {}",
245 e
246 ),
247 TryScheduleError::Respawn(e) => {
248 write!(
249 f,
250 "ThreadPool failed starting threads to replace panicked threads: {}",
251 e
252 )
253 }
254 }
255 }
256}
257impl Error for TryScheduleError {}
258impl PartialEq for TryScheduleError {
259 fn eq(&self, other: &Self) -> bool {
260 match (self, other) {
261 (TryScheduleError::QueueFull, TryScheduleError::QueueFull) => true,
262 (TryScheduleError::NoThreads(a), TryScheduleError::NoThreads(b))
263 | (TryScheduleError::Respawn(a), TryScheduleError::Respawn(b)) => err_eq(a, b),
264 _ => false,
265 }
266 }
267}
268impl Eq for TryScheduleError {}
269impl From<StartThreadsError> for TryScheduleError {
270 fn from(err: StartThreadsError) -> Self {
271 match err {
272 StartThreadsError::NoThreads(e) => TryScheduleError::NoThreads(e),
273 StartThreadsError::Respawn(e) => TryScheduleError::Respawn(e),
274 }
275 }
276}
277impl From<TryScheduleError> for std::io::Error {
278 fn from(try_schedule_error: TryScheduleError) -> Self {
279 match try_schedule_error {
280 TryScheduleError::QueueFull => {
281 std::io::Error::new(ErrorKind::WouldBlock, "TryScheduleError::QueueFull")
282 }
283 TryScheduleError::NoThreads(e) => std::io::Error::new(
284 e.kind(),
285 format!(
286 "ThreadPool workers all panicked, failed starting replacement threads: {}",
287 e
288 ),
289 ),
290 TryScheduleError::Respawn(e) => std::io::Error::new(
291 e.kind(),
292 format!(
293 "ThreadPool failed starting threads to replace panicked threads: {}",
294 e
295 ),
296 ),
297 }
298 }
299}
300
301struct Inner {
302 name: &'static str,
303 next_name_num: AtomicCounter,
304 size: usize,
305 receiver: Mutex<Receiver<Box<dyn FnOnce() + Send>>>,
306}
307
308impl Inner {
309 fn num_live_threads(self: &Arc<Self>) -> usize {
310 Arc::strong_count(self) - 1
311 }
312
313 fn work(self: &Arc<Self>) {
314 loop {
315 let recv_result = self
316 .receiver
317 .lock()
318 .unwrap()
319 .recv_timeout(Duration::from_millis(500));
320 match recv_result {
321 Ok(f) => {
322 let _ignored = self.start_threads();
323 f();
324 }
325 Err(RecvTimeoutError::Timeout) => {}
326 // ThreadPool was dropped.
327 Err(RecvTimeoutError::Disconnected) => return,
328 };
329 let _ignored = self.start_threads();
330 }
331 }
332
333 #[allow(clippy::unused_self)]
334 #[allow(unused_variables)]
335 fn spawn_thread(
336 &self,
337 num_live_threads: usize,
338 name: String,
339 f: impl FnOnce() + Send + 'static,
340 ) -> Result<(), std::io::Error> {
341 // I found no way to make std::thread fail reliably on both macOS & Linux. So we simulate it with this.
342 #[cfg(feature = "testing")]
343 if num_live_threads >= INTERNAL_MAX_THREADS.load(std::sync::atomic::Ordering::Acquire) {
344 return Err(std::io::Error::new(
345 std::io::ErrorKind::Other,
346 "err1".to_string(),
347 ));
348 }
349
350 std::thread::Builder::new().name(name).spawn(f)?;
351 Ok(())
352 }
353
354 fn start_thread(self: &Arc<Self>) -> Result<(), StartThreadsError> {
355 let self_clone = self.clone();
356 let num_live_threads = self.num_live_threads() - 1;
357 if num_live_threads < self.size {
358 self.spawn_thread(
359 num_live_threads,
360 format!("{}{}", self.name, self.next_name_num.next()),
361 move || self_clone.work(),
362 )
363 .map_err(|e| {
364 if num_live_threads == 0 {
365 StartThreadsError::NoThreads(e)
366 } else {
367 StartThreadsError::Respawn(e)
368 }
369 })?;
370 }
371 Ok(())
372 }
373
374 fn start_threads(self: &Arc<Self>) -> Result<(), StartThreadsError> {
375 while self.num_live_threads() < self.size {
376 self.start_thread()?;
377 }
378 Ok(())
379 }
380}
381
382/// A collection of threads and a queue for jobs (`FnOnce` structs) they execute.
383///
384/// Threads stop when they execute a job that panics.
385/// If one thread survives, it will recreate all the threads.
386/// The next call to [`schedule`](#method.schedule) or [`try_schedule`](#method.try_schedule)
387/// also recreates threads.
388///
389/// If your threadpool load is bursty and you want to automatically recover
390/// from an all-threads-panicked state, you could use
391/// [`safina_timer`](https://crates.io/crates/safina-timer) to periodically call
392/// [`schedule`](#method.schedule) or [`try_schedule`](#method.try_schedule).
393///
394/// After drop, threads stop as they become idle.
395///
396/// # Example
397/// ```rust
398/// # type ProcessResult = ();
399/// # fn process_data(data: (), sender: std::sync::mpsc::Sender<ProcessResult>) -> ProcessResult {
400/// # sender.send(()).unwrap();
401/// # }
402/// # fn f() {
403/// # let data_source = vec![(),()];
404/// let pool =
405/// safina_threadpool::ThreadPool::new("worker", 2).unwrap();
406/// let receiver = {
407/// let (sender, receiver) =
408/// std::sync::mpsc::channel();
409/// for data in data_source {
410/// let sender_clone = sender.clone();
411/// pool.schedule(
412/// move || process_data(data, sender_clone));
413/// }
414/// receiver
415/// };
416/// let results: Vec<ProcessResult> =
417/// receiver.iter().collect();
418/// // ...
419/// # }
420/// ```
421///
422/// ```rust
423/// # use core::time::Duration;
424/// # use std::sync::Arc;
425/// let pool =
426/// Arc::new(
427/// safina_threadpool::ThreadPool::new("worker", 2).unwrap());
428/// let executor = safina_executor::Executor::default();
429/// safina_timer::start_timer_thread();
430/// let pool_clone = pool.clone();
431/// executor.spawn(async move {
432/// loop {
433/// safina_timer::sleep_for(Duration::from_millis(500)).await;
434/// pool_clone.schedule(|| {});
435/// }
436/// });
437/// # assert_eq!(2, pool.num_live_threads());
438/// # for _ in 0..2 {
439/// # pool.schedule(|| {
440/// # std::thread::sleep(Duration::from_millis(100));
441/// # panic!("ignore this panic")
442/// # });
443/// # }
444/// # std::thread::sleep(Duration::from_millis(200));
445/// # assert_eq!(0, pool.num_live_threads());
446/// # std::thread::sleep(Duration::from_millis(500));
447/// # assert_eq!(2, pool.num_live_threads());
448/// ```
449pub struct ThreadPool {
450 inner: Arc<Inner>,
451 sender: SyncSender<Box<dyn FnOnce() + Send>>,
452}
453impl ThreadPool {
454 /// Creates a new thread pool containing `size` threads.
455 /// The threads all start immediately.
456 ///
457 /// Threads are named with `name` with a number.
458 /// For example, `ThreadPool::new("worker", 2)`
459 /// creates threads named "worker-1" and "worker-2".
460 /// If one of those threads panics, the pool creates "worker-3".
461 ///
462 /// After the `ThreadPool` struct drops, the threads continue processing
463 /// jobs and stop when the queue is empty.
464 ///
465 /// # Errors
466 /// Returns an error when `name` is empty, `size` is zero, or it fails to start the threads.
467 pub fn new(name: &'static str, size: usize) -> Result<Self, NewThreadPoolError> {
468 if name.is_empty() {
469 return Err(NewThreadPoolError::Parameter(
470 "ThreadPool::new called with empty name".to_string(),
471 ));
472 }
473 if size < 1 {
474 return Err(NewThreadPoolError::Parameter(format!(
475 "ThreadPool::new called with invalid size value: {:?}",
476 size
477 )));
478 }
479 // Use a channel with bounded size.
480 // If the channel was unbounded, the process could OOM when throughput goes down.
481 let (sender, receiver) = std::sync::mpsc::sync_channel(size * 200);
482 let pool = ThreadPool {
483 inner: Arc::new(Inner {
484 name,
485 next_name_num: AtomicCounter::new(),
486 size,
487 receiver: Mutex::new(receiver),
488 }),
489 sender,
490 };
491 pool.inner.start_threads()?;
492 Ok(pool)
493 }
494
495 /// Returns the number of threads in the pool.
496 #[must_use]
497 pub fn size(&self) -> usize {
498 self.inner.size
499 }
500
501 /// Returns the number of threads currently alive.
502 #[must_use]
503 pub fn num_live_threads(&self) -> usize {
504 self.inner.num_live_threads()
505 }
506
507 #[cfg(feature = "testing")]
508 #[doc(hidden)]
509 #[must_use]
510 pub fn num_live_threads_fn(&self) -> Box<dyn Fn() -> usize> {
511 let inner_clone = self.inner.clone();
512 Box::new(move || inner_clone.num_live_threads())
513 }
514
515 /// Adds a job to the queue. The next idle thread will execute it.
516 /// Jobs are started in FIFO order.
517 ///
518 /// Blocks when the queue is full or no threads are running.
519 /// See [`try_schedule`](#method.try_schedule).
520 ///
521 /// Recreates any threads that panicked.
522 /// Retries on failure to start a new thread.
523 ///
524 /// Puts `f` in a [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html) before
525 /// adding it to the queue.
526 #[allow(clippy::missing_panics_doc)]
527 pub fn schedule<F: FnOnce() + Send + 'static>(&self, f: F) {
528 let mut opt_box_f: Option<Box<dyn FnOnce() + Send + 'static>> = Some(Box::new(f));
529 loop {
530 match self.inner.start_threads() {
531 Ok(()) | Err(StartThreadsError::Respawn(_)) => {
532 // At least one thread is running.
533 }
534 Err(StartThreadsError::NoThreads(_)) => {
535 sleep_ms(10);
536 continue;
537 }
538 }
539 opt_box_f = match self.sender.try_send(opt_box_f.take().unwrap()) {
540 Ok(()) => return,
541 Err(TrySendError::Disconnected(_)) => unreachable!(),
542 Err(TrySendError::Full(box_f)) => Some(box_f),
543 };
544 sleep_ms(10);
545 }
546 }
547
548 /// Adds a job to the queue and then starts threads to replace any panicked threads.
549 /// The next idle thread will execute the job.
550 /// Starts jobs in FIFO order.
551 ///
552 /// Puts `f` in a [`Box`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html) before
553 /// adding it to the queue.
554 ///
555 /// # Errors
556 /// Returns an error when the queue is full or it fails to start a thread.
557 /// If the return value is not `TryScheduleError::QueueFull` then it added the job to the queue.
558 #[allow(clippy::missing_panics_doc)]
559 pub fn try_schedule(&self, f: impl FnOnce() + Send + 'static) -> Result<(), TryScheduleError> {
560 match self.sender.try_send(Box::new(f)) {
561 Ok(_) => {}
562 Err(TrySendError::Disconnected(_)) => unreachable!(),
563 Err(TrySendError::Full(_)) => return Err(TryScheduleError::QueueFull),
564 };
565 self.inner.start_threads().map_err(std::convert::Into::into)
566 }
567
568 /// Consumes the thread pool and waits for all threads to stop.
569 pub fn join(self) {
570 let inner = self.inner.clone();
571 drop(self);
572 while inner.num_live_threads() > 0 {
573 sleep_ms(10);
574 }
575 }
576
577 /// Consumes the thread pool and waits for all threads to stop.
578 ///
579 /// # Errors
580 /// Returns an error if the threads do not stop within the timeout duration.
581 pub fn try_join(self, timeout: Duration) -> Result<(), String> {
582 let inner = self.inner.clone();
583 drop(self);
584 let deadline = Instant::now() + timeout;
585 loop {
586 if inner.num_live_threads() < 1 {
587 return Ok(());
588 }
589 if deadline < Instant::now() {
590 return Err("timed out waiting for ThreadPool workers to stop".to_string());
591 }
592 sleep_ms(10);
593 }
594 }
595}
596impl Debug for ThreadPool {
597 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
598 write!(
599 f,
600 "ThreadPool{{{:?},size={:?}}}",
601 self.inner.name, self.inner.size
602 )
603 }
604}