A sticky channel implementation for Tokio that routes messages to specific receivers based on ID hashing.
This crate provides message passing channels where messages with the same ID are consistently delivered to the same receiver. This is useful for workload distribution scenarios where you need to ensure that related messages are processed by the same consumer for ordering guarantees or stateful processing.
§Key Features
- Deterministic routing: Messages with the same ID always go to the same receiver
- Multiple producers: Senders can be cloned and used from multiple threads
- Async and sync receiving: Support for both async and non-blocking receive operations
- Cancel-safe: All operations work correctly with tokio::select!
- Bounded and unbounded channels: Choose between memory-bounded or unbounded channels
§Channel Types
§Unbounded Sticky Channel
use tokio_sticky_channel::unbounded_sticky_channel;
use std::num::NonZeroUsize;

#[tokio::main]
async fn main() {
    // Create an unbounded sticky channel with 3 consumers
    let (sender, mut receivers) = unbounded_sticky_channel::<&str, i32>(
        NonZeroUsize::new(3).unwrap()
    );

    // Send messages with IDs - the same ID always goes to the same receiver
    sender.send("user-123", 42).unwrap();
    sender.send("user-456", 24).unwrap();
    sender.send("user-123", 84).unwrap(); // Same receiver as the first message

    // Poll each consumer once without blocking; empty receivers are skipped
    for receiver in &mut receivers {
        if let Ok(message) = receiver.try_recv() {
            println!("Received: {}", message);
        }
    }
}
§Bounded Sticky Channel
use tokio_sticky_channel::sticky_channel;
use std::num::NonZeroUsize;

#[tokio::main]
async fn main() {
    // Create a bounded sticky channel with 3 consumers and a capacity of 100 per channel
    let (sender, mut receivers) = sticky_channel::<&str, i32>(
        NonZeroUsize::new(3).unwrap(),
        100
    );

    // Send messages with IDs - waits if the target channel is full
    sender.send("user-123", 42).await.unwrap();
    sender.send("user-456", 24).await.unwrap();

    // Try to send without waiting - returns an error if the channel is full
    match sender.try_send("user-789", 99) {
        Ok(_) => println!("Message sent successfully"),
        Err(e) => println!("Failed to send: {}", e),
    }

    // Drop the sender to close the channels so the receive loops below can finish
    drop(sender);

    // Drain every receiver until its channel is closed and empty
    for receiver in &mut receivers {
        while let Some(message) = receiver.recv().await {
            println!("Received: {}", message);
        }
    }
}
§Architecture
The sticky channel routes each message by hashing its ID and taking the result modulo the number of consumers:
- Senders: Compute hash(id) % num_consumers to determine the target receiver
- Internal channels: Each consumer has its own MPSC channel (bounded or unbounded)
- Receivers: Wrap Tokio’s receivers with additional convenience methods
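The routing scheme described above can be sketched with the standard library alone. This is a minimal illustration, not the crate's implementation: the `route` helper is a hypothetical name, and `std::sync::mpsc` stands in for the Tokio channels the crate wraps.

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
use std::sync::mpsc;

/// Hypothetical helper (not part of the crate): maps an ID to a consumer
/// index by computing hash(id) % num_consumers.
fn route<S: BuildHasher, I: Hash + ?Sized>(
    build_hasher: &S,
    id: &I,
    num_consumers: usize,
) -> usize {
    (build_hasher.hash_one(id) % num_consumers as u64) as usize
}

fn main() {
    let num_consumers = 3;
    let build_hasher = RandomState::new();

    // One MPSC channel per consumer. std channels are used here as a
    // stand-in; the crate itself is described as using Tokio's channels.
    let (senders, receivers): (Vec<_>, Vec<_>) = (0..num_consumers)
        .map(|_| mpsc::channel::<(&str, i32)>())
        .unzip();

    // Messages sharing an ID hash to the same index, so they share a channel.
    for (id, value) in [("user-123", 42), ("user-456", 24), ("user-123", 84)] {
        let index = route(&build_hasher, id, num_consumers);
        senders[index].send((id, value)).unwrap();
    }

    // Dropping all senders closes the channels and ends the loops below.
    drop(senders);

    for (index, receiver) in receivers.iter().enumerate() {
        for (id, value) in receiver.iter() {
            println!("consumer {index} received {id} -> {value}");
        }
    }
}
```

Because every message for a given ID passes through a single FIFO channel, the consumer sees those messages in the order they were sent, which is what makes per-ID ordering possible.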
§Performance Considerations
- Unbounded channels: Memory usage can grow if consumers can’t keep up
- Bounded channels: Provide backpressure but may block senders when full
- Hashing overhead: Each send operation computes a hash of the ID
- Load distribution: Hash distribution may not be perfectly even across consumers
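To get a feel for the load-distribution point, one option is to count how a set of IDs spreads across consumers under the same hash-modulo rule. The sketch below is an illustration under assumptions: the `distribution` function and the synthetic `user-N` IDs are made up for this example, and the crate itself is not used.

```rust
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

/// Hypothetical helper: counts how many of `num_ids` synthetic IDs map to
/// each consumer index under hash(id) % num_consumers.
fn distribution(num_ids: usize, num_consumers: usize) -> Vec<usize> {
    let build_hasher = RandomState::new();
    let mut counts = vec![0usize; num_consumers];
    for n in 0..num_ids {
        let id = format!("user-{n}");
        let index = (build_hasher.hash_one(&id) % num_consumers as u64) as usize;
        counts[index] += 1;
    }
    counts
}

fn main() {
    // The exact counts vary from run to run because RandomState is
    // randomly seeded; they are usually close to even but rarely identical.
    let counts = distribution(10_000, 3);
    println!("IDs per consumer: {counts:?}");
}
```

Note that this measures the spread of distinct IDs, not of messages. If a few IDs account for most of the traffic, the consumers that own those IDs carry most of the load regardless of how evenly the IDs themselves are hashed.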
§Structs
- Receiver: Receive values from the associated Sender.
- Sender: Send values to the associated Receiver.
- UnboundedReceiver: Receive values from the associated UnboundedSender.
- UnboundedSender: Send values to the associated UnboundedReceiver.
§Enums
- SendError: Error type for sending messages through UnboundedSender::send and Sender::send.
- TryRecvError: Error type for receiving messages through UnboundedReceiver::try_recv and Receiver::try_recv.
§Functions
- sticky_channel: Creates a bounded sticky channel with the specified number of consumers, capacity and default hasher (RandomState).
- sticky_channel_with_hasher: Creates a bounded sticky channel with the specified number of consumers, capacity and a BuildHasher.
- unbounded_sticky_channel: Creates a sticky channel with the specified number of consumers and default hasher (RandomState).
- unbounded_sticky_channel_with_hasher: Creates a sticky channel with the specified number of consumers and a BuildHasher.
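One reason to pick a `BuildHasher` other than the default is reproducibility: each `RandomState` instance is randomly seeded, so the ID-to-consumer mapping can differ between instances and between runs. The sketch below compares it with the standard library's fixed-key `BuildHasherDefault<DefaultHasher>`. It deliberately does not call the `*_with_hasher` constructors, whose exact signatures are not shown here; `consumer_index` is a hypothetical helper that only mirrors the hash-modulo rule.

```rust
use std::collections::hash_map::{DefaultHasher, RandomState};
use std::hash::{BuildHasher, BuildHasherDefault};

/// Hypothetical helper: the consumer index an ID would map to under
/// hash(id) % num_consumers with the given BuildHasher.
fn consumer_index<S: BuildHasher>(build_hasher: &S, id: &str, num_consumers: usize) -> usize {
    (build_hasher.hash_one(id) % num_consumers as u64) as usize
}

fn main() {
    // Fixed-key hashers: separately constructed instances agree on the index.
    let fixed_a = BuildHasherDefault::<DefaultHasher>::default();
    let fixed_b = BuildHasherDefault::<DefaultHasher>::default();
    println!(
        "fixed:  {} and {}",
        consumer_index(&fixed_a, "user-123", 3),
        consumer_index(&fixed_b, "user-123", 3)
    );

    // Randomly seeded hashers: the two indices may or may not match.
    let random_a = RandomState::new();
    let random_b = RandomState::new();
    println!(
        "random: {} and {}",
        consumer_index(&random_a, "user-123", 3),
        consumer_index(&random_b, "user-123", 3)
    );
}
```

The fixed-key variant gives up the protection against adversarially chosen keys that random seeding provides, so it is probably best reserved for IDs that do not come from untrusted input.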