rusty_pool/lib.rs
1#[cfg(feature = "async")]
2use futures::{
3 future::BoxFuture,
4 task::{waker_ref, ArcWake},
5};
6use futures_channel::oneshot;
7use futures_executor::block_on;
8use std::future::Future;
9use std::option::Option;
10use std::sync::{
11 atomic::{AtomicUsize, Ordering},
12 Arc, Condvar, Mutex,
13};
14#[cfg(feature = "async")]
15use std::task::Context;
16use std::thread;
17use std::time::Duration;
18
19const BITS: usize = std::mem::size_of::<usize>() * 8;
20/// The absolute maximum number of workers. This corresponds to the maximum value that can be stored within half the bits of usize,
21/// as two counters (total workers and idle workers) are stored in one AtomicUsize.
22pub const MAX_SIZE: usize = (1 << (BITS / 2)) - 1;
23
24type Job = Box<dyn FnOnce() + Send + 'static>;
25
26/// Trait to implement for all items that may be executed by the `ThreadPool`.
27pub trait Task<R: Send>: Send {
28 /// Execute this task and return its result.
29 fn run(self) -> R;
30
31 /// Transform this `Task` into a heap allocated `FnOnce` if possible.
32 ///
33 /// Used by [`ThreadPool::execute`](struct.ThreadPool.html#method.execute) to turn this `Task` into a `Job`
34 /// directly without having to create an additional `Job` that calls this `Task`.
35 fn into_fn(self) -> Option<Box<dyn FnOnce() -> R + Send + 'static>>;
36
37 /// Return `true` if calling [`Task::into_fn`] on this `Task` returns `Some`.
38 fn is_fn(&self) -> bool;
39}
40
41/// Implement the `Task` trait for any FnOnce closure that returns a thread-safe result.
42impl<R, F> Task<R> for F
43where
44 R: Send,
45 F: FnOnce() -> R + Send + 'static,
46{
47 fn run(self) -> R {
48 self()
49 }
50
51 fn into_fn(self) -> Option<Box<dyn FnOnce() -> R + Send + 'static>> {
52 Some(Box::new(self))
53 }
54
55 fn is_fn(&self) -> bool {
56 true
57 }
58}
59
60/// Handle returned by [`ThreadPool::evaluate`](struct.ThreadPool.html#method.evaluate) and [`ThreadPool::complete`](struct.ThreadPool.html#method.complete)
61/// that allows to block the current thread and wait for the result of a submitted task. The returned `JoinHandle` may also be sent to the [`ThreadPool`](struct.ThreadPool.html)
62/// to create a task that blocks a worker thread until the task is completed and then does something with the result. This handle communicates with the worker thread
63/// using a oneshot channel blocking the thread when [`try_await_complete()`](struct.JoinHandle.html#method.try_await_complete) is called until a message, i.e. the result of the
64/// task, is received.
65pub struct JoinHandle<T: Send> {
66 pub receiver: oneshot::Receiver<T>,
67}
68
69impl<T: Send> JoinHandle<T> {
70 /// Block the current thread until the result of the task is received.
71 ///
72 /// # Errors
73 ///
74 /// This function might return a `oneshot::Canceled` if the channel was broken
75 /// before the result was received. This is generally the case if execution of
76 /// the task panicked.
77 pub fn try_await_complete(self) -> Result<T, oneshot::Canceled> {
78 block_on(self.receiver)
79 }
80
81 /// Block the current thread until the result of the task is received.
82 ///
83 /// # Panics
84 ///
85 /// This function might panic if [`try_await_complete()`](struct.JoinHandle.html#method.try_await_complete) returns `oneshot::Canceled`.
86 /// This is generally the case if execution of the task panicked and the sender was dropped before sending a result to the receiver.
87 pub fn await_complete(self) -> T {
88 self.try_await_complete()
89 .expect("could not receive message because channel was cancelled")
90 }
91}
92
93#[cfg(feature = "async")]
94struct AsyncTask {
95 future: Mutex<Option<BoxFuture<'static, ()>>>,
96 pool: ThreadPool,
97}
98
99/// Implement `ArcWake` for `AsyncTask` by re-submitting the `AsyncTask` i.e. the `Future` to the pool.
100#[cfg(feature = "async")]
101impl ArcWake for AsyncTask {
102 fn wake_by_ref(arc_self: &Arc<Self>) {
103 let cloned_task = arc_self.clone();
104 arc_self
105 .pool
106 .try_execute(cloned_task)
107 .expect("failed to wake future because message could not be sent to pool");
108 }
109}
110
111/// Implement the `Task` trait for `AsyncTask` in order to make it executable for the pool by
112/// creating a waker and polling the future.
113#[cfg(feature = "async")]
114impl Task<()> for Arc<AsyncTask> {
115 fn run(self) {
116 let mut future_slot = self.future.lock().expect("failed to acquire mutex");
117 if let Some(mut future) = future_slot.take() {
118 let waker = waker_ref(&self);
119 let context = &mut Context::from_waker(&*waker);
120 if future.as_mut().poll(context).is_pending() {
121 *future_slot = Some(future);
122 }
123 }
124 }
125
126 fn into_fn(self) -> Option<Box<dyn FnOnce() + Send + 'static>> {
127 None
128 }
129
130 fn is_fn(&self) -> bool {
131 false
132 }
133}
134
135// assert that Send is implemented
136trait ThreadSafe: Send {}
137
138impl<R: Send> ThreadSafe for dyn Task<R> {}
139
140impl<R: Send> ThreadSafe for JoinHandle<R> {}
141
142impl ThreadSafe for ThreadPool {}
143
144/// Self growing / shrinking `ThreadPool` implementation based on crossbeam's
145/// multi-producer multi-consumer channels that enables awaiting the result of a
146/// task and offers async support.
147///
148/// This `ThreadPool` has two different pool sizes; a core pool size filled with
149/// threads that live for as long as the channel and a max pool size which describes
150/// the maximum amount of worker threads that may live at the same time.
151/// Those additional non-core threads have a specific keep_alive time described when
152/// creating the `ThreadPool` that defines how long such threads may be idle for
153/// without receiving any work before giving up and terminating their work loop.
154///
155/// This `ThreadPool` does not spawn any threads until a task is submitted to it.
156/// Then it will create a new thread for each task until the core pool size is full.
157/// After that a new thread will only be created upon an `execute()` call if the
158/// current pool is lower than the max pool size and there are no idle threads.
159///
160/// Functions like `evaluate()` and `complete()` return a `JoinHandle` that may be used
161/// to await the result of a submitted task or future. JoinHandles may be sent to the
162/// thread pool to create a task that blocks a worker thread until it receives the
163/// result of the other task and then operates on the result. If the task panics the
164/// `JoinHandle` receives a cancellation error. This is implemented using a futures
165/// oneshot channel to communicate with the worker thread.
166///
167/// This `ThreadPool` may be used as a futures executor if the "async" feature is enabled,
168/// which is the case by default. The "async" feature includes the `spawn()` and
169/// `try_spawn()` functions which create a task that polls the future one by one and
170/// creates a waker that re-submits the future to the pool when it can make progress.
171/// Without the "async" feature, futures can simply be executed to completion using
172/// the `complete` function, which simply blocks a worker thread until the future has
173/// been polled to completion.
174///
175/// The "async" feature can be disabled if not need by adding the following to your
176/// Cargo dependency:
177/// ```toml
178/// [dependencies.rusty_pool]
179/// default-features = false
180/// version = "*"
181/// ```
182///
183/// When creating a new worker this `ThreadPool` tries to increment the worker count
184/// using a compare-and-swap mechanism, if the increment fails because the total worker
185/// count has been incremented to the specified limit (the core_size when trying to
186/// create a core thread, else the max_size) by another thread, the pool tries to create
187/// a non-core worker instead (if previously trying to create a core worker and no idle
188/// worker exists) or sends the task to the channel instead. Panicking workers are always
189/// cloned and replaced.
190///
191/// Locks are only used for the join functions to lock the `Condvar`, apart from that
192/// this `ThreadPool` implementation fully relies on crossbeam and atomic operations.
193/// This `ThreadPool` decides whether it is currently idle (and should fast-return
194/// join attempts) by comparing the total worker count to the idle worker count, which
195/// are two values stored in one `AtomicUsize` (both half the size of usize) making sure
196/// that if both are updated they may be updated in a single atomic operation.
197///
198/// The thread pool and its crossbeam channel can be destroyed by using the shutdown
199/// function, however that does not stop tasks that are already running but will
200/// terminate the thread the next time it will try to fetch work from the channel.
201/// The channel is only destroyed once all clones of the `ThreadPool` have been
202/// shut down / dropped.
203///
204/// # Usage
205/// Create a new `ThreadPool`:
206/// ```rust
207/// use rusty_pool::Builder;
208/// use rusty_pool::ThreadPool;
209/// // Create default `ThreadPool` configuration with the number of CPUs as core pool size
210/// let pool = ThreadPool::default();
211/// // Create a `ThreadPool` with default naming:
212/// use std::time::Duration;
213/// let pool2 = ThreadPool::new(5, 50, Duration::from_secs(60));
214/// // Create a `ThreadPool` with a custom name:
215/// let pool3 = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));
216/// // using the Builder struct:
217/// let pool4 = Builder::new().core_size(5).max_size(50).build();
218/// ```
219///
220/// Submit a closure for execution in the `ThreadPool`:
221/// ```rust
222/// use rusty_pool::ThreadPool;
223/// use std::thread;
224/// use std::time::Duration;
225/// let pool = ThreadPool::default();
226/// pool.execute(|| {
227/// thread::sleep(Duration::from_secs(5));
228/// print!("hello");
229/// });
230/// ```
231///
232/// Submit a task and await the result:
233/// ```rust
234/// use rusty_pool::ThreadPool;
235/// use std::thread;
236/// use std::time::Duration;
237/// let pool = ThreadPool::default();
238/// let handle = pool.evaluate(|| {
239/// thread::sleep(Duration::from_secs(5));
240/// return 4;
241/// });
242/// let result = handle.await_complete();
243/// assert_eq!(result, 4);
244/// ```
245///
246/// Spawn futures using the `ThreadPool`:
247/// ```rust
248/// async fn some_async_fn(x: i32, y: i32) -> i32 {
249/// x + y
250/// }
251///
252/// async fn other_async_fn(x: i32, y: i32) -> i32 {
253/// x - y
254/// }
255///
256/// use rusty_pool::ThreadPool;
257/// let pool = ThreadPool::default();
258///
259/// // simply complete future by blocking a worker until the future has been completed
260/// let handle = pool.complete(async {
261/// let a = some_async_fn(4, 6).await; // 10
262/// let b = some_async_fn(a, 3).await; // 13
263/// let c = other_async_fn(b, a).await; // 3
264/// some_async_fn(c, 5).await // 8
265/// });
266/// assert_eq!(handle.await_complete(), 8);
267///
268/// use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
269///
270/// // spawn future and create waker that automatically re-submits itself to the threadpool if ready to make progress, this requires the "async" feature which is enabled by default
271/// let count = Arc::new(AtomicI32::new(0));
272/// let clone = count.clone();
273/// pool.spawn(async move {
274/// let a = some_async_fn(3, 6).await; // 9
275/// let b = other_async_fn(a, 4).await; // 5
276/// let c = some_async_fn(b, 7).await; // 12
277/// clone.fetch_add(c, Ordering::Relaxed);
278/// });
279/// pool.join();
280/// assert_eq!(count.load(Ordering::Relaxed), 12);
281/// ```
282///
283/// Join and shut down the `ThreadPool`:
284/// ```rust
285/// use std::thread;
286/// use std::time::Duration;
287/// use rusty_pool::ThreadPool;
288/// use std::sync::{Arc, atomic::{AtomicI32, Ordering}};
289///
290/// let pool = ThreadPool::default();
291/// for _ in 0..10 {
292/// pool.execute(|| { thread::sleep(Duration::from_secs(10)) })
293/// }
294/// // wait for all threads to become idle, i.e. all tasks to be completed including tasks added by other threads after join() is called by this thread or for the timeout to be reached
295/// pool.join_timeout(Duration::from_secs(5));
296///
297/// let count = Arc::new(AtomicI32::new(0));
298/// for _ in 0..15 {
299/// let clone = count.clone();
300/// pool.execute(move || {
301/// thread::sleep(Duration::from_secs(5));
302/// clone.fetch_add(1, Ordering::Relaxed);
303/// });
304/// }
305///
306/// // shut down and drop the only instance of this `ThreadPool` (no clones) causing the channel to be broken leading all workers to exit after completing their current work
307/// // and wait for all workers to become idle, i.e. finish their work.
308/// pool.shutdown_join();
309/// assert_eq!(count.load(Ordering::Relaxed), 15);
310/// ```
311#[derive(Clone)]
312pub struct ThreadPool {
313 core_size: usize,
314 max_size: usize,
315 keep_alive: Duration,
316 channel_data: Arc<ChannelData>,
317 worker_data: Arc<WorkerData>,
318}
319
320impl ThreadPool {
321 /// Construct a new `ThreadPool` with the specified core pool size, max pool size
322 /// and keep_alive time for non-core threads. This function does not spawn any
323 /// threads. This `ThreadPool` will receive a default name in the following format:
324 /// "rusty_pool_" + pool number.
325 ///
326 /// `core_size` specifies the amount of threads to keep alive for as long as
327 /// the `ThreadPool` exists and its channel remains connected.
328 ///
329 /// `max_size` specifies the maximum number of worker threads that may exist
330 /// at the same time.
331 ///
332 /// `keep_alive` specifies the duration for which to keep non-core pool
333 /// worker threads alive while they do not receive any work.
334 ///
335 /// # Panics
336 ///
337 /// This function will panic if max_size is 0, lower than core_size or exceeds half
338 /// the size of usize. This restriction exists because two counters (total workers and
339 /// idle counters) are stored within one AtomicUsize.
340 pub fn new(core_size: usize, max_size: usize, keep_alive: Duration) -> Self {
341 static POOL_COUNTER: AtomicUsize = AtomicUsize::new(1);
342 let name = format!(
343 "rusty_pool_{}",
344 POOL_COUNTER.fetch_add(1, Ordering::Relaxed)
345 );
346 ThreadPool::new_named(name, core_size, max_size, keep_alive)
347 }
348
349 /// Construct a new `ThreadPool` with the specified name, core pool size, max pool size
350 /// and keep_alive time for non-core threads. This function does not spawn any
351 /// threads.
352 ///
353 /// `name` the name of the `ThreadPool` that will be used as prefix for each
354 /// thread.
355 ///
356 /// `core_size` specifies the amount of threads to keep alive for as long as
357 /// the `ThreadPool` exists and its channel remains connected.
358 ///
359 /// `max_size` specifies the maximum number of worker threads that may exist
360 /// at the same time.
361 ///
362 /// `keep_alive` specifies the duration for which to keep non-core pool
363 /// worker threads alive while they do not receive any work.
364 ///
365 /// # Panics
366 ///
367 /// This function will panic if max_size is 0, lower than core_size or exceeds half
368 /// the size of usize. This restriction exists because two counters (total workers and
369 /// idle counters) are stored within one AtomicUsize.
370 pub fn new_named(
371 name: String,
372 core_size: usize,
373 max_size: usize,
374 keep_alive: Duration,
375 ) -> Self {
376 let (sender, receiver) = crossbeam_channel::unbounded();
377
378 if max_size == 0 || max_size < core_size {
379 panic!("max_size must be greater than 0 and greater or equal to the core pool size");
380 } else if max_size > MAX_SIZE {
381 panic!(
382 "max_size may not exceed {}, the maximum value that can be stored within half the bits of usize ({} -> {} bits in this case)",
383 MAX_SIZE,
384 BITS,
385 BITS / 2
386 );
387 }
388
389 let worker_data = WorkerData {
390 pool_name: name,
391 worker_count_data: WorkerCountData::default(),
392 worker_number: AtomicUsize::new(1),
393 join_notify_condvar: Condvar::new(),
394 join_notify_mutex: Mutex::new(()),
395 join_generation: AtomicUsize::new(0),
396 };
397
398 let channel_data = ChannelData { sender, receiver };
399
400 Self {
401 core_size,
402 max_size,
403 keep_alive,
404 channel_data: Arc::new(channel_data),
405 worker_data: Arc::new(worker_data),
406 }
407 }
408
409 /// Get the number of live workers, includes all workers waiting for work or executing tasks.
410 ///
411 /// This counter is incremented when creating a new worker. The value is increment just before
412 /// the worker starts executing its initial task. Incrementing the worker total might fail
413 /// if the total has already reached the specified limit (either core_size or max_size) after
414 /// being incremented by another thread, as of rusty_pool 0.5.0 failed attempts to create a worker
415 /// no longer skews the worker total as failed attempts to increment the worker total does not
416 /// increment the value at all.
417 /// This counter is decremented when a worker reaches the end of its working loop, which for non-core
418 /// threads might happen if it does not receive any work during its keep alive time,
419 /// for core threads this only happens once the channel is disconnected.
420 pub fn get_current_worker_count(&self) -> usize {
421 self.worker_data.worker_count_data.get_total_worker_count()
422 }
423
424 /// Get the number of workers currently waiting for work. Those threads are currently
425 /// polling from the crossbeam receiver. Core threads wait indefinitely and might remain
426 /// in this state until the `ThreadPool` is dropped. The remaining threads give up after
427 /// waiting for the specified keep_alive time.
428 pub fn get_idle_worker_count(&self) -> usize {
429 self.worker_data.worker_count_data.get_idle_worker_count()
430 }
431
432 /// Send a new task to the worker threads. This function is responsible for sending the message through the
433 /// channel and creating new workers if needed. If the current worker count is lower than the core pool size
434 /// this function will always create a new worker. If the current worker count is equal to or greater than
435 /// the core pool size this function only creates a new worker if the worker count is below the max pool size
436 /// and there are no idle threads.
437 ///
438 /// When attempting to increment the total worker count before creating a worker fails due to the
439 /// counter reaching the provided limit (core_size when attempting to create core thread, else
440 /// max_size) after being incremented by another thread, the pool tries to create
441 /// a non-core worker instead (if previously trying to create a core worker and no idle
442 /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
443 /// either because the current value of the counter matched the expected value or because the
444 /// last observed value was still below the limit, the worker starts with the provided task as
445 /// initial task and spawns its thread.
446 ///
447 /// # Panics
448 ///
449 /// This function might panic if `try_execute` returns an error when the crossbeam channel has been
450 /// closed unexpectedly.
451 /// This should never occur under normal circumstances using safe code, as shutting down the `ThreadPool`
452 /// consumes ownership and the crossbeam channel is never dropped unless dropping the `ThreadPool`.
453 pub fn execute<T: Task<()> + 'static>(&self, task: T) {
454 if self.try_execute(task).is_err() {
455 panic!("the channel of the thread pool has been closed");
456 }
457 }
458
459 /// Send a new task to the worker threads. This function is responsible for sending the message through the
460 /// channel and creating new workers if needed. If the current worker count is lower than the core pool size
461 /// this function will always create a new worker. If the current worker count is equal to or greater than
462 /// the core pool size this function only creates a new worker if the worker count is below the max pool size
463 /// and there are no idle threads.
464 ///
465 /// When attempting to increment the total worker count before creating a worker fails due to the
466 /// counter reaching the provided limit (core_size when attempting to create core thread, else
467 /// max_size) after being incremented by another thread, the pool tries to create
468 /// a non-core worker instead (if previously trying to create a core worker and no idle
469 /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
470 /// either because the current value of the counter matched the expected value or because the
471 /// last observed value was still below the limit, the worker starts with the provided task as
472 /// initial task and spawns its thread.
473 ///
474 /// # Errors
475 ///
476 /// This function might return `crossbeam_channel::SendError` if the sender was dropped unexpectedly.
477 pub fn try_execute<T: Task<()> + 'static>(
478 &self,
479 task: T,
480 ) -> Result<(), crossbeam_channel::SendError<Job>> {
481 if task.is_fn() {
482 self.try_execute_task(
483 task.into_fn()
484 .expect("Task::into_fn returned None despite is_fn returning true"),
485 )
486 } else {
487 self.try_execute_task(Box::new(move || {
488 task.run();
489 }))
490 }
491 }
492
493 /// Send a new task to the worker threads and return a [`JoinHandle`](struct.JoinHandle.html) that may be used to await
494 /// the result. This function is responsible for sending the message through the channel and creating new
495 /// workers if needed. If the current worker count is lower than the core pool size this function will always
496 /// create a new worker. If the current worker count is equal to or greater than the core pool size this
497 /// function only creates a new worker if the worker count is below the max pool size and there are no idle
498 /// threads.
499 ///
500 /// When attempting to increment the total worker count before creating a worker fails due to the
501 /// counter reaching the provided limit (core_size when attempting to create core thread, else
502 /// max_size) after being incremented by another thread, the pool tries to create
503 /// a non-core worker instead (if previously trying to create a core worker and no idle
504 /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
505 /// either because the current value of the counter matched the expected value or because the
506 /// last observed value was still below the limit, the worker starts with the provided task as
507 /// initial task and spawns its thread.
508 ///
509 /// # Panics
510 ///
511 /// This function might panic if `try_execute` returns an error when the crossbeam channel has been
512 /// closed unexpectedly.
513 /// This should never occur under normal circumstances using safe code, as shutting down the `ThreadPool`
514 /// consumes ownership and the crossbeam channel is never dropped unless dropping the `ThreadPool`.
515 pub fn evaluate<R: Send + 'static, T: Task<R> + 'static>(&self, task: T) -> JoinHandle<R> {
516 match self.try_evaluate(task) {
517 Ok(handle) => handle,
518 Err(e) => panic!("the channel of the thread pool has been closed: {:?}", e),
519 }
520 }
521
522 /// Send a new task to the worker threads and return a [`JoinHandle`](struct.JoinHandle.html) that may be used to await
523 /// the result. This function is responsible for sending the message through the channel and creating new
524 /// workers if needed. If the current worker count is lower than the core pool size this function will always
525 /// create a new worker. If the current worker count is equal to or greater than the core pool size this
526 /// function only creates a new worker if the worker count is below the max pool size and there are no idle
527 /// threads.
528 ///
529 /// When attempting to increment the total worker count before creating a worker fails due to the
530 /// counter reaching the provided limit (core_size when attempting to create core thread, else
531 /// max_size) after being incremented by another thread, the pool tries to create
532 /// a non-core worker instead (if previously trying to create a core worker and no idle
533 /// worker exists) or sends the task to the channel instead. If incrementing the counter succeeded,
534 /// either because the current value of the counter matched the expected value or because the
535 /// last observed value was still below the limit, the worker starts with the provided task as
536 /// initial task and spawns its thread.
537 ///
538 /// # Errors
539 ///
540 /// This function might return `crossbeam_channel::SendError` if the sender was dropped unexpectedly.
541 pub fn try_evaluate<R: Send + 'static, T: Task<R> + 'static>(
542 &self,
543 task: T,
544 ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
545 let (sender, receiver) = oneshot::channel::<R>();
546 let join_handle = JoinHandle { receiver };
547 let job = || {
548 let result = task.run();
549 // if the receiver was dropped that means the caller was not interested in the result
550 let _ignored_result = sender.send(result);
551 };
552
553 let execute_attempt = self.try_execute_task(Box::new(job));
554 execute_attempt.map(|_| join_handle)
555 }
556
557 /// Send a task to the `ThreadPool` that completes the given `Future` and return a [`JoinHandle`](struct.JoinHandle.html)
558 /// that may be used to await the result. This function simply calls [`evaluate()`](struct.ThreadPool.html#method.evaluate)
559 /// with a closure that calls `block_on` with the provided future.
560 ///
561 /// # Panic
562 ///
563 /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
564 pub fn complete<R: Send + 'static>(
565 &self,
566 future: impl Future<Output = R> + 'static + Send,
567 ) -> JoinHandle<R> {
568 self.evaluate(|| block_on(future))
569 }
570
571 /// Send a task to the `ThreadPool` that completes the given `Future` and return a [`JoinHandle`](struct.JoinHandle.html)
572 /// that may be used to await the result. This function simply calls [`try_evaluate()`](struct.ThreadPool.html#method.try_evaluate)
573 /// with a closure that calls `block_on` with the provided future.
574 ///
575 /// # Errors
576 ///
577 /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
578 pub fn try_complete<R: Send + 'static>(
579 &self,
580 future: impl Future<Output = R> + 'static + Send,
581 ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
582 self.try_evaluate(|| block_on(future))
583 }
584
585 /// Submit a `Future` to be polled by this `ThreadPool`. Unlike [`complete()`](struct.ThreadPool.html#method.complete) this does not
586 /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
587 /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
588 /// function simply constructs the `AsyncTask` and calls [`execute()`](struct.ThreadPool.html#method.execute).
589 ///
590 /// # Panic
591 ///
592 /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
593 #[cfg(feature = "async")]
594 pub fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
595 let future_task = Arc::new(AsyncTask {
596 future: Mutex::new(Some(Box::pin(future))),
597 pool: self.clone(),
598 });
599
600 self.execute(future_task)
601 }
602
603 /// Submit a `Future` to be polled by this `ThreadPool`. Unlike [`try_complete()`](struct.ThreadPool.html#method.try_complete) this does not
604 /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
605 /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
606 /// function simply constructs the `AsyncTask` and calls [`try_execute()`](struct.ThreadPool.html#method.try_execute).
607 ///
608 /// # Errors
609 ///
610 /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
611 #[cfg(feature = "async")]
612 pub fn try_spawn(
613 &self,
614 future: impl Future<Output = ()> + 'static + Send,
615 ) -> Result<(), crossbeam_channel::SendError<Job>> {
616 let future_task = Arc::new(AsyncTask {
617 future: Mutex::new(Some(Box::pin(future))),
618 pool: self.clone(),
619 });
620
621 self.try_execute(future_task)
622 }
623
624 /// Create a top-level `Future` that awaits the provided `Future` and then sends the result to the
625 /// returned [`JoinHandle`](struct.JoinHandle.html). Unlike [`complete()`](struct.ThreadPool.html#method.complete) this does not
626 /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
627 /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
628 /// function simply constructs the `AsyncTask` and calls [`execute()`](struct.ThreadPool.html#method.execute).
629 ///
630 /// This enables awaiting the final result outside of an async context like [`complete()`](struct.ThreadPool.html#method.complete) while still
631 /// polling the future lazily instead of eagerly blocking the worker until the future is done.
632 ///
633 /// # Panic
634 ///
635 /// This function panics if the task fails to be sent to the `ThreadPool` due to the channel being broken.
636 #[cfg(feature = "async")]
637 pub fn spawn_await<R: Send + 'static>(
638 &self,
639 future: impl Future<Output = R> + 'static + Send,
640 ) -> JoinHandle<R> {
641 match self.try_spawn_await(future) {
642 Ok(handle) => handle,
643 Err(e) => panic!("the channel of the thread pool has been closed: {:?}", e),
644 }
645 }
646
647 /// Create a top-level `Future` that awaits the provided `Future` and then sends the result to the
648 /// returned [`JoinHandle`](struct.JoinHandle.html). Unlike [`try_complete()`](struct.ThreadPool.html#method.try_complete) this does not
649 /// block a worker until the `Future` has been completed but polls the `Future` once at a time and creates a `Waker`
650 /// that re-submits the Future to this pool when awakened. Since `Arc<AsyncTask>` implements the [`Task`](trait.Task.html) trait this
651 /// function simply constructs the `AsyncTask` and calls [`try_execute()`](struct.ThreadPool.html#method.try_execute).
652 ///
653 /// This enables awaiting the final result outside of an async context like [`complete()`](struct.ThreadPool.html#method.complete) while still
654 /// polling the future lazily instead of eagerly blocking the worker until the future is done.
655 ///
656 /// # Errors
657 ///
658 /// This function returns `crossbeam_channel::SendError` if the task fails to be sent to the `ThreadPool` due to the channel being broken.
659 #[cfg(feature = "async")]
660 pub fn try_spawn_await<R: Send + 'static>(
661 &self,
662 future: impl Future<Output = R> + 'static + Send,
663 ) -> Result<JoinHandle<R>, crossbeam_channel::SendError<Job>> {
664 let (sender, receiver) = oneshot::channel::<R>();
665 let join_handle = JoinHandle { receiver };
666
667 self.try_spawn(async {
668 let result = future.await;
669 // if the receiver was dropped that means the caller was not interested in the result
670 let _ignored_result = sender.send(result);
671 })
672 .map(|_| join_handle)
673 }
674
675 #[inline]
676 fn try_execute_task(&self, task: Job) -> Result<(), crossbeam_channel::SendError<Job>> {
677 // create a new worker either if the current worker count is lower than the core pool size
678 // or if there are no idle threads and the current worker count is lower than the max pool size
679 let worker_count_data = &self.worker_data.worker_count_data;
680 let mut worker_count_val = worker_count_data.worker_count.load(Ordering::Relaxed);
681 let (mut curr_worker_count, idle_worker_count) = WorkerCountData::split(worker_count_val);
682 let mut curr_idle_count = idle_worker_count;
683
684 // always create a new worker if current pool size is below core size
685 if curr_worker_count < self.core_size {
686 let witnessed =
687 worker_count_data.try_increment_worker_total(worker_count_val, self.core_size);
688
689 // the witnessed value matched the expected value, meaning the initial exchange succeeded, or the final witnessed
690 // value is still below the coreSize, meaning the increment eventually succeeded
691 if witnessed == worker_count_val
692 || WorkerCountData::get_total_count(witnessed) < self.core_size
693 {
694 let worker = Worker::new(
695 self.channel_data.receiver.clone(),
696 Arc::clone(&self.worker_data),
697 None,
698 );
699
700 worker.start(Some(task));
701 return Ok(());
702 }
703
704 curr_worker_count = WorkerCountData::get_total_count(witnessed);
705 curr_idle_count = WorkerCountData::get_idle_count(witnessed);
706 worker_count_val = witnessed;
707 }
708
709 // create a new worker if the current worker count is below the maxSize and the pool has been observed to be busy
710 // (no idle workers) during the invocation of this function
711 if curr_worker_count < self.max_size && (idle_worker_count == 0 || curr_idle_count == 0) {
712 let witnessed =
713 worker_count_data.try_increment_worker_total(worker_count_val, self.max_size);
714
715 if witnessed == worker_count_val
716 || WorkerCountData::get_total_count(witnessed) < self.max_size
717 {
718 let worker = Worker::new(
719 self.channel_data.receiver.clone(),
720 Arc::clone(&self.worker_data),
721 Some(self.keep_alive),
722 );
723
724 worker.start(Some(task));
725 return Ok(());
726 }
727 }
728
729 self.send_task_to_channel(task)
730 }
731
732 /// Blocks the current thread until there aren't any non-idle threads anymore.
733 /// This includes work started after calling this function.
734 /// This function blocks until the next time this `ThreadPool` completes all of its work,
735 /// except if all threads are idle and the channel is empty at the time of calling this
736 /// function, in which case it will fast-return.
737 ///
738 /// This utilizes a `Condvar` that is notified by workers when they complete a job and notice
739 /// that the channel is currently empty and it was the last thread to finish the current
740 /// generation of work (i.e. when incrementing the idle worker counter brings the value
741 /// up to the total worker counter, meaning it's the last thread to become idle).
742 pub fn join(&self) {
743 self.inner_join(None);
744 }
745
746 /// Blocks the current thread until there aren't any non-idle threads anymore or until the
747 /// specified time_out Duration passes, whichever happens first.
748 /// This includes work started after calling this function.
749 /// This function blocks until the next time this `ThreadPool` completes all of its work,
750 /// (or until the time_out is reached) except if all threads are idle and the channel is
751 /// empty at the time of calling this function, in which case it will fast-return.
752 ///
753 /// This utilizes a `Condvar` that is notified by workers when they complete a job and notice
754 /// that the channel is currently empty and it was the last thread to finish the current
755 /// generation of work (i.e. when incrementing the idle worker counter brings the value
756 /// up to the total worker counter, meaning it's the last thread to become idle).
757 pub fn join_timeout(&self, time_out: Duration) {
758 self.inner_join(Some(time_out));
759 }
760
761 /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
762 /// causing the `Sender` to drop thus disconnecting the channel.
763 /// Threads in this pool that are currently executing a task will finish what
764 /// they're doing until they check the channel, discovering that it has been
765 /// disconnected from the sender and thus terminate their work loop.
766 ///
767 /// If other clones of this `ThreadPool` exist the sender will remain intact
768 /// and tasks submitted to those clones will succeed, this includes pending
769 /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
770 /// to re-submit awakened futures.
771 pub fn shutdown(self) {
772 drop(self);
773 }
774
775 /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
776 /// causing the `Sender` to drop thus disconnecting the channel.
777 /// Threads in this pool that are currently executing a task will finish what
778 /// they're doing until they check the channel, discovering that it has been
779 /// disconnected from the sender and thus terminate their work loop.
780 ///
781 /// If other clones of this `ThreadPool` exist the sender will remain intact
782 /// and tasks submitted to those clones will succeed, this includes pending
783 /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
784 /// to re-submit awakened futures.
785 ///
786 /// This function additionally joins all workers after dropping the pool to
787 /// wait for all work to finish.
788 /// Blocks the current thread until there aren't any non-idle threads anymore.
789 /// This function blocks until this `ThreadPool` completes all of its work,
790 /// except if all threads are idle and the channel is empty at the time of
791 /// calling this function, in which case the join will fast-return.
792 /// If other live clones of this `ThreadPool` exist this behaves the same as
793 /// calling [`join`](struct.ThreadPool.html#method.join) on a live `ThreadPool` as tasks submitted
794 /// to one of the clones will be joined as well.
795 ///
796 /// The join utilizes a `Condvar` that is notified by workers when they complete a job and notice
797 /// that the channel is currently empty and it was the last thread to finish the current
798 /// generation of work (i.e. when incrementing the idle worker counter brings the value
799 /// up to the total worker counter, meaning it's the last thread to become idle).
800 pub fn shutdown_join(self) {
801 self.inner_shutdown_join(None);
802 }
803
804 /// Destroy this `ThreadPool` by claiming ownership and dropping the value,
805 /// causing the `Sender` to drop thus disconnecting the channel.
806 /// Threads in this pool that are currently executing a task will finish what
807 /// they're doing until they check the channel, discovering that it has been
808 /// disconnected from the sender and thus terminate their work loop.
809 ///
810 /// If other clones of this `ThreadPool` exist the sender will remain intact
811 /// and tasks submitted to those clones will succeed, this includes pending
812 /// `AsyncTask` instances as they hold an owned clone of the `ThreadPool`
813 /// to re-submit awakened futures.
814 ///
815 /// This function additionally joins all workers after dropping the pool to
816 /// wait for all work to finish.
817 /// Blocks the current thread until there aren't any non-idle threads anymore or until the
818 /// specified time_out Duration passes, whichever happens first.
819 /// This function blocks until this `ThreadPool` completes all of its work,
820 /// (or until the time_out is reached) except if all threads are idle and the channel is
821 /// empty at the time of calling this function, in which case the join will fast-return.
822 /// If other live clones of this `ThreadPool` exist this behaves the same as
823 /// calling [`join`](struct.ThreadPool.html#method.join) on a live `ThreadPool` as tasks submitted
824 /// to one of the clones will be joined as well.
825 ///
826 /// The join utilizes a `Condvar` that is notified by workers when they complete a job and notice
827 /// that the channel is currently empty and it was the last thread to finish the current
828 /// generation of work (i.e. when incrementing the idle worker counter brings the value
829 /// up to the total worker counter, meaning it's the last thread to become idle).
830 pub fn shutdown_join_timeout(self, timeout: Duration) {
831 self.inner_shutdown_join(Some(timeout));
832 }
833
834 /// Return the name of this pool, used as prefix for each worker thread.
835 pub fn get_name(&self) -> &str {
836 &self.worker_data.pool_name
837 }
838
839 /// Starts all core workers by creating core idle workers until the total worker count reaches the core count.
840 ///
841 /// Returns immediately if the current worker count is already >= core size.
842 pub fn start_core_threads(&self) {
843 let worker_count_data = &self.worker_data.worker_count_data;
844
845 let core_size = self.core_size;
846 let mut curr_worker_count = worker_count_data.worker_count.load(Ordering::Relaxed);
847 if WorkerCountData::get_total_count(curr_worker_count) >= core_size {
848 return;
849 }
850
851 loop {
852 let witnessed = worker_count_data.try_increment_worker_count(
853 curr_worker_count,
854 INCREMENT_TOTAL | INCREMENT_IDLE,
855 core_size,
856 );
857
858 if WorkerCountData::get_total_count(witnessed) >= core_size {
859 return;
860 }
861
862 let worker = Worker::new(
863 self.channel_data.receiver.clone(),
864 Arc::clone(&self.worker_data),
865 None,
866 );
867
868 worker.start(None);
869 curr_worker_count = witnessed;
870 }
871 }
872
873 #[inline]
874 fn send_task_to_channel(&self, task: Job) -> Result<(), crossbeam_channel::SendError<Job>> {
875 self.channel_data.sender.send(task)?;
876
877 Ok(())
878 }
879
880 #[inline]
881 fn inner_join(&self, time_out: Option<Duration>) {
882 ThreadPool::_do_join(&self.worker_data, &self.channel_data.receiver, time_out);
883 }
884
885 #[inline]
886 fn inner_shutdown_join(self, timeout: Option<Duration>) {
887 let current_worker_data = self.worker_data.clone();
888 let receiver = self.channel_data.receiver.clone();
889 drop(self);
890 ThreadPool::_do_join(¤t_worker_data, &receiver, timeout);
891 }
892
893 #[inline]
894 fn _do_join(
895 current_worker_data: &Arc<WorkerData>,
896 receiver: &crossbeam_channel::Receiver<Job>,
897 time_out: Option<Duration>,
898 ) {
899 // no thread is currently doing any work, return
900 if ThreadPool::is_idle(current_worker_data, receiver) {
901 return;
902 }
903
904 let join_generation = current_worker_data.join_generation.load(Ordering::SeqCst);
905 let guard = current_worker_data
906 .join_notify_mutex
907 .lock()
908 .expect("could not get join notify mutex lock");
909
910 match time_out {
911 Some(time_out) => {
912 let _ret_guard = current_worker_data
913 .join_notify_condvar
914 .wait_timeout_while(guard, time_out, |_| {
915 join_generation
916 == current_worker_data.join_generation.load(Ordering::Relaxed)
917 && !ThreadPool::is_idle(current_worker_data, receiver)
918 })
919 .expect("could not wait for join condvar");
920 }
921 None => {
922 let _ret_guard = current_worker_data
923 .join_notify_condvar
924 .wait_while(guard, |_| {
925 join_generation
926 == current_worker_data.join_generation.load(Ordering::Relaxed)
927 && !ThreadPool::is_idle(current_worker_data, receiver)
928 })
929 .expect("could not wait for join condvar");
930 }
931 };
932
933 // increment generation if current thread is first thread to be awakened from wait in current generation
934 let _ = current_worker_data.join_generation.compare_exchange(
935 join_generation,
936 join_generation.wrapping_add(1),
937 Ordering::SeqCst,
938 Ordering::SeqCst,
939 );
940 }
941
942 #[inline]
943 fn is_idle(
944 current_worker_data: &Arc<WorkerData>,
945 receiver: &crossbeam_channel::Receiver<Job>,
946 ) -> bool {
947 let (current_worker_count, current_idle_count) =
948 current_worker_data.worker_count_data.get_both();
949 current_idle_count == current_worker_count && receiver.is_empty()
950 }
951}
952
953impl Default for ThreadPool {
954 /// create default ThreadPool with the core pool size being equal to the number of cpus
955 /// and the max_size being twice the core size with a 60 second timeout
956 fn default() -> Self {
957 let num_cpus = num_cpus::get();
958 ThreadPool::new(
959 num_cpus,
960 std::cmp::max(num_cpus, num_cpus * 2),
961 Duration::from_secs(60),
962 )
963 }
964}
965
966/// A helper struct to aid creating a new `ThreadPool` using default values where no value was
967/// explicitly specified.
968#[derive(Default)]
969pub struct Builder {
970 name: Option<String>,
971 core_size: Option<usize>,
972 max_size: Option<usize>,
973 keep_alive: Option<Duration>,
974}
975
976impl Builder {
977 /// Create a new `Builder`.
978 pub fn new() -> Builder {
979 Builder::default()
980 }
981
982 /// Specify the name of the `ThreadPool` that will be used as prefix for the name of each worker thread.
983 /// By default the name is "rusty_pool_x" with x being a static pool counter.
984 pub fn name(mut self, name: String) -> Builder {
985 self.name = Some(name);
986 self
987 }
988
989 /// Specify the core pool size for the `ThreadPool`. The core pool size is the number of threads that stay alive
990 /// for the entire lifetime of the `ThreadPool` or, to be more precise, its channel. These threads are spawned if
991 /// a task is submitted to the `ThreadPool` and the current worker count is below the core pool size.
992 pub fn core_size(mut self, size: usize) -> Builder {
993 self.core_size = Some(size);
994 self
995 }
996
997 /// Specify the maximum pool size this `ThreadPool` may scale up to. This numbers represents the maximum number
998 /// of threads that may be alive at the same time within this pool. Additional threads above the core pool size
999 /// only remain idle for the duration specified by the `keep_alive` parameter before terminating. If the core pool
1000 /// is full, the current pool size is below the max size and there are no idle threads then additional threads
1001 /// will be spawned.
1002 pub fn max_size(mut self, size: usize) -> Builder {
1003 self.max_size = Some(size);
1004 self
1005 }
1006
1007 /// Specify the duration for which additional threads outside the core pool remain alive while not receiving any
1008 /// work before giving up and terminating.
1009 pub fn keep_alive(mut self, keep_alive: Duration) -> Builder {
1010 self.keep_alive = Some(keep_alive);
1011 self
1012 }
1013
1014 /// Build the `ThreadPool` using the parameters previously supplied to this `Builder` using the number of CPUs as
1015 /// default core size if none provided, twice the core size as max size if none provided, 60 seconds keep_alive
1016 /// if none provided and the default naming (rusty_pool_{pool_number}) if none provided.
1017 /// This function calls [`ThreadPool::new`](struct.ThreadPool.html#method.new) or
1018 /// [`ThreadPool::new_named`](struct.ThreadPool.html#method.new_named) depending on whether a name was provided.
1019 ///
1020 /// # Panics
1021 ///
1022 /// Building might panic if the `max_size` is 0 or lower than `core_size` or exceeds half
1023 /// the size of usize. This restriction exists because two counters (total workers and
1024 /// idle counters) are stored within one AtomicUsize.
1025 pub fn build(self) -> ThreadPool {
1026 use std::cmp::{max, min};
1027
1028 let core_size = self.core_size.unwrap_or_else(|| {
1029 let num_cpus = num_cpus::get();
1030 if let Some(max_size) = self.max_size {
1031 min(MAX_SIZE, min(num_cpus, max_size))
1032 } else {
1033 min(MAX_SIZE, num_cpus)
1034 }
1035 });
1036 // handle potential overflow: try using twice the core_size or return core_size
1037 let max_size = self
1038 .max_size
1039 .unwrap_or_else(|| min(MAX_SIZE, max(core_size, core_size * 2)));
1040 let keep_alive = self.keep_alive.unwrap_or_else(|| Duration::from_secs(60));
1041
1042 if let Some(name) = self.name {
1043 ThreadPool::new_named(name, core_size, max_size, keep_alive)
1044 } else {
1045 ThreadPool::new(core_size, max_size, keep_alive)
1046 }
1047 }
1048}
1049
1050#[derive(Clone)]
1051struct Worker {
1052 receiver: crossbeam_channel::Receiver<Job>,
1053 worker_data: Arc<WorkerData>,
1054 keep_alive: Option<Duration>,
1055}
1056
1057impl Worker {
1058 fn new(
1059 receiver: crossbeam_channel::Receiver<Job>,
1060 worker_data: Arc<WorkerData>,
1061 keep_alive: Option<Duration>,
1062 ) -> Self {
1063 Worker {
1064 receiver,
1065 worker_data,
1066 keep_alive,
1067 }
1068 }
1069
1070 fn start(self, task: Option<Job>) {
1071 let worker_name = format!(
1072 "{}_thread_{}",
1073 self.worker_data.pool_name,
1074 self.worker_data
1075 .worker_number
1076 .fetch_add(1, Ordering::Relaxed)
1077 );
1078
1079 thread::Builder::new()
1080 .name(worker_name)
1081 .spawn(move || {
1082 let mut sentinel = Sentinel::new(&self);
1083
1084 if let Some(task) = task {
1085 self.exec_task_and_notify(&mut sentinel, task);
1086 }
1087
1088 loop {
1089 // the two functions return different error types, but since the error type doesn't matter it is mapped to unit to make them compatible
1090 let received_task: Result<Job, _> = match self.keep_alive {
1091 Some(keep_alive) => self.receiver.recv_timeout(keep_alive).map_err(|_| ()),
1092 None => self.receiver.recv().map_err(|_| ()),
1093 };
1094
1095 match received_task {
1096 Ok(task) => {
1097 // mark current as no longer idle and execute task
1098 self.worker_data.worker_count_data.decrement_worker_idle();
1099 self.exec_task_and_notify(&mut sentinel, task);
1100 }
1101 Err(_) => {
1102 // either channel was broken because the sender disconnected or, if can_timeout is true, the Worker has not received any work during
1103 // its keep_alive period and will now terminate, break working loop
1104 break;
1105 }
1106 }
1107 }
1108
1109 // can decrement both at once as the thread only gets here from an idle state
1110 // (if waiting for work and receiving an error)
1111 self.worker_data.worker_count_data.decrement_both();
1112 })
1113 .expect("could not spawn thread");
1114 }
1115
1116 #[inline]
1117 fn exec_task_and_notify(&self, sentinel: &mut Sentinel, task: Job) {
1118 sentinel.is_working = true;
1119 task();
1120 sentinel.is_working = false;
1121 // can already mark as idle as this thread will continue the work loop
1122 self.mark_idle_and_notify_joiners_if_no_work();
1123 }
1124
1125 #[inline]
1126 fn mark_idle_and_notify_joiners_if_no_work(&self) {
1127 let (old_total_count, old_idle_count) = self
1128 .worker_data
1129 .worker_count_data
1130 .increment_worker_idle_ret_both();
1131 // if the last task was the last one in the current generation,
1132 // i.e. if incrementing the idle count leads to the idle count
1133 // being equal to the total worker count, notify joiners
1134 if old_total_count == old_idle_count + 1 && self.receiver.is_empty() {
1135 let _lock = self
1136 .worker_data
1137 .join_notify_mutex
1138 .lock()
1139 .expect("could not get join notify mutex lock");
1140 self.worker_data.join_notify_condvar.notify_all();
1141 }
1142 }
1143}
1144
1145/// Type that exists to manage worker exit on panic.
1146///
1147/// This type is constructed once per `Worker` and implements `Drop` to handle proper worker exit
1148/// in case the worker panics when executing the current task or anywhere else in its work loop.
1149/// If the `Sentinel` is dropped at the end of the worker's work loop and the current thread is
1150/// panicking, handle worker exit the same way as if the task completed normally (if the worker
1151/// panicked while executing a submitted task) then clone the worker and start it with an initial
1152/// task of `None`.
1153struct Sentinel<'s> {
1154 is_working: bool,
1155 worker_ref: &'s Worker,
1156}
1157
1158impl Sentinel<'_> {
1159 fn new(worker_ref: &Worker) -> Sentinel<'_> {
1160 Sentinel {
1161 is_working: false,
1162 worker_ref,
1163 }
1164 }
1165}
1166
1167impl Drop for Sentinel<'_> {
1168 fn drop(&mut self) {
1169 if thread::panicking() {
1170 if self.is_working {
1171 // worker thread panicked in the process of executing a submitted task,
1172 // run the same logic as if the task completed normally and mark it as
1173 // idle, since a clone of this worker will start the work loop as idle
1174 // thread
1175 self.worker_ref.mark_idle_and_notify_joiners_if_no_work();
1176 }
1177
1178 let worker = self.worker_ref.clone();
1179 worker.start(None);
1180 }
1181 }
1182}
1183
1184const WORKER_IDLE_MASK: usize = MAX_SIZE;
1185const INCREMENT_TOTAL: usize = 1 << (BITS / 2);
1186const INCREMENT_IDLE: usize = 1;
1187
1188/// Struct that stores and handles an `AtomicUsize` that stores the total worker count
1189/// in the higher half of bits and the idle worker count in the lower half of bits.
1190/// This allows to to increment / decrement both counters in a single atomic operation.
1191#[derive(Default)]
1192struct WorkerCountData {
1193 worker_count: AtomicUsize,
1194}
1195
1196impl WorkerCountData {
1197 fn get_total_worker_count(&self) -> usize {
1198 let curr_val = self.worker_count.load(Ordering::Relaxed);
1199 WorkerCountData::get_total_count(curr_val)
1200 }
1201
1202 fn get_idle_worker_count(&self) -> usize {
1203 let curr_val = self.worker_count.load(Ordering::Relaxed);
1204 WorkerCountData::get_idle_count(curr_val)
1205 }
1206
1207 fn get_both(&self) -> (usize, usize) {
1208 let curr_val = self.worker_count.load(Ordering::Relaxed);
1209 WorkerCountData::split(curr_val)
1210 }
1211
1212 // keep for testing and completion's sake
1213 #[allow(dead_code)]
1214 fn increment_both(&self) -> (usize, usize) {
1215 let old_val = self
1216 .worker_count
1217 .fetch_add(INCREMENT_TOTAL | INCREMENT_IDLE, Ordering::Relaxed);
1218 WorkerCountData::split(old_val)
1219 }
1220
1221 fn decrement_both(&self) -> (usize, usize) {
1222 let old_val = self
1223 .worker_count
1224 .fetch_sub(INCREMENT_TOTAL | INCREMENT_IDLE, Ordering::Relaxed);
1225 WorkerCountData::split(old_val)
1226 }
1227
1228 fn try_increment_worker_total(&self, expected: usize, max_total: usize) -> usize {
1229 self.try_increment_worker_count(expected, INCREMENT_TOTAL, max_total)
1230 }
1231
1232 fn try_increment_worker_count(
1233 &self,
1234 mut expected: usize,
1235 increment: usize,
1236 max_total: usize,
1237 ) -> usize {
1238 loop {
1239 match self.worker_count.compare_exchange_weak(
1240 expected,
1241 expected + increment,
1242 Ordering::Relaxed,
1243 Ordering::Relaxed,
1244 ) {
1245 Ok(witnessed) => return witnessed,
1246 Err(witnessed) if WorkerCountData::get_total_count(witnessed) >= max_total => {
1247 return witnessed
1248 }
1249 Err(witnessed) => expected = witnessed,
1250 }
1251 }
1252 }
1253
1254 // keep for testing and completion's sake
1255 #[allow(dead_code)]
1256 fn increment_worker_total(&self) -> usize {
1257 let old_val = self
1258 .worker_count
1259 .fetch_add(INCREMENT_TOTAL, Ordering::Relaxed);
1260 WorkerCountData::get_total_count(old_val)
1261 }
1262
1263 // keep for testing and completion's sake
1264 #[allow(dead_code)]
1265 fn increment_worker_total_ret_both(&self) -> (usize, usize) {
1266 let old_val = self
1267 .worker_count
1268 .fetch_add(INCREMENT_TOTAL, Ordering::Relaxed);
1269 WorkerCountData::split(old_val)
1270 }
1271
1272 // keep for testing and completion's sake
1273 #[allow(dead_code)]
1274 fn decrement_worker_total(&self) -> usize {
1275 let old_val = self
1276 .worker_count
1277 .fetch_sub(INCREMENT_TOTAL, Ordering::Relaxed);
1278 WorkerCountData::get_total_count(old_val)
1279 }
1280
1281 // keep for testing and completion's sake
1282 #[allow(dead_code)]
1283 fn decrement_worker_total_ret_both(&self) -> (usize, usize) {
1284 let old_val = self
1285 .worker_count
1286 .fetch_sub(INCREMENT_TOTAL, Ordering::Relaxed);
1287 WorkerCountData::split(old_val)
1288 }
1289
1290 // keep for testing and completion's sake
1291 #[allow(dead_code)]
1292 fn increment_worker_idle(&self) -> usize {
1293 let old_val = self
1294 .worker_count
1295 .fetch_add(INCREMENT_IDLE, Ordering::Relaxed);
1296 WorkerCountData::get_idle_count(old_val)
1297 }
1298
1299 fn increment_worker_idle_ret_both(&self) -> (usize, usize) {
1300 let old_val = self
1301 .worker_count
1302 .fetch_add(INCREMENT_IDLE, Ordering::Relaxed);
1303 WorkerCountData::split(old_val)
1304 }
1305
1306 fn decrement_worker_idle(&self) -> usize {
1307 let old_val = self
1308 .worker_count
1309 .fetch_sub(INCREMENT_IDLE, Ordering::Relaxed);
1310 WorkerCountData::get_idle_count(old_val)
1311 }
1312
1313 // keep for testing and completion's sake
1314 #[allow(dead_code)]
1315 fn decrement_worker_idle_ret_both(&self) -> (usize, usize) {
1316 let old_val = self
1317 .worker_count
1318 .fetch_sub(INCREMENT_IDLE, Ordering::Relaxed);
1319 WorkerCountData::split(old_val)
1320 }
1321
1322 #[inline]
1323 fn split(val: usize) -> (usize, usize) {
1324 let total_count = val >> (BITS / 2);
1325 let idle_count = val & WORKER_IDLE_MASK;
1326 (total_count, idle_count)
1327 }
1328
1329 #[inline]
1330 fn get_total_count(val: usize) -> usize {
1331 val >> (BITS / 2)
1332 }
1333
1334 #[inline]
1335 fn get_idle_count(val: usize) -> usize {
1336 val & WORKER_IDLE_MASK
1337 }
1338}
1339
1340/// struct containing data shared between workers
1341struct WorkerData {
1342 pool_name: String,
1343 worker_count_data: WorkerCountData,
1344 worker_number: AtomicUsize,
1345 join_notify_condvar: Condvar,
1346 join_notify_mutex: Mutex<()>,
1347 join_generation: AtomicUsize,
1348}
1349
1350struct ChannelData {
1351 sender: crossbeam_channel::Sender<Job>,
1352 receiver: crossbeam_channel::Receiver<Job>,
1353}
1354
1355#[cfg(test)]
1356mod tests {
1357
1358 use std::sync::{
1359 atomic::{AtomicUsize, Ordering},
1360 Arc,
1361 };
1362 use std::thread;
1363 use std::time::Duration;
1364
1365 use super::Builder;
1366 use super::ThreadPool;
1367 use super::WorkerCountData;
1368
1369 #[test]
1370 fn it_works() {
1371 let pool = ThreadPool::new(2, 10, Duration::from_secs(5));
1372 let count = Arc::new(AtomicUsize::new(0));
1373
1374 let count1 = count.clone();
1375 pool.execute(move || {
1376 count1.fetch_add(1, Ordering::Relaxed);
1377 thread::sleep(std::time::Duration::from_secs(4));
1378 });
1379 let count2 = count.clone();
1380 pool.execute(move || {
1381 count2.fetch_add(1, Ordering::Relaxed);
1382 thread::sleep(std::time::Duration::from_secs(4));
1383 });
1384 let count3 = count.clone();
1385 pool.execute(move || {
1386 count3.fetch_add(1, Ordering::Relaxed);
1387 thread::sleep(std::time::Duration::from_secs(4));
1388 });
1389 let count4 = count.clone();
1390 pool.execute(move || {
1391 count4.fetch_add(1, Ordering::Relaxed);
1392 thread::sleep(std::time::Duration::from_secs(4));
1393 });
1394 thread::sleep(std::time::Duration::from_secs(20));
1395 let count5 = count.clone();
1396 pool.execute(move || {
1397 count5.fetch_add(1, Ordering::Relaxed);
1398 thread::sleep(std::time::Duration::from_secs(4));
1399 });
1400 let count6 = count.clone();
1401 pool.execute(move || {
1402 count6.fetch_add(1, Ordering::Relaxed);
1403 thread::sleep(std::time::Duration::from_secs(4));
1404 });
1405 let count7 = count.clone();
1406 pool.execute(move || {
1407 count7.fetch_add(1, Ordering::Relaxed);
1408 thread::sleep(std::time::Duration::from_secs(4));
1409 });
1410 let count8 = count.clone();
1411 pool.execute(move || {
1412 count8.fetch_add(1, Ordering::Relaxed);
1413 thread::sleep(std::time::Duration::from_secs(4));
1414 });
1415 thread::sleep(std::time::Duration::from_secs(20));
1416
1417 let count = count.load(Ordering::Relaxed);
1418 let worker_count = pool.get_current_worker_count();
1419
1420 assert_eq!(count, 8);
1421 // assert that non-core threads were dropped
1422 assert_eq!(worker_count, 2);
1423 assert_eq!(pool.get_idle_worker_count(), 2);
1424 }
1425
1426 #[test]
1427 #[ignore]
1428 fn stress_test() {
1429 let pool = Arc::new(ThreadPool::new(3, 50, Duration::from_secs(30)));
1430 let counter = Arc::new(AtomicUsize::new(0));
1431
1432 for _ in 0..5 {
1433 let pool_1 = pool.clone();
1434 let clone = counter.clone();
1435 pool.execute(move || {
1436 for _ in 0..160 {
1437 let clone = clone.clone();
1438 pool_1.execute(move || {
1439 clone.fetch_add(1, Ordering::Relaxed);
1440 thread::sleep(Duration::from_secs(10));
1441 });
1442 }
1443
1444 thread::sleep(Duration::from_secs(20));
1445
1446 for _ in 0..160 {
1447 let clone = clone.clone();
1448 pool_1.execute(move || {
1449 clone.fetch_add(1, Ordering::Relaxed);
1450 thread::sleep(Duration::from_secs(10));
1451 });
1452 }
1453 });
1454 }
1455
1456 thread::sleep(Duration::from_secs(10));
1457 assert_eq!(pool.get_current_worker_count(), 50);
1458
1459 pool.join();
1460 assert_eq!(counter.load(Ordering::Relaxed), 1600);
1461 thread::sleep(Duration::from_secs(31));
1462 assert_eq!(pool.get_current_worker_count(), 3);
1463 }
1464
1465 #[test]
1466 fn test_join() {
1467 // use a thread pool with one thread max to make sure the second task starts after
1468 // pool.join() is called to make sure it joins future tasks as well
1469 let pool = ThreadPool::new(0, 1, Duration::from_secs(5));
1470 let counter = Arc::new(AtomicUsize::new(0));
1471
1472 let clone_1 = counter.clone();
1473 pool.execute(move || {
1474 thread::sleep(Duration::from_secs(5));
1475 clone_1.fetch_add(1, Ordering::Relaxed);
1476 });
1477
1478 let clone_2 = counter.clone();
1479 pool.execute(move || {
1480 thread::sleep(Duration::from_secs(5));
1481 clone_2.fetch_add(1, Ordering::Relaxed);
1482 });
1483
1484 pool.join();
1485
1486 assert_eq!(counter.load(Ordering::Relaxed), 2);
1487 }
1488
1489 #[test]
1490 fn test_join_timeout() {
1491 let pool = ThreadPool::new(0, 1, Duration::from_secs(5));
1492 let counter = Arc::new(AtomicUsize::new(0));
1493
1494 let clone = counter.clone();
1495 pool.execute(move || {
1496 thread::sleep(Duration::from_secs(10));
1497 clone.fetch_add(1, Ordering::Relaxed);
1498 });
1499
1500 pool.join_timeout(Duration::from_secs(5));
1501 assert_eq!(counter.load(Ordering::Relaxed), 0);
1502 pool.join();
1503 assert_eq!(counter.load(Ordering::Relaxed), 1);
1504 }
1505
1506 #[test]
1507 fn test_shutdown() {
1508 let pool = ThreadPool::new(1, 3, Duration::from_secs(5));
1509 let counter = Arc::new(AtomicUsize::new(0));
1510
1511 let clone_1 = counter.clone();
1512 pool.execute(move || {
1513 thread::sleep(Duration::from_secs(5));
1514 clone_1.fetch_add(1, Ordering::Relaxed);
1515 });
1516
1517 let clone_2 = counter.clone();
1518 pool.execute(move || {
1519 thread::sleep(Duration::from_secs(5));
1520 clone_2.fetch_add(1, Ordering::Relaxed);
1521 });
1522
1523 let clone_3 = counter.clone();
1524 pool.execute(move || {
1525 thread::sleep(Duration::from_secs(5));
1526 clone_3.fetch_add(1, Ordering::Relaxed);
1527 });
1528
1529 // since the pool only allows three threads this won't get the chance to run
1530 let clone_4 = counter.clone();
1531 pool.execute(move || {
1532 thread::sleep(Duration::from_secs(5));
1533 clone_4.fetch_add(1, Ordering::Relaxed);
1534 });
1535
1536 pool.join_timeout(Duration::from_secs(2));
1537 pool.shutdown();
1538
1539 thread::sleep(Duration::from_secs(5));
1540
1541 assert_eq!(counter.load(Ordering::Relaxed), 3);
1542 }
1543
1544 #[should_panic(
1545 expected = "max_size must be greater than 0 and greater or equal to the core pool size"
1546 )]
1547 #[test]
1548 fn test_panic_on_0_max_pool_size() {
1549 ThreadPool::new(0, 0, Duration::from_secs(2));
1550 }
1551
1552 #[should_panic(
1553 expected = "max_size must be greater than 0 and greater or equal to the core pool size"
1554 )]
1555 #[test]
1556 fn test_panic_on_smaller_max_than_core_pool_size() {
1557 ThreadPool::new(10, 4, Duration::from_secs(2));
1558 }
1559
1560 #[should_panic(expected = "max_size may not exceed")]
1561 #[test]
1562 fn test_panic_on_max_size_exceeds_half_usize() {
1563 ThreadPool::new(
1564 10,
1565 1 << ((std::mem::size_of::<usize>() * 8) / 2),
1566 Duration::from_secs(2),
1567 );
1568 }
1569
1570 #[test]
1571 fn test_empty_join() {
1572 let pool = ThreadPool::new(3, 10, Duration::from_secs(10));
1573 pool.join();
1574 }
1575
1576 #[test]
1577 fn test_join_when_complete() {
1578 let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1579
1580 pool.execute(|| {
1581 thread::sleep(Duration::from_millis(5000));
1582 });
1583
1584 thread::sleep(Duration::from_millis(5000));
1585 pool.join();
1586 }
1587
1588 #[test]
1589 fn test_full_usage() {
1590 let pool = ThreadPool::new(5, 50, Duration::from_secs(10));
1591
1592 for _ in 0..100 {
1593 pool.execute(|| {
1594 thread::sleep(Duration::from_secs(30));
1595 });
1596 }
1597
1598 thread::sleep(Duration::from_secs(10));
1599 assert_eq!(pool.get_current_worker_count(), 50);
1600
1601 pool.join();
1602 thread::sleep(Duration::from_secs(15));
1603 assert_eq!(pool.get_current_worker_count(), 5);
1604 }
1605
1606 #[test]
1607 fn test_shutdown_join() {
1608 let pool = ThreadPool::new(1, 1, Duration::from_secs(5));
1609 let counter = Arc::new(AtomicUsize::new(0));
1610
1611 let clone = counter.clone();
1612 pool.execute(move || {
1613 thread::sleep(Duration::from_secs(10));
1614 clone.fetch_add(1, Ordering::Relaxed);
1615 });
1616
1617 pool.shutdown_join();
1618 assert_eq!(counter.load(Ordering::Relaxed), 1);
1619 }
1620
1621 #[test]
1622 fn test_shutdown_join_timeout() {
1623 let pool = ThreadPool::new(1, 1, Duration::from_secs(5));
1624 let counter = Arc::new(AtomicUsize::new(0));
1625
1626 let clone = counter.clone();
1627 pool.execute(move || {
1628 thread::sleep(Duration::from_secs(10));
1629 clone.fetch_add(1, Ordering::Relaxed);
1630 });
1631
1632 pool.shutdown_join_timeout(Duration::from_secs(5));
1633 assert_eq!(counter.load(Ordering::Relaxed), 0);
1634 }
1635
1636 #[test]
1637 fn test_empty_shutdown_join() {
1638 let pool = ThreadPool::new(1, 5, Duration::from_secs(5));
1639 pool.shutdown_join();
1640 }
1641
1642 #[test]
1643 fn test_shutdown_core_pool() {
1644 let pool = ThreadPool::new(5, 5, Duration::from_secs(1));
1645 let counter = Arc::new(AtomicUsize::new(0));
1646 let worker_data = pool.worker_data.clone();
1647
1648 for _ in 0..7 {
1649 let clone = counter.clone();
1650 pool.execute(move || {
1651 thread::sleep(Duration::from_secs(2));
1652 clone.fetch_add(1, Ordering::Relaxed);
1653 });
1654 }
1655
1656 assert_eq!(pool.get_current_worker_count(), 5);
1657 assert_eq!(pool.get_idle_worker_count(), 0);
1658 pool.shutdown_join();
1659 assert_eq!(counter.load(Ordering::Relaxed), 7);
1660
1661 // give the workers time to exit
1662 thread::sleep(Duration::from_millis(50));
1663 assert_eq!(worker_data.worker_count_data.get_total_worker_count(), 0);
1664 assert_eq!(worker_data.worker_count_data.get_idle_worker_count(), 0);
1665 }
1666
1667 #[test]
1668 fn test_shutdown_idle_core_pool() {
1669 let pool = ThreadPool::new(5, 5, Duration::from_secs(1));
1670 let counter = Arc::new(AtomicUsize::new(0));
1671 let worker_data = pool.worker_data.clone();
1672
1673 for _ in 0..5 {
1674 let clone = counter.clone();
1675 pool.execute(move || {
1676 clone.fetch_add(1, Ordering::Relaxed);
1677 });
1678 }
1679
1680 pool.shutdown_join();
1681 assert_eq!(counter.load(Ordering::Relaxed), 5);
1682
1683 // give the workers time to exit
1684 thread::sleep(Duration::from_millis(50));
1685 assert_eq!(worker_data.worker_count_data.get_total_worker_count(), 0);
1686 assert_eq!(worker_data.worker_count_data.get_idle_worker_count(), 0);
1687 }
1688
1689 #[test]
1690 fn test_shutdown_on_complete() {
1691 let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1692
1693 pool.execute(|| {
1694 thread::sleep(Duration::from_millis(5000));
1695 });
1696
1697 thread::sleep(Duration::from_millis(5000));
1698 pool.shutdown_join();
1699 }
1700
1701 #[test]
1702 fn test_shutdown_after_complete() {
1703 let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1704
1705 pool.execute(|| {
1706 thread::sleep(Duration::from_millis(5000));
1707 });
1708
1709 thread::sleep(Duration::from_millis(7000));
1710 pool.shutdown_join();
1711 }
1712
1713 #[test]
1714 fn worker_count_test() {
1715 let worker_count_data = WorkerCountData::default();
1716
1717 assert_eq!(worker_count_data.get_total_worker_count(), 0);
1718 assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1719
1720 worker_count_data.increment_both();
1721
1722 assert_eq!(worker_count_data.get_total_worker_count(), 1);
1723 assert_eq!(worker_count_data.get_idle_worker_count(), 1);
1724
1725 for _ in 0..10 {
1726 worker_count_data.increment_both();
1727 }
1728
1729 assert_eq!(worker_count_data.get_total_worker_count(), 11);
1730 assert_eq!(worker_count_data.get_idle_worker_count(), 11);
1731
1732 for _ in 0..15 {
1733 worker_count_data.increment_worker_total();
1734 }
1735
1736 for _ in 0..7 {
1737 worker_count_data.increment_worker_idle();
1738 }
1739
1740 assert_eq!(worker_count_data.get_total_worker_count(), 26);
1741 assert_eq!(worker_count_data.get_idle_worker_count(), 18);
1742 assert_eq!(worker_count_data.get_both(), (26, 18));
1743
1744 for _ in 0..5 {
1745 worker_count_data.decrement_both();
1746 }
1747
1748 assert_eq!(worker_count_data.get_total_worker_count(), 21);
1749 assert_eq!(worker_count_data.get_idle_worker_count(), 13);
1750
1751 for _ in 0..13 {
1752 worker_count_data.decrement_worker_total();
1753 }
1754
1755 for _ in 0..4 {
1756 worker_count_data.decrement_worker_idle();
1757 }
1758
1759 assert_eq!(worker_count_data.get_total_worker_count(), 8);
1760 assert_eq!(worker_count_data.get_idle_worker_count(), 9);
1761
1762 for _ in 0..456789 {
1763 worker_count_data.increment_worker_total();
1764 }
1765
1766 assert_eq!(worker_count_data.get_total_worker_count(), 456797);
1767 assert_eq!(worker_count_data.get_idle_worker_count(), 9);
1768 assert_eq!(worker_count_data.get_both(), (456797, 9));
1769
1770 for _ in 0..23456 {
1771 worker_count_data.increment_worker_idle();
1772 }
1773
1774 assert_eq!(worker_count_data.get_total_worker_count(), 456797);
1775 assert_eq!(worker_count_data.get_idle_worker_count(), 23465);
1776
1777 for _ in 0..150000 {
1778 worker_count_data.decrement_worker_total();
1779 }
1780
1781 assert_eq!(worker_count_data.get_total_worker_count(), 306797);
1782 assert_eq!(worker_count_data.get_idle_worker_count(), 23465);
1783
1784 for _ in 0..10000 {
1785 worker_count_data.decrement_worker_idle();
1786 }
1787
1788 assert_eq!(worker_count_data.get_total_worker_count(), 306797);
1789 assert_eq!(worker_count_data.get_idle_worker_count(), 13465);
1790 }
1791
1792 #[test]
1793 fn test_try_increment_worker_total() {
1794 let worker_count_data = WorkerCountData::default();
1795
1796 let witness = worker_count_data.try_increment_worker_total(0, 5);
1797 assert_eq!(witness, 0);
1798 assert_eq!(worker_count_data.get_total_worker_count(), 1);
1799 assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1800
1801 let witness = worker_count_data.try_increment_worker_total(0, 5);
1802 assert_eq!(witness, 0x0000_0001_0000_0000);
1803 assert_eq!(worker_count_data.get_total_worker_count(), 2);
1804 assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1805
1806 worker_count_data.try_increment_worker_total(2, 5);
1807 worker_count_data.try_increment_worker_total(2, 5);
1808 worker_count_data.try_increment_worker_total(4, 5);
1809 worker_count_data.try_increment_worker_total(4, 5);
1810 let witness = worker_count_data.try_increment_worker_total(2, 5);
1811 assert_eq!(WorkerCountData::get_total_count(witness), 5);
1812 assert_eq!(WorkerCountData::get_idle_count(witness), 0);
1813 assert_eq!(worker_count_data.get_total_worker_count(), 5);
1814 assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1815
1816 let worker_count_data = Arc::new(worker_count_data);
1817
1818 let mut join_handles = Vec::with_capacity(5);
1819 for _ in 0..5 {
1820 let worker_count_data = worker_count_data.clone();
1821 let join_handle = thread::spawn(move || {
1822 for i in 0..5 {
1823 worker_count_data.try_increment_worker_total(5 + i, 15);
1824 }
1825 });
1826
1827 join_handles.push(join_handle);
1828 }
1829
1830 for join_handle in join_handles {
1831 join_handle.join().unwrap();
1832 }
1833
1834 assert_eq!(worker_count_data.get_total_worker_count(), 15);
1835 assert_eq!(worker_count_data.get_idle_worker_count(), 0);
1836 }
1837
1838 #[test]
1839 fn test_join_enqueued_task() {
1840 let pool = ThreadPool::new(3, 50, Duration::from_secs(20));
1841 let counter = Arc::new(AtomicUsize::new(0));
1842
1843 for _ in 0..160 {
1844 let clone = counter.clone();
1845 pool.execute(move || {
1846 thread::sleep(Duration::from_secs(10));
1847 clone.fetch_add(1, Ordering::Relaxed);
1848 });
1849 }
1850
1851 thread::sleep(Duration::from_secs(5));
1852 assert_eq!(pool.get_current_worker_count(), 50);
1853
1854 pool.join();
1855 assert_eq!(counter.load(Ordering::Relaxed), 160);
1856 thread::sleep(Duration::from_secs(21));
1857 assert_eq!(pool.get_current_worker_count(), 3);
1858 }
1859
1860 #[test]
1861 fn test_panic_all() {
1862 let pool = ThreadPool::new(3, 10, Duration::from_secs(2));
1863
1864 for _ in 0..10 {
1865 pool.execute(|| {
1866 panic!("test");
1867 })
1868 }
1869
1870 pool.join();
1871 thread::sleep(Duration::from_secs(5));
1872 assert_eq!(pool.get_current_worker_count(), 3);
1873 assert_eq!(pool.get_idle_worker_count(), 3);
1874 }
1875
1876 #[test]
1877 fn test_panic_some() {
1878 let pool = ThreadPool::new(3, 10, Duration::from_secs(5));
1879 let counter = Arc::new(AtomicUsize::new(0));
1880
1881 for i in 0..10 {
1882 let clone = counter.clone();
1883 pool.execute(move || {
1884 if i < 3 || i % 2 == 0 {
1885 thread::sleep(Duration::from_secs(5));
1886 clone.fetch_add(1, Ordering::Relaxed);
1887 } else {
1888 thread::sleep(Duration::from_secs(5));
1889 panic!("test");
1890 }
1891 })
1892 }
1893
1894 pool.join();
1895 assert_eq!(counter.load(Ordering::Relaxed), 6);
1896 assert_eq!(pool.get_current_worker_count(), 10);
1897 assert_eq!(pool.get_idle_worker_count(), 10);
1898 thread::sleep(Duration::from_secs(10));
1899 assert_eq!(pool.get_current_worker_count(), 3);
1900 assert_eq!(pool.get_idle_worker_count(), 3);
1901 }
1902
1903 #[test]
1904 fn test_panic_all_core_threads() {
1905 let pool = ThreadPool::new(3, 3, Duration::from_secs(1));
1906 let counter = Arc::new(AtomicUsize::new(0));
1907
1908 for _ in 0..3 {
1909 pool.execute(|| {
1910 panic!("test");
1911 })
1912 }
1913
1914 pool.join();
1915
1916 for i in 0..10 {
1917 let clone = counter.clone();
1918 pool.execute(move || {
1919 if i < 3 || i % 2 == 0 {
1920 clone.fetch_add(1, Ordering::Relaxed);
1921 } else {
1922 thread::sleep(Duration::from_secs(5));
1923 panic!("test");
1924 }
1925 })
1926 }
1927
1928 pool.join();
1929 assert_eq!(counter.load(Ordering::Relaxed), 6);
1930 assert_eq!(pool.get_current_worker_count(), 3);
1931 assert_eq!(pool.get_idle_worker_count(), 3);
1932 }
1933
1934 #[test]
1935 fn test_drop_all_receivers() {
1936 let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
1937 let counter = Arc::new(AtomicUsize::new(0));
1938
1939 for _ in 0..3 {
1940 let clone = counter.clone();
1941 pool.execute(move || {
1942 clone.fetch_add(1, Ordering::Relaxed);
1943 })
1944 }
1945
1946 pool.join();
1947 assert_eq!(counter.load(Ordering::Relaxed), 3);
1948 thread::sleep(Duration::from_secs(10));
1949 assert_eq!(pool.get_current_worker_count(), 0);
1950
1951 for _ in 0..3 {
1952 let clone = counter.clone();
1953 pool.execute(move || {
1954 clone.fetch_add(1, Ordering::Relaxed);
1955 })
1956 }
1957
1958 pool.join();
1959 assert_eq!(counter.load(Ordering::Relaxed), 6);
1960 }
1961
1962 #[test]
1963 fn test_evaluate() {
1964 let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
1965
1966 let count = AtomicUsize::new(0);
1967
1968 let handle = pool.evaluate(move || {
1969 count.fetch_add(1, Ordering::Relaxed);
1970 thread::sleep(Duration::from_secs(5));
1971 count.fetch_add(1, Ordering::Relaxed)
1972 });
1973
1974 let result = handle.await_complete();
1975 assert_eq!(result, 1);
1976 }
1977
1978 #[test]
1979 fn test_multiple_evaluate() {
1980 let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
1981
1982 let count = AtomicUsize::new(0);
1983 let handle_1 = pool.evaluate(move || {
1984 for _ in 0..10000 {
1985 count.fetch_add(1, Ordering::Relaxed);
1986 }
1987
1988 thread::sleep(Duration::from_secs(5));
1989
1990 for _ in 0..10000 {
1991 count.fetch_add(1, Ordering::Relaxed);
1992 }
1993
1994 count.load(Ordering::Relaxed)
1995 });
1996
1997 let handle_2 = pool.evaluate(move || {
1998 let result = handle_1.await_complete();
1999 let mut count = result;
2000
2001 count += 15000;
2002
2003 thread::sleep(Duration::from_secs(5));
2004
2005 count += 20000;
2006
2007 count
2008 });
2009
2010 let result = handle_2.await_complete();
2011 assert_eq!(result, 55000);
2012 }
2013
2014 #[should_panic(expected = "could not receive message because channel was cancelled")]
2015 #[test]
2016 fn test_evaluate_panic() {
2017 let pool = Builder::new().core_size(5).max_size(50).build();
2018
2019 let handle = pool.evaluate(|| {
2020 let x = 3;
2021
2022 if x == 3 {
2023 panic!("expected panic")
2024 }
2025
2026 return x;
2027 });
2028
2029 handle.await_complete();
2030 }
2031
2032 #[test]
2033 fn test_complete_fut() {
2034 let pool = ThreadPool::new(0, 3, Duration::from_secs(5));
2035
2036 async fn async_fn() -> i8 {
2037 8
2038 }
2039
2040 let fut = async_fn();
2041 let handle = pool.complete(fut);
2042
2043 assert_eq!(handle.await_complete(), 8);
2044 }
2045
2046 #[cfg(feature = "async")]
2047 #[test]
2048 fn test_spawn() {
2049 let pool = ThreadPool::default();
2050
2051 async fn add(x: i32, y: i32) -> i32 {
2052 x + y
2053 }
2054
2055 async fn multiply(x: i32, y: i32) -> i32 {
2056 x * y
2057 }
2058
2059 let count = Arc::new(AtomicUsize::new(0));
2060 let clone = count.clone();
2061 pool.spawn(async move {
2062 let a = add(2, 3).await; // 5
2063 let b = add(2, a).await; // 7
2064 let c = multiply(2, b).await; // 14
2065 let d = multiply(a, add(2, 1).await).await; // 15
2066 let e = add(c, d).await; // 29
2067
2068 clone.fetch_add(e as usize, Ordering::Relaxed);
2069 });
2070
2071 pool.join();
2072 assert_eq!(count.load(Ordering::Relaxed), 29);
2073 }
2074
2075 #[cfg(feature = "async")]
2076 #[test]
2077 fn test_spawn_await() {
2078 let pool = ThreadPool::default();
2079
2080 async fn sub(x: i32, y: i32) -> i32 {
2081 x - y
2082 }
2083
2084 async fn div(x: i32, y: i32) -> i32 {
2085 x / y
2086 }
2087
2088 let handle = pool.spawn_await(async {
2089 let a = sub(120, 10).await; // 110
2090 let b = div(sub(a, 10).await, 4).await; // 25
2091 div(sub(b, div(10, 2).await).await, 5).await // 4
2092 });
2093
2094 assert_eq!(handle.await_complete(), 4)
2095 }
2096
2097 #[test]
2098 fn test_drop_oneshot_receiver() {
2099 let pool = Builder::new().core_size(1).max_size(1).build();
2100
2101 let handle = pool.evaluate(|| {
2102 thread::sleep(Duration::from_secs(5));
2103 5
2104 });
2105
2106 drop(handle);
2107 thread::sleep(Duration::from_secs(10));
2108 let current_thread_index = pool.worker_data.worker_number.load(Ordering::Relaxed);
2109 // current worker number of 2 means that one worker has started (initial number is 1 -> first worker gets and increments number)
2110 // indicating that the worker did not panic else it would have been replaced.
2111 assert_eq!(current_thread_index, 2);
2112 }
2113
2114 #[test]
2115 fn test_builder_max_size() {
2116 Builder::new().max_size(1).build();
2117 }
2118
2119 #[test]
2120 fn test_multi_thread_join() {
2121 let pool = ThreadPool::default();
2122 let count = Arc::new(AtomicUsize::new(0));
2123
2124 let clone1 = count.clone();
2125 pool.execute(move || {
2126 thread::sleep(Duration::from_secs(10));
2127 clone1.fetch_add(1, Ordering::Relaxed);
2128 });
2129
2130 let clone2 = count.clone();
2131 pool.execute(move || {
2132 thread::sleep(Duration::from_secs(10));
2133 clone2.fetch_add(1, Ordering::Relaxed);
2134 });
2135
2136 let clone3 = count.clone();
2137 pool.execute(move || {
2138 thread::sleep(Duration::from_secs(10));
2139 clone3.fetch_add(1, Ordering::Relaxed);
2140 });
2141
2142 let pool2 = pool.clone();
2143 let clone4 = count.clone();
2144 thread::spawn(move || {
2145 thread::sleep(Duration::from_secs(5));
2146 pool2.execute(move || {
2147 thread::sleep(Duration::from_secs(15));
2148 clone4.fetch_add(2, Ordering::Relaxed);
2149 });
2150 });
2151
2152 let pool3 = pool.clone();
2153 let pool4 = pool.clone();
2154 let pool5 = pool.clone();
2155 let h1 = thread::spawn(move || {
2156 pool3.join();
2157 });
2158 let h2 = thread::spawn(move || {
2159 pool4.join();
2160 });
2161 let h3 = thread::spawn(move || {
2162 pool5.join();
2163 });
2164 h1.join().unwrap();
2165 h2.join().unwrap();
2166 h3.join().unwrap();
2167
2168 assert_eq!(count.load(Ordering::Relaxed), 5);
2169 }
2170
2171 #[test]
2172 fn test_start_core_threads() {
2173 let pool = Builder::new().core_size(5).build();
2174 pool.start_core_threads();
2175 assert_eq!(pool.get_current_worker_count(), 5);
2176 assert_eq!(pool.get_idle_worker_count(), 5);
2177 }
2178
2179 #[test]
2180 fn test_start_and_use_core_threads() {
2181 let pool = Builder::new()
2182 .core_size(5)
2183 .max_size(10)
2184 .keep_alive(Duration::from_secs(u64::MAX))
2185 .build();
2186 pool.start_core_threads();
2187 let result = pool.evaluate(|| 5 + 5).await_complete();
2188 assert_eq!(result, 10);
2189 assert_eq!(pool.get_current_worker_count(), 5);
2190 }
2191}