stream-broadcast 0.1.1

Runtime independent broadcast, which only polls it's underlying stream if no pending data is available
Documentation

Runtime independent broadcast, which only polls it's underlying stream if no pending data is available.

use futures::StreamExt;
use stream_broadcast::StreamBroadcastExt;

#[tokio::main]
async fn main() {
    let broadcast = futures::stream::iter(0..4).broadcast(3);
    let broadcast2 = broadcast.clone();
    assert_eq!(4, broadcast.count().await);
    // Number Zero wasn't available anymore due to BroadcastSize=3
    assert_eq!(vec![(1, 1), (0,2), (0,3)], broadcast2.collect::<Vec<_>>().await);
}

Uses #![forbid(unsafe_code)]

Difference to other libraries:

shared_stream:

  • Caches the entire stream from start, which is not practical for big datasets. This crate streams from the same position where the clone-origin is currently at
  • shared_stream never skips an entry. This library only provides information about missing data
  • High risk of leaking memory

tokio::sync::broadcast:

  • Broadcasts don't implement Stream directly, but tokio_stream provides a wrapper.
  • Entries are pushed actively to the sender (No Lazy evaluation when stream is paused). This requires a subroutine, which has to be managed somehow.
  • Instead of returning missing frames in the ErrorVariant (tokio_stream), this library returns a tuple (missing_frames_since_last_frame, TData) to mitigate errors when doing stuff like stream.count()