// sochdb_core/concurrency.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Hierarchical Lock Architecture with Epoch-Based Reclamation
//!
//! This module implements a production-quality concurrency control system:
//! - Intent locks (IS, IX, S, X) for table-level coordination
//! - Sharded row-level locks (256 shards) for minimal contention
//! - Epoch-based reclamation for safe lock-free reads
//! - Optimistic concurrency control for HNSW updates
//!
//! ## Lock Compatibility Matrix
//!
//! ```text
//!         IS   IX   S    X
//! IS      ✓    ✓    ✓    ✗
//! IX      ✓    ✓    ✗    ✗
//! S       ✓    ✗    ✓    ✗
//! X       ✗    ✗    ✗    ✗
//! ```
//!
//! ## Sharded Lock Table
//!
//! 256 shards reduce contention by distributing locks across independent mutexes.
//! With an aggregate lock-request rate λ spread uniformly over row keys:
//! - Per-shard arrival rate: λ' = λ/256
//! - Average wait time: W' ≈ W/256 (a light-load approximation)
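//!
//! As a worked example with illustrative numbers (not a measured benchmark):
//! at an aggregate rate of λ = 100,000 lock requests/s under a uniform key
//! hash, each shard sees λ' ≈ 100,000/256 ≈ 391 requests/s.
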
use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Table identifier
pub type TableId = u64;

/// Row identifier
pub type RowId = u128;

/// Transaction ID for lock ownership
pub type TxnId = u64;

/// Intent lock modes for table-level locks
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// Intent Shared - transaction intends to read some rows
    IntentShared,
    /// Intent Exclusive - transaction intends to write some rows
    IntentExclusive,
    /// Shared - table-level read lock (e.g., for full table scan)
    Shared,
    /// Exclusive - table-level write lock (e.g., for DDL)
    Exclusive,
}

impl IntentLock {
    /// Check if this lock is compatible with another lock
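    ///
    /// A minimal doctest sketch of the matrix above (module path assumed
    /// from this file's location in `sochdb_core`):
    ///
    /// ```
    /// use sochdb_core::concurrency::IntentLock;
    ///
    /// assert!(IntentLock::IntentShared.is_compatible(&IntentLock::IntentExclusive));
    /// assert!(!IntentLock::Shared.is_compatible(&IntentLock::IntentExclusive));
    /// assert!(!IntentLock::Exclusive.is_compatible(&IntentLock::Exclusive));
    /// ```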
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        use IntentLock::*;
        matches!(
            (self, other),
            (IntentShared, IntentShared)
                | (IntentShared, IntentExclusive)
                | (IntentShared, Shared)
                | (IntentExclusive, IntentShared)
                | (IntentExclusive, IntentExclusive)
                | (Shared, IntentShared)
                | (Shared, Shared)
        )
    }

    /// Least upper bound of two modes in the IS ⊑ {IX, S} ⊑ X lattice;
    /// used to keep a table lock entry's summary mode accurate for all holders
    fn join(self, other: IntentLock) -> IntentLock {
        use IntentLock::*;
        match (self, other) {
            (a, b) if a == b => a,
            (IntentShared, m) | (m, IntentShared) => m,
            _ => Exclusive,
        }
    }
}

/// Row-level lock modes
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Shared (read) lock
    Shared,
    /// Exclusive (write) lock
    Exclusive,
}

impl LockMode {
    /// Check if this lock is compatible with another lock;
    /// only shared locks may coexist on the same row
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        matches!((self, other), (LockMode::Shared, LockMode::Shared))
    }
}

/// Lock held on a table
#[derive(Debug)]
struct TableLockEntry {
    /// Summary mode: the join of all holders' granted modes
    mode: IntentLock,
    holders: Vec<TxnId>,
    /// Queued waiters (reserved: the try-lock API does not queue yet)
    #[allow(dead_code)]
    waiters: Vec<(TxnId, IntentLock)>,
}

impl TableLockEntry {
    fn new(mode: IntentLock, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
            waiters: Vec::new(),
        }
    }
}

/// Lock held on a row
#[derive(Debug)]
struct RowLockEntry {
    mode: LockMode,
    holders: Vec<TxnId>,
}

impl RowLockEntry {
    fn new(mode: LockMode, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
        }
    }
}

/// Sharded lock table for row-level locks
/// 256 shards reduce contention by ~256x for uniform key distribution
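///
/// A minimal sketch of the try-lock API (module path assumed from this
/// file's location in `sochdb_core`):
///
/// ```
/// use sochdb_core::concurrency::{LockMode, LockResult, ShardedLockTable};
///
/// let table = ShardedLockTable::new();
/// assert_eq!(table.try_lock(9, LockMode::Shared, 1), LockResult::Acquired);
/// assert_eq!(table.try_lock(9, LockMode::Shared, 2), LockResult::Acquired);
/// assert_eq!(table.try_lock(9, LockMode::Exclusive, 3), LockResult::WouldBlock);
/// ```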
pub struct ShardedLockTable {
    shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
    stats: LockTableStats,
}

impl Default for ShardedLockTable {
    fn default() -> Self {
        Self::new()
    }
}

impl ShardedLockTable {
    /// Create a new sharded lock table
    pub fn new() -> Self {
        Self {
            shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
            stats: LockTableStats::default(),
        }
    }

    /// Get shard index for a row
    #[inline]
    fn shard_index(&self, row_id: RowId) -> usize {
        // Fold the upper 64 bits into the lower half so both halves of the
        // 128-bit row id influence the shard choice
        ((row_id >> 64) as usize ^ (row_id as usize)) % 256
    }

    /// Try to acquire a lock on a row
    pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id) {
            // Check if already held by this transaction
            if entry.holders.contains(&txn_id) {
                // Upgrade if needed
                if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
                    if entry.holders.len() == 1 {
                        entry.mode = LockMode::Exclusive;
                        self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
                        return LockResult::Acquired;
                    } else {
                        self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
                        return LockResult::WouldBlock;
                    }
                }
                return LockResult::AlreadyHeld;
            }

            // Check compatibility
            if entry.mode.is_compatible(&mode) {
                entry.holders.push(txn_id);
                self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
                return LockResult::Acquired;
            }

            self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
            return LockResult::WouldBlock;
        }

        // No existing lock - acquire
        shard.insert(row_id, RowLockEntry::new(mode, txn_id));
        match mode {
            LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
            LockMode::Exclusive => self
                .stats
                .exclusive_acquired
                .fetch_add(1, Ordering::Relaxed),
        };
        LockResult::Acquired
    }

    /// Release a lock on a row
    pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats.released.fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                shard.remove(&row_id);
            }
            return true;
        }
        false
    }

    /// Release all locks held by a transaction
    pub fn unlock_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;
        for shard in &self.shards {
            let mut shard_guard = shard.lock();
            let to_remove: Vec<RowId> = shard_guard
                .iter()
                .filter(|(_, entry)| entry.holders.contains(&txn_id))
                .map(|(&row_id, _)| row_id)
                .collect();

            for row_id in to_remove {
                if let Some(entry) = shard_guard.get_mut(&row_id)
                    && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
                {
                    entry.holders.remove(pos);
                    count += 1;

                    if entry.holders.is_empty() {
                        shard_guard.remove(&row_id);
                    }
                }
            }
        }
        self.stats
            .released
            .fetch_add(count as u64, Ordering::Relaxed);
        count
    }

    /// Get statistics
    pub fn stats(&self) -> &LockTableStats {
        &self.stats
    }
}

/// Lock table statistics
#[derive(Debug, Default)]
pub struct LockTableStats {
    pub shared_acquired: AtomicU64,
    pub exclusive_acquired: AtomicU64,
    pub upgrades: AtomicU64,
    pub conflicts: AtomicU64,
    pub released: AtomicU64,
}

/// Result of a lock attempt
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// Lock was acquired
    Acquired,
    /// Lock is already held by this transaction
    AlreadyHeld,
    /// Lock would block (conflict with an existing lock)
    WouldBlock,
    /// Deadlock detected (reserved: nothing in this module constructs it yet)
    Deadlock,
}

/// Hierarchical lock manager with table and row-level locks
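///
/// A minimal sketch of hierarchical acquisition (module path assumed from
/// this file's location in `sochdb_core`):
///
/// ```
/// use sochdb_core::concurrency::{LockManager, LockMode, LockResult};
///
/// let mgr = LockManager::new();
/// // Locking a row first takes the matching intent lock on the table.
/// assert_eq!(mgr.lock_row(1, 42, LockMode::Exclusive, 7), LockResult::Acquired);
/// // A second writer is refused instead of blocking (try-lock semantics).
/// assert_eq!(mgr.lock_row(1, 42, LockMode::Exclusive, 8), LockResult::WouldBlock);
/// assert!(mgr.release_all(7) >= 1);
/// ```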
pub struct LockManager {
    /// Table-level intent locks
    table_locks: DashMap<TableId, TableLockEntry>,
    /// Row-level sharded locks (per table)
    row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
    /// Epoch-based reclamation for safe memory access
    epoch: AtomicU64,
    /// Statistics
    stats: LockManagerStats,
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    /// Create a new lock manager
    pub fn new() -> Self {
        Self {
            table_locks: DashMap::new(),
            row_locks: DashMap::new(),
            epoch: AtomicU64::new(0),
            stats: LockManagerStats::default(),
        }
    }

    /// Acquire an intent lock on a table
    pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
        use dashmap::mapref::entry::Entry;

        match self.table_locks.entry(table_id) {
            Entry::Vacant(vacant) => {
                vacant.insert(TableLockEntry::new(mode, txn_id));
                self.stats
                    .table_locks_acquired
                    .fetch_add(1, Ordering::Relaxed);
                LockResult::Acquired
            }
            Entry::Occupied(mut occupied) => {
                let entry = occupied.get_mut();

                // Already held by this transaction: upgrade if the request
                // strengthens the summary mode
                if entry.holders.contains(&txn_id) {
                    let upgraded = entry.mode.join(mode);
                    if upgraded == entry.mode {
                        return LockResult::AlreadyHeld;
                    }
                    // A sole holder may always upgrade; otherwise the new
                    // summary must remain compatible with the other holders
                    if entry.holders.len() == 1 || entry.mode.is_compatible(&upgraded) {
                        entry.mode = upgraded;
                        return LockResult::Acquired;
                    }
                    self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
                    return LockResult::WouldBlock;
                }

                // Check compatibility; fold the new mode into the summary
                // so later checks account for every holder
                if entry.mode.is_compatible(&mode) {
                    entry.holders.push(txn_id);
                    entry.mode = entry.mode.join(mode);
                    self.stats
                        .table_locks_acquired
                        .fetch_add(1, Ordering::Relaxed);
                    return LockResult::Acquired;
                }

                self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
                LockResult::WouldBlock
            }
        }
    }

    /// Release a table-level lock
    pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
        if let Some(mut entry) = self.table_locks.get_mut(&table_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats
                .table_locks_released
                .fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                drop(entry);
                self.table_locks.remove(&table_id);
            }
            return true;
        }
        false
    }

    /// Get or create row lock table for a table
    fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
        self.row_locks
            .entry(table_id)
            .or_insert_with(|| Arc::new(ShardedLockTable::new()))
            .clone()
    }

    /// Acquire a row-level lock
    pub fn lock_row(
        &self,
        table_id: TableId,
        row_id: RowId,
        mode: LockMode,
        txn_id: TxnId,
    ) -> LockResult {
        // First acquire intent lock on table
        let intent_mode = match mode {
            LockMode::Shared => IntentLock::IntentShared,
            LockMode::Exclusive => IntentLock::IntentExclusive,
        };

        match self.lock_table(table_id, intent_mode, txn_id) {
            LockResult::Acquired | LockResult::AlreadyHeld => {}
            result => return result,
        }

        // Then acquire row lock
        let row_locks = self.get_row_lock_table(table_id);
        row_locks.try_lock(row_id, mode, txn_id)
    }

    /// Release a row-level lock
    pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
        if let Some(row_locks) = self.row_locks.get(&table_id) {
            return row_locks.unlock(row_id, txn_id);
        }
        false
    }

    /// Release all locks held by a transaction
    pub fn release_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;

        // Release row locks
        for entry in self.row_locks.iter() {
            count += entry.value().unlock_all(txn_id);
        }

        // Release table locks
        let table_ids: Vec<TableId> = self
            .table_locks
            .iter()
            .filter(|e| e.value().holders.contains(&txn_id))
            .map(|e| *e.key())
            .collect();

        for table_id in table_ids {
            if self.unlock_table(table_id, txn_id) {
                count += 1;
            }
        }

        count
    }

    /// Advance to a new epoch (for epoch-based reclamation);
    /// returns the epoch value prior to the advance
    pub fn enter_epoch(&self) -> u64 {
        self.epoch.fetch_add(1, Ordering::AcqRel)
    }

    /// Get current epoch
    pub fn current_epoch(&self) -> u64 {
        self.epoch.load(Ordering::Acquire)
    }

    /// Get statistics
    pub fn stats(&self) -> &LockManagerStats {
        &self.stats
    }
}

/// Lock manager statistics
#[derive(Debug, Default)]
pub struct LockManagerStats {
    pub table_locks_acquired: AtomicU64,
    pub table_locks_released: AtomicU64,
    pub table_conflicts: AtomicU64,
}

/// Optimistic concurrency control for HNSW nodes
/// Uses version counters instead of locks for wait-free reads
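///
/// A minimal sketch of the read/validate and write protocol (module path
/// assumed from this file's location in `sochdb_core`):
///
/// ```
/// use sochdb_core::concurrency::OptimisticVersion;
///
/// let v = OptimisticVersion::new();
/// // Reader: snapshot the version, read data, then validate the snapshot.
/// let seen = v.read_version();
/// assert!(v.is_stable(seen) && v.validate(seen));
/// // Writer: begin (version goes odd), mutate, commit (version goes even).
/// let guard = v.try_write_begin().expect("no concurrent writer");
/// guard.commit();
/// assert_eq!(v.read_version(), 2);
/// ```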
pub struct OptimisticVersion {
    /// Version counter (even = stable, odd = being modified)
    version: AtomicU64,
}

impl Default for OptimisticVersion {
    fn default() -> Self {
        Self::new()
    }
}

impl OptimisticVersion {
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Read the version (for optimistic read)
    #[inline]
    pub fn read_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Check if version is stable (not being modified)
    #[inline]
    pub fn is_stable(&self, version: u64) -> bool {
        version & 1 == 0
    }

    /// Validate that the version hasn't changed (seqlock-style read check)
    #[inline]
    pub fn validate(&self, read_version: u64) -> bool {
        // The Acquire fence keeps the caller's optimistic data reads from
        // being reordered past this version re-check
        std::sync::atomic::fence(Ordering::Acquire);
        self.version.load(Ordering::Relaxed) == read_version
    }

    /// Try to begin a write (returns None if a concurrent write is in progress)
    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
        let current = self.version.load(Ordering::Acquire);

        // Check if stable
        if !self.is_stable(current) {
            return None;
        }

        // Try to CAS to odd (writing) state
        match self.version.compare_exchange(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => Some(WriteGuard {
                version: &self.version,
                start_version: current,
            }),
            Err(_) => None,
        }
    }
}

/// Guard for an in-progress write: commit and abort are explicit,
/// and dropping the guard without either aborts the write
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Commit the write (advance the version to the next even, stable value)
    pub fn commit(self) {
        self.version
            .store(self.start_version + 2, Ordering::Release);
        std::mem::forget(self); // Don't run drop
    }

    /// Abort the write (restore the original version)
    pub fn abort(self) {
        self.version.store(self.start_version, Ordering::Release);
        std::mem::forget(self);
    }
}

impl<'a> Drop for WriteGuard<'a> {
    fn drop(&mut self) {
        // If dropped without commit/abort, abort the write
        self.version.store(self.start_version, Ordering::Release);
    }
}

/// RAII guard for a pinned epoch (epoch-based reclamation);
/// leaves the epoch on drop
pub struct EpochGuard {
    manager: Arc<EpochManager>,
    epoch: u64,
}

impl Drop for EpochGuard {
    fn drop(&mut self) {
        self.manager.leave_epoch(self.epoch);
    }
}

/// Epoch manager for safe memory reclamation
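///
/// A minimal sketch of the pin/retire/advance protocol (module path assumed
/// from this file's location in `sochdb_core`):
///
/// ```
/// use std::sync::Arc;
/// use sochdb_core::concurrency::EpochManager;
///
/// let em = Arc::new(EpochManager::new());
/// let guard = em.pin();      // reader enters the current epoch
/// em.retire(vec![1, 2, 3]);  // unlink now, defer the actual free
/// drop(guard);               // reader leaves its epoch
/// em.advance();              // periodic advance reclaims drained epochs
/// ```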
pub struct EpochManager {
    /// Current global epoch
    global_epoch: AtomicU64,
    /// Number of threads pinned in each epoch (indexed mod 4)
    epoch_counts: [AtomicUsize; 4],
    /// Retired items pending reclamation
    retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
}

impl Default for EpochManager {
    fn default() -> Self {
        Self::new()
    }
}

impl EpochManager {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
            retired: Mutex::new(Vec::new()),
        }
    }

    /// Pin the current epoch (enter critical section)
    pub fn pin(self: &Arc<Self>) -> EpochGuard {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);

        EpochGuard {
            manager: self.clone(),
            epoch,
        }
    }

    /// Leave an epoch
    fn leave_epoch(&self, epoch: u64) {
        self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
    }

    /// Advance the global epoch (called periodically)
    pub fn advance(&self) {
        let current = self.global_epoch.load(Ordering::Acquire);
        let old_epoch = (current + 2) % 4; // Two epochs ago (mod 4, since +2 ≡ -2)

        // Check if the old epoch has drained
        if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
            self.global_epoch.fetch_add(1, Ordering::AcqRel);
            // Only items retired at least two epochs back are unreachable;
            // checked_sub (rather than saturating_sub) avoids freeing epoch-0
            // retirees while readers may still be pinned in the first epochs
            if let Some(safe_epoch) = current.checked_sub(2) {
                self.reclaim(safe_epoch);
            }
        }
    }

    /// Retire an object for later reclamation
    pub fn retire<T: Send + 'static>(&self, item: T) {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        let mut retired = self.retired.lock();
        retired.push((epoch, Box::new(item)));
    }

    /// Reclaim (drop) objects retired at or before `safe_epoch`
    fn reclaim(&self, safe_epoch: u64) {
        let mut retired = self.retired.lock();
        retired.retain(|(epoch, _)| *epoch > safe_epoch);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        // IS compatible with IS, IX, S
        assert!(IntentShared.is_compatible(&IntentShared));
        assert!(IntentShared.is_compatible(&IntentExclusive));
        assert!(IntentShared.is_compatible(&Shared));
        assert!(!IntentShared.is_compatible(&Exclusive));

        // IX compatible with IS, IX
        assert!(IntentExclusive.is_compatible(&IntentShared));
        assert!(IntentExclusive.is_compatible(&IntentExclusive));
        assert!(!IntentExclusive.is_compatible(&Shared));
        assert!(!IntentExclusive.is_compatible(&Exclusive));

        // S compatible with IS, S
        assert!(Shared.is_compatible(&IntentShared));
        assert!(!Shared.is_compatible(&IntentExclusive));
        assert!(Shared.is_compatible(&Shared));
        assert!(!Shared.is_compatible(&Exclusive));

        // X compatible with nothing
        assert!(!Exclusive.is_compatible(&IntentShared));
        assert!(!Exclusive.is_compatible(&IntentExclusive));
        assert!(!Exclusive.is_compatible(&Shared));
        assert!(!Exclusive.is_compatible(&Exclusive));
    }

    #[test]
    fn test_sharded_lock_table_basic() {
        let table = ShardedLockTable::new();

        // Acquire exclusive lock
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        // Can't acquire another exclusive on same row
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::WouldBlock
        );

        // Can't acquire shared on exclusive
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );

        // Can acquire on different row
        assert_eq!(
            table.try_lock(2, LockMode::Exclusive, 200),
            LockResult::Acquired
        );

        // Release and reacquire
        assert!(table.unlock(1, 100));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_table_shared() {
        let table = ShardedLockTable::new();

        // Multiple shared locks on same row
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 300),
            LockResult::Acquired
        );

        // Can't acquire exclusive while shared held
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::WouldBlock
        );

        // Release all shared, then exclusive works
        assert!(table.unlock(1, 100));
        assert!(table.unlock(1, 200));
        assert!(table.unlock(1, 300));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_upgrade() {
        let table = ShardedLockTable::new();

        // Acquire shared
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );

        // Try to upgrade - should succeed since we're the only holder
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        // Now we hold exclusive
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );
    }

    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();

        // Acquire row lock (should auto-acquire intent lock)
        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 1000),
            LockResult::Acquired
        );

        // Another transaction can read a different row
        assert_eq!(
            manager.lock_row(1, 200, LockMode::Shared, 2000),
            LockResult::Acquired
        );

        // Can't acquire exclusive on locked row
        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 2000),
            LockResult::WouldBlock
        );

        // Release all
        let released = manager.release_all(1000);
        assert!(released >= 1);
    }

    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        // Read should see stable version (0)
        let v = version.read_version();
        assert!(version.is_stable(v));
        assert!(version.validate(v));

        // Write should increment
        {
            let guard = version.try_write_begin().unwrap();
            let v_during = version.read_version();
            assert!(!version.is_stable(v_during)); // Odd = writing
            guard.commit();
        }

        // After commit, version is 2 (even = stable)
        let v2 = version.read_version();
        assert!(version.is_stable(v2));
        assert_eq!(v2, 2);
    }

    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());

        // Start a write
        let guard = version.try_write_begin().unwrap();

        // Concurrent write attempt should fail
        let version2 = version.clone();
        let result = version2.try_write_begin();
        assert!(result.is_none());

        // Finish write
        guard.commit();

        // Now another write should succeed
        let guard2 = version.try_write_begin().unwrap();
        guard2.commit();
    }

    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        // Pin epoch
        let guard1 = manager.pin();
        assert_eq!(guard1.epoch, 0);

        // Retire something
        manager.retire(vec![1, 2, 3]);

        // Advance epoch
        manager.advance();

        // New pin gets new epoch
        let guard2 = manager.pin();
        assert!(guard2.epoch >= guard1.epoch);

        // Drop guards
        drop(guard1);
        drop(guard2);

        // Can advance more
        manager.advance();
        manager.advance();
    }

    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();

        // Lock many rows and verify distribution
        for i in 0..1000u128 {
            assert_eq!(table.try_lock(i, LockMode::Shared, 1), LockResult::Acquired);
        }

        // Count locks per shard (should be somewhat distributed)
        let mut non_empty_shards = 0;
        for shard in &table.shards {
            if !shard.lock().is_empty() {
                non_empty_shards += 1;
            }
        }

        // With 1000 locks across 256 shards, should use many shards
        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();

        // Lock many rows as txn 100
        for i in 0..50u128 {
            table.try_lock(i, LockMode::Exclusive, 100);
        }

        // Lock some as txn 200
        for i in 50..100u128 {
            table.try_lock(i, LockMode::Exclusive, 200);
        }

        // Unlock all for txn 100
        let released = table.unlock_all(100);
        assert_eq!(released, 50);

        // Txn 200's locks should still be held
        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );

        // Txn 100's former locks should be available
        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());
        let mut handles = vec![];

        // Spawn threads that each lock different rows
        for txn_id in 0..16u64 {
            let table = table.clone();
            handles.push(thread::spawn(move || {
                let start = txn_id as u128 * 100;
                for i in 0..100 {
                    let result = table.try_lock(start + i, LockMode::Exclusive, txn_id);
                    assert_eq!(result, LockResult::Acquired);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Verify stats
        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}
942}