use nio::{Runtime, RuntimeBuilder, spawn};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering::Relaxed},
mpsc,
};
mod support {
pub mod futures;
}
use support::futures;
use support::futures::sync::oneshot;
use support::futures::test::assert_ok;
fn multi() -> Runtime {
RuntimeBuilder::new().rt().unwrap()
}
#[test]
fn many_oneshot_futures() {
const NUM: usize = if cfg!(miri) { 20 } else { 1_000 };
for _ in 0..5 {
let (tx, rx) = mpsc::channel();
let rt = multi().context();
let cnt = Arc::new(AtomicUsize::new(0));
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
}
}
#[test]
fn spawn_two() {
let rt = multi();
let out = rt.block_on(|| async {
let (tx, rx) = oneshot::channel();
spawn(async move {
spawn(async move {
tx.send("ZOMG").unwrap();
});
});
assert_ok!(rx.await)
});
assert_eq!(out, "ZOMG");
}
#[test]
fn many_multishot_futures() {
const CHAIN: usize = if cfg!(miri) { 20 } else { 200 };
const CYCLES: usize = if cfg!(miri) { 3 } else { 5 };
const TRACKS: usize = if cfg!(miri) { 2 } else { 50 };
let rt = multi();
let mut start_txs = Vec::with_capacity(TRACKS);
let mut final_rxs = Vec::with_capacity(TRACKS);
for _ in 0..TRACKS {
let (start_tx, mut chain_rx) = futures::sync::mpsc::channel(10);
for _ in 0..CHAIN {
let (next_tx, next_rx) = futures::sync::mpsc::channel(10);
rt.spawn(async move {
while let Some(v) = chain_rx.recv().await {
next_tx.send(v).await.unwrap();
}
});
chain_rx = next_rx;
}
let (final_tx, final_rx) = futures::sync::mpsc::channel(10);
let cycle_tx = start_tx.clone();
let mut rem = CYCLES;
rt.spawn(async move {
for _ in 0..CYCLES {
let msg = chain_rx.recv().await.unwrap();
rem -= 1;
if rem == 0 {
final_tx.send(msg).await.unwrap();
} else {
cycle_tx.send(msg).await.unwrap();
}
}
});
start_txs.push(start_tx);
final_rxs.push(final_rx);
}
rt.block_on(|| async move {
for start_tx in start_txs {
start_tx.send("ping").await.unwrap();
}
for mut final_rx in final_rxs {
final_rx.recv().await.unwrap();
}
});
}
#[test]
fn lifo_slot_budget() {
async fn my_fn() {
spawn_another();
}
fn spawn_another() {
spawn(my_fn());
}
let rt = multi();
let (send, recv) = oneshot::channel();
rt.spawn(async move {
spawn(my_fn());
let _ = send.send(());
});
let _ = rt.block_on(|| recv);
}
#[test]
fn blocking() {
const NUM: usize = if cfg!(miri) { 50 } else { 1_000 };
let (tx, rx) = mpsc::channel();
let rt = RuntimeBuilder::new()
.threadpool_load_factor(1)
.rt()
.unwrap();
let cnt = Arc::new(AtomicUsize::new(0));
let block = Arc::new(std::sync::Barrier::new(5));
for _ in 0..4 {
let block = block.clone();
let ctx = rt.context();
rt.spawn(async move {
ctx.spawn_blocking(move || {
block.wait();
block.wait();
})
});
}
block.wait();
for _ in 0..NUM {
let cnt = cnt.clone();
let tx = tx.clone();
rt.spawn(async move {
let num = cnt.fetch_add(1, Relaxed) + 1;
if num == NUM {
tx.send(()).unwrap();
}
});
}
rx.recv().unwrap();
block.wait();
}
#[test]
fn multi_threadpool() {
let rt1 = multi();
let rt2 = multi();
let (tx, rx) = oneshot::channel();
let (done_tx, done_rx) = mpsc::channel();
rt2.spawn(async move {
rx.await.unwrap();
done_tx.send(()).unwrap();
});
rt1.spawn(async move {
tx.send(()).unwrap();
});
done_rx.recv().unwrap();
}