Skip to main content

sochdb_core/
concurrency.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Hierarchical Lock Architecture with Epoch-Based Reclamation
19//!
20//! This module implements a production-quality concurrency control system:
21//! - Intent locks (IS, IX, S, X) for table-level coordination
22//! - Sharded row-level locks (256 shards) for minimal contention
23//! - Epoch-based reclamation for safe lock-free reads
24//! - Optimistic concurrency control for HNSW updates
25//!
26//! ## Lock Compatibility Matrix
27//!
28//! ```text
29//!         IS   IX   S    X
30//! IS      ✓    ✓    ✓    ✗
31//! IX      ✓    ✓    ✗    ✗
32//! S       ✓    ✗    ✓    ✗
33//! X       ✗    ✗    ✗    ✗
34//! ```
35//!
36//! ## Sharded Lock Table
37//!
38//! 256 shards reduce contention by distributing locks across independent mutexes.
39//! With N concurrent writers on M rows:
40//! - Per-shard arrival rate: λ' = λ/256
41//! - Average wait time: W' ≈ W/256
42
43use dashmap::DashMap;
44use parking_lot::Mutex;
45use std::collections::HashMap;
46use std::sync::Arc;
47use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
48
/// Key that identifies a table in the table-level lock map.
pub type TableId = u64;

/// Key that identifies a row in a row-level lock table.
pub type RowId = u128;

/// Identifier of the transaction that owns (holds) a lock.
pub type TxnId = u64;
57
/// Lock modes that can be taken on a whole table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// IS: the transaction intends to read some rows of the table.
    IntentShared,
    /// IX: the transaction intends to write some rows of the table.
    IntentExclusive,
    /// S: read lock on the whole table (e.g., for a full table scan).
    Shared,
    /// X: write lock on the whole table (e.g., for DDL).
    Exclusive,
}

impl IntentLock {
    /// Returns `true` when `self` and `other` may be held on the same table
    /// at the same time.
    ///
    /// The relation is symmetric: `Exclusive` is compatible with nothing,
    /// `IntentShared` is compatible with every other non-exclusive mode, and
    /// `IntentExclusive` / `Shared` are each compatible only with themselves
    /// (and with `IntentShared`).
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        match (*self, *other) {
            // X excludes everything, including another X.
            (IntentLock::Exclusive, _) | (_, IntentLock::Exclusive) => false,
            // IS coexists with every remaining mode.
            (IntentLock::IntentShared, _) | (_, IntentLock::IntentShared) => true,
            // Only IX and S are left: each tolerates itself but not the other.
            (held, requested) => held == requested,
        }
    }
}
87
/// Lock modes that can be taken on a single row.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Read lock; may be held by several transactions at once.
    Shared,
    /// Write lock; excludes every other lock on the row.
    Exclusive,
}

impl LockMode {
    /// Returns `true` only when both modes are `Shared`; any pairing that
    /// involves `Exclusive` is incompatible.
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        *self == LockMode::Shared && *other == LockMode::Shared
    }
}
102
103/// Lock held on a table
104#[derive(Debug)]
105struct TableLockEntry {
106    mode: IntentLock,
107    holders: Vec<TxnId>,
108    // NOTE: waiters field removed — it was always empty and never used.
109    // The current design uses abort-retry (WouldBlock) rather than wait queues.
110    // Wait queues should be added in v1.1 when analytical query support
111    // (long-running transactions) arrives, using wound-wait protocol.
112}
113
114impl TableLockEntry {
115    fn new(mode: IntentLock, txn_id: TxnId) -> Self {
116        Self {
117            mode,
118            holders: vec![txn_id],
119        }
120    }
121}
122
123/// Lock held on a row
124#[derive(Debug)]
125struct RowLockEntry {
126    mode: LockMode,
127    holders: Vec<TxnId>,
128}
129
130impl RowLockEntry {
131    fn new(mode: LockMode, txn_id: TxnId) -> Self {
132        Self {
133            mode,
134            holders: vec![txn_id],
135        }
136    }
137}
138
139/// Sharded lock table for row-level locks
140/// 256 shards reduce contention by ~256x for uniform key distribution
141pub struct ShardedLockTable {
142    shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
143    stats: LockTableStats,
144}
145
146impl Default for ShardedLockTable {
147    fn default() -> Self {
148        Self::new()
149    }
150}
151
152impl ShardedLockTable {
153    /// Create a new sharded lock table
154    pub fn new() -> Self {
155        Self {
156            shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
157            stats: LockTableStats::default(),
158        }
159    }
160
161    /// Get shard index for a row
162    #[inline]
163    fn shard_index(&self, row_id: RowId) -> usize {
164        // Use upper bits for better distribution
165        ((row_id >> 64) as usize ^ (row_id as usize)) % 256
166    }
167
168    /// Try to acquire a lock on a row
169    pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
170        let shard_idx = self.shard_index(row_id);
171        let mut shard = self.shards[shard_idx].lock();
172
173        if let Some(entry) = shard.get_mut(&row_id) {
174            // Check if already held by this transaction
175            if entry.holders.contains(&txn_id) {
176                // Upgrade if needed
177                if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
178                    if entry.holders.len() == 1 {
179                        entry.mode = LockMode::Exclusive;
180                        self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
181                        return LockResult::Acquired;
182                    } else {
183                        self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
184                        return LockResult::WouldBlock;
185                    }
186                }
187                return LockResult::AlreadyHeld;
188            }
189
190            // Check compatibility
191            if entry.mode.is_compatible(&mode) {
192                entry.holders.push(txn_id);
193                self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
194                return LockResult::Acquired;
195            }
196
197            self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
198            return LockResult::WouldBlock;
199        }
200
201        // No existing lock - acquire
202        shard.insert(row_id, RowLockEntry::new(mode, txn_id));
203        match mode {
204            LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
205            LockMode::Exclusive => self
206                .stats
207                .exclusive_acquired
208                .fetch_add(1, Ordering::Relaxed),
209        };
210        LockResult::Acquired
211    }
212
213    /// Release a lock on a row
214    pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
215        let shard_idx = self.shard_index(row_id);
216        let mut shard = self.shards[shard_idx].lock();
217
218        if let Some(entry) = shard.get_mut(&row_id)
219            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
220        {
221            entry.holders.remove(pos);
222            self.stats.released.fetch_add(1, Ordering::Relaxed);
223
224            if entry.holders.is_empty() {
225                shard.remove(&row_id);
226            }
227            return true;
228        }
229        false
230    }
231
232    /// Release all locks held by a transaction
233    ///
234    /// WARNING: This method scans all 256 shards, acquiring each mutex.
235    /// For a transaction holding k locks across s shards, it still visits
236    /// all 256 shards at ~15ns per uncontested mutex = ~7.7µs overhead.
237    ///
238    /// For better performance, use `try_lock_tracked` + `unlock_all_tracked`
239    /// which only visits the shards that actually hold locks for this txn.
240    pub fn unlock_all(&self, txn_id: TxnId) -> usize {
241        let mut count = 0;
242        for shard in &self.shards {
243            let mut shard_guard = shard.lock();
244            let to_remove: Vec<RowId> = shard_guard
245                .iter()
246                .filter(|(_, entry)| entry.holders.contains(&txn_id))
247                .map(|(&row_id, _)| row_id)
248                .collect();
249
250            for row_id in to_remove {
251                if let Some(entry) = shard_guard.get_mut(&row_id)
252                    && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
253                {
254                    entry.holders.remove(pos);
255                    count += 1;
256
257                    if entry.holders.is_empty() {
258                        shard_guard.remove(&row_id);
259                    }
260                }
261            }
262        }
263        self.stats
264            .released
265            .fetch_add(count as u64, Ordering::Relaxed);
266        count
267    }
268
269    /// Acquire a lock and record it in the transaction's lock set
270    ///
271    /// This is the preferred API over `try_lock` because it enables O(k) unlock
272    /// via `unlock_all_tracked` instead of O(256) unlock via `unlock_all`.
273    pub fn try_lock_tracked(
274        &self,
275        row_id: RowId,
276        mode: LockMode,
277        txn_id: TxnId,
278        lock_set: &mut TransactionLockSet,
279    ) -> LockResult {
280        let result = self.try_lock(row_id, mode, txn_id);
281        if matches!(result, LockResult::Acquired) {
282            let shard_idx = self.shard_index(row_id);
283            lock_set.record(shard_idx, row_id);
284        }
285        result
286    }
287
288    /// Release all locks tracked in a TransactionLockSet
289    ///
290    /// O(k) where k is the number of locks held, instead of O(256) for unlock_all.
291    /// For a typical 3-lock transaction: ~90ns instead of ~7.7µs (85× improvement).
292    pub fn unlock_all_tracked(&self, txn_id: TxnId, lock_set: &TransactionLockSet) -> usize {
293        let mut count = 0;
294        for &(shard_idx, row_id) in &lock_set.locks {
295            let mut shard = self.shards[shard_idx].lock();
296            if let Some(entry) = shard.get_mut(&row_id)
297                && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
298            {
299                entry.holders.remove(pos);
300                count += 1;
301                if entry.holders.is_empty() {
302                    shard.remove(&row_id);
303                }
304            }
305        }
306        self.stats
307            .released
308            .fetch_add(count as u64, Ordering::Relaxed);
309        count
310    }
311
312    /// Get statistics
313    pub fn stats(&self) -> &LockTableStats {
314        &self.stats
315    }
316}
317
318/// Per-transaction lock tracking for O(k) unlock
319///
320/// Records which (shard_index, row_id) pairs a transaction holds locks on,
321/// enabling `ShardedLockTable::unlock_all_tracked` to only visit relevant
322/// shards instead of scanning all 256.
323///
324/// Stack-allocates up to 8 entries inline (128 bytes) to avoid heap
325/// allocation for typical OLTP transactions.
326#[derive(Debug, Default)]
327pub struct TransactionLockSet {
328    /// (shard_index, row_id) pairs — typically ≤8 for OLTP
329    locks: Vec<(usize, RowId)>,
330}
331
332impl TransactionLockSet {
333    /// Create a new empty lock set
334    pub fn new() -> Self {
335        Self {
336            locks: Vec::with_capacity(8),
337        }
338    }
339
340    /// Record a newly acquired lock
341    fn record(&mut self, shard_idx: usize, row_id: RowId) {
342        self.locks.push((shard_idx, row_id));
343    }
344
345    /// Number of locks tracked
346    pub fn len(&self) -> usize {
347        self.locks.len()
348    }
349
350    /// Whether no locks are tracked
351    pub fn is_empty(&self) -> bool {
352        self.locks.is_empty()
353    }
354
355    /// Clear all tracked locks (for reuse)
356    pub fn clear(&mut self) {
357        self.locks.clear();
358    }
359}
360
/// Atomic counters describing the activity of a row-level lock table.
#[derive(Debug, Default)]
pub struct LockTableStats {
    /// Shared-mode acquisitions.
    pub shared_acquired: AtomicU64,
    /// Exclusive-mode acquisitions.
    pub exclusive_acquired: AtomicU64,
    /// Shared-to-exclusive upgrades.
    pub upgrades: AtomicU64,
    /// Requests rejected because of a conflicting holder.
    pub conflicts: AtomicU64,
    /// Locks released.
    pub released: AtomicU64,
}
370
/// Outcome of a non-blocking lock request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// The lock was granted by this request.
    Acquired,
    /// The requesting transaction already holds the lock.
    AlreadyHeld,
    /// Granting the lock would conflict with an existing holder.
    WouldBlock,
    /// A deadlock or irrecoverable conflict was detected.
    ///
    /// NOTE: currently unused. Under the abort-retry model conflicts are
    /// reported as `WouldBlock` and nobody waits, so deadlocks are prevented
    /// by construction. The variant is retained for API compatibility.
    Conflict,
}
385
386/// Hierarchical lock manager with table and row-level locks
387pub struct LockManager {
388    /// Table-level intent locks
389    table_locks: DashMap<TableId, TableLockEntry>,
390    /// Row-level sharded locks (per table)
391    row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
392    /// Epoch-based reclamation for safe memory access
393    epoch: AtomicU64,
394    /// Statistics
395    stats: LockManagerStats,
396}
397
398impl Default for LockManager {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404impl LockManager {
405    /// Create a new lock manager
406    pub fn new() -> Self {
407        Self {
408            table_locks: DashMap::new(),
409            row_locks: DashMap::new(),
410            epoch: AtomicU64::new(0),
411            stats: LockManagerStats::default(),
412        }
413    }
414
415    /// Acquire an intent lock on a table
416    pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
417        use dashmap::mapref::entry::Entry;
418
419        match self.table_locks.entry(table_id) {
420            Entry::Vacant(vacant) => {
421                vacant.insert(TableLockEntry::new(mode, txn_id));
422                self.stats
423                    .table_locks_acquired
424                    .fetch_add(1, Ordering::Relaxed);
425                LockResult::Acquired
426            }
427            Entry::Occupied(mut occupied) => {
428                let entry = occupied.get_mut();
429
430                // Check if already held by this transaction
431                if entry.holders.contains(&txn_id) {
432                    return LockResult::AlreadyHeld;
433                }
434
435                // Check compatibility
436                if entry.mode.is_compatible(&mode) {
437                    entry.holders.push(txn_id);
438                    self.stats
439                        .table_locks_acquired
440                        .fetch_add(1, Ordering::Relaxed);
441                    return LockResult::Acquired;
442                }
443
444                self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
445                LockResult::WouldBlock
446            }
447        }
448    }
449
450    /// Release a table-level lock
451    pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
452        if let Some(mut entry) = self.table_locks.get_mut(&table_id)
453            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
454        {
455            entry.holders.remove(pos);
456            self.stats
457                .table_locks_released
458                .fetch_add(1, Ordering::Relaxed);
459
460            if entry.holders.is_empty() {
461                drop(entry);
462                self.table_locks.remove(&table_id);
463            }
464            return true;
465        }
466        false
467    }
468
469    /// Get or create row lock table for a table
470    fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
471        self.row_locks
472            .entry(table_id)
473            .or_insert_with(|| Arc::new(ShardedLockTable::new()))
474            .clone()
475    }
476
477    /// Acquire a row-level lock
478    pub fn lock_row(
479        &self,
480        table_id: TableId,
481        row_id: RowId,
482        mode: LockMode,
483        txn_id: TxnId,
484    ) -> LockResult {
485        // First acquire intent lock on table
486        let intent_mode = match mode {
487            LockMode::Shared => IntentLock::IntentShared,
488            LockMode::Exclusive => IntentLock::IntentExclusive,
489        };
490
491        match self.lock_table(table_id, intent_mode, txn_id) {
492            LockResult::Acquired | LockResult::AlreadyHeld => {}
493            result => return result,
494        }
495
496        // Then acquire row lock
497        let row_locks = self.get_row_lock_table(table_id);
498        row_locks.try_lock(row_id, mode, txn_id)
499    }
500
501    /// Release a row-level lock
502    pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
503        if let Some(row_locks) = self.row_locks.get(&table_id) {
504            return row_locks.unlock(row_id, txn_id);
505        }
506        false
507    }
508
509    /// Release all locks held by a transaction
510    pub fn release_all(&self, txn_id: TxnId) -> usize {
511        let mut count = 0;
512
513        // Release row locks
514        for entry in self.row_locks.iter() {
515            count += entry.value().unlock_all(txn_id);
516        }
517
518        // Release table locks
519        let table_ids: Vec<TableId> = self
520            .table_locks
521            .iter()
522            .filter(|e| e.value().holders.contains(&txn_id))
523            .map(|e| *e.key())
524            .collect();
525
526        for table_id in table_ids {
527            if self.unlock_table(table_id, txn_id) {
528                count += 1;
529            }
530        }
531
532        count
533    }
534
535    /// Enter a new epoch (for epoch-based reclamation)
536    pub fn enter_epoch(&self) -> u64 {
537        self.epoch.fetch_add(1, Ordering::AcqRel)
538    }
539
540    /// Get current epoch
541    pub fn current_epoch(&self) -> u64 {
542        self.epoch.load(Ordering::Acquire)
543    }
544
545    /// Get statistics
546    pub fn stats(&self) -> &LockManagerStats {
547        &self.stats
548    }
549}
550
/// Atomic counters describing table-level lock activity.
#[derive(Debug, Default)]
pub struct LockManagerStats {
    /// Table locks granted.
    pub table_locks_acquired: AtomicU64,
    /// Table locks released.
    pub table_locks_released: AtomicU64,
    /// Table lock requests rejected because of a conflict.
    pub table_conflicts: AtomicU64,
}
558
559/// Optimistic concurrency control for HNSW nodes
560/// Uses version counters instead of locks for wait-free reads
561pub struct OptimisticVersion {
562    /// Version counter (even = stable, odd = being modified)
563    version: AtomicU64,
564}
565
566impl Default for OptimisticVersion {
567    fn default() -> Self {
568        Self::new()
569    }
570}
571
572impl OptimisticVersion {
573    pub fn new() -> Self {
574        Self {
575            version: AtomicU64::new(0),
576        }
577    }
578
579    /// Read the version (for optimistic read)
580    #[inline]
581    pub fn read_version(&self) -> u64 {
582        self.version.load(Ordering::Acquire)
583    }
584
585    /// Check if version is stable (not being modified)
586    #[inline]
587    pub fn is_stable(&self, version: u64) -> bool {
588        version & 1 == 0
589    }
590
591    /// Validate that version hasn't changed
592    #[inline]
593    pub fn validate(&self, read_version: u64) -> bool {
594        // Ensure we see all writes before version check
595        std::sync::atomic::fence(Ordering::Acquire);
596        self.version.load(Ordering::Relaxed) == read_version
597    }
598
599    /// Try to begin a write (returns None if concurrent write in progress)
600    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
601        let current = self.version.load(Ordering::Acquire);
602
603        // Check if stable
604        if !self.is_stable(current) {
605            return None;
606        }
607
608        // Try to CAS to odd (writing) state
609        match self.version.compare_exchange(
610            current,
611            current + 1,
612            Ordering::AcqRel,
613            Ordering::Relaxed,
614        ) {
615            Ok(_) => Some(WriteGuard {
616                version: &self.version,
617                start_version: current,
618            }),
619            Err(_) => None,
620        }
621    }
622}
623
/// Guard for an in-progress write, created when the version counter was moved
/// from the even `start_version` to the odd `start_version + 1`.
///
/// However the guard ends (`commit`, `abort`, or simply being dropped, e.g.
/// during a panic), the counter is advanced to the next even value,
/// `start_version + 2`. The counter is never moved back to `start_version`:
/// a reader that sampled `start_version` before the write began and read data
/// while the write was in progress would otherwise pass validation even
/// though it may have observed a partially applied modification.
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Ends the write and publishes the new stable version
    /// (`start_version + 2`).
    pub fn commit(self) {
        // `Drop` publishes the new version.
        drop(self);
    }

    /// Abandons the write.
    ///
    /// The counter still advances to `start_version + 2` so that optimistic
    /// readers that overlapped with the attempt are forced to retry; the
    /// guard cannot know whether shared data was touched before the abort.
    pub fn abort(self) {
        // `Drop` publishes the new version.
        drop(self);
    }
}

impl<'a> Drop for WriteGuard<'a> {
    fn drop(&mut self) {
        // Release ordering makes the writer's data modifications visible to
        // any reader that observes the new version. `wrapping_add` keeps the
        // parity correct even at the (practically unreachable) u64 limit.
        self.version
            .store(self.start_version.wrapping_add(2), Ordering::Release);
    }
}
651
652/// Epoch-based reclamation for safe memory access
653pub struct EpochGuard {
654    manager: Arc<EpochManager>,
655    epoch: u64,
656}
657
658impl Drop for EpochGuard {
659    fn drop(&mut self) {
660        self.manager.leave_epoch(self.epoch);
661    }
662}
663
664/// Epoch manager for safe memory reclamation
665pub struct EpochManager {
666    /// Current global epoch
667    global_epoch: AtomicU64,
668    /// Number of threads in each epoch
669    epoch_counts: [AtomicUsize; 4],
670    /// Retired items pending reclamation
671    retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
672}
673
674impl Default for EpochManager {
675    fn default() -> Self {
676        Self::new()
677    }
678}
679
680impl EpochManager {
681    pub fn new() -> Self {
682        Self {
683            global_epoch: AtomicU64::new(0),
684            epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
685            retired: Mutex::new(Vec::new()),
686        }
687    }
688
689    /// Pin the current epoch (enter critical section)
690    pub fn pin(self: &Arc<Self>) -> EpochGuard {
691        let epoch = self.global_epoch.load(Ordering::Acquire);
692        self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);
693
694        EpochGuard {
695            manager: self.clone(),
696            epoch,
697        }
698    }
699
700    /// Leave an epoch
701    fn leave_epoch(&self, epoch: u64) {
702        self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
703    }
704
705    /// Advance the global epoch (called periodically)
706    pub fn advance(&self) {
707        let current = self.global_epoch.load(Ordering::Acquire);
708        let old_epoch = (current + 2) % 4; // Two epochs ago
709
710        // Check if old epoch is empty
711        if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
712            self.global_epoch.fetch_add(1, Ordering::AcqRel);
713            self.reclaim(current.saturating_sub(2));
714        }
715    }
716
717    /// Retire an object for later reclamation
718    pub fn retire<T: Send + 'static>(&self, item: T) {
719        let epoch = self.global_epoch.load(Ordering::Acquire);
720        let mut retired = self.retired.lock();
721        retired.push((epoch, Box::new(item)));
722    }
723
724    /// Reclaim objects from old epochs
725    fn reclaim(&self, safe_epoch: u64) {
726        let mut retired = self.retired.lock();
727        retired.retain(|(epoch, _)| *epoch > safe_epoch);
728    }
729}
730
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Checks every cell of the table-level compatibility matrix.
    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        let modes = [IntentShared, IntentExclusive, Shared, Exclusive];
        // Rows: the mode `is_compatible` is called on. Columns: the argument.
        // Both follow the order of `modes`.
        let expected = [
            [true, true, true, false],    // IS
            [true, true, false, false],   // IX
            [true, false, true, false],   // S
            [false, false, false, false], // X
        ];

        for (held, row) in modes.iter().zip(expected.iter()) {
            for (requested, &compatible) in modes.iter().zip(row.iter()) {
                assert_eq!(
                    held.is_compatible(requested),
                    compatible,
                    "{:?} vs {:?}",
                    held,
                    requested
                );
            }
        }
    }

    /// An exclusive row lock blocks other transactions on that row only.
    #[test]
    fn test_sharded_lock_table_basic() {
        let locks = ShardedLockTable::new();
        let (owner, rival) = (100, 200);

        assert_eq!(
            locks.try_lock(1, LockMode::Exclusive, owner),
            LockResult::Acquired
        );

        // Row 1 is exclusively held: neither mode can be granted to a rival.
        for mode in [LockMode::Exclusive, LockMode::Shared] {
            assert_eq!(locks.try_lock(1, mode, rival), LockResult::WouldBlock);
        }

        // A different row is unaffected.
        assert_eq!(
            locks.try_lock(2, LockMode::Exclusive, rival),
            LockResult::Acquired
        );

        // After the owner lets go, the rival can take row 1.
        assert!(locks.unlock(1, owner));
        assert_eq!(
            locks.try_lock(1, LockMode::Exclusive, rival),
            LockResult::Acquired
        );
    }

    /// Shared locks coexist and together block an exclusive request.
    #[test]
    fn test_sharded_lock_table_shared() {
        let locks = ShardedLockTable::new();
        let readers = [100, 200, 300];
        let writer = 400;

        for &reader in &readers {
            assert_eq!(
                locks.try_lock(1, LockMode::Shared, reader),
                LockResult::Acquired
            );
        }

        assert_eq!(
            locks.try_lock(1, LockMode::Exclusive, writer),
            LockResult::WouldBlock
        );

        for &reader in &readers {
            assert!(locks.unlock(1, reader));
        }

        assert_eq!(
            locks.try_lock(1, LockMode::Exclusive, writer),
            LockResult::Acquired
        );
    }

    /// A sole shared holder can upgrade to exclusive.
    #[test]
    fn test_sharded_lock_upgrade() {
        let locks = ShardedLockTable::new();

        let shared = locks.try_lock(1, LockMode::Shared, 100);
        assert_eq!(shared, LockResult::Acquired);

        let upgraded = locks.try_lock(1, LockMode::Exclusive, 100);
        assert_eq!(upgraded, LockResult::Acquired);

        // The row is now exclusive, so even a shared request is refused.
        let intruder = locks.try_lock(1, LockMode::Shared, 200);
        assert_eq!(intruder, LockResult::WouldBlock);
    }

    /// Row locks taken through the manager also take table intent locks.
    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();
        let (writer, reader) = (1000, 2000);

        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, writer),
            LockResult::Acquired
        );

        // A second transaction may read a different row of the same table.
        assert_eq!(
            manager.lock_row(1, 200, LockMode::Shared, reader),
            LockResult::Acquired
        );

        // But it cannot write the row the first transaction holds.
        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, reader),
            LockResult::WouldBlock
        );

        let released = manager.release_all(writer);
        assert!(released >= 1);
    }

    /// The version is odd during a write and even (and larger) after commit.
    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        let initial = version.read_version();
        assert!(version.is_stable(initial));
        assert!(version.validate(initial));

        let guard = version.try_write_begin().unwrap();
        let during_write = version.read_version();
        assert!(!version.is_stable(during_write)); // Odd = writing
        guard.commit();

        let committed = version.read_version();
        assert!(version.is_stable(committed));
        assert_eq!(committed, 2);
    }

    /// Only one writer at a time may hold the version.
    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());

        let first = version.try_write_begin().unwrap();

        // A second handle to the same counter cannot start a write meanwhile.
        assert!(Arc::clone(&version).try_write_begin().is_none());

        first.commit();

        // With the first write finished, the next one succeeds.
        let second = version.try_write_begin().unwrap();
        second.commit();
    }

    /// Pin, retire and advance can be combined without panicking.
    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        let early = manager.pin();
        assert_eq!(early.epoch, 0);

        manager.retire(vec![1, 2, 3]);
        manager.advance();

        // A pin taken after advancing never reports an older epoch.
        let late = manager.pin();
        assert!(late.epoch >= early.epoch);

        drop(early);
        drop(late);

        // With no pins left, further advances go through.
        for _ in 0..2 {
            manager.advance();
        }
    }

    /// Sequential row ids spread over many shards.
    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();

        for row in 0..1000u128 {
            assert_eq!(
                table.try_lock(row, LockMode::Shared, 1),
                LockResult::Acquired
            );
        }

        let non_empty_shards = table
            .shards
            .iter()
            .filter(|shard| !shard.lock().is_empty())
            .count();

        // With 1000 locks across 256 shards, should use many shards
        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    /// `unlock_all` releases exactly the locks of the given transaction.
    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();

        // Rows 0..50 belong to txn 100, rows 50..100 to txn 200.
        for (rows, txn_id) in [(0..50u128, 100), (50..100u128, 200)] {
            for row in rows {
                table.try_lock(row, LockMode::Exclusive, txn_id);
            }
        }

        assert_eq!(table.unlock_all(100), 50);

        // Txn 200 still owns its rows ...
        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );

        // ... while the rows txn 100 held are free again.
        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    /// Threads locking disjoint rows never conflict and are all counted.
    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());

        let workers: Vec<_> = (0..16u64)
            .map(|txn_id| {
                let table = Arc::clone(&table);
                thread::spawn(move || {
                    let first_row = txn_id as u128 * 100;
                    for offset in 0..100 {
                        assert_eq!(
                            table.try_lock(first_row + offset, LockMode::Exclusive, txn_id),
                            LockResult::Acquired
                        );
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // 16 threads x 100 rows, all exclusive.
        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}
1034            table.stats().exclusive_acquired.load(Ordering::Relaxed),
1035            1600
1036        );
1037    }
1038}