Crate tokio_bus

tokio-bus provides a lock-free, bounded, single-producer, multi-consumer broadcast channel usable as a Sink and Stream with tokio.

The bus implementation itself comes from the wonderful bus crate; this crate adds a layer on top so that the bus can be used with tokio.


use tokio;
use tokio_bus::Bus;
use futures::future::{Future, lazy, ok};
use futures::stream::{Stream, iter_ok};
use futures::sink::Sink;

// Create a bus with room for 64 buffered messages and attach two readers.
let mut bus = Bus::new(64);
let rx1 = bus.add_rx();
let rx2 = bus.add_rx();

// The bus is a Sink: feed it every value of the stream.
let send_values = bus
    .send_all(iter_ok::<_, ()>(vec![1, 2, 3, 4, 5, 6]));

// Each reader is a Stream and sees every value that was sent.
let sum_values = rx1
    .fold(0i32, |acc, x| { ok(acc + x) });

let div_values = rx2
    .fold(1f64, |acc, x| { ok(x as f64 / acc) });

let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on_all(lazy(move || {
    // Run the sending side as its own task so both readers can make progress.
    tokio::spawn(send_values
        .map(|_| {})
        .map_err(|_| { panic!(); })
    );
    assert_eq!(sum_values.wait(), Ok(21));
    assert_eq!(div_values.wait(), Ok(3.2));
    ok::<(), ()>(())
})).unwrap();
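
The expected values 21 and 3.2 in the example above follow from the two fold closures alone, independent of tokio or the bus. As a minimal sketch, the same arithmetic can be reproduced with plain iterators from the standard library. The function name `fold_results` is hypothetical and not part of tokio-bus.

```rust
/// Hypothetical helper: applies the two fold closures from the example
/// to a plain slice instead of to a BusReader stream.
pub fn fold_results(values: &[i32]) -> (i32, f64) {
    // Running sum, starting from 0.
    let sum = values.iter().fold(0i32, |acc, &x| acc + x);
    // Each step divides the new value by the previous accumulator:
    // 1/1 = 1, 2/1 = 2, 3/2 = 1.5, 4/1.5 = 2.66.., 5/2.66.. = 1.875, 6/1.875 = 3.2
    let div = values.iter().fold(1f64, |acc, &x| x as f64 / acc);
    (sum, div)
}
```

Calling `fold_results(&[1, 2, 3, 4, 5, 6])` walks through the same steps as the two readers do, since every reader receives all six values in order.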



A bus which buffers messages for all of its readers to eventually read. Allows the dynamic addition and removal of readers.
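
To make the broadcast behaviour concrete, here is a conceptual sketch built only on the standard library. It is an assumption-laden stand-in, not the bus crate's implementation: it is not lock-free, it keeps one bounded queue per reader, and the names `SketchBus`, `broadcast`, and `reader_count` are hypothetical. It only mimics the observable behaviour described above: every reader gets every message, and readers can come and go.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical stand-in for a broadcast bus (not the real Bus type).
pub struct SketchBus<T> {
    capacity: usize,
    readers: Vec<SyncSender<T>>,
}

impl<T: Clone> SketchBus<T> {
    pub fn new(capacity: usize) -> Self {
        SketchBus { capacity, readers: Vec::new() }
    }

    /// Registers a new reader. In this sketch a reader only sees values
    /// broadcast after it was added.
    pub fn add_rx(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.readers.push(tx);
        rx
    }

    /// Sends a clone of `value` to every live reader, blocking while a
    /// reader's queue is full. Readers that have been dropped are removed.
    pub fn broadcast(&mut self, value: T) {
        self.readers.retain(|tx| tx.send(value.clone()).is_ok());
    }

    /// Number of readers that were still attached at the last broadcast.
    pub fn reader_count(&self) -> usize {
        self.readers.len()
    }
}
```

Cloning per reader keeps the sketch short; a real broadcast channel would typically avoid one queue per reader, which is part of what the underlying crate provides.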


A BusReader should not be created manually, but rather by calling add_rx() on a Bus.