#![cfg(feature = "shuttle")]
use kovan_queue::array_queue::ArrayQueue;
use std::sync::Arc;
use std::sync::atomic::{AtomicIsize, Ordering};
// Capacity handed to `ArrayQueue::new`. It is smaller than the total number of
// items pushed (PRODUCERS * ITEMS_PER_PRODUCER), so producers are expected to
// hit the `Err` branch of `push` and retry — presumably when the queue is full.
const CAPACITY: usize = 4;
// Number of distinct values each producer thread pushes.
const ITEMS_PER_PRODUCER: u64 = 6;
// Number of producer threads spawned by the test body.
const PRODUCERS: u64 = 2;
// Number of consumer threads spawned by the test body.
const CONSUMERS: u64 = 2;
/// Shuttle test body: producers and consumers share one `ArrayQueue`, and every
/// pushed value must come back out exactly once.
///
/// Producer `p` pushes the `ITEMS_PER_PRODUCER` consecutive values starting at
/// `p * ITEMS_PER_PRODUCER`, retrying whenever `push` hands the value back.
/// Consumers first claim a "ticket" from a shared counter initialised to the
/// total item count, then spin until `pop` yields a value for that ticket, so
/// the total number of pops never exceeds the number of pushes.
///
/// # Panics
///
/// Panics if any spawned thread panics, or if the sorted popped values differ
/// from the sorted expected values.
fn mpmc_no_loss_no_duplication() {
    let total_items = PRODUCERS * ITEMS_PER_PRODUCER;
    let shared = Arc::new(ArrayQueue::<u64>::new(CAPACITY));

    // Spawn the producers; each owns its own handle to the queue.
    let mut producer_handles = Vec::new();
    for p in 0..PRODUCERS {
        let q = Arc::clone(&shared);
        producer_handles.push(shuttle::thread::spawn(move || {
            let first = p * ITEMS_PER_PRODUCER;
            for item in first..first + ITEMS_PER_PRODUCER {
                let mut pending = item;
                loop {
                    match q.push(pending) {
                        Ok(_) => break,
                        Err(rejected) => {
                            // `push` returned the value; keep it and try again.
                            pending = rejected;
                            shuttle::hint::spin_loop();
                        }
                    }
                }
            }
        }));
    }

    // Shared ticket counter: one ticket per item that will ever be pushed.
    let tickets = Arc::new(AtomicIsize::new(total_items as isize));

    // Spawn the consumers; each returns the values it popped.
    let mut consumer_handles = Vec::new();
    for _ in 0..CONSUMERS {
        let q = Arc::clone(&shared);
        let tickets = Arc::clone(&tickets);
        consumer_handles.push(shuttle::thread::spawn(move || {
            let mut taken = Vec::new();
            // A positive previous value means a ticket was successfully claimed.
            while tickets.fetch_sub(1, Ordering::AcqRel) > 0 {
                let value = loop {
                    match q.pop() {
                        Some(v) => break v,
                        None => shuttle::hint::spin_loop(),
                    }
                };
                taken.push(value);
            }
            // No ticket was available: undo the speculative decrement.
            tickets.fetch_add(1, Ordering::AcqRel);
            taken
        }));
    }

    // Producers are joined first, then the consumers in spawn order.
    for handle in producer_handles {
        handle.join().unwrap();
    }
    let mut drained: Vec<u64> = Vec::new();
    for handle in consumer_handles {
        drained.extend(handle.join().unwrap());
    }
    drained.sort_unstable();

    let mut wanted: Vec<u64> = Vec::new();
    wanted.extend(0..total_items);
    wanted.sort_unstable();

    assert_eq!(
        drained, wanted,
        "MPMC push/pop lost or duplicated an item"
    );
}
/// Runs `mpmc_no_loss_no_duplication` under `shuttle::check_pct`, passing
/// 5000 and 5 as its two numeric arguments.
#[test]
fn shuttle_array_queue_mpmc_no_loss_no_duplication() {
    // NOTE(review): these are presumably the iteration count and the PCT
    // depth bound, in that order — confirm against the shuttle docs.
    let iterations = 5000;
    let depth = 5;
    shuttle::check_pct(mpmc_no_loss_no_duplication, iterations, depth);
}