mod common;
use common::*;
use fibre::mpsc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
#[test]
fn mpsc_sync_spsc_smoke() {
let (tx, rx) = mpsc::unbounded_v1();
tx.send(10).unwrap();
assert_eq!(rx.recv().unwrap(), 10);
}
#[test]
fn mpsc_sync_try_send() {
let (tx, rx) = mpsc::unbounded_v1::<i32>();
assert_eq!(tx.try_send(10), Ok(()));
assert_eq!(rx.recv().unwrap(), 10);
drop(rx);
match tx.try_send(20) {
Err(fibre::error::TrySendError::Closed(val)) => assert_eq!(val, 20),
other => panic!("Expected TrySendError::Closed, got {:?}", other),
}
let (tx_async, rx_async) = mpsc::unbounded_v1_async::<i32>();
assert_eq!(tx_async.try_send(30), Ok(()));
let rx_async_recv = rx_async.to_sync(); assert_eq!(rx_async_recv.recv().unwrap(), 30);
drop(rx_async_recv);
match tx_async.try_send(40) {
Err(fibre::error::TrySendError::Closed(val)) => assert_eq!(val, 40),
other => panic!("Expected TrySendError::Closed, got {:?}", other),
}
}
#[test]
fn mpsc_sync_try_recv() {
let (tx, rx) = mpsc::unbounded_v1::<i32>();
assert_eq!(rx.try_recv(), Err(fibre::error::TryRecvError::Empty));
tx.send(1).unwrap();
assert_eq!(rx.try_recv(), Ok(1));
assert_eq!(rx.try_recv(), Err(fibre::error::TryRecvError::Empty));
}
#[test]
fn mpsc_sync_recv_blocks() {
let (tx, rx) = mpsc::unbounded_v1();
let handle = thread::spawn(move || {
thread::sleep(SHORT_TIMEOUT);
tx.send("hello").unwrap();
});
assert_eq!(rx.recv().unwrap(), "hello");
handle.join().unwrap();
}
#[test]
fn mpsc_sync_all_producers_drop_signals_disconnect() {
let (tx, rx) = mpsc::unbounded_v1::<()>();
let tx2 = tx.clone();
drop(tx);
drop(tx2);
assert_eq!(rx.recv(), Err(fibre::error::RecvError::Disconnected));
}
#[test]
fn mpsc_sync_consumer_drop_cleans_up() {
let drop_count = Arc::new(AtomicUsize::new(0));
struct DropCounter(Arc<AtomicUsize>);
impl Drop for DropCounter {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let (tx, rx) = mpsc::unbounded_v1();
tx.send(DropCounter(drop_count.clone())).unwrap();
tx.send(DropCounter(drop_count.clone())).unwrap();
drop(rx);
drop(tx);
assert_eq!(drop_count.load(Ordering::SeqCst), 2);
}
#[test]
fn mpsc_sync_multi_producer_stress() {
let (tx, rx) = mpsc::unbounded_v1();
let num_producers = 8;
let items_per_producer = ITEMS_HIGH;
let total_items = num_producers * items_per_producer;
let sum = Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..num_producers {
let tx_clone = tx.clone();
handles.push(thread::spawn(move || {
for i in 1..=items_per_producer {
tx_clone.send(i).unwrap();
}
}));
}
drop(tx);
let sum_clone = Arc::clone(&sum);
let consumer_handle = thread::spawn(move || {
for _ in 0..total_items {
sum_clone.fetch_add(rx.recv().unwrap(), Ordering::Relaxed);
}
});
for handle in handles {
handle.join().unwrap();
}
consumer_handle.join().unwrap();
let expected_sum = num_producers * (items_per_producer * (items_per_producer + 1) / 2);
assert_eq!(sum.load(Ordering::Relaxed), expected_sum);
}
#[test]
fn mpsc_async_producer_to_sync_consumer() {
let (tx_async, rx_async) = mpsc::unbounded_v1_async();
let rx_sync = rx_async.to_sync();
let producer_handle = thread::spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.build()
.unwrap();
rt.block_on(async move {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
tx_async.send(456).await.unwrap();
});
});
assert_eq!(rx_sync.recv().unwrap(), 456);
producer_handle.join().unwrap();
}