Skip to main content

noxu_txn/
lock_manager.rs

1//! Lock manager for Noxu DB.
2//!
3//!
4//! The LockManager is the central authority for all lock operations in the
5//! system. It manages N sharded lock tables, each protected by its own mutex,
6//! to allow concurrent lock operations on different LSNs.
7//!
8//! # Internal lock ordering (H-2, 2026 audit F-6.2)
9//!
10//! Two internal mutexes must never be held simultaneously, but when code
11//! paths need to update BOTH in sequence the canonical order is:
12//!
13//!   **shard mutex first, then waiter_graph mutex**.
14//!
15//! Concretely:
16//! - Lock the relevant `lock_tables[idx]` shard first.
//! - Release the shard guard before acquiring `waiter_graph`; the two are never held together.
18//! - Never acquire a shard while holding `waiter_graph`.
19//!
20//! All victim-cleanup paths (flush_waiter + clear_wait) are structured to
21//! acquire the shard first, then call `clear_wait()` after the shard guard
22//! is dropped. This prevents a lock-ordering inversion that would otherwise
23//! create a potential process hang under extreme contention.
24
25use hashbrown::{HashMap, HashSet};
26use std::sync::{Arc, RwLock};
27use std::time::Duration;
28
29use noxu_sync::{Condvar, Mutex};
30
31use crate::lock_info::WaiterNotify;
32use crate::{
33    DeadlockDetector, Lock, LockGrantType, LockStats, LockType, TxnError,
34};
35use std::sync::atomic::{AtomicU64, Ordering};
36
/// Default number of lock-table shards, used when the shard count is not
/// explicitly configured.
///
/// Multiple lock tables reduce contention by allowing concurrent operations
/// on locks that live in different tables.  64 shards are intended to give
/// good distribution on multi-core systems under high concurrency
/// (16+ threads); LSNs are hashed to a shard, so two unrelated LSNs rarely
/// contend on the same shard mutex.
///
/// Noxu deliberately defaults to 64 (a documented deviation from JE's default
/// of 1) for higher write concurrency; the count is configurable via
/// `LOCK_N_LOCK_TABLES` (`noxu.lock.nLockTables`).
const DEFAULT_N_LOCK_TABLES: usize = 64;
48
/// The LockManager manages all locks in the system.
///
/// Locks are sharded across `n_lock_tables` tables, each protected by its
/// own mutex.  This allows concurrent lock operations on different LSNs.
///
/// # Architecture
///
/// - Each lock is identified by an LSN (packed u64)
/// - Locks are hashed to one of the lock tables
/// - Each table has its own mutex for fine-grained locking
/// - Lock objects start as Thin locks and mutate to Full locks when needed
///
/// # Blocking / waiting
///
/// When `lock()` cannot grant immediately it:
/// 1. Registers the calling thread as a waiter (inside the shard mutex) and
///    attaches a per-waiter `Arc<(Mutex<bool>, Condvar)>` notify pair.
/// 2. Releases the shard mutex and records the wait in `waiter_graph`.
/// 3. Checks for deadlocks before sleeping.
/// 4. Waits on the condvar in slices of at most 50 ms, re-running the
///    deadlock check after every slice, until the granted flag is set or
///    the lock timeout elapses.
/// 5. On timeout removes itself from the waiter list and returns
///    `TxnError::LockTimeout`.
///
/// This mirrors the flow of JE's `LockManager.lock()` / `waitForLock()`.
pub struct LockManager {
    /// Sharded lock tables, keyed by LSN.
    lock_tables: Vec<Mutex<HashMap<u64, Lock>>>,
    /// Number of shards (== lock_tables.len()); configurable via
    /// `LOCK_N_LOCK_TABLES`. Cached so get_table_index avoids a Vec len read.
    n_lock_tables: usize,

    /// Statistics tracking.
    stats: LockManagerStats,

    /// Default lock-wait timeout in milliseconds.
    ///
    /// 0 means wait forever (`EnvironmentConfig.setLockTimeout(0)`).
    /// Configured at open time from `EnvironmentConfig`; can be overridden
    /// per-call via `lock_with_timeout()`.
    lock_timeout_ms: AtomicU64,

    /// Locker sharing registry: maps locker_id → share_group_id.
    ///
    /// ThreadLockers register their thread_id (as i64) as the group_id.
    /// HandleLockers with a buddy register the buddy's ID as the group_id.
    /// Two lockers are in the same sharing group iff they map to the same
    /// group_id, and thus bypass lock-conflict detection (the equivalent of
    /// JE's `Locker.sharesLocksWith(other)`).
    ///
    /// NOTE(review): the original comment here was truncated; it appears to
    /// say this started as a thread-locker map that was extended to support
    /// HandleLocker buddy sharing — confirm.
    share_registry: RwLock<HashMap<i64, i64>>,

    /// Incremental waits-for graph used by deadlock detection.
    ///
    /// Maps waiting_locker_id → [owner_locker_ids it is blocked by].
    /// An entry is inserted when a locker enters the wait path and removed
    /// when it exits (grant, timeout, or deadlock abort).
    ///
    /// `check_deadlock_for_waiter` reads from this small graph instead of
    /// rescanning every lock-table shard, avoiding a full scan that would
    /// stall all threads under high contention.
    waiter_graph: Mutex<HashMap<i64, Vec<i64>>>,

    /// Diagnostic label registry: maps locker_id → static label such as
    /// `"txn"`, `"auto-txn"`, or `"cleaner"`.
    ///
    /// Used by [`LockManager::format_locker`] to render a typed identifier
    /// like `"auto-txn:42"` in deadlock and timeout error messages so a
    /// deadlock involving a synthetic auto-commit txn and an explicit txn is
    /// visibly distinguishable from one involving two explicit txns.
    ///
    /// Closes the second F12 residual (May 2026 audit follow-up).  Lockers
    /// without a registered label are reported as `"locker:<id>"`.
    locker_labels: RwLock<HashMap<i64, &'static str>>,

    /// Set of locker IDs that are NOT preemptable (importunate lockers must
    /// not have their locks stolen).  JE tracks this per-`Locker` via
    /// `getPreemptable()`; the lock manager needs it to decide, during an
    /// importunate steal, whether a remaining owner blocks the steal
    /// (LockManager.java:556 — "Lock holder is non-preemptable, wait again").
    /// A locker absent from this set is preemptable (the default).
    non_preemptable: RwLock<HashSet<i64>>,
}
138
/// Internal statistics tracking.
///
/// All counters are plain atomics so they can be bumped without taking any
/// of the lock manager's mutexes.
struct LockManagerStats {
    /// Total number of lock requests.
    lock_requests: AtomicU64,

    /// Total number of lock waits (requests that could not be granted
    /// immediately and entered the wait path).
    lock_waits: AtomicU64,

    /// Total number of lock acquisitions that timed out.
    lock_timeouts: AtomicU64,
}
150
151impl LockManager {
152    /// Creates a new LockManager with N_LOCK_TABLES shards and the default
153    /// lock timeout of 500 ms (matching default).
154    pub fn new() -> Self {
155        Self::with_lock_timeout(500)
156    }
157
158    /// Creates a new LockManager with a specific default lock timeout.
159    ///
160    /// `timeout_ms == 0` means wait forever (`setLockTimeout(0, MILLISECONDS)`).
161    ///
162    /// Call this from `EnvironmentImpl` after reading `EnvironmentConfig.lock_timeout_ms`.
163    pub fn with_lock_timeout(timeout_ms: u64) -> Self {
164        Self::with_config(timeout_ms, DEFAULT_N_LOCK_TABLES)
165    }
166
167    /// Creates a new LockManager with an explicit shard count (JE
168    /// `LOCK_N_LOCK_TABLES` / `noxu.lock.nLockTables`) and lock timeout.
169    /// Production (`EnvironmentImpl`) passes the configured value; `0` or
170    /// values are clamped to at least 1 shard.
171    pub fn with_config(timeout_ms: u64, n_lock_tables: usize) -> Self {
172        let n_lock_tables = n_lock_tables.max(1);
173        let mut lock_tables = Vec::with_capacity(n_lock_tables);
174        for _ in 0..n_lock_tables {
175            lock_tables.push(Mutex::new(HashMap::new()));
176        }
177
178        Self {
179            lock_tables,
180            n_lock_tables,
181            stats: LockManagerStats {
182                lock_requests: AtomicU64::new(0),
183                lock_waits: AtomicU64::new(0),
184                lock_timeouts: AtomicU64::new(0),
185            },
186            lock_timeout_ms: AtomicU64::new(timeout_ms),
187            share_registry: RwLock::new(HashMap::new()),
188            waiter_graph: Mutex::new(HashMap::new()),
189            locker_labels: RwLock::new(HashMap::new()),
190            non_preemptable: RwLock::new(HashSet::new()),
191        }
192    }
193
194    /// Registers a diagnostic label for `locker_id`.
195    ///
196    /// Stored in `Self::locker_labels` and looked up by
197    /// [`Self::format_locker`] when building deadlock / lock-timeout error
198    /// messages.  Typical labels are `"txn"` (explicit transaction),
199    /// `"auto-txn"` (synthetic auto-commit transaction created by
200    /// `TxnManager::begin_auto_txn`), and `"cleaner"` (cleaner-locker IDs).
201    ///
202    /// Re-registering the same `locker_id` overwrites the previous label.
203    /// Lockers without a registered label are reported as `"locker:<id>"`,
204    /// which preserves backward compatibility with callers that never
205    /// registered.
206    pub fn register_locker_label(&self, locker_id: i64, label: &'static str) {
207        self.locker_labels.write().unwrap().insert(locker_id, label);
208    }
209
210    /// Removes the diagnostic label for `locker_id`.
211    ///
212    /// Called when a transaction (explicit or synthetic auto-commit)
213    /// terminates so the registry does not grow without bound.  Idempotent —
214    /// removing an unknown id is a no-op.
215    pub fn unregister_locker_label(&self, locker_id: i64) {
216        self.locker_labels.write().unwrap().remove(&locker_id);
217    }
218
219    /// Returns a typed identifier string for `locker_id`.
220    ///
221    /// Looks up the label registered via [`Self::register_locker_label`] and
222    /// returns `"<label>:<id>"`; if no label is registered, returns
223    /// `"locker:<id>"`.
224    ///
225    /// Used to format the `requester` and `owner` fields of
226    /// [`TxnError::LockTimeout`] and the message body of
227    /// [`TxnError::Deadlock`] so a mixed auto-commit/explicit-txn deadlock
228    /// reports e.g. `"auto-txn:42"` and `"txn:17"` rather than two opaque
229    /// integers — closing the second F12 residual.
230    pub fn format_locker(&self, locker_id: i64) -> String {
231        match self.locker_labels.read().unwrap().get(&locker_id).copied() {
232            Some(label) => format!("{label}:{locker_id}"),
233            None => format!("locker:{locker_id}"),
234        }
235    }
236
237    /// Returns a comma-separated typed identifier list for `locker_ids`.
238    ///
239    /// Convenience wrapper used in deadlock error messages.
240    pub fn format_lockers(&self, locker_ids: &[i64]) -> String {
241        let mut out = String::new();
242        for (i, id) in locker_ids.iter().enumerate() {
243            if i > 0 {
244                out.push_str(", ");
245            }
246            out.push_str(&self.format_locker(*id));
247        }
248        out
249    }
250
251    /// Updates the default lock timeout.
252    ///
253    /// Thread-safe; takes effect for subsequent `lock()` calls.
254    ///
255    pub fn set_lock_timeout(&self, timeout_ms: u64) {
256        self.lock_timeout_ms.store(timeout_ms, Ordering::Relaxed);
257    }
258
259    /// Returns the current default lock timeout in milliseconds.
260    pub fn get_lock_timeout_ms(&self) -> u64 {
261        self.lock_timeout_ms.load(Ordering::Relaxed)
262    }
263
264    /// Acquires a lock on the given LSN for the given locker, blocking the
265    /// calling thread if necessary.
266    ///
267    /// # Arguments
268    ///
269    /// * `lsn` - The LSN to lock (packed u64)
270    /// * `locker_id` - The ID of the requesting locker
271    /// * `lock_type` - The type of lock requested
272    /// * `non_blocking` - If true, return `LockNotAvailable` instead of waiting
273    /// * `jump_ahead_of_waiters` - If true, skip ahead of existing waiters
274    /// * `lock_timeout_ms` - How long to wait; 0 = wait forever
275    ///
276    /// # Returns
277    ///
278    /// The `LockGrantType` on success:
279    /// - `New` / `Promotion` / `Existing` — lock held immediately
280    /// - `NoneNeeded` — `lock_type` was `None`
281    ///
282    /// # Errors
283    ///
284    /// - `TxnError::RangeRestart` if `lock_type` is `Restart`
285    /// - `TxnError::LockNotAvailable` if `non_blocking` and lock unavailable
286    /// - `TxnError::LockTimeout` if the timeout expired while waiting
287    /// - `TxnError::Deadlock` if a wait-for cycle is detected before waiting
288    ///
289    ///
290    #[inline]
291    pub fn lock(
292        &self,
293        lsn: u64,
294        locker_id: i64,
295        lock_type: LockType,
296        non_blocking: bool,
297        jump_ahead_of_waiters: bool,
298    ) -> Result<LockGrantType, TxnError> {
299        self.lock_with_timeout(
300            lsn,
301            locker_id,
302            lock_type,
303            non_blocking,
304            jump_ahead_of_waiters,
305            self.lock_timeout_ms.load(Ordering::Relaxed),
306        )
307    }
308
309    /// Like `lock()` but the caller supplies the timeout in milliseconds.
310    /// `timeout_ms == 0` means wait forever.
311    ///
312    ///
313    pub fn lock_with_timeout(
314        &self,
315        lsn: u64,
316        locker_id: i64,
317        lock_type: LockType,
318        non_blocking: bool,
319        jump_ahead_of_waiters: bool,
320        timeout_ms: u64,
321    ) -> Result<LockGrantType, TxnError> {
322        // No lock needed for dirty-read, return immediately.
323        if lock_type == LockType::None {
324            return Ok(LockGrantType::NoneNeeded);
325        }
326
327        // Special restart lock type throws immediately.
328        if lock_type == LockType::Restart {
329            return Err(TxnError::RangeRestart);
330        }
331
332        // Track statistics.
333        self.stats.lock_requests.fetch_add(1, Ordering::Relaxed);
334
335        let table_idx = self.get_table_index(lsn);
336
337        // Snapshot the sharing registry before entering the lock-table
338        // critical section so `LockImpl::try_lock_with_sharing` can consult
339        // `sharesLocksWith` on every acquisition.  JE `LockImpl.tryLock`
340        // checks `!locker.sharesLocksWith(ownerLocker) &&
341        // !ownerLocker.sharesLocksWith(locker)` (LockImpl.java:647-648) on
342        // EVERY lock request, letting siblings (two ThreadLockers on the
343        // same thread, or a HandleLocker + its buddy txn) co-own a lock
344        // without conflict.
345        let shares = self.build_shares_fn(locker_id);
346
347        // --- Phase 1: attempt to acquire the lock under the shard mutex. ---
348        //
349        // "Attempt to lock without any initial wait."
350        let (initial_grant, owner_ids, notify_pair) = {
351            let mut table = self.lock_tables[table_idx].lock();
352            let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
353
354            let result = lock.lock_with_sharing(
355                lock_type,
356                locker_id,
357                non_blocking,
358                jump_ahead_of_waiters,
359                &shares,
360            );
361
362            if result.success {
363                // Granted immediately; no waiting needed.
364                return Ok(result.lock_grant);
365            }
366
367            if result.lock_grant == LockGrantType::Denied {
368                // Non-blocking request was denied.
369                return Err(TxnError::LockNotAvailable { lsn });
370            }
371
372            // We must wait.  Collect owner IDs for deadlock detection and
373            // attach a per-waiter notify pair to our waiter entry.
374            //
375            // "locker.setWaitingFor(lsn, type)" then deadlock detect then
376            //     "locker.wait(timeout)".
377            self.stats.lock_waits.fetch_add(1, Ordering::Relaxed);
378
379            let owner_ids = lock.get_owner_ids();
380
381            // Build the notify pair and attach it to our waiter entry so the
382            // releasing thread can wake us.
383            let pair: WaiterNotify =
384                Arc::new((Mutex::new(false), Condvar::new()));
385            lock.set_waiter_notify(locker_id, pair.clone());
386
387            (result.lock_grant, owner_ids, pair)
388        };
389        // Shard mutex is released here.
390
391        // Register in the incremental waits-for graph so deadlock detection
392        // can find this edge without rescanning all lock-table shards.
393        self.record_wait(locker_id, &owner_ids);
394
395        // --- Phase 2: deadlock detection before sleeping. ---
396        //
397        // Runs DeadlockChecker after setWaitingFor.  If the current
398        // locker is selected as the victim, throw DeadlockException.
399        //
400        // We build a lightweight waits-for snapshot from the current lock
401        // table state and check for a cycle.  If this locker is the victim
402        // OR if the cycle cannot be broken without aborting this locker
403        // (i.e. the victim is not reachable / not waiting), we abort.
404        //
405        // Note: a single-pass snapshot may be incomplete when both threads
406        // are entering the wait path simultaneously.  We therefore also
407        // perform a deadlock check after each spurious wakeup inside the
408        // wait loop (Phase 3).
409        if let Some(deadlock_err) = self
410            .check_deadlock_for_waiter(lsn, locker_id, lock_type, &owner_ids)
411        {
412            // We are the chosen victim.  Flush from waiter list and throw.
413            // H-2: use flush_and_clear_waiter to acquire shard before
414            // waiter_graph (canonical lock ordering).
415            //
416            // TXN-F4 (design point, intentionally NOT changed): JE
417            // `LockManager.waitForLock` proactively wakes the victim via
418            // `notifyVictim` so it exits its own `wait()` promptly; here the
419            // selected victim returns synchronously the moment its own check
420            // detects the cycle, so no cross-thread victim notify is needed.
421            // This assumes every waiter on a cycle reaches the same
422            // victim-consistency conclusion on its next check slice (≤50 ms),
423            // which holds because all checks read the same `waiter_graph`
424            // snapshot and use the same deterministic victim selection.
425            self.flush_and_clear_waiter(table_idx, lsn, locker_id);
426            return Err(deadlock_err);
427        }
428
429        // --- Phase 3: wait on the condvar. ---
430        //
431        // "locker.wait(timeRemaining)" in a loop, checking ownership on
432        //     each wakeup.  We also re-run deadlock detection on each
433        //     iteration so that cycles formed after we enter the wait path
434        //     are caught.
435        let start = std::time::Instant::now();
436        let (mutex, condvar) = &*notify_pair;
437        let mut granted_guard = mutex.lock();
438
439        loop {
440            if *granted_guard {
441                // We were woken by the releasing thread which set our flag and
442                // called notify_all.  Ownership was already transferred to us
443                // inside release() -> try_lock().
444                break;
445            }
446
447            // Compute remaining time.
448            let remaining_ms = if timeout_ms == 0 {
449                0 // 0 means wait forever
450            } else {
451                let elapsed = start.elapsed().as_millis() as u64;
452                if elapsed >= timeout_ms {
453                    // Already timed out before we even slept this iteration.
454                    drop(granted_guard);
455                    // H-2: shard before waiter_graph.
456                    self.flush_and_clear_waiter(table_idx, lsn, locker_id);
457                    self.stats.lock_timeouts.fetch_add(1, Ordering::Relaxed);
458                    return Err(TxnError::LockTimeout {
459                        timeout_ms,
460                        lsn,
461                        owner: format!(
462                            "[{}] on LSN {lsn}",
463                            self.format_lockers(&owner_ids)
464                        ),
465                        requested_type: lock_type,
466                        requester: self.format_locker(locker_id),
467                    });
468                }
469                timeout_ms - elapsed
470            };
471
472            // Use a short slice (up to 50 ms) so we can re-check for
473            // deadlocks that may form after we entered the wait path.
474            // uses a "deadlock detection delay" for the same purpose.
475            let slice_ms =
476                if remaining_ms == 0 { 50 } else { remaining_ms.min(50) };
477
478            let timed_out = condvar
479                .wait_for(&mut granted_guard, Duration::from_millis(slice_ms))
480                .timed_out();
481
482            if *granted_guard {
483                // Granted while we were sleeping.
484                break;
485            }
486
487            // Re-run deadlock detection after each wakeup / slice expiry.
488            // This catches deadlocks that formed after our initial check.
489            drop(granted_guard);
490            {
491                let cur_owner_ids = {
492                    let table = self.lock_tables[table_idx].lock();
493                    table
494                        .get(&lsn)
495                        .map(|l| l.get_owner_ids())
496                        .unwrap_or_default()
497                };
498                if let Some(deadlock_err) = self.check_deadlock_for_waiter(
499                    lsn,
500                    locker_id,
501                    lock_type,
502                    &cur_owner_ids,
503                ) {
504                    // H-2: shard before waiter_graph.
505                    self.flush_and_clear_waiter(table_idx, lsn, locker_id);
506                    return Err(deadlock_err);
507                }
508            }
509            granted_guard = mutex.lock();
510
511            if *granted_guard {
512                break;
513            }
514
515            if timed_out {
516                // Check if total time is exceeded.
517                if timeout_ms > 0
518                    && start.elapsed().as_millis() as u64 >= timeout_ms
519                {
520                    drop(granted_guard);
521                    // H-2: shard before waiter_graph.
522                    self.flush_and_clear_waiter(table_idx, lsn, locker_id);
523                    self.stats.lock_timeouts.fetch_add(1, Ordering::Relaxed);
524                    return Err(TxnError::LockTimeout {
525                        timeout_ms,
526                        lsn,
527                        owner: format!(
528                            "[{}] on LSN {lsn}",
529                            self.format_lockers(&owner_ids)
530                        ),
531                        requested_type: lock_type,
532                        requester: self.format_locker(locker_id),
533                    });
534                }
535            }
536
537            // Spurious wakeup or slice expired without timeout; loop.
538        }
539
540        drop(granted_guard);
541        self.clear_wait(locker_id);
542
543        // Determine which grant type to report.  On wakeup the lock type we
544        // actually hold is exactly what we requested (or a promotion of it).
545        // Reconstruct the grant type from context.
546        //
547        // WaitRestart: the waiter's lock_type was changed to Restart in
548        // lock_impl::lock(), so the lock was never added to the owner set.
549        // Returning RangeRestart tells the caller (lock_ln / put) to abort
550        // the current scan and restart — mirroring JE's RangeRestartException.
551        let grant = match initial_grant {
552            LockGrantType::WaitNew => LockGrantType::New,
553            LockGrantType::WaitPromotion => LockGrantType::Promotion,
554            LockGrantType::WaitRestart => {
555                return Err(TxnError::RangeRestart);
556            }
557            other => other,
558        };
559
560        Ok(grant)
561    }
562
563    /// Importunate lock acquisition for HA replay (`ReplayTxn`).
564    ///
565    /// JE `LockManager.waitForLock`: an importunate locker that would block
566    /// steals the lock from preemptable owners instead of waiting
567    /// (`if (isImportunate) { result = stealLock(...); if (result.success)
568    /// break; continue; }`, LockManager.java:552-557), and deadlock
569    /// detection is skipped for it (LockManager.java:472).
570    ///
571    /// Strategy: attempt non-blocking; if granted, done.  Otherwise steal
572    /// from preemptable owners and re-attempt (`stealLockInternal`,
573    /// LockManager.java:1599).  If the steal grants the lock, done.  If a
574    /// remaining owner is non-preemptable the steal fails, so fall back to a
575    /// normal blocking wait (mirroring JE's `continue` — wait for the
576    /// non-preemptable holder to release, then retry).
577    pub fn lock_importunate_with_timeout(
578        &self,
579        lsn: u64,
580        locker_id: i64,
581        lock_type: LockType,
582        non_blocking: bool,
583        timeout_ms: u64,
584    ) -> Result<LockGrantType, TxnError> {
585        if lock_type == LockType::None {
586            return Ok(LockGrantType::NoneNeeded);
587        }
588        if lock_type == LockType::Restart {
589            return Err(TxnError::RangeRestart);
590        }
591
592        // Try a normal non-blocking attempt first (jumpAheadOfWaiters=false,
593        // as JE Locker.lock always passes — Locker.java:503).
594        match self.lock(lsn, locker_id, lock_type, true, false) {
595            Ok(grant) => return Ok(grant),
596            Err(TxnError::LockNotAvailable { .. }) => {
597                // Blocked: steal from preemptable owners and re-attempt.
598                if self.steal_lock_in_wait(lsn, locker_id, lock_type) {
599                    return Ok(LockGrantType::New);
600                }
601                // A remaining owner is non-preemptable; fall back to a normal
602                // blocking wait (JE `continue`).
603            }
604            Err(e) => return Err(e),
605        }
606
607        if non_blocking {
608            return Err(TxnError::LockNotAvailable { lsn });
609        }
610        self.lock_with_timeout(
611            lsn, locker_id, lock_type, false, false, timeout_ms,
612        )
613    }
614
615    /// Releases a lock on the given LSN for the given locker.
616    ///
617    /// Promotes compatible waiters to owners, signals their condvars so they
618    /// wake up, and removes the lock entry when it becomes empty.
619    ///
620    ///
621    pub fn release(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
622        let table_idx = self.get_table_index(lsn);
623        // Snapshot this locker's share group so the RANGE_INSERT restart-wake
624        // check in LockImpl::release_with_sharing can honor sharesLocksWith
625        // (JE rangeInsertConflict) without holding the registry across the
626        // table borrow.
627        let shares_fn = self.build_shares_fn(locker_id);
628        let mut table = self.lock_tables[table_idx].lock();
629
630        if let Some(lock) = table.get_mut(&lsn) {
631            // release() moves eligible waiters to owners and signals each
632            // granted waiter's condvar inside LockImpl::release().
633            let _notify_ids = lock.release_with_sharing(locker_id, &shares_fn);
634
635            // If the lock has no owners and no waiters, remove it from the
636            // table to free memory.
637            if lock.n_owners() == 0 && lock.n_waiters() == 0 {
638                table.remove(&lsn);
639            }
640        }
641
642        Ok(())
643    }
644
645    /// Releases every lock currently held by `locker_id`, across all
646    /// shards. Returns the number of (lsn, lock) entries the locker
647    /// actually owned and released.
648    ///
649    /// Equivalent to a manual `for lsn in lockers_locks(id): release(lsn, id)`,
650    /// but does not require the caller to track which LSNs the locker
651    /// has touched. The cleaner uses this in three situations:
652    ///
653    ///   - **Reaping abandoned cleaner-locker IDs.** `migrate_ln_slot`
654    ///     allocates a fresh locker id per migration attempt
655    ///     (`next_cleaner_locker_id`), takes a non-blocking read
656    ///     lock, and releases. If `release()` fails for any reason
657    ///     the entry would otherwise leak, since the locker id is
658    ///     never reused. The cleaner can call this method when its
659    ///     run terminates to sweep up anything its short-lived
660    ///     locker ids left behind.
661    ///   - **Catastrophic per-locker abort.** When a deadlock-detector
662    ///     victim is too far along to drain its own per-txn write_locks
663    ///     map (e.g. it is in the middle of `commit_inner_after_read_drain`
664    ///     and the panic handler needs to clean up), this method
665    ///     guarantees the lock-manager view drops the locker even if
666    ///     the per-txn view is corrupt.
667    ///   - **Test cleanup.** Many integration tests hold a `LockManager`
668    ///     across multiple txns and need a quick "drop everything for
669    ///     locker N" without re-creating the manager.
670    ///
671    /// Errors from individual `Lock::release` calls are logged and
672    /// the sweep continues; the count returned is the number of
673    /// release attempts (each removing the locker from one Lock),
674    /// not the number that succeeded — losing one lock release leaks
675    /// one entry, but losing the whole sweep would defeat the
676    /// purpose.
677    pub fn release_all_for_locker(&self, locker_id: i64) -> usize {
678        let mut released = 0usize;
679        // Snapshot this locker's share group for the RANGE_INSERT restart-wake
680        // check (JE rangeInsertConflict / sharesLocksWith).
681        let shares_fn = self.build_shares_fn(locker_id);
682        for table in &self.lock_tables {
683            let mut table = table.lock();
684            // Collect target LSNs first to avoid mutating the map
685            // while iterating it.
686            let target_lsns: Vec<u64> = table
687                .iter()
688                .filter_map(|(lsn, lock)| {
689                    if lock.get_owned_lock_type(locker_id).is_some() {
690                        Some(*lsn)
691                    } else {
692                        None
693                    }
694                })
695                .collect();
696            for lsn in target_lsns {
697                if let Some(lock) = table.get_mut(&lsn) {
698                    let _notify_ids =
699                        lock.release_with_sharing(locker_id, &shares_fn);
700                    released += 1;
701                    if lock.n_owners() == 0 && lock.n_waiters() == 0 {
702                        table.remove(&lsn);
703                    }
704                }
705            }
706        }
707        released
708    }
709
710    /// Downgrades a write lock to a read lock.
711    ///
712    ///
713    pub fn demote(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
714        let table_idx = self.get_table_index(lsn);
715        let mut table = self.lock_tables[table_idx].lock();
716
717        if let Some(lock) = table.get_mut(&lsn) {
718            lock.demote(locker_id);
719        }
720
721        Ok(())
722    }
723
724    /// Steals a lock for the given locker.
725    ///
726    /// Used by the HA replayer to forcibly acquire locks, removing all other
727    /// preemptable owners.
728    ///
729    ///
730    pub fn steal_lock(&self, lsn: u64, locker_id: i64) -> Result<(), TxnError> {
731        let table_idx = self.get_table_index(lsn);
732        let mut table = self.lock_tables[table_idx].lock();
733
734        let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
735        let _preempted = lock.steal_lock(locker_id);
736
737        Ok(())
738    }
739
740    /// Importunate lock steal performed inside the wait loop, mirroring JE
741    /// `LockManager.stealLockInternal` (LockManager.java:1599): flush this
742    /// locker's waiter entry, remove all preemptable owners, then re-attempt
743    /// the lock.  Non-preemptable owners (other importunate lockers) are left
744    /// in place, so the re-attempt may still fail — returning `false` tells
745    /// the wait loop to keep waiting (LockManager.java:556 `continue`).
746    ///
747    /// Returns `true` iff the lock is now held by `locker_id`.
748    fn steal_lock_in_wait(
749        &self,
750        lsn: u64,
751        locker_id: i64,
752        lock_type: LockType,
753    ) -> bool {
754        // Only preempt owners that are not marked non-preemptable.
755        let non_preemptable: Option<HashSet<i64>> = {
756            let np = self.non_preemptable.read().unwrap();
757            if np.is_empty() { None } else { Some(np.clone()) }
758        };
759        let preemptable_fn = move |owner_id: i64| -> bool {
760            match &non_preemptable {
761                Some(np) => !np.contains(&owner_id),
762                None => true,
763            }
764        };
765        let shares = self.build_shares_fn(locker_id);
766
767        let table_idx = self.get_table_index(lsn);
768        let mut table = self.lock_tables[table_idx].lock();
769        let lock = table.entry(lsn).or_insert_with(Lock::new_thin);
770
771        // flushWaiter: our waiter entry may still be present.
772        lock.flush_waiter(locker_id);
773        // stealLock: remove all preemptable owners.
774        let _preempted =
775            lock.steal_lock_preemptable(locker_id, &preemptable_fn);
776        // Re-attempt as a non-blocking, jump-ahead request.
777        let result = lock.lock_with_sharing(
778            lock_type, locker_id, true,  // non_blocking
779            false, // jump_ahead_of_waiters
780            &shares,
781        );
782        result.success
783    }
784
785    /// Returns true if the given locker owns a write lock on the LSN.
786    ///
787    ///
788    pub fn is_owned_write_lock(&self, lsn: u64, locker_id: i64) -> bool {
789        let table_idx = self.get_table_index(lsn);
790        let table = self.lock_tables[table_idx].lock();
791
792        if let Some(lock) = table.get(&lsn) {
793            lock.is_owned_write_lock(locker_id)
794        } else {
795            false
796        }
797    }
798
799    /// Returns the lock type owned by the locker, or None.
800    ///
801    ///
802    pub fn get_owned_lock_type(
803        &self,
804        lsn: u64,
805        locker_id: i64,
806    ) -> Option<LockType> {
807        let table_idx = self.get_table_index(lsn);
808        let table = self.lock_tables[table_idx].lock();
809
810        if let Some(lock) = table.get(&lsn) {
811            lock.get_owned_lock_type(locker_id)
812        } else {
813            None
814        }
815    }
816
817    /// Returns the owner count and waiter count for a given LSN.
818    pub fn get_lock_info(&self, lsn: u64) -> (usize, usize) {
819        let table_idx = self.get_table_index(lsn);
820        let table = self.lock_tables[table_idx].lock();
821
822        if let Some(lock) = table.get(&lsn) {
823            (lock.n_owners(), lock.n_waiters())
824        } else {
825            (0, 0)
826        }
827    }
828
829    /// Returns current lock statistics.
830    ///
831    ///
832    pub fn get_stats(&self) -> LockStats {
833        // Single pass over all lock tables to compute live counts. n_waiters
834        // and n_owners were previously hardcoded to 0 / lock-count; report the
835        // real aggregate so callers (and tests) can observe contention.
836        let mut n_total_locks: u64 = 0;
837        let mut n_owners: u64 = 0;
838        let mut n_waiters: u64 = 0;
839        for table in &self.lock_tables {
840            let table = table.lock();
841            for lock in table.values() {
842                n_total_locks += 1;
843                n_owners += lock.n_owners() as u64;
844                n_waiters += lock.n_waiters() as u64;
845            }
846        }
847        LockStats {
848            lock_requests: self.stats.lock_requests.load(Ordering::Relaxed),
849            lock_waits: self.stats.lock_waits.load(Ordering::Relaxed),
850            n_owners,
851            n_waiters,
852            n_total_locks,
853            n_read_locks: 0,
854            n_write_locks: 0,
855            n_lock_timeouts: self.stats.lock_timeouts.load(Ordering::Relaxed),
856        }
857    }
858
859    /// Returns the number of lock entries across all tables.
860    pub fn n_total_locks(&self) -> usize {
861        let mut total = 0;
862        for table in &self.lock_tables {
863            total += table.lock().len();
864        }
865        total
866    }
867
868    // ========================================================================
869    // Lock-sharing registry — `LockManager.threadLockers` analogue
870    // ========================================================================
871
872    /// Registers a locker in the sharing registry with the given group ID.
873    ///
874    /// All lockers sharing the same `group_id` bypass conflict detection with
875    /// each other (`Locker.sharesLocksWith(other)`).
876    ///
877    /// Called by `ThreadLocker::new()` (group = thread_id) and by
878    /// `HandleLocker::with_buddy()` (group = buddy_locker_id).
879    ///
880    ///
881    pub fn register_locker_sharing(&self, locker_id: i64, group_id: i64) {
882        self.share_registry.write().unwrap().insert(locker_id, group_id);
883    }
884
885    /// Removes a locker from the sharing registry.
886    ///
887    /// Called by `ThreadLocker::drop()` and `HandleLocker::drop()`.
888    ///
889    ///
890    pub fn unregister_locker_sharing(&self, locker_id: i64) {
891        self.share_registry.write().unwrap().remove(&locker_id);
892    }
893
894    /// Marks `locker_id` as non-preemptable (its locks cannot be stolen).
895    ///
896    /// Called for importunate lockers (HA `ReplayTxn`).  JE tracks this per
897    /// `Locker` via `getPreemptable()`; the lock manager mirrors it so the
898    /// importunate steal in the wait loop can honor a non-preemptable owner
899    /// (LockManager.java:556).
900    pub fn register_non_preemptable(&self, locker_id: i64) {
901        self.non_preemptable.write().unwrap().insert(locker_id);
902    }
903
904    /// Clears the non-preemptable mark for `locker_id`.
905    pub fn unregister_non_preemptable(&self, locker_id: i64) {
906        self.non_preemptable.write().unwrap().remove(&locker_id);
907    }
908
909    /// Returns true if `a` and `b` are in the same lock-sharing group.
910    ///
911    /// Used by `ThreadLocker::shares_locks_with()` and
912    /// `HandleLocker::shares_locks_with()`.
913    pub fn same_share_group(&self, a: i64, b: i64) -> bool {
914        let registry = self.share_registry.read().unwrap();
915        match (registry.get(&a), registry.get(&b)) {
916            (Some(ga), Some(gb)) => ga == gb,
917            _ => false,
918        }
919    }
920
921    /// Builds the `sharesLocksWith` predicate for `locker_id`, used by every
922    /// acquisition (`lock_with_timeout`) and release (`release`) so that
923    /// `LockImpl` can skip conflict detection between cooperating lockers.
924    /// `shares(owner_id)` returns true iff `owner_id` is in `locker_id`'s
925    /// sharing group.  JE: `LockImpl.tryLock` / `rangeInsertConflict` consult
926    /// `sharesLocksWith` (LockImpl.java:647-648, 719).
927    ///
928    /// The registry HashMap is cloned only when `locker_id` actually belongs
929    /// to a sharing group; the common path (BasicLockers, most internal ops)
930    /// shares with no one and returns a closure that allocates nothing.
931    fn build_shares_fn(&self, locker_id: i64) -> impl Fn(i64) -> bool + use<> {
932        let requester_group: Option<i64> =
933            self.share_registry.read().unwrap().get(&locker_id).copied();
934        let registry_snapshot: Option<hashbrown::HashMap<i64, i64>> =
935            if requester_group.is_some() {
936                Some(self.share_registry.read().unwrap().clone())
937            } else {
938                None
939            };
940        move |owner_id: i64| -> bool {
941            match (requester_group, &registry_snapshot) {
942                (Some(req_group), Some(reg)) => {
943                    reg.get(&owner_id).copied() == Some(req_group)
944                }
945                _ => false,
946            }
947        }
948    }
949
950    /// Deprecated alias for `lock_with_timeout()`.
951    ///
952    /// TXN-F2 fix: the plain `lock()` / `lock_with_timeout()` path now
953    /// consults the sharing registry on every acquisition (JE
954    /// `LockImpl.tryLock` checks `sharesLocksWith`, LockImpl.java:647-648),
955    /// so a separate sharing entry point is redundant.  Kept as a thin
956    /// forwarder so existing callers keep compiling; new code should call
957    /// `lock` / `lock_with_timeout` directly.
958    pub fn lock_with_sharing(
959        &self,
960        lsn: u64,
961        locker_id: i64,
962        lock_type: LockType,
963        non_blocking: bool,
964        jump_ahead_of_waiters: bool,
965    ) -> Result<LockGrantType, TxnError> {
966        self.lock(
967            lsn,
968            locker_id,
969            lock_type,
970            non_blocking,
971            jump_ahead_of_waiters,
972        )
973    }
974
975    /// Deprecated alias for `lock_with_timeout()`; see `lock_with_sharing`.
976    pub fn lock_with_sharing_and_timeout(
977        &self,
978        lsn: u64,
979        locker_id: i64,
980        lock_type: LockType,
981        non_blocking: bool,
982        jump_ahead_of_waiters: bool,
983        timeout_ms: u64,
984    ) -> Result<LockGrantType, TxnError> {
985        self.lock_with_timeout(
986            lsn,
987            locker_id,
988            lock_type,
989            non_blocking,
990            jump_ahead_of_waiters,
991            timeout_ms,
992        )
993    }
994
995    // ========================================================================
996
997    /// Returns the lock table index for a given LSN.
998    ///
999    ///
1000    ///
1001    #[inline]
1002    /// Returns the configured number of lock-table shards.
1003    pub fn n_lock_tables(&self) -> usize {
1004        self.n_lock_tables
1005    }
1006
1007    fn get_table_index(&self, lsn: u64) -> usize {
1008        ((lsn as usize) & 0x7fff_ffff) % self.n_lock_tables
1009    }
1010
1011    /// Records that `locker_id` is now waiting on `owner_ids` in the
1012    /// incremental waits-for graph.  Called right after Phase 1 in both wait
1013    /// paths, before the first deadlock check.
1014    fn record_wait(&self, locker_id: i64, owner_ids: &[i64]) {
1015        let mut graph = self.waiter_graph.lock();
1016        graph.insert(locker_id, owner_ids.to_vec());
1017    }
1018
1019    /// Removes `locker_id` from the incremental waits-for graph.  Called at
1020    /// every exit point after `record_wait`: grant, timeout, and deadlock abort.
1021    fn clear_wait(&self, locker_id: i64) {
1022        let mut graph = self.waiter_graph.lock();
1023        graph.remove(&locker_id);
1024    }
1025
1026    /// Removes `locker_id` from the on-shard waiter list and from the
1027    /// incremental waiter graph, in canonical lock order (shard first).
1028    ///
1029    /// H-2 (2026 audit F-6.2): all victim-cleanup paths must
1030    /// acquire the shard mutex BEFORE (or without) the waiter_graph mutex.
1031    /// This helper enforces the ordering: it locks the shard, flushes the
1032    /// waiter entry, drops the shard guard, then calls `clear_wait()` to
1033    /// remove from the waiter_graph.  Never call `clear_wait()` before this.
1034    fn flush_and_clear_waiter(
1035        &self,
1036        table_idx: usize,
1037        lsn: u64,
1038        locker_id: i64,
1039    ) {
1040        // Shard first (canonical ordering).
1041        {
1042            let mut table = self.lock_tables[table_idx].lock();
1043            if let Some(lock) = table.get_mut(&lsn) {
1044                lock.flush_waiter(locker_id);
1045                if lock.n_owners() == 0 && lock.n_waiters() == 0 {
1046                    table.remove(&lsn);
1047                }
1048            }
1049        }
1050        // Waiter_graph after shard is released.
1051        self.clear_wait(locker_id);
1052    }
1053
1054    /// aborted as the victim.
1055    ///
1056    /// Returns `Some(TxnError::Deadlock)` if the cycle is detected and this
1057    /// locker is the chosen victim, `None` otherwise.
1058    ///
1059    /// Reads the incremental `waiter_graph` snapshot — O(n_active_waiters),
1060    /// no shard re-acquisition.  Victim selection uses "youngest locker"
1061    /// heuristic (highest locker_id, i.e. most recently started transaction)
1062    /// since we avoid the O(N_LOCK_TABLES) scan needed for exact lock counts.
1063    fn check_deadlock_for_waiter(
1064        &self,
1065        lsn: u64,
1066        locker_id: i64,
1067        lock_type: LockType,
1068        owner_ids: &[i64],
1069    ) -> Option<TxnError> {
1070        // Build the waits-for snapshot from the incremental graph.  Also
1071        // ensure the current requester's edge is present (record_wait may
1072        // not have been called yet on the very first check).
1073        let waits_for: HashMap<i64, HashSet<i64>> = {
1074            let graph = self.waiter_graph.lock();
1075            let mut wf: HashMap<i64, HashSet<i64>> = graph
1076                .iter()
1077                .map(|(&wid, owners)| (wid, owners.iter().copied().collect()))
1078                .collect();
1079            wf.entry(locker_id)
1080                .or_insert_with(|| owner_ids.iter().copied().collect());
1081            wf
1082        };
1083
1084        let cycle = DeadlockDetector::detect(locker_id, owner_ids, &waits_for)?;
1085        // Compute per-locker lock counts for the cycle so select_victim can
1086        // apply its primary criterion (fewest locks held) instead of falling
1087        // through to the youngest-locker tiebreaker.  This walks every shard,
1088        // but it only runs when a deadlock cycle has been detected (a rare
1089        // event), so the scan cost is amortized over the rare deadlock
1090        // event and is not on the common no-cycle path.
1091        let lock_counts = self.compute_lock_counts(&cycle);
1092        let victim = DeadlockDetector::select_victim(&cycle, &lock_counts);
1093
1094        if victim == locker_id {
1095            // Format the cycle as typed locker IDs (e.g.
1096            // `"auto-txn:42 -> txn:17"`) so a mixed auto-commit/explicit-txn
1097            // deadlock is visibly distinguishable in the error message.
1098            // Closes the second F12 residual.
1099            let cycle_fmt = self.format_lockers(&cycle);
1100            let victim_fmt = self.format_locker(locker_id);
1101            Some(TxnError::Deadlock(format!(
1102                "deadlock cycle detected ({cycle_fmt}); {victim_fmt} chosen \
1103                 as victim while waiting for LSN {lsn} ({lock_type:?})"
1104            )))
1105        } else {
1106            None
1107        }
1108    }
1109
1110    /// Tallies, for every locker_id in `cycle`, the number of locks they
1111    /// currently hold across all shards.
1112    ///
1113    /// Used by deadlock victim selection so the primary criterion (fewest
1114    /// locks held = least work to roll back) can be applied.  Walks every
1115    /// shard but is only called after a deadlock cycle has been detected,
1116    /// so the scan cost is paid only on the rare cycle path, never on the
1117    /// common no-cycle path.
1118    fn compute_lock_counts(&self, cycle: &[i64]) -> HashMap<i64, usize> {
1119        use std::collections::HashSet;
1120        let cycle_set: HashSet<i64> = cycle.iter().copied().collect();
1121        let mut counts: HashMap<i64, usize> =
1122            cycle.iter().copied().map(|id| (id, 0usize)).collect();
1123        for shard in &self.lock_tables {
1124            let table = shard.lock();
1125            for lock in table.values() {
1126                for owner_id in lock.get_owner_ids() {
1127                    if cycle_set.contains(&owner_id) {
1128                        *counts.entry(owner_id).or_insert(0) += 1;
1129                    }
1130                }
1131            }
1132        }
1133        counts
1134    }
1135}
1136
1137impl Default for LockManager {
1138    fn default() -> Self {
1139        Self::new()
1140    }
1141}
1142
1143#[cfg(test)]
1144mod tests {
1145    use super::*;
1146    use std::sync::Arc;
1147    use std::thread;
1148    use std::time::Duration;
1149
1150    // -----------------------------------------------------------------------
1151    // Original single-threaded tests (preserved)
1152    // -----------------------------------------------------------------------
1153
1154    #[test]
1155    fn test_new_lock_manager() {
1156        let lm = LockManager::new();
1157        assert_eq!(lm.n_total_locks(), 0);
1158
1159        let stats = lm.get_stats();
1160        assert_eq!(stats.lock_requests, 0);
1161        assert_eq!(stats.lock_waits, 0);
1162    }
1163
1164    #[test]
1165    fn test_lock_type_none() {
1166        let lm = LockManager::new();
1167        let result = lm.lock(1000, 1, LockType::None, false, false);
1168        assert!(result.is_ok());
1169        assert_eq!(result.unwrap(), LockGrantType::NoneNeeded);
1170
1171        let stats = lm.get_stats();
1172        assert_eq!(stats.lock_requests, 0);
1173    }
1174
1175    #[test]
1176    fn test_lock_type_restart() {
1177        let lm = LockManager::new();
1178        let result = lm.lock(1000, 1, LockType::Restart, false, false);
1179        assert!(result.is_err());
1180        assert!(matches!(result.unwrap_err(), TxnError::RangeRestart));
1181    }
1182
1183    #[test]
1184    fn test_basic_lock_release() {
1185        let lm = LockManager::new();
1186
1187        let result = lm.lock(1000, 1, LockType::Read, false, false);
1188        assert!(result.is_ok());
1189        assert_eq!(result.unwrap(), LockGrantType::New);
1190        assert_eq!(lm.n_total_locks(), 1);
1191
1192        let result = lm.release(1000, 1);
1193        assert!(result.is_ok());
1194        assert_eq!(lm.n_total_locks(), 0);
1195    }
1196
1197    #[test]
1198    fn test_multiple_readers() {
1199        let lm = LockManager::new();
1200
1201        let result = lm.lock(1000, 1, LockType::Read, false, false);
1202        assert!(result.is_ok());
1203        assert_eq!(result.unwrap(), LockGrantType::New);
1204
1205        let result = lm.lock(1000, 2, LockType::Read, false, false);
1206        assert!(result.is_ok());
1207        assert_eq!(result.unwrap(), LockGrantType::New);
1208
1209        assert_eq!(lm.n_total_locks(), 1);
1210        let (owners, waiters) = lm.get_lock_info(1000);
1211        assert_eq!(owners, 2);
1212        assert_eq!(waiters, 0);
1213    }
1214
1215    #[test]
1216    fn test_non_blocking_denied() {
1217        let lm = LockManager::new();
1218
1219        let result = lm.lock(1000, 1, LockType::Write, false, false);
1220        assert!(result.is_ok());
1221        assert_eq!(result.unwrap(), LockGrantType::New);
1222
1223        let result = lm.lock(1000, 2, LockType::Write, true, false);
1224        assert!(result.is_err());
1225        assert!(matches!(
1226            result.unwrap_err(),
1227            TxnError::LockNotAvailable { .. }
1228        ));
1229    }
1230
1231    #[test]
1232    fn test_table_sharding() {
1233        let lm = LockManager::new();
1234
1235        for i in 0..100u64 {
1236            let result =
1237                lm.lock(i * 1000, i as i64, LockType::Write, false, false);
1238            assert!(result.is_ok());
1239        }
1240
1241        assert_eq!(lm.n_total_locks(), 100);
1242
1243        let idx1 = lm.get_table_index(1000);
1244        let idx2 = lm.get_table_index(2000);
1245        assert!(idx1 < lm.n_lock_tables);
1246        assert!(idx2 < lm.n_lock_tables);
1247    }
1248
1249    #[test]
1250    fn test_lock_cleanup() {
1251        let lm = LockManager::new();
1252
1253        for i in 0..100 {
1254            let _ = lm.lock(i, 1, LockType::Write, false, false);
1255        }
1256        assert_eq!(lm.n_total_locks(), 100);
1257
1258        for i in 0..100 {
1259            let _ = lm.release(i, 1);
1260        }
1261
1262        assert_eq!(lm.n_total_locks(), 0);
1263    }
1264
1265    /// TXN-F3 regression: an importunate locker steals a held, conflicting
1266    /// lock from a preemptable owner instead of waiting/timing out.  JE
1267    /// `LockManager.waitForLock`: `if (isImportunate) { result =
1268    /// stealLock(...); if (result.success) break; }` (LockManager.java:552).
1269    #[test]
1270    fn test_importunate_steals_conflicting_lock() {
1271        const LSN: u64 = 4242;
1272        let lm = LockManager::new();
1273
1274        // Owner 1 (preemptable) holds a Write lock.
1275        assert_eq!(
1276            lm.lock(LSN, 1, LockType::Write, false, false).unwrap(),
1277            LockGrantType::New
1278        );
1279        assert!(lm.is_owned_write_lock(LSN, 1));
1280
1281        // Importunate locker 2 requests a conflicting Write lock.  Use a
1282        // non_blocking=false call with a SHORT timeout: pre-fix this would
1283        // either time out (LockTimeout) or block; post-fix the steal grants
1284        // it immediately.
1285        let grant = lm
1286            .lock_importunate_with_timeout(LSN, 2, LockType::Write, false, 200)
1287            .expect("importunate locker must steal the conflicting lock");
1288        assert!(matches!(grant, LockGrantType::New | LockGrantType::Existing));
1289
1290        // The steal preempted owner 1; locker 2 now holds the write lock.
1291        assert!(lm.is_owned_write_lock(LSN, 2));
1292        assert!(!lm.is_owned_write_lock(LSN, 1));
1293    }
1294
1295    /// A non-preemptable owner blocks the steal: the importunate request
1296    /// falls back to a normal wait and times out rather than stealing.
1297    /// JE: "Lock holder is non-preemptable, wait again" (LockManager.java:556).
1298    #[test]
1299    fn test_importunate_cannot_steal_non_preemptable() {
1300        const LSN: u64 = 4343;
1301        let lm = LockManager::new();
1302
1303        // Owner 1 is non-preemptable (another importunate locker).
1304        lm.register_non_preemptable(1);
1305        assert_eq!(
1306            lm.lock(LSN, 1, LockType::Write, false, false).unwrap(),
1307            LockGrantType::New
1308        );
1309
1310        // Importunate locker 2 cannot steal; with a short timeout it must
1311        // time out, and owner 1 keeps the lock.
1312        let r = lm.lock_importunate_with_timeout(
1313            LSN,
1314            2,
1315            LockType::Write,
1316            false,
1317            100,
1318        );
1319        assert!(matches!(r, Err(TxnError::LockTimeout { .. })));
1320        assert!(lm.is_owned_write_lock(LSN, 1));
1321        assert!(!lm.is_owned_write_lock(LSN, 2));
1322    }
1323
1324    #[test]
1325    fn test_statistics() {
1326        let lm = LockManager::new();
1327
1328        let _ = lm.lock(1000, 1, LockType::Read, false, false);
1329        let _ = lm.lock(1000, 2, LockType::Read, false, false);
1330
1331        let stats = lm.get_stats();
1332        assert_eq!(stats.lock_requests, 2);
1333    }
1334
1335    #[test]
1336    fn test_is_owned_write_lock() {
1337        let lm = LockManager::new();
1338
1339        let _ = lm.lock(1000, 1, LockType::Write, false, false);
1340
1341        assert!(lm.is_owned_write_lock(1000, 1));
1342        assert!(!lm.is_owned_write_lock(1000, 2));
1343        assert!(!lm.is_owned_write_lock(2000, 1));
1344    }
1345
1346    #[test]
1347    fn test_get_owned_lock_type() {
1348        let lm = LockManager::new();
1349
1350        let _ = lm.lock(1000, 1, LockType::Read, false, false);
1351
1352        assert_eq!(lm.get_owned_lock_type(1000, 1), Some(LockType::Read));
1353        assert_eq!(lm.get_owned_lock_type(1000, 2), None);
1354        assert_eq!(lm.get_owned_lock_type(2000, 1), None);
1355    }
1356
1357    #[test]
1358    fn test_demote() {
1359        let lm = LockManager::new();
1360
1361        let _ = lm.lock(1000, 1, LockType::Write, false, false);
1362        assert!(lm.is_owned_write_lock(1000, 1));
1363
1364        let _ = lm.demote(1000, 1);
1365        assert!(!lm.is_owned_write_lock(1000, 1));
1366        assert_eq!(lm.get_owned_lock_type(1000, 1), Some(LockType::Read));
1367    }
1368
1369    #[test]
1370    fn test_steal_lock() {
1371        let lm = LockManager::new();
1372
1373        let _ = lm.lock(1000, 1, LockType::Read, false, false);
1374        assert_eq!(lm.get_owned_lock_type(1000, 1), Some(LockType::Read));
1375
1376        let _ = lm.steal_lock(1000, 2);
1377    }
1378
1379    // -----------------------------------------------------------------------
1380    // Multi-threaded blocking tests
1381    // -----------------------------------------------------------------------
1382
1383    /// Thread A holds a write lock; thread B blocks on it.  When A releases,
1384    /// B should be granted the lock.
1385    ///
1386    /// Waitforlock / notifyall flow.
1387    #[test]
1388    fn test_blocking_lock_granted_on_release() {
1389        let lm = Arc::new(LockManager::new());
1390        const LSN: u64 = 0xDEAD_BEEF;
1391
1392        // Thread A acquires the write lock.
1393        lm.lock(LSN, 1, LockType::Write, false, false).unwrap();
1394
1395        // Sync: wait for B to register as waiter before A releases.
1396        let ready = Arc::new((Mutex::new(false), Condvar::new()));
1397
1398        let lm_b = Arc::clone(&lm);
1399        let ready_b = Arc::clone(&ready);
1400        let b = thread::spawn(move || {
1401            // Signal that B is about to block.
1402            {
1403                let (m, cv) = &*ready_b;
1404                let mut g = m.lock();
1405                *g = true;
1406                cv.notify_all();
1407            }
1408            // Block until A releases (5 s timeout so test doesn't hang).
1409
1410            lm_b.lock_with_timeout(LSN, 2, LockType::Write, false, false, 5000)
1411        });
1412
1413        // Wait until B has at least started, then give it a moment to block.
1414        {
1415            let (m, cv) = &*ready;
1416            let mut g = m.lock();
1417            while !*g {
1418                cv.wait(&mut g);
1419            }
1420        }
1421        // Small sleep so B enters the condvar wait.
1422        thread::sleep(Duration::from_millis(50));
1423
1424        // A releases the lock.
1425        lm.release(LSN, 1).unwrap();
1426
1427        // B should wake up and get the lock.
1428        let result = b.join().unwrap();
1429        assert!(result.is_ok(), "thread B expected Ok, got {:?}", result);
1430        assert_eq!(result.unwrap(), LockGrantType::New);
1431    }
1432
1433    /// Thread A holds a write lock.  Thread B waits with a short timeout.
1434    /// A never releases, so B should receive `LockTimeout`.
1435    #[test]
1436    fn test_lock_timeout() {
1437        let lm = Arc::new(LockManager::new());
1438        const LSN: u64 = 0xCAFE_BABE;
1439
1440        // Thread A acquires the write lock and holds it for the entire test.
1441        lm.lock(LSN, 1, LockType::Write, false, false).unwrap();
1442
1443        let lm_b = Arc::clone(&lm);
1444        let b = thread::spawn(move || {
1445            // 100 ms timeout — A will not release.
1446            lm_b.lock_with_timeout(LSN, 2, LockType::Read, false, false, 100)
1447        });
1448
1449        let result = b.join().unwrap();
1450        assert!(
1451            matches!(result, Err(TxnError::LockTimeout { .. })),
1452            "expected LockTimeout, got {:?}",
1453            result
1454        );
1455
1456        // Clean up: A releases.
1457        lm.release(LSN, 1).unwrap();
1458    }
1459
1460    /// A -> holds X, waits Y; B -> holds Y, waits X.
1461    /// One of them must get `Deadlock` error.
1462    #[test]
1463    fn test_deadlock_detected() {
1464        let lm = Arc::new(LockManager::new());
1465        const LSN_X: u64 = 0x1111_1111;
1466        const LSN_Y: u64 = 0x2222_2222;
1467
1468        // Thread A holds X.
1469        lm.lock(LSN_X, 1, LockType::Write, false, false).unwrap();
1470
1471        // Thread B holds Y.
1472        lm.lock(LSN_Y, 2, LockType::Write, false, false).unwrap();
1473
1474        let lm_a = Arc::clone(&lm);
1475        let lm_b = Arc::clone(&lm);
1476
1477        // A waits for Y (held by B), B waits for X (held by A) — classic
1478        // deadlock.  Use a generous timeout so the deadlock detector fires
1479        // rather than the timeout.
1480        let a = thread::spawn(move || {
1481            lm_a.lock_with_timeout(
1482                LSN_Y,
1483                1,
1484                LockType::Write,
1485                false,
1486                false,
1487                3000,
1488            )
1489        });
1490
1491        // Give A a moment to register as waiter.
1492        thread::sleep(Duration::from_millis(50));
1493
1494        let b = thread::spawn(move || {
1495            lm_b.lock_with_timeout(
1496                LSN_X,
1497                2,
1498                LockType::Write,
1499                false,
1500                false,
1501                3000,
1502            )
1503        });
1504
1505        let res_a = a.join().unwrap();
1506        let res_b = b.join().unwrap();
1507
1508        // At least one must be a Deadlock error.
1509        let one_deadlock = matches!(res_a, Err(TxnError::Deadlock(_)))
1510            || matches!(res_b, Err(TxnError::Deadlock(_)));
1511        assert!(
1512            one_deadlock,
1513            "expected at least one Deadlock, got a={:?} b={:?}",
1514            res_a, res_b
1515        );
1516    }
1517
1518    /// One write lock released; multiple waiting readers must all be granted.
1519    ///
1520    /// grants all compatible waiters at once in LockImpl.release().
1521    #[test]
1522    fn test_multiple_readers_unblocked() {
1523        let lm = Arc::new(LockManager::new());
1524        const LSN: u64 = 0xFEED_FACE;
1525        const N_READERS: usize = 4;
1526
1527        // Writer holds the lock.
1528        lm.lock(LSN, 1, LockType::Write, false, false).unwrap();
1529
1530        let started = Arc::new((Mutex::new(0usize), Condvar::new()));
1531        let mut handles = Vec::new();
1532
1533        for i in 0..N_READERS {
1534            let lm_r = Arc::clone(&lm);
1535            let started_r = Arc::clone(&started);
1536            let h = thread::spawn(move || {
1537                {
1538                    let (m, cv) = &*started_r;
1539                    let mut g = m.lock();
1540                    *g += 1;
1541                    cv.notify_all();
1542                }
1543                lm_r.lock_with_timeout(
1544                    LSN,
1545                    (i + 2) as i64,
1546                    LockType::Read,
1547                    false,
1548                    false,
1549                    5000,
1550                )
1551            });
1552            handles.push(h);
1553        }
1554
1555        // Wait until all readers have signalled.
1556        {
1557            let (m, cv) = &*started;
1558            let mut g = m.lock();
1559            while *g < N_READERS {
1560                cv.wait(&mut g);
1561            }
1562        }
1563        // Allow time for all readers to block.
1564        thread::sleep(Duration::from_millis(80));
1565
1566        // Release the write lock.
1567        lm.release(LSN, 1).unwrap();
1568
1569        // All readers should have been granted.
1570        for h in handles {
1571            let result = h.join().unwrap();
1572            assert!(result.is_ok(), "reader expected Ok, got {:?}", result);
1573            assert_eq!(result.unwrap(), LockGrantType::New);
1574        }
1575    }
1576
1577    // -----------------------------------------------------------------------
1578    // Ported from LockManagerTest.java — testNegatives
1579    // -----------------------------------------------------------------------
1580
1581    /// Query methods return false before
1582    /// a lock is acquired, and the lock entry is cleaned up after release.
1583    #[test]
1584    fn test_je_negatives_query_before_lock() {
1585        let lm = LockManager::new();
1586        let lsn: u64 = 1;
1587
1588        // No lock held yet.
1589        assert_eq!(lm.get_owned_lock_type(lsn, 1), None);
1590        assert_eq!(lm.get_owned_lock_type(lsn, 1), None); // write check
1591        let (owners, _) = lm.get_lock_info(lsn);
1592        assert_eq!(owners, 0);
1593
1594        // Acquire READ lock for locker 1.
1595        let r = lm.lock(lsn, 1, LockType::Read, false, false);
1596        assert!(r.is_ok());
1597        assert_eq!(r.unwrap(), LockGrantType::New);
1598
1599        // A second request for the same lock by the same locker → EXISTING.
1600        let r2 = lm.lock(lsn, 1, LockType::Read, false, false);
1601        assert_eq!(r2.unwrap(), LockGrantType::Existing);
1602
1603        // Locker 2 does not own it.
1604        assert_eq!(lm.get_owned_lock_type(lsn, 2), None);
1605
1606        // The lock entry exists.
1607        let (owners, _) = lm.get_lock_info(lsn);
1608        assert_eq!(owners, 1);
1609
1610        // Release a non-existent LSN — should not panic and lock should persist.
1611        let _ = lm.release(2, 1); // lsn=2 doesn't exist
1612        let (owners2, _) = lm.get_lock_info(lsn);
1613        assert_eq!(owners2, 1);
1614
1615        // Release by a non-owner (locker 2) should not release lsn=1.
1616        let _ = lm.release(lsn, 2);
1617        let (owners3, _) = lm.get_lock_info(lsn);
1618        assert_eq!(owners3, 1);
1619        assert_eq!(lm.get_owned_lock_type(lsn, 1), Some(LockType::Read));
1620
1621        // True release by the actual owner.
1622        lm.release(lsn, 1).unwrap();
1623        let (owners4, _) = lm.get_lock_info(lsn);
1624        assert_eq!(owners4, 0);
1625        assert_eq!(lm.get_owned_lock_type(lsn, 1), None);
1626    }
1627
1628    /// Holding write then requesting
1629    /// READ for the same locker succeeds (WRITE subsumes READ).
1630    #[test]
1631    fn test_je_write_then_read_same_locker_ok() {
1632        let lm = LockManager::new();
1633        let lsn: u64 = 1;
1634
1635        lm.lock(lsn, 1, LockType::Write, false, false).unwrap();
1636        // READ request for same locker — must succeed (EXISTING or better).
1637        let r = lm.lock(lsn, 1, LockType::Read, false, false);
1638        assert!(r.is_ok());
1639        // A third WRITE request should also be EXISTING.
1640        let r2 = lm.lock(lsn, 1, LockType::Write, false, false);
1641        assert_eq!(r2.unwrap(), LockGrantType::Existing);
1642    }
1643
1644    // -----------------------------------------------------------------------
1645    // Ported from LockManagerTest.java — testSR15926LargeNodeIds
1646    // -----------------------------------------------------------------------
1647
1648    /// Lsn values with the
1649    /// sign bit set (> 0x80000000) must hash to a non-negative table index.
1650    #[test]
1651    fn test_je_large_lsn_no_negative_index() {
1652        let lm = LockManager::new();
1653        // 0x80000000 is the value from the original bug report.
1654        let large_lsn: u64 = 0x80000000u64;
1655        let result = lm.lock(large_lsn, 1, LockType::Write, false, false);
1656        assert!(result.is_ok(), "large LSN should not cause a panic or error");
1657        lm.release(large_lsn, 1).unwrap();
1658    }
1659
1660    // -----------------------------------------------------------------------
1661    // Ported from LockManagerTest — distinct-LSN distribution across shards
1662    // -----------------------------------------------------------------------
1663
1664    /// Verify shard distribution: distinct LSNs are independently managed
1665    /// regardless of the configured shard count (default 64).
1666    #[test]
1667    fn test_with_config_shard_count_honored() {
1668        // DRIFT-2 regression: the shard count must come from config, not a
1669        // hardcoded constant. default new() = 64; with_config respects N.
1670        assert_eq!(LockManager::new().n_lock_tables(), 64);
1671        assert_eq!(LockManager::with_config(500, 8).n_lock_tables(), 8);
1672        // Clamped to at least 1.
1673        assert_eq!(LockManager::with_config(500, 0).n_lock_tables(), 1);
1674        // get_table_index stays within the configured shard count.
1675        let lm = LockManager::with_config(500, 8);
1676        for lsn in [1u64, 7, 64, 1000, u64::MAX] {
1677            assert!(lm.get_table_index(lsn) < 8);
1678        }
1679    }
1680
1681    #[test]
1682    fn test_je_sixteen_lock_tables() {
1683        // Distribute 16 distinct LSNs and check all are independently managed
1684        // (shard count is an instance field, default 64).
1685        let lm = LockManager::new();
1686        for i in 0..16u64 {
1687            lm.lock(i, 1, LockType::Write, false, false).unwrap();
1688        }
1689        assert_eq!(lm.n_total_locks(), 16);
1690        for i in 0..16u64 {
1691            lm.release(i, 1).unwrap();
1692        }
1693        assert_eq!(lm.n_total_locks(), 0);
1694    }
1695
1696    // -----------------------------------------------------------------------
1697    // Ported from LockManagerTest.java — testMultipleReaders
1698    // -----------------------------------------------------------------------
1699
1700    /// Three concurrent threads
1701    /// can all hold read locks simultaneously.
1702    #[test]
1703    fn test_je_multiple_readers_concurrent() {
1704        let lm = Arc::new(LockManager::new());
1705        const LSN: u64 = 0xAAAA;
1706        let ready = Arc::new((Mutex::new(0usize), Condvar::new()));
1707        let mut handles = Vec::new();
1708
1709        for locker_id in 1i64..=3 {
1710            let lm2 = Arc::clone(&lm);
1711            let ready2 = Arc::clone(&ready);
1712            let h = thread::spawn(move || {
1713                lm2.lock(LSN, locker_id, LockType::Read, false, false).unwrap();
1714                assert_eq!(
1715                    lm2.get_owned_lock_type(LSN, locker_id),
1716                    Some(LockType::Read)
1717                );
1718                {
1719                    let (m, cv) = &*ready2;
1720                    let mut g = m.lock();
1721                    *g += 1;
1722                    cv.notify_all();
1723                }
1724                // Wait for all three to own
1725                {
1726                    let (m, cv) = &*ready2;
1727                    let mut g = m.lock();
1728                    while *g < 3 {
1729                        cv.wait(&mut g);
1730                    }
1731                }
1732                lm2.release(LSN, locker_id).unwrap();
1733            });
1734            handles.push(h);
1735        }
1736        for h in handles {
1737            h.join().unwrap();
1738        }
1739    }
1740
1741    // -----------------------------------------------------------------------
1742    // Ported from LockManagerTest.java — testNonBlockingLock1 / 2
1743    // -----------------------------------------------------------------------
1744
1745    /// A read lock is held;
1746    /// a non-blocking write request is denied; after release the write succeeds.
1747    #[test]
1748    fn test_je_nonblocking_write_denied_then_granted() {
1749        let lm = Arc::new(LockManager::new());
1750        const LSN: u64 = 0xBBBB;
1751
1752        // Thread 1 holds a read lock.
1753        lm.lock(LSN, 1, LockType::Read, false, false).unwrap();
1754
1755        let lm2 = Arc::clone(&lm);
1756        let h = thread::spawn(move || {
1757            // Non-blocking write → must be denied.
1758            let r = lm2.lock(LSN, 2, LockType::Write, true, false);
1759            assert!(
1760                matches!(r, Err(TxnError::LockNotAvailable { .. })),
1761                "expected LockNotAvailable, got {:?}",
1762                r
1763            );
1764            // Locker 2 is not an owner.
1765            assert_eq!(lm2.get_owned_lock_type(LSN, 2), None);
1766            let (_, waiters) = lm2.get_lock_info(LSN);
1767            assert_eq!(waiters, 0);
1768            let (owners, _) = lm2.get_lock_info(LSN);
1769            assert_eq!(owners, 1);
1770        });
1771        h.join().unwrap();
1772
1773        // Now release locker 1; locker 2 can acquire afterwards.
1774        lm.release(LSN, 1).unwrap();
1775        let r2 = lm.lock(LSN, 2, LockType::Write, false, false);
1776        assert_eq!(r2.unwrap(), LockGrantType::New);
1777        lm.release(LSN, 2).unwrap();
1778    }
1779
1780    /// A write lock is held;
1781    /// a non-blocking read request is denied; after release the read succeeds.
1782    #[test]
1783    fn test_je_nonblocking_read_denied_then_granted() {
1784        let lm = Arc::new(LockManager::new());
1785        const LSN: u64 = 0xCCCC;
1786
1787        // Locker 1 holds a write lock.
1788        lm.lock(LSN, 1, LockType::Write, false, false).unwrap();
1789        assert!(lm.is_owned_write_lock(LSN, 1));
1790
1791        // Non-blocking read for locker 2 → denied.
1792        let r = lm.lock(LSN, 2, LockType::Read, true, false);
1793        assert!(
1794            matches!(r, Err(TxnError::LockNotAvailable { .. })),
1795            "expected LockNotAvailable, got {:?}",
1796            r
1797        );
1798        assert_eq!(lm.get_owned_lock_type(LSN, 2), None);
1799
1800        // Release locker 1, then locker 2 can read.
1801        lm.release(LSN, 1).unwrap();
1802        let r2 = lm.lock(LSN, 2, LockType::Read, false, false);
1803        assert_eq!(r2.unwrap(), LockGrantType::New);
1804        assert!(!lm.is_owned_write_lock(LSN, 2));
1805        lm.release(LSN, 2).unwrap();
1806    }
1807
1808    // -----------------------------------------------------------------------
1809    // Ported from LockManagerTest.java — testMultipleReadersSingleWrite1
1810    // -----------------------------------------------------------------------
1811
1812    /// Two readers
1813    /// hold a lock; a writer blocks; when both readers release the writer is
1814    /// granted.
1815    #[test]
1816    fn test_je_two_readers_one_writer_blocks_then_granted() {
1817        let lm = Arc::new(LockManager::new());
1818        const LSN: u64 = 0xDDDD;
1819        let writers_waiting = Arc::new((Mutex::new(false), Condvar::new()));
1820
1821        // Locker 1 and 2 acquire read locks upfront.
1822        lm.lock(LSN, 1, LockType::Read, false, false).unwrap();
1823        lm.lock(LSN, 2, LockType::Read, false, false).unwrap();
1824
1825        let lm3 = Arc::clone(&lm);
1826        let ww = Arc::clone(&writers_waiting);
1827        let writer = thread::spawn(move || {
1828            {
1829                let (m, cv) = &*ww;
1830                let mut g = m.lock();
1831                *g = true;
1832                cv.notify_all();
1833            }
1834            // Block until both readers release.
1835            lm3.lock_with_timeout(LSN, 3, LockType::Write, false, false, 5000)
1836        });
1837
1838        // Wait until writer has registered as waiter.
1839        {
1840            let (m, cv) = &*writers_waiting;
1841            let mut g = m.lock();
1842            while !*g {
1843                cv.wait(&mut g);
1844            }
1845        }
1846        thread::sleep(Duration::from_millis(30));
1847
1848        let (_, waiters) = lm.get_lock_info(LSN);
1849        assert_eq!(waiters, 1, "writer should be waiting");
1850
1851        lm.release(LSN, 1).unwrap();
1852        lm.release(LSN, 2).unwrap();
1853
1854        let result = writer.join().unwrap();
1855        assert!(
1856            result.is_ok(),
1857            "writer should have been granted, got {:?}",
1858            result
1859        );
1860        assert!(lm.is_owned_write_lock(LSN, 3));
1861        lm.release(LSN, 3).unwrap();
1862    }
1863
1864    // -----------------------------------------------------------------------
1865    // Ported from DeadlockTest.java — testDeadlockBetweenTwoLockers
1866    // -----------------------------------------------------------------------
1867
1868    /// Classic 2-locker
1869    /// deadlock.  Locker 1 holds L1 and waits for L2; locker 2 holds L2 and
1870    /// waits for L1.  At least one must receive a Deadlock error.
1871    #[test]
1872    fn test_je_deadlock_two_lockers() {
1873        let lm = Arc::new(LockManager::new());
1874        const L1: u64 = 0x1001;
1875        const L2: u64 = 0x2002;
1876
1877        lm.lock(L1, 1, LockType::Write, false, false).unwrap();
1878        lm.lock(L2, 2, LockType::Write, false, false).unwrap();
1879
1880        let lm_a = Arc::clone(&lm);
1881        let lm_b = Arc::clone(&lm);
1882
1883        let a = thread::spawn(move || {
1884            lm_a.lock_with_timeout(L2, 1, LockType::Write, false, false, 3000)
1885        });
1886        thread::sleep(Duration::from_millis(50));
1887        let b = thread::spawn(move || {
1888            lm_b.lock_with_timeout(L1, 2, LockType::Write, false, false, 3000)
1889        });
1890
1891        let ra = a.join().unwrap();
1892        let rb = b.join().unwrap();
1893
1894        let one_dead = matches!(ra, Err(TxnError::Deadlock(_)))
1895            || matches!(rb, Err(TxnError::Deadlock(_)));
1896        assert!(
1897            one_dead,
1898            "expected at least one Deadlock, got a={:?} b={:?}",
1899            ra, rb
1900        );
1901    }
1902
1903    // -----------------------------------------------------------------------
1904    // Ported from DeadlockTest.java — testDeadlockAmongThreeLockers
1905    // -----------------------------------------------------------------------
1906
1907    /// 3-locker cycle.
1908    /// Locker1 → L2, Locker2 → L3, Locker3 → L1.  At least one deadlock.
1909    #[test]
1910    fn test_je_deadlock_three_lockers_cycle() {
1911        let lm = Arc::new(LockManager::new());
1912        const L1: u64 = 0x3001;
1913        const L2: u64 = 0x3002;
1914        const L3: u64 = 0x3003;
1915
1916        // Each locker acquires its first lock.
1917        lm.lock(L1, 1, LockType::Write, false, false).unwrap();
1918        lm.lock(L2, 2, LockType::Write, false, false).unwrap();
1919        lm.lock(L3, 3, LockType::Write, false, false).unwrap();
1920
1921        let lm1 = Arc::clone(&lm);
1922        let lm2 = Arc::clone(&lm);
1923        let lm3 = Arc::clone(&lm);
1924
1925        let t1 = thread::spawn(move || {
1926            lm1.lock_with_timeout(L2, 1, LockType::Write, false, false, 3000)
1927        });
1928        thread::sleep(Duration::from_millis(30));
1929        let t2 = thread::spawn(move || {
1930            lm2.lock_with_timeout(L3, 2, LockType::Write, false, false, 3000)
1931        });
1932        thread::sleep(Duration::from_millis(30));
1933        let t3 = thread::spawn(move || {
1934            lm3.lock_with_timeout(L1, 3, LockType::Write, false, false, 3000)
1935        });
1936
1937        let r1 = t1.join().unwrap();
1938        let r2 = t2.join().unwrap();
1939        let r3 = t3.join().unwrap();
1940
1941        let any_dead = matches!(r1, Err(TxnError::Deadlock(_)))
1942            || matches!(r2, Err(TxnError::Deadlock(_)))
1943            || matches!(r3, Err(TxnError::Deadlock(_)));
1944        assert!(
1945            any_dead,
1946            "3-locker cycle: expected at least one Deadlock error"
1947        );
1948    }
1949
1950    // -----------------------------------------------------------------------
1951    // Ported from DeadlockTest.java — testThrowCorrectException
1952    // -----------------------------------------------------------------------
1953
1954    /// A single waiter with
1955    /// no cycle should time out with LockTimeout (not Deadlock).
1956    #[test]
1957    fn test_je_no_cycle_gives_timeout_not_deadlock() {
1958        let lm = Arc::new(LockManager::new());
1959        const LSN: u64 = 0x4444;
1960
1961        // Locker 1 holds the lock and never releases.
1962        lm.lock(LSN, 1, LockType::Write, false, false).unwrap();
1963
1964        let lm2 = Arc::clone(&lm);
1965        let h = thread::spawn(move || {
1966            lm2.lock_with_timeout(LSN, 2, LockType::Write, false, false, 200)
1967        });
1968
1969        let r = h.join().unwrap();
1970        assert!(
1971            matches!(r, Err(TxnError::LockTimeout { .. })),
1972            "no cycle → expected LockTimeout, got {:?}",
1973            r
1974        );
1975
1976        lm.release(LSN, 1).unwrap();
1977    }
1978
1979    // -----------------------------------------------------------------------
1980    // Ported from LockManagerTest — lock statistics increment
1981    // -----------------------------------------------------------------------
1982
1983    /// Lock statistics (lock_requests, lock_waits) must increment correctly.
1984    #[test]
1985    fn test_je_lock_stats_increment() {
1986        let lm = LockManager::new();
1987
1988        lm.lock(10, 1, LockType::Read, false, false).unwrap();
1989        lm.lock(10, 2, LockType::Read, false, false).unwrap();
1990        lm.lock(20, 3, LockType::Write, false, false).unwrap();
1991
1992        let stats = lm.get_stats();
1993        assert_eq!(stats.lock_requests, 3, "3 lock requests should be counted");
1994        // No waits because all were immediately granted.
1995        assert_eq!(stats.lock_waits, 0, "no waits expected");
1996    }
1997
1998    // -----------------------------------------------------------------------
1999    // Ported from LockManagerTest.java — testUpgradeLock
2000    // -----------------------------------------------------------------------
2001
2002    /// A promotion waiter (locker
2003    /// that already holds a read lock) is placed ahead of new write waiters
2004    /// so it gets the write lock before them.
2005    #[test]
2006    fn test_je_upgrade_lock_butts_in_front() {
2007        let lm = Arc::new(LockManager::new());
2008        const LSN: u64 = 0x5555;
2009
2010        // Locker 1 and 2 hold read locks.
2011        lm.lock(LSN, 1, LockType::Read, false, false).unwrap();
2012        lm.lock(LSN, 2, LockType::Read, false, false).unwrap();
2013
2014        let lm3 = Arc::clone(&lm);
2015        let lm2 = Arc::clone(&lm);
2016
2017        // Locker 3 waits for write (new waiter).
2018        let t3 = thread::spawn(move || {
2019            lm3.lock_with_timeout(LSN, 3, LockType::Write, false, false, 5000)
2020        });
2021        thread::sleep(Duration::from_millis(30));
2022
2023        // Locker 2 upgrades read → write (promotion waiter, should jump ahead).
2024        let t2 = thread::spawn(move || {
2025            lm2.lock_with_timeout(LSN, 2, LockType::Write, false, false, 5000)
2026        });
2027        thread::sleep(Duration::from_millis(20));
2028
2029        // Release locker 1's read lock; locker 2's promotion should be granted
2030        // before locker 3.
2031        lm.release(LSN, 1).unwrap();
2032
2033        let r2 = t2.join().unwrap();
2034        assert!(r2.is_ok(), "locker 2 promotion should succeed, got {:?}", r2);
2035        assert_eq!(r2.unwrap(), LockGrantType::Promotion);
2036
2037        // Now release locker 2's write; locker 3 gets it.
2038        lm.release(LSN, 2).unwrap();
2039        let r3 = t3.join().unwrap();
2040        assert!(
2041            r3.is_ok(),
2042            "locker 3 should succeed after locker 2, got {:?}",
2043            r3
2044        );
2045        lm.release(LSN, 3).unwrap();
2046    }
2047
2048    // -----------------------------------------------------------------------
2049    // release_all_for_locker
2050    // -----------------------------------------------------------------------
2051
2052    #[test]
2053    fn release_all_for_locker_returns_count() {
2054        let lm = LockManager::new();
2055        // Locker 7 takes 5 locks, locker 8 takes 2.
2056        for lsn in [10u64, 20, 30, 40, 50] {
2057            lm.lock(lsn, 7, LockType::Read, false, false).unwrap();
2058        }
2059        for lsn in [100u64, 200] {
2060            lm.lock(lsn, 8, LockType::Write, false, false).unwrap();
2061        }
2062        assert_eq!(lm.n_total_locks(), 7);
2063
2064        let released = lm.release_all_for_locker(7);
2065        assert_eq!(released, 5);
2066        // Only locker 8's 2 locks remain.
2067        assert_eq!(lm.n_total_locks(), 2);
2068
2069        let released2 = lm.release_all_for_locker(8);
2070        assert_eq!(released2, 2);
2071        assert_eq!(lm.n_total_locks(), 0);
2072    }
2073
2074    #[test]
2075    fn release_all_for_locker_unknown_id_is_zero() {
2076        let lm = LockManager::new();
2077        lm.lock(1, 1, LockType::Read, false, false).unwrap();
2078        let released = lm.release_all_for_locker(999);
2079        assert_eq!(released, 0);
2080        assert_eq!(lm.n_total_locks(), 1);
2081        lm.release(1, 1).unwrap();
2082    }
2083
2084    #[test]
2085    fn release_all_for_locker_idempotent() {
2086        // Calling twice is safe — second call reaps zero entries.
2087        let lm = LockManager::new();
2088        lm.lock(1, 1, LockType::Read, false, false).unwrap();
2089        lm.lock(2, 1, LockType::Write, false, false).unwrap();
2090        assert_eq!(lm.release_all_for_locker(1), 2);
2091        assert_eq!(lm.release_all_for_locker(1), 0);
2092    }
2093
2094    #[test]
2095    fn release_all_for_locker_preserves_other_owners() {
2096        // Multiple lockers sharing a read lock at the same LSN: releasing
2097        // one locker leaves the others' entry intact.
2098        let lm = LockManager::new();
2099        lm.lock(1, 1, LockType::Read, false, false).unwrap();
2100        lm.lock(1, 2, LockType::Read, false, false).unwrap();
2101        lm.lock(1, 3, LockType::Read, false, false).unwrap();
2102
2103        let released = lm.release_all_for_locker(2);
2104        assert_eq!(released, 1);
2105        // Lock entry persists because lockers 1 and 3 still own it.
2106        assert_eq!(lm.n_total_locks(), 1);
2107
2108        // Verify locker 2 no longer has it.
2109        let released_again = lm.release_all_for_locker(2);
2110        assert_eq!(released_again, 0);
2111
2112        lm.release(1, 1).unwrap();
2113        lm.release(1, 3).unwrap();
2114        assert_eq!(lm.n_total_locks(), 0);
2115    }
2116
2117    #[test]
2118    fn release_all_for_locker_clears_lock_when_last_owner_leaves() {
2119        let lm = LockManager::new();
2120        lm.lock(42, 1, LockType::Write, false, false).unwrap();
2121        assert_eq!(lm.n_total_locks(), 1);
2122        lm.release_all_for_locker(1);
2123        // Lock entry was the last owner of LSN 42 — entry removed.
2124        assert_eq!(lm.n_total_locks(), 0);
2125    }
2126
2127    /// H-2 regression: verify that no internal deadlock occurs when the lock
2128    /// manager processes concurrent waiter registrations and deadlock-victim
2129    /// cleanups.  Before this fix, different code paths acquired shard and
2130    /// waiter_graph mutexes in inconsistent order, creating a potential
2131    /// process hang under extreme contention.
2132    ///
2133    /// The test spawns two threads:
2134    ///   Thread A: holds a write lock on LSN 1, then waits on LSN 2.
2135    ///   Thread B: holds a write lock on LSN 2, then waits on LSN 1.
2136    /// This is a classic 2-txn deadlock cycle.  The lock manager must detect
2137    /// it (aborting one victim) and complete without hanging.  The 2-second
2138    /// timeout is the safety net.
2139    #[test]
2140    fn test_lock_ordering_no_internal_deadlock() {
2141        use std::sync::{Arc, Barrier};
2142        use std::thread;
2143        use std::time::Duration;
2144
2145        let lm = Arc::new(LockManager::new());
2146        const LSN_A: u64 = 0xDEAD_0001;
2147        const LSN_B: u64 = 0xDEAD_0002;
2148        const LOCKER_A: i64 = 1001;
2149        const LOCKER_B: i64 = 1002;
2150
2151        // Both threads acquire their first lock before trying for the second.
2152        let barrier = Arc::new(Barrier::new(2));
2153
2154        let lm_a = Arc::clone(&lm);
2155        let barrier_a = Arc::clone(&barrier);
2156        let t_a = thread::spawn(move || {
2157            // Locker A grabs LSN_A, then tries to grab LSN_B (held by B).
2158            lm_a.lock(LSN_A, LOCKER_A, LockType::Write, false, false).unwrap();
2159            barrier_a.wait(); // both sides have their first lock
2160            lm_a.lock(LSN_B, LOCKER_A, LockType::Write, false, false)
2161        });
2162
2163        let lm_b = Arc::clone(&lm);
2164        let barrier_b = Arc::clone(&barrier);
2165        let t_b = thread::spawn(move || {
2166            // Locker B grabs LSN_B, then tries to grab LSN_A (held by A).
2167            lm_b.lock(LSN_B, LOCKER_B, LockType::Write, false, false).unwrap();
2168            barrier_b.wait(); // both sides have their first lock
2169            lm_b.lock(LSN_A, LOCKER_B, LockType::Write, false, false)
2170        });
2171
2172        // One thread must deadlock; the other must complete.  Neither should hang.
2173        let res_a = t_a.join();
2174        let res_b = t_b.join();
2175
2176        // Exactly one of the two must be a deadlock error.
2177        let both = [res_a, res_b];
2178        let n_deadlocks = both
2179            .iter()
2180            .filter(|r| matches!(r, Ok(Err(TxnError::Deadlock(_)))))
2181            .count();
2182        let n_success = both.iter().filter(|r| matches!(r, Ok(Ok(_)))).count();
2183        // Allow for timeout as well (one deadlock or one timeout + one success)
2184        assert!(
2185            (n_deadlocks == 1 && n_success <= 1) || n_deadlocks == 2,
2186            "expected at least one deadlock error, got: n_deadlocks={n_deadlocks} n_success={n_success}"
2187        );
2188        let _ = Duration::from_secs(0); // suppress unused import warning
2189    }
2190
2191    /// H-4 regression: when select_victim has populated lock_counts, the
2192    /// transaction holding the fewest locks is chosen, regardless of which
2193    /// is youngest.
2194    ///
2195    /// Construct a 2-locker cycle where the *older* (lower-id) locker holds
2196    /// many additional locks and the *younger* (higher-id) locker holds
2197    /// only the cycle lock plus a couple more, then verify the younger
2198    /// locker is selected.  (With the previous bug, lock_counts was always
2199    /// empty so select_victim fell through to the youngest-tiebreaker; the
2200    /// younger would be chosen *for the wrong reason*.  This test pins the
2201    /// counts so the primary criterion drives the choice.)
2202    #[test]
2203    fn test_h4_victim_selection_uses_lock_counts() {
2204        let lm = Arc::new(LockManager::new());
2205        // L_OLD is held by locker 1 (older, holds 5 unrelated locks).
2206        const L_OLD: u64 = 0x6001;
2207        // L_NEW is held by locker 2 (younger, holds 0 unrelated locks).
2208        const L_NEW: u64 = 0x6002;
2209
2210        // Locker 1 owns 5 unrelated locks then takes L_OLD.
2211        for i in 0..5 {
2212            lm.lock(0x7000 + i, 1, LockType::Write, false, false).unwrap();
2213        }
2214        lm.lock(L_OLD, 1, LockType::Write, false, false).unwrap();
2215
2216        // Locker 2 owns 0 unrelated locks, then takes L_NEW.
2217        lm.lock(L_NEW, 2, LockType::Write, false, false).unwrap();
2218
2219        // Compute counts on the cycle [1, 2].
2220        let counts = lm.compute_lock_counts(&[1, 2]);
2221        assert_eq!(
2222            counts.get(&1).copied().unwrap_or(0),
2223            6,
2224            "locker 1 holds 6 locks"
2225        );
2226        assert_eq!(
2227            counts.get(&2).copied().unwrap_or(0),
2228            1,
2229            "locker 2 holds 1 lock"
2230        );
2231
2232        // select_victim with these counts must pick locker 2 (fewest locks).
2233        let victim = DeadlockDetector::select_victim(&[1, 2], &counts);
2234        assert_eq!(victim, 2, "victim must be locker 2 (fewest locks held)");
2235    }
2236
2237    /// TXN-1 regression test: `lock_with_sharing_and_timeout` must detect a
2238    /// deadlock formed WHILE waiting (not only after a 50 ms slice).
2239    ///
2240    /// Setup: two lockers on the sharing path (HandleLocker-like) each hold
2241    /// a lock the other wants. The deadlock must be detected and returned as
2242    /// `DeadlockException` well within the test timeout, NOT after 50 ms.
2243    ///
2244    /// This is a structural test rather than a timing test: we form a clear
2245    /// two-node cycle via `lock_with_sharing_and_timeout` and verify the error
2246    /// is a deadlock (not a timeout). Prior to the fix, the check only fired
2247    /// on `timed_out.timed_out()` with stale owner IDs, so a deadlock on the
2248    /// sharing path could wait a full 50 ms slice before being detected.
2249    #[test]
2250    fn test_txn1_sharing_path_deadlock_detected_promptly() {
2251        use std::sync::Barrier;
2252        use std::thread;
2253
2254        let lm = Arc::new(LockManager::new());
2255        const L1: u64 = 0xA001;
2256        const L2: u64 = 0xA002;
2257        const LOCKER_A: i64 = 101;
2258        const LOCKER_B: i64 = 102;
2259        const TIMEOUT_MS: u64 = 5_000;
2260
2261        // A holds L1; B holds L2.
2262        lm.lock(L1, LOCKER_A, LockType::Write, false, false).unwrap();
2263        lm.lock(L2, LOCKER_B, LockType::Write, false, false).unwrap();
2264
2265        let barrier = Arc::new(Barrier::new(2));
2266        let lm_b = Arc::clone(&lm);
2267        let barrier_b = Arc::clone(&barrier);
2268
2269        // Thread B tries to acquire L1 (blocked by A).
2270        let handle = thread::spawn(move || {
2271            barrier_b.wait();
2272            lm_b.lock_with_sharing_and_timeout(
2273                L1,
2274                LOCKER_B,
2275                LockType::Write,
2276                false,
2277                false,
2278                TIMEOUT_MS,
2279            )
2280        });
2281
2282        // Main thread: wait until B is queued, then try L2 (blocked by B).
2283        barrier.wait();
2284        // Small yield to let B enter the wait loop before we enqueue ourselves.
2285        std::thread::sleep(Duration::from_millis(5));
2286        let result_a = lm.lock_with_sharing_and_timeout(
2287            L2,
2288            LOCKER_A,
2289            LockType::Write,
2290            false,
2291            false,
2292            TIMEOUT_MS,
2293        );
2294
2295        let result_b = handle.join().expect("thread B panicked");
2296
2297        // Exactly one of the two must get a DeadlockException; the other
2298        // should succeed or also get a deadlock. Both getting deadlock is fine.
2299        let a_dl = matches!(result_a, Err(TxnError::Deadlock(..)));
2300        let b_dl = matches!(result_b, Err(TxnError::Deadlock(..)));
2301        assert!(
2302            a_dl || b_dl,
2303            "TXN-1: expected at least one DeadlockException on the sharing path; \
2304             got A={result_a:?}, B={result_b:?}"
2305        );
2306    }
2307}