#[cfg(loom)]
use loom::sync::{Mutex, MutexGuard};
#[cfg(not(loom))]
use std::sync::{Mutex, MutexGuard};
use std::collections::HashMap;
use crate::deadlock::{Deadlock, VictimPolicy, WaitForGraph};
use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
/// Policy passed to `WaitForGraph::pick_victim` whenever this module has to
/// choose which member of a wait-for cycle should be aborted.
const DEADLOCK_VICTIM_POLICY: VictimPolicy = VictimPolicy::Youngest;
/// Odd 64-bit multiplier used by `shard_index` for multiplicative hashing:
/// the resource id is multiplied by this value (wrapping) and the top bits of
/// the product select the shard. The value is 2^64 divided by the golden ratio.
const FIB_HASH: u64 = 0x9E37_79B9_7F4A_7C15;
/// One transaction's grant on a single point resource.
#[derive(Clone, Copy)]
struct Holder {
// Transaction that owns the grant.
txn: TxnId,
// Mode currently granted; overwritten in place when the same transaction upgrades.
mode: LockMode,
}
/// Lock state of one point resource: the grants currently outstanding on it.
struct LockEntry {
    /// Current grants. `try_grant_locked` upgrades an existing grant in place
    /// rather than pushing a second one for the same transaction.
    holders: Vec<Holder>,
}

impl LockEntry {
    /// Creates an entry that nobody holds yet.
    #[inline]
    fn new() -> Self {
        let holders = Vec::new();
        Self { holders }
    }
}
/// One transaction's grant on a key range inside a range space.
#[derive(Clone, Copy)]
struct RangeHolder {
// Transaction that owns the grant.
txn: TxnId,
// Key range covered by the grant; compared with `overlaps` when checking conflicts.
range: KeyRange,
// Mode the range is held in.
mode: LockMode,
}
/// Range-lock state of one key space: every range grant outstanding in it.
struct RangeSpace {
    /// Current range grants, in no particular order (removal uses `swap_remove`).
    holders: Vec<RangeHolder>,
}

impl RangeSpace {
    /// Creates a space with no range grants.
    #[inline]
    fn new() -> Self {
        let holders = Vec::new();
        Self { holders }
    }
}
/// Everything guarded by a single shard mutex.
struct ShardInner {
    /// Point locks: resource -> its current holders.
    locks: HashMap<ResourceId, LockEntry>,
    /// Reverse index for point locks: transaction -> resources it holds in this shard.
    by_txn: HashMap<TxnId, Vec<ResourceId>>,
    /// Range locks: key space -> its current range holders.
    ranges: HashMap<ResourceId, RangeSpace>,
    /// Reverse index for range locks: transaction -> `(space, range)` pairs it
    /// holds in this shard.
    range_by_txn: HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
}

impl ShardInner {
    /// Creates a shard state with all four maps empty.
    fn new() -> Self {
        Self {
            locks: HashMap::default(),
            by_txn: HashMap::default(),
            ranges: HashMap::default(),
            range_by_txn: HashMap::default(),
        }
    }
}
/// One independently locked slice of the lock table.
struct Shard {
// Shard state; every access goes through this mutex (see `LockManager::lock`).
inner: Mutex<ShardInner>,
}
/// Sharded lock table plus a registry of blocked requests used for deadlock
/// detection.
///
/// Point and range locks are stored per shard, each shard behind its own
/// mutex; the wait registry is a single manager-wide map with its own mutex.
#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
pub struct LockManager {
// Lock shards; `with_shards` always makes the length a power of two.
shards: Box<[Shard]>,
// log2 of `shards.len()`: how many top bits of the hash select a shard.
bits: u32,
// Requests that `request` could not grant: waiting txn -> (resource, mode) asked for.
waits: Mutex<HashMap<TxnId, (ResourceId, LockMode)>>,
}
/// Outcome of `LockManager::request`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "the outcome decides whether the transaction proceeds, waits, or aborts"]
pub enum Acquisition {
/// The lock was granted, or the transaction already held a covering mode.
Granted,
/// The request conflicts with current holders and was recorded as a wait;
/// no wait-for cycle through the requester was found.
Waiting,
/// The request was recorded as a wait and a wait-for cycle through the
/// requester was found; carries the cycle and the chosen victim.
Deadlock(Deadlock),
}
impl LockManager {
/// Builds a manager whose shard count scales with the machine's parallelism.
///
/// The target is four shards per available thread, rounded up to a power of
/// two and clamped to `16..=1024`. If the parallelism cannot be queried it is
/// treated as 1.
pub fn new() -> Self {
    let threads = match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    };
    let wanted = threads.saturating_mul(4).next_power_of_two();
    Self::with_shards(wanted.clamp(16, 1024))
}
pub fn with_shards(shards: usize) -> Self {
let n = shards.max(1).next_power_of_two();
let bits = n.trailing_zeros();
let mut v = Vec::with_capacity(n);
for _ in 0..n {
v.push(Shard {
inner: Mutex::new(ShardInner::new()),
});
}
Self {
shards: v.into_boxed_slice(),
bits,
waits: Mutex::new(HashMap::new()),
}
}
/// Returns the number of shards backing this manager.
#[inline]
#[must_use]
pub fn shards(&self) -> usize {
self.shards.len()
}
/// Tries to grant `mode` on `res` to `txn` without recording a wait.
///
/// # Errors
///
/// Returns `LockError::Conflict` when another holder's mode is incompatible
/// with the request (or with the upgraded mode, if `txn` already holds `res`).
pub fn try_acquire(
    &self,
    txn: TxnId,
    res: ResourceId,
    mode: LockMode,
) -> Result<(), LockError> {
    let mut shard = self.lock_shard(res);
    let state = &mut *shard;
    let granted = Self::try_grant_locked(&mut state.locks, &mut state.by_txn, txn, res, mode);
    granted.then_some(()).ok_or(LockError::Conflict)
}
/// Core grant logic; the caller must already hold the mutex of the shard that
/// owns `locks` and `by_txn`.
///
/// Returns `true` when `txn` ends up holding `res` in a mode that covers
/// `mode`, and `false` when the request conflicts with another holder. A
/// failed request leaves the table unchanged.
fn try_grant_locked(
    locks: &mut HashMap<ResourceId, LockEntry>,
    by_txn: &mut HashMap<TxnId, Vec<ResourceId>>,
    txn: TxnId,
    res: ResourceId,
    mode: LockMode,
) -> bool {
    let holders = &mut locks.entry(res).or_insert_with(LockEntry::new).holders;
    match holders.iter().position(|h| h.txn == txn) {
        // First grant for this transaction: every current holder must tolerate `mode`.
        None => {
            let admissible = !holders.iter().any(|h| !h.mode.compatible_with(mode));
            if admissible {
                holders.push(Holder { txn, mode });
                by_txn.entry(txn).or_default().push(res);
            }
            admissible
        }
        // Re-request: either already covered, or an in-place upgrade to the
        // join of the held and requested modes.
        Some(own) => {
            let held = holders[own].mode;
            if held.covers(mode) {
                return true;
            }
            let wanted = held.join(mode);
            let unobstructed = holders
                .iter()
                .enumerate()
                .all(|(i, h)| i == own || h.mode.compatible_with(wanted));
            if unobstructed {
                holders[own].mode = wanted;
            }
            unobstructed
        }
    }
}
/// Releases the point lock `txn` holds on `res`.
///
/// The resource's entry is dropped once its last holder leaves, and the
/// transaction's reverse index is updated.
///
/// # Errors
///
/// Returns `LockError::NotHeld` when `res` has no entry or `txn` is not among
/// its holders.
pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
    let mut shard = self.lock_shard(res);
    let state = &mut *shard;
    let Some(entry) = state.locks.get_mut(&res) else {
        return Err(LockError::NotHeld);
    };
    let idx = entry
        .holders
        .iter()
        .position(|h| h.txn == txn)
        .ok_or(LockError::NotHeld)?;
    let _ = entry.holders.swap_remove(idx);
    if entry.holders.is_empty() {
        let _ = state.locks.remove(&res);
    }
    Self::forget_resource(&mut state.by_txn, txn, res);
    Ok(())
}
/// Drops everything `txn` owns: its recorded wait (if any), every point lock
/// and every range lock, across all shards.
///
/// Returns the number of point and range grants that were removed; the wait
/// entry is not counted.
pub fn release_all(&self, txn: TxnId) -> usize {
    // The registry guard is a temporary of this statement, so it is released
    // before any shard mutex is taken below.
    let _ = self.lock_waits().remove(&txn);

    let mut freed = 0;
    for shard in self.shards.iter() {
        let mut guard = Self::lock(shard);
        let state = &mut *guard;

        // Point locks recorded for this transaction in this shard.
        for res in state.by_txn.remove(&txn).unwrap_or_default() {
            let Some(entry) = state.locks.get_mut(&res) else {
                continue;
            };
            let Some(idx) = entry.holders.iter().position(|h| h.txn == txn) else {
                continue;
            };
            let _ = entry.holders.swap_remove(idx);
            freed += 1;
            if entry.holders.is_empty() {
                let _ = state.locks.remove(&res);
            }
        }

        // Range locks recorded for this transaction in this shard.
        for (space, range) in state.range_by_txn.remove(&txn).unwrap_or_default() {
            let Some(rs) = state.ranges.get_mut(&space) else {
                continue;
            };
            let Some(idx) = rs
                .holders
                .iter()
                .position(|h| h.txn == txn && h.range == range)
            else {
                continue;
            };
            let _ = rs.holders.swap_remove(idx);
            freed += 1;
            if rs.holders.is_empty() {
                let _ = state.ranges.remove(&space);
            }
        }
    }
    freed
}
/// Requests `mode` on `res` for `txn`, recording a wait and checking for
/// deadlock when the lock cannot be granted.
///
/// * `Granted`: the lock is held; any wait previously recorded for `txn` is
///   cleared.
/// * `Waiting`: the request was recorded in the wait registry and no cycle
///   through `txn` exists.
/// * `Deadlock`: the request was recorded and a cycle through `txn` exists;
///   the victim is chosen with `DEADLOCK_VICTIM_POLICY`, falling back to `txn`
///   when `pick_victim` returns nothing.
///
/// In the `Waiting` and `Deadlock` cases the wait stays registered until a
/// later granted `request`, `cancel_wait` or `release_all` removes it.
pub fn request(&self, txn: TxnId, res: ResourceId, mode: LockMode) -> Acquisition {
    // The wait registry stays locked for the whole call; shard mutexes are
    // only taken while it is held, never the other way round.
    let mut waits = self.lock_waits();
    let granted = {
        let mut shard = self.lock_shard(res);
        let state = &mut *shard;
        Self::try_grant_locked(&mut state.locks, &mut state.by_txn, txn, res, mode)
    };
    if granted {
        let _ = waits.remove(&txn);
        return Acquisition::Granted;
    }

    let _ = waits.insert(txn, (res, mode));
    let graph = self.build_wait_graph(&waits);
    let Some(cycle) = graph.cycle_from(txn) else {
        return Acquisition::Waiting;
    };
    let victim = WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY).unwrap_or(txn);
    Acquisition::Deadlock(Deadlock { victim, cycle })
}
/// Removes the wait recorded for `txn`, if there is one. Locks already held
/// by `txn` are not touched.
pub fn cancel_wait(&self, txn: TxnId) {
    let _ = self.lock_waits().remove(&txn);
}
/// Scans the whole wait registry for a wait-for cycle.
///
/// Returns the first cycle reported by `WaitForGraph::detect_cycle` together
/// with the victim chosen under `DEADLOCK_VICTIM_POLICY`, or `None` when there
/// is no cycle or no victim could be picked.
#[must_use]
pub fn find_deadlock(&self) -> Option<Deadlock> {
    let waits = self.lock_waits();
    let graph = self.build_wait_graph(&waits);
    graph.detect_cycle().and_then(|cycle| {
        WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY)
            .map(|victim| Deadlock { victim, cycle })
    })
}
/// Returns how many transactions currently have a wait recorded.
#[must_use]
pub fn waiting_count(&self) -> usize {
self.lock_waits().len()
}
/// Builds the wait-for graph for the given registry snapshot: each waiting
/// transaction gets an edge to every holder currently blocking its request.
///
/// Takes the relevant shard mutex once per waiter (via `holders_blocking`).
fn build_wait_graph(&self, waits: &HashMap<TxnId, (ResourceId, LockMode)>) -> WaitForGraph {
    let mut graph = WaitForGraph::new();
    for (waiter, pending) in waits.iter() {
        let (res, mode) = *pending;
        let blockers = self.holders_blocking(*waiter, res, mode);
        graph.add_waits(*waiter, &blockers);
    }
    graph
}
/// Lists the transactions, other than `waiter`, whose current grant on `res`
/// is incompatible with `mode`. Returns an empty list when `res` has no entry.
fn holders_blocking(&self, waiter: TxnId, res: ResourceId, mode: LockMode) -> Vec<TxnId> {
    let shard = self.lock_shard(res);
    let Some(entry) = shard.locks.get(&res) else {
        return Vec::new();
    };
    let mut blockers = Vec::new();
    for holder in &entry.holders {
        if holder.txn != waiter && !holder.mode.compatible_with(mode) {
            blockers.push(holder.txn);
        }
    }
    blockers
}
/// Locks the wait registry. A poisoned mutex is not treated as fatal: the
/// guard is recovered from the poison error and returned as usual.
#[inline]
fn lock_waits(&self) -> MutexGuard<'_, HashMap<TxnId, (ResourceId, LockMode)>> {
    self.waits
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
/// Returns how many transactions currently hold a point lock on `res`
/// (0 when the resource has no entry).
#[must_use]
pub fn holder_count(&self, res: ResourceId) -> usize {
    let shard = self.lock_shard(res);
    match shard.locks.get(&res) {
        Some(entry) => entry.holders.len(),
        None => 0,
    }
}
/// Returns the mode in which `txn` currently holds `res`, or `None` when it
/// holds no point lock on that resource.
#[must_use]
pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
    let shard = self.lock_shard(res);
    let entry = shard.locks.get(&res)?;
    entry
        .holders
        .iter()
        .find(|holder| holder.txn == txn)
        .map(|holder| holder.mode)
}
/// Tries to grant `txn` a range lock on `range` within the key space `space`.
///
/// A request conflicts only with grants of *other* transactions whose range
/// overlaps `range` and whose mode is incompatible with `mode`.
///
/// NOTE(review): every successful call appends a new grant, even when `txn`
/// already holds the identical range. Point locks deduplicate re-requests;
/// confirm that the difference is intended.
///
/// # Errors
///
/// Returns `LockError::Conflict` when such a conflicting grant exists.
pub fn try_acquire_range(
    &self,
    txn: TxnId,
    space: ResourceId,
    range: KeyRange,
    mode: LockMode,
) -> Result<(), LockError> {
    let mut shard = self.lock_shard(space);
    let state = &mut *shard;
    let holders = &mut state
        .ranges
        .entry(space)
        .or_insert_with(RangeSpace::new)
        .holders;
    for held in holders.iter() {
        let foreign = held.txn != txn;
        if foreign && held.range.overlaps(range) && !held.mode.compatible_with(mode) {
            return Err(LockError::Conflict);
        }
    }
    holders.push(RangeHolder { txn, range, mode });
    state.range_by_txn.entry(txn).or_default().push((space, range));
    Ok(())
}
/// Releases one range grant that `txn` holds on exactly `range` in `space`.
///
/// The range must match the acquired one exactly; an overlapping but
/// different range does not count. The space's entry is dropped once its last
/// grant leaves.
///
/// # Errors
///
/// Returns `LockError::NotHeld` when the space has no entry or `txn` holds no
/// grant on exactly `range`.
pub fn release_range(
    &self,
    txn: TxnId,
    space: ResourceId,
    range: KeyRange,
) -> Result<(), LockError> {
    let mut shard = self.lock_shard(space);
    let state = &mut *shard;
    let Some(rs) = state.ranges.get_mut(&space) else {
        return Err(LockError::NotHeld);
    };
    let idx = rs
        .holders
        .iter()
        .position(|h| h.txn == txn && h.range == range)
        .ok_or(LockError::NotHeld)?;
    let _ = rs.holders.swap_remove(idx);
    if rs.holders.is_empty() {
        let _ = state.ranges.remove(&space);
    }
    Self::forget_range(&mut state.range_by_txn, txn, space, range);
    Ok(())
}
/// Returns how many range grants are outstanding in `space`
/// (0 when the space has no entry).
#[must_use]
pub fn range_count(&self, space: ResourceId) -> usize {
    let shard = self.lock_shard(space);
    match shard.ranges.get(&space) {
        Some(rs) => rs.holders.len(),
        None => 0,
    }
}
/// Removes one occurrence of `res` from `txn`'s reverse-index list and drops
/// the list entirely once it is empty. Does nothing when `txn` has no list.
#[inline]
fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
    let Some(held) = by_txn.get_mut(&txn) else {
        return;
    };
    if let Some(idx) = held.iter().position(|candidate| *candidate == res) {
        // Order of the list is irrelevant, so the O(1) removal is used.
        let _ = held.swap_remove(idx);
    }
    if held.is_empty() {
        let _ = by_txn.remove(&txn);
    }
}
/// Removes one occurrence of `(space, range)` from `txn`'s range reverse-index
/// list and drops the list once it is empty. Does nothing when `txn` has no
/// list.
#[inline]
fn forget_range(
    range_by_txn: &mut HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
    txn: TxnId,
    space: ResourceId,
    range: KeyRange,
) {
    let Some(held) = range_by_txn.get_mut(&txn) else {
        return;
    };
    let found = held
        .iter()
        .position(|(held_space, held_range)| *held_space == space && *held_range == range);
    if let Some(idx) = found {
        let _ = held.swap_remove(idx);
    }
    if held.is_empty() {
        let _ = range_by_txn.remove(&txn);
    }
}
/// Locks and returns the shard that `res` hashes to (see `shard_index`).
#[inline]
fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
Self::lock(&self.shards[self.shard_index(res)])
}
/// Locks a shard. A poisoned mutex is not treated as fatal: the guard is
/// recovered from the poison error and returned as usual.
#[inline]
fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
    shard
        .inner
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
}
/// Maps a resource id to a shard index in `0..self.shards.len()`.
///
/// The id is multiplied by `FIB_HASH` (wrapping) and the top `bits` bits of
/// the 64-bit product are used as the index.
#[inline]
fn shard_index(&self, res: ResourceId) -> usize {
    match self.bits {
        // Single shard: shifting a u64 by 64 would overflow, so answer directly.
        0 => 0,
        bits => {
            let mixed = res.get().wrapping_mul(FIB_HASH);
            (mixed >> (u64::BITS - bits)) as usize
        }
    }
}
}
/// `LockManager::default()` is the same as `LockManager::new()`.
impl Default for LockManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used)]
mod tests {
use super::{Acquisition, FIB_HASH, LockManager};
use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
/// Test helper: builds a `(TxnId, ResourceId)` pair from raw numbers.
fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
    let txn = TxnId::new(t);
    let res = ResourceId::new(r);
    (txn, res)
}
// Test helper: builds a `KeyRange` from `start` and `end`, panicking if
// `KeyRange::new` rejects the bounds.
fn kr(start: u64, end: u64) -> KeyRange {
KeyRange::new(start, end).unwrap()
}
/// Several transactions can hold `Shared` on the same resource at once.
#[test]
fn test_shared_locks_coexist() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    for txn in 1..=3 {
        manager
            .try_acquire(TxnId::new(txn), res, LockMode::Shared)
            .unwrap();
    }
    assert_eq!(manager.holder_count(res), 3);
}
/// An `Exclusive` holder blocks another transaction's `Shared` request.
#[test]
fn test_exclusive_excludes_shared() {
    let manager = LockManager::new();
    let (writer, res) = ids(1, 1);
    manager.try_acquire(writer, res, LockMode::Exclusive).unwrap();
    let reader_attempt = manager.try_acquire(TxnId::new(2), res, LockMode::Shared);
    assert_eq!(reader_attempt, Err(LockError::Conflict));
}
/// `IntentionShared` and `IntentionExclusive` from different transactions coexist.
#[test]
fn test_intention_shared_and_intention_exclusive_coexist() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let requests = [
        (TxnId::new(1), LockMode::IntentionShared),
        (TxnId::new(2), LockMode::IntentionExclusive),
    ];
    for (txn, mode) in requests {
        manager.try_acquire(txn, res, mode).unwrap();
    }
    assert_eq!(manager.holder_count(res), 2);
}
/// `IntentionExclusive` rejects a foreign `Shared` request but still admits
/// further intention locks.
#[test]
fn test_intention_exclusive_blocks_shared() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    manager
        .try_acquire(TxnId::new(1), res, LockMode::IntentionExclusive)
        .unwrap();

    let shared_attempt = manager.try_acquire(TxnId::new(2), res, LockMode::Shared);
    assert_eq!(shared_attempt, Err(LockError::Conflict));

    manager
        .try_acquire(TxnId::new(3), res, LockMode::IntentionExclusive)
        .unwrap();
    manager
        .try_acquire(TxnId::new(4), res, LockMode::IntentionShared)
        .unwrap();
}
/// Holding `Shared` and then requesting `IntentionExclusive` yields
/// `SharedIntentionExclusive`, which admits `IntentionShared` but not `Shared`.
#[test]
fn test_shared_plus_intention_exclusive_upgrades_to_six() {
    let manager = LockManager::new();
    let (owner, res) = ids(1, 1);
    manager.try_acquire(owner, res, LockMode::Shared).unwrap();
    manager
        .try_acquire(owner, res, LockMode::IntentionExclusive)
        .unwrap();
    let held = manager.mode_held(owner, res);
    assert_eq!(held, Some(LockMode::SharedIntentionExclusive));

    manager
        .try_acquire(TxnId::new(2), res, LockMode::IntentionShared)
        .unwrap();
    let late_reader = manager.try_acquire(TxnId::new(3), res, LockMode::Shared);
    assert_eq!(late_reader, Err(LockError::Conflict));
}
/// A sole `IntentionShared` holder can upgrade straight to `Exclusive`.
#[test]
fn test_intention_shared_upgrades_to_exclusive_when_sole_holder() {
    let manager = LockManager::new();
    let (owner, res) = ids(1, 1);
    for mode in [LockMode::IntentionShared, LockMode::Exclusive] {
        manager.try_acquire(owner, res, mode).unwrap();
    }
    assert_eq!(manager.mode_held(owner, res), Some(LockMode::Exclusive));
}
/// A second `Shared` holder blocks the upgrade to `SharedIntentionExclusive`,
/// and the failed upgrade leaves the original mode untouched.
#[test]
fn test_upgrade_to_six_blocked_by_other_reader() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (upgrader, bystander) = (TxnId::new(1), TxnId::new(2));
    manager.try_acquire(upgrader, res, LockMode::Shared).unwrap();
    manager.try_acquire(bystander, res, LockMode::Shared).unwrap();

    let upgrade = manager.try_acquire(upgrader, res, LockMode::IntentionExclusive);
    assert_eq!(upgrade, Err(LockError::Conflict));
    assert_eq!(manager.mode_held(upgrader, res), Some(LockMode::Shared));
}
/// Multi-granularity walk-through: a writer takes `IntentionExclusive` down
/// the hierarchy and `Exclusive` on the row; a reader can take
/// `IntentionShared` on the ancestors but not `Shared` on the row.
#[test]
fn test_hierarchy_protocol_row_write_under_table_intent() {
    let manager = LockManager::new();
    let [db, table, page, row] = [1, 2, 3, 4].map(ResourceId::new);
    let (writer, reader) = (TxnId::new(1), TxnId::new(2));

    for ancestor in [db, table, page] {
        manager
            .try_acquire(writer, ancestor, LockMode::IntentionExclusive)
            .unwrap();
    }
    manager.try_acquire(writer, row, LockMode::Exclusive).unwrap();

    for ancestor in [db, table] {
        manager
            .try_acquire(reader, ancestor, LockMode::IntentionShared)
            .unwrap();
    }
    let row_read = manager.try_acquire(reader, row, LockMode::Shared);
    assert_eq!(row_read, Err(LockError::Conflict));
}
/// Two transactions cannot both hold `Exclusive` on one resource.
#[test]
fn test_exclusive_excludes_exclusive() {
    let manager = LockManager::new();
    let (first, res) = ids(1, 1);
    manager.try_acquire(first, res, LockMode::Exclusive).unwrap();
    let second_attempt = manager.try_acquire(TxnId::new(2), res, LockMode::Exclusive);
    assert_eq!(second_attempt, Err(LockError::Conflict));
}
/// A `Shared` holder blocks another transaction's `Exclusive` request.
#[test]
fn test_shared_blocks_other_exclusive() {
    let manager = LockManager::new();
    let (reader, res) = ids(1, 1);
    manager.try_acquire(reader, res, LockMode::Shared).unwrap();
    let writer_attempt = manager.try_acquire(TxnId::new(2), res, LockMode::Exclusive);
    assert_eq!(writer_attempt, Err(LockError::Conflict));
}
/// Re-requesting a mode already held does not add a second holder.
#[test]
fn test_reacquire_same_mode_is_idempotent() {
    let manager = LockManager::new();
    let (txn, res) = ids(1, 1);
    for _ in 0..2 {
        manager.try_acquire(txn, res, LockMode::Shared).unwrap();
    }
    assert_eq!(manager.holder_count(res), 1);
}
/// Requesting `Shared` while holding `Exclusive` succeeds and keeps `Exclusive`.
#[test]
fn test_request_weaker_than_held_is_noop() {
    let manager = LockManager::new();
    let (txn, res) = ids(1, 1);
    for mode in [LockMode::Exclusive, LockMode::Shared] {
        manager.try_acquire(txn, res, mode).unwrap();
    }
    assert_eq!(manager.mode_held(txn, res), Some(LockMode::Exclusive));
    assert_eq!(manager.holder_count(res), 1);
}
/// A sole `Shared` holder can upgrade to `Exclusive` in place.
#[test]
fn test_upgrade_sole_holder_succeeds() {
    let manager = LockManager::new();
    let (txn, res) = ids(1, 1);
    for mode in [LockMode::Shared, LockMode::Exclusive] {
        manager.try_acquire(txn, res, mode).unwrap();
    }
    assert_eq!(manager.mode_held(txn, res), Some(LockMode::Exclusive));
    assert_eq!(manager.holder_count(res), 1);
}
/// A second `Shared` holder blocks the upgrade to `Exclusive`, and the failed
/// upgrade leaves the original mode untouched.
#[test]
fn test_upgrade_blocked_by_other_reader() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (upgrader, bystander) = (TxnId::new(1), TxnId::new(2));
    manager.try_acquire(upgrader, res, LockMode::Shared).unwrap();
    manager.try_acquire(bystander, res, LockMode::Shared).unwrap();

    let upgrade = manager.try_acquire(upgrader, res, LockMode::Exclusive);
    assert_eq!(upgrade, Err(LockError::Conflict));
    assert_eq!(manager.mode_held(upgrader, res), Some(LockMode::Shared));
}
/// `Exclusive` becomes available only after the last `Shared` holder releases.
#[test]
fn test_release_frees_resource_for_exclusive() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (first, second, writer) = (TxnId::new(1), TxnId::new(2), TxnId::new(3));
    manager.try_acquire(first, res, LockMode::Shared).unwrap();
    manager.try_acquire(second, res, LockMode::Shared).unwrap();

    manager.release(first, res).unwrap();
    let while_shared = manager.try_acquire(writer, res, LockMode::Exclusive);
    assert!(while_shared.is_err());

    manager.release(second, res).unwrap();
    manager.try_acquire(writer, res, LockMode::Exclusive).unwrap();
}
/// Releasing an unknown resource, or a resource held by someone else, reports
/// `NotHeld`.
#[test]
fn test_release_not_held_errors() {
    let manager = LockManager::new();
    let (owner, res) = ids(1, 1);
    let before_any_lock = manager.release(owner, res);
    assert_eq!(before_any_lock, Err(LockError::NotHeld));

    manager.try_acquire(owner, res, LockMode::Shared).unwrap();
    let stranger = manager.release(TxnId::new(9), res);
    assert_eq!(stranger, Err(LockError::NotHeld));
}
/// A second release of the same lock reports `NotHeld`.
#[test]
fn test_double_release_errors() {
    let manager = LockManager::new();
    let (txn, res) = ids(1, 1);
    manager.try_acquire(txn, res, LockMode::Exclusive).unwrap();
    manager.release(txn, res).unwrap();
    let second = manager.release(txn, res);
    assert_eq!(second, Err(LockError::NotHeld));
}
/// `release_all` frees every lock a transaction holds across shards and
/// reports how many it removed; a repeat call removes nothing.
#[test]
fn test_release_all_drops_every_lock() {
    let manager = LockManager::with_shards(8);
    let txn = TxnId::new(1);
    let resources: Vec<ResourceId> = (0..50).map(ResourceId::new).collect();
    for &res in &resources {
        manager.try_acquire(txn, res, LockMode::Exclusive).unwrap();
    }

    assert_eq!(manager.release_all(txn), 50);
    for &res in &resources {
        assert_eq!(manager.holder_count(res), 0);
    }
    assert_eq!(manager.release_all(txn), 0);
}
#[test]
fn test_release_all_leaves_other_txns_alone() {
let lm = LockManager::new();
let r = ResourceId::new(1);
lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
assert_eq!(lm.release_all(TxnId::new(1)), 1);
assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
assert_eq!(lm.holder_count(r), 1);
}
/// After the only holder releases, the resource has no holders and another
/// transaction can take it exclusively.
#[test]
fn test_resource_fully_released_can_be_taken_exclusively() {
    let manager = LockManager::new();
    let res = ResourceId::new(42);
    let (first, second) = (TxnId::new(1), TxnId::new(2));
    manager.try_acquire(first, res, LockMode::Exclusive).unwrap();
    manager.release(first, res).unwrap();
    assert_eq!(manager.holder_count(res), 0);
    manager.try_acquire(second, res, LockMode::Exclusive).unwrap();
}
/// Overlapping `Shared` ranges from different transactions coexist.
#[test]
fn test_range_shared_overlap_coexists() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    let requests = [(TxnId::new(1), kr(0, 100)), (TxnId::new(2), kr(50, 150))];
    for (txn, range) in requests {
        manager
            .try_acquire_range(txn, space, range, LockMode::Shared)
            .unwrap();
    }
    assert_eq!(manager.range_count(space), 2);
}
/// An `Exclusive` point inside another transaction's `Shared` range conflicts.
#[test]
fn test_range_exclusive_conflicts_on_overlap() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    manager
        .try_acquire_range(TxnId::new(1), space, kr(100, 200), LockMode::Shared)
        .unwrap();

    let inside = KeyRange::point(150);
    let attempt = manager.try_acquire_range(TxnId::new(2), space, inside, LockMode::Exclusive);
    assert_eq!(attempt, Err(LockError::Conflict));
}
/// `Exclusive` ranges that do not overlap can be held by different transactions.
#[test]
fn test_range_disjoint_ranges_do_not_conflict() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    let requests = [(TxnId::new(1), kr(0, 100)), (TxnId::new(2), kr(101, 200))];
    for (txn, range) in requests {
        manager
            .try_acquire_range(txn, space, range, LockMode::Exclusive)
            .unwrap();
    }
}
/// Ranges that share an endpoint overlap: `0..=100` held exclusively blocks a
/// `Shared` request starting at 100.
#[test]
fn test_range_adjacent_inclusive_bounds_conflict() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    manager
        .try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
        .unwrap();

    let touching = manager.try_acquire_range(TxnId::new(2), space, kr(100, 200), LockMode::Shared);
    assert_eq!(touching, Err(LockError::Conflict));
}
/// The same key range in two different spaces never conflicts.
#[test]
fn test_range_different_spaces_independent() {
    let manager = LockManager::new();
    for id in 1..=2 {
        let (txn, space) = ids(id, id);
        manager
            .try_acquire_range(txn, space, kr(0, 100), LockMode::Exclusive)
            .unwrap();
    }
}
/// One transaction may hold overlapping `Exclusive` ranges; each is recorded
/// as its own grant.
#[test]
fn test_range_same_txn_overlap_allowed() {
    let manager = LockManager::new();
    let (txn, space) = ids(1, 1);
    for range in [kr(0, 100), kr(50, 150)] {
        manager
            .try_acquire_range(txn, space, range, LockMode::Exclusive)
            .unwrap();
    }
    assert_eq!(manager.range_count(space), 2);
}
/// Releasing a range empties the space and lets another transaction lock a
/// key inside the old range.
#[test]
fn test_range_release_frees_overlap() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    let span = kr(100, 200);
    let (first, second) = (TxnId::new(1), TxnId::new(2));

    manager
        .try_acquire_range(first, space, span, LockMode::Exclusive)
        .unwrap();
    manager.release_range(first, space, span).unwrap();
    assert_eq!(manager.range_count(space), 0);

    manager
        .try_acquire_range(second, space, KeyRange::point(150), LockMode::Exclusive)
        .unwrap();
}
/// `release_range` reports `NotHeld` for an empty space and for a range that
/// differs from the one actually held.
#[test]
fn test_range_release_not_held_errors() {
    let manager = LockManager::new();
    let (txn, space) = ids(1, 1);

    let nothing_held = manager.release_range(txn, space, kr(0, 10));
    assert_eq!(nothing_held, Err(LockError::NotHeld));

    manager
        .try_acquire_range(txn, space, kr(0, 10), LockMode::Shared)
        .unwrap();
    let wrong_bounds = manager.release_range(txn, space, kr(0, 11));
    assert_eq!(wrong_bounds, Err(LockError::NotHeld));
}
/// `release_all` counts and removes both point locks and range locks.
#[test]
fn test_release_all_drops_point_and_range_locks() {
    let manager = LockManager::new();
    let txn = TxnId::new(1);
    for id in 0..3 {
        manager
            .try_acquire(txn, ResourceId::new(id), LockMode::Exclusive)
            .unwrap();
    }
    let space = ResourceId::new(100);
    for range in [kr(0, 10), kr(20, 30)] {
        manager
            .try_acquire_range(txn, space, range, LockMode::Shared)
            .unwrap();
    }

    // Three point locks plus two range locks.
    assert_eq!(manager.release_all(txn), 5);
    assert_eq!(manager.range_count(space), 0);
    assert_eq!(manager.release_all(txn), 0);
}
/// `release_all` removes only the caller's range grant, not another
/// transaction's grant on the same range.
#[test]
fn test_release_all_range_leaves_other_txn() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    let (leaver, stayer) = (TxnId::new(1), TxnId::new(2));
    for txn in [leaver, stayer] {
        manager
            .try_acquire_range(txn, space, kr(0, 100), LockMode::Shared)
            .unwrap();
    }
    assert_eq!(manager.release_all(leaver), 1);
    assert_eq!(manager.range_count(space), 1);
}
/// Intention modes on the same range from different transactions coexist.
#[test]
fn test_range_intention_modes_coexist() {
    let manager = LockManager::new();
    let space = ResourceId::new(1);
    let requests = [
        (TxnId::new(1), LockMode::IntentionShared),
        (TxnId::new(2), LockMode::IntentionExclusive),
    ];
    for (txn, mode) in requests {
        manager
            .try_acquire_range(txn, space, kr(0, 100), mode)
            .unwrap();
    }
    assert_eq!(manager.range_count(space), 2);
}
/// `request` on a free resource grants immediately and records no wait.
#[test]
fn test_request_granted_on_free_resource() {
    let manager = LockManager::new();
    let (txn, res) = ids(1, 1);
    let outcome = manager.request(txn, res, LockMode::Exclusive);
    assert_eq!(outcome, Acquisition::Granted);
    assert_eq!(manager.mode_held(txn, res), Some(LockMode::Exclusive));
    assert_eq!(manager.waiting_count(), 0);
}
/// A conflicting `request` returns `Waiting` and is recorded in the wait registry.
#[test]
fn test_request_waiting_registers_wait() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let holder = manager.request(TxnId::new(1), res, LockMode::Exclusive);
    assert_eq!(holder, Acquisition::Granted);
    let blocked = manager.request(TxnId::new(2), res, LockMode::Exclusive);
    assert_eq!(blocked, Acquisition::Waiting);
    assert_eq!(manager.waiting_count(), 1);
}
/// Once the blocker releases, retrying `request` is granted and clears the
/// earlier wait entry.
#[test]
fn test_request_grant_clears_prior_wait() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (holder, waiter) = (TxnId::new(1), TxnId::new(2));

    let _ = manager.request(holder, res, LockMode::Exclusive);
    let first_try = manager.request(waiter, res, LockMode::Exclusive);
    assert_eq!(first_try, Acquisition::Waiting);

    manager.release(holder, res).unwrap();
    let retry = manager.request(waiter, res, LockMode::Exclusive);
    assert_eq!(retry, Acquisition::Granted);
    assert_eq!(manager.waiting_count(), 0);
}
/// Two transactions each hold one resource and request the other's: the
/// second blocked request closes the cycle and is reported as a deadlock.
#[test]
fn test_classic_two_transaction_deadlock() {
    let manager = LockManager::new();
    let (t1, a) = ids(1, 1);
    let (t2, b) = ids(2, 2);
    assert_eq!(manager.request(t1, a, LockMode::Exclusive), Acquisition::Granted);
    assert_eq!(manager.request(t2, b, LockMode::Exclusive), Acquisition::Granted);
    assert_eq!(manager.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);

    let closing = manager.request(t2, a, LockMode::Exclusive);
    let Acquisition::Deadlock(deadlock) = closing else {
        panic!("expected deadlock, got {closing:?}");
    };
    // The victim policy in force picks t2 here.
    assert_eq!(deadlock.victim, t2);
    assert_eq!(deadlock.cycle.len(), 2);
    assert!(deadlock.cycle.contains(&t1) && deadlock.cycle.contains(&t2));
}
/// A three-way ring of waits is detected when the last edge is added.
#[test]
fn test_three_transaction_deadlock_cycle() {
    let manager = LockManager::new();
    let (t1, a) = ids(1, 1);
    let (t2, b) = ids(2, 2);
    let (t3, c) = ids(3, 3);
    for (txn, res) in [(t1, a), (t2, b), (t3, c)] {
        let _ = manager.request(txn, res, LockMode::Exclusive);
    }
    assert_eq!(manager.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
    assert_eq!(manager.request(t2, c, LockMode::Exclusive), Acquisition::Waiting);

    let closing = manager.request(t3, a, LockMode::Exclusive);
    let Acquisition::Deadlock(deadlock) = closing else {
        panic!("expected deadlock, got {closing:?}");
    };
    assert_eq!(deadlock.cycle.len(), 3);
    // The victim policy in force picks t3 here.
    assert_eq!(deadlock.victim, t3);
}
/// Releasing everything the victim holds lets the survivor obtain the lock it
/// was waiting for, and no deadlock remains afterwards.
#[test]
fn test_aborting_victim_breaks_deadlock() {
    let manager = LockManager::new();
    let (t1, a) = ids(1, 1);
    let (t2, b) = ids(2, 2);
    let _ = manager.request(t1, a, LockMode::Exclusive);
    let _ = manager.request(t2, b, LockMode::Exclusive);
    let _ = manager.request(t1, b, LockMode::Exclusive);

    let closing = manager.request(t2, a, LockMode::Exclusive);
    let Acquisition::Deadlock(deadlock) = closing else {
        panic!("expected deadlock, got {closing:?}");
    };
    let victim = deadlock.victim;
    let _ = manager.release_all(victim);

    // The survivor retries the request it was blocked on.
    let (survivor, wanted) = if victim == t1 { (t2, a) } else { (t1, b) };
    let retry = manager.request(survivor, wanted, LockMode::Exclusive);
    assert_eq!(retry, Acquisition::Granted);
    assert!(manager.find_deadlock().is_none());
}
/// A stale wait entry does not create a phantom cycle once its blocker has
/// released the resource.
#[test]
fn test_no_false_deadlock_after_release() {
    let manager = LockManager::new();
    let (t1, a) = ids(1, 1);
    let (t2, b) = ids(2, 2);
    let _ = manager.request(t1, a, LockMode::Exclusive);
    let _ = manager.request(t2, b, LockMode::Exclusive);
    // t1 now waits on b, held by t2.
    let _ = manager.request(t1, b, LockMode::Exclusive);
    // t2 lets go of b, so t1 is no longer blocked by anyone.
    manager.release(t2, b).unwrap();

    let outcome = manager.request(t2, a, LockMode::Exclusive);
    assert_eq!(outcome, Acquisition::Waiting);
    assert!(manager.find_deadlock().is_none());
}
/// `cancel_wait` removes the waiter's entry from the registry.
#[test]
fn test_cancel_wait_removes_from_graph() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (holder, waiter) = (TxnId::new(1), TxnId::new(2));
    let _ = manager.request(holder, res, LockMode::Exclusive);
    let blocked = manager.request(waiter, res, LockMode::Exclusive);
    assert_eq!(blocked, Acquisition::Waiting);

    manager.cancel_wait(waiter);
    assert_eq!(manager.waiting_count(), 0);
}
/// `release_all` also drops the transaction's pending wait.
#[test]
fn test_release_all_clears_wait() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    let (holder, waiter) = (TxnId::new(1), TxnId::new(2));
    let _ = manager.request(holder, res, LockMode::Exclusive);
    // The second request conflicts and is recorded as a wait.
    let _ = manager.request(waiter, res, LockMode::Exclusive);
    assert_eq!(manager.waiting_count(), 1);

    let _ = manager.release_all(waiter);
    assert_eq!(manager.waiting_count(), 0);
}
/// A single one-directional wait is not a deadlock.
#[test]
fn test_find_deadlock_none_without_cycle() {
    let manager = LockManager::new();
    let (t1, a) = ids(1, 1);
    let (t2, b) = ids(2, 2);
    let _ = manager.request(t1, a, LockMode::Exclusive);
    let _ = manager.request(t2, b, LockMode::Exclusive);
    // t1 waits on t2, but t2 waits on nobody.
    let _ = manager.request(t1, b, LockMode::Exclusive);
    assert!(manager.find_deadlock().is_none());
}
/// Compatible `Shared` requests are both granted and leave no waits behind.
#[test]
fn test_shared_requests_do_not_deadlock() {
    let manager = LockManager::new();
    let res = ResourceId::new(1);
    for txn in [TxnId::new(1), TxnId::new(2)] {
        let outcome = manager.request(txn, res, LockMode::Shared);
        assert_eq!(outcome, Acquisition::Granted);
    }
    assert_eq!(manager.waiting_count(), 0);
}
/// `with_shards` rounds the request up to a power of two, with a minimum of one.
#[test]
fn test_with_shards_rounds_up_to_power_of_two() {
    let cases = [(1, 1), (3, 4), (5, 8), (0, 1), (64, 64)];
    for (requested, expected) in cases {
        assert_eq!(LockManager::with_shards(requested).shards(), expected);
    }
}
/// With a single shard every resource maps to index 0.
#[test]
fn test_single_shard_routes_everything_to_index_zero() {
    let manager = LockManager::with_shards(1);
    let mut indices = (0..1000).map(|id| manager.shard_index(ResourceId::new(id)));
    assert!(indices.all(|idx| idx == 0));
}
/// Shard indices never reach or exceed the shard count.
#[test]
fn test_shard_index_within_bounds() {
    let manager = LockManager::with_shards(16);
    let mut indices = (0..10_000).map(|id| manager.shard_index(ResourceId::new(id)));
    assert!(indices.all(|idx| idx < 16));
}
/// 256 consecutive ids hit every one of 16 shards at least once.
#[test]
fn test_sequential_ids_spread_across_shards() {
    let manager = LockManager::with_shards(16);
    let mut hit = [false; 16];
    (0..256)
        .map(|id| manager.shard_index(ResourceId::new(id)))
        .for_each(|idx| hit[idx] = true);
    assert!(!hit.contains(&false));
}
/// `Exclusive` locks on two different resources do not interfere.
#[test]
fn test_locks_in_different_shards_are_independent() {
    let manager = LockManager::with_shards(16);
    let pairs = [ids(1, 1), ids(2, 2)];
    for (txn, res) in pairs {
        manager.try_acquire(txn, res, LockMode::Exclusive).unwrap();
    }
    for (_, res) in pairs {
        assert_eq!(manager.holder_count(res), 1);
    }
}
/// The hash multiplier must be odd.
#[test]
fn test_fib_hash_constant_is_odd() {
    assert_eq!(FIB_HASH % 2, 1);
}
// Eight threads, each with its own transaction id, repeatedly take and
// release `Shared` on one resource; every call must succeed and no holder may
// be left behind once all threads have finished.
#[test]
fn test_concurrent_shared_acquire_release_is_consistent() {
use std::sync::Arc;
use std::thread;
let lm = Arc::new(LockManager::new());
let r = ResourceId::new(7);
let mut handles = Vec::new();
for t in 0..8u64 {
let lm = Arc::clone(&lm);
handles.push(thread::spawn(move || {
let txn = TxnId::new(t);
for _ in 0..1000 {
lm.try_acquire(txn, r, LockMode::Shared).unwrap();
lm.release(txn, r).unwrap();
}
}));
}
// Joining also propagates any panic raised inside a worker.
for h in handles {
h.join().unwrap();
}
assert_eq!(lm.holder_count(r), 0);
}
// Eight threads race for `Exclusive` on one resource. A thread that wins
// bumps a shared counter and asserts that nobody else was inside at the same
// time; losers simply try again on the next iteration.
#[test]
fn test_concurrent_exclusive_is_mutually_exclusive() {
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
let lm = Arc::new(LockManager::new());
let active = Arc::new(AtomicUsize::new(0));
let r = ResourceId::new(11);
let mut handles = Vec::new();
for t in 0..8u64 {
let lm = Arc::clone(&lm);
let active = Arc::clone(&active);
handles.push(thread::spawn(move || {
let txn = TxnId::new(t);
for _ in 0..2000 {
if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
// `fetch_add` returns the previous value: it must be 0 while we hold the lock.
let inside = active.fetch_add(1, Ordering::SeqCst);
assert_eq!(inside, 0);
active.fetch_sub(1, Ordering::SeqCst);
lm.release(txn, r).unwrap();
}
}
}));
}
// Joining also propagates any assertion failure raised inside a worker.
for h in handles {
h.join().unwrap();
}
assert_eq!(lm.holder_count(r), 0);
}
}