// sochdb_core/concurrency.rs

// Copyright 2025 Sushanth (https://github.com/sushanthpy)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Hierarchical Lock Architecture with Epoch-Based Reclamation
//!
//! This module implements a production-quality concurrency control system:
//! - Intent locks (IS, IX, S, X) for table-level coordination
//! - Sharded row-level locks (256 shards) for minimal contention
//! - Epoch-based reclamation for safe lock-free reads
//! - Optimistic concurrency control for HNSW updates
//!
//! ## Lock Compatibility Matrix
//!
//! ```text
//!         IS   IX   S    X
//! IS      ✓    ✓    ✓    ✗
//! IX      ✓    ✓    ✗    ✗
//! S       ✓    ✗    ✓    ✗
//! X       ✗    ✗    ✗    ✗
//! ```
//!
//! ## Sharded Lock Table
//!
//! 256 shards reduce contention by distributing locks across independent mutexes.
//! For a total lock-request arrival rate λ spread uniformly across the shards:
//! - Per-shard arrival rate: λ' = λ/256
//! - Average wait time: W' ≈ W/256
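//!
//! ## Example
//!
//! A minimal sketch of the hierarchical locking flow. The `sochdb_core::concurrency`
//! import path is assumed here, so the snippet is not compiled as a doc-test:
//!
//! ```ignore
//! use sochdb_core::concurrency::{LockManager, LockMode, LockResult};
//!
//! let manager = LockManager::new();
//!
//! // Locking a row implicitly takes the matching intent lock on the table.
//! assert_eq!(
//!     manager.lock_row(/* table */ 1, /* row */ 42, LockMode::Exclusive, /* txn */ 7),
//!     LockResult::Acquired
//! );
//!
//! // A conflicting writer observes `WouldBlock` instead of blocking.
//! assert_eq!(
//!     manager.lock_row(1, 42, LockMode::Exclusive, 8),
//!     LockResult::WouldBlock
//! );
//!
//! // Releasing everything held by a transaction drops its row and table locks.
//! manager.release_all(7);
//! ```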

use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

/// Table identifier
pub type TableId = u64;

/// Row identifier
pub type RowId = u128;

/// Transaction ID for lock ownership
pub type TxnId = u64;

/// Intent lock modes for table-level locks
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IntentLock {
    /// Intent Shared - transaction intends to read some rows
    IntentShared,
    /// Intent Exclusive - transaction intends to write some rows
    IntentExclusive,
    /// Shared - table-level read lock (e.g., for full table scan)
    Shared,
    /// Exclusive - table-level write lock (e.g., for DDL)
    Exclusive,
}

impl IntentLock {
    /// Check if this lock is compatible with another lock
    pub fn is_compatible(&self, other: &IntentLock) -> bool {
        use IntentLock::*;
        matches!(
            (self, other),
            (IntentShared, IntentShared)
                | (IntentShared, IntentExclusive)
                | (IntentShared, Shared)
                | (IntentExclusive, IntentShared)
                | (IntentExclusive, IntentExclusive)
                | (Shared, IntentShared)
                | (Shared, Shared)
        )
    }
}

/// Row-level lock modes
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockMode {
    /// Shared (read) lock
    Shared,
    /// Exclusive (write) lock
    Exclusive,
}

impl LockMode {
    pub fn is_compatible(&self, other: &LockMode) -> bool {
        matches!((self, other), (LockMode::Shared, LockMode::Shared))
    }
}

/// Lock held on a table
#[derive(Debug)]
struct TableLockEntry {
    mode: IntentLock,
    holders: Vec<TxnId>,
    #[allow(dead_code)]
    waiters: Vec<(TxnId, IntentLock)>,
}

impl TableLockEntry {
    fn new(mode: IntentLock, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
            waiters: Vec::new(),
        }
    }
}

/// Lock held on a row
#[derive(Debug)]
struct RowLockEntry {
    mode: LockMode,
    holders: Vec<TxnId>,
}

impl RowLockEntry {
    fn new(mode: LockMode, txn_id: TxnId) -> Self {
        Self {
            mode,
            holders: vec![txn_id],
        }
    }
}

/// Sharded lock table for row-level locks
/// 256 shards reduce contention by ~256x for uniform key distribution
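///
/// # Example
///
/// A brief sketch of direct use (illustrative only, so not compiled as a doc-test;
/// the row and transaction ids are arbitrary example values):
///
/// ```ignore
/// let table = ShardedLockTable::new();
///
/// // Two transactions may share a read lock on the same row.
/// assert_eq!(table.try_lock(7, LockMode::Shared, 1), LockResult::Acquired);
/// assert_eq!(table.try_lock(7, LockMode::Shared, 2), LockResult::Acquired);
///
/// // A writer conflicts until both readers release.
/// assert_eq!(table.try_lock(7, LockMode::Exclusive, 3), LockResult::WouldBlock);
/// table.unlock(7, 1);
/// table.unlock(7, 2);
/// assert_eq!(table.try_lock(7, LockMode::Exclusive, 3), LockResult::Acquired);
/// ```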
pub struct ShardedLockTable {
    shards: [Mutex<HashMap<RowId, RowLockEntry>>; 256],
    stats: LockTableStats,
}

impl Default for ShardedLockTable {
    fn default() -> Self {
        Self::new()
    }
}

impl ShardedLockTable {
    /// Create a new sharded lock table
    pub fn new() -> Self {
        Self {
            shards: std::array::from_fn(|_| Mutex::new(HashMap::new())),
            stats: LockTableStats::default(),
        }
    }

    /// Get shard index for a row
    #[inline]
    fn shard_index(&self, row_id: RowId) -> usize {
        // Mix the upper and lower 64 bits for a better spread across shards
        ((row_id >> 64) as usize ^ (row_id as usize)) % 256
    }

    /// Try to acquire a lock on a row
    pub fn try_lock(&self, row_id: RowId, mode: LockMode, txn_id: TxnId) -> LockResult {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id) {
            // Check if already held by this transaction
            if entry.holders.contains(&txn_id) {
                // Upgrade if needed
                if entry.mode == LockMode::Shared && mode == LockMode::Exclusive {
                    if entry.holders.len() == 1 {
                        entry.mode = LockMode::Exclusive;
                        self.stats.upgrades.fetch_add(1, Ordering::Relaxed);
                        return LockResult::Acquired;
                    } else {
                        self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
                        return LockResult::WouldBlock;
                    }
                }
                return LockResult::AlreadyHeld;
            }

            // Check compatibility
            if entry.mode.is_compatible(&mode) {
                entry.holders.push(txn_id);
                self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed);
                return LockResult::Acquired;
            }

            self.stats.conflicts.fetch_add(1, Ordering::Relaxed);
            return LockResult::WouldBlock;
        }

        // No existing lock - acquire
        shard.insert(row_id, RowLockEntry::new(mode, txn_id));
        match mode {
            LockMode::Shared => self.stats.shared_acquired.fetch_add(1, Ordering::Relaxed),
            LockMode::Exclusive => self
                .stats
                .exclusive_acquired
                .fetch_add(1, Ordering::Relaxed),
        };
        LockResult::Acquired
    }

    /// Release a lock on a row
    pub fn unlock(&self, row_id: RowId, txn_id: TxnId) -> bool {
        let shard_idx = self.shard_index(row_id);
        let mut shard = self.shards[shard_idx].lock();

        if let Some(entry) = shard.get_mut(&row_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats.released.fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                shard.remove(&row_id);
            }
            return true;
        }
        false
    }

    /// Release all locks held by a transaction
    pub fn unlock_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;
        for shard in &self.shards {
            let mut shard_guard = shard.lock();
            let to_remove: Vec<RowId> = shard_guard
                .iter()
                .filter(|(_, entry)| entry.holders.contains(&txn_id))
                .map(|(&row_id, _)| row_id)
                .collect();

            for row_id in to_remove {
                if let Some(entry) = shard_guard.get_mut(&row_id)
                    && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
                {
                    entry.holders.remove(pos);
                    count += 1;

                    if entry.holders.is_empty() {
                        shard_guard.remove(&row_id);
                    }
                }
            }
        }
        self.stats
            .released
            .fetch_add(count as u64, Ordering::Relaxed);
        count
    }

    /// Get statistics
    pub fn stats(&self) -> &LockTableStats {
        &self.stats
    }
}

/// Lock table statistics
#[derive(Debug, Default)]
pub struct LockTableStats {
    pub shared_acquired: AtomicU64,
    pub exclusive_acquired: AtomicU64,
    pub upgrades: AtomicU64,
    pub conflicts: AtomicU64,
    pub released: AtomicU64,
}

/// Result of a lock attempt
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockResult {
    /// Lock was acquired
    Acquired,
    /// Lock is already held by this transaction
    AlreadyHeld,
    /// Lock would block (conflict with existing lock)
    WouldBlock,
    /// Deadlock detected
    Deadlock,
}

/// Hierarchical lock manager with table and row-level locks
pub struct LockManager {
    /// Table-level intent locks
    table_locks: DashMap<TableId, TableLockEntry>,
    /// Row-level sharded locks (per table)
    row_locks: DashMap<TableId, Arc<ShardedLockTable>>,
    /// Epoch-based reclamation for safe memory access
    epoch: AtomicU64,
    /// Statistics
    stats: LockManagerStats,
}

impl Default for LockManager {
    fn default() -> Self {
        Self::new()
    }
}

impl LockManager {
    /// Create a new lock manager
    pub fn new() -> Self {
        Self {
            table_locks: DashMap::new(),
            row_locks: DashMap::new(),
            epoch: AtomicU64::new(0),
            stats: LockManagerStats::default(),
        }
    }

    /// Acquire an intent lock on a table
    pub fn lock_table(&self, table_id: TableId, mode: IntentLock, txn_id: TxnId) -> LockResult {
        use dashmap::mapref::entry::Entry;

        match self.table_locks.entry(table_id) {
            Entry::Vacant(vacant) => {
                vacant.insert(TableLockEntry::new(mode, txn_id));
                self.stats
                    .table_locks_acquired
                    .fetch_add(1, Ordering::Relaxed);
                LockResult::Acquired
            }
            Entry::Occupied(mut occupied) => {
                let entry = occupied.get_mut();

                // Check if already held by this transaction
                if entry.holders.contains(&txn_id) {
                    return LockResult::AlreadyHeld;
                }

                // Check compatibility
                if entry.mode.is_compatible(&mode) {
                    entry.holders.push(txn_id);
                    // Track the strongest granted mode so later requests are
                    // checked against it (e.g. IS + IX is recorded as IX).
                    if entry.mode == IntentLock::IntentShared && mode != IntentLock::IntentShared {
                        entry.mode = mode;
                    }
                    self.stats
                        .table_locks_acquired
                        .fetch_add(1, Ordering::Relaxed);
                    return LockResult::Acquired;
                }

                self.stats.table_conflicts.fetch_add(1, Ordering::Relaxed);
                LockResult::WouldBlock
            }
        }
    }

    /// Release a table-level lock
    pub fn unlock_table(&self, table_id: TableId, txn_id: TxnId) -> bool {
        if let Some(mut entry) = self.table_locks.get_mut(&table_id)
            && let Some(pos) = entry.holders.iter().position(|&id| id == txn_id)
        {
            entry.holders.remove(pos);
            self.stats
                .table_locks_released
                .fetch_add(1, Ordering::Relaxed);

            if entry.holders.is_empty() {
                drop(entry);
                self.table_locks.remove(&table_id);
            }
            return true;
        }
        false
    }

    /// Get or create row lock table for a table
    fn get_row_lock_table(&self, table_id: TableId) -> Arc<ShardedLockTable> {
        self.row_locks
            .entry(table_id)
            .or_insert_with(|| Arc::new(ShardedLockTable::new()))
            .clone()
    }

    /// Acquire a row-level lock
    pub fn lock_row(
        &self,
        table_id: TableId,
        row_id: RowId,
        mode: LockMode,
        txn_id: TxnId,
    ) -> LockResult {
        // First acquire intent lock on table
        let intent_mode = match mode {
            LockMode::Shared => IntentLock::IntentShared,
            LockMode::Exclusive => IntentLock::IntentExclusive,
        };

        match self.lock_table(table_id, intent_mode, txn_id) {
            LockResult::Acquired | LockResult::AlreadyHeld => {}
            result => return result,
        }

        // Then acquire row lock
        let row_locks = self.get_row_lock_table(table_id);
        row_locks.try_lock(row_id, mode, txn_id)
    }

    /// Release a row-level lock
    pub fn unlock_row(&self, table_id: TableId, row_id: RowId, txn_id: TxnId) -> bool {
        if let Some(row_locks) = self.row_locks.get(&table_id) {
            return row_locks.unlock(row_id, txn_id);
        }
        false
    }

    /// Release all locks held by a transaction
    pub fn release_all(&self, txn_id: TxnId) -> usize {
        let mut count = 0;

        // Release row locks
        for entry in self.row_locks.iter() {
            count += entry.value().unlock_all(txn_id);
        }

        // Release table locks
        let table_ids: Vec<TableId> = self
            .table_locks
            .iter()
            .filter(|e| e.value().holders.contains(&txn_id))
            .map(|e| *e.key())
            .collect();

        for table_id in table_ids {
            if self.unlock_table(table_id, txn_id) {
                count += 1;
            }
        }

        count
    }

    /// Enter a new epoch (for epoch-based reclamation); returns the previous epoch
    pub fn enter_epoch(&self) -> u64 {
        self.epoch.fetch_add(1, Ordering::AcqRel)
    }

    /// Get current epoch
    pub fn current_epoch(&self) -> u64 {
        self.epoch.load(Ordering::Acquire)
    }

    /// Get statistics
    pub fn stats(&self) -> &LockManagerStats {
        &self.stats
    }
}

/// Lock manager statistics
#[derive(Debug, Default)]
pub struct LockManagerStats {
    pub table_locks_acquired: AtomicU64,
    pub table_locks_released: AtomicU64,
    pub table_conflicts: AtomicU64,
}

/// Optimistic concurrency control for HNSW nodes
/// Uses version counters instead of locks for wait-free reads
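///
/// # Example
///
/// A sketch of the seqlock-style read and write protocol (illustrative only,
/// so not compiled as a doc-test):
///
/// ```ignore
/// let version = OptimisticVersion::new();
///
/// // Optimistic read: snapshot the version, read, then validate; retry on failure.
/// loop {
///     let v = version.read_version();
///     if !version.is_stable(v) {
///         continue; // a writer is in progress
///     }
///     // ... read the protected data ...
///     if version.validate(v) {
///         break; // the read observed a consistent snapshot
///     }
/// }
///
/// // Write: flip the version to odd, mutate, then commit back to even.
/// if let Some(guard) = version.try_write_begin() {
///     // ... mutate the protected data ...
///     guard.commit();
/// }
/// ```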
pub struct OptimisticVersion {
    /// Version counter (even = stable, odd = being modified)
    version: AtomicU64,
}

impl Default for OptimisticVersion {
    fn default() -> Self {
        Self::new()
    }
}

impl OptimisticVersion {
    pub fn new() -> Self {
        Self {
            version: AtomicU64::new(0),
        }
    }

    /// Read the version (for optimistic read)
    #[inline]
    pub fn read_version(&self) -> u64 {
        self.version.load(Ordering::Acquire)
    }

    /// Check if version is stable (not being modified)
    #[inline]
    pub fn is_stable(&self, version: u64) -> bool {
        version & 1 == 0
    }

    /// Validate that version hasn't changed
    #[inline]
    pub fn validate(&self, read_version: u64) -> bool {
        // Order preceding data reads before the version re-check (seqlock-style validation)
        std::sync::atomic::fence(Ordering::Acquire);
        self.version.load(Ordering::Relaxed) == read_version
    }

    /// Try to begin a write (returns None if concurrent write in progress)
    pub fn try_write_begin(&self) -> Option<WriteGuard<'_>> {
        let current = self.version.load(Ordering::Acquire);

        // Check if stable
        if !self.is_stable(current) {
            return None;
        }

        // Try to CAS to odd (writing) state
        match self.version.compare_exchange(
            current,
            current + 1,
            Ordering::AcqRel,
            Ordering::Relaxed,
        ) {
            Ok(_) => Some(WriteGuard {
                version: &self.version,
                start_version: current,
            }),
            Err(_) => None,
        }
    }
}

/// Guard for an in-progress optimistic write; if dropped without calling
/// `commit`, the write is aborted and the original version restored
pub struct WriteGuard<'a> {
    version: &'a AtomicU64,
    start_version: u64,
}

impl<'a> WriteGuard<'a> {
    /// Commit the write (increment version to even)
    pub fn commit(self) {
        self.version
            .store(self.start_version + 2, Ordering::Release);
        std::mem::forget(self); // Don't run drop
    }

    /// Abort the write (restore original version)
    pub fn abort(self) {
        self.version.store(self.start_version, Ordering::Release);
        std::mem::forget(self);
    }
}

impl<'a> Drop for WriteGuard<'a> {
    fn drop(&mut self) {
        // If dropped without commit/abort, abort the write
        self.version.store(self.start_version, Ordering::Release);
    }
}

/// Guard for a pinned epoch; the epoch is left (unpinned) automatically on drop
pub struct EpochGuard {
    manager: Arc<EpochManager>,
    epoch: u64,
}

impl Drop for EpochGuard {
    fn drop(&mut self) {
        self.manager.leave_epoch(self.epoch);
    }
}

/// Epoch manager for safe memory reclamation
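///
/// # Example
///
/// A sketch of the pin / retire / advance cycle (illustrative, not compiled as a
/// doc-test; the retired `Vec` stands in for any `Send` value):
///
/// ```ignore
/// let manager = Arc::new(EpochManager::new());
///
/// // Readers pin the current epoch while they hold references to shared data.
/// let guard = manager.pin();
///
/// // A writer that unlinks data retires it instead of freeing it immediately.
/// manager.retire(vec![1u8, 2, 3]);
///
/// // Dropping the guard unpins the epoch; periodic advances reclaim retired
/// // items once no pinned reader can still observe them.
/// drop(guard);
/// manager.advance();
/// manager.advance();
/// ```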
pub struct EpochManager {
    /// Current global epoch
    global_epoch: AtomicU64,
    /// Number of threads in each epoch
    epoch_counts: [AtomicUsize; 4],
    /// Retired items pending reclamation
    retired: Mutex<Vec<(u64, Box<dyn Send>)>>,
}

impl Default for EpochManager {
    fn default() -> Self {
        Self::new()
    }
}

impl EpochManager {
    pub fn new() -> Self {
        Self {
            global_epoch: AtomicU64::new(0),
            epoch_counts: std::array::from_fn(|_| AtomicUsize::new(0)),
            retired: Mutex::new(Vec::new()),
        }
    }

    /// Pin the current epoch (enter critical section)
    pub fn pin(self: &Arc<Self>) -> EpochGuard {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        self.epoch_counts[(epoch % 4) as usize].fetch_add(1, Ordering::AcqRel);

        EpochGuard {
            manager: self.clone(),
            epoch,
        }
    }

    /// Leave an epoch
    fn leave_epoch(&self, epoch: u64) {
        self.epoch_counts[(epoch % 4) as usize].fetch_sub(1, Ordering::AcqRel);
    }

    /// Advance the global epoch (called periodically)
    pub fn advance(&self) {
        let current = self.global_epoch.load(Ordering::Acquire);
        let old_epoch = (current + 2) % 4; // Two epochs ago

        // Check if old epoch is empty
        if self.epoch_counts[old_epoch as usize].load(Ordering::Acquire) == 0 {
            self.global_epoch.fetch_add(1, Ordering::AcqRel);
            self.reclaim(current.saturating_sub(2));
        }
    }

    /// Retire an object for later reclamation
    pub fn retire<T: Send + 'static>(&self, item: T) {
        let epoch = self.global_epoch.load(Ordering::Acquire);
        let mut retired = self.retired.lock();
        retired.push((epoch, Box::new(item)));
    }

    /// Reclaim objects from old epochs
    fn reclaim(&self, safe_epoch: u64) {
        let mut retired = self.retired.lock();
        retired.retain(|(epoch, _)| *epoch > safe_epoch);
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_intent_lock_compatibility() {
        use IntentLock::*;

        // IS compatible with IS, IX, S
        assert!(IntentShared.is_compatible(&IntentShared));
        assert!(IntentShared.is_compatible(&IntentExclusive));
        assert!(IntentShared.is_compatible(&Shared));
        assert!(!IntentShared.is_compatible(&Exclusive));

        // IX compatible with IS, IX
        assert!(IntentExclusive.is_compatible(&IntentShared));
        assert!(IntentExclusive.is_compatible(&IntentExclusive));
        assert!(!IntentExclusive.is_compatible(&Shared));
        assert!(!IntentExclusive.is_compatible(&Exclusive));

        // S compatible with IS, S
        assert!(Shared.is_compatible(&IntentShared));
        assert!(!Shared.is_compatible(&IntentExclusive));
        assert!(Shared.is_compatible(&Shared));
        assert!(!Shared.is_compatible(&Exclusive));

        // X compatible with nothing
        assert!(!Exclusive.is_compatible(&IntentShared));
        assert!(!Exclusive.is_compatible(&IntentExclusive));
        assert!(!Exclusive.is_compatible(&Shared));
        assert!(!Exclusive.is_compatible(&Exclusive));
    }

    #[test]
    fn test_sharded_lock_table_basic() {
        let table = ShardedLockTable::new();

        // Acquire exclusive lock
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        // Can't acquire another exclusive on same row
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::WouldBlock
        );

        // Can't acquire shared on exclusive
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );

        // Can acquire on different row
        assert_eq!(
            table.try_lock(2, LockMode::Exclusive, 200),
            LockResult::Acquired
        );

        // Release and reacquire
        assert!(table.unlock(1, 100));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 200),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_table_shared() {
        let table = ShardedLockTable::new();

        // Multiple shared locks on same row
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::Acquired
        );
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 300),
            LockResult::Acquired
        );

        // Can't acquire exclusive while shared held
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::WouldBlock
        );

        // Release all shared, then exclusive works
        assert!(table.unlock(1, 100));
        assert!(table.unlock(1, 200));
        assert!(table.unlock(1, 300));
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 400),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_sharded_lock_upgrade() {
        let table = ShardedLockTable::new();

        // Acquire shared
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 100),
            LockResult::Acquired
        );

        // Try to upgrade - should succeed since we're only holder
        assert_eq!(
            table.try_lock(1, LockMode::Exclusive, 100),
            LockResult::Acquired
        );

        // Now we hold exclusive
        assert_eq!(
            table.try_lock(1, LockMode::Shared, 200),
            LockResult::WouldBlock
        );
    }

    #[test]
    fn test_lock_manager_hierarchical() {
        let manager = LockManager::new();

        // Acquire row lock (should auto-acquire intent lock)
        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 1000),
            LockResult::Acquired
        );

        // Another transaction can read different row
        assert_eq!(
            manager.lock_row(1, 200, LockMode::Shared, 2000),
            LockResult::Acquired
        );

        // Can't acquire exclusive on locked row
        assert_eq!(
            manager.lock_row(1, 100, LockMode::Exclusive, 2000),
            LockResult::WouldBlock
        );

        // Release all
        let released = manager.release_all(1000);
        assert!(released >= 1);
    }

    #[test]
    fn test_optimistic_version() {
        let version = OptimisticVersion::new();

        // Read should see stable version (0)
        let v = version.read_version();
        assert!(version.is_stable(v));
        assert!(version.validate(v));

        // Write should increment
        {
            let guard = version.try_write_begin().unwrap();
            let v_during = version.read_version();
            assert!(!version.is_stable(v_during)); // Odd = writing
            guard.commit();
        }

        // After commit, version is 2 (even = stable)
        let v2 = version.read_version();
        assert!(version.is_stable(v2));
        assert_eq!(v2, 2);
    }

    #[test]
    fn test_optimistic_concurrent() {
        let version = Arc::new(OptimisticVersion::new());

        // Start a write
        let guard = version.try_write_begin().unwrap();

        // Concurrent write attempt should fail
        let version2 = version.clone();
        let result = version2.try_write_begin();
        assert!(result.is_none());

        // Finish write
        guard.commit();

        // Now another write should succeed
        let guard2 = version.try_write_begin().unwrap();
        guard2.commit();
    }

    #[test]
    fn test_epoch_manager() {
        let manager = Arc::new(EpochManager::new());

        // Pin epoch
        let guard1 = manager.pin();
        assert_eq!(guard1.epoch, 0);

        // Retire something
        manager.retire(vec![1, 2, 3]);

        // Advance epoch
        manager.advance();

        // New pin gets new epoch
        let guard2 = manager.pin();
        assert!(guard2.epoch >= guard1.epoch);

        // Drop guards
        drop(guard1);
        drop(guard2);

        // Can advance more
        manager.advance();
        manager.advance();
    }

    #[test]
    fn test_sharded_distribution() {
        let table = ShardedLockTable::new();

        // Lock many rows and verify distribution
        for i in 0..1000u128 {
            assert_eq!(table.try_lock(i, LockMode::Shared, 1), LockResult::Acquired);
        }

        // Count locks per shard (should be somewhat distributed)
        let mut non_empty_shards = 0;
        for shard in &table.shards {
            if !shard.lock().is_empty() {
                non_empty_shards += 1;
            }
        }

        // With 1000 locks across 256 shards, should use many shards
        assert!(
            non_empty_shards > 100,
            "Expected better distribution: {} shards used",
            non_empty_shards
        );
    }

    #[test]
    fn test_unlock_all() {
        let table = ShardedLockTable::new();

        // Lock many rows as txn 100
        for i in 0..50u128 {
            table.try_lock(i, LockMode::Exclusive, 100);
        }

        // Lock some as txn 200
        for i in 50..100u128 {
            table.try_lock(i, LockMode::Exclusive, 200);
        }

        // Unlock all for txn 100
        let released = table.unlock_all(100);
        assert_eq!(released, 50);

        // Txn 200's locks should still be held
        assert_eq!(
            table.try_lock(50, LockMode::Exclusive, 300),
            LockResult::WouldBlock
        );

        // Txn 100's former locks should be available
        assert_eq!(
            table.try_lock(0, LockMode::Exclusive, 300),
            LockResult::Acquired
        );
    }

    #[test]
    fn test_concurrent_locks() {
        let table = Arc::new(ShardedLockTable::new());
        let mut handles = vec![];

        // Spawn threads that each lock different rows
        for txn_id in 0..16u64 {
            let table = table.clone();
            handles.push(thread::spawn(move || {
                let start = txn_id as u128 * 100;
                for i in 0..100 {
                    let result = table.try_lock(start + i, LockMode::Exclusive, txn_id);
                    assert_eq!(result, LockResult::Acquired);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Verify stats
        assert_eq!(
            table.stats().exclusive_acquired.load(Ordering::Relaxed),
            1600
        );
    }
}