Crate clocked_dispatch

Provides a message dispatch service where each receiver is aware of messages passed to other peers. In particular, if a message is sent to some receiver r, another receiver r' will be aware that one message has been dispatched when it does a subsequent read. Furthermore, the dispatcher ensures that messages are delivered in order by not emitting data until all input sources have confirmed that they will not send data with lower sequence numbers.
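As a rough illustration of the awareness property described above, the following std-only sketch models a dispatcher that stamps each message and tells every other receiver only the timestamp. ToySerializer and its methods are hypothetical names made up for this illustration; they are not part of this crate and do not reflect how it is implemented. The sketch also assumes one notification per message, whereas the real dispatcher may coalesce timestamp updates.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for a serializing dispatcher (illustration only).
struct ToySerializer<T> {
    // Next timestamp to hand out; strictly increasing.
    next_ts: usize,
    // One queue per receiver name; each entry is (payload, timestamp).
    queues: HashMap<String, VecDeque<(Option<T>, usize)>>,
}

impl<T> ToySerializer<T> {
    fn new(receivers: &[&str]) -> Self {
        let queues = receivers
            .iter()
            .map(|r| (r.to_string(), VecDeque::new()))
            .collect();
        ToySerializer { next_ts: 1, queues }
    }

    /// Stamp `value`, queue it for `to`, and queue a payload-free
    /// timestamp notification for every other receiver.
    fn send(&mut self, to: &str, value: T) -> usize {
        let ts = self.next_ts;
        self.next_ts += 1;
        let mut value = Some(value);
        for (name, queue) in self.queues.iter_mut() {
            if name.as_str() == to {
                queue.push_back((value.take(), ts));
            } else {
                queue.push_back((None, ts));
            }
        }
        ts
    }

    /// Pop the next entry for receiver `who`, if any.
    fn recv(&mut self, who: &str) -> Option<(Option<T>, usize)> {
        self.queues.get_mut(who)?.pop_front()
    }
}
```

In this toy model, a message sent to receiver "a" shows up on receiver "b" as a (None, timestamp) pair, which mirrors the shape of the values returned by recv() in the examples below.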

The library ensures that a sender will not block because of a slow receiver that is not the intended recipient of the message in question. For example, if there are two receivers, r and r', r.send(v) (that is, sending v to r) will not block even if r' is not currently reading from its input channel.

The library is implemented by routing all messages through a single dispatcher. This central dispatcher operates in one of two modes, forwarding or serializing.

  • In serializing mode, it assigns a monotonically increasing timestamp to each message, and forwards it to the intended recipient’s queue.
  • In forwarding mode, it accepts timestamped messages from sources, and outputs them to the intended recipients in order. Messages are buffered by the dispatcher until each of the receiver’s sources is at least as up-to-date as the message’s timestamp. These timestamps must be sequentially assigned, but may be sent to the dispatcher in any order. The dispatcher guarantees that they are delivered in order.
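The buffering rule for forwarding mode can be sketched with the standard library alone. The ToyForwarder type below is a hypothetical illustration for a single receiver, not this crate's implementation. It assumes that every message carries a unique timestamp and that a source's progress is simply the highest timestamp it has reported. A message is released once the minimum progress across all sources has reached its timestamp.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical forwarding buffer for one receiver (illustration only).
struct ToyForwarder<T> {
    // Highest timestamp each source has reported so far.
    seen: HashMap<String, usize>,
    // Messages held back, keyed (and therefore ordered) by timestamp.
    pending: BTreeMap<usize, T>,
}

impl<T> ToyForwarder<T> {
    fn new(sources: &[&str]) -> Self {
        ToyForwarder {
            seen: sources.iter().map(|s| (s.to_string(), 0)).collect(),
            pending: BTreeMap::new(),
        }
    }

    /// Record that `source` has reached `ts`, optionally buffering a message,
    /// and return every message that is now safe to deliver, in timestamp order.
    fn forward(&mut self, source: &str, msg: Option<T>, ts: usize) -> Vec<(T, usize)> {
        // Unknown sources are ignored for progress tracking in this sketch.
        if let Some(seen) = self.seen.get_mut(source) {
            *seen = (*seen).max(ts);
        }
        if let Some(m) = msg {
            self.pending.insert(ts, m);
        }
        // No source can still send anything at or below this timestamp.
        let safe = self.seen.values().copied().min().unwrap_or(0);
        // Keep everything newer than `safe` buffered; release the rest.
        let later = self.pending.split_off(&(safe + 1));
        let ready = std::mem::replace(&mut self.pending, later);
        ready.into_iter().map(|(ts, m)| (m, ts)).collect()
    }
}
```

With two sources, a message at timestamp 1 from the first source stays buffered until the second source also reports timestamp 1 (with or without a message of its own), which is the same pattern the in-order delivery example later on this page exercises with forward().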

This dual-mode operation allows dispatchers to be composed in a hierarchical fashion, with a serializing dispatcher at the “top”, and forwarding dispatchers “below”.

Examples:

Simple usage:

use std::thread;
use clocked_dispatch;

// Create a dispatcher
let d = clocked_dispatch::new(1);

// Create a simple streaming channel
let (tx, rx) = d.new("atx1", "arx");
thread::spawn(move || {
    tx.send(10);
});
assert_eq!(rx.recv().unwrap().0.unwrap(), 10);

Shared usage:

use std::thread;
use clocked_dispatch;

// Create a dispatcher.
// Notice that the dispatcher needs more buffer space here.
// This is because clone() needs to talk to the dispatcher, but the buffer to the dispatcher
// may already have been filled up by the sends in the threads we spawned.
let d = clocked_dispatch::new(10);

// Create a shared channel that can be sent along from many threads
// where tx is the sending half (tx for transmission), and rx is the receiving
// half (rx for receiving).
let (tx, rx) = d.new("atx", "arx");
for i in 0..10 {
    let tx = tx.clone(format!("atx{}", i));
    thread::spawn(move || {
        tx.send(i);
    });
}

for _ in 0..10 {
    let j = rx.recv().unwrap().0.unwrap();
    assert!(0 <= j && j < 10);
}

Accessing timestamps:

use clocked_dispatch;
let m = clocked_dispatch::new(10);
let (tx_a, rx_a) = m.new("atx1", "a");

// notice that we can't use _ here even though tx_b is unused because
// then tx_b would be dropped, causing rx_b to be closed immediately
let (tx_b, rx_b) = m.new("btx1", "b");
let _ = tx_b;

tx_a.send("a1");
let x = rx_a.recv().unwrap();
assert_eq!(x.0, Some("a1"));
assert_eq!(rx_b.recv(), Ok((None, x.1)));

tx_a.send("a2");
tx_a.send("a3");

let a1 = rx_a.recv().unwrap();
assert_eq!(a1.0, Some("a2"));

let a2 = rx_a.recv().unwrap();
assert_eq!(a2.0, Some("a3"));

// b must see the timestamp from either a1 or a2
// it could see a1 if a2 hasn't yet been delivered
let b = rx_b.recv().unwrap();
assert_eq!(b.0, None);
assert!(b.1 == a1.1 || b.1 == a2.1);

In-order delivery:

use clocked_dispatch;
use std::sync::mpsc;

let m = clocked_dispatch::new(10);
let (tx1, rx) = m.new("tx1", "a");
let tx2 = tx1.clone("tx2");

tx1.forward(Some("a1"), 1);
assert_eq!(rx.try_recv(), Err(mpsc::TryRecvError::Empty));

tx2.forward(None, 1);
assert_eq!(rx.recv(), Ok((Some("a1"), 1)));

Structs

ClockedBroadcaster: A sending half of a clocked synchronous channel that only allows broadcast. This half can only be owned by one thread, but it can be cloned to send to other threads. A ClockedBroadcaster can be constructed from a ClockedSender using ClockedSender::into_broadcaster.
ClockedReceiver: The receiving half of a clocked synchronous channel.
ClockedSender: The sending half of a clocked synchronous channel. This half can only be owned by one thread, but it can be cloned to send to other threads.
Dispatcher: Dispatch coordinator for adding additional clocked channels.

Functions

new: Creates a new clocked dispatch. Dispatch channels can be constructed by calling new on the returned dispatcher.
new_with_seed: Creates a new clocked dispatch whose automatically assigned sequence numbers start at a given value.