Skip to main content

sochdb_storage/
mvcc_concurrent.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Concurrent MVCC for Multi-Reader Single-Writer Embedded Mode
19//!
20//! This module implements lock-free concurrent reads with single-writer
21//! coordination using shared memory MVCC metadata.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │  Shared Memory MVCC Metadata (.mvcc_metadata file, mmap'd)      │
28//! ├─────────────────────────────────────────────────────────────────┤
29//! │  Header:                                                         │
30//! │    - magic, version, page_size                                   │
31//! │    - current_epoch (AtomicU64)                                   │
32//! │    - current_ts (AtomicU64, HLC timestamp)                       │
33//! │    - writer_lock (AtomicU32, 0=free, pid=locked)                │
34//! │                                                                  │
35//! │  Reader Table (1024 slots × 64 bytes = 64KB):                    │
36//! │    [slot 0]: pid, snapshot_ts, epoch, last_heartbeat             │
37//! │    [slot 1]: ...                                                 │
38//! │    [slot N]: ...                                                 │
39//! │                                                                  │
//! │  Version Store (per-process DashMap, not in this file):         │
41//! │    key → [version @ ts=105, version @ ts=99, ...]                │
42//! └─────────────────────────────────────────────────────────────────┘
43//! ```
44//!
45//! ## Performance Model
46//!
47//! - Lock-free read: ~100ns (atomic load + version lookup)
48//! - Writer lock acquisition: ~20ns (uncontended CAS)
49//! - Version GC: O(N_versions) every 1000 commits
50//!
51//! ## Safety
52//!
//! - Reader registration is lock-free (a CAS on a shared reader slot); version
//!   lookups take a DashMap shard read lock and can briefly wait on a writer
54//! - Single writer ensures WAL consistency
55//! - Epoch-based GC prevents use-after-free
56
57use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65
66// Type aliases to avoid conflicts with other modules
67pub type ConcurrentVersionChain = VersionChain;
68pub type ConcurrentVersionEntry = VersionEntry;
69
70// =============================================================================
71// Constants
72// =============================================================================
73
/// Magic number identifying an MVCC metadata file.
///
/// NOTE: stored little-endian, this value's bytes read `b"SCHMV_CC"`, not
/// "SOCHMVCC". The value is deliberately left unchanged: `MvccHeader::validate`
/// rejects any other magic, so changing it would make previously created
/// metadata files unreadable.
const MVCC_MAGIC: u64 = 0x43435F564D484353;

/// Current on-disk format version of the metadata file
const MVCC_VERSION: u32 = 1;

/// Maximum number of concurrently registered readers (slots in the reader table)
const MAX_READERS: usize = 1024;

/// Size of one reader slot (64 bytes = 1 cache line)
const READER_SLOT_SIZE: usize = 64;

/// Size reserved for the header at the start of the metadata file
const HEADER_SIZE: usize = 64;

/// Total metadata size (header + reader table)
const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);

// Reader slots are laid out right after the header. Keep them slot-aligned so
// every slot sits on its own cache line.
const _: () = assert!(HEADER_SIZE % READER_SLOT_SIZE == 0);

/// Heartbeat age after which a reader slot counts as stale (60 seconds, in µs)
const STALE_READER_TIMEOUT_US: u64 = 60_000_000;

/// GC interval (every N commits)
const GC_COMMIT_INTERVAL: u64 = 1000;
97
98// =============================================================================
99// Hybrid Logical Clock
100// =============================================================================
101
/// Hybrid Logical Clock timestamp.
///
/// Format: `[48-bit physical time (ms since UNIX epoch) | 16-bit logical counter]`.
/// Because the physical part occupies the high bits, the derived `Ord` on the
/// raw `u64` orders by physical time first and by logical counter second.
///
/// Properties of timestamps produced by [`HlcTimestamp::allocate_next`]:
/// 1. Strictly increasing for a given shared counter
/// 2. Track the wall clock whenever it moves forward
/// 3. Up to 65,536 events per millisecond; beyond that the counter carries
///    into the next millisecond instead of repeating a value
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Builds a timestamp from a physical time (ms) and a logical counter.
    ///
    /// Only the low 48 bits of `physical_ms` are representable; higher bits are
    /// shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | (logical as u64))
    }

    /// Physical time component (milliseconds since the UNIX epoch).
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// Raw packed value.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocates the next timestamp from a shared counter (atomic, lock-free).
    ///
    /// Algorithm:
    /// 1. Read the wall clock once.
    /// 2. If it is ahead of the stored physical part, use it with logical = 0.
    /// 3. Otherwise keep the stored physical part and bump the logical counter.
    ///    If the counter is already `u16::MAX`, carry into the physical part
    ///    (physical + 1, logical = 0) so the result is still strictly greater.
    /// 4. Publish with a CAS; retry if another allocator won the race.
    ///
    /// A wall clock set before the UNIX epoch is treated as 0 ms. The counter
    /// then simply advances logically, instead of panicking.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map(|d| d.as_millis() as u64)
            .unwrap_or(0);

        loop {
            let last_val = last.load(Ordering::Acquire);
            let prev = HlcTimestamp(last_val);
            let (last_phys, last_log) = (prev.physical_ms(), prev.logical());

            let (new_phys, new_log) = if physical_now > last_phys {
                (physical_now, 0u16)
            } else if last_log == u16::MAX {
                // Logical counter exhausted within this millisecond. Saturating
                // here would republish `last_val` and hand out a duplicate
                // timestamp, so borrow the next millisecond instead.
                (last_phys + 1, 0u16)
            } else {
                // Clock hasn't advanced: bump the logical counter.
                (last_phys, last_log + 1)
            };

            let next = HlcTimestamp::new(new_phys, new_log);
            if last
                .compare_exchange(last_val, next.raw(), Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return next;
            }
            // CAS lost to a concurrent allocator; recompute from the new value.
            std::hint::spin_loop();
        }
    }

    /// Reads the current timestamp without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
194
195// =============================================================================
196// Reader Slot (Cache-Line Aligned)
197// =============================================================================
198
199/// Reader slot in shared memory (64 bytes = 1 cache line)
200///
201/// Each active reader registers in a slot to prevent GC from
202/// reclaiming versions it might need.
203///
204/// Layout (with #[repr(C, align(64))]):
205///   pid:            AtomicU32 (4 bytes)
206///   <pad>:          4 bytes   (alignment for AtomicU64)
207///   snapshot_ts:    AtomicU64 (8 bytes)
208///   epoch:          AtomicU32 (4 bytes)
209///   <pad>:          4 bytes   (alignment for AtomicU64)
210///   last_heartbeat: AtomicU64 (8 bytes)
211///   _reserved:      [u8; 32]  (pad to 64 bytes total)
212///   Total: 64 bytes — fits exactly in one cache line
213#[repr(C, align(64))]
214#[derive(Debug)]
215pub struct ReaderSlot {
216    /// Process ID (0 = slot is free)
217    pub pid: AtomicU32,
218    /// Snapshot timestamp this reader is using
219    pub snapshot_ts: AtomicU64,
220    /// Epoch number when reader registered
221    pub epoch: AtomicU32,
222    /// Last heartbeat (microseconds since epoch)
223    pub last_heartbeat: AtomicU64,
224    /// Reserved for future use (sized to make struct exactly 64 bytes)
225    _reserved: [u8; 32],
226}
227
228impl ReaderSlot {
229    /// Create an empty reader slot
230    pub const fn empty() -> Self {
231        Self {
232            pid: AtomicU32::new(0),
233            snapshot_ts: AtomicU64::new(0),
234            epoch: AtomicU32::new(0),
235            last_heartbeat: AtomicU64::new(0),
236            _reserved: [0u8; 32],
237        }
238    }
239
240    /// Check if slot is free
241    #[inline]
242    pub fn is_free(&self) -> bool {
243        self.pid.load(Ordering::Acquire) == 0
244    }
245
246    /// Try to claim this slot for reading
247    ///
248    /// Returns true if successfully claimed.
249    #[inline]
250    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
251        let current_pid = self.pid.load(Ordering::Acquire);
252        
253        // Only claim if free or already ours
254        if current_pid != 0 && current_pid != my_pid {
255            return false;
256        }
257
258        if self
259            .pid
260            .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
261            .is_ok()
262        {
263            // Successfully claimed, update metadata
264            self.snapshot_ts.store(snapshot_ts, Ordering::Release);
265            self.epoch.store(epoch, Ordering::Release);
266            self.last_heartbeat
267                .store(current_time_us(), Ordering::Release);
268            true
269        } else {
270            false
271        }
272    }
273
274    /// Release this slot
275    #[inline]
276    pub fn release(&self, my_pid: u32) {
277        // Only release if we own it
278        if self.pid.load(Ordering::Acquire) == my_pid {
279            self.snapshot_ts.store(0, Ordering::Release);
280            self.pid.store(0, Ordering::Release);
281        }
282    }
283
284    /// Update heartbeat
285    #[inline]
286    pub fn heartbeat(&self) {
287        self.last_heartbeat
288            .store(current_time_us(), Ordering::Release);
289    }
290
291    /// Check if this slot is stale (process crashed or hung)
292    #[inline]
293    pub fn is_stale(&self, now_us: u64) -> bool {
294        let pid = self.pid.load(Ordering::Acquire);
295        if pid == 0 {
296            return false; // Empty slot
297        }
298
299        // Check heartbeat timeout
300        let last_hb = self.last_heartbeat.load(Ordering::Acquire);
301        if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
302            return true;
303        }
304
305        // Check if process still exists
306        !process_exists(pid)
307    }
308}
309
310// =============================================================================
311// MVCC Metadata Header
312// =============================================================================
313
314/// MVCC metadata file header
315#[repr(C)]
316#[derive(Debug)]
317pub struct MvccHeader {
318    /// Magic bytes for validation
319    pub magic: u64,
320    /// Format version
321    pub version: u32,
322    /// Page size
323    pub page_size: u32,
324    /// Number of reader slots
325    pub num_readers: u32,
326    /// Current epoch (incremented on recovery/GC)
327    pub current_epoch: AtomicU64,
328    /// Current HLC timestamp
329    pub current_ts: AtomicU64,
330    /// Writer lock (0 = free, pid = locked)
331    pub writer_lock: AtomicU32,
332    /// Number of commits since last GC
333    pub commits_since_gc: AtomicU64,
334    /// Reserved for alignment
335    _reserved: [u8; 4],
336}
337
338impl MvccHeader {
339    /// Create new header with default values
340    pub fn new() -> Self {
341        Self {
342            magic: MVCC_MAGIC,
343            version: MVCC_VERSION,
344            page_size: 4096,
345            num_readers: MAX_READERS as u32,
346            current_epoch: AtomicU64::new(1),
347            current_ts: AtomicU64::new(HlcTimestamp::new(
348                SystemTime::now()
349                    .duration_since(UNIX_EPOCH)
350                    .unwrap()
351                    .as_millis() as u64,
352                0,
353            ).raw()),
354            writer_lock: AtomicU32::new(0),
355            commits_since_gc: AtomicU64::new(0),
356            _reserved: [0u8; 4],
357        }
358    }
359
360    /// Validate header
361    pub fn validate(&self) -> Result<()> {
362        if self.magic != MVCC_MAGIC {
363            return Err(SochDBError::Corruption(
364                "Invalid MVCC metadata magic".into(),
365            ));
366        }
367        if self.version != MVCC_VERSION {
368            return Err(SochDBError::Corruption(format!(
369                "Unsupported MVCC version: {} (expected {})",
370                self.version, MVCC_VERSION
371            )));
372        }
373        Ok(())
374    }
375}
376
377impl Default for MvccHeader {
378    fn default() -> Self {
379        Self::new()
380    }
381}
382
383// =============================================================================
384// Version Entry
385// =============================================================================
386
/// A single version of a key-value pair.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp (HLC); 0 while the version is still uncommitted
    pub commit_ts: u64,
    /// Transaction ID that created this version
    pub txn_id: u64,
    /// Epoch when this version was created
    pub epoch: u32,
    /// The value (None = tombstone/deletion)
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Creates a version entry.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        Self {
            commit_ts,
            txn_id,
            epoch,
            value,
        }
    }

    /// Returns `true` if this version is committed and visible at `snapshot_ts`.
    ///
    /// Visible means it committed strictly before the snapshot.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        self.commit_ts > 0 && self.commit_ts < snapshot_ts
    }
}

// =============================================================================
// Version Chain (Sorted by commit_ts descending)
// =============================================================================

/// Chain of versions for a single key, newest (highest `commit_ts`) first.
///
/// Costs, with V = number of committed versions:
/// - reads: O(log V) binary search
/// - commits: O(log V) search plus the O(V) element shift of `Vec::insert`
///   (new commits normally land at the front)
/// - GC: O(V) scan and truncation
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Committed versions, sorted by commit_ts DESCENDING
    versions: Vec<VersionEntry>,
    /// Uncommitted version (at most one per key)
    uncommitted: Option<VersionEntry>,
}

impl VersionChain {
    /// Creates an empty version chain.
    pub fn new() -> Self {
        Self {
            versions: Vec::new(),
            uncommitted: None,
        }
    }

    /// Stages an uncommitted version for `txn_id`.
    ///
    /// Any previously staged version is replaced.
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
        self.uncommitted = Some(VersionEntry {
            commit_ts: 0, // Uncommitted
            txn_id,
            epoch,
            value,
        });
    }

    /// Commits the staged version if it belongs to `txn_id`.
    ///
    /// The version is stamped with `commit_ts` and inserted so the chain stays
    /// sorted by `commit_ts` descending. It goes ahead of any existing version
    /// with an equal timestamp.
    ///
    /// Returns `false` when nothing is staged or the staged version belongs to
    /// another transaction. In that case the staged version is left in place.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        match self.uncommitted.take() {
            Some(mut staged) if staged.txn_id == txn_id => {
                staged.commit_ts = commit_ts;
                let pos = self
                    .versions
                    .partition_point(|existing| existing.commit_ts > commit_ts);
                self.versions.insert(pos, staged);
                true
            }
            other => {
                self.uncommitted = other;
                false
            }
        }
    }

    /// Discards the staged version if it belongs to `txn_id`.
    pub fn abort(&mut self, txn_id: u64) {
        if self
            .uncommitted
            .as_ref()
            .map_or(false, |v| v.txn_id == txn_id)
        {
            self.uncommitted = None;
        }
    }

    /// Reads the version visible at `snapshot_ts`.
    ///
    /// If `current_txn_id` owns the staged version, that version is returned
    /// (read-your-own-writes). Otherwise the newest committed version with
    /// `commit_ts < snapshot_ts` is returned.
    ///
    /// Complexity: O(log V) via binary search
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
        if let (Some(txn_id), Some(v)) = (current_txn_id, self.uncommitted.as_ref()) {
            if v.txn_id == txn_id {
                return Some(v);
            }
        }

        // Versions are sorted descending: skip all with commit_ts >= snapshot_ts.
        let idx = self
            .versions
            .partition_point(|v| v.commit_ts >= snapshot_ts);
        self.versions.get(idx)
    }

    /// Returns `true` if another transaction has a version staged on this key.
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        self.uncommitted
            .as_ref()
            .map_or(false, |v| v.txn_id != my_txn_id)
    }

    /// Garbage-collects committed versions no reader can observe any more.
    ///
    /// The newest-first chain is cut after a kept prefix. The prefix contains:
    /// 1. the newest version (always),
    /// 2. every version with `commit_ts >= min_snapshot_ts`,
    /// 3. the newest version older than `min_snapshot_ts`, which is the one a
    ///    reader at `min_snapshot_ts` sees, since visibility is
    ///    `commit_ts < snapshot_ts`,
    /// 4. any further versions whose `epoch >= min_epoch`.
    ///
    /// Returns the number of versions reclaimed.
    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
        let total = self.versions.len();
        if total <= 1 {
            return 0; // Always keep at least one version
        }

        // Index of the version the oldest active reader reads (same search as
        // `read_at`). It must be kept: dropping it would leave that reader
        // seeing no value at all for this key.
        let oldest_needed = self
            .versions
            .partition_point(|v| v.commit_ts >= min_snapshot_ts);
        let mut keep = (oldest_needed + 1).min(total);

        // Extend over versions that are still in a protected epoch.
        while keep < total && self.versions[keep].epoch >= min_epoch {
            keep += 1;
        }

        self.versions.truncate(keep);
        total - keep
    }

    /// Number of versions, including a staged uncommitted one.
    pub fn len(&self) -> usize {
        self.versions.len() + if self.uncommitted.is_some() { 1 } else { 0 }
    }

    /// Returns `true` if there are no committed or staged versions.
    pub fn is_empty(&self) -> bool {
        self.versions.is_empty() && self.uncommitted.is_none()
    }
}
561
562// =============================================================================
563// Concurrent Version Store
564// =============================================================================
565
566/// Lock-free version store using DashMap
567///
568/// Provides O(1) key lookup + O(log V) version lookup per key.
569pub struct VersionStore {
570    /// Key → VersionChain mapping
571    data: DashMap<Vec<u8>, VersionChain>,
572    /// Statistics
573    stats: VersionStoreStats,
574}
575
576/// Version store statistics
577#[derive(Debug, Default)]
578pub struct VersionStoreStats {
579    /// Total number of keys
580    pub num_keys: AtomicU64,
581    /// Total number of versions across all keys
582    pub num_versions: AtomicU64,
583    /// Number of GC passes
584    pub gc_passes: AtomicU64,
585    /// Versions reclaimed by GC
586    pub versions_reclaimed: AtomicU64,
587}
588
589impl VersionStore {
590    /// Create new version store
591    pub fn new() -> Self {
592        Self {
593            data: DashMap::new(),
594            stats: VersionStoreStats::default(),
595        }
596    }
597
598    /// Insert a new uncommitted version
599    pub fn insert_uncommitted(
600        &self,
601        key: &[u8],
602        value: Option<Vec<u8>>,
603        txn_id: u64,
604        epoch: u32,
605    ) -> Result<()> {
606        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
607            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
608            VersionChain::new()
609        });
610
611        // Check for write conflict
612        if entry.has_write_conflict(txn_id) {
613            return Err(SochDBError::Internal(
614                "Write conflict: another transaction has uncommitted write".into(),
615            ));
616        }
617
618        entry.add_uncommitted(value, txn_id, epoch);
619        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
620        Ok(())
621    }
622
623    /// Commit a version
624    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
625        if let Some(mut entry) = self.data.get_mut(key) {
626            return entry.commit(txn_id, commit_ts);
627        }
628        false
629    }
630
631    /// Abort uncommitted version
632    pub fn abort(&self, key: &[u8], txn_id: u64) {
633        if let Some(mut entry) = self.data.get_mut(key) {
634            entry.abort(txn_id);
635            self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
636        }
637    }
638
639    /// Read value at snapshot timestamp
640    pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
641        self.data.get(key).and_then(|chain| {
642            chain
643                .read_at(snapshot_ts, current_txn_id)
644                .and_then(|v| v.value.clone())
645        })
646    }
647
648    /// Check if key exists at snapshot
649    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
650        self.data
651            .get(key)
652            .map(|chain| chain.read_at(snapshot_ts, None).is_some())
653            .unwrap_or(false)
654    }
655
656    /// Run garbage collection
657    ///
658    /// Removes old versions that are no longer visible to any reader.
659    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
660        let mut total_reclaimed = 0;
661
662        for mut entry in self.data.iter_mut() {
663            let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
664            total_reclaimed += reclaimed;
665        }
666
667        self.stats
668            .gc_passes
669            .fetch_add(1, Ordering::Relaxed);
670        self.stats
671            .versions_reclaimed
672            .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
673
674        total_reclaimed
675    }
676
677    /// Get number of keys
678    pub fn len(&self) -> usize {
679        self.data.len()
680    }
681
682    /// Check if empty
683    pub fn is_empty(&self) -> bool {
684        self.data.is_empty()
685    }
686
687    /// Get statistics
688    pub fn stats(&self) -> &VersionStoreStats {
689        &self.stats
690    }
691}
692
693impl Default for VersionStore {
694    fn default() -> Self {
695        Self::new()
696    }
697}
698
699// =============================================================================
700// Concurrent MVCC Manager
701// =============================================================================
702
/// Manager for concurrent MVCC operations.
///
/// Coordinates:
/// - Reader registration (CAS on slots of the shared reader table)
/// - Writer locking (single writer, owner pid stored in the shared header)
/// - Version store access
/// - Garbage collection
///
/// The MVCC header (writer_lock, current_ts, current_epoch) and the reader
/// table are stored in a memory-mapped file. This makes AtomicU32/AtomicU64
/// CAS operations shared across independent OS processes. Without mmap, each
/// process would have its own copy of these atomics, and cross-process
/// coordination would be impossible.
///
/// The version store is NOT part of the mapping; it is per-process.
pub struct ConcurrentMvcc {
    /// Path to the database directory
    path: PathBuf,
    /// Memory-mapped metadata file (owning it keeps the mapping alive)
    /// SAFETY: `header` and `reader_slots_ptr` point into this mapping, so it
    /// must outlive every reference produced from them.
    _mmap: memmap2::MmapMut,
    /// Pointer to the MVCC header at offset 0 of the mapping
    /// SAFETY: Valid for the lifetime of `_mmap`. The header is repr(C)
    /// and the mmap region is at least METADATA_SIZE bytes.
    header: *const MvccHeader,
    /// Pointer to the first reader slot inside the mapping
    /// SAFETY: Valid for the lifetime of `_mmap`. Points to the region
    /// starting at offset HEADER_SIZE within the mapping.
    reader_slots_ptr: *const ReaderSlot,
    /// Number of reader slots reachable through `reader_slots_ptr`
    num_reader_slots: usize,
    /// Version store (in-process only). `open` creates it empty.
    /// NOTE(review): presumably repopulated from the WAL by the caller — confirm.
    version_store: VersionStore,
    /// This process's ID, used as the owner tag in reader slots and the writer lock
    our_pid: u32,
    /// Slot index recorded by the most recent `register_reader` call, if any
    our_slot: RwLock<Option<usize>>,
}

// SAFETY: `header` and `reader_slots_ptr` point into `_mmap`, which this struct
// owns and which lives as long as the struct does.
// - Shared mutable state reached through those pointers is atomic (writer lock,
//   clock, epoch, commit counter, reader-slot fields). Other threads and other
//   processes may touch it concurrently.
// - The plain header fields (magic, version, page_size, num_readers) are
//   expected to be written only when `open` initializes a new file, and to be
//   read-only afterwards.
// - The remaining fields (`version_store`, `our_slot`) are thread-safe types.
unsafe impl Send for ConcurrentMvcc {}
unsafe impl Sync for ConcurrentMvcc {}
747
748impl ConcurrentMvcc {
749    /// Open or create MVCC manager with shared memory-mapped metadata
750    ///
751    /// The metadata file (.mvcc_metadata) is mmap'd so that the writer_lock,
752    /// current_ts, and reader slots are shared across all processes that open
753    /// the same database. This enables true cross-process atomic coordination.
754    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
755        let path = path.as_ref().to_path_buf();
756        std::fs::create_dir_all(&path)?;
757
758        let metadata_path = path.join(".mvcc_metadata");
759        let is_new = !metadata_path.exists();
760
761        // Create or open the metadata file at the required size
762        let file = OpenOptions::new()
763            .create(true)
764            .read(true)
765            .write(true)
766            .open(&metadata_path)?;
767
768        // Ensure file is the correct size (header + reader slots)
769        let required_size = METADATA_SIZE as u64;
770        if file.metadata()?.len() < required_size {
771            file.set_len(required_size)?;
772        }
773
774        // Memory-map the file
775        // SAFETY: The file is opened read-write and we control its contents.
776        // Multiple processes may mmap the same file simultaneously — this is
777        // the intended usage pattern. The atomics in the header ensure safe
778        // concurrent access.
779        let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
780
781        if is_new || mmap.len() < METADATA_SIZE {
782            // Initialize fresh metadata
783            let header = MvccHeader::new();
784            let header_bytes = unsafe {
785                std::slice::from_raw_parts(
786                    &header as *const MvccHeader as *const u8,
787                    std::mem::size_of::<MvccHeader>(),
788                )
789            };
790            mmap[..header_bytes.len()].copy_from_slice(header_bytes);
791
792            // Zero-initialize reader slots (already zero from set_len, but be explicit)
793            for i in 0..MAX_READERS {
794                let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
795                let end = offset + READER_SLOT_SIZE;
796                if end <= mmap.len() {
797                    mmap[offset..end].fill(0);
798                }
799            }
800
801            mmap.flush()?;
802        } else {
803            // Validate existing header
804            let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
805            header_ref.validate()?;
806        }
807
808        // Get pointers into the mmap'd region
809        let header = mmap.as_ptr() as *const MvccHeader;
810        let reader_slots_ptr = unsafe {
811            mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot
812        };
813
814        Ok(Self {
815            path,
816            _mmap: mmap,
817            header,
818            reader_slots_ptr,
819            num_reader_slots: MAX_READERS,
820            version_store: VersionStore::new(),
821            our_pid: std::process::id(),
822            our_slot: RwLock::new(None),
823        })
824    }
825
826    /// Get reference to the shared header
827    ///
828    /// SAFETY: The header pointer is valid for the lifetime of self._mmap
829    #[inline]
830    fn header(&self) -> &MvccHeader {
831        unsafe { &*self.header }
832    }
833
834    /// Get reference to a reader slot
835    ///
836    /// SAFETY: The pointer is valid for the lifetime of self._mmap
837    #[inline]
838    fn reader_slot(&self, idx: usize) -> &ReaderSlot {
839        assert!(idx < self.num_reader_slots);
840        unsafe { &*self.reader_slots_ptr.add(idx) }
841    }
842
843    /// Allocate next timestamp
844    #[inline]
845    pub fn allocate_timestamp(&self) -> HlcTimestamp {
846        HlcTimestamp::allocate_next(&self.header().current_ts)
847    }
848
849    /// Get current timestamp without advancing
850    #[inline]
851    pub fn current_timestamp(&self) -> HlcTimestamp {
852        HlcTimestamp::read_current(&self.header().current_ts)
853    }
854
855    /// Get current epoch
856    #[inline]
857    pub fn current_epoch(&self) -> u64 {
858        self.header().current_epoch.load(Ordering::Acquire)
859    }
860
861    /// Register as active reader
862    ///
863    /// Returns slot index on success.
864    /// Must call `unregister_reader()` when done.
865    pub fn register_reader(&self) -> Result<usize> {
866        let snapshot_ts = self.current_timestamp().raw();
867        let epoch = self.current_epoch() as u32;
868
869        // Find free slot in shared mmap'd reader table
870        for i in 0..self.num_reader_slots {
871            let slot = self.reader_slot(i);
872            if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
873                *self.our_slot.write() = Some(i);
874                return Ok(i);
875            }
876        }
877
878        Err(SochDBError::ResourceExhausted(
879            "Too many concurrent readers".into(),
880        ))
881    }
882
883    /// Unregister as reader
884    pub fn unregister_reader(&self, slot_idx: usize) {
885        if slot_idx < self.num_reader_slots {
886            self.reader_slot(slot_idx).release(self.our_pid);
887            *self.our_slot.write() = None;
888        }
889    }
890
891    /// Try to acquire writer lock
892    ///
893    /// Returns WriterGuard on success, which releases lock on drop.
894    pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
895        let current = self.header().writer_lock.load(Ordering::Acquire);
896
897        if current == 0 {
898            // Try to acquire (CAS on shared mmap'd atomic)
899            if self
900                .header()
901                .writer_lock
902                .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
903                .is_ok()
904            {
905                return Ok(WriterGuard { mvcc: self });
906            }
907        } else if current == self.our_pid {
908            // Already own the lock (reentrant)
909            return Ok(WriterGuard { mvcc: self });
910        }
911
912        Err(SochDBError::LockError(format!(
913            "Writer lock held by process {}",
914            current
915        )))
916    }
917
918    /// Acquire writer lock with timeout
919    pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
920        let deadline = std::time::Instant::now() + timeout;
921
922        loop {
923            match self.try_acquire_writer() {
924                Ok(guard) => return Ok(guard),
925                Err(_) if std::time::Instant::now() < deadline => {
926                    std::thread::sleep(Duration::from_micros(100));
927                }
928                Err(e) => return Err(e),
929            }
930        }
931    }
932
933    /// Release writer lock
934    fn release_writer(&self) {
935        let current = self.header().writer_lock.load(Ordering::Acquire);
936        if current == self.our_pid {
937            self.header().writer_lock.store(0, Ordering::Release);
938        }
939    }
940
941    /// Get version store
942    pub fn version_store(&self) -> &VersionStore {
943        &self.version_store
944    }
945
946    /// Calculate minimum visible snapshot across all readers
947    pub fn min_active_snapshot(&self) -> u64 {
948        let mut min_ts = u64::MAX;
949
950        for i in 0..self.num_reader_slots {
951            let slot = self.reader_slot(i);
952            let pid = slot.pid.load(Ordering::Acquire);
953            if pid != 0 {
954                let ts = slot.snapshot_ts.load(Ordering::Acquire);
955                if ts > 0 && ts < min_ts {
956                    min_ts = ts;
957                }
958            }
959        }
960
961        min_ts
962    }
963
964    /// Calculate minimum active epoch across all readers
965    pub fn min_active_epoch(&self) -> u32 {
966        let mut min_epoch = u32::MAX;
967
968        for i in 0..self.num_reader_slots {
969            let slot = self.reader_slot(i);
970            let pid = slot.pid.load(Ordering::Acquire);
971            if pid != 0 {
972                let epoch = slot.epoch.load(Ordering::Acquire);
973                if epoch < min_epoch {
974                    min_epoch = epoch;
975                }
976            }
977        }
978
979        if min_epoch == u32::MAX {
980            // No active readers, use current epoch
981            self.current_epoch() as u32
982        } else {
983            min_epoch
984        }
985    }
986
987    /// Run garbage collection
988    ///
989    /// Should be called periodically (e.g., every 1000 commits).
990    pub fn run_gc(&self) -> usize {
991        let min_epoch = self.min_active_epoch();
992        let min_snapshot = self.min_active_snapshot();
993
994        self.version_store.gc(min_epoch, min_snapshot)
995    }
996
997    /// Check if GC should run (based on commit count)
998    pub fn should_run_gc(&self) -> bool {
999        self.header()
1000            .commits_since_gc
1001            .load(Ordering::Relaxed)
1002            >= GC_COMMIT_INTERVAL
1003    }
1004
1005    /// Increment commit count and maybe run GC
1006    pub fn on_commit(&self) {
1007        let count = self
1008            .header()
1009            .commits_since_gc
1010            .fetch_add(1, Ordering::Relaxed);
1011
1012        if count >= GC_COMMIT_INTERVAL {
1013            self.header().commits_since_gc.store(0, Ordering::Relaxed);
1014            let _ = self.run_gc();
1015        }
1016    }
1017
1018    /// Clean up stale readers (from crashed processes)
1019    pub fn cleanup_stale_readers(&self) -> usize {
1020        let now = current_time_us();
1021        let mut cleaned = 0;
1022
1023        for i in 0..self.num_reader_slots {
1024            let slot = self.reader_slot(i);
1025            if slot.is_stale(now) {
1026                slot.pid.store(0, Ordering::Release);
1027                cleaned += 1;
1028            }
1029        }
1030
1031        cleaned
1032    }
1033
1034    /// Advance epoch (called on recovery)
1035    pub fn advance_epoch(&self) -> u64 {
1036        self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1037    }
1038}
1039
1040impl Drop for ConcurrentMvcc {
1041    fn drop(&mut self) {
1042        // Release any reader slot we hold
1043        if let Some(slot_idx) = *self.our_slot.read() {
1044            self.unregister_reader(slot_idx);
1045        }
1046
1047        // Release writer lock if we hold it
1048        self.release_writer();
1049    }
1050}
1051
1052// =============================================================================
1053// Writer Guard (RAII)
1054// =============================================================================
1055
1056/// Guard that releases writer lock on drop
1057pub struct WriterGuard<'a> {
1058    mvcc: &'a ConcurrentMvcc,
1059}
1060
1061impl<'a> Drop for WriterGuard<'a> {
1062    fn drop(&mut self) {
1063        self.mvcc.release_writer();
1064    }
1065}
1066
1067// =============================================================================
1068// Utility Functions
1069// =============================================================================
1070
/// Wall-clock time in microseconds since the Unix epoch.
///
/// Never panics:
/// - returns `0` if the system clock reports a time before 1970;
/// - saturates at `u64::MAX` instead of silently truncating the `u128`
///   microsecond count.
#[inline]
fn current_time_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
1079
1080/// Check if a process exists
1081#[cfg(unix)]
1082fn process_exists(pid: u32) -> bool {
1083    // kill(pid, 0) checks existence without sending signal
1084    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1085    if result == 0 {
1086        true
1087    } else {
1088        // ESRCH = no such process
1089        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1090    }
1091}
1092
/// Windows variant: a process "exists" if `OpenProcess` with
/// `PROCESS_QUERY_LIMITED_INFORMATION` returns a non-null handle.
///
/// NOTE(review): `OpenProcess` presumably also fails (null handle) on access
/// denied, which this reports as "not existing". The unix variant treats
/// `EPERM` as "exists". Confirm which behaviour stale-reader cleanup expects.
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::CloseHandle;
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    // SAFETY: OpenProcess takes only integer arguments; the handle it returns
    // (if any) is owned here and closed below.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    let opened = handle != 0;
    if opened {
        // SAFETY: `handle` was just returned by OpenProcess and is closed once.
        unsafe { CloseHandle(handle) };
    }
    opened
}
1108
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    // No liveness probe is implemented for this target, so err on the side of
    // treating every process as alive.
    const ASSUME_ALIVE: bool = true;
    ASSUME_ALIVE
}
1113
1114// =============================================================================
1115// Tests
1116// =============================================================================
1117
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Checks that the Rust struct sizes match the `HEADER_SIZE` /
    /// `READER_SLOT_SIZE` layout constants used for the mmap'd metadata.
    #[test]
    fn test_struct_sizes() {
        // Diagnostic output only (visible with `--nocapture`); the asserts below
        // are what actually check the layout.
        eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
        eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);

        assert_eq!(std::mem::size_of::<MvccHeader>(), HEADER_SIZE,
            "MvccHeader size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<MvccHeader>(), HEADER_SIZE);
        assert_eq!(std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE,
            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
            std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE);
    }

    /// Successive allocations from one clock word must be strictly increasing.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let ts = AtomicU64::new(0);

        let t1 = HlcTimestamp::allocate_next(&ts);
        let t2 = HlcTimestamp::allocate_next(&ts);
        let t3 = HlcTimestamp::allocate_next(&ts);

        assert!(t1.raw() < t2.raw());
        assert!(t2.raw() < t3.raw());
    }

    /// 8 threads x 1000 allocations on one shared clock word must never
    /// produce the same raw timestamp twice.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        let ts = Arc::new(AtomicU64::new(0));
        let mut handles = vec![];

        for _ in 0..8 {
            let ts = ts.clone();
            handles.push(thread::spawn(move || {
                let mut timestamps = vec![];
                for _ in 0..1000 {
                    timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
                }
                timestamps
            }));
        }

        let mut all_ts: Vec<u64> = handles
            .into_iter()
            .flat_map(|h| h.join().unwrap())
            .collect();

        // All timestamps should be unique: after sorting, dedup must remove nothing.
        all_ts.sort();
        let len_before = all_ts.len();
        all_ts.dedup();
        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
    }

    /// Versions are pushed newest-first. Each snapshot read must return the
    /// newest version committed before that snapshot, and a snapshot older
    /// than every version must return nothing.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        // Add versions (commit_ts, txn_id, epoch, value), newest first
        chain.versions.push(VersionEntry::new(100, 1, 1, Some(b"v100".to_vec())));
        chain.versions.push(VersionEntry::new(90, 2, 1, Some(b"v90".to_vec())));
        chain.versions.push(VersionEntry::new(80, 3, 1, Some(b"v80".to_vec())));

        // Read at different snapshots
        let v = chain.read_at(105, None).unwrap();
        assert_eq!(v.value, Some(b"v100".to_vec()));

        let v = chain.read_at(95, None).unwrap();
        assert_eq!(v.value, Some(b"v90".to_vec()));

        let v = chain.read_at(85, None).unwrap();
        assert_eq!(v.value, Some(b"v80".to_vec()));

        // Read before all versions
        assert!(chain.read_at(75, None).is_none());
    }

    /// GC over a 10-version chain must reclaim something and keep at least
    /// one version.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        // Add 10 versions with varying epochs
        // Older versions have older epochs
        for i in 0..10 {
            chain.versions.push(VersionEntry::new(
                100 - i * 5,   // commit_ts: 100, 95, 90, 85, 80, 75, 70, 65, 60, 55
                i as u64,     // txn_id
                (10 - i) as u32, // epoch: 10, 9, 8, 7, 6, 5, 4, 3, 2, 1 (older = lower)
                Some(format!("v{}", 100 - i * 5).into_bytes()),
            ));
        }

        assert_eq!(chain.len(), 10);

        // GC with min_epoch = 7, min_snapshot = 75
        // Expected retention rule (defined in VersionChain::gc, not shown here):
        // keep versions where epoch >= 7 OR commit_ts >= 75
        // epoch >= 7: versions at ts=100, 95, 90, 85 (epochs 10, 9, 8, 7)
        // commit_ts >= 75: versions at ts=100, 95, 90, 85, 80, 75
        // But we always keep at least the newest one
        let reclaimed = chain.gc(7, 75);

        // Versions that should be reclaimed under that rule:
        // ts=70 (epoch=4), ts=65 (epoch=3), ts=60 (epoch=2), ts=55 (epoch=1)
        // That's 4 versions reclaimed
        // NOTE(review): the asserts below are weaker than this expectation
        // (they only check `> 0` and `>= 1`). Tighten them to 4 reclaimed / 6
        // kept once the retention rule is confirmed.
        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// An uncommitted insert becomes visible only to snapshots taken after
    /// its commit timestamp.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();

        // Insert uncommitted
        store
            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
            .unwrap();

        // Commit it (txn 1 at commit timestamp 100)
        assert!(store.commit(b"key1", 1, 100));

        // Read at snapshot after commit
        let value = store.get(b"key1", 150, None);
        assert_eq!(value, Some(b"value1".to_vec()));

        // Read at snapshot before commit
        let value = store.get(b"key1", 50, None);
        assert!(value.is_none());
    }

    /// Slot ownership: claiming marks the slot busy, a different PID cannot
    /// take it, the owning PID may claim it again, and release frees it.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();

        assert!(slot.is_free());

        // Claim slot
        assert!(slot.try_claim(1234, 100, 1));
        assert!(!slot.is_free());

        // Can't claim from different PID
        assert!(!slot.try_claim(5678, 200, 2));

        // Can re-claim from same PID
        assert!(slot.try_claim(1234, 300, 3));

        // Release
        slot.release(1234);
        assert!(slot.is_free());
    }
}