use tokio::{task, time};
use testtools::task::expect_runtime;
use recstrm::*;
/// Application-defined error type used by the tests that exercise the
/// `fail()` paths of the channel end-points.
///
/// `Debug` is derived so that results carrying this error can be used with
/// `unwrap()`, `expect()` and the `assert_*` macros.
#[derive(Debug)]
enum MyErr {
    /// The only failure the tests ever report.
    Something,
}
/// A record sent from a spawned task after a 500 ms delay must be delivered
/// to a receiver that started receiving before the send happened.
#[tokio::test]
async fn send_after_recv() {
    let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

    // The sender sleeps before sending so the receive below is issued first.
    let jh = task::spawn(async move {
        time::sleep(time::Duration::from_millis(500)).await;
        tx.send_async("hello").await.unwrap();
    });

    // Assert on the whole `Option`: an unexpected `Ok(None)` must fail the
    // test instead of being silently skipped.
    assert_eq!(rx.recv_async().await.unwrap(), Some("hello"));

    jh.await.unwrap();
}
/// A record sent by a spawned task must still be delivered when the receiver
/// only starts receiving 500 ms later.
#[tokio::test]
async fn send_before_recv() {
    let (tx, rx) = Builder::new().queue_size(8).build::<_, ()>();

    let jh = task::spawn(async move {
        tx.send_async("hello").await.unwrap();
    });

    // Give the spawned sender time to run before the receive is issued.
    time::sleep(time::Duration::from_millis(500)).await;

    // Assert on the whole `Option`: an unexpected `Ok(None)` must fail the
    // test instead of being silently skipped.
    assert_eq!(rx.recv_async().await.unwrap(), Some("hello"));

    jh.await.unwrap();
}
/// Receiving on an empty channel whose sender has been dropped is expected
/// to yield `Ok(None)`.
#[tokio::test]
async fn eof_after_sender_drop() {
    let (sender, receiver) = Builder::new().queue_size(8).build::<(), ()>();

    // Drop the sender without ever queuing a record.
    drop(sender);

    assert!(
        matches!(receiver.recv_async().await, Ok(None)),
        "Unexpectedly not Ok(None)"
    );
}
/// A record queued before the sender was dropped must still be received;
/// only the following receive is expected to yield `Ok(None)`.
#[tokio::test]
async fn eof_with_queued_records() {
    let (sender, receiver) = Builder::new().queue_size(8).build::<_, ()>();

    sender.send_async(42).await.unwrap();
    drop(sender);

    // First receive: the record that was queued before the drop.
    match receiver.recv_async().await {
        Ok(Some(value)) => assert_eq!(value, 42),
        _ => panic!("Unexpectedly not Ok(Some(42))"),
    }

    // Second receive: the queue is drained and the sender is gone.
    assert!(
        matches!(receiver.recv_async().await, Ok(None)),
        "Unexpectedly not Ok(None)"
    );
}
#[tokio::test]
async fn try_send_full_queue() {
let (tx, _rx) = Builder::new().queue_size(1).build::<_, ()>();
tx.send_async(42).await.unwrap();
let Err(Error::QueueFull) = tx.try_send(1147) else {
panic!("Unexpectedly not Err(Error::QueueFull)");
};
}
/// Records sent one at a time must be received in the order they were sent.
#[tokio::test]
async fn order() {
    let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

    // Queue 0, 1, 2, 3 individually.
    for value in 0..4 {
        sender.send_async(value).await.unwrap();
    }

    // They must come out in the same order.
    for expected in 0..4 {
        assert_eq!(receiver.recv_async().await.unwrap(), Some(expected));
    }
}
/// Records sent as one batch must be received individually in the batch's
/// iteration order.
#[tokio::test]
async fn batch_order() {
    let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

    let batch = [0, 1, 2, 3];
    sender.send_batch_async(batch.into_iter()).await.unwrap();

    for expected in 0..4 {
        assert_eq!(receiver.recv_async().await.unwrap(), Some(expected));
    }
}
/// After a batch of four records has been sent, `recv_all_async` is expected
/// to return all of them, in order, in a single call.
#[tokio::test]
async fn recv_all() {
    let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

    sender
        .send_batch_async([0, 1, 2, 3].into_iter())
        .await
        .unwrap();

    let received = receiver.recv_all_async().await.unwrap().unwrap();
    assert_eq!(received, [0, 1, 2, 3]);
}
/// With four records queued, two consecutive `recv_atmost_async(2)` calls
/// are expected to return `[0, 1]` and then `[2, 3]`.
#[tokio::test]
async fn recv_atmost() {
    let (sender, receiver) = Builder::new().queue_size(4).build::<_, ()>();

    sender
        .send_batch_async([0, 1, 2, 3].into_iter())
        .await
        .unwrap();

    // Each call is checked before the next one is issued.
    for expected in [[0, 1], [2, 3]] {
        let chunk = receiver.recv_atmost_async(2).await.unwrap().unwrap();
        assert_eq!(chunk, expected);
    }
}
/// A sender facing a full queue (size 2) must wait until the receiver makes
/// room.
///
/// The spawned task fills the queue and then sends a third record wrapped in
/// `expect_runtime`, while the receiver only starts draining after
/// `GRACE_MS` milliseconds.
// NOTE(review): `expect_runtime` presumably checks that the wrapped future
// runs for at least the given duration -- confirm against `testtools`.
#[tokio::test]
async fn sender_blocking() {
    const GRACE_MS: u64 = 250;
    let grace = time::Duration::from_millis(GRACE_MS);

    let (sender, receiver) = Builder::new().queue_size(2).build::<_, ()>();

    let producer = task::spawn(async move {
        // Fill the queue to capacity.
        sender.send_batch_async([0, 1].into_iter()).await.unwrap();

        // The third record does not fit until the receiver drains.
        expect_runtime(grace, async move {
            sender.send_async(2).await.unwrap();
        })
        .await;
    });

    // Hold off receiving for the grace period.
    time::sleep(grace).await;

    let first_two = receiver.recv_atmost_async(2).await.unwrap().unwrap();
    assert_eq!(first_two, [0, 1]);

    let third = receiver.recv_async().await.unwrap();
    assert_eq!(third, Some(2));

    producer.await.unwrap();
}
#[tokio::test]
async fn receiver_disappeared() {
let (tx, rx) = Builder::new().queue_size(4).build::<_, ()>();
drop(rx);
let Err(Error::ReceiverDisappeared) = tx.send_async("hello").await else {
panic!("Result unexpectedly not Err(Error::ReceiverDisappeared)");
};
}
#[tokio::test]
async fn fail_sender() {
let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();
tx.fail(MyErr::Something);
let Err(Error::App(MyErr::Something)) = rx.recv_async().await else {
panic!("Unexpected return value");
};
}
#[tokio::test]
async fn fail_receiver() {
let (tx, rx) = Builder::new().queue_size(4).build::<&str, MyErr>();
rx.fail(MyErr::Something);
let Err(Error::App(MyErr::Something)) = tx.send_async("hello").await else {
panic!("Unexpected return value");
};
}