pub struct ThreadPool { /* private fields */ }

Self-growing / shrinking ThreadPool implementation based on crossbeam’s multi-producer multi-consumer channels that enables awaiting the result of a task and offers async support.

This ThreadPool has two different pool sizes: a core pool size filled with threads that live for as long as the channel, and a max pool size which describes the maximum number of worker threads that may live at the same time. The additional non-core threads have a keep_alive time, specified when creating the ThreadPool, that defines how long such a thread may be idle without receiving any work before giving up and terminating its work loop.

This ThreadPool does not spawn any threads until a task is submitted to it. It then creates a new thread for each task until the core pool size is reached. After that, a new thread is only created upon an execute() call if the current worker count is lower than the max pool size and there are no idle threads.

Functions like evaluate() and complete() return a JoinHandle that may be used to await the result of a submitted task or future. JoinHandles may be sent to the thread pool to create a task that blocks a worker thread until it receives the result of the other task and then operates on that result. If the task panics, the JoinHandle receives a cancellation error. This is implemented using a futures oneshot channel to communicate with the worker thread.
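
For example, the JoinHandle of one task can itself be moved into another task submitted to the pool, which then blocks its worker until the first result is available. A minimal sketch (assuming the pool has at least two available workers so the blocking task does not starve the pool):

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();
// evaluate a task and obtain a JoinHandle for its result
let first = pool.evaluate(|| 21);
// send the handle to the pool: this worker blocks until the first task's
// result arrives and then operates on it
let second = pool.evaluate(move || first.await_complete() * 2);
assert_eq!(second.await_complete(), 42);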

This ThreadPool may be used as a futures executor if the “async” feature is enabled, which is the case by default. The “async” feature includes the spawn() and try_spawn() functions, which create a task that polls the future one step at a time and a waker that re-submits the future to the pool whenever it can make progress. Without the “async” feature, futures can still be executed to completion using the complete function, which blocks a worker thread until the future has been polled to completion.

The “async” feature can be disabled if not needed by adding the following to your Cargo dependency:

[dependencies.rusty_pool]
default-features = false
version = "*"

When creating a new worker, this ThreadPool tries to increment the worker count using a compare-and-swap mechanism. If the increment fails because the total worker count has already been raised to the specified limit (core_size when trying to create a core thread, max_size otherwise) by another thread, the pool tries to create a non-core worker instead (if it was previously trying to create a core worker and no idle worker exists) or simply sends the task to the channel. Panicking workers are always cloned and replaced.
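
A minimal sketch of the increment-up-to-a-limit compare-and-swap described above (a hypothetical stand-alone helper, not part of the public API):

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical helper: try to reserve a worker slot by incrementing `count`,
// but only while it is still below `limit`. Returns false if another thread
// already raised the counter to the limit, in which case the pool falls back
// to creating a non-core worker or to sending the task to the channel.
fn try_increment_up_to(count: &AtomicUsize, limit: usize) -> bool {
    let mut current = count.load(Ordering::Relaxed);
    loop {
        if current >= limit {
            return false;
        }
        match count.compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Relaxed) {
            Ok(_) => return true,
            Err(observed) => current = observed,
        }
    }
}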

Locks are only used by the join functions to lock the Condvar; apart from that, this ThreadPool implementation fully relies on crossbeam and atomic operations. The ThreadPool decides whether it is currently idle (and should fast-return join attempts) by comparing the total worker count to the idle worker count. These two values are packed into a single AtomicUsize (each occupying half the bits of a usize), ensuring that both can be updated in a single atomic operation.
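
A minimal sketch of how two such counters might be packed into one AtomicUsize (hypothetical layout, not the crate's actual internal field):

use std::sync::atomic::{AtomicUsize, Ordering};

// Hypothetical layout: idle worker count in the upper half of the usize,
// total worker count in the lower half.
const HALF_BITS: u32 = usize::BITS / 2;
const LOWER_MASK: usize = (1usize << HALF_BITS) - 1;

fn unpack(packed: usize) -> (usize, usize) {
    // returns (total, idle)
    (packed & LOWER_MASK, packed >> HALF_BITS)
}

// Marking a worker as idle touches only one AtomicUsize, so the idle count
// can never be observed out of sync with the total count.
fn mark_idle(counter: &AtomicUsize) {
    counter.fetch_add(1usize << HALF_BITS, Ordering::AcqRel);
}

// The pool is idle (and join may fast-return) when every live worker is idle.
fn is_idle(counter: &AtomicUsize) -> bool {
    let (total, idle) = unpack(counter.load(Ordering::Relaxed));
    total == idle
}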

The thread pool and its crossbeam channel can be destroyed by using the shutdown function. This does not stop tasks that are already running; instead, each worker terminates the next time it tries to fetch work from the channel. The channel is only destroyed once all clones of the ThreadPool have been shut down / dropped.

Usage

Create a new ThreadPool:

use rusty_pool::Builder;
use rusty_pool::ThreadPool;
// Create default `ThreadPool` configuration with the number of CPUs as core pool size
let pool = ThreadPool::default();
// Create a `ThreadPool` with default naming:
use std::time::Duration;
let pool2 = ThreadPool::new(5, 50, Duration::from_secs(60));
// Create a `ThreadPool` with a custom name:
let pool3 = ThreadPool::new_named(String::from("my_pool"), 5, 50, Duration::from_secs(60));
// using the Builder struct:
let pool4 = Builder::new().core_size(5).max_size(50).build();

Submit a closure for execution in the ThreadPool:

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;
let pool = ThreadPool::default();
pool.execute(|| {
    thread::sleep(Duration::from_secs(5));
    print!("hello");
});

Submit a task and await the result:

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;
let pool = ThreadPool::default();
let handle = pool.evaluate(|| {
    thread::sleep(Duration::from_secs(5));
    return 4;
});
let result = handle.await_complete();
assert_eq!(result, 4);

Spawn futures using the ThreadPool:

async fn some_async_fn(x: i32, y: i32) -> i32 {
    x + y
}

async fn other_async_fn(x: i32, y: i32) -> i32 {
    x - y
}

use rusty_pool::ThreadPool;
let pool = ThreadPool::default();

// simply complete future by blocking a worker until the future has been completed
let handle = pool.complete(async {
    let a = some_async_fn(4, 6).await; // 10
    let b = some_async_fn(a, 3).await; // 13
    let c = other_async_fn(b, a).await; // 3
    some_async_fn(c, 5).await // 8
});
assert_eq!(handle.await_complete(), 8);

use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

// spawn the future and create a waker that automatically re-submits it to the ThreadPool when it is ready to make progress;
// this requires the "async" feature, which is enabled by default
let count = Arc::new(AtomicI32::new(0));
let clone = count.clone();
pool.spawn(async move {
    let a = some_async_fn(3, 6).await; // 9
    let b = other_async_fn(a, 4).await; // 5
    let c = some_async_fn(b, 7).await; // 12
    clone.fetch_add(c, Ordering::Relaxed);
});
pool.join();
assert_eq!(count.load(Ordering::Relaxed), 12);

Join and shut down the ThreadPool:

use std::thread;
use std::time::Duration;
use rusty_pool::ThreadPool;
use std::sync::{Arc, atomic::{AtomicI32, Ordering}};

let pool = ThreadPool::default();
for _ in 0..10 {
    pool.execute(|| { thread::sleep(Duration::from_secs(10)) })
}
// wait for all threads to become idle, i.e. for all tasks to be completed (including tasks added by other threads
// after join() is called by this thread), or for the timeout to be reached
pool.join_timeout(Duration::from_secs(5));

let count = Arc::new(AtomicI32::new(0));
for _ in 0..15 {
    let clone = count.clone();
    pool.execute(move || {
        thread::sleep(Duration::from_secs(5));
        clone.fetch_add(1, Ordering::Relaxed);
    });
}

// shut down and drop the only instance of this `ThreadPool` (no clones), causing the channel to be broken and all
// workers to exit after completing their current work, then wait for all workers to become idle, i.e. finish their work.
pool.shutdown_join();
assert_eq!(count.load(Ordering::Relaxed), 15);

Implementations

Construct a new ThreadPool with the specified core pool size, max pool size and keep_alive time for non-core threads. This function does not spawn any threads. This ThreadPool will receive a default name in the following format: “rusty_pool_” + pool number.

core_size specifies the number of threads to keep alive for as long as the ThreadPool exists and its channel remains connected.

max_size specifies the maximum number of worker threads that may exist at the same time.

keep_alive specifies the duration for which to keep non-core pool worker threads alive while they do not receive any work.

Panics

This function will panic if max_size is 0, lower than core_size, or exceeds the maximum value that fits into half the bits of a usize. The latter restriction exists because the total worker counter and the idle worker counter are stored together within one AtomicUsize.

Construct a new ThreadPool with the specified name, core pool size, max pool size and keep_alive time for non-core threads. This function does not spawn any threads.

name specifies the name of the ThreadPool, which will be used as a prefix for each worker thread.

core_size specifies the number of threads to keep alive for as long as the ThreadPool exists and its channel remains connected.

max_size specifies the maximum number of worker threads that may exist at the same time.

keep_alive specifies the duration for which to keep non-core pool worker threads alive while they do not receive any work.

Panics

This function will panic if max_size is 0, lower than core_size, or exceeds the maximum value that fits into half the bits of a usize. The latter restriction exists because the total worker counter and the idle worker counter are stored together within one AtomicUsize.

Get the number of live workers, including all workers waiting for work or executing tasks.

This counter is incremented when creating a new worker; the value is incremented just before the worker starts executing its initial task. Incrementing the worker total might fail if the total has already been raised to the specified limit (either core_size or max_size) by another thread. As of rusty_pool 0.5.0, failed attempts to create a worker no longer skew the worker total, because a failed attempt to increment the counter does not modify the value at all. This counter is decremented when a worker reaches the end of its working loop, which for non-core threads might happen if it does not receive any work during its keep_alive time; for core threads this only happens once the channel is disconnected.

Get the number of workers currently waiting for work. Those threads are currently polling from the crossbeam receiver. Core threads wait indefinitely and might remain in this state until the ThreadPool is dropped. The remaining threads give up after waiting for the specified keep_alive time.
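
A minimal sketch of inspecting these counters (assuming the accessors are named get_current_worker_count() and get_idle_worker_count()):

use rusty_pool::ThreadPool;
use std::thread;
use std::time::Duration;

let pool = ThreadPool::new(2, 10, Duration::from_secs(60));
// no threads are spawned until the first task is submitted
assert_eq!(pool.get_current_worker_count(), 0);
pool.execute(|| thread::sleep(Duration::from_millis(100)));
// at least one worker is now alive, either executing the task or waiting for more work
assert!(pool.get_current_worker_count() >= 1);
pool.join();
// after join, every live worker is idle again
assert_eq!(pool.get_idle_worker_count(), pool.get_current_worker_count());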

Send a new task to the worker threads. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.

If the attempt to increment the total worker count before creating a worker fails because another thread has already raised the counter to the provided limit (core_size when attempting to create a core thread, max_size otherwise), the pool tries to create a non-core worker instead (if it was previously trying to create a core worker and no idle worker exists) or simply sends the task to the channel. If incrementing the counter succeeds, either because the current value of the counter matched the expected value or because the last observed value was still below the limit, the worker spawns its thread with the provided task as its initial task.

Panics

This function might panic if try_execute returns an error when the crossbeam channel has been closed unexpectedly. This should never occur under normal circumstances using safe code, as shutting down the ThreadPool consumes ownership and the crossbeam channel is never dropped unless dropping the ThreadPool.

Send a new task to the worker threads. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.

If the attempt to increment the total worker count before creating a worker fails because another thread has already raised the counter to the provided limit (core_size when attempting to create a core thread, max_size otherwise), the pool tries to create a non-core worker instead (if it was previously trying to create a core worker and no idle worker exists) or simply sends the task to the channel. If incrementing the counter succeeds, either because the current value of the counter matched the expected value or because the last observed value was still below the limit, the worker spawns its thread with the provided task as its initial task.

Errors

This function might return crossbeam_channel::SendError if the sender was dropped unexpectedly.
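
A minimal sketch of handling this error instead of panicking (assuming this is the try_execute() variant of execute()):

use rusty_pool::ThreadPool;

let pool = ThreadPool::default();
// try_execute surfaces a broken channel as an Err value instead of panicking
if pool.try_execute(|| println!("hello from the pool")).is_err() {
    // only reachable once every clone of the pool has been shut down / dropped
    eprintln!("failed to submit task: channel disconnected");
}
pool.join();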

Send a new task to the worker threads and return a JoinHandle that may be used to await the result. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.

If the attempt to increment the total worker count before creating a worker fails because another thread has already raised the counter to the provided limit (core_size when attempting to create a core thread, max_size otherwise), the pool tries to create a non-core worker instead (if it was previously trying to create a core worker and no idle worker exists) or simply sends the task to the channel. If incrementing the counter succeeds, either because the current value of the counter matched the expected value or because the last observed value was still below the limit, the worker spawns its thread with the provided task as its initial task.

Panics

This function might panic if try_execute returns an error when the crossbeam channel has been closed unexpectedly. This should never occur under normal circumstances using safe code, as shutting down the ThreadPool consumes ownership and the crossbeam channel is never dropped unless dropping the ThreadPool.

Send a new task to the worker threads and return a JoinHandle that may be used to await the result. This function is responsible for sending the message through the channel and creating new workers if needed. If the current worker count is lower than the core pool size this function will always create a new worker. If the current worker count is equal to or greater than the core pool size this function only creates a new worker if the worker count is below the max pool size and there are no idle threads.

If the attempt to increment the total worker count before creating a worker fails because another thread has already raised the counter to the provided limit (core_size when attempting to create a core thread, max_size otherwise), the pool tries to create a non-core worker instead (if it was previously trying to create a core worker and no idle worker exists) or simply sends the task to the channel. If incrementing the counter succeeds, either because the current value of the counter matched the expected value or because the last observed value was still below the limit, the worker spawns its thread with the provided task as its initial task.

Errors

This function might return crossbeam_channel::SendError if the sender was dropped unexpectedly.

Send a task to the ThreadPool that completes the given Future and return a JoinHandle that may be used to await the result. This function simply calls evaluate() with a closure that calls block_on with the provided future.

Panics

This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.

Send a task to the ThreadPool that completes the given Future and return a JoinHandle that may be used to await the result. This function simply calls try_evaluate() with a closure that calls block_on with the provided future.

Errors

This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.

Submit a Future to be polled by this ThreadPool. Unlike complete(), this does not block a worker until the Future has been completed; instead it polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awakened. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls execute().

Panics

This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.

Submit a Future to be polled by this ThreadPool. Unlike try_complete(), this does not block a worker until the Future has been completed; instead it polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awakened. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls try_execute().

Errors

This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.

Create a top-level Future that awaits the provided Future and then sends the result to the returned JoinHandle. Unlike complete(), this does not block a worker until the Future has been completed; instead it polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awakened. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls execute().

This enables awaiting the final result outside of an async context, as complete() does, while still polling the future lazily instead of eagerly blocking a worker until the future is done.

Panics

This function panics if the task fails to be sent to the ThreadPool due to the channel being broken.
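
A minimal sketch (assuming this is the spawn_await() function provided by the “async” feature):

use rusty_pool::ThreadPool;

async fn add(x: i32, y: i32) -> i32 {
    x + y
}

let pool = ThreadPool::default();
// the future is polled lazily by the pool rather than blocking a worker,
// yet the final result can still be awaited outside of an async context
let handle = pool.spawn_await(add(2, 3));
assert_eq!(handle.await_complete(), 5);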

Create a top-level Future that awaits the provided Future and then sends the result to the returned JoinHandle. Unlike try_complete(), this does not block a worker until the Future has been completed; instead it polls the Future one step at a time and creates a Waker that re-submits the Future to this pool when awakened. Since Arc<AsyncTask> implements the Task trait, this function simply constructs the AsyncTask and calls try_execute().

This enables awaiting the final result outside of an async context, as complete() does, while still polling the future lazily instead of eagerly blocking a worker until the future is done.

Errors

This function returns crossbeam_channel::SendError if the task fails to be sent to the ThreadPool due to the channel being broken.

Blocks the current thread until there aren’t any non-idle threads anymore. This includes work started after calling this function. This function blocks until the next time this ThreadPool completes all of its work, except if all threads are idle and the channel is empty at the time of calling this function, in which case it will fast-return.

This utilizes a Condvar that is notified by a worker when it completes a job and notices that the channel is currently empty and that it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it is the last thread to become idle).

Blocks the current thread until there aren’t any non-idle threads anymore or until the specified time_out Duration passes, whichever happens first. This includes work started after calling this function. This function blocks until the next time this ThreadPool completes all of its work (or until the time_out is reached), except if all threads are idle and the channel is empty at the time of calling this function, in which case it will fast-return.

This utilizes a Condvar that is notified by a worker when it completes a job and notices that the channel is currently empty and that it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it is the last thread to become idle).

Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish their current work, then discover on their next channel check that it has been disconnected from the sender, and terminate their work loop.

If other clones of this ThreadPool exist, the sender remains intact and tasks submitted to those clones will still succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool used to re-submit awakened futures.

Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish their current work, then discover on their next channel check that it has been disconnected from the sender, and terminate their work loop.

If other clones of this ThreadPool exist, the sender remains intact and tasks submitted to those clones will still succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool used to re-submit awakened futures.

This function additionally joins all workers after dropping the pool to wait for all work to finish. It blocks the current thread until there aren’t any non-idle threads anymore. This function blocks until this ThreadPool completes all of its work, except if all threads are idle and the channel is empty at the time of calling this function, in which case the join will fast-return. If other live clones of this ThreadPool exist, this behaves the same as calling join on a live ThreadPool, as tasks submitted to one of the clones will be joined as well.

The join utilizes a Condvar that is notified by a worker when it completes a job and notices that the channel is currently empty and that it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it is the last thread to become idle).

Destroy this ThreadPool by claiming ownership and dropping the value, causing the Sender to drop and thus disconnecting the channel. Threads in this pool that are currently executing a task will finish their current work, then discover on their next channel check that it has been disconnected from the sender, and terminate their work loop.

If other clones of this ThreadPool exist, the sender remains intact and tasks submitted to those clones will still succeed; this includes pending AsyncTask instances, as they hold an owned clone of the ThreadPool used to re-submit awakened futures.

This function additionally joins all workers after dropping the pool to wait for all work to finish. It blocks the current thread until there aren’t any non-idle threads anymore or until the specified time_out Duration passes, whichever happens first. This function blocks until this ThreadPool completes all of its work (or until the time_out is reached), except if all threads are idle and the channel is empty at the time of calling this function, in which case the join will fast-return. If other live clones of this ThreadPool exist, this behaves the same as calling join on a live ThreadPool, as tasks submitted to one of the clones will be joined as well.

The join utilizes a Condvar that is notified by a worker when it completes a job and notices that the channel is currently empty and that it was the last thread to finish the current generation of work (i.e. when incrementing the idle worker counter brings the value up to the total worker counter, meaning it is the last thread to become idle).

Return the name of this pool, used as a prefix for each worker thread.

Starts all core workers by creating core idle workers until the total worker count reaches the core count.

Returns immediately if the current worker count is already >= core size.
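
A minimal sketch (assuming the function is named start_core_threads()):

use rusty_pool::ThreadPool;
use std::time::Duration;

let pool = ThreadPool::new(4, 8, Duration::from_secs(60));
// pre-spawn the 4 core workers instead of creating them lazily when the
// first tasks are submitted, avoiding thread-creation latency later on
pool.start_core_threads();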

Trait Implementations

Returns a copy of the value.

Performs copy-assignment from source.

Create a default ThreadPool with the core pool size equal to the number of CPUs, the max_size equal to twice the core size, and a 60 second keep_alive timeout.
