use rand::Rng;
use shuttle::sync::{Condvar, Mutex};
use shuttle::{check_dfs, check_random, replay, thread};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use test_log::test;
#[test]
fn notify_one() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while !*guard {
guard = cond.wait(guard).unwrap();
}
});
}
*lock.lock().unwrap() = true;
cond.notify_one();
},
None,
)
}
#[test]
fn notify_one_while() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let guard = lock.lock().unwrap();
let guard = cond.wait_while(guard, |flag| !*flag).unwrap();
assert!(*guard);
});
}
*lock.lock().unwrap() = true;
cond.notify_one();
},
None,
)
}
fn two_workers<F>(signal_thread: F)
where
F: Fn(Arc<Condvar>),
{
let lock = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while !*guard {
guard = cond.wait(guard).unwrap();
}
});
}
*lock.lock().unwrap() = true;
signal_thread(cond);
}
#[test]
fn notify_all() {
check_dfs(|| two_workers(|cond| cond.notify_all()), None)
}
#[test]
fn multiple_notify_one() {
check_dfs(
|| {
two_workers(|cond| {
cond.notify_one();
cond.notify_one();
})
},
None,
)
}
#[test]
#[should_panic(expected = "deadlock")]
fn notify_one_deadlock() {
check_dfs(
|| {
two_workers(|cond| {
cond.notify_one();
})
},
None,
)
}
#[test]
fn notify_one_all() {
check_dfs(
|| {
two_workers(|cond| {
cond.notify_one();
cond.notify_all();
})
},
None,
)
}
#[test]
fn notify_all_one() {
check_dfs(
|| {
two_workers(|cond| {
cond.notify_all();
cond.notify_one();
})
},
None,
)
}
#[test]
#[should_panic(expected = "found the failing execution")]
fn notify_one_order() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(0u8));
let cond = Arc::new(Condvar::new());
let sequencer_cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let sequencer_cond = Arc::clone(&sequencer_cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while *guard != 1 {
guard = sequencer_cond.wait(guard).unwrap();
}
*guard = 2;
sequencer_cond.notify_all();
while *guard < 5 {
guard = cond.wait(guard).unwrap();
}
*guard = 10;
});
}
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let sequencer_cond = Arc::clone(&sequencer_cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while *guard != 3 {
guard = sequencer_cond.wait(guard).unwrap();
}
*guard = 4;
sequencer_cond.notify_all();
while *guard < 5 {
guard = cond.wait(guard).unwrap();
}
*guard = 20;
});
}
let mut guard = lock.lock().unwrap();
*guard = 1;
sequencer_cond.notify_all();
while *guard != 2 {
guard = sequencer_cond.wait(guard).unwrap();
}
*guard = 3;
sequencer_cond.notify_all();
while *guard != 4 {
guard = sequencer_cond.wait(guard).unwrap();
}
*guard = 5;
cond.notify_one();
drop(guard);
assert_ne!(*lock.lock().unwrap(), 20, "found the failing execution");
cond.notify_one();
},
None,
)
}
fn producer_consumer_broken1() {
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
if count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard); }
});
}
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
if count.load(Ordering::SeqCst) == 1 {
guard = cond.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
}
#[test]
#[should_panic]
fn check_producer_consumer_broken1() {
check_random(producer_consumer_broken1, 5000)
}
#[test]
#[should_panic(expected = "nothing to get")]
fn replay_producer_consumer_broken1() {
replay(
producer_consumer_broken1,
"910219ccf2ead7a59dee9e4590000282249100208904",
)
}
fn producer_consumer_broken2() {
let lock = Arc::new(Mutex::new(()));
let cond = Arc::new(Condvar::new());
let count = Arc::new(AtomicUsize::new(0));
for _ in 0..2 {
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..1 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 0 {
guard = cond.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
});
}
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 1 {
guard = cond.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
cond.notify_one();
drop(guard);
}
}
#[test]
#[should_panic]
fn check_producer_consumer_broken2() {
check_random(producer_consumer_broken2, 5000)
}
#[test]
#[should_panic(expected = "deadlock")]
fn replay_producer_consumer_broken2() {
replay(producer_consumer_broken2, "91021499a0ee829bee85922b104410200052a404")
}
#[test]
fn producer_consumer_correct() {
check_random(
|| {
let lock = Arc::new(Mutex::new(()));
let is_empty = Arc::new(Condvar::new()); let is_full = Arc::new(Condvar::new()); let count = Arc::new(AtomicUsize::new(0));
for _ in 0..2 {
let lock = Arc::clone(&lock);
let is_empty = Arc::clone(&is_empty);
let is_full = Arc::clone(&is_full);
let count = Arc::clone(&count);
thread::spawn(move || {
for _ in 0..1 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 0 {
guard = is_full.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
is_empty.notify_one();
drop(guard);
}
});
}
for _ in 0..2 {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 1 {
guard = is_empty.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
is_full.notify_one();
drop(guard);
}
},
20000,
)
}
#[test]
fn producer_consumer_random() {
check_random(
move || {
let mut rng = shuttle::rand::thread_rng();
let num_producers = 1 + rng.gen::<usize>() % 3;
let num_consumers = 1 + rng.gen::<usize>() % 3;
let num_events = (num_producers * num_consumers) * (1 + rng.gen::<usize>() % 4);
let lock = Arc::new(Mutex::new(()));
let is_empty = Arc::new(Condvar::new()); let is_full = Arc::new(Condvar::new()); let count = Arc::new(AtomicUsize::new(0));
let consumers = (0..num_consumers)
.map(|_| {
let lock = Arc::clone(&lock);
let is_empty = Arc::clone(&is_empty);
let is_full = Arc::clone(&is_full);
let count = Arc::clone(&count);
thread::spawn(move || {
let events = num_events / num_consumers;
for _ in 0..events {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 0 {
guard = is_full.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 1, "nothing to get");
count.store(0, Ordering::SeqCst);
is_empty.notify_one();
drop(guard);
}
})
})
.collect::<Vec<_>>();
let producers = (0..num_producers)
.map(|_| {
let lock = Arc::clone(&lock);
let is_empty = Arc::clone(&is_empty);
let is_full = Arc::clone(&is_full);
let count = Arc::clone(&count);
thread::spawn(move || {
let events = num_events / num_producers;
for _ in 0..events {
let mut guard = lock.lock().unwrap();
while count.load(Ordering::SeqCst) == 1 {
guard = is_empty.wait(guard).unwrap();
}
assert_eq!(count.load(Ordering::SeqCst), 0, "no space to put");
count.store(1, Ordering::SeqCst);
is_full.notify_one();
drop(guard);
}
})
})
.collect::<Vec<_>>();
for consumer in consumers {
consumer.join().unwrap();
}
for producer in producers {
producer.join().unwrap();
}
},
5000,
)
}
#[test]
fn notify_one_timeout() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let mut guard = lock.lock().unwrap();
while !*guard {
guard = cond.wait_timeout(guard, Duration::from_secs(10)).unwrap().0;
}
});
}
*lock.lock().unwrap() = true;
cond.notify_one();
},
None,
)
}
#[test]
fn notify_one_while_timeout() {
check_dfs(
|| {
let lock = Arc::new(Mutex::new(false));
let cond = Arc::new(Condvar::new());
{
let lock = Arc::clone(&lock);
let cond = Arc::clone(&cond);
thread::spawn(move || {
let guard = lock.lock().unwrap();
let (guard, timeout) = cond
.wait_timeout_while(guard, Duration::from_secs(10), |flag| !*flag)
.unwrap();
assert!(*guard);
assert!(!timeout.timed_out());
});
}
*lock.lock().unwrap() = true;
cond.notify_one();
},
None,
)
}