Skip to main content

sochdb_storage/
mvcc_concurrent.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Concurrent MVCC for Multi-Reader Single-Writer Embedded Mode
19//!
20//! This module implements lock-free concurrent reads with single-writer
21//! coordination using shared memory MVCC metadata.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │  Shared Memory MVCC Metadata (.mvcc_metadata file, mmap'd)      │
28//! ├─────────────────────────────────────────────────────────────────┤
29//! │  Header:                                                         │
30//! │    - magic, version, page_size                                   │
31//! │    - current_epoch (AtomicU64)                                   │
32//! │    - current_ts (AtomicU64, HLC timestamp)                       │
33//! │    - writer_lock (AtomicU32, 0=free, pid=locked)                │
34//! │                                                                  │
35//! │  Reader Table (1024 slots × 64 bytes = 64KB):                    │
36//! │    [slot 0]: pid, snapshot_ts, epoch, last_heartbeat             │
37//! │    [slot 1]: ...                                                 │
38//! │    [slot N]: ...                                                 │
39//! │                                                                  │
40//! │  Version Store (DashMap-like lock-free hashtable):               │
41//! │    key → [version @ ts=105, version @ ts=99, ...]                │
42//! └─────────────────────────────────────────────────────────────────┘
43//! ```
44//!
45//! ## Performance Model
46//!
47//! - Lock-free read: ~100ns (atomic load + version lookup)
48//! - Writer lock acquisition: ~20ns (uncontended CAS)
49//! - Version GC: O(N_versions) every 1000 commits
50//!
51//! ## Safety
52//!
53//! - Readers are lock-free (no blocking, no starvation)
54//! - Single writer ensures WAL consistency
55//! - Epoch-based GC prevents use-after-free
56
57use std::fs::{File, OpenOptions};
58use std::path::{Path, PathBuf};
59use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
60use std::time::{Duration, SystemTime, UNIX_EPOCH};
61
62use dashmap::DashMap;
63use parking_lot::RwLock;
64use sochdb_core::{Result, SochDBError};
65use sochdb_core::version_chain::{
66    BinarySearchChain, ChainEntry,
67    MvccVersionChain, MvccVersionChainMut, WriteConflictDetection,
68    VisibilityContext, TxnId, Timestamp,
69};
70
71// Type aliases to avoid conflicts with other modules
72pub type ConcurrentVersionChain = VersionChain;
73pub type ConcurrentVersionEntry = VersionEntry;
74
75// =============================================================================
76// Constants
77// =============================================================================
78
79/// Magic bytes for MVCC metadata file: "SOCHMVCC"
80const MVCC_MAGIC: u64 = 0x43435F564D484353; // "SOCHMVCC" little-endian
81
82/// Current format version
83const MVCC_VERSION: u32 = 1;
84
85/// Maximum concurrent readers
86const MAX_READERS: usize = 1024;
87
88/// Reader slot size (64 bytes = 1 cache line)
89const READER_SLOT_SIZE: usize = 64;
90
91/// Header size
92const HEADER_SIZE: usize = 64;
93
94/// Total metadata size (header + reader table)
95const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);
96
97/// Stale reader timeout (60 seconds)
98const STALE_READER_TIMEOUT_US: u64 = 60_000_000;
99
100/// GC interval (every N commits)
101const GC_COMMIT_INTERVAL: u64 = 1000;
102
103// =============================================================================
104// Hybrid Logical Clock
105// =============================================================================
106
107/// Hybrid Logical Clock for monotonic timestamps
108///
109/// Format: [48-bit physical time | 16-bit logical counter]
110///
111/// Properties:
112/// 1. Monotonically increasing
113/// 2. Causally ordered
114/// 3. Resolution: 65,536 events per millisecond
115#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
116pub struct HlcTimestamp(pub u64);
117
118impl HlcTimestamp {
119    /// Create timestamp from physical time (ms) and logical counter
120    #[inline]
121    pub fn new(physical_ms: u64, logical: u16) -> Self {
122        Self((physical_ms << 16) | (logical as u64))
123    }
124
125    /// Get physical time component (milliseconds since epoch)
126    #[inline]
127    pub fn physical_ms(&self) -> u64 {
128        self.0 >> 16
129    }
130
131    /// Get logical counter component
132    #[inline]
133    pub fn logical(&self) -> u16 {
134        (self.0 & 0xFFFF) as u16
135    }
136
137    /// Get raw value
138    #[inline]
139    pub fn raw(&self) -> u64 {
140        self.0
141    }
142
143    /// Allocate next timestamp (atomic, lock-free)
144    ///
145    /// Algorithm:
146    /// 1. Read current physical time
147    /// 2. If physical > last_physical: reset logical to 0
148    /// 3. Else: increment logical counter
149    /// 4. CAS to update, retry on conflict
150    pub fn allocate_next(last: &AtomicU64) -> Self {
151        let physical_now = SystemTime::now()
152            .duration_since(UNIX_EPOCH)
153            .unwrap()
154            .as_millis() as u64;
155
156        loop {
157            let last_val = last.load(Ordering::Acquire);
158            let last_phys = last_val >> 16;
159            let last_log = (last_val & 0xFFFF) as u16;
160
161            let (new_phys, new_log) = if physical_now > last_phys {
162                (physical_now, 0u16)
163            } else {
164                // Clock hasn't advanced, increment logical
165                (last_phys, last_log.saturating_add(1))
166            };
167
168            let new_val = (new_phys << 16) | (new_log as u64);
169
170            if last
171                .compare_exchange(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
172                .is_ok()
173            {
174                return Self(new_val);
175            }
176            // CAS failed, retry
177            std::hint::spin_loop();
178        }
179    }
180
181    /// Read current timestamp without advancing
182    #[inline]
183    pub fn read_current(ts: &AtomicU64) -> Self {
184        Self(ts.load(Ordering::Acquire))
185    }
186}
187
188impl From<u64> for HlcTimestamp {
189    fn from(val: u64) -> Self {
190        Self(val)
191    }
192}
193
194impl From<HlcTimestamp> for u64 {
195    fn from(ts: HlcTimestamp) -> Self {
196        ts.0
197    }
198}
199
200// =============================================================================
201// Reader Slot (Cache-Line Aligned)
202// =============================================================================
203
204/// Reader slot in shared memory (64 bytes = 1 cache line)
205///
206/// Each active reader registers in a slot to prevent GC from
207/// reclaiming versions it might need.
208///
209/// Layout (with #[repr(C, align(64))]):
210///   pid:            AtomicU32 (4 bytes)
211///   <pad>:          4 bytes   (alignment for AtomicU64)
212///   snapshot_ts:    AtomicU64 (8 bytes)
213///   epoch:          AtomicU32 (4 bytes)
214///   <pad>:          4 bytes   (alignment for AtomicU64)
215///   last_heartbeat: AtomicU64 (8 bytes)
216///   _reserved:      [u8; 32]  (pad to 64 bytes total)
217///   Total: 64 bytes — fits exactly in one cache line
218#[repr(C, align(64))]
219#[derive(Debug)]
220pub struct ReaderSlot {
221    /// Process ID (0 = slot is free)
222    pub pid: AtomicU32,
223    /// Snapshot timestamp this reader is using
224    pub snapshot_ts: AtomicU64,
225    /// Epoch number when reader registered
226    pub epoch: AtomicU32,
227    /// Last heartbeat (microseconds since epoch)
228    pub last_heartbeat: AtomicU64,
229    /// Reserved for future use (sized to make struct exactly 64 bytes)
230    _reserved: [u8; 32],
231}
232
233impl ReaderSlot {
234    /// Create an empty reader slot
235    pub const fn empty() -> Self {
236        Self {
237            pid: AtomicU32::new(0),
238            snapshot_ts: AtomicU64::new(0),
239            epoch: AtomicU32::new(0),
240            last_heartbeat: AtomicU64::new(0),
241            _reserved: [0u8; 32],
242        }
243    }
244
245    /// Check if slot is free
246    #[inline]
247    pub fn is_free(&self) -> bool {
248        self.pid.load(Ordering::Acquire) == 0
249    }
250
251    /// Try to claim this slot for reading
252    ///
253    /// Returns true if successfully claimed.
254    #[inline]
255    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
256        let current_pid = self.pid.load(Ordering::Acquire);
257        
258        // Only claim if free or already ours
259        if current_pid != 0 && current_pid != my_pid {
260            return false;
261        }
262
263        if self
264            .pid
265            .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
266            .is_ok()
267        {
268            // Successfully claimed, update metadata
269            self.snapshot_ts.store(snapshot_ts, Ordering::Release);
270            self.epoch.store(epoch, Ordering::Release);
271            self.last_heartbeat
272                .store(current_time_us(), Ordering::Release);
273            true
274        } else {
275            false
276        }
277    }
278
279    /// Release this slot
280    #[inline]
281    pub fn release(&self, my_pid: u32) {
282        // Only release if we own it
283        if self.pid.load(Ordering::Acquire) == my_pid {
284            self.snapshot_ts.store(0, Ordering::Release);
285            self.pid.store(0, Ordering::Release);
286        }
287    }
288
289    /// Update heartbeat
290    #[inline]
291    pub fn heartbeat(&self) {
292        self.last_heartbeat
293            .store(current_time_us(), Ordering::Release);
294    }
295
296    /// Check if this slot is stale (process crashed or hung)
297    #[inline]
298    pub fn is_stale(&self, now_us: u64) -> bool {
299        let pid = self.pid.load(Ordering::Acquire);
300        if pid == 0 {
301            return false; // Empty slot
302        }
303
304        // Check heartbeat timeout
305        let last_hb = self.last_heartbeat.load(Ordering::Acquire);
306        if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
307            return true;
308        }
309
310        // Check if process still exists
311        !process_exists(pid)
312    }
313}
314
315// =============================================================================
316// MVCC Metadata Header
317// =============================================================================
318
319/// MVCC metadata file header
320#[repr(C)]
321#[derive(Debug)]
322pub struct MvccHeader {
323    /// Magic bytes for validation
324    pub magic: u64,
325    /// Format version
326    pub version: u32,
327    /// Page size
328    pub page_size: u32,
329    /// Number of reader slots
330    pub num_readers: u32,
331    /// Current epoch (incremented on recovery/GC)
332    pub current_epoch: AtomicU64,
333    /// Current HLC timestamp
334    pub current_ts: AtomicU64,
335    /// Writer lock (0 = free, pid = locked)
336    pub writer_lock: AtomicU32,
337    /// Number of commits since last GC
338    pub commits_since_gc: AtomicU64,
339    /// Reserved for alignment
340    _reserved: [u8; 4],
341}
342
343impl MvccHeader {
344    /// Create new header with default values
345    pub fn new() -> Self {
346        Self {
347            magic: MVCC_MAGIC,
348            version: MVCC_VERSION,
349            page_size: 4096,
350            num_readers: MAX_READERS as u32,
351            current_epoch: AtomicU64::new(1),
352            current_ts: AtomicU64::new(HlcTimestamp::new(
353                SystemTime::now()
354                    .duration_since(UNIX_EPOCH)
355                    .unwrap()
356                    .as_millis() as u64,
357                0,
358            ).raw()),
359            writer_lock: AtomicU32::new(0),
360            commits_since_gc: AtomicU64::new(0),
361            _reserved: [0u8; 4],
362        }
363    }
364
365    /// Validate header
366    pub fn validate(&self) -> Result<()> {
367        if self.magic != MVCC_MAGIC {
368            return Err(SochDBError::Corruption(
369                "Invalid MVCC metadata magic".into(),
370            ));
371        }
372        if self.version != MVCC_VERSION {
373            return Err(SochDBError::Corruption(format!(
374                "Unsupported MVCC version: {} (expected {})",
375                self.version, MVCC_VERSION
376            )));
377        }
378        Ok(())
379    }
380}
381
382impl Default for MvccHeader {
383    fn default() -> Self {
384        Self::new()
385    }
386}
387
388// =============================================================================
389// Version Entry
390// =============================================================================
391
392/// A single version of a key-value pair
393#[derive(Debug, Clone)]
394pub struct VersionEntry {
395    /// Commit timestamp (HLC)
396    pub commit_ts: u64,
397    /// Transaction ID that created this version
398    pub txn_id: u64,
399    /// Epoch when this version was created
400    pub epoch: u32,
401    /// The value (None = tombstone/deletion)
402    pub value: Option<Vec<u8>>,
403}
404
405impl VersionEntry {
406    /// Create new version entry
407    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
408        Self {
409            commit_ts,
410            txn_id,
411            epoch,
412            value,
413        }
414    }
415
416    /// Check if this version is visible at given snapshot
417    #[inline]
418    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
419        self.commit_ts > 0 && self.commit_ts < snapshot_ts
420    }
421}
422
423// Rec 11: Implement ChainEntry so BinarySearchChain<VersionEntry> works
424impl ChainEntry for VersionEntry {
425    #[inline] fn commit_ts(&self) -> u64 { self.commit_ts }
426    #[inline] fn txn_id(&self) -> u64 { self.txn_id }
427    #[inline] fn set_commit_ts(&mut self, ts: u64) { self.commit_ts = ts; }
428}
429
430// =============================================================================
431// Version Chain (Sorted by commit_ts descending) — Rec 11: Consolidated
432// =============================================================================
433
434/// Chain of versions for a single key, sorted by commit_ts descending
435///
436/// Optimized for:
437/// - O(log V) reads via binary search
438/// - O(1) writes (prepend to front)
439/// - O(V) GC (linear scan with compaction)
440///
441/// ## Rec 11: Consolidated
442///
443/// Delegates binary-search logic to `BinarySearchChain<VersionEntry>` from
444/// sochdb-core, eliminating duplication with `durable_storage::VersionChain`.
445#[derive(Debug, Default)]
446pub struct VersionChain {
447    /// Consolidated binary-search chain (Rec 11)
448    inner: BinarySearchChain<VersionEntry>,
449}
450
451impl VersionChain {
452    /// Create empty version chain
453    pub fn new() -> Self {
454        Self { inner: BinarySearchChain::new() }
455    }
456
457    /// Add uncommitted version
458    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
459        self.inner.set_uncommitted(VersionEntry {
460            commit_ts: 0,
461            txn_id,
462            epoch,
463            value,
464        });
465    }
466
467    /// Commit the uncommitted version
468    #[inline]
469    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
470        self.inner.commit(txn_id, commit_ts)
471    }
472
473    /// Abort uncommitted version
474    #[inline]
475    pub fn abort(&mut self, txn_id: u64) {
476        self.inner.abort(txn_id);
477    }
478
479    /// Read at snapshot timestamp
480    ///
481    /// Complexity: O(log V) via binary search
482    #[inline]
483    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
484        self.inner.read_at(snapshot_ts, current_txn_id)
485    }
486
487    /// Check if there's a write conflict
488    #[inline]
489    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
490        self.inner.has_write_conflict(my_txn_id)
491    }
492
493    /// Garbage collect old versions
494    ///
495    /// Custom GC: filters by both epoch AND snapshot timestamp.
496    /// Returns number of versions reclaimed.
497    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
498        let versions = self.inner.committed_versions_mut();
499        if versions.len() <= 1 {
500            return 0;
501        }
502
503        let original_len = versions.len();
504
505        let mut keep_count = 1; // Always keep newest
506        for v in versions.iter().skip(1) {
507            if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
508                keep_count += 1;
509            } else {
510                break; // Sorted descending, all remaining are older
511            }
512        }
513
514        versions.truncate(keep_count);
515        original_len - versions.len()
516    }
517
518    /// Get number of versions
519    #[inline]
520    pub fn len(&self) -> usize {
521        self.inner.version_count()
522    }
523
524    /// Check if empty
525    #[inline]
526    pub fn is_empty(&self) -> bool {
527        self.inner.is_empty()
528    }
529}
530
531// =============================================================================
532// Rec 6: Unified Version Chain Trait Implementations
533// =============================================================================
534
535impl MvccVersionChain for VersionChain {
536    type Value = Option<Vec<u8>>;
537
538    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
539        self.inner.read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
540            .map(|v| &v.value)
541    }
542
543    fn get_latest(&self) -> Option<&Self::Value> {
544        self.inner.latest().map(|v| &v.value)
545    }
546
547    fn version_count(&self) -> usize {
548        self.inner.version_count()
549    }
550}
551
552impl MvccVersionChainMut for VersionChain {
553    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
554        self.add_uncommitted(value, txn_id, 0);
555    }
556
557    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
558        self.inner.commit(txn_id, commit_ts)
559    }
560
561    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
562        self.add_uncommitted(None, txn_id, 0);
563        true
564    }
565
566    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
567        let removed = self.gc(0, min_visible_ts);
568        (removed, removed * std::mem::size_of::<VersionEntry>())
569    }
570}
571
572impl WriteConflictDetection for VersionChain {
573    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
574        self.inner.has_write_conflict(txn_id)
575    }
576}
577
578// =============================================================================
579// Concurrent Version Store
580// =============================================================================
581
582/// Lock-free version store using DashMap
583///
584/// Provides O(1) key lookup + O(log V) version lookup per key.
585pub struct VersionStore {
586    /// Key → VersionChain mapping
587    data: DashMap<Vec<u8>, VersionChain>,
588    /// Statistics
589    stats: VersionStoreStats,
590}
591
592/// Version store statistics
593#[derive(Debug, Default)]
594pub struct VersionStoreStats {
595    /// Total number of keys
596    pub num_keys: AtomicU64,
597    /// Total number of versions across all keys
598    pub num_versions: AtomicU64,
599    /// Number of GC passes
600    pub gc_passes: AtomicU64,
601    /// Versions reclaimed by GC
602    pub versions_reclaimed: AtomicU64,
603}
604
605impl VersionStore {
606    /// Create new version store
607    pub fn new() -> Self {
608        Self {
609            data: DashMap::new(),
610            stats: VersionStoreStats::default(),
611        }
612    }
613
614    /// Insert a new uncommitted version
615    pub fn insert_uncommitted(
616        &self,
617        key: &[u8],
618        value: Option<Vec<u8>>,
619        txn_id: u64,
620        epoch: u32,
621    ) -> Result<()> {
622        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
623            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
624            VersionChain::new()
625        });
626
627        // Check for write conflict
628        if entry.has_write_conflict(txn_id) {
629            return Err(SochDBError::Internal(
630                "Write conflict: another transaction has uncommitted write".into(),
631            ));
632        }
633
634        entry.add_uncommitted(value, txn_id, epoch);
635        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
636        Ok(())
637    }
638
639    /// Commit a version
640    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
641        if let Some(mut entry) = self.data.get_mut(key) {
642            return entry.commit(txn_id, commit_ts);
643        }
644        false
645    }
646
647    /// Abort uncommitted version
648    pub fn abort(&self, key: &[u8], txn_id: u64) {
649        if let Some(mut entry) = self.data.get_mut(key) {
650            entry.abort(txn_id);
651            self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
652        }
653    }
654
655    /// Read value at snapshot timestamp
656    pub fn get(&self, key: &[u8], snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<Vec<u8>> {
657        self.data.get(key).and_then(|chain| {
658            chain
659                .read_at(snapshot_ts, current_txn_id)
660                .and_then(|v| v.value.clone())
661        })
662    }
663
664    /// Check if key exists at snapshot
665    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
666        self.data
667            .get(key)
668            .map(|chain| chain.read_at(snapshot_ts, None).is_some())
669            .unwrap_or(false)
670    }
671
672    /// Run garbage collection
673    ///
674    /// Removes old versions that are no longer visible to any reader.
675    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
676        let mut total_reclaimed = 0;
677
678        for mut entry in self.data.iter_mut() {
679            let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
680            total_reclaimed += reclaimed;
681        }
682
683        self.stats
684            .gc_passes
685            .fetch_add(1, Ordering::Relaxed);
686        self.stats
687            .versions_reclaimed
688            .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
689
690        total_reclaimed
691    }
692
693    /// Get number of keys
694    pub fn len(&self) -> usize {
695        self.data.len()
696    }
697
698    /// Check if empty
699    pub fn is_empty(&self) -> bool {
700        self.data.is_empty()
701    }
702
703    /// Get statistics
704    pub fn stats(&self) -> &VersionStoreStats {
705        &self.stats
706    }
707}
708
709impl Default for VersionStore {
710    fn default() -> Self {
711        Self::new()
712    }
713}
714
715// =============================================================================
716// Rec 11: Unified MvccStore Implementation for VersionStore
717// =============================================================================
718
719impl sochdb_core::version_chain::MvccStore for VersionStore {
720    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
721        self.get(key, snapshot_ts, txn_id)
722    }
723
724    fn mvcc_put(
725        &self,
726        key: &[u8],
727        value: Option<Vec<u8>>,
728        txn_id: u64,
729    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
730        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
731            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
732            VersionChain::new()
733        });
734        if entry.has_write_conflict(txn_id) {
735            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
736        }
737        entry.add_uncommitted(value, txn_id, 0);
738        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
739        Ok(())
740    }
741
742    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
743        self.commit(key, txn_id, commit_ts)
744    }
745
746    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
747        self.abort(key, txn_id);
748    }
749
750    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
751        self.data
752            .get(key)
753            .map(|chain| chain.has_write_conflict(txn_id))
754            .unwrap_or(false)
755    }
756
757    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
758        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
759        for mut entry in self.data.iter_mut() {
760            stats.keys_scanned += 1;
761            let removed = entry.gc(0, min_ts);
762            stats.versions_removed += removed;
763        }
764        self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
765        self.stats.versions_reclaimed.fetch_add(stats.versions_removed as u64, Ordering::Relaxed);
766        stats
767    }
768
769    fn mvcc_key_count(&self) -> usize {
770        self.len()
771    }
772}
773
774// =============================================================================
775// Concurrent MVCC Manager
776// =============================================================================
777
778/// Manager for concurrent MVCC operations
779///
780/// Coordinates:
781/// - Reader registration (lock-free)
782/// - Writer locking (single-writer)
783/// - Version store access
784/// - Garbage collection
785///
786/// The MVCC header (writer_lock, current_ts, current_epoch) is stored in
787/// a memory-mapped file so that AtomicU32/AtomicU64 CAS operations work
788/// correctly across independent OS processes. Without mmap, each process
789/// would have its own copy of these atomics, making cross-process
790/// coordination impossible.
791pub struct ConcurrentMvcc {
792    /// Path to database
793    path: PathBuf,
794    /// Memory-mapped metadata file (keeps mmap alive)
795    /// SAFETY: The mmap is opened with read-write access and the header
796    /// pointer below points into this mapping. The mapping must outlive
797    /// all references to `header`.
798    _mmap: memmap2::MmapMut,
799    /// Pointer to the MVCC header inside the mmap'd region
800    /// SAFETY: Valid for the lifetime of `_mmap`. The header is repr(C)
801    /// and the mmap region is at least METADATA_SIZE bytes.
802    header: *const MvccHeader,
803    /// Pointer to reader slots inside the mmap'd region  
804    /// SAFETY: Valid for the lifetime of `_mmap`. Points to the region
805    /// starting at offset HEADER_SIZE within the mapping.
806    reader_slots_ptr: *const ReaderSlot,
807    /// Number of reader slots
808    num_reader_slots: usize,
809    /// Version store (in-process — versions are rebuilt from WAL on open)
810    version_store: VersionStore,
811    /// Our process ID
812    our_pid: u32,
813    /// Slot index we're using (if registered as reader)
814    our_slot: RwLock<Option<usize>>,
815}
816
817// SAFETY: The mmap'd region contains only atomic types (AtomicU32, AtomicU64)
818// which are inherently safe for concurrent access from multiple threads.
819// The mmap itself is backed by a file that is shared across processes.
820unsafe impl Send for ConcurrentMvcc {}
821unsafe impl Sync for ConcurrentMvcc {}
822
823impl ConcurrentMvcc {
824    /// Open or create MVCC manager with shared memory-mapped metadata
825    ///
826    /// The metadata file (.mvcc_metadata) is mmap'd so that the writer_lock,
827    /// current_ts, and reader slots are shared across all processes that open
828    /// the same database. This enables true cross-process atomic coordination.
829    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
830        let path = path.as_ref().to_path_buf();
831        std::fs::create_dir_all(&path)?;
832
833        let metadata_path = path.join(".mvcc_metadata");
834        let is_new = !metadata_path.exists();
835
836        // Create or open the metadata file at the required size
837        let file = OpenOptions::new()
838            .create(true)
839            .read(true)
840            .write(true)
841            .open(&metadata_path)?;
842
843        // Ensure file is the correct size (header + reader slots)
844        let required_size = METADATA_SIZE as u64;
845        if file.metadata()?.len() < required_size {
846            file.set_len(required_size)?;
847        }
848
849        // Memory-map the file
850        // SAFETY: The file is opened read-write and we control its contents.
851        // Multiple processes may mmap the same file simultaneously — this is
852        // the intended usage pattern. The atomics in the header ensure safe
853        // concurrent access.
854        let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
855
856        if is_new || mmap.len() < METADATA_SIZE {
857            // Initialize fresh metadata
858            let header = MvccHeader::new();
859            let header_bytes = unsafe {
860                std::slice::from_raw_parts(
861                    &header as *const MvccHeader as *const u8,
862                    std::mem::size_of::<MvccHeader>(),
863                )
864            };
865            mmap[..header_bytes.len()].copy_from_slice(header_bytes);
866
867            // Zero-initialize reader slots (already zero from set_len, but be explicit)
868            for i in 0..MAX_READERS {
869                let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
870                let end = offset + READER_SLOT_SIZE;
871                if end <= mmap.len() {
872                    mmap[offset..end].fill(0);
873                }
874            }
875
876            mmap.flush()?;
877        } else {
878            // Validate existing header
879            let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
880            header_ref.validate()?;
881        }
882
883        // Get pointers into the mmap'd region
884        let header = mmap.as_ptr() as *const MvccHeader;
885        let reader_slots_ptr = unsafe {
886            mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot
887        };
888
889        Ok(Self {
890            path,
891            _mmap: mmap,
892            header,
893            reader_slots_ptr,
894            num_reader_slots: MAX_READERS,
895            version_store: VersionStore::new(),
896            our_pid: std::process::id(),
897            our_slot: RwLock::new(None),
898        })
899    }
900
901    /// Get reference to the shared header
902    ///
903    /// SAFETY: The header pointer is valid for the lifetime of self._mmap
904    #[inline]
905    fn header(&self) -> &MvccHeader {
906        unsafe { &*self.header }
907    }
908
909    /// Get reference to a reader slot
910    ///
911    /// SAFETY: The pointer is valid for the lifetime of self._mmap
912    #[inline]
913    fn reader_slot(&self, idx: usize) -> &ReaderSlot {
914        assert!(idx < self.num_reader_slots);
915        unsafe { &*self.reader_slots_ptr.add(idx) }
916    }
917
918    /// Allocate next timestamp
919    #[inline]
920    pub fn allocate_timestamp(&self) -> HlcTimestamp {
921        HlcTimestamp::allocate_next(&self.header().current_ts)
922    }
923
924    /// Get current timestamp without advancing
925    #[inline]
926    pub fn current_timestamp(&self) -> HlcTimestamp {
927        HlcTimestamp::read_current(&self.header().current_ts)
928    }
929
930    /// Get current epoch
931    #[inline]
932    pub fn current_epoch(&self) -> u64 {
933        self.header().current_epoch.load(Ordering::Acquire)
934    }
935
936    /// Register as active reader
937    ///
938    /// Returns slot index on success.
939    /// Must call `unregister_reader()` when done.
940    pub fn register_reader(&self) -> Result<usize> {
941        let snapshot_ts = self.current_timestamp().raw();
942        let epoch = self.current_epoch() as u32;
943
944        // Find free slot in shared mmap'd reader table
945        for i in 0..self.num_reader_slots {
946            let slot = self.reader_slot(i);
947            if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
948                *self.our_slot.write() = Some(i);
949                return Ok(i);
950            }
951        }
952
953        Err(SochDBError::ResourceExhausted(
954            "Too many concurrent readers".into(),
955        ))
956    }
957
958    /// Unregister as reader
959    pub fn unregister_reader(&self, slot_idx: usize) {
960        if slot_idx < self.num_reader_slots {
961            self.reader_slot(slot_idx).release(self.our_pid);
962            *self.our_slot.write() = None;
963        }
964    }
965
966    /// Try to acquire writer lock
967    ///
968    /// Returns WriterGuard on success, which releases lock on drop.
969    pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
970        let current = self.header().writer_lock.load(Ordering::Acquire);
971
972        if current == 0 {
973            // Try to acquire (CAS on shared mmap'd atomic)
974            if self
975                .header()
976                .writer_lock
977                .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
978                .is_ok()
979            {
980                return Ok(WriterGuard { mvcc: self });
981            }
982        } else if current == self.our_pid {
983            // Already own the lock (reentrant)
984            return Ok(WriterGuard { mvcc: self });
985        }
986
987        Err(SochDBError::LockError(format!(
988            "Writer lock held by process {}",
989            current
990        )))
991    }
992
993    /// Acquire writer lock with timeout
994    pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
995        let deadline = std::time::Instant::now() + timeout;
996
997        loop {
998            match self.try_acquire_writer() {
999                Ok(guard) => return Ok(guard),
1000                Err(_) if std::time::Instant::now() < deadline => {
1001                    std::thread::sleep(Duration::from_micros(100));
1002                }
1003                Err(e) => return Err(e),
1004            }
1005        }
1006    }
1007
1008    /// Release writer lock
1009    fn release_writer(&self) {
1010        let current = self.header().writer_lock.load(Ordering::Acquire);
1011        if current == self.our_pid {
1012            self.header().writer_lock.store(0, Ordering::Release);
1013        }
1014    }
1015
1016    /// Get version store
1017    pub fn version_store(&self) -> &VersionStore {
1018        &self.version_store
1019    }
1020
1021    /// Calculate minimum visible snapshot across all readers
1022    pub fn min_active_snapshot(&self) -> u64 {
1023        let mut min_ts = u64::MAX;
1024
1025        for i in 0..self.num_reader_slots {
1026            let slot = self.reader_slot(i);
1027            let pid = slot.pid.load(Ordering::Acquire);
1028            if pid != 0 {
1029                let ts = slot.snapshot_ts.load(Ordering::Acquire);
1030                if ts > 0 && ts < min_ts {
1031                    min_ts = ts;
1032                }
1033            }
1034        }
1035
1036        min_ts
1037    }
1038
1039    /// Calculate minimum active epoch across all readers
1040    pub fn min_active_epoch(&self) -> u32 {
1041        let mut min_epoch = u32::MAX;
1042
1043        for i in 0..self.num_reader_slots {
1044            let slot = self.reader_slot(i);
1045            let pid = slot.pid.load(Ordering::Acquire);
1046            if pid != 0 {
1047                let epoch = slot.epoch.load(Ordering::Acquire);
1048                if epoch < min_epoch {
1049                    min_epoch = epoch;
1050                }
1051            }
1052        }
1053
1054        if min_epoch == u32::MAX {
1055            // No active readers, use current epoch
1056            self.current_epoch() as u32
1057        } else {
1058            min_epoch
1059        }
1060    }
1061
1062    /// Run garbage collection
1063    ///
1064    /// Should be called periodically (e.g., every 1000 commits).
1065    pub fn run_gc(&self) -> usize {
1066        let min_epoch = self.min_active_epoch();
1067        let min_snapshot = self.min_active_snapshot();
1068
1069        self.version_store.gc(min_epoch, min_snapshot)
1070    }
1071
1072    /// Check if GC should run (based on commit count)
1073    pub fn should_run_gc(&self) -> bool {
1074        self.header()
1075            .commits_since_gc
1076            .load(Ordering::Relaxed)
1077            >= GC_COMMIT_INTERVAL
1078    }
1079
1080    /// Increment commit count and maybe run GC
1081    pub fn on_commit(&self) {
1082        let count = self
1083            .header()
1084            .commits_since_gc
1085            .fetch_add(1, Ordering::Relaxed);
1086
1087        if count >= GC_COMMIT_INTERVAL {
1088            self.header().commits_since_gc.store(0, Ordering::Relaxed);
1089            let _ = self.run_gc();
1090        }
1091    }
1092
1093    /// Clean up stale readers (from crashed processes)
1094    pub fn cleanup_stale_readers(&self) -> usize {
1095        let now = current_time_us();
1096        let mut cleaned = 0;
1097
1098        for i in 0..self.num_reader_slots {
1099            let slot = self.reader_slot(i);
1100            if slot.is_stale(now) {
1101                slot.pid.store(0, Ordering::Release);
1102                cleaned += 1;
1103            }
1104        }
1105
1106        cleaned
1107    }
1108
1109    /// Advance epoch (called on recovery)
1110    pub fn advance_epoch(&self) -> u64 {
1111        self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1112    }
1113}
1114
1115impl Drop for ConcurrentMvcc {
1116    fn drop(&mut self) {
1117        // Release any reader slot we hold
1118        if let Some(slot_idx) = *self.our_slot.read() {
1119            self.unregister_reader(slot_idx);
1120        }
1121
1122        // Release writer lock if we hold it
1123        self.release_writer();
1124    }
1125}
1126
1127// =============================================================================
1128// Writer Guard (RAII)
1129// =============================================================================
1130
1131/// Guard that releases writer lock on drop
1132pub struct WriterGuard<'a> {
1133    mvcc: &'a ConcurrentMvcc,
1134}
1135
1136impl<'a> Drop for WriterGuard<'a> {
1137    fn drop(&mut self) {
1138        self.mvcc.release_writer();
1139    }
1140}
1141
1142// =============================================================================
1143// Utility Functions
1144// =============================================================================
1145
1146/// Get current time in microseconds since epoch
1147#[inline]
1148fn current_time_us() -> u64 {
1149    SystemTime::now()
1150        .duration_since(UNIX_EPOCH)
1151        .unwrap()
1152        .as_micros() as u64
1153}
1154
1155/// Check if a process exists
1156#[cfg(unix)]
1157fn process_exists(pid: u32) -> bool {
1158    // kill(pid, 0) checks existence without sending signal
1159    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1160    if result == 0 {
1161        true
1162    } else {
1163        // ESRCH = no such process
1164        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1165    }
1166}
1167
1168#[cfg(windows)]
1169fn process_exists(pid: u32) -> bool {
1170    use windows_sys::Win32::Foundation::CloseHandle;
1171    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};
1172
1173    unsafe {
1174        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
1175        if handle == 0 {
1176            false
1177        } else {
1178            CloseHandle(handle);
1179            true
1180        }
1181    }
1182}
1183
1184#[cfg(not(any(unix, windows)))]
1185fn process_exists(_pid: u32) -> bool {
1186    true // Assume exists on unknown platforms
1187}
1188
1189// =============================================================================
1190// Tests
1191// =============================================================================
1192
1193#[cfg(test)]
1194mod tests {
1195    use super::*;
1196    use std::sync::Arc;
1197    use std::thread;
1198
1199    #[test]
1200    fn test_struct_sizes() {
1201        eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
1202        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
1203        eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
1204        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
1205        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
1206        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
1207        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);
1208
1209        assert_eq!(std::mem::size_of::<MvccHeader>(), HEADER_SIZE,
1210            "MvccHeader size mismatch! Actual: {}, Expected: {}",
1211            std::mem::size_of::<MvccHeader>(), HEADER_SIZE);
1212        assert_eq!(std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE,
1213            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
1214            std::mem::size_of::<ReaderSlot>(), READER_SLOT_SIZE);
1215    }
1216
1217    #[test]
1218    fn test_hlc_timestamp_ordering() {
1219        let ts = AtomicU64::new(0);
1220
1221        let t1 = HlcTimestamp::allocate_next(&ts);
1222        let t2 = HlcTimestamp::allocate_next(&ts);
1223        let t3 = HlcTimestamp::allocate_next(&ts);
1224
1225        assert!(t1.raw() < t2.raw());
1226        assert!(t2.raw() < t3.raw());
1227    }
1228
1229    #[test]
1230    fn test_hlc_timestamp_concurrent() {
1231        let ts = Arc::new(AtomicU64::new(0));
1232        let mut handles = vec![];
1233
1234        for _ in 0..8 {
1235            let ts = ts.clone();
1236            handles.push(thread::spawn(move || {
1237                let mut timestamps = vec![];
1238                for _ in 0..1000 {
1239                    timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
1240                }
1241                timestamps
1242            }));
1243        }
1244
1245        let mut all_ts: Vec<u64> = handles
1246            .into_iter()
1247            .flat_map(|h| h.join().unwrap())
1248            .collect();
1249
1250        // All timestamps should be unique
1251        all_ts.sort();
1252        let len_before = all_ts.len();
1253        all_ts.dedup();
1254        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
1255    }
1256
1257    #[test]
1258    fn test_version_chain_read_at() {
1259        let mut chain = VersionChain::new();
1260
1261        // Add committed versions via the public API
1262        chain.add_uncommitted(Some(b"v80".to_vec()), 3, 1);
1263        chain.commit(3, 80);
1264        chain.add_uncommitted(Some(b"v90".to_vec()), 2, 1);
1265        chain.commit(2, 90);
1266        chain.add_uncommitted(Some(b"v100".to_vec()), 1, 1);
1267        chain.commit(1, 100);
1268
1269        // Read at different snapshots
1270        let v = chain.read_at(105, None).unwrap();
1271        assert_eq!(v.value, Some(b"v100".to_vec()));
1272
1273        let v = chain.read_at(95, None).unwrap();
1274        assert_eq!(v.value, Some(b"v90".to_vec()));
1275
1276        let v = chain.read_at(85, None).unwrap();
1277        assert_eq!(v.value, Some(b"v80".to_vec()));
1278
1279        // Read before all versions
1280        assert!(chain.read_at(75, None).is_none());
1281    }
1282
1283    #[test]
1284    fn test_version_chain_gc() {
1285        let mut chain = VersionChain::new();
1286
1287        // Add 10 committed versions with varying epochs via public API
1288        for i in (0..10u64).rev() {
1289            chain.add_uncommitted(
1290                Some(format!("v{}", 100 - i * 5).into_bytes()),
1291                i,
1292                (10 - i) as u32,
1293            );
1294            chain.commit(i, 100 - i * 5);
1295        }
1296
1297        assert_eq!(chain.len(), 10);
1298
1299        // GC with min_epoch = 7, min_snapshot = 75
1300        // Keeps versions where epoch >= 7 OR commit_ts >= 75
1301        // epoch >= 7: versions at ts=100, 95, 90, 85 (epochs 10, 9, 8, 7)
1302        // commit_ts >= 75: versions at ts=100, 95, 90, 85, 80, 75
1303        // But we always keep at least the newest one
1304        let reclaimed = chain.gc(7, 75);
1305
1306        // Versions that should be reclaimed:
1307        // ts=70 (epoch=4), ts=65 (epoch=3), ts=60 (epoch=2), ts=55 (epoch=1)
1308        // That's 4 versions reclaimed
1309        assert!(reclaimed > 0, "Should have reclaimed some versions, got {}", reclaimed);
1310        assert!(chain.len() >= 1, "Should keep at least one version");
1311    }
1312
1313    #[test]
1314    fn test_version_store_basic() {
1315        let store = VersionStore::new();
1316
1317        // Insert uncommitted
1318        store
1319            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
1320            .unwrap();
1321
1322        // Commit it
1323        assert!(store.commit(b"key1", 1, 100));
1324
1325        // Read at snapshot after commit
1326        let value = store.get(b"key1", 150, None);
1327        assert_eq!(value, Some(b"value1".to_vec()));
1328
1329        // Read at snapshot before commit
1330        let value = store.get(b"key1", 50, None);
1331        assert!(value.is_none());
1332    }
1333
1334    #[test]
1335    fn test_reader_slot_claim_release() {
1336        let slot = ReaderSlot::empty();
1337
1338        assert!(slot.is_free());
1339
1340        // Claim slot
1341        assert!(slot.try_claim(1234, 100, 1));
1342        assert!(!slot.is_free());
1343
1344        // Can't claim from different PID
1345        assert!(!slot.try_claim(5678, 200, 2));
1346
1347        // Can re-claim from same PID
1348        assert!(slot.try_claim(1234, 300, 3));
1349
1350        // Release
1351        slot.release(1234);
1352        assert!(slot.is_free());
1353    }
1354}