noxu_txn/lock_manager.rs
1//! Lock manager for Noxu DB.
2//!
3//!
4//! The LockManager is the central authority for all lock operations in the
5//! system. It manages N sharded lock tables, each protected by its own mutex,
6//! to allow concurrent lock operations on different LSNs.
7//!
8//! # Internal lock ordering (H-2, 2026 audit F-6.2)
9//!
10//! Two internal mutexes must never be held simultaneously, but when code
11//! paths need to update BOTH in sequence the canonical order is:
12//!
13//! **shard mutex first, then waiter_graph mutex**.
14//!
15//! Concretely:
16//! - Lock the relevant `lock_tables[idx]` shard first.
//! - Release the shard guard before acquiring `waiter_graph`, so the two are never held together.
18//! - Never acquire a shard while holding `waiter_graph`.
19//!
20//! All victim-cleanup paths (flush_waiter + clear_wait) are structured to
21//! acquire the shard first, then call `clear_wait()` after the shard guard
22//! is dropped. This prevents a lock-ordering inversion that would otherwise
23//! create a potential process hang under extreme contention.
24
25use hashbrown::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use noxu_sync::{Condvar, Mutex};
30
31use crate::lock_info::WaiterNotify;
32use crate::{
33 DeadlockDetector, Lock, LockGrantType, LockStats, LockType, TxnError,
34};
35use std::sync::atomic::{AtomicU64, Ordering};
36
/// Shard count used for the lock table when the caller does not configure one.
///
/// Splitting the lock table into independently locked shards reduces
/// contention: operations on LSNs that hash to different shards can proceed
/// concurrently. Noxu deliberately defaults to 64 shards (a documented
/// deviation from JE's default of 1) to favour write concurrency on
/// multi-core systems under high thread counts (16+ threads). The count is
/// configurable through `LOCK_N_LOCK_TABLES` (`noxu.lock.nLockTables`).
const DEFAULT_N_LOCK_TABLES: usize = 64;
48
49/// The LockManager manages all locks in the system.
50///
51/// Locks are sharded across N_LOCK_TABLES tables, each protected by its own
52/// mutex. This allows concurrent lock operations on different LSNs.
53///
54/// # Architecture
55///
56/// - Each lock is identified by an LSN (packed u64)
57/// - Locks are hashed to one of N lock tables
58/// - Each table has its own mutex for fine-grained locking
59/// - Lock objects start as Thin locks and mutate to Full locks when needed
60///
61/// # Blocking / waiting
62///
63/// When `lock()` cannot grant immediately it:
64/// 1. Registers the calling thread as a waiter (inside the shard mutex) and
65/// attaches a per-waiter `Arc<(Mutex<bool>, Condvar)>` notify pair.
66/// 2. Checks for deadlocks using the `DeadlockDetector` before sleeping.
67/// 3. Releases the shard mutex and waits on the condvar for up to
68/// `lock_timeout_ms` milliseconds.
69/// 4. On wakeup re-acquires the shard mutex and checks ownership.
70/// 5. On timeout removes itself from the waiter list and returns
71/// `TxnError::LockTimeout`.
72///
73/// This mirrors the flow in `LockManager.lock()` / `waitForLock()`.
74///
75///
76pub struct LockManager {
77 /// Sharded lock tables, keyed by LSN.
78 lock_tables: Vec<Mutex<HashMap<u64, Lock>>>,
79 /// Number of shards (== lock_tables.len()); configurable via
80 /// `LOCK_N_LOCK_TABLES`. Cached so get_table_index avoids a Vec len read.
81 n_lock_tables: usize,
82
83 /// Statistics tracking.
84 stats: LockManagerStats,
85
86 /// Default lock-wait timeout in milliseconds.
87 ///
88 /// 0 means wait forever (`EnvironmentConfig.setLockTimeout(0)`).
89 /// Configured at open time from `EnvironmentConfig`; can be overridden
90 /// per-call via `lock_with_timeout()`.
91 ///
92 ///
93 lock_timeout_ms: AtomicU64,
94
95 /// Locker sharing registry: maps locker_id → share_group_id.
96 ///
97 /// ThreadLockers register their thread_id (as i64) as the group_id.
98 /// HandleLockers with a buddy register the buddy's ID as the group_id.
99 /// Two lockers are in the same sharing group iff they map to the same
100 /// group_id, and thus bypass lock-conflict detection (
101 /// `Locker.sharesLocksWith(other)`).
102 ///
103 /// (thread-locker map), extended
104 /// to support HandleLocker buddy sharing.
105 share_registry: RwLock<HashMap<i64, i64>>,
106
107 /// Incremental waits-for graph for O(1) deadlock detection.
108 ///
109 /// Maps waiting_locker_id → [owner_locker_ids it is blocked by].
110 /// Inserted O(1) when a locker enters the wait path; removed when it
111 /// exits (grant, timeout, or deadlock abort).
112 ///
113 /// `check_deadlock_for_waiter` reads from this small graph instead of
114 /// rescanning all N_LOCK_TABLES shards — eliminates the O(64) full scan
115 /// that stalls all threads under high contention.
116 waiter_graph: Mutex<HashMap<i64, Vec<i64>>>,
117
118 /// Diagnostic label registry: maps locker_id → static label such as
119 /// `"txn"`, `"auto-txn"`, or `"cleaner"`.
120 ///
121 /// Used by [`LockManager::format_locker`] to render a typed identifier
122 /// like `"auto-txn:42"` in deadlock and timeout error messages so a
123 /// deadlock involving a synthetic auto-commit txn and an explicit txn is
124 /// visibly distinguishable from one involving two explicit txns.
125 ///
126 /// Closes the second F12 residual (May 2026 audit follow-up). Lockers
127 /// without a registered label are reported as `"locker:<id>"`.
128 locker_labels: RwLock<HashMap<i64, &'static str>>,
129
130 /// Set of locker IDs that are NOT preemptable (importunate lockers must
131 /// not have their locks stolen). JE tracks this per-`Locker` via
132 /// `getPreemptable()`; the lock manager needs it to decide, during an
133 /// importunate steal, whether a remaining owner blocks the steal
134 /// (LockManager.java:556 — "Lock holder is non-preemptable, wait again").
135 /// A locker absent from this set is preemptable (the default).
136 non_preemptable: RwLock<HashSet<i64>>,
137}
138
/// Counters the lock manager maintains about its own activity.
///
/// Each field is an independent atomic so it can be bumped without taking
/// any of the lock-table mutexes.
struct LockManagerStats {
    /// Number of lock requests counted by the manager.
    lock_requests: AtomicU64,

    /// Number of requests that could not be granted immediately and had to
    /// wait.
    lock_waits: AtomicU64,

    /// Number of waits that ended because the lock timeout expired.
    lock_timeouts: AtomicU64,
}
150
151impl LockManager {
152 /// Creates a new LockManager with N_LOCK_TABLES shards and the default
153 /// lock timeout of 500 ms (matching default).
154 pub fn new() -> Self {
155 Self::with_lock_timeout(500)
156 }
157
158 /// Creates a new LockManager with a specific default lock timeout.
159 ///
160 /// `timeout_ms == 0` means wait forever (`setLockTimeout(0, MILLISECONDS)`).
161 ///
162 /// Call this from `EnvironmentImpl` after reading `EnvironmentConfig.lock_timeout_ms`.
163 pub fn with_lock_timeout(timeout_ms: u64) -> Self {
164 Self::with_config(timeout_ms, DEFAULT_N_LOCK_TABLES)
165 }
166
167 /// Creates a new LockManager with an explicit shard count (JE
168 /// `LOCK_N_LOCK_TABLES` / `noxu.lock.nLockTables`) and lock timeout.
169 /// Production (`EnvironmentImpl`) passes the configured value; `0` or
170 /// values are clamped to at least 1 shard.
171 pub fn with_config(timeout_ms: u64, n_lock_tables: usize) -> Self {
172 let n_lock_tables = n_lock_tables.max(1);
173 let mut lock_tables = Vec::with_capacity(n_lock_tables);
174 for _ in 0..n_lock_tables {
175 lock_tables.push(Mutex::new(HashMap::new()));
176 }
177
178 Self {
179 lock_tables,
180 n_lock_tables,
181 stats: LockManagerStats {
182 lock_requests: AtomicU64::new(0),
183 lock_waits: AtomicU64::new(0),
184 lock_timeouts: AtomicU64::new(0),
185 },
186 lock_timeout_ms: AtomicU64::new(timeout_ms),
187 share_registry: RwLock::new(HashMap::new()),
188 waiter_graph: Mutex::new(HashMap::new()),
189 locker_labels: RwLock::new(HashMap::new()),
190 non_preemptable: RwLock::new(HashSet::new()),
191 }
192 }
193
194 /// Registers a diagnostic label for `locker_id`.
195 ///
196 /// Stored in `Self::locker_labels` and looked up by
197 /// [`Self::format_locker`] when building deadlock / lock-timeout error
198 /// messages. Typical labels are `"txn"` (explicit transaction),
199 /// `"auto-txn"` (synthetic auto-commit transaction created by
200 /// `TxnManager::begin_auto_txn`), and `"cleaner"` (cleaner-locker IDs).
201 ///
202 /// Re-registering the same `locker_id` overwrites the previous label.
203 /// Lockers without a registered label are reported as `"locker:<id>"`,
204 /// which preserves backward compatibility with callers that never
205 /// registered.
206 pub fn register_locker_label(&self, locker_id: i64, label: &'static str) {
207 self.locker_labels.write().unwrap().insert(locker_id, label);
208 }
209
210 /// Removes the diagnostic label for `locker_id`.
211 ///
212 /// Called when a transaction (explicit or synthetic auto-commit)
213 /// terminates so the registry does not grow without bound. Idempotent —
214 /// removing an unknown id is a no-op.
215 pub fn unregister_locker_label(&self, locker_id: i64) {
216 self.locker_labels.write().unwrap().remove(&locker_id);
217 }
218
219 /// Returns a typed identifier string for `locker_id`.
220 ///
221 /// Looks up the label registered via [`Self::register_locker_label`] and
222 /// returns `"<label>:<id>"`; if no label is registered, returns
223 /// `"locker:<id>"`.
224 ///
225 /// Used to format the `requester` and `owner` fields of
226 /// [`TxnError::LockTimeout`] and the message body of
227 /// [`TxnError::Deadlock`] so a mixed auto-commit/explicit-txn deadlock
228 /// reports e.g. `"auto-txn:42"` and `"txn:17"` rather than two opaque
229 /// integers — closing the second F12 residual.
230 pub fn format_locker(&self, locker_id: i64) -> String {
231 match self.locker_labels.read().unwrap().get(&locker_id).copied() {
232 Some(label) => format!("{label}:{locker_id}"),
233 None => format!("locker:{locker_id}"),
234 }
235 }
236
237 /// Returns a comma-separated typed identifier list for `locker_ids`.
238 ///
239 /// Convenience wrapper used in deadlock error messages.
240 pub fn format_lockers(&self, locker_ids: &[i64]) -> String {
241 let mut out = String::new();
242 for (i, id) in locker_ids.iter().enumerate() {
243 if i > 0 {
244 out.push_str(", ");
245 }
246 out.push_str(&self.format_locker(*id));
247 }
248 out
249 }
250
251 /// Updates the default lock timeout.
252 ///
253 /// Thread-safe; takes effect for subsequent `lock()` calls.
254 ///
255 pub fn set_lock_timeout(&self, timeout_ms: u64) {
256 self.lock_timeout_ms.store(timeout_ms, Ordering::Relaxed);
257 }
258
259 /// Returns the current default lock timeout in milliseconds.
260 pub fn get_lock_timeout_ms(&self) -> u64 {
261 self.lock_timeout_ms.load(Ordering::Relaxed)
262 }
263
264 /// Acquires a lock on the given LSN for the given locker, blocking the
265 /// calling thread if necessary.
266 ///
267 /// # Arguments
268 ///
269 /// * `lsn` - The LSN to lock (packed u64)
270 /// * `locker_id` - The ID of the requesting locker
271 /// * `lock_type` - The type of lock requested
272 /// * `non_blocking` - If true, return `LockNotAvailable` instead of waiting
273 /// * `jump_ahead_of_waiters` - If true, skip ahead of existing waiters
274 /// * `lock_timeout_ms` - How long to wait; 0 = wait forever
275 ///
276 /// # Returns
277 ///
278 /// The `LockGrantType` on success:
279 /// - `New` / `Promotion` / `Existing` — lock held immediately
280 /// - `NoneNeeded` — `lock_type` was `None`
281 ///
282 /// # Errors
283 ///
284 /// - `TxnError::RangeRestart` if `lock_type` is `Restart`
285 /// - `TxnError::LockNotAvailable` if `non_blocking` and lock unavailable
286 /// - `TxnError::LockTimeout` if the timeout expired while waiting
287 /// - `TxnError::Deadlock` if a wait-for cycle is detected before waiting
288 ///
289 ///
290 #[inline]
291 pub fn lock(
292 &self,
293 lsn: u64,
294 locker_id: i64,
295 lock_type: LockType,
296 non_blocking: bool,
297 jump_ahead_of_waiters: bool,
298 ) -> Result<LockGrantType, TxnError> {
299 self.lock_with_timeout(
300 lsn,
301 locker_id,
302 lock_type,
303 non_blocking,
304 jump_ahead_of_waiters,
305 self.lock_timeout_ms.load(Ordering::Relaxed),
306 )
307 }
308
309 /// Like `lock()` but the caller supplies the timeout in milliseconds.
310 /// `timeout_ms == 0` means wait forever.
311 ///
312 ///
313 pub fn lock_with_timeout(
314 &self,
315 lsn: u64,
316 locker_id: i64,
317 lock_type: LockType,
318 non_blocking: bool,
319 jump_ahead_of_waiters: bool,
320 timeout_ms: u64,
321 ) -> Result<LockGrantType, TxnError> {
322 // No lock needed for dirty-read, return immediately.
323 if lock_type == LockType::None {
324 return Ok(LockGrantType::NoneNeeded);
325 }
326
327 // Special restart lock type throws immediately.
328 if lock_type == LockType::Restart {
329 return Err(TxnError::RangeRestart);
330 }
331
332 // Track statistics.
333 self.stats.lock_requests.fetch_add(1, Ordering::Relaxed);
334
335 let table_idx = self.get_table_index(lsn);
336
337 // Snapshot the sharing registry before entering the lock-table
338 // critical section so `LockImpl::try_lock_with_sharing` can consult
339 // `sharesLocksWith` on every acquisition. JE `LockImpl.tryLock`
340 // checks `!locker.sharesLocksWith(ownerLocker) &&
341 // !ownerLocker.sharesLocksWith(locker)` (LockImpl.java:647-648) on
342 // EVERY lock request, letting siblings (two ThreadLockers on the
343 // same thread, or a HandleLocker + its buddy txn) co-own a lock
344 // without conflict.
345 let shares = self.build_shares_fn(locker_id);
346
347 // --- Phase 1: attempt to acquire the lock under the shard mutex. ---
348 //
349 // "Attempt to lock without any initial wait."
350 let (initial_grant, owner_ids, notify_pair) = {
351 let mut table = self.lock_tables[table_idx].lock();
352 let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
353
354 let result = lock.lock_with_sharing(
355 lock_type,
356 locker_id,
357 non_blocking,
358 jump_ahead_of_waiters,
359 &shares,
360 );
361
362 if result.success {
363 // Granted immediately; no waiting needed.
364 return Ok(result.lock_grant);
365 }
366
367 if result.lock_grant == LockGrantType::Denied {
368 // Non-blocking request was denied.
369 return Err(TxnError::LockNotAvailable { lsn });
370 }
371
372 // We must wait. Collect owner IDs for deadlock detection and
373 // attach a per-waiter notify pair to our waiter entry.
374 //
375 // "locker.setWaitingFor(lsn, type)" then deadlock detect then
376 // "locker.wait(timeout)".
377 self.stats.lock_waits.fetch_add(1, Ordering::Relaxed);
378
379 let owner_ids = lock.get_owner_ids();
380
381 // Build the notify pair and attach it to our waiter entry so the
382 // releasing thread can wake us.
383 let pair: WaiterNotify =
384 Arc::new((Mutex::new(false), Condvar::new()));
385 lock.set_waiter_notify(locker_id, pair.clone());
386
387 (result.lock_grant, owner_ids, pair)
388 };
389 // Shard mutex is released here.
390
391 // Register in the incremental waits-for graph so deadlock detection
392 // can find this edge without rescanning all lock-table shards.
393 self.record_wait(locker_id, &owner_ids);
394
395 // --- Phase 2: deadlock detection before sleeping. ---
396 //
397 // Runs DeadlockChecker after setWaitingFor. If the current
398 // locker is selected as the victim, throw DeadlockException.
399 //
400 // We build a lightweight waits-for snapshot from the current lock
401 // table state and check for a cycle. If this locker is the victim
402 // OR if the cycle cannot be broken without aborting this locker
403 // (i.e. the victim is not reachable / not waiting), we abort.
404 //
405 // Note: a single-pass snapshot may be incomplete when both threads
406 // are entering the wait path simultaneously. We therefore also
407 // perform a deadlock check after each spurious wakeup inside the
408 // wait loop (Phase 3).
409 if let Some(deadlock_err) = self
410 .check_deadlock_for_waiter(lsn, locker_id, lock_type, &owner_ids)
411 {
412 // We are the chosen victim. Flush from waiter list and throw.
413 // H-2: use flush_and_clear_waiter to acquire shard before
414 // waiter_graph (canonical lock ordering).
415 //
416 // TXN-F4 (design point, intentionally NOT changed): JE
417 // `LockManager.waitForLock` proactively wakes the victim via
418 // `notifyVictim` so it exits its own `wait()` promptly; here the
419 // selected victim returns synchronously the moment its own check
420 // detects the cycle, so no cross-thread victim notify is needed.
421 // This assumes every waiter on a cycle reaches the same
422 // victim-consistency conclusion on its next check slice (≤50 ms),
423 // which holds because all checks read the same `waiter_graph`
424 // snapshot and use the same deterministic victim selection.
425 self.flush_and_clear_waiter(table_idx, lsn, locker_id);
426 return Err(deadlock_err);
427 }
428
429 // --- Phase 3: wait on the condvar. ---
430 //
431 // "locker.wait(timeRemaining)" in a loop, checking ownership on
432 // each wakeup. We also re-run deadlock detection on each
433 // iteration so that cycles formed after we enter the wait path
434 // are caught.
435 let start = std::time::Instant::now();
436 let (mutex, condvar) = &*notify_pair;
437 let mut granted_guard = mutex.lock();
438
439 loop {
440 if *granted_guard {
441 // We were woken by the releasing thread which set our flag and
442 // called notify_all. Ownership was already transferred to us
443 // inside release() -> try_lock().
444 break;
445 }
446
447 // Compute remaining time.
448 let remaining_ms = if timeout_ms == 0 {
449 0 // 0 means wait forever
450 } else {
451 let elapsed = start.elapsed().as_millis() as u64;
452 if elapsed >= timeout_ms {
453 // Already timed out before we even slept this iteration.
454 drop(granted_guard);
455 // H-2: shard before waiter_graph.
456 self.flush_and_clear_waiter(table_idx, lsn, locker_id);
457 self.stats.lock_timeouts.fetch_add(1, Ordering::Relaxed);
458 return Err(TxnError::LockTimeout {
459 timeout_ms,
460 lsn,
461 owner: format!(
462 "[{}] on LSN {lsn}",
463 self.format_lockers(&owner_ids)
464 ),
465 requested_type: lock_type,
466 requester: self.format_locker(locker_id),
467 });
468 }
469 timeout_ms - elapsed
470 };
471
472 // Use a short slice (up to 50 ms) so we can re-check for
473 // deadlocks that may form after we entered the wait path.
474 // uses a "deadlock detection delay" for the same purpose.
475 let slice_ms =
476 if remaining_ms == 0 { 50 } else { remaining_ms.min(50) };
477
478 let timed_out = condvar
479 .wait_for(&mut granted_guard, Duration::from_millis(slice_ms))
480 .timed_out();
481
482 if *granted_guard {
483 // Granted while we were sleeping.
484 break;
485 }
486
487 // Re-run deadlock detection after each wakeup / slice expiry.
488 // This catches deadlocks that formed after our initial check.
489 drop(granted_guard);
490 {
491 let cur_owner_ids = {
492 let table = self.lock_tables[table_idx].lock();
493 table
494 .get(&lsn)
495 .map(|l| l.get_owner_ids())
496 .unwrap_or_default()
497 };
498 if let Some(deadlock_err) = self.check_deadlock_for_waiter(
499 lsn,
500 locker_id,
501 lock_type,
502 &cur_owner_ids,
503 ) {
504 // H-2: shard before waiter_graph.
505 self.flush_and_clear_waiter(table_idx, lsn, locker_id);
506 return Err(deadlock_err);
507 }
508 }
509 granted_guard = mutex.lock();
510
511 if *granted_guard {
512 break;
513 }
514
515 if timed_out {
516 // Check if total time is exceeded.
517 if timeout_ms > 0
518 && start.elapsed().as_millis() as u64 >= timeout_ms
519 {
520 drop(granted_guard);
521 // H-2: shard before waiter_graph.
522 self.flush_and_clear_waiter(table_idx, lsn, locker_id);
523 self.stats.lock_timeouts.fetch_add(1, Ordering::Relaxed);
524 return Err(TxnError::LockTimeout {
525 timeout_ms,
526 lsn,
527 owner: format!(
528 "[{}] on LSN {lsn}",
529 self.format_lockers(&owner_ids)
530 ),
531 requested_type: lock_type,
532 requester: self.format_locker(locker_id),
533 });
534 }
535 }
536
537 // Spurious wakeup or slice expired without timeout; loop.
538 }
539
540 drop(granted_guard);
541 self.clear_wait(locker_id);
542
543 // Determine which grant type to report. On wakeup the lock type we
544 // actually hold is exactly what we requested (or a promotion of it).
545 // Reconstruct the grant type from context.
546 //
547 // WaitRestart: the waiter's lock_type was changed to Restart in
548 // lock_impl::lock(), so the lock was never added to the owner set.
549 // Returning RangeRestart tells the caller (lock_ln / put) to abort
550 // the current scan and restart — mirroring JE's RangeRestartException.
551 let grant = match initial_grant {
552 LockGrantType::WaitNew => LockGrantType::New,
553 LockGrantType::WaitPromotion => LockGrantType::Promotion,
554 LockGrantType::WaitRestart => {
555 return Err(TxnError::RangeRestart);
556 }
557 other => other,
558 };
559
560 Ok(grant)
561 }
562
563 /// Importunate lock acquisition for HA replay (`ReplayTxn`).
564 ///
565 /// JE `LockManager.waitForLock`: an importunate locker that would block
566 /// steals the lock from preemptable owners instead of waiting
567 /// (`if (isImportunate) { result = stealLock(...); if (result.success)
568 /// break; continue; }`, LockManager.java:552-557), and deadlock
569 /// detection is skipped for it (LockManager.java:472).
570 ///
571 /// Strategy: attempt non-blocking; if granted, done. Otherwise steal
572 /// from preemptable owners and re-attempt (`stealLockInternal`,
573 /// LockManager.java:1599). If the steal grants the lock, done. If a
574 /// remaining owner is non-preemptable the steal fails, so fall back to a
575 /// normal blocking wait (mirroring JE's `continue` — wait for the
576 /// non-preemptable holder to release, then retry).
577 pub fn lock_importunate_with_timeout(
578 &self,
579 lsn: u64,
580 locker_id: i64,
581 lock_type: LockType,
582 non_blocking: bool,
583 timeout_ms: u64,
584 ) -> Result<LockGrantType, TxnError> {
585 if lock_type == LockType::None {
586 return Ok(LockGrantType::NoneNeeded);
587 }
588 if lock_type == LockType::Restart {
589 return Err(TxnError::RangeRestart);
590 }
591
592 // Try a normal non-blocking attempt first (jumpAheadOfWaiters=false,
593 // as JE Locker.lock always passes — Locker.java:503).
594 match self.lock(lsn, locker_id, lock_type, true, false) {
595 Ok(grant) => return Ok(grant),
596 Err(TxnError::LockNotAvailable { .. }) => {
597 // Blocked: steal from preemptable owners and re-attempt.
598 if self.steal_lock_in_wait(lsn, locker_id, lock_type) {
599 return Ok(LockGrantType::New);
600 }
601 // A remaining owner is non-preemptable; fall back to a normal
602 // blocking wait (JE `continue`).
603 }
604 Err(e) => return Err(e),
605 }
606
607 if non_blocking {
608 return Err(TxnError::LockNotAvailable { lsn });
609 }
610 self.lock_with_timeout(
611 lsn, locker_id, lock_type, false, false, timeout_ms,
612 )
613 }
614
615 /// Releases a lock on the given LSN for the given locker.
616 ///
617 /// Promotes compatible waiters to owners, signals their condvars so they
618 /// wake up, and removes the lock entry when it becomes empty.
619 ///
620 ///
621 pub fn release(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
622 let table_idx = self.get_table_index(lsn);
623 // Snapshot this locker's share group so the RANGE_INSERT restart-wake
624 // check in LockImpl::release_with_sharing can honor sharesLocksWith
625 // (JE rangeInsertConflict) without holding the registry across the
626 // table borrow.
627 let shares_fn = self.build_shares_fn(locker_id);
628 let mut table = self.lock_tables[table_idx].lock();
629
630 if let Some(lock) = table.get_mut(&lsn) {
631 // release() moves eligible waiters to owners and signals each
632 // granted waiter's condvar inside LockImpl::release().
633 let _notify_ids = lock.release_with_sharing(locker_id, &shares_fn);
634
635 // If the lock has no owners and no waiters, remove it from the
636 // table to free memory.
637 if lock.n_owners() == 0 && lock.n_waiters() == 0 {
638 table.remove(&lsn);
639 }
640 }
641
642 Ok(())
643 }
644
645 /// Releases every lock currently held by `locker_id`, across all
646 /// shards. Returns the number of (lsn, lock) entries the locker
647 /// actually owned and released.
648 ///
649 /// Equivalent to a manual `for lsn in lockers_locks(id): release(lsn, id)`,
650 /// but does not require the caller to track which LSNs the locker
651 /// has touched. The cleaner uses this in three situations:
652 ///
653 /// - **Reaping abandoned cleaner-locker IDs.** `migrate_ln_slot`
654 /// allocates a fresh locker id per migration attempt
655 /// (`next_cleaner_locker_id`), takes a non-blocking read
656 /// lock, and releases. If `release()` fails for any reason
657 /// the entry would otherwise leak, since the locker id is
658 /// never reused. The cleaner can call this method when its
659 /// run terminates to sweep up anything its short-lived
660 /// locker ids left behind.
661 /// - **Catastrophic per-locker abort.** When a deadlock-detector
662 /// victim is too far along to drain its own per-txn write_locks
663 /// map (e.g. it is in the middle of `commit_inner_after_read_drain`
664 /// and the panic handler needs to clean up), this method
665 /// guarantees the lock-manager view drops the locker even if
666 /// the per-txn view is corrupt.
667 /// - **Test cleanup.** Many integration tests hold a `LockManager`
668 /// across multiple txns and need a quick "drop everything for
669 /// locker N" without re-creating the manager.
670 ///
671 /// Errors from individual `Lock::release` calls are logged and
672 /// the sweep continues; the count returned is the number of
673 /// release attempts (each removing the locker from one Lock),
674 /// not the number that succeeded — losing one lock release leaks
675 /// one entry, but losing the whole sweep would defeat the
676 /// purpose.
677 pub fn release_all_for_locker(&self, locker_id: i64) -> usize {
678 let mut released = 0usize;
679 // Snapshot this locker's share group for the RANGE_INSERT restart-wake
680 // check (JE rangeInsertConflict / sharesLocksWith).
681 let shares_fn = self.build_shares_fn(locker_id);
682 for table in &self.lock_tables {
683 let mut table = table.lock();
684 // Collect target LSNs first to avoid mutating the map
685 // while iterating it.
686 let target_lsns: Vec<u64> = table
687 .iter()
688 .filter_map(|(lsn, lock)| {
689 if lock.get_owned_lock_type(locker_id).is_some() {
690 Some(*lsn)
691 } else {
692 None
693 }
694 })
695 .collect();
696 for lsn in target_lsns {
697 if let Some(lock) = table.get_mut(&lsn) {
698 let _notify_ids =
699 lock.release_with_sharing(locker_id, &shares_fn);
700 released += 1;
701 if lock.n_owners() == 0 && lock.n_waiters() == 0 {
702 table.remove(&lsn);
703 }
704 }
705 }
706 }
707 released
708 }
709
710 /// Downgrades a write lock to a read lock.
711 ///
712 ///
713 pub fn demote(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
714 let table_idx = self.get_table_index(lsn);
715 let mut table = self.lock_tables[table_idx].lock();
716
717 if let Some(lock) = table.get_mut(&lsn) {
718 lock.demote(locker_id);
719 }
720
721 Ok(())
722 }
723
724 /// Steals a lock for the given locker.
725 ///
726 /// Used by the HA replayer to forcibly acquire locks, removing all other
727 /// preemptable owners.
728 ///
729 ///
730 pub fn steal_lock(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
731 let table_idx = self.get_table_index(lsn);
732 let mut table = self.lock_tables[table_idx].lock();
733
734 let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
735 let _preempted = lock.steal_lock(locker_id);
736
737 Ok(())
738 }
739
740 /// Importunate lock steal performed inside the wait loop, mirroring JE
741 /// `LockManager.stealLockInternal` (LockManager.java:1599): flush this
742 /// locker's waiter entry, remove all preemptable owners, then re-attempt
743 /// the lock. Non-preemptable owners (other importunate lockers) are left
744 /// in place, so the re-attempt may still fail — returning `false` tells
745 /// the wait loop to keep waiting (LockManager.java:556 `continue`).
746 ///
747 /// Returns `true` iff the lock is now held by `locker_id`.
748 fn steal_lock_in_wait(
749 &self,
750 lsn: u64,
751 locker_id: i64,
752 lock_type: LockType,
753 ) -> bool {
754 // Only preempt owners that are not marked non-preemptable.
755 let non_preemptable: Option<HashSet<i64>> = {
756 let np = self.non_preemptable.read().unwrap();
757 if np.is_empty() { None } else { Some(np.clone()) }
758 };
759 let preemptable_fn = move |owner_id: i64| -> bool {
760 match &non_preemptable {
761 Some(np) => !np.contains(&owner_id),
762 None => true,
763 }
764 };
765 let shares = self.build_shares_fn(locker_id);
766
767 let table_idx = self.get_table_index(lsn);
768 let mut table = self.lock_tables[table_idx].lock();
769 let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
770
771 // flushWaiter: our waiter entry may still be present.
772 lock.flush_waiter(locker_id);
773 // stealLock: remove all preemptable owners.
774 let _preempted =
775 lock.steal_lock_preemptable(locker_id, &preemptable_fn);
776 // Re-attempt as a non-blocking, jump-ahead request.
777 let result = lock.lock_with_sharing(
778 lock_type, locker_id, true, // non_blocking
779 false, // jump_ahead_of_waiters
780 &shares,
781 );
782 result.success
783 }
784
785 /// Returns true if the given locker owns a write lock on the LSN.
786 ///
787 ///
788 pub fn is_owned_write_lock(&self, lsn: u64, locker_id: i64) -> bool {
789 let table_idx = self.get_table_index(lsn);
790 let table = self.lock_tables[table_idx].lock();
791
792 if let Some(lock) = table.get(&lsn) {
793 lock.is_owned_write_lock(locker_id)
794 } else {
795 false
796 }
797 }
798
799 /// Returns the lock type owned by the locker, or None.
800 ///
801 ///
802 pub fn get_owned_lock_type(
803 &self,
804 lsn: u64,
805 locker_id: i64,
806 ) -> Option<LockType> {
807 let table_idx = self.get_table_index(lsn);
808 let table = self.lock_tables[table_idx].lock();
809
810 if let Some(lock) = table.get(&lsn) {
811 lock.get_owned_lock_type(locker_id)
812 } else {
813 None
814 }
815 }
816
817 /// Returns the owner count and waiter count for a given LSN.
818 pub fn get_lock_info(&self, lsn: u64) -> (usize, usize) {
819 let table_idx = self.get_table_index(lsn);
820 let table = self.lock_tables[table_idx].lock();
821
822 if let Some(lock) = table.get(&lsn) {
823 (lock.n_owners(), lock.n_waiters())
824 } else {
825 (0, 0)
826 }
827 }
828
829 /// Returns current lock statistics.
830 ///
831 ///
832 pub fn get_stats(&self) -> LockStats {
833 // Single pass over all lock tables to compute live counts. n_waiters
834 // and n_owners were previously hardcoded to 0 / lock-count; report the
835 // real aggregate so callers (and tests) can observe contention.
836 let mut n_total_locks: u64 = 0;
837 let mut n_owners: u64 = 0;
838 let mut n_waiters: u64 = 0;
839 for table in &self.lock_tables {
840 let table = table.lock();
841 for lock in table.values() {
842 n_total_locks += 1;
843 n_owners += lock.n_owners() as u64;
844 n_waiters += lock.n_waiters() as u64;
845 }
846 }
847 LockStats {
848 lock_requests: self.stats.lock_requests.load(Ordering::Relaxed),
849 lock_waits: self.stats.lock_waits.load(Ordering::Relaxed),
850 n_owners,
851 n_waiters,
852 n_total_locks,
853 n_read_locks: 0,
854 n_write_locks: 0,
855 n_lock_timeouts: self.stats.lock_timeouts.load(Ordering::Relaxed),
856 }
857 }
858
859 /// Returns the number of lock entries across all tables.
860 pub fn n_total_locks(&self) -> usize {
861 let mut total = 0;
862 for table in &self.lock_tables {
863 total += table.lock().len();
864 }
865 total
866 }
867
868 // ========================================================================
869 // Lock-sharing registry — `LockManager.threadLockers` analogue
870 // ========================================================================
871
872 /// Registers a locker in the sharing registry with the given group ID.
873 ///
874 /// All lockers sharing the same `group_id` bypass conflict detection with
875 /// each other (`Locker.sharesLocksWith(other)`).
876 ///
877 /// Called by `ThreadLocker::new()` (group = thread_id) and by
878 /// `HandleLocker::with_buddy()` (group = buddy_locker_id).
879 ///
880 ///
881 pub fn register_locker_sharing(&self, locker_id: i64, group_id: i64) {
882 self.share_registry.write().unwrap().insert(locker_id, group_id);
883 }
884
885 /// Removes a locker from the sharing registry.
886 ///
887 /// Called by `ThreadLocker::drop()` and `HandleLocker::drop()`.
888 ///
889 ///
890 pub fn unregister_locker_sharing(&self, locker_id: i64) {
891 self.share_registry.write().unwrap().remove(&locker_id);
892 }
893
894 /// Marks `locker_id` as non-preemptable (its locks cannot be stolen).
895 ///
896 /// Called for importunate lockers (HA `ReplayTxn`). JE tracks this per
897 /// `Locker` via `getPreemptable()`; the lock manager mirrors it so the
898 /// importunate steal in the wait loop can honor a non-preemptable owner
899 /// (LockManager.java:556).
900 pub fn register_non_preemptable(&self, locker_id: i64) {
901 self.non_preemptable.write().unwrap().insert(locker_id);
902 }
903
904 /// Clears the non-preemptable mark for `locker_id`.
905 pub fn unregister_non_preemptable(&self, locker_id: i64) {
906 self.non_preemptable.write().unwrap().remove(&locker_id);
907 }
908
909 /// Returns true if `a` and `b` are in the same lock-sharing group.
910 ///
911 /// Used by `ThreadLocker::shares_locks_with()` and
912 /// `HandleLocker::shares_locks_with()`.
913 pub fn same_share_group(&self, a: i64, b: i64) -> bool {
914 let registry = self.share_registry.read().unwrap();
915 match (registry.get(&a), registry.get(&b)) {
916 (Some(ga), Some(gb)) => ga == gb,
917 _ => false,
918 }
919 }
920
921 /// Builds the `sharesLocksWith` predicate for `locker_id`, used by every
922 /// acquisition (`lock_with_timeout`) and release (`release`) so that
923 /// `LockImpl` can skip conflict detection between cooperating lockers.
924 /// `shares(owner_id)` returns true iff `owner_id` is in `locker_id`'s
925 /// sharing group. JE: `LockImpl.tryLock` / `rangeInsertConflict` consult
926 /// `sharesLocksWith` (LockImpl.java:647-648, 719).
927 ///
928 /// The registry HashMap is cloned only when `locker_id` actually belongs
929 /// to a sharing group; the common path (BasicLockers, most internal ops)
930 /// shares with no one and returns a closure that allocates nothing.
931 fn build_shares_fn(&self, locker_id: i64) -> impl Fn(i64) -> bool + use<> {
932 let requester_group: Option<i64> =
933 self.share_registry.read().unwrap().get(&locker_id).copied();
934 let registry_snapshot: Option<hashbrown::HashMap<i64, i64>> =
935 if requester_group.is_some() {
936 Some(self.share_registry.read().unwrap().clone())
937 } else {
938 None
939 };
940 move |owner_id: i64| -> bool {
941 match (requester_group, ®istry_snapshot) {
942 (Some(req_group), Some(reg)) => {
943 reg.get(&owner_id).copied() == Some(req_group)
944 }
945 _ => false,
946 }
947 }
948 }
949
950 /// Deprecated alias for `lock_with_timeout()`.
951 ///
952 /// TXN-F2 fix: the plain `lock()` / `lock_with_timeout()` path now
953 /// consults the sharing registry on every acquisition (JE
954 /// `LockImpl.tryLock` checks `sharesLocksWith`, LockImpl.java:647-648),
955 /// so a separate sharing entry point is redundant. Kept as a thin
956 /// forwarder so existing callers keep compiling; new code should call
957 /// `lock` / `lock_with_timeout` directly.
958 pub fn lock_with_sharing(
959 &self,
960 lsn: u64,
961 locker_id: i64,
962 lock_type: LockType,
963 non_blocking: bool,
964 jump_ahead_of_waiters: bool,
965 ) -> Result<LockGrantType, TxnError> {
966 self.lock(
967 lsn,
968 locker_id,
969 lock_type,
970 non_blocking,
971 jump_ahead_of_waiters,
972 )
973 }
974
975 /// Deprecated alias for `lock_with_timeout()`; see `lock_with_sharing`.
976 pub fn lock_with_sharing_and_timeout(
977 &self,
978 lsn: u64,
979 locker_id: i64,
980 lock_type: LockType,
981 non_blocking: bool,
982 jump_ahead_of_waiters: bool,
983 timeout_ms: u64,
984 ) -> Result<LockGrantType, TxnError> {
985 self.lock_with_timeout(
986 lsn,
987 locker_id,
988 lock_type,
989 non_blocking,
990 jump_ahead_of_waiters,
991 timeout_ms,
992 )
993 }
994
995 // ========================================================================
996
997 /// Returns the lock table index for a given LSN.
998 ///
999 ///
1000 ///
1001 #[inline]
1002 /// Returns the configured number of lock-table shards.
1003 pub fn n_lock_tables(&self) -> usize {
1004 self.n_lock_tables
1005 }
1006
1007 fn get_table_index(&self, lsn: u64) -> usize {
1008 ((lsn as usize) & 0x7fff_ffff) % self.n_lock_tables
1009 }
1010
1011 /// Records that `locker_id` is now waiting on `owner_ids` in the
1012 /// incremental waits-for graph. Called right after Phase 1 in both wait
1013 /// paths, before the first deadlock check.
1014 fn record_wait(&self, locker_id: i64, owner_ids: &[i64]) {
1015 let mut graph = self.waiter_graph.lock();
1016 graph.insert(locker_id, owner_ids.to_vec());
1017 }
1018
1019 /// Removes `locker_id` from the incremental waits-for graph. Called at
1020 /// every exit point after `record_wait`: grant, timeout, and deadlock abort.
1021 fn clear_wait(&self, locker_id: i64) {
1022 let mut graph = self.waiter_graph.lock();
1023 graph.remove(&locker_id);
1024 }
1025
1026 /// Removes `locker_id` from the on-shard waiter list and from the
1027 /// incremental waiter graph, in canonical lock order (shard first).
1028 ///
1029 /// H-2 (2026 audit F-6.2): all victim-cleanup paths must
1030 /// acquire the shard mutex BEFORE (or without) the waiter_graph mutex.
1031 /// This helper enforces the ordering: it locks the shard, flushes the
1032 /// waiter entry, drops the shard guard, then calls `clear_wait()` to
1033 /// remove from the waiter_graph. Never call `clear_wait()` before this.
1034 fn flush_and_clear_waiter(
1035 &self,
1036 table_idx: usize,
1037 lsn: u64,
1038 locker_id: i64,
1039 ) {
1040 // Shard first (canonical ordering).
1041 {
1042 let mut table = self.lock_tables[table_idx].lock();
1043 if let Some(lock) = table.get_mut(&lsn) {
1044 lock.flush_waiter(locker_id);
1045 if lock.n_owners() == 0 && lock.n_waiters() == 0 {
1046 table.remove(&lsn);
1047 }
1048 }
1049 }
1050 // Waiter_graph after shard is released.
1051 self.clear_wait(locker_id);
1052 }
1053
1054 /// aborted as the victim.
1055 ///
1056 /// Returns `Some(TxnError::Deadlock)` if the cycle is detected and this
1057 /// locker is the chosen victim, `None` otherwise.
1058 ///
1059 /// Reads the incremental `waiter_graph` snapshot — O(n_active_waiters),
1060 /// no shard re-acquisition. Victim selection uses "youngest locker"
1061 /// heuristic (highest locker_id, i.e. most recently started transaction)
1062 /// since we avoid the O(N_LOCK_TABLES) scan needed for exact lock counts.
1063 fn check_deadlock_for_waiter(
1064 &self,
1065 lsn: u64,
1066 locker_id: i64,
1067 lock_type: LockType,
1068 owner_ids: &[i64],
1069 ) -> Option<TxnError> {
1070 // Build the waits-for snapshot from the incremental graph. Also
1071 // ensure the current requester's edge is present (record_wait may
1072 // not have been called yet on the very first check).
1073 let waits_for: HashMap<i64, HashSet<i64>> = {
1074 let graph = self.waiter_graph.lock();
1075 let mut wf: HashMap<i64, HashSet<i64>> = graph
1076 .iter()
1077 .map(|(&wid, owners)| (wid, owners.iter().copied().collect()))
1078 .collect();
1079 wf.entry(locker_id)
1080 .or_insert_with(|| owner_ids.iter().copied().collect());
1081 wf
1082 };
1083
1084 let cycle = DeadlockDetector::detect(locker_id, owner_ids, &waits_for)?;
1085 // Compute per-locker lock counts for the cycle so select_victim can
1086 // apply its primary criterion (fewest locks held) instead of falling
1087 // through to the youngest-locker tiebreaker. This walks every shard,
1088 // but it only runs when a deadlock cycle has been detected (a rare
1089 // event), so the scan cost is amortized over the rare deadlock
1090 // event and is not on the common no-cycle path.
1091 let lock_counts = self.compute_lock_counts(&cycle);
1092 let victim = DeadlockDetector::select_victim(&cycle, &lock_counts);
1093
1094 if victim == locker_id {
1095 // Format the cycle as typed locker IDs (e.g.
1096 // `"auto-txn:42 -> txn:17"`) so a mixed auto-commit/explicit-txn
1097 // deadlock is visibly distinguishable in the error message.
1098 // Closes the second F12 residual.
1099 let cycle_fmt = self.format_lockers(&cycle);
1100 let victim_fmt = self.format_locker(locker_id);
1101 Some(TxnError::Deadlock(format!(
1102 "deadlock cycle detected ({cycle_fmt}); {victim_fmt} chosen \
1103 as victim while waiting for LSN {lsn} ({lock_type:?})"
1104 )))
1105 } else {
1106 None
1107 }
1108 }
1109
1110 /// Tallies, for every locker_id in `cycle`, the number of locks they
1111 /// currently hold across all shards.
1112 ///
1113 /// Used by deadlock victim selection so the primary criterion (fewest
1114 /// locks held = least work to roll back) can be applied. Walks every
1115 /// shard but is only called after a deadlock cycle has been detected,
1116 /// so the scan cost is paid only on the rare cycle path, never on the
1117 /// common no-cycle path.
1118 fn compute_lock_counts(&self, cycle: &[i64]) -> HashMap<i64, usize> {
1119 use std::collections::HashSet;
1120 let cycle_set: HashSet<i64> = cycle.iter().copied().collect();
1121 let mut counts: HashMap<i64, usize> =
1122 cycle.iter().copied().map(|id| (id, 0usize)).collect();
1123 for shard in &self.lock_tables {
1124 let table = shard.lock();
1125 for lock in table.values() {
1126 for owner_id in lock.get_owner_ids() {
1127 if cycle_set.contains(&owner_id) {
1128 *counts.entry(owner_id).or_insert(0) += 1;
1129 }
1130 }
1131 }
1132 }
1133 counts
1134 }
1135}
1136
1137impl Default for LockManager {
1138 fn default() -> Self {
1139 Self::new()
1140 }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145 use super::*;
1146 use std::sync::Arc;
1147 use std::thread;
1148 use std::time::Duration;
1149
1150 // -----------------------------------------------------------------------
1151 // Original single-threaded tests (preserved)
1152 // -----------------------------------------------------------------------
1153
/// A freshly built manager holds no lock entries and has counted no lock
/// requests or waits.
#[test]
fn test_new_lock_manager() {
    let manager = LockManager::new();
    assert_eq!(manager.n_total_locks(), 0);

    let snapshot = manager.get_stats();
    assert_eq!(snapshot.lock_requests, 0);
    assert_eq!(snapshot.lock_waits, 0);
}
1163
/// Requesting `LockType::None` yields `NoneNeeded` and is not counted as a
/// lock request.
#[test]
fn test_lock_type_none() {
    let manager = LockManager::new();

    let grant = manager.lock(1000, 1, LockType::None, false, false);
    assert_eq!(grant.unwrap(), LockGrantType::NoneNeeded);

    assert_eq!(manager.get_stats().lock_requests, 0);
}
1174
/// Requesting `LockType::Restart` fails with `TxnError::RangeRestart`.
#[test]
fn test_lock_type_restart() {
    let manager = LockManager::new();
    let outcome = manager.lock(1000, 1, LockType::Restart, false, false);
    assert!(matches!(outcome, Err(TxnError::RangeRestart)));
}
1182
/// Acquiring a read lock creates one lock entry; releasing it removes the
/// entry again.
#[test]
fn test_basic_lock_release() {
    let manager = LockManager::new();

    let grant = manager.lock(1000, 1, LockType::Read, false, false);
    assert_eq!(grant.unwrap(), LockGrantType::New);
    assert_eq!(manager.n_total_locks(), 1);

    assert!(manager.release(1000, 1).is_ok());
    assert_eq!(manager.n_total_locks(), 0);
}
1196
/// Two different lockers can both be granted a read lock on the same LSN;
/// they share one lock entry with two owners and no waiters.
#[test]
fn test_multiple_readers() {
    let manager = LockManager::new();

    for reader in [1i64, 2] {
        let grant = manager.lock(1000, reader, LockType::Read, false, false);
        assert_eq!(grant.unwrap(), LockGrantType::New);
    }

    assert_eq!(manager.n_total_locks(), 1);
    assert_eq!(manager.get_lock_info(1000), (2, 0));
}
1214
/// A non-blocking write request against a write lock held by another
/// locker is refused with `LockNotAvailable`.
#[test]
fn test_non_blocking_denied() {
    let manager = LockManager::new();

    let holder = manager.lock(1000, 1, LockType::Write, false, false);
    assert_eq!(holder.unwrap(), LockGrantType::New);

    let contender = manager.lock(1000, 2, LockType::Write, true, false);
    assert!(matches!(contender, Err(TxnError::LockNotAvailable { .. })));
}
1230
/// One hundred write locks on distinct LSNs yield one hundred lock entries,
/// and shard indices stay below the shard count.
#[test]
fn test_table_sharding() {
    let manager = LockManager::new();

    (0..100u64).for_each(|i| {
        let grant =
            manager.lock(i * 1000, i as i64, LockType::Write, false, false);
        assert!(grant.is_ok());
    });
    assert_eq!(manager.n_total_locks(), 100);

    for lsn in [1000u64, 2000] {
        assert!(manager.get_table_index(lsn) < manager.n_lock_tables);
    }
}
1248
/// Lock entries are removed once their only owner releases them: after
/// locking and then releasing 100 LSNs no entries remain.
#[test]
fn test_lock_cleanup() {
    let manager = LockManager::new();

    (0..100).for_each(|lsn| {
        let _ = manager.lock(lsn, 1, LockType::Write, false, false);
    });
    assert_eq!(manager.n_total_locks(), 100);

    (0..100).for_each(|lsn| {
        let _ = manager.release(lsn, 1);
    });
    assert_eq!(manager.n_total_locks(), 0);
}
1264
/// TXN-F3 regression: an importunate locker takes over a held, conflicting
/// lock from a preemptable owner rather than waiting or timing out. JE
/// `LockManager.waitForLock`: `if (isImportunate) { result =
/// stealLock(...); if (result.success) break; }` (LockManager.java:552).
#[test]
fn test_importunate_steals_conflicting_lock() {
    const LSN: u64 = 4242;
    let manager = LockManager::new();

    // Locker 1 is preemptable (never registered otherwise) and holds Write.
    let first = manager.lock(LSN, 1, LockType::Write, false, false).unwrap();
    assert_eq!(first, LockGrantType::New);
    assert!(manager.is_owned_write_lock(LSN, 1));

    // Importunate locker 2 asks for a conflicting Write with a SHORT
    // timeout and non_blocking=false: pre-fix this timed out or blocked;
    // post-fix the steal grants it immediately.
    let stolen = manager
        .lock_importunate_with_timeout(LSN, 2, LockType::Write, false, 200)
        .expect("importunate locker must steal the conflicting lock");
    assert!(matches!(stolen, LockGrantType::New | LockGrantType::Existing));

    // Ownership has moved from locker 1 to locker 2.
    assert!(manager.is_owned_write_lock(LSN, 2));
    assert!(!manager.is_owned_write_lock(LSN, 1));
}
1294
/// An owner registered as non-preemptable cannot be robbed: the importunate
/// request falls back to waiting and ends in `LockTimeout`.
/// JE: "Lock holder is non-preemptable, wait again" (LockManager.java:556).
#[test]
fn test_importunate_cannot_steal_non_preemptable() {
    const LSN: u64 = 4343;
    let manager = LockManager::new();

    // Locker 1 is marked non-preemptable before it takes the write lock.
    manager.register_non_preemptable(1);
    let first = manager.lock(LSN, 1, LockType::Write, false, false).unwrap();
    assert_eq!(first, LockGrantType::New);

    // Locker 2's importunate request (100 ms timeout) must time out.
    let attempt = manager.lock_importunate_with_timeout(
        LSN,
        2,
        LockType::Write,
        false,
        100,
    );
    assert!(matches!(attempt, Err(TxnError::LockTimeout { .. })));

    // Ownership is unchanged.
    assert!(manager.is_owned_write_lock(LSN, 1));
    assert!(!manager.is_owned_write_lock(LSN, 2));
}
1323
/// Each of two read-lock requests is counted in `lock_requests`.
#[test]
fn test_statistics() {
    let manager = LockManager::new();

    for reader in [1i64, 2] {
        let _ = manager.lock(1000, reader, LockType::Read, false, false);
    }

    assert_eq!(manager.get_stats().lock_requests, 2);
}
1334
/// `is_owned_write_lock` is true only for the locker and LSN that actually
/// hold the write lock.
#[test]
fn test_is_owned_write_lock() {
    let manager = LockManager::new();
    let _ = manager.lock(1000, 1, LockType::Write, false, false);

    assert!(manager.is_owned_write_lock(1000, 1));
    // Another locker, and another LSN, must both report false.
    assert!(!manager.is_owned_write_lock(1000, 2));
    assert!(!manager.is_owned_write_lock(2000, 1));
}
1345
/// `get_owned_lock_type` reports the held type for the owner and `None`
/// for any other locker or LSN.
#[test]
fn test_get_owned_lock_type() {
    let manager = LockManager::new();
    let _ = manager.lock(1000, 1, LockType::Read, false, false);

    assert_eq!(manager.get_owned_lock_type(1000, 1), Some(LockType::Read));
    assert_eq!(manager.get_owned_lock_type(1000, 2), None);
    assert_eq!(manager.get_owned_lock_type(2000, 1), None);
}
1356
/// Demoting a write lock leaves the same locker holding a read lock.
#[test]
fn test_demote() {
    let manager = LockManager::new();

    let _ = manager.lock(1000, 1, LockType::Write, false, false);
    assert!(manager.is_owned_write_lock(1000, 1));

    let _ = manager.demote(1000, 1);
    assert!(!manager.is_owned_write_lock(1000, 1));
    assert_eq!(manager.get_owned_lock_type(1000, 1), Some(LockType::Read));
}
1368
/// Smoke test: `steal_lock` can be called on a held read lock. The result
/// is ignored and nothing is asserted about ownership afterwards.
#[test]
fn test_steal_lock() {
    let manager = LockManager::new();

    let _ = manager.lock(1000, 1, LockType::Read, false, false);
    assert_eq!(manager.get_owned_lock_type(1000, 1), Some(LockType::Read));

    let _ = manager.steal_lock(1000, 2);
}
1378
1379 // -----------------------------------------------------------------------
1380 // Multi-threaded blocking tests
1381 // -----------------------------------------------------------------------
1382
/// Locker 1 holds a write lock while a second thread (locker 2) blocks on
/// the same LSN. Once locker 1 releases, the blocked request is granted.
///
/// Exercises the wait-for-lock / notify-on-release flow.
#[test]
fn test_blocking_lock_granted_on_release() {
    const LSN: u64 = 0xDEAD_BEEF;
    let manager = Arc::new(LockManager::new());

    // Locker 1 takes the write lock on the main thread.
    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();

    // Flag raised by the waiter thread just before it issues its request.
    let started = Arc::new((Mutex::new(false), Condvar::new()));

    let waiter = {
        let manager = Arc::clone(&manager);
        let started = Arc::clone(&started);
        thread::spawn(move || {
            {
                let (flag, cv) = &*started;
                let mut is_started = flag.lock();
                *is_started = true;
                cv.notify_all();
            }
            // 5 s timeout so a regression fails instead of hanging.
            manager.lock_with_timeout(LSN, 2, LockType::Write, false, false, 5000)
        })
    };

    // Wait for the waiter thread to start ...
    {
        let (flag, cv) = &*started;
        let mut is_started = flag.lock();
        while !*is_started {
            cv.wait(&mut is_started);
        }
    }
    // ... and give it a moment to actually block inside the manager.
    thread::sleep(Duration::from_millis(50));

    manager.release(LSN, 1).unwrap();

    let outcome = waiter.join().unwrap();
    assert!(outcome.is_ok(), "thread B expected Ok, got {:?}", outcome);
    assert_eq!(outcome.unwrap(), LockGrantType::New);
}
1432
/// Locker 1 keeps a write lock for the whole test; locker 2's read request
/// with a 100 ms timeout must fail with `LockTimeout`.
#[test]
fn test_lock_timeout() {
    const LSN: u64 = 0xCAFE_BABE;
    let manager = Arc::new(LockManager::new());

    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();

    let waiter = {
        let manager = Arc::clone(&manager);
        // The holder never releases within the 100 ms window.
        thread::spawn(move || {
            manager.lock_with_timeout(LSN, 2, LockType::Read, false, false, 100)
        })
    };

    let outcome = waiter.join().unwrap();
    assert!(
        matches!(outcome, Err(TxnError::LockTimeout { .. })),
        "expected LockTimeout, got {:?}",
        outcome
    );

    // Tidy up the holder's lock.
    manager.release(LSN, 1).unwrap();
}
1459
/// Locker 1 holds X and requests Y; locker 2 holds Y and requests X.
/// At least one of the two requests must fail with `Deadlock`.
#[test]
fn test_deadlock_detected() {
    const LSN_X: u64 = 0x1111_1111;
    const LSN_Y: u64 = 0x2222_2222;
    let manager = Arc::new(LockManager::new());

    // Set up the crossed ownership: 1 owns X, 2 owns Y.
    manager.lock(LSN_X, 1, LockType::Write, false, false).unwrap();
    manager.lock(LSN_Y, 2, LockType::Write, false, false).unwrap();

    // The 3 s timeout is generous so that the deadlock detector, not the
    // timeout, is what ends the wait.
    let for_first = Arc::clone(&manager);
    let first = thread::spawn(move || {
        for_first.lock_with_timeout(
            LSN_Y,
            1,
            LockType::Write,
            false,
            false,
            3000,
        )
    });

    // Let locker 1 register as a waiter before locker 2 closes the cycle.
    thread::sleep(Duration::from_millis(50));

    let for_second = Arc::clone(&manager);
    let second = thread::spawn(move || {
        for_second.lock_with_timeout(
            LSN_X,
            2,
            LockType::Write,
            false,
            false,
            3000,
        )
    });

    let res_a = first.join().unwrap();
    let res_b = second.join().unwrap();

    let one_deadlock = [&res_a, &res_b]
        .iter()
        .any(|res| matches!(res, Err(TxnError::Deadlock(_))));
    assert!(
        one_deadlock,
        "expected at least one Deadlock, got a={:?} b={:?}",
        res_a, res_b
    );
}
1517
/// When a single write lock is released, every reader that was waiting on
/// it must be granted.
///
/// Mirrors the JE behaviour of granting all compatible waiters at once in
/// `LockImpl.release()`.
#[test]
fn test_multiple_readers_unblocked() {
    const LSN: u64 = 0xFEED_FACE;
    const N_READERS: usize = 4;
    let manager = Arc::new(LockManager::new());

    // Locker 1 is the writer that blocks everyone.
    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();

    // Counts the reader threads that have started.
    let started = Arc::new((Mutex::new(0usize), Condvar::new()));

    let readers: Vec<_> = (0..N_READERS)
        .map(|i| {
            let manager = Arc::clone(&manager);
            let started = Arc::clone(&started);
            thread::spawn(move || {
                {
                    let (count, cv) = &*started;
                    let mut n_started = count.lock();
                    *n_started += 1;
                    cv.notify_all();
                }
                manager.lock_with_timeout(
                    LSN,
                    (i + 2) as i64,
                    LockType::Read,
                    false,
                    false,
                    5000,
                )
            })
        })
        .collect();

    // Wait for every reader to check in, then give them time to block.
    {
        let (count, cv) = &*started;
        let mut n_started = count.lock();
        while *n_started < N_READERS {
            cv.wait(&mut n_started);
        }
    }
    thread::sleep(Duration::from_millis(80));

    manager.release(LSN, 1).unwrap();

    for reader in readers {
        let outcome = reader.join().unwrap();
        assert!(outcome.is_ok(), "reader expected Ok, got {:?}", outcome);
        assert_eq!(outcome.unwrap(), LockGrantType::New);
    }
}
1576
1577 // -----------------------------------------------------------------------
1578 // Ported from LockManagerTest.java — testNegatives
1579 // -----------------------------------------------------------------------
1580
/// Ownership queries are negative before a lock is acquired, stay correct
/// through releases that must not take effect (unknown LSN, non-owner),
/// and the lock entry disappears after the real owner releases it.
#[test]
fn test_je_negatives_query_before_lock() {
    let lm = LockManager::new();
    let lsn: u64 = 1;

    // No lock held yet: neither a read-type nor a write-type ownership
    // query may succeed.
    assert_eq!(lm.get_owned_lock_type(lsn, 1), None);
    assert!(!lm.is_owned_write_lock(lsn, 1)); // write check
    let (owners, _) = lm.get_lock_info(lsn);
    assert_eq!(owners, 0);

    // Acquire READ lock for locker 1.
    let r = lm.lock(lsn, 1, LockType::Read, false, false);
    assert!(r.is_ok());
    assert_eq!(r.unwrap(), LockGrantType::New);

    // A second request for the same lock by the same locker → EXISTING.
    let r2 = lm.lock(lsn, 1, LockType::Read, false, false);
    assert_eq!(r2.unwrap(), LockGrantType::Existing);

    // Locker 2 does not own it.
    assert_eq!(lm.get_owned_lock_type(lsn, 2), None);

    // The lock entry exists.
    let (owners, _) = lm.get_lock_info(lsn);
    assert_eq!(owners, 1);

    // Release a non-existent LSN — should not panic and lock should persist.
    let _ = lm.release(2, 1); // lsn=2 doesn't exist
    let (owners2, _) = lm.get_lock_info(lsn);
    assert_eq!(owners2, 1);

    // Release by a non-owner (locker 2) should not release lsn=1.
    let _ = lm.release(lsn, 2);
    let (owners3, _) = lm.get_lock_info(lsn);
    assert_eq!(owners3, 1);
    assert_eq!(lm.get_owned_lock_type(lsn, 1), Some(LockType::Read));

    // True release by the actual owner.
    lm.release(lsn, 1).unwrap();
    let (owners4, _) = lm.get_lock_info(lsn);
    assert_eq!(owners4, 0);
    assert_eq!(lm.get_owned_lock_type(lsn, 1), None);
}
1627
/// A locker that already holds WRITE can request READ on the same LSN
/// without error, and a repeated WRITE request reports `Existing`.
#[test]
fn test_je_write_then_read_same_locker_ok() {
    const LSN: u64 = 1;
    let manager = LockManager::new();

    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();

    // READ by the write holder must succeed; the grant type is not pinned.
    let read_again = manager.lock(LSN, 1, LockType::Read, false, false);
    assert!(read_again.is_ok());

    // Asking for WRITE once more is an already-held lock.
    let write_again = manager.lock(LSN, 1, LockType::Write, false, false);
    assert_eq!(write_again.unwrap(), LockGrantType::Existing);
}
1643
1644 // -----------------------------------------------------------------------
1645 // Ported from LockManagerTest.java — testSR15926LargeNodeIds
1646 // -----------------------------------------------------------------------
1647
/// An LSN with bit 31 set (`0x80000000`, the value from the original bug
/// report) must map to a valid shard: locking and releasing it succeeds.
#[test]
fn test_je_large_lsn_no_negative_index() {
    const LARGE_LSN: u64 = 0x80000000u64;
    let manager = LockManager::new();

    let grant = manager.lock(LARGE_LSN, 1, LockType::Write, false, false);
    assert!(grant.is_ok(), "large LSN should not cause a panic or error");

    manager.release(LARGE_LSN, 1).unwrap();
}
1659
1660 // -----------------------------------------------------------------------
1661 // Ported from LockManagerTest — distinct-LSN distribution across shards
1662 // -----------------------------------------------------------------------
1663
/// DRIFT-2 regression: the shard count comes from configuration rather
/// than a hardcoded constant. `new()` gives 64 shards, `with_config`
/// honors the requested count (clamped to at least 1), and shard indices
/// stay below the configured count.
#[test]
fn test_with_config_shard_count_honored() {
    assert_eq!(LockManager::new().n_lock_tables(), 64);
    assert_eq!(LockManager::with_config(500, 8).n_lock_tables(), 8);
    // A requested count of 0 is raised to 1.
    assert_eq!(LockManager::with_config(500, 0).n_lock_tables(), 1);

    // Indices never reach the configured shard count of 8.
    let manager = LockManager::with_config(500, 8);
    let probes = [1u64, 7, 64, 1000, u64::MAX];
    for lsn in probes {
        assert!(manager.get_table_index(lsn) < 8);
    }
}
1680
/// Sixteen distinct LSNs locked by one locker are tracked as sixteen
/// independent lock entries and are all removed on release (shard count is
/// an instance field, default 64).
#[test]
fn test_je_sixteen_lock_tables() {
    let manager = LockManager::new();

    (0..16u64).for_each(|lsn| {
        manager.lock(lsn, 1, LockType::Write, false, false).unwrap();
    });
    assert_eq!(manager.n_total_locks(), 16);

    (0..16u64).for_each(|lsn| {
        manager.release(lsn, 1).unwrap();
    });
    assert_eq!(manager.n_total_locks(), 0);
}
1695
1696 // -----------------------------------------------------------------------
1697 // Ported from LockManagerTest.java — testMultipleReaders
1698 // -----------------------------------------------------------------------
1699
/// Three threads each take a read lock on the same LSN and hold it until
/// all three have acquired theirs, so the locks are held simultaneously.
#[test]
fn test_je_multiple_readers_concurrent() {
    const LSN: u64 = 0xAAAA;
    let manager = Arc::new(LockManager::new());
    // Number of threads that currently hold their read lock.
    let holders = Arc::new((Mutex::new(0usize), Condvar::new()));

    let workers: Vec<_> = (1i64..=3)
        .map(|locker_id| {
            let manager = Arc::clone(&manager);
            let holders = Arc::clone(&holders);
            thread::spawn(move || {
                manager
                    .lock(LSN, locker_id, LockType::Read, false, false)
                    .unwrap();
                assert_eq!(
                    manager.get_owned_lock_type(LSN, locker_id),
                    Some(LockType::Read)
                );

                let (count, cv) = &*holders;
                // Announce that this thread owns its lock ...
                {
                    let mut n_holding = count.lock();
                    *n_holding += 1;
                    cv.notify_all();
                }
                // ... and keep it until all three do.
                {
                    let mut n_holding = count.lock();
                    while *n_holding < 3 {
                        cv.wait(&mut n_holding);
                    }
                }
                manager.release(LSN, locker_id).unwrap();
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
1740
1741 // -----------------------------------------------------------------------
1742 // Ported from LockManagerTest.java — testNonBlockingLock1 / 2
1743 // -----------------------------------------------------------------------
1744
/// While locker 1 holds a read lock, locker 2's non-blocking write request
/// is refused and leaves no waiter behind; after locker 1 releases, the
/// write is granted.
#[test]
fn test_je_nonblocking_write_denied_then_granted() {
    const LSN: u64 = 0xBBBB;
    let manager = Arc::new(LockManager::new());

    manager.lock(LSN, 1, LockType::Read, false, false).unwrap();

    let contender = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            let denied = manager.lock(LSN, 2, LockType::Write, true, false);
            assert!(
                matches!(denied, Err(TxnError::LockNotAvailable { .. })),
                "expected LockNotAvailable, got {:?}",
                denied
            );
            // The refused request left no trace: locker 2 owns nothing,
            // nobody is queued, and locker 1 is still the only owner.
            assert_eq!(manager.get_owned_lock_type(LSN, 2), None);
            let (owners, waiters) = manager.get_lock_info(LSN);
            assert_eq!(waiters, 0);
            assert_eq!(owners, 1);
        })
    };
    contender.join().unwrap();

    // With the reader gone, the write goes through.
    manager.release(LSN, 1).unwrap();
    let granted = manager.lock(LSN, 2, LockType::Write, false, false);
    assert_eq!(granted.unwrap(), LockGrantType::New);
    manager.release(LSN, 2).unwrap();
}
1779
/// While locker 1 holds a write lock, locker 2's non-blocking read request
/// is refused; after locker 1 releases, the read is granted.
#[test]
fn test_je_nonblocking_read_denied_then_granted() {
    const LSN: u64 = 0xCCCC;
    let manager = Arc::new(LockManager::new());

    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();
    assert!(manager.is_owned_write_lock(LSN, 1));

    // The writer is in the way, so a non-blocking read fails at once.
    let denied = manager.lock(LSN, 2, LockType::Read, true, false);
    assert!(
        matches!(denied, Err(TxnError::LockNotAvailable { .. })),
        "expected LockNotAvailable, got {:?}",
        denied
    );
    assert_eq!(manager.get_owned_lock_type(LSN, 2), None);

    // Once the writer is gone the read is a fresh grant, and not a write.
    manager.release(LSN, 1).unwrap();
    let granted = manager.lock(LSN, 2, LockType::Read, false, false);
    assert_eq!(granted.unwrap(), LockGrantType::New);
    assert!(!manager.is_owned_write_lock(LSN, 2));
    manager.release(LSN, 2).unwrap();
}
1807
1808 // -----------------------------------------------------------------------
1809 // Ported from LockManagerTest.java — testMultipleReadersSingleWrite1
1810 // -----------------------------------------------------------------------
1811
/// Lockers 1 and 2 hold read locks; a writer (locker 3) blocks behind them
/// as the single waiter and is granted once both readers have released.
#[test]
fn test_je_two_readers_one_writer_blocks_then_granted() {
    const LSN: u64 = 0xDDDD;
    let manager = Arc::new(LockManager::new());
    // Raised by the writer thread just before it issues its request.
    let writer_started = Arc::new((Mutex::new(false), Condvar::new()));

    for reader in [1i64, 2] {
        manager.lock(LSN, reader, LockType::Read, false, false).unwrap();
    }

    let writer = {
        let manager = Arc::clone(&manager);
        let writer_started = Arc::clone(&writer_started);
        thread::spawn(move || {
            {
                let (flag, cv) = &*writer_started;
                let mut started = flag.lock();
                *started = true;
                cv.notify_all();
            }
            // Blocks until both readers are gone (5 s safety timeout).
            manager.lock_with_timeout(LSN, 3, LockType::Write, false, false, 5000)
        })
    };

    // Wait for the writer thread to start, then let it enqueue itself.
    {
        let (flag, cv) = &*writer_started;
        let mut started = flag.lock();
        while !*started {
            cv.wait(&mut started);
        }
    }
    thread::sleep(Duration::from_millis(30));

    let (_, waiters) = manager.get_lock_info(LSN);
    assert_eq!(waiters, 1, "writer should be waiting");

    manager.release(LSN, 1).unwrap();
    manager.release(LSN, 2).unwrap();

    let outcome = writer.join().unwrap();
    assert!(
        outcome.is_ok(),
        "writer should have been granted, got {:?}",
        outcome
    );
    assert!(manager.is_owned_write_lock(LSN, 3));
    manager.release(LSN, 3).unwrap();
}
1863
1864 // -----------------------------------------------------------------------
1865 // Ported from DeadlockTest.java — testDeadlockBetweenTwoLockers
1866 // -----------------------------------------------------------------------
1867
/// Classic two-locker deadlock.
///
/// Locker 1 owns `L1` and requests `L2`; locker 2 owns `L2` and requests
/// `L1`. The two requests are issued from separate threads, 50 ms apart, and
/// at least one of them has to fail with `TxnError::Deadlock`.
#[test]
fn test_je_deadlock_two_lockers() {
    const L1: u64 = 0x1001;
    const L2: u64 = 0x2002;
    let manager = Arc::new(LockManager::new());

    // Each locker takes its own lock first; neither is ever released.
    manager.lock(L1, 1, LockType::Write, false, false).unwrap();
    manager.lock(L2, 2, LockType::Write, false, false).unwrap();

    // Locker 1 reaches for the lock owned by locker 2 ...
    let first = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(L2, 1, LockType::Write, false, false, 3000)
        })
    };
    thread::sleep(Duration::from_millis(50));
    // ... and locker 2 closes the cycle by reaching for locker 1's lock.
    let second = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(L1, 2, LockType::Write, false, false, 3000)
        })
    };

    let ra = first.join().unwrap();
    let rb = second.join().unwrap();

    let saw_deadlock = [&ra, &rb]
        .iter()
        .any(|r| matches!(r, Err(TxnError::Deadlock(_))));
    assert!(
        saw_deadlock,
        "expected at least one Deadlock, got a={:?} b={:?}",
        ra, rb
    );
}
1902
1903 // -----------------------------------------------------------------------
1904 // Ported from DeadlockTest.java — testDeadlockAmongThreeLockers
1905 // -----------------------------------------------------------------------
1906
/// Three-locker wait cycle.
///
/// Lockers 1, 2 and 3 own `L1`, `L2` and `L3` respectively. Locker 1 then
/// requests `L2`, locker 2 requests `L3` and locker 3 requests `L1`, each
/// from its own thread with 30 ms between spawns. At least one of the three
/// requests has to fail with `TxnError::Deadlock`.
#[test]
fn test_je_deadlock_three_lockers_cycle() {
    const L1: u64 = 0x3001;
    const L2: u64 = 0x3002;
    const L3: u64 = 0x3003;
    let manager = Arc::new(LockManager::new());

    // First lock of every locker.
    manager.lock(L1, 1, LockType::Write, false, false).unwrap();
    manager.lock(L2, 2, LockType::Write, false, false).unwrap();
    manager.lock(L3, 3, LockType::Write, false, false).unwrap();

    // Second requests, staggered so the edges 1 -> 2 -> 3 -> 1 are added in
    // that order.
    let waiter_1 = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(L2, 1, LockType::Write, false, false, 3000)
        })
    };
    thread::sleep(Duration::from_millis(30));
    let waiter_2 = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(L3, 2, LockType::Write, false, false, 3000)
        })
    };
    thread::sleep(Duration::from_millis(30));
    let waiter_3 = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(L1, 3, LockType::Write, false, false, 3000)
        })
    };

    let outcomes = [
        waiter_1.join().unwrap(),
        waiter_2.join().unwrap(),
        waiter_3.join().unwrap(),
    ];

    let saw_deadlock = outcomes
        .iter()
        .any(|r| matches!(r, Err(TxnError::Deadlock(_))));
    assert!(
        saw_deadlock,
        "3-locker cycle: expected at least one Deadlock error"
    );
}
1949
1950 // -----------------------------------------------------------------------
1951 // Ported from DeadlockTest.java — testThrowCorrectException
1952 // -----------------------------------------------------------------------
1953
/// A lone waiter without any wait cycle must end in `LockTimeout`.
///
/// Locker 1 keeps the write lock for the whole wait; locker 2 requests the
/// same LSN with a timeout argument of 200 and has to get
/// `TxnError::LockTimeout` back, not a deadlock error.
#[test]
fn test_je_no_cycle_gives_timeout_not_deadlock() {
    const LSN: u64 = 0x4444;
    let manager = Arc::new(LockManager::new());

    // The owner holds on to the lock until the waiter has given up.
    manager.lock(LSN, 1, LockType::Write, false, false).unwrap();

    let waiter = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(LSN, 2, LockType::Write, false, false, 200)
        })
    };

    let outcome = waiter.join().unwrap();
    match &outcome {
        Err(TxnError::LockTimeout { .. }) => {}
        other => panic!("no cycle → expected LockTimeout, got {:?}", other),
    }

    manager.release(LSN, 1).unwrap();
}
1978
1979 // -----------------------------------------------------------------------
1980 // Ported from LockManagerTest — lock statistics increment
1981 // -----------------------------------------------------------------------
1982
/// Lock statistics after three uncontended requests.
///
/// Two read locks on LSN 10 and one write lock on LSN 20 are all granted
/// immediately, so `lock_requests` must read 3 and `lock_waits` must stay 0.
#[test]
fn test_je_lock_stats_increment() {
    let manager = LockManager::new();

    // (lsn, locker, lock type) for each request, in the order issued.
    let requests = [
        (10, 1, LockType::Read),
        (10, 2, LockType::Read),
        (20, 3, LockType::Write),
    ];
    for (lsn, locker, kind) in requests {
        manager.lock(lsn, locker, kind, false, false).unwrap();
    }

    let snapshot = manager.get_stats();
    assert_eq!(snapshot.lock_requests, 3, "3 lock requests should be counted");
    // Nothing had to queue, so the wait counter is untouched.
    assert_eq!(snapshot.lock_waits, 0, "no waits expected");
}
1997
1998 // -----------------------------------------------------------------------
1999 // Ported from LockManagerTest.java — testUpgradeLock
2000 // -----------------------------------------------------------------------
2001
/// A read-to-write promotion is served ahead of a plain write waiter.
///
/// Lockers 1 and 2 hold read locks. Locker 3 queues for a write lock first;
/// locker 2 then asks to upgrade its read lock to a write lock. Once locker 1
/// releases, locker 2's request must be granted as a `Promotion`, and only
/// after locker 2 releases does locker 3 get the lock.
#[test]
fn test_je_upgrade_lock_butts_in_front() {
    const LSN: u64 = 0x5555;
    let manager = Arc::new(LockManager::new());

    // Two shared owners to start with.
    manager.lock(LSN, 1, LockType::Read, false, false).unwrap();
    manager.lock(LSN, 2, LockType::Read, false, false).unwrap();

    // Locker 3: an ordinary write waiter, queued first.
    let plain_writer = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(LSN, 3, LockType::Write, false, false, 5000)
        })
    };
    thread::sleep(Duration::from_millis(30));

    // Locker 2: already a reader, now asking for write. This is the
    // promotion waiter that is expected to jump the queue.
    let upgrader = {
        let manager = Arc::clone(&manager);
        thread::spawn(move || {
            manager.lock_with_timeout(LSN, 2, LockType::Write, false, false, 5000)
        })
    };
    thread::sleep(Duration::from_millis(20));

    // Dropping locker 1's read lock leaves locker 2 as the only owner, so
    // its promotion can go through before locker 3 is considered.
    manager.release(LSN, 1).unwrap();

    match upgrader.join().unwrap() {
        Ok(grant) => assert_eq!(grant, LockGrantType::Promotion),
        failed => panic!("locker 2 promotion should succeed, got {:?}", failed),
    }

    // Locker 3 is next in line once locker 2 lets go.
    manager.release(LSN, 2).unwrap();
    let plain_outcome = plain_writer.join().unwrap();
    assert!(
        plain_outcome.is_ok(),
        "locker 3 should succeed after locker 2, got {:?}",
        plain_outcome
    );
    manager.release(LSN, 3).unwrap();
}
2047
2048 // -----------------------------------------------------------------------
2049 // release_all_for_locker
2050 // -----------------------------------------------------------------------
2051
/// `release_all_for_locker` reports how many locks it released and leaves
/// other lockers' locks in place.
#[test]
fn release_all_for_locker_returns_count() {
    let manager = LockManager::new();

    // Five read locks for locker 7, two write locks for locker 8.
    let reader_lsns = [10u64, 20, 30, 40, 50];
    let writer_lsns = [100u64, 200];
    for &lsn in &reader_lsns {
        manager.lock(lsn, 7, LockType::Read, false, false).unwrap();
    }
    for &lsn in &writer_lsns {
        manager.lock(lsn, 8, LockType::Write, false, false).unwrap();
    }
    assert_eq!(manager.n_total_locks(), 7);

    // Releasing locker 7 removes exactly its five locks.
    let freed_for_7 = manager.release_all_for_locker(7);
    assert_eq!(freed_for_7, 5);
    assert_eq!(manager.n_total_locks(), 2);

    // Releasing locker 8 empties the table.
    let freed_for_8 = manager.release_all_for_locker(8);
    assert_eq!(freed_for_8, 2);
    assert_eq!(manager.n_total_locks(), 0);
}
2073
/// Releasing everything for a locker id that owns nothing reports zero and
/// does not disturb the existing lock.
#[test]
fn release_all_for_locker_unknown_id_is_zero() {
    let manager = LockManager::new();
    manager.lock(1, 1, LockType::Read, false, false).unwrap();

    // Locker 999 never took a lock.
    let freed = manager.release_all_for_locker(999);
    assert_eq!(freed, 0);
    assert_eq!(manager.n_total_locks(), 1);

    manager.release(1, 1).unwrap();
}
2083
/// A second `release_all_for_locker` call for the same locker is harmless:
/// the first call releases both locks, the repeat finds nothing left.
#[test]
fn release_all_for_locker_idempotent() {
    let manager = LockManager::new();
    manager.lock(1, 1, LockType::Read, false, false).unwrap();
    manager.lock(2, 1, LockType::Write, false, false).unwrap();

    let first_pass = manager.release_all_for_locker(1);
    assert_eq!(first_pass, 2);

    let second_pass = manager.release_all_for_locker(1);
    assert_eq!(second_pass, 0);
}
2093
/// When several lockers share a read lock on one LSN, releasing everything
/// for one of them must leave the lock entry and the other owners untouched.
#[test]
fn release_all_for_locker_preserves_other_owners() {
    let manager = LockManager::new();

    // Lockers 1, 2 and 3 all read-lock LSN 1.
    for locker in [1, 2, 3] {
        manager.lock(1, locker, LockType::Read, false, false).unwrap();
    }

    // Only locker 2's share goes away; the entry stays because lockers 1
    // and 3 still own it.
    let freed = manager.release_all_for_locker(2);
    assert_eq!(freed, 1);
    assert_eq!(manager.n_total_locks(), 1);

    // Locker 2 has nothing left to release.
    let freed_again = manager.release_all_for_locker(2);
    assert_eq!(freed_again, 0);

    // Once the remaining owners release, the table is empty.
    manager.release(1, 1).unwrap();
    manager.release(1, 3).unwrap();
    assert_eq!(manager.n_total_locks(), 0);
}
2116
/// When the releasing locker was the only owner of an LSN, the lock entry
/// itself disappears from the table.
#[test]
fn release_all_for_locker_clears_lock_when_last_owner_leaves() {
    let manager = LockManager::new();
    manager.lock(42, 1, LockType::Write, false, false).unwrap();
    assert_eq!(manager.n_total_locks(), 1);

    // Locker 1 was the sole owner of LSN 42, so nothing remains afterwards.
    let _ = manager.release_all_for_locker(1);
    assert_eq!(manager.n_total_locks(), 0);
}
2126
/// H-2 regression: verify that no internal deadlock occurs when the lock
/// manager processes concurrent waiter registrations and deadlock-victim
/// cleanups. Before the H-2 fix, different code paths acquired the shard and
/// waiter_graph mutexes in inconsistent order, creating a potential process
/// hang under extreme contention.
///
/// The test spawns two threads that form a classic 2-locker cycle:
///   Thread A: takes a write lock on `LSN_A`, then requests `LSN_B`.
///   Thread B: takes a write lock on `LSN_B`, then requests `LSN_A`.
/// A barrier guarantees both first locks are held before either second
/// request is issued. The test passes when at least one of the two second
/// requests returns `TxnError::Deadlock`; an internal hang would show up as
/// the joins below never returning.
///
/// NOTE(review): earlier wording called a "2-second timeout" the safety net,
/// but the requests here go through `lock` with no explicit timeout.
/// Presumably the manager's default lock timeout plays that role — confirm.
#[test]
fn test_lock_ordering_no_internal_deadlock() {
    use std::sync::{Arc, Barrier};
    use std::thread;

    const LSN_A: u64 = 0xDEAD_0001;
    const LSN_B: u64 = 0xDEAD_0002;
    const LOCKER_A: i64 = 1001;
    const LOCKER_B: i64 = 1002;

    let lm = Arc::new(LockManager::new());
    // Both threads acquire their first lock before trying for the second.
    let barrier = Arc::new(Barrier::new(2));

    let t_a = {
        let lm = Arc::clone(&lm);
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            // Locker A grabs LSN_A, then tries to grab LSN_B (held by B).
            lm.lock(LSN_A, LOCKER_A, LockType::Write, false, false).unwrap();
            barrier.wait(); // both sides have their first lock
            lm.lock(LSN_B, LOCKER_A, LockType::Write, false, false)
        })
    };

    let t_b = {
        let lm = Arc::clone(&lm);
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            // Locker B grabs LSN_B, then tries to grab LSN_A (held by A).
            lm.lock(LSN_B, LOCKER_B, LockType::Write, false, false).unwrap();
            barrier.wait(); // both sides have their first lock
            lm.lock(LSN_A, LOCKER_B, LockType::Write, false, false)
        })
    };

    // Join A first, then B. The outer `Result` is the join outcome (`Err`
    // if the thread panicked), the inner one is the second lock request.
    let outcomes = [t_a.join(), t_b.join()];

    let n_deadlocks = outcomes
        .iter()
        .filter(|r| matches!(r, Ok(Err(TxnError::Deadlock(_)))))
        .count();
    // Only reported in the failure message, to help diagnose what happened.
    let n_success = outcomes.iter().filter(|r| matches!(r, Ok(Ok(_)))).count();

    // At least one side must be told about the deadlock. Both being told is
    // acceptable. A timeout or success on both sides is not.
    assert!(
        n_deadlocks >= 1,
        "expected at least one deadlock error, got: n_deadlocks={n_deadlocks} n_success={n_success}"
    );
}
2190
/// H-4 regression: when `select_victim` is given populated lock counts, the
/// locker holding the fewest locks is chosen as the victim.
///
/// Locker 1 (lower id) holds six locks: five unrelated ones plus `L_OLD`.
/// Locker 2 (higher id) holds only `L_NEW`. The test checks that
/// `compute_lock_counts` reports 6 and 1 for the pair, and that
/// `select_victim` picks locker 2 when handed those counts.
///
/// Background from the original regression note: before the fix the counts
/// passed to `select_victim` were always empty, so the choice fell through
/// to the youngest-locker tiebreaker. Locker 2 would have been chosen, but
/// for the wrong reason. Asserting the counts first pins the primary
/// criterion as the one driving the choice.
#[test]
fn test_h4_victim_selection_uses_lock_counts() {
    // Held by locker 1, which also owns five unrelated locks.
    const L_OLD: u64 = 0x6001;
    // Held by locker 2, which owns nothing else.
    const L_NEW: u64 = 0x6002;
    let manager = Arc::new(LockManager::new());

    // Locker 1: five unrelated write locks (0x7000..=0x7004), then L_OLD.
    for unrelated in 0x7000..0x7005 {
        manager.lock(unrelated, 1, LockType::Write, false, false).unwrap();
    }
    manager.lock(L_OLD, 1, LockType::Write, false, false).unwrap();

    // Locker 2: just L_NEW.
    manager.lock(L_NEW, 2, LockType::Write, false, false).unwrap();

    // Lock counts for the candidate pair [1, 2].
    let counts = manager.compute_lock_counts(&[1, 2]);
    let held_by_1 = counts.get(&1).map_or(0, |n| *n);
    let held_by_2 = counts.get(&2).map_or(0, |n| *n);
    assert_eq!(held_by_1, 6, "locker 1 holds 6 locks");
    assert_eq!(held_by_2, 1, "locker 2 holds 1 lock");

    // With these counts the locker with the fewest locks must be selected.
    let chosen = DeadlockDetector::select_victim(&[1, 2], &counts);
    assert_eq!(chosen, 2, "victim must be locker 2 (fewest locks held)");
}
2236
/// TXN-1 regression test: `lock_with_sharing_and_timeout` must report a
/// deadlock that forms WHILE the caller is waiting.
///
/// Setup: locker A owns `L1` and locker B owns `L2`. B then requests `L1`
/// from a helper thread and A requests `L2` from the main thread, both via
/// `lock_with_sharing_and_timeout`, which closes a two-node cycle.
///
/// This is a structural check rather than a timing check: it only asserts
/// that at least one of the two requests ends in `TxnError::Deadlock`, and
/// does not measure how long detection took.
///
/// Background from the original regression note: prior to the fix the
/// deadlock check on this path only fired on `timed_out.timed_out()` with
/// stale owner IDs, so a deadlock could sit for a full 50 ms wait slice
/// before being detected.
#[test]
fn test_txn1_sharing_path_deadlock_detected_promptly() {
    use std::sync::Barrier;
    use std::thread;

    const L1: u64 = 0xA001;
    const L2: u64 = 0xA002;
    const LOCKER_A: i64 = 101;
    const LOCKER_B: i64 = 102;
    const TIMEOUT_MS: u64 = 5_000;

    let lm = Arc::new(LockManager::new());

    // Initial ownership: A -> L1, B -> L2.
    lm.lock(L1, LOCKER_A, LockType::Write, false, false).unwrap();
    lm.lock(L2, LOCKER_B, LockType::Write, false, false).unwrap();

    let barrier = Arc::new(Barrier::new(2));

    // Helper thread: B asks for L1, which A owns.
    let handle = {
        let lm = Arc::clone(&lm);
        let barrier = Arc::clone(&barrier);
        thread::spawn(move || {
            barrier.wait();
            lm.lock_with_sharing_and_timeout(
                L1,
                LOCKER_B,
                LockType::Write,
                false,
                false,
                TIMEOUT_MS,
            )
        })
    };

    // Main thread: rendezvous with the helper, give it a moment to enter its
    // wait, then have A ask for L2, which B owns.
    barrier.wait();
    thread::sleep(Duration::from_millis(5));
    let result_a = lm.lock_with_sharing_and_timeout(
        L2,
        LOCKER_A,
        LockType::Write,
        false,
        false,
        TIMEOUT_MS,
    );

    let result_b = handle.join().expect("thread B panicked");

    // At least one side must be told about the deadlock. Both being told is
    // acceptable too.
    let saw_deadlock = [&result_a, &result_b]
        .iter()
        .any(|r| matches!(r, Err(TxnError::Deadlock(..))));
    assert!(
        saw_deadlock,
        "TXN-1: expected at least one DeadlockException on the sharing path; \
         got A={result_a:?}, B={result_b:?}"
    );
}
2307}