Skip to main content

sochdb_storage/
mvcc_concurrent.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Concurrent MVCC for Multi-Reader Single-Writer Embedded Mode
19//!
20//! This module implements lock-free concurrent reads with single-writer
21//! coordination using shared memory MVCC metadata.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │  Shared Memory MVCC Metadata (.mvcc_metadata file, mmap'd)      │
28//! ├─────────────────────────────────────────────────────────────────┤
29//! │  Header:                                                         │
30//! │    - magic, version, page_size                                   │
31//! │    - current_epoch (AtomicU64)                                   │
32//! │    - current_ts (AtomicU64, HLC timestamp)                       │
33//! │    - writer_lock (AtomicU32, 0=free, pid=locked)                │
34//! │                                                                  │
35//! │  Reader Table (1024 slots × 64 bytes = 64KB):                    │
36//! │    [slot 0]: pid, snapshot_ts, epoch, last_heartbeat             │
37//! │    [slot 1]: ...                                                 │
38//! │    [slot N]: ...                                                 │
39//! │                                                                  │
40//! │  Version Store (DashMap-like lock-free hashtable):               │
41//! │    key → [version @ ts=105, version @ ts=99, ...]                │
42//! └─────────────────────────────────────────────────────────────────┘
43//! ```
44//!
45//! ## Performance Model
46//!
47//! - Lock-free read: ~100ns (atomic load + version lookup)
48//! - Writer lock acquisition: ~20ns (uncontended CAS)
49//! - Version GC: O(N_versions) every 1000 commits
50//!
51//! ## Safety
52//!
53//! - Readers are lock-free (no blocking, no starvation)
54//! - Single writer ensures WAL consistency
55//! - Epoch-based GC prevents use-after-free
56
57use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65
66// Type aliases to avoid conflicts with other modules
67pub type ConcurrentVersionChain = VersionChain;
68pub type ConcurrentVersionEntry = VersionEntry;
69
70// =============================================================================
71// Constants
72// =============================================================================
73
/// Magic number identifying an MVCC metadata file.
///
/// Serialized little-endian, the value is the ASCII byte string `SCHMV_CC`
/// (numerically `0x43435F564D484353`).
///
/// NOTE(review): an earlier comment described these bytes as "SOCHMVCC",
/// which does not match the value. The value is left untouched so that
/// metadata files written with it keep validating.
const MVCC_MAGIC: u64 = u64::from_le_bytes(*b"SCHMV_CC");

/// Current on-disk format version.
const MVCC_VERSION: u32 = 1;

/// Maximum number of concurrently registered readers (reader-table slots).
const MAX_READERS: usize = 1024;

/// Size in bytes of one reader slot (64 bytes = 1 cache line).
const READER_SLOT_SIZE: usize = 64;

/// Size in bytes of the metadata header.
const HEADER_SIZE: usize = 64;

/// Total metadata size in bytes: the header followed by the reader table.
const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);

/// Heartbeat age, in microseconds, after which a reader is considered stale
/// (60 seconds).
const STALE_READER_TIMEOUT_US: u64 = 60_000_000;

/// Number of commits between garbage-collection passes.
const GC_COMMIT_INTERVAL: u64 = 1000;
97
98// =============================================================================
99// Hybrid Logical Clock
100// =============================================================================
101
/// Hybrid Logical Clock timestamp packed into a single `u64`.
///
/// Format: `[48-bit physical time in ms | 16-bit logical counter]`
///
/// Because the physical part occupies the high bits, the derived ordering on
/// the raw value sorts by physical time first and logical counter second.
/// The 16-bit counter distinguishes up to 65,536 timestamps that share one
/// physical millisecond.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Number of low bits that hold the logical counter.
    const LOGICAL_BITS: u32 = 16;
    /// Mask selecting the logical counter from the raw value.
    const LOGICAL_MASK: u64 = 0xFFFF;

    /// Build a timestamp from a physical time (ms) and a logical counter.
    ///
    /// Only the low 48 bits of `physical_ms` fit; higher bits are shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << Self::LOGICAL_BITS) | u64::from(logical))
    }

    /// Physical time component (milliseconds since the Unix epoch).
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> Self::LOGICAL_BITS
    }

    /// Logical counter component.
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & Self::LOGICAL_MASK) as u16
    }

    /// Raw packed value.
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocate the next timestamp after the one stored in `last`.
    ///
    /// The result is strictly greater than the value previously held in
    /// `last`, and `last` is updated to the returned timestamp with a
    /// compare-and-swap (retried on conflict):
    ///
    /// 1. If the wall clock is ahead of `last`, the new timestamp is
    ///    `(now_ms, 0)`.
    /// 2. Otherwise the previous raw value is incremented by one. When the
    ///    logical counter is already `0xFFFF` the increment carries into the
    ///    physical part, so a burst of more than 65,536 allocations inside a
    ///    single millisecond still yields distinct, increasing timestamps
    ///    instead of repeating the last one.
    ///
    /// A system clock set before the Unix epoch is treated as physical time
    /// 0, which simply makes rule 2 apply.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_millis() as u64);
        // Smallest timestamp belonging to the current wall-clock millisecond.
        let wall_clock_floor = physical_now << Self::LOGICAL_BITS;

        loop {
            let observed = last.load(Ordering::Acquire);

            let next = if wall_clock_floor > observed {
                wall_clock_floor
            } else {
                // Clock has not advanced past `observed`: bump the packed
                // value so an exhausted logical counter rolls into the
                // physical bits. Saturation only matters at u64::MAX.
                observed.saturating_add(1)
            };

            if last
                .compare_exchange(observed, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Self(next);
            }
            // Another allocator won the race; retry against the new value.
            std::hint::spin_loop();
        }
    }

    /// Read the timestamp currently stored in `ts` without advancing it.
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wrap a raw packed value.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Unwrap to the raw packed value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
194
195// =============================================================================
196// Reader Slot (Cache-Line Aligned)
197// =============================================================================
198
199/// Reader slot in shared memory (64 bytes = 1 cache line)
200///
201/// Each active reader registers in a slot to prevent GC from
202/// reclaiming versions it might need.
203#[repr(C, align(64))]
204#[derive(Debug)]
205pub struct ReaderSlot {
206    /// Process ID (0 = slot is free)
207    pub pid: AtomicU32,
208    /// Snapshot timestamp this reader is using
209    pub snapshot_ts: AtomicU64,
210    /// Epoch number when reader registered
211    pub epoch: AtomicU32,
212    /// Last heartbeat (microseconds since epoch)
213    pub last_heartbeat: AtomicU64,
214    /// Reserved for future use
215    _reserved: [u8; 36],
216}
217
218impl ReaderSlot {
219    /// Create an empty reader slot
220    pub const fn empty() -> Self {
221        Self {
222            pid: AtomicU32::new(0),
223            snapshot_ts: AtomicU64::new(0),
224            epoch: AtomicU32::new(0),
225            last_heartbeat: AtomicU64::new(0),
226            _reserved: [0u8; 36],
227        }
228    }
229
230    /// Check if slot is free
231    #[inline]
232    pub fn is_free(&self) -> bool {
233        self.pid.load(Ordering::Acquire) == 0
234    }
235
236    /// Try to claim this slot for reading
237    ///
238    /// Returns true if successfully claimed.
239    #[inline]
240    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
241        let current_pid = self.pid.load(Ordering::Acquire);
242        
243        // Only claim if free or already ours
244        if current_pid != 0 && current_pid != my_pid {
245            return false;
246        }
247
248        if self
249            .pid
250            .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
251            .is_ok()
252        {
253            // Successfully claimed, update metadata
254            self.snapshot_ts.store(snapshot_ts, Ordering::Release);
255            self.epoch.store(epoch, Ordering::Release);
256            self.last_heartbeat
257                .store(current_time_us(), Ordering::Release);
258            true
259        } else {
260            false
261        }
262    }
263
264    /// Release this slot
265    #[inline]
266    pub fn release(&self, my_pid: u32) {
267        // Only release if we own it
268        if self.pid.load(Ordering::Acquire) == my_pid {
269            self.snapshot_ts.store(0, Ordering::Release);
270            self.pid.store(0, Ordering::Release);
271        }
272    }
273
274    /// Update heartbeat
275    #[inline]
276    pub fn heartbeat(&self) {
277        self.last_heartbeat
278            .store(current_time_us(), Ordering::Release);
279    }
280
281    /// Check if this slot is stale (process crashed or hung)
282    #[inline]
283    pub fn is_stale(&self, now_us: u64) -> bool {
284        let pid = self.pid.load(Ordering::Acquire);
285        if pid == 0 {
286            return false; // Empty slot
287        }
288
289        // Check heartbeat timeout
290        let last_hb = self.last_heartbeat.load(Ordering::Acquire);
291        if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
292            return true;
293        }
294
295        // Check if process still exists
296        !process_exists(pid)
297    }
298}
299
300// =============================================================================
301// MVCC Metadata Header
302// =============================================================================
303
304/// MVCC metadata file header
305#[repr(C)]
306#[derive(Debug)]
307pub struct MvccHeader {
308    /// Magic bytes for validation
309    pub magic: u64,
310    /// Format version
311    pub version: u32,
312    /// Page size
313    pub page_size: u32,
314    /// Number of reader slots
315    pub num_readers: u32,
316    /// Current epoch (incremented on recovery/GC)
317    pub current_epoch: AtomicU64,
318    /// Current HLC timestamp
319    pub current_ts: AtomicU64,
320    /// Writer lock (0 = free, pid = locked)
321    pub writer_lock: AtomicU32,
322    /// Number of commits since last GC
323    pub commits_since_gc: AtomicU64,
324    /// Reserved for alignment
325    _reserved: [u8; 4],
326}
327
328impl MvccHeader {
329    /// Create new header with default values
330    pub fn new() -> Self {
331        Self {
332            magic: MVCC_MAGIC,
333            version: MVCC_VERSION,
334            page_size: 4096,
335            num_readers: MAX_READERS as u32,
336            current_epoch: AtomicU64::new(1),
337            current_ts: AtomicU64::new(HlcTimestamp::new(
338                SystemTime::now()
339                    .duration_since(UNIX_EPOCH)
340                    .unwrap()
341                    .as_millis() as u64,
342                0,
343            ).raw()),
344            writer_lock: AtomicU32::new(0),
345            commits_since_gc: AtomicU64::new(0),
346            _reserved: [0u8; 4],
347        }
348    }
349
350    /// Validate header
351    pub fn validate(&self) -> Result<()> {
352        if self.magic != MVCC_MAGIC {
353            return Err(SochDBError::Corruption(
354                "Invalid MVCC metadata magic".into(),
355            ));
356        }
357        if self.version != MVCC_VERSION {
358            return Err(SochDBError::Corruption(format!(
359                "Unsupported MVCC version: {} (expected {})",
360                self.version, MVCC_VERSION
361            )));
362        }
363        Ok(())
364    }
365}
366
367impl Default for MvccHeader {
368    fn default() -> Self {
369        Self::new()
370    }
371}
372
373// =============================================================================
374// Version Entry
375// =============================================================================
376
/// One version of a key's value.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// Commit timestamp (HLC); 0 while the version is uncommitted
    pub commit_ts: u64,
    /// ID of the transaction that wrote this version
    pub txn_id: u64,
    /// Epoch in which this version was created
    pub epoch: u32,
    /// Stored value; `None` marks a tombstone (deletion)
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Build a version entry from its four components.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        Self {
            value,
            epoch,
            txn_id,
            commit_ts,
        }
    }

    /// Whether this version is visible to a reader at `snapshot_ts`.
    ///
    /// A version is visible when it has been committed (non-zero commit
    /// timestamp) strictly before the snapshot timestamp.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        let committed = self.commit_ts != 0;
        committed && snapshot_ts > self.commit_ts
    }
}
407
408// =============================================================================
409// Version Chain (Sorted by commit_ts descending)
410// =============================================================================
411
412/// Chain of versions for a single key, sorted by commit_ts descending
413///
414/// Optimized for:
415/// - O(log V) reads via binary search
416/// - O(1) writes (prepend to front)
417/// - O(V) GC (linear scan with compaction)
418#[derive(Debug, Default)]
419pub struct VersionChain {
420    /// Committed versions, sorted by commit_ts DESCENDING
421    versions: Vec<VersionEntry>,
422    /// Uncommitted version (at most one per key)
423    uncommitted: Option<VersionEntry>,
424}
425
426impl VersionChain {
427    /// Create empty version chain
428    pub fn new() -> Self {
429        Self {
430            versions: Vec::new(),
431            uncommitted: None,
432        }
433    }
434
435    /// Add uncommitted version
436    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
437        self.uncommitted = Some(VersionEntry {
438            commit_ts: 0, // Uncommitted
439            txn_id,
440            epoch,
441            value,
442        });
443    }
444
445    /// Commit the uncommitted version
446    ///
447    /// Returns true if committed successfully
448    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
449        if let Some(ref mut v) = self.uncommitted {
450            if v.txn_id == txn_id {
451                v.commit_ts = commit_ts;
452                let committed = self.uncommitted.take().unwrap();
453
454                // Insert into sorted position (descending by commit_ts)
455                // Binary search for insertion point
456                let pos = self
457                    .versions
458                    .partition_point(|existing| existing.commit_ts > commit_ts);
459                self.versions.insert(pos, committed);
460
461                return true;
462            }
463        }
464        false
465    }
466
467    /// Abort uncommitted version
468    pub fn abort(&mut self, txn_id: u64) {
469        if let Some(ref v) = self.uncommitted {
470            if v.txn_id == txn_id {
471                self.uncommitted = None;
472            }
473        }
474    }
475
476    /// Read at snapshot timestamp
477    ///
478    /// Returns the most recent version visible at snapshot_ts.
479    /// If current_txn_id is provided, also considers uncommitted writes from that txn.
480    ///
481    /// Complexity: O(log V) via binary search
482    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
483        // Check uncommitted version from current transaction
484        if let Some(txn_id) = current_txn_id {
485            if let Some(ref v) = self.uncommitted {
486                if v.txn_id == txn_id {
487                    return Some(v);
488                }
489            }
490        }
491
492        // Binary search for first version with commit_ts < snapshot_ts
493        // Versions are sorted descending, so we find first where commit_ts < snapshot_ts
494        let idx = self
495            .versions
496            .partition_point(|v| v.commit_ts >= snapshot_ts);
497
498        self.versions.get(idx)
499    }
500
501    /// Check if there's a write conflict
502    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
503        if let Some(ref v) = self.uncommitted {
504            return v.txn_id != my_txn_id;
505        }
506        false
507    }
508
509    /// Garbage collect old versions
510    ///
511    /// Removes versions that are:
512    /// 1. Older than min_visible_epoch AND
513    /// 2. Older than min_snapshot_ts AND
514    /// 3. Not the most recent version (we always keep at least one)
515    ///
516    /// Returns number of versions reclaimed
517    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
518        if self.versions.len() <= 1 {
519            return 0; // Always keep at least one version
520        }
521
522        let original_len = self.versions.len();
523
524        // Keep versions that are:
525        // - In current or recent epoch, OR
526        // - Potentially visible to active readers, OR
527        // - The most recent version (always keep)
528        let mut keep_count = 1; // Always keep newest
529        for v in self.versions.iter().skip(1) {
530            if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
531                keep_count += 1;
532            } else {
533                break; // Sorted descending, so all remaining are older
534            }
535        }
536
537        self.versions.truncate(keep_count);
538        original_len - self.versions.len()
539    }
540
541    /// Get number of versions
542    pub fn len(&self) -> usize {
543        self.versions.len() + if self.uncommitted.is_some() { 1 } else { 0 }
544    }
545
546    /// Check if empty
547    pub fn is_empty(&self) -> bool {
548        self.versions.is_empty() && self.uncommitted.is_none()
549    }
550}
551
552// =============================================================================
553// Concurrent Version Store
554// =============================================================================
555
556/// Lock-free version store using DashMap
557///
558/// Provides O(1) key lookup + O(log V) version lookup per key.
559pub struct VersionStore {
560    /// Key → VersionChain mapping
561    data: DashMap<Vec<u8>, VersionChain>,
562    /// Statistics
563    stats: VersionStoreStats,
564}
565
566/// Version store statistics
567#[derive(Debug, Default)]
568pub struct VersionStoreStats {
569    /// Total number of keys
570    pub num_keys: AtomicU64,
571    /// Total number of versions across all keys
572    pub num_versions: AtomicU64,
573    /// Number of GC passes
574    pub gc_passes: AtomicU64,
575    /// Versions reclaimed by GC
576    pub versions_reclaimed: AtomicU64,
577}
578
579impl VersionStore {
580    /// Create new version store
581    pub fn new() -> Self {
582        Self {
583            data: DashMap::new(),
584            stats: VersionStoreStats::default(),
585        }
586    }
587
588    /// Insert a new uncommitted version
589    pub fn insert_uncommitted(
590        &self,
591        key: &[u8],
592        value: Option<Vec<u8>>,
593        txn_id: u64,
594        epoch: u32,
595    ) -> Result<()> {
596        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
597            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
598            VersionChain::new()
599        });
600
601        // Check for write conflict
602        if entry.has_write_conflict(txn_id) {
603            return Err(SochDBError::Internal(
604                "Write conflict: another transaction has uncommitted write".into(),
605            ));
606        }
607
608        entry.add_uncommitted(value, txn_id, epoch);
609        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
610        Ok(())
611    }
612
613    /// Commit a version
614    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
615        if let Some(mut entry) = self.data.get_mut(key) {
616            return entry.commit(txn_id, commit_ts);
617        }
618        false
619    }
620
621    /// Abort uncommitted version
622    pub fn abort(&self, key: &[u8], txn_id: u64) {
623        if let Some(mut entry) = self.data.get_mut(key) {
624            entry.abort(txn_id);
625            self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
626        }
627    }
628
629    /// Read value at snapshot timestamp
630    pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
631        self.data.get(key).and_then(|chain| {
632            chain
633                .read_at(snapshot_ts, current_txn_id)
634                .and_then(|v| v.value.clone())
635        })
636    }
637
638    /// Check if key exists at snapshot
639    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
640        self.data
641            .get(key)
642            .map(|chain| chain.read_at(snapshot_ts, None).is_some())
643            .unwrap_or(false)
644    }
645
646    /// Run garbage collection
647    ///
648    /// Removes old versions that are no longer visible to any reader.
649    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
650        let mut total_reclaimed = 0;
651
652        for mut entry in self.data.iter_mut() {
653            let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
654            total_reclaimed += reclaimed;
655        }
656
657        self.stats
658            .gc_passes
659            .fetch_add(1, Ordering::Relaxed);
660        self.stats
661            .versions_reclaimed
662            .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
663
664        total_reclaimed
665    }
666
667    /// Get number of keys
668    pub fn len(&self) -> usize {
669        self.data.len()
670    }
671
672    /// Check if empty
673    pub fn is_empty(&self) -> bool {
674        self.data.is_empty()
675    }
676
677    /// Get statistics
678    pub fn stats(&self) -> &VersionStoreStats {
679        &self.stats
680    }
681}
682
683impl Default for VersionStore {
684    fn default() -> Self {
685        Self::new()
686    }
687}
688
689// =============================================================================
690// Concurrent MVCC Manager
691// =============================================================================
692
693/// Manager for concurrent MVCC operations
694///
695/// Coordinates:
696/// - Reader registration (lock-free)
697/// - Writer locking (single-writer)
698/// - Version store access
699/// - Garbage collection
700pub struct ConcurrentMvcc {
701    /// Path to database
702    path: PathBuf,
703    /// MVCC header (in-memory, synced from mmap'd file)
704    header: MvccHeader,
705    /// Reader slots
706    reader_slots: Vec<ReaderSlot>,
707    /// Version store
708    version_store: VersionStore,
709    /// Our process ID
710    our_pid: u32,
711    /// Slot index we're using (if registered as reader)
712    our_slot: RwLock<Option<usize>>,
713}
714
715impl ConcurrentMvcc {
716    /// Open or create MVCC manager
717    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
718        let path = path.as_ref().to_path_buf();
719        std::fs::create_dir_all(&path)?;
720
721        let metadata_path = path.join(".mvcc_metadata");
722        let header = if metadata_path.exists() {
723            // Load existing header
724            Self::load_header(&metadata_path)?
725        } else {
726            // Create new metadata file
727            let header = MvccHeader::new();
728            Self::save_header(&metadata_path, &header)?;
729            header
730        };
731
732        // Initialize reader slots
733        let mut reader_slots = Vec::with_capacity(MAX_READERS);
734        for _ in 0..MAX_READERS {
735            reader_slots.push(ReaderSlot::empty());
736        }
737
738        Ok(Self {
739            path,
740            header,
741            reader_slots,
742            version_store: VersionStore::new(),
743            our_pid: std::process::id(),
744            our_slot: RwLock::new(None),
745        })
746    }
747
748    /// Load header from file
749    fn load_header(path: &Path) -> Result<MvccHeader> {
750        use std::io::Read;
751        let mut file = File::open(path)?;
752        let mut buf = vec![0u8; std::mem::size_of::<MvccHeader>()];
753        file.read_exact(&mut buf)?;
754
755        // SAFETY: MvccHeader is repr(C) with no padding issues
756        let header: MvccHeader = unsafe { std::ptr::read(buf.as_ptr() as *const MvccHeader) };
757        header.validate()?;
758        Ok(header)
759    }
760
761    /// Save header to file
762    fn save_header(path: &Path, header: &MvccHeader) -> Result<()> {
763        use std::io::Write;
764        let mut file = OpenOptions::new()
765            .create(true)
766            .write(true)
767            .truncate(true)
768            .open(path)?;
769
770        // SAFETY: MvccHeader is repr(C)
771        let buf = unsafe {
772            std::slice::from_raw_parts(
773                header as *const MvccHeader as *const u8,
774                std::mem::size_of::<MvccHeader>(),
775            )
776        };
777        file.write_all(buf)?;
778        file.sync_all()?;
779        Ok(())
780    }
781
782    /// Allocate next timestamp
783    #[inline]
784    pub fn allocate_timestamp(&self) -> HlcTimestamp {
785        HlcTimestamp::allocate_next(&self.header.current_ts)
786    }
787
788    /// Get current timestamp without advancing
789    #[inline]
790    pub fn current_timestamp(&self) -> HlcTimestamp {
791        HlcTimestamp::read_current(&self.header.current_ts)
792    }
793
794    /// Get current epoch
795    #[inline]
796    pub fn current_epoch(&self) -> u64 {
797        self.header.current_epoch.load(Ordering::Acquire)
798    }
799
800    /// Register as active reader
801    ///
802    /// Returns slot index on success.
803    /// Must call `unregister_reader()` when done.
804    pub fn register_reader(&self) -> Result<usize> {
805        let snapshot_ts = self.current_timestamp().raw();
806        let epoch = self.current_epoch() as u32;
807
808        // Find free slot
809        for (i, slot) in self.reader_slots.iter().enumerate() {
810            if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
811                *self.our_slot.write() = Some(i);
812                return Ok(i);
813            }
814        }
815
816        Err(SochDBError::ResourceExhausted(
817            "Too many concurrent readers".into(),
818        ))
819    }
820
821    /// Unregister as reader
822    pub fn unregister_reader(&self, slot_idx: usize) {
823        if slot_idx < self.reader_slots.len() {
824            self.reader_slots[slot_idx].release(self.our_pid);
825            *self.our_slot.write() = None;
826        }
827    }
828
829    /// Try to acquire writer lock
830    ///
831    /// Returns WriterGuard on success, which releases lock on drop.
832    pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
833        let current = self.header.writer_lock.load(Ordering::Acquire);
834
835        if current == 0 {
836            // Try to acquire
837            if self
838                .header
839                .writer_lock
840                .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
841                .is_ok()
842            {
843                return Ok(WriterGuard { mvcc: self });
844            }
845        } else if current == self.our_pid {
846            // Already own the lock (reentrant)
847            return Ok(WriterGuard { mvcc: self });
848        }
849
850        Err(SochDBError::LockError(format!(
851            "Writer lock held by process {}",
852            current
853        )))
854    }
855
856    /// Acquire writer lock with timeout
857    pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
858        let deadline = std::time::Instant::now() + timeout;
859
860        loop {
861            match self.try_acquire_writer() {
862                Ok(guard) => return Ok(guard),
863                Err(_) if std::time::Instant::now() < deadline => {
864                    std::thread::sleep(Duration::from_micros(100));
865                }
866                Err(e) => return Err(e),
867            }
868        }
869    }
870
871    /// Release writer lock
872    fn release_writer(&self) {
873        let current = self.header.writer_lock.load(Ordering::Acquire);
874        if current == self.our_pid {
875            self.header.writer_lock.store(0, Ordering::Release);
876        }
877    }
878
879    /// Get version store
880    pub fn version_store(&self) -> &VersionStore {
881        &self.version_store
882    }
883
884    /// Calculate minimum visible snapshot across all readers
885    pub fn min_active_snapshot(&self) -> u64 {
886        let mut min_ts = u64::MAX;
887
888        for slot in &self.reader_slots {
889            let pid = slot.pid.load(Ordering::Acquire);
890            if pid != 0 {
891                let ts = slot.snapshot_ts.load(Ordering::Acquire);
892                if ts > 0 && ts < min_ts {
893                    min_ts = ts;
894                }
895            }
896        }
897
898        min_ts
899    }
900
901    /// Calculate minimum active epoch across all readers
902    pub fn min_active_epoch(&self) -> u32 {
903        let mut min_epoch = u32::MAX;
904
905        for slot in &self.reader_slots {
906            let pid = slot.pid.load(Ordering::Acquire);
907            if pid != 0 {
908                let epoch = slot.epoch.load(Ordering::Acquire);
909                if epoch < min_epoch {
910                    min_epoch = epoch;
911                }
912            }
913        }
914
915        if min_epoch == u32::MAX {
916            // No active readers, use current epoch
917            self.current_epoch() as u32
918        } else {
919            min_epoch
920        }
921    }
922
923    /// Run garbage collection
924    ///
925    /// Should be called periodically (e.g., every 1000 commits).
926    pub fn run_gc(&self) -> usize {
927        let min_epoch = self.min_active_epoch();
928        let min_snapshot = self.min_active_snapshot();
929
930        self.version_store.gc(min_epoch, min_snapshot)
931    }
932
933    /// Check if GC should run (based on commit count)
934    pub fn should_run_gc(&self) -> bool {
935        self.header
936            .commits_since_gc
937            .load(Ordering::Relaxed)
938            >= GC_COMMIT_INTERVAL
939    }
940
941    /// Increment commit count and maybe run GC
942    pub fn on_commit(&self) {
943        let count = self
944            .header
945            .commits_since_gc
946            .fetch_add(1, Ordering::Relaxed);
947
948        if count >= GC_COMMIT_INTERVAL {
949            self.header.commits_since_gc.store(0, Ordering::Relaxed);
950            let _ = self.run_gc();
951        }
952    }
953
954    /// Clean up stale readers (from crashed processes)
955    pub fn cleanup_stale_readers(&self) -> usize {
956        let now = current_time_us();
957        let mut cleaned = 0;
958
959        for slot in &self.reader_slots {
960            if slot.is_stale(now) {
961                slot.pid.store(0, Ordering::Release);
962                cleaned += 1;
963            }
964        }
965
966        cleaned
967    }
968
969    /// Advance epoch (called on recovery)
970    pub fn advance_epoch(&self) -> u64 {
971        self.header.current_epoch.fetch_add(1, Ordering::AcqRel) + 1
972    }
973}
974
975impl Drop for ConcurrentMvcc {
976    fn drop(&mut self) {
977        // Release any reader slot we hold
978        if let Some(slot_idx) = *self.our_slot.read() {
979            self.unregister_reader(slot_idx);
980        }
981
982        // Release writer lock if we hold it
983        self.release_writer();
984    }
985}
986
987// =============================================================================
988// Writer Guard (RAII)
989// =============================================================================
990
991/// Guard that releases writer lock on drop
992pub struct WriterGuard<'a> {
993    mvcc: &'a ConcurrentMvcc,
994}
995
996impl<'a> Drop for WriterGuard<'a> {
997    fn drop(&mut self) {
998        self.mvcc.release_writer();
999    }
1000}
1001
1002// =============================================================================
1003// Utility Functions
1004// =============================================================================
1005
/// Returns the wall-clock time in microseconds since the UNIX epoch.
///
/// Never panics: if the system clock reports a time before the epoch the
/// result is `0`, and a microsecond count that does not fit in `u64` is
/// clamped to `u64::MAX` instead of being silently truncated.
#[inline]
fn current_time_us() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| {
            // `as_micros` yields `u128`; clamp before narrowing so the cast
            // below cannot wrap.
            since_epoch.as_micros().min(u128::from(u64::MAX)) as u64
        })
}
1014
1015/// Check if a process exists
1016#[cfg(unix)]
1017fn process_exists(pid: u32) -> bool {
1018    // kill(pid, 0) checks existence without sending signal
1019    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1020    if result == 0 {
1021        true
1022    } else {
1023        // ESRCH = no such process
1024        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1025    }
1026}
1027
/// Checks whether a process with the given PID exists (Windows).
///
/// Tries to open the process with `PROCESS_QUERY_LIMITED_INFORMATION`. A
/// successful open means the process exists. A failure with
/// `ERROR_ACCESS_DENIED` also means it exists but cannot be opened by this
/// caller; this mirrors the Unix variant, which treats non-`ESRCH` errors as
/// "exists". Any other failure is reported as "no such process".
#[cfg(windows)]
fn process_exists(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, ERROR_ACCESS_DENIED};
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};

    // SAFETY: `OpenProcess` takes plain integer arguments and returns either
    // a null handle or a handle owned by this function.
    let handle = unsafe { OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid) };
    if handle == 0 {
        // Read the error immediately, before any other call can overwrite it.
        std::io::Error::last_os_error().raw_os_error() == Some(ERROR_ACCESS_DENIED as i32)
    } else {
        // SAFETY: `handle` is a valid handle returned by `OpenProcess` above
        // and is closed exactly once.
        unsafe {
            CloseHandle(handle);
        }
        true
    }
}
1043
/// Fallback for targets that are neither Unix nor Windows.
///
/// No liveness probe is performed here, so every PID is reported as alive.
#[cfg(not(any(unix, windows)))]
fn process_exists(_pid: u32) -> bool {
    // Assume the process exists on unknown platforms.
    true
}
1048
1049// =============================================================================
1050// Tests
1051// =============================================================================
1052
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Back-to-back allocations from one clock must be strictly increasing.
    #[test]
    fn test_hlc_timestamp_ordering() {
        let clock = AtomicU64::new(0);

        let stamps: Vec<_> = (0..3)
            .map(|_| HlcTimestamp::allocate_next(&clock).raw())
            .collect();

        assert!(stamps.windows(2).all(|pair| pair[0] < pair[1]));
    }

    /// Allocations racing on a shared clock must never hand out a duplicate.
    #[test]
    fn test_hlc_timestamp_concurrent() {
        const WORKERS: usize = 8;
        const PER_WORKER: usize = 1000;

        let clock = Arc::new(AtomicU64::new(0));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..WORKERS)
            .map(|_| {
                let clock = Arc::clone(&clock);
                thread::spawn(move || {
                    (0..PER_WORKER)
                        .map(|_| HlcTimestamp::allocate_next(&clock).raw())
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let mut observed: Vec<u64> = Vec::new();
        for worker in workers {
            observed.extend(worker.join().unwrap());
        }

        // Sorting then de-duplicating must not shrink the list.
        let total = observed.len();
        observed.sort_unstable();
        observed.dedup();
        assert_eq!(observed.len(), total, "Duplicate timestamps found!");
    }

    /// `read_at` returns the newest version at or below the snapshot.
    #[test]
    fn test_version_chain_read_at() {
        let mut chain = VersionChain::new();

        // (commit_ts, txn_id, payload), pushed newest first.
        for (commit_ts, txn_id, payload) in [
            (100, 1, b"v100".to_vec()),
            (90, 2, b"v90".to_vec()),
            (80, 3, b"v80".to_vec()),
        ] {
            chain
                .versions
                .push(VersionEntry::new(commit_ts, txn_id, 1, Some(payload)));
        }

        // (snapshot, payload expected to be visible at that snapshot)
        for (snapshot, expected) in [
            (105, b"v100".to_vec()),
            (95, b"v90".to_vec()),
            (85, b"v80".to_vec()),
        ] {
            let visible = chain.read_at(snapshot, None).unwrap();
            assert_eq!(visible.value, Some(expected));
        }

        // A snapshot older than every version sees nothing.
        assert!(chain.read_at(75, None).is_none());
    }

    /// GC must reclaim something from a long chain but never empty it.
    #[test]
    fn test_version_chain_gc() {
        let mut chain = VersionChain::new();

        // Ten versions, newest first:
        //   commit_ts: 100, 95, ..., 55
        //   epoch:      10,  9, ...,  1 (older versions carry lower epochs)
        for i in 0..10 {
            let commit_ts = 100 - i * 5;
            chain.versions.push(VersionEntry::new(
                commit_ts,
                i as u64,        // txn_id
                (10 - i) as u32, // epoch
                Some(format!("v{}", commit_ts).into_bytes()),
            ));
        }

        assert_eq!(chain.len(), 10);

        // min_epoch = 7, min_snapshot = 75. The test expects the tail of the
        // chain (old epoch and old commit_ts) to be reclaimable while the
        // newest version always survives.
        let reclaimed = chain.gc(7, 75);

        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
        assert!(chain.len() >= 1, "Should keep at least one version");
    }

    /// A committed write is visible only to snapshots taken after the commit.
    #[test]
    fn test_version_store_basic() {
        let store = VersionStore::new();

        // Stage the write, then commit it at ts = 100.
        store
            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
            .unwrap();
        assert!(store.commit(b"key1", 1, 100));

        // Snapshot after the commit sees the value.
        let after_commit = store.get(b"key1", 150, None);
        assert_eq!(after_commit, Some(b"value1".to_vec()));

        // Snapshot before the commit sees nothing.
        let before_commit = store.get(b"key1", 50, None);
        assert!(before_commit.is_none());
    }

    /// A slot can be claimed, re-claimed by its owner, and released.
    #[test]
    fn test_reader_slot_claim_release() {
        let slot = ReaderSlot::empty();
        assert!(slot.is_free());

        // First claim succeeds and marks the slot as taken.
        assert!(slot.try_claim(1234, 100, 1));
        assert!(!slot.is_free());

        // A different PID is rejected while the slot is held.
        assert!(!slot.try_claim(5678, 200, 2));

        // The owning PID may claim again.
        assert!(slot.try_claim(1234, 300, 3));

        // Releasing makes the slot free again.
        slot.release(1234);
        assert!(slot.is_free());
    }
}