#![feature(test)]
extern crate test;
use test::Bencher;
use crossbeam_channel::{bounded, Sender};
use futures::channel::oneshot;
use pi_async_rt::rt::{
multi_thread::{MultiTaskRuntime, MultiTaskRuntimeBuilder, StealableTaskPool},
startup_global_time_loop, AsyncRuntime,
};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[derive(Clone)]
struct AtomicCounter(Sender<()>);
impl Drop for AtomicCounter {
fn drop(&mut self) {
let _ = self.0.send(()); }
}
fn rt(weights: [u8; 2]) -> MultiTaskRuntime<(), StealableTaskPool<()>> {
let pool = StealableTaskPool::with(4, 1000000, weights, 3000);
MultiTaskRuntimeBuilder::new(pool)
.thread_stack_size(2 * 1024 * 1024)
.init_worker_size(4)
.set_worker_limit(4, 4)
.build()
}
#[bench]
fn pi_async_spawn_empty_many(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
let rt = rt([254, 1]);
let (send, recv) = bounded(1);
b.iter(move || {
let rt0 = rt.clone();
let rt1 = rt.clone();
let rt2 = rt.clone();
let rt3 = rt.clone();
{
let counter = Arc::new(AtomicCounter(send.clone()));
let counter0 = counter.clone();
let counter1 = counter.clone();
let counter2 = counter.clone();
let counter3 = counter.clone();
thread::spawn(move || {
for _ in 0..2500 {
let counter_copy = counter0.clone();
if let Err(e) = rt0.spawn(async move {
drop(counter_copy);
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 2500..5000 {
let counter_copy = counter1.clone();
if let Err(e) = rt1.spawn(async move {
drop(counter_copy);
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 5000..7500 {
let counter_copy = counter2.clone();
if let Err(e) = rt2.spawn(async move {
drop(counter_copy);
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 7500..10000 {
let counter_copy = counter3.clone();
if let Err(e) = rt3.spawn(async move {
drop(counter_copy);
}) {
println!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
}
let _ = recv.clone().recv().unwrap();
});
}
#[bench]
fn pi_async_await_empty_many(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
let rt = rt([254, 1]);
let (send, recv) = bounded(1);
b.iter(move || {
{
let counter = Arc::new(AtomicCounter(send.clone()));
let counter0 = counter.clone();
let counter1 = counter.clone();
let counter2 = counter.clone();
let counter3 = counter.clone();
let _ = rt.spawn(async move {
for _ in 0..2500 {
let counter_copy = counter0.clone();
async move {
drop(counter_copy);
}
.await;
}
});
let _ = rt.spawn(async move {
for _ in 2500..5000 {
let counter_copy = counter1.clone();
async move {
drop(counter_copy);
}
.await;
}
});
let _ = rt.spawn(async move {
for _ in 5000..7500 {
let counter_copy = counter2.clone();
async move {
drop(counter_copy);
}
.await;
}
});
let _ = rt.spawn(async move {
for _ in 7500..10000 {
let counter_copy = counter3.clone();
async move {
drop(counter_copy);
}
.await;
}
});
}
let _ = recv.clone().recv().unwrap();
});
}
#[bench]
fn pi_async_spawn_many(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
let rt = rt([1, 254]);
let (send, recv) = bounded(1);
b.iter(move || {
let rt0 = rt.clone();
let rt1 = rt.clone();
let rt2 = rt.clone();
let rt3 = rt.clone();
{
let counter = Arc::new(AtomicCounter(send.clone()));
let counter0 = counter.clone();
let counter1 = counter.clone();
let counter2 = counter.clone();
let counter3 = counter.clone();
let rt0_copy = rt0.clone();
let _ = rt0.spawn(async move {
for _ in 0..2500 {
let counter_copy = counter0.clone();
let _ = rt0_copy.spawn_local(async move {
drop(counter_copy);
});
}
});
let rt1_copy = rt1.clone();
let _ = rt1.spawn(async move {
for _ in 2500..5000 {
let counter_copy = counter1.clone();
let _ = rt1_copy.spawn_local(async move {
drop(counter_copy);
});
}
});
let rt2_copy = rt2.clone();
let _ = rt2.spawn(async move {
for _ in 5000..7500 {
let counter_copy = counter2.clone();
let _ = rt2_copy.spawn_local(async move {
drop(counter_copy);
});
}
});
let rt3_copy = rt3.clone();
let _ = rt3.spawn(async move {
for _ in 7500..10000 {
let counter_copy = counter3.clone();
let _ = rt3_copy.spawn_local(async move {
drop(counter_copy);
});
}
});
}
let _ = recv.clone().recv().unwrap();
});
}
#[bench]
fn pi_async_yield_many(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
let rt = rt([1, 254]);
const NUM_YIELD: usize = 1_000;
const TASKS: usize = 200;
b.iter(move || {
let (send, recv) = bounded(1);
{
let counter = Arc::new(AtomicCounter(send));
for _ in 0..TASKS {
let rt_copy = rt.clone();
let counter_copy = counter.clone();
let _ = rt.spawn(async move {
for _ in 0..NUM_YIELD {
rt_copy.yield_now().await;
}
drop(counter_copy);
});
}
}
let _ = recv.recv().unwrap();
});
}
#[bench]
fn pi_async_ping_pong(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
const NUM_PINGS: usize = 1_000;
let rt = rt([1, 254]);
let rem = Arc::new(AtomicUsize::new(0));
b.iter(|| {
let (send, recv) = bounded(1);
let rem = rem.clone();
rem.store(NUM_PINGS, Relaxed);
let rt_copy = rt.clone();
let _ = rt.spawn(async move {
let rt_clone = rt_copy.clone();
let _ = rt_copy.spawn_local(async move {
for _ in 0..NUM_PINGS {
let rem = rem.clone();
let send = send.clone();
let rt_clone_ = rt_clone.clone();
let _ = rt_clone.spawn_local(async move {
let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
let _ = rt_clone_.spawn_local(async move {
rx1.await.unwrap();
tx2.send(()).unwrap();
});
tx1.send(()).unwrap();
rx2.await.unwrap();
if 1 == rem.fetch_sub(1, Relaxed) {
send.send(()).unwrap();
}
});
}
});
});
let _ = recv.recv().unwrap();
});
}
#[bench]
fn pi_async_chained_spawn(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
const ITER: usize = 1_000;
let rt = rt([1, 254]);
let (send, recv) = bounded(1);
fn iter(rt: MultiTaskRuntime<(), StealableTaskPool<()>>, send: Sender<()>, n: usize) {
if n == 0 {
send.send(()).unwrap();
} else {
let rt_copy = rt.clone();
let _ = rt.spawn_priority(10, async move {
iter(rt_copy.clone(), send, n - 1);
});
}
}
b.iter(move || {
let rt_copy = rt.clone();
let send_copy = send.clone();
let _ = rt.spawn(async move {
let rt_clone = rt_copy.clone();
let send_clone = send_copy.clone();
let _ = rt_copy.spawn_priority(10, async move {
iter(rt_clone, send_clone, ITER);
});
});
let _ = recv.recv().unwrap();
});
}
#[bench]
fn pi_async_spawn_one_to_one(b: &mut Bencher) {
let _handle = startup_global_time_loop(100);
thread::sleep(Duration::from_millis(10000));
let rt = rt([1, 1]);
let (send, recv) = bounded(1);
b.iter(move || {
let rt0 = rt.clone();
let rt1 = rt.clone();
let rt2 = rt.clone();
let rt3 = rt.clone();
{
let counter = Arc::new(AtomicCounter(send.clone()));
let counter0 = counter.clone();
let counter1 = counter.clone();
let counter2 = counter.clone();
let counter3 = counter.clone();
thread::spawn(move || {
for _ in 0..2500 {
let rt0_copy = rt0.clone();
let counter_copy = counter0.clone();
if let Err(e) = rt0.spawn(async move {
let _ = rt0_copy.spawn_local(async move {
drop(counter_copy);
});
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 2500..5000 {
let rt1_copy = rt1.clone();
let counter_copy = counter1.clone();
if let Err(e) = rt1.spawn(async move {
let _ = rt1_copy.spawn_local(async move {
drop(counter_copy);
});
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 5000..7500 {
let rt2_copy = rt2.clone();
let counter_copy = counter2.clone();
if let Err(e) = rt2.spawn(async move {
let _ = rt2_copy.spawn_local(async move {
drop(counter_copy);
});
}) {
panic!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
thread::spawn(move || {
for _ in 7500..10000 {
let rt3_copy = rt3.clone();
let counter_copy = counter3.clone();
if let Err(e) = rt3.spawn(async move {
let _ = rt3_copy.spawn_local(async move {
drop(counter_copy);
});
}) {
println!("!!!> spawn empty singale task failed, reason: {:?}", e);
}
}
});
}
let _ = recv.clone().recv().unwrap();
});
}