tokio-core 0.1.14

Core I/O and event loop primitives for asynchronous I/O in Rust. Foundation for the rest of the tokio crates.
Documentation
#![feature(test)]

extern crate futures;
extern crate tokio_core;

#[macro_use]
extern crate tokio_io;

pub extern crate test;

/// Shared imports for the benchmark modules in this file; each of them pulls
/// these names in with `use ::prelude::*;`.
mod prelude {
    // Everything exported by `futures`, glob re-exported.
    pub use futures::*;
    // Reactor and TCP types exercised by the benchmarks.
    pub use tokio_core::reactor::Core;
    pub use tokio_core::net::{TcpListener, TcpStream};
    pub use tokio_io::io::read_to_end;

    // Benchmark harness plus the std items the benchmarks use.
    pub use test::{self, Bencher};
    pub use std::thread;
    pub use std::time::Duration;
    pub use std::io::{self, Read, Write};
}

mod connect_churn {
    use ::prelude::*;

    /// Number of connections opened per benchmark iteration (divided between
    /// the workers as `NUM / n` in the multi-threaded variants).
    const NUM: usize = 300;
    /// Value passed to `buffer_unordered`, limiting how many connection
    /// futures a single client stream keeps in flight at once.
    const CONCURRENT: usize = 8;

    /// Measures connect/accept churn with client and server sharing a single
    /// reactor.
    ///
    /// Every iteration opens `NUM` connections to a local listener, with at
    /// most `CONCURRENT` in flight, and reads each socket to EOF. The server
    /// side accepts every connection and drops it straight away.
    #[bench]
    fn one_thread(b: &mut Bencher) {
        let mut reactor = Core::new().unwrap();
        let handle = reactor.handle();

        // Bind to port 0, then ask the listener which address it ended up on.
        let any_port = "127.0.0.1:0".parse().unwrap();
        let listener = TcpListener::bind(&any_port, &handle).unwrap();
        let addr = listener.local_addr().unwrap();

        // Server task: accept connections and drop them immediately.
        let acceptor = listener.incoming()
            .map_err(|e| panic!("server err: {:?}", e))
            .for_each(|_| Ok(()));
        handle.spawn(acceptor);

        b.iter(move || {
            // One connect-then-read future per connection, created lazily as
            // the stream below pulls from this iterator.
            let attempts = (0..NUM).map(|_| {
                let conn = TcpStream::connect(&addr, &handle)
                    .and_then(|sock| {
                        sock.set_linger(Some(Duration::from_secs(0))).unwrap();
                        read_to_end(sock, vec![])
                    });
                Ok(conn)
            });

            let churn = stream::iter(attempts)
                .buffer_unordered(CONCURRENT)
                .map_err(|e| panic!("client err: {:?}", e))
                .for_each(|_| Ok(()));

            reactor.run(churn).unwrap();
        });
    }

    /// Runs the connect-churn benchmark with the reactor on a dedicated
    /// thread and `n` client worker threads.
    ///
    /// The reactor thread binds a listener, accepts and drops every incoming
    /// connection, and sends its `Remote` and listening address back over a
    /// channel. Each benchmark iteration spawns `n` workers that are released
    /// together by a barrier; every worker opens `NUM / n` connections (at
    /// most `CONCURRENT` in flight) by asking the reactor to connect on its
    /// behalf and then reading the resulting socket to EOF.
    ///
    /// After benchmarking, the reactor is signalled to stop and its thread is
    /// joined so it is no longer running when this function returns.
    fn n_workers(n: usize, b: &mut Bencher) {
        let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
        let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();

        // Spawn reactor thread; keep the join handle so the thread can be
        // waited for after shutdown instead of being left detached.
        let reactor = thread::spawn(move || {
            // Create the core
            let mut core = Core::new().unwrap();

            // Reactor handles
            let handle = core.handle();
            let remote = handle.remote().clone();

            // Bind the TCP listener
            let listener = TcpListener::bind(
                &"127.0.0.1:0".parse().unwrap(), &handle).unwrap();

            // Get the address being listened on.
            let addr = listener.local_addr().unwrap();

            // Send the remote & address back to the main thread
            remote_tx.send((remote, addr)).unwrap();

            // Spawn a single task that accepts & drops connections
            handle.spawn(
                listener.incoming()
                    .map_err(|e| panic!("server err: {:?}", e))
                    .for_each(|_| Ok(())));

            // Run the reactor until the shutdown signal arrives
            core.run(shutdown_rx).unwrap();
        });

        // Get the remote info
        let (remote, addr) = remote_rx.recv().unwrap();

        b.iter(move || {
            use std::sync::{Barrier, Arc};

            // Create a barrier to coordinate threads: `n` workers plus this
            // thread.
            let barrier = Arc::new(Barrier::new(n + 1));

            // Spawn worker threads
            let threads: Vec<_> = (0..n).map(|_| {
                let barrier = barrier.clone();
                let remote = remote.clone();

                // `addr` and `n` are `Copy`, so the `move` closure below
                // simply copies them.
                thread::spawn(move || {
                    let connects = stream::iter((0..(NUM / n)).map(|_| {
                        // TODO: Once `Handle` is `Send / Sync`, update this

                        let (socket_tx, socket_rx) = sync::oneshot::channel();

                        // Start the connect on the reactor thread and ship
                        // the connected socket back through the oneshot.
                        remote.spawn(move |handle| {
                            TcpStream::connect(&addr, &handle)
                                .map_err(|e| panic!("connect err: {:?}", e))
                                .then(|res| socket_tx.send(res))
                                .map_err(|_| ())
                        });

                        Ok(socket_rx
                            .then(|res| res.unwrap())
                            .and_then(|sock| {
                                sock.set_linger(Some(Duration::from_secs(0))).unwrap();
                                read_to_end(sock, vec![])
                            }))
                    }));

                    // Wait until every worker (and the benchmark thread) is
                    // ready before starting to connect.
                    barrier.wait();

                    connects.buffer_unordered(CONCURRENT)
                        .map_err(|e| panic!("client err: {:?}", e))
                        .for_each(|_| Ok(())).wait().unwrap();
                })
            }).collect();

            barrier.wait();

            for th in threads {
                th.join().unwrap();
            }
        });

        // Shutdown the reactor and wait for its thread to exit
        shutdown_tx.send(()).unwrap();
        reactor.join().unwrap();
    }

    /// Connection churn with one worker thread driving the clients while the
    /// reactor runs on its own thread (`n_workers` with `n = 1`).
    #[bench]
    fn two_threads(b: &mut Bencher) {
        n_workers(1, b);
    }

    /// Connection churn with four worker threads driving the clients while
    /// the reactor runs on its own thread (`n_workers` with `n = 4`).
    #[bench]
    fn multi_threads(b: &mut Bencher) {
        n_workers(4, b);
    }
}

mod transfer {
    use ::prelude::*;
    use std::{cmp, mem};

    /// Number of bytes each `Transfer` writes per benchmark iteration.
    /// NOTE: despite the name, the value is 3 * 1024 * 1024 bytes (3 MiB).
    const MB: usize = 3 * 1024 * 1024;

    /// Future that reads from `sock` until EOF and throws the data away.
    ///
    /// Each read asks for at most `chunk` bytes.
    struct Drain {
        sock: TcpStream,
        chunk: usize,
    }

    impl Future for Drain {
        type Item = ();
        type Error = io::Error;

        /// Reads repeatedly until the socket reports EOF (a read of 0 bytes),
        /// would block, or fails.
        ///
        /// # Panics
        ///
        /// Panics if `chunk` is larger than the 1024-byte scratch buffer.
        fn poll(&mut self) -> Poll<(), io::Error> {
            // Zero-initialised scratch buffer. The previous
            // `mem::uninitialized()` handed uninitialised memory to `read`,
            // which is undefined behaviour; zeroing 1 KiB is cheap.
            let mut buf = [0u8; 1024];

            loop {
                let n = try_nb!(self.sock.read(&mut buf[..self.chunk]));
                if n == 0 {
                    return Ok(Async::Ready(()));
                }
            }
        }
    }

    /// Future that writes `rem` bytes to `sock`, taking the payload from
    /// `DATA` and passing at most `chunk` bytes to each write call.
    struct Transfer {
        sock: TcpStream,
        rem: usize,
        chunk: usize,
    }

    impl Future for Transfer {
        type Item = ();
        type Error = io::Error;

        /// Writes until `rem` reaches zero, the socket would block, or an
        /// error occurs.
        ///
        /// # Errors
        ///
        /// Returns `io::ErrorKind::WriteZero` if a write makes no progress
        /// (for example when `chunk` is 0); without this check the loop
        /// would spin forever.
        ///
        /// # Panics
        ///
        /// Panics if `chunk` exceeds `DATA.len()` while at least that many
        /// bytes remain.
        fn poll(&mut self) -> Poll<(), io::Error> {
            while self.rem > 0 {
                let len = cmp::min(self.rem, self.chunk);
                let buf = &DATA[..len];

                let n = try_nb!(self.sock.write(buf));
                if n == 0 {
                    return Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "socket accepted zero bytes"));
                }
                self.rem -= n;
            }

            Ok(Async::Ready(()))
        }
    }

    /// Zero-filled payload that `Transfer` slices its writes from; its length
    /// (1024 bytes) bounds the largest write chunk that can be sliced from it.
    static DATA: [u8; 1024] = [0; 1024];

    /// Measures a bulk transfer where client and server share one reactor.
    ///
    /// The server side spawns a `Drain` (reading `read_size` bytes at a time)
    /// for every accepted connection. Each iteration connects a new client
    /// and runs a `Transfer` of `MB` bytes in `write_size`-byte writes to
    /// completion.
    fn one_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
        let mut reactor = Core::new().unwrap();
        let handle = reactor.handle();

        // Bind to port 0, then ask the listener which address it ended up on.
        let any_port = "127.0.0.1:0".parse().unwrap();
        let listener = TcpListener::bind(&any_port, &handle).unwrap();
        let addr = listener.local_addr().unwrap();

        // Separate handle owned by the accept loop for spawning drains.
        let drain_handle = handle.clone();

        // Server task: give every accepted socket its own draining task.
        let server = listener.incoming()
            .map_err(|e| panic!("server err: {:?}", e))
            .for_each(move |(stream, _peer)| {
                stream.set_linger(Some(Duration::from_secs(0))).unwrap();

                let drain = Drain {
                    sock: stream,
                    chunk: read_size,
                };
                let task = drain.map_err(|e| panic!("server error: {:?}", e));
                drain_handle.spawn(task);

                Ok(())
            });
        handle.spawn(server);

        b.iter(move || {
            let transfer = TcpStream::connect(&addr, &handle)
                .and_then(|sock| {
                    Transfer {
                        sock: sock,
                        rem: MB,
                        chunk: write_size,
                    }
                })
                .map_err(|e| panic!("client err: {:?}", e));

            reactor.run(transfer).unwrap();
        });
    }

    /// Measures a bulk transfer whose futures are polled on the calling
    /// thread (via `wait()`) while the reactor runs on its own thread.
    ///
    /// Each iteration binds a fresh listener on the reactor thread, connects
    /// a client to it, and then waits for both halves to finish: a `Transfer`
    /// of `MB` bytes in `write_size`-byte writes, and a `Drain` of the first
    /// accepted connection in `read_size`-byte reads.
    ///
    /// After benchmarking, the reactor is signalled to stop and its thread is
    /// joined so it is no longer running when this function returns.
    fn cross_thread(b: &mut Bencher, read_size: usize, write_size: usize) {
        let (shutdown_tx, shutdown_rx) = sync::oneshot::channel();
        let (remote_tx, remote_rx) = ::std::sync::mpsc::channel();

        // Spawn reactor thread; keep the join handle so the thread can be
        // waited for after shutdown instead of being left detached.
        let reactor = thread::spawn(move || {
            // Create the core
            let mut core = Core::new().unwrap();

            // Reactor handles
            let handle = core.handle();
            let remote = handle.remote().clone();

            // Hand the remote to the benchmarking thread, then run until the
            // shutdown signal arrives.
            remote_tx.send(remote).unwrap();
            core.run(shutdown_rx).unwrap();
        });

        let remote = remote_rx.recv().unwrap();

        b.iter(move || {
            let (server_tx, server_rx) = sync::oneshot::channel();
            let (client_tx, client_rx) = sync::oneshot::channel();

            // Bind the listener on the reactor thread and send it back here.
            remote.spawn(|handle| {
                let listener = TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle).unwrap();
                server_tx.send(listener).unwrap();
                Ok(())
            });

            let remote2 = remote.clone();

            server_rx.and_then(move |server| {
                let addr = server.local_addr().unwrap();

                // Start the connect on the reactor thread; the pending
                // connect future itself is sent back through the oneshot.
                remote2.spawn(move |handle| {
                    let fut = TcpStream::connect(&addr, &handle);
                    client_tx.send(fut).ok().unwrap();
                    Ok(())
                });

                // Client half: once connected, push `MB` bytes.
                let client = client_rx
                    .then(|res| res.unwrap())
                    .and_then(move |sock| {
                        Transfer {
                            sock: sock,
                            rem: MB,
                            chunk: write_size,
                        }
                    });

                // Server half: accept the first connection and drain it.
                let server = server.incoming().into_future()
                    .map_err(|(e, _)| e)
                    .and_then(move |(sock, _)| {
                        let sock = sock.unwrap().0;
                        sock.set_linger(Some(Duration::from_secs(0))).unwrap();

                        Drain {
                            sock: sock,
                            chunk: read_size,
                        }
                    });

                client
                    .join(server)
                    .then(|res| {
                        let _ = res.unwrap();
                        Ok(())
                    })
            }).wait().unwrap();
        });

        // Shutdown the reactor and wait for its thread to exit
        shutdown_tx.send(()).unwrap();
        reactor.join().unwrap();
    }

    /// Transfer benchmarks using 32-byte reads and 32-byte writes.
    mod small_chunks {
        use ::prelude::*;

        /// Client and server driven by a single reactor on one thread.
        #[bench]
        fn one_thread(b: &mut Bencher) {
            super::one_thread(b, 32, 32);
        }

        /// Reactor on its own thread, transfer driven from the bench thread.
        #[bench]
        fn cross_thread(b: &mut Bencher) {
            super::cross_thread(b, 32, 32);
        }
    }

    /// Transfer benchmarks using 1024-byte reads and 1024-byte writes.
    mod big_chunks {
        use ::prelude::*;

        /// Client and server driven by a single reactor on one thread.
        #[bench]
        fn one_thread(b: &mut Bencher) {
            super::one_thread(b, 1_024, 1_024);
        }

        /// Reactor on its own thread, transfer driven from the bench thread.
        #[bench]
        fn cross_thread(b: &mut Bencher) {
            super::cross_thread(b, 1_024, 1_024);
        }
    }
}