Skip to main content

lock_db/
manager.rs

1//! The lock table: a sharded, contention-aware map from resources to holders.
2//!
3//! # Design
4//!
5//! A single global mutex over the whole lock table would serialise every
6//! acquire and release in the database, turning the lock manager itself into
7//! the bottleneck it exists to manage. Instead the table is split into a fixed
8//! number of independent shards, each guarding its own slice of the resource
9//! space behind its own mutex. Two transactions touching resources in different
10//! shards never contend on the same lock. The shard for a resource is chosen by
11//! Fibonacci hashing its id, which spreads sequential ids (the common case for
12//! page and row numbers) evenly across shards without paying for a
13//! general-purpose hasher on the hot path.
14//!
15//! Each shard also keeps a reverse index from transaction to the resources it
16//! holds in that shard, so releasing every lock a transaction owns is
17//! proportional to the number of locks held, not to the size of the table.
18//!
19//! This release ([crate-level docs](crate)) provides non-blocking acquisition:
20//! a request that cannot be granted immediately returns [`LockError::Conflict`]
21//! rather than waiting. Blocking acquisition with wait queues, and the
22//! deadlock detection that requires it, arrive in a later milestone.
23
24#[cfg(loom)]
25use loom::sync::{Mutex, MutexGuard};
26#[cfg(not(loom))]
27use std::sync::{Mutex, MutexGuard};
28
29use std::collections::HashMap;
30
31use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
32
33/// Multiplier for Fibonacci hashing: 2^64 divided by the golden ratio.
34const FIB_HASH: u64 = 0x9E37_79B9_7F4A_7C15;
35
36/// A transaction holding a resource, and the mode it holds it in.
37#[derive(Clone, Copy)]
38struct Holder {
39    txn: TxnId,
40    mode: LockMode,
41}
42
43/// The set of transactions currently holding one resource.
44///
45/// Holders are kept in an unordered `Vec` because the common case is a handful
46/// of shared readers or a single writer; a linear scan over a short, contiguous
47/// slice beats the constant overhead and indirection of a map for those sizes.
48struct LockEntry {
49    holders: Vec<Holder>,
50}
51
52impl LockEntry {
53    #[inline]
54    fn new() -> Self {
55        Self {
56            holders: Vec::new(),
57        }
58    }
59}
60
61/// A transaction holding a key range in a space, and the mode it holds.
62#[derive(Clone, Copy)]
63struct RangeHolder {
64    txn: TxnId,
65    range: KeyRange,
66    mode: LockMode,
67}
68
69/// The active range locks in one key space.
70///
71/// Held in an unordered `Vec` and scanned linearly for overlap on each request.
72/// Overlap is not a key-equality lookup, so a hash map does not help; an
73/// interval tree would lower the asymptotic cost but is heavier and is left for
74/// a later release if profiling shows range contention dominates.
75struct RangeSpace {
76    holders: Vec<RangeHolder>,
77}
78
79impl RangeSpace {
80    #[inline]
81    fn new() -> Self {
82        Self {
83            holders: Vec::new(),
84        }
85    }
86}
87
88/// The mutable state of one shard.
89struct ShardInner {
90    /// Point locks: resources with at least one holder, keyed by resource id.
91    locks: HashMap<ResourceId, LockEntry>,
92    /// Reverse index: the resources each transaction holds *in this shard*.
93    by_txn: HashMap<TxnId, Vec<ResourceId>>,
94    /// Range locks, keyed by the space (e.g. an index) they protect.
95    ranges: HashMap<ResourceId, RangeSpace>,
96    /// Reverse index for range locks: the (space, range) pairs each transaction
97    /// holds *in this shard*.
98    range_by_txn: HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
99}
100
101impl ShardInner {
102    fn new() -> Self {
103        Self {
104            locks: HashMap::new(),
105            by_txn: HashMap::new(),
106            ranges: HashMap::new(),
107            range_by_txn: HashMap::new(),
108        }
109    }
110}
111
112/// One independently locked partition of the table.
113struct Shard {
114    inner: Mutex<ShardInner>,
115}
116
117/// A sharded lock table mapping resources to the transactions that hold them.
118///
119/// `LockManager` is the primary entry point of the crate. It is `Send + Sync`
120/// and is meant to be shared behind an [`std::sync::Arc`] across all worker
121/// threads; every method takes `&self`, so no outer lock is needed.
122///
123/// # Examples
124///
125/// ```
126/// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
127///
128/// let lm = LockManager::new();
129/// let row = ResourceId::new(100);
130/// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
131///
132/// // Two transactions read the same row concurrently.
133/// lm.try_acquire(t1, row, LockMode::Shared).unwrap();
134/// lm.try_acquire(t2, row, LockMode::Shared).unwrap();
135/// assert_eq!(lm.holder_count(row), 2);
136///
137/// // Neither can take it exclusively while the other reads.
138/// assert!(lm.try_acquire(t1, row, LockMode::Exclusive).is_err());
139///
140/// // After both release, an exclusive lock is free to take.
141/// lm.release(t1, row).unwrap();
142/// lm.release(t2, row).unwrap();
143/// lm.try_acquire(t1, row, LockMode::Exclusive).unwrap();
144/// ```
145#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
146pub struct LockManager {
147    shards: Box<[Shard]>,
148    /// `log2(shards.len())`; `0` when there is a single shard.
149    bits: u32,
150}
151
152impl LockManager {
153    /// Creates a lock manager with a shard count chosen for the current machine.
154    ///
155    /// The count scales with the number of available CPUs (rounded up to a power
156    /// of two) so that contention on any single shard mutex stays low on
157    /// multi-core systems. Use [`with_shards`](Self::with_shards) to pin an
158    /// exact count, for example in tests or on memory-constrained targets.
159    ///
160    /// # Examples
161    ///
162    /// ```
163    /// use lock_db::LockManager;
164    ///
165    /// let lm = LockManager::new();
166    /// assert!(lm.shards().is_power_of_two());
167    /// ```
168    pub fn new() -> Self {
169        let parallelism = std::thread::available_parallelism()
170            .map(|n| n.get())
171            .unwrap_or(1);
172        let target = (parallelism.saturating_mul(4))
173            .next_power_of_two()
174            .clamp(16, 1024);
175        Self::with_shards(target)
176    }
177
178    /// Creates a lock manager with an explicit shard count.
179    ///
180    /// `shards` is rounded up to the next power of two (and a request of `0` is
181    /// treated as `1`), which lets the shard lookup use a shift instead of a
182    /// remainder. More shards reduce contention but cost a mutex and two small
183    /// maps each; fewer shards save memory at the cost of more collisions.
184    ///
185    /// # Examples
186    ///
187    /// ```
188    /// use lock_db::LockManager;
189    ///
190    /// // Rounded up to the next power of two.
191    /// assert_eq!(LockManager::with_shards(5).shards(), 8);
192    /// assert_eq!(LockManager::with_shards(0).shards(), 1);
193    /// ```
194    pub fn with_shards(shards: usize) -> Self {
195        let n = shards.max(1).next_power_of_two();
196        let bits = n.trailing_zeros();
197        let mut v = Vec::with_capacity(n);
198        for _ in 0..n {
199            v.push(Shard {
200                inner: Mutex::new(ShardInner::new()),
201            });
202        }
203        Self {
204            shards: v.into_boxed_slice(),
205            bits,
206        }
207    }
208
209    /// Returns the number of shards in the table.
210    ///
211    /// Always a power of two.
212    #[inline]
213    #[must_use]
214    pub fn shards(&self) -> usize {
215        self.shards.len()
216    }
217
218    /// Tries to acquire `mode` on `res` for `txn` without blocking.
219    ///
220    /// The request is granted immediately and `Ok(())` is returned when:
221    ///
222    /// - `txn` already holds a lock on `res` that [covers](LockMode::covers)
223    ///   `mode` (re-acquisition is idempotent, and asking for a weaker mode than
224    ///   you hold is a no-op);
225    /// - `txn` already holds `res` in some mode and the
226    ///   [join](LockMode::join) of that mode with `mode` is compatible with
227    ///   every other holder (an in-place upgrade — for example shared to
228    ///   exclusive when sole holder, or shared plus intention-exclusive to SIX);
229    ///   or
230    /// - `txn` holds nothing on `res` and `mode` is compatible with every
231    ///   current holder.
232    ///
233    /// Otherwise nothing is changed and [`LockError::Conflict`] is returned. The
234    /// caller decides whether to retry, wait, or abort; this method never blocks
235    /// the calling thread.
236    ///
237    /// # Errors
238    ///
239    /// Returns [`LockError::Conflict`] if the lock cannot be granted right now.
240    ///
241    /// # Examples
242    ///
243    /// ```
244    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
245    ///
246    /// let lm = LockManager::new();
247    /// let key = ResourceId::new(7);
248    /// let t = TxnId::new(1);
249    ///
250    /// // Upgrade a shared lock to exclusive while sole holder.
251    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
252    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
253    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Exclusive));
254    ///
255    /// // A second reader now conflicts with the upgraded exclusive lock.
256    /// let r = lm.try_acquire(TxnId::new(2), key, LockMode::Shared);
257    /// assert_eq!(r, Err(LockError::Conflict));
258    /// ```
259    pub fn try_acquire(
260        &self,
261        txn: TxnId,
262        res: ResourceId,
263        mode: LockMode,
264    ) -> Result<(), LockError> {
265        let mut guard = self.lock_shard(res);
266        let ShardInner { locks, by_txn, .. } = &mut *guard;
267        let entry = locks.entry(res).or_insert_with(LockEntry::new);
268
269        if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
270            let current = entry.holders[pos].mode;
271            if current.covers(mode) {
272                return Ok(());
273            }
274            // Upgrade: the transaction ends up holding the join (least upper
275            // bound) of what it has and what it asked for. The upgraded mode
276            // must be compatible with every *other* holder.
277            let target = current.join(mode);
278            let blocked = entry
279                .holders
280                .iter()
281                .enumerate()
282                .any(|(i, h)| i != pos && !h.mode.compatible_with(target));
283            if blocked {
284                return Err(LockError::Conflict);
285            }
286            entry.holders[pos].mode = target;
287            return Ok(());
288        }
289
290        if entry.holders.iter().all(|h| h.mode.compatible_with(mode)) {
291            entry.holders.push(Holder { txn, mode });
292            by_txn.entry(txn).or_default().push(res);
293            Ok(())
294        } else {
295            // The entry already had holders (an empty one would have matched the
296            // vacuous `all` above and been granted), so nothing to clean up.
297            Err(LockError::Conflict)
298        }
299    }
300
301    /// Releases the lock `txn` holds on `res`.
302    ///
303    /// # Errors
304    ///
305    /// Returns [`LockError::NotHeld`] if `txn` holds no lock on `res`, which
306    /// usually means a double release or a bookkeeping mismatch in the caller.
307    ///
308    /// # Examples
309    ///
310    /// ```
311    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
312    ///
313    /// let lm = LockManager::new();
314    /// let key = ResourceId::new(3);
315    /// let t = TxnId::new(1);
316    ///
317    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
318    /// lm.release(t, key).unwrap();
319    /// assert_eq!(lm.release(t, key), Err(LockError::NotHeld));
320    /// ```
321    pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
322        let mut guard = self.lock_shard(res);
323        let ShardInner { locks, by_txn, .. } = &mut *guard;
324
325        let entry = match locks.get_mut(&res) {
326            Some(entry) => entry,
327            None => return Err(LockError::NotHeld),
328        };
329        let pos = match entry.holders.iter().position(|h| h.txn == txn) {
330            Some(pos) => pos,
331            None => return Err(LockError::NotHeld),
332        };
333
334        let _ = entry.holders.swap_remove(pos);
335        if entry.holders.is_empty() {
336            let _ = locks.remove(&res);
337        }
338        Self::forget_resource(by_txn, txn, res);
339        Ok(())
340    }
341
342    /// Releases every lock held by `txn` across the whole table — both point
343    /// locks and range locks.
344    ///
345    /// This is the call a transaction layer makes at commit or abort to drop a
346    /// transaction's entire lock set at once. It returns the number of locks
347    /// released, and is proportional to that number rather than to the size of
348    /// the table.
349    ///
350    /// # Examples
351    ///
352    /// ```
353    /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
354    ///
355    /// let lm = LockManager::new();
356    /// let t = TxnId::new(1);
357    /// for id in 0..5 {
358    ///     lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive).unwrap();
359    /// }
360    /// lm.try_acquire_range(t, ResourceId::new(99), KeyRange::point(1), LockMode::Shared).unwrap();
361    ///
362    /// assert_eq!(lm.release_all(t), 6); // 5 point locks + 1 range lock
363    /// assert_eq!(lm.release_all(t), 0); // idempotent once empty
364    /// ```
365    pub fn release_all(&self, txn: TxnId) -> usize {
366        let mut released = 0;
367        for shard in self.shards.iter() {
368            let mut guard = Self::lock(shard);
369            let ShardInner {
370                locks,
371                by_txn,
372                ranges,
373                range_by_txn,
374            } = &mut *guard;
375
376            if let Some(resources) = by_txn.remove(&txn) {
377                for res in resources {
378                    if let Some(entry) = locks.get_mut(&res) {
379                        if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
380                            let _ = entry.holders.swap_remove(pos);
381                            released += 1;
382                            if entry.holders.is_empty() {
383                                let _ = locks.remove(&res);
384                            }
385                        }
386                    }
387                }
388            }
389
390            if let Some(spaces) = range_by_txn.remove(&txn) {
391                for (space, range) in spaces {
392                    if let Some(rs) = ranges.get_mut(&space) {
393                        if let Some(pos) = rs
394                            .holders
395                            .iter()
396                            .position(|h| h.txn == txn && h.range == range)
397                        {
398                            let _ = rs.holders.swap_remove(pos);
399                            released += 1;
400                            if rs.holders.is_empty() {
401                                let _ = ranges.remove(&space);
402                            }
403                        }
404                    }
405                }
406            }
407        }
408        released
409    }
410
411    /// Returns the number of transactions currently holding `res`.
412    ///
413    /// Mostly useful for diagnostics and tests; in steady state this is `0`,
414    /// `1` for an exclusive lock, or the reader count for a shared lock.
415    ///
416    /// # Examples
417    ///
418    /// ```
419    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
420    ///
421    /// let lm = LockManager::new();
422    /// let key = ResourceId::new(1);
423    /// assert_eq!(lm.holder_count(key), 0);
424    /// lm.try_acquire(TxnId::new(1), key, LockMode::Shared).unwrap();
425    /// assert_eq!(lm.holder_count(key), 1);
426    /// ```
427    #[must_use]
428    pub fn holder_count(&self, res: ResourceId) -> usize {
429        let guard = self.lock_shard(res);
430        guard.locks.get(&res).map_or(0, |e| e.holders.len())
431    }
432
433    /// Returns the mode in which `txn` holds `res`, or `None` if it holds no
434    /// lock on it.
435    ///
436    /// # Examples
437    ///
438    /// ```
439    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
440    ///
441    /// let lm = LockManager::new();
442    /// let key = ResourceId::new(1);
443    /// let t = TxnId::new(1);
444    /// assert_eq!(lm.mode_held(t, key), None);
445    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
446    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Shared));
447    /// ```
448    #[must_use]
449    pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
450        let guard = self.lock_shard(res);
451        guard
452            .locks
453            .get(&res)
454            .and_then(|e| e.holders.iter().find(|h| h.txn == txn))
455            .map(|h| h.mode)
456    }
457
458    /// Tries to acquire `mode` over the key range `range` in key space `space`,
459    /// for `txn`, without blocking.
460    ///
461    /// A range lock protects a contiguous span of keys — use it to stop another
462    /// transaction from inserting into, or writing within, a range you have
463    /// read (phantom and predicate protection). `space` identifies the key space
464    /// the range lives in, typically an index; ranges in different spaces never
465    /// conflict.
466    ///
467    /// The request is granted unless some **other** transaction already holds an
468    /// [overlapping](KeyRange::overlaps) range in `space` in an
469    /// [incompatible](LockMode::compatible_with) mode. The same transaction may
470    /// hold several ranges in a space, including overlapping ones; range locks
471    /// are not merged or upgraded.
472    ///
473    /// # Errors
474    ///
475    /// Returns [`LockError::Conflict`] if an overlapping, incompatible range is
476    /// held by another transaction.
477    ///
478    /// # Examples
479    ///
480    /// ```
481    /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
482    ///
483    /// let lm = LockManager::new();
484    /// let index = ResourceId::new(1);
485    ///
486    /// // A read lock over [100, 200].
487    /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::new(100, 200).unwrap(), LockMode::Shared).unwrap();
488    ///
489    /// // Another reader may share the overlapping range...
490    /// lm.try_acquire_range(TxnId::new(2), index, KeyRange::new(150, 250).unwrap(), LockMode::Shared).unwrap();
491    ///
492    /// // ...but a writer inside it conflicts.
493    /// assert_eq!(
494    ///     lm.try_acquire_range(TxnId::new(3), index, KeyRange::point(150), LockMode::Exclusive),
495    ///     Err(LockError::Conflict),
496    /// );
497    /// ```
498    pub fn try_acquire_range(
499        &self,
500        txn: TxnId,
501        space: ResourceId,
502        range: KeyRange,
503        mode: LockMode,
504    ) -> Result<(), LockError> {
505        let mut guard = self.lock_shard(space);
506        let ShardInner {
507            ranges,
508            range_by_txn,
509            ..
510        } = &mut *guard;
511        let rs = ranges.entry(space).or_insert_with(RangeSpace::new);
512
513        let conflict = rs
514            .holders
515            .iter()
516            .any(|h| h.txn != txn && h.range.overlaps(range) && !h.mode.compatible_with(mode));
517        if conflict {
518            // A conflict implies a pre-existing holder, so the space entry is
519            // non-empty and there is nothing to clean up.
520            return Err(LockError::Conflict);
521        }
522
523        rs.holders.push(RangeHolder { txn, range, mode });
524        range_by_txn.entry(txn).or_default().push((space, range));
525        Ok(())
526    }
527
528    /// Releases a range lock `txn` holds over `range` in `space`.
529    ///
530    /// Matches on the transaction and the exact range. If the transaction holds
531    /// several locks on the identical range (in different modes), one is
532    /// released per call.
533    ///
534    /// # Errors
535    ///
536    /// Returns [`LockError::NotHeld`] if `txn` holds no lock on that exact range
537    /// in `space`.
538    ///
539    /// # Examples
540    ///
541    /// ```
542    /// use lock_db::{KeyRange, LockError, LockManager, LockMode, ResourceId, TxnId};
543    ///
544    /// let lm = LockManager::new();
545    /// let index = ResourceId::new(1);
546    /// let r = KeyRange::new(1, 10).unwrap();
547    /// let t = TxnId::new(1);
548    ///
549    /// lm.try_acquire_range(t, index, r, LockMode::Exclusive).unwrap();
550    /// lm.release_range(t, index, r).unwrap();
551    /// assert_eq!(lm.release_range(t, index, r), Err(LockError::NotHeld));
552    /// ```
553    pub fn release_range(
554        &self,
555        txn: TxnId,
556        space: ResourceId,
557        range: KeyRange,
558    ) -> Result<(), LockError> {
559        let mut guard = self.lock_shard(space);
560        let ShardInner {
561            ranges,
562            range_by_txn,
563            ..
564        } = &mut *guard;
565
566        let rs = match ranges.get_mut(&space) {
567            Some(rs) => rs,
568            None => return Err(LockError::NotHeld),
569        };
570        let pos = match rs
571            .holders
572            .iter()
573            .position(|h| h.txn == txn && h.range == range)
574        {
575            Some(pos) => pos,
576            None => return Err(LockError::NotHeld),
577        };
578
579        let _ = rs.holders.swap_remove(pos);
580        if rs.holders.is_empty() {
581            let _ = ranges.remove(&space);
582        }
583        Self::forget_range(range_by_txn, txn, space, range);
584        Ok(())
585    }
586
587    /// Returns the number of range locks currently held in `space`.
588    ///
589    /// Counts every holder, across all transactions and modes. Mostly useful
590    /// for diagnostics and tests.
591    ///
592    /// # Examples
593    ///
594    /// ```
595    /// use lock_db::{KeyRange, LockManager, LockMode, ResourceId, TxnId};
596    ///
597    /// let lm = LockManager::new();
598    /// let index = ResourceId::new(1);
599    /// assert_eq!(lm.range_count(index), 0);
600    /// lm.try_acquire_range(TxnId::new(1), index, KeyRange::point(1), LockMode::Shared).unwrap();
601    /// assert_eq!(lm.range_count(index), 1);
602    /// ```
603    #[must_use]
604    pub fn range_count(&self, space: ResourceId) -> usize {
605        let guard = self.lock_shard(space);
606        guard.ranges.get(&space).map_or(0, |rs| rs.holders.len())
607    }
608
609    /// Drops `res` from a transaction's reverse-index entry, removing the entry
610    /// entirely once the transaction holds nothing else in the shard.
611    #[inline]
612    fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
613        if let Some(resources) = by_txn.get_mut(&txn) {
614            if let Some(pos) = resources.iter().position(|r| *r == res) {
615                let _ = resources.swap_remove(pos);
616            }
617            if resources.is_empty() {
618                let _ = by_txn.remove(&txn);
619            }
620        }
621    }
622
623    /// Drops one `(space, range)` pair from a transaction's range reverse-index
624    /// entry, removing the entry entirely once it is empty.
625    #[inline]
626    fn forget_range(
627        range_by_txn: &mut HashMap<TxnId, Vec<(ResourceId, KeyRange)>>,
628        txn: TxnId,
629        space: ResourceId,
630        range: KeyRange,
631    ) {
632        if let Some(held) = range_by_txn.get_mut(&txn) {
633            if let Some(pos) = held.iter().position(|(s, r)| *s == space && *r == range) {
634                let _ = held.swap_remove(pos);
635            }
636            if held.is_empty() {
637                let _ = range_by_txn.remove(&txn);
638            }
639        }
640    }
641
642    /// Locks and returns the shard that owns `res`.
643    #[inline]
644    fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
645        Self::lock(&self.shards[self.shard_index(res)])
646    }
647
648    /// Locks a shard, recovering its guard if the mutex was poisoned.
649    ///
650    /// Critical sections in this module perform only infallible map and vector
651    /// operations and never panic, so poisoning cannot leave inconsistent
652    /// state. Recovering the guard keeps the lock manager available rather than
653    /// propagating a poison error that no caller could act on.
654    #[inline]
655    fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
656        match shard.inner.lock() {
657            Ok(guard) => guard,
658            Err(poisoned) => poisoned.into_inner(),
659        }
660    }
661
662    /// Maps a resource id to a shard index via Fibonacci hashing.
663    #[inline]
664    fn shard_index(&self, res: ResourceId) -> usize {
665        if self.bits == 0 {
666            return 0;
667        }
668        let hash = res.get().wrapping_mul(FIB_HASH);
669        // Take the top `bits` bits: the most-mixed end of a multiplicative hash.
670        (hash >> (u64::BITS - self.bits)) as usize
671    }
672}
673
674impl Default for LockManager {
675    fn default() -> Self {
676        Self::new()
677    }
678}
679
680#[cfg(all(test, not(loom)))]
681#[allow(clippy::unwrap_used)]
682mod tests {
683    use super::{FIB_HASH, LockManager};
684    use crate::{KeyRange, LockError, LockMode, ResourceId, TxnId};
685
686    fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
687        (TxnId::new(t), ResourceId::new(r))
688    }
689
690    fn kr(start: u64, end: u64) -> KeyRange {
691        KeyRange::new(start, end).unwrap()
692    }
693
694    #[test]
695    fn test_shared_locks_coexist() {
696        let lm = LockManager::new();
697        let r = ResourceId::new(1);
698        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
699        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
700        lm.try_acquire(TxnId::new(3), r, LockMode::Shared).unwrap();
701        assert_eq!(lm.holder_count(r), 3);
702    }
703
704    #[test]
705    fn test_exclusive_excludes_shared() {
706        let lm = LockManager::new();
707        let (t1, r) = ids(1, 1);
708        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
709        assert_eq!(
710            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
711            Err(LockError::Conflict)
712        );
713    }
714
715    #[test]
716    fn test_intention_shared_and_intention_exclusive_coexist() {
717        let lm = LockManager::new();
718        let r = ResourceId::new(1);
719        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionShared)
720            .unwrap();
721        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionExclusive)
722            .unwrap();
723        assert_eq!(lm.holder_count(r), 2);
724    }
725
726    #[test]
727    fn test_intention_exclusive_blocks_shared() {
728        let lm = LockManager::new();
729        let r = ResourceId::new(1);
730        lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive)
731            .unwrap();
732        assert_eq!(
733            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
734            Err(LockError::Conflict)
735        );
736        // ...but another IX or an IS is fine.
737        lm.try_acquire(TxnId::new(3), r, LockMode::IntentionExclusive)
738            .unwrap();
739        lm.try_acquire(TxnId::new(4), r, LockMode::IntentionShared)
740            .unwrap();
741    }
742
743    #[test]
744    fn test_shared_plus_intention_exclusive_upgrades_to_six() {
745        let lm = LockManager::new();
746        let r = ResourceId::new(1);
747        let t = TxnId::new(1);
748        lm.try_acquire(t, r, LockMode::Shared).unwrap();
749        // Same txn now intends to write part of the subtree: S join IX = SIX.
750        lm.try_acquire(t, r, LockMode::IntentionExclusive).unwrap();
751        assert_eq!(lm.mode_held(t, r), Some(LockMode::SharedIntentionExclusive));
752        // An intention-shared holder still coexists with SIX.
753        lm.try_acquire(TxnId::new(2), r, LockMode::IntentionShared)
754            .unwrap();
755        // But a second reader does not.
756        assert_eq!(
757            lm.try_acquire(TxnId::new(3), r, LockMode::Shared),
758            Err(LockError::Conflict)
759        );
760    }
761
762    #[test]
763    fn test_intention_shared_upgrades_to_exclusive_when_sole_holder() {
764        let lm = LockManager::new();
765        let r = ResourceId::new(1);
766        let t = TxnId::new(1);
767        lm.try_acquire(t, r, LockMode::IntentionShared).unwrap();
768        lm.try_acquire(t, r, LockMode::Exclusive).unwrap();
769        assert_eq!(lm.mode_held(t, r), Some(LockMode::Exclusive));
770    }
771
772    #[test]
773    fn test_upgrade_to_six_blocked_by_other_reader() {
774        let lm = LockManager::new();
775        let r = ResourceId::new(1);
776        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
777        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
778        // Txn 1 wants IX too (-> SIX), but SIX is incompatible with txn 2's S.
779        assert_eq!(
780            lm.try_acquire(TxnId::new(1), r, LockMode::IntentionExclusive),
781            Err(LockError::Conflict)
782        );
783        // The original shared lock is intact.
784        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
785    }
786
787    #[test]
788    fn test_hierarchy_protocol_row_write_under_table_intent() {
789        // Model a database/table/page/row hierarchy as four resources, and run
790        // the standard protocol: IX coarse-to-fine, then X on the row.
791        let lm = LockManager::new();
792        let (db, table, page, row) = (
793            ResourceId::new(1),
794            ResourceId::new(2),
795            ResourceId::new(3),
796            ResourceId::new(4),
797        );
798        let writer = TxnId::new(1);
799        for res in [db, table, page] {
800            lm.try_acquire(writer, res, LockMode::IntentionExclusive)
801                .unwrap();
802        }
803        lm.try_acquire(writer, row, LockMode::Exclusive).unwrap();
804
805        // A concurrent reader can still take IS down to a different page/row.
806        let reader = TxnId::new(2);
807        for res in [db, table] {
808            lm.try_acquire(reader, res, LockMode::IntentionShared)
809                .unwrap();
810        }
811        // But it cannot read the row the writer holds exclusively.
812        assert_eq!(
813            lm.try_acquire(reader, row, LockMode::Shared),
814            Err(LockError::Conflict)
815        );
816    }
817
818    #[test]
819    fn test_exclusive_excludes_exclusive() {
820        let lm = LockManager::new();
821        let (t1, r) = ids(1, 1);
822        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
823        assert_eq!(
824            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
825            Err(LockError::Conflict)
826        );
827    }
828
829    #[test]
830    fn test_shared_blocks_other_exclusive() {
831        let lm = LockManager::new();
832        let (t1, r) = ids(1, 1);
833        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
834        assert_eq!(
835            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
836            Err(LockError::Conflict)
837        );
838    }
839
840    #[test]
841    fn test_reacquire_same_mode_is_idempotent() {
842        let lm = LockManager::new();
843        let (t1, r) = ids(1, 1);
844        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
845        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
846        assert_eq!(lm.holder_count(r), 1);
847    }
848
849    #[test]
850    fn test_request_weaker_than_held_is_noop() {
851        let lm = LockManager::new();
852        let (t1, r) = ids(1, 1);
853        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
854        // Asking for shared while holding exclusive keeps the stronger mode.
855        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
856        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
857        assert_eq!(lm.holder_count(r), 1);
858    }
859
860    #[test]
861    fn test_upgrade_sole_holder_succeeds() {
862        let lm = LockManager::new();
863        let (t1, r) = ids(1, 1);
864        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
865        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
866        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
867        assert_eq!(lm.holder_count(r), 1);
868    }
869
870    #[test]
871    fn test_upgrade_blocked_by_other_reader() {
872        let lm = LockManager::new();
873        let r = ResourceId::new(1);
874        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
875        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
876        assert_eq!(
877            lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive),
878            Err(LockError::Conflict)
879        );
880        // The failed upgrade left the original shared lock intact.
881        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
882    }
883
884    #[test]
885    fn test_release_frees_resource_for_exclusive() {
886        let lm = LockManager::new();
887        let r = ResourceId::new(1);
888        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
889        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
890        lm.release(TxnId::new(1), r).unwrap();
891        // One reader remains, exclusive still blocked.
892        assert!(
893            lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
894                .is_err()
895        );
896        lm.release(TxnId::new(2), r).unwrap();
897        lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
898            .unwrap();
899    }
900
901    #[test]
902    fn test_release_not_held_errors() {
903        let lm = LockManager::new();
904        let (t1, r) = ids(1, 1);
905        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
906        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
907        assert_eq!(lm.release(TxnId::new(9), r), Err(LockError::NotHeld));
908    }
909
910    #[test]
911    fn test_double_release_errors() {
912        let lm = LockManager::new();
913        let (t1, r) = ids(1, 1);
914        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
915        lm.release(t1, r).unwrap();
916        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
917    }
918
919    #[test]
920    fn test_release_all_drops_every_lock() {
921        let lm = LockManager::with_shards(8);
922        let t = TxnId::new(1);
923        for id in 0..50 {
924            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
925                .unwrap();
926        }
927        assert_eq!(lm.release_all(t), 50);
928        for id in 0..50 {
929            assert_eq!(lm.holder_count(ResourceId::new(id)), 0);
930        }
931        assert_eq!(lm.release_all(t), 0);
932    }
933
934    #[test]
935    fn test_release_all_leaves_other_txns_alone() {
936        let lm = LockManager::new();
937        let r = ResourceId::new(1);
938        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
939        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
940        assert_eq!(lm.release_all(TxnId::new(1)), 1);
941        assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
942        assert_eq!(lm.holder_count(r), 1);
943    }
944
945    #[test]
946    fn test_resource_fully_released_can_be_taken_exclusively() {
947        let lm = LockManager::new();
948        let r = ResourceId::new(42);
949        lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive)
950            .unwrap();
951        lm.release(TxnId::new(1), r).unwrap();
952        assert_eq!(lm.holder_count(r), 0);
953        lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive)
954            .unwrap();
955    }
956
957    // ---- range locks ----
958
959    #[test]
960    fn test_range_shared_overlap_coexists() {
961        let lm = LockManager::new();
962        let space = ResourceId::new(1);
963        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
964            .unwrap();
965        lm.try_acquire_range(TxnId::new(2), space, kr(50, 150), LockMode::Shared)
966            .unwrap();
967        assert_eq!(lm.range_count(space), 2);
968    }
969
970    #[test]
971    fn test_range_exclusive_conflicts_on_overlap() {
972        let lm = LockManager::new();
973        let space = ResourceId::new(1);
974        lm.try_acquire_range(TxnId::new(1), space, kr(100, 200), LockMode::Shared)
975            .unwrap();
976        assert_eq!(
977            lm.try_acquire_range(
978                TxnId::new(2),
979                space,
980                KeyRange::point(150),
981                LockMode::Exclusive
982            ),
983            Err(LockError::Conflict)
984        );
985    }
986
987    #[test]
988    fn test_range_disjoint_ranges_do_not_conflict() {
989        let lm = LockManager::new();
990        let space = ResourceId::new(1);
991        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
992            .unwrap();
993        lm.try_acquire_range(TxnId::new(2), space, kr(101, 200), LockMode::Exclusive)
994            .unwrap();
995    }
996
997    #[test]
998    fn test_range_adjacent_inclusive_bounds_conflict() {
999        let lm = LockManager::new();
1000        let space = ResourceId::new(1);
1001        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Exclusive)
1002            .unwrap();
1003        // [100, 200] shares key 100 with [0, 100].
1004        assert_eq!(
1005            lm.try_acquire_range(TxnId::new(2), space, kr(100, 200), LockMode::Shared),
1006            Err(LockError::Conflict)
1007        );
1008    }
1009
1010    #[test]
1011    fn test_range_different_spaces_independent() {
1012        let lm = LockManager::new();
1013        lm.try_acquire_range(
1014            TxnId::new(1),
1015            ResourceId::new(1),
1016            kr(0, 100),
1017            LockMode::Exclusive,
1018        )
1019        .unwrap();
1020        // Same range, different space: no conflict.
1021        lm.try_acquire_range(
1022            TxnId::new(2),
1023            ResourceId::new(2),
1024            kr(0, 100),
1025            LockMode::Exclusive,
1026        )
1027        .unwrap();
1028    }
1029
1030    #[test]
1031    fn test_range_same_txn_overlap_allowed() {
1032        let lm = LockManager::new();
1033        let space = ResourceId::new(1);
1034        let t = TxnId::new(1);
1035        lm.try_acquire_range(t, space, kr(0, 100), LockMode::Exclusive)
1036            .unwrap();
1037        // A transaction does not conflict with its own ranges.
1038        lm.try_acquire_range(t, space, kr(50, 150), LockMode::Exclusive)
1039            .unwrap();
1040        assert_eq!(lm.range_count(space), 2);
1041    }
1042
1043    #[test]
1044    fn test_range_release_frees_overlap() {
1045        let lm = LockManager::new();
1046        let space = ResourceId::new(1);
1047        let r = kr(100, 200);
1048        lm.try_acquire_range(TxnId::new(1), space, r, LockMode::Exclusive)
1049            .unwrap();
1050        lm.release_range(TxnId::new(1), space, r).unwrap();
1051        assert_eq!(lm.range_count(space), 0);
1052        // Now another writer can take an overlapping range.
1053        lm.try_acquire_range(
1054            TxnId::new(2),
1055            space,
1056            KeyRange::point(150),
1057            LockMode::Exclusive,
1058        )
1059        .unwrap();
1060    }
1061
1062    #[test]
1063    fn test_range_release_not_held_errors() {
1064        let lm = LockManager::new();
1065        let space = ResourceId::new(1);
1066        assert_eq!(
1067            lm.release_range(TxnId::new(1), space, kr(0, 10)),
1068            Err(LockError::NotHeld)
1069        );
1070        lm.try_acquire_range(TxnId::new(1), space, kr(0, 10), LockMode::Shared)
1071            .unwrap();
1072        // Wrong range is NotHeld.
1073        assert_eq!(
1074            lm.release_range(TxnId::new(1), space, kr(0, 11)),
1075            Err(LockError::NotHeld)
1076        );
1077    }
1078
1079    #[test]
1080    fn test_release_all_drops_point_and_range_locks() {
1081        let lm = LockManager::new();
1082        let t = TxnId::new(1);
1083        for id in 0..3 {
1084            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
1085                .unwrap();
1086        }
1087        lm.try_acquire_range(t, ResourceId::new(100), kr(0, 10), LockMode::Shared)
1088            .unwrap();
1089        lm.try_acquire_range(t, ResourceId::new(100), kr(20, 30), LockMode::Shared)
1090            .unwrap();
1091        assert_eq!(lm.release_all(t), 5); // 3 point + 2 range
1092        assert_eq!(lm.range_count(ResourceId::new(100)), 0);
1093        assert_eq!(lm.release_all(t), 0);
1094    }
1095
1096    #[test]
1097    fn test_release_all_range_leaves_other_txn() {
1098        let lm = LockManager::new();
1099        let space = ResourceId::new(1);
1100        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::Shared)
1101            .unwrap();
1102        lm.try_acquire_range(TxnId::new(2), space, kr(0, 100), LockMode::Shared)
1103            .unwrap();
1104        assert_eq!(lm.release_all(TxnId::new(1)), 1);
1105        assert_eq!(lm.range_count(space), 1);
1106    }
1107
1108    #[test]
1109    fn test_range_intention_modes_coexist() {
1110        // IS and IX range locks are compatible, just like point locks.
1111        let lm = LockManager::new();
1112        let space = ResourceId::new(1);
1113        lm.try_acquire_range(TxnId::new(1), space, kr(0, 100), LockMode::IntentionShared)
1114            .unwrap();
1115        lm.try_acquire_range(
1116            TxnId::new(2),
1117            space,
1118            kr(0, 100),
1119            LockMode::IntentionExclusive,
1120        )
1121        .unwrap();
1122        assert_eq!(lm.range_count(space), 2);
1123    }
1124
1125    #[test]
1126    fn test_with_shards_rounds_up_to_power_of_two() {
1127        assert_eq!(LockManager::with_shards(1).shards(), 1);
1128        assert_eq!(LockManager::with_shards(3).shards(), 4);
1129        assert_eq!(LockManager::with_shards(5).shards(), 8);
1130        assert_eq!(LockManager::with_shards(0).shards(), 1);
1131        assert_eq!(LockManager::with_shards(64).shards(), 64);
1132    }
1133
1134    #[test]
1135    fn test_single_shard_routes_everything_to_index_zero() {
1136        let lm = LockManager::with_shards(1);
1137        for id in 0..1000 {
1138            assert_eq!(lm.shard_index(ResourceId::new(id)), 0);
1139        }
1140    }
1141
1142    #[test]
1143    fn test_shard_index_within_bounds() {
1144        let lm = LockManager::with_shards(16);
1145        for id in 0..10_000 {
1146            assert!(lm.shard_index(ResourceId::new(id)) < 16);
1147        }
1148    }
1149
1150    #[test]
1151    fn test_sequential_ids_spread_across_shards() {
1152        let lm = LockManager::with_shards(16);
1153        let mut seen = [false; 16];
1154        for id in 0..256 {
1155            seen[lm.shard_index(ResourceId::new(id))] = true;
1156        }
1157        // Fibonacci hashing should touch every shard well before 256 ids.
1158        assert!(seen.iter().all(|&hit| hit));
1159    }
1160
1161    #[test]
1162    fn test_locks_in_different_shards_are_independent() {
1163        // Two resources that hash to different shards do not interfere.
1164        let lm = LockManager::with_shards(16);
1165        let a = ResourceId::new(1);
1166        let b = ResourceId::new(2);
1167        lm.try_acquire(TxnId::new(1), a, LockMode::Exclusive)
1168            .unwrap();
1169        lm.try_acquire(TxnId::new(2), b, LockMode::Exclusive)
1170            .unwrap();
1171        assert_eq!(lm.holder_count(a), 1);
1172        assert_eq!(lm.holder_count(b), 1);
1173    }
1174
1175    #[test]
1176    fn test_fib_hash_constant_is_odd() {
1177        // A multiplicative-hash multiplier must be odd to be a bijection mod 2^64.
1178        assert_eq!(FIB_HASH & 1, 1);
1179    }
1180
1181    #[test]
1182    fn test_concurrent_shared_acquire_release_is_consistent() {
1183        use std::sync::Arc;
1184        use std::thread;
1185
1186        let lm = Arc::new(LockManager::new());
1187        let r = ResourceId::new(7);
1188        let mut handles = Vec::new();
1189        for t in 0..8u64 {
1190            let lm = Arc::clone(&lm);
1191            handles.push(thread::spawn(move || {
1192                let txn = TxnId::new(t);
1193                for _ in 0..1000 {
1194                    lm.try_acquire(txn, r, LockMode::Shared).unwrap();
1195                    lm.release(txn, r).unwrap();
1196                }
1197            }));
1198        }
1199        for h in handles {
1200            h.join().unwrap();
1201        }
1202        // Every acquire was paired with a release; the resource is free.
1203        assert_eq!(lm.holder_count(r), 0);
1204    }
1205
1206    #[test]
1207    fn test_concurrent_exclusive_is_mutually_exclusive() {
1208        use std::sync::Arc;
1209        use std::sync::atomic::{AtomicUsize, Ordering};
1210        use std::thread;
1211
1212        let lm = Arc::new(LockManager::new());
1213        let active = Arc::new(AtomicUsize::new(0));
1214        let r = ResourceId::new(11);
1215        let mut handles = Vec::new();
1216        for t in 0..8u64 {
1217            let lm = Arc::clone(&lm);
1218            let active = Arc::clone(&active);
1219            handles.push(thread::spawn(move || {
1220                let txn = TxnId::new(t);
1221                for _ in 0..2000 {
1222                    if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
1223                        // While we hold X, no one else may be inside this region.
1224                        let inside = active.fetch_add(1, Ordering::SeqCst);
1225                        assert_eq!(inside, 0);
1226                        active.fetch_sub(1, Ordering::SeqCst);
1227                        lm.release(txn, r).unwrap();
1228                    }
1229                }
1230            }));
1231        }
1232        for h in handles {
1233            h.join().unwrap();
1234        }
1235        assert_eq!(lm.holder_count(r), 0);
1236    }
1237}