ruchei 0.1.3-a.0

Utilities for working with many streams
Documentation
//! Various benchmarks for in-memory multicast.

use std::{
    pin::{Pin, pin},
    task::{Context, Poll},
    time::Instant,
};

use futures_channel::mpsc::{SendError, UnboundedReceiver, UnboundedSender, unbounded};
use futures_util::{
    Future, Sink, SinkExt, Stream, StreamExt,
    future::{select, select_all},
};
use pin_project::pin_project;
use ruchei::{
    connection_item::ConnectionItemExt,
    echo::buffered::EchoBuffered,
    multicast::{
        buffered::MulticastBuffered, bufferless::MulticastBufferless, replay::MulticastReplay,
    },
};

#[pin_project]
struct Channel<T> {
    #[pin]
    sender: UnboundedSender<T>,
    #[pin]
    receiver: UnboundedReceiver<T>,
}

impl<T> Stream for Channel<T> {
    type Item = Result<T, SendError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.project().receiver.poll_next(cx).map(|o| o.map(Ok))
    }
}

impl<T> Sink<T> for Channel<T> {
    type Error = SendError;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().sender.poll_ready(cx)
    }

    fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
        self.project().sender.start_send(item)
    }

    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.project().receiver.close();
        Poll::Ready(Ok(()))
    }
}

fn channel<T>() -> (Channel<T>, Channel<T>) {
    let (sender0, receiver1) = unbounded();
    let (sender1, receiver0) = unbounded();
    (
        Channel {
            sender: sender0,
            receiver: receiver0,
        },
        Channel {
            sender: sender1,
            receiver: receiver1,
        },
    )
}

const N: usize = 100_000;

async fn single(mut remote: Channel<usize>, factor: usize) {
    for i in 0..N {
        remote.send(i).await.unwrap();
    }
    for _ in 0..N * factor {
        remote.next().await.unwrap().unwrap();
    }
}

fn setup<Fut: Future<Output = ()>>(
    f: impl Fn(Vec<Channel<usize>>) -> Fut,
) -> (UnboundedReceiver<Channel<usize>>, impl Future<Output = ()>) {
    let (streams_s, streams_r) = unbounded();
    let mut remotes = Vec::new();
    for _ in 0..10 {
        let (local, remote) = channel();
        remotes.push(remote);
        streams_s.unbounded_send(local).unwrap();
    }
    (streams_r, async move {
        let start = Instant::now();
        f(remotes).await;
        println!("{:?}", start.elapsed());
    })
}

async fn test_one<F1: Future<Output = ()>, F2: Future<Output = ()>>(
    f1: impl Fn(UnboundedReceiver<Channel<usize>>) -> F1,
    f2: impl Fn(Vec<Channel<usize>>) -> F2,
) {
    let (streams_r, future) = setup(f2);
    select(pin!(future), pin!(f1(streams_r))).await;
}

async fn test_sequential(mut remotes: Vec<Channel<usize>>) {
    for i in 0..N {
        for remote in remotes.iter_mut() {
            remote.send(i).await.unwrap();
        }
    }
    for _ in 0..N {
        for _ in 0..remotes.len() {
            for remote in remotes.iter_mut() {
                remote.next().await.unwrap().unwrap();
            }
        }
    }
}

async fn test_select(remotes: Vec<Channel<usize>>) {
    let factor = remotes.len();
    select_all(
        remotes
            .into_iter()
            .map(|remote| Box::pin(single(remote, factor))),
    )
    .await;
}

async fn test_two<Fut: Future<Output = ()>>(f: impl Fn(UnboundedReceiver<Channel<usize>>) -> Fut) {
    test_one(&f, test_sequential).await;
    test_one(&f, test_select).await;
}

#[async_std::main]
async fn main() {
    test_two(|streams| async move {
        streams
            .multicast_replay()
            .connection_item_ignore()
            .echo_buffered()
            .await
            .unwrap()
    })
    .await;
    test_two(|streams| async move {
        streams
            .multicast_buffered()
            .connection_item_ignore()
            .echo_buffered()
            .await
            .unwrap()
    })
    .await;
    test_two(|streams| async move {
        streams
            .multicast_bufferless()
            .connection_item_ignore()
            .echo_buffered()
            .await
            .unwrap()
    })
    .await;
    test_two(|streams| async move {
        streams
            .multicast_bufferless()
            .connection_item_ignore()
            .echo_buffered()
            .await
            .unwrap()
    })
    .await;
}