use std::{
fmt, mem,
ops::RangeBounds,
sync::{Arc, atomic::Ordering::Relaxed},
};
#[cfg(feature = "loom")]
use loom::sync::atomic::AtomicUsize;
#[cfg(not(feature = "loom"))]
use std::sync::atomic::AtomicUsize;
use housekeeping::qsbr;
#[cfg(feature = "loom")]
#[test]
fn small_exhaustive() {
let runs = RunCounter::default();
loom::model(move || {
runs.start();
let global = Arc::new(Global::new(Config {
num_threads: 3,
num_refreshes: 4,
num_reregistrations: 1,
}));
for index in 0..global.config.num_threads {
let global = global.clone();
loom::thread::Builder::new()
.name(format!("t{index}"))
.spawn(move || Thread::new(index, global).run())
.unwrap();
}
});
}
#[cfg(all(miri, not(feature = "loom")))]
#[test]
fn random() {
let global = Arc::new(Global::new(Config {
num_threads: 3,
num_refreshes: 64,
num_reregistrations: 6,
}));
std::thread::scope(|s| {
for index in 0..global.config.num_threads {
let global = global.clone();
std::thread::Builder::new()
.name(format!("t{index}"))
.spawn_scoped(s, move || Thread::new(index, global).run())
.unwrap();
}
});
}
#[cfg(all(not(miri), not(feature = "loom")))]
#[test]
fn stress() {
let global = Arc::new(Global::new(Config {
num_threads: std::thread::available_parallelism().unwrap().get(),
num_refreshes: 1048576,
num_reregistrations: 16384,
}));
std::thread::scope(|s| {
for index in 0..global.config.num_threads {
let global = global.clone();
std::thread::Builder::new()
.name(format!("t{index}"))
.spawn_scoped(s, move || Thread::new(index, global).run())
.unwrap();
}
});
}
struct Config {
num_threads: usize,
num_refreshes: usize,
num_reregistrations: usize,
}
struct Global {
config: Config,
schedule: qsbr::Schedule,
positions: Vec<AtomicUsize>,
base: AtomicUsize,
}
impl Global {
fn new(config: Config) -> Self {
Self {
schedule: qsbr::Schedule::new(),
positions: (0..config.num_threads)
.map(|_| AtomicUsize::new(usize::MAX))
.collect(),
base: AtomicUsize::new(2),
config,
}
}
}
struct Thread {
index: usize,
global: Arc<Global>,
user: Option<qsbr::User>,
pos: usize,
}
impl Thread {
fn new(index: usize, global: Arc<Global>) -> Self {
Self {
index,
global,
user: None,
pos: usize::MAX,
}
}
fn run(mut self) {
self.join();
let config = &self.global.config;
let num_iters = config.num_refreshes + config.num_reregistrations;
let rereg_freq = config.num_reregistrations.div_ceil(num_iters);
for iter in 0..num_iters {
if iter.is_multiple_of(rereg_freq) {
self.leave();
self.join();
continue;
}
let mut user = self.user.take().unwrap();
match user.progress(&self.global.schedule) {
Some(leaving) => {
self.pos += 1;
self.check_rest_positions(self.pos - 1..=self.pos);
if let Some(_last_leaving) = leaving.leave_last() {
self.check_rest_positions(self.pos..=self.pos);
self.global.base.store(self.pos - 1, Relaxed);
}
}
None => {
self.check_rest_positions(self.pos.saturating_sub(1)..=self.pos + 1);
}
}
assert_eq!(user.index(), self.pos % 3);
self.user = Some(user);
}
self.leave();
}
fn join(&mut self) {
assert!(self.user.is_none() && self.pos == usize::MAX);
let user = self.global.schedule.register();
let index = user.index();
let base = self.global.base.load(Relaxed);
let pos = base + 3 - (base + 3 - index) % 3;
self.global.positions[self.index].store(pos, Relaxed);
self.user = Some(user);
self.pos = pos;
}
fn leave(&mut self) {
let user = self.user.take().unwrap();
let pos = mem::replace(&mut self.pos, usize::MAX);
self.global.positions[self.index].store(usize::MAX, Relaxed);
let leaving = user.deregister(&self.global.schedule);
self.check_rest_positions(pos.saturating_sub(1)..=pos + 1);
if let Some(_last_leaving) = leaving.leave_last() {
self.check_rest_positions(pos + 1..=pos + 1);
self.global.base.store(pos, Relaxed);
}
}
fn check_rest_positions<R>(&self, range: R)
where
R: RangeBounds<usize> + fmt::Debug,
{
for (index, position) in self.global.positions.iter().enumerate() {
if index == self.index {
continue;
}
let position = position.load(Relaxed);
if position == usize::MAX {
continue;
}
assert!(
range.contains(&position),
"{position:?} (of [t{index}]) did not fit {range:?}"
);
}
}
}
#[cfg(feature = "loom")]
#[derive(Default)]
struct RunCounter {
started: std::sync::atomic::AtomicUsize,
}
#[cfg(feature = "loom")]
impl RunCounter {
fn start(&self) {
use std::io::Write;
let run = self.started.fetch_add(1, Relaxed);
if run.is_multiple_of(4096) {
print!("\rRun {run}");
let _ = std::io::stdout().flush();
}
}
}
#[cfg(feature = "loom")]
impl Drop for RunCounter {
fn drop(&mut self) {
println!("\rFinished after {} runs", *self.started.get_mut());
}
}