Expand description
§Messaging Thread Pool
A typed thread pool library for managing stateful, long-lived objects that communicate
via messages. Unlike traditional thread pools, objects in this pool are pinned to their
assigned thread for their entire lifetime, enabling the use of non-Send/Sync types
like Rc<RefCell<T>>.
§When to Use This Library
Use messaging_thread_pool when you need:
- Thread-bound state: Objects that own
Rc,RefCell, raw pointers, or FFI resources - Actor-like patterns: Long-lived stateful objects with message-based APIs
- Sequential consistency: Messages to the same object processed in order, no races
- Lock-free operations: No
Mutex/RwLockneeded since each object has single-threaded access
If your data is Send + Sync and you just need parallel computation, consider rayon
instead.
§Quick Start
The recommended approach uses the #[pool_item] attribute macro to minimize boilerplate:
use messaging_thread_pool::{ThreadPool, IdTargeted, pool_item};
// 1. Define your struct with an ID field
#[derive(Debug)]
pub struct Counter {
id: u64,
value: i64,
}
impl IdTargeted for Counter {
fn id(&self) -> u64 { self.id }
}
// 2. Use #[pool_item] on the impl block and #[messaging] on methods
#[pool_item]
impl Counter {
pub fn new(id: u64) -> Self {
Self { id, value: 0 }
}
#[messaging(IncrementRequest, IncrementResponse)]
pub fn increment(&mut self, amount: i64) -> i64 {
self.value += amount;
self.value
}
#[messaging(GetValueRequest, GetValueResponse)]
pub fn get_value(&self) -> i64 {
self.value
}
}
// 3. Create a thread pool and interact with pool items
let pool = ThreadPool::<Counter>::new(4);
// Create a counter with ID 1
pool.send_and_receive_once(CounterInit(1)).expect("pool available");
// Increment it
let response: IncrementResponse = pool
.send_and_receive_once(IncrementRequest(1, 10))
.expect("pool available");
assert_eq!(response.result, 10);
// Get current value
let response: GetValueResponse = pool
.send_and_receive_once(GetValueRequest(1))
.expect("pool available");
assert_eq!(response.result, 10);§Key Concepts
§Pool Items
A pool item is any struct that implements the PoolItem trait. Each pool item:
- Has a unique
u64ID within the pool - Lives on a single thread (determined by
id % thread_count) - Receives messages sequentially via its
process_messagemethod
The #[pool_item] macro generates the PoolItem implementation for you.
§Messages
Communication with pool items happens through request/response messages:
- Each method marked with
#[messaging(RequestType, ResponseType)]becomes a message endpoint - The macro generates the request struct (with ID + method parameters) and response struct
- Messages are routed to the correct thread based on the target ID
§Thread Affinity
Pool items are distributed across threads using id % thread_count. All messages
targeting the same ID go to the same thread, ensuring:
- No concurrent access to the same pool item
- Consistent ordering of message processing
- Warm CPU caches for frequently accessed items
§Using Non-Send/Sync Types
The main advantage of this library is supporting thread-bound types. Here’s an example
using Rc<RefCell<T>> for shared internal state:
use std::cell::RefCell;
use std::rc::Rc;
use messaging_thread_pool::{ThreadPool, IdTargeted, pool_item};
#[derive(Debug, Clone)]
struct Helper {
data: Rc<RefCell<Vec<String>>>,
}
#[derive(Debug)]
pub struct Session {
id: u64,
data: Rc<RefCell<Vec<String>>>,
helper: Helper,
}
impl IdTargeted for Session {
fn id(&self) -> u64 { self.id }
}
#[pool_item]
impl Session {
pub fn new(id: u64) -> Self {
let data = Rc::new(RefCell::new(Vec::new()));
let helper = Helper { data: data.clone() };
Self { id, data, helper }
}
#[messaging(AddRequest, AddResponse)]
pub fn add(&self, item: String) {
// No locks needed - just borrow_mut!
self.helper.data.borrow_mut().push(item);
}
}See samples::UserSession for a complete working example.
§Batch Operations
For efficiency, send multiple requests at once using ThreadPool::send_and_receive:
let pool = ThreadPool::<Randoms>::new(4);
// Create 100 items in parallel
pool.send_and_receive((0..100u64).map(RandomsAddRequest))
.expect("pool available")
.for_each(|response| assert!(response.result().is_ok()));
// Query all of them
let sums: Vec<u128> = pool
.send_and_receive((0..100u64).map(SumRequest))
.expect("pool available")
.map(|r| r.sum())
.collect();§Testing with Mocks
The SenderAndReceiver trait allows mocking the thread pool in tests:
use messaging_thread_pool::{SenderAndReceiver, SenderAndReceiverMock, samples::*};
// Code that depends on a thread pool takes a generic parameter
fn sum_means<T: SenderAndReceiver<Randoms>>(pool: &T, ids: &[u64]) -> u128 {
pool.send_and_receive(ids.iter().map(|id| MeanRequest(*id)))
.expect("pool available")
.map(|r: MeanResponse| r.mean())
.sum()
}
// In tests, use SenderAndReceiverMock
let mock = SenderAndReceiverMock::<Randoms, MeanRequest>::new_with_expected_requests(
vec![MeanRequest(1), MeanRequest(2)],
vec![
MeanResponse { id: 1, result: 100 },
MeanResponse { id: 2, result: 200 },
],
);
assert_eq!(sum_means(&mock, &[1, 2]), 300);See samples for more comprehensive examples, and SenderAndReceiverMock for
mock configuration options.
§The #[pool_item] Macro
The macro accepts optional parameters:
// Custom initialization request type (for complex constructors)
#[pool_item(Init = "MyCustomInitRequest")]
// Custom shutdown handler
#[pool_item(Shutdown = "my_shutdown_method")]
// Both
#[pool_item(Init = "MyCustomInitRequest", Shutdown = "my_shutdown_method")]For details, see the macro documentation in pool_item.
§Legacy API
The api_specification! macro is the older way to define pool items. New code should
use #[pool_item] instead, which is simpler and generates less boilerplate.
§Performance Considerations
The message-passing overhead becomes significant for very short operations (<50ms). This library is best suited for:
- CPU-bound work with moderate to long execution times
- Operations where thread affinity improves cache locality
- Stateful objects that would otherwise require complex locking
§Module Overview
ThreadPool- The main entry point for creating and managing poolsPoolItem- Trait implemented by types managed in the poolIdTargeted- Trait for types that have an ID for routingSenderAndReceiver- Trait for abstracting pool communication (enables mocking)samples- Example implementations to learn fromid_provider- Utilities for generating unique IDs
Re-exports§
pub use request_response::RequestResponse;pub use id_being_processed::*;pub use pool_item::*;pub use sender_couplet::*;pub use thread_request_response::*;
Modules§
- api_
specification - Legacy API Specification Macro
- global_
test_ scope - id_
based_ writer - id_
being_ processed - id_
provider - ID Provider Utilities
- pool_
item - request_
response - samples
- Sample Pool Item Implementations
- sender_
and_ receiver_ raw_ mock - sender_
couplet - thread_
request_ response - Thread Request/Response Types
Macros§
- api_
specification - Generates an API enum and trait implementations for a pool item.
Structs§
- IdBased
Blocking - This struct is used to encapsulate the functionality of the IdBasedBlocking struct
- Sender
AndReceiver Mock - A mock implementation of
SenderAndReceiverfor testing. - Thread
Pool - A pool of threads for managing stateful
PoolIteminstances.
Constants§
Traits§
- IdTargeted
- A trait for types that have an ID used for routing within the thread pool.
- Request
With Response - This trait allows for the pairing of requests and responses.
- Sender
AndReceiver - Trait for types that can send requests to pool items and receive responses.
- Thread
Safe Sender AndReceiver - A thread-safe version of
SenderAndReceiver.
Attribute Macros§
- pool_
item - Attribute macro that generates
PoolItemimplementation and message types.