Crate tokio_bus

tokio-bus provides a lock-free, bounded, single-producer, multi-consumer broadcast channel usable as a Sink and Stream with tokio.

The bus implementation itself comes from the wonderful bus crate; this crate adds a layer on top so that the bus can be used with tokio.

Example

use tokio;
use tokio_bus::Bus;
use futures::future::{Future, lazy, ok};
use futures::stream::{Stream, iter_ok};
use futures::sink::Sink;

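// Create a bounded bus and register two readers before anything is sent.
let mut bus = Bus::new(64);
let rx1 = bus.add_rx();
let rx2 = bus.add_rx();

// Sink side: broadcast six values to every reader.
let send_values = bus
    .send_all(iter_ok::<_, ()>(vec![1, 2, 3, 4, 5, 6]));

// Stream side: the first reader sums everything it receives.
let sum_values = rx1
    .fold(0i32, |acc, x| { ok(acc + x) });

// The second reader divides each value by the running accumulator.
let div_values = rx2
    .fold(1f64, |acc, x| { ok(x as f64 / acc) });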

let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on_all(lazy(move || {
    tokio::spawn(send_values
        .map(|_| {})
        .map_err(|_| { panic!(); })
    );
    assert_eq!(sum_values.wait(), Ok(21));
    assert_eq!(div_values.wait(), Ok(3.2));
    ok::<(), ()>(())
})).unwrap();
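
The expected values in the two assertions follow from the fold closures alone. As a rough check, the sketch below replays the same arithmetic over a plain slice using only the standard library, with no bus or runtime involved. The function names sum_fold and div_fold are hypothetical and are not part of tokio_bus. For the input 1 through 6, the division fold steps through 1, 2, 1.5, 2.666..., 1.875 and ends near 3.2.

```rust
/// Hypothetical helper: sums the values, mirroring the `rx1` fold above.
fn sum_fold(values: &[i32]) -> i32 {
    values.iter().fold(0i32, |acc, &x| acc + x)
}

/// Hypothetical helper: divides each value by the running accumulator,
/// mirroring the `rx2` fold above (note: value / acc, not acc / value).
fn div_fold(values: &[i32]) -> f64 {
    values.iter().fold(1f64, |acc, &x| x as f64 / acc)
}
```

Because the division fold goes through floating-point rounding, comparing its result against 3.2 with a small tolerance is the safer choice when reusing this pattern elsewhere.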

Structs

Bus

A bus which buffers messages for all of its readers to eventually read. Allows the dynamic addition and removal of readers.

BusReader

A BusReader should not be created manually, but rather by calling add_rx() on a Bus.
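
To make the broadcast semantics concrete, here is a minimal standard-library sketch in which every reader obtained from add_rx() receives its own copy of each message. ToyBus is a hypothetical stand-in written for illustration only. It uses one bounded std::sync::mpsc channel per reader, so it is not lock-free and does not reflect how the bus crate or tokio_bus is actually implemented.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

/// Hypothetical toy broadcast bus: one bounded queue per reader.
/// Illustrates the semantics only; it is NOT the real implementation.
struct ToyBus<T> {
    capacity: usize,
    senders: Vec<SyncSender<T>>,
}

impl<T: Clone> ToyBus<T> {
    /// `capacity` bounds how many unread messages each reader may hold.
    fn new(capacity: usize) -> Self {
        ToyBus { capacity, senders: Vec::new() }
    }

    /// Registers a reader; it only sees messages broadcast after this call.
    fn add_rx(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.senders.push(tx);
        rx
    }

    /// Sends a clone of `value` to every live reader (blocking while a
    /// reader's queue is full) and forgets readers that have been dropped.
    fn broadcast(&mut self, value: T) {
        self.senders.retain(|tx| tx.send(value.clone()).is_ok());
    }
}
```

In this sketch, readers are created only through add_rx(), matching the guidance above, and dropping a reader removes it from the bus on the next broadcast. Dropping the ToyBus itself closes every channel, so a reader's iterator ends once it has drained its buffered messages.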