use kovan_queue::array_queue::ArrayQueue;
use std::sync::Arc;
use std::thread;
/// Walks a capacity-2 queue from empty to full and back again, checking the
/// `is_empty` / `is_full` flags and first-in-first-out ordering at each step.
#[test]
fn test_simple_push_pop() {
    let queue = ArrayQueue::new(2);

    // Freshly created: nothing stored, room available.
    assert!(queue.is_empty());
    assert!(!queue.is_full());

    // One of the two slots is occupied.
    assert!(queue.push(1).is_ok());
    assert!(!queue.is_empty());
    assert!(!queue.is_full());

    // Both slots are occupied, so a third push must be refused.
    assert!(queue.push(2).is_ok());
    assert!(!queue.is_empty());
    assert!(queue.is_full());
    assert!(queue.push(3).is_err());

    // Values come back out in insertion order.
    assert_eq!(queue.pop(), Some(1));
    assert!(!queue.is_full());
    assert_eq!(queue.pop(), Some(2));
    assert!(queue.is_empty());
    assert_eq!(queue.pop(), None);
}
/// Runs four producers and four consumers against one shared capacity-100
/// queue and checks that every pushed value is popped exactly once.
///
/// Each producer pushes `PER_THREAD` distinct values and each consumer pops
/// `PER_THREAD` values, retrying (with a yield) whenever the queue is full or
/// empty. The consumers hand back what they popped so the test can verify the
/// exact multiset of values rather than only the final emptiness of the queue.
///
/// Ignored when compiled with `cfg(miri)`.
#[test]
#[cfg_attr(miri, ignore)]
fn test_concurrent_push_pop() {
    const THREADS: usize = 4;
    const PER_THREAD: usize = 100;

    let q = Arc::new(ArrayQueue::new(100));

    // Producer `i` pushes the contiguous range [i * PER_THREAD, (i + 1) * PER_THREAD).
    let producers: Vec<_> = (0..THREADS)
        .map(|i| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                for j in 0..PER_THREAD {
                    let value = i * PER_THREAD + j;
                    while q.push(value).is_err() {
                        // Queue is full; give consumers a chance to drain it.
                        thread::yield_now();
                    }
                }
            })
        })
        .collect();

    // Each consumer pops exactly PER_THREAD values and returns them.
    let consumers: Vec<_> = (0..THREADS)
        .map(|_| {
            let q = Arc::clone(&q);
            thread::spawn(move || {
                let mut taken = Vec::with_capacity(PER_THREAD);
                while taken.len() < PER_THREAD {
                    match q.pop() {
                        Some(value) => taken.push(value),
                        // Queue is empty; give producers a chance to refill it.
                        None => thread::yield_now(),
                    }
                }
                taken
            })
        })
        .collect();

    for producer in producers {
        producer.join().unwrap();
    }

    let mut seen = Vec::with_capacity(THREADS * PER_THREAD);
    for consumer in consumers {
        seen.extend(consumer.join().unwrap());
    }

    // Order across threads is unspecified, so compare as a sorted sequence:
    // every value in 0..THREADS * PER_THREAD must appear exactly once.
    seen.sort_unstable();
    let expected: Vec<usize> = (0..THREADS * PER_THREAD).collect();
    assert_eq!(seen, expected);

    assert!(q.is_empty());
}
/// A capacity-1 queue accepts exactly one value, hands a second value back
/// inside `Err`, and returns to the empty state after the single pop.
#[test]
fn test_capacity_one_rejects_push_past_capacity() {
    let queue = ArrayQueue::new(1);
    assert_eq!(queue.capacity(), 1);

    // Empty to start with.
    assert!(queue.is_empty());
    assert!(!queue.is_full());

    // The only slot is taken by the first push.
    assert!(queue.push(10).is_ok());
    assert!(queue.is_full());

    // The rejected value is returned to the caller unchanged.
    assert_eq!(queue.push(20), Err(20));

    // Popping yields the resident value and empties the queue.
    assert_eq!(queue.pop(), Some(10));
    assert!(queue.is_empty());
    assert_eq!(queue.pop(), None);
}
/// Requesting capacity 0 yields a queue that reports capacity 1 and behaves
/// like a single-slot queue.
#[test]
fn test_capacity_zero_rounds_to_one_and_works() {
    let queue = ArrayQueue::new(0);

    // The requested zero is reported back as one.
    assert_eq!(queue.capacity(), 1);

    // One value fits; the next is handed back in `Err`.
    assert!(queue.push(1).is_ok());
    assert_eq!(queue.push(2), Err(2));

    // The stored value is retrievable, after which the queue is empty.
    assert_eq!(queue.pop(), Some(1));
    assert_eq!(queue.pop(), None);
}
/// Repeats the fill / overflow / drain cycle 1000 times on a capacity-1
/// queue, checking that each cycle behaves the same as the first.
#[test]
fn test_capacity_one_multi_lap_cycles() {
    let queue = ArrayQueue::new(1);

    for lap in 0..1000u64 {
        // A value distinct from `lap`, used for the push that must be refused.
        let overflow = u64::MAX - lap;

        assert!(queue.push(lap).is_ok());
        assert_eq!(queue.push(overflow), Err(overflow));
        assert!(queue.is_full());

        // Only the accepted value comes back out.
        assert_eq!(queue.pop(), Some(lap));
        assert_eq!(queue.pop(), None);
    }
}
/// Drops a capacity-1 queue that still holds a value and checks that both the
/// resident value and the previously rejected value have been dropped.
#[test]
fn test_capacity_one_drop_with_resident_value_terminates() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Number of `Tracked` values dropped so far.
    static DROP_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Zero-sized value whose destructor bumps `DROP_COUNT`.
    struct Tracked;

    impl Drop for Tracked {
        fn drop(&mut self) {
            DROP_COUNT.fetch_add(1, Ordering::Relaxed);
        }
    }

    let queue = ArrayQueue::new(1);
    assert!(queue.push(Tracked).is_ok());
    // The push is refused; the `Result` holding the value is a temporary that
    // is dropped at the end of this statement (first drop).
    assert!(queue.push(Tracked).is_err());
    // Dropping the queue must release the value still stored in it (second drop).
    drop(queue);

    assert_eq!(DROP_COUNT.load(Ordering::Relaxed), 2);
}
/// Hammers a capacity-1 queue with four producers and two consumers and
/// checks that the number and the sum of the popped values match exactly what
/// was pushed.
///
/// Producers retry a refused push with the value handed back in `Err`.
/// Consumers keep popping until the shared counter shows that every value has
/// been taken, so the test only finishes if all threads terminate.
///
/// Ignored when compiled with `cfg(miri)`.
#[test]
#[cfg_attr(miri, ignore)]
fn test_capacity_one_concurrent_terminates_with_exact_accounting() {
    use std::sync::atomic::{AtomicU64, Ordering};

    const PRODUCERS: u64 = 4;
    const PER_PRODUCER: u64 = 1000;
    const TOTAL: u64 = PRODUCERS * PER_PRODUCER;

    let queue = Arc::new(ArrayQueue::new(1));
    let mut workers = Vec::new();

    // Producer `id` pushes the values id * PER_PRODUCER .. (id + 1) * PER_PRODUCER.
    workers.extend((0..PRODUCERS).map(|id| {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            for offset in 0..PER_PRODUCER {
                let mut pending = id * PER_PRODUCER + offset;
                // A refused push returns the value; take it back and retry.
                while let Err(rejected) = queue.push(pending) {
                    pending = rejected;
                    thread::yield_now();
                }
            }
        })
    }));

    let popped_sum = Arc::new(AtomicU64::new(0));
    let popped_count = Arc::new(AtomicU64::new(0));

    workers.extend((0..2).map(|_| {
        let queue = Arc::clone(&queue);
        let sum = Arc::clone(&popped_sum);
        let count = Arc::clone(&popped_count);
        thread::spawn(move || {
            // Stop once all values, across both consumers, have been popped.
            while count.load(Ordering::Acquire) < TOTAL {
                match queue.pop() {
                    Some(value) => {
                        sum.fetch_add(value, Ordering::Relaxed);
                        count.fetch_add(1, Ordering::AcqRel);
                    }
                    None => thread::yield_now(),
                }
            }
        })
    }));

    for worker in workers {
        worker.join().unwrap();
    }

    assert_eq!(popped_count.load(Ordering::Acquire), TOTAL);
    // The pushed values are exactly 0..TOTAL, whose sum is TOTAL * (TOTAL - 1) / 2.
    assert_eq!(popped_sum.load(Ordering::Relaxed), TOTAL * (TOTAL - 1) / 2);
    assert!(queue.is_empty());
}