Skip to main content

sochdb_storage/
mvcc_concurrent.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Concurrent MVCC for Multi-Reader Single-Writer Embedded Mode
19//!
20//! This module implements lock-free concurrent reads with single-writer
21//! coordination using shared memory MVCC metadata.
22//!
23//! ## Architecture
24//!
25//! ```text
26//! ┌─────────────────────────────────────────────────────────────────┐
27//! │  Shared Memory MVCC Metadata (.mvcc_metadata file, mmap'd)      │
28//! ├─────────────────────────────────────────────────────────────────┤
29//! │  Header:                                                         │
30//! │    - magic, version, page_size                                   │
31//! │    - current_epoch (AtomicU64)                                   │
32//! │    - current_ts (AtomicU64, HLC timestamp)                       │
33//! │    - writer_lock (AtomicU32, 0=free, pid=locked)                │
34//! │                                                                  │
35//! │  Reader Table (1024 slots × 64 bytes = 64KB):                    │
36//! │    [slot 0]: pid, snapshot_ts, epoch, last_heartbeat             │
37//! │    [slot 1]: ...                                                 │
38//! │    [slot N]: ...                                                 │
39//! │                                                                  │
40//! │  Version Store (in-process DashMap, not in the mmap'd file):    │
41//! │    key → [version @ ts=105, version @ ts=99, ...]                │
42//! └─────────────────────────────────────────────────────────────────┘
43//! ```
44//!
45//! ## Performance Model
46//!
47//! - Lock-free read: ~100ns (atomic load + version lookup)
48//! - Writer lock acquisition: ~20ns (uncontended CAS)
49//! - Version GC: O(N_versions) every 1000 commits
50//!
51//! ## Concurrency Contract (read this before scaling writes)
52//!
53//! This is a **Multi-Reader, Single-Writer** engine, by design (SQLite-class
54//! embedded semantics). The guarantees and limits are:
55//!
56//! - **Readers**: lock-free and unbounded. Each reader takes a snapshot
57//!   timestamp (HLC) and observes a consistent version view; readers never
58//!   block readers or the writer. Isolation provided to readers is **snapshot
59//!   isolation** against a serial writer timeline.
60//! - **Writers**: serialized by the single `writer_lock` (`AtomicU32`:
61//!   `0` = free, otherwise the owning pid). There is **no concurrent-writer
62//!   protocol** — by construction there is at most one writer at a time, so
63//!   there is no cross-writer SSI validation here. A second writer (in another
64//!   process) contends on the lock rather than running concurrently.
65//!
66//! ### Throughput ceiling
67//!
68//! Because writes are serialized, single-writer throughput is bounded by:
69//!
70//! ```text
71//!   write_throughput  ≤  1 / (t_crit + t_fsync)
72//! ```
73//!
74//! where `t_crit` is the critical-section time and `t_fsync` is the durability
75//! cost per commit. This is a hard ceiling for write-heavy or multi-process
76//! deployments — do not expect writes to scale with cores. To raise effective
77//! write throughput **without** changing the isolation model, coalesce many
78//! logical writes into one critical section / one fsync (batching spreads
79//! both terms over the batch, raising the ceiling toward
80//! `B / (t_crit + t_fsync)` for batch size `B`, the same √-law payoff as WAL group commit).
81//!
82//! ### Write coalescing is already provided by `EventDrivenGroupCommit`
83//!
84//! The batching layer described above is **implemented and wired** in the
85//! canonical engine: [`crate::group_commit::EventDrivenGroupCommit`] amortizes
86//! `t_fsync` by issuing a single fsync per batch of committing transactions,
87//! with adaptive batch sizing (`N* = sqrt(2·L_fsync·λ / C_wait)`, Little's Law).
88//! [`crate::durable_storage`] constructs one and routes commits through it to
89//! `TxnWal`, so the `B / (t_crit + t_fsync)` payoff is realized in production
90//! without weakening the single-writer isolation contract above. This module
91//! deliberately does **not** add a second, competing coalescing layer.
92//!
93//! ## Safety
94//!
95//! - Readers are lock-free (no blocking, no starvation)
96//! - Single writer ensures WAL consistency
97//! - Epoch-based GC prevents use-after-free
98
99use std::fs::OpenOptions;
100use std::path::{Path, PathBuf};
101use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
102use std::time::{Duration, SystemTime, UNIX_EPOCH};
103
104use dashmap::DashMap;
105use parking_lot::RwLock;
106use sochdb_core::version_chain::{
107    BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
108    VisibilityContext, WriteConflictDetection,
109};
110use sochdb_core::{Result, SochDBError};
111
112// Type aliases to avoid conflicts with other modules
113pub type ConcurrentVersionChain = VersionChain;
114pub type ConcurrentVersionEntry = VersionEntry;
115
116// =============================================================================
117// Constants
118// =============================================================================
119
120/// Magic bytes for MVCC metadata file: "SOCHMVCC"
121const MVCC_MAGIC: u64 = 0x43435F564D484353; // "SOCHMVCC" little-endian
122
123/// Current format version
124const MVCC_VERSION: u32 = 1;
125
126/// Maximum concurrent readers
127const MAX_READERS: usize = 1024;
128
129/// Reader slot size (64 bytes = 1 cache line)
130const READER_SLOT_SIZE: usize = 64;
131
132/// Header size
133const HEADER_SIZE: usize = 64;
134
135/// Total metadata size (header + reader table)
136const METADATA_SIZE: usize = HEADER_SIZE + (MAX_READERS * READER_SLOT_SIZE);
137
138/// Stale reader timeout (60 seconds)
139const STALE_READER_TIMEOUT_US: u64 = 60_000_000;
140
141/// GC interval (every N commits)
142const GC_COMMIT_INTERVAL: u64 = 1000;
143
144// =============================================================================
145// Hybrid Logical Clock
146// =============================================================================
147
/// Hybrid Logical Clock (HLC) timestamp packed into one `u64`.
///
/// Format: `[48-bit physical time (ms) | 16-bit logical counter]`
///
/// Properties:
/// 1. Values handed out by [`HlcTimestamp::allocate_next`] for one shared
///    cell are strictly increasing.
/// 2. Ordering the raw `u64` orders first by physical time, then by the
///    logical counter.
/// 3. Up to 65,536 timestamps fit in one physical millisecond; further
///    allocations borrow from the next millisecond instead of repeating a
///    value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HlcTimestamp(pub u64);

impl HlcTimestamp {
    /// Create timestamp from physical time (ms) and logical counter.
    ///
    /// Only the low 48 bits of `physical_ms` are representable; higher bits
    /// are shifted out.
    #[inline]
    pub fn new(physical_ms: u64, logical: u16) -> Self {
        Self((physical_ms << 16) | (logical as u64))
    }

    /// Get physical time component (milliseconds since epoch)
    #[inline]
    pub fn physical_ms(&self) -> u64 {
        self.0 >> 16
    }

    /// Get logical counter component
    #[inline]
    pub fn logical(&self) -> u16 {
        (self.0 & 0xFFFF) as u16
    }

    /// Get raw value
    #[inline]
    pub fn raw(&self) -> u64 {
        self.0
    }

    /// Allocate the next timestamp from the shared cell `last` using a CAS
    /// retry loop.
    ///
    /// Algorithm:
    /// 1. Read the wall clock once (milliseconds since the UNIX epoch).
    /// 2. If it is ahead of the physical part of `last`, jump to it with
    ///    logical counter 0.
    /// 3. Otherwise advance the packed value by one. This increments the
    ///    logical counter and, when the counter is already at `0xFFFF`,
    ///    carries into the physical part. The previous implementation
    ///    saturated the counter and therefore returned the *same* timestamp
    ///    again in that case.
    /// 4. Publish with compare-and-swap; retry if another allocator won.
    ///
    /// A wall clock set before the UNIX epoch is treated as physical time 0
    /// (the call falls through to step 3) instead of panicking.
    pub fn allocate_next(last: &AtomicU64) -> Self {
        let physical_now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |elapsed| elapsed.as_millis() as u64);

        loop {
            let last_val = last.load(Ordering::Acquire);
            let last_phys = last_val >> 16;

            let new_val = if physical_now > last_phys {
                physical_now << 16
            } else {
                // Clock hasn't advanced past `last`: take the next packed
                // value. Saturation can only trigger at u64::MAX.
                last_val.saturating_add(1)
            };

            if last
                .compare_exchange(last_val, new_val, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Self(new_val);
            }
            // CAS failed, retry
            std::hint::spin_loop();
        }
    }

    /// Read current timestamp without advancing
    #[inline]
    pub fn read_current(ts: &AtomicU64) -> Self {
        Self(ts.load(Ordering::Acquire))
    }
}

impl From<u64> for HlcTimestamp {
    /// Wrap a raw packed value without validation.
    fn from(val: u64) -> Self {
        Self(val)
    }
}

impl From<HlcTimestamp> for u64 {
    /// Unwrap to the raw packed value.
    fn from(ts: HlcTimestamp) -> Self {
        ts.0
    }
}
240
241// =============================================================================
242// Reader Slot (Cache-Line Aligned)
243// =============================================================================
244
245/// Reader slot in shared memory (64 bytes = 1 cache line)
246///
247/// Each active reader registers in a slot to prevent GC from
248/// reclaiming versions it might need.
249///
250/// Layout (with #[repr(C, align(64))]):
251///   pid:            AtomicU32 (4 bytes)
252///   <pad>:          4 bytes   (alignment for AtomicU64)
253///   snapshot_ts:    AtomicU64 (8 bytes)
254///   epoch:          AtomicU32 (4 bytes)
255///   <pad>:          4 bytes   (alignment for AtomicU64)
256///   last_heartbeat: AtomicU64 (8 bytes)
257///   _reserved:      [u8; 32]  (pad to 64 bytes total)
258///   Total: 64 bytes — fits exactly in one cache line
259#[repr(C, align(64))]
260#[derive(Debug)]
261pub struct ReaderSlot {
262    /// Process ID (0 = slot is free)
263    pub pid: AtomicU32,
264    /// Snapshot timestamp this reader is using
265    pub snapshot_ts: AtomicU64,
266    /// Epoch number when reader registered
267    pub epoch: AtomicU32,
268    /// Last heartbeat (microseconds since epoch)
269    pub last_heartbeat: AtomicU64,
270    /// Reserved for future use (sized to make struct exactly 64 bytes)
271    _reserved: [u8; 32],
272}
273
274impl ReaderSlot {
275    /// Create an empty reader slot
276    pub const fn empty() -> Self {
277        Self {
278            pid: AtomicU32::new(0),
279            snapshot_ts: AtomicU64::new(0),
280            epoch: AtomicU32::new(0),
281            last_heartbeat: AtomicU64::new(0),
282            _reserved: [0u8; 32],
283        }
284    }
285
286    /// Check if slot is free
287    #[inline]
288    pub fn is_free(&self) -> bool {
289        self.pid.load(Ordering::Acquire) == 0
290    }
291
292    /// Try to claim this slot for reading
293    ///
294    /// Returns true if successfully claimed.
295    #[inline]
296    pub fn try_claim(&self, my_pid: u32, snapshot_ts: u64, epoch: u32) -> bool {
297        let current_pid = self.pid.load(Ordering::Acquire);
298
299        // Only claim if free or already ours
300        if current_pid != 0 && current_pid != my_pid {
301            return false;
302        }
303
304        if self
305            .pid
306            .compare_exchange(current_pid, my_pid, Ordering::AcqRel, Ordering::Acquire)
307            .is_ok()
308        {
309            // Successfully claimed, update metadata
310            self.snapshot_ts.store(snapshot_ts, Ordering::Release);
311            self.epoch.store(epoch, Ordering::Release);
312            self.last_heartbeat
313                .store(current_time_us(), Ordering::Release);
314            true
315        } else {
316            false
317        }
318    }
319
320    /// Release this slot
321    #[inline]
322    pub fn release(&self, my_pid: u32) {
323        // Only release if we own it
324        if self.pid.load(Ordering::Acquire) == my_pid {
325            self.snapshot_ts.store(0, Ordering::Release);
326            self.pid.store(0, Ordering::Release);
327        }
328    }
329
330    /// Update heartbeat
331    #[inline]
332    pub fn heartbeat(&self) {
333        self.last_heartbeat
334            .store(current_time_us(), Ordering::Release);
335    }
336
337    /// Check if this slot is stale (process crashed or hung)
338    #[inline]
339    pub fn is_stale(&self, now_us: u64) -> bool {
340        let pid = self.pid.load(Ordering::Acquire);
341        if pid == 0 {
342            return false; // Empty slot
343        }
344
345        // Check heartbeat timeout
346        let last_hb = self.last_heartbeat.load(Ordering::Acquire);
347        if now_us.saturating_sub(last_hb) > STALE_READER_TIMEOUT_US {
348            return true;
349        }
350
351        // Check if process still exists
352        !process_exists(pid)
353    }
354}
355
356// =============================================================================
357// MVCC Metadata Header
358// =============================================================================
359
360/// MVCC metadata file header
361#[repr(C)]
362#[derive(Debug)]
363pub struct MvccHeader {
364    /// Magic bytes for validation
365    pub magic: u64,
366    /// Format version
367    pub version: u32,
368    /// Page size
369    pub page_size: u32,
370    /// Number of reader slots
371    pub num_readers: u32,
372    /// Current epoch (incremented on recovery/GC)
373    pub current_epoch: AtomicU64,
374    /// Current HLC timestamp
375    pub current_ts: AtomicU64,
376    /// Writer lock (0 = free, pid = locked)
377    pub writer_lock: AtomicU32,
378    /// Number of commits since last GC
379    pub commits_since_gc: AtomicU64,
380    /// Reserved for alignment
381    _reserved: [u8; 4],
382}
383
384impl MvccHeader {
385    /// Create new header with default values
386    pub fn new() -> Self {
387        Self {
388            magic: MVCC_MAGIC,
389            version: MVCC_VERSION,
390            page_size: 4096,
391            num_readers: MAX_READERS as u32,
392            current_epoch: AtomicU64::new(1),
393            current_ts: AtomicU64::new(
394                HlcTimestamp::new(
395                    SystemTime::now()
396                        .duration_since(UNIX_EPOCH)
397                        .unwrap()
398                        .as_millis() as u64,
399                    0,
400                )
401                .raw(),
402            ),
403            writer_lock: AtomicU32::new(0),
404            commits_since_gc: AtomicU64::new(0),
405            _reserved: [0u8; 4],
406        }
407    }
408
409    /// Validate header
410    pub fn validate(&self) -> Result<()> {
411        if self.magic != MVCC_MAGIC {
412            return Err(SochDBError::Corruption(
413                "Invalid MVCC metadata magic".into(),
414            ));
415        }
416        if self.version != MVCC_VERSION {
417            return Err(SochDBError::Corruption(format!(
418                "Unsupported MVCC version: {} (expected {})",
419                self.version, MVCC_VERSION
420            )));
421        }
422        Ok(())
423    }
424}
425
426impl Default for MvccHeader {
427    fn default() -> Self {
428        Self::new()
429    }
430}
431
432// =============================================================================
433// Version Entry
434// =============================================================================
435
/// One version of a key's value, as stored in a version chain.
#[derive(Debug, Clone)]
pub struct VersionEntry {
    /// HLC timestamp at which the version was committed; `0` means the
    /// version has not been committed yet.
    pub commit_ts: u64,
    /// ID of the transaction that wrote this version.
    pub txn_id: u64,
    /// Epoch in which the version was created.
    pub epoch: u32,
    /// Payload; `None` is a tombstone (the key was deleted).
    pub value: Option<Vec<u8>>,
}

impl VersionEntry {
    /// Assemble an entry from its four components.
    pub fn new(commit_ts: u64, txn_id: u64, epoch: u32, value: Option<Vec<u8>>) -> Self {
        VersionEntry {
            value,
            epoch,
            txn_id,
            commit_ts,
        }
    }

    /// Report whether a reader at `snapshot_ts` may see this version.
    ///
    /// The version must be committed (`commit_ts` non-zero) and must have been
    /// committed strictly before the snapshot.
    #[inline]
    pub fn is_visible_at(&self, snapshot_ts: u64) -> bool {
        (1..snapshot_ts).contains(&self.commit_ts)
    }
}
466
467// Rec 11: Implement ChainEntry so BinarySearchChain<VersionEntry> works
468impl ChainEntry for VersionEntry {
469    #[inline]
470    fn commit_ts(&self) -> u64 {
471        self.commit_ts
472    }
473    #[inline]
474    fn txn_id(&self) -> u64 {
475        self.txn_id
476    }
477    #[inline]
478    fn set_commit_ts(&mut self, ts: u64) {
479        self.commit_ts = ts;
480    }
481}
482
483// =============================================================================
484// Version Chain (Sorted by commit_ts descending) — Rec 11: Consolidated
485// =============================================================================
486
487/// Chain of versions for a single key, sorted by commit_ts descending
488///
489/// Optimized for:
490/// - O(log V) reads via binary search
491/// - O(1) writes (prepend to front)
492/// - O(V) GC (linear scan with compaction)
493///
494/// ## Rec 11: Consolidated
495///
496/// Delegates binary-search logic to `BinarySearchChain<VersionEntry>` from
497/// sochdb-core, eliminating duplication with `durable_storage::VersionChain`.
498#[derive(Debug, Default)]
499pub struct VersionChain {
500    /// Consolidated binary-search chain (Rec 11)
501    inner: BinarySearchChain<VersionEntry>,
502}
503
504impl VersionChain {
505    /// Create empty version chain
506    pub fn new() -> Self {
507        Self {
508            inner: BinarySearchChain::new(),
509        }
510    }
511
512    /// Add uncommitted version
513    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64, epoch: u32) {
514        self.inner.set_uncommitted(VersionEntry {
515            commit_ts: 0,
516            txn_id,
517            epoch,
518            value,
519        });
520    }
521
522    /// Commit the uncommitted version
523    #[inline]
524    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
525        self.inner.commit(txn_id, commit_ts)
526    }
527
528    /// Abort uncommitted version
529    #[inline]
530    pub fn abort(&mut self, txn_id: u64) {
531        self.inner.abort(txn_id);
532    }
533
534    /// Read at snapshot timestamp
535    ///
536    /// Complexity: O(log V) via binary search
537    #[inline]
538    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&VersionEntry> {
539        self.inner.read_at(snapshot_ts, current_txn_id)
540    }
541
542    /// Check if there's a write conflict
543    #[inline]
544    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
545        self.inner.has_write_conflict(my_txn_id)
546    }
547
548    /// Garbage collect old versions
549    ///
550    /// Custom GC: filters by both epoch AND snapshot timestamp.
551    /// Returns number of versions reclaimed.
552    pub fn gc(&mut self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
553        let versions = self.inner.committed_versions_mut();
554        if versions.len() <= 1 {
555            return 0;
556        }
557
558        let original_len = versions.len();
559
560        let mut keep_count = 1; // Always keep newest
561        for v in versions.iter().skip(1) {
562            if v.epoch >= min_epoch || v.commit_ts >= min_snapshot_ts {
563                keep_count += 1;
564            } else {
565                break; // Sorted descending, all remaining are older
566            }
567        }
568
569        versions.truncate(keep_count);
570        original_len - versions.len()
571    }
572
573    /// Get number of versions
574    #[inline]
575    pub fn len(&self) -> usize {
576        self.inner.version_count()
577    }
578
579    /// Check if empty
580    #[inline]
581    pub fn is_empty(&self) -> bool {
582        self.inner.is_empty()
583    }
584}
585
586// =============================================================================
587// Rec 6: Unified Version Chain Trait Implementations
588// =============================================================================
589
590impl MvccVersionChain for VersionChain {
591    type Value = Option<Vec<u8>>;
592
593    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
594        self.inner
595            .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
596            .map(|v| &v.value)
597    }
598
599    fn get_latest(&self) -> Option<&Self::Value> {
600        self.inner.latest().map(|v| &v.value)
601    }
602
603    fn version_count(&self) -> usize {
604        self.inner.version_count()
605    }
606}
607
608impl MvccVersionChainMut for VersionChain {
609    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
610        self.add_uncommitted(value, txn_id, 0);
611    }
612
613    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
614        self.inner.commit(txn_id, commit_ts)
615    }
616
617    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
618        self.add_uncommitted(None, txn_id, 0);
619        true
620    }
621
622    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
623        let removed = self.gc(0, min_visible_ts);
624        (removed, removed * std::mem::size_of::<VersionEntry>())
625    }
626}
627
628impl WriteConflictDetection for VersionChain {
629    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
630        self.inner.has_write_conflict(txn_id)
631    }
632}
633
634// =============================================================================
635// Concurrent Version Store
636// =============================================================================
637
638/// Lock-free version store using DashMap
639///
640/// Provides O(1) key lookup + O(log V) version lookup per key.
641pub struct VersionStore {
642    /// Key → VersionChain mapping
643    data: DashMap<Vec<u8>, VersionChain>,
644    /// Statistics
645    stats: VersionStoreStats,
646}
647
648/// Version store statistics
649#[derive(Debug, Default)]
650pub struct VersionStoreStats {
651    /// Total number of keys
652    pub num_keys: AtomicU64,
653    /// Total number of versions across all keys
654    pub num_versions: AtomicU64,
655    /// Number of GC passes
656    pub gc_passes: AtomicU64,
657    /// Versions reclaimed by GC
658    pub versions_reclaimed: AtomicU64,
659}
660
661impl VersionStore {
662    /// Create new version store
663    pub fn new() -> Self {
664        Self {
665            data: DashMap::new(),
666            stats: VersionStoreStats::default(),
667        }
668    }
669
670    /// Insert a new uncommitted version
671    pub fn insert_uncommitted(
672        &self,
673        key: &[u8],
674        value: Option<Vec<u8>>,
675        txn_id: u64,
676        epoch: u32,
677    ) -> Result<()> {
678        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
679            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
680            VersionChain::new()
681        });
682
683        // Check for write conflict
684        if entry.has_write_conflict(txn_id) {
685            return Err(SochDBError::Internal(
686                "Write conflict: another transaction has uncommitted write".into(),
687            ));
688        }
689
690        entry.add_uncommitted(value, txn_id, epoch);
691        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
692        Ok(())
693    }
694
695    /// Commit a version
696    pub fn commit(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
697        if let Some(mut entry) = self.data.get_mut(key) {
698            return entry.commit(txn_id, commit_ts);
699        }
700        false
701    }
702
703    /// Abort uncommitted version
704    pub fn abort(&self, key: &[u8], txn_id: u64) {
705        if let Some(mut entry) = self.data.get_mut(key) {
706            entry.abort(txn_id);
707            self.stats.num_versions.fetch_sub(1, Ordering::Relaxed);
708        }
709    }
710
711    /// Read value at snapshot timestamp
712    pub fn get(
713        &self,
714        key: &[u8],
715        snapshot_ts: u64,
716        current_txn_id: Option<u64>,
717    ) -> Option<Vec<u8>> {
718        self.data.get(key).and_then(|chain| {
719            chain
720                .read_at(snapshot_ts, current_txn_id)
721                .and_then(|v| v.value.clone())
722        })
723    }
724
725    /// Check if key exists at snapshot
726    pub fn contains(&self, key: &[u8], snapshot_ts: u64) -> bool {
727        self.data
728            .get(key)
729            .map(|chain| chain.read_at(snapshot_ts, None).is_some())
730            .unwrap_or(false)
731    }
732
733    /// Run garbage collection
734    ///
735    /// Removes old versions that are no longer visible to any reader.
736    pub fn gc(&self, min_epoch: u32, min_snapshot_ts: u64) -> usize {
737        let mut total_reclaimed = 0;
738
739        for mut entry in self.data.iter_mut() {
740            let reclaimed = entry.gc(min_epoch, min_snapshot_ts);
741            total_reclaimed += reclaimed;
742        }
743
744        self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
745        self.stats
746            .versions_reclaimed
747            .fetch_add(total_reclaimed as u64, Ordering::Relaxed);
748
749        total_reclaimed
750    }
751
752    /// Get number of keys
753    pub fn len(&self) -> usize {
754        self.data.len()
755    }
756
757    /// Check if empty
758    pub fn is_empty(&self) -> bool {
759        self.data.is_empty()
760    }
761
762    /// Get statistics
763    pub fn stats(&self) -> &VersionStoreStats {
764        &self.stats
765    }
766}
767
768impl Default for VersionStore {
769    fn default() -> Self {
770        Self::new()
771    }
772}
773
774// =============================================================================
775// Rec 11: Unified MvccStore Implementation for VersionStore
776// =============================================================================
777
778impl sochdb_core::version_chain::MvccStore for VersionStore {
779    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
780        self.get(key, snapshot_ts, txn_id)
781    }
782
783    fn mvcc_put(
784        &self,
785        key: &[u8],
786        value: Option<Vec<u8>>,
787        txn_id: u64,
788    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
789        let mut entry = self.data.entry(key.to_vec()).or_insert_with(|| {
790            self.stats.num_keys.fetch_add(1, Ordering::Relaxed);
791            VersionChain::new()
792        });
793        if entry.has_write_conflict(txn_id) {
794            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
795        }
796        entry.add_uncommitted(value, txn_id, 0);
797        self.stats.num_versions.fetch_add(1, Ordering::Relaxed);
798        Ok(())
799    }
800
801    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
802        self.commit(key, txn_id, commit_ts)
803    }
804
805    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
806        self.abort(key, txn_id);
807    }
808
809    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
810        self.data
811            .get(key)
812            .map(|chain| chain.has_write_conflict(txn_id))
813            .unwrap_or(false)
814    }
815
816    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
817        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
818        for mut entry in self.data.iter_mut() {
819            stats.keys_scanned += 1;
820            let removed = entry.gc(0, min_ts);
821            stats.versions_removed += removed;
822        }
823        self.stats.gc_passes.fetch_add(1, Ordering::Relaxed);
824        self.stats
825            .versions_reclaimed
826            .fetch_add(stats.versions_removed as u64, Ordering::Relaxed);
827        stats
828    }
829
830    fn mvcc_key_count(&self) -> usize {
831        self.len()
832    }
833}
834
835// =============================================================================
836// Concurrent MVCC Manager
837// =============================================================================
838
839/// Manager for concurrent MVCC operations
840///
841/// Coordinates:
842/// - Reader registration (lock-free)
843/// - Writer locking (single-writer)
844/// - Version store access
845/// - Garbage collection
846///
847/// The MVCC header (writer_lock, current_ts, current_epoch) is stored in
848/// a memory-mapped file so that AtomicU32/AtomicU64 CAS operations work
849/// correctly across independent OS processes. Without mmap, each process
850/// would have its own copy of these atomics, making cross-process
851/// coordination impossible.
///
/// Only the header and the reader table are reached through the mapping. The
/// version store is an ordinary in-process field of this struct and is not
/// shared with other processes.
852pub struct ConcurrentMvcc {
853    /// Path to the database directory
854    path: PathBuf,
855    /// Memory-mapped metadata file (keeps mmap alive)
856    /// Invariant: the mapping is read-write, and both raw pointers below
857    /// point into it. The mapping must outlive every dereference of
858    /// `header` and `reader_slots_ptr`.
859    _mmap: memmap2::MmapMut,
860    /// Pointer to the MVCC header inside the mmap'd region
861    /// Invariant: valid for the lifetime of `_mmap`. The header is repr(C)
862    /// and the mmap region is at least METADATA_SIZE bytes.
863    header: *const MvccHeader,
864    /// Pointer to reader slots inside the mmap'd region
865    /// Invariant: valid for the lifetime of `_mmap`. Points to the region
866    /// starting at offset HEADER_SIZE within the mapping.
867    reader_slots_ptr: *const ReaderSlot,
868    /// Number of reader slots reachable through `reader_slots_ptr`
869    num_reader_slots: usize,
870    /// Version store (in-process — versions are rebuilt from WAL on open)
871    version_store: VersionStore,
872    /// Our process ID
873    our_pid: u32,
874    /// Slot index we're using (if registered as reader)
875    our_slot: RwLock<Option<usize>>,
876}
877
878// SAFETY: the raw pointers are the only fields that block the auto traits.
879// They point into `_mmap`, which this struct owns, and the mutable state
880// behind them is expected to be touched only via AtomicU32/AtomicU64 fields.
// NOTE(review): `MvccHeader` also has plain fields (`magic`, `version`,
// `page_size`, `num_readers`). Sharing is only sound if nothing writes them
// after initialization — confirm against the rest of the impl.
881unsafe impl Send for ConcurrentMvcc {}
882unsafe impl Sync for ConcurrentMvcc {}
883
884impl ConcurrentMvcc {
885    /// Open or create MVCC manager with shared memory-mapped metadata
886    ///
887    /// The metadata file (.mvcc_metadata) is mmap'd so that the writer_lock,
888    /// current_ts, and reader slots are shared across all processes that open
889    /// the same database. This enables true cross-process atomic coordination.
890    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
891        let path = path.as_ref().to_path_buf();
892        std::fs::create_dir_all(&path)?;
893
894        let metadata_path = path.join(".mvcc_metadata");
895        let is_new = !metadata_path.exists();
896
897        // Create or open the metadata file at the required size
898        let file = OpenOptions::new()
899            .create(true)
900            .read(true)
901            .write(true)
902            .open(&metadata_path)?;
903
904        // Ensure file is the correct size (header + reader slots)
905        let required_size = METADATA_SIZE as u64;
906        if file.metadata()?.len() < required_size {
907            file.set_len(required_size)?;
908        }
909
910        // Memory-map the file
911        // SAFETY: The file is opened read-write and we control its contents.
912        // Multiple processes may mmap the same file simultaneously — this is
913        // the intended usage pattern. The atomics in the header ensure safe
914        // concurrent access.
915        let mut mmap = unsafe { memmap2::MmapMut::map_mut(&file)? };
916
917        if is_new || mmap.len() < METADATA_SIZE {
918            // Initialize fresh metadata
919            let header = MvccHeader::new();
920            let header_bytes = unsafe {
921                std::slice::from_raw_parts(
922                    &header as *const MvccHeader as *const u8,
923                    std::mem::size_of::<MvccHeader>(),
924                )
925            };
926            mmap[..header_bytes.len()].copy_from_slice(header_bytes);
927
928            // Zero-initialize reader slots (already zero from set_len, but be explicit)
929            for i in 0..MAX_READERS {
930                let offset = HEADER_SIZE + i * READER_SLOT_SIZE;
931                let end = offset + READER_SLOT_SIZE;
932                if end <= mmap.len() {
933                    mmap[offset..end].fill(0);
934                }
935            }
936
937            mmap.flush()?;
938        } else {
939            // Validate existing header
940            let header_ref = unsafe { &*(mmap.as_ptr() as *const MvccHeader) };
941            header_ref.validate()?;
942        }
943
944        // Get pointers into the mmap'd region
945        let header = mmap.as_ptr() as *const MvccHeader;
946        let reader_slots_ptr = unsafe { mmap.as_ptr().add(HEADER_SIZE) as *const ReaderSlot };
947
948        Ok(Self {
949            path,
950            _mmap: mmap,
951            header,
952            reader_slots_ptr,
953            num_reader_slots: MAX_READERS,
954            version_store: VersionStore::new(),
955            our_pid: std::process::id(),
956            our_slot: RwLock::new(None),
957        })
958    }
959
960    /// Get reference to the shared header
961    ///
962    /// SAFETY: The header pointer is valid for the lifetime of self._mmap
963    #[inline]
964    fn header(&self) -> &MvccHeader {
965        unsafe { &*self.header }
966    }
967
968    /// Get reference to a reader slot
969    ///
970    /// SAFETY: The pointer is valid for the lifetime of self._mmap
971    #[inline]
972    fn reader_slot(&self, idx: usize) -> &ReaderSlot {
973        assert!(idx < self.num_reader_slots);
974        unsafe { &*self.reader_slots_ptr.add(idx) }
975    }
976
977    /// Allocate next timestamp
978    #[inline]
979    pub fn allocate_timestamp(&self) -> HlcTimestamp {
980        HlcTimestamp::allocate_next(&self.header().current_ts)
981    }
982
983    /// Get current timestamp without advancing
984    #[inline]
985    pub fn current_timestamp(&self) -> HlcTimestamp {
986        HlcTimestamp::read_current(&self.header().current_ts)
987    }
988
989    /// Get current epoch
990    #[inline]
991    pub fn current_epoch(&self) -> u64 {
992        self.header().current_epoch.load(Ordering::Acquire)
993    }
994
995    /// Register as active reader
996    ///
997    /// Returns slot index on success.
998    /// Must call `unregister_reader()` when done.
999    pub fn register_reader(&self) -> Result<usize> {
1000        let snapshot_ts = self.current_timestamp().raw();
1001        let epoch = self.current_epoch() as u32;
1002
1003        // Find free slot in shared mmap'd reader table
1004        for i in 0..self.num_reader_slots {
1005            let slot = self.reader_slot(i);
1006            if slot.try_claim(self.our_pid, snapshot_ts, epoch) {
1007                *self.our_slot.write() = Some(i);
1008                return Ok(i);
1009            }
1010        }
1011
1012        Err(SochDBError::ResourceExhausted(
1013            "Too many concurrent readers".into(),
1014        ))
1015    }
1016
1017    /// Unregister as reader
1018    pub fn unregister_reader(&self, slot_idx: usize) {
1019        if slot_idx < self.num_reader_slots {
1020            self.reader_slot(slot_idx).release(self.our_pid);
1021            *self.our_slot.write() = None;
1022        }
1023    }
1024
1025    /// Try to acquire writer lock
1026    ///
1027    /// Returns WriterGuard on success, which releases lock on drop.
1028    pub fn try_acquire_writer(&self) -> Result<WriterGuard<'_>> {
1029        let current = self.header().writer_lock.load(Ordering::Acquire);
1030
1031        if current == 0 {
1032            // Try to acquire (CAS on shared mmap'd atomic)
1033            if self
1034                .header()
1035                .writer_lock
1036                .compare_exchange(0, self.our_pid, Ordering::AcqRel, Ordering::Acquire)
1037                .is_ok()
1038            {
1039                return Ok(WriterGuard { mvcc: self });
1040            }
1041        } else if current == self.our_pid {
1042            // Already own the lock (reentrant)
1043            return Ok(WriterGuard { mvcc: self });
1044        }
1045
1046        Err(SochDBError::LockError(format!(
1047            "Writer lock held by process {}",
1048            current
1049        )))
1050    }
1051
1052    /// Acquire writer lock with timeout
1053    pub fn acquire_writer(&self, timeout: Duration) -> Result<WriterGuard<'_>> {
1054        let deadline = std::time::Instant::now() + timeout;
1055
1056        loop {
1057            match self.try_acquire_writer() {
1058                Ok(guard) => return Ok(guard),
1059                Err(_) if std::time::Instant::now() < deadline => {
1060                    std::thread::sleep(Duration::from_micros(100));
1061                }
1062                Err(e) => return Err(e),
1063            }
1064        }
1065    }
1066
1067    /// Release writer lock
1068    fn release_writer(&self) {
1069        let current = self.header().writer_lock.load(Ordering::Acquire);
1070        if current == self.our_pid {
1071            self.header().writer_lock.store(0, Ordering::Release);
1072        }
1073    }
1074
1075    /// Get version store
1076    pub fn version_store(&self) -> &VersionStore {
1077        &self.version_store
1078    }
1079
1080    /// Calculate minimum visible snapshot across all readers
1081    pub fn min_active_snapshot(&self) -> u64 {
1082        let mut min_ts = u64::MAX;
1083
1084        for i in 0..self.num_reader_slots {
1085            let slot = self.reader_slot(i);
1086            let pid = slot.pid.load(Ordering::Acquire);
1087            if pid != 0 {
1088                let ts = slot.snapshot_ts.load(Ordering::Acquire);
1089                if ts > 0 && ts < min_ts {
1090                    min_ts = ts;
1091                }
1092            }
1093        }
1094
1095        min_ts
1096    }
1097
1098    /// Calculate minimum active epoch across all readers
1099    pub fn min_active_epoch(&self) -> u32 {
1100        let mut min_epoch = u32::MAX;
1101
1102        for i in 0..self.num_reader_slots {
1103            let slot = self.reader_slot(i);
1104            let pid = slot.pid.load(Ordering::Acquire);
1105            if pid != 0 {
1106                let epoch = slot.epoch.load(Ordering::Acquire);
1107                if epoch < min_epoch {
1108                    min_epoch = epoch;
1109                }
1110            }
1111        }
1112
1113        if min_epoch == u32::MAX {
1114            // No active readers, use current epoch
1115            self.current_epoch() as u32
1116        } else {
1117            min_epoch
1118        }
1119    }
1120
1121    /// Run garbage collection
1122    ///
1123    /// Should be called periodically (e.g., every 1000 commits).
1124    pub fn run_gc(&self) -> usize {
1125        let min_epoch = self.min_active_epoch();
1126        let min_snapshot = self.min_active_snapshot();
1127
1128        self.version_store.gc(min_epoch, min_snapshot)
1129    }
1130
1131    /// Check if GC should run (based on commit count)
1132    pub fn should_run_gc(&self) -> bool {
1133        self.header().commits_since_gc.load(Ordering::Relaxed) >= GC_COMMIT_INTERVAL
1134    }
1135
1136    /// Increment commit count and maybe run GC
1137    pub fn on_commit(&self) {
1138        let count = self
1139            .header()
1140            .commits_since_gc
1141            .fetch_add(1, Ordering::Relaxed);
1142
1143        if count >= GC_COMMIT_INTERVAL {
1144            self.header().commits_since_gc.store(0, Ordering::Relaxed);
1145            let _ = self.run_gc();
1146        }
1147    }
1148
1149    /// Clean up stale readers (from crashed processes)
1150    pub fn cleanup_stale_readers(&self) -> usize {
1151        let now = current_time_us();
1152        let mut cleaned = 0;
1153
1154        for i in 0..self.num_reader_slots {
1155            let slot = self.reader_slot(i);
1156            if slot.is_stale(now) {
1157                slot.pid.store(0, Ordering::Release);
1158                cleaned += 1;
1159            }
1160        }
1161
1162        cleaned
1163    }
1164
1165    /// Advance epoch (called on recovery)
1166    pub fn advance_epoch(&self) -> u64 {
1167        self.header().current_epoch.fetch_add(1, Ordering::AcqRel) + 1
1168    }
1169}
1170
1171impl Drop for ConcurrentMvcc {
1172    fn drop(&mut self) {
1173        // Release any reader slot we hold
1174        if let Some(slot_idx) = *self.our_slot.read() {
1175            self.unregister_reader(slot_idx);
1176        }
1177
1178        // Release writer lock if we hold it
1179        self.release_writer();
1180    }
1181}
1182
1183// =============================================================================
1184// Writer Guard (RAII)
1185// =============================================================================
1186
1187/// Guard that releases writer lock on drop
1188pub struct WriterGuard<'a> {
1189    mvcc: &'a ConcurrentMvcc,
1190}
1191
1192impl<'a> Drop for WriterGuard<'a> {
1193    fn drop(&mut self) {
1194        self.mvcc.release_writer();
1195    }
1196}
1197
1198// =============================================================================
1199// Utility Functions
1200// =============================================================================
1201
/// Get current time in microseconds since epoch
///
/// Never panics: a system clock set before the Unix epoch yields `0`, and a
/// value that does not fit in 64 bits saturates at `u64::MAX`.
#[inline]
fn current_time_us() -> u64 {
    // `duration_since` fails when the clock is earlier than UNIX_EPOCH; fall
    // back to a zero duration instead of unwrapping.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // `as_micros` is u128; clamp before narrowing so the cast cannot wrap.
    since_epoch.as_micros().min(u128::from(u64::MAX)) as u64
}
1210
1211/// Check if a process exists
1212#[cfg(unix)]
1213fn process_exists(pid: u32) -> bool {
1214    // kill(pid, 0) checks existence without sending signal
1215    let result = unsafe { libc::kill(pid as libc::pid_t, 0) };
1216    if result == 0 {
1217        true
1218    } else {
1219        // ESRCH = no such process
1220        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
1221    }
1222}
1223
1224#[cfg(windows)]
1225fn process_exists(pid: u32) -> bool {
1226    use windows_sys::Win32::Foundation::CloseHandle;
1227    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION};
1228
1229    unsafe {
1230        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
1231        if handle == 0 {
1232            false
1233        } else {
1234            CloseHandle(handle);
1235            true
1236        }
1237    }
1238}
1239
1240#[cfg(not(any(unix, windows)))]
1241fn process_exists(_pid: u32) -> bool {
1242    true // Assume exists on unknown platforms
1243}
1244
1245// =============================================================================
1246// Tests
1247// =============================================================================
1248
1249#[cfg(test)]
1250mod tests {
1251    use super::*;
1252    use std::sync::Arc;
1253    use std::thread;
1254
1255    #[test]
1256    fn test_struct_sizes() {
1257        eprintln!("MvccHeader size: {}", std::mem::size_of::<MvccHeader>());
1258        eprintln!("MvccHeader align: {}", std::mem::align_of::<MvccHeader>());
1259        eprintln!("ReaderSlot size: {}", std::mem::size_of::<ReaderSlot>());
1260        eprintln!("ReaderSlot align: {}", std::mem::align_of::<ReaderSlot>());
1261        eprintln!("HEADER_SIZE constant: {}", HEADER_SIZE);
1262        eprintln!("READER_SLOT_SIZE constant: {}", READER_SLOT_SIZE);
1263        eprintln!("METADATA_SIZE constant: {}", METADATA_SIZE);
1264
1265        assert_eq!(
1266            std::mem::size_of::<MvccHeader>(),
1267            HEADER_SIZE,
1268            "MvccHeader size mismatch! Actual: {}, Expected: {}",
1269            std::mem::size_of::<MvccHeader>(),
1270            HEADER_SIZE
1271        );
1272        assert_eq!(
1273            std::mem::size_of::<ReaderSlot>(),
1274            READER_SLOT_SIZE,
1275            "ReaderSlot size mismatch! Actual: {}, Expected: {}",
1276            std::mem::size_of::<ReaderSlot>(),
1277            READER_SLOT_SIZE
1278        );
1279    }
1280
1281    #[test]
1282    fn test_hlc_timestamp_ordering() {
1283        let ts = AtomicU64::new(0);
1284
1285        let t1 = HlcTimestamp::allocate_next(&ts);
1286        let t2 = HlcTimestamp::allocate_next(&ts);
1287        let t3 = HlcTimestamp::allocate_next(&ts);
1288
1289        assert!(t1.raw() < t2.raw());
1290        assert!(t2.raw() < t3.raw());
1291    }
1292
1293    #[test]
1294    fn test_hlc_timestamp_concurrent() {
1295        let ts = Arc::new(AtomicU64::new(0));
1296        let mut handles = vec![];
1297
1298        for _ in 0..8 {
1299            let ts = ts.clone();
1300            handles.push(thread::spawn(move || {
1301                let mut timestamps = vec![];
1302                for _ in 0..1000 {
1303                    timestamps.push(HlcTimestamp::allocate_next(&ts).raw());
1304                }
1305                timestamps
1306            }));
1307        }
1308
1309        let mut all_ts: Vec<u64> = handles
1310            .into_iter()
1311            .flat_map(|h| h.join().unwrap())
1312            .collect();
1313
1314        // All timestamps should be unique
1315        all_ts.sort();
1316        let len_before = all_ts.len();
1317        all_ts.dedup();
1318        assert_eq!(all_ts.len(), len_before, "Duplicate timestamps found!");
1319    }
1320
1321    #[test]
1322    fn test_version_chain_read_at() {
1323        let mut chain = VersionChain::new();
1324
1325        // Add committed versions via the public API
1326        chain.add_uncommitted(Some(b"v80".to_vec()), 3, 1);
1327        chain.commit(3, 80);
1328        chain.add_uncommitted(Some(b"v90".to_vec()), 2, 1);
1329        chain.commit(2, 90);
1330        chain.add_uncommitted(Some(b"v100".to_vec()), 1, 1);
1331        chain.commit(1, 100);
1332
1333        // Read at different snapshots
1334        let v = chain.read_at(105, None).unwrap();
1335        assert_eq!(v.value, Some(b"v100".to_vec()));
1336
1337        let v = chain.read_at(95, None).unwrap();
1338        assert_eq!(v.value, Some(b"v90".to_vec()));
1339
1340        let v = chain.read_at(85, None).unwrap();
1341        assert_eq!(v.value, Some(b"v80".to_vec()));
1342
1343        // Read before all versions
1344        assert!(chain.read_at(75, None).is_none());
1345    }
1346
1347    #[test]
1348    fn test_version_chain_gc() {
1349        let mut chain = VersionChain::new();
1350
1351        // Add 10 committed versions with varying epochs via public API
1352        for i in (0..10u64).rev() {
1353            chain.add_uncommitted(
1354                Some(format!("v{}", 100 - i * 5).into_bytes()),
1355                i,
1356                (10 - i) as u32,
1357            );
1358            chain.commit(i, 100 - i * 5);
1359        }
1360
1361        assert_eq!(chain.len(), 10);
1362
1363        // GC with min_epoch = 7, min_snapshot = 75
1364        // Keeps versions where epoch >= 7 OR commit_ts >= 75
1365        // epoch >= 7: versions at ts=100, 95, 90, 85 (epochs 10, 9, 8, 7)
1366        // commit_ts >= 75: versions at ts=100, 95, 90, 85, 80, 75
1367        // But we always keep at least the newest one
1368        let reclaimed = chain.gc(7, 75);
1369
1370        // Versions that should be reclaimed:
1371        // ts=70 (epoch=4), ts=65 (epoch=3), ts=60 (epoch=2), ts=55 (epoch=1)
1372        // That's 4 versions reclaimed
1373        assert!(
1374            reclaimed > 0,
1375            "Should have reclaimed some versions, got {}",
1376            reclaimed
1377        );
1378        assert!(chain.len() >= 1, "Should keep at least one version");
1379    }
1380
1381    #[test]
1382    fn test_version_store_basic() {
1383        let store = VersionStore::new();
1384
1385        // Insert uncommitted
1386        store
1387            .insert_uncommitted(b"key1", Some(b"value1".to_vec()), 1, 1)
1388            .unwrap();
1389
1390        // Commit it
1391        assert!(store.commit(b"key1", 1, 100));
1392
1393        // Read at snapshot after commit
1394        let value = store.get(b"key1", 150, None);
1395        assert_eq!(value, Some(b"value1".to_vec()));
1396
1397        // Read at snapshot before commit
1398        let value = store.get(b"key1", 50, None);
1399        assert!(value.is_none());
1400    }
1401
1402    #[test]
1403    fn test_reader_slot_claim_release() {
1404        let slot = ReaderSlot::empty();
1405
1406        assert!(slot.is_free());
1407
1408        // Claim slot
1409        assert!(slot.try_claim(1234, 100, 1));
1410        assert!(!slot.is_free());
1411
1412        // Can't claim from different PID
1413        assert!(!slot.try_claim(5678, 200, 2));
1414
1415        // Can re-claim from same PID
1416        assert!(slot.try_claim(1234, 300, 3));
1417
1418        // Release
1419        slot.release(1234);
1420        assert!(slot.is_free());
1421    }
1422}