Skip to main content

lock_db/
manager.rs

1//! The lock table: a sharded, contention-aware map from resources to holders.
2//!
3//! # Design
4//!
5//! A single global mutex over the whole lock table would serialise every
6//! acquire and release in the database, turning the lock manager itself into
7//! the bottleneck it exists to manage. Instead the table is split into a fixed
8//! number of independent shards, each guarding its own slice of the resource
9//! space behind its own mutex. Two transactions touching resources in different
10//! shards never contend on the same lock. The shard for a resource is chosen by
11//! Fibonacci hashing its id, which spreads sequential ids (the common case for
12//! page and row numbers) evenly across shards without paying for a
13//! general-purpose hasher on the hot path.
14//!
15//! Each shard also keeps a reverse index from transaction to the resources it
16//! holds in that shard, so releasing every lock a transaction owns is
17//! proportional to the number of locks held, not to the size of the table.
18//!
19//! This release ([crate-level docs](crate)) provides non-blocking acquisition:
20//! a request that cannot be granted immediately returns [`LockError::Conflict`]
21//! rather than waiting. Blocking acquisition with wait queues, and the
22//! deadlock detection that requires it, arrive in a later milestone.
23
24#[cfg(loom)]
25use loom::sync::{Mutex, MutexGuard};
26#[cfg(not(loom))]
27use std::sync::{Mutex, MutexGuard};
28
29use std::collections::HashMap;
30
31use crate::{LockError, LockMode, ResourceId, TxnId};
32
/// Fibonacci-hashing multiplier: `2^64` divided by the golden ratio, as a
/// 64-bit integer.
///
/// Resource ids are multiplied by this constant (wrapping) and the top bits of
/// the product select a shard. The value is odd, which a multiplicative-hash
/// multiplier must be to act as a bijection modulo `2^64`.
const FIB_HASH: u64 = 0x9e37_79b9_7f4a_7c15;
35
/// A transaction holding a resource, and the mode it holds it in.
///
/// One `Holder` exists per (transaction, resource) pair; a resource's holders
/// live together in its [`LockEntry`].
#[derive(Clone, Copy)]
struct Holder {
    /// The transaction that owns this hold.
    txn: TxnId,
    /// The mode currently held; overwritten in place when the transaction
    /// upgrades its lock.
    mode: LockMode,
}
42
43/// The set of transactions currently holding one resource.
44///
45/// Holders are kept in an unordered `Vec` because the common case is a handful
46/// of shared readers or a single writer; a linear scan over a short, contiguous
47/// slice beats the constant overhead and indirection of a map for those sizes.
48struct LockEntry {
49    holders: Vec<Holder>,
50}
51
52impl LockEntry {
53    #[inline]
54    fn new() -> Self {
55        Self {
56            holders: Vec::new(),
57        }
58    }
59}
60
61/// The mutable state of one shard.
62struct ShardInner {
63    /// Resources with at least one holder, keyed by resource id.
64    locks: HashMap<ResourceId, LockEntry>,
65    /// Reverse index: the resources each transaction holds *in this shard*.
66    by_txn: HashMap<TxnId, Vec<ResourceId>>,
67}
68
69impl ShardInner {
70    fn new() -> Self {
71        Self {
72            locks: HashMap::new(),
73            by_txn: HashMap::new(),
74        }
75    }
76}
77
/// One independently locked partition of the table.
///
/// Each shard owns its own mutex, so operations on resources that map to
/// different shards never wait on each other.
struct Shard {
    /// The shard's lock and index state; always accessed through
    /// `LockManager::lock`, which recovers the guard if the mutex is poisoned.
    inner: Mutex<ShardInner>,
}
82
/// A sharded lock table mapping resources to the transactions that hold them.
///
/// `LockManager` is the primary entry point of the crate. It is `Send + Sync`
/// and is meant to be shared behind an [`std::sync::Arc`] across all worker
/// threads; every method takes `&self`, so no outer lock is needed.
///
/// # Examples
///
/// ```
/// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
///
/// let lm = LockManager::new();
/// let row = ResourceId::new(100);
/// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
///
/// // Two transactions read the same row concurrently.
/// lm.try_acquire(t1, row, LockMode::Shared).unwrap();
/// lm.try_acquire(t2, row, LockMode::Shared).unwrap();
/// assert_eq!(lm.holder_count(row), 2);
///
/// // Neither can take it exclusively while the other reads.
/// assert!(lm.try_acquire(t1, row, LockMode::Exclusive).is_err());
///
/// // After both release, an exclusive lock is free to take.
/// lm.release(t1, row).unwrap();
/// lm.release(t2, row).unwrap();
/// lm.try_acquire(t1, row, LockMode::Exclusive).unwrap();
/// ```
#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
pub struct LockManager {
    /// The shards themselves; the length is fixed at construction and is a
    /// power of two so a shard can be selected with a shift.
    shards: Box<[Shard]>,
    /// `log2(shards.len())`; `0` when there is a single shard.
    bits: u32,
}
117
118impl LockManager {
119    /// Creates a lock manager with a shard count chosen for the current machine.
120    ///
121    /// The count scales with the number of available CPUs (rounded up to a power
122    /// of two) so that contention on any single shard mutex stays low on
123    /// multi-core systems. Use [`with_shards`](Self::with_shards) to pin an
124    /// exact count, for example in tests or on memory-constrained targets.
125    ///
126    /// # Examples
127    ///
128    /// ```
129    /// use lock_db::LockManager;
130    ///
131    /// let lm = LockManager::new();
132    /// assert!(lm.shards().is_power_of_two());
133    /// ```
134    pub fn new() -> Self {
135        let parallelism = std::thread::available_parallelism()
136            .map(|n| n.get())
137            .unwrap_or(1);
138        let target = (parallelism.saturating_mul(4))
139            .next_power_of_two()
140            .clamp(16, 1024);
141        Self::with_shards(target)
142    }
143
144    /// Creates a lock manager with an explicit shard count.
145    ///
146    /// `shards` is rounded up to the next power of two (and a request of `0` is
147    /// treated as `1`), which lets the shard lookup use a shift instead of a
148    /// remainder. More shards reduce contention but cost a mutex and two small
149    /// maps each; fewer shards save memory at the cost of more collisions.
150    ///
151    /// # Examples
152    ///
153    /// ```
154    /// use lock_db::LockManager;
155    ///
156    /// // Rounded up to the next power of two.
157    /// assert_eq!(LockManager::with_shards(5).shards(), 8);
158    /// assert_eq!(LockManager::with_shards(0).shards(), 1);
159    /// ```
160    pub fn with_shards(shards: usize) -> Self {
161        let n = shards.max(1).next_power_of_two();
162        let bits = n.trailing_zeros();
163        let mut v = Vec::with_capacity(n);
164        for _ in 0..n {
165            v.push(Shard {
166                inner: Mutex::new(ShardInner::new()),
167            });
168        }
169        Self {
170            shards: v.into_boxed_slice(),
171            bits,
172        }
173    }
174
175    /// Returns the number of shards in the table.
176    ///
177    /// Always a power of two.
178    #[inline]
179    #[must_use]
180    pub fn shards(&self) -> usize {
181        self.shards.len()
182    }
183
184    /// Tries to acquire `mode` on `res` for `txn` without blocking.
185    ///
186    /// The request is granted immediately and `Ok(())` is returned when:
187    ///
188    /// - `txn` already holds a lock on `res` that covers `mode` (re-acquisition
189    ///   is idempotent, and asking for a weaker mode than you hold is a no-op);
190    /// - `txn` already holds `res` shared, wants it exclusive, and is the only
191    ///   holder (an in-place upgrade); or
192    /// - no other transaction holds `res` in a mode incompatible with `mode`.
193    ///
194    /// Otherwise nothing is changed and [`LockError::Conflict`] is returned. The
195    /// caller decides whether to retry, wait, or abort; this method never blocks
196    /// the calling thread.
197    ///
198    /// # Errors
199    ///
200    /// Returns [`LockError::Conflict`] if the lock cannot be granted right now.
201    ///
202    /// # Examples
203    ///
204    /// ```
205    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
206    ///
207    /// let lm = LockManager::new();
208    /// let key = ResourceId::new(7);
209    /// let t = TxnId::new(1);
210    ///
211    /// // Upgrade a shared lock to exclusive while sole holder.
212    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
213    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
214    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Exclusive));
215    ///
216    /// // A second reader now conflicts with the upgraded exclusive lock.
217    /// let r = lm.try_acquire(TxnId::new(2), key, LockMode::Shared);
218    /// assert_eq!(r, Err(LockError::Conflict));
219    /// ```
220    pub fn try_acquire(
221        &self,
222        txn: TxnId,
223        res: ResourceId,
224        mode: LockMode,
225    ) -> Result<(), LockError> {
226        let mut guard = self.lock_shard(res);
227        let ShardInner { locks, by_txn } = &mut *guard;
228        let entry = locks.entry(res).or_insert_with(LockEntry::new);
229
230        if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
231            let current = entry.holders[pos].mode;
232            if current.covers(mode) {
233                return Ok(());
234            }
235            // Upgrade request (shared -> exclusive): only when sole holder.
236            if entry.holders.len() == 1 {
237                entry.holders[pos].mode = mode;
238                return Ok(());
239            }
240            return Err(LockError::Conflict);
241        }
242
243        if entry.holders.iter().all(|h| h.mode.compatible_with(mode)) {
244            entry.holders.push(Holder { txn, mode });
245            by_txn.entry(txn).or_default().push(res);
246            Ok(())
247        } else {
248            // The entry already had holders (an empty one would have matched the
249            // vacuous `all` above and been granted), so nothing to clean up.
250            Err(LockError::Conflict)
251        }
252    }
253
254    /// Releases the lock `txn` holds on `res`.
255    ///
256    /// # Errors
257    ///
258    /// Returns [`LockError::NotHeld`] if `txn` holds no lock on `res`, which
259    /// usually means a double release or a bookkeeping mismatch in the caller.
260    ///
261    /// # Examples
262    ///
263    /// ```
264    /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
265    ///
266    /// let lm = LockManager::new();
267    /// let key = ResourceId::new(3);
268    /// let t = TxnId::new(1);
269    ///
270    /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
271    /// lm.release(t, key).unwrap();
272    /// assert_eq!(lm.release(t, key), Err(LockError::NotHeld));
273    /// ```
274    pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
275        let mut guard = self.lock_shard(res);
276        let ShardInner { locks, by_txn } = &mut *guard;
277
278        let entry = match locks.get_mut(&res) {
279            Some(entry) => entry,
280            None => return Err(LockError::NotHeld),
281        };
282        let pos = match entry.holders.iter().position(|h| h.txn == txn) {
283            Some(pos) => pos,
284            None => return Err(LockError::NotHeld),
285        };
286
287        let _ = entry.holders.swap_remove(pos);
288        if entry.holders.is_empty() {
289            let _ = locks.remove(&res);
290        }
291        Self::forget_resource(by_txn, txn, res);
292        Ok(())
293    }
294
295    /// Releases every lock held by `txn` across the whole table.
296    ///
297    /// This is the call a transaction layer makes at commit or abort to drop a
298    /// transaction's entire lock set at once. It returns the number of locks
299    /// released, and is proportional to that number rather than to the size of
300    /// the table.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
306    ///
307    /// let lm = LockManager::new();
308    /// let t = TxnId::new(1);
309    /// for id in 0..5 {
310    ///     lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive).unwrap();
311    /// }
312    ///
313    /// assert_eq!(lm.release_all(t), 5);
314    /// assert_eq!(lm.release_all(t), 0); // idempotent once empty
315    /// ```
316    pub fn release_all(&self, txn: TxnId) -> usize {
317        let mut released = 0;
318        for shard in self.shards.iter() {
319            let mut guard = Self::lock(shard);
320            let ShardInner { locks, by_txn } = &mut *guard;
321            let Some(resources) = by_txn.remove(&txn) else {
322                continue;
323            };
324            for res in resources {
325                if let Some(entry) = locks.get_mut(&res) {
326                    if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
327                        let _ = entry.holders.swap_remove(pos);
328                        released += 1;
329                        if entry.holders.is_empty() {
330                            let _ = locks.remove(&res);
331                        }
332                    }
333                }
334            }
335        }
336        released
337    }
338
339    /// Returns the number of transactions currently holding `res`.
340    ///
341    /// Mostly useful for diagnostics and tests; in steady state this is `0`,
342    /// `1` for an exclusive lock, or the reader count for a shared lock.
343    ///
344    /// # Examples
345    ///
346    /// ```
347    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
348    ///
349    /// let lm = LockManager::new();
350    /// let key = ResourceId::new(1);
351    /// assert_eq!(lm.holder_count(key), 0);
352    /// lm.try_acquire(TxnId::new(1), key, LockMode::Shared).unwrap();
353    /// assert_eq!(lm.holder_count(key), 1);
354    /// ```
355    #[must_use]
356    pub fn holder_count(&self, res: ResourceId) -> usize {
357        let guard = self.lock_shard(res);
358        guard.locks.get(&res).map_or(0, |e| e.holders.len())
359    }
360
361    /// Returns the mode in which `txn` holds `res`, or `None` if it holds no
362    /// lock on it.
363    ///
364    /// # Examples
365    ///
366    /// ```
367    /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
368    ///
369    /// let lm = LockManager::new();
370    /// let key = ResourceId::new(1);
371    /// let t = TxnId::new(1);
372    /// assert_eq!(lm.mode_held(t, key), None);
373    /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
374    /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Shared));
375    /// ```
376    #[must_use]
377    pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
378        let guard = self.lock_shard(res);
379        guard
380            .locks
381            .get(&res)
382            .and_then(|e| e.holders.iter().find(|h| h.txn == txn))
383            .map(|h| h.mode)
384    }
385
386    /// Drops `res` from a transaction's reverse-index entry, removing the entry
387    /// entirely once the transaction holds nothing else in the shard.
388    #[inline]
389    fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
390        if let Some(resources) = by_txn.get_mut(&txn) {
391            if let Some(pos) = resources.iter().position(|r| *r == res) {
392                let _ = resources.swap_remove(pos);
393            }
394            if resources.is_empty() {
395                let _ = by_txn.remove(&txn);
396            }
397        }
398    }
399
400    /// Locks and returns the shard that owns `res`.
401    #[inline]
402    fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
403        Self::lock(&self.shards[self.shard_index(res)])
404    }
405
406    /// Locks a shard, recovering its guard if the mutex was poisoned.
407    ///
408    /// Critical sections in this module perform only infallible map and vector
409    /// operations and never panic, so poisoning cannot leave inconsistent
410    /// state. Recovering the guard keeps the lock manager available rather than
411    /// propagating a poison error that no caller could act on.
412    #[inline]
413    fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
414        match shard.inner.lock() {
415            Ok(guard) => guard,
416            Err(poisoned) => poisoned.into_inner(),
417        }
418    }
419
420    /// Maps a resource id to a shard index via Fibonacci hashing.
421    #[inline]
422    fn shard_index(&self, res: ResourceId) -> usize {
423        if self.bits == 0 {
424            return 0;
425        }
426        let hash = res.get().wrapping_mul(FIB_HASH);
427        // Take the top `bits` bits: the most-mixed end of a multiplicative hash.
428        (hash >> (u64::BITS - self.bits)) as usize
429    }
430}
431
432impl Default for LockManager {
433    fn default() -> Self {
434        Self::new()
435    }
436}
437
#[cfg(all(test, not(loom)))]
// Tests unwrap freely: a failed unwrap is itself the test failure we want.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::{FIB_HASH, LockManager};
    use crate::{LockError, LockMode, ResourceId, TxnId};

    /// Builds a `(TxnId, ResourceId)` pair from raw ids.
    fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
        (TxnId::new(t), ResourceId::new(r))
    }

    #[test]
    fn test_shared_locks_coexist() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(3), r, LockMode::Shared).unwrap();
        assert_eq!(lm.holder_count(r), 3);
    }

    #[test]
    fn test_exclusive_excludes_shared() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_exclusive_excludes_exclusive() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_shared_blocks_other_exclusive() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
    }

    #[test]
    fn test_reacquire_same_mode_is_idempotent() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_request_weaker_than_held_is_noop() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        // Asking for shared while holding exclusive keeps the stronger mode.
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_upgrade_sole_holder_succeeds() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_upgrade_blocked_by_other_reader() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        assert_eq!(
            lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive),
            Err(LockError::Conflict)
        );
        // The failed upgrade left the original shared lock intact.
        assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
    }

    #[test]
    fn test_release_frees_resource_for_exclusive() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        lm.release(TxnId::new(1), r).unwrap();
        // One reader remains, exclusive still blocked.
        assert!(
            lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
                .is_err()
        );
        lm.release(TxnId::new(2), r).unwrap();
        lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
            .unwrap();
    }

    #[test]
    fn test_release_not_held_errors() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
        lm.try_acquire(t1, r, LockMode::Shared).unwrap();
        assert_eq!(lm.release(TxnId::new(9), r), Err(LockError::NotHeld));
    }

    #[test]
    fn test_double_release_errors() {
        let lm = LockManager::new();
        let (t1, r) = ids(1, 1);
        lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
        lm.release(t1, r).unwrap();
        assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
    }

    #[test]
    fn test_release_all_drops_every_lock() {
        let lm = LockManager::with_shards(8);
        let t = TxnId::new(1);
        for id in 0..50 {
            lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
                .unwrap();
        }
        assert_eq!(lm.release_all(t), 50);
        for id in 0..50 {
            assert_eq!(lm.holder_count(ResourceId::new(id)), 0);
        }
        assert_eq!(lm.release_all(t), 0);
    }

    #[test]
    fn test_release_all_leaves_other_txns_alone() {
        let lm = LockManager::new();
        let r = ResourceId::new(1);
        lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
        lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
        assert_eq!(lm.release_all(TxnId::new(1)), 1);
        assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
        assert_eq!(lm.holder_count(r), 1);
    }

    #[test]
    fn test_resource_fully_released_can_be_taken_exclusively() {
        let lm = LockManager::new();
        let r = ResourceId::new(42);
        lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive)
            .unwrap();
        lm.release(TxnId::new(1), r).unwrap();
        assert_eq!(lm.holder_count(r), 0);
        lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive)
            .unwrap();
    }

    #[test]
    fn test_with_shards_rounds_up_to_power_of_two() {
        assert_eq!(LockManager::with_shards(1).shards(), 1);
        assert_eq!(LockManager::with_shards(3).shards(), 4);
        assert_eq!(LockManager::with_shards(5).shards(), 8);
        assert_eq!(LockManager::with_shards(0).shards(), 1);
        assert_eq!(LockManager::with_shards(64).shards(), 64);
    }

    #[test]
    fn test_single_shard_routes_everything_to_index_zero() {
        let lm = LockManager::with_shards(1);
        for id in 0..1000 {
            assert_eq!(lm.shard_index(ResourceId::new(id)), 0);
        }
    }

    #[test]
    fn test_shard_index_within_bounds() {
        let lm = LockManager::with_shards(16);
        for id in 0..10_000 {
            assert!(lm.shard_index(ResourceId::new(id)) < 16);
        }
    }

    #[test]
    fn test_sequential_ids_spread_across_shards() {
        let lm = LockManager::with_shards(16);
        let mut seen = [false; 16];
        for id in 0..256 {
            seen[lm.shard_index(ResourceId::new(id))] = true;
        }
        // Fibonacci hashing should touch every shard well before 256 ids.
        assert!(seen.iter().all(|&hit| hit));
    }

    #[test]
    fn test_locks_in_different_shards_are_independent() {
        // Two resources that hash to different shards do not interfere.
        let lm = LockManager::with_shards(16);
        let a = ResourceId::new(1);
        let b = ResourceId::new(2);
        // Pin the premise: without this the test would still pass if both ids
        // happened to share a shard, and would prove nothing about independence.
        assert_ne!(lm.shard_index(a), lm.shard_index(b));
        lm.try_acquire(TxnId::new(1), a, LockMode::Exclusive)
            .unwrap();
        lm.try_acquire(TxnId::new(2), b, LockMode::Exclusive)
            .unwrap();
        assert_eq!(lm.holder_count(a), 1);
        assert_eq!(lm.holder_count(b), 1);
    }

    #[test]
    fn test_fib_hash_constant_is_odd() {
        // A multiplicative-hash multiplier must be odd to be a bijection mod 2^64.
        assert_eq!(FIB_HASH & 1, 1);
    }

    #[test]
    fn test_concurrent_shared_acquire_release_is_consistent() {
        use std::sync::Arc;
        use std::thread;

        let lm = Arc::new(LockManager::new());
        let r = ResourceId::new(7);
        let mut handles = Vec::new();
        for t in 0..8u64 {
            let lm = Arc::clone(&lm);
            handles.push(thread::spawn(move || {
                let txn = TxnId::new(t);
                for _ in 0..1000 {
                    lm.try_acquire(txn, r, LockMode::Shared).unwrap();
                    lm.release(txn, r).unwrap();
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        // Every acquire was paired with a release; the resource is free.
        assert_eq!(lm.holder_count(r), 0);
    }

    #[test]
    fn test_concurrent_exclusive_is_mutually_exclusive() {
        use std::sync::Arc;
        use std::sync::atomic::{AtomicUsize, Ordering};
        use std::thread;

        let lm = Arc::new(LockManager::new());
        let active = Arc::new(AtomicUsize::new(0));
        let r = ResourceId::new(11);
        let mut handles = Vec::new();
        for t in 0..8u64 {
            let lm = Arc::clone(&lm);
            let active = Arc::clone(&active);
            handles.push(thread::spawn(move || {
                let txn = TxnId::new(t);
                for _ in 0..2000 {
                    if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
                        // While we hold X, no one else may be inside this region.
                        let inside = active.fetch_add(1, Ordering::SeqCst);
                        assert_eq!(inside, 0);
                        active.fetch_sub(1, Ordering::SeqCst);
                        lm.release(txn, r).unwrap();
                    }
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(lm.holder_count(r), 0);
    }
}