pub struct WPool { /* private fields */ }
WPool is a thread pool that limits the number of tasks executing concurrently,
without restricting how many tasks can be queued. Submitting tasks is non-blocking,
so you can enqueue any number of tasks without waiting.
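The idea of bounded concurrency over an unbounded queue can be sketched with the standard library alone. This is not WPool's implementation: `MiniPool` and `run_demo` are hypothetical names used only for illustration, and the sketch keeps a fixed set of threads alive instead of starting and stopping workers on demand.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;

type Task = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical toy pool (assumption: illustration only, not WPool).
pub struct MiniPool {
    sender: Option<Sender<Task>>,
    workers: Vec<JoinHandle<()>>,
}

impl MiniPool {
    pub fn new(max_workers: usize) -> Self {
        // `mpsc::channel` is unbounded, so sending never blocks.
        let (sender, receiver) = mpsc::channel::<Task>();
        let receiver = Arc::new(Mutex::new(receiver));
        let workers = (0..max_workers)
            .map(|_| {
                let receiver = Arc::clone(&receiver);
                thread::spawn(move || loop {
                    // The lock is held only while taking the next task.
                    let next = receiver.lock().unwrap().recv();
                    match next {
                        Ok(task) => task(),
                        // Sender dropped and queue drained: exit.
                        Err(_) => break,
                    }
                })
            })
            .collect();
        MiniPool { sender: Some(sender), workers }
    }

    /// Enqueues a task without waiting for a free worker.
    pub fn submit<F: FnOnce() + Send + 'static>(&self, task: F) {
        if let Some(sender) = &self.sender {
            sender.send(Box::new(task)).expect("workers have exited");
        }
    }

    /// Closes the queue, then waits for every queued task to finish.
    pub fn stop_wait(mut self) {
        drop(self.sender.take());
        for worker in self.workers.drain(..) {
            worker.join().unwrap();
        }
    }
}

/// Runs `tasks` jobs and returns (jobs completed, peak jobs running at once).
pub fn run_demo(max_workers: usize, tasks: usize) -> (usize, usize) {
    let pool = MiniPool::new(max_workers);
    let running = Arc::new(AtomicUsize::new(0));
    let peak = Arc::new(AtomicUsize::new(0));
    let done = Arc::new(AtomicUsize::new(0));
    for _ in 0..tasks {
        let (running, peak, done) =
            (Arc::clone(&running), Arc::clone(&peak), Arc::clone(&done));
        pool.submit(move || {
            let now = running.fetch_add(1, Ordering::SeqCst) + 1;
            peak.fetch_max(now, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(5));
            running.fetch_sub(1, Ordering::SeqCst);
            done.fetch_add(1, Ordering::SeqCst);
        });
    }
    pool.stop_wait();
    (done.load(Ordering::SeqCst), peak.load(Ordering::SeqCst))
}
```

Calling `run_demo(2, 8)` enqueues all eight jobs immediately, yet the observed peak never exceeds two, because only two threads ever take tasks from the queue.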
Implementations
impl WPool
pub fn new(max_workers: usize) -> Self
new creates and starts a pool of worker threads.
The max_workers parameter specifies the maximum number of workers that can
execute tasks concurrently. When there are no incoming tasks, workers are
gradually stopped until there are no remaining workers.
use wpool::WPool;
let max_workers = 5;
let wp = WPool::new(max_workers);
wp.stop_wait();
pub fn new_with_min(max_workers: usize, min_workers: usize) -> Self
new_with_min creates and starts a pool of worker threads.
The max_workers parameter specifies the maximum number of workers that can
execute tasks concurrently. When there are no incoming tasks, workers are
gradually stopped until only min_workers remain.
Up to min_workers workers are kept alive at all times, even when the pool is
idle. This helps avoid the overhead of spinning up threads from scratch.
Panics if min_workers is greater than max_workers.
use wpool::WPool;
let max_workers = 5;
let min_workers = 3;
let wp = WPool::new_with_min(max_workers, min_workers);
wp.stop_wait();
pub fn max_capacity(&self) -> usize
Returns the maximum number of workers the pool may run (max_workers). This does not reflect the number of active workers.
use wpool::WPool;
let max_workers = 5;
let wp = WPool::new(max_workers);
assert_eq!(wp.max_capacity(), max_workers);
wp.stop_wait();
pub fn min_capacity(&self) -> usize
Returns the minimum number of workers the pool keeps alive (min_workers). This does not reflect the number of active workers.
use wpool::WPool;
let max_workers = 5;
let min_workers = 3;
let wp = WPool::new_with_min(max_workers, min_workers);
assert_eq!(wp.max_capacity(), max_workers);
assert_eq!(wp.min_capacity(), min_workers);
wp.stop_wait();
pub fn worker_count(&self) -> usize
The number of active workers.
use wpool::WPool;
use std::thread;
use std::time::Duration;
let max_workers = 5;
let wp = WPool::new(max_workers);
// Should have max workers here.
assert_eq!(wp.worker_count(), max_workers);
for _ in 0..max_workers {
    wp.submit(move || {
        thread::sleep(Duration::from_secs(1));
    });
}
// Give the workers some time to spawn.
thread::sleep(Duration::from_millis(5));
// Should have `max_workers` workers.
assert_eq!(wp.worker_count(), max_workers);
wp.stop_wait();
// Should have 0 workers now.
assert_eq!(wp.worker_count(), 0);
pub fn submit<F>(&self, task: F)
Enqueues the given function.
Any external values needed by the task function must be captured in a closure. Any return values should be sent over a channel, or by similar means.
submit is non-blocking, regardless of the number of tasks submitted. Each task
is immediately given to an available worker. If no worker is available and
the maximum number of workers has already been created, the task is put in a wait queue.
While the wait queue is non-empty, all new tasks are also put in the wait queue. Tasks are removed from the wait queue as workers become available.
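Capturing inputs in a closure and sending results back over a channel can be sketched as follows. To keep the block runnable without the crate, `std::thread::spawn` stands in for `wp.submit` (an assumption made for illustration), and `squares_via_channel` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;

/// Squares each input in its own task and collects the results.
/// `thread::spawn` is a stand-in for `wp.submit` (assumption).
pub fn squares_via_channel(inputs: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for n in inputs {
        let tx = tx.clone();
        // `move` captures `n` and this task's sender clone.
        thread::spawn(move || {
            tx.send((n, n * n)).expect("receiver dropped");
        });
    }
    // Drop the original sender so `rx` ends once every task has sent.
    drop(tx);
    let mut results: Vec<(u64, u64)> = rx.iter().collect();
    // Completion order is not deterministic, so sort by input value.
    results.sort();
    results.into_iter().map(|(_, square)| square).collect()
}
```

With a pool, the same closure would presumably be passed to `wp.submit(...)` in place of `thread::spawn(...)`. Tagging each result with its input is one way to cope with tasks finishing out of order.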
Sourcepub fn submit_wait<F>(&self, task: F)
pub fn submit_wait<F>(&self, task: F)
Enqueues the given function and blocks until it has been executed.
Unlike submit_confirm(...), this method waits until the task has finished executing.
submit_confirm(...) only blocks until the task has been assigned to a worker.
use wpool::WPool;
use std::time::Duration;
use std::thread;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
let wp = WPool::new(2);
let counter = Arc::new(AtomicUsize::new(0));
let c = Arc::clone(&counter);
// Block here until submitted job finishes.
wp.submit_wait(move || {
    // If we fail to wait for this job to finish,
    // our assert will run before this `Duration`.
    thread::sleep(Duration::from_secs(1));
    c.fetch_add(1, Ordering::SeqCst);
});
// Verify we waited for execution.
assert_eq!(counter.load(Ordering::SeqCst), 1);
wp.stop_wait();
pub fn submit_confirm<F>(&self, task: F)
Enqueues the given function and blocks until it has been given to a worker.
Unlike submit_wait(...), this method only blocks until the task is given to a worker.
use wpool::WPool;
use std::thread;
use std::time::Duration;
let max_workers = 5;
let wp = WPool::new(max_workers);
for _ in 0..max_workers {
    // Will block here until the job is *submitted*.
    wp.submit_confirm(|| {
        thread::sleep(Duration::from_secs(2));
    });
    // Now you know that a worker has been spawned, or job placed in queue.
}
assert_eq!(wp.worker_count(), max_workers);
wp.stop_wait();
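The difference between waiting for a hand-off and waiting for completion can be sketched with two channels. This is a standard-library illustration, not WPool code: a spawned thread stands in for a worker, and `confirm_vs_wait` is a hypothetical helper name.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Returns (time until the task was picked up, time until it finished).
pub fn confirm_vs_wait(work: Duration) -> (Duration, Duration) {
    let (started_tx, started_rx) = mpsc::channel::<()>();
    let (done_tx, done_rx) = mpsc::channel::<()>();
    let begin = Instant::now();
    // Stand-in for a worker picking up the task (assumption).
    let worker = thread::spawn(move || {
        // Signal the hand-off: what a "confirm"-style caller waits for.
        started_tx.send(()).unwrap();
        // The task body.
        thread::sleep(work);
        // Signal completion: what a "wait"-style caller waits for.
        done_tx.send(()).unwrap();
    });
    started_rx.recv().unwrap();
    let confirmed = begin.elapsed();
    done_rx.recv().unwrap();
    let finished = begin.elapsed();
    worker.join().unwrap();
    (confirmed, finished)
}
```

The first duration does not depend on how long the task body runs, while the second is at least as long as `work`.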
pub fn stop_wait(&self)
Stops the pool and waits for currently running tasks, as well as any tasks
in the wait queue, to complete. Task submission is disallowed after
stop_wait() has been called.
Since creating the pool starts at least one thread, for the dispatcher,
stop() or stop_wait() should only be called when the worker pool is no
longer needed.
use wpool::WPool;
use std::time::Duration;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::thread;
let max_workers = 3;
// `num_jobs` is greater than `max_workers` so we can get jobs into wait queue.
let num_jobs = 5;
let wp = WPool::new(max_workers);
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..num_jobs {
    let c = Arc::clone(&counter);
    wp.submit(move || {
        // Sleep for a while so jobs are put into wait queue.
        thread::sleep(Duration::from_secs(1));
        // Increment counter to prove job executed.
        c.fetch_add(1, Ordering::SeqCst);
    });
}
// Allow currently executing jobs to complete PLUS the wait queue.
wp.stop_wait();
// Verify that all jobs executed.
assert_eq!(counter.load(Ordering::SeqCst), num_jobs);
pub fn stop(&self)
stop stops the worker pool and waits for only currently running tasks to
complete. Pending tasks that are not currently running are abandoned. Tasks
must not be submitted to the pool after calling stop.
Since creating the pool starts at least one thread, for the dispatcher,
stop() or stop_wait() should only be called when the worker pool is no
longer needed.
use wpool::WPool;
use std::time::Duration;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use std::thread;
let max_workers = 3;
// `num_jobs` is greater than `max_workers` so we can get jobs into wait queue.
let num_jobs = 5;
let wp = WPool::new(max_workers);
let counter = Arc::new(AtomicUsize::new(0));
for _ in 0..num_jobs {
    let c = Arc::clone(&counter);
    wp.submit(move || {
        // Sleep for a while so jobs are put into wait queue.
        thread::sleep(Duration::from_secs(1));
        // Increment counter to prove job executed.
        c.fetch_add(1, Ordering::SeqCst);
    });
}
// Allow currently executing jobs to complete BUT abandon the wait queue.
wp.stop();
// Verify that only `max_workers` jobs completed.
assert_eq!(counter.load(Ordering::SeqCst), max_workers);
pub fn pause(&self)
Pauses all possible workers and blocks until each worker has acknowledged that it is paused.
You must explicitly call resume(), stop(), or stop_wait() to unpause the
pool. If you want to unpause without any side-effects, call resume().
Paused workers are unable to accept new tasks, although you can still submit tasks, which will be picked up by workers once they’re resumed.
If the number of active workers is less than the pool maximum, workers will be spawned, up to the pool maximum, and immediately paused. This ensures that every worker that could possibly exist in the pool is paused.
use wpool::WPool;
let wp = WPool::new(2);
// Suppose you had a long running job that you need to wait for...
wp.submit(|| { /* Doing lots of work */ });
//
// ...but you had other tasks to do.
//
// Doing unrelated work here...
//
// Now you need to ensure your long running job is finished.
// `pause()` will block until all currently executing jobs have finished.
wp.pause();
// Now you know it has finished.
wp.resume();
wp.stop_wait();
pub fn resume(&self)
Resumes (unpauses) all paused workers. If the pool is already stopped or is not paused, this returns early.
use wpool::WPool;
let wp = WPool::new(2);
wp.pause();
wp.resume();
wp.stop_wait();
pub fn get_workers_panic_info(&self) -> Vec<PanicReport>
Returns a PanicReport for each worker that has panicked.
use wpool::WPool;
let wp = WPool::new(3);
wp.submit_confirm(|| panic!("something went wrong!"));
println!("{:#?}", wp.get_workers_panic_info());
// [
//     PanicReport {
//         thread_id: ThreadId(
//             3,
//         ),
//         message: "something went wrong!",
//         backtrace: <backtrace here, removed for brevity>,
//     },
// ]
wp.stop_wait();
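How a panic message and thread id might be recovered from a panicked thread can be sketched with the standard library. This is only an illustration and makes no claim about how the crate builds its reports: `MiniReport` and `run_and_report` are hypothetical names, and the backtrace field is omitted.

```rust
use std::thread::{self, ThreadId};

/// Hypothetical report type; the crate's `PanicReport` may differ.
#[derive(Debug)]
pub struct MiniReport {
    pub thread_id: ThreadId,
    pub message: String,
}

/// Runs `task` on a new thread and returns a report if it panicked.
pub fn run_and_report<F>(task: F) -> Option<MiniReport>
where
    F: FnOnce() + Send + 'static,
{
    let handle = thread::spawn(task);
    let thread_id = handle.thread().id();
    match handle.join() {
        Ok(()) => None,
        Err(payload) => {
            // A literal panic message is a `&'static str` payload;
            // a formatted message is a `String` payload.
            let message = if let Some(s) = payload.downcast_ref::<&'static str>() {
                (*s).to_string()
            } else if let Some(s) = payload.downcast_ref::<String>() {
                s.clone()
            } else {
                "non-string panic payload".to_string()
            };
            Some(MiniReport { thread_id, message })
        }
    }
}
```

`JoinHandle::join` returns `Err` with the panic payload when the thread panicked, so the caller can record the failure instead of propagating it.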