Flexible thread pool implementations for all your multithreaded processing needs.
§Overview
This library includes two thread pool implementations, Threadpool and OrderedThreadpool.
Threadpool consumes jobs passed to it, processes them, and then yields them back out in the order they finish. OrderedThreadpool consumes jobs passed to it, processes them, and then yields them out in the same order they were received in.
Aside from a few small details, they share many of the same features and properties.
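The difference between the two yielding orders can be sketched with the standard library alone. This is not the crate's implementation: `run_unordered` and `reorder` are hypothetical helpers, and the sleep only simulates uneven workloads. The first function returns results as workers finish; the second buffers out-of-order results until they can be released in submission order.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical helper: doubles each job on its own scoped thread and
/// returns `(submission index, result)` pairs in completion order.
fn run_unordered(jobs: Vec<u64>) -> Vec<(usize, u64)> {
    let (tx, rx) = mpsc::channel();
    thread::scope(|s| {
        for (index, job) in jobs.into_iter().enumerate() {
            let tx = tx.clone();
            s.spawn(move || {
                // Simulate uneven workloads: larger inputs take longer.
                thread::sleep(Duration::from_millis(job));
                tx.send((index, job * 2)).unwrap();
            });
        }
        // Drop the original sender so the receiver stops once all workers finish.
        drop(tx);
        rx.iter().collect::<Vec<_>>()
    })
}

/// Hypothetical helper: restores submission order by holding back any
/// result whose predecessors have not arrived yet.
fn reorder(results: impl IntoIterator<Item = (usize, u64)>) -> Vec<u64> {
    let mut pending = BTreeMap::new();
    let mut next = 0;
    let mut out = Vec::new();
    for (index, value) in results {
        pending.insert(index, value);
        // Release every buffered result that is now contiguous with the output.
        while let Some(value) = pending.remove(&next) {
            out.push(value);
            next += 1;
        }
    }
    out
}

fn main() {
    let unordered = run_unordered(vec![30, 10, 20]);
    println!("completion order: {:?}", unordered);
    let ordered = reorder(unordered);
    println!("submission order: {:?}", ordered);
    assert_eq!(ordered, vec![60, 20, 40]);
}
```

The ordered variant trades some latency for determinism: a slow early job holds back every later result until it completes.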
Thread pools serve many purposes. They can be spawned briefly to process a single iterator and then destroyed (although for that purpose, rayon is likely a better choice). They can be spawned for long-running tasks that need multiple workers to respond, such as a web service or database management system. They can also be spun up once and reused many times to avoid the overhead of repeatedly creating and destroying threads for repeated parallel work. The list goes on.
Additionally, this library provides a small helper utility, ReduceAsync::reduce_async, which implements a unique feedback-loop style, fully parallelized reduction algorithm. It pairs nicely with the thread pools, so a suite of filtering, mapping, and reduction extensions is available to make it simple and straightforward to use in your code.
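One way to picture a feedback-loop reduction is sketched below using only the standard library. This is an interpretation of the phrase, not the crate's algorithm: `feedback_reduce` is a hypothetical function, and it spawns one scoped thread per combine step rather than using a pool. Any two available values are combined on a worker, and the result is fed back into the same queue until a single value remains.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: reduces `items` by repeatedly combining any two
/// available values on a worker thread and feeding the result back into
/// the same queue. Values are paired in arrival order, so `op` should be
/// associative and commutative for a deterministic result.
fn feedback_reduce<T, F>(items: Vec<T>, op: F) -> Option<T>
where
    T: Send,
    F: Fn(T, T) -> T + Sync,
{
    let (tx, rx) = mpsc::channel();
    // Number of values that still exist, whether queued or being combined.
    let mut remaining = items.len();
    for item in items {
        tx.send(item).unwrap();
    }
    thread::scope(|s| {
        while remaining > 1 {
            // Block until two values are available; each may be an original
            // input or the output of an earlier combine step.
            let a = rx.recv().unwrap();
            let b = rx.recv().unwrap();
            let tx = tx.clone();
            let op = &op;
            s.spawn(move || tx.send(op(a, b)).unwrap());
            // Two values were consumed and one will be produced.
            remaining -= 1;
        }
    });
    // The scope has joined every worker, so the final value (if any) is queued.
    rx.try_recv().ok()
}

fn main() {
    let total = feedback_reduce((1..=100u64).collect(), |a, b| a + b);
    assert_eq!(total, Some(5050));
}
```

Because partial results re-enter the queue as soon as they are ready, independent combine steps can overlap instead of waiting for a full pass over the input to finish.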
These are all the current iteration extensions implemented in this library:
- FilterMapMultithread::filter_map_multithread
- FilterMapAsync::filter_map_async
- FilterMapAsyncUnordered::filter_map_async_unordered
- ReduceAsync::reduce_async
- ReduceAsyncCommutative::reduce_async_commutative
- FilterMapReduceAsync::filter_map_reduce_async
- FilterMapReduceAsyncCommutative::filter_map_reduce_async_commutative
- Pipe::pipe
TL;DR: Spawn a thread pool, use it to asynchronously process data, then discard it once you finish. Or don’t, I’m not your dad.
§Examples
§Synchronously process a sequence of elements:
use std::thread::{self, scope};
use threadpools::*;

fn is_prime(n: usize) -> bool {
    if n < 2 {
        return false;
    }
    for i in 2..=((n as f64).sqrt() as usize) {
        if n % i == 0 {
            return false;
        }
    }
    true
}

scope(|scope| {
    let pool = Threadpool::new(|x| is_prime(x).then_some(x), scope);
    pool.submit_all(0..1000);
    let total: usize = pool.into_iter().sum();
    assert_eq!(total, 76127);
})
§Asynchronously process a sequence of elements:
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, scope};
use std::time::Duration;
use threadpools::*;

fn is_prime(n: usize) -> bool {
    if n < 2 {
        return false;
    }
    for i in 2..=((n as f64).sqrt() as usize) {
        if n % i == 0 {
            return false;
        }
    }
    true
}

let total = AtomicUsize::new(0);
scope(|scope| {
    let pool = Threadpool::new(|x| is_prime(x).then_some(x), scope);
    pool.producer(0..1000);
    pool.consumer(|x| {
        total.fetch_add(x, Ordering::SeqCst);
    });
    pool.wait_until_finished();
    // unavoidable necessity to wait for the consumer to finish running;
    // this is a nonstandard usage of the consumer.
    // A real world application would have a more robust, purpose-built
    // synchronization strategy.
    thread::sleep(Duration::from_millis(50));
    let total = total.load(Ordering::Acquire);
    assert_eq!(total, 76127);
})
§Filter, Map, & Reduce Asynchronously:
use threadpools::*;

let vals = 0..10000usize;
let sequential_result = vals
    .clone()
    .filter_map(|x: usize| {
        let x = x.pow(3) % 100;
        (x > 50).then_some(x)
    })
    .reduce(|a, b| a + b)
    .unwrap();
let parallel_result = vals
    .filter_map_reduce_async_commutative(
        |x: usize| {
            let x = x.pow(3) % 100;
            (x > 50).then_some(x)
        },
        |a, b| a + b,
    )
    .unwrap();
assert_eq!(sequential_result, parallel_result);
§Chain multiple pools together:
use threadpools::*;
use std::thread::scope;

scope(|scope| {
    let sequential_result = (0..10000usize)
        .filter_map(|x: usize| {
            let x = x.pow(3) % 100;
            (x > 50).then_some(x)
        })
        .reduce(|a, b| a + b)
        .unwrap();
    let parallel_result = (0..10000usize)
        .pipe(Threadpool::new(|x: usize| Some(x.pow(3)), scope))
        .pipe(Threadpool::new(|x: usize| Some(x % 100), scope))
        .pipe(Threadpool::new(|x: usize| (x > 50).then_some(x), scope))
        .into_iter()
        .reduce_async_commutative(|a, b| a + b)
        .unwrap();
    assert_eq!(sequential_result, parallel_result);
});
See /tests for more usage examples.
Re-exports§
Modules§
- ordered
- Ordered thread pool implementation.
- reduction
- Multithreaded reducers.
- unordered
- Standard, unordered thread pool implementation.
Traits§
- FilterMapReduceAsync
- Extension trait to provide the filter_map_reduce_async function for iterators.
- FilterMapReduceAsyncCommutative
- Extension trait to provide the filter_map_reduce_async_commutative function for iterators.
- GenericThreadpool
- Generic trait representing a thread pool.
- Pipe
- Extension trait to provide the pipe method for iterators.