Expand description
A channel that tracks message delivery.
This channel provides message delivery tracking. Each sent message includes a Guard that tracks when the message has been fully processed. When ALL references to the guard are dropped, the message is marked as delivered.
§Features
- Watermarks: Get the highest sequence number where all messages up to it have been delivered
- Batches: Assign batches to messages and track pending counts per batch
- Clonable Guards: Guards can be cloned and shared; delivery happens when all clones are dropped
§Sequence Number Overflow
Uses u64 for sequence numbers. At 100 messages per nanosecond, overflow occurs after ~5.85 years. Systems requiring more message throughput should implement periodic resets or use external sequence management.
§Example
use futures::executor::block_on;
use commonware_utils::channels::tracked;
block_on(async {
let (mut sender, mut receiver) = tracked::bounded::<String, u64>(10);
// Send a message with batch ID
let sequence = sender.send(Some(1), "hello".to_string()).await.unwrap();
// Check pending messages
assert_eq!(sender.pending(1), 1);
assert_eq!(sender.watermark(), 0);
// Receive and process
let msg = receiver.recv().await.unwrap();
assert_eq!(msg.data, "hello");
// Clone the guard - delivery won't happen until all clones are dropped
let guard_clone = msg.guard.clone();
drop(msg.guard);
assert_eq!(sender.watermark(), 0); // Still not delivered
// Drop the last guard reference to mark as delivered
drop(guard_clone);
assert_eq!(sender.pending(1), 0);
assert_eq!(sender.watermark(), 1);
});
Structs§
- Guard
- A guard that tracks message delivery. When dropped, the message is marked as delivered.
- Message
- A message containing data and a Guard that tracks delivery.
- Receiver
- A receiver that wraps FutReceiver and provides tracked messages.
- Sender
- A sender that wraps
Sender
and tracks message delivery.
Functions§
- bounded
- Create a new bounded channel with delivery tracking.