Crate messaging_thread_pool

Source
Expand description

§Messaging thread pool

Messaging thread pool is a collection of traits and structs for setting up a simple fixed-sized thread pool which holds a collection of a given type.

Instances of the objects are identified by an id which is unique within the thread pool.

Objects are distributed across the thread pool based on their id and ownership of the object is held there.

Objects are communicated with via a user defined set of messages which effectively form an API. These messages are sent and received over crossbeam channels.

The objects need to implement a set of simple traits and define a set of request/response messages to allow the thread pool infrastructure to handle the objects and to route messages to them.

The lifetimes of the objects are easy to reason about, as is the behaviour of the thread pools themselves.

The original motivation was to provide support for a hierarchy of dependent, long-lived objects, that each required their own thread pools to avoid complex threading dependencies The objects in the thread pools were all CPU bound i.e. did not perform any significant I/O.

§Example

use std::iter;
use messaging_thread_pool::{*, samples::*};

   // creates a thread pool with 4 threads.
   // The lifetime of the elements created (the Randoms in this case) will be tied to the
   // life of this struct
   let thread_pool = ThreadPool::<Randoms>::new(10);

   // create a 1000 Randoms across the thread pool by sending a thousand add requests.
   // The creation of these objects (with the keys 0..1000) will be distributed across
   // the 10 threads in the pool.
   // Their owning thread will create and store them.
   // They will not be dropped until they are either requested to be dropped or until the
   // thread pool itself is dropped.
   thread_pool
       .send_and_receive((0..1000u64).map(|i| RandomsAddRequest(i)))
       .expect("thread pool to be available")
       .for_each(|response: AddResponse| assert!(response.result().is_ok()));

   // now create 1000 messages asking them for the sum of the Randoms objects contained
   // random numbers.
   // The message will be routed to the thread to where the targeted object resides
   // This call will block until all of the work is done and the responses returned
   let sums: Vec<SumResponse> = thread_pool
       .send_and_receive((0..1000u64).map(|i| SumRequest(i)))
       .expect("thread pool to be available")
       .collect();
   assert_eq!(1000, sums.len());

   // get the mean of the randoms for object with id 0, this will execute on thread 0
   // this call will block until complete
   let mean_response_0: MeanResponse = thread_pool
       .send_and_receive_once(MeanRequest(0))
       .expect("thread pool to be available");
   println!("{}", mean_response_0.mean());

   // remove object with id 1
   // it will be dropped from the thread where it was residing
   assert!(thread_pool
       .send_and_receive_once(RemovePoolItemRequest(1))
       .expect("thread pool to be available")
       .item_existed());

   // add a new object with id 1000
   assert!(thread_pool
       .send_and_receive_once(RandomsAddRequest(1000))
       .expect("thread pool to be available")
       .result().is_ok());

   // all objects are dropped when the basic thread pool batcher is dropped
   // the threads are shutdown and joined back the the main thread
   drop(thread_pool);

§Limitations

The thread pool cannot be dynamically sized.
It is fixed at creation.
As there is a ThreadShutdown request it could be implied that therefore there should be a ThreadCreation request. This is not the case, and it is not intended that individual threads will be shutdown in isolation and in fact this will lead to the thread pool panicking.
The shutdown request is intended to be called only when the whole thread pool is finished with and in fact it is probably best to avoid using it and to just drop the thread pool (which internally sends out all the required shutdown messages).\

It was not really intended for anything other than long-lived CPU bound elements.

Re-exports§

pub use id_being_processed::*;
pub use sender_couplet::*;

Modules§

api_specification
global_test_scope
id_based_writer
id_being_processed
id_provider
new_pool_item_error
samples
sender_and_receiver_raw_mock
sender_couplet

Macros§

api_specification
This macro generates an API enum and implements various generics and conversions for provided types.

Structs§

AddResponse
This struct is returned in response to a request to add a pool item to the thread pool The success field indicates that the pool item was successfully constructed
IdBasedBlocking
This struct is used to encapsulate the functionality of the IdBasedBlocking struct
NewPoolItemError
This is a ‘better than nothing’ error implementation It allows the initialisation code to return error information as a string
RemovePoolItemRequest
A request to remove a pool item
RemovePoolItemResponse
The response received after a request to remove a pool item
SenderAndReceiverMock
This structure enables the mocking of a crate::ThreadPool
ThreadAbortRequest
Request to abort a thread; for testing only
ThreadAbortResponse
Response from a ThreadAbortRequest
ThreadEchoRequest
For debug purposes only send a message to a thread within the thread pool
ThreadEchoResponse
For debug purposes only; a message for responding to an echo request targeting a specific thread
ThreadPool
This struct represents a pool of threads that can target a particular type of resource (a resource being a struct that implements PoolItem)
ThreadShutdownRequest
A request to shutdown a thread pool;
ThreadShutdownResponse
The response received from a ThreadShutdownRequest

Enums§

RequestResponse
This enum holds either a request or it’s associated response
ThreadRequestResponse
This enum defines all of the messages that can be used to communicate with the thread pool. Each element of the enum takes a RequestResponse struct which can contain either a request or a response

Constants§

ID_BEING_PROCESSED

Traits§

IdTargeted
This trait is implemented by requests that are targeted at a pool item with an id and by the corresponding responses coming back from said pool item.
This trait is used internally by the thread pool to route requests to the appropriate thread in the thread pool.
PoolItem
This is the trait that needs to be implemented by a struct in order that it can be managed by the thread pool infrastructure
RequestWithResponse
This trait allows for the pairing of requests and responses
SenderAndReceiver
This trait allows a consumer to use a trait instead of the concrete implementation of thread pool.\ Unfortunately the send_and_receive are not a precise match for corresponding function in crate::ThreadPool itself. This is because of the limitation of the trait return types (it has to return a boxed iterator)
ThreadSafeSenderAndReceiver
This trait is useful when multiple levels are thread pools are used and each thread pool needs to be send and sync in order to be sent through the levels