Skip to main content

sochdb_storage/
durable_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! This module wires together all storage components into a production-ready
21//! durable storage engine:
22//!
23//! - WAL (txn_wal.rs) for durability
24//! - Group Commit for throughput
25//! - MVCC for isolation
26//! - LSCS for columnar efficiency
27//!
28//! ## Architecture
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │                      DurableStorage                              │
33//! ├─────────────────────────────────────────────────────────────────┤
34//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
35//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
36//! │  │             │    │             │    └─────────────────────┘ │
37//! │  │ ┌─────────┐ │    └─────────────┘                            │
38//! │  │ │Snapshots│ │                                                │
39//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
40//! │  │ ┌─────────┐ │    │              MemTable                    ││
41//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
42//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
43//! │  └─────────────┘                                                │
44//! │                      ┌─────────────────────────────────────────┐│
45//! │                      │              LSCS (SST)                  ││
46//! │                      │  Immutable columnar segments             ││
47//! │                      └─────────────────────────────────────────┘│
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Concurrency
52//!
53//! - Writers: Serialize through WAL, use MVCC for conflict detection
54//! - Readers: Lock-free reads at snapshot timestamp
55//! - Commits: Batched through GroupCommit for throughput
56
57use std::collections::HashSet;
58use std::path::{Path, PathBuf};
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62use dashmap::DashMap;
63use smallvec::SmallVec;
64
65use crossbeam_skiplist::SkipMap;
66
67use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
68use crate::group_commit::EventDrivenGroupCommit;
69use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
70use sochdb_core::{Result, SochDBError};
71use sochdb_core::version_chain::{
72    BinarySearchChain, ChainEntry,
73    MvccVersionChain, MvccVersionChainMut, WriteConflictDetection,
74    VisibilityContext, TxnId, Timestamp,
75};
76
77// =============================================================================
78// SSI Bloom Filter - Fast Conflict Pre-Filtering
79// =============================================================================
80
81/// Space-efficient Bloom filter for SSI conflict detection
82///
83/// Used to quickly determine if two transactions MIGHT have conflicting keys.
84/// False positives are acceptable (leads to unnecessary exact checks),
85/// but false negatives are not allowed.
86///
87/// ## Configuration
88///
89/// For 1000 keys with 1% false positive rate:
90/// - m = ~9600 bits ≈ 1.2 KB per transaction
91/// - k = 7 hash functions
92///
93/// ## Lazy Initialization
94///
95/// The bit vector is lazily initialized on first insert to avoid
96/// allocation overhead for read-only transactions.
97#[derive(Clone, Debug)]
98pub struct SsiBloomFilter {
99    /// Bit vector (each u64 holds 64 bits) - lazily initialized
100    bits: Option<Vec<u64>>,
101    /// Expected capacity (used for lazy init sizing)
102    expected_capacity: usize,
103    /// Number of hash functions to use
104    num_hashes: u32,
105}
106
107impl SsiBloomFilter {
108    /// Optimal number of bits per item for 1% false positive rate
109    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
110    const BITS_PER_ITEM: f64 = 9.6;
111
112    /// Optimal number of hash functions for 1% false positive rate
113    /// k = (m/n) × ln(2) ≈ 7
114    const DEFAULT_NUM_HASHES: u32 = 7;
115
116    /// Minimum capacity to avoid tiny filters
117    const MIN_CAPACITY: usize = 64;
118
119    /// Create a new bloom filter for expected item count (lazy allocation)
120    ///
121    /// Configured for ~1% false positive rate.
122    /// The bit vector is not allocated until first insert.
123    #[inline]
124    pub fn new(expected_items: usize) -> Self {
125        Self {
126            bits: None,
127            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
128            num_hashes: Self::DEFAULT_NUM_HASHES,
129        }
130    }
131
132    /// Create with specific capacity in words (for memory-constrained scenarios)
133    pub fn with_word_capacity(words: usize) -> Self {
134        Self {
135            bits: None,
136            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
137            num_hashes: Self::DEFAULT_NUM_HASHES,
138        }
139    }
140
141    /// Ensure bits are allocated (lazy initialization)
142    #[inline]
143    fn ensure_allocated(&mut self) {
144        if self.bits.is_none() {
145            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
146            let num_words = num_bits.div_ceil(64);
147            self.bits = Some(vec![0u64; num_words]);
148        }
149    }
150
151    /// Add a key to the filter - O(k) where k = num_hashes
152    #[inline]
153    pub fn insert(&mut self, key: &[u8]) {
154        self.ensure_allocated();
155        let bits = self.bits.as_mut().unwrap();
156        let num_bits = bits.len() * 64;
157        if num_bits == 0 {
158            return;
159        }
160
161        // Use two hash functions to simulate k hash functions
162        // h(i) = h1 + i * h2 (double hashing technique)
163        let h1 = Self::hash1(key);
164        let h2 = Self::hash2(key);
165
166        for i in 0..self.num_hashes {
167            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
168            let bit_idx = (h as usize) % num_bits;
169            let word_idx = bit_idx / 64;
170            let bit_pos = bit_idx % 64;
171            bits[word_idx] |= 1 << bit_pos;
172        }
173    }
174
175    /// Check if a key might be present - O(k)
176    ///
177    /// Returns:
178    /// - false: Key is definitely NOT in the set (or filter not initialized)
179    /// - true: Key MIGHT be in the set (needs exact check)
180    #[inline]
181    pub fn may_contain(&self, key: &[u8]) -> bool {
182        let bits = match &self.bits {
183            Some(b) => b,
184            None => return false, // Uninitialized = empty
185        };
186        let num_bits = bits.len() * 64;
187        if num_bits == 0 {
188            return false;
189        }
190
191        let h1 = Self::hash1(key);
192        let h2 = Self::hash2(key);
193
194        for i in 0..self.num_hashes {
195            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
196            let bit_idx = (h as usize) % num_bits;
197            let word_idx = bit_idx / 64;
198            let bit_pos = bit_idx % 64;
199            if bits[word_idx] & (1 << bit_pos) == 0 {
200                return false; // Definitely not present
201            }
202        }
203        true // Might be present
204    }
205
206    /// Check if this filter might intersect with another
207    ///
208    /// Fast O(m/64) check using bitwise AND of all words.
209    /// If no bits are shared, sets are definitely disjoint.
210    #[inline]
211    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
212        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
213            (Some(s), Some(o)) => (s, o),
214            _ => return false, // Either uninitialized = no intersection
215        };
216        let min_len = self_bits.len().min(other_bits.len());
217        for i in 0..min_len {
218            if self_bits[i] & other_bits[i] != 0 {
219                return true; // Might intersect
220            }
221        }
222        false // Definitely disjoint
223    }
224
225    /// First hash function (using built-in hasher)
226    #[inline]
227    fn hash1(key: &[u8]) -> u64 {
228        use std::collections::hash_map::DefaultHasher;
229        use std::hash::{Hash, Hasher};
230        let mut hasher = DefaultHasher::new();
231        key.hash(&mut hasher);
232        hasher.finish()
233    }
234
235    /// Second hash function (using twox-hash for independence)
236    #[inline]
237    fn hash2(key: &[u8]) -> u64 {
238        twox_hash::xxh3::hash64(key)
239    }
240
241    /// Get the memory size in bytes
242    pub fn size_bytes(&self) -> usize {
243        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
244    }
245
246    /// Check if the filter is empty
247    pub fn is_empty(&self) -> bool {
248        match &self.bits {
249            Some(bits) => bits.iter().all(|&w| w == 0),
250            None => true,
251        }
252    }
253}
254
/// Type alias for inline key storage - keys up to 32 bytes are stored inline in the SmallVec
/// This eliminates a per-key heap allocation for typical keys like "users/12345" (11 bytes)
257pub type InlineKey = SmallVec<[u8; 32]>;
258
/// A single version of a key's value, as stored in a version chain.
#[derive(Clone, Debug)]
pub struct Version {
    /// Stored payload; `None` marks a tombstone (the key was deleted).
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that wrote this version.
    pub txn_id: u64,
    /// Commit timestamp; `0` while the version is still uncommitted.
    pub commit_ts: u64,
}
269
270// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
271impl ChainEntry for Version {
272    #[inline] fn commit_ts(&self) -> u64 { self.commit_ts }
273    #[inline] fn txn_id(&self) -> u64 { self.txn_id }
274    #[inline] fn set_commit_ts(&mut self, ts: u64) { self.commit_ts = ts; }
275}
276
277// ============================================================================
278// Optimized VersionChain with Binary Search (Task 1: mm.md)
279// ============================================================================
280
281/// Multi-version data for a single key with O(log v) read complexity
282///
283/// ## Optimization: Binary Search with Sorted Commit Ordering
284///
285/// Separates committed versions (sorted descending by commit_ts) from 
286/// uncommitted version (single optional slot per transaction).
287///
288/// **Before:** O(v) linear scan + O(v) max computation = O(v)
289/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
290///
291/// For v=10 versions: 3.3x speedup
292/// For v=100 versions: 7x speedup
293///
294/// ## Rec 11: Consolidated
295///
296/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
297/// eliminating duplication with `mvcc_concurrent::VersionChain`.
298#[derive(Debug, Default)]
299pub struct VersionChain {
300    /// Consolidated binary-search chain (Rec 11)
301    inner: BinarySearchChain<Version>,
302}
303
304impl VersionChain {
305    /// Create a new empty version chain
306    #[inline]
307    pub fn new() -> Self {
308        Self { inner: BinarySearchChain::new() }
309    }
310
311    /// Add a new uncommitted version
312    /// If there's already an uncommitted version from this txn, update it in place
313    /// 
314    /// O(1) - just updates the uncommitted slot
315    #[inline]
316    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
317        match self.inner.uncommitted_mut() {
318            Some(v) if v.txn_id == txn_id => {
319                // Update in place - O(1)
320                v.value = value;
321            }
322            _ => {
323                // New or different txn — set the slot
324                self.inner.set_uncommitted(Version {
325                    value,
326                    txn_id,
327                    commit_ts: 0,
328                });
329            }
330        }
331    }
332
333    /// Commit a version - moves from uncommitted slot to sorted committed list
334    /// 
335    /// O(log v) - inserts into sorted position using binary search
336    #[inline]
337    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
338        self.inner.commit(txn_id, commit_ts)
339    }
340
341    /// Abort a version (remove uncommitted version for txn)
342    /// 
343    /// O(1) - just clears the uncommitted slot if it matches
344    #[inline]
345    pub fn abort(&mut self, txn_id: u64) {
346        self.inner.abort(txn_id);
347    }
348
349    /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
350    ///
351    /// ## Complexity: O(1) + O(log v) = O(log v)
352    #[inline]
353    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
354        self.inner.read_at(snapshot_ts, current_txn_id)
355    }
356
357    /// Check if there's an uncommitted version by another transaction
358    /// 
359    /// O(1) - just checks the uncommitted slot
360    #[inline]
361    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
362        self.inner.has_write_conflict(my_txn_id)
363    }
364
365    /// Garbage collect old versions
366    pub fn gc(&mut self, min_active_ts: u64) {
367        self.inner.gc_by_ts(min_active_ts);
368    }
369
370    /// Get total version count (committed + uncommitted)
371    #[inline]
372    pub fn version_count(&self) -> usize {
373        self.inner.version_count()
374    }
375
376    // Legacy compatibility: get versions vec (for tests)
377    #[cfg(test)]
378    pub fn versions(&self) -> Vec<Version> {
379        let mut result = self.inner.committed_versions().to_vec();
380        if let Some(v) = self.inner.uncommitted() {
381            result.push(v.clone());
382        }
383        result
384    }
385}
386
387// =============================================================================
388// Rec 6: Unified Version Chain Trait Implementations
389// =============================================================================
390
391impl MvccVersionChain for VersionChain {
392    type Value = Option<Vec<u8>>;
393
394    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
395        // Delegate to BinarySearchChain, then project to value field
396        self.inner.read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
397            .map(|v| &v.value)
398    }
399
400    fn get_latest(&self) -> Option<&Self::Value> {
401        self.inner.latest().map(|v| &v.value)
402    }
403
404    fn version_count(&self) -> usize {
405        self.inner.version_count()
406    }
407}
408
409impl MvccVersionChainMut for VersionChain {
410    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
411        self.add_uncommitted(value, txn_id);
412    }
413
414    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
415        self.inner.commit(txn_id, commit_ts)
416    }
417
418    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
419        // Insert a tombstone (None value) as uncommitted
420        self.add_uncommitted(None, txn_id);
421        true
422    }
423
424    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
425        let before = self.inner.committed_count();
426        self.inner.gc_by_ts(min_visible_ts);
427        let removed = before - self.inner.committed_count();
428        (removed, removed * std::mem::size_of::<Version>())
429    }
430}
431
432impl WriteConflictDetection for VersionChain {
433    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
434        self.has_write_conflict(txn_id)
435    }
436}
437
438// =============================================================================
439// Pre-sizing Constants to Avoid HashSet Resize Overhead
440// =============================================================================
441
/// Initial capacity of a transaction's `write_set`.
///
/// Sized for typical OLTP transactions (10-50 keys) so the set does not need
/// to resize during the transaction (the resize was noted as an +11% regression).
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity of a transaction's `read_set`.
///
/// Larger than the write-set capacity because workloads tend to be read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
450
451/// Transaction state for MVCC
452#[derive(Debug, Clone)]
453pub struct MvccTransaction {
454    /// Transaction ID
455    pub txn_id: u64,
456    /// Snapshot timestamp (reads see commits before this)
457    pub snapshot_ts: u64,
458    /// Keys written by this transaction - uses SmallVec for inline storage
459    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
460    pub write_set: HashSet<InlineKey>,
461    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
462    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
463    pub read_set: HashSet<InlineKey>,
464    /// Bloom filter for write set - fast SSI pre-filtering
465    pub write_bloom: SsiBloomFilter,
466    /// Bloom filter for read set - fast SSI pre-filtering
467    pub read_bloom: SsiBloomFilter,
468    /// Transaction state
469    pub state: TxnState,
470    /// Transaction mode for SSI optimization (Recommendation 9)
471    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
472    pub mode: TransactionMode,
473}
474
475impl MvccTransaction {
476    /// Create a new transaction with pre-sized collections
477    /// 
478    /// This avoids HashSet resize overhead during the transaction
479    /// which was causing +11% regression on write_set.insert().
480    #[inline]
481    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
482        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
483    }
484    
485    /// Create a read-only transaction (SSI bypass - 2.6x faster)
486    ///
487    /// Read-only transactions skip all SSI tracking:
488    /// - No read_set allocation
489    /// - No read_bloom allocation  
490    /// - No commit validation
491    ///
492    /// ## Performance
493    /// 
494    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
495    #[inline]
496    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
497        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
498    }
499    
500    /// Create a write-only transaction (partial SSI bypass)
501    ///
502    /// Write-only transactions skip read tracking:
503    /// - No read_set tracking
504    /// - No read_bloom inserts
505    /// - Still needs write_set for commit
506    #[inline]
507    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
508        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
509    }
510    
511    /// Create transaction with specific mode
512    #[inline]
513    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
514        // Optimize allocation based on mode
515        let (write_capacity, read_capacity) = match mode {
516            TransactionMode::ReadOnly => (0, 0),  // No tracking needed
517            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
518            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
519        };
520        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
521    }
522
523    /// Create with custom capacities for expected workload
524    /// 
525    /// Use this when you know the transaction will write many keys
526    /// to avoid resize overhead entirely.
527    #[inline]
528    pub fn with_capacity(
529        txn_id: u64,
530        snapshot_ts: u64,
531        write_capacity: usize,
532        read_capacity: usize,
533        mode: TransactionMode,
534    ) -> Self {
535        Self {
536            txn_id,
537            snapshot_ts,
538            write_set: HashSet::with_capacity(write_capacity),
539            read_set: HashSet::with_capacity(read_capacity),
540            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
541            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
542            state: TxnState::Active,
543            mode,
544        }
545    }
546
547    /// Check if this is a read-only transaction
548    #[inline]
549    pub fn is_read_only(&self) -> bool {
550        self.write_set.is_empty()
551    }
552
553    /// Check if this is a single-key write transaction
554    #[inline]
555    pub fn is_single_key_write(&self) -> bool {
556        self.write_set.len() == 1 && self.read_set.len() <= 1
557    }
558}
559
/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnState {
    /// The transaction is still running.
    Active,
    /// The transaction has committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
567
568// =============================================================================
569// Transaction Mode for SSI Bypass (Recommendation 9)
570// =============================================================================
571
/// Transaction mode used to decide how much SSI tracking a transaction needs.
///
/// Classifying a transaction when it begins lets the manager skip SSI
/// bookkeeping it cannot need:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// Estimated cost model for read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TransactionMode {
    /// Read-only transaction - skips ALL SSI tracking.
    /// Cannot form rw-antidependency cycles (no writes to create outgoing edges),
    /// so read_set, read_bloom and commit validation can be skipped entirely.
    ReadOnly,

    /// Write-only transaction - skips read tracking.
    /// Cannot form incoming rw-edges (no reads from concurrent writers);
    /// only write_set and write_bloom tracking is needed.
    WriteOnly,

    /// Full read-write transaction (default) - complete SSI tracking.
    /// May form both incoming and outgoing rw-edges and requires full
    /// validation at commit time.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Whether reads must be tracked in this mode (only `ReadWrite`).
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Whether writes must be tracked in this mode (`WriteOnly` and `ReadWrite`).
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::ReadOnly => false,
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
        }
    }

    /// Whether commit must run SSI validation in this mode (only `ReadWrite`).
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        *self == TransactionMode::ReadWrite
    }
}
633
/// Kind of dependency represented by an SSI conflict edge.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, then T2 writes X.
    ReadWrite,
    /// Write-read conflict: T1 writes X, then T2 reads X.
    WriteRead,
}
642
643/// SSI conflict edge for dangerous structure detection
644#[derive(Debug, Clone)]
645pub struct ConflictEdge {
646    /// Source transaction
647    pub from_txn: u64,
648    /// Target transaction
649    pub to_txn: u64,
650    /// Type of conflict
651    pub conflict_type: ConflictType,
652}
653
654/// MVCC Manager with SSI support
655///
656/// Uses DashMap for lock-free per-transaction access.
657/// Implements Serializable Snapshot Isolation (SSI) with
658/// dangerous structure detection for rw-antidependency cycles.
659#[allow(clippy::type_complexity)]
660pub struct MvccManager {
661    /// Active transactions (sharded for concurrent access)
662    active_txns: DashMap<u64, MvccTransaction>,
663    /// Current timestamp counter
664    ts_counter: AtomicU64,
665    /// Minimum active snapshot timestamp (for GC)
666    min_active_ts: AtomicU64,
667    /// Recently committed transactions for SSI validation
668    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
669    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
670    recent_commits: DashMap<
671        u64,
672        (
673            u64,
674            SsiBloomFilter,
675            SsiBloomFilter,
676            HashSet<InlineKey>,
677            HashSet<InlineKey>,
678        ),
679    >,
680    /// Max recent commits to track
681    max_recent_commits: usize,
682}
683
684impl Default for MvccManager {
685    fn default() -> Self {
686        Self::new()
687    }
688}
689
690impl MvccManager {
691    pub fn new() -> Self {
692        Self {
693            active_txns: DashMap::new(),
694            ts_counter: AtomicU64::new(1),
695            min_active_ts: AtomicU64::new(0),
696            recent_commits: DashMap::new(),
697            max_recent_commits: 1000, // Track last 1000 commits for SSI
698        }
699    }
700
701    /// Begin a new transaction with snapshot isolation
702    /// 
703    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
704    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
705        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
706    }
707    
708    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
709    ///
710    /// Read-only transactions skip all SSI tracking, reducing
711    /// per-read overhead from ~82ns to ~32ns.
712    ///
713    /// ## Safety
714    ///
715    /// Caller must ensure no writes are performed. Attempting to
716    /// write in a read-only transaction will still succeed but
717    /// won't be tracked for SSI validation.
718    #[inline]
719    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
720        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
721    }
722    
723    /// Begin a write-only transaction (partial SSI bypass)
724    ///
725    /// Write-only transactions skip read tracking, reducing overhead
726    /// for insert-heavy workloads.
727    #[inline]
728    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
729        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
730    }
731    
732    /// Begin a transaction with specific mode
733    ///
734    /// This is the core transaction creation method that all other
735    /// begin_* methods delegate to.
736    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
737        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
738
739        // Create transaction with mode-optimized allocations
740        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
741
742        self.active_txns.insert(txn_id, txn.clone());
743        self.update_min_active_ts();
744
745        txn
746    }
747
748    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
749    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
750        self.active_txns.get(&txn_id).map(|t| t.clone())
751    }
752
753    /// Fast path: get just the snapshot timestamp without cloning
754    /// This is the hot path for reads - avoids cloning bloom filters
755    #[inline]
756    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
757        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
758    }
759
760    /// Record a read (for SSI) - uses inline key storage + bloom filter
761    ///
762    /// ## SSI Bypass (Recommendation 9)
763    ///
764    /// For ReadOnly mode transactions, this is a no-op (instant return).
765    /// For WriteOnly mode transactions, this is a no-op.
766    /// Only ReadWrite mode transactions track reads for SSI.
767    ///
768    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
769    #[inline]
770    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
771        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
772            // SSI Bypass: Skip tracking for read-only and write-only modes
773            if !txn.mode.tracks_reads() {
774                return;
775            }
776            
777            // Only track reads if within reasonable bounds
778            if txn.read_set.len() < 10000 {
779                txn.read_set.insert(SmallVec::from_slice(key));
780                txn.read_bloom.insert(key);
781            }
782        }
783    }
784
785    /// Record a write - uses inline key storage + bloom filter
786    ///
787    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
788    /// The mode only affects SSI tracking, not write capability.
789    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
790        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
791            txn.write_set.insert(SmallVec::from_slice(key));
792            txn.write_bloom.insert(key);
793        }
794    }
795
796    /// Allocate commit timestamp
797    pub fn alloc_commit_ts(&self) -> u64 {
798        self.ts_counter.fetch_add(1, Ordering::SeqCst)
799    }
800
801    /// Commit transaction with SSI validation
802    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
803    /// Returns None if SSI validation fails (dangerous structure detected)
804    ///
805    /// ## SSI Bypass (Recommendation 9)
806    ///
807    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
808    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
809    /// For ReadWrite mode: Full validation (~50ns commit)
810    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
811        // Get transaction before removing
812        let txn = self.active_txns.get(&txn_id)?.clone();
813
814        // SSI Bypass: Skip validation for ReadOnly transactions
815        // ReadOnly can never form rw-antidependency cycles
816        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
817            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
818            // For ReadWrite: check SSI validation result
819            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
820                // Abort on SSI violation
821                self.active_txns.remove(&txn_id);
822                self.update_min_active_ts();
823                return None;
824            }
825        }
826
827        let commit_ts = self.alloc_commit_ts();
828
829        // Extract write_set and remove transaction - takes ownership
830        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
831
832        // OPTIMIZATION: Only track ReadWrite transactions for SSI
833        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
834        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite 
835            && !removed_txn.read_set.is_empty() 
836            && !removed_txn.write_set.is_empty();
837        
838        if needs_ssi_tracking {
839            // Need to clone write_set since we return it AND track it
840            let write_set_for_return = removed_txn.write_set.clone();
841            
842            self.track_commit_owned(
843                txn_id,
844                commit_ts,
845                removed_txn.read_bloom,
846                removed_txn.write_bloom,
847                removed_txn.read_set,
848                removed_txn.write_set,
849            );
850
851            self.update_min_active_ts();
852            Some((commit_ts, write_set_for_return))
853        } else {
854            // Fast path: no SSI tracking needed, avoid clone entirely
855            self.update_min_active_ts();
856            Some((commit_ts, removed_txn.write_set))
857        }
858    }
859
860    /// Validate SSI constraints for a committing transaction
861    ///
862    /// ## Transaction Classification (Task 3: Optimistic MVCC)
863    ///
864    /// Transactions are classified and routed through appropriate fast paths:
865    ///
866    /// | Class      | Criteria                      | Validation Cost |
867    /// |------------|-------------------------------|-----------------|
868    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
869    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
870    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
871    /// | General    | full SSI check                | ~50 ns         |
872    ///
873    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
874    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
875    ///
876    /// Detects "dangerous structures" - rw-antidependency cycles:
877    /// - T1 reads X (snapshot sees old value)
878    /// - T2 writes X (concurrent write)  
879    /// - T2 reads Y (snapshot sees old value)
880    /// - T1 writes Y (concurrent write)
881    ///
882    /// If T1 → rw → T2 → rw → T1 exists, abort T1
883    #[inline]
884    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
885        // =================================================================
886        // Fast Path 1: Read-only transactions (0 ns)
887        // =================================================================
888        // Read-only transactions can never form rw-antidependency cycles
889        // because they have no writes to create outgoing rw-edges
890        if txn.write_set.is_empty() {
891            return true;
892        }
893
894        // =================================================================
895        // Fast Path 2: No recent commits to check (0 ns)
896        // =================================================================
897        if self.recent_commits.is_empty() {
898            return true;
899        }
900
901        // =================================================================
902        // Fast Path 3: Single-key write transactions (0 ns)
903        // =================================================================
904        // A single-key write transaction cannot form a dangerous cycle:
905        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
906        //   AND T2 to read what T1 wrote
907        // - With only one key in write_set, the same key would need to be
908        //   in both read_set AND write_set of both transactions
909        // - This is already prevented by our conflict detection (write-write)
910        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
911            return true;
912        }
913
914        let my_snapshot = txn.snapshot_ts;
915
916        // =================================================================
917        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
918        // =================================================================
919        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
920        // with any concurrent transaction's read_bloom AND vice versa,
921        // there can be no rw-antidependency
922        let mut any_may_intersect = false;
923        for entry in self.recent_commits.iter() {
924            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
925            
926            // Only check concurrent transactions
927            if *other_commit_ts <= my_snapshot {
928                continue;
929            }
930
931            // Check bloom filter intersection (O(m/64) per filter)
932            // If our writes may intersect their reads OR their writes may intersect our reads
933            if txn.write_bloom.may_intersect(other_read_bloom) 
934                || other_write_bloom.may_intersect(&txn.read_bloom) 
935            {
936                any_may_intersect = true;
937                break;
938            }
939        }
940
941        // No bloom intersection means definitely disjoint - no SSI conflict possible
942        if !any_may_intersect {
943            return true;
944        }
945
946        // =================================================================
947        // Full SSI Validation (~50 ns)
948        // =================================================================
949        // Check for rw-conflicts with recently committed transactions
950        // An rw-conflict exists if:
951        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
952        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
953
954        let mut in_conflict_with: Vec<u64> = Vec::new();
955        let mut out_conflict_to: Vec<u64> = Vec::new();
956
957        for entry in self.recent_commits.iter() {
958            let (
959                other_txn_id,
960                (
961                    other_commit_ts,
962                    _other_read_bloom,
963                    other_write_bloom,
964                    other_read_set,
965                    other_write_set,
966                ),
967            ) = entry.pair();
968
969            // Only consider transactions that committed after our snapshot started
970            // (concurrent transactions)
971            if *other_commit_ts <= my_snapshot {
972                continue;
973            }
974
975            // Check: other wrote → we read (other →rw→ me)
976            // T_other wrote a key that T_me read (rw-dependency inbound)
977            //
978            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
979            // Only do expensive HashSet intersection if bloom says "maybe conflict"
980            let mut has_in_conflict = false;
981            for key in txn.read_set.iter() {
982                if other_write_bloom.may_contain(key) {
983                    // Bloom says maybe - do exact check
984                    if other_write_set.contains(key) {
985                        has_in_conflict = true;
986                        break;
987                    }
988                }
989            }
990            if has_in_conflict {
991                in_conflict_with.push(*other_txn_id);
992            }
993
994            // Check: we wrote → other read (me →rw→ other)
995            // T_me wrote a key that T_other read (rw-dependency outbound)
996            //
997            // Bloom-accelerated: Use our write_bloom against their read_set
998            let mut has_out_conflict = false;
999            for key in other_read_set.iter() {
1000                if txn.write_bloom.may_contain(key) {
1001                    // Bloom says maybe - do exact check
1002                    if txn.write_set.contains(key) {
1003                        has_out_conflict = true;
1004                        break;
1005                    }
1006                }
1007            }
1008            if has_out_conflict {
1009                out_conflict_to.push(*other_txn_id);
1010            }
1011        }
1012
1013        // Dangerous structure: we have both incoming AND outgoing rw-edges
1014        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1015        //
1016        // Conservative check: if both exist, abort
1017        // A more precise check would verify the cycle path, but this is safe
1018        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1019            return false; // SSI violation - abort
1020        }
1021
1022        true
1023    }
1024
1025    /// Track a committed transaction for future SSI validation
1026    ///
1027    /// Only tracks transactions that have both reads AND writes, since SSI
1028    /// only detects rw-antidependency cycles. Pure read or pure write
1029    /// transactions can't form cycles.
1030    ///
1031    /// ## Optimization: Zero-Copy Transfer
1032    ///
1033    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1034    /// phase regression. The caller should use mem::take() to transfer ownership.
1035    fn track_commit_owned(
1036        &self,
1037        txn_id: u64,
1038        commit_ts: u64,
1039        read_bloom: SsiBloomFilter,
1040        write_bloom: SsiBloomFilter,
1041        read_set: HashSet<InlineKey>,
1042        write_set: HashSet<InlineKey>,
1043    ) {
1044        // Optimization: Only track mixed read-write transactions
1045        // Pure reads can't create outgoing rw-edges
1046        // Pure writes can't create incoming rw-edges
1047        if read_set.is_empty() || write_set.is_empty() {
1048            return; // Skip tracking - can't form SSI cycle
1049        }
1050
1051        // Add to recent commits with bloom filters for fast SSI pre-filtering
1052        // No cloning needed - we take ownership
1053        self.recent_commits.insert(
1054            txn_id,
1055            (
1056                commit_ts,
1057                read_bloom,
1058                write_bloom,
1059                read_set,
1060                write_set,
1061            ),
1062        );
1063
1064        // Lazy pruning: only prune when we're significantly over capacity
1065        // Avoids pruning overhead on every commit
1066        if self.recent_commits.len() > self.max_recent_commits * 2 {
1067            // Remove entries with lowest commit_ts
1068            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1069            self.recent_commits
1070                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1071        }
1072    }
1073
1074    /// Legacy track_commit that clones - kept for compatibility
1075    #[allow(dead_code)]
1076    fn track_commit(
1077        &self,
1078        txn_id: u64,
1079        commit_ts: u64,
1080        read_bloom: SsiBloomFilter,
1081        write_bloom: SsiBloomFilter,
1082        read_set: &HashSet<InlineKey>,
1083        write_set: &HashSet<InlineKey>,
1084    ) {
1085        if read_set.is_empty() || write_set.is_empty() {
1086            return;
1087        }
1088        self.recent_commits.insert(
1089            txn_id,
1090            (
1091                commit_ts,
1092                read_bloom,
1093                write_bloom,
1094                read_set.clone(),
1095                write_set.clone(),
1096            ),
1097        );
1098    }
1099
1100    /// Abort transaction
1101    pub fn abort(&self, txn_id: u64) {
1102        self.active_txns.remove(&txn_id);
1103        self.update_min_active_ts();
1104    }
1105
1106    /// Get minimum active snapshot timestamp
1107    pub fn min_active_snapshot(&self) -> u64 {
1108        self.min_active_ts.load(Ordering::SeqCst)
1109    }
1110
1111    /// Get count of active transactions
1112    pub fn active_transaction_count(&self) -> usize {
1113        self.active_txns.len()
1114    }
1115
1116    fn update_min_active_ts(&self) {
1117        let min = self
1118            .active_txns
1119            .iter()
1120            .map(|entry| entry.value().snapshot_ts)
1121            .min()
1122            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1123        self.min_active_ts.store(min, Ordering::SeqCst);
1124    }
1125}
1126
1127/// Epoch-based dirty list for O(expired) GC instead of O(n)
1128///
1129/// Instead of scanning ALL version chains, we track which keys have versions
1130/// created in each epoch. GC only needs to visit keys from old epochs.
1131struct EpochDirtyList {
1132    /// Ring buffer of epoch -> dirty keys
1133    /// Index = epoch % EPOCH_RING_SIZE
1134    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1135    /// Current epoch
1136    current_epoch: AtomicU64,
1137}
1138
1139const EPOCH_RING_SIZE: usize = 4;
1140
1141impl EpochDirtyList {
1142    fn new() -> Self {
1143        Self {
1144            epochs: [
1145                parking_lot::Mutex::new(Vec::new()),
1146                parking_lot::Mutex::new(Vec::new()),
1147                parking_lot::Mutex::new(Vec::new()),
1148                parking_lot::Mutex::new(Vec::new()),
1149            ],
1150            current_epoch: AtomicU64::new(0),
1151        }
1152    }
1153
1154    /// Record a version created in the current epoch
1155    #[inline]
1156    fn record_version(&self, key: Vec<u8>) {
1157        let epoch = self.current_epoch.load(Ordering::Relaxed);
1158        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1159        self.epochs[idx].lock().push(key);
1160    }
1161
1162    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1163    ///
1164    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1165    /// For 100 writes: ~100x fewer mutex operations.
1166    #[inline]
1167    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1168        let epoch = self.current_epoch.load(Ordering::Relaxed);
1169        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1170        let mut guard = self.epochs[idx].lock();
1171        guard.extend(keys);
1172    }
1173
1174    /// Advance to next epoch, returning old epoch's dirty keys
1175    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1176        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1177        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1178
1179        // Drain the old epoch's dirty list
1180        let mut guard = self.epochs[old_idx].lock();
1181        let keys = std::mem::take(&mut *guard);
1182        (old_epoch, keys)
1183    }
1184
1185    /// Get current epoch
1186    #[allow(dead_code)]
1187    fn current(&self) -> u64 {
1188        self.current_epoch.load(Ordering::Relaxed)
1189    }
1190}
1191
1192// ============================================================================
1193// Streaming Scan Iterator
1194// ============================================================================
1195
/// Streaming iterator for range scans
///
/// Yields `(key, value)` pairs one at a time; the values are resolved lazily
/// as the iterator is advanced instead of being collected up front.
///
/// The underlying source iterator is built on the first call to `next`.
/// Note that the deferred-index path collects the matching *keys* into a
/// vector at that point; only the value lookups are streamed.
struct ScanRangeIterator<'a> {
    /// Memtable being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the key range.
    start: Vec<u8>,
    /// Exclusive upper bound of the key range; empty means "no upper bound".
    end: Vec<u8>,
    /// Snapshot timestamp passed to `read_at` for every key.
    snapshot_ts: u64,
    /// Transaction id passed to `read_at` alongside the snapshot timestamp.
    current_txn_id: Option<u64>,
    /// When true, scan through one of the memtable's sorted indexes;
    /// otherwise iterate the DashMap directly and filter by range.
    use_ordered: bool,
    // Both iterators are `None` until the first `next` call builds one of them.
    /// Source iterator for the index-driven path.
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Source iterator for the DashMap-scan path.
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set on the first `next` call so initialization runs only once.
    initialized: bool,
}
1213
1214impl<'a> Iterator for ScanRangeIterator<'a> {
1215    type Item = (Vec<u8>, Vec<u8>);
1216    
1217    fn next(&mut self) -> Option<Self::Item> {
1218        // Lazy initialization on first call
1219        if !self.initialized {
1220            self.initialized = true;
1221            
1222            if self.use_ordered {
1223                // Try deferred index first (after compaction, it uses a SkipMap internally)
1224                if let Some(ref def_idx) = self.memtable.deferred_index {
1225                    let start = self.start.clone();
1226                    let end = self.end.clone();
1227                    let snapshot_ts = self.snapshot_ts;
1228                    let current_txn_id = self.current_txn_id;
1229                    let data = &self.memtable.data;
1230                    
1231                    // Collect keys from deferred index (already sorted after compact)
1232                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1233                        def_idx.range_from(&start).collect()
1234                    } else {
1235                        def_idx.range(&start, &end).collect()
1236                    };
1237                    
1238                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1239                        keys.into_iter()
1240                            .filter_map(move |key| {
1241                                if let Some(chain) = data.get(&key)
1242                                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1243                                    && let Some(value) = &v.value
1244                                {
1245                                    Some((key, value.clone()))
1246                                } else {
1247                                    None
1248                                }
1249                            })
1250                    );
1251                    self.ordered_iter = Some(iter);
1252                } else if let Some(ref idx) = self.memtable.ordered_index {
1253                    let start = self.start.clone();
1254                    let end = self.end.clone();
1255                    let snapshot_ts = self.snapshot_ts;
1256                    let current_txn_id = self.current_txn_id;
1257                    let data = &self.memtable.data;
1258                    
1259                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1260                        Box::new(
1261                            idx.range(start..)
1262                                .filter_map(move |entry| {
1263                                    let key = entry.key();
1264                                    if let Some(chain) = data.get(key)
1265                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1266                                        && let Some(value) = &v.value
1267                                    {
1268                                        Some((key.clone(), value.clone()))
1269                                    } else {
1270                                        None
1271                                    }
1272                                })
1273                        )
1274                    } else {
1275                        Box::new(
1276                            idx.range(start..end)
1277                                .filter_map(move |entry| {
1278                                    let key = entry.key();
1279                                    if let Some(chain) = data.get(key)
1280                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1281                                        && let Some(value) = &v.value
1282                                    {
1283                                        Some((key.clone(), value.clone()))
1284                                    } else {
1285                                        None
1286                                    }
1287                                })
1288                        )
1289                    };
1290                    self.ordered_iter = Some(iter);
1291                }
1292            } else {
1293                // Unordered full scan
1294                let start = self.start.clone();
1295                let end = self.end.clone();
1296                let snapshot_ts = self.snapshot_ts;
1297                let current_txn_id = self.current_txn_id;
1298                
1299                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1300                    self.memtable.data.iter()
1301                        .filter_map(move |entry| {
1302                            let key = entry.key();
1303                            
1304                            if key.as_slice() < start.as_slice() {
1305                                return None;
1306                            }
1307                            if !end.is_empty() && key.as_slice() >= end.as_slice() {
1308                                return None;
1309                            }
1310                            
1311                            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1312                                && let Some(value) = &v.value
1313                            {
1314                                Some((key.clone(), value.clone()))
1315                            } else {
1316                                None
1317                            }
1318                        })
1319                );
1320                self.unordered_iter = Some(iter);
1321            }
1322        }
1323        
1324        // Get next from appropriate iterator
1325        if let Some(ref mut iter) = self.ordered_iter {
1326            iter.next()
1327        } else if let Some(ref mut iter) = self.unordered_iter {
1328            iter.next()
1329        } else {
1330            None
1331        }
1332    }
1333}
1334
/// MemTable with MVCC support
///
/// Stores one `VersionChain` per key in a `DashMap`, so concurrent access is
/// coordinated per key/shard by the map rather than by one global write lock.
///
/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
/// Optionally maintains a sorted key index for prefix/range scans: either the
/// deferred sorted index or a SkipMap, never both (see `with_index_mode`).
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for efficient prefix/range scans (optional)
    /// Present only when an ordered index is enabled in deferred mode.
    /// When both indexes are None, scan_prefix falls back to a full
    /// DashMap iteration.
    deferred_index: Option<DeferredSortedIndex>,
    /// SkipMap index, present only when an ordered index is enabled and
    /// deferred mode is off.
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether to use deferred sorting (true) or immediate SkipMap (false)
    /// Stored as passed to `with_index_mode`.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes (sum of key and value lengths of staged writes)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1361
1362impl Default for MvccMemTable {
1363    fn default() -> Self {
1364        Self::new()
1365    }
1366}
1367
1368impl MvccMemTable {
1369    pub fn new() -> Self {
1370        Self::with_ordered_index(true)
1371    }
1372
1373    /// Create memtable with optional ordered index
1374    ///
1375    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1376    /// but scan_prefix becomes O(N) instead of O(log N + K)
1377    ///
1378    /// Uses deferred sorting by default for better write performance:
1379    /// - Writes: O(1) append to hot buffer
1380    /// - Scans: O(N log N) sort-on-demand
1381    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1382        Self::with_index_mode(enable_ordered_index, true)
1383    }
1384
1385    /// Create memtable with fine-grained control over indexing
1386    ///
1387    /// # Arguments
1388    /// * `enable_ordered_index` - Whether to maintain an ordered index
1389    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1390    ///                    If false, use SkipMap (O(log N) writes)
1391    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1392        Self {
1393            data: DashMap::new(),
1394            deferred_index: if enable_ordered_index && use_deferred {
1395                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1396                    max_unsorted_entries: 10_000, // Compact every 10K writes
1397                    enabled: true,
1398                }))
1399            } else {
1400                None
1401            },
1402            ordered_index: if enable_ordered_index && !use_deferred {
1403                Some(SkipMap::new())
1404            } else {
1405                None
1406            },
1407            use_deferred,
1408            size_bytes: AtomicU64::new(0),
1409            dirty_list: EpochDirtyList::new(),
1410        }
1411    }
1412
1413    /// Write a key-value pair (uncommitted)
1414    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1415        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1416        let key_len = key.len();
1417
1418        // Track this key in the current epoch's dirty list for GC
1419        self.dirty_list.record_version(key.clone());
1420
1421        // Insert into ordered index for prefix scans (if enabled)
1422        // Deferred: O(1) append to hot buffer
1423        // SkipMap: O(log N) insert
1424        if let Some(ref idx) = self.deferred_index {
1425            idx.insert(key.clone());
1426        } else if let Some(ref idx) = self.ordered_index {
1427            idx.insert(key.clone(), ());
1428        }
1429
1430        // Use entry API for atomic get-or-insert
1431        let mut entry = self.data.entry(key).or_default();
1432
1433        // Check for write-write conflict
1434        if entry.has_write_conflict(txn_id) {
1435            return Err(SochDBError::Internal(
1436                "Write-write conflict detected".into(),
1437            ));
1438        }
1439        entry.add_uncommitted(value, txn_id);
1440        self.size_bytes
1441            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1442
1443        Ok(())
1444    }
1445
1446    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1447    ///
1448    /// Optimizations applied (Rec 3: MVCC Batching):
1449    /// - Batched dirty list tracking: single lock acquire for all keys
1450    /// - Deferred index: O(1) append per key
1451    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1452        let mut total_size = 0u64;
1453
1454        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1455        self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1456
1457        for (key, value) in writes {
1458            // Insert into ordered index (if enabled)
1459            // Deferred: O(1) append, SkipMap: O(log N)
1460            if let Some(ref idx) = self.deferred_index {
1461                idx.insert(key.clone());
1462            } else if let Some(ref idx) = self.ordered_index {
1463                idx.insert(key.clone(), ());
1464            }
1465
1466            let mut entry = self.data.entry(key.clone()).or_default();
1467
1468            if entry.has_write_conflict(txn_id) {
1469                return Err(SochDBError::Internal(
1470                    "Write-write conflict detected".into(),
1471                ));
1472            }
1473
1474            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1475            entry.add_uncommitted(value.clone(), txn_id);
1476            total_size += (key.len() + value_size) as u64;
1477        }
1478
1479        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1480        Ok(())
1481    }
1482
1483    /// Read at snapshot timestamp, with optional current txn to see own writes
1484    pub fn read(
1485        &self,
1486        key: &[u8],
1487        snapshot_ts: u64,
1488        current_txn_id: Option<u64>,
1489    ) -> Option<Vec<u8>> {
1490        self.data.get(key).and_then(|chain| {
1491            chain
1492                .read_at(snapshot_ts, current_txn_id)
1493                .and_then(|v| v.value.clone())
1494        })
1495    }
1496
1497    /// Commit all versions for a transaction
1498    ///
1499    /// Only updates the keys that were written by this transaction (tracked in write_set).
1500    /// Accepts InlineKey for zero-allocation MVCC tracking.
1501    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1502        // Only iterate over keys we know were written - O(k) instead of O(n)
1503        for key in write_set {
1504            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1505                chain.commit(txn_id, commit_ts);
1506            }
1507        }
1508    }
1509
1510    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1511    #[allow(dead_code)]
1512    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1513        for mut entry in self.data.iter_mut() {
1514            entry.value_mut().commit(txn_id, commit_ts);
1515        }
1516    }
1517
1518    /// Abort all versions for a transaction
1519    pub fn abort(&self, txn_id: u64) {
1520        for mut entry in self.data.iter_mut() {
1521            entry.value_mut().abort(txn_id);
1522        }
1523    }
1524
1525    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1526    ///
1527    /// ## Performance
1528    ///
1529    /// When ordered_index is enabled: O(log N + K) complexity
1530    /// - O(log N) to seek to the first key with prefix
1531    /// - O(K) to iterate matching keys
1532    ///
1533    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1534    /// 
1535    /// ## Optimizations Applied
1536    /// 
1537    /// - Pre-allocates result vector based on expected output size
1538    /// - Uses batch-friendly iteration patterns
1539    /// - Minimizes allocations during iteration
1540    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1541    pub fn scan_prefix(
1542        &self,
1543        prefix: &[u8],
1544        snapshot_ts: u64,
1545        current_txn_id: Option<u64>,
1546    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1547        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1548        let estimated_size = (self.data.len() / 10).max(64);
1549        let mut results = Vec::with_capacity(estimated_size);
1550
1551        if let Some(ref idx) = self.deferred_index {
1552            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1553            for key in idx.range_from(prefix) {
1554                // Stop when we've passed the prefix range
1555                if !key.starts_with(prefix) {
1556                    break;
1557                }
1558
1559                // O(1) lookup in DashMap for version chain
1560                if let Some(chain) = self.data.get(&key)
1561                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1562                    && let Some(value) = &v.value
1563                {
1564                    results.push((key, value.clone()));
1565                }
1566            }
1567        } else if let Some(ref idx) = self.ordered_index {
1568            // Fast path: O(log N) seek to first key >= prefix
1569            for entry in idx.range(prefix.to_vec()..) {
1570                let key = entry.key();
1571
1572                // Stop when we've passed the prefix range
1573                if !key.starts_with(prefix) {
1574                    break;
1575                }
1576
1577                // O(1) lookup in DashMap for version chain
1578                if let Some(chain) = self.data.get(key)
1579                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1580                    && let Some(value) = &v.value
1581                {
1582                    results.push((key.clone(), value.clone()));
1583                }
1584            }
1585        } else {
1586            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1587            // Optimized with batch-friendly iteration
1588            for entry in self.data.iter() {
1589                let key = entry.key();
1590                if !key.starts_with(prefix) {
1591                    continue;
1592                }
1593                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1594                    && let Some(value) = &v.value
1595                {
1596                    results.push((key.clone(), value.clone()));
1597                }
1598            }
1599        }
1600
1601        results
1602    }
1603
1604    /// Optimized full scan with batch allocation
1605    /// 
1606    /// For use when scanning entire tables/namespaces.
1607    /// Pre-allocates based on actual data size.
1608    pub fn scan_all(
1609        &self,
1610        snapshot_ts: u64,
1611        current_txn_id: Option<u64>,
1612    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1613        let mut results = Vec::with_capacity(self.data.len());
1614
1615        for entry in self.data.iter() {
1616            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1617                && let Some(value) = &v.value
1618            {
1619                results.push((entry.key().clone(), value.clone()));
1620            }
1621        }
1622
1623        results
1624    }
1625
1626    /// Streaming scan iterator for very large datasets
1627    /// 
1628    /// Returns an iterator that yields (key, value) pairs without
1629    /// materializing the entire result set in memory.
1630    pub fn scan_prefix_iter<'a>(
1631        &'a self,
1632        prefix: &'a [u8],
1633        snapshot_ts: u64,
1634        current_txn_id: Option<u64>,
1635    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1636        self.data.iter().filter_map(move |entry| {
1637            let key = entry.key();
1638            if !key.starts_with(prefix) {
1639                return None;
1640            }
1641            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1642                && let Some(value) = &v.value
1643            {
1644                Some((key.clone(), value.clone()))
1645            } else {
1646                None
1647            }
1648        })
1649    }
1650
1651    /// Scan range
1652    pub fn scan_range(
1653        &self,
1654        start: &[u8],
1655        end: &[u8],
1656        snapshot_ts: u64,
1657        current_txn_id: Option<u64>,
1658    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1659        let mut results = Vec::new();
1660
1661        if let Some(ref idx) = self.deferred_index {
1662            // Deferred index path: sort-on-scan
1663            if end.is_empty() {
1664                for key in idx.range_from(start) {
1665                    if let Some(chain) = self.data.get(&key)
1666                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1667                        && let Some(value) = &v.value
1668                    {
1669                        results.push((key, value.clone()));
1670                    }
1671                }
1672            } else {
1673                for key in idx.range(start, end) {
1674                    if let Some(chain) = self.data.get(&key)
1675                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1676                        && let Some(value) = &v.value
1677                    {
1678                        results.push((key, value.clone()));
1679                    }
1680                }
1681            }
1682        } else if let Some(ref idx) = self.ordered_index {
1683            // Use range scan on SkipMap
1684            if end.is_empty() {
1685                // Unbounded end
1686                for entry in idx.range(start.to_vec()..) {
1687                    let key = entry.key();
1688                    if let Some(chain) = self.data.get(key)
1689                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1690                        && let Some(value) = &v.value
1691                    {
1692                        results.push((key.clone(), value.clone()));
1693                    }
1694                }
1695            } else {
1696                for entry in idx.range(start.to_vec()..end.to_vec()) {
1697                    let key = entry.key();
1698                    if let Some(chain) = self.data.get(key)
1699                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1700                        && let Some(value) = &v.value
1701                    {
1702                        results.push((key.clone(), value.clone()));
1703                    }
1704                }
1705            }
1706        } else {
1707            // Fallback to full scan if no ordered index
1708            for entry in self.data.iter() {
1709                let key = entry.key();
1710
1711                if key.as_slice() < start {
1712                    continue;
1713                }
1714                if !end.is_empty() && key.as_slice() >= end {
1715                    continue;
1716                }
1717
1718                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1719                    && let Some(value) = &v.value
1720                {
1721                    results.push((key.clone(), value.clone()));
1722                }
1723            }
1724        }
1725
1726        results
1727    }
1728
1729    /// Streaming range scan iterator for very large datasets
1730    /// 
1731    /// Returns an iterator that yields (key, value) pairs without
1732    /// materializing the entire result set in memory. Uses the ordered
1733    /// index when available for O(log N + K) complexity.
1734    /// 
1735    /// ## Zero-Allocation Design
1736    /// 
1737    /// While the iterator itself cannot avoid allocations for returned
1738    /// values (since the caller needs ownership), it avoids:
1739    /// - Pre-materializing all results
1740    /// - Intermediate buffers
1741    /// - Repeated key comparisons for already-visited entries
1742    /// 
1743    /// ## Usage
1744    /// 
1745    /// ```ignore
1746    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1747    ///     // Process each result as it arrives
1748    ///     // Memory usage is O(1) per iteration, not O(N) total
1749    /// }
1750    /// ```
1751    pub fn scan_range_iter<'a>(
1752        &'a self,
1753        start: &'a [u8],
1754        end: &'a [u8],
1755        snapshot_ts: u64,
1756        current_txn_id: Option<u64>,
1757    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1758        // Compact deferred index before scanning if needed
1759        if let Some(ref idx) = self.deferred_index {
1760            idx.compact();
1761        }
1762        
1763        // Use either ordered index or full scan
1764        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1765        
1766        // Create iterator based on availability of ordered index
1767        ScanRangeIterator {
1768            memtable: self,
1769            start: start.to_vec(),
1770            end: end.to_vec(),
1771            snapshot_ts,
1772            current_txn_id,
1773            use_ordered,
1774            ordered_iter: None,
1775            unordered_iter: None,
1776            initialized: false,
1777        }
1778    }
1779
1780    /// Get approximate size
1781    pub fn size(&self) -> u64 {
1782        self.size_bytes.load(Ordering::Relaxed)
1783    }
1784
1785    /// Garbage collect old versions using epoch-based dirty list
1786    ///
1787    /// O(expired_versions) instead of O(all_versions)
1788    /// Only visits keys that had versions created in the old epoch.
1789    pub fn gc(&self, min_active_ts: u64) -> usize {
1790        // Advance epoch and get the dirty keys from the old epoch
1791        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1792
1793        if dirty_keys.is_empty() {
1794            return 0;
1795        }
1796
1797        let mut gc_count = 0;
1798
1799        // Only visit keys that were modified in the old epoch
1800        // Use a HashSet to deduplicate keys that were written multiple times
1801        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1802
1803        for key in unique_keys {
1804            if let Some(mut entry) = self.data.get_mut(&key) {
1805                let before = entry.value().version_count();
1806                entry.value_mut().gc(min_active_ts);
1807                gc_count += before.saturating_sub(entry.value().version_count());
1808            }
1809        }
1810
1811        gc_count
1812    }
1813
1814    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1815    #[allow(dead_code)]
1816    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1817        let mut gc_count = 0;
1818
1819        for mut entry in self.data.iter_mut() {
1820            let before = entry.value().version_count();
1821            entry.value_mut().gc(min_active_ts);
1822            gc_count += before.saturating_sub(entry.value().version_count());
1823        }
1824
1825        gc_count
1826    }
1827}
1828
1829// =============================================================================
1830// Rec 11: Unified MvccStore Implementation for MvccMemTable
1831// =============================================================================
1832
1833impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1834    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1835        self.read(key, snapshot_ts, txn_id)
1836    }
1837
1838    fn mvcc_put(
1839        &self,
1840        key: &[u8],
1841        value: Option<Vec<u8>>,
1842        txn_id: u64,
1843    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1844        let mut entry = self.data.entry(key.to_vec()).or_default();
1845        if entry.has_write_conflict(txn_id) {
1846            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1847        }
1848        entry.add_uncommitted(value, txn_id);
1849        Ok(())
1850    }
1851
1852    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1853        if let Some(mut chain) = self.data.get_mut(key) {
1854            return chain.commit(txn_id, commit_ts);
1855        }
1856        false
1857    }
1858
1859    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1860        if let Some(mut chain) = self.data.get_mut(key) {
1861            chain.abort(txn_id);
1862        }
1863    }
1864
1865    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1866        self.data
1867            .get(key)
1868            .map(|chain| chain.has_write_conflict(txn_id))
1869            .unwrap_or(false)
1870    }
1871
1872    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1873        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1874        for mut entry in self.data.iter_mut() {
1875            stats.keys_scanned += 1;
1876            let before = entry.value().version_count();
1877            entry.value_mut().gc(min_ts);
1878            stats.versions_removed += before.saturating_sub(entry.value().version_count());
1879        }
1880        stats
1881    }
1882
1883    fn mvcc_key_count(&self) -> usize {
1884        self.data.len()
1885    }
1886}
1887
1888// ============================================================================
1889// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1890// ============================================================================
1891
1892use crate::key_buffer::ArenaKeyHandle;
1893
1894/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1895struct ArenaEpochDirtyList {
1896    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1897    current_epoch: AtomicU64,
1898}
1899
1900impl ArenaEpochDirtyList {
1901    fn new() -> Self {
1902        Self {
1903            epochs: [
1904                parking_lot::Mutex::new(Vec::new()),
1905                parking_lot::Mutex::new(Vec::new()),
1906                parking_lot::Mutex::new(Vec::new()),
1907                parking_lot::Mutex::new(Vec::new()),
1908            ],
1909            current_epoch: AtomicU64::new(0),
1910        }
1911    }
1912
1913    #[inline]
1914    fn record_version(&self, key: ArenaKeyHandle) {
1915        let epoch = self.current_epoch.load(Ordering::Relaxed);
1916        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1917        self.epochs[idx].lock().push(key);
1918    }
1919
1920    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1921        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1922        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1923        let mut guard = self.epochs[old_idx].lock();
1924        let keys = std::mem::take(&mut *guard);
1925        (old_epoch, keys)
1926    }
1927}
1928
1929/// Arena-backed MVCC MemTable with optimized key storage
1930///
1931/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1932/// reducing per-write allocations from 3 to 1:
1933///
1934/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1935/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1936///
1937/// ## Performance
1938///
1939/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1940/// by reducing:
1941/// - Heap allocations: 3 → 1 per write
1942/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
1943pub struct ArenaMvccMemTable {
1944    /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
1945    data: DashMap<ArenaKeyHandle, VersionChain>,
1946    /// Ordered index for prefix scans
1947    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
1948    /// Approximate size in bytes
1949    size_bytes: AtomicU64,
1950    /// Epoch-based dirty list (arena-backed)
1951    dirty_list: ArenaEpochDirtyList,
1952}
1953
1954impl ArenaMvccMemTable {
1955    pub fn new() -> Self {
1956        Self::with_ordered_index(true)
1957    }
1958
1959    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1960        Self {
1961            data: DashMap::new(),
1962            ordered_index: if enable_ordered_index {
1963                Some(SkipMap::new())
1964            } else {
1965                None
1966            },
1967            size_bytes: AtomicU64::new(0),
1968            dirty_list: ArenaEpochDirtyList::new(),
1969        }
1970    }
1971
1972    /// Write a key-value pair using arena key handle
1973    ///
1974    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
1975    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
1976    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1977        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1978        let key_len = key.len();
1979
1980        // Create ONE ArenaKeyHandle - this is the only allocation for the key
1981        let key_handle = ArenaKeyHandle::new(key);
1982
1983        // Track in dirty list (O(1) copy of 16-byte handle)
1984        self.dirty_list.record_version(key_handle.clone());
1985
1986        // Insert into ordered index (O(1) copy of 16-byte handle)
1987        if let Some(ref idx) = self.ordered_index {
1988            idx.insert(key_handle.clone(), ());
1989        }
1990
1991        // Use entry API with the handle
1992        let mut entry = self.data.entry(key_handle).or_default();
1993
1994        if entry.has_write_conflict(txn_id) {
1995            return Err(SochDBError::Internal(
1996                "Write-write conflict detected".into(),
1997            ));
1998        }
1999        entry.add_uncommitted(value, txn_id);
2000        self.size_bytes
2001            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2002
2003        Ok(())
2004    }
2005
2006    /// Write batch using arena key handles
2007    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2008        let mut total_size = 0u64;
2009
2010        for (key, value) in writes {
2011            let key_handle = ArenaKeyHandle::new(key);
2012
2013            self.dirty_list.record_version(key_handle.clone());
2014
2015            if let Some(ref idx) = self.ordered_index {
2016                idx.insert(key_handle.clone(), ());
2017            }
2018
2019            let mut entry = self.data.entry(key_handle).or_default();
2020
2021            if entry.has_write_conflict(txn_id) {
2022                return Err(SochDBError::Internal(
2023                    "Write-write conflict detected".into(),
2024                ));
2025            }
2026
2027            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2028            entry.add_uncommitted(value.clone(), txn_id);
2029            total_size += (key.len() + value_size) as u64;
2030        }
2031
2032        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2033        Ok(())
2034    }
2035
2036    /// Read at snapshot timestamp
2037    pub fn read(
2038        &self,
2039        key: &[u8],
2040        snapshot_ts: u64,
2041        current_txn_id: Option<u64>,
2042    ) -> Option<Vec<u8>> {
2043        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2044        let key_handle = ArenaKeyHandle::new(key);
2045        self.data.get(&key_handle).and_then(|chain| {
2046            chain
2047                .read_at(snapshot_ts, current_txn_id)
2048                .and_then(|v| v.value.clone())
2049        })
2050    }
2051
2052    /// Commit transaction
2053    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2054        for key in write_set {
2055            let key_handle = ArenaKeyHandle::new(key.as_slice());
2056            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2057                chain.commit(txn_id, commit_ts);
2058            }
2059        }
2060    }
2061
2062    /// Abort transaction
2063    pub fn abort(&self, txn_id: u64) {
2064        for mut entry in self.data.iter_mut() {
2065            entry.value_mut().abort(txn_id);
2066        }
2067    }
2068
2069    /// Scan prefix
2070    pub fn scan_prefix(
2071        &self,
2072        prefix: &[u8],
2073        snapshot_ts: u64,
2074        current_txn_id: Option<u64>,
2075    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2076        let mut results = Vec::new();
2077        let prefix_handle = ArenaKeyHandle::new(prefix);
2078
2079        if let Some(ref idx) = self.ordered_index {
2080            for entry in idx.range(prefix_handle..) {
2081                let key = entry.key();
2082
2083                if !key.as_bytes().starts_with(prefix) {
2084                    break;
2085                }
2086
2087                if let Some(chain) = self.data.get(key)
2088                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2089                    && let Some(value) = &v.value
2090                {
2091                    results.push((key.as_bytes().to_vec(), value.clone()));
2092                }
2093            }
2094        } else {
2095            for entry in self.data.iter() {
2096                let key = entry.key();
2097                if !key.as_bytes().starts_with(prefix) {
2098                    continue;
2099                }
2100                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2101                    && let Some(value) = &v.value
2102                {
2103                    results.push((key.as_bytes().to_vec(), value.clone()));
2104                }
2105            }
2106        }
2107
2108        results
2109    }
2110
2111    /// Get approximate size
2112    pub fn size(&self) -> u64 {
2113        self.size_bytes.load(Ordering::Relaxed)
2114    }
2115
2116    /// Garbage collect old versions
2117    pub fn gc(&self, min_active_ts: u64) -> usize {
2118        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2119
2120        if dirty_keys.is_empty() {
2121            return 0;
2122        }
2123
2124        let mut gc_count = 0;
2125        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2126
2127        for key in unique_keys {
2128            if let Some(mut entry) = self.data.get_mut(&key) {
2129                let before = entry.value().version_count();
2130                entry.value_mut().gc(min_active_ts);
2131                gc_count += before.saturating_sub(entry.value().version_count());
2132            }
2133        }
2134
2135        gc_count
2136    }
2137}
2138
2139impl Default for ArenaMvccMemTable {
2140    fn default() -> Self {
2141        Self::new()
2142    }
2143}
2144
2145// ============================================================================
2146// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2147// ============================================================================
2148
/// Configuration for memtable type selection.
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting.
    /// Best for: general workloads, balanced read/write.
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations.
    /// Best for: write-heavy workloads, large keys.
    Arena,
}
2166
2167/// Unified memtable abstraction using enum dispatch
2168///
2169/// This pattern provides:
2170/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2171/// - Type-safe switching between implementations
2172/// - Easy extensibility for future memtable types
2173///
2174/// ## Why Enum over Trait Object?
2175///
2176/// - Hot path performance: enum match is a single branch vs vtable indirection
2177/// - Cache friendliness: no pointer chasing
2178/// - Inlining: compiler can inline through enum dispatch
2179pub enum MemTableKind {
2180    Standard(MvccMemTable),
2181    Arena(ArenaMvccMemTable),
2182}
2183
2184impl MemTableKind {
2185    /// Create a new memtable of the specified type
2186    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2187        match kind {
2188            MemTableType::Standard => {
2189                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2190            }
2191            MemTableType::Arena => {
2192                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2193            }
2194        }
2195    }
2196
2197    /// Write a key-value pair
2198    #[inline]
2199    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2200        match self {
2201            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2202            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2203        }
2204    }
2205
2206    /// Write batch of key-value pairs
2207    #[inline]
2208    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2209        match self {
2210            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2211            MemTableKind::Arena(m) => {
2212                // Convert to arena-compatible format
2213                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2214                    .iter()
2215                    .map(|(k, v)| (k.as_slice(), v.clone()))
2216                    .collect();
2217                m.write_batch(&arena_writes, txn_id)
2218            }
2219        }
2220    }
2221
2222    /// Read at snapshot timestamp
2223    #[inline]
2224    pub fn read(
2225        &self,
2226        key: &[u8],
2227        snapshot_ts: u64,
2228        current_txn_id: Option<u64>,
2229    ) -> Option<Vec<u8>> {
2230        match self {
2231            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2232            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2233        }
2234    }
2235
2236    /// Commit transaction
2237    #[inline]
2238    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2239        match self {
2240            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2241            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2242        }
2243    }
2244
2245    /// Abort transaction
2246    #[inline]
2247    pub fn abort(&self, txn_id: u64) {
2248        match self {
2249            MemTableKind::Standard(m) => m.abort(txn_id),
2250            MemTableKind::Arena(m) => m.abort(txn_id),
2251        }
2252    }
2253
2254    /// Scan prefix
2255    #[inline]
2256    pub fn scan_prefix(
2257        &self,
2258        prefix: &[u8],
2259        snapshot_ts: u64,
2260        current_txn_id: Option<u64>,
2261    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2262        match self {
2263            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2264            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2265        }
2266    }
2267
2268    /// Scan range
2269    #[inline]
2270    pub fn scan_range(
2271        &self,
2272        start: &[u8],
2273        end: &[u8],
2274        snapshot_ts: u64,
2275        current_txn_id: Option<u64>,
2276    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2277        match self {
2278            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2279            MemTableKind::Arena(m) => {
2280                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2281                let mut results = Vec::new();
2282                if let Some(ref idx) = m.ordered_index {
2283                    let start_handle = ArenaKeyHandle::new(start);
2284                    let end_handle = ArenaKeyHandle::new(end);
2285                    
2286                    if end.is_empty() {
2287                        for entry in idx.range(start_handle..) {
2288                            let key = entry.key();
2289                            if let Some(chain) = m.data.get(key)
2290                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2291                                && let Some(value) = &v.value
2292                            {
2293                                results.push((key.as_bytes().to_vec(), value.clone()));
2294                            }
2295                        }
2296                    } else {
2297                        for entry in idx.range(start_handle..end_handle) {
2298                            let key = entry.key();
2299                            if let Some(chain) = m.data.get(key)
2300                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2301                                && let Some(value) = &v.value
2302                            {
2303                                results.push((key.as_bytes().to_vec(), value.clone()));
2304                            }
2305                        }
2306                    }
2307                } else {
2308                    for entry in m.data.iter() {
2309                        let key = entry.key();
2310                        let key_bytes = key.as_bytes();
2311                        if key_bytes < start {
2312                            continue;
2313                        }
2314                        if !end.is_empty() && key_bytes >= end {
2315                            continue;
2316                        }
2317                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2318                            && let Some(value) = &v.value
2319                        {
2320                            results.push((key_bytes.to_vec(), value.clone()));
2321                        }
2322                    }
2323                }
2324                results
2325            }
2326        }
2327    }
2328
2329    /// Scan range iterator (returns collected results for now)
2330    #[inline]
2331    pub fn scan_range_iter<'a>(
2332        &'a self,
2333        start: &'a [u8],
2334        end: &'a [u8],
2335        snapshot_ts: u64,
2336        current_txn_id: Option<u64>,
2337    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2338        match self {
2339            MemTableKind::Standard(m) => {
2340                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2341            }
2342            MemTableKind::Arena(_) => {
2343                // Arena version returns collected results as iterator
2344                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2345                Box::new(results.into_iter())
2346            }
2347        }
2348    }
2349
2350    /// Get approximate size
2351    #[inline]
2352    pub fn size(&self) -> u64 {
2353        match self {
2354            MemTableKind::Standard(m) => m.size(),
2355            MemTableKind::Arena(m) => m.size(),
2356        }
2357    }
2358
2359    /// Garbage collect old versions
2360    #[inline]
2361    pub fn gc(&self, min_active_ts: u64) -> usize {
2362        match self {
2363            MemTableKind::Standard(m) => m.gc(min_active_ts),
2364            MemTableKind::Arena(m) => m.gc(min_active_ts),
2365        }
2366    }
2367
2368    /// Get the kind of memtable
2369    pub fn kind(&self) -> MemTableType {
2370        match self {
2371            MemTableKind::Standard(_) => MemTableType::Standard,
2372            MemTableKind::Arena(_) => MemTableType::Arena,
2373        }
2374    }
2375}
2376
/// Durable storage engine with full ACID support
///
/// Combines a write-ahead log, an MVCC manager and an in-memory memtable.
/// Writes are buffered per transaction and appended to the WAL at commit;
/// the memtable holds the readable data for the current session.
pub struct DurableStorage {
    /// Path to storage directory (holds `wal.log` and the `.clean_shutdown` marker)
    path: PathBuf,
    /// Write-ahead log
    wal: Arc<TxnWal>,
    /// MVCC manager (snapshot timestamps, read/write tracking, commit timestamps)
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
    /// At commit, buffer is flushed to WAL with single lock acquisition.
    /// `abort` also uses the presence of an entry to decide whether the
    /// transaction wrote anything.
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional); when `None`, `commit` uses the direct path
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state, used as a boolean flag
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN (0 until `checkpoint` has run in this session)
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode
    /// Exponential moving average of the arrival rate, stored as
    /// requests/sec × 1000 for precision
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds since the Unix epoch (0 = none yet)
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds (exponential moving average)
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking)
    // Never read after construction; presumably kept only so the lock lives
    // as long as the storage does — confirm against `crate::lock::DatabaseLock`.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2413
2414impl DurableStorage {
2415    /// Open or create durable storage at path
2416    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2417        Self::open_with_config(path, true)
2418    }
2419
2420    /// Open with configurable ordered index
2421    ///
2422    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2423    /// but scan_prefix becomes O(N) instead of O(log N + K)
2424    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2425        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2426    }
2427
2428    /// Open with arena-backed memtable for write-heavy workloads
2429    ///
2430    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2431    /// Best for workloads with:
2432    /// - High write throughput
2433    /// - Large keys (reduces allocation overhead)
2434    /// - Minimal concurrent reads during writes
2435    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2436        Self::open_with_full_config(path, true, MemTableType::Arena)
2437    }
2438
2439    /// Open with full configuration options
2440    ///
2441    /// # Arguments
2442    /// * `path` - Storage directory path
2443    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2444    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2445    ///
2446    /// # Locking
2447    ///
2448    /// Acquires an exclusive advisory lock on the database directory.
2449    /// This prevents concurrent multi-process access which would corrupt data.
2450    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2451    pub fn open_with_full_config<P: AsRef<Path>>(
2452        path: P,
2453        enable_ordered_index: bool,
2454        memtable_type: MemTableType,
2455    ) -> Result<Self> {
2456        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2457    }
2458
2459    /// Open without locking (for testing crash recovery scenarios)
2460    ///
2461    /// # Safety
2462    /// This should ONLY be used in tests that simulate crashes by forgetting
2463    /// the storage instance. In production, always use `open_with_full_config`.
2464    #[cfg(test)]
2465    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2466        Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2467    }
2468
2469    fn open_with_full_config_internal<P: AsRef<Path>>(
2470        path: P,
2471        enable_ordered_index: bool,
2472        memtable_type: MemTableType,
2473        acquire_lock: bool,
2474    ) -> Result<Self> {
2475        let path = path.as_ref().to_path_buf();
2476        std::fs::create_dir_all(&path)?;
2477
2478        // Acquire exclusive lock on database directory (unless disabled for testing)
2479        let db_lock = if acquire_lock {
2480            Some(crate::lock::DatabaseLock::acquire(&path)
2481                .map_err(|e| SochDBError::LockError(e.to_string()))?)
2482        } else {
2483            None
2484        };
2485
2486        let wal_path = path.join("wal.log");
2487        let wal = Arc::new(TxnWal::new(&wal_path)?);
2488
2489        let storage = Self {
2490            path,
2491            wal: wal.clone(),
2492            mvcc: Arc::new(MvccManager::new()),
2493            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2494            txn_write_buffers: DashMap::new(),
2495            group_commit: None,
2496            needs_recovery: AtomicU64::new(0),
2497            last_checkpoint_lsn: AtomicU64::new(0),
2498            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2499            commits_since_sync: AtomicU64::new(0),
2500            // Adaptive batch sizing (Little's Law)
2501            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2502            last_commit_us: AtomicU64::new(0),
2503            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2504            db_lock,
2505        };
2506
2507        // Check if recovery needed
2508        if storage.check_recovery_needed()? {
2509            storage.needs_recovery.store(1, Ordering::SeqCst);
2510        }
2511
2512        Ok(storage)
2513    }
2514
2515    /// Open with group commit enabled
2516    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2517        Self::open_with_group_commit_and_config(path, true)
2518    }
2519
2520    /// Open with group commit and configurable ordered index
2521    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2522        path: P,
2523        enable_ordered_index: bool,
2524    ) -> Result<Self> {
2525        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2526
2527        let wal = storage.wal.clone();
2528        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2529            // Write all commit records WITHOUT flushing (batch them)
2530            for &txn_id in txn_ids {
2531                let entry = TxnWalEntry::txn_commit(txn_id);
2532                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2533            }
2534
2535            // Then do a SINGLE flush + fsync for the entire batch
2536            wal.flush().map_err(|e| e.to_string())?;
2537            wal.sync().map_err(|e| e.to_string())?;
2538
2539            // Return commit timestamp
2540            Ok(std::time::SystemTime::now()
2541                .duration_since(std::time::UNIX_EPOCH)
2542                .unwrap()
2543                .as_micros() as u64)
2544        });
2545
2546        storage.group_commit = Some(Arc::new(gc));
2547        Ok(storage)
2548    }
2549
2550    /// Open with IndexPolicy for automatic memtable/index configuration
2551    ///
2552    /// This is the recommended constructor for new code. The policy determines:
2553    /// - Whether to use ordered index (ScanOptimized only)
2554    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2555    /// - Default settings optimized for the workload pattern
2556    ///
2557    /// # Arguments
2558    /// * `path` - Storage directory path
2559    /// * `policy` - Index policy determining write/scan tradeoffs
2560    /// * `group_commit` - Whether to enable group commit for throughput
2561    pub fn open_with_policy<P: AsRef<Path>>(
2562        path: P,
2563        policy: crate::index_policy::IndexPolicy,
2564        group_commit: bool,
2565    ) -> Result<Self> {
2566        use crate::index_policy::IndexPolicy;
2567        
2568        // Derive configuration from policy
2569        let (enable_ordered_index, memtable_type) = match policy {
2570            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2571                // Write-heavy: no ordered index, use arena for reduced allocations
2572                (false, MemTableType::Arena)
2573            }
2574            IndexPolicy::Balanced => {
2575                // Mixed OLTP: deferred sorting (already implemented in Standard)
2576                (true, MemTableType::Standard)
2577            }
2578            IndexPolicy::ScanOptimized => {
2579                // Scan-heavy: maintain ordered index
2580                (true, MemTableType::Standard)
2581            }
2582        };
2583
2584        if group_commit {
2585            let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2586            
2587            let wal = storage.wal.clone();
2588            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2589                for &txn_id in txn_ids {
2590                    let entry = TxnWalEntry::txn_commit(txn_id);
2591                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2592                }
2593                wal.flush().map_err(|e| e.to_string())?;
2594                wal.sync().map_err(|e| e.to_string())?;
2595                Ok(std::time::SystemTime::now()
2596                    .duration_since(std::time::UNIX_EPOCH)
2597                    .unwrap()
2598                    .as_micros() as u64)
2599            });
2600            storage.group_commit = Some(Arc::new(gc));
2601            Ok(storage)
2602        } else {
2603            Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2604        }
2605    }
2606
2607    /// Open storage for concurrent mode (multi-reader, single-writer)
2608    ///
2609    /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2610    /// Coordination is handled by the concurrent MVCC layer instead.
2611    ///
2612    /// # Safety
2613    ///
2614    /// This must ONLY be called from `Database::open_concurrent()` which
2615    /// manages the concurrent MVCC coordination. Direct use will cause
2616    /// data corruption.
2617    pub fn open_for_concurrent<P: AsRef<Path>>(
2618        path: P,
2619        policy: crate::index_policy::IndexPolicy,
2620    ) -> Result<Self> {
2621        use crate::index_policy::IndexPolicy;
2622        
2623        let (enable_ordered_index, memtable_type) = match policy {
2624            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2625                (false, MemTableType::Arena)
2626            }
2627            IndexPolicy::Balanced => {
2628                (true, MemTableType::Standard)
2629            }
2630            IndexPolicy::ScanOptimized => {
2631                (true, MemTableType::Standard)
2632            }
2633        };
2634
2635        // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2636        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2637    }
2638
2639    /// Get the memtable type being used
2640    pub fn memtable_type(&self) -> MemTableType {
2641        self.memtable.kind()
2642    }
2643
2644    /// Check if recovery is needed (dirty shutdown detection)
2645    ///
2646    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2647    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2648    /// but committed data still needs to be loaded from WAL into the memtable.
2649    fn check_recovery_needed(&self) -> Result<bool> {
2650        let marker_path = self.path.join(".clean_shutdown");
2651        if marker_path.exists() {
2652            // Clean shutdown - remove marker
2653            std::fs::remove_file(&marker_path)?;
2654        }
2655        // ALWAYS need recovery to rebuild memtable from WAL
2656        // The memtable is in-memory only and needs to be restored on every startup
2657        Ok(true)
2658    }
2659
2660    /// Perform crash recovery
2661    pub fn recover(&self) -> Result<RecoveryStats> {
2662        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2663            return Ok(RecoveryStats::default());
2664        }
2665
2666        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2667
2668        // Apply committed writes to memtable
2669        let recovery_txn_id = self.wal.alloc_txn_id();
2670        let commit_ts = self.mvcc.alloc_commit_ts();
2671
2672        // Collect keys being written for efficient commit
2673        let mut write_set: HashSet<InlineKey> = HashSet::new();
2674        for (key, value) in &writes {
2675            write_set.insert(SmallVec::from_slice(key));
2676            self.memtable
2677                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2678        }
2679        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2680
2681        self.needs_recovery.store(0, Ordering::SeqCst);
2682
2683        Ok(RecoveryStats {
2684            transactions_recovered: txn_count,
2685            writes_recovered: writes.len(),
2686            commit_ts,
2687        })
2688    }
2689
2690    /// Begin a new transaction
2691    pub fn begin_transaction(&self) -> Result<u64> {
2692        let txn_id = self.wal.begin_transaction()?;
2693        self.mvcc.begin(txn_id);
2694        Ok(txn_id)
2695    }
2696
2697    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2698    ///
2699    /// This enables mode-aware optimizations:
2700    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2701    /// - WriteOnly: Skip read tracking, faster bulk inserts
2702    /// - ReadWrite: Full SSI for serializable isolation
2703    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2704        let txn_id = self.wal.begin_transaction()?;
2705        self.mvcc.begin_with_mode(txn_id, mode);
2706        Ok(txn_id)
2707    }
2708
2709    /// Begin a read-only transaction without any WAL records.
2710    ///
2711    /// This is a performance-critical optimization that eliminates two WAL
2712    /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
2713    /// transactions have no state to recover, WAL records are unnecessary.
2714    ///
2715    /// Callers MUST use `abort_read_only_fast()` to clean up.
2716    #[inline]
2717    pub fn begin_read_only_fast(&self) -> u64 {
2718        let txn_id = self.wal.alloc_txn_id();
2719        self.mvcc.begin_read_only(txn_id);
2720        txn_id
2721    }
2722
    /// Abort a fast read-only transaction.
    ///
    /// Counterpart of `begin_read_only_fast`: only removes the MVCC state for
    /// `txn_id`. It does not touch the WAL, the write buffers, or the memtable.
    #[inline]
    pub fn abort_read_only_fast(&self, txn_id: u64) {
        self.mvcc.abort(txn_id);
    }
2730
2731    /// Read a key WITHOUT any MVCC transaction tracking.
2732    ///
2733    /// Uses the current global timestamp to see all committed writes.
2734    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
2735    /// Only safe for single-threaded access (no concurrent writes).
2736    #[inline]
2737    pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2738        let snapshot_ts = self.mvcc.ts_counter.load(std::sync::atomic::Ordering::Relaxed);
2739        self.memtable.read(key, snapshot_ts, None)
2740    }
2741
2742    /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
2743    ///
2744    /// Uses the current global timestamp. Only safe for single-threaded access.
2745    #[inline]
2746    pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2747        let snapshot_ts = self.mvcc.ts_counter.load(std::sync::atomic::Ordering::Relaxed);
2748        self.memtable.scan_prefix(prefix, snapshot_ts, None)
2749    }
2750
2751    /// Read a key within a transaction
2752    #[inline]
2753    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2754        // Fast path: get just snapshot_ts without cloning whole transaction
2755        let snapshot_ts = self
2756            .mvcc
2757            .get_snapshot_ts(txn_id)
2758            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2759
2760        // Record read for SSI (skipped for read-only transactions)
2761        self.mvcc.record_read(txn_id, key);
2762
2763        // Read at snapshot timestamp, seeing own uncommitted writes
2764        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2765    }
2766
2767    /// Write a key-value pair within a transaction
2768    ///
2769    /// Writes are buffered and only flushed to disk on commit.
2770    /// This provides ~10× better throughput for batched inserts.
2771    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2772        // Use the zero-allocation path internally
2773        self.write_refs(txn_id, &key, &value)?;
2774
2775        Ok(())
2776    }
2777
2778    /// Write from references - zero allocation hot path
2779    ///
2780    /// Avoids cloning key/value by writing to WAL from refs directly,
2781    /// then only allocating once for memtable storage.
2782    #[inline]
2783    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2784        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2785        self.mvcc.record_write(txn_id, key);
2786
2787        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2788        // This reduces lock contention from O(writes) to O(1) per transaction
2789        self.txn_write_buffers
2790            .entry(txn_id)
2791            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2792            .append(key, value);
2793
2794        // Write to memtable (needs owned key/value for storage)
2795        self.memtable
2796            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2797
2798        Ok(())
2799    }
2800
2801    /// Delete a key within a transaction
2802    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2803        // Record write (uses InlineKey - zero allocation for small keys)
2804        self.mvcc.record_write(txn_id, &key);
2805
2806        // Buffer tombstone in memory - NO WAL lock taken!
2807        self.txn_write_buffers
2808            .entry(txn_id)
2809            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2810            .append(&key, &[]); // Empty value = tombstone
2811
2812        // Write tombstone to memtable
2813        self.memtable.write(key, None, txn_id)?;
2814
2815        Ok(())
2816    }
2817
2818    /// Batch write multiple key-value pairs with reduced overhead
2819    ///
2820    /// This API amortizes fixed costs over the batch:
2821    /// - Single DashMap entry lookup for TxnWalBuffer
2822    /// - Single MVCC write set update
2823    /// - Batch memtable operations
2824    ///
2825    /// Performance: ~2-3x faster than individual write_refs calls
2826    /// for batches of 100+ entries.
2827    ///
2828    /// # Arguments
2829    /// * `txn_id` - Transaction ID
2830    /// * `writes` - Slice of (key, value) pairs
2831    #[inline]
2832    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2833        if writes.is_empty() {
2834            return Ok(());
2835        }
2836
2837        // Single DashMap access for entire batch
2838        let mut buffer_entry = self
2839            .txn_write_buffers
2840            .entry(txn_id)
2841            .or_insert_with(|| TxnWalBuffer::new(txn_id));
2842
2843        // Batch operations with reduced per-row overhead
2844        for (key, value) in writes {
2845            // Record write for MVCC
2846            self.mvcc.record_write(txn_id, key);
2847
2848            // Append to WAL buffer
2849            buffer_entry.append(key, value);
2850        }
2851        drop(buffer_entry);
2852
2853        // Batch write to memtable
2854        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2855            .iter()
2856            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2857            .collect();
2858        self.memtable.write_batch(&owned_writes, txn_id)?;
2859
2860        Ok(())
2861    }
2862
2863    /// Commit a transaction
2864    ///
2865    /// With sync_mode:
2866    /// - 0 (OFF): No sync, risk of data loss
2867    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2868    /// - 2 (FULL): Sync every commit (safest, slowest)
2869    pub fn commit(&self, txn_id: u64) -> Result<u64> {
2870        // First, flush all buffered writes to WAL with SINGLE lock acquisition
2871        // This is the key optimization: O(1) lock instead of O(writes) locks
2872        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2873            && !buffer.is_empty()
2874        {
2875            // Flush entire buffer to WAL with one lock
2876            self.wal.flush_buffer(&buffer)?;
2877        }
2878
2879        // Use group commit if available, otherwise direct commit
2880        if let Some(gc) = &self.group_commit {
2881            // Submit to group commit and wait for result
2882            // This batches multiple commits into a single fsync
2883            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2884
2885            // Get commit timestamp and write_set from MVCC
2886            let (commit_ts, write_set) = self
2887                .mvcc
2888                .commit(txn_id)
2889                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2890
2891            // Commit in memtable (O(k) where k = keys written)
2892            self.memtable.commit(txn_id, commit_ts, &write_set);
2893
2894            Ok(commit_ts)
2895        } else {
2896            // Direct commit path with adaptive sync (Little's Law)
2897            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2898            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2899
2900            // Update arrival rate for adaptive batching
2901            self.update_arrival_rate();
2902
2903            // Write commit record (no flush yet - BufWriter will buffer it)
2904            let entry = TxnWalEntry::txn_commit(txn_id);
2905            self.wal.append_no_flush(&entry)?;
2906
2907            // Determine if we should sync/flush based on mode
2908            let should_sync = match sync_mode {
2909                0 => false,                                      // OFF: never sync
2910                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
2911                _ => true,                                       // FULL: always sync
2912            };
2913
2914            if should_sync {
2915                // Measure fsync latency for adaptive tuning
2916                let start = std::time::Instant::now();
2917                self.wal.flush()?;
2918                self.wal.sync()?;
2919                let latency_us = start.elapsed().as_micros() as u64;
2920
2921                // Update fsync latency estimate (EMA with α = 0.1)
2922                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2923                let new_latency = (old_latency * 9 + latency_us) / 10;
2924                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2925
2926                self.commits_since_sync.store(0, Ordering::Relaxed);
2927            }
2928
2929            // Get commit timestamp and write_set from MVCC
2930            let (commit_ts, write_set) = self
2931                .mvcc
2932                .commit(txn_id)
2933                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2934
2935            // Commit in memtable (O(k) where k = keys written)
2936            self.memtable.commit(txn_id, commit_ts, &write_set);
2937
2938            Ok(commit_ts)
2939        }
2940    }
2941
2942    /// Update arrival rate using exponential moving average
2943    #[inline]
2944    fn update_arrival_rate(&self) {
2945        let now_us = std::time::SystemTime::now()
2946            .duration_since(std::time::UNIX_EPOCH)
2947            .unwrap()
2948            .as_micros() as u64;
2949
2950        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2951
2952        if last > 0 {
2953            let delta_us = now_us.saturating_sub(last);
2954            if delta_us > 0 && delta_us < 10_000_000 {
2955                // Ignore gaps > 10s
2956                // Rate = 1_000_000 / delta_us (requests/sec)
2957                // Stored as rate × 1000 for precision
2958                let instant_rate = 1_000_000_000 / delta_us;
2959
2960                // EMA with α = 0.1
2961                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2962                let new_rate = (old_rate * 9 + instant_rate) / 10;
2963                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2964            }
2965        }
2966    }
2967
2968    /// Compute optimal batch threshold using Little's Law
2969    ///
2970    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
2971    /// Returns the number of commits to batch before fsync
2972    #[inline]
2973    fn adaptive_batch_threshold(&self) -> u64 {
2974        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
2975        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
2976
2977        if lambda <= 0.0 || tau <= 0.0 {
2978            return 100; // Fallback to fixed threshold
2979        }
2980
2981        // Little's Law: W* = sqrt(2 × τ × λ)
2982        // This minimizes total time = wait_time + fsync_overhead
2983        let n_opt = (2.0 * tau * lambda).sqrt();
2984
2985        // Clamp between 1 and 1000
2986        (n_opt as u64).clamp(1, 1000)
2987    }
2988
2989    /// Set synchronous mode
2990    ///
2991    /// - 0: OFF - No fsync (risk of data loss)
2992    /// - 1: NORMAL - Periodic fsync (balanced)
2993    /// - 2: FULL - Fsync every commit (safest)
2994    pub fn set_sync_mode(&self, mode: u64) {
2995        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2996    }
2997
2998    /// Force a group commit flush (useful for benchmarking or testing)
2999    pub fn flush_group_commit(&self) {
3000        if let Some(gc) = &self.group_commit {
3001            gc.flush_batch();
3002        }
3003    }
3004
3005    /// Abort a transaction
3006    ///
3007    /// Performance: O(1) for read-only transactions (no writes to clean up).
3008    /// For write transactions, O(N) memtable scan is required to remove
3009    /// uncommitted versions.
3010    pub fn abort(&self, txn_id: u64) -> Result<()> {
3011        // Check if transaction had any buffered writes.
3012        // Read-only transactions never populate txn_write_buffers,
3013        // so this returns None — allowing us to skip the O(N) memtable scan.
3014        let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3015
3016        if had_writes {
3017            // Write abort record to WAL (only needed if data was written)
3018            self.wal.abort_transaction(txn_id)?;
3019            // Clean up uncommitted memtable entries
3020            self.memtable.abort(txn_id);
3021        }
3022
3023        // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3024        self.mvcc.abort(txn_id);
3025
3026        Ok(())
3027    }
3028
3029    /// Scan keys with prefix
3030    #[inline]
3031    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3032        // Fast path: get just snapshot_ts without cloning whole transaction
3033        let snapshot_ts = self
3034            .mvcc
3035            .get_snapshot_ts(txn_id)
3036            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3037
3038        // Note: Scan doesn't record individual key reads for SSI (too expensive)
3039        // SSI conflicts are tracked at the prefix level if needed
3040        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3041    }
3042
3043    /// Scan keys in range
3044    #[inline]
3045    pub fn scan_range(
3046        &self,
3047        txn_id: u64,
3048        start: &[u8],
3049        end: &[u8],
3050    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3051        let snapshot_ts = self
3052            .mvcc
3053            .get_snapshot_ts(txn_id)
3054            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3055
3056        Ok(self
3057            .memtable
3058            .scan_range(start, end, snapshot_ts, Some(txn_id)))
3059    }
3060
3061    /// Streaming scan for very large result sets
3062    /// 
3063    /// Returns an iterator that yields (key, value) pairs without
3064    /// materializing the entire result set in memory.
3065    #[inline]
3066    pub fn scan_range_iter<'a>(
3067        &'a self,
3068        txn_id: u64,
3069        start: &'a [u8],
3070        end: &'a [u8],
3071    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3072        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3073        self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3074    }
3075
    /// Flush the WAL's in-memory buffer to the OS
    ///
    /// This ensures all buffered writes are pushed from the BufWriter
    /// into the OS page cache. It does not by itself make them durable:
    /// call this first and then `fsync()` to get the data onto disk.
    ///
    /// # Errors
    /// Returns the error reported by the WAL's `flush`.
    pub fn flush_wal(&self) -> Result<()> {
        self.wal.flush()
    }
3085
    /// Force sync the WAL to disk (fsync)
    ///
    /// Delegates to the WAL's `sync`. Call `flush_wal()` first so records
    /// still held in the WAL's write buffer are included.
    ///
    /// # Errors
    /// Returns the error reported by the WAL's `sync`.
    pub fn fsync(&self) -> Result<()> {
        self.wal.sync()
    }
3090
3091    /// Write checkpoint
3092    pub fn checkpoint(&self) -> Result<u64> {
3093        let txn_id = 0; // System transaction
3094        let entry = TxnWalEntry::checkpoint(txn_id);
3095        let lsn = self.wal.append_sync(&entry)?;
3096        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3097        Ok(lsn)
3098    }
3099
    /// Truncate the WAL file after checkpoint.
    ///
    /// This physically truncates the WAL file to 0 bytes, reclaiming disk
    /// space. The in-memory memtable retains all data for the current
    /// session, but a crash after truncation will result in data loss
    /// since the WAL is the only persistence mechanism for DurableStorage.
    ///
    /// Call after `checkpoint()` when WAL durability across restarts is
    /// not required (e.g. desktop telemetry viewers, caches).
    ///
    /// # Errors
    /// Returns the error reported by the WAL's `truncate`.
    pub fn truncate_wal(&self) -> Result<()> {
        self.wal.truncate()
    }
3112
3113    /// Get storage statistics
3114    pub fn stats(&self) -> StorageStats {
3115        // Get WAL size from the WAL manager
3116        let wal_size = self.wal.size_bytes();
3117
3118        // Get active transaction count from MVCC
3119        let active_txns = self.mvcc.active_transaction_count();
3120
3121        StorageStats {
3122            memtable_size_bytes: self.memtable.size(),
3123            wal_size_bytes: wal_size,
3124            active_transactions: active_txns,
3125            min_active_snapshot: self.mvcc.min_active_snapshot(),
3126            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3127        }
3128    }
3129
3130    /// Garbage collect old versions
3131    pub fn gc(&self) -> usize {
3132        let min_ts = self.mvcc.min_active_snapshot();
3133        self.memtable.gc(min_ts)
3134    }
3135
3136    /// Clean shutdown
3137    pub fn shutdown(&self) -> Result<()> {
3138        // Sync WAL
3139        self.fsync()?;
3140
3141        // Write clean shutdown marker
3142        let marker_path = self.path.join(".clean_shutdown");
3143        std::fs::write(&marker_path, b"clean")?;
3144
3145        Ok(())
3146    }
3147}
3148
/// Attempts a clean shutdown when the storage is dropped.
impl Drop for DurableStorage {
    fn drop(&mut self) {
        // Best effort: `drop` cannot report failures, so the result of
        // `shutdown` is deliberately discarded.
        let _ = self.shutdown();
    }
}
3154
/// Recovery statistics
///
/// Returned by `DurableStorage::recover`; all fields are zero when recovery
/// was not needed.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay
    pub transactions_recovered: usize,
    /// Number of writes returned by the WAL replay and applied to the memtable
    pub writes_recovered: usize,
    /// Commit timestamp under which the recovered writes were committed
    pub commit_ts: u64,
}
3162
/// Storage statistics
///
/// Point-in-time figures returned by `DurableStorage::stats`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Size reported by the memtable
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL
    pub wal_size_bytes: u64,
    /// Number of transactions the MVCC manager currently tracks as active
    pub active_transactions: usize,
    /// Minimum active snapshot as reported by the MVCC manager
    pub min_active_snapshot: u64,
    /// LSN of the last checkpoint written in this session (0 if none)
    pub last_checkpoint_lsn: u64,
}
3172
3173#[cfg(test)]
3174mod tests {
3175    use super::*;
3176    use tempfile::tempdir;
3177
3178    #[test]
3179    fn test_basic_transaction() {
3180        let dir = tempdir().unwrap();
3181        let storage = DurableStorage::open(dir.path()).unwrap();
3182
3183        // Begin transaction
3184        let txn_id = storage.begin_transaction().unwrap();
3185
3186        // Write data
3187        storage
3188            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3189            .unwrap();
3190        storage
3191            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3192            .unwrap();
3193
3194        // Read back (within same transaction)
3195        let v1 = storage.read(txn_id, b"key1").unwrap();
3196        assert_eq!(v1, Some(b"value1".to_vec()));
3197
3198        // Commit
3199        let commit_ts = storage.commit(txn_id).unwrap();
3200        assert!(commit_ts > 0);
3201
3202        // Read in new transaction
3203        let txn2 = storage.begin_transaction().unwrap();
3204        let v1 = storage.read(txn2, b"key1").unwrap();
3205        assert_eq!(v1, Some(b"value1".to_vec()));
3206        storage.abort(txn2).unwrap();
3207    }
3208
3209    #[test]
3210    fn test_snapshot_isolation() {
3211        let dir = tempdir().unwrap();
3212        let storage = DurableStorage::open(dir.path()).unwrap();
3213
3214        // T1: Write initial value
3215        let t1 = storage.begin_transaction().unwrap();
3216        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3217        storage.commit(t1).unwrap();
3218
3219        // T2: Start reading (snapshot at this point)
3220        let t2 = storage.begin_transaction().unwrap();
3221
3222        // T3: Update the value
3223        let t3 = storage.begin_transaction().unwrap();
3224        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3225        storage.commit(t3).unwrap();
3226
3227        // T2 should still see v1 (snapshot isolation)
3228        let v = storage.read(t2, b"key").unwrap();
3229        assert_eq!(v, Some(b"v1".to_vec()));
3230
3231        // New transaction should see v2
3232        let t4 = storage.begin_transaction().unwrap();
3233        let v = storage.read(t4, b"key").unwrap();
3234        assert_eq!(v, Some(b"v2".to_vec()));
3235
3236        storage.abort(t2).unwrap();
3237        storage.abort(t4).unwrap();
3238    }
3239
3240    #[test]
3241    fn test_abort_transaction() {
3242        let dir = tempdir().unwrap();
3243        let storage = DurableStorage::open(dir.path()).unwrap();
3244
3245        // Write initial value
3246        let t1 = storage.begin_transaction().unwrap();
3247        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3248        storage.commit(t1).unwrap();
3249
3250        // Start transaction that will abort
3251        let t2 = storage.begin_transaction().unwrap();
3252        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3253        storage.abort(t2).unwrap();
3254
3255        // New transaction should see v1 (aborted changes not visible)
3256        let t3 = storage.begin_transaction().unwrap();
3257        let v = storage.read(t3, b"key").unwrap();
3258        assert_eq!(v, Some(b"v1".to_vec()));
3259        storage.abort(t3).unwrap();
3260    }
3261
3262    #[test]
3263    fn test_crash_recovery() {
3264        let dir = tempdir().unwrap();
3265
3266        // Phase 1: Write data and commit
3267        {
3268            // Use open_without_lock for crash simulation tests
3269            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3270
3271            // Set sync mode to FULL to ensure data is synced before "crash"
3272            storage.set_sync_mode(2); // FULL: sync every commit
3273
3274            let txn = storage.begin_transaction().unwrap();
3275            storage
3276                .write(txn, b"persist".to_vec(), b"data".to_vec())
3277                .unwrap();
3278            storage.commit(txn).unwrap();
3279
3280            // Simulate crash (no clean shutdown)
3281            std::mem::forget(storage);
3282        }
3283
3284        // Phase 2: Reopen and recover
3285        {
3286            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3287            let stats = storage.recover().unwrap();
3288            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3289
3290            // Data should be recovered
3291            let txn = storage.begin_transaction().unwrap();
3292            let v = storage.read(txn, b"persist").unwrap();
3293            assert_eq!(v, Some(b"data".to_vec()));
3294            storage.abort(txn).unwrap();
3295        }
3296    }
3297
3298    #[test]
3299    fn test_scan_prefix() {
3300        let dir = tempdir().unwrap();
3301        let storage = DurableStorage::open(dir.path()).unwrap();
3302
3303        let txn = storage.begin_transaction().unwrap();
3304        storage
3305            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3306            .unwrap();
3307        storage
3308            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3309            .unwrap();
3310        storage
3311            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3312            .unwrap();
3313        storage.commit(txn).unwrap();
3314
3315        let txn2 = storage.begin_transaction().unwrap();
3316        let users = storage.scan(txn2, b"user:").unwrap();
3317        assert_eq!(users.len(), 2);
3318        storage.abort(txn2).unwrap();
3319    }
3320
3321    #[test]
3322    fn test_delete() {
3323        let dir = tempdir().unwrap();
3324        let storage = DurableStorage::open(dir.path()).unwrap();
3325
3326        // Insert
3327        let t1 = storage.begin_transaction().unwrap();
3328        storage
3329            .write(t1, b"key".to_vec(), b"value".to_vec())
3330            .unwrap();
3331        storage.commit(t1).unwrap();
3332
3333        // Verify exists
3334        let t2 = storage.begin_transaction().unwrap();
3335        assert!(storage.read(t2, b"key").unwrap().is_some());
3336        storage.abort(t2).unwrap();
3337
3338        // Delete
3339        let t3 = storage.begin_transaction().unwrap();
3340        storage.delete(t3, b"key".to_vec()).unwrap();
3341        storage.commit(t3).unwrap();
3342
3343        // Verify deleted
3344        let t4 = storage.begin_transaction().unwrap();
3345        assert!(storage.read(t4, b"key").unwrap().is_none());
3346        storage.abort(t4).unwrap();
3347    }
3348
3349    #[test]
3350    fn test_gc() {
3351        let dir = tempdir().unwrap();
3352        let storage = DurableStorage::open(dir.path()).unwrap();
3353
3354        // Create multiple versions
3355        for i in 0..10 {
3356            let txn = storage.begin_transaction().unwrap();
3357            storage
3358                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3359                .unwrap();
3360            storage.commit(txn).unwrap();
3361        }
3362
3363        // GC should reclaim old versions
3364        let gc_count = storage.gc();
3365        // At least some versions should be collected
3366        // (exact count depends on implementation)
3367        let _ = gc_count; // gc_count is usize, always >= 0
3368    }
3369
3370    #[test]
3371    fn test_group_commit() {
3372        use std::sync::Arc;
3373        use std::thread;
3374
3375        let dir = tempdir().unwrap();
3376        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3377
3378        // Spawn multiple threads to commit concurrently
3379        let mut handles = vec![];
3380        for i in 0..4 {
3381            let storage = Arc::clone(&storage);
3382            handles.push(thread::spawn(move || {
3383                let txn = storage.begin_transaction().unwrap();
3384                storage
3385                    .write(
3386                        txn,
3387                        format!("key{}", i).into_bytes(),
3388                        format!("val{}", i).into_bytes(),
3389                    )
3390                    .unwrap();
3391                storage.commit(txn).unwrap()
3392            }));
3393        }
3394
3395        // Wait for all commits
3396        let mut commit_times = vec![];
3397        for h in handles {
3398            commit_times.push(h.join().unwrap());
3399        }
3400
3401        // All commits should succeed
3402        assert!(commit_times.iter().all(|&ts| ts > 0));
3403
3404        // Verify data persisted
3405        let txn = storage.begin_transaction().unwrap();
3406        for i in 0..4 {
3407            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3408            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3409        }
3410        storage.abort(txn).unwrap();
3411    }
3412
3413    // ==================== ArenaMvccMemTable Tests ====================
3414
3415    #[test]
3416    fn test_arena_memtable_basic_write_read() {
3417        let memtable = ArenaMvccMemTable::new();
3418
3419        // Write some values
3420        memtable
3421            .write(b"key1", Some(b"value1".to_vec()), 1)
3422            .unwrap();
3423        memtable
3424            .write(b"key2", Some(b"value2".to_vec()), 1)
3425            .unwrap();
3426
3427        // Read them back (uncommitted, so need txn_id match)
3428        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3429        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3430        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3431    }
3432
3433    #[test]
3434    fn test_arena_memtable_update() {
3435        let memtable = ArenaMvccMemTable::new();
3436
3437        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3438        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3439
3440        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3441    }
3442
3443    #[test]
3444    fn test_arena_memtable_delete() {
3445        let memtable = ArenaMvccMemTable::new();
3446
3447        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3448        memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3449
3450        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3451    }
3452
3453    #[test]
3454    fn test_arena_memtable_scan_prefix() {
3455        let memtable = ArenaMvccMemTable::new();
3456
3457        memtable
3458            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3459            .unwrap();
3460        memtable
3461            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3462            .unwrap();
3463        memtable
3464            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3465            .unwrap();
3466        memtable
3467            .write(b"order:1", Some(b"order_data".to_vec()), 1)
3468            .unwrap();
3469
3470        // Create a write set and commit
3471        let mut write_set = HashSet::new();
3472        write_set.insert(InlineKey::from_slice(b"user:1:name"));
3473        write_set.insert(InlineKey::from_slice(b"user:1:email"));
3474        write_set.insert(InlineKey::from_slice(b"user:2:name"));
3475        write_set.insert(InlineKey::from_slice(b"order:1"));
3476        memtable.commit(1, 10, &write_set);
3477
3478        // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3479        let results = memtable.scan_prefix(b"user:1:", 11, None);
3480        assert_eq!(results.len(), 2);
3481
3482        // Scan for all users
3483        let results = memtable.scan_prefix(b"user:", 11, None);
3484        assert_eq!(results.len(), 3);
3485    }
3486
3487    #[test]
3488    fn test_arena_memtable_write_batch() {
3489        let memtable = ArenaMvccMemTable::new();
3490
3491        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3492            (b"k1", Some(b"v1".to_vec())),
3493            (b"k2", Some(b"v2".to_vec())),
3494            (b"k3", Some(b"v3".to_vec())),
3495        ];
3496
3497        memtable.write_batch(&writes, 1).unwrap();
3498
3499        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3500        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3501        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3502    }
3503
3504    #[test]
3505    fn test_arena_memtable_gc() {
3506        let memtable = ArenaMvccMemTable::new();
3507
3508        // Write multiple versions
3509        for i in 0..10 {
3510            memtable
3511                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3512                .unwrap();
3513
3514            let mut write_set = HashSet::new();
3515            write_set.insert(InlineKey::from_slice(b"key"));
3516            memtable.commit(i + 1, (i + 1) * 10, &write_set);
3517        }
3518
3519        // GC old versions
3520        let gc_count = memtable.gc(90);
3521        let _ = gc_count; // gc_count is usize, always >= 0
3522    }
3523
3524    #[test]
3525    fn test_arena_memtable_size_tracking() {
3526        let memtable = ArenaMvccMemTable::new();
3527
3528        assert_eq!(memtable.size(), 0);
3529
3530        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3531
3532        assert!(memtable.size() > 0);
3533    }
3534
3535    #[test]
3536    fn test_arena_memtable_abort() {
3537        let memtable = ArenaMvccMemTable::new();
3538
3539        memtable
3540            .write(b"key", Some(b"uncommitted".to_vec()), 1)
3541            .unwrap();
3542
3543        // Visible to same txn
3544        assert_eq!(
3545            memtable.read(b"key", 0, Some(1)),
3546            Some(b"uncommitted".to_vec())
3547        );
3548
3549        // Not visible to other txns
3550        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3551
3552        // Abort
3553        memtable.abort(1);
3554
3555        // No longer visible
3556        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3557    }
3558
3559    // ========================================================================
3560    // MemTableKind Tests - Unified Abstraction
3561    // ========================================================================
3562
3563    #[test]
3564    fn test_memtable_kind_standard() {
3565        let memtable = MemTableKind::new(MemTableType::Standard, true);
3566        assert_eq!(memtable.kind(), MemTableType::Standard);
3567
3568        // Write and read
3569        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3570        
3571        // Commit transaction at ts=100
3572        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3573        memtable.commit(1, 100, &write_set);
3574        
3575        // Read after commit - snapshot_ts must be > commit_ts for visibility
3576        let v = memtable.read(b"key1", 101, None);
3577        assert_eq!(v, Some(b"value1".to_vec()));
3578    }
3579
3580    #[test]
3581    fn test_memtable_kind_arena() {
3582        let memtable = MemTableKind::new(MemTableType::Arena, true);
3583        assert_eq!(memtable.kind(), MemTableType::Arena);
3584
3585        // Write and read
3586        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3587        
3588        // Commit at ts=100
3589        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3590        memtable.commit(1, 100, &write_set);
3591        
3592        // Read after commit - snapshot_ts > commit_ts
3593        let v = memtable.read(b"key1", 101, None);
3594        assert_eq!(v, Some(b"value1".to_vec()));
3595    }
3596
3597    #[test]
3598    fn test_memtable_kind_scan_range() {
3599        // Test both implementations have consistent behavior
3600        for kind in [MemTableType::Standard, MemTableType::Arena] {
3601            let memtable = MemTableKind::new(kind, true);
3602
3603            // Write some data
3604            for i in 0..5 {
3605                let key = format!("key{}", i);
3606                let value = format!("value{}", i);
3607                memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
3608            }
3609
3610            // Commit all at ts=100
3611            let write_set: HashSet<InlineKey> = (0..5)
3612                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3613                .collect();
3614            memtable.commit(1, 100, &write_set);
3615
3616            // Scan range with snapshot_ts > commit_ts
3617            let results = memtable.scan_range(b"key1", b"key4", 101, None);
3618            assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
3619        }
3620    }
3621
3622    #[test]
3623    fn test_durable_storage_arena() {
3624        let dir = tempdir().unwrap();
3625        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3626        
3627        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3628
3629        // Basic transaction should work the same
3630        let txn_id = storage.begin_transaction().unwrap();
3631        storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
3632        storage.commit(txn_id).unwrap();
3633
3634        let txn2 = storage.begin_transaction().unwrap();
3635        let v = storage.read(txn2, b"key1").unwrap();
3636        assert_eq!(v, Some(b"value1".to_vec()));
3637        storage.abort(txn2).unwrap();
3638    }
3639
3640    #[test]
3641    fn test_durable_storage_full_config() {
3642        let dir = tempdir().unwrap();
3643        
3644        // Test with Arena and ordered index enabled
3645        let storage = DurableStorage::open_with_full_config(
3646            dir.path(),
3647            true,
3648            MemTableType::Arena,
3649        ).unwrap();
3650        
3651        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3652
3653        // Write multiple keys
3654        let txn = storage.begin_transaction().unwrap();
3655        for i in 0..10 {
3656            let key = format!("key{:02}", i);
3657            let value = format!("value{}", i);
3658            storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
3659        }
3660        storage.commit(txn).unwrap();
3661
3662        // Scan should work (uses scan method for prefix)
3663        let txn2 = storage.begin_transaction().unwrap();
3664        let results = storage.scan(txn2, b"key0").unwrap();
3665        assert_eq!(results.len(), 10); // key00 through key09
3666        storage.abort(txn2).unwrap();
3667    }
3668}