Struct ThreadPool 

pub struct ThreadPool<P>
where P: PoolItem,
{ /* private fields */ }

A pool of threads for managing stateful PoolItem instances.

ThreadPool is the main entry point for this library. It:

  • Spawns a fixed number of worker threads
  • Distributes pool items across threads based on their IDs
  • Routes messages to the correct thread for processing
  • Ensures sequential message processing per pool item (no concurrent access)

§Creating a Thread Pool

use messaging_thread_pool::{ThreadPool, samples::Randoms};

// Create a pool with 4 worker threads
let pool = ThreadPool::<Randoms>::new(4);

// The pool is now ready to accept messages
assert_eq!(pool.thread_count(), 4);

§Creating Pool Items

Pool items are created by sending initialization requests:

use messaging_thread_pool::{ThreadPool, samples::*};

let pool = ThreadPool::<Randoms>::new(4);

// Create a single item with ID 0
pool.send_and_receive_once(RandomsAddRequest(0)).expect("pool available");

// Or create many at once (IDs 1-99, since 0 already exists)
pool.send_and_receive((1..100u64).map(RandomsAddRequest))
    .expect("pool available")
    .for_each(|response| assert!(response.result().is_ok()));

§Sending Messages

Once items exist, send messages to interact with them:

use messaging_thread_pool::{ThreadPool, samples::*};

let pool = ThreadPool::<Randoms>::new(4);
pool.send_and_receive_once(RandomsAddRequest(1)).expect("pool available");

// Send a single message
let response: MeanResponse = pool
    .send_and_receive_once(MeanRequest(1))
    .expect("pool available");

// Create the remaining items in 0-9 (item 1 already exists)
pool.send_and_receive((0..10u64).filter(|id| *id != 1).map(RandomsAddRequest))
    .expect("pool available")
    .for_each(|_| {});

// Send multiple messages (processed in parallel across threads)
let responses: Vec<SumResponse> = pool
    .send_and_receive((0..10u64).map(SumRequest))
    .expect("pool available")
    .collect();

§Removing Items

Items can be removed explicitly:

use messaging_thread_pool::{ThreadPool, samples::*, RemovePoolItemRequest};

let pool = ThreadPool::<Randoms>::new(4);
pool.send_and_receive_once(RandomsAddRequest(1)).expect("pool available");

let response = pool
    .send_and_receive_once(RemovePoolItemRequest(1))
    .expect("pool available");
assert!(response.item_existed());

§Shutdown

The pool shuts down automatically when dropped, or explicitly via shutdown:

use messaging_thread_pool::{ThreadPool, samples::Randoms};

let pool = ThreadPool::<Randoms>::new(4);
// ... use the pool ...

// Explicit shutdown (returns a Vec<ThreadShutdownResponse>)
let responses = pool.shutdown();

// Or just drop it (implicit shutdown)
// drop(pool);

§Thread Distribution

Items are assigned to threads using id % thread_count. All messages for the same ID go to the same thread. This ensures:

  • No concurrent access to the same pool item
  • Sequential message ordering per item
  • Predictable data locality
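The id % thread_count rule above can be sketched in a few lines. This is a minimal illustration rather than the crate's internal code; thread_index and distribute are hypothetical helper names introduced only for this example.

```rust
/// Hypothetical helper: index of the worker thread that owns `id`.
fn thread_index(id: u64, thread_count: u64) -> u64 {
    assert!(thread_count > 0, "a pool needs at least one thread");
    id % thread_count
}

/// Hypothetical helper: groups ids by the thread that would own them.
fn distribute(ids: impl Iterator<Item = u64>, thread_count: u64) -> Vec<Vec<u64>> {
    let mut buckets = vec![Vec::new(); thread_count as usize];
    for id in ids {
        buckets[thread_index(id, thread_count) as usize].push(id);
    }
    buckets
}
```

With 4 threads, ids 1, 5 and 9 all map to thread 1, so every message for any one of those ids is handled by that single thread.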

§Thread Safety

ThreadPool is Send + Sync and implements SenderAndReceiver, making it suitable for:

  • Sharing across multiple threads (e.g., via Arc<ThreadPool<P>>)
  • Use in async contexts
  • Nested thread pool patterns (see samples::RandomsBatch)
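The Arc sharing pattern from the first bullet can be sketched with the standard library alone. SharedHandle is a hypothetical stand-in for a Send + Sync pool handle, not the crate's ThreadPool, and the Mutex is an assumption made only to keep the stand-in Sync.

```rust
use std::sync::mpsc::{channel, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for a pool handle that is Send + Sync.
struct SharedHandle {
    sender: Mutex<Sender<u64>>,
}

impl SharedHandle {
    /// Sends a value through the shared channel.
    fn send(&self, value: u64) {
        self.sender
            .lock()
            .expect("lock poisoned")
            .send(value)
            .expect("receiver alive");
    }
}

/// Compiles only if T is both Send and Sync.
fn assert_send_sync<T: Send + Sync>() {}

/// Shares one handle between `callers` threads and returns the sorted values sent.
fn send_from_many_threads(callers: u64) -> Vec<u64> {
    assert_send_sync::<SharedHandle>();
    let (sender, receiver) = channel();
    let shared = Arc::new(SharedHandle { sender: Mutex::new(sender) });
    let handles: Vec<_> = (0..callers)
        .map(|caller| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.send(caller))
        })
        .collect();
    for handle in handles {
        handle.join().expect("caller panicked");
    }
    // Dropping the last Arc drops the sender, which ends the receiver's iterator.
    drop(shared);
    let mut received: Vec<u64> = receiver.iter().collect();
    received.sort();
    received
}
```

Each spawned thread holds its own Arc clone, so the handle stays alive until the last caller has finished.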

Implementations§

impl<P> ThreadPool<P>
where P: PoolItem + 'static,

pub fn new(thread_pool_size: u64) -> Self

Creates a new ThreadPool.

Internally, it spawns a collection of worker threads and communicates with them through a vec of channels, one channel per spawned thread.

The number of threads is determined by thread_pool_size.
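A minimal sketch of the one-channel-per-thread layout described above, using only the standard library. SketchPool, send and finish are hypothetical names, and the workers here merely record the ids they receive; the crate's real internals may differ.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a pool: one sender and one join handle per worker.
struct SketchPool {
    senders: Vec<Sender<u64>>,
    handles: Vec<JoinHandle<Vec<u64>>>,
}

impl SketchPool {
    /// Spawns `thread_pool_size` workers, each with its own channel.
    fn new(thread_pool_size: u64) -> Self {
        let mut senders = Vec::new();
        let mut handles = Vec::new();
        for _ in 0..thread_pool_size {
            let (tx, rx) = channel::<u64>();
            // Each worker drains its own channel until every sender is dropped.
            handles.push(thread::spawn(move || rx.iter().collect::<Vec<u64>>()));
            senders.push(tx);
        }
        SketchPool { senders, handles }
    }

    fn thread_count(&self) -> usize {
        self.senders.len()
    }

    /// Routes `id` to the worker selected by id % thread_count.
    fn send(&self, id: u64) {
        let index = (id % self.senders.len() as u64) as usize;
        self.senders[index].send(id).expect("worker alive");
    }

    /// Closes the channels and returns the ids each worker received.
    fn finish(self) -> Vec<Vec<u64>> {
        drop(self.senders);
        self.handles
            .into_iter()
            .map(|handle| handle.join().expect("worker panicked"))
            .collect()
    }
}
```

Because each worker has its own channel, ids sent to the same worker arrive in the order they were sent.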

impl<P> ThreadPool<P>
where P: PoolItem,

pub fn send_and_receive<T>(
    &self,
    requests: impl Iterator<Item = T>,
) -> Result<impl Iterator<Item = T::Response>, SendError<SenderCouplet<P>>>

Sends each request to a worker thread and receives the responses back.

The requests are passed in as an iterator and the responses are returned as an iterator.
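The iterator-in, iterator-out shape can be sketched with standard library channels. This is an illustration under assumptions, not the crate's implementation: Request, spawn_workers and the squaring "work" are hypothetical, and the order in which responses arrive is not specified here.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical request: an id plus the channel on which to send the reply.
struct Request {
    id: u64,
    reply: Sender<(u64, u64)>,
}

/// Spawns detached workers that answer each request with (id, id squared).
fn spawn_workers(thread_count: usize) -> Vec<Sender<Request>> {
    (0..thread_count)
        .map(|_| {
            let (tx, rx) = channel::<Request>();
            thread::spawn(move || {
                for request in rx {
                    // Stand-in for real work on the pool item.
                    let _ = request.reply.send((request.id, request.id * request.id));
                }
            });
            tx
        })
        .collect()
}

/// Sends one request per id and returns an iterator over the replies.
fn send_and_receive(
    workers: &[Sender<Request>],
    ids: impl Iterator<Item = u64>,
) -> impl Iterator<Item = (u64, u64)> {
    let (reply_tx, reply_rx) = channel();
    for id in ids {
        let index = (id % workers.len() as u64) as usize;
        workers[index]
            .send(Request { id, reply: reply_tx.clone() })
            .expect("worker alive");
    }
    // Drop the original sender so the iterator ends once every reply has arrived.
    drop(reply_tx);
    reply_rx.into_iter()
}
```

The returned iterator blocks until each reply is available and finishes when all requests have been answered.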

pub fn send_and_receive_once<T>(
    &self,
    request: T,
) -> Result<T::Response, SendError<SenderCouplet<P>>>

impl<P> ThreadPool<P>
where P: PoolItem,

pub fn shutdown(&self) -> Vec<ThreadShutdownResponse>

Requests that the thread pool shut down. It sends the shutdown message to each of its contained PoolThreads. Sending this message should cause each thread's message loop to exit and the thread to end.
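The shutdown handshake described above might look roughly like this with standard library channels. Message, Worker and spawn_worker are hypothetical names, and unlike the real method this sketch takes its workers by value and returns a plain total per thread.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical message type: either a unit of work or a request to stop.
enum Message {
    Work(u64),
    Shutdown,
}

/// Hypothetical worker: its channel plus the handle used to join it.
struct Worker {
    sender: Sender<Message>,
    handle: JoinHandle<u64>,
}

/// Spawns a worker whose message loop exits when it receives Shutdown.
fn spawn_worker() -> Worker {
    let (sender, receiver) = channel::<Message>();
    let handle = thread::spawn(move || {
        let mut total: u64 = 0;
        for message in receiver {
            match message {
                Message::Work(value) => total += value,
                Message::Shutdown => break,
            }
        }
        total
    });
    Worker { sender, handle }
}

/// Sends Shutdown to every worker, then joins each and collects its result.
fn shutdown(workers: Vec<Worker>) -> Vec<u64> {
    for worker in &workers {
        worker.sender.send(Message::Shutdown).expect("worker alive");
    }
    workers
        .into_iter()
        .map(|worker| worker.handle.join().expect("worker panicked"))
        .collect()
}
```

Messages queued before Shutdown are still processed, because each channel delivers in send order.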

impl<P> ThreadPool<P>
where P: PoolItem,

pub fn thread_count(&self) -> usize

Returns the number of worker threads in this pool.

This is the value passed to new during construction.

Trait Implementations§

impl<P> Debug for ThreadPool<P>
where P: PoolItem + Debug,

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<P> Drop for ThreadPool<P>
where P: PoolItem,

fn drop(&mut self)

Implements Drop to shut down all of the thread pool's threads.
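A shutdown-on-drop pattern of this kind can be sketched as follows. DropPool is a hypothetical type, and the shared counter exists only so the example can observe that every worker has exited; it is not part of the crate.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical pool whose Drop impl stops and joins every worker.
struct DropPool {
    workers: Vec<(Sender<()>, Option<JoinHandle<()>>)>,
}

impl DropPool {
    /// Spawns workers that wait for a stop signal, then bump `exited`.
    fn new(thread_count: usize, exited: Arc<AtomicUsize>) -> Self {
        let workers = (0..thread_count)
            .map(|_| {
                let (tx, rx) = channel::<()>();
                let exited = Arc::clone(&exited);
                let handle = thread::spawn(move || {
                    // Block until the stop signal (or a disconnect) arrives.
                    let _ = rx.recv();
                    exited.fetch_add(1, Ordering::SeqCst);
                });
                (tx, Some(handle))
            })
            .collect();
        DropPool { workers }
    }
}

impl Drop for DropPool {
    fn drop(&mut self) {
        // Signal every worker first so they can all stop concurrently.
        for (sender, _) in &self.workers {
            let _ = sender.send(());
        }
        // Then wait for each thread to finish.
        for (_, handle) in &mut self.workers {
            if let Some(handle) = handle.take() {
                let _ = handle.join();
            }
        }
    }
}
```

Joining inside drop means that once the pool goes out of scope, no worker thread is left running.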

impl<P> SenderAndReceiver<P> for ThreadPool<P>
where P: PoolItem,

An implementation of the SenderAndReceiver trait for ThreadPool.

fn send_and_receive<'a, T>(
    &'a self,
    requests: impl Iterator<Item = T> + 'a,
) -> Result<Box<dyn Iterator<Item = T::Response> + 'a>, SendError<SenderCouplet<P>>>
where T: RequestWithResponse<P> + IdTargeted + 'a,

Send multiple requests and receive their responses.

fn send_and_receive_one<'a, T>(
    &'a self,
    request: T,
) -> Result<T::Response, SendError<SenderCouplet<P>>>
where T: RequestWithResponse<P> + IdTargeted + 'a,

Convenience method for sending a single request and receiving its response.

Auto Trait Implementations§

impl<P> !Freeze for ThreadPool<P>

impl<P> RefUnwindSafe for ThreadPool<P>

impl<P> Send for ThreadPool<P>

impl<P> Sync for ThreadPool<P>

impl<P> Unpin for ThreadPool<P>

impl<P> UnwindSafe for ThreadPool<P>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.