Crate clocked_dispatch
source ·Expand description
Provides a message dispatch service where each receiver is aware of messages passed to other
peers. In particular, if a message is sent to some receiver r
, another receiver r'
will be
aware that one message has been dispatched when it does a subsequent read. Furthermore, the
dispatcher ensures that messages are delivered in order by not emitting data until all input
sources have confirmed that they will not send data with lower sequence numbers.
The library ensures that a sender will not block due to the slowness of a receiver that is not
the intended recipient of the message in question. For example, if there are two receivers, r
and r'
, r.send(v)
will not block even though r'
is not currently reading from its input
channel.
The library is implemented by routing all messages through a single dispatcher. This central dispatcher operates in one of two modes, forwarding or serializing.
- In serializing mode, it assigns a monotonically increasing timestamp to each message, and forwards it to the intended recipient’s queue.
- In forwarding mode, it accepts timestamped messages from sources, and outputs them to the intended recipients in order. Messages are buffered by the dispatcher until each of the receiver’s sources are at least as up-to-date as the message’s timestamp. These timestamps must be sequentially assigned, but may be sent to the dispatcher in any order. The dispatcher guarantees that they are delivered in-order.
This dual-mode operation allows dispatchers to be composed in a hierarchical fashion, with a serializing dispatcher at the “top”, and forwarding dispatchers “below”.
Examples:
Simple usage:
use std::thread;
use clocked_dispatch;
// Create a dispatcher
let d = clocked_dispatch::new(1);
// Create a simple streaming channel
let (tx, rx) = d.new("atx1", "arx");
thread::spawn(move|| {
tx.send(10);
});
assert_eq!(rx.recv().unwrap().0.unwrap(), 10);
Shared usage:
use std::thread;
use clocked_dispatch;
// Create a dispatcher.
// Notice that we need more buffer space to the dispatcher here.
// This is because clone() needs to talk to the dispatcher, but the buffer to the dispatcher
// may already have been filled up by the sends in the threads we spawned.
let d = clocked_dispatch::new(10);
// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = d.new("atx", "arx");
for i in 0..10 {
let tx = tx.clone(format!("atx{}", i));
thread::spawn(move|| {
tx.send(i);
});
}
for _ in 0..10 {
let j = rx.recv().unwrap().0.unwrap();
assert!(0 <= j && j < 10);
}
Accessing timestamps:
use clocked_dispatch;
let m = clocked_dispatch::new(10);
let (tx_a, rx_a) = m.new("atx1", "a");
// notice that we can't use _ here even though tx_b is unused because
// then tx_b would be dropped, causing rx_b to be closed immediately
let (tx_b, rx_b) = m.new("btx1", "b");
let _ = tx_b;
tx_a.send("a1");
let x = rx_a.recv().unwrap();
assert_eq!(x.0, Some("a1"));
assert_eq!(rx_b.recv(), Ok((None, x.1)));
tx_a.send("a2");
tx_a.send("a3");
let a1 = rx_a.recv().unwrap();
assert_eq!(a1.0, Some("a2"));
let a2 = rx_a.recv().unwrap();
assert_eq!(a2.0, Some("a3"));
// b must see the timestamp from either a1 or a2
// it could see a1 if a2 hasn't yet been delivered
let b = rx_b.recv().unwrap();
assert_eq!(b.0, None);
assert!(b.1 == a1.1 || b.1 == a2.1);
In-order delivery
use clocked_dispatch;
use std::sync::mpsc;
let m = clocked_dispatch::new(10);
let (tx1, rx) = m.new("tx1", "a");
let tx2 = tx1.clone("tx2");
tx1.forward(Some("a1"), 1);
assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));
tx2.forward(None, 1);
assert_eq!(rx.recv(), Ok((Some("a1"), 1)));
Structs
ClockedBroadcaster
can be constructed from a ClockedSender
using ClockedSender::into_broadcaster
.Functions
new
on the
returned dispatcher.