#![cfg(feature = "std")]
#![allow(clippy::unwrap_used)]
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::thread;
use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
/// Advances a 64-bit shift/xor pseudo-random generator and returns the new state.
///
/// The value behind `state` is mixed with three shift-and-xor steps
/// (left 13, right 7, left 17), written back, and returned. A state of zero
/// stays zero.
fn next_rand(state: &mut u64) -> u64 {
    let seed = *state;
    let step1 = seed ^ (seed << 13);
    let step2 = step1 ^ (step1 >> 7);
    let step3 = step2 ^ (step2 << 17);
    *state = step3;
    step3
}
/// Stress test: many threads issue a random mix of shared and exclusive
/// `try_acquire` calls and verify that granted locks never overlap illegally.
///
/// The test keeps its own per-resource reader and writer tallies next to the
/// lock manager. After a successful acquisition a worker bumps the matching
/// tally, asserts that the conflicting tally is zero (and, for writers, that
/// no other writer was already counted), undoes the bump, and releases the
/// lock. Failed acquisitions are simply skipped. Once every worker has
/// joined, each resource must report zero holders.
#[test]
fn stress_mixed_shared_exclusive_mutual_exclusion() {
    const THREADS: u64 = 8;
    const RESOURCES: u64 = 16;
    const ITERS: usize = 20_000;

    // Builds one zeroed counter per resource.
    let fresh_counters = || -> Arc<Vec<AtomicUsize>> {
        Arc::new((0..RESOURCES).map(|_| AtomicUsize::new(0)).collect())
    };

    let lm = Arc::new(LockManager::new());
    let readers = fresh_counters();
    let writers = fresh_counters();

    // `collect` is eager, so every worker is spawned before any is joined.
    let workers: Vec<_> = (0..THREADS)
        .map(|t| {
            let lm = Arc::clone(&lm);
            let readers = Arc::clone(&readers);
            let writers = Arc::clone(&writers);
            thread::spawn(move || {
                // Each worker reuses a single transaction id for all iterations.
                let txn = TxnId::new(t);
                // Offset by one so the generator never starts from zero.
                let mut seed = t.wrapping_add(1);
                for _ in 0..ITERS {
                    let r = (next_rand(&mut seed) % RESOURCES) as usize;
                    let res = ResourceId::new(r as u64);
                    let (exclusive, mode) = if next_rand(&mut seed) % 2 == 0 {
                        (true, LockMode::Exclusive)
                    } else {
                        (false, LockMode::Shared)
                    };

                    if lm.try_acquire(txn, res, mode).is_ok() {
                        let (writer_tally, reader_tally) = (&writers[r], &readers[r]);
                        if !exclusive {
                            // Announce ourselves as a reader, then look for writers.
                            reader_tally.fetch_add(1, Ordering::SeqCst);
                            assert_eq!(
                                writer_tally.load(Ordering::SeqCst),
                                0,
                                "reader coexists with a writer in resource {r}"
                            );
                            reader_tally.fetch_sub(1, Ordering::SeqCst);
                        } else {
                            // Announce ourselves as a writer; nobody else may be present.
                            let prior_writers = writer_tally.fetch_add(1, Ordering::SeqCst);
                            assert_eq!(prior_writers, 0, "two writers in resource {r}");
                            assert_eq!(
                                reader_tally.load(Ordering::SeqCst),
                                0,
                                "writer coexists with a reader in resource {r}"
                            );
                            writer_tally.fetch_sub(1, Ordering::SeqCst);
                        }
                        // Tallies are restored before the lock is handed back.
                        lm.release(txn, res).unwrap();
                    }
                }
            })
        })
        .collect();

    workers.into_iter().for_each(|worker| {
        worker.join().expect("worker thread panicked");
    });

    // Every acquisition was paired with a release, so nothing may remain held.
    (0..RESOURCES).for_each(|r| {
        assert_eq!(
            lm.holder_count(ResourceId::new(r)),
            0,
            "resource {r} leaked"
        );
    });
}
/// Stress test: workers repeatedly grab two distinct resources exclusively in
/// random order and must all reach their commit quota.
///
/// Every attempt uses a fresh transaction id taken from a shared counter.
/// The attempt counts as a commit only when both exclusive requests were
/// granted; a reported deadlock or an exhausted spin budget aborts it. In
/// either case all locks of the transaction are released before the next
/// attempt. A worker panics if it reaches `ATTEMPT_CAP` attempts before
/// finishing. After all workers join, the test checks the total commit
/// count, that no waits or holders remain, and that `find_deadlock` reports
/// nothing.
#[test]
fn stress_deadlock_storm_always_makes_progress() {
    const THREADS: u64 = 8;
    const RESOURCES: u64 = 6;
    const COMMITS_PER_THREAD: usize = 100;
    const ATTEMPT_CAP: usize = 200_000;
    const SPIN_BUDGET: usize = 50_000;

    let lm = Arc::new(LockManager::new());
    let next_txn = Arc::new(AtomicU64::new(1));

    /// Requests `first` and then `second` exclusively for `txn`.
    ///
    /// Returns `true` once both are granted. Returns `false` as soon as a
    /// request reports a deadlock, or when a single resource has answered
    /// `Waiting` more than `spin_budget` times. Locks already granted are
    /// left for the caller to release.
    fn acquire_pair(
        lm: &LockManager,
        txn: TxnId,
        first: u64,
        second: u64,
        spin_budget: usize,
    ) -> bool {
        for r in [first, second] {
            let res = ResourceId::new(r);
            let mut waits = 0usize;
            loop {
                match lm.request(txn, res, LockMode::Exclusive) {
                    Acquisition::Granted => break,
                    Acquisition::Deadlock(_) => return false,
                    Acquisition::Waiting => {}
                }
                // Only the `Waiting` outcome falls through to here.
                waits += 1;
                if waits > spin_budget {
                    return false;
                }
                thread::yield_now();
            }
        }
        true
    }

    // `collect` is eager, so every worker is spawned before any is joined.
    let workers: Vec<_> = (0..THREADS)
        .map(|t| {
            let lm = Arc::clone(&lm);
            let next_txn = Arc::clone(&next_txn);
            thread::spawn(move || {
                let mut seed = t.wrapping_add(1).wrapping_mul(0x9E37_79B9);
                let mut committed = 0usize;
                let mut attempts = 0usize;
                while committed < COMMITS_PER_THREAD {
                    attempts += 1;
                    assert!(
                        attempts < ATTEMPT_CAP,
                        "thread {t} made no progress: {committed} commits in {attempts} attempts",
                    );
                    // A brand-new transaction per attempt.
                    let txn = TxnId::new(next_txn.fetch_add(1, Ordering::Relaxed));
                    let a = next_rand(&mut seed) % RESOURCES;
                    let drawn = next_rand(&mut seed) % RESOURCES;
                    // Bump a colliding draw to the neighbouring resource so the pair is distinct.
                    let b = if drawn == a { (a + 1) % RESOURCES } else { drawn };

                    let ok = acquire_pair(&lm, txn, a, b, SPIN_BUDGET);
                    // Release unconditionally; the result is intentionally ignored.
                    let _ = lm.release_all(txn);
                    committed += usize::from(ok);
                }
                committed
            })
        })
        .collect();

    let total: usize = workers
        .into_iter()
        .map(|worker| worker.join().expect("worker thread panicked"))
        .sum();

    assert_eq!(total, THREADS as usize * COMMITS_PER_THREAD);
    assert_eq!(lm.waiting_count(), 0, "waits leaked");
    (0..RESOURCES).for_each(|r| {
        assert_eq!(
            lm.holder_count(ResourceId::new(r)),
            0,
            "resource {r} leaked"
        );
    });
    assert!(
        lm.find_deadlock().is_none(),
        "a deadlock survived the storm"
    );
}