
§Messaging thread pool

Messaging thread pool is a collection of traits and structs for setting up a simple fixed-size thread pool which holds a collection of objects of a given type.

Instances of the objects are identified by an id which is unique within the thread pool.

Objects are distributed across the thread pool based on their id, and each object is owned by the thread it is assigned to.

Objects are communicated with via a user-defined set of messages which effectively form an API. These messages are sent and received over crossbeam channels.

The objects need to implement a set of simple traits and define a set of request/response messages to allow the thread pool infrastructure to handle the objects and to route messages to them.
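To make the ownership and routing model concrete, here is a minimal sketch that uses only the standard library. It is not this crate's implementation: it uses `std::sync::mpsc` instead of crossbeam channels, the names `SketchPool`, `Request` and `Response` are hypothetical, the stored "objects" are plain `u64` values, and routing by `id % thread_count` is an assumption about how ids might map to threads.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical request messages; together with `Response` they form a tiny API.
enum Request {
    Add { id: usize, value: u64 },
    Get { id: usize },
    Remove { id: usize },
}

// Hypothetical response messages, one per request kind.
#[derive(Debug, PartialEq)]
enum Response {
    Added { id: usize },
    Value { id: usize, value: Option<u64> },
    Removed { id: usize, existed: bool },
}

// A fixed-size pool: one channel and one worker thread per slot.
struct SketchPool {
    senders: Vec<Sender<(Request, Sender<Response>)>>,
    handles: Vec<JoinHandle<()>>,
}

impl SketchPool {
    fn new(thread_count: usize) -> Self {
        assert!(thread_count > 0, "a pool needs at least one thread");
        let mut senders = Vec::new();
        let mut handles = Vec::new();
        for _ in 0..thread_count {
            let (tx, rx) = channel::<(Request, Sender<Response>)>();
            handles.push(thread::spawn(move || {
                // The objects routed to this thread are owned here and nowhere else.
                let mut items: HashMap<usize, u64> = HashMap::new();
                // The loop ends once every sender for this channel has been dropped.
                for (request, reply) in rx {
                    let response = match request {
                        Request::Add { id, value } => {
                            items.insert(id, value);
                            Response::Added { id }
                        }
                        Request::Get { id } => Response::Value {
                            id,
                            value: items.get(&id).copied(),
                        },
                        Request::Remove { id } => Response::Removed {
                            id,
                            existed: items.remove(&id).is_some(),
                        },
                    };
                    // Ignore the error if the caller stopped waiting for the reply.
                    let _ = reply.send(response);
                }
            }));
            senders.push(tx);
        }
        SketchPool { senders, handles }
    }

    // Routes the request to the thread owning the id, then blocks for the reply.
    fn send_and_receive(&self, request: Request) -> Response {
        let id = match &request {
            Request::Add { id, .. } | Request::Get { id } | Request::Remove { id } => *id,
        };
        let (reply_tx, reply_rx) = channel();
        // Assumed routing rule: id modulo the number of threads.
        self.senders[id % self.senders.len()]
            .send((request, reply_tx))
            .expect("worker thread to be alive");
        reply_rx.recv().expect("worker thread to reply")
    }
}

impl Drop for SketchPool {
    fn drop(&mut self) {
        // Dropping the senders closes the channels, which ends each worker loop.
        self.senders.clear();
        for handle in self.handles.drain(..) {
            let _ = handle.join();
        }
    }
}
```

In this sketch, calling `send_and_receive(Request::Add { id: 7, value: 42 })` on a four-thread pool stores the value on thread 3, and every later request for id 7 is routed to that same thread. Because each object lives on exactly one thread, no locking is needed around the stored items.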

The lifetimes of the objects are easy to reason about as is the behaviour of the thread pools themselves.

The original motivation was to provide support for a hierarchy of dependent, long-lived objects that each required their own thread pools to avoid complex threading dependencies. The objects in the thread pools were all CPU bound, i.e. they did not perform any significant I/O.

§Example

use std::iter;
use messaging_thread_pool::{*, samples::*};

   // creates a thread pool with 10 threads.
   // The lifetime of the elements created (the Randoms in this case) will be tied to the
   // life of this struct
   let thread_pool = ThreadPool::<Randoms>::new(10);

   // create 1000 Randoms across the thread pool by sending a thousand add requests.
   // The creation of these objects (with the keys 0..1000) will be distributed across
   // the 10 threads in the pool.
   // Their owning thread will create and store them.
   // They will not be dropped until they are either requested to be dropped or until the
   // thread pool itself is dropped.
   thread_pool
       .send_and_receive((0..1000usize).map(|i| RandomsAddRequest(i)))
       .expect("thread pool to be available")
       .for_each(|response: AddResponse| assert!(response.result().is_ok()));

   // now create 1000 messages asking each Randoms object for the sum of the
   // random numbers it contains.
   // Each message will be routed to the thread where the targeted object resides.
   // This call will block until all of the work is done and the responses returned.
   let sums: Vec<SumResponse> = thread_pool
       .send_and_receive((0..1000usize).map(|i| SumRequest(i)))
       .expect("thread pool to be available")
       .collect();
   assert_eq!(1000, sums.len());

   // get the mean of the randoms for object with id 0, this will execute on thread 0
   // this call will block until complete
   let mean_response_0: MeanResponse = thread_pool
       .send_and_receive(iter::once(MeanRequest(0)))
       .expect("thread pool to be available")
       .next()
       .unwrap();
   println!("{}", mean_response_0.mean());

   // remove object with id 1
   // it will be dropped from the thread where it was residing
   thread_pool
       .send_and_receive(iter::once(RemovePoolItemRequest(1)))
       .expect("thread pool to be available")
       .for_each(|response: RemovePoolItemResponse| assert!(response.item_existed()));

   // add a new object with id 1000
   thread_pool
       .send_and_receive(iter::once(RandomsAddRequest(1000)))
       .expect("thread pool to be available")
       .for_each(|response: AddResponse| assert!(response.result().is_ok()));

   // all objects are dropped when the thread pool is dropped
   // the threads are shut down and joined back to the main thread
   drop(thread_pool);

§Limitations

The thread pool cannot be dynamically sized.
It is fixed at creation.
The existence of a ThreadShutdown request might suggest that there should also be a ThreadCreation request. This is not the case. Individual threads are not intended to be shut down in isolation, and doing so will cause the thread pool to panic.
The shutdown request is intended to be used only when the whole thread pool is finished with. In practice it is probably best to avoid it and just drop the thread pool, which internally sends out all the required shutdown messages.

It was not really intended for anything other than long-lived CPU bound elements.

Re-exports§

Modules§

Structs§

Enums§

  • This enum holds either a request or its associated response
  • This enum defines all of the messages that can be used to communicate with the thread pool. Each element of the enum takes a RequestResponse struct which can contain either a request or a response

Traits§

  • A trait, auto implemented for all types, that allows multiple drop guards (of different types) to be returned in a single vec
  • This trait is implemented by requests that are targeted at a pool item with an id and by the corresponding responses coming back from said pool item.
    This trait is used internally by the thread pool to route requests to the appropriate thread in the thread pool.
  • This is the trait that needs to be implemented by a struct in order that it can be managed by the thread pool infrastructure
  • This trait allows for the pairing of requests and responses
  • This trait allows a consumer to use a trait instead of the concrete implementation of thread pool. Unfortunately, its send_and_receive function is not a precise match for the corresponding function in crate::ThreadPool itself. This is because of a limitation of trait return types (it has to return a boxed iterator)
  • This trait is useful when multiple levels of thread pools are used and each thread pool needs to be send and sync in order to be sent through the levels
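
The boxed-iterator limitation mentioned for the thread pool trait can be illustrated with a small sketch. The trait `SenderAndReceiverSketch`, the stand-in `Doubler` and the helper `total` are all hypothetical and are not this crate's definitions; requests and responses are reduced to plain `u64` values. The point is only that a method usable through `&dyn Trait` cannot return an `impl Iterator`, so it returns a `Box<dyn Iterator>` instead.

```rust
// Hypothetical trait, not the crate's actual definition.
// Returning a boxed iterator keeps the trait usable as `&dyn SenderAndReceiverSketch`.
trait SenderAndReceiverSketch {
    fn send_and_receive<'a>(
        &'a self,
        requests: Box<dyn Iterator<Item = u64> + 'a>,
    ) -> Box<dyn Iterator<Item = u64> + 'a>;
}

// A stand-in "pool" that answers each request by doubling it.
struct Doubler;

impl SenderAndReceiverSketch for Doubler {
    fn send_and_receive<'a>(
        &'a self,
        requests: Box<dyn Iterator<Item = u64> + 'a>,
    ) -> Box<dyn Iterator<Item = u64> + 'a> {
        Box::new(requests.map(|request| request * 2))
    }
}

// Consumer code depends only on the trait, so a stand-in can replace a real pool.
fn total(pool: &dyn SenderAndReceiverSketch, requests: Vec<u64>) -> u64 {
    pool.send_and_receive(Box::new(requests.into_iter())).sum()
}
```

Code written against such a trait can be exercised with a simple stand-in like `Doubler`, at the cost of one heap allocation per call for the boxed iterator.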