#![allow(non_snake_case)]
use batch_channel::SendError;
mod fixture;
use fixture::*;
#[test]
fn zero_capacity_rounds_up_to_one() {
let (tx, rx) = batch_channel::bounded(1);
block_on(async move {
tx.send(10).await.unwrap();
assert_eq!(Some(10), rx.recv().await);
});
}
#[test]
fn send_returns_SendError_if_receivers_dropped() {
let (tx, rx) = batch_channel::bounded(1);
drop(rx);
assert_eq!(Err(SendError("x")), block_on(tx.send("x")));
}
#[test]
fn recv_unblocks_send() {
let mut pool = LocalPool::new();
let (tx, rx) = batch_channel::bounded(1);
let state = StateVar::new("");
pool.spawn({
let state = state.clone();
async move {
state.set("sending");
tx.send(10).await.unwrap();
state.set("sent 1");
tx.send(20).await.unwrap();
state.set("sent 2");
}
});
pool.run_until_stalled();
assert_eq!("sent 1", state.get());
assert_eq!(Some(10), block_on(rx.recv()));
pool.run_until_stalled();
assert_eq!("sent 2", state.get());
assert_eq!(Some(20), block_on(rx.recv()));
pool.run();
}
#[test]
fn recv_batch_unblocks_send() {
let mut pool = LocalPool::new();
let (tx, rx) = batch_channel::bounded(1);
let state = StateVar::new("");
pool.spawn({
let state = state.clone();
async move {
state.set("sending");
tx.send(10).await.unwrap();
state.set("sent 1");
tx.send(20).await.unwrap();
state.set("sent 2");
}
});
pool.run_until_stalled();
assert_eq!("sent 1", state.get());
assert_eq!(vec![10], block_on(rx.recv_batch(5)));
pool.run_until_stalled();
assert_eq!("sent 2", state.get());
assert_eq!(vec![20], block_on(rx.recv_batch(5)));
pool.run();
}
#[test]
fn recv_vec_unblocks_send() {
let mut pool = LocalPool::new();
let (tx, rx) = batch_channel::bounded(1);
let state = StateVar::new("");
pool.spawn({
let state = state.clone();
async move {
state.set("sending");
tx.send(10).await.unwrap();
state.set("sent 1");
tx.send(20).await.unwrap();
state.set("sent 2");
}
});
pool.run_until_stalled();
assert_eq!("sent 1", state.get());
let mut batch = Vec::new();
block_on(rx.recv_vec(5, &mut batch));
assert_eq!(vec![10], batch);
pool.run_until_stalled();
assert_eq!("sent 2", state.get());
block_on(rx.recv_vec(5, &mut batch));
assert_eq!(vec![20], batch);
pool.run();
}
#[test]
fn recv_batch_returning_all() {
let (tx, rx) = batch_channel::bounded(3);
block_on(async move {
tx.send_iter([10, 20, 30]).await.unwrap();
assert_eq!(vec![10, 20, 30], rx.recv_batch(100).await);
})
}
#[test]
fn send_batch_blocks_as_needed() {
let mut pool = LocalPool::new();
let (tx, rx) = batch_channel::bounded(1);
pool.spawn(async move {
tx.send_iter([1, 2, 3, 4]).await.unwrap();
});
pool.block_on(async move {
assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
assert_eq!(Some(3), rx.recv().await);
assert_eq!(Some(4), rx.recv().await);
assert_eq!(None, rx.recv().await);
});
pool.run();
}
#[test]
fn autobatch_batches() {
let mut pool = LocalPool::new();
let state = AtomicVar::new("");
let (tx, rx) = batch_channel::bounded(1);
let inner = state.clone();
pool.spawn(async move {
tx.autobatch(2, async move |tx| {
inner.set("0");
tx.send(1).await?;
inner.set("1");
tx.send(2).await?;
inner.set("2");
tx.send(3).await?;
inner.set("3");
tx.send(4).await?;
inner.set("4");
Ok(())
})
.await
.unwrap()
});
pool.run_until_stalled();
assert_eq!("1", state.get());
assert_eq!(Some(1), pool.run_until(rx.recv()));
assert_eq!("1", state.get());
assert_eq!(Some(2), pool.run_until(rx.recv()));
assert_eq!("3", state.get());
assert_eq!(Some(3), pool.run_until(rx.recv()));
assert_eq!("3", state.get());
assert_eq!(Some(4), pool.run_until(rx.recv()));
assert_eq!("4", state.get());
assert_eq!(None, pool.run_until(rx.recv()));
}
#[test]
fn autobatch_or_cancel_stops_if_receiver_is_dropped() {
let mut pool = LocalPool::new();
let state = AtomicVar::new("");
let (tx, rx) = batch_channel::bounded(1);
let inner = state.clone();
pool.spawn(tx.autobatch_or_cancel(2, async move |tx| {
inner.set("0");
tx.send(1).await?;
inner.set("1");
tx.send(2).await?;
inner.set("2");
tx.send(3).await?;
inner.set("3");
tx.send(4).await?;
inner.set("4");
Ok(())
}));
pool.run_until_stalled();
assert_eq!("1", state.get());
assert_eq!(Some(1), pool.run_until(rx.recv()));
assert_eq!("1", state.get());
drop(rx);
pool.run_until_stalled();
assert_eq!("1", state.get());
}
#[test]
fn clone_bounded_sender() {
let mut pool = LocalPool::new();
let (tx1, rx) = batch_channel::bounded::<()>(1);
let tx2 = tx1.clone();
drop(tx1);
let tx3 = tx2.clone();
drop(tx2);
drop(tx3);
assert_eq!(None, pool.run_until(rx.recv()));
}
#[test]
fn send_empty_iter_immediately_returns() {
let mut pool = LocalPool::new();
let state = AtomicVar::new("");
let (tx, rx) = batch_channel::bounded::<()>(1);
pool.spawn({
let state = state.clone();
async move {
state.set("1");
tx.send_iter([]).await.unwrap();
state.set("2");
}
});
pool.spawn({
let state = state.clone();
async move {
assert_eq!("2", state.get());
assert_eq!(None, rx.recv().await);
state.set("3");
}
});
pool.run();
assert_eq!("3", state.get());
}
#[test]
fn send_empty_iter_immediately_returns_even_if_rx_is_dropped() {
let mut pool = LocalPool::new();
let (tx, rx) = batch_channel::bounded::<()>(1);
drop(rx);
pool.block_on(async move {
assert_eq!(Ok(()), tx.send_iter([]).await);
});
}
#[test]
fn send_iter_completes_if_there_is_just_enough_capacity() {
let mut pool = LocalPool::new();
let state = AtomicVar::new("");
let (tx, rx) = batch_channel::bounded(2);
pool.spawn({
let state = state.clone();
async move {
state.set("1");
tx.send_iter([1, 2]).await.unwrap();
state.set("2");
}
});
pool.spawn({
let state = state.clone();
async move {
assert_eq!("2", state.get());
assert_eq!(Some(1), rx.recv().await);
assert_eq!(Some(2), rx.recv().await);
state.set("3");
}
});
pool.run();
assert_eq!("3", state.get());
}
#[test]
fn send_iter_wakes_receivers_if_it_hits_capacity() {
let mut pool = LocalPool::new();
let reader_state = AtomicVar::new("");
let writer_state = AtomicVar::new("");
let (tx, rx) = batch_channel::bounded(2);
pool.spawn({
let reader_state = reader_state.clone();
async move {
reader_state.set("a");
assert_eq!(Some(1), rx.recv().await);
reader_state.set("b");
assert_eq!(Some(2), rx.recv().await);
reader_state.set("c");
assert_eq!(Some(3), rx.recv().await);
reader_state.set("d");
assert_eq!(None, rx.recv().await);
reader_state.set("e");
}
});
pool.spawn({
let writer_state = writer_state.clone();
async move {
writer_state.set("1");
tx.send_iter([1, 2, 3]).await.unwrap();
writer_state.set("2");
}
});
pool.run();
assert_eq!("e", reader_state.get());
assert_eq!("2", writer_state.get());
}
#[test]
fn sender_and_receiver_of_noncloneable_can_clone() {
struct NoClone;
let (tx, rx) = batch_channel::bounded::<NoClone>(1);
_ = tx.clone();
_ = rx.clone();
}
#[test]
fn recv_vec_blocking() {
const CAPACITY: usize = 3;
let (tx, rx) = batch_channel::bounded_sync(CAPACITY);
tx.send_iter([10, 20]).unwrap();
let mut vec = Vec::with_capacity(CAPACITY);
rx.recv_vec(CAPACITY, &mut vec);
assert_eq!(vec![10, 20], vec);
}