tokio 0.2.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![feature(test)]

extern crate test;

use tokio::executor::thread_pool::{Builder, Spawner};
use tokio::sync::oneshot;

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{mpsc, Arc};
use std::task::{Context, Poll};

struct Backoff(usize);

impl Future for Backoff {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 == 0 {
            Poll::Ready(())
        } else {
            self.0 -= 1;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

const NUM_THREADS: usize = 6;

#[bench]
fn spawn_many(b: &mut test::Bencher) {
    const NUM_SPAWN: usize = 10_000;

    let threadpool = Builder::new().num_threads(NUM_THREADS).build();

    let (tx, rx) = mpsc::sync_channel(1000);
    let rem = Arc::new(AtomicUsize::new(0));

    b.iter(|| {
        rem.store(NUM_SPAWN, Relaxed);

        for _ in 0..NUM_SPAWN {
            let tx = tx.clone();
            let rem = rem.clone();

            threadpool.spawn(async move {
                if 1 == rem.fetch_sub(1, Relaxed) {
                    tx.send(()).unwrap();
                }
            });
        }

        let _ = rx.recv().unwrap();
    });
}

#[bench]
fn yield_many(b: &mut test::Bencher) {
    const NUM_YIELD: usize = 1_000;
    const TASKS_PER_CPU: usize = 50;

    let threadpool = Builder::new().num_threads(NUM_THREADS).build();

    let tasks = TASKS_PER_CPU * num_cpus::get_physical();
    let (tx, rx) = mpsc::sync_channel(tasks);

    b.iter(move || {
        for _ in 0..tasks {
            let tx = tx.clone();

            threadpool.spawn(async move {
                let backoff = Backoff(NUM_YIELD);
                backoff.await;
                tx.send(()).unwrap();
            });
        }

        for _ in 0..tasks {
            let _ = rx.recv().unwrap();
        }
    });
}

#[bench]
fn ping_pong(b: &mut test::Bencher) {
    const NUM_PINGS: usize = 1_000;

    let threadpool = Builder::new().num_threads(NUM_THREADS).build();

    let (done_tx, done_rx) = mpsc::sync_channel(1000);
    let rem = Arc::new(AtomicUsize::new(0));

    b.iter(|| {
        let done_tx = done_tx.clone();
        let rem = rem.clone();
        rem.store(NUM_PINGS, Relaxed);

        let spawner = threadpool.spawner().clone();

        threadpool.spawn(async move {
            for _ in 0..NUM_PINGS {
                let rem = rem.clone();
                let done_tx = done_tx.clone();

                let spawner2 = spawner.clone();

                spawner.spawn(async move {
                    let (tx1, rx1) = oneshot::channel();
                    let (tx2, rx2) = oneshot::channel();

                    spawner2.spawn(async move {
                        rx1.await.unwrap();
                        tx2.send(()).unwrap();
                    });

                    tx1.send(()).unwrap();
                    rx2.await.unwrap();

                    if 1 == rem.fetch_sub(1, Relaxed) {
                        done_tx.send(()).unwrap();
                    }
                });
            }
        });

        done_rx.recv().unwrap();
    });
}

#[bench]
fn chained_spawn(b: &mut test::Bencher) {
    const ITER: usize = 1_000;

    let threadpool = Builder::new().num_threads(NUM_THREADS).build();

    fn iter(spawner: Spawner, done_tx: mpsc::SyncSender<()>, n: usize) {
        if n == 0 {
            done_tx.send(()).unwrap();
        } else {
            let s2 = spawner.clone();
            spawner.spawn(async move {
                iter(s2, done_tx, n - 1);
            });
        }
    }

    let (done_tx, done_rx) = mpsc::sync_channel(1000);

    b.iter(move || {
        let done_tx = done_tx.clone();
        let spawner = threadpool.spawner().clone();
        threadpool.spawn(async move {
            iter(spawner, done_tx, ITER);
        });

        done_rx.recv().unwrap();
    });
}