Crate burst_pool [] [src]

An SPMC channel with low, consistent latency, designed for cases where values arrive in a batch and must be processed now-or-never.

burst-pool differs from a regular spmc channel in one major way: sending fails when all workers are currently busy. This lets the sender distribute as much work as possible, and then handle overflow work differently (probably by throwing it away).

Performance

tl;dr: Same ballpark as spmc. Best when the number of workers is less than number of cores.

I'm using the excellent spmc crate as a benchmark. spmc provides a reliable unbounded channel, and when the pool is overloaded it has the normal "queueing" semantics (as opposed to burst-pool's "return-to-sender" semantics). The metric we care about is enqueuerecv latency. Below are some kernel density estimates. The "n/m" notation in the labels means "n messages sent to m workers".

Here's my interpretation of the numbers:

These observations are consistent with the expected performance characteristics. (See Design, below.)

Run cargo bench to make some measurements on your own machine; there's also a gnuplot script which you can use to visualise the results. (Pro tip: If your results are significantly worse than those above, your kernel might be powering down CPU cores too eagerly. If you care about latency more than battery life, consider setting max_cstate = 0.)

Usage

The API requires two calls to actually send a message to a receiver: first you place your message in the mailbox of one of your workers (enqueue); then you wake the worker up, notifying it that it should check its mailbox (wake_all). A call to enqueue will only succeed if the sender can find an empty mailbox to put the message in (ie., at least one of your workers must currently be blocking on a call to recv). If enqueue fails, you get your message back. If it succeeds, the message will be recieved by exactly one receiver the next time you call wake_all.

// Create a handle for sending strings
let mut sender: Sender<String> = Sender::new();

// Create a handle for receiving strings, and pass it off to a worker thread.
// Repeat this step as necessary.
let mut receiver: Receiver<String> = sender.mk_receiver();
let th = std::thread::spawn(move ||
    loop {
        let x = receiver.recv().unwrap();
        println!("{}", x);
    }
);

// Give the worker thread some time to spawn
sleep_ms(10);

// Send a string to the worker and unblock it
sender.enqueue(Box::new(String::from("hello")));
sender.wake_all();
sleep_ms(10);       // wait for it to process the first string

// Send another string
sender.enqueue(Box::new(String::from("world!")));
sender.wake_all();
sleep_ms(10);

// Drop the send handle, signalling the worker to shutdown
std::mem::drop(sender);
th.join().unwrap_err();  // RecvError::Orphaned

Design

Each receiver has a "slot" which is either empty, blocked, or contains a pointer to some work. Whenever a receiver's slot is empty, it goes to sleep by polling an eventfd. When issuing work, the sender goes through the slots round-robin, placing work in the empty ones. It then signals the eventfd, waking up all sleeping receivers. If a receiver wakes up and finds work in its slot, it takes the work and blocks its slot. If a receivers wakes up and finds its slot is still empty, it goes back to sleep. When a receiver has finished processing the work, it unblocks its slot.

This design means that we expect enqueuerecv latencies to be independent of the number of payloads sent. However, we expect it to become much worse as soon as there are more worker threads than cores available. The benchmark results are consistent with these expectations.

Portability

This crate is Linux-only.

Structs

Receiver
Sender

Enums

RecvError