reactors 0.1.4

Performance-focused cross-platform asynchronous IO implementation
Documentation
use std::{thread::spawn, time::Duration};

use criterion::{async_executor::FuturesExecutor, *};
use futures::executor::block_on;
use reactors::Reactor;

/// Echo server used as the tokio side of the benchmark.
///
/// Binds a listener on `127.0.0.1:1812` and then serves connections strictly
/// one after another: accept, read exactly 11 bytes, check that they are
/// `hello world`, and write the same bytes back.
///
/// # Errors
///
/// Returns as soon as binding, accepting, reading or writing fails. The loop
/// has no other exit, so the function never returns `Ok`.
///
/// # Panics
///
/// Panics if a client sends 11 bytes that are not `hello world`.
async fn setup_tokio_server() -> anyhow::Result<()> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpListener;

    // The fixed payload every benchmark client is expected to send.
    const GREETING: &[u8; 11] = b"hello world";

    let listener = TcpListener::bind("127.0.0.1:1812").await?;

    loop {
        let (mut stream, _peer) = listener.accept().await?;

        let mut payload = [0u8; 11];
        stream.read_exact(&mut payload).await?;
        assert_eq!(&payload, GREETING);

        // Echo the request back; the stream is dropped at the end of the
        // iteration, before the next accept.
        stream.write_all(&payload).await?;
    }
}

/// One benchmark iteration for tokio: a single echo round-trip.
///
/// Opens a fresh connection to `127.0.0.1:1812`, sends `hello world`, reads
/// back exactly 11 bytes and checks that they match what was sent.
///
/// # Errors
///
/// Returns an error if connecting, writing or reading fails.
///
/// # Panics
///
/// Panics if the echoed bytes differ from `hello world`.
async fn tokio_client() -> anyhow::Result<()> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::TcpStream;

    const GREETING: &[u8; 11] = b"hello world";

    let mut stream = TcpStream::connect("127.0.0.1:1812").await?;

    stream.write_all(GREETING).await?;

    let mut echoed = [0u8; 11];
    stream.read_exact(&mut echoed).await?;
    assert_eq!(&echoed, GREETING);

    Ok(())
}

fn bench_tokio(c: &mut Criterion) {
    spawn(|| {
        use tokio::runtime::Runtime;

        let rt = Runtime::new().unwrap();

        rt.block_on(setup_tokio_server()).unwrap();
    });

    let rt = tokio::runtime::Runtime::new().unwrap();

    c.bench_function("echo tokio", |b| b.to_async(&rt).iter(|| tokio_client()));
}

/// Echo server used as the `reactors` side of the benchmark.
///
/// Creates a `TcpAcceptor` on `127.0.0.1:1813` driven by `reactor` and serves
/// accepted connections one after another: read exactly 11 bytes, check that
/// they are `hello world`, and write the same bytes back.
///
/// The `None` arguments are passed to the `reactors` API as in the original
/// code; presumably they are optional timeouts/settings — confirm against the
/// crate documentation.
///
/// # Errors
///
/// Returns an error if the address cannot be parsed, the acceptor cannot be
/// created, or accepting, reading or writing fails. Returns `Ok(())` once the
/// acceptor stream yields `None`.
///
/// # Panics
///
/// Panics if a client sends 11 bytes that are not `hello world`.
async fn setup_reactors_server(reactor: reactors::io::IoReactor) -> anyhow::Result<()> {
    use futures::{AsyncReadExt, AsyncWriteExt, TryStreamExt};
    use reactors::io::socket::tcp::TcpAcceptor;

    const GREETING: &[u8; 11] = b"hello world";

    let listen_addr = "127.0.0.1:1813".parse()?;
    let mut acceptor = TcpAcceptor::new(reactor, listen_addr, None)?;

    loop {
        // `None` means the acceptor stream has ended; stop serving.
        let (conn, remote) = match acceptor.try_next().await? {
            Some(accepted) => accepted,
            None => break,
        };

        log::debug!("accept {}", remote);

        let mut reader = conn.to_read_stream(None);
        let mut writer = conn.to_write_stream(None);

        let mut payload = [0u8; 11];
        reader.read_exact(&mut payload).await?;
        assert_eq!(&payload, GREETING);

        writer.write_all(&payload).await?;
    }

    Ok(())
}

/// One benchmark iteration for `reactors`: a single echo round-trip.
///
/// Connects to `127.0.0.1:1813` through `reactor`, sends `hello world`, reads
/// back exactly 11 bytes and checks that they match what was sent.
///
/// The `None` arguments are passed to the `reactors` API as in the original
/// code; their meaning is not visible here — confirm against the crate
/// documentation.
///
/// # Errors
///
/// Returns an error if the address cannot be parsed or if connecting,
/// writing or reading fails.
///
/// # Panics
///
/// Panics if the echoed bytes differ from `hello world`.
async fn reactor_client(reactor: reactors::io::IoReactor) -> anyhow::Result<()> {
    use futures::{AsyncReadExt, AsyncWriteExt};
    use reactors::io::socket::tcp::TcpStream;

    const GREETING: &[u8; 11] = b"hello world";

    let server_addr = "127.0.0.1:1813".parse()?;
    let conn = TcpStream::connect(reactor, server_addr, None, None).await?;

    // Separate read and write halves over the same connection.
    let mut reader = conn.to_read_stream(None);
    let mut writer = conn.to_write_stream(None);

    writer.write_all(GREETING).await?;

    let mut echoed = [0u8; 11];
    reader.read_exact(&mut echoed).await?;
    assert_eq!(&echoed, GREETING);

    Ok(())
}

/// Registers the `echo reactors` benchmark.
///
/// Three users share clones of a single `IoReactor`:
/// * a background thread that calls `poll_once` in an endless loop,
/// * a background thread that runs the echo server to completion,
/// * the measured client round-trips, executed on `FuturesExecutor`.
///
/// NOTE(review): nothing waits for the server to start listening before the
/// first client connects, so the first iterations may race the acceptor.
///
/// # Panics
///
/// The polling thread panics if `poll_once` returns an error; the server
/// thread panics if the server future returns an error.
fn bench_reactors(c: &mut Criterion) {
    use reactors::io::IoReactor;

    // To see the server's `log::debug!` output, initialise a logger here
    // (the original author used `pretty_env_logger::init()`).

    let reactor = IoReactor::default();

    // Polling thread: keeps driving the shared reactor. The 600 ms duration
    // is presumably the per-poll timeout — confirm against the reactors API.
    let mut poller = reactor.clone();
    spawn(move || loop {
        poller.poll_once(Duration::from_millis(600)).unwrap();
    });

    // Server thread, detached like the poller above.
    let server_reactor = reactor.clone();
    spawn(move || block_on(setup_reactors_server(server_reactor)).unwrap());

    c.bench_function("echo reactors", |bencher| {
        bencher
            .to_async(FuturesExecutor)
            .iter(|| reactor_client(reactor.clone()))
    });
}

// Group both echo benchmarks under the name `benches`; they are listed with
// the `reactors` benchmark before the tokio one.
criterion_group!(benches, bench_reactors, bench_tokio);
// Generate the benchmark binary's `main` that runs the `benches` group.
criterion_main!(benches);