lock_db/manager.rs
1//! The lock table: a sharded, contention-aware map from resources to holders.
2//!
3//! # Design
4//!
5//! A single global mutex over the whole lock table would serialise every
6//! acquire and release in the database, turning the lock manager itself into
7//! the bottleneck it exists to manage. Instead the table is split into a fixed
8//! number of independent shards, each guarding its own slice of the resource
9//! space behind its own mutex. Two transactions touching resources in different
10//! shards never contend on the same lock. The shard for a resource is chosen by
11//! Fibonacci hashing its id, which spreads sequential ids (the common case for
12//! page and row numbers) evenly across shards without paying for a
13//! general-purpose hasher on the hot path.
14//!
15//! Each shard also keeps a reverse index from transaction to the resources it
16//! holds in that shard, so releasing every lock a transaction owns is
17//! proportional to the number of locks held, not to the size of the table.
18//!
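//! Acquiring a lock never waits for it. [`LockManager::try_acquire`] is the
//! plain non-blocking path: a request that cannot be granted immediately
//! returns [`LockError::Conflict`] rather than waiting.
//! [`LockManager::request`] additionally records the wait and runs deadlock
//! detection over the wait-for graph, leaving it to the caller to suspend the
//! transaction and retry. Blocking acquisition with wait queues arrives in a
//! later milestone (see the [crate-level docs](crate)).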
23
24#[cfg(loom)]
25use loom::sync::{Mutex, MutexGuard};
26#[cfg(not(loom))]
27use std::sync::{Mutex, MutexGuard};
28
29use std::collections::HashMap;
30
31use crate::deadlock::{Deadlock, VictimPolicy, WaitForGraph};
32use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
33
34/// The victim policy the deadlock-aware acquisition path uses.
35const DEADLOCK_VICTIM_POLICY: VictimPolicy = VictimPolicy::Youngest;
36
/// Fibonacci-hashing multiplier: the 64-bit integer approximation of 2^64
/// divided by the golden ratio. Multiplying a resource id by it (wrapping) and
/// keeping the top bits selects the shard.
const FIB_HASH: u64 = 0x9E37_79B9_7F4A_7C15;
39
40/// A transaction holding a resource, and the mode it holds it in.
41#[derive(Clone, Copy)]
42struct Holder {
43 txn: TxnId,
44 mode: LockMode,
45}
46
47/// The set of transactions currently holding one resource.
48///
49/// Holders are kept in an unordered `Vec` because the common case is a handful
50/// of shared readers or a single writer; a linear scan over a short, contiguous
51/// slice beats the constant overhead and indirection of a map for those sizes.
52struct LockEntry {
53 holders: Vec<Holder>,
54}
55
56impl LockEntry {
57 #[inline]
58 fn new() -> Self {
59 Self {
60 holders: Vec::new(),
61 }
62 }
63}
64
65/// A transaction holding a key range in a space, and the mode it holds.
66#[derive(Clone, Copy)]
67struct RangeHolder {
68 txn: TxnId,
69 range: KeyRange,
70 mode: LockMode,
71}
72
73/// The active range locks in one key space.
74///
75/// Held in an unordered `Vec` and scanned linearly for overlap on each request.
76/// Overlap is not a key-equality lookup, so a hash map does not help; an
77/// interval tree would lower the asymptotic cost but is heavier and is left for
78/// a later release if profiling shows range contention dominates.
79struct RangeSpace {
80 holders: Vec<RangeHolder>,
81}
82
83impl RangeSpace {
84 #[inline]
85 fn new() -> Self {
86 Self {
87 holders: Vec::new(),
88 }
89 }
90}
91
92/// The mutable state of one shard.
93struct ShardInner {
94 /// Point locks: resources with at least one holder, keyed by resource id.
95 locks: HashMap<ResourceId, LockEntry>,
96 /// Reverse index: the resources each transaction holds *in this shard*.
97 by_txn: HashMap<TxnId, Vec<ResourceId>>,
98 /// Range locks, keyed by the space (e.g. an index) they protect.
99 ranges: HashMap<ResourceId, RangeSpace>,
100 /// Reverse index for range locks: the (space, range) pairs each transaction
101 /// holds *in this shard*.
102 range_by_txn: HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
103}
104
105impl ShardInner {
106 fn new() -> Self {
107 Self {
108 locks: HashMap::new(),
109 by_txn: HashMap::new(),
110 ranges: HashMap::new(),
111 range_by_txn: HashMap::new(),
112 }
113 }
114}
115
116/// One independently locked partition of the table.
117struct Shard {
118 inner: Mutex<ShardInner>,
119}
120
121/// A sharded lock table mapping resources to the transactions that hold them.
122///
123/// `LockManager` is the primary entry point of the crate. It is `Send + Sync`
124/// and is meant to be shared behind an [`std::sync::Arc`] across all worker
125/// threads; every method takes `&self`, so no outer lock is needed.
126///
127/// # Examples
128///
129/// ```
130/// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
131///
132/// let lm = LockManager::new();
133/// let row = ResourceId::new(100);
134/// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
135///
136/// // Two transactions read the same row concurrently.
137/// lm.try_acquire(t1, row, LockMode::Shared).unwrap();
138/// lm.try_acquire(t2, row, LockMode::Shared).unwrap();
139/// assert_eq!(lm.holder_count(row), 2);
140///
141/// // Neither can take it exclusively while the other reads.
142/// assert!(lm.try_acquire(t1, row, LockMode::Exclusive).is_err());
143///
144/// // After both release, an exclusive lock is free to take.
145/// lm.release(t1, row).unwrap();
146/// lm.release(t2, row).unwrap();
147/// lm.try_acquire(t1, row, LockMode::Exclusive).unwrap();
148/// ```
149#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
150pub struct LockManager {
151 shards: Box<[Shard]>,
152 /// `log2(shards.len())`; `0` when there is a single shard.
153 bits: u32,
154 /// The deadlock-aware wait set: each waiting transaction and the single
155 /// (resource, mode) request it is blocked on. A global mutex, taken only by
156 /// the deadlock-aware [`request`](LockManager::request) path — the
157 /// non-blocking `try_acquire`/`release` fast path never touches it.
158 ///
159 /// Lock ordering: this mutex is always the *outer* lock. `request` takes it
160 /// and then a shard mutex; nothing ever takes a shard mutex and then this
161 /// one. `release_all` clears its own entry in a separate critical section,
162 /// never nested with a shard lock, so no cycle is possible.
163 waits: Mutex<HashMap<TxnId, (ResourceId, LockMode)>>,
164}
165
166/// The outcome of a deadlock-aware [`request`](LockManager::request).
167///
168/// Unlike [`try_acquire`](LockManager::try_acquire), `request` does not just
169/// fail on conflict — it records the wait and tells the caller whether to
170/// proceed, suspend, or abort.
171#[derive(Debug, Clone, PartialEq, Eq)]
172#[must_use = "the outcome decides whether the transaction proceeds, waits, or aborts"]
173pub enum Acquisition {
174 /// The lock was granted; the transaction holds it and may proceed.
175 Granted,
176 /// The lock is held incompatibly. The transaction is now registered as
177 /// waiting; the caller should suspend it and retry `request` later (for
178 /// example after a release). No deadlock was found.
179 Waiting,
180 /// Granting the wait would close a cycle in the wait-for graph. The caller
181 /// must abort the named victim (with [`release_all`](LockManager::release_all))
182 /// to break the deadlock. The victim may be the requesting transaction
183 /// itself or another transaction in the cycle.
184 Deadlock(Deadlock),
185}
186
187impl LockManager {
188 /// Creates a lock manager with a shard count chosen for the current machine.
189 ///
190 /// The count scales with the number of available CPUs (rounded up to a power
191 /// of two) so that contention on any single shard mutex stays low on
192 /// multi-core systems. Use [`with_shards`](Self::with_shards) to pin an
193 /// exact count, for example in tests or on memory-constrained targets.
194 ///
195 /// # Examples
196 ///
197 /// ```
198 /// use lock_db::LockManager;
199 ///
200 /// let lm = LockManager::new();
201 /// assert!(lm.shards().is_power_of_two());
202 /// ```
203 pub fn new() -> Self {
204 let parallelism = std::thread::available_parallelism()
205 .map(|n| n.get())
206 .unwrap_or(1);
207 let target = (parallelism.saturating_mul(4))
208 .next_power_of_two()
209 .clamp(16, 1024);
210 Self::with_shards(target)
211 }
212
213 /// Creates a lock manager with an explicit shard count.
214 ///
215 /// `shards` is rounded up to the next power of two (and a request of `0` is
216 /// treated as `1`), which lets the shard lookup use a shift instead of a
217 /// remainder. More shards reduce contention but cost a mutex and two small
218 /// maps each; fewer shards save memory at the cost of more collisions.
219 ///
220 /// # Examples
221 ///
222 /// ```
223 /// use lock_db::LockManager;
224 ///
225 /// // Rounded up to the next power of two.
226 /// assert_eq!(LockManager::with_shards(5).shards(), 8);
227 /// assert_eq!(LockManager::with_shards(0).shards(), 1);
228 /// ```
229 pub fn with_shards(shards: usize) -> Self {
230 let n = shards.max(1).next_power_of_two();
231 let bits = n.trailing_zeros();
232 let mut v = Vec::with_capacity(n);
233 for _ in 0..n {
234 v.push(Shard {
235 inner: Mutex::new(ShardInner::new()),
236 });
237 }
238 Self {
239 shards: v.into_boxed_slice(),
240 bits,
241 waits: Mutex::new(HashMap::new()),
242 }
243 }
244
245 /// Returns the number of shards in the table.
246 ///
247 /// Always a power of two.
248 #[inline]
249 #[must_use]
250 pub fn shards(&self) -> usize {
251 self.shards.len()
252 }
253
254 /// Tries to acquire `mode` on `res` for `txn` without blocking.
255 ///
256 /// The request is granted immediately and `Ok(())` is returned when:
257 ///
258 /// - `txn` already holds a lock on `res` that [covers](LockMode::covers)
259 /// `mode` (re-acquisition is idempotent, and asking for a weaker mode than
260 /// you hold is a no-op);
261 /// - `txn` already holds `res` in some mode and the
262 /// [join](LockMode::join) of that mode with `mode` is compatible with
263 /// every other holder (an in-place upgrade — for example shared to
264 /// exclusive when sole holder, or shared plus intention-exclusive to SIX);
265 /// or
266 /// - `txn` holds nothing on `res` and `mode` is compatible with every
267 /// current holder.
268 ///
269 /// Otherwise nothing is changed and [`LockError::Conflict`] is returned. The
270 /// caller decides whether to retry, wait, or abort; this method never blocks
271 /// the calling thread.
272 ///
273 /// # Errors
274 ///
275 /// Returns [`LockError::Conflict`] if the lock cannot be granted right now.
276 ///
277 /// # Examples
278 ///
279 /// ```
280 /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
281 ///
282 /// let lm = LockManager::new();
283 /// let key = ResourceId::new(7);
284 /// let t = TxnId::new(1);
285 ///
286 /// // Upgrade a shared lock to exclusive while sole holder.
287 /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
288 /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
289 /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Exclusive));
290 ///
291 /// // A second reader now conflicts with the upgraded exclusive lock.
292 /// let r = lm.try_acquire(TxnId::new(2), key, LockMode::Shared);
293 /// assert_eq!(r, Err(LockError::Conflict));
294 /// ```
295 pub fn try_acquire(
296 &self,
297 txn: TxnId,
298 res: ResourceId,
299 mode: LockMode,
300 ) -> Result<(), LockError> {
301 let mut guard = self.lock_shard(res);
302 let ShardInner { locks, by_txn, .. } = &mut *guard;
303 if Self::try_grant_locked(locks, by_txn, txn, res, mode) {
304 Ok(())
305 } else {
306 Err(LockError::Conflict)
307 }
308 }
309
310 /// Attempts to grant `mode` on `res` to `txn` against an already-locked
311 /// shard. Returns `true` if granted (idempotent re-acquire, in-place
312 /// upgrade, or fresh grant), `false` on conflict. Shared by
313 /// [`try_acquire`](Self::try_acquire) and [`request`](Self::request).
314 fn try_grant_locked(
315 locks: &mut HashMap<ResourceId, LockEntry>,
316 by_txn: &mut HashMap<TxnId, Vec<ResourceId>>,
317 txn: TxnId,
318 res: ResourceId,
319 mode: LockMode,
320 ) -> bool {
321 let entry = locks.entry(res).or_insert_with(LockEntry::new);
322
323 if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
324 let current = entry.holders[pos].mode;
325 if current.covers(mode) {
326 return true;
327 }
328 // Upgrade: the transaction ends up holding the join (least upper
329 // bound) of what it has and what it asked for. The upgraded mode
330 // must be compatible with every *other* holder.
331 let target = current.join(mode);
332 let blocked = entry
333 .holders
334 .iter()
335 .enumerate()
336 .any(|(i, h)| i != pos && !h.mode.compatible_with(target));
337 if blocked {
338 return false;
339 }
340 entry.holders[pos].mode = target;
341 return true;
342 }
343
344 if entry.holders.iter().all(|h| h.mode.compatible_with(mode)) {
345 entry.holders.push(Holder { txn, mode });
346 by_txn.entry(txn).or_default().push(res);
347 true
348 } else {
349 // The entry already had holders (an empty one would have matched the
350 // vacuous `all` above and been granted), so nothing to clean up.
351 false
352 }
353 }
354
355 /// Releases the lock `txn` holds on `res`.
356 ///
357 /// # Errors
358 ///
359 /// Returns [`LockError::NotHeld`] if `txn` holds no lock on `res`, which
360 /// usually means a double release or a bookkeeping mismatch in the caller.
361 ///
362 /// # Examples
363 ///
364 /// ```
365 /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
366 ///
367 /// let lm = LockManager::new();
368 /// let key = ResourceId::new(3);
369 /// let t = TxnId::new(1);
370 ///
371 /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
372 /// lm.release(t, key).unwrap();
373 /// assert_eq!(lm.release(t, key), Err(LockError::NotHeld));
374 /// ```
375 pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
376 let mut guard = self.lock_shard(res);
377 let ShardInner { locks, by_txn, .. } = &mut *guard;
378
379 let entry = match locks.get_mut(&res) {
380 Some(entry) => entry,
381 None => return Err(LockError::NotHeld),
382 };
383 let pos = match entry.holders.iter().position(|h| h.txn == txn) {
384 Some(pos) => pos,
385 None => return Err(LockError::NotHeld),
386 };
387
388 let _ = entry.holders.swap_remove(pos);
389 if entry.holders.is_empty() {
390 let _ = locks.remove(&res);
391 }
392 Self::forget_resource(by_txn, txn, res);
393 Ok(())
394 }
395
396 /// Releases every lock held by `txn` across the whole table — both point
397 /// locks and range locks.
398 ///
399 /// This is the call a transaction layer makes at commit or abort to drop a
400 /// transaction's entire lock set at once. It returns the number of locks
401 /// released, and is proportional to that number rather than to the size of
402 /// the table.
403 ///
404 /// # Examples
405 ///
406 /// ```
407 /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
408 ///
409 /// let lm = LockManager::new();
410 /// let t = TxnId::new(1);
411 /// for id in 0..5 {
412 /// lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive).unwrap();
413 /// }
414 /// lm.try_acquire_range(t, ResourceId::new(99), KeyRange::point(1), LockMode::Shared).unwrap();
415 ///
416 /// assert_eq!(lm.release_all(t), 6); // 5 point locks + 1 range lock
417 /// assert_eq!(lm.release_all(t), 0); // idempotent once empty
418 /// ```
419 pub fn release_all(&self, txn: TxnId) -> usize {
420 // Clear any pending wait first, in its own critical section. This never
421 // nests with a shard lock, so it cannot deadlock against `request`
422 // (which takes `waits` then a shard); see the `waits` field docs.
423 {
424 let mut waits = self.lock_waits();
425 let _ = waits.remove(&txn);
426 }
427
428 let mut released = 0;
429 for shard in self.shards.iter() {
430 let mut guard = Self::lock(shard);
431 let ShardInner {
432 locks,
433 by_txn,
434 ranges,
435 range_by_txn,
436 } = &mut *guard;
437
438 if let Some(resources) = by_txn.remove(&txn) {
439 for res in resources {
440 if let Some(entry) = locks.get_mut(&res) {
441 if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
442 let _ = entry.holders.swap_remove(pos);
443 released += 1;
444 if entry.holders.is_empty() {
445 let _ = locks.remove(&res);
446 }
447 }
448 }
449 }
450 }
451
452 if let Some(spaces) = range_by_txn.remove(&txn) {
453 for (space, range) in spaces {
454 if let Some(rs) = ranges.get_mut(&space) {
455 if let Some(pos) = rs
456 .holders
457 .iter()
458 .position(|h| h.txn == txn && h.range == range)
459 {
460 let _ = rs.holders.swap_remove(pos);
461 released += 1;
462 if rs.holders.is_empty() {
463 let _ = ranges.remove(&space);
464 }
465 }
466 }
467 }
468 }
469 }
470 released
471 }
472
473 /// Acquires `mode` on `res` for `txn`, registering a wait and detecting
474 /// deadlock if it cannot be granted.
475 ///
476 /// This is the deadlock-aware counterpart to
477 /// [`try_acquire`](Self::try_acquire). The three outcomes are:
478 ///
479 /// - [`Acquisition::Granted`] — the lock was granted; proceed.
480 /// - [`Acquisition::Waiting`] — the lock is held incompatibly and `txn` is
481 /// now recorded in the wait-for graph. The caller should suspend the
482 /// transaction and call `request` again later (for example after a
483 /// release) to retry. No deadlock was found.
484 /// - [`Acquisition::Deadlock`] — granting the wait would close a cycle. The
485 /// caller must abort the [`Deadlock::victim`] with
486 /// [`release_all`](Self::release_all). The victim may be `txn` or another
487 /// transaction in the cycle.
488 ///
489 /// Detection is exact: the wait-for graph is rebuilt from the current lock
490 /// table on every call, so a wait left over from a lock that has since been
491 /// released contributes no edge, and a transaction is never reported as
492 /// deadlocked unless it genuinely is. The victim is chosen by the
493 /// [`VictimPolicy::Youngest`] policy; callers wanting a different policy can
494 /// apply [`WaitForGraph::pick_victim`] to [`Deadlock::cycle`] themselves.
495 ///
496 /// Only transactions that wait through `request` appear in the graph; a
497 /// transaction that spins on `try_acquire` is invisible to deadlock
498 /// detection. Range locks ([`try_acquire_range`](Self::try_acquire_range))
499 /// are likewise not tracked here.
500 ///
501 /// `request` serializes on a single wait-registry mutex, unlike the sharded
502 /// `try_acquire`; it is the path to use when you need deadlock detection.
503 ///
504 /// # Examples
505 ///
506 /// ```
507 /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
508 ///
509 /// let lm = LockManager::new();
510 /// let (a, b) = (ResourceId::new(1), ResourceId::new(2));
511 /// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
512 ///
513 /// // T1 holds A, T2 holds B.
514 /// assert_eq!(lm.request(t1, a, LockMode::Exclusive), Acquisition::Granted);
515 /// assert_eq!(lm.request(t2, b, LockMode::Exclusive), Acquisition::Granted);
516 ///
517 /// // T1 waits for B (held by T2): no cycle yet.
518 /// assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
519 ///
520 /// // T2 now waits for A (held by T1): that closes the cycle.
521 /// match lm.request(t2, a, LockMode::Exclusive) {
522 /// Acquisition::Deadlock(d) => {
523 /// assert_eq!(d.victim, TxnId::new(2)); // youngest in the cycle
524 /// lm.release_all(d.victim); // abort to break the deadlock
525 /// }
526 /// other => panic!("expected a deadlock, got {other:?}"),
527 /// }
528 /// ```
529 pub fn request(&self, txn: TxnId, res: ResourceId, mode: LockMode) -> Acquisition {
530 // `waits` is the outer lock; the grant attempt and graph build both take
531 // shard locks underneath it, never the reverse.
532 let mut waits = self.lock_waits();
533
534 let granted = {
535 let mut guard = self.lock_shard(res);
536 let ShardInner { locks, by_txn, .. } = &mut *guard;
537 Self::try_grant_locked(locks, by_txn, txn, res, mode)
538 };
539 if granted {
540 let _ = waits.remove(&txn);
541 return Acquisition::Granted;
542 }
543
544 let _ = waits.insert(txn, (res, mode));
545 let graph = self.build_wait_graph(&waits);
546 match graph.cycle_from(txn) {
547 Some(cycle) => {
548 let victim =
549 WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY).unwrap_or(txn);
550 Acquisition::Deadlock(Deadlock { victim, cycle })
551 }
552 None => Acquisition::Waiting,
553 }
554 }
555
556 /// Removes any pending wait for `txn` from the wait-for graph.
557 ///
558 /// Call this when a transaction that previously got [`Acquisition::Waiting`]
559 /// stops waiting without acquiring the lock (for example it timed out or was
560 /// aborted for another reason). [`release_all`](Self::release_all) already
561 /// clears the wait, so this is only needed when releasing nothing.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
567 ///
568 /// let lm = LockManager::new();
569 /// let res = ResourceId::new(1);
570 /// lm.request(TxnId::new(1), res, LockMode::Exclusive);
571 /// // T2 waits, then gives up.
572 /// assert_eq!(lm.request(TxnId::new(2), res, LockMode::Exclusive), Acquisition::Waiting);
573 /// lm.cancel_wait(TxnId::new(2));
574 /// assert_eq!(lm.waiting_count(), 0);
575 /// ```
576 pub fn cancel_wait(&self, txn: TxnId) {
577 let mut waits = self.lock_waits();
578 let _ = waits.remove(&txn);
579 }
580
581 /// Scans the current wait set for a deadlock, returning one if found.
582 ///
583 /// This is the periodic-detection counterpart to the at-wait detection in
584 /// [`request`](Self::request): a background task can call it on an interval
585 /// instead of (or in addition to) acting on `request`'s result. It rebuilds
586 /// the wait-for graph from the current lock table, so it reports only
587 /// genuine deadlocks. Returns `None` when no cycle exists.
588 ///
589 /// # Examples
590 ///
591 /// ```
592 /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
593 ///
594 /// let lm = LockManager::new();
595 /// let (a, b) = (ResourceId::new(1), ResourceId::new(2));
596 /// lm.request(TxnId::new(1), a, LockMode::Exclusive);
597 /// lm.request(TxnId::new(2), b, LockMode::Exclusive);
598 /// lm.request(TxnId::new(1), b, LockMode::Exclusive); // T1 waits for T2
599 /// assert!(lm.find_deadlock().is_none());
600 /// lm.request(TxnId::new(2), a, LockMode::Exclusive); // T2 waits for T1: cycle
601 /// assert!(lm.find_deadlock().is_some());
602 /// ```
603 #[must_use]
604 pub fn find_deadlock(&self) -> Option<Deadlock> {
605 let waits = self.lock_waits();
606 let graph = self.build_wait_graph(&waits);
607 let cycle = graph.detect_cycle()?;
608 let victim = WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY)?;
609 Some(Deadlock { victim, cycle })
610 }
611
612 /// Returns the number of transactions currently registered as waiting.
613 ///
614 /// Mostly useful for diagnostics and tests.
615 #[must_use]
616 pub fn waiting_count(&self) -> usize {
617 self.lock_waits().len()
618 }
619
620 /// Builds a wait-for graph from the live wait set, reading the *current*
621 /// holders of each waited resource from the lock table. Rebuilding from
622 /// truth on every detection is what keeps detection from acting on a stale
623 /// edge. Called while holding the `waits` lock; takes shard locks underneath.
624 fn build_wait_graph(&self, waits: &HashMap<TxnId, (ResourceId, LockMode)>) -> WaitForGraph {
625 let mut graph = WaitForGraph::new();
626 for (&waiter, &(res, mode)) in waits {
627 let blockers = self.holders_blocking(waiter, res, mode);
628 graph.add_waits(waiter, &blockers);
629 }
630 graph
631 }
632
633 /// Returns the transactions, other than `waiter`, that currently hold `res`
634 /// in a mode incompatible with `mode` — the transactions `waiter` is blocked
635 /// by.
636 fn holders_blocking(&self, waiter: TxnId, res: ResourceId, mode: LockMode) -> Vec<TxnId> {
637 let guard = self.lock_shard(res);
638 guard.locks.get(&res).map_or_else(Vec::new, |entry| {
639 entry
640 .holders
641 .iter()
642 .filter(|h| h.txn != waiter && !h.mode.compatible_with(mode))
643 .map(|h| h.txn)
644 .collect()
645 })
646 }
647
648 /// Locks the wait registry, recovering its guard if the mutex was poisoned.
649 #[inline]
650 fn lock_waits(&self) -> MutexGuard<'_, HashMap<TxnId, (ResourceId, LockMode)>> {
651 match self.waits.lock() {
652 Ok(guard) => guard,
653 Err(poisoned) => poisoned.into_inner(),
654 }
655 }
656
657 /// Returns the number of transactions currently holding `res`.
658 ///
659 /// Mostly useful for diagnostics and tests; in steady state this is `0`,
660 /// `1` for an exclusive lock, or the reader count for a shared lock.
661 ///
662 /// # Examples
663 ///
664 /// ```
665 /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
666 ///
667 /// let lm = LockManager::new();
668 /// let key = ResourceId::new(1);
669 /// assert_eq!(lm.holder_count(key), 0);
670 /// lm.try_acquire(TxnId::new(1), key, LockMode::Shared).unwrap();
671 /// assert_eq!(lm.holder_count(key), 1);
672 /// ```
673 #[must_use]
674 pub fn holder_count(&self, res: ResourceId) -> usize {
675 let guard = self.lock_shard(res);
676 guard.locks.get(&res).map_or(0, |e| e.holders.len())
677 }
678
679 /// Returns the mode in which `txn` holds `res`, or `None` if it holds no
680 /// lock on it.
681 ///
682 /// # Examples
683 ///
684 /// ```
685 /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
686 ///
687 /// let lm = LockManager::new();
688 /// let key = ResourceId::new(1);
689 /// let t = TxnId::new(1);
690 /// assert_eq!(lm.mode_held(t, key), None);
691 /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
692 /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Shared));
693 /// ```
694 #[must_use]
695 pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
696 let guard = self.lock_shard(res);
697 guard
698 .locks
699 .get(&res)
700 .and_then(|e| e.holders.iter().find(|h| h.txn == txn))
701 .map(|h| h.mode)
702 }
703
704 /// Tries to acquire `mode` over the key range `range` in key space `space`,
705 /// for `txn`, without blocking.
706 ///
707 /// A range lock protects a contiguous span of keys — use it to stop another
708 /// transaction from inserting into, or writing within, a range you have
709 /// read (phantom and predicate protection). `space` identifies the key space
710 /// the range lives in, typically an index; ranges in different spaces never
711 /// conflict.
712 ///
713 /// The request is granted unless some **other** transaction already holds an
714 /// [overlapping](KeyRange::overlaps) range in `space` in an
715 /// [incompatible](LockMode::compatible_with) mode. The same transaction may
716 /// hold several ranges in a space, including overlapping ones; range locks
717 /// are not merged or upgraded.
718 ///
719 /// # Errors
720 ///
721 /// Returns [`LockError::Conflict`] if an overlapping, incompatible range is
722 /// held by another transaction.
723 ///
724 /// # Examples
725 ///
726 /// ```
727 /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
728 ///
729 /// let lm = LockManager::new();
730 /// let index = ResourceId::new(1);
731 ///
732 /// // A read lock over [100, 200].
733 /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::new(100, 200).unwrap(), LockMode::Shared).unwrap();
734 ///
735 /// // Another reader may share the overlapping range...
736 /// lm.try_acquire_range(TxnId::new(2), index, KeyRange::new(150, 250).unwrap(), LockMode::Shared).unwrap();
737 ///
738 /// // ...but a writer inside it conflicts.
739 /// assert_eq!(
740 /// lm.try_acquire_range(TxnId::new(3), index, KeyRange::point(150), LockMode::Exclusive),
741 /// Err(LockError::Conflict),
742 /// );
743 /// ```
744 pub fn try_acquire_range(
745 &self,
746 txn: TxnId,
747 space: ResourceId,
748 range: KeyRange,
749 mode: LockMode,
750 ) -> Result<(), LockError> {
751 let mut guard = self.lock_shard(space);
752 let ShardInner {
753 ranges,
754 range_by_txn,
755 ..
756 } = &mut *guard;
757 let rs = ranges.entry(space).or_insert_with(RangeSpace::new);
758
759 let conflict = rs
760 .holders
761 .iter()
762 .any(|h| h.txn != txn && h.range.overlaps(range) && !h.mode.compatible_with(mode));
763 if conflict {
764 // A conflict implies a pre-existing holder, so the space entry is
765 // non-empty and there is nothing to clean up.
766 return Err(LockError::Conflict);
767 }
768
769 rs.holders.push(RangeHolder { txn, range, mode });
770 range_by_txn.entry(txn).or_default().push((space, range));
771 Ok(())
772 }
773
774 /// Releases a range lock `txn` holds over `range` in `space`.
775 ///
776 /// Matches on the transaction and the exact range. If the transaction holds
777 /// several locks on the identical range (in different modes), one is
778 /// released per call.
779 ///
780 /// # Errors
781 ///
782 /// Returns [`LockError::NotHeld`] if `txn` holds no lock on that exact range
783 /// in `space`.
784 ///
785 /// # Examples
786 ///
787 /// ```
788 /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
789 ///
790 /// let lm = LockManager::new();
791 /// let index = ResourceId::new(1);
792 /// let r = KeyRange::new(1, 10).unwrap();
793 /// let t = TxnId::new(1);
794 ///
795 /// lm.try_acquire_range(t, index, r, LockMode::Exclusive).unwrap();
796 /// lm.release_range(t, index, r).unwrap();
797 /// assert_eq!(lm.release_range(t, index, r), Err(LockError::NotHeld));
798 /// ```
799 pub fn release_range(
800 &self,
801 txn: TxnId,
802 space: ResourceId,
803 range: KeyRange,
804 ) -> Result<(), LockError> {
805 let mut guard = self.lock_shard(space);
806 let ShardInner {
807 ranges,
808 range_by_txn,
809 ..
810 } = &mut *guard;
811
812 let rs = match ranges.get_mut(&space) {
813 Some(rs) => rs,
814 None => return Err(LockError::NotHeld),
815 };
816 let pos = match rs
817 .holders
818 .iter()
819 .position(|h| h.txn == txn && h.range == range)
820 {
821 Some(pos) => pos,
822 None => return Err(LockError::NotHeld),
823 };
824
825 let _ = rs.holders.swap_remove(pos);
826 if rs.holders.is_empty() {
827 let _ = ranges.remove(&space);
828 }
829 Self::forget_range(range_by_txn, txn, space, range);
830 Ok(())
831 }
832
833 /// Returns the number of range locks currently held in `space`.
834 ///
835 /// Counts every holder, across all transactions and modes. Mostly useful
836 /// for diagnostics and tests.
837 ///
838 /// # Examples
839 ///
840 /// ```
841 /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
842 ///
843 /// let lm = LockManager::new();
844 /// let index = ResourceId::new(1);
845 /// assert_eq!(lm.range_count(index), 0);
846 /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::point(1), LockMode::Shared).unwrap();
847 /// assert_eq!(lm.range_count(index), 1);
848 /// ```
849 #[must_use]
850 pub fn range_count(&self, space: ResourceId) -> usize {
851 let guard = self.lock_shard(space);
852 guard.ranges.get(&space).map_or(0, |rs| rs.holders.len())
853 }
854
855 /// Drops `res` from a transaction's reverse-index entry, removing the entry
856 /// entirely once the transaction holds nothing else in the shard.
857 #[inline]
858 fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
859 if let Some(resources) = by_txn.get_mut(&txn) {
860 if let Some(pos) = resources.iter().position(|r| *r == res) {
861 let _ = resources.swap_remove(pos);
862 }
863 if resources.is_empty() {
864 let _ = by_txn.remove(&txn);
865 }
866 }
867 }
868
869 /// Drops one `(space, range)` pair from a transaction's range reverse-index
870 /// entry, removing the entry entirely once it is empty.
871 #[inline]
872 fn forget_range(
873 range_by_txn: &mut HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
874 txn: TxnId,
875 space: ResourceId,
876 range: KeyRange,
877 ) {
878 if let Some(held) = range_by_txn.get_mut(&txn) {
879 if let Some(pos) = held.iter().position(|(s, r)| *s == space && *r == range) {
880 let _ = held.swap_remove(pos);
881 }
882 if held.is_empty() {
883 let _ = range_by_txn.remove(&txn);
884 }
885 }
886 }
887
888 /// Locks and returns the shard that owns `res`.
889 #[inline]
890 fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
891 Self::lock(&self.shards[self.shard_index(res)])
892 }
893
894 /// Locks a shard, recovering its guard if the mutex was poisoned.
895 ///
896 /// Critical sections in this module perform only infallible map and vector
897 /// operations and never panic, so poisoning cannot leave inconsistent
898 /// state. Recovering the guard keeps the lock manager available rather than
899 /// propagating a poison error that no caller could act on.
900 #[inline]
901 fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
902 match shard.inner.lock() {
903 Ok(guard) => guard,
904 Err(poisoned) => poisoned.into_inner(),
905 }
906 }
907
908 /// Maps a resource id to a shard index via Fibonacci hashing.
909 #[inline]
910 fn shard_index(&self, res: ResourceId) -> usize {
911 if self.bits == 0 {
912 return 0;
913 }
914 let hash = res.get().wrapping_mul(FIB_HASH);
915 // Take the top `bits` bits: the most-mixed end of a multiplicative hash.
916 (hash >> (u64::BITS - self.bits)) as usize
917 }
918}
919
920impl Default for LockManager {
921 fn default() -> Self {
922 Self::new()
923 }
924}
925
#[cfg(all(test, not(loom)))]
#[allow(clippy::unwrap_used)]
mod tests {
    //! Unit tests for the lock manager: point locks and mode upgrades, range
    //! locks, the deadlock-aware `request` path, shard routing, and two
    //! multi-threaded smoke tests. Compiled out under `loom`.

    use super::{Acquisition, FIB_HASH, LockManager};
    use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};

    /// Builds a `(transaction, resource)` pair from raw ids.
    fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
        (TxnId::new(t), ResourceId::new(r))
    }

    /// Builds a key range from `start` and `end`, panicking if `KeyRange::new`
    /// rejects the bounds.
    fn kr(start: u64, end: u64) -> KeyRange {
        KeyRange::new(start, end).unwrap()
    }

    #[test]
    fn test_shared_locks_coexist() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(3), r, LockMode::Shared).unwrap();
        assert_eq!(lm.holder_count(r), 3);
    }

    #[test]
    fn test_exclusive_excludes_shared() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_intention_shared_and_intention_exclusive_coexist() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionShared)
            .unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionExclusive)
            .unwrap();
        assert_eq!(lm.holder_count(r), 2);
    }

    #[test]
    fn test_intention_exclusive_blocks_shared() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive)
            .unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
            Err(LockError::Conflict)
        );
        // ...but another IX or an IS is fine.
        lm.try_acquire(TxnId::new(3), r, LockMode::IntentionExclusive)
            .unwrap();
        lm.try_acquire(TxnId::new(4), r, LockMode::IntentionShared)
            .unwrap();
    }

    #[test]
    fn test_shared_plus_intention_exclusive_upgrades_to_six() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        let t = TxnId::new(1);
        lm.try_acquire(t, r, LockMode::Shared).unwrap();
        // Same txn now intends to write part of the subtree: S join IX = SIX.
        lm.try_acquire(t, r, LockMode::IntentionExclusive).unwrap();
        assert_eq!(lm.mode_held(t, r), Some(LockMode::SharedIntentionExclusive));
        // An intention-shared holder still coexists with SIX.
        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionShared)
            .unwrap();
        // But a second reader does not.
        assert_eq!(
            lm.try_acquire(TxnId::new(3), r, LockMode::Shared),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_intention_shared_upgrades_to_exclusive_when_sole_holder() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        let t = TxnId::new(1);
        lm.try_acquire(t, r, LockMode::IntentionShared).unwrap();
        lm.try_acquire(t, r, LockMode::Exclusive).unwrap();
        assert_eq!(lm.mode_held(t, r), Some(LockMode::Exclusive));
    }

    #[test]
    fn test_upgrade_to_six_blocked_by_other_reader() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        // Txn 1 wants IX too (-> SIX), but SIX is incompatible with txn 2's S.
        assert_eq!(
            lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive),
            Err(LockError::Conflict)
        );
        // The original shared lock is intact.
        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
    }

    #[test]
    fn test_hierarchy_protocol_row_write_under_table_intent() {
        // Model a database/table/page/row hierarchy as four resources, and run
        // the standard protocol: IX coarse-to-fine, then X on the row.
        let lm = LockManager::new();
        let (db, table, page, row) = (
            ResourceId::new(1),
            ResourceId::new(2),
            ResourceId::new(3),
            ResourceId::new(4),
        );
        let writer = TxnId::new(1);
        for res in [db, table, page] {
            lm.try_acquire(writer, res, LockMode::IntentionExclusive)
                .unwrap();
        }
        lm.try_acquire(writer, row, LockMode::Exclusive).unwrap();

        // A concurrent reader can still take IS down to a different page/row.
        let reader = TxnId::new(2);
        for res in [db, table] {
            lm.try_acquire(reader, res, LockMode::IntentionShared)
                .unwrap();
        }
        // But it cannot read the row the writer holds exclusively.
        assert_eq!(
            lm.try_acquire(reader, row, LockMode::Shared),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_exclusive_excludes_exclusive() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_shared_blocks_other_exclusive() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_reacquire_same_mode_is_idempotent() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_request_weaker_than_held_is_noop() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        // Asking for shared while holding exclusive keeps the stronger mode.
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_upgrade_sole_holder_succeeds() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_upgrade_blocked_by_other_reader() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
        // The failed upgrade left the original shared lock intact.
        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
    }

    #[test]
    fn test_release_frees_resource_for_exclusive() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        lm.release(TxnId::new(1), r).unwrap();
        // One reader remains, exclusive still blocked. Pin the exact error, as
        // the sibling tests do, so an unrelated failure cannot pass as a conflict.
        assert_eq!(
            lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
        lm.release(TxnId::new(2), r).unwrap();
        lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
            .unwrap();
    }

    #[test]
    fn test_release_not_held_errors() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.release(TxnId::new(9), r), Err(LockError::NotHeld));
    }

    #[test]
    fn test_double_release_errors() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        lm.release(t1, r).unwrap();
        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
    }

    #[test]
    fn test_release_all_drops_every_lock() {
        let lm = LockManager::with_shards(8);
        let t = TxnId::new(1);
        for id in 0..50 {
            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
                .unwrap();
        }
        assert_eq!(lm.release_all(t), 50);
        for id in 0..50 {
            assert_eq!(lm.holder_count(ResourceId::new(id)), 0);
        }
        assert_eq!(lm.release_all(t), 0);
    }

    #[test]
    fn test_release_all_leaves_other_txns_alone() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        assert_eq!(lm.release_all(TxnId::new(1)), 1);
        assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_resource_fully_released_can_be_taken_exclusively() {
        let lm = LockManager::new();
        let r = ResourceId::new(42);
        lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive)
            .unwrap();
        lm.release(TxnId::new(1), r).unwrap();
        assert_eq!(lm.holder_count(r), 0);
        lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive)
            .unwrap();
    }

    // ---- range locks ----

    #[test]
    fn test_range_shared_overlap_coexists() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
            .unwrap();
        lm.try_acquire_range(TxnId::new(2), space, kr(50, 150), LockMode::Shared)
            .unwrap();
        assert_eq!(lm.range_count(space), 2);
    }

    #[test]
    fn test_range_exclusive_conflicts_on_overlap() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(100, 200), LockMode::Shared)
            .unwrap();
        assert_eq!(
            lm.try_acquire_range(
                TxnId::new(2),
                space,
                KeyRange::point(150),
                LockMode::Exclusive
            ),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_range_disjoint_ranges_do_not_conflict() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
            .unwrap();
        lm.try_acquire_range(TxnId::new(2), space, kr(101, 200), LockMode::Exclusive)
            .unwrap();
    }

    #[test]
    fn test_range_adjacent_inclusive_bounds_conflict() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
            .unwrap();
        // [100, 200] shares key 100 with [0, 100].
        assert_eq!(
            lm.try_acquire_range(TxnId::new(2), space, kr(100, 200), LockMode::Shared),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_range_different_spaces_independent() {
        let lm = LockManager::new();
        lm.try_acquire_range(
            TxnId::new(1),
            ResourceId::new(1),
            kr(0, 100),
            LockMode::Exclusive,
        )
        .unwrap();
        // Same range, different space: no conflict.
        lm.try_acquire_range(
            TxnId::new(2),
            ResourceId::new(2),
            kr(0, 100),
            LockMode::Exclusive,
        )
        .unwrap();
    }

    #[test]
    fn test_range_same_txn_overlap_allowed() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        let t = TxnId::new(1);
        lm.try_acquire_range(t, space, kr(0, 100), LockMode::Exclusive)
            .unwrap();
        // A transaction does not conflict with its own ranges.
        lm.try_acquire_range(t, space, kr(50, 150), LockMode::Exclusive)
            .unwrap();
        assert_eq!(lm.range_count(space), 2);
    }

    #[test]
    fn test_range_release_frees_overlap() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        let r = kr(100, 200);
        lm.try_acquire_range(TxnId::new(1), space, r, LockMode::Exclusive)
            .unwrap();
        lm.release_range(TxnId::new(1), space, r).unwrap();
        assert_eq!(lm.range_count(space), 0);
        // Now another writer can take an overlapping range.
        lm.try_acquire_range(
            TxnId::new(2),
            space,
            KeyRange::point(150),
            LockMode::Exclusive,
        )
        .unwrap();
    }

    #[test]
    fn test_range_release_not_held_errors() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        assert_eq!(
            lm.release_range(TxnId::new(1), space, kr(0, 10)),
            Err(LockError::NotHeld)
        );
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 10), LockMode::Shared)
            .unwrap();
        // Wrong range is NotHeld.
        assert_eq!(
            lm.release_range(TxnId::new(1), space, kr(0, 11)),
            Err(LockError::NotHeld)
        );
    }

    #[test]
    fn test_release_all_drops_point_and_range_locks() {
        let lm = LockManager::new();
        let t = TxnId::new(1);
        for id in 0..3 {
            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
                .unwrap();
        }
        lm.try_acquire_range(t, ResourceId::new(100), kr(0, 10), LockMode::Shared)
            .unwrap();
        lm.try_acquire_range(t, ResourceId::new(100), kr(20, 30), LockMode::Shared)
            .unwrap();
        assert_eq!(lm.release_all(t), 5); // 3 point + 2 range
        assert_eq!(lm.range_count(ResourceId::new(100)), 0);
        assert_eq!(lm.release_all(t), 0);
    }

    #[test]
    fn test_release_all_range_leaves_other_txn() {
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
            .unwrap();
        lm.try_acquire_range(TxnId::new(2), space, kr(0, 100), LockMode::Shared)
            .unwrap();
        assert_eq!(lm.release_all(TxnId::new(1)), 1);
        assert_eq!(lm.range_count(space), 1);
    }

    #[test]
    fn test_range_intention_modes_coexist() {
        // IS and IX range locks are compatible, just like point locks.
        let lm = LockManager::new();
        let space = ResourceId::new(1);
        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::IntentionShared)
            .unwrap();
        lm.try_acquire_range(
            TxnId::new(2),
            space,
            kr(0, 100),
            LockMode::IntentionExclusive,
        )
        .unwrap();
        assert_eq!(lm.range_count(space), 2);
    }

    // ---- deadlock-aware request ----

    #[test]
    fn test_request_granted_on_free_resource() {
        let lm = LockManager::new();
        let (t, r) = ids(1, 1);
        assert_eq!(lm.request(t, r, LockMode::Exclusive), Acquisition::Granted);
        assert_eq!(lm.mode_held(t, r), Some(LockMode::Exclusive));
        assert_eq!(lm.waiting_count(), 0);
    }

    #[test]
    fn test_request_waiting_registers_wait() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        assert_eq!(
            lm.request(TxnId::new(1), r, LockMode::Exclusive),
            Acquisition::Granted
        );
        assert_eq!(
            lm.request(TxnId::new(2), r, LockMode::Exclusive),
            Acquisition::Waiting
        );
        assert_eq!(lm.waiting_count(), 1);
    }

    #[test]
    fn test_request_grant_clears_prior_wait() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
        assert_eq!(
            lm.request(TxnId::new(2), r, LockMode::Exclusive),
            Acquisition::Waiting
        );
        // T1 releases; T2 retries and is granted, clearing its wait.
        lm.release(TxnId::new(1), r).unwrap();
        assert_eq!(
            lm.request(TxnId::new(2), r, LockMode::Exclusive),
            Acquisition::Granted
        );
        assert_eq!(lm.waiting_count(), 0);
    }

    #[test]
    fn test_classic_two_transaction_deadlock() {
        let lm = LockManager::new();
        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
        let (t1, t2) = (TxnId::new(1), TxnId::new(2));

        assert_eq!(lm.request(t1, a, LockMode::Exclusive), Acquisition::Granted);
        assert_eq!(lm.request(t2, b, LockMode::Exclusive), Acquisition::Granted);
        assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);

        match lm.request(t2, a, LockMode::Exclusive) {
            Acquisition::Deadlock(d) => {
                assert_eq!(d.victim, t2); // youngest in the cycle
                assert_eq!(d.cycle.len(), 2);
                assert!(d.cycle.contains(&t1) && d.cycle.contains(&t2));
            }
            other => panic!("expected deadlock, got {other:?}"),
        }
    }

    #[test]
    fn test_three_transaction_deadlock_cycle() {
        let lm = LockManager::new();
        let (a, b, c) = (ResourceId::new(1), ResourceId::new(2), ResourceId::new(3));
        let (t1, t2, t3) = (TxnId::new(1), TxnId::new(2), TxnId::new(3));

        let _ = lm.request(t1, a, LockMode::Exclusive);
        let _ = lm.request(t2, b, LockMode::Exclusive);
        let _ = lm.request(t3, c, LockMode::Exclusive);
        // T1->B(T2), T2->C(T3), T3->A(T1): closes the loop on the third wait.
        assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
        assert_eq!(lm.request(t2, c, LockMode::Exclusive), Acquisition::Waiting);
        match lm.request(t3, a, LockMode::Exclusive) {
            Acquisition::Deadlock(d) => {
                assert_eq!(d.cycle.len(), 3);
                assert_eq!(d.victim, t3); // youngest
            }
            other => panic!("expected deadlock, got {other:?}"),
        }
    }

    #[test]
    fn test_aborting_victim_breaks_deadlock() {
        let lm = LockManager::new();
        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
        let (t1, t2) = (TxnId::new(1), TxnId::new(2));

        let _ = lm.request(t1, a, LockMode::Exclusive);
        let _ = lm.request(t2, b, LockMode::Exclusive);
        let _ = lm.request(t1, b, LockMode::Exclusive);
        let victim = match lm.request(t2, a, LockMode::Exclusive) {
            Acquisition::Deadlock(d) => d.victim,
            other => panic!("expected deadlock, got {other:?}"),
        };
        // Abort the victim: releases its locks and clears its wait.
        lm.release_all(victim);
        // The other transaction can now make progress.
        let survivor = if victim == t1 { t2 } else { t1 };
        let want = if survivor == t1 { b } else { a };
        assert_eq!(
            lm.request(survivor, want, LockMode::Exclusive),
            Acquisition::Granted
        );
        assert!(lm.find_deadlock().is_none());
    }

    #[test]
    fn test_no_false_deadlock_after_release() {
        // T1 waits for T2; T2 releases (not via the wait path). A later detection
        // must not report a deadlock from the now-stale wait edge.
        let lm = LockManager::new();
        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
        let (t1, t2) = (TxnId::new(1), TxnId::new(2));

        let _ = lm.request(t1, a, LockMode::Exclusive);
        let _ = lm.request(t2, b, LockMode::Exclusive);
        let _ = lm.request(t1, b, LockMode::Exclusive); // T1 waits for T2 on B
        lm.release(t2, b).unwrap(); // B is now free; T1's edge is stale
        // T2 wants A (held by T1). Were T1's stale edge still counted, this would
        // look like a cycle. It must not: B is free, so T1 has no real out-edge.
        assert_eq!(lm.request(t2, a, LockMode::Exclusive), Acquisition::Waiting);
        assert!(lm.find_deadlock().is_none());
    }

    #[test]
    fn test_cancel_wait_removes_from_graph() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
        assert_eq!(
            lm.request(TxnId::new(2), r, LockMode::Exclusive),
            Acquisition::Waiting
        );
        lm.cancel_wait(TxnId::new(2));
        assert_eq!(lm.waiting_count(), 0);
    }

    #[test]
    fn test_release_all_clears_wait() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
        let _ = lm.request(TxnId::new(2), r, LockMode::Exclusive); // T2 waits
        assert_eq!(lm.waiting_count(), 1);
        lm.release_all(TxnId::new(2));
        assert_eq!(lm.waiting_count(), 0);
    }

    #[test]
    fn test_find_deadlock_none_without_cycle() {
        let lm = LockManager::new();
        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
        let _ = lm.request(TxnId::new(1), a, LockMode::Exclusive);
        let _ = lm.request(TxnId::new(2), b, LockMode::Exclusive);
        let _ = lm.request(TxnId::new(1), b, LockMode::Exclusive); // one-way wait
        assert!(lm.find_deadlock().is_none());
    }

    #[test]
    fn test_shared_requests_do_not_deadlock() {
        // Two shared requests on the same resource both grant; no waiting.
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        assert_eq!(
            lm.request(TxnId::new(1), r, LockMode::Shared),
            Acquisition::Granted
        );
        assert_eq!(
            lm.request(TxnId::new(2), r, LockMode::Shared),
            Acquisition::Granted
        );
        assert_eq!(lm.waiting_count(), 0);
    }

    // ---- sharding ----

    #[test]
    fn test_with_shards_rounds_up_to_power_of_two() {
        assert_eq!(LockManager::with_shards(1).shards(), 1);
        assert_eq!(LockManager::with_shards(3).shards(), 4);
        assert_eq!(LockManager::with_shards(5).shards(), 8);
        assert_eq!(LockManager::with_shards(0).shards(), 1);
        assert_eq!(LockManager::with_shards(64).shards(), 64);
    }

    #[test]
    fn test_single_shard_routes_everything_to_index_zero() {
        let lm = LockManager::with_shards(1);
        for id in 0..1000 {
            assert_eq!(lm.shard_index(ResourceId::new(id)), 0);
        }
    }

    #[test]
    fn test_shard_index_within_bounds() {
        let lm = LockManager::with_shards(16);
        for id in 0..10_000 {
            assert!(lm.shard_index(ResourceId::new(id)) < 16);
        }
    }

    #[test]
    fn test_sequential_ids_spread_across_shards() {
        let lm = LockManager::with_shards(16);
        let mut seen = [false; 16];
        for id in 0..256 {
            seen[lm.shard_index(ResourceId::new(id))] = true;
        }
        // Fibonacci hashing should touch every shard well before 256 ids.
        assert!(seen.iter().all(|&hit| hit));
    }

    #[test]
    fn test_locks_in_different_shards_are_independent() {
        // Two resources that hash to different shards do not interfere.
        let lm = LockManager::with_shards(16);
        let a = ResourceId::new(1);
        let b = ResourceId::new(2);
        // Check the premise explicitly: without this, a change to the hash or
        // the shard count could silently turn this into a same-shard test.
        assert_ne!(lm.shard_index(a), lm.shard_index(b));
        lm.try_acquire(TxnId::new(1), a, LockMode::Exclusive)
            .unwrap();
        lm.try_acquire(TxnId::new(2), b, LockMode::Exclusive)
            .unwrap();
        assert_eq!(lm.holder_count(a), 1);
        assert_eq!(lm.holder_count(b), 1);
    }

    #[test]
    fn test_fib_hash_constant_is_odd() {
        // A multiplicative-hash multiplier must be odd to be a bijection mod 2^64.
        assert_eq!(FIB_HASH & 1, 1);
    }

    // ---- concurrency smoke tests ----

    #[test]
    fn test_concurrent_shared_acquire_release_is_consistent() {
        use std::sync::Arc;
        use std::thread;

        let lm = Arc::new(LockManager::new());
        let r = ResourceId::new(7);
        let mut handles = Vec::new();
        for t in 0..8u64 {
            let lm = Arc::clone(&lm);
            handles.push(thread::spawn(move || {
                let txn = TxnId::new(t);
                for _ in 0..1000 {
                    lm.try_acquire(txn, r, LockMode::Shared).unwrap();
                    lm.release(txn, r).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        // Every acquire was paired with a release; the resource is free.
        assert_eq!(lm.holder_count(r), 0);
    }

    #[test]
    fn test_concurrent_exclusive_is_mutually_exclusive() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::thread;

        let lm = Arc::new(LockManager::new());
        let active = Arc::new(AtomicUsize::new(0));
        let r = ResourceId::new(11);
        let mut handles = Vec::new();
        for t in 0..8u64 {
            let lm = Arc::clone(&lm);
            let active = Arc::clone(&active);
            handles.push(thread::spawn(move || {
                let txn = TxnId::new(t);
                for _ in 0..2000 {
                    if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
                        // While we hold X, no one else may be inside this region.
                        let inside = active.fetch_add(1, Ordering::SeqCst);
                        assert_eq!(inside, 0);
                        active.fetch_sub(1, Ordering::SeqCst);
                        lm.release(txn, r).unwrap();
                    }
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(lm.holder_count(r), 0);
    }
}