#![allow(clippy::redundant_clone)]
use rand::Rng;
use shuttle::rand::thread_rng;
use shuttle::sync::{Condvar, Mutex};
use shuttle::{check_random, replay, thread};
use std::sync::Arc;
use test_log::test;
#[derive(Clone)]
struct BoundedBuffer<T: Copy> {
inner: Arc<Mutex<Inner<T>>>,
cond: Arc<Condvar>,
}
struct Inner<T: Copy> {
buffer: Box<[T]>,
buffer_size: usize,
put_at: usize,
take_at: usize,
occupied: usize,
}
impl<T: Copy + Default> BoundedBuffer<T> {
fn new(buffer_size: usize) -> Self {
let inner = Inner {
buffer: vec![T::default(); buffer_size].into_boxed_slice(),
buffer_size,
put_at: 0,
take_at: 0,
occupied: 0,
};
BoundedBuffer {
inner: Arc::new(Mutex::new(inner)),
cond: Arc::new(Condvar::new()),
}
}
fn put(&self, x: T) {
let mut this = self.inner.lock().unwrap();
while this.occupied == this.buffer_size {
this = self.cond.wait(this).unwrap();
}
this.occupied += 1;
this.put_at %= this.buffer_size;
let put_at = this.put_at;
this.buffer[put_at] = x;
this.put_at += 1;
self.cond.notify_one();
}
fn take(&self) -> T {
let mut this = self.inner.lock().unwrap();
while this.occupied == 0 {
this = self.cond.wait(this).unwrap();
}
this.occupied -= 1;
this.take_at %= this.buffer_size;
let result = this.buffer[this.take_at];
this.take_at += 1;
self.cond.notify_one();
result
}
}
fn reader(buffer: BoundedBuffer<usize>, iterations: usize) {
for _ in 0..iterations {
let _ = buffer.take();
}
}
fn writer(buffer: BoundedBuffer<usize>, iterations: usize) {
for i in 0..iterations {
buffer.put(i);
}
}
#[test]
fn test_bounded_buffer_trivial() {
check_random(
|| {
let buffer = BoundedBuffer::new(1);
let reader = {
let buffer = buffer.clone();
thread::spawn(move || reader(buffer, 10))
};
let writer = {
let buffer = buffer.clone();
thread::spawn(move || writer(buffer, 10))
};
reader.join().unwrap();
writer.join().unwrap();
},
1000,
)
}
#[test]
#[should_panic(expected = "deadlock")]
fn test_bounded_buffer_find_deadlock_configuration() {
check_random(
move || {
let mut rng = thread_rng();
let buffer_size = rng.gen_range(0usize..5) + 1;
let readers = rng.gen_range(0usize..5) + 1;
let writers = rng.gen_range(0usize..5) + 1;
let iterations = rng.gen_range(0usize..10) + 1;
let total_iterations = iterations * readers;
let writer_iterations = total_iterations / writers;
let remainder = total_iterations % writers;
tracing::info!(buffer_size, readers, writers, iterations);
let buffer = BoundedBuffer::new(buffer_size);
let mut tasks = vec![];
for _ in 0..readers {
let buffer = buffer.clone();
tasks.push(thread::spawn(move || reader(buffer, iterations)));
}
for i in 0..writers {
let buffer = buffer.clone();
let mut w = writer_iterations;
if i == writers - 1 {
w += remainder;
}
tasks.push(thread::spawn(move || writer(buffer, w)));
}
for task in tasks {
task.join().unwrap();
}
},
1000,
)
}
#[allow(clippy::vec_init_then_push)]
fn bounded_buffer_minimal() {
let buffer = BoundedBuffer::new(1);
let mut tasks = vec![];
tasks.push({
let buffer = buffer.clone();
thread::spawn(move || reader(buffer, 5))
});
tasks.push({
let buffer = buffer.clone();
thread::spawn(move || reader(buffer, 5))
});
tasks.push({
let buffer = buffer.clone();
thread::spawn(move || writer(buffer, 10))
});
for task in tasks {
task.join().unwrap();
}
}
#[test]
#[should_panic(expected = "deadlock")]
fn test_bounded_buffer_minimal_deadlock() {
check_random(bounded_buffer_minimal, 1000)
}
#[test]
#[should_panic(expected = "deadlock")]
fn test_bounded_buffer_minimal_deadlock_replay() {
replay(bounded_buffer_minimal, "910219e6c5a886c7a1f2d29e01106050a42ddb12455102")
}