bus
Bus provides a lock-free, bounded, single-producer, multi-consumer, broadcast channel.
It is implemented with a circular buffer and atomic instructions. The interface is similar to
that of the std::sync::mpsc channels, except that multiple consumers (readers of the channel)
can be created, whereas only a single sender can exist. Furthermore, in contrast to most
multi-consumer FIFO queues, bus is broadcast; every send goes to every consumer.
I haven't seen this particular implementation in the literature (some extra bookkeeping is necessary to allow multiple consumers), but a lot of related reading can be found in Ross Bencina's blog post "Some notes on lock-free and wait-free algorithms".
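The bounded, broadcast behaviour described above can be illustrated with a deliberately simplified model. The sketch below is single-threaded and uses no atomics, so it is not how the crate itself is implemented; every name in it (BroadcastRing, add_reader, try_broadcast, try_recv) is made up for illustration. It only shows the bookkeeping idea: each reader keeps its own cursor, and the producer may not overwrite a slot until the slowest reader has moved past it.

```rust
/// Simplified, single-threaded model of a bounded broadcast ring.
/// Hypothetical names throughout; this is NOT the bus crate's code.
struct BroadcastRing<T: Clone> {
    slots: Vec<Option<T>>,
    /// Total number of values written so far.
    head: usize,
    /// For each reader, the number of values it has already consumed.
    cursors: Vec<usize>,
}

impl<T: Clone> BroadcastRing<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        BroadcastRing {
            slots: vec![None; capacity],
            head: 0,
            cursors: Vec::new(),
        }
    }

    /// Registers a reader that will see every value broadcast from now on.
    fn add_reader(&mut self) -> usize {
        self.cursors.push(self.head);
        self.cursors.len() - 1
    }

    /// Writes a value unless the slowest reader is a full buffer behind,
    /// in which case the value is handed back to the caller.
    fn try_broadcast(&mut self, value: T) -> Result<(), T> {
        let slowest = self.cursors.iter().copied().min().unwrap_or(self.head);
        if self.head - slowest == self.slots.len() {
            return Err(value);
        }
        let idx = self.head % self.slots.len();
        self.slots[idx] = Some(value);
        self.head += 1;
        Ok(())
    }

    /// Returns the next unseen value for `reader`, or None if it is caught up.
    fn try_recv(&mut self, reader: usize) -> Option<T> {
        let pos = self.cursors[reader];
        if pos == self.head {
            return None;
        }
        let value = self.slots[pos % self.slots.len()].clone();
        self.cursors[reader] = pos + 1;
        value
    }
}
```

With a capacity of 2 and two readers, a third try_broadcast fails until both readers have consumed the first value, which is the "bounded" part; both readers then observe the same sequence, which is the "broadcast" part.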
Bus achieves broadcast by cloning the element in question, which is why T must implement
Clone. However, Bus is clever about only cloning when necessary. Specifically, the last
consumer to see a given value will move it instead of cloning, which means no cloning
happens in the single-consumer case. For cases where cloning is expensive, Arc should be
used instead.
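As a rough illustration of the "clone for everyone but the last reader" idea, and of why an Arc payload keeps those clones cheap, here is a minimal sketch. It is an assumption-laden model rather than the crate's code: Slot, read, and fan_out_counts are hypothetical names, and the counter is a plain integer where a real lock-free implementation would need atomics.

```rust
use std::sync::Arc;

/// One buffered value plus the number of readers that have yet to see it.
/// Hypothetical sketch; not taken from the bus crate.
struct Slot<T: Clone> {
    value: Option<T>,
    remaining: usize,
}

impl<T: Clone> Slot<T> {
    fn new(value: T, readers: usize) -> Self {
        Slot { value: Some(value), remaining: readers }
    }

    /// Serves one reader: a clone for every reader except the last,
    /// which takes ownership of the stored value instead.
    /// Returns None once every reader has been served.
    fn read(&mut self) -> Option<T> {
        match self.remaining {
            0 => None,
            1 => {
                self.remaining = 0;
                self.value.take() // move, no clone
            }
            _ => {
                self.remaining -= 1;
                self.value.clone()
            }
        }
    }
}

/// Serves an Arc-wrapped payload to `readers` readers and records the
/// Arc strong count after each read. Cloning the Arc only bumps the
/// reference count; the 1 KiB buffer itself is never copied.
fn fan_out_counts(readers: usize) -> Vec<usize> {
    let payload = Arc::new(vec![0u8; 1024]);
    let mut slot = Slot::new(Arc::clone(&payload), readers);
    let mut held = Vec::new(); // keep the received handles alive
    let mut counts = Vec::new();
    while let Some(v) = slot.read() {
        held.push(v);
        counts.push(Arc::strong_count(&payload));
    }
    counts
}
```

In this model the strong count grows by one for each read except the last, where the slot's own handle is moved out rather than cloned. With a single reader the count never changes, mirroring the single-consumer case described above.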
In a single-producer, single-consumer setup (which is the only one that Bus and
mpsc::sync_channel both support), Bus gets ~2x the performance of mpsc::sync_channel on
my machine. YMMV. You can check your performance on Nightly using:
$ cargo bench --features bench
To see multi-consumer results, run the benchmark utility instead (this should work on stable too):
$ cargo build --bin bench --release
$ target/release/bench
Examples
Single-send, multi-consumer example
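use bus::Bus;

// create a bus with room for 10 in-flight values
let mut bus = Bus::new(10);
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// a single broadcast is seen by both receivers
bus.broadcast("Hello");
assert_eq!(rx1.recv(), Ok("Hello"));
assert_eq!(rx2.recv(), Ok("Hello"));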
Multi-send, multi-consumer example
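use bus::Bus;
use std::thread;

let mut bus = Bus::new(10);
let mut rx1 = bus.add_rx();
let mut rx2 = bus.add_rx();

// start a thread that sends 1..100
let j = thread::spawn(move || {
    for i in 1..100 {
        bus.broadcast(i);
    }
});

// every value should be received by both receivers
for i in 1..100 {
    assert_eq!(rx1.recv(), Ok(i));
    assert_eq!(rx2.recv(), Ok(i));
}

j.join().unwrap();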
Many-to-many channel using a dispatcher
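use bus::Bus;
use std::thread;
use std::sync::mpsc;

// set up fan-in
let (tx1, mix_rx) = mpsc::sync_channel(100);
let tx2 = tx1.clone();
// set up fan-out
let mut mix_tx = Bus::new(100);
let mut rx1 = mix_tx.add_rx();
let mut rx2 = mix_tx.add_rx();
// start dispatcher: forward everything from the fan-in channel onto the bus
thread::spawn(move || {
    for m in mix_rx.iter() {
        mix_tx.broadcast(m);
    }
});

// sends on tx1 are received ...
tx1.send("Hello").unwrap();
// ... by both receiver rx1 ...
assert_eq!(rx1.recv(), Ok("Hello"));
// ... and receiver rx2
assert_eq!(rx2.recv(), Ok("Hello"));

// same with sends on tx2
tx2.send("world").unwrap();
assert_eq!(rx1.recv(), Ok("world"));
assert_eq!(rx2.recv(), Ok("world"));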