pub struct ThreadPool<P> where
P: PoolItem,
{ /* private fields */ }
A pool of threads for managing stateful PoolItem instances.
ThreadPool is the main entry point for this library. It:
- Spawns a fixed number of worker threads
- Distributes pool items across threads based on their IDs
- Routes messages to the correct thread for processing
- Ensures sequential message processing per pool item (no concurrent access)
§Creating a Thread Pool
use messaging_thread_pool::{ThreadPool, samples::Randoms};
// Create a pool with 4 worker threads
let pool = ThreadPool::<Randoms>::new(4);
// The pool is now ready to accept messages
assert_eq!(pool.thread_count(), 4);
§Creating Pool Items
Pool items are created by sending initialization requests:
use messaging_thread_pool::{ThreadPool, samples::*};
let pool = ThreadPool::<Randoms>::new(4);
// Create a single item with ID 0
pool.send_and_receive_once(RandomsAddRequest(0)).expect("pool available");
// Or create many at once (IDs 1-99, since 0 already exists)
pool.send_and_receive((1..100u64).map(RandomsAddRequest))
.expect("pool available")
.for_each(|response| assert!(response.result().is_ok()));
§Sending Messages
Once items exist, send messages to interact with them:
use messaging_thread_pool::{ThreadPool, samples::*};
let pool = ThreadPool::<Randoms>::new(4);
pool.send_and_receive_once(RandomsAddRequest(1)).expect("pool available");
// Send a single message
let response: MeanResponse = pool
.send_and_receive_once(MeanRequest(1))
.expect("pool available");
// Create the remaining items with IDs 0-9 (ID 1 already exists, so it is skipped)
pool.send_and_receive((0..10u64).filter(|id| *id != 1).map(RandomsAddRequest))
.expect("pool available")
.for_each(|_| {});
// Send multiple messages (processed in parallel across threads)
let responses: Vec<SumResponse> = pool
.send_and_receive((0..10u64).map(SumRequest))
.expect("pool available")
.collect();
§Removing Items
Items can be removed explicitly:
use messaging_thread_pool::{ThreadPool, samples::*, RemovePoolItemRequest};
let pool = ThreadPool::<Randoms>::new(4);
pool.send_and_receive_once(RandomsAddRequest(1)).expect("pool available");
let response = pool
.send_and_receive_once(RemovePoolItemRequest(1))
.expect("pool available");
assert!(response.item_existed());
§Shutdown
The pool shuts down automatically when dropped, or explicitly via shutdown:
use messaging_thread_pool::{ThreadPool, samples::Randoms};
let pool = ThreadPool::<Randoms>::new(4);
// ... use the pool ...
// Explicit shutdown (returns shutdown responses from items)
let responses = pool.shutdown();
// Or just drop it (implicit shutdown)
// drop(pool);
§Thread Distribution
Items are assigned to threads using id % thread_count. All messages for the same
ID go to the same thread. This ensures:
- No concurrent access to the same pool item
- Sequential message ordering per item
- Predictable data locality
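The `id % thread_count` rule can be illustrated with a small standalone sketch. The helpers `thread_index` and `group_by_thread` are hypothetical names introduced here for illustration; they are not part of the crate's API and only mirror the rule as stated above.

```rust
/// Hypothetical helper mirroring the documented `id % thread_count` rule.
fn thread_index(id: u64, thread_count: u64) -> u64 {
    assert!(thread_count > 0, "a pool needs at least one thread");
    id % thread_count
}

/// Groups IDs by the index of the thread that would own them under that rule.
fn group_by_thread(ids: impl Iterator<Item = u64>, thread_count: u64) -> Vec<Vec<u64>> {
    let mut buckets = vec![Vec::new(); thread_count as usize];
    for id in ids {
        buckets[thread_index(id, thread_count) as usize].push(id);
    }
    buckets
}
```

With 4 threads, `group_by_thread(0..10, 4)` places IDs 0, 4 and 8 in the first bucket, so every message for any one of those IDs would be handled by the same thread.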
§Thread Safety
ThreadPool is Send + Sync and implements SenderAndReceiver, making it
suitable for:
- Sharing across multiple threads (e.g., via Arc<ThreadPool<P>>)
- Use in async contexts
- Nested thread pool patterns (see samples::RandomsBatch)
Implementations§
impl<P> ThreadPool<P> where
P: PoolItem + 'static,
pub fn new(thread_pool_size: u64) -> Self
Creates a new ThreadPool.
Internally, it spawns a collection of worker threads and communicates with them through a Vec of channels, one channel per spawned thread.
The number of threads is set by thread_pool_size.
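As a rough picture of "one channel per spawned thread", the following sketch uses only the standard library. `SketchPool` and its methods are hypothetical and are not the crate's internals; the messages are bare `u64` IDs, and each worker simply records what it receives.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a pool: one sender and one join handle per thread.
struct SketchPool {
    senders: Vec<Sender<u64>>,
    handles: Vec<JoinHandle<Vec<u64>>>,
}

impl SketchPool {
    fn new(thread_pool_size: u64) -> Self {
        let mut senders = Vec::new();
        let mut handles = Vec::new();
        for _ in 0..thread_pool_size {
            let (tx, rx) = channel::<u64>();
            // Each worker drains its own channel until every sender is dropped.
            handles.push(thread::spawn(move || rx.into_iter().collect::<Vec<u64>>()));
            senders.push(tx);
        }
        Self { senders, handles }
    }

    /// Routes an ID to the worker selected by `id % thread_count`.
    fn send(&self, id: u64) {
        let index = (id % self.senders.len() as u64) as usize;
        self.senders[index].send(id).expect("worker is alive");
    }

    /// Drops the senders so the workers finish, then returns what each one saw.
    fn finish(self) -> Vec<Vec<u64>> {
        drop(self.senders);
        self.handles
            .into_iter()
            .map(|handle| handle.join().expect("worker panicked"))
            .collect()
    }
}
```

Calling `SketchPool::new(4)`, sending IDs 0 through 9 and then calling `finish()` returns one list per worker, each holding only the IDs routed to that worker.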
impl<P> ThreadPool<P> where
P: PoolItem,
pub fn send_and_receive<T>(
&self,
requests: impl Iterator<Item = T>,
) -> Result<impl Iterator<Item = T::Response>, SendError<SenderCouplet<P>>> where
T: RequestWithResponse<P> + IdTargeted,
Sends requests to the worker threads and returns the responses.
The requests are supplied as an iterator, and the responses come back as an iterator.
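The iterator-in, iterator-out shape can be sketched with standard-library channels. Everything here is an assumption made for illustration: `Request`, `send_and_receive_sketch`, and the placeholder "work" of doubling the ID are hypothetical, and the replies arrive in completion order, which says nothing about the ordering the crate itself provides.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical request: a target ID plus a channel on which to send the reply.
struct Request {
    id: u64,
    reply: Sender<(u64, u64)>,
}

/// Sends every request to the worker chosen by `id % thread_count`, then
/// returns the replies as an iterator. Each reply is `(id, id * 2)`.
/// `thread_count` must be greater than zero.
fn send_and_receive_sketch(
    requests: impl Iterator<Item = u64>,
    thread_count: u64,
) -> impl Iterator<Item = (u64, u64)> {
    let (reply_tx, reply_rx) = channel();
    let workers: Vec<Sender<Request>> = (0..thread_count)
        .map(|_| {
            let (tx, rx) = channel::<Request>();
            thread::spawn(move || {
                for request in rx {
                    // Placeholder work: double the ID and reply.
                    let _ = request.reply.send((request.id, request.id * 2));
                }
            });
            tx
        })
        .collect();
    for id in requests {
        let index = (id % thread_count) as usize;
        workers[index]
            .send(Request { id, reply: reply_tx.clone() })
            .expect("worker is alive");
    }
    // Dropping every sender lets the workers exit and ends the reply iterator.
    drop(workers);
    drop(reply_tx);
    reply_rx.into_iter()
}
```

Collecting `send_and_receive_sketch(0..10, 4)` yields ten `(id, doubled)` pairs; sort them if a stable order is needed, since the workers run concurrently.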
pub fn send_and_receive_once<T>(
&self,
request: T,
) -> Result<T::Response, SendError<SenderCouplet<P>>> where
T: RequestWithResponse<P> + IdTargeted,
impl<P> ThreadPool<P> where
P: PoolItem,
pub fn shutdown(&self) -> Vec<ThreadShutdownResponse>
Requests that the thread pool shut down. It sends a shutdown message to each of its contained PoolThreads. Receiving this message should cause each thread's message loop to exit and the thread to end.
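A message loop that exits on a shutdown message might look like the sketch below. It uses only the standard library; `Message`, `spawn_worker` and `shutdown_all` are hypothetical names, and returning a count of handled messages is an assumption standing in for whatever a real shutdown response carries.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message type: either work for an item or a request to stop.
enum Message {
    Work(u64),
    Shutdown,
}

/// Spawns one worker whose loop exits when it receives `Message::Shutdown`.
/// The thread returns how many work messages it handled.
fn spawn_worker() -> (Sender<Message>, JoinHandle<usize>) {
    let (tx, rx) = channel();
    let handle = thread::spawn(move || {
        let mut handled = 0;
        for message in rx {
            match message {
                Message::Work(_id) => handled += 1,
                // Leaving the loop ends the closure, which ends the thread.
                Message::Shutdown => break,
            }
        }
        handled
    });
    (tx, handle)
}

/// Sends the shutdown message to every worker, then waits for each to end.
fn shutdown_all(workers: Vec<(Sender<Message>, JoinHandle<usize>)>) -> Vec<usize> {
    for (tx, _) in &workers {
        tx.send(Message::Shutdown).expect("worker is alive");
    }
    workers
        .into_iter()
        .map(|(_, handle)| handle.join().expect("worker panicked"))
        .collect()
}
```

Because a channel delivers messages in the order they were sent, any work queued before the shutdown message is still handled before the loop exits.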
impl<P> ThreadPool<P> where
P: PoolItem,
pub fn thread_count(&self) -> usize
Returns the number of worker threads in this pool.
This is the value passed to new during construction.
Trait Implementations§
impl<P> Debug for ThreadPool<P>
impl<P> Drop for ThreadPool<P> where
P: PoolItem,
impl<P> SenderAndReceiver<P> for ThreadPool<P> where
P: PoolItem,
An implementation of the SenderAndReceiver trait for ThreadPool.