// rusty_pool/lib.rs

1#[cfg(feature = "async")]
2use futures::{
3    future::BoxFuture,
4    task::{waker_ref, ArcWake},
5};
6use futures_channel::oneshot;
7use futures_executor::block_on;
8use std::future::Future;
9use std::option::Option;
10use std::sync::{
11    atomic::{AtomicUsize, Ordering},
12    Arc, Condvar, Mutex,
13};
14#[cfg(feature = "async")]
15use std::task::Context;
16use std::thread;
17use std::time::Duration;
18
/// Number of bits in `usize` on the target platform.
const BITS: usize = std::mem::size_of::<usize>() * 8;
/// The absolute maximum number of workers. This corresponds to the maximum value that can be stored within half the bits of usize,
/// as two counters (total workers and idle workers) are stored in one AtomicUsize.
pub const MAX_SIZE: usize = (1 << (BITS / 2)) - 1;

/// Type-erased unit of work that is sent through the crossbeam channel to the worker threads.
type Job = Box<dyn FnOnce() + Send + 'static>;
25
/// Trait to implement for all items that may be executed by the `ThreadPool`.
pub trait Task<R: Send>: Send {
    /// Execute this task and return its result.
    fn run(self) -> R;

    /// Transform this `Task` into a heap allocated `FnOnce` if possible.
    ///
    /// Used by [`ThreadPool::execute`](struct.ThreadPool.html#method.execute) to turn this `Task` into a `Job`
    /// directly without having to create an additional `Job` that calls this `Task`.
    ///
    /// Implementations that cannot be expressed as a plain closure return `None`,
    /// in which case the pool wraps the `Task` in a closure that calls [`Task::run`].
    fn into_fn(self) -> Option<Box<dyn FnOnce() -> R + Send + 'static>>;

    /// Return `true` if calling [`Task::into_fn`] on this `Task` returns `Some`.
    ///
    /// Lets the pool decide whether `into_fn` is usable without consuming the task first.
    fn is_fn(&self) -> bool;
}
40
41/// Implement the `Task` trait for any FnOnce closure that returns a thread-safe result.
42impl<R, F> Task<R> for F
43where
44    R: Send,
45    F: FnOnce() -> R + Send + 'static,
46{
47    fn run(self) -> R {
48        self()
49    }
50
51    fn into_fn(self) -> Option<Box<dyn FnOnce() -> R + Send + 'static>> {
52        Some(Box::new(self))
53    }
54
55    fn is_fn(&self) -> bool {
56        true
57    }
58}
59
/// Handle returned by [`ThreadPool::evaluate`](struct.ThreadPool.html#method.evaluate) and [`ThreadPool::complete`](struct.ThreadPool.html#method.complete)
/// that allows to block the current thread and wait for the result of a submitted task. The returned `JoinHandle` may also be sent to the [`ThreadPool`](struct.ThreadPool.html)
/// to create a task that blocks a worker thread until the task is completed and then does something with the result. This handle communicates with the worker thread
/// using a oneshot channel blocking the thread when [`try_await_complete()`](struct.JoinHandle.html#method.try_await_complete) is called until a message, i.e. the result of the
/// task, is received.
pub struct JoinHandle<T: Send> {
    /// Receiving end of the oneshot channel; the worker sends the task's result here.
    /// If the sender is dropped without sending (e.g. the task panicked), awaiting
    /// the receiver yields a cancellation error.
    pub receiver: oneshot::Receiver<T>,
}
68
impl<T: Send> JoinHandle<T> {
    /// Block the current thread until the result of the task is received.
    ///
    /// # Errors
    ///
    /// This function might return a `oneshot::Canceled` if the channel was broken
    /// before the result was received. This is generally the case if execution of
    /// the task panicked.
    pub fn try_await_complete(self) -> Result<T, oneshot::Canceled> {
        // block_on drives the receiver future to completion on the current thread.
        block_on(self.receiver)
    }

    /// Block the current thread until the result of the task is received.
    ///
    /// # Panics
    ///
    /// This function might panic if [`try_await_complete()`](struct.JoinHandle.html#method.try_await_complete) returns `oneshot::Canceled`.
    /// This is generally the case if execution of the task panicked and the sender was dropped before sending a result to the receiver.
    pub fn await_complete(self) -> T {
        self.try_await_complete()
            .expect("could not receive message because channel was cancelled")
    }
}
92
/// A spawned `Future` paired with the pool that polls it. The waker created for
/// this task re-submits it to `pool`, so the future is polled one step at a time.
#[cfg(feature = "async")]
struct AsyncTask {
    // The future to poll; `None` once it has been polled to completion. The Mutex
    // ensures only one worker polls the future at a time.
    future: Mutex<Option<BoxFuture<'static, ()>>>,
    // Clone of the owning pool used to re-submit this task when it is woken.
    pool: ThreadPool,
}
98
/// Waking an `AsyncTask` means re-submitting it (i.e. the `Future`) to the pool
/// so that it gets polled again.
#[cfg(feature = "async")]
impl ArcWake for AsyncTask {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Submit a fresh handle to the same task back into the owning pool.
        let resubmitted = Arc::clone(arc_self);
        arc_self
            .pool
            .try_execute(resubmitted)
            .expect("failed to wake future because message could not be sent to pool");
    }
}
110
/// Make `Arc<AsyncTask>` executable by the pool: running the task polls the
/// contained future once with a waker that re-submits it when progress is possible.
#[cfg(feature = "async")]
impl Task<()> for Arc<AsyncTask> {
    fn run(self) {
        let mut slot = self.future.lock().expect("failed to acquire mutex");
        match slot.take() {
            // Already completed (or concurrently taken): nothing left to poll.
            None => {}
            Some(mut fut) => {
                let waker = waker_ref(&self);
                let mut context = Context::from_waker(&waker);
                // Put the future back only while it is still pending; once it
                // resolves, the slot stays empty.
                if fut.as_mut().poll(&mut context).is_pending() {
                    *slot = Some(fut);
                }
            }
        }
    }

    fn into_fn(self) -> Option<Box<dyn FnOnce() + Send + 'static>> {
        // An async task must be run through `run` to build its waker, so it
        // cannot be turned into a plain closure.
        None
    }

    fn is_fn(&self) -> bool {
        false
    }
}
134
// assert that Send is implemented
// (compile-time check only: these impls fail to compile if any of the listed
// types ever stops being `Send`; the trait is never used at runtime)
trait ThreadSafe: Send {}

impl<R: Send> ThreadSafe for dyn Task<R> {}

impl<R: Send> ThreadSafe for JoinHandle<R> {}

impl ThreadSafe for ThreadPool {}
143
144/// Self growing / shrinking `ThreadPool` implementation based on crossbeam's
145/// multi-producer multi-consumer channels that enables awaiting the result of a
146/// task and offers async support.
147///
148/// This `ThreadPool` has two different pool sizes; a core pool size filled with
149/// threads that live for as long as the channel and a max pool size which describes
150/// the maximum amount of worker threads that may live at the same time.
151/// Those additional non-core threads have a specific keep_alive time described when
152/// creating the `ThreadPool` that defines how long such threads may be idle for
153/// without receiving any work before giving up and terminating their work loop.
154///
155/// This `ThreadPool` does not spawn any threads until a task is submitted to it.
156/// Then it will create a new thread for each task until the core pool size is full.
157/// After that a new thread will only be created upon an `execute()` call if the
158/// current pool is lower than the max pool size and there are no idle threads.
159///
160/// Functions like `evaluate()` and `complete()` return a `JoinHandle` that may be used
161/// to await the result of a submitted task or future. JoinHandles may be sent to the
162/// thread pool to create a task that blocks a worker thread until it receives the
163/// result of the other task and then operates on the result. If the task panics the
164/// `JoinHandle` receives a cancellation error. This is implemented using a futures
165/// oneshot channel to communicate with the worker thread.
166///
167/// This `ThreadPool` may be used as a futures executor if the "async" feature is enabled,
168/// which is the case by default. The "async" feature includes the `spawn()` and
169/// `try_spawn()` functions which create a task that polls the future one by one and
170/// creates a waker that re-submits the future to the pool when it can make progress.
171/// Without the "async" feature, futures can simply be executed to completion using
172/// the `complete` function, which simply blocks a worker thread until the future has
173/// been polled to completion.
174///
/// The "async" feature can be disabled if not needed by adding the following to your
176/// Cargo dependency:
177/// ```toml
178/// [dependencies.rusty_pool]
179/// default-features = false
180/// version = "*"
181/// ```
182///
183/// When creating a new worker this `ThreadPool` tries to increment the worker count
184/// using a compare-and-swap mechanism, if the increment fails because the total worker
185/// count has been incremented to the specified limit (the core_size when trying to
186/// create a core thread, else the max_size) by another thread, the pool tries to create
187/// a non-core worker instead (if previously trying to create a core worker and no idle
188/// worker exists) or sends the task to the channel instead. Panicking workers are always
189/// cloned and replaced.
190///
191/// Locks are only used for the join functions to lock the `Condvar`, apart from that
192/// this `ThreadPool` implementation fully relies on crossbeam and atomic operations.
193/// This `ThreadPool` decides whether it is currently idle (and should fast-return
194/// join attempts) by comparing the total worker count to the idle worker count, which
195/// are two values stored in one `AtomicUsize` (both half the size of usize) making sure
196/// that if both are updated they may be updated in a single atomic operation.
197///
198/// The thread pool and its crossbeam channel can be destroyed by using the shutdown
199/// function, however that does not stop tasks that are already running but will
200/// terminate the thread the next time it will try to fetch work from the channel.
201/// The channel is only destroyed once all clones of the `ThreadPool` have been
202/// shut down / dropped.
203///
204/// # Usage
205/// Create a new `ThreadPool`:
206/// ```rust
207/// use rusty_pool::Builder;
208/// use rusty_pool::ThreadPool;
209/// // Create default `ThreadPool` configuration with the number of CPUs as core pool size
210/// let pool = ThreadPool::default();
211/// // Create a `ThreadPool` with default naming:
212/// use std::time::Duration;
213/// let pool2 = ThreadPool::new(5, 50, Duration::from_secs(60));
214/// // Create a `ThreadPool` with a custom name:
215/// let pool3 = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));
216/// // using the Builder struct:
217/// let pool4 = Builder::new().core_size(5).max_size(50).build();
218/// ```
219///
220/// Submit a closure for execution in the `ThreadPool`:
221/// ```rust
222/// use rusty_pool::ThreadPool;
223/// use std::thread;
224/// use std::time::Duration;
225/// let pool = ThreadPool::default();
226/// pool.execute(|| {
227///     thread::sleep(Duration::from_secs(5));
228///     print!("hello");
229/// });
230/// ```
231///
232/// Submit a task and await the result:
233/// ```rust
234/// use rusty_pool::ThreadPool;
235/// use std::thread;
236/// use std::time::Duration;
237/// let pool = ThreadPool::default();
238/// let handle = pool.evaluate(|| {
239///     thread::sleep(Duration::from_secs(5));
240///     return 4;
241/// });
242/// let result = handle.await_complete();
243/// assert_eq!(result, 4);
244/// ```
245///
246/// Spawn futures using the `ThreadPool`:
247/// ```rust
248/// async fn some_async_fn(x: i32, y: i32) -> i32 {
249///     x + y
250/// }
251///
252/// async fn other_async_fn(x: i32, y: i32) -> i32 {
253///     x - y
254/// }
255///
256/// use rusty_pool::ThreadPool;
257/// let pool = ThreadPool::default();
258///
259/// // simply complete future by blocking a worker until the future has been completed
260/// let handle = pool.complete(async {
261///     let a = some_async_fn(4, 6).await; // 10
262///     let b = some_async_fn(a, 3).await; // 13
263///     let c = other_async_fn(b, a).await; // 3
264///     some_async_fn(c, 5).await // 8
265/// });
266/// assert_eq!(handle.await_complete(), 8);
267///
268/// use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
269///
270/// // spawn future and create waker that automatically re-submits itself to the threadpool if ready to make progress, this requires the "async" feature which is enabled by default
271/// let count = Arc::new(AtomicI32::new(0));
272/// let clone = count.clone();
273/// pool.spawn(async move {
274///     let a = some_async_fn(3, 6).await; // 9
275///     let b = other_async_fn(a, 4).await; // 5
276///     let c = some_async_fn(b, 7).await; // 12
277///     clone.fetch_add(c, Ordering::Relaxed);
278/// });
279/// pool.join();
280/// assert_eq!(count.load(Ordering::Relaxed), 12);
281/// ```
282///
283/// Join and shut down the `ThreadPool`:
284/// ```rust
285/// use std::thread;
286/// use std::time::Duration;
287/// use rusty_pool::ThreadPool;
288/// use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
289///
290/// let pool = ThreadPool::default();
291/// for _ in 0..10 {
292///     pool.execute(|| { thread::sleep(Duration::from_secs(10)) })
293/// }
294/// // wait for all threads to become idle, i.e. all tasks to be completed including tasks added by other threads after join() is called by this thread or for the timeout to be reached
295/// pool.join_timeout(Duration::from_secs(5));
296///
297/// let count = Arc::new(AtomicI32::new(0));
298/// for _ in 0..15 {
299///     let clone = count.clone();
300///     pool.execute(move || {
301///         thread::sleep(Duration::from_secs(5));
302///         clone.fetch_add(1, Ordering::Relaxed);
303///     });
304/// }
305///
306/// // shut down and drop the only instance of this `ThreadPool` (no clones) causing the channel to be broken leading all workers to exit after completing their current work
307/// // and wait for all workers to become idle, i.e. finish their work.
308/// pool.shutdown_join();
309/// assert_eq!(count.load(Ordering::Relaxed), 15);
310/// ```
#[derive(Clone)]
pub struct ThreadPool {
    // Number of workers kept alive for as long as the channel is connected.
    core_size: usize,
    // Maximum number of workers that may exist at the same time.
    max_size: usize,
    // How long non-core workers wait for work before terminating.
    keep_alive: Duration,
    // Shared crossbeam sender/receiver; dropped only when all clones are gone.
    channel_data: Arc<ChannelData>,
    // Shared worker bookkeeping (counts, naming, join condvar); see WorkerData.
    worker_data: Arc<WorkerData>,
}
319
320impl ThreadPool {
321    /// Construct a new `ThreadPool` with the specified core pool size, max pool size
322    /// and keep_alive time for non-core threads. This function does not spawn any
323    /// threads. This `ThreadPool` will receive a default name in the following format:
324    /// "rusty_pool_" + pool number.
325    ///
326    /// `core_size` specifies the amount of threads to keep alive for as long as
327    /// the `ThreadPool` exists and its channel remains connected.
328    ///
329    /// `max_size` specifies the maximum number of worker threads that may exist
330    /// at the same time.
331    ///
332    /// `keep_alive` specifies the duration for which to keep non-core pool
333    /// worker threads alive while they do not receive any work.
334    ///
335    /// # Panics
336    ///
337    /// This function will panic if max_size is 0, lower than core_size or exceeds half
338    /// the size of usize. This restriction exists because two counters (total workers and
339    /// idle counters) are stored within one AtomicUsize.
340    pub fn new(core_size: usize, max_size: usize, keep_alive: Duration) -> Self {
341        static POOL_COUNTER: AtomicUsize = AtomicUsize::new(1);
342        let name = format!(
343            "rusty_pool_{}",
344            POOL_COUNTER.fetch_add(1, Ordering::Relaxed)
345        );
346        ThreadPool::new_named(name, core_size, max_size, keep_alive)
347    }
348
349    /// Construct a new `ThreadPool` with the specified name, core pool size, max pool size
350    /// and keep_alive time for non-core threads. This function does not spawn any
351    /// threads.
352    ///
353    /// `name` the name of the `ThreadPool` that will be used as prefix for each
354    /// thread.
355    ///
356    /// `core_size` specifies the amount of threads to keep alive for as long as
357    /// the `ThreadPool` exists and its channel remains connected.
358    ///
359    /// `max_size` specifies the maximum number of worker threads that may exist
360    /// at the same time.
361    ///
362    /// `keep_alive` specifies the duration for which to keep non-core pool
363    /// worker threads alive while they do not receive any work.
364    ///
365    /// # Panics
366    ///
367    /// This function will panic if max_size is 0, lower than core_size or exceeds half
368    /// the size of usize. This restriction exists because two counters (total workers and
369    /// idle counters) are stored within one AtomicUsize.
370    pub fn new_named(
371        name: String,
372        core_size: usize,
373        max_size: usize,
374        keep_alive: Duration,
375    ) -> Self {
376        let (sender, receiver) = crossbeam_channel::unbounded();
377
378        if max_size == 0 || max_size < core_size {
379            panic!("max_size must be greater than 0 and greater or equal to the core pool size");
380        } else if max_size > MAX_SIZE {
381            panic!(
382                "max_size may not exceed {}, the maximum value that can be stored within half the bits of usize ({} -> {} bits in this case)",
383                MAX_SIZE,
384                BITS,
385                BITS / 2
386            );
387        }
388
389        let worker_data = WorkerData {
390            pool_name: name,
391            worker_count_data: WorkerCountData::default(),
392            worker_number: AtomicUsize::new(1),
393            join_notify_condvar: Condvar::new(),
394            join_notify_mutex: Mutex::new(()),
395            join_generation: AtomicUsize::new(0),
396        };
397
398        let channel_data = ChannelData { sender, receiver };
399
400        Self {
401            core_size,
402            max_size,
403            keep_alive,
404            channel_data: Arc::new(channel_data),
405            worker_data: Arc::new(worker_data),
406        }
407    }
408
409    /// Get the number of live workers, includes all workers waiting for work or executing tasks.
410    ///
411    /// This counter is incremented when creating a new worker. The value is increment just before
412    /// the worker starts executing its initial task. Incrementing the worker total might fail
413    /// if the total has already reached the specified limit (either core_size or max_size) after
414    /// being incremented by another thread, as of rusty_pool 0.5.0 failed attempts to create a worker
415    /// no longer skews the worker total as failed attempts to increment the worker total does not
416    /// increment the value at all.
417    /// This counter is decremented when a worker reaches the end of its working loop, which for non-core
418    /// threads might happen if it does not receive any work during its keep alive time,
419    /// for core threads this only happens once the channel is disconnected.
420    pub fn get_current_worker_count(&self) -> usize {
421        self.worker_data.worker_count_data.get_total_worker_count()
422    }
423
424    /// Get the number of workers currently waiting for work. Those threads are currently
425    /// polling from the crossbeam receiver. Core threads wait indefinitely and might remain
426    /// in this state until the `ThreadPool` is dropped. The remaining threads give up after
427    /// waiting for the specified keep_alive time.
428    pub fn get_idle_worker_count(&self) -> usize {
429        self.worker_data.worker_count_data.get_idle_worker_count()
430    }
431
432    /// Send a new task to the worker threads. This function is responsible for sending the message through the
433    /// channel and creating new workers if needed. If the current worker count is lower than the core pool size
434    /// this function will always create a new worker. If the current worker count is equal to or greater than
435    /// the core pool size this function only creates a new worker if the worker count is below the max pool size
436    /// and there are no idle threads.
437    ///
438    /// When attempting to increment the total worker count before creating a worker fails due to the
439    /// counter reaching the provided limit (core_size when attempting to create core thread, else
440    /// max_size) after being incremented by another thread, the pool tries to create
441    /// a non-core worker instead (if previously trying to create a core worker and no idle
442    /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
443    /// either because the current value of the counter matched the expected value or because the
444    /// last observed value was still below the limit, the worker starts with the provided task as
445    /// initial task and spawns its thread.
446    ///
447    /// # Panics
448    ///
449    /// This function might panic if `try_execute` returns an error when the crossbeam channel has been
450    /// closed unexpectedly.
451    /// This should never occur under normal circumstances using safe code, as shutting down the `ThreadPool`
452    /// consumes ownership and the crossbeam channel is never dropped unless dropping the `ThreadPool`.
453    pub fn execute<T: Task<()> + 'static>(&self, task: T) {
454        if self.try_execute(task).is_err() {
455            panic!("the channel of the thread pool has been closed");
456        }
457    }
458
459    /// Send a new task to the worker threads. This function is responsible for sending the message through the
460    /// channel and creating new workers if needed. If the current worker count is lower than the core pool size
461    /// this function will always create a new worker. If the current worker count is equal to or greater than
462    /// the core pool size this function only creates a new worker if the worker count is below the max pool size
463    /// and there are no idle threads.
464    ///
465    /// When attempting to increment the total worker count before creating a worker fails due to the
466    /// counter reaching the provided limit (core_size when attempting to create core thread, else
467    /// max_size) after being incremented by another thread, the pool tries to create
468    /// a non-core worker instead (if previously trying to create a core worker and no idle
469    /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
470    /// either because the current value of the counter matched the expected value or because the
471    /// last observed value was still below the limit, the worker starts with the provided task as
472    /// initial task and spawns its thread.
473    ///
474    /// # Errors
475    ///
476    /// This function might return `crossbeam_channel::SendError` if the sender was dropped unexpectedly.
477    pub fn try_execute<T: Task<()> + 'static>(
478        &self,
479        task: T,
480    ) -> Result<(), crossbeam_channel::SendError<Job>> {
481        if task.is_fn() {
482            self.try_execute_task(
483                task.into_fn()
484                    .expect("Task::into_fn returned None despite is_fn returning true"),
485            )
486        } else {
487            self.try_execute_task(Box::new(move || {
488                task.run();
489            }))
490        }
491    }
492
493    /// Send a new task to the worker threads and return a [`JoinHandle`](struct.JoinHandle.html) that may be used to await
494    /// the result. This function is responsible for sending the message through the channel and creating new
495    /// workers if needed. If the current worker count is lower than the core pool size this function will always
496    /// create a new worker. If the current worker count is equal to or greater than the core pool size this
497    /// function only creates a new worker if the worker count is below the max pool size and there are no idle
498    /// threads.
499    ///
500    /// When attempting to increment the total worker count before creating a worker fails due to the
501    /// counter reaching the provided limit (core_size when attempting to create core thread, else
502    /// max_size) after being incremented by another thread, the pool tries to create
503    /// a non-core worker instead (if previously trying to create a core worker and no idle
504    /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
505    /// either because the current value of the counter matched the expected value or because the
506    /// last observed value was still below the limit, the worker starts with the provided task as
507    /// initial task and spawns its thread.
508    ///
509    /// # Panics
510    ///
511    /// This function might panic if `try_execute` returns an error when the crossbeam channel has been
512    /// closed unexpectedly.
513    /// This should never occur under normal circumstances using safe code, as shutting down the `ThreadPool`
514    /// consumes ownership and the crossbeam channel is never dropped unless dropping the `ThreadPool`.
515    pub fn evaluate<R: Send + 'static, T: Task<R> + 'static>(&self, task: T) -> JoinHandle<R> {
516        match self.try_evaluate(task) {
517            Ok(handle) => handle,
518            Err(e) => panic!("the channel of the thread pool has been closed: {:?}", e),
519        }
520    }
521
522    /// Send a new task to the worker threads and return a [`JoinHandle`](struct.JoinHandle.html) that may be used to await
523    /// the result. This function is responsible for sending the message through the channel and creating new
524    /// workers if needed. If the current worker count is lower than the core pool size this function will always
525    /// create a new worker. If the current worker count is equal to or greater than the core pool size this
526    /// function only creates a new worker if the worker count is below the max pool size and there are no idle
527    /// threads.
528    ///
529    /// When attempting to increment the total worker count before creating a worker fails due to the
530    /// counter reaching the provided limit (core_size when attempting to create core thread, else
531    /// max_size) after being incremented by another thread, the pool tries to create
532    /// a non-core worker instead (if previously trying to create a core worker and no idle
533    /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
534    /// either because the current value of the counter matched the expected value or because the
535    /// last observed value was still below the limit, the worker starts with the provided task as
536    /// initial task and spawns its thread.
537    ///
538    /// # Errors
539    ///
540    /// This function might return `crossbeam_channel::SendError` if the sender was dropped unexpectedly.
541    pub fn try_evaluate<R: Send + 'static, T: Task<R> + 'static>(
542        &self,
543        task: T,
544    ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
545        let (sender, receiver) = oneshot::channel::<R>();
546        let join_handle = JoinHandle { receiver };
547        let job = || {
548            let result = task.run();
549            // if the receiver was dropped that means the caller was not interested in the result
550            let _ignored_result = sender.send(result);
551        };
552
553        let execute_attempt = self.try_execute_task(Box::new(job));
554        execute_attempt.map(|_| join_handle)
555    }
556
557    /// Send a task to the `ThreadPool` that completes the given `Future` and return a [`JoinHandle`](struct.JoinHandle.html)
558    /// that may be used to await the result. This function simply calls [`evaluate()`](struct.ThreadPool.html#method.evaluate)
559    /// with a closure that calls `block_on` with the provided future.
560    ///
561    /// # Panic
562    ///
563    /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
564    pub fn complete<R: Send + 'static>(
565        &self,
566        future: impl Future<Output = R> + 'static + Send,
567    ) -> JoinHandle<R> {
568        self.evaluate(|| block_on(future))
569    }
570
571    /// Send a task to the `ThreadPool` that completes the given `Future` and return a [`JoinHandle`](struct.JoinHandle.html)
572    /// that may be used to await the result. This function simply calls [`try_evaluate()`](struct.ThreadPool.html#method.try_evaluate)
573    /// with a closure that calls `block_on` with the provided future.
574    ///
575    /// # Errors
576    ///
577    /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
578    pub fn try_complete<R: Send + 'static>(
579        &self,
580        future: impl Future<Output = R> + 'static + Send,
581    ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
582        self.try_evaluate(|| block_on(future))
583    }
584
585    /// Submit a `Future` to be polled by this `ThreadPool`. Unlike [`complete()`](struct.ThreadPool.html#method.complete) this does not
586    /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
587    /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
588    /// function simply constructs the `AsyncTask` and calls [`execute()`](struct.ThreadPool.html#method.execute).
589    ///
590    /// # Panic
591    ///
592    /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
593    #[cfg(feature = "async")]
594    pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
595        let future_task = Arc::new(AsyncTask {
596            future: Mutex::new(Some(Box::pin(future))),
597            pool: self.clone(),
598        });
599
600        self.execute(future_task)
601    }
602
603    /// Submit a `Future` to be polled by this `ThreadPool`. Unlike [`try_complete()`](struct.ThreadPool.html#method.try_complete) this does not
604    /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
605    /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
606    /// function simply constructs the `AsyncTask` and calls [`try_execute()`](struct.ThreadPool.html#method.try_execute).
607    ///
608    /// # Errors
609    ///
610    /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
611    #[cfg(feature = "async")]
612    pub fn try_spawn(
613        &self,
614        future: impl Future<Output = ()> + 'static + Send,
615    ) -> Result<(), crossbeam_channel::SendError<Job>> {
616        let future_task = Arc::new(AsyncTask {
617            future: Mutex::new(Some(Box::pin(future))),
618            pool: self.clone(),
619        });
620
621        self.try_execute(future_task)
622    }
623
624    /// Create a top-level `Future` that awaits the provided `Future` and then sends the result to the
625    /// returned [`JoinHandle`](struct.JoinHandle.html). Unlike [`complete()`](struct.ThreadPool.html#method.complete) this does not
626    /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
627    /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
628    /// function simply constructs the `AsyncTask` and calls [`execute()`](struct.ThreadPool.html#method.execute).
629    ///
630    /// This enables awaiting the final result outside of an async context like [`complete()`](struct.ThreadPool.html#method.complete) while still
631    /// polling the future lazily instead of eagerly blocking the worker until the future is done.
632    ///
633    /// # Panic
634    ///
635    /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
636    #[cfg(feature = "async")]
637    pub fn spawn_await<R: Send + 'static>(
638        &self,
639        future: impl Future<Output = R> + 'static + Send,
640    ) -> JoinHandle<R> {
641        match self.try_spawn_await(future) {
642            Ok(handle) => handle,
643            Err(e) => panic!("the channel of the thread pool has been closed: {:?}", e),
644        }
645    }
646
647    /// Create a top-level `Future` that awaits the provided `Future` and then sends the result to the
648    /// returned [`JoinHandle`](struct.JoinHandle.html). Unlike [`try_complete()`](struct.ThreadPool.html#method.try_complete) this does not
649    /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
650    /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
651    /// function simply constructs the `AsyncTask` and calls [`try_execute()`](struct.ThreadPool.html#method.try_execute).
652    ///
653    /// This enables awaiting the final result outside of an async context like [`complete()`](struct.ThreadPool.html#method.complete) while still
654    /// polling the future lazily instead of eagerly blocking the worker until the future is done.
655    ///
656    /// # Errors
657    ///
658    /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
659    #[cfg(feature = "async")]
660    pub fn try_spawn_await<R: Send + 'static>(
661        &self,
662        future: impl Future<Output = R> + 'static + Send,
663    ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
664        let (sender, receiver) = oneshot::channel::<R>();
665        let join_handle = JoinHandle { receiver };
666
667        self.try_spawn(async {
668            let result = future.await;
669            // if the receiver was dropped that means the caller was not interested in the result
670            let _ignored_result = sender.send(result);
671        })
672        .map(|_| join_handle)
673    }
674
    /// Submit a `Job` to the pool: spawn a new worker and hand it the job directly when
    /// the pool is below its core size (or below its max size while no worker is idle),
    /// otherwise push the job onto the shared channel for an existing worker to pick up.
    #[inline]
    fn try_execute_task(&self, task: Job) -> Result<(), crossbeam_channel::SendError<Job>> {
        // create a new worker either if the current worker count is lower than the core pool size
        // or if there are no idle threads and the current worker count is lower than the max pool size
        let worker_count_data = &self.worker_data.worker_count_data;
        let mut worker_count_val = worker_count_data.worker_count.load(Ordering::Relaxed);
        // the single atomic packs both counters as (total workers, idle workers)
        let (mut curr_worker_count, idle_worker_count) = WorkerCountData::split(worker_count_val);
        let mut curr_idle_count = idle_worker_count;

        // always create a new worker if current pool size is below core size
        if curr_worker_count < self.core_size {
            let witnessed =
                worker_count_data.try_increment_worker_total(worker_count_val, self.core_size);

            // the witnessed value matched the expected value, meaning the initial exchange succeeded, or the final witnessed
            // value is still below the coreSize, meaning the increment eventually succeeded
            if witnessed == worker_count_val
                || WorkerCountData::get_total_count(witnessed) < self.core_size
            {
                // core worker: keep_alive of None means it never times out waiting for work
                let worker = Worker::new(
                    self.channel_data.receiver.clone(),
                    Arc::clone(&self.worker_data),
                    None,
                );

                // hand the task directly to the fresh worker, bypassing the channel
                worker.start(Some(task));
                return Ok(());
            }

            // the increment lost the race: refresh the local snapshot from the witnessed value
            curr_worker_count = WorkerCountData::get_total_count(witnessed);
            curr_idle_count = WorkerCountData::get_idle_count(witnessed);
            worker_count_val = witnessed;
        }

        // create a new worker if the current worker count is below the maxSize and the pool has been observed to be busy
        // (no idle workers) during the invocation of this function
        if curr_worker_count < self.max_size && (idle_worker_count == 0 || curr_idle_count == 0) {
            let witnessed =
                worker_count_data.try_increment_worker_total(worker_count_val, self.max_size);

            if witnessed == worker_count_val
                || WorkerCountData::get_total_count(witnessed) < self.max_size
            {
                // non-core worker: terminates after idling for keep_alive
                let worker = Worker::new(
                    self.channel_data.receiver.clone(),
                    Arc::clone(&self.worker_data),
                    Some(self.keep_alive),
                );

                worker.start(Some(task));
                return Ok(());
            }
        }

        // no worker was spawned -> queue the task on the shared channel
        self.send_task_to_channel(task)
    }
731
    /// Blocks the current thread until there aren't any non-idle threads anymore.
    /// This includes work started after calling this function.
    /// This function blocks until the next time this `ThreadPool` completes all of its work,
    /// except if all threads are idle and the channel is empty at the time of calling this
    /// function, in which case it will fast-return.
    ///
    /// This utilizes a `Condvar` that is notified by workers when they complete a job and notice
    /// that the channel is currently empty and it was the last thread to finish the current
    /// generation of work (i.e. when incrementing the idle worker counter brings the value
    /// up to the total worker counter, meaning it's the last thread to become idle).
    pub fn join(&self) {
        // a time_out of None makes the inner join wait indefinitely
        self.inner_join(None);
    }
745
    /// Blocks the current thread until there aren't any non-idle threads anymore or until the
    /// specified time_out Duration passes, whichever happens first.
    /// This includes work started after calling this function.
    /// This function blocks until the next time this `ThreadPool` completes all of its work,
    /// (or until the time_out is reached) except if all threads are idle and the channel is
    /// empty at the time of calling this function, in which case it will fast-return.
    ///
    /// This utilizes a `Condvar` that is notified by workers when they complete a job and notice
    /// that the channel is currently empty and it was the last thread to finish the current
    /// generation of work (i.e. when incrementing the idle worker counter brings the value
    /// up to the total worker counter, meaning it's the last thread to become idle).
    pub fn join_timeout(&self, time_out: Duration) {
        // bounded variant of join(): gives the inner join a deadline to respect
        self.inner_join(Some(time_out));
    }
760
    /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
    /// causing the `Sender` to drop thus disconnecting the channel.
    /// Threads in this pool that are currently executing a task will finish what
    /// they're doing until they check the channel, discovering that it has been
    /// disconnected from the sender and thus terminate their work loop.
    ///
    /// If other clones of this `ThreadPool` exist the sender will remain intact
    /// and tasks submitted to those clones will succeed, this includes pending
    /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
    /// to re-submit awakened futures.
    pub fn shutdown(self) {
        // the explicit drop is the entire mechanism: releasing this clone's
        // Sender is what (eventually) disconnects the channel
        drop(self);
    }
774
    /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
    /// causing the `Sender` to drop thus disconnecting the channel.
    /// Threads in this pool that are currently executing a task will finish what
    /// they're doing until they check the channel, discovering that it has been
    /// disconnected from the sender and thus terminate their work loop.
    ///
    /// If other clones of this `ThreadPool` exist the sender will remain intact
    /// and tasks submitted to those clones will succeed, this includes pending
    /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
    /// to re-submit awakened futures.
    ///
    /// This function additionally joins all workers after dropping the pool to
    /// wait for all work to finish.
    /// Blocks the current thread until there aren't any non-idle threads anymore.
    /// This function blocks until this `ThreadPool` completes all of its work,
    /// except if all threads are idle and the channel is empty at the time of
    /// calling this function, in which case the join will fast-return.
    /// If other live clones of this `ThreadPool` exist this behaves the same as
    /// calling [`join`](struct.ThreadPool.html#method.join) on a live `ThreadPool` as tasks submitted
    /// to one of the clones will be joined as well.
    ///
    /// The join utilizes a `Condvar` that is notified by workers when they complete a job and notice
    /// that the channel is currently empty and it was the last thread to finish the current
    /// generation of work (i.e. when incrementing the idle worker counter brings the value
    /// up to the total worker counter, meaning it's the last thread to become idle).
    pub fn shutdown_join(self) {
        // a timeout of None waits indefinitely for workers to drain the channel
        self.inner_shutdown_join(None);
    }
803
    /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
    /// causing the `Sender` to drop thus disconnecting the channel.
    /// Threads in this pool that are currently executing a task will finish what
    /// they're doing until they check the channel, discovering that it has been
    /// disconnected from the sender and thus terminate their work loop.
    ///
    /// If other clones of this `ThreadPool` exist the sender will remain intact
    /// and tasks submitted to those clones will succeed, this includes pending
    /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
    /// to re-submit awakened futures.
    ///
    /// This function additionally joins all workers after dropping the pool to
    /// wait for all work to finish.
    /// Blocks the current thread until there aren't any non-idle threads anymore or until the
    /// specified time_out Duration passes, whichever happens first.
    /// This function blocks until this `ThreadPool` completes all of its work,
    /// (or until the time_out is reached) except if all threads are idle and the channel is
    /// empty at the time of calling this function, in which case the join will fast-return.
    /// If other live clones of this `ThreadPool` exist this behaves the same as
    /// calling [`join`](struct.ThreadPool.html#method.join) on a live `ThreadPool` as tasks submitted
    /// to one of the clones will be joined as well.
    ///
    /// The join utilizes a `Condvar` that is notified by workers when they complete a job and notice
    /// that the channel is currently empty and it was the last thread to finish the current
    /// generation of work (i.e. when incrementing the idle worker counter brings the value
    /// up to the total worker counter, meaning it's the last thread to become idle).
    pub fn shutdown_join_timeout(self, timeout: Duration) {
        // bounded variant of shutdown_join(): the join gives up after `timeout`
        self.inner_shutdown_join(Some(timeout));
    }
833
834    /// Return the name of this pool, used as prefix for each worker thread.
835    pub fn get_name(&self) -> &str {
836        &self.worker_data.pool_name
837    }
838
    /// Starts all core workers by creating core idle workers until the total worker count reaches the core count.
    ///
    /// Returns immediately if the current worker count is already >= core size.
    pub fn start_core_threads(&self) {
        let worker_count_data = &self.worker_data.worker_count_data;

        let core_size = self.core_size;
        // fast path: snapshot the packed counter and bail if the core is already full
        let mut curr_worker_count = worker_count_data.worker_count.load(Ordering::Relaxed);
        if WorkerCountData::get_total_count(curr_worker_count) >= core_size {
            return;
        }

        loop {
            // claim a slot by incrementing both the total and the idle counter at once,
            // since the new worker starts without an initial task (i.e. idle)
            let witnessed = worker_count_data.try_increment_worker_count(
                curr_worker_count,
                INCREMENT_TOTAL | INCREMENT_IDLE,
                core_size,
            );

            // the increment was refused because the core filled up concurrently -> done
            if WorkerCountData::get_total_count(witnessed) >= core_size {
                return;
            }

            // core worker: keep_alive of None means it never times out waiting for work
            let worker = Worker::new(
                self.channel_data.receiver.clone(),
                Arc::clone(&self.worker_data),
                None,
            );

            worker.start(None);
            curr_worker_count = witnessed;
        }
    }
872
873    #[inline]
874    fn send_task_to_channel(&self, task: Job) -> Result<(), crossbeam_channel::SendError<Job>> {
875        self.channel_data.sender.send(task)?;
876
877        Ok(())
878    }
879
    /// Join on this live pool; delegates to the shared join implementation that
    /// `inner_shutdown_join` uses as well.
    #[inline]
    fn inner_join(&self, time_out: Option<Duration>) {
        ThreadPool::_do_join(&self.worker_data, &self.channel_data.receiver, time_out);
    }
884
    /// Drop this pool (releasing its `Sender` clone) and then join on the workers.
    #[inline]
    fn inner_shutdown_join(self, timeout: Option<Duration>) {
        // keep the shared worker data and the receiver alive past the drop of `self`
        // so the join can still observe the workers draining the channel
        let current_worker_data = self.worker_data.clone();
        let receiver = self.channel_data.receiver.clone();
        // dropping the pool drops this clone's sender before the join starts
        drop(self);
        ThreadPool::_do_join(&current_worker_data, &receiver, timeout);
    }
892
    /// Shared join implementation: block on the join condvar until the pool is idle,
    /// the optional `time_out` elapses, or another joiner of the same generation has
    /// already observed completion and advanced the generation counter.
    #[inline]
    fn _do_join(
        current_worker_data: &Arc<WorkerData>,
        receiver: &crossbeam_channel::Receiver<Job>,
        time_out: Option<Duration>,
    ) {
        // no thread is currently doing any work, return
        if ThreadPool::is_idle(current_worker_data, receiver) {
            return;
        }

        // snapshot the generation this joiner belongs to before waiting
        let join_generation = current_worker_data.join_generation.load(Ordering::SeqCst);
        let guard = current_worker_data
            .join_notify_mutex
            .lock()
            .expect("could not get join notify mutex lock");

        // the wait predicate exits when the pool is observed idle, or when another
        // joiner of the same generation already woke up and bumped the generation
        match time_out {
            Some(time_out) => {
                let _ret_guard = current_worker_data
                    .join_notify_condvar
                    .wait_timeout_while(guard, time_out, |_| {
                        join_generation
                            == current_worker_data.join_generation.load(Ordering::Relaxed)
                            && !ThreadPool::is_idle(current_worker_data, receiver)
                    })
                    .expect("could not wait for join condvar");
            }
            None => {
                let _ret_guard = current_worker_data
                    .join_notify_condvar
                    .wait_while(guard, |_| {
                        join_generation
                            == current_worker_data.join_generation.load(Ordering::Relaxed)
                            && !ThreadPool::is_idle(current_worker_data, receiver)
                    })
                    .expect("could not wait for join condvar");
            }
        };

        // increment generation if current thread is first thread to be awakened from wait in current generation
        // (failure of the exchange means another joiner already advanced it, which is fine)
        let _ = current_worker_data.join_generation.compare_exchange(
            join_generation,
            join_generation.wrapping_add(1),
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
    }
941
942    #[inline]
943    fn is_idle(
944        current_worker_data: &Arc<WorkerData>,
945        receiver: &crossbeam_channel::Receiver<Job>,
946    ) -> bool {
947        let (current_worker_count, current_idle_count) =
948            current_worker_data.worker_count_data.get_both();
949        current_idle_count == current_worker_count && receiver.is_empty()
950    }
951}
952
953impl Default for ThreadPool {
954    /// create default ThreadPool with the core pool size being equal to the number of cpus
955    /// and the max_size being twice the core size with a 60 second timeout
956    fn default() -> Self {
957        let num_cpus = num_cpus::get();
958        ThreadPool::new(
959            num_cpus,
960            std::cmp::max(num_cpus, num_cpus * 2),
961            Duration::from_secs(60),
962        )
963    }
964}
965
/// A helper struct to aid creating a new `ThreadPool` using default values where no value was
/// explicitly specified.
#[derive(Default)]
pub struct Builder {
    // thread-name prefix; `build()` falls back to the default pool naming when None
    name: Option<String>,
    // number of threads kept alive for the pool's lifetime; defaults to the cpu count
    core_size: Option<usize>,
    // upper bound on concurrent threads; defaults to twice the core size
    max_size: Option<usize>,
    // idle timeout for non-core threads; defaults to 60 seconds
    keep_alive: Option<Duration>,
}
975
976impl Builder {
977    /// Create a new `Builder`.
978    pub fn new() -> Builder {
979        Builder::default()
980    }
981
982    /// Specify the name of the `ThreadPool` that will be used as prefix for the name of each worker thread.
983    /// By default the name is "rusty_pool_x" with x being a static pool counter.
984    pub fn name(mut self, name: String) -> Builder {
985        self.name = Some(name);
986        self
987    }
988
989    /// Specify the core pool size for the `ThreadPool`. The core pool size is the number of threads that stay alive
990    /// for the entire lifetime of the `ThreadPool` or, to be more precise, its channel. These threads are spawned if
991    /// a task is submitted to the `ThreadPool` and the current worker count is below the core pool size.
992    pub fn core_size(mut self, size: usize) -> Builder {
993        self.core_size = Some(size);
994        self
995    }
996
997    /// Specify the maximum pool size this `ThreadPool` may scale up to. This numbers represents the maximum number
998    /// of threads that may be alive at the same time within this pool. Additional threads above the core pool size
999    /// only remain idle for the duration specified by the `keep_alive` parameter before terminating. If the core pool
1000    /// is full, the current pool size is below the max size and there are no idle threads then additional threads
1001    /// will be spawned.
1002    pub fn max_size(mut self, size: usize) -> Builder {
1003        self.max_size = Some(size);
1004        self
1005    }
1006
1007    /// Specify the duration for which additional threads outside the core pool remain alive while not receiving any
1008    /// work before giving up and terminating.
1009    pub fn keep_alive(mut self, keep_alive: Duration) -> Builder {
1010        self.keep_alive = Some(keep_alive);
1011        self
1012    }
1013
1014    /// Build the `ThreadPool` using the parameters previously supplied to this `Builder` using the number of CPUs as
1015    /// default core size if none provided, twice the core size as max size if none provided, 60 seconds keep_alive
1016    /// if none provided and the default naming (rusty_pool_{pool_number}) if none provided.
1017    /// This function calls [`ThreadPool::new`](struct.ThreadPool.html#method.new) or
1018    /// [`ThreadPool::new_named`](struct.ThreadPool.html#method.new_named) depending on whether a name was provided.
1019    ///
1020    /// # Panics
1021    ///
1022    /// Building might panic if the `max_size` is 0 or lower than `core_size` or exceeds half
1023    /// the size of usize. This restriction exists because two counters (total workers and
1024    /// idle counters) are stored within one AtomicUsize.
1025    pub fn build(self) -> ThreadPool {
1026        use std::cmp::{max, min};
1027
1028        let core_size = self.core_size.unwrap_or_else(|| {
1029            let num_cpus = num_cpus::get();
1030            if let Some(max_size) = self.max_size {
1031                min(MAX_SIZE, min(num_cpus, max_size))
1032            } else {
1033                min(MAX_SIZE, num_cpus)
1034            }
1035        });
1036        // handle potential overflow: try using twice the core_size or return core_size
1037        let max_size = self
1038            .max_size
1039            .unwrap_or_else(|| min(MAX_SIZE, max(core_size, core_size * 2)));
1040        let keep_alive = self.keep_alive.unwrap_or_else(|| Duration::from_secs(60));
1041
1042        if let Some(name) = self.name {
1043            ThreadPool::new_named(name, core_size, max_size, keep_alive)
1044        } else {
1045            ThreadPool::new(core_size, max_size, keep_alive)
1046        }
1047    }
1048}
1049
/// A single pool worker: holds its handle to the shared job channel and pool
/// state, plus its optional idle timeout. Cloneable so the `Sentinel` can
/// restart a copy after a panic.
#[derive(Clone)]
struct Worker {
    receiver: crossbeam_channel::Receiver<Job>,
    worker_data: Arc<WorkerData>,
    // None for core workers (block on recv forever); Some for additional
    // workers that terminate after idling this long
    keep_alive: Option<Duration>,
}
1056
impl Worker {
    /// Create a `Worker` reading jobs from `receiver` and sharing `worker_data` with
    /// the pool; `keep_alive` is `None` for core workers that never time out.
    fn new(
        receiver: crossbeam_channel::Receiver<Job>,
        worker_data: Arc<WorkerData>,
        keep_alive: Option<Duration>,
    ) -> Self {
        Worker {
            receiver,
            worker_data,
            keep_alive,
        }
    }

    /// Spawn this worker's OS thread and run the work loop, optionally executing an
    /// initial `task` handed over directly by the pool (bypassing the channel).
    ///
    /// # Panics
    ///
    /// Panics if the OS thread could not be spawned.
    fn start(self, task: Option<Job>) {
        // thread name is "{pool_name}_thread_{n}" with n from the shared counter
        let worker_name = format!(
            "{}_thread_{}",
            self.worker_data.pool_name,
            self.worker_data
                .worker_number
                .fetch_add(1, Ordering::Relaxed)
        );

        thread::Builder::new()
            .name(worker_name)
            .spawn(move || {
                // the sentinel's Drop restarts a clone of this worker if a task panics
                let mut sentinel = Sentinel::new(&self);

                if let Some(task) = task {
                    self.exec_task_and_notify(&mut sentinel, task);
                }

                loop {
                    // the two functions return different error types, but since the error type doesn't matter it is mapped to unit to make them compatible
                    let received_task: Result<Job, _> = match self.keep_alive {
                        Some(keep_alive) => self.receiver.recv_timeout(keep_alive).map_err(|_| ()),
                        None => self.receiver.recv().map_err(|_| ()),
                    };

                    match received_task {
                        Ok(task) => {
                            // mark current as no longer idle and execute task
                            self.worker_data.worker_count_data.decrement_worker_idle();
                            self.exec_task_and_notify(&mut sentinel, task);
                        }
                        Err(_) => {
                            // either channel was broken because the sender disconnected or, if can_timeout is true, the Worker has not received any work during
                            // its keep_alive period and will now terminate, break working loop
                            break;
                        }
                    }
                }

                // can decrement both at once as the thread only gets here from an idle state
                // (if waiting for work and receiving an error)
                self.worker_data.worker_count_data.decrement_both();
            })
            .expect("could not spawn thread");
    }

    /// Run `task` with the sentinel flagged as working for the duration (so a panic
    /// inside the task is distinguishable in `Sentinel::drop`), then mark this
    /// worker idle again.
    #[inline]
    fn exec_task_and_notify(&self, sentinel: &mut Sentinel, task: Job) {
        sentinel.is_working = true;
        task();
        sentinel.is_working = false;
        // can already mark as idle as this thread will continue the work loop
        self.mark_idle_and_notify_joiners_if_no_work();
    }

    /// Increment the idle counter and, if this worker was the last busy one and the
    /// channel is empty, wake every thread blocked in a join.
    #[inline]
    fn mark_idle_and_notify_joiners_if_no_work(&self) {
        let (old_total_count, old_idle_count) = self
            .worker_data
            .worker_count_data
            .increment_worker_idle_ret_both();
        // if the last task was the last one in the current generation,
        // i.e. if incrementing the idle count leads to the idle count
        // being equal to the total worker count, notify joiners
        if old_total_count == old_idle_count + 1 && self.receiver.is_empty() {
            // take the mutex before notifying; joiners wait on the same mutex
            let _lock = self
                .worker_data
                .join_notify_mutex
                .lock()
                .expect("could not get join notify mutex lock");
            self.worker_data.join_notify_condvar.notify_all();
        }
    }
}
1144
/// Type that exists to manage worker exit on panic.
///
/// This type is constructed once per `Worker` and implements `Drop` to handle proper worker exit
/// in case the worker panics when executing the current task or anywhere else in its work loop.
/// If the `Sentinel` is dropped at the end of the worker's work loop and the current thread is
/// panicking, handle worker exit the same way as if the task completed normally (if the worker
/// panicked while executing a submitted task) then clone the worker and start it with an initial
/// task of `None`.
struct Sentinel<'s> {
    // true while the worker is executing a submitted task, so `Drop` can tell
    // whether a panic interrupted a task or some other part of the work loop
    is_working: bool,
    // the worker this sentinel guards; cloned and restarted on panic
    worker_ref: &'s Worker,
}
1157
1158impl Sentinel<'_> {
1159    fn new(worker_ref: &Worker) -> Sentinel<'_> {
1160        Sentinel {
1161            is_working: false,
1162            worker_ref,
1163        }
1164    }
1165}
1166
impl Drop for Sentinel<'_> {
    fn drop(&mut self) {
        // only act while the worker thread is unwinding; a normal exit of the work
        // loop (channel disconnect or keep_alive timeout) needs no recovery
        if thread::panicking() {
            if self.is_working {
                // worker thread panicked in the process of executing a submitted task,
                // run the same logic as if the task completed normally and mark it as
                // idle, since a clone of this worker will start the work loop as idle
                // thread
                self.worker_ref.mark_idle_and_notify_joiners_if_no_work();
            }

            // replace the dying thread with a fresh one running a clone of this worker
            let worker = self.worker_ref.clone();
            worker.start(None);
        }
    }
}
1183
// lower-half bitmask used to extract the idle worker count from the packed counter
const WORKER_IDLE_MASK: usize = MAX_SIZE;
// adding this bumps the total worker count stored in the upper half of the bits
const INCREMENT_TOTAL: usize = 1 << (BITS / 2);
// adding this bumps the idle worker count stored in the lower half of the bits
const INCREMENT_IDLE: usize = 1;
1187
/// Struct that stores and handles an `AtomicUsize` that stores the total worker count
/// in the higher half of bits and the idle worker count in the lower half of bits.
/// This allows incrementing / decrementing both counters in a single atomic operation.
#[derive(Default)]
struct WorkerCountData {
    // packed value: (total_workers << (BITS / 2)) | idle_workers
    worker_count: AtomicUsize,
}
1195
1196impl WorkerCountData {
1197    fn get_total_worker_count(&self) -> usize {
1198        let curr_val = self.worker_count.load(Ordering::Relaxed);
1199        WorkerCountData::get_total_count(curr_val)
1200    }
1201
1202    fn get_idle_worker_count(&self) -> usize {
1203        let curr_val = self.worker_count.load(Ordering::Relaxed);
1204        WorkerCountData::get_idle_count(curr_val)
1205    }
1206
1207    fn get_both(&self) -> (usize, usize) {
1208        let curr_val = self.worker_count.load(Ordering::Relaxed);
1209        WorkerCountData::split(curr_val)
1210    }
1211
1212    // keep for testing and completion's sake
1213    #[allow(dead_code)]
1214    fn increment_both(&self) -> (usize, usize) {
1215        let old_val = self
1216            .worker_count
1217            .fetch_add(INCREMENT_TOTAL | INCREMENT_IDLE, Ordering::Relaxed);
1218        WorkerCountData::split(old_val)
1219    }
1220
1221    fn decrement_both(&self) -> (usize, usize) {
1222        let old_val = self
1223            .worker_count
1224            .fetch_sub(INCREMENT_TOTAL | INCREMENT_IDLE, Ordering::Relaxed);
1225        WorkerCountData::split(old_val)
1226    }
1227
1228    fn try_increment_worker_total(&self, expected: usize, max_total: usize) -> usize {
1229        self.try_increment_worker_count(expected, INCREMENT_TOTAL, max_total)
1230    }
1231
1232    fn try_increment_worker_count(
1233        &self,
1234        mut expected: usize,
1235        increment: usize,
1236        max_total: usize,
1237    ) -> usize {
1238        loop {
1239            match self.worker_count.compare_exchange_weak(
1240                expected,
1241                expected + increment,
1242                Ordering::Relaxed,
1243                Ordering::Relaxed,
1244            ) {
1245                Ok(witnessed) => return witnessed,
1246                Err(witnessed) if WorkerCountData::get_total_count(witnessed) >= max_total => {
1247                    return witnessed
1248                }
1249                Err(witnessed) => expected = witnessed,
1250            }
1251        }
1252    }
1253
1254    // keep for testing and completion's sake
1255    #[allow(dead_code)]
1256    fn increment_worker_total(&self) -> usize {
1257        let old_val = self
1258            .worker_count
1259            .fetch_add(INCREMENT_TOTAL, Ordering::Relaxed);
1260        WorkerCountData::get_total_count(old_val)
1261    }
1262
1263    // keep for testing and completion's sake
1264    #[allow(dead_code)]
1265    fn increment_worker_total_ret_both(&self) -> (usize, usize) {
1266        let old_val = self
1267            .worker_count
1268            .fetch_add(INCREMENT_TOTAL, Ordering::Relaxed);
1269        WorkerCountData::split(old_val)
1270    }
1271
1272    // keep for testing and completion's sake
1273    #[allow(dead_code)]
1274    fn decrement_worker_total(&self) -> usize {
1275        let old_val = self
1276            .worker_count
1277            .fetch_sub(INCREMENT_TOTAL, Ordering::Relaxed);
1278        WorkerCountData::get_total_count(old_val)
1279    }
1280
1281    // keep for testing and completion's sake
1282    #[allow(dead_code)]
1283    fn decrement_worker_total_ret_both(&self) -> (usize, usize) {
1284        let old_val = self
1285            .worker_count
1286            .fetch_sub(INCREMENT_TOTAL, Ordering::Relaxed);
1287        WorkerCountData::split(old_val)
1288    }
1289
1290    // keep for testing and completion's sake
1291    #[allow(dead_code)]
1292    fn increment_worker_idle(&self) -> usize {
1293        let old_val = self
1294            .worker_count
1295            .fetch_add(INCREMENT_IDLE, Ordering::Relaxed);
1296        WorkerCountData::get_idle_count(old_val)
1297    }
1298
1299    fn increment_worker_idle_ret_both(&self) -> (usize, usize) {
1300        let old_val = self
1301            .worker_count
1302            .fetch_add(INCREMENT_IDLE, Ordering::Relaxed);
1303        WorkerCountData::split(old_val)
1304    }
1305
1306    fn decrement_worker_idle(&self) -> usize {
1307        let old_val = self
1308            .worker_count
1309            .fetch_sub(INCREMENT_IDLE, Ordering::Relaxed);
1310        WorkerCountData::get_idle_count(old_val)
1311    }
1312
1313    // keep for testing and completion's sake
1314    #[allow(dead_code)]
1315    fn decrement_worker_idle_ret_both(&self) -> (usize, usize) {
1316        let old_val = self
1317            .worker_count
1318            .fetch_sub(INCREMENT_IDLE, Ordering::Relaxed);
1319        WorkerCountData::split(old_val)
1320    }
1321
1322    #[inline]
1323    fn split(val: usize) -> (usize, usize) {
1324        let total_count = val >> (BITS / 2);
1325        let idle_count = val & WORKER_IDLE_MASK;
1326        (total_count, idle_count)
1327    }
1328
1329    #[inline]
1330    fn get_total_count(val: usize) -> usize {
1331        val >> (BITS / 2)
1332    }
1333
1334    #[inline]
1335    fn get_idle_count(val: usize) -> usize {
1336        val & WORKER_IDLE_MASK
1337    }
1338}
1339
/// struct containing data shared between workers
struct WorkerData {
    // prefix used for worker thread names
    pool_name: String,
    // packed (total, idle) worker counters
    worker_count_data: WorkerCountData,
    // monotonically increasing id used to name spawned worker threads
    worker_number: AtomicUsize,
    // notified when the last busy worker becomes idle; awaited by joins
    join_notify_condvar: Condvar,
    // mutex paired with `join_notify_condvar`
    join_notify_mutex: Mutex<()>,
    // generation counter so joiners woken in a previous generation do not re-wait
    join_generation: AtomicUsize,
}
1349
/// The sending and receiving halves of the job channel shared by a pool and its workers.
struct ChannelData {
    sender: crossbeam_channel::Sender<Job>,
    receiver: crossbeam_channel::Receiver<Job>,
}
1354
1355#[cfg(test)]
1356mod tests {
1357
1358    use std::sync::{
1359        atomic::{AtomicUsize, Ordering},
1360        Arc,
1361    };
1362    use std::thread;
1363    use std::time::Duration;
1364
1365    use super::Builder;
1366    use super::ThreadPool;
1367    use super::WorkerCountData;
1368
1369    #[test]
1370    fn it_works() {
1371        let pool = ThreadPool::new(2, 10, Duration::from_secs(5));
1372        let count = Arc::new(AtomicUsize::new(0));
1373
1374        let count1 = count.clone();
1375        pool.execute(move || {
1376            count1.fetch_add(1, Ordering::Relaxed);
1377            thread::sleep(std::time::Duration::from_secs(4));
1378        });
1379        let count2 = count.clone();
1380        pool.execute(move || {
1381            count2.fetch_add(1, Ordering::Relaxed);
1382            thread::sleep(std::time::Duration::from_secs(4));
1383        });
1384        let count3 = count.clone();
1385        pool.execute(move || {
1386            count3.fetch_add(1, Ordering::Relaxed);
1387            thread::sleep(std::time::Duration::from_secs(4));
1388        });
1389        let count4 = count.clone();
1390        pool.execute(move || {
1391            count4.fetch_add(1, Ordering::Relaxed);
1392            thread::sleep(std::time::Duration::from_secs(4));
1393        });
1394        thread::sleep(std::time::Duration::from_secs(20));
1395        let count5 = count.clone();
1396        pool.execute(move || {
1397            count5.fetch_add(1, Ordering::Relaxed);
1398            thread::sleep(std::time::Duration::from_secs(4));
1399        });
1400        let count6 = count.clone();
1401        pool.execute(move || {
1402            count6.fetch_add(1, Ordering::Relaxed);
1403            thread::sleep(std::time::Duration::from_secs(4));
1404        });
1405        let count7 = count.clone();
1406        pool.execute(move || {
1407            count7.fetch_add(1, Ordering::Relaxed);
1408            thread::sleep(std::time::Duration::from_secs(4));
1409        });
1410        let count8 = count.clone();
1411        pool.execute(move || {
1412            count8.fetch_add(1, Ordering::Relaxed);
1413            thread::sleep(std::time::Duration::from_secs(4));
1414        });
1415        thread::sleep(std::time::Duration::from_secs(20));
1416
1417        let count = count.load(Ordering::Relaxed);
1418        let worker_count = pool.get_current_worker_count();
1419
1420        assert_eq!(count, 8);
1421        // assert that non-core threads were dropped
1422        assert_eq!(worker_count, 2);
1423        assert_eq!(pool.get_idle_worker_count(), 2);
1424    }
1425
    #[test]
    #[ignore]
    fn stress_test() {
        // 5 producer tasks each submit 2 * 160 sleeping tasks, far exceeding
        // the maximum worker count of 50, so most tasks pile up in the queue
        let pool = Arc::new(ThreadPool::new(3, 50, Duration::from_secs(30)));
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..5 {
            let pool_1 = pool.clone();
            let clone = counter.clone();
            pool.execute(move || {
                for _ in 0..160 {
                    let clone = clone.clone();
                    pool_1.execute(move || {
                        clone.fetch_add(1, Ordering::Relaxed);
                        thread::sleep(Duration::from_secs(10));
                    });
                }

                thread::sleep(Duration::from_secs(20));

                for _ in 0..160 {
                    let clone = clone.clone();
                    pool_1.execute(move || {
                        clone.fetch_add(1, Ordering::Relaxed);
                        thread::sleep(Duration::from_secs(10));
                    });
                }
            });
        }

        thread::sleep(Duration::from_secs(10));
        // the queue is saturated, so the pool should be at its maximum size
        assert_eq!(pool.get_current_worker_count(), 50);

        pool.join();
        // 5 producers * (160 + 160) tasks each
        assert_eq!(counter.load(Ordering::Relaxed), 1600);
        // once the 30 second keep_alive elapses only the 3 core workers remain
        thread::sleep(Duration::from_secs(31));
        assert_eq!(pool.get_current_worker_count(), 3);
    }
1464
1465    #[test]
1466    fn test_join() {
1467        // use a thread pool with one thread max to make sure the second task starts after
1468        // pool.join() is called to make sure it joins future tasks as well
1469        let pool = ThreadPool::new(0, 1, Duration::from_secs(5));
1470        let counter = Arc::new(AtomicUsize::new(0));
1471
1472        let clone_1 = counter.clone();
1473        pool.execute(move || {
1474            thread::sleep(Duration::from_secs(5));
1475            clone_1.fetch_add(1, Ordering::Relaxed);
1476        });
1477
1478        let clone_2 = counter.clone();
1479        pool.execute(move || {
1480            thread::sleep(Duration::from_secs(5));
1481            clone_2.fetch_add(1, Ordering::Relaxed);
1482        });
1483
1484        pool.join();
1485
1486        assert_eq!(counter.load(Ordering::Relaxed), 2);
1487    }
1488
1489    #[test]
1490    fn test_join_timeout() {
1491        let pool = ThreadPool::new(0, 1, Duration::from_secs(5));
1492        let counter = Arc::new(AtomicUsize::new(0));
1493
1494        let clone = counter.clone();
1495        pool.execute(move || {
1496            thread::sleep(Duration::from_secs(10));
1497            clone.fetch_add(1, Ordering::Relaxed);
1498        });
1499
1500        pool.join_timeout(Duration::from_secs(5));
1501        assert_eq!(counter.load(Ordering::Relaxed), 0);
1502        pool.join();
1503        assert_eq!(counter.load(Ordering::Relaxed), 1);
1504    }
1505
1506    #[test]
1507    fn test_shutdown() {
1508        let pool = ThreadPool::new(1, 3, Duration::from_secs(5));
1509        let counter = Arc::new(AtomicUsize::new(0));
1510
1511        let clone_1 = counter.clone();
1512        pool.execute(move || {
1513            thread::sleep(Duration::from_secs(5));
1514            clone_1.fetch_add(1, Ordering::Relaxed);
1515        });
1516
1517        let clone_2 = counter.clone();
1518        pool.execute(move || {
1519            thread::sleep(Duration::from_secs(5));
1520            clone_2.fetch_add(1, Ordering::Relaxed);
1521        });
1522
1523        let clone_3 = counter.clone();
1524        pool.execute(move || {
1525            thread::sleep(Duration::from_secs(5));
1526            clone_3.fetch_add(1, Ordering::Relaxed);
1527        });
1528
1529        // since the pool only allows three threads this won't get the chance to run
1530        let clone_4 = counter.clone();
1531        pool.execute(move || {
1532            thread::sleep(Duration::from_secs(5));
1533            clone_4.fetch_add(1, Ordering::Relaxed);
1534        });
1535
1536        pool.join_timeout(Duration::from_secs(2));
1537        pool.shutdown();
1538
1539        thread::sleep(Duration::from_secs(5));
1540
1541        assert_eq!(counter.load(Ordering::Relaxed), 3);
1542    }
1543
    #[should_panic(
        expected = "max_size must be greater than 0 and greater or equal to the core pool size"
    )]
    #[test]
    fn test_panic_on_0_max_pool_size() {
        // a maximum pool size of 0 is invalid and must panic on construction
        ThreadPool::new(0, 0, Duration::from_secs(2));
    }
1551
    #[should_panic(
        expected = "max_size must be greater than 0 and greater or equal to the core pool size"
    )]
    #[test]
    fn test_panic_on_smaller_max_than_core_pool_size() {
        // max_size (4) below core_size (10) is invalid and must panic
        ThreadPool::new(10, 4, Duration::from_secs(2));
    }
1559
1560    #[should_panic(expected = "max_size may not exceed")]
1561    #[test]
1562    fn test_panic_on_max_size_exceeds_half_usize() {
1563        ThreadPool::new(
1564            10,
1565            1 << ((std::mem::size_of::<usize>() * 8) / 2),
1566            Duration::from_secs(2),
1567        );
1568    }
1569
    #[test]
    fn test_empty_join() {
        // joining a pool that never received any work must return immediately
        // instead of blocking forever
        let pool = ThreadPool::new(3, 10, Duration::from_secs(10));
        pool.join();
    }
1575
1576    #[test]
1577    fn test_join_when_complete() {
1578        let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1579
1580        pool.execute(|| {
1581            thread::sleep(Duration::from_millis(5000));
1582        });
1583
1584        thread::sleep(Duration::from_millis(5000));
1585        pool.join();
1586    }
1587
1588    #[test]
1589    fn test_full_usage() {
1590        let pool = ThreadPool::new(5, 50, Duration::from_secs(10));
1591
1592        for _ in 0..100 {
1593            pool.execute(|| {
1594                thread::sleep(Duration::from_secs(30));
1595            });
1596        }
1597
1598        thread::sleep(Duration::from_secs(10));
1599        assert_eq!(pool.get_current_worker_count(), 50);
1600
1601        pool.join();
1602        thread::sleep(Duration::from_secs(15));
1603        assert_eq!(pool.get_current_worker_count(), 5);
1604    }
1605
1606    #[test]
1607    fn test_shutdown_join() {
1608        let pool = ThreadPool::new(1, 1, Duration::from_secs(5));
1609        let counter = Arc::new(AtomicUsize::new(0));
1610
1611        let clone = counter.clone();
1612        pool.execute(move || {
1613            thread::sleep(Duration::from_secs(10));
1614            clone.fetch_add(1, Ordering::Relaxed);
1615        });
1616
1617        pool.shutdown_join();
1618        assert_eq!(counter.load(Ordering::Relaxed), 1);
1619    }
1620
1621    #[test]
1622    fn test_shutdown_join_timeout() {
1623        let pool = ThreadPool::new(1, 1, Duration::from_secs(5));
1624        let counter = Arc::new(AtomicUsize::new(0));
1625
1626        let clone = counter.clone();
1627        pool.execute(move || {
1628            thread::sleep(Duration::from_secs(10));
1629            clone.fetch_add(1, Ordering::Relaxed);
1630        });
1631
1632        pool.shutdown_join_timeout(Duration::from_secs(5));
1633        assert_eq!(counter.load(Ordering::Relaxed), 0);
1634    }
1635
    #[test]
    fn test_empty_shutdown_join() {
        // shutting down and joining an idle pool must not block
        let pool = ThreadPool::new(1, 5, Duration::from_secs(5));
        pool.shutdown_join();
    }
1641
1642    #[test]
1643    fn test_shutdown_core_pool() {
1644        let pool = ThreadPool::new(5, 5, Duration::from_secs(1));
1645        let counter = Arc::new(AtomicUsize::new(0));
1646        let worker_data = pool.worker_data.clone();
1647
1648        for _ in 0..7 {
1649            let clone = counter.clone();
1650            pool.execute(move || {
1651                thread::sleep(Duration::from_secs(2));
1652                clone.fetch_add(1, Ordering::Relaxed);
1653            });
1654        }
1655
1656        assert_eq!(pool.get_current_worker_count(), 5);
1657        assert_eq!(pool.get_idle_worker_count(), 0);
1658        pool.shutdown_join();
1659        assert_eq!(counter.load(Ordering::Relaxed), 7);
1660
1661        // give the workers time to exit
1662        thread::sleep(Duration::from_millis(50));
1663        assert_eq!(worker_data.worker_count_data.get_total_worker_count(), 0);
1664        assert_eq!(worker_data.worker_count_data.get_idle_worker_count(), 0);
1665    }
1666
1667    #[test]
1668    fn test_shutdown_idle_core_pool() {
1669        let pool = ThreadPool::new(5, 5, Duration::from_secs(1));
1670        let counter = Arc::new(AtomicUsize::new(0));
1671        let worker_data = pool.worker_data.clone();
1672
1673        for _ in 0..5 {
1674            let clone = counter.clone();
1675            pool.execute(move || {
1676                clone.fetch_add(1, Ordering::Relaxed);
1677            });
1678        }
1679
1680        pool.shutdown_join();
1681        assert_eq!(counter.load(Ordering::Relaxed), 5);
1682
1683        // give the workers time to exit
1684        thread::sleep(Duration::from_millis(50));
1685        assert_eq!(worker_data.worker_count_data.get_total_worker_count(), 0);
1686        assert_eq!(worker_data.worker_count_data.get_idle_worker_count(), 0);
1687    }
1688
1689    #[test]
1690    fn test_shutdown_on_complete() {
1691        let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1692
1693        pool.execute(|| {
1694            thread::sleep(Duration::from_millis(5000));
1695        });
1696
1697        thread::sleep(Duration::from_millis(5000));
1698        pool.shutdown_join();
1699    }
1700
1701    #[test]
1702    fn test_shutdown_after_complete() {
1703        let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1704
1705        pool.execute(|| {
1706            thread::sleep(Duration::from_millis(5000));
1707        });
1708
1709        thread::sleep(Duration::from_millis(7000));
1710        pool.shutdown_join();
1711    }
1712
    #[test]
    fn worker_count_test() {
        // exercises the packed (total, idle) counter arithmetic; comments
        // track the expected (total, idle) state after each batch of updates
        let worker_count_data = WorkerCountData::default();

        // initial state: (0, 0)
        assert_eq!(worker_count_data.get_total_worker_count(), 0);
        assert_eq!(worker_count_data.get_idle_worker_count(), 0);

        worker_count_data.increment_both();

        // (1, 1)
        assert_eq!(worker_count_data.get_total_worker_count(), 1);
        assert_eq!(worker_count_data.get_idle_worker_count(), 1);

        for _ in 0..10 {
            worker_count_data.increment_both();
        }

        // (11, 11)
        assert_eq!(worker_count_data.get_total_worker_count(), 11);
        assert_eq!(worker_count_data.get_idle_worker_count(), 11);

        for _ in 0..15 {
            worker_count_data.increment_worker_total();
        }

        for _ in 0..7 {
            worker_count_data.increment_worker_idle();
        }

        // (11 + 15, 11 + 7) = (26, 18)
        assert_eq!(worker_count_data.get_total_worker_count(), 26);
        assert_eq!(worker_count_data.get_idle_worker_count(), 18);
        assert_eq!(worker_count_data.get_both(), (26, 18));

        for _ in 0..5 {
            worker_count_data.decrement_both();
        }

        // (21, 13)
        assert_eq!(worker_count_data.get_total_worker_count(), 21);
        assert_eq!(worker_count_data.get_idle_worker_count(), 13);

        for _ in 0..13 {
            worker_count_data.decrement_worker_total();
        }

        for _ in 0..4 {
            worker_count_data.decrement_worker_idle();
        }

        // (8, 9) — the halves are updated independently, so idle may
        // temporarily exceed total; the counter does not enforce an invariant
        assert_eq!(worker_count_data.get_total_worker_count(), 8);
        assert_eq!(worker_count_data.get_idle_worker_count(), 9);

        // large totals must not spill into the idle half of the word
        for _ in 0..456789 {
            worker_count_data.increment_worker_total();
        }

        // (456797, 9)
        assert_eq!(worker_count_data.get_total_worker_count(), 456797);
        assert_eq!(worker_count_data.get_idle_worker_count(), 9);
        assert_eq!(worker_count_data.get_both(), (456797, 9));

        for _ in 0..23456 {
            worker_count_data.increment_worker_idle();
        }

        // (456797, 23465)
        assert_eq!(worker_count_data.get_total_worker_count(), 456797);
        assert_eq!(worker_count_data.get_idle_worker_count(), 23465);

        for _ in 0..150000 {
            worker_count_data.decrement_worker_total();
        }

        // (306797, 23465)
        assert_eq!(worker_count_data.get_total_worker_count(), 306797);
        assert_eq!(worker_count_data.get_idle_worker_count(), 23465);

        for _ in 0..10000 {
            worker_count_data.decrement_worker_idle();
        }

        // (306797, 13465)
        assert_eq!(worker_count_data.get_total_worker_count(), 306797);
        assert_eq!(worker_count_data.get_idle_worker_count(), 13465);
    }
1791
1792    #[test]
1793    fn test_try_increment_worker_total() {
1794        let worker_count_data = WorkerCountData::default();
1795
1796        let witness = worker_count_data.try_increment_worker_total(0, 5);
1797        assert_eq!(witness, 0);
1798        assert_eq!(worker_count_data.get_total_worker_count(), 1);
1799        assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1800
1801        let witness = worker_count_data.try_increment_worker_total(0, 5);
1802        assert_eq!(witness, 0x0000_0001_0000_0000);
1803        assert_eq!(worker_count_data.get_total_worker_count(), 2);
1804        assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1805
1806        worker_count_data.try_increment_worker_total(2, 5);
1807        worker_count_data.try_increment_worker_total(2, 5);
1808        worker_count_data.try_increment_worker_total(4, 5);
1809        worker_count_data.try_increment_worker_total(4, 5);
1810        let witness = worker_count_data.try_increment_worker_total(2, 5);
1811        assert_eq!(WorkerCountData::get_total_count(witness), 5);
1812        assert_eq!(WorkerCountData::get_idle_count(witness), 0);
1813        assert_eq!(worker_count_data.get_total_worker_count(), 5);
1814        assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1815
1816        let worker_count_data = Arc::new(worker_count_data);
1817
1818        let mut join_handles = Vec::with_capacity(5);
1819        for _ in 0..5 {
1820            let worker_count_data = worker_count_data.clone();
1821            let join_handle = thread::spawn(move || {
1822                for i in 0..5 {
1823                    worker_count_data.try_increment_worker_total(5 + i, 15);
1824                }
1825            });
1826
1827            join_handles.push(join_handle);
1828        }
1829
1830        for join_handle in join_handles {
1831            join_handle.join().unwrap();
1832        }
1833
1834        assert_eq!(worker_count_data.get_total_worker_count(), 15);
1835        assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1836    }
1837
1838    #[test]
1839    fn test_join_enqueued_task() {
1840        let pool = ThreadPool::new(3, 50, Duration::from_secs(20));
1841        let counter = Arc::new(AtomicUsize::new(0));
1842
1843        for _ in 0..160 {
1844            let clone = counter.clone();
1845            pool.execute(move || {
1846                thread::sleep(Duration::from_secs(10));
1847                clone.fetch_add(1, Ordering::Relaxed);
1848            });
1849        }
1850
1851        thread::sleep(Duration::from_secs(5));
1852        assert_eq!(pool.get_current_worker_count(), 50);
1853
1854        pool.join();
1855        assert_eq!(counter.load(Ordering::Relaxed), 160);
1856        thread::sleep(Duration::from_secs(21));
1857        assert_eq!(pool.get_current_worker_count(), 3);
1858    }
1859
1860    #[test]
1861    fn test_panic_all() {
1862        let pool = ThreadPool::new(3, 10, Duration::from_secs(2));
1863
1864        for _ in 0..10 {
1865            pool.execute(|| {
1866                panic!("test");
1867            })
1868        }
1869
1870        pool.join();
1871        thread::sleep(Duration::from_secs(5));
1872        assert_eq!(pool.get_current_worker_count(), 3);
1873        assert_eq!(pool.get_idle_worker_count(), 3);
1874    }
1875
1876    #[test]
1877    fn test_panic_some() {
1878        let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1879        let counter = Arc::new(AtomicUsize::new(0));
1880
1881        for i in 0..10 {
1882            let clone = counter.clone();
1883            pool.execute(move || {
1884                if i < 3 || i % 2 == 0 {
1885                    thread::sleep(Duration::from_secs(5));
1886                    clone.fetch_add(1, Ordering::Relaxed);
1887                } else {
1888                    thread::sleep(Duration::from_secs(5));
1889                    panic!("test");
1890                }
1891            })
1892        }
1893
1894        pool.join();
1895        assert_eq!(counter.load(Ordering::Relaxed), 6);
1896        assert_eq!(pool.get_current_worker_count(), 10);
1897        assert_eq!(pool.get_idle_worker_count(), 10);
1898        thread::sleep(Duration::from_secs(10));
1899        assert_eq!(pool.get_current_worker_count(), 3);
1900        assert_eq!(pool.get_idle_worker_count(), 3);
1901    }
1902
1903    #[test]
1904    fn test_panic_all_core_threads() {
1905        let pool = ThreadPool::new(3, 3, Duration::from_secs(1));
1906        let counter = Arc::new(AtomicUsize::new(0));
1907
1908        for _ in 0..3 {
1909            pool.execute(|| {
1910                panic!("test");
1911            })
1912        }
1913
1914        pool.join();
1915
1916        for i in 0..10 {
1917            let clone = counter.clone();
1918            pool.execute(move || {
1919                if i < 3 || i % 2 == 0 {
1920                    clone.fetch_add(1, Ordering::Relaxed);
1921                } else {
1922                    thread::sleep(Duration::from_secs(5));
1923                    panic!("test");
1924                }
1925            })
1926        }
1927
1928        pool.join();
1929        assert_eq!(counter.load(Ordering::Relaxed), 6);
1930        assert_eq!(pool.get_current_worker_count(), 3);
1931        assert_eq!(pool.get_idle_worker_count(), 3);
1932    }
1933
1934    #[test]
1935    fn test_drop_all_receivers() {
1936        let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
1937        let counter = Arc::new(AtomicUsize::new(0));
1938
1939        for _ in 0..3 {
1940            let clone = counter.clone();
1941            pool.execute(move || {
1942                clone.fetch_add(1, Ordering::Relaxed);
1943            })
1944        }
1945
1946        pool.join();
1947        assert_eq!(counter.load(Ordering::Relaxed), 3);
1948        thread::sleep(Duration::from_secs(10));
1949        assert_eq!(pool.get_current_worker_count(), 0);
1950
1951        for _ in 0..3 {
1952            let clone = counter.clone();
1953            pool.execute(move || {
1954                clone.fetch_add(1, Ordering::Relaxed);
1955            })
1956        }
1957
1958        pool.join();
1959        assert_eq!(counter.load(Ordering::Relaxed), 6);
1960    }
1961
1962    #[test]
1963    fn test_evaluate() {
1964        let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
1965
1966        let count = AtomicUsize::new(0);
1967
1968        let handle = pool.evaluate(move || {
1969            count.fetch_add(1, Ordering::Relaxed);
1970            thread::sleep(Duration::from_secs(5));
1971            count.fetch_add(1, Ordering::Relaxed)
1972        });
1973
1974        let result = handle.await_complete();
1975        assert_eq!(result, 1);
1976    }
1977
    #[test]
    fn test_multiple_evaluate() {
        let pool = ThreadPool::new(0, 3, Duration::from_secs(5));

        let count = AtomicUsize::new(0);
        // first task: increments the counter to 20000 in two batches and
        // returns the final counter value (20000)
        let handle_1 = pool.evaluate(move || {
            for _ in 0..10000 {
                count.fetch_add(1, Ordering::Relaxed);
            }

            thread::sleep(Duration::from_secs(5));

            for _ in 0..10000 {
                count.fetch_add(1, Ordering::Relaxed);
            }

            count.load(Ordering::Relaxed)
        });

        // second task: awaits the first task's result from inside the pool
        // and adds 15000 + 20000 to it (20000 + 35000 = 55000)
        let handle_2 = pool.evaluate(move || {
            let result = handle_1.await_complete();
            let mut count = result;

            count += 15000;

            thread::sleep(Duration::from_secs(5));

            count += 20000;

            count
        });

        let result = handle_2.await_complete();
        assert_eq!(result, 55000);
    }
2013
    #[should_panic(expected = "could not receive message because channel was cancelled")]
    #[test]
    fn test_evaluate_panic() {
        let pool = Builder::new().core_size(5).max_size(50).build();

        // the task panics, so no result is ever sent; await_complete must then
        // panic with the cancelled-channel message asserted above
        let handle = pool.evaluate(|| {
            let x = 3;

            // the conditional panic keeps the closure's return type a plain
            // integer (an unconditional panic would make the tail unreachable)
            if x == 3 {
                panic!("expected panic")
            }

            return x;
        });

        handle.await_complete();
    }
2031
2032    #[test]
2033    fn test_complete_fut() {
2034        let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
2035
2036        async fn async_fn() -> i8 {
2037            8
2038        }
2039
2040        let fut = async_fn();
2041        let handle = pool.complete(fut);
2042
2043        assert_eq!(handle.await_complete(), 8);
2044    }
2045
    #[cfg(feature = "async")]
    #[test]
    fn test_spawn() {
        // spawn a fire-and-forget future on the pool; its side effect on the
        // shared counter is observed after join()
        let pool = ThreadPool::default();

        async fn add(x: i32, y: i32) -> i32 {
            x + y
        }

        async fn multiply(x: i32, y: i32) -> i32 {
            x * y
        }

        let count = Arc::new(AtomicUsize::new(0));
        let clone = count.clone();
        pool.spawn(async move {
            let a = add(2, 3).await; // 5
            let b = add(2, a).await; // 7
            let c = multiply(2, b).await; // 14
            let d = multiply(a, add(2, 1).await).await; // 15
            let e = add(c, d).await; // 29

            clone.fetch_add(e as usize, Ordering::Relaxed);
        });

        pool.join();
        assert_eq!(count.load(Ordering::Relaxed), 29);
    }
2074
    #[cfg(feature = "async")]
    #[test]
    fn test_spawn_await() {
        // spawn a future on the pool and retrieve its output via the handle
        let pool = ThreadPool::default();

        async fn sub(x: i32, y: i32) -> i32 {
            x - y
        }

        async fn div(x: i32, y: i32) -> i32 {
            x / y
        }

        let handle = pool.spawn_await(async {
            let a = sub(120, 10).await; // 110
            let b = div(sub(a, 10).await, 4).await; // 25
            div(sub(b, div(10, 2).await).await, 5).await // 4
        });

        assert_eq!(handle.await_complete(), 4)
    }
2096
    #[test]
    fn test_drop_oneshot_receiver() {
        // dropping the result handle while the task is still running must not
        // panic the worker when it later tries to deliver the result
        let pool = Builder::new().core_size(1).max_size(1).build();

        let handle = pool.evaluate(|| {
            thread::sleep(Duration::from_secs(5));
            5
        });

        drop(handle);
        thread::sleep(Duration::from_secs(10));
        let current_thread_index = pool.worker_data.worker_number.load(Ordering::Relaxed);
        // current worker number of 2 means that one worker has started (initial number is 1 -> first worker gets and increments number)
        // indicating that the worker did not panic else it would have been replaced.
        assert_eq!(current_thread_index, 2);
    }
2113
    #[test]
    fn test_builder_max_size() {
        // a builder configured with only max_size (no core_size) must build
        Builder::new().max_size(1).build();
    }
2118
2119    #[test]
2120    fn test_multi_thread_join() {
2121        let pool = ThreadPool::default();
2122        let count = Arc::new(AtomicUsize::new(0));
2123
2124        let clone1 = count.clone();
2125        pool.execute(move || {
2126            thread::sleep(Duration::from_secs(10));
2127            clone1.fetch_add(1, Ordering::Relaxed);
2128        });
2129
2130        let clone2 = count.clone();
2131        pool.execute(move || {
2132            thread::sleep(Duration::from_secs(10));
2133            clone2.fetch_add(1, Ordering::Relaxed);
2134        });
2135
2136        let clone3 = count.clone();
2137        pool.execute(move || {
2138            thread::sleep(Duration::from_secs(10));
2139            clone3.fetch_add(1, Ordering::Relaxed);
2140        });
2141
2142        let pool2 = pool.clone();
2143        let clone4 = count.clone();
2144        thread::spawn(move || {
2145            thread::sleep(Duration::from_secs(5));
2146            pool2.execute(move || {
2147                thread::sleep(Duration::from_secs(15));
2148                clone4.fetch_add(2, Ordering::Relaxed);
2149            });
2150        });
2151
2152        let pool3 = pool.clone();
2153        let pool4 = pool.clone();
2154        let pool5 = pool.clone();
2155        let h1 = thread::spawn(move || {
2156            pool3.join();
2157        });
2158        let h2 = thread::spawn(move || {
2159            pool4.join();
2160        });
2161        let h3 = thread::spawn(move || {
2162            pool5.join();
2163        });
2164        h1.join().unwrap();
2165        h2.join().unwrap();
2166        h3.join().unwrap();
2167
2168        assert_eq!(count.load(Ordering::Relaxed), 5);
2169    }
2170
    #[test]
    fn test_start_core_threads() {
        let pool = Builder::new().core_size(5).build();
        // eagerly spawn all core workers; with no work they must all be idle
        pool.start_core_threads();
        assert_eq!(pool.get_current_worker_count(), 5);
        assert_eq!(pool.get_idle_worker_count(), 5);
    }
2178
2179    #[test]
2180    fn test_start_and_use_core_threads() {
2181        let pool = Builder::new()
2182            .core_size(5)
2183            .max_size(10)
2184            .keep_alive(Duration::from_secs(u64::MAX))
2185            .build();
2186        pool.start_core_threads();
2187        let result = pool.evaluate(|| 5 + 5).await_complete();
2188        assert_eq!(result, 10);
2189        assert_eq!(pool.get_current_worker_count(), 5);
2190    }
2191}