use std::{
future::Future,
pin::Pin,
task::Context,
time::{Duration, Instant},
};
use futures::{future::join_all, FutureExt};
use loole::{bounded, RecvError, SendError};
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
async fn async_sleep(ms: u64) {
tokio::time::sleep(Duration::from_millis(ms)).await
}
#[test]
fn ordered_deques() {
let (tx, rx) = bounded(0);
let mut send_future_1 = tx.send_async(1);
let mut send_future_2 = tx.send_async(2);
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let cx = &mut cx;
assert!(Pin::new(&mut send_future_1).poll(cx).is_pending());
assert!(Pin::new(&mut send_future_2).poll(cx).is_pending());
drop(rx);
assert_eq!(
Pin::new(&mut send_future_2).poll(cx),
std::task::Poll::Ready(Err(SendError(2)))
);
assert_eq!(
Pin::new(&mut send_future_1).poll(cx),
std::task::Poll::Ready(Err(SendError(1)))
);
}
#[tokio::test]
async fn async_send_before_recv_buffer_0() {
let (tx, rx) = bounded(0);
tokio::spawn(tx.send_async(1));
async_sleep(100).await;
assert_eq!(rx.recv_async().await, Ok(1));
}
#[tokio::test]
async fn async_recv_before_send_buffer_0() {
let (tx, rx) = bounded(0);
let h = tokio::spawn(rx.recv_async());
async_sleep(100).await;
let f = h.map(|x| x.unwrap());
assert_eq!(tx.send_async(1).await, Ok(()));
assert_eq!(f.await, Ok(1));
}
#[tokio::test]
async fn async_close_before_send_buffer_0() {
let (tx, rx) = bounded::<()>(0);
drop(rx);
assert_eq!(tx.send_async(()).await, Err(SendError(())));
}
#[tokio::test]
async fn async_send_before_recv_buffer_1() {
let (tx, rx) = bounded(1);
assert_eq!(tx.send_async(1).await, Ok(()));
assert_eq!(rx.recv_async().await, Ok(1));
}
#[tokio::test]
async fn async_recv_before_send_buffer_1() {
let (tx, rx) = bounded(1);
let h = tokio::spawn(rx.recv_async());
async_sleep(100).await;
assert_eq!(tx.send_async(1).await, Ok(()));
assert_eq!(h.await.unwrap(), Ok(1));
}
#[tokio::test]
async fn async_recv_after_manually_closed_sender() {
let (tx, rx) = bounded(1);
assert_eq!(tx.send_async(1).await, Ok(()));
assert!(tx.close());
assert_eq!(rx.recv_async().await, Ok(1));
assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}
#[tokio::test]
async fn async_recv_after_manually_closeed_receiver() {
let (tx, rx) = bounded(1);
assert_eq!(tx.send_async(1).await, Ok(()));
assert!(rx.close());
assert_eq!(rx.recv_async().await, Ok(1));
assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}
#[tokio::test]
async fn async_2_sends_before_2_recvs_buffer_1() {
let (tx, rx) = bounded(1);
assert_eq!(tx.capacity(), Some(1));
assert_eq!(tx.len(), 0);
tx.send_async(1).await.unwrap();
assert_eq!(tx.len(), 1);
tokio::spawn(tx.send_async(2));
async_sleep(100).await;
assert_eq!(rx.len(), 1);
let r1r = rx.recv_async().await;
assert_eq!(r1r, Ok(1));
assert_eq!(rx.len(), 1);
let r2r = rx.recv_async().await;
assert_eq!(r2r, Ok(2));
assert_eq!(rx.len(), 0);
}
#[tokio::test]
async fn async_close_before_recv_buffer_0() {
let (tx, rx) = bounded::<()>(0);
drop(tx);
assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}
#[tokio::test]
async fn async_close_before_recv_buffer_1() {
let (tx, rx) = bounded(1);
assert_eq!(tx.send_async(1).await, Ok(()));
drop(tx);
assert_eq!(rx.recv_async().await, Ok(1));
assert_eq!(rx.recv_async().await, Err(RecvError::Disconnected));
}
#[tokio::test]
async fn async_concurrent_writes_and_reads_buffer_0() {
let (tx, rx) = bounded(0);
let _sends = tokio::spawn(join_all([
tx.send_async(1),
tx.send_async(2),
tx.send_async(3),
]));
let recvs = tokio::spawn(join_all([
rx.recv_async(),
rx.recv_async(),
rx.recv_async(),
]))
.await
.unwrap();
assert_eq!(recvs, vec![Ok(1), Ok(2), Ok(3)]);
}
#[tokio::test]
async fn async_shift_pending_send_buffer_0() {
let (tx, rx) = bounded(0);
let tx_clone = tx.clone();
let h1 = tokio::spawn(async move {
let start = Instant::now();
assert_eq!(tx.send_async(1).await, Ok(()));
start.elapsed()
});
let h2 = tokio::spawn(async move {
let start = Instant::now();
async_sleep(100).await;
assert_eq!(tx_clone.send_async(2).await, Ok(()));
start.elapsed()
});
tokio::spawn(async move {
async_sleep(1000).await;
assert_eq!(rx.recv(), Ok(1));
async_sleep(1000).await;
assert_eq!(rx.recv(), Ok(2));
});
let elapsed1 = h1.await.unwrap();
assert!(elapsed1 >= ms(900));
assert!(elapsed1 < ms(1100));
let elapsed2 = h2.await.unwrap();
assert!(elapsed2 >= ms(1900));
assert!(elapsed2 < ms(2100));
}
#[tokio::test]
async fn async_shift_pending_send_buffer_2() {
let (tx, rx) = bounded(2);
assert_eq!(tx.send_async(1).await, Ok(()));
assert_eq!(tx.send_async(2).await, Ok(()));
let h = tokio::spawn(async move {
let start = Instant::now();
assert_eq!(tx.send_async(3).await, Ok(()));
start.elapsed()
});
tokio::spawn(async move {
async_sleep(1000).await;
assert_eq!(rx.recv_async().await, Ok(1));
})
.await
.unwrap();
let elapsed = h.await.unwrap();
assert!(elapsed >= ms(900));
assert!(elapsed < ms(1100));
}