mod common;
use common::*;
use fibre::error::{RecvError, SendError, TrySendError, TryRecvError};
use fibre::mpmc as mpmc;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::thread;
fn run_sync_mpmc_test(num_producers: usize, num_consumers: usize, items_per_producer: usize, channel_capacity: usize) {
let (tx, rx) = mpmc::channel(channel_capacity);
let total_items_expected = num_producers * items_per_producer;
let received_items_set = Arc::new(std::sync::Mutex::new(HashSet::new()));
let received_count = Arc::new(AtomicUsize::new(0));
let mut consumer_handles = Vec::new();
for _ in 0..num_consumers {
let rx_clone = rx.clone();
let received_set_clone = Arc::clone(&received_items_set);
let received_count_clone = Arc::clone(&received_count);
consumer_handles.push(thread::spawn(move || {
let mut local_count = 0;
while let Ok(item) = rx_clone.recv() {
assert!(
received_set_clone.lock().unwrap().insert(item),
"Duplicate item received!"
);
received_count_clone.fetch_add(1, AtomicOrdering::Relaxed);
local_count += 1;
}
local_count
}));
}
drop(rx);
let mut producer_handles = Vec::new();
for p_id in 0..num_producers {
let tx_clone = tx.clone();
producer_handles.push(thread::spawn(move || {
for i in 0..items_per_producer {
let item = p_id * items_per_producer + i;
tx_clone.send(item).unwrap();
}
}));
}
drop(tx);
for handle in producer_handles {
handle.join().expect("Producer thread panicked");
}
for handle in consumer_handles {
handle.join().expect("Consumer thread panicked");
}
assert_eq!(received_count.load(AtomicOrdering::Relaxed), total_items_expected);
assert_eq!(received_items_set.lock().unwrap().len(), total_items_expected);
}
#[test]
fn sync_v2_1p_1c_basic() {
run_sync_mpmc_test(1, 1, ITEMS_HIGH, 16);
}
#[test]
fn sync_v2_mp_1c_basic() {
run_sync_mpmc_test(4, 1, ITEMS_MEDIUM, 16);
}
#[test]
fn sync_v2_1p_mc_basic() {
run_sync_mpmc_test(1, 4, ITEMS_HIGH, 16);
}
#[test]
fn sync_v2_mp_mc_contention() {
run_sync_mpmc_test(4, 4, ITEMS_HIGH, 4); }
#[test]
fn sync_v2_unbounded_channel() {
let (tx, rx) = mpmc::unbounded();
let num_items = 5000;
let producer = thread::spawn(move || {
for i in 0..num_items {
tx.send(i).unwrap();
}
});
let consumer = thread::spawn(move || {
for i in 0..num_items {
assert_eq!(rx.recv().unwrap(), i);
}
});
producer.join().unwrap();
consumer.join().unwrap();
}
#[test]
fn sync_v2_rendezvous_channel() {
run_sync_mpmc_test(2, 2, ITEMS_MEDIUM, 0);
}
#[test]
fn sync_v2_drop_producer_signals_disconnect() {
let (tx, rx) = mpmc::channel::<i32>(5);
let tx2 = tx.clone();
tx.send(1).unwrap();
drop(tx);
tx2.send(2).unwrap();
drop(tx2);
assert_eq!(rx.recv().unwrap(), 1);
assert_eq!(rx.recv().unwrap(), 2);
assert_eq!(rx.recv(), Err(RecvError::Disconnected));
}
#[test]
fn sync_v2_drop_receiver_signals_closed() {
let (tx, rx) = mpmc::channel::<i32>(5);
let rx2 = rx.clone();
drop(rx);
drop(rx2);
assert_eq!(tx.send(1), Err(SendError::Closed));
}
#[test]
fn sync_v2_try_send_full_and_try_recv_empty() {
let (tx, rx) = mpmc::channel(1);
tx.send(100).unwrap();
match tx.try_send(200) {
Err(TrySendError::Full(val)) => assert_eq!(val, 200),
_ => panic!("Expected channel to be full"),
}
assert_eq!(rx.recv().unwrap(), 100);
match rx.try_recv() {
Err(TryRecvError::Empty) => {} _ => panic!("Expected channel to be empty"),
}
}