Crate tokio_bus


tokio-bus provides a lock-free, bounded, single-producer, multi-consumer broadcast channel usable as a Sink and Stream with tokio.

The bus implementation itself comes from the wonderful bus crate; this crate provides a layer on top that allows the bus to be used with tokio.
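To make "broadcast" concrete: every reader receives every value the single producer sends. The following is a minimal std-only sketch of that behaviour, assuming one bounded `std::sync::mpsc` queue per reader. It is not how the bus crate is implemented (the description above says the real channel is lock-free), and `FanOut` and `run_demo` are hypothetical names used only for illustration.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical stand-in for a broadcast bus: one bounded queue per reader.
/// This only illustrates the semantics; it is not the bus crate's design.
struct FanOut<T> {
    capacity: usize,
    senders: Vec<SyncSender<T>>,
}

impl<T: Clone> FanOut<T> {
    fn new(capacity: usize) -> Self {
        FanOut { capacity, senders: Vec::new() }
    }

    /// Register a reader. It sees only values broadcast after this call.
    fn add_rx(&mut self) -> Receiver<T> {
        let (tx, rx) = sync_channel(self.capacity);
        self.senders.push(tx);
        rx
    }

    /// Send a clone of `value` to every reader, forgetting readers
    /// whose receiving end has been dropped.
    fn broadcast(&mut self, value: T) {
        self.senders.retain(|tx| tx.send(value.clone()).is_ok());
    }
}

/// One producer, two readers: both readers collect the full sequence.
fn run_demo() -> (Vec<i32>, Vec<i32>) {
    let mut bus = FanOut::new(64);
    let rx1 = bus.add_rx();
    let rx2 = bus.add_rx();

    let h1 = thread::spawn(move || rx1.iter().collect::<Vec<i32>>());
    let h2 = thread::spawn(move || rx2.iter().collect::<Vec<i32>>());

    for v in 1..=6 {
        bus.broadcast(v);
    }
    // Dropping the producer closes every queue, which ends the readers' iterators.
    drop(bus);

    (h1.join().unwrap(), h2.join().unwrap())
}

fn main() {
    let (first, second) = run_demo();
    println!("reader 1: {:?}", first);
    println!("reader 2: {:?}", second);
}
```

Unlike a work queue, where each value goes to exactly one consumer, both readers here end up with the same six values.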

Example

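// Note: `iter_ok`, `wait` and `block_on_all` belong to the futures 0.1 / tokio 0.1 APIs.
use tokio;
use tokio_bus::Bus;
use futures::future::{Future, lazy, ok};
use futures::stream::{Stream, iter_ok};
use futures::sink::Sink;

// Create a bus with a buffer of 64 and register two readers.
let mut bus = Bus::new(64);
let rx1 = bus.add_rx();
let rx2 = bus.add_rx();

// The bus is a Sink: feed it the values 1 through 6.
let send_values = bus
    .send_all(iter_ok::<_, ()>(vec![1, 2, 3, 4, 5, 6]));

// Each reader is a Stream and receives every value.
let sum_values = rx1
    .fold(0i32, |acc, x| { ok(acc + x) });

let div_values = rx2
    .fold(1f64, |acc, x| { ok(x as f64 / acc) });

let runtime = tokio::runtime::Runtime::new().unwrap();
runtime.block_on_all(lazy(move || {
    // Run the producer as a background task.
    tokio::spawn(send_values
        .map(|_| {})
        .map_err(|_| { panic!(); })
    );
    // Block until each reader has folded the whole stream.
    assert_eq!(sum_values.wait(), Ok(21));
    assert_eq!(div_values.wait(), Ok(3.2));
    ok::<(), ()>(())
})).unwrap();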
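
The two asserted results follow directly from the fold closures. As a rough check, the same arithmetic can be replayed with plain iterators and no bus or runtime at all. The helper names `sum_fold` and `div_fold` are hypothetical and exist only for this sketch.

```rust
/// Mirrors the reader that folds with `acc + x`, starting from 0.
fn sum_fold(values: &[i32]) -> i32 {
    values.iter().fold(0i32, |acc, &x| acc + x)
}

/// Mirrors the reader that folds with `x / acc`, starting from 1.0.
/// Step by step: 1/1 = 1, 2/1 = 2, 3/2 = 1.5, 4/1.5 ≈ 2.667,
/// 5/2.667 ≈ 1.875, 6/1.875 ≈ 3.2.
fn div_fold(values: &[i32]) -> f64 {
    values.iter().fold(1f64, |acc, &x| x as f64 / acc)
}

fn main() {
    let values = [1, 2, 3, 4, 5, 6];
    println!("sum = {}", sum_fold(&values));
    println!("div = {}", div_fold(&values));
}
```

Because both readers see all six values in order, each fold runs over the complete sequence, which is why the sum is 21 and the alternating division lands on 3.2.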

Structs

Bus
A bus which buffers messages for all of its readers to eventually read. Allows the dynamic addition and removal of readers.
BusReader
A BusReader should not be created manually; obtain one by calling add_rx() on a Bus.