lock_db/manager.rs
1//! The lock table: a sharded, contention-aware map from resources to holders.
2//!
3//! # Design
4//!
5//! A single global mutex over the whole lock table would serialise every
6//! acquire and release in the database, turning the lock manager itself into
7//! the bottleneck it exists to manage. Instead the table is split into a fixed
8//! number of independent shards, each guarding its own slice of the resource
9//! space behind its own mutex. Two transactions touching resources in different
10//! shards never contend on the same lock. The shard for a resource is chosen by
11//! Fibonacci hashing its id, which spreads sequential ids (the common case for
12//! page and row numbers) evenly across shards without paying for a
13//! general-purpose hasher on the hot path.
14//!
15//! Each shard also keeps a reverse index from transaction to the resources it
16//! holds in that shard, so releasing every lock a transaction owns is
17//! proportional to the number of locks held, not to the size of the table.
18//!
19//! This release ([crate-level docs](crate)) provides non-blocking acquisition:
20//! a request that cannot be granted immediately returns [`LockError::Conflict`]
21//! rather than waiting. Blocking acquisition with wait queues, and the
22//! deadlock detection that requires it, arrive in a later milestone.
23
24#[cfg(loom)]
25use loom::sync::{Mutex, MutexGuard};
26#[cfg(not(loom))]
27use std::sync::{Mutex, MutexGuard};
28
29use std::collections::HashMap;
30
31use crate::{LockError, LockMode, ResourceId, TxnId};
32
33/// Multiplier for Fibonacci hashing: 2^64 divided by the golden ratio.
34const FIB_HASH: u64 = 0x9E37_79B9_7F4A_7C15;
35
36/// A transaction holding a resource, and the mode it holds it in.
37#[derive(Clone, Copy)]
38struct Holder {
39 txn: TxnId,
40 mode: LockMode,
41}
42
43/// The set of transactions currently holding one resource.
44///
45/// Holders are kept in an unordered `Vec` because the common case is a handful
46/// of shared readers or a single writer; a linear scan over a short, contiguous
47/// slice beats the constant overhead and indirection of a map for those sizes.
48struct LockEntry {
49 holders: Vec<Holder>,
50}
51
52impl LockEntry {
53 #[inline]
54 fn new() -> Self {
55 Self {
56 holders: Vec::new(),
57 }
58 }
59}
60
61/// The mutable state of one shard.
62struct ShardInner {
63 /// Resources with at least one holder, keyed by resource id.
64 locks: HashMap<ResourceId, LockEntry>,
65 /// Reverse index: the resources each transaction holds *in this shard*.
66 by_txn: HashMap<TxnId, Vec<ResourceId>>,
67}
68
69impl ShardInner {
70 fn new() -> Self {
71 Self {
72 locks: HashMap::new(),
73 by_txn: HashMap::new(),
74 }
75 }
76}
77
78/// One independently locked partition of the table.
79struct Shard {
80 inner: Mutex<ShardInner>,
81}
82
83/// A sharded lock table mapping resources to the transactions that hold them.
84///
85/// `LockManager` is the primary entry point of the crate. It is `Send + Sync`
86/// and is meant to be shared behind an [`std::sync::Arc`] across all worker
87/// threads; every method takes `&self`, so no outer lock is needed.
88///
89/// # Examples
90///
91/// ```
92/// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
93///
94/// let lm = LockManager::new();
95/// let row = ResourceId::new(100);
96/// let (t1, t2) = (TxnId::new(1), TxnId::new(2));
97///
98/// // Two transactions read the same row concurrently.
99/// lm.try_acquire(t1, row, LockMode::Shared).unwrap();
100/// lm.try_acquire(t2, row, LockMode::Shared).unwrap();
101/// assert_eq!(lm.holder_count(row), 2);
102///
103/// // Neither can take it exclusively while the other reads.
104/// assert!(lm.try_acquire(t1, row, LockMode::Exclusive).is_err());
105///
106/// // After both release, an exclusive lock is free to take.
107/// lm.release(t1, row).unwrap();
108/// lm.release(t2, row).unwrap();
109/// lm.try_acquire(t1, row, LockMode::Exclusive).unwrap();
110/// ```
111#[must_use = "a LockManager that is dropped immediately releases every lock it holds"]
112pub struct LockManager {
113 shards: Box<[Shard]>,
114 /// `log2(shards.len())`; `0` when there is a single shard.
115 bits: u32,
116}
117
118impl LockManager {
119 /// Creates a lock manager with a shard count chosen for the current machine.
120 ///
121 /// The count scales with the number of available CPUs (rounded up to a power
122 /// of two) so that contention on any single shard mutex stays low on
123 /// multi-core systems. Use [`with_shards`](Self::with_shards) to pin an
124 /// exact count, for example in tests or on memory-constrained targets.
125 ///
126 /// # Examples
127 ///
128 /// ```
129 /// use lock_db::LockManager;
130 ///
131 /// let lm = LockManager::new();
132 /// assert!(lm.shards().is_power_of_two());
133 /// ```
134 pub fn new() -> Self {
135 let parallelism = std::thread::available_parallelism()
136 .map(|n| n.get())
137 .unwrap_or(1);
138 let target = (parallelism.saturating_mul(4))
139 .next_power_of_two()
140 .clamp(16, 1024);
141 Self::with_shards(target)
142 }
143
144 /// Creates a lock manager with an explicit shard count.
145 ///
146 /// `shards` is rounded up to the next power of two (and a request of `0` is
147 /// treated as `1`), which lets the shard lookup use a shift instead of a
148 /// remainder. More shards reduce contention but cost a mutex and two small
149 /// maps each; fewer shards save memory at the cost of more collisions.
150 ///
151 /// # Examples
152 ///
153 /// ```
154 /// use lock_db::LockManager;
155 ///
156 /// // Rounded up to the next power of two.
157 /// assert_eq!(LockManager::with_shards(5).shards(), 8);
158 /// assert_eq!(LockManager::with_shards(0).shards(), 1);
159 /// ```
160 pub fn with_shards(shards: usize) -> Self {
161 let n = shards.max(1).next_power_of_two();
162 let bits = n.trailing_zeros();
163 let mut v = Vec::with_capacity(n);
164 for _ in 0..n {
165 v.push(Shard {
166 inner: Mutex::new(ShardInner::new()),
167 });
168 }
169 Self {
170 shards: v.into_boxed_slice(),
171 bits,
172 }
173 }
174
175 /// Returns the number of shards in the table.
176 ///
177 /// Always a power of two.
178 #[inline]
179 #[must_use]
180 pub fn shards(&self) -> usize {
181 self.shards.len()
182 }
183
184 /// Tries to acquire `mode` on `res` for `txn` without blocking.
185 ///
186 /// The request is granted immediately and `Ok(())` is returned when:
187 ///
188 /// - `txn` already holds a lock on `res` that covers `mode` (re-acquisition
189 /// is idempotent, and asking for a weaker mode than you hold is a no-op);
190 /// - `txn` already holds `res` shared, wants it exclusive, and is the only
191 /// holder (an in-place upgrade); or
192 /// - no other transaction holds `res` in a mode incompatible with `mode`.
193 ///
194 /// Otherwise nothing is changed and [`LockError::Conflict`] is returned. The
195 /// caller decides whether to retry, wait, or abort; this method never blocks
196 /// the calling thread.
197 ///
198 /// # Errors
199 ///
200 /// Returns [`LockError::Conflict`] if the lock cannot be granted right now.
201 ///
202 /// # Examples
203 ///
204 /// ```
205 /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
206 ///
207 /// let lm = LockManager::new();
208 /// let key = ResourceId::new(7);
209 /// let t = TxnId::new(1);
210 ///
211 /// // Upgrade a shared lock to exclusive while sole holder.
212 /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
213 /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
214 /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Exclusive));
215 ///
216 /// // A second reader now conflicts with the upgraded exclusive lock.
217 /// let r = lm.try_acquire(TxnId::new(2), key, LockMode::Shared);
218 /// assert_eq!(r, Err(LockError::Conflict));
219 /// ```
220 pub fn try_acquire(
221 &self,
222 txn: TxnId,
223 res: ResourceId,
224 mode: LockMode,
225 ) -> Result<(), LockError> {
226 let mut guard = self.lock_shard(res);
227 let ShardInner { locks, by_txn } = &mut *guard;
228 let entry = locks.entry(res).or_insert_with(LockEntry::new);
229
230 if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
231 let current = entry.holders[pos].mode;
232 if current.covers(mode) {
233 return Ok(());
234 }
235 // Upgrade request (shared -> exclusive): only when sole holder.
236 if entry.holders.len() == 1 {
237 entry.holders[pos].mode = mode;
238 return Ok(());
239 }
240 return Err(LockError::Conflict);
241 }
242
243 if entry.holders.iter().all(|h| h.mode.compatible_with(mode)) {
244 entry.holders.push(Holder { txn, mode });
245 by_txn.entry(txn).or_default().push(res);
246 Ok(())
247 } else {
248 // The entry already had holders (an empty one would have matched the
249 // vacuous `all` above and been granted), so nothing to clean up.
250 Err(LockError::Conflict)
251 }
252 }
253
254 /// Releases the lock `txn` holds on `res`.
255 ///
256 /// # Errors
257 ///
258 /// Returns [`LockError::NotHeld`] if `txn` holds no lock on `res`, which
259 /// usually means a double release or a bookkeeping mismatch in the caller.
260 ///
261 /// # Examples
262 ///
263 /// ```
264 /// use lock_db::{LockError, LockManager, LockMode, ResourceId, TxnId};
265 ///
266 /// let lm = LockManager::new();
267 /// let key = ResourceId::new(3);
268 /// let t = TxnId::new(1);
269 ///
270 /// lm.try_acquire(t, key, LockMode::Exclusive).unwrap();
271 /// lm.release(t, key).unwrap();
272 /// assert_eq!(lm.release(t, key), Err(LockError::NotHeld));
273 /// ```
274 pub fn release(&self, txn: TxnId, res: ResourceId) -> Result<(), LockError> {
275 let mut guard = self.lock_shard(res);
276 let ShardInner { locks, by_txn } = &mut *guard;
277
278 let entry = match locks.get_mut(&res) {
279 Some(entry) => entry,
280 None => return Err(LockError::NotHeld),
281 };
282 let pos = match entry.holders.iter().position(|h| h.txn == txn) {
283 Some(pos) => pos,
284 None => return Err(LockError::NotHeld),
285 };
286
287 let _ = entry.holders.swap_remove(pos);
288 if entry.holders.is_empty() {
289 let _ = locks.remove(&res);
290 }
291 Self::forget_resource(by_txn, txn, res);
292 Ok(())
293 }
294
295 /// Releases every lock held by `txn` across the whole table.
296 ///
297 /// This is the call a transaction layer makes at commit or abort to drop a
298 /// transaction's entire lock set at once. It returns the number of locks
299 /// released, and is proportional to that number rather than to the size of
300 /// the table.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
306 ///
307 /// let lm = LockManager::new();
308 /// let t = TxnId::new(1);
309 /// for id in 0..5 {
310 /// lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive).unwrap();
311 /// }
312 ///
313 /// assert_eq!(lm.release_all(t), 5);
314 /// assert_eq!(lm.release_all(t), 0); // idempotent once empty
315 /// ```
316 pub fn release_all(&self, txn: TxnId) -> usize {
317 let mut released = 0;
318 for shard in self.shards.iter() {
319 let mut guard = Self::lock(shard);
320 let ShardInner { locks, by_txn } = &mut *guard;
321 let Some(resources) = by_txn.remove(&txn) else {
322 continue;
323 };
324 for res in resources {
325 if let Some(entry) = locks.get_mut(&res) {
326 if let Some(pos) = entry.holders.iter().position(|h| h.txn == txn) {
327 let _ = entry.holders.swap_remove(pos);
328 released += 1;
329 if entry.holders.is_empty() {
330 let _ = locks.remove(&res);
331 }
332 }
333 }
334 }
335 }
336 released
337 }
338
339 /// Returns the number of transactions currently holding `res`.
340 ///
341 /// Mostly useful for diagnostics and tests; in steady state this is `0`,
342 /// `1` for an exclusive lock, or the reader count for a shared lock.
343 ///
344 /// # Examples
345 ///
346 /// ```
347 /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
348 ///
349 /// let lm = LockManager::new();
350 /// let key = ResourceId::new(1);
351 /// assert_eq!(lm.holder_count(key), 0);
352 /// lm.try_acquire(TxnId::new(1), key, LockMode::Shared).unwrap();
353 /// assert_eq!(lm.holder_count(key), 1);
354 /// ```
355 #[must_use]
356 pub fn holder_count(&self, res: ResourceId) -> usize {
357 let guard = self.lock_shard(res);
358 guard.locks.get(&res).map_or(0, |e| e.holders.len())
359 }
360
361 /// Returns the mode in which `txn` holds `res`, or `None` if it holds no
362 /// lock on it.
363 ///
364 /// # Examples
365 ///
366 /// ```
367 /// use lock_db::{LockManager, LockMode, ResourceId, TxnId};
368 ///
369 /// let lm = LockManager::new();
370 /// let key = ResourceId::new(1);
371 /// let t = TxnId::new(1);
372 /// assert_eq!(lm.mode_held(t, key), None);
373 /// lm.try_acquire(t, key, LockMode::Shared).unwrap();
374 /// assert_eq!(lm.mode_held(t, key), Some(LockMode::Shared));
375 /// ```
376 #[must_use]
377 pub fn mode_held(&self, txn: TxnId, res: ResourceId) -> Option<LockMode> {
378 let guard = self.lock_shard(res);
379 guard
380 .locks
381 .get(&res)
382 .and_then(|e| e.holders.iter().find(|h| h.txn == txn))
383 .map(|h| h.mode)
384 }
385
386 /// Drops `res` from a transaction's reverse-index entry, removing the entry
387 /// entirely once the transaction holds nothing else in the shard.
388 #[inline]
389 fn forget_resource(by_txn: &mut HashMap<TxnId, Vec<ResourceId>>, txn: TxnId, res: ResourceId) {
390 if let Some(resources) = by_txn.get_mut(&txn) {
391 if let Some(pos) = resources.iter().position(|r| *r == res) {
392 let _ = resources.swap_remove(pos);
393 }
394 if resources.is_empty() {
395 let _ = by_txn.remove(&txn);
396 }
397 }
398 }
399
400 /// Locks and returns the shard that owns `res`.
401 #[inline]
402 fn lock_shard(&self, res: ResourceId) -> MutexGuard<'_, ShardInner> {
403 Self::lock(&self.shards[self.shard_index(res)])
404 }
405
406 /// Locks a shard, recovering its guard if the mutex was poisoned.
407 ///
408 /// Critical sections in this module perform only infallible map and vector
409 /// operations and never panic, so poisoning cannot leave inconsistent
410 /// state. Recovering the guard keeps the lock manager available rather than
411 /// propagating a poison error that no caller could act on.
412 #[inline]
413 fn lock(shard: &Shard) -> MutexGuard<'_, ShardInner> {
414 match shard.inner.lock() {
415 Ok(guard) => guard,
416 Err(poisoned) => poisoned.into_inner(),
417 }
418 }
419
420 /// Maps a resource id to a shard index via Fibonacci hashing.
421 #[inline]
422 fn shard_index(&self, res: ResourceId) -> usize {
423 if self.bits == 0 {
424 return 0;
425 }
426 let hash = res.get().wrapping_mul(FIB_HASH);
427 // Take the top `bits` bits: the most-mixed end of a multiplicative hash.
428 (hash >> (u64::BITS - self.bits)) as usize
429 }
430}
431
432impl Default for LockManager {
433 fn default() -> Self {
434 Self::new()
435 }
436}
437
438#[cfg(all(test, not(loom)))]
439#[allow(clippy::unwrap_used)]
440mod tests {
441 use super::{FIB_HASH, LockManager};
442 use crate::{LockError, LockMode, ResourceId, TxnId};
443
444 fn ids(t: u64, r: u64) -> (TxnId, ResourceId) {
445 (TxnId::new(t), ResourceId::new(r))
446 }
447
448 #[test]
449 fn test_shared_locks_coexist() {
450 let lm = LockManager::new();
451 let r = ResourceId::new(1);
452 lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
453 lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
454 lm.try_acquire(TxnId::new(3), r, LockMode::Shared).unwrap();
455 assert_eq!(lm.holder_count(r), 3);
456 }
457
458 #[test]
459 fn test_exclusive_excludes_shared() {
460 let lm = LockManager::new();
461 let (t1, r) = ids(1, 1);
462 lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
463 assert_eq!(
464 lm.try_acquire(TxnId::new(2), r, LockMode::Shared),
465 Err(LockError::Conflict)
466 );
467 }
468
469 #[test]
470 fn test_exclusive_excludes_exclusive() {
471 let lm = LockManager::new();
472 let (t1, r) = ids(1, 1);
473 lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
474 assert_eq!(
475 lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
476 Err(LockError::Conflict)
477 );
478 }
479
480 #[test]
481 fn test_shared_blocks_other_exclusive() {
482 let lm = LockManager::new();
483 let (t1, r) = ids(1, 1);
484 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
485 assert_eq!(
486 lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive),
487 Err(LockError::Conflict)
488 );
489 }
490
491 #[test]
492 fn test_reacquire_same_mode_is_idempotent() {
493 let lm = LockManager::new();
494 let (t1, r) = ids(1, 1);
495 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
496 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
497 assert_eq!(lm.holder_count(r), 1);
498 }
499
500 #[test]
501 fn test_request_weaker_than_held_is_noop() {
502 let lm = LockManager::new();
503 let (t1, r) = ids(1, 1);
504 lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
505 // Asking for shared while holding exclusive keeps the stronger mode.
506 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
507 assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
508 assert_eq!(lm.holder_count(r), 1);
509 }
510
511 #[test]
512 fn test_upgrade_sole_holder_succeeds() {
513 let lm = LockManager::new();
514 let (t1, r) = ids(1, 1);
515 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
516 lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
517 assert_eq!(lm.mode_held(t1, r), Some(LockMode::Exclusive));
518 assert_eq!(lm.holder_count(r), 1);
519 }
520
521 #[test]
522 fn test_upgrade_blocked_by_other_reader() {
523 let lm = LockManager::new();
524 let r = ResourceId::new(1);
525 lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
526 lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
527 assert_eq!(
528 lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive),
529 Err(LockError::Conflict)
530 );
531 // The failed upgrade left the original shared lock intact.
532 assert_eq!(lm.mode_held(TxnId::new(1), r), Some(LockMode::Shared));
533 }
534
535 #[test]
536 fn test_release_frees_resource_for_exclusive() {
537 let lm = LockManager::new();
538 let r = ResourceId::new(1);
539 lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
540 lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
541 lm.release(TxnId::new(1), r).unwrap();
542 // One reader remains, exclusive still blocked.
543 assert!(
544 lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
545 .is_err()
546 );
547 lm.release(TxnId::new(2), r).unwrap();
548 lm.try_acquire(TxnId::new(3), r, LockMode::Exclusive)
549 .unwrap();
550 }
551
552 #[test]
553 fn test_release_not_held_errors() {
554 let lm = LockManager::new();
555 let (t1, r) = ids(1, 1);
556 assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
557 lm.try_acquire(t1, r, LockMode::Shared).unwrap();
558 assert_eq!(lm.release(TxnId::new(9), r), Err(LockError::NotHeld));
559 }
560
561 #[test]
562 fn test_double_release_errors() {
563 let lm = LockManager::new();
564 let (t1, r) = ids(1, 1);
565 lm.try_acquire(t1, r, LockMode::Exclusive).unwrap();
566 lm.release(t1, r).unwrap();
567 assert_eq!(lm.release(t1, r), Err(LockError::NotHeld));
568 }
569
570 #[test]
571 fn test_release_all_drops_every_lock() {
572 let lm = LockManager::with_shards(8);
573 let t = TxnId::new(1);
574 for id in 0..50 {
575 lm.try_acquire(t, ResourceId::new(id), LockMode::Exclusive)
576 .unwrap();
577 }
578 assert_eq!(lm.release_all(t), 50);
579 for id in 0..50 {
580 assert_eq!(lm.holder_count(ResourceId::new(id)), 0);
581 }
582 assert_eq!(lm.release_all(t), 0);
583 }
584
585 #[test]
586 fn test_release_all_leaves_other_txns_alone() {
587 let lm = LockManager::new();
588 let r = ResourceId::new(1);
589 lm.try_acquire(TxnId::new(1), r, LockMode::Shared).unwrap();
590 lm.try_acquire(TxnId::new(2), r, LockMode::Shared).unwrap();
591 assert_eq!(lm.release_all(TxnId::new(1)), 1);
592 assert_eq!(lm.mode_held(TxnId::new(2), r), Some(LockMode::Shared));
593 assert_eq!(lm.holder_count(r), 1);
594 }
595
596 #[test]
597 fn test_resource_fully_released_can_be_taken_exclusively() {
598 let lm = LockManager::new();
599 let r = ResourceId::new(42);
600 lm.try_acquire(TxnId::new(1), r, LockMode::Exclusive)
601 .unwrap();
602 lm.release(TxnId::new(1), r).unwrap();
603 assert_eq!(lm.holder_count(r), 0);
604 lm.try_acquire(TxnId::new(2), r, LockMode::Exclusive)
605 .unwrap();
606 }
607
608 #[test]
609 fn test_with_shards_rounds_up_to_power_of_two() {
610 assert_eq!(LockManager::with_shards(1).shards(), 1);
611 assert_eq!(LockManager::with_shards(3).shards(), 4);
612 assert_eq!(LockManager::with_shards(5).shards(), 8);
613 assert_eq!(LockManager::with_shards(0).shards(), 1);
614 assert_eq!(LockManager::with_shards(64).shards(), 64);
615 }
616
617 #[test]
618 fn test_single_shard_routes_everything_to_index_zero() {
619 let lm = LockManager::with_shards(1);
620 for id in 0..1000 {
621 assert_eq!(lm.shard_index(ResourceId::new(id)), 0);
622 }
623 }
624
625 #[test]
626 fn test_shard_index_within_bounds() {
627 let lm = LockManager::with_shards(16);
628 for id in 0..10_000 {
629 assert!(lm.shard_index(ResourceId::new(id)) < 16);
630 }
631 }
632
633 #[test]
634 fn test_sequential_ids_spread_across_shards() {
635 let lm = LockManager::with_shards(16);
636 let mut seen = [false; 16];
637 for id in 0..256 {
638 seen[lm.shard_index(ResourceId::new(id))] = true;
639 }
640 // Fibonacci hashing should touch every shard well before 256 ids.
641 assert!(seen.iter().all(|&hit| hit));
642 }
643
644 #[test]
645 fn test_locks_in_different_shards_are_independent() {
646 // Two resources that hash to different shards do not interfere.
647 let lm = LockManager::with_shards(16);
648 let a = ResourceId::new(1);
649 let b = ResourceId::new(2);
650 lm.try_acquire(TxnId::new(1), a, LockMode::Exclusive)
651 .unwrap();
652 lm.try_acquire(TxnId::new(2), b, LockMode::Exclusive)
653 .unwrap();
654 assert_eq!(lm.holder_count(a), 1);
655 assert_eq!(lm.holder_count(b), 1);
656 }
657
658 #[test]
659 fn test_fib_hash_constant_is_odd() {
660 // A multiplicative-hash multiplier must be odd to be a bijection mod 2^64.
661 assert_eq!(FIB_HASH & 1, 1);
662 }
663
664 #[test]
665 fn test_concurrent_shared_acquire_release_is_consistent() {
666 use std::sync::Arc;
667 use std::thread;
668
669 let lm = Arc::new(LockManager::new());
670 let r = ResourceId::new(7);
671 let mut handles = Vec::new();
672 for t in 0..8u64 {
673 let lm = Arc::clone(&lm);
674 handles.push(thread::spawn(move || {
675 let txn = TxnId::new(t);
676 for _ in 0..1000 {
677 lm.try_acquire(txn, r, LockMode::Shared).unwrap();
678 lm.release(txn, r).unwrap();
679 }
680 }));
681 }
682 for h in handles {
683 h.join().unwrap();
684 }
685 // Every acquire was paired with a release; the resource is free.
686 assert_eq!(lm.holder_count(r), 0);
687 }
688
689 #[test]
690 fn test_concurrent_exclusive_is_mutually_exclusive() {
691 use std::sync::Arc;
692 use std::sync::atomic::{AtomicUsize, Ordering};
693 use std::thread;
694
695 let lm = Arc::new(LockManager::new());
696 let active = Arc::new(AtomicUsize::new(0));
697 let r = ResourceId::new(11);
698 let mut handles = Vec::new();
699 for t in 0..8u64 {
700 let lm = Arc::clone(&lm);
701 let active = Arc::clone(&active);
702 handles.push(thread::spawn(move || {
703 let txn = TxnId::new(t);
704 for _ in 0..2000 {
705 if lm.try_acquire(txn, r, LockMode::Exclusive).is_ok() {
706 // While we hold X, no one else may be inside this region.
707 let inside = active.fetch_add(1, Ordering::SeqCst);
708 assert_eq!(inside, 0);
709 active.fetch_sub(1, Ordering::SeqCst);
710 lm.release(txn, r).unwrap();
711 }
712 }
713 }));
714 }
715 for h in handles {
716 h.join().unwrap();
717 }
718 assert_eq!(lm.holder_count(r), 0);
719 }
720}