# affinitypool
A library for running blocking jobs on a dedicated thread pool. Blocking tasks can be sent asynchronously to the pool, where each task is queued until a worker thread is free to process it. Tasks are processed in FIFO order.

For optimised workloads, the affinity of each thread can be specified, so that each worker thread is pinned to a specific CPU core. This improves cache locality and gives more predictable performance for blocking workloads.
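To try the examples below, add the crate as a dependency alongside an async runtime such as Tokio. A minimal sketch of the manifest entries (the version numbers here are placeholders; use the latest published releases):

```toml
[dependencies]
# Placeholder versions - check crates.io for the latest releases
affinitypool = "*"
tokio = { version = "1", features = ["full"] }
```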
## Examples

### Basic Usage
Create a threadpool and spawn tasks that run on worker threads:
```rust
use affinitypool::Threadpool;

#[tokio::main]
async fn main() {
    // Create a pool with 4 worker threads
    let pool = Threadpool::new(4);
    // Run a blocking task on a worker thread and await its result
    let result = pool.spawn(|| {
        println!("Hello from a worker thread!");
        42
    }).await;
    assert_eq!(result, 42);
}
```
### Using the Builder
Configure the threadpool with custom settings:
```rust
use affinitypool::Builder;

#[tokio::main]
async fn main() {
    // Configure a pool with 8 named worker threads and a larger stack
    let pool = Builder::new()
        .worker_threads(8)
        .thread_name("my-worker")
        .thread_stack_size(4_000_000)
        .build();
    let mut handles = Vec::new();
    for i in 0..100 {
        handles.push(pool.spawn(move || {
            // CPU-bound work: compute a checksum
            let mut sum = 0u64;
            for j in 0..1_000_000 {
                sum = sum.wrapping_add((i * j) as u64);
            }
            sum
        }));
    }
    for handle in handles {
        let result = handle.await;
        println!("Task completed with result: {result}");
    }
}
```
### CPU Affinity
Pin each worker thread to a specific CPU core for optimal performance:
```rust
use affinitypool::Builder;

#[tokio::main]
async fn main() {
    // Spawn one worker thread per CPU core, each pinned to its core
    let pool = Builder::new()
        .thread_per_core(true)
        .build();
    for i in 0..100 {
        pool.spawn(move || {
            println!("Task {i} running on dedicated CPU core");
        }).await;
    }
}
```
### Global Threadpool
Set up a global threadpool that can be accessed from anywhere:
```rust
use affinitypool::{Threadpool, spawn};

#[tokio::main]
async fn main() {
    // Register a pool as the global threadpool (this can only be done once)
    let pool = Threadpool::new(4);
    pool.build_global().expect("Global threadpool already initialized");
    // The free-standing `spawn` function runs tasks on the global pool
    let result = spawn(|| {
        std::thread::sleep(std::time::Duration::from_millis(100));
        "completed"
    }).await;
    assert_eq!(result, "completed");
    process_data().await;
}

async fn process_data() {
    // The global pool is accessible from any function
    let result = spawn(|| {
        vec![1, 2, 3, 4, 5].iter().sum::<i32>()
    }).await;
    println!("Sum: {result}");
}
```
### Local Spawning

Use `spawn_local` when you need to borrow data without the `'static` lifetime requirement:
```rust
use affinitypool::Threadpool;

#[tokio::main]
async fn main() {
    let pool = Threadpool::new(4);
    let data = vec![1, 2, 3, 4, 5];
    let multiplier = 10;
    // The closure borrows `data` and `multiplier` rather than moving them
    let result = pool.spawn_local(|| {
        data.iter()
            .map(|x| x * multiplier)
            .collect::<Vec<_>>()
    }).await;
    println!("Result: {result:?}");
    // `data` is still accessible after the task completes
    println!("Original data: {data:?}");
}
```
### Handling Multiple Concurrent Tasks
Process multiple blocking tasks concurrently:
```rust
use affinitypool::Threadpool;
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};

#[tokio::main]
async fn main() {
    let pool = Threadpool::new(4);
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    for i in 0..100 {
        let counter = counter.clone();
        handles.push(pool.spawn(move || {
            // Simulate blocking work
            std::thread::sleep(std::time::Duration::from_millis(10));
            counter.fetch_add(1, Ordering::SeqCst);
            format!("Task {i} completed")
        }));
    }
    for handle in handles {
        let result = handle.await;
        println!("{result}");
    }
    assert_eq!(counter.load(Ordering::SeqCst), 100);
    println!("All tasks completed!");
}
```
## Original

This code is heavily inspired by `threadpool`, with the CPU-based affinity code originally forked from `core-affinity`. Both are dual-licensed under the Apache License 2.0 and MIT licenses.