use barrage::{Disconnected, SendError};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::time::Duration;
use std::sync::Arc;
use std::thread;
use tokio_test::assert_ok;
use std::sync::atomic::{AtomicBool, Ordering};
struct PollOnce<'a, F: Future + Unpin>(&'a mut F);
impl<'a, F: Future + Unpin> Future for PollOnce<'a, F> {
type Output = Poll<F::Output>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(Pin::new(&mut self.0).poll(cx))
}
}
#[tokio::test]
async fn one_message_unbounded() {
let (tx, rx) = barrage::new(None);
let rx2 = rx.clone();
tx.send_async("Hello!").await.unwrap();
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(rx2.recv_async().await, Ok("Hello!"));
}
#[test]
fn shared_recv() {
let (tx, rx) = barrage::new(None);
let mut fut1 = rx.recv_async();
let mut fut2 = rx.recv_async();
let pin1 = Pin::new(&mut fut1);
let pin2 = Pin::new(&mut fut2);
let woken1 = Arc::new(AtomicBool::new(false));
let woken2 = Arc::new(AtomicBool::new(false));
let woken1_clone = woken1.clone();
let woken2_clone = woken2.clone();
let waker = waker_fn::waker_fn(move || woken1_clone.store(true, Ordering::SeqCst));
let waker2 = waker_fn::waker_fn(move || woken2_clone.store(true, Ordering::SeqCst));
assert!(pin1.poll(&mut Context::from_waker(&waker)).is_pending());
assert!(pin2.poll(&mut Context::from_waker(&waker2)).is_pending());
tx.send("Hello!").unwrap();
assert!(woken1.load(Ordering::SeqCst));
assert!(woken2.load(Ordering::SeqCst));
let pin1 = Pin::new(&mut fut1);
let pin2 = Pin::new(&mut fut2);
assert!(pin1.poll(&mut Context::from_waker(&waker)).is_ready());
assert!(pin2.poll(&mut Context::from_waker(&waker2)).is_pending());
}
#[tokio::test]
async fn sync_receive_from_wait_async_send() {
let (tx, rx) = barrage::new(None);
let handle = tokio::task::spawn_blocking(move || {
assert_eq!("Hello!", rx.recv().unwrap());
});
tokio::time::delay_for(Duration::from_millis(500)).await;
tx.send_async("Hello!").await.unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn sync_receive_from_wait_try_send() {
let (tx, rx) = barrage::new(None);
let handle = tokio::task::spawn_blocking(move || {
assert_eq!("Hello!", rx.recv().unwrap());
});
tokio::time::delay_for(Duration::from_millis(500)).await;
tx.try_send("Hello!").unwrap();
handle.await.unwrap();
}
#[tokio::test]
async fn sync_receive() {
let (tx, rx) = barrage::new(None);
tx.send_async("Hello!").await.unwrap();
assert_eq!("Hello!", rx.recv().unwrap());
}
#[tokio::test]
async fn new_recv_after_send() {
let (tx, rx) = barrage::new(None);
tx.send_async("Hello!").await.unwrap();
let rx2 = rx.clone();
tx.send_async("Hello 2!").await.unwrap();
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(rx2.recv_async().await, Ok("Hello 2!"));
}
#[tokio::test]
async fn tx_drop_disconnect() {
let (tx, rx) = barrage::new(None);
tx.send_async("Hello!").await.unwrap();
drop(tx);
let rx2 = rx.clone();
assert_eq!(rx2.recv_async().await, Err(Disconnected));
assert_eq!(rx2.recv_async().await, Err(Disconnected));
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(rx.recv_async().await, Err(Disconnected));
assert_eq!(rx.recv_async().await, Err(Disconnected));
}
#[tokio::test]
async fn rx_drop_disconnect() {
let (tx, rx) = barrage::new(None);
let _tx2 = tx.clone();
tx.send_async("Hello!").await.unwrap();
drop(rx);
assert_eq!(tx.send_async("Hello!").await, Err(SendError("Hello!")));
assert_eq!(tx.send_async("Hello!").await, Err(SendError("Hello!")));
}
#[tokio::test]
async fn bounded_wait_resume() {
let (tx, rx) = barrage::new(Some(1));
tx.send_async("Hello!").await.unwrap();
let mut fut = tx.send_async("Hello!");
assert_eq!(PollOnce(&mut fut).await, Poll::Pending);
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(PollOnce(&mut fut).await, Poll::Ready(Ok(())));
}
#[tokio::test]
async fn try_send_bounded_wait_resume() {
let (tx, rx) = barrage::new(Some(1));
tx.try_send("Hello!").unwrap();
let mut fut = tx.send_async("Hello!");
assert_eq!(PollOnce(&mut fut).await, Poll::Pending);
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(PollOnce(&mut fut).await, Poll::Ready(Ok(())));
}
#[tokio::test]
async fn sync_send_bounded_wait_resume() {
let (tx, rx) = barrage::new(Some(1));
tx.send("Hello!").unwrap();
let mut fut = tx.send_async("Hello!");
assert_eq!(PollOnce(&mut fut).await, Poll::Pending);
assert_eq!(rx.recv_async().await, Ok("Hello!"));
assert_eq!(PollOnce(&mut fut).await, Poll::Ready(Ok(())));
}
#[tokio::test(max_threads = 4)]
async fn no_reorder() {
let (tx, rx) = barrage::new(Some(1000));
let mut handles = Vec::new();
for _ in 0..4 {
let rx = rx.clone();
let mut cur = 0;
let task = tokio::spawn(async move {
while let Ok(n) = rx.recv_async().await {
assert_eq!(n, cur);
cur += 1;
}
});
handles.push(task);
}
let task = tokio::spawn(async move {
for i in 0..10_000usize {
tx.send_async(i).await.unwrap();
}
});
handles.push(task);
drop(rx);
for task in handles {
task.await.unwrap();
}
}
#[test]
fn broadcast_send_threaded() {
let (tx1, rx) = barrage::bounded(2);
let tx1 = Arc::new(tx1);
let tx2 = tx1.clone();
let th1 = thread::spawn(move || {
assert_ok!(tx1.send("one"));
assert_ok!(tx1.send("two"));
assert_ok!(tx1.send("three"));
});
let th2 = thread::spawn(move || {
tokio_test::block_on(async {
assert_ok!(tx2.send_async("inye").await);
assert_ok!(tx2.send_async("zimbini").await);
assert_ok!(tx2.send_async("zintathu").await);
});
});
tokio_test::block_on(async {
let mut num: usize = 0;
loop {
match rx.recv_async().await {
Ok(_) => num += 1,
Err(_) => break,
}
}
assert_eq!(num, 6);
});
assert_ok!(th1.join());
assert_ok!(th2.join());
}
#[test]
fn drop_rx() {
let (tx, rx1) = barrage::bounded(16);
let rx2 = rx1.clone();
let th1 = thread::spawn(move || {
tokio_test::block_on(async {
let v = assert_ok!(rx1.recv_async().await);
assert_eq!(v, "one");
let v = assert_ok!(rx1.recv_async().await);
assert_eq!(v, "two");
let v = assert_ok!(rx1.recv_async().await);
assert_eq!(v, "three");
assert!(rx1.recv_async().await.is_err());
});
});
let th2 = thread::spawn(move || {
drop(rx2);
});
assert_ok!(tx.send("one"));
assert_ok!(tx.send("two"));
assert_ok!(tx.send("three"));
drop(tx);
assert_ok!(th1.join());
assert_ok!(th2.join());
}
#[test]
fn shared_receiver_receives_once() {
let (tx, rx) = barrage::unbounded();
let shared1 = rx.clone().into_shared();
let shared2 = shared1.clone();
tx.try_send("Hello!").unwrap();
assert_eq!(shared1.try_recv(), Ok(Some("Hello!")));
assert_eq!(shared2.try_recv(), Ok(None));
assert_eq!(rx.try_recv(), Ok(Some("Hello!")));
}
#[test]
fn shared_receiver_same_mailbox() {
let (_, rx) = barrage::unbounded::<()>();
let shared_a_1 = rx.clone().into_shared();
let shared_a_2 = shared_a_1.clone();
let shared_b = rx.into_shared();
assert!(shared_a_1.same_mailbox(&shared_a_2));
assert!(!shared_a_1.same_mailbox(&shared_b));
}
#[test]
fn shared_receiver_drop() {
let (tx, rx) = barrage::unbounded();
let shared1 = rx.into_shared();
let shared2 = shared1.clone();
tx.try_send("Hello!").unwrap();
assert_eq!(shared1.try_recv(), Ok(Some("Hello!")));
assert_eq!(shared2.try_recv(), Ok(None));
drop(shared2);
tx.try_send("Hello2!").unwrap();
assert_eq!(shared1.try_recv(), Ok(Some("Hello2!")));
drop(shared1);
assert!(tx.try_send("Hello3!").is_err());
}
#[test]
fn shared_receiver_upgrade() {
let (tx, rx) = barrage::unbounded();
let shared1 = rx.into_shared();
let rx2 = shared1.clone().upgrade();
tx.try_send("Hello!").unwrap();
assert_eq!(shared1.try_recv(), Ok(Some("Hello!")));
assert_eq!(rx2.try_recv(), Ok(Some("Hello!")));
}