Skip to main content

lock_db/
manager.rs

1//! The lock table: a sharded, contention-aware map from resources to holders.
2//!
3//! # Design
4//!
5//! A single global mutex over the whole lock table would serialise every
6//! acquire and release in the database, turning the lock manager itself into
7//! the bottleneck it exists to manage. Instead the table is split into a fixed
8//! number of independent shards, each guarding its own slice of the resource
9//! space behind its own mutex. Two transactions touching resources in different
10//! shards never contend on the same lock. The shard for a resource is chosen by
11//! Fibonacci hashing its id, which spreads sequential ids (the common case for
12//! page and row numbers) evenly across shards without paying for a
13//! general-purpose hasher on the hot path.
14//!
15//! Each shard also keeps a reverse index from transaction to the resources it
16//! holds in that shard, so releasing every lock a transaction owns is
17//! proportional to the number of locks held, not to the size of the table.
18//!
19//! This release ([crate-level docs](crate)) provides non-blocking acquisition:
20//! a request that cannot be granted immediately returns [`LockError::Conflict`]
21//! rather than waiting. Blocking acquisition with wait queues, and the
22//! deadlock detection that requires it, arrive in a later milestone.
23
24#[cfg(loom)]
25use loom::sync::{Mutex, MutexGuard};
26#[cfg(not(loom))]
27use std::sync::{Mutex, MutexGuard};
28
29use std::collections::HashMap;
30
31use crate::deadlock::{Deadlock, VictimPolicy, WaitForGraph};
32use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
33
34/// The victim policy the deadlock-aware acquisition path uses.
35const DEADLOCK_VICTIM_POLICY: VictimPolicy = VictimPolicy::Youngest;
36
37/// Multiplier for Fibonacci hashing: 2^64 divided by the golden ratio.
38const FIB_HASH: u64 = 0x9E37_79B9_7F4A_7C15;
39
40/// A transaction holding a resource, and the mode it holds it in.
41#[derive(Clone, Copy)]
42struct Holder {
43    txn: TxnId,
44    mode: LockMode,
45}
46
47/// The set of transactions currently holding one resource.
48///
49/// Holders are kept in an unordered `Vec` because the common case is a handful
50/// of shared readers or a single writer; a linear scan over a short, contiguous
51/// slice beats the constant overhead and indirection of a map for those sizes.
52struct LockEntry {
53    holders: Vec<Holder>,
54}
55
56impl LockEntry {
57    #[inline]
58    fn new() -> Self {
59        Self {
60            holders: Vec::new(),
61        }
62    }
63}
64
65/// A transaction holding a key range in a space, and the mode it holds.
66#[derive(Clone, Copy)]
67struct RangeHolder {
68    txn: TxnId,
69    range: KeyRange,
70    mode: LockMode,
71}
72
73/// The active range locks in one key space.
74///
75/// Held in an unordered `Vec` and scanned linearly for overlap on each request.
76/// Overlap is not a key-equality lookup, so a hash map does not help; an
77/// interval tree would lower the asymptotic cost but is heavier and is left for
78/// a later release if profiling shows range contention dominates.
79struct RangeSpace {
80    holders: Vec<RangeHolder>,
81}
82
83impl RangeSpace {
84    #[inline]
85    fn new() -> Self {
86        Self {
87            holders: Vec::new(),
88        }
89    }
90}
91
92/// The mutable state of one shard.
93struct ShardInner {
94    /// Point locks: resources with at least one holder, keyed by resource id.
95    locks: HashMap<ResourceId, LockEntry>,
96    /// Reverse index: the resources each transaction holds *in this shard*.
97    by_txn: HashMap<TxnId, Vec<ResourceId>>,
98    /// Range locks, keyed by the space (e.g. an index) they protect.
99    ranges: HashMap<ResourceId, RangeSpace>,
100    /// Reverse index for range locks: the (space, range) pairs each transaction
101    /// holds *in this shard*.
102    range_by_txn: HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
103}
104
105impl ShardInner {
106    fn new() -> Self {
107        Self {
108            locks: HashMap::new(),
109            by_txn: HashMap::new(),
110            ranges: HashMap::new(),
111            range_by_txn: HashMap::new(),
112        }
113    }
114}
115
116/// One independently locked partition of the table.
117struct Shard {
118    inner: Mutex<ShardInner>,
119}
120
121/// A sharded lock table mapping resources to the transactions that hold them.
122///
123/// `LockManager` is the primary entry point of the crate. It is `Send + Sync`
124/// and is meant to be shared behind an [`std::sync::Arc`] across all worker
125/// threads; every method takes `&self`, so no outer lock is needed.
126///
127/// # Examples
128///
129/// ```
130/// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
131///
132/// let lm = LockManager::new();
133/// let row = ResourceId::new(100);
134/// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
135///
136/// // Two transactions read the same row concurrently.
137/// lm.try_acquire(t1, row, LockMode::Shared).unwrap();
138/// lm.try_acquire(t2, row, LockMode::Shared).unwrap();
139/// assert_eq!(lm.holder_count(row), 2);
140///
141/// // Neither can take it exclusively while the other reads.
142/// assert!(lm.try_acquire(t1, row, LockMode::Exclusive).is_err());
143///
144/// // After both release, an exclusive lock is free to take.
145/// lm.release(t1, row).unwrap();
146/// lm.release(t2, row).unwrap();
147/// lm.try_acquire(t1, row, LockMode::Exclusive).unwrap();
148/// ```
149#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
150pub struct LockManager {
151    shards: Box<[Shard]>,
152    /// `log2(shards.len())`; `0` when there is a single shard.
153    bits: u32,
154    /// The deadlock-aware wait set: each waiting transaction and the single
155    /// (resource, mode) request it is blocked on. A global mutex, taken only by
156    /// the deadlock-aware [`request`](LockManager::request) path — the
157    /// non-blocking `try_acquire`/`release` fast path never touches it.
158    ///
159    /// Lock ordering: this mutex is always the *outer* lock. `request` takes it
160    /// and then a shard mutex; nothing ever takes a shard mutex and then this
161    /// one. `release_all` clears its own entry in a separate critical section,
162    /// never nested with a shard lock, so no cycle is possible.
163    waits: Mutex<HashMap<TxnId, (ResourceId, LockMode)>>,
164}
165
166/// The outcome of a deadlock-aware [`request`](LockManager::request).
167///
168/// Unlike [`try_acquire`](LockManager::try_acquire), `request` does not just
169/// fail on conflict — it records the wait and tells the caller whether to
170/// proceed, suspend, or abort.
171#[derive(Debug, Clone, PartialEq, Eq)]
172#[must_use = "the outcome decides whether the transaction proceeds, waits, or aborts"]
173pub enum Acquisition {
174    /// The lock was granted; the transaction holds it and may proceed.
175    Granted,
176    /// The lock is held incompatibly. The transaction is now registered as
177    /// waiting; the caller should suspend it and retry `request` later (for
178    /// example after a release). No deadlock was found.
179    Waiting,
180    /// Granting the wait would close a cycle in the wait-for graph. The caller
181    /// must abort the named victim (with [`release_all`](LockManager::release_all))
182    /// to break the deadlock. The victim may be the requesting transaction
183    /// itself or another transaction in the cycle.
184    Deadlock(Deadlock),
185}
186
187impl LockManager {
188    /// Creates a lock manager with a shard count chosen for the current machine.
189    ///
190    /// The count scales with the number of available CPUs (rounded up to a power
191    /// of two) so that contention on any single shard mutex stays low on
192    /// multi-core systems. Use [`with_shards`](Self::with_shards) to pin an
193    /// exact count, for example in tests or on memory-constrained targets.
194    ///
195    /// # Examples
196    ///
197    /// ```
198    /// use lock_db::LockManager;
199    ///
200    /// let lm = LockManager::new();
201    /// assert!(lm.shards().is_power_of_two());
202    /// ```
203    pub fn new() -> Self {
204        let parallelism = std::thread::available_parallelism()
205            .map(|n| n.get())
206            .unwrap_or(1);
207        let target = (parallelism.saturating_mul(4))
208            .next_power_of_two()
209            .clamp(16, 1024);
210        Self::with_shards(target)
211    }
212
213    /// Creates a lock manager with an explicit shard count.
214    ///
215    /// `shards` is rounded up to the next power of two (and a request of `0` is
216    /// treated as `1`), which lets the shard lookup use a shift instead of a
217    /// remainder. More shards reduce contention but cost a mutex and two small
218    /// maps each; fewer shards save memory at the cost of more collisions.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// use lock_db::LockManager;
224    ///
225    /// // Rounded up to the next power of two.
226    /// assert_eq!(LockManager::with_shards(5).shards(), 8);
227    /// assert_eq!(LockManager::with_shards(0).shards(), 1);
228    /// ```
229    pub fn with_shards(shards: usize) -> Self {
230        let n = shards.max(1).next_power_of_two();
231        let bits = n.trailing_zeros();
232        let mut v = Vec::with_capacity(n);
233        for _ in 0..n {
234            v.push(Shard {
235                inner: Mutex::new(ShardInner::new()),
236            });
237        }
238        Self {
239            shards: v.into_boxed_slice(),
240            bits,
241            waits: Mutex::new(HashMap::new()),
242        }
243    }
244
245    /// Returns the number of shards in the table.
246    ///
247    /// Always a power of two.
248    #[inline]
249    #[must_use]
250    pub fn shards(&self) -> usize {
251        self.shards.len()
252    }
253
254    /// Tries to acquire `mode` on `res` for `txn` without blocking.
255    ///
256    /// The request is granted immediately and `Ok(())` is returned when:
257    ///
258    /// - `txn` already holds a lock on `res` that [covers](LockMode::covers)
259    ///   `mode` (re-acquisition is idempotent, and asking for a weaker mode than
260    ///   you hold is a no-op);
261    /// - `txn` already holds `res` in some mode and the
262    ///   [join](LockMode::join) of that mode with `mode` is compatible with
263    ///   every other holder (an in-place upgrade — for example shared to
264    ///   exclusive when sole holder, or shared plus intention-exclusive to SIX);
265    ///   or
266    /// - `txn` holds nothing on `res` and `mode` is compatible with every
267    ///   current holder.
268    ///
269    /// Otherwise nothing is changed and [`LockError::Conflict`] is returned. The
270    /// caller decides whether to retry, wait, or abort; this method never blocks
271    /// the calling thread.
272    ///
273    /// # Errors
274    ///
275    /// Returns [`LockError::Conflict`] if the lock cannot be granted right now.
276    ///
277    /// # Examples
278    ///
279    /// ```
280    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
281    ///
282    /// let lm = LockManager::new();
283    /// let key = ResourceId::new(7);
284    /// let t = TxnId::new(1);
285    ///
286    /// // Upgrade a shared lock to exclusive while sole holder.
287    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
288    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
289    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Exclusive));
290    ///
291    /// // A second reader now conflicts with the upgraded exclusive lock.
292    /// let r = lm.try_acquire(TxnId::new(2), key, LockMode::Shared);
293    /// assert_eq!(r, Err(LockError::Conflict));
294    /// ```
295    pub fn try_acquire(
296        &self,
297        txn: TxnId,
298        res: ResourceId,
299        mode: LockMode,
300    ) -> Result<(), LockError> {
301        let mut guard = self.lock_shard(res);
302        let ShardInner { locks, by_txn, .. } = &mut *guard;
303        if Self::try_grant_locked(locks, by_txn, txn, res, mode) {
304            Ok(())
305        } else {
306            Err(LockError::Conflict)
307        }
308    }
309
310    /// Attempts to grant `mode` on `res` to `txn` against an already-locked
311    /// shard. Returns `true` if granted (idempotent re-acquire, in-place
312    /// upgrade, or fresh grant), `false` on conflict. Shared by
313    /// [`try_acquire`](Self::try_acquire) and [`request`](Self::request).
314    fn try_grant_locked(
315        locks: &mut HashMap<ResourceId, LockEntry>,
316        by_txn: &mut HashMap<TxnId, Vec<ResourceId>>,
317        txn: TxnId,
318        res: ResourceId,
319        mode: LockMode,
320    ) -> bool {
321        let entry = locks.entry(res).or_insert_with(LockEntry::new);
322
323        if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
324            let current = entry.holders[pos].mode;
325            if current.covers(mode) {
326                return true;
327            }
328            // Upgrade: the transaction ends up holding the join (least upper
329            // bound) of what it has and what it asked for. The upgraded mode
330            // must be compatible with every *other* holder.
331            let target = current.join(mode);
332            let blocked = entry
333                .holders
334                .iter()
335                .enumerate()
336                .any(|(i, h)| i != pos && !h.mode.compatible_with(target));
337            if blocked {
338                return false;
339            }
340            entry.holders[pos].mode = target;
341            return true;
342        }
343
344        if entry.holders.iter().all(|h| h.mode.compatible_with(mode)) {
345            entry.holders.push(Holder { txn, mode });
346            by_txn.entry(txn).or_default().push(res);
347            true
348        } else {
349            // The entry already had holders (an empty one would have matched the
350            // vacuous `all` above and been granted), so nothing to clean up.
351            false
352        }
353    }
354
355    /// Releases the lock `txn` holds on `res`.
356    ///
357    /// # Errors
358    ///
359    /// Returns [`LockError::NotHeld`] if `txn` holds no lock on `res`, which
360    /// usually means a double release or a bookkeeping mismatch in the caller.
361    ///
362    /// # Examples
363    ///
364    /// ```
365    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
366    ///
367    /// let lm = LockManager::new();
368    /// let key = ResourceId::new(3);
369    /// let t = TxnId::new(1);
370    ///
371    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
372    /// lm.release(t, key).unwrap();
373    /// assert_eq!(lm.release(t, key), Err(LockError::NotHeld));
374    /// ```
375    pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
376        let mut guard = self.lock_shard(res);
377        let ShardInner { locks, by_txn, .. } = &mut *guard;
378
379        let entry = match locks.get_mut(&res) {
380            Some(entry) => entry,
381            None => return Err(LockError::NotHeld),
382        };
383        let pos = match entry.holders.iter().position(|h| h.txn == txn) {
384            Some(pos) => pos,
385            None => return Err(LockError::NotHeld),
386        };
387
388        let _ = entry.holders.swap_remove(pos);
389        if entry.holders.is_empty() {
390            let _ = locks.remove(&res);
391        }
392        Self::forget_resource(by_txn, txn, res);
393        Ok(())
394    }
395
396    /// Releases every lock held by `txn` across the whole table — both point
397    /// locks and range locks.
398    ///
399    /// This is the call a transaction layer makes at commit or abort to drop a
400    /// transaction's entire lock set at once. It returns the number of locks
401    /// released, and is proportional to that number rather than to the size of
402    /// the table.
403    ///
404    /// # Examples
405    ///
406    /// ```
407    /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
408    ///
409    /// let lm = LockManager::new();
410    /// let t = TxnId::new(1);
411    /// for id in 0..5 {
412    ///     lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive).unwrap();
413    /// }
414    /// lm.try_acquire_range(t, ResourceId::new(99), KeyRange::point(1), LockMode::Shared).unwrap();
415    ///
416    /// assert_eq!(lm.release_all(t), 6); // 5 point locks + 1 range lock
417    /// assert_eq!(lm.release_all(t), 0); // idempotent once empty
418    /// ```
419    pub fn release_all(&self, txn: TxnId) -> usize {
420        // Clear any pending wait first, in its own critical section. This never
421        // nests with a shard lock, so it cannot deadlock against `request`
422        // (which takes `waits` then a shard); see the `waits` field docs.
423        {
424            let mut waits = self.lock_waits();
425            let _ = waits.remove(&txn);
426        }
427
428        let mut released = 0;
429        for shard in self.shards.iter() {
430            let mut guard = Self::lock(shard);
431            let ShardInner {
432                locks,
433                by_txn,
434                ranges,
435                range_by_txn,
436            } = &mut *guard;
437
438            if let Some(resources) = by_txn.remove(&txn) {
439                for res in resources {
440                    if let Some(entry) = locks.get_mut(&res) {
441                        if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
442                            let _ = entry.holders.swap_remove(pos);
443                            released += 1;
444                            if entry.holders.is_empty() {
445                                let _ = locks.remove(&res);
446                            }
447                        }
448                    }
449                }
450            }
451
452            if let Some(spaces) = range_by_txn.remove(&txn) {
453                for (space, range) in spaces {
454                    if let Some(rs) = ranges.get_mut(&space) {
455                        if let Some(pos) = rs
456                            .holders
457                            .iter()
458                            .position(|h| h.txn == txn && h.range == range)
459                        {
460                            let _ = rs.holders.swap_remove(pos);
461                            released += 1;
462                            if rs.holders.is_empty() {
463                                let _ = ranges.remove(&space);
464                            }
465                        }
466                    }
467                }
468            }
469        }
470        released
471    }
472
473    /// Acquires `mode` on `res` for `txn`, registering a wait and detecting
474    /// deadlock if it cannot be granted.
475    ///
476    /// This is the deadlock-aware counterpart to
477    /// [`try_acquire`](Self::try_acquire). The three outcomes are:
478    ///
479    /// - [`Acquisition::Granted`] — the lock was granted; proceed.
480    /// - [`Acquisition::Waiting`] — the lock is held incompatibly and `txn` is
481    ///   now recorded in the wait-for graph. The caller should suspend the
482    ///   transaction and call `request` again later (for example after a
483    ///   release) to retry. No deadlock was found.
484    /// - [`Acquisition::Deadlock`] — granting the wait would close a cycle. The
485    ///   caller must abort the [`Deadlock::victim`] with
486    ///   [`release_all`](Self::release_all). The victim may be `txn` or another
487    ///   transaction in the cycle.
488    ///
489    /// Detection is exact: the wait-for graph is rebuilt from the current lock
490    /// table on every call, so a wait left over from a lock that has since been
491    /// released contributes no edge, and a transaction is never reported as
492    /// deadlocked unless it genuinely is. The victim is chosen by the
493    /// [`VictimPolicy::Youngest`] policy; callers wanting a different policy can
494    /// apply [`WaitForGraph::pick_victim`] to [`Deadlock::cycle`] themselves.
495    ///
496    /// Only transactions that wait through `request` appear in the graph; a
497    /// transaction that spins on `try_acquire` is invisible to deadlock
498    /// detection. Range locks ([`try_acquire_range`](Self::try_acquire_range))
499    /// are likewise not tracked here.
500    ///
501    /// `request` serializes on a single wait-registry mutex, unlike the sharded
502    /// `try_acquire`; it is the path to use when you need deadlock detection.
503    ///
504    /// # Examples
505    ///
506    /// ```
507    /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
508    ///
509    /// let lm = LockManager::new();
510    /// let (a, b) = (ResourceId::new(1), ResourceId::new(2));
511    /// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
512    ///
513    /// // T1 holds A, T2 holds B.
514    /// assert_eq!(lm.request(t1, a, LockMode::Exclusive), Acquisition::Granted);
515    /// assert_eq!(lm.request(t2, b, LockMode::Exclusive), Acquisition::Granted);
516    ///
517    /// // T1 waits for B (held by T2): no cycle yet.
518    /// assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
519    ///
520    /// // T2 now waits for A (held by T1): that closes the cycle.
521    /// match lm.request(t2, a, LockMode::Exclusive) {
522    ///     Acquisition::Deadlock(d) => {
523    ///         assert_eq!(d.victim, TxnId::new(2)); // youngest in the cycle
524    ///         lm.release_all(d.victim);            // abort to break the deadlock
525    ///     }
526    ///     other => panic!("expected a deadlock, got {other:?}"),
527    /// }
528    /// ```
529    pub fn request(&self, txn: TxnId, res: ResourceId, mode: LockMode) -> Acquisition {
530        // `waits` is the outer lock; the grant attempt and graph build both take
531        // shard locks underneath it, never the reverse.
532        let mut waits = self.lock_waits();
533
534        let granted = {
535            let mut guard = self.lock_shard(res);
536            let ShardInner { locks, by_txn, .. } = &mut *guard;
537            Self::try_grant_locked(locks, by_txn, txn, res, mode)
538        };
539        if granted {
540            let _ = waits.remove(&txn);
541            return Acquisition::Granted;
542        }
543
544        let _ = waits.insert(txn, (res, mode));
545        let graph = self.build_wait_graph(&waits);
546        match graph.cycle_from(txn) {
547            Some(cycle) => {
548                let victim =
549                    WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY).unwrap_or(txn);
550                Acquisition::Deadlock(Deadlock { victim, cycle })
551            }
552            None => Acquisition::Waiting,
553        }
554    }
555
556    /// Removes any pending wait for `txn` from the wait-for graph.
557    ///
558    /// Call this when a transaction that previously got [`Acquisition::Waiting`]
559    /// stops waiting without acquiring the lock (for example it timed out or was
560    /// aborted for another reason). [`release_all`](Self::release_all) already
561    /// clears the wait, so this is only needed when releasing nothing.
562    ///
563    /// # Examples
564    ///
565    /// ```
566    /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
567    ///
568    /// let lm = LockManager::new();
569    /// let res = ResourceId::new(1);
570    /// lm.request(TxnId::new(1), res, LockMode::Exclusive);
571    /// // T2 waits, then gives up.
572    /// assert_eq!(lm.request(TxnId::new(2), res, LockMode::Exclusive), Acquisition::Waiting);
573    /// lm.cancel_wait(TxnId::new(2));
574    /// assert_eq!(lm.waiting_count(), 0);
575    /// ```
576    pub fn cancel_wait(&self, txn: TxnId) {
577        let mut waits = self.lock_waits();
578        let _ = waits.remove(&txn);
579    }
580
581    /// Scans the current wait set for a deadlock, returning one if found.
582    ///
583    /// This is the periodic-detection counterpart to the at-wait detection in
584    /// [`request`](Self::request): a background task can call it on an interval
585    /// instead of (or in addition to) acting on `request`'s result. It rebuilds
586    /// the wait-for graph from the current lock table, so it reports only
587    /// genuine deadlocks. Returns `None` when no cycle exists.
588    ///
589    /// # Examples
590    ///
591    /// ```
592    /// use lock_db::{Acquisition, LockManager, LockMode, ResourceId, TxnId};
593    ///
594    /// let lm = LockManager::new();
595    /// let (a, b) = (ResourceId::new(1), ResourceId::new(2));
596    /// lm.request(TxnId::new(1), a, LockMode::Exclusive);
597    /// lm.request(TxnId::new(2), b, LockMode::Exclusive);
598    /// lm.request(TxnId::new(1), b, LockMode::Exclusive); // T1 waits for T2
599    /// assert!(lm.find_deadlock().is_none());
600    /// lm.request(TxnId::new(2), a, LockMode::Exclusive); // T2 waits for T1: cycle
601    /// assert!(lm.find_deadlock().is_some());
602    /// ```
603    #[must_use]
604    pub fn find_deadlock(&self) -> Option<Deadlock> {
605        let waits = self.lock_waits();
606        let graph = self.build_wait_graph(&waits);
607        let cycle = graph.detect_cycle()?;
608        let victim = WaitForGraph::pick_victim(&cycle, DEADLOCK_VICTIM_POLICY)?;
609        Some(Deadlock { victim, cycle })
610    }
611
612    /// Returns the number of transactions currently registered as waiting.
613    ///
614    /// Mostly useful for diagnostics and tests.
615    #[must_use]
616    pub fn waiting_count(&self) -> usize {
617        self.lock_waits().len()
618    }
619
620    /// Builds a wait-for graph from the live wait set, reading the *current*
621    /// holders of each waited resource from the lock table. Rebuilding from
622    /// truth on every detection is what keeps detection from acting on a stale
623    /// edge. Called while holding the `waits` lock; takes shard locks underneath.
624    fn build_wait_graph(&self, waits: &HashMap<TxnId, (ResourceId, LockMode)>) -> WaitForGraph {
625        let mut graph = WaitForGraph::new();
626        for (&waiter, &(res, mode)) in waits {
627            let blockers = self.holders_blocking(waiter, res, mode);
628            graph.add_waits(waiter, &blockers);
629        }
630        graph
631    }
632
633    /// Returns the transactions, other than `waiter`, that currently hold `res`
634    /// in a mode incompatible with `mode` — the transactions `waiter` is blocked
635    /// by.
636    fn holders_blocking(&self, waiter: TxnId, res: ResourceId, mode: LockMode) -> Vec<TxnId> {
637        let guard = self.lock_shard(res);
638        guard.locks.get(&res).map_or_else(Vec::new, |entry| {
639            entry
640                .holders
641                .iter()
642                .filter(|h| h.txn != waiter && !h.mode.compatible_with(mode))
643                .map(|h| h.txn)
644                .collect()
645        })
646    }
647
648    /// Locks the wait registry, recovering its guard if the mutex was poisoned.
649    #[inline]
650    fn lock_waits(&self) -> MutexGuard<'_, HashMap<TxnId, (ResourceId, LockMode)>> {
651        match self.waits.lock() {
652            Ok(guard) => guard,
653            Err(poisoned) => poisoned.into_inner(),
654        }
655    }
656
657    /// Returns the number of transactions currently holding `res`.
658    ///
659    /// Mostly useful for diagnostics and tests; in steady state this is `0`,
660    /// `1` for an exclusive lock, or the reader count for a shared lock.
661    ///
662    /// # Examples
663    ///
664    /// ```
665    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
666    ///
667    /// let lm = LockManager::new();
668    /// let key = ResourceId::new(1);
669    /// assert_eq!(lm.holder_count(key), 0);
670    /// lm.try_acquire(TxnId::new(1), key, LockMode::Shared).unwrap();
671    /// assert_eq!(lm.holder_count(key), 1);
672    /// ```
673    #[must_use]
674    pub fn holder_count(&self, res: ResourceId) -> usize {
675        let guard = self.lock_shard(res);
676        guard.locks.get(&res).map_or(0, |e| e.holders.len())
677    }
678
679    /// Returns the mode in which `txn` holds `res`, or `None` if it holds no
680    /// lock on it.
681    ///
682    /// # Examples
683    ///
684    /// ```
685    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
686    ///
687    /// let lm = LockManager::new();
688    /// let key = ResourceId::new(1);
689    /// let t = TxnId::new(1);
690    /// assert_eq!(lm.mode_held(t, key), None);
691    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
692    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Shared));
693    /// ```
694    #[must_use]
695    pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
696        let guard = self.lock_shard(res);
697        guard
698            .locks
699            .get(&res)
700            .and_then(|e| e.holders.iter().find(|h| h.txn == txn))
701            .map(|h| h.mode)
702    }
703
704    /// Tries to acquire `mode` over the key range `range` in key space `space`,
705    /// for `txn`, without blocking.
706    ///
707    /// A range lock protects a contiguous span of keys — use it to stop another
708    /// transaction from inserting into, or writing within, a range you have
709    /// read (phantom and predicate protection). `space` identifies the key space
710    /// the range lives in, typically an index; ranges in different spaces never
711    /// conflict.
712    ///
713    /// The request is granted unless some **other** transaction already holds an
714    /// [overlapping](KeyRange::overlaps) range in `space` in an
715    /// [incompatible](LockMode::compatible_with) mode. The same transaction may
716    /// hold several ranges in a space, including overlapping ones; range locks
717    /// are not merged or upgraded.
718    ///
719    /// # Errors
720    ///
721    /// Returns [`LockError::Conflict`] if an overlapping, incompatible range is
722    /// held by another transaction.
723    ///
724    /// # Examples
725    ///
726    /// ```
727    /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
728    ///
729    /// let lm = LockManager::new();
730    /// let index = ResourceId::new(1);
731    ///
732    /// // A read lock over [100, 200].
733    /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::new(100, 200).unwrap(), LockMode::Shared).unwrap();
734    ///
735    /// // Another reader may share the overlapping range...
736    /// lm.try_acquire_range(TxnId::new(2), index, KeyRange::new(150, 250).unwrap(), LockMode::Shared).unwrap();
737    ///
738    /// // ...but a writer inside it conflicts.
739    /// assert_eq!(
740    ///     lm.try_acquire_range(TxnId::new(3), index, KeyRange::point(150), LockMode::Exclusive),
741    ///     Err(LockError::Conflict),
742    /// );
743    /// ```
744    pub fn try_acquire_range(
745        &self,
746        txn: TxnId,
747        space: ResourceId,
748        range: KeyRange,
749        mode: LockMode,
750    ) -> Result<(), LockError> {
751        let mut guard = self.lock_shard(space);
752        let ShardInner {
753            ranges,
754            range_by_txn,
755            ..
756        } = &mut *guard;
757        let rs = ranges.entry(space).or_insert_with(RangeSpace::new);
758
759        let conflict = rs
760            .holders
761            .iter()
762            .any(|h| h.txn != txn && h.range.overlaps(range) && !h.mode.compatible_with(mode));
763        if conflict {
764            // A conflict implies a pre-existing holder, so the space entry is
765            // non-empty and there is nothing to clean up.
766            return Err(LockError::Conflict);
767        }
768
769        rs.holders.push(RangeHolder { txn, range, mode });
770        range_by_txn.entry(txn).or_default().push((space, range));
771        Ok(())
772    }
773
774    /// Releases a range lock `txn` holds over `range` in `space`.
775    ///
776    /// Matches on the transaction and the exact range. If the transaction holds
777    /// several locks on the identical range (in different modes), one is
778    /// released per call.
779    ///
780    /// # Errors
781    ///
782    /// Returns [`LockError::NotHeld`] if `txn` holds no lock on that exact range
783    /// in `space`.
784    ///
785    /// # Examples
786    ///
787    /// ```
788    /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
789    ///
790    /// let lm = LockManager::new();
791    /// let index = ResourceId::new(1);
792    /// let r = KeyRange::new(1, 10).unwrap();
793    /// let t = TxnId::new(1);
794    ///
795    /// lm.try_acquire_range(t, index, r, LockMode::Exclusive).unwrap();
796    /// lm.release_range(t, index, r).unwrap();
797    /// assert_eq!(lm.release_range(t, index, r), Err(LockError::NotHeld));
798    /// ```
799    pub fn release_range(
800        &self,
801        txn: TxnId,
802        space: ResourceId,
803        range: KeyRange,
804    ) -> Result<(), LockError> {
805        let mut guard = self.lock_shard(space);
806        let ShardInner {
807            ranges,
808            range_by_txn,
809            ..
810        } = &mut *guard;
811
812        let rs = match ranges.get_mut(&space) {
813            Some(rs) => rs,
814            None => return Err(LockError::NotHeld),
815        };
816        let pos = match rs
817            .holders
818            .iter()
819            .position(|h| h.txn == txn && h.range == range)
820        {
821            Some(pos) => pos,
822            None => return Err(LockError::NotHeld),
823        };
824
825        let _ = rs.holders.swap_remove(pos);
826        if rs.holders.is_empty() {
827            let _ = ranges.remove(&space);
828        }
829        Self::forget_range(range_by_txn, txn, space, range);
830        Ok(())
831    }
832
833    /// Returns the number of range locks currently held in `space`.
834    ///
835    /// Counts every holder, across all transactions and modes. Mostly useful
836    /// for diagnostics and tests.
837    ///
838    /// # Examples
839    ///
840    /// ```
841    /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
842    ///
843    /// let lm = LockManager::new();
844    /// let index = ResourceId::new(1);
845    /// assert_eq!(lm.range_count(index), 0);
846    /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::point(1), LockMode::Shared).unwrap();
847    /// assert_eq!(lm.range_count(index), 1);
848    /// ```
849    #[must_use]
850    pub fn range_count(&self, space: ResourceId) -> usize {
851        let guard = self.lock_shard(space);
852        guard.ranges.get(&space).map_or(0, |rs| rs.holders.len())
853    }
854
855    /// Drops `res` from a transaction's reverse-index entry, removing the entry
856    /// entirely once the transaction holds nothing else in the shard.
857    #[inline]
858    fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
859        if let Some(resources) = by_txn.get_mut(&txn) {
860            if let Some(pos) = resources.iter().position(|r| *r == res) {
861                let _ = resources.swap_remove(pos);
862            }
863            if resources.is_empty() {
864                let _ = by_txn.remove(&txn);
865            }
866        }
867    }
868
869    /// Drops one `(space, range)` pair from a transaction's range reverse-index
870    /// entry, removing the entry entirely once it is empty.
871    #[inline]
872    fn forget_range(
873        range_by_txn: &mut HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
874        txn: TxnId,
875        space: ResourceId,
876        range: KeyRange,
877    ) {
878        if let Some(held) = range_by_txn.get_mut(&txn) {
879            if let Some(pos) = held.iter().position(|(s, r)| *s == space && *r == range) {
880                let _ = held.swap_remove(pos);
881            }
882            if held.is_empty() {
883                let _ = range_by_txn.remove(&txn);
884            }
885        }
886    }
887
888    /// Locks and returns the shard that owns `res`.
889    #[inline]
890    fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
891        Self::lock(&self.shards[self.shard_index(res)])
892    }
893
894    /// Locks a shard, recovering its guard if the mutex was poisoned.
895    ///
896    /// Critical sections in this module perform only infallible map and vector
897    /// operations and never panic, so poisoning cannot leave inconsistent
898    /// state. Recovering the guard keeps the lock manager available rather than
899    /// propagating a poison error that no caller could act on.
900    #[inline]
901    fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
902        match shard.inner.lock() {
903            Ok(guard) => guard,
904            Err(poisoned) => poisoned.into_inner(),
905        }
906    }
907
908    /// Maps a resource id to a shard index via Fibonacci hashing.
909    #[inline]
910    fn shard_index(&self, res: ResourceId) -> usize {
911        if self.bits == 0 {
912            return 0;
913        }
914        let hash = res.get().wrapping_mul(FIB_HASH);
915        // Take the top `bits` bits: the most-mixed end of a multiplicative hash.
916        (hash >> (u64::BITS - self.bits)) as usize
917    }
918}
919
920impl Default for LockManager {
921    fn default() -> Self {
922        Self::new()
923    }
924}
925
926#[cfg(all(test, not(loom)))]
927#[allow(clippy::unwrap_used)]
928mod tests {
929    use super::{Acquisition, FIB_HASH, LockManager};
930    use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
931
932    fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
933        (TxnId::new(t), ResourceId::new(r))
934    }
935
936    fn kr(start: u64, end: u64) -> KeyRange {
937        KeyRange::new(start, end).unwrap()
938    }
939
940    #[test]
941    fn test_shared_locks_coexist() {
942        let lm = LockManager::new();
943        let r = ResourceId::new(1);
944        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
945        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
946        lm.try_acquire(TxnId::new(3), r, LockMode::Shared).unwrap();
947        assert_eq!(lm.holder_count(r), 3);
948    }
949
950    #[test]
951    fn test_exclusive_excludes_shared() {
952        let lm = LockManager::new();
953        let (t1, r) = ids(1, 1);
954        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
955        assert_eq!(
956            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
957            Err(LockError::Conflict)
958        );
959    }
960
961    #[test]
962    fn test_intention_shared_and_intention_exclusive_coexist() {
963        let lm = LockManager::new();
964        let r = ResourceId::new(1);
965        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionShared)
966            .unwrap();
967        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionExclusive)
968            .unwrap();
969        assert_eq!(lm.holder_count(r), 2);
970    }
971
972    #[test]
973    fn test_intention_exclusive_blocks_shared() {
974        let lm = LockManager::new();
975        let r = ResourceId::new(1);
976        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive)
977            .unwrap();
978        assert_eq!(
979            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
980            Err(LockError::Conflict)
981        );
982        // ...but another IX or an IS is fine.
983        lm.try_acquire(TxnId::new(3), r, LockMode::IntentionExclusive)
984            .unwrap();
985        lm.try_acquire(TxnId::new(4), r, LockMode::IntentionShared)
986            .unwrap();
987    }
988
989    #[test]
990    fn test_shared_plus_intention_exclusive_upgrades_to_six() {
991        let lm = LockManager::new();
992        let r = ResourceId::new(1);
993        let t = TxnId::new(1);
994        lm.try_acquire(t, r, LockMode::Shared).unwrap();
995        // Same txn now intends to write part of the subtree: S join IX = SIX.
996        lm.try_acquire(t, r, LockMode::IntentionExclusive).unwrap();
997        assert_eq!(lm.mode_held(t, r), Some(LockMode::SharedIntentionExclusive));
998        // An intention-shared holder still coexists with SIX.
999        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionShared)
1000            .unwrap();
1001        // But a second reader does not.
1002        assert_eq!(
1003            lm.try_acquire(TxnId::new(3), r, LockMode::Shared),
1004            Err(LockError::Conflict)
1005        );
1006    }
1007
1008    #[test]
1009    fn test_intention_shared_upgrades_to_exclusive_when_sole_holder() {
1010        let lm = LockManager::new();
1011        let r = ResourceId::new(1);
1012        let t = TxnId::new(1);
1013        lm.try_acquire(t, r, LockMode::IntentionShared).unwrap();
1014        lm.try_acquire(t, r, LockMode::Exclusive).unwrap();
1015        assert_eq!(lm.mode_held(t, r), Some(LockMode::Exclusive));
1016    }
1017
1018    #[test]
1019    fn test_upgrade_to_six_blocked_by_other_reader() {
1020        let lm = LockManager::new();
1021        let r = ResourceId::new(1);
1022        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
1023        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
1024        // Txn 1 wants IX too (-> SIX), but SIX is incompatible with txn 2's S.
1025        assert_eq!(
1026            lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive),
1027            Err(LockError::Conflict)
1028        );
1029        // The original shared lock is intact.
1030        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
1031    }
1032
1033    #[test]
1034    fn test_hierarchy_protocol_row_write_under_table_intent() {
1035        // Model a database/table/page/row hierarchy as four resources, and run
1036        // the standard protocol: IX coarse-to-fine, then X on the row.
1037        let lm = LockManager::new();
1038        let (db, table, page, row) = (
1039            ResourceId::new(1),
1040            ResourceId::new(2),
1041            ResourceId::new(3),
1042            ResourceId::new(4),
1043        );
1044        let writer = TxnId::new(1);
1045        for res in [db, table, page] {
1046            lm.try_acquire(writer, res, LockMode::IntentionExclusive)
1047                .unwrap();
1048        }
1049        lm.try_acquire(writer, row, LockMode::Exclusive).unwrap();
1050
1051        // A concurrent reader can still take IS down to a different page/row.
1052        let reader = TxnId::new(2);
1053        for res in [db, table] {
1054            lm.try_acquire(reader, res, LockMode::IntentionShared)
1055                .unwrap();
1056        }
1057        // But it cannot read the row the writer holds exclusively.
1058        assert_eq!(
1059            lm.try_acquire(reader, row, LockMode::Shared),
1060            Err(LockError::Conflict)
1061        );
1062    }
1063
1064    #[test]
1065    fn test_exclusive_excludes_exclusive() {
1066        let lm = LockManager::new();
1067        let (t1, r) = ids(1, 1);
1068        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
1069        assert_eq!(
1070            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
1071            Err(LockError::Conflict)
1072        );
1073    }
1074
1075    #[test]
1076    fn test_shared_blocks_other_exclusive() {
1077        let lm = LockManager::new();
1078        let (t1, r) = ids(1, 1);
1079        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1080        assert_eq!(
1081            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
1082            Err(LockError::Conflict)
1083        );
1084    }
1085
1086    #[test]
1087    fn test_reacquire_same_mode_is_idempotent() {
1088        let lm = LockManager::new();
1089        let (t1, r) = ids(1, 1);
1090        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1091        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1092        assert_eq!(lm.holder_count(r), 1);
1093    }
1094
1095    #[test]
1096    fn test_request_weaker_than_held_is_noop() {
1097        let lm = LockManager::new();
1098        let (t1, r) = ids(1, 1);
1099        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
1100        // Asking for shared while holding exclusive keeps the stronger mode.
1101        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1102        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
1103        assert_eq!(lm.holder_count(r), 1);
1104    }
1105
1106    #[test]
1107    fn test_upgrade_sole_holder_succeeds() {
1108        let lm = LockManager::new();
1109        let (t1, r) = ids(1, 1);
1110        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1111        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
1112        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
1113        assert_eq!(lm.holder_count(r), 1);
1114    }
1115
1116    #[test]
1117    fn test_upgrade_blocked_by_other_reader() {
1118        let lm = LockManager::new();
1119        let r = ResourceId::new(1);
1120        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
1121        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
1122        assert_eq!(
1123            lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive),
1124            Err(LockError::Conflict)
1125        );
1126        // The failed upgrade left the original shared lock intact.
1127        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
1128    }
1129
1130    #[test]
1131    fn test_release_frees_resource_for_exclusive() {
1132        let lm = LockManager::new();
1133        let r = ResourceId::new(1);
1134        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
1135        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
1136        lm.release(TxnId::new(1), r).unwrap();
1137        // One reader remains, exclusive still blocked.
1138        assert!(
1139            lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
1140                .is_err()
1141        );
1142        lm.release(TxnId::new(2), r).unwrap();
1143        lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
1144            .unwrap();
1145    }
1146
1147    #[test]
1148    fn test_release_not_held_errors() {
1149        let lm = LockManager::new();
1150        let (t1, r) = ids(1, 1);
1151        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
1152        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
1153        assert_eq!(lm.release(TxnId::new(9), r), Err(LockError::NotHeld));
1154    }
1155
1156    #[test]
1157    fn test_double_release_errors() {
1158        let lm = LockManager::new();
1159        let (t1, r) = ids(1, 1);
1160        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
1161        lm.release(t1, r).unwrap();
1162        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
1163    }
1164
1165    #[test]
1166    fn test_release_all_drops_every_lock() {
1167        let lm = LockManager::with_shards(8);
1168        let t = TxnId::new(1);
1169        for id in 0..50 {
1170            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
1171                .unwrap();
1172        }
1173        assert_eq!(lm.release_all(t), 50);
1174        for id in 0..50 {
1175            assert_eq!(lm.holder_count(ResourceId::new(id)), 0);
1176        }
1177        assert_eq!(lm.release_all(t), 0);
1178    }
1179
1180    #[test]
1181    fn test_release_all_leaves_other_txns_alone() {
1182        let lm = LockManager::new();
1183        let r = ResourceId::new(1);
1184        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
1185        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
1186        assert_eq!(lm.release_all(TxnId::new(1)), 1);
1187        assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
1188        assert_eq!(lm.holder_count(r), 1);
1189    }
1190
1191    #[test]
1192    fn test_resource_fully_released_can_be_taken_exclusively() {
1193        let lm = LockManager::new();
1194        let r = ResourceId::new(42);
1195        lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive)
1196            .unwrap();
1197        lm.release(TxnId::new(1), r).unwrap();
1198        assert_eq!(lm.holder_count(r), 0);
1199        lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive)
1200            .unwrap();
1201    }
1202
1203    // ---- range locks ----
1204
1205    #[test]
1206    fn test_range_shared_overlap_coexists() {
1207        let lm = LockManager::new();
1208        let space = ResourceId::new(1);
1209        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
1210            .unwrap();
1211        lm.try_acquire_range(TxnId::new(2), space, kr(50, 150), LockMode::Shared)
1212            .unwrap();
1213        assert_eq!(lm.range_count(space), 2);
1214    }
1215
1216    #[test]
1217    fn test_range_exclusive_conflicts_on_overlap() {
1218        let lm = LockManager::new();
1219        let space = ResourceId::new(1);
1220        lm.try_acquire_range(TxnId::new(1), space, kr(100, 200), LockMode::Shared)
1221            .unwrap();
1222        assert_eq!(
1223            lm.try_acquire_range(
1224                TxnId::new(2),
1225                space,
1226                KeyRange::point(150),
1227                LockMode::Exclusive
1228            ),
1229            Err(LockError::Conflict)
1230        );
1231    }
1232
1233    #[test]
1234    fn test_range_disjoint_ranges_do_not_conflict() {
1235        let lm = LockManager::new();
1236        let space = ResourceId::new(1);
1237        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
1238            .unwrap();
1239        lm.try_acquire_range(TxnId::new(2), space, kr(101, 200), LockMode::Exclusive)
1240            .unwrap();
1241    }
1242
1243    #[test]
1244    fn test_range_adjacent_inclusive_bounds_conflict() {
1245        let lm = LockManager::new();
1246        let space = ResourceId::new(1);
1247        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
1248            .unwrap();
1249        // [100, 200] shares key 100 with [0, 100].
1250        assert_eq!(
1251            lm.try_acquire_range(TxnId::new(2), space, kr(100, 200), LockMode::Shared),
1252            Err(LockError::Conflict)
1253        );
1254    }
1255
1256    #[test]
1257    fn test_range_different_spaces_independent() {
1258        let lm = LockManager::new();
1259        lm.try_acquire_range(
1260            TxnId::new(1),
1261            ResourceId::new(1),
1262            kr(0, 100),
1263            LockMode::Exclusive,
1264        )
1265        .unwrap();
1266        // Same range, different space: no conflict.
1267        lm.try_acquire_range(
1268            TxnId::new(2),
1269            ResourceId::new(2),
1270            kr(0, 100),
1271            LockMode::Exclusive,
1272        )
1273        .unwrap();
1274    }
1275
1276    #[test]
1277    fn test_range_same_txn_overlap_allowed() {
1278        let lm = LockManager::new();
1279        let space = ResourceId::new(1);
1280        let t = TxnId::new(1);
1281        lm.try_acquire_range(t, space, kr(0, 100), LockMode::Exclusive)
1282            .unwrap();
1283        // A transaction does not conflict with its own ranges.
1284        lm.try_acquire_range(t, space, kr(50, 150), LockMode::Exclusive)
1285            .unwrap();
1286        assert_eq!(lm.range_count(space), 2);
1287    }
1288
1289    #[test]
1290    fn test_range_release_frees_overlap() {
1291        let lm = LockManager::new();
1292        let space = ResourceId::new(1);
1293        let r = kr(100, 200);
1294        lm.try_acquire_range(TxnId::new(1), space, r, LockMode::Exclusive)
1295            .unwrap();
1296        lm.release_range(TxnId::new(1), space, r).unwrap();
1297        assert_eq!(lm.range_count(space), 0);
1298        // Now another writer can take an overlapping range.
1299        lm.try_acquire_range(
1300            TxnId::new(2),
1301            space,
1302            KeyRange::point(150),
1303            LockMode::Exclusive,
1304        )
1305        .unwrap();
1306    }
1307
1308    #[test]
1309    fn test_range_release_not_held_errors() {
1310        let lm = LockManager::new();
1311        let space = ResourceId::new(1);
1312        assert_eq!(
1313            lm.release_range(TxnId::new(1), space, kr(0, 10)),
1314            Err(LockError::NotHeld)
1315        );
1316        lm.try_acquire_range(TxnId::new(1), space, kr(0, 10), LockMode::Shared)
1317            .unwrap();
1318        // Wrong range is NotHeld.
1319        assert_eq!(
1320            lm.release_range(TxnId::new(1), space, kr(0, 11)),
1321            Err(LockError::NotHeld)
1322        );
1323    }
1324
1325    #[test]
1326    fn test_release_all_drops_point_and_range_locks() {
1327        let lm = LockManager::new();
1328        let t = TxnId::new(1);
1329        for id in 0..3 {
1330            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
1331                .unwrap();
1332        }
1333        lm.try_acquire_range(t, ResourceId::new(100), kr(0, 10), LockMode::Shared)
1334            .unwrap();
1335        lm.try_acquire_range(t, ResourceId::new(100), kr(20, 30), LockMode::Shared)
1336            .unwrap();
1337        assert_eq!(lm.release_all(t), 5); // 3 point + 2 range
1338        assert_eq!(lm.range_count(ResourceId::new(100)), 0);
1339        assert_eq!(lm.release_all(t), 0);
1340    }
1341
1342    #[test]
1343    fn test_release_all_range_leaves_other_txn() {
1344        let lm = LockManager::new();
1345        let space = ResourceId::new(1);
1346        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
1347            .unwrap();
1348        lm.try_acquire_range(TxnId::new(2), space, kr(0, 100), LockMode::Shared)
1349            .unwrap();
1350        assert_eq!(lm.release_all(TxnId::new(1)), 1);
1351        assert_eq!(lm.range_count(space), 1);
1352    }
1353
1354    #[test]
1355    fn test_range_intention_modes_coexist() {
1356        // IS and IX range locks are compatible, just like point locks.
1357        let lm = LockManager::new();
1358        let space = ResourceId::new(1);
1359        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::IntentionShared)
1360            .unwrap();
1361        lm.try_acquire_range(
1362            TxnId::new(2),
1363            space,
1364            kr(0, 100),
1365            LockMode::IntentionExclusive,
1366        )
1367        .unwrap();
1368        assert_eq!(lm.range_count(space), 2);
1369    }
1370
1371    // ---- deadlock-aware request ----
1372
1373    #[test]
1374    fn test_request_granted_on_free_resource() {
1375        let lm = LockManager::new();
1376        let (t, r) = ids(1, 1);
1377        assert_eq!(lm.request(t, r, LockMode::Exclusive), Acquisition::Granted);
1378        assert_eq!(lm.mode_held(t, r), Some(LockMode::Exclusive));
1379        assert_eq!(lm.waiting_count(), 0);
1380    }
1381
1382    #[test]
1383    fn test_request_waiting_registers_wait() {
1384        let lm = LockManager::new();
1385        let r = ResourceId::new(1);
1386        assert_eq!(
1387            lm.request(TxnId::new(1), r, LockMode::Exclusive),
1388            Acquisition::Granted
1389        );
1390        assert_eq!(
1391            lm.request(TxnId::new(2), r, LockMode::Exclusive),
1392            Acquisition::Waiting
1393        );
1394        assert_eq!(lm.waiting_count(), 1);
1395    }
1396
1397    #[test]
1398    fn test_request_grant_clears_prior_wait() {
1399        let lm = LockManager::new();
1400        let r = ResourceId::new(1);
1401        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
1402        assert_eq!(
1403            lm.request(TxnId::new(2), r, LockMode::Exclusive),
1404            Acquisition::Waiting
1405        );
1406        // T1 releases; T2 retries and is granted, clearing its wait.
1407        lm.release(TxnId::new(1), r).unwrap();
1408        assert_eq!(
1409            lm.request(TxnId::new(2), r, LockMode::Exclusive),
1410            Acquisition::Granted
1411        );
1412        assert_eq!(lm.waiting_count(), 0);
1413    }
1414
1415    #[test]
1416    fn test_classic_two_transaction_deadlock() {
1417        let lm = LockManager::new();
1418        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
1419        let (t1, t2) = (TxnId::new(1), TxnId::new(2));
1420
1421        assert_eq!(lm.request(t1, a, LockMode::Exclusive), Acquisition::Granted);
1422        assert_eq!(lm.request(t2, b, LockMode::Exclusive), Acquisition::Granted);
1423        assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
1424
1425        match lm.request(t2, a, LockMode::Exclusive) {
1426            Acquisition::Deadlock(d) => {
1427                assert_eq!(d.victim, t2); // youngest in the cycle
1428                assert_eq!(d.cycle.len(), 2);
1429                assert!(d.cycle.contains(&t1) && d.cycle.contains(&t2));
1430            }
1431            other => panic!("expected deadlock, got {other:?}"),
1432        }
1433    }
1434
1435    #[test]
1436    fn test_three_transaction_deadlock_cycle() {
1437        let lm = LockManager::new();
1438        let (a, b, c) = (ResourceId::new(1), ResourceId::new(2), ResourceId::new(3));
1439        let (t1, t2, t3) = (TxnId::new(1), TxnId::new(2), TxnId::new(3));
1440
1441        let _ = lm.request(t1, a, LockMode::Exclusive);
1442        let _ = lm.request(t2, b, LockMode::Exclusive);
1443        let _ = lm.request(t3, c, LockMode::Exclusive);
1444        // T1->B(T2), T2->C(T3), T3->A(T1): closes the loop on the third wait.
1445        assert_eq!(lm.request(t1, b, LockMode::Exclusive), Acquisition::Waiting);
1446        assert_eq!(lm.request(t2, c, LockMode::Exclusive), Acquisition::Waiting);
1447        match lm.request(t3, a, LockMode::Exclusive) {
1448            Acquisition::Deadlock(d) => {
1449                assert_eq!(d.cycle.len(), 3);
1450                assert_eq!(d.victim, t3); // youngest
1451            }
1452            other => panic!("expected deadlock, got {other:?}"),
1453        }
1454    }
1455
1456    #[test]
1457    fn test_aborting_victim_breaks_deadlock() {
1458        let lm = LockManager::new();
1459        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
1460        let (t1, t2) = (TxnId::new(1), TxnId::new(2));
1461
1462        let _ = lm.request(t1, a, LockMode::Exclusive);
1463        let _ = lm.request(t2, b, LockMode::Exclusive);
1464        let _ = lm.request(t1, b, LockMode::Exclusive);
1465        let victim = match lm.request(t2, a, LockMode::Exclusive) {
1466            Acquisition::Deadlock(d) => d.victim,
1467            other => panic!("expected deadlock, got {other:?}"),
1468        };
1469        // Abort the victim: releases its locks and clears its wait.
1470        lm.release_all(victim);
1471        // The other transaction can now make progress.
1472        let survivor = if victim == t1 { t2 } else { t1 };
1473        let want = if survivor == t1 { b } else { a };
1474        assert_eq!(
1475            lm.request(survivor, want, LockMode::Exclusive),
1476            Acquisition::Granted
1477        );
1478        assert!(lm.find_deadlock().is_none());
1479    }
1480
1481    #[test]
1482    fn test_no_false_deadlock_after_release() {
1483        // T1 waits for T2; T2 releases (not via the wait path). A later detection
1484        // must not report a deadlock from the now-stale wait edge.
1485        let lm = LockManager::new();
1486        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
1487        let (t1, t2) = (TxnId::new(1), TxnId::new(2));
1488
1489        let _ = lm.request(t1, a, LockMode::Exclusive);
1490        let _ = lm.request(t2, b, LockMode::Exclusive);
1491        let _ = lm.request(t1, b, LockMode::Exclusive); // T1 waits for T2 on B
1492        lm.release(t2, b).unwrap(); // B is now free; T1's edge is stale
1493        // T2 wants A (held by T1). Were T1's stale edge still counted, this would
1494        // look like a cycle. It must not: B is free, so T1 has no real out-edge.
1495        assert_eq!(lm.request(t2, a, LockMode::Exclusive), Acquisition::Waiting);
1496        assert!(lm.find_deadlock().is_none());
1497    }
1498
1499    #[test]
1500    fn test_cancel_wait_removes_from_graph() {
1501        let lm = LockManager::new();
1502        let r = ResourceId::new(1);
1503        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
1504        assert_eq!(
1505            lm.request(TxnId::new(2), r, LockMode::Exclusive),
1506            Acquisition::Waiting
1507        );
1508        lm.cancel_wait(TxnId::new(2));
1509        assert_eq!(lm.waiting_count(), 0);
1510    }
1511
1512    #[test]
1513    fn test_release_all_clears_wait() {
1514        let lm = LockManager::new();
1515        let r = ResourceId::new(1);
1516        let _ = lm.request(TxnId::new(1), r, LockMode::Exclusive);
1517        let _ = lm.request(TxnId::new(2), r, LockMode::Exclusive); // T2 waits
1518        assert_eq!(lm.waiting_count(), 1);
1519        lm.release_all(TxnId::new(2));
1520        assert_eq!(lm.waiting_count(), 0);
1521    }
1522
1523    #[test]
1524    fn test_find_deadlock_none_without_cycle() {
1525        let lm = LockManager::new();
1526        let (a, b) = (ResourceId::new(1), ResourceId::new(2));
1527        let _ = lm.request(TxnId::new(1), a, LockMode::Exclusive);
1528        let _ = lm.request(TxnId::new(2), b, LockMode::Exclusive);
1529        let _ = lm.request(TxnId::new(1), b, LockMode::Exclusive); // one-way wait
1530        assert!(lm.find_deadlock().is_none());
1531    }
1532
1533    #[test]
1534    fn test_shared_requests_do_not_deadlock() {
1535        // Two shared requests on the same resource both grant; no waiting.
1536        let lm = LockManager::new();
1537        let r = ResourceId::new(1);
1538        assert_eq!(
1539            lm.request(TxnId::new(1), r, LockMode::Shared),
1540            Acquisition::Granted
1541        );
1542        assert_eq!(
1543            lm.request(TxnId::new(2), r, LockMode::Shared),
1544            Acquisition::Granted
1545        );
1546        assert_eq!(lm.waiting_count(), 0);
1547    }
1548
1549    #[test]
1550    fn test_with_shards_rounds_up_to_power_of_two() {
1551        assert_eq!(LockManager::with_shards(1).shards(), 1);
1552        assert_eq!(LockManager::with_shards(3).shards(), 4);
1553        assert_eq!(LockManager::with_shards(5).shards(), 8);
1554        assert_eq!(LockManager::with_shards(0).shards(), 1);
1555        assert_eq!(LockManager::with_shards(64).shards(), 64);
1556    }
1557
1558    #[test]
1559    fn test_single_shard_routes_everything_to_index_zero() {
1560        let lm = LockManager::with_shards(1);
1561        for id in 0..1000 {
1562            assert_eq!(lm.shard_index(ResourceId::new(id)), 0);
1563        }
1564    }
1565
1566    #[test]
1567    fn test_shard_index_within_bounds() {
1568        let lm = LockManager::with_shards(16);
1569        for id in 0..10_000 {
1570            assert!(lm.shard_index(ResourceId::new(id)) < 16);
1571        }
1572    }
1573
1574    #[test]
1575    fn test_sequential_ids_spread_across_shards() {
1576        let lm = LockManager::with_shards(16);
1577        let mut seen = [false; 16];
1578        for id in 0..256 {
1579            seen[lm.shard_index(ResourceId::new(id))] = true;
1580        }
1581        // Fibonacci hashing should touch every shard well before 256 ids.
1582        assert!(seen.iter().all(|&hit| hit));
1583    }
1584
1585    #[test]
1586    fn test_locks_in_different_shards_are_independent() {
1587        // Two resources that hash to different shards do not interfere.
1588        let lm = LockManager::with_shards(16);
1589        let a = ResourceId::new(1);
1590        let b = ResourceId::new(2);
1591        lm.try_acquire(TxnId::new(1), a, LockMode::Exclusive)
1592            .unwrap();
1593        lm.try_acquire(TxnId::new(2), b, LockMode::Exclusive)
1594            .unwrap();
1595        assert_eq!(lm.holder_count(a), 1);
1596        assert_eq!(lm.holder_count(b), 1);
1597    }
1598
1599    #[test]
1600    fn test_fib_hash_constant_is_odd() {
1601        // A multiplicative-hash multiplier must be odd to be a bijection mod 2^64.
1602        assert_eq!(FIB_HASH & 1, 1);
1603    }
1604
1605    #[test]
1606    fn test_concurrent_shared_acquire_release_is_consistent() {
1607        use std::sync::Arc;
1608        use std::thread;
1609
1610        let lm = Arc::new(LockManager::new());
1611        let r = ResourceId::new(7);
1612        let mut handles = Vec::new();
1613        for t in 0..8u64 {
1614            let lm = Arc::clone(&lm);
1615            handles.push(thread::spawn(move || {
1616                let txn = TxnId::new(t);
1617                for _ in 0..1000 {
1618                    lm.try_acquire(txn, r, LockMode::Shared).unwrap();
1619                    lm.release(txn, r).unwrap();
1620                }
1621            }));
1622        }
1623        for h in handles {
1624            h.join().unwrap();
1625        }
1626        // Every acquire was paired with a release; the resource is free.
1627        assert_eq!(lm.holder_count(r), 0);
1628    }
1629
1630    #[test]
1631    fn test_concurrent_exclusive_is_mutually_exclusive() {
1632        use std::sync::Arc;
1633        use std::sync::atomic::{AtomicUsize, Ordering};
1634        use std::thread;
1635
1636        let lm = Arc::new(LockManager::new());
1637        let active = Arc::new(AtomicUsize::new(0));
1638        let r = ResourceId::new(11);
1639        let mut handles = Vec::new();
1640        for t in 0..8u64 {
1641            let lm = Arc::clone(&lm);
1642            let active = Arc::clone(&active);
1643            handles.push(thread::spawn(move || {
1644                let txn = TxnId::new(t);
1645                for _ in 0..2000 {
1646                    if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
1647                        // While we hold X, no one else may be inside this region.
1648                        let inside = active.fetch_add(1, Ordering::SeqCst);
1649                        assert_eq!(inside, 0);
1650                        active.fetch_sub(1, Ordering::SeqCst);
1651                        lm.release(txn, r).unwrap();
1652                    }
1653                }
1654            }));
1655        }
1656        for h in handles {
1657            h.join().unwrap();
1658        }
1659        assert_eq!(lm.holder_count(r), 0);
1660    }
1661}