tokio 0.2.0-alpha.2

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![feature(test)]
#![warn(rust_2018_idioms)]

pub extern crate test;

mod prelude {
    pub use futures::*;
    pub use tokio::net::{TcpListener, TcpStream};
    pub use tokio::reactor::Reactor;
    pub use tokio_io::io::read_to_end;

    pub use std::io::{self, Read, Write};
    pub use std::thread;
    pub use std::time::Duration;
    pub use test::{self, Bencher};
}

mod connect_churn {
    use crate::prelude::*;

    const NUM: usize = 300;
    const CONCURRENT: usize = 8;

    #[bench]
    fn one_thread(b: &mut Bencher) {
        let addr = "127.0.0.1:0".parse().unwrap();

        b.iter(move || {
            let listener = TcpListener::bind(&addr).unwrap();
            let addr = listener.local_addr().unwrap();

            // Spawn a single future that accepts & drops connections
            let serve_incomings = listener
                .incoming()
                .map_err(|e| panic!("server err: {:?}", e))
                .for_each(|_| Ok(()));

            let connects = stream::iter_result((0..NUM).map(|_| {
                Ok(TcpStream::connect(&addr).and_then(|sock| {
                    sock.set_linger(Some(Duration::from_secs(0))).unwrap();
                    read_to_end(sock, vec![])
                }))
            }));

            let connects_concurrent = connects
                .buffer_unordered(CONCURRENT)
                .map_err(|e| panic!("client err: {:?}", e))
                .for_each(|_| Ok(()));

            serve_incomings
                .select(connects_concurrent)
                .map(|_| ())
                .map_err(|_| ())
                .wait()
                .unwrap();
        });
    }

    fn n_workers(n: usize, b: &mut Bencher) {
        let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
        let (addr_tx, addr_rx) = sync::oneshot::channel();

        // Spawn reactor thread
        let server_thread = thread::spawn(move || {
            // Bind the TCP listener
            let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();

            // Get the address being listened on.
            let addr = listener.local_addr().unwrap();

            // Send the remote & address back to the main thread
            addr_tx.send(addr).unwrap();

            // Spawn a single future that accepts & drops connections
            let serve_incomings = listener
                .incoming()
                .map_err(|e| panic!("server err: {:?}", e))
                .for_each(|_| Ok(()));

            // Run server
            serve_incomings
                .select(shutdown_rx)
                .map(|_| ())
                .map_err(|_| ())
                .wait()
                .unwrap();
        });

        // Get the bind addr of the server
        let addr = addr_rx.wait().unwrap();

        b.iter(move || {
            use std::sync::{Arc, Barrier};

            // Create a barrier to coordinate threads
            let barrier = Arc::new(Barrier::new(n + 1));

            // Spawn worker threads
            let threads: Vec<_> = (0..n)
                .map(|_| {
                    let barrier = barrier.clone();
                    let addr = addr.clone();

                    thread::spawn(move || {
                        let connects = stream::iter_result((0..(NUM / n)).map(|_| {
                            Ok(TcpStream::connect(&addr)
                                .map_err(|e| panic!("connect err: {:?}", e))
                                .and_then(|sock| {
                                    sock.set_linger(Some(Duration::from_secs(0))).unwrap();
                                    read_to_end(sock, vec![])
                                }))
                        }));

                        barrier.wait();

                        connects
                            .buffer_unordered(CONCURRENT)
                            .map_err(|e| panic!("client err: {:?}", e))
                            .for_each(|_| Ok(()))
                            .wait()
                            .unwrap();
                    })
                })
                .collect();

            barrier.wait();

            for th in threads {
                th.join().unwrap();
            }
        });

        // Shutdown the server
        shutdown_tx.send(()).unwrap();
        server_thread.join().unwrap();
    }

    #[bench]
    fn two_threads(b: &mut Bencher) {
        n_workers(1, b);
    }

    #[bench]
    fn multi_threads(b: &mut Bencher) {
        n_workers(4, b);
    }
}

mod transfer {
    use crate::prelude::*;
    use std::{cmp, mem};
    use tokio_io::try_nb;

    const MB: usize = 3 * 1024 * 1024;

    struct Drain {
        sock: TcpStream,
        chunk: usize,
    }

    impl Future for Drain {
        type Item = ();
        type Error = io::Error;

        fn poll(&mut self) -> Poll<(), io::Error> {
            let mut buf: [u8; 1024] = unsafe { mem::uninitialized() };

            loop {
                match try_nb!(self.sock.read(&mut buf[..self.chunk])) {
                    0 => return Ok(Async::Ready(())),
                    _ => {}
                }
            }
        }
    }

    struct Transfer {
        sock: TcpStream,
        rem: usize,
        chunk: usize,
    }

    impl Future for Transfer {
        type Item = ();
        type Error = io::Error;

        fn poll(&mut self) -> Poll<(), io::Error> {
            while self.rem > 0 {
                let len = cmp::min(self.rem, self.chunk);
                let buf = &DATA[..len];

                let n = try_nb!(self.sock.write(&buf));
                self.rem -= n;
            }

            Ok(Async::Ready(()))
        }
    }

    static DATA: [u8; 1024] = [0; 1024];

    fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
        let addr = "127.0.0.1:0".parse().unwrap();

        b.iter(move || {
            let listener = TcpListener::bind(&addr).unwrap();
            let addr = listener.local_addr().unwrap();

            // Spawn a single future that accepts 1 connection, Drain it and drops
            let server = listener
                .incoming()
                .into_future() // take the first connection
                .map_err(|(e, _other_incomings)| e)
                .map(|(connection, _other_incomings)| connection.unwrap())
                .and_then(|sock| {
                    sock.set_linger(Some(Duration::from_secs(0))).unwrap();
                    let drain = Drain {
                        sock,
                        chunk: read_size,
                    };
                    drain
                        .map(|_| ())
                        .map_err(|e| panic!("server error: {:?}", e))
                })
                .map_err(|e| panic!("server err: {:?}", e));

            let client = TcpStream::connect(&addr)
                .and_then(move |sock| Transfer {
                    sock,
                    rem: MB,
                    chunk: write_size,
                })
                .map_err(|e| panic!("client err: {:?}", e));

            server.join(client).wait().unwrap();
        });
    }

    mod small_chunks {
        use crate::prelude::*;

        #[bench]
        fn one_thread(b: &mut Bencher) {
            super::one_thread(b, 32, 32);
        }
    }

    mod big_chunks {
        use crate::prelude::*;

        #[bench]
        fn one_thread(b: &mut Bencher) {
            super::one_thread(b, 1_024, 1_024);
        }
    }
}