#![feature(test)]
extern crate futures;
extern crate test;
extern crate tokio_sync;
type Medium = [usize; 64];
type Large = [Medium; 64];
mod tokio {
use futures::{future, Async, Future, Sink, Stream};
use std::thread;
use test::{self, Bencher};
use tokio_sync::mpsc::*;
#[bench]
fn bounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Medium>(1_000));
})
}
#[bench]
fn unbounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<super::Medium>());
})
}
#[bench]
fn bounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Large>(1_000));
})
}
#[bench]
fn unbounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded_channel::<super::Large>());
})
}
#[bench]
fn send_one_message(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
tx.try_send(1).unwrap();
assert_eq!(Async::Ready(Some(1)), rx.poll().unwrap());
})
}
#[bench]
fn send_one_message_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
let _ = tx.try_send([[0; 64]; 64]);
let _ = test::black_box(&rx.poll());
})
}
#[bench]
fn bounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = channel::<i32>(1_000);
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_not_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(1);
tx.try_send(1).unwrap();
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
let (_tx, mut rx) = unbounded_channel::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
for i in 0..1000 {
tx.try_send(i).unwrap();
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn bounded_uncontended_1_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
for i in 0..1000 {
let _ = tx.try_send([[i; 64]; 64]);
let _ = test::black_box(&rx.poll());
}
})
}
#[bench]
fn bounded_uncontended_2(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1000);
for i in 0..1000 {
tx.try_send(i).unwrap();
}
for i in 0..1000 {
assert_eq!(Async::Ready(Some(i)), rx.poll().unwrap());
}
})
}
#[bench]
fn contended_unbounded_tx(b: &mut Bencher) {
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..4 {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for mut tx in rx.iter() {
for i in 0..1_000 {
tx.try_send(i).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1_000_000);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(4 * 1_000);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
#[bench]
fn contended_bounded_tx(b: &mut Bencher) {
const THREADS: usize = 4;
const ITERS: usize = 100;
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..THREADS {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for tx in rx.iter() {
let mut tx = tx.wait();
for i in 0..ITERS {
tx.send(i as i32).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(THREADS * ITERS);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
}
mod legacy {
use futures::sync::mpsc::*;
use futures::{future, Async, Future, Sink, Stream};
use std::thread;
use test::{self, Bencher};
#[bench]
fn bounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Medium>(1_000));
})
}
#[bench]
fn unbounded_new_medium(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded::<super::Medium>());
})
}
#[bench]
fn bounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&channel::<super::Large>(1_000));
})
}
#[bench]
fn unbounded_new_large(b: &mut Bencher) {
b.iter(|| {
let _ = test::black_box(&unbounded::<super::Large>());
})
}
#[bench]
fn send_one_message(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel(1_000);
tx.try_send(1).unwrap();
assert_eq!(Ok(Async::Ready(Some(1))), rx.poll());
})
}
#[bench]
fn send_one_message_large(b: &mut Bencher) {
b.iter(|| {
let (mut tx, mut rx) = channel::<super::Large>(1_000);
let _ = tx.try_send([[0; 64]; 64]);
let _ = test::black_box(&rx.poll());
})
}
#[bench]
fn bounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = channel::<i32>(1_000);
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(0);
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn bounded_tx_poll_not_ready(b: &mut Bencher) {
let (mut tx, _rx) = channel::<i32>(0);
tx.try_send(1).unwrap();
b.iter(|| {
future::lazy(|| {
assert!(tx.poll_ready().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready(b: &mut Bencher) {
let (_tx, mut rx) = unbounded::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_rx_not_ready_x5(b: &mut Bencher) {
let (_tx, mut rx) = unbounded::<i32>();
b.iter(|| {
future::lazy(|| {
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
assert!(rx.poll().unwrap().is_not_ready());
Ok::<_, ()>(())
})
.wait()
.unwrap();
})
}
#[bench]
fn unbounded_uncontended_1(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
}
})
}
#[bench]
fn unbounded_uncontended_1_large(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded::<super::Large>();
for i in 0..1000 {
let _ = UnboundedSender::unbounded_send(&tx, [[i; 64]; 64]);
let _ = test::black_box(&rx.poll());
}
})
}
#[bench]
fn unbounded_uncontended_2(b: &mut Bencher) {
b.iter(|| {
let (tx, mut rx) = unbounded();
for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
}
for i in 0..1000 {
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll());
}
})
}
#[bench]
fn multi_thread_unbounded_tx(b: &mut Bencher) {
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..4 {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for mut tx in rx.iter() {
for i in 0..1_000 {
tx.try_send(i).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1_000_000);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(4 * 1_000);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
#[bench]
fn contended_bounded_tx(b: &mut Bencher) {
const THREADS: usize = 4;
const ITERS: usize = 100;
let mut threads = vec![];
let mut txs = vec![];
for _ in 0..THREADS {
let (tx, rx) = ::std::sync::mpsc::channel::<Sender<i32>>();
txs.push(tx);
threads.push(thread::spawn(move || {
for tx in rx.iter() {
let mut tx = tx.wait();
for i in 0..ITERS {
tx.send(i as i32).unwrap();
}
}
}));
}
b.iter(|| {
let (tx, rx) = channel::<i32>(1);
for th in &txs {
th.send(tx.clone()).unwrap();
}
drop(tx);
let rx = rx.wait().take(THREADS * ITERS);
for v in rx {
let _ = test::black_box(v);
}
});
drop(txs);
for th in threads {
th.join().unwrap();
}
}
}