#[cfg(loom)]
mod tests {
use loom::sync::atomic::{AtomicUsize, Ordering};
use loom::sync::{Arc, Condvar, Mutex};
use loom::thread;
/// Bounded blocking queue model used by the loom tests in this module.
///
/// All mutable state lives in `inner`; both condition variables are waited on
/// with the guard of that same mutex.
struct BoundedQueue<T> {
// Buffer, capacity and closed flag, guarded by one mutex.
inner: Mutex<BoundedQueueInner<T>>,
// Waited on by `push` while the buffer is at capacity; signalled by `pop` and `close`.
not_full: Condvar,
// Waited on by `pop` while the buffer is empty; signalled by `push` and `close`.
not_empty: Condvar,
}
/// State protected by `BoundedQueue::inner`.
struct BoundedQueueInner<T> {
// Items currently held by the queue.
buf: Vec<T>,
// `push` waits while `buf.len()` is at or above this value and the queue is open.
capacity: usize,
// Set by `close`; afterwards `push` returns `false` and `pop` returns `None`
// once `buf` is empty.
closed: bool,
}
impl<T> BoundedQueue<T> {
    /// Creates an empty, open queue that holds at most `capacity` items at a time.
    ///
    /// With a `capacity` of zero `push` never finds room: it blocks until
    /// `close` is called and then returns `false`.
    fn new(capacity: usize) -> Self {
        Self {
            inner: Mutex::new(BoundedQueueInner {
                buf: Vec::new(),
                capacity,
                closed: false,
            }),
            not_full: Condvar::new(),
            not_empty: Condvar::new(),
        }
    }

    /// Appends `val` to the back of the queue, blocking while it is full and open.
    ///
    /// Returns `true` once the value has been stored. Returns `false` (dropping
    /// `val`) if the queue is closed, even when there would be room for it.
    fn push(&self, val: T) -> bool {
        let mut inner = self.inner.lock().unwrap();
        // Re-check the predicate after every wake-up: another producer may have
        // taken the free slot, or the queue may have been closed meanwhile.
        while inner.buf.len() >= inner.capacity && !inner.closed {
            inner = self.not_full.wait(inner).unwrap();
        }
        if inner.closed {
            return false;
        }
        inner.buf.push(val);
        self.not_empty.notify_one();
        true
    }

    /// Removes and returns the oldest item, blocking while the queue is empty and open.
    ///
    /// Items still buffered when the queue is closed are delivered first;
    /// `None` is returned only once the queue is both closed and empty.
    fn pop(&self) -> Option<T> {
        let mut inner = self.inner.lock().unwrap();
        loop {
            if !inner.buf.is_empty() {
                // Take from the front so items leave in insertion (FIFO) order;
                // `Vec::pop` would hand back the newest item instead.
                let val = inner.buf.remove(0);
                self.not_full.notify_one();
                return Some(val);
            }
            if inner.closed {
                return None;
            }
            inner = self.not_empty.wait(inner).unwrap();
        }
    }

    /// Marks the queue as closed and wakes every thread blocked in `push` or `pop`.
    fn close(&self) {
        let mut inner = self.inner.lock().unwrap();
        inner.closed = true;
        self.not_full.notify_all();
        self.not_empty.notify_all();
    }
}
/// Two producers push three items in total through a capacity-2 queue while a
/// single consumer drains it; every item must reach the consumer.
#[test]
fn loom_tile_queue_no_lost_items() {
    loom::model(|| {
        let queue = Arc::new(BoundedQueue::new(2));
        let received = Arc::new(AtomicUsize::new(0));

        let q1 = Arc::clone(&queue);
        let t1 = thread::spawn(move || {
            q1.push(1);
            q1.push(2);
        });

        let q2 = Arc::clone(&queue);
        let t2 = thread::spawn(move || {
            q2.push(3);
        });

        let q3 = Arc::clone(&queue);
        let r = Arc::clone(&received);
        let t3 = thread::spawn(move || {
            // Local tally, returned through the join handle so it can be
            // checked independently of the shared atomic counter.
            let mut count = 0usize;
            while q3.pop().is_some() {
                count += 1;
                r.fetch_add(1, Ordering::Relaxed);
            }
            count
        });

        t1.join().unwrap();
        t2.join().unwrap();
        // Close only after both producers finished, so the consumer drains
        // whatever is left and then observes `None`.
        queue.close();

        let popped = t3.join().unwrap();
        // Check both views of the result: the consumer's own count and the
        // shared counter it updated along the way.
        assert_eq!(popped, 3);
        assert_eq!(received.load(Ordering::Relaxed), 3);
    });
}
/// Two workers bump a shared counter; whichever arrives last raises a
/// mutex-guarded flag and notifies the condvar the main thread waits on.
#[test]
fn loom_level_barrier() {
    loom::model(|| {
        let completed = Arc::new(AtomicUsize::new(0));
        let total_workers = 2;
        let barrier_done = Arc::new((Mutex::new(false), Condvar::new()));

        // Spawn the workers one after another, each with its own handles to
        // the counter and to the flag/condvar pair.
        let workers: Vec<_> = (0..total_workers)
            .map(|_| {
                let counter = Arc::clone(&completed);
                let gate = Arc::clone(&barrier_done);
                thread::spawn(move || {
                    let arrived = counter.fetch_add(1, Ordering::Release) + 1;
                    if arrived == total_workers {
                        let (flag, signal) = &*gate;
                        // Keep the guard alive across the notification.
                        let mut finished = flag.lock().unwrap();
                        *finished = true;
                        signal.notify_all();
                    }
                })
            })
            .collect();

        let (flag, signal) = &*barrier_done;
        let mut finished = flag.lock().unwrap();
        loop {
            if *finished {
                break;
            }
            finished = signal.wait(finished).unwrap();
        }

        // The guard stays held while joining; by now no worker needs the lock.
        for worker in workers {
            worker.join().unwrap();
        }
        assert_eq!(completed.load(Ordering::Acquire), total_workers);
    });
}
/// A producer pushes two items through a capacity-1 queue while a consumer
/// pops two; the second push can only complete after the first item is taken.
#[test]
fn loom_backpressure() {
    loom::model(|| {
        let queue = Arc::new(BoundedQueue::new(1));
        let producer_progress = Arc::new(AtomicUsize::new(0));

        let q = Arc::clone(&queue);
        let pp = Arc::clone(&producer_progress);
        let producer = thread::spawn(move || {
            // The queue is closed only after this thread is joined, so both
            // pushes must be accepted; assert it so the progress counter
            // really counts delivered items.
            assert!(q.push(1));
            pp.fetch_add(1, Ordering::Release);
            assert!(q.push(2));
            pp.fetch_add(1, Ordering::Release);
        });

        let q = Arc::clone(&queue);
        let consumer = thread::spawn(move || {
            let v1 = q.pop();
            let v2 = q.pop();
            // With room for a single item, `2` cannot enter the queue before
            // `1` has been popped, so the pop order is fixed.
            assert_eq!(v1, Some(1));
            assert_eq!(v2, Some(2));
        });

        producer.join().unwrap();
        queue.close();
        consumer.join().unwrap();
        assert_eq!(producer_progress.load(Ordering::Acquire), 2);
    });
}
/// Two producers each push one value through a capacity-1 queue; the consumer
/// adds up everything it pops and the total must equal 10 + 20.
#[test]
fn loom_multi_producer_bounded() {
    loom::model(|| {
        let queue = Arc::new(BoundedQueue::new(1));
        let sum = Arc::new(AtomicUsize::new(0));

        let first_tx = Arc::clone(&queue);
        let first = thread::spawn(move || {
            first_tx.push(10);
        });

        let second_tx = Arc::clone(&queue);
        let second = thread::spawn(move || {
            second_tx.push(20);
        });

        let rx = Arc::clone(&queue);
        let total = Arc::clone(&sum);
        // Drain until the queue reports that it is closed and empty.
        let drain = thread::spawn(move || loop {
            match rx.pop() {
                Some(item) => {
                    total.fetch_add(item, Ordering::Relaxed);
                }
                None => break,
            }
        });

        first.join().unwrap();
        second.join().unwrap();
        queue.close();
        drain.join().unwrap();

        let observed = sum.load(Ordering::Relaxed);
        assert_eq!(observed, 30);
    });
}
}