Module tracked

Module tracked 

Source
Expand description

A channel that tracks message delivery.

This channel provides message delivery tracking. Each sent message includes a Guard that tracks when the message has been fully processed. When ALL references to the guard are dropped, the message is marked as delivered.

§Features

  • Watermarks: Get the highest sequence number where all messages up to it have been delivered
  • Batches: Assign batches to messages and track pending counts per batch
  • Clonable Guards: Guards can be cloned and shared; delivery happens when all clones are dropped

§Sequence Number Overflow

Uses u64 for sequence numbers. At 100 messages per nanosecond, overflow occurs after ~5.85 years. Systems requiring more message throughput should implement periodic resets or use external sequence management.

§Example

use futures::executor::block_on;
use commonware_utils::channels::tracked;
block_on(async {
    let (mut sender, mut receiver) = tracked::bounded::<String, u64>(10);
    // Send a message with batch ID
    let sequence = sender.send(Some(1), "hello".to_string()).await.unwrap();
    // Check pending messages
    assert_eq!(sender.pending(1), 1);
    assert_eq!(sender.watermark(), 0);
    // Receive and process
    let msg = receiver.recv().await.unwrap();
    assert_eq!(msg.data, "hello");
    // Clone the guard - delivery won't happen until all clones are dropped
    let guard_clone = msg.guard.clone();
    drop(msg.guard);
    assert_eq!(sender.watermark(), 0); // Still not delivered
    // Drop the last guard reference to mark as delivered
    drop(guard_clone);
    assert_eq!(sender.pending(1), 0);
    assert_eq!(sender.watermark(), 1);
});

Structs§

Guard
A guard that tracks message delivery. When dropped, the message is marked as delivered.
Message
A message containing data and a Guard that tracks delivery.
Receiver
A receiver that wraps FutReceiver and provides tracked messages.
Sender
A sender that wraps Sender and tracks message delivery.

Functions§

bounded
Create a new bounded channel with delivery tracking.