sochdb_storage/
durable_storage.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Durable Storage Layer
16//!
17//! This module wires together all storage components into a production-ready
18//! durable storage engine:
19//!
20//! - WAL (txn_wal.rs) for durability
21//! - Group Commit for throughput
22//! - MVCC for isolation
23//! - LSCS for columnar efficiency
24//!
25//! ## Architecture
26//!
27//! ```text
28//! ┌─────────────────────────────────────────────────────────────────┐
29//! │                      DurableStorage                              │
30//! ├─────────────────────────────────────────────────────────────────┤
31//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
32//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
33//! │  │             │    │             │    └─────────────────────┘ │
34//! │  │ ┌─────────┐ │    └─────────────┘                            │
35//! │  │ │Snapshots│ │                                                │
36//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
37//! │  │ ┌─────────┐ │    │              MemTable                    ││
38//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
39//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
40//! │  └─────────────┘                                                │
41//! │                      ┌─────────────────────────────────────────┐│
42//! │                      │              LSCS (SST)                  ││
43//! │                      │  Immutable columnar segments             ││
44//! │                      └─────────────────────────────────────────┘│
45//! └─────────────────────────────────────────────────────────────────┘
46//! ```
47//!
48//! ## Concurrency
49//!
50//! - Writers: Serialize through WAL, use MVCC for conflict detection
51//! - Readers: Lock-free reads at snapshot timestamp
52//! - Commits: Batched through GroupCommit for throughput
53
54use std::collections::HashSet;
55use std::path::{Path, PathBuf};
56use std::sync::Arc;
57use std::sync::atomic::{AtomicU64, Ordering};
58
59use dashmap::DashMap;
60use smallvec::SmallVec;
61
62use crossbeam_skiplist::SkipMap;
63
64use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
65use crate::group_commit::EventDrivenGroupCommit;
66use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
67use sochdb_core::{Result, SochDBError};
68
69// =============================================================================
70// SSI Bloom Filter - Fast Conflict Pre-Filtering
71// =============================================================================
72
73/// Space-efficient Bloom filter for SSI conflict detection
74///
75/// Used to quickly determine if two transactions MIGHT have conflicting keys.
76/// False positives are acceptable (leads to unnecessary exact checks),
77/// but false negatives are not allowed.
78///
79/// ## Configuration
80///
81/// For 1000 keys with 1% false positive rate:
82/// - m = ~9600 bits ≈ 1.2 KB per transaction
83/// - k = 7 hash functions
84///
85/// ## Lazy Initialization
86///
87/// The bit vector is lazily initialized on first insert to avoid
88/// allocation overhead for read-only transactions.
89#[derive(Clone, Debug)]
90pub struct SsiBloomFilter {
91    /// Bit vector (each u64 holds 64 bits) - lazily initialized
92    bits: Option<Vec<u64>>,
93    /// Expected capacity (used for lazy init sizing)
94    expected_capacity: usize,
95    /// Number of hash functions to use
96    num_hashes: u32,
97}
98
99impl SsiBloomFilter {
100    /// Optimal number of bits per item for 1% false positive rate
101    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
102    const BITS_PER_ITEM: f64 = 9.6;
103
104    /// Optimal number of hash functions for 1% false positive rate
105    /// k = (m/n) × ln(2) ≈ 7
106    const DEFAULT_NUM_HASHES: u32 = 7;
107
108    /// Minimum capacity to avoid tiny filters
109    const MIN_CAPACITY: usize = 64;
110
111    /// Create a new bloom filter for expected item count (lazy allocation)
112    ///
113    /// Configured for ~1% false positive rate.
114    /// The bit vector is not allocated until first insert.
115    #[inline]
116    pub fn new(expected_items: usize) -> Self {
117        Self {
118            bits: None,
119            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
120            num_hashes: Self::DEFAULT_NUM_HASHES,
121        }
122    }
123
124    /// Create with specific capacity in words (for memory-constrained scenarios)
125    pub fn with_word_capacity(words: usize) -> Self {
126        Self {
127            bits: None,
128            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
129            num_hashes: Self::DEFAULT_NUM_HASHES,
130        }
131    }
132
133    /// Ensure bits are allocated (lazy initialization)
134    #[inline]
135    fn ensure_allocated(&mut self) {
136        if self.bits.is_none() {
137            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
138            let num_words = num_bits.div_ceil(64);
139            self.bits = Some(vec![0u64; num_words]);
140        }
141    }
142
143    /// Add a key to the filter - O(k) where k = num_hashes
144    #[inline]
145    pub fn insert(&mut self, key: &[u8]) {
146        self.ensure_allocated();
147        let bits = self.bits.as_mut().unwrap();
148        let num_bits = bits.len() * 64;
149        if num_bits == 0 {
150            return;
151        }
152
153        // Use two hash functions to simulate k hash functions
154        // h(i) = h1 + i * h2 (double hashing technique)
155        let h1 = Self::hash1(key);
156        let h2 = Self::hash2(key);
157
158        for i in 0..self.num_hashes {
159            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
160            let bit_idx = (h as usize) % num_bits;
161            let word_idx = bit_idx / 64;
162            let bit_pos = bit_idx % 64;
163            bits[word_idx] |= 1 << bit_pos;
164        }
165    }
166
167    /// Check if a key might be present - O(k)
168    ///
169    /// Returns:
170    /// - false: Key is definitely NOT in the set (or filter not initialized)
171    /// - true: Key MIGHT be in the set (needs exact check)
172    #[inline]
173    pub fn may_contain(&self, key: &[u8]) -> bool {
174        let bits = match &self.bits {
175            Some(b) => b,
176            None => return false, // Uninitialized = empty
177        };
178        let num_bits = bits.len() * 64;
179        if num_bits == 0 {
180            return false;
181        }
182
183        let h1 = Self::hash1(key);
184        let h2 = Self::hash2(key);
185
186        for i in 0..self.num_hashes {
187            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
188            let bit_idx = (h as usize) % num_bits;
189            let word_idx = bit_idx / 64;
190            let bit_pos = bit_idx % 64;
191            if bits[word_idx] & (1 << bit_pos) == 0 {
192                return false; // Definitely not present
193            }
194        }
195        true // Might be present
196    }
197
198    /// Check if this filter might intersect with another
199    ///
200    /// Fast O(m/64) check using bitwise AND of all words.
201    /// If no bits are shared, sets are definitely disjoint.
202    #[inline]
203    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
204        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
205            (Some(s), Some(o)) => (s, o),
206            _ => return false, // Either uninitialized = no intersection
207        };
208        let min_len = self_bits.len().min(other_bits.len());
209        for i in 0..min_len {
210            if self_bits[i] & other_bits[i] != 0 {
211                return true; // Might intersect
212            }
213        }
214        false // Definitely disjoint
215    }
216
217    /// First hash function (using built-in hasher)
218    #[inline]
219    fn hash1(key: &[u8]) -> u64 {
220        use std::collections::hash_map::DefaultHasher;
221        use std::hash::{Hash, Hasher};
222        let mut hasher = DefaultHasher::new();
223        key.hash(&mut hasher);
224        hasher.finish()
225    }
226
227    /// Second hash function (using twox-hash for independence)
228    #[inline]
229    fn hash2(key: &[u8]) -> u64 {
230        twox_hash::xxh3::hash64(key)
231    }
232
233    /// Get the memory size in bytes
234    pub fn size_bytes(&self) -> usize {
235        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
236    }
237
238    /// Check if the filter is empty
239    pub fn is_empty(&self) -> bool {
240        match &self.bits {
241            Some(bits) => bits.iter().all(|&w| w == 0),
242            None => true,
243        }
244    }
245}
246
247/// Type alias for inline key storage - keys up to 32 bytes stored on stack
248/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
249pub type InlineKey = SmallVec<[u8; 32]>;
250
/// Version of a key-value pair
#[derive(Debug, Clone)]
pub struct Version {
    /// The value (`None` = tombstone)
    pub value: Option<Vec<u8>>,
    /// Transaction that created this version
    pub txn_id: u64,
    /// Commit timestamp (0 = uncommitted)
    pub commit_ts: u64,
}

// ============================================================================
// Optimized VersionChain with Binary Search (Task 1: mm.md)
// ============================================================================

/// Multi-version data for a single key with O(log v) lookups.
///
/// ## Layout
///
/// Committed versions are kept sorted by `commit_ts` DESCENDING (newest first),
/// so reads and GC can locate the visible version with `partition_point`.
/// At most one uncommitted version is held in a separate slot.
///
/// A read costs an O(1) check of the uncommitted slot plus an O(log v) binary
/// search, instead of a linear scan over every version.
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Committed versions sorted by `commit_ts` descending (newest first).
    committed: Vec<Version>,
    /// Pending version of the (single) transaction currently writing this key.
    uncommitted: Option<Version>,
}

impl VersionChain {
    /// Create an empty version chain.
    #[inline]
    pub fn new() -> Self {
        Self {
            committed: Vec::new(),
            uncommitted: None,
        }
    }

    /// Stage an uncommitted write of `value` (`None` = delete) by `txn_id`. O(1).
    ///
    /// If `txn_id` already owns the pending slot, its value is updated in place.
    /// If the slot holds another transaction's pending version, that version is
    /// replaced. This is a write conflict the caller is expected to reject first
    /// via [`VersionChain::has_write_conflict`]. The displaced transaction's later
    /// `commit` on this chain returns `false`, since the slot no longer carries its id.
    #[inline]
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
        if let Some(pending) = self.uncommitted.as_mut() {
            if pending.txn_id == txn_id {
                pending.value = value;
                return;
            }
        }
        self.uncommitted = Some(Version {
            value,
            txn_id,
            commit_ts: 0,
        });
    }

    /// Commit `txn_id`'s pending version at `commit_ts`.
    ///
    /// Moves the version from the uncommitted slot to its sorted position in the
    /// committed list. Returns `false` (and changes nothing) if the slot is empty
    /// or belongs to another transaction.
    ///
    /// Cost: O(log v) to find the position, plus the O(v) element shift done by
    /// `Vec::insert`. With increasing timestamps the new version usually lands at
    /// the front, so the shift dominates.
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        let owned_by_txn = matches!(
            &self.uncommitted,
            Some(pending) if pending.txn_id == txn_id && pending.commit_ts == 0
        );
        if !owned_by_txn {
            return false;
        }
        let Some(mut version) = self.uncommitted.take() else {
            return false;
        };
        version.commit_ts = commit_ts;
        // First index whose commit_ts <= commit_ts keeps the list descending;
        // a new version is placed before existing ones with an equal timestamp.
        let pos = self
            .committed
            .partition_point(|existing| existing.commit_ts > commit_ts);
        self.committed.insert(pos, version);
        true
    }

    /// Discard `txn_id`'s pending version if it owns the slot. O(1).
    #[inline]
    pub fn abort(&mut self, txn_id: u64) {
        if self.uncommitted.as_ref().is_some_and(|v| v.txn_id == txn_id) {
            self.uncommitted = None;
        }
    }

    /// Read the version visible at `snapshot_ts`.
    ///
    /// If `current_txn_id` owns the pending slot, its own uncommitted write is
    /// returned (read-your-writes). Otherwise this returns the newest committed
    /// version with `commit_ts < snapshot_ts` (strictly less), or `None`.
    ///
    /// ## Complexity: O(1) + O(log v)
    #[inline]
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
        if let (Some(txn_id), Some(pending)) = (current_txn_id, self.uncommitted.as_ref()) {
            if pending.txn_id == txn_id {
                return Some(pending);
            }
        }
        // `committed` is descending, so the first entry with commit_ts < snapshot_ts
        // is the newest visible one.
        let idx = self
            .committed
            .partition_point(|v| v.commit_ts >= snapshot_ts);
        self.committed.get(idx)
    }

    /// `true` if another transaction holds the uncommitted slot. O(1).
    #[inline]
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        self.uncommitted
            .as_ref()
            .is_some_and(|v| v.txn_id != my_txn_id)
    }

    /// Garbage-collect committed versions that no active snapshot can observe.
    ///
    /// A snapshot at `ts` sees versions with `commit_ts < ts` (see `read_at`).
    /// Assuming `min_active_ts` is the smallest active snapshot timestamp
    /// (NOTE(review): confirm against the manager's `min_active_ts` upkeep),
    /// this keeps:
    /// 1. every version with `commit_ts >= min_active_ts` (newer snapshots may
    ///    need them), and
    /// 2. the newest version with `commit_ts < min_active_ts`, which is exactly
    ///    what a snapshot at `min_active_ts` reads.
    ///
    /// Older versions are dropped. The uncommitted slot is never touched.
    pub fn gc(&mut self, min_active_ts: u64) {
        if self.committed.len() <= 1 {
            return;
        }
        // Index of the newest version visible at `min_active_ts`. A version
        // committed exactly at `min_active_ts` is NOT visible there, so it must
        // not count as the anchor.
        let anchor = self
            .committed
            .partition_point(|v| v.commit_ts >= min_active_ts);
        // Keep everything newer than the anchor plus the anchor itself (if any).
        let keep = (anchor + 1).min(self.committed.len());
        self.committed.truncate(keep);
    }

    /// Total version count (committed + pending).
    #[inline]
    pub fn version_count(&self) -> usize {
        self.committed.len() + usize::from(self.uncommitted.is_some())
    }

    // Legacy compatibility: get versions vec (for tests)
    #[cfg(test)]
    pub fn versions(&self) -> Vec<Version> {
        let mut result = self.committed.clone();
        if let Some(ref v) = self.uncommitted {
            result.push(v.clone());
        }
        result
    }
}
450
451// =============================================================================
452// Pre-sizing Constants to Avoid HashSet Resize Overhead
453// =============================================================================
454
455/// Default capacity for write_set HashSet
456/// Sized for typical OLTP transactions (10-50 keys)
457/// Avoids resize overhead that caused +11% regression
458const WRITE_SET_INITIAL_CAPACITY: usize = 32;
459
460/// Default capacity for read_set HashSet  
461/// Typically larger than write_set due to read-heavy patterns
462const READ_SET_INITIAL_CAPACITY: usize = 64;
463
464/// Transaction state for MVCC
465#[derive(Debug, Clone)]
466pub struct MvccTransaction {
467    /// Transaction ID
468    pub txn_id: u64,
469    /// Snapshot timestamp (reads see commits before this)
470    pub snapshot_ts: u64,
471    /// Keys written by this transaction - uses SmallVec for inline storage
472    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
473    pub write_set: HashSet<InlineKey>,
474    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
475    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
476    pub read_set: HashSet<InlineKey>,
477    /// Bloom filter for write set - fast SSI pre-filtering
478    pub write_bloom: SsiBloomFilter,
479    /// Bloom filter for read set - fast SSI pre-filtering
480    pub read_bloom: SsiBloomFilter,
481    /// Transaction state
482    pub state: TxnState,
483    /// Transaction mode for SSI optimization (Recommendation 9)
484    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
485    pub mode: TransactionMode,
486}
487
488impl MvccTransaction {
489    /// Create a new transaction with pre-sized collections
490    /// 
491    /// This avoids HashSet resize overhead during the transaction
492    /// which was causing +11% regression on write_set.insert().
493    #[inline]
494    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
495        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
496    }
497    
498    /// Create a read-only transaction (SSI bypass - 2.6x faster)
499    ///
500    /// Read-only transactions skip all SSI tracking:
501    /// - No read_set allocation
502    /// - No read_bloom allocation  
503    /// - No commit validation
504    ///
505    /// ## Performance
506    /// 
507    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
508    #[inline]
509    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
510        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
511    }
512    
513    /// Create a write-only transaction (partial SSI bypass)
514    ///
515    /// Write-only transactions skip read tracking:
516    /// - No read_set tracking
517    /// - No read_bloom inserts
518    /// - Still needs write_set for commit
519    #[inline]
520    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
521        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
522    }
523    
524    /// Create transaction with specific mode
525    #[inline]
526    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
527        // Optimize allocation based on mode
528        let (write_capacity, read_capacity) = match mode {
529            TransactionMode::ReadOnly => (0, 0),  // No tracking needed
530            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
531            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
532        };
533        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
534    }
535
536    /// Create with custom capacities for expected workload
537    /// 
538    /// Use this when you know the transaction will write many keys
539    /// to avoid resize overhead entirely.
540    #[inline]
541    pub fn with_capacity(
542        txn_id: u64,
543        snapshot_ts: u64,
544        write_capacity: usize,
545        read_capacity: usize,
546        mode: TransactionMode,
547    ) -> Self {
548        Self {
549            txn_id,
550            snapshot_ts,
551            write_set: HashSet::with_capacity(write_capacity),
552            read_set: HashSet::with_capacity(read_capacity),
553            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
554            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
555            state: TxnState::Active,
556            mode,
557        }
558    }
559
560    /// Check if this is a read-only transaction
561    #[inline]
562    pub fn is_read_only(&self) -> bool {
563        self.write_set.is_empty()
564    }
565
566    /// Check if this is a single-key write transaction
567    #[inline]
568    pub fn is_single_key_write(&self) -> bool {
569        self.write_set.len() == 1 && self.read_set.len() <= 1
570    }
571}
572
/// Lifecycle state of an MVCC transaction.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TxnState {
    /// In flight; new transactions start in this state.
    Active,
    /// Finished successfully.
    Committed,
    /// Rolled back.
    Aborted,
}
580
581// =============================================================================
582// Transaction Mode for SSI Bypass (Recommendation 9)
583// =============================================================================
584
/// How much SSI bookkeeping a transaction performs.
///
/// Choosing the mode when the transaction begins lets the common cases skip
/// tracking work they do not need:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None             | None               | ~10 ns          |
/// | WriteOnly | None             | Full               | ~30 ns          |
/// | ReadWrite | Full             | Full               | ~50 ns          |
///
/// NOTE(review): `MvccManager::record_write` does not consult the mode, so a
/// ReadOnly transaction that writes still fills its write set.
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum TransactionMode {
    /// Read-only: no read tracking, no read bloom, no commit validation.
    ///
    /// With no writes it has no incoming rw-antidependency, so it can never be
    /// the pivot of a dangerous structure. NOTE(review): in SSI theory a
    /// read-only transaction can still be the first member of a dangerous
    /// structure (the "read-only anomaly"); confirm that skipping its read
    /// tracking is acceptable for this engine's guarantees.
    ReadOnly,

    /// Write-only: reads are not tracked; `write_set`/`write_bloom` still are.
    ///
    /// Having no reads, it can only be the writer side of an rw-antidependency,
    /// never the reader side, so it cannot act as a pivot.
    WriteOnly,

    /// Read-write (default): full SSI tracking and validation at commit.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// `true` only for `ReadWrite`: the only mode whose reads are recorded.
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// `true` for `WriteOnly` and `ReadWrite`.
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::ReadOnly => false,
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
        }
    }

    /// `true` only for `ReadWrite`: the only mode validated at commit.
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        *self == TransactionMode::ReadWrite
    }
}
646
/// Kind of dependency represented by an SSI conflict edge.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConflictType {
    /// Read-write (anti-)dependency: T1 reads X, then T2 writes X.
    ReadWrite,
    /// Write-read dependency: T1 writes X, then T2 reads X.
    WriteRead,
}

/// Directed conflict edge between two transactions, used when looking for
/// SSI dangerous structures.
#[derive(Clone, Debug)]
pub struct ConflictEdge {
    /// Transaction the edge starts from.
    pub from_txn: u64,
    /// Transaction the edge points to.
    pub to_txn: u64,
    /// Which dependency the edge encodes.
    pub conflict_type: ConflictType,
}
666
667/// MVCC Manager with SSI support
668///
669/// Uses DashMap for lock-free per-transaction access.
670/// Implements Serializable Snapshot Isolation (SSI) with
671/// dangerous structure detection for rw-antidependency cycles.
672#[allow(clippy::type_complexity)]
673pub struct MvccManager {
674    /// Active transactions (sharded for concurrent access)
675    active_txns: DashMap<u64, MvccTransaction>,
676    /// Current timestamp counter
677    ts_counter: AtomicU64,
678    /// Minimum active snapshot timestamp (for GC)
679    min_active_ts: AtomicU64,
680    /// Recently committed transactions for SSI validation
681    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
682    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
683    recent_commits: DashMap<
684        u64,
685        (
686            u64,
687            SsiBloomFilter,
688            SsiBloomFilter,
689            HashSet<InlineKey>,
690            HashSet<InlineKey>,
691        ),
692    >,
693    /// Max recent commits to track
694    max_recent_commits: usize,
695}
696
697impl Default for MvccManager {
698    fn default() -> Self {
699        Self::new()
700    }
701}
702
703impl MvccManager {
704    pub fn new() -> Self {
705        Self {
706            active_txns: DashMap::new(),
707            ts_counter: AtomicU64::new(1),
708            min_active_ts: AtomicU64::new(0),
709            recent_commits: DashMap::new(),
710            max_recent_commits: 1000, // Track last 1000 commits for SSI
711        }
712    }
713
714    /// Begin a new transaction with snapshot isolation
715    /// 
716    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
717    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
718        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
719    }
720    
721    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
722    ///
723    /// Read-only transactions skip all SSI tracking, reducing
724    /// per-read overhead from ~82ns to ~32ns.
725    ///
726    /// ## Safety
727    ///
728    /// Caller must ensure no writes are performed. Attempting to
729    /// write in a read-only transaction will still succeed but
730    /// won't be tracked for SSI validation.
731    #[inline]
732    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
733        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
734    }
735    
736    /// Begin a write-only transaction (partial SSI bypass)
737    ///
738    /// Write-only transactions skip read tracking, reducing overhead
739    /// for insert-heavy workloads.
740    #[inline]
741    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
742        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
743    }
744    
745    /// Begin a transaction with specific mode
746    ///
747    /// This is the core transaction creation method that all other
748    /// begin_* methods delegate to.
749    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
750        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
751
752        // Create transaction with mode-optimized allocations
753        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
754
755        self.active_txns.insert(txn_id, txn.clone());
756        self.update_min_active_ts();
757
758        txn
759    }
760
761    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
762    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
763        self.active_txns.get(&txn_id).map(|t| t.clone())
764    }
765
766    /// Fast path: get just the snapshot timestamp without cloning
767    /// This is the hot path for reads - avoids cloning bloom filters
768    #[inline]
769    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
770        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
771    }
772
773    /// Record a read (for SSI) - uses inline key storage + bloom filter
774    ///
775    /// ## SSI Bypass (Recommendation 9)
776    ///
777    /// For ReadOnly mode transactions, this is a no-op (instant return).
778    /// For WriteOnly mode transactions, this is a no-op.
779    /// Only ReadWrite mode transactions track reads for SSI.
780    ///
781    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
782    #[inline]
783    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
784        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
785            // SSI Bypass: Skip tracking for read-only and write-only modes
786            if !txn.mode.tracks_reads() {
787                return;
788            }
789            
790            // Only track reads if within reasonable bounds
791            if txn.read_set.len() < 10000 {
792                txn.read_set.insert(SmallVec::from_slice(key));
793                txn.read_bloom.insert(key);
794            }
795        }
796    }
797
798    /// Record a write - uses inline key storage + bloom filter
799    ///
800    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
801    /// The mode only affects SSI tracking, not write capability.
802    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
803        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
804            txn.write_set.insert(SmallVec::from_slice(key));
805            txn.write_bloom.insert(key);
806        }
807    }
808
809    /// Allocate commit timestamp
810    pub fn alloc_commit_ts(&self) -> u64 {
811        self.ts_counter.fetch_add(1, Ordering::SeqCst)
812    }
813
814    /// Commit transaction with SSI validation
815    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
816    /// Returns None if SSI validation fails (dangerous structure detected)
817    ///
818    /// ## SSI Bypass (Recommendation 9)
819    ///
820    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
821    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
822    /// For ReadWrite mode: Full validation (~50ns commit)
823    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
824        // Get transaction before removing
825        let txn = self.active_txns.get(&txn_id)?.clone();
826
827        // SSI Bypass: Skip validation for ReadOnly transactions
828        // ReadOnly can never form rw-antidependency cycles
829        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
830            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
831            // For ReadWrite: check SSI validation result
832            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
833                // Abort on SSI violation
834                self.active_txns.remove(&txn_id);
835                self.update_min_active_ts();
836                return None;
837            }
838        }
839
840        let commit_ts = self.alloc_commit_ts();
841
842        // Extract write_set and remove transaction - takes ownership
843        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
844
845        // OPTIMIZATION: Only track ReadWrite transactions for SSI
846        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
847        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite 
848            && !removed_txn.read_set.is_empty() 
849            && !removed_txn.write_set.is_empty();
850        
851        if needs_ssi_tracking {
852            // Need to clone write_set since we return it AND track it
853            let write_set_for_return = removed_txn.write_set.clone();
854            
855            self.track_commit_owned(
856                txn_id,
857                commit_ts,
858                removed_txn.read_bloom,
859                removed_txn.write_bloom,
860                removed_txn.read_set,
861                removed_txn.write_set,
862            );
863
864            self.update_min_active_ts();
865            Some((commit_ts, write_set_for_return))
866        } else {
867            // Fast path: no SSI tracking needed, avoid clone entirely
868            self.update_min_active_ts();
869            Some((commit_ts, removed_txn.write_set))
870        }
871    }
872
873    /// Validate SSI constraints for a committing transaction
874    ///
875    /// ## Transaction Classification (Task 3: Optimistic MVCC)
876    ///
877    /// Transactions are classified and routed through appropriate fast paths:
878    ///
879    /// | Class      | Criteria                      | Validation Cost |
880    /// |------------|-------------------------------|-----------------|
881    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
882    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
883    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
884    /// | General    | full SSI check                | ~50 ns         |
885    ///
886    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
887    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
888    ///
889    /// Detects "dangerous structures" - rw-antidependency cycles:
890    /// - T1 reads X (snapshot sees old value)
891    /// - T2 writes X (concurrent write)  
892    /// - T2 reads Y (snapshot sees old value)
893    /// - T1 writes Y (concurrent write)
894    ///
895    /// If T1 → rw → T2 → rw → T1 exists, abort T1
896    #[inline]
897    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
898        // =================================================================
899        // Fast Path 1: Read-only transactions (0 ns)
900        // =================================================================
901        // Read-only transactions can never form rw-antidependency cycles
902        // because they have no writes to create outgoing rw-edges
903        if txn.write_set.is_empty() {
904            return true;
905        }
906
907        // =================================================================
908        // Fast Path 2: No recent commits to check (0 ns)
909        // =================================================================
910        if self.recent_commits.is_empty() {
911            return true;
912        }
913
914        // =================================================================
915        // Fast Path 3: Single-key write transactions (0 ns)
916        // =================================================================
917        // A single-key write transaction cannot form a dangerous cycle:
918        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
919        //   AND T2 to read what T1 wrote
920        // - With only one key in write_set, the same key would need to be
921        //   in both read_set AND write_set of both transactions
922        // - This is already prevented by our conflict detection (write-write)
923        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
924            return true;
925        }
926
927        let my_snapshot = txn.snapshot_ts;
928
929        // =================================================================
930        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
931        // =================================================================
932        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
933        // with any concurrent transaction's read_bloom AND vice versa,
934        // there can be no rw-antidependency
935        let mut any_may_intersect = false;
936        for entry in self.recent_commits.iter() {
937            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
938            
939            // Only check concurrent transactions
940            if *other_commit_ts <= my_snapshot {
941                continue;
942            }
943
944            // Check bloom filter intersection (O(m/64) per filter)
945            // If our writes may intersect their reads OR their writes may intersect our reads
946            if txn.write_bloom.may_intersect(other_read_bloom) 
947                || other_write_bloom.may_intersect(&txn.read_bloom) 
948            {
949                any_may_intersect = true;
950                break;
951            }
952        }
953
954        // No bloom intersection means definitely disjoint - no SSI conflict possible
955        if !any_may_intersect {
956            return true;
957        }
958
959        // =================================================================
960        // Full SSI Validation (~50 ns)
961        // =================================================================
962        // Check for rw-conflicts with recently committed transactions
963        // An rw-conflict exists if:
964        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
965        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
966
967        let mut in_conflict_with: Vec<u64> = Vec::new();
968        let mut out_conflict_to: Vec<u64> = Vec::new();
969
970        for entry in self.recent_commits.iter() {
971            let (
972                other_txn_id,
973                (
974                    other_commit_ts,
975                    _other_read_bloom,
976                    other_write_bloom,
977                    other_read_set,
978                    other_write_set,
979                ),
980            ) = entry.pair();
981
982            // Only consider transactions that committed after our snapshot started
983            // (concurrent transactions)
984            if *other_commit_ts <= my_snapshot {
985                continue;
986            }
987
988            // Check: other wrote → we read (other →rw→ me)
989            // T_other wrote a key that T_me read (rw-dependency inbound)
990            //
991            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
992            // Only do expensive HashSet intersection if bloom says "maybe conflict"
993            let mut has_in_conflict = false;
994            for key in txn.read_set.iter() {
995                if other_write_bloom.may_contain(key) {
996                    // Bloom says maybe - do exact check
997                    if other_write_set.contains(key) {
998                        has_in_conflict = true;
999                        break;
1000                    }
1001                }
1002            }
1003            if has_in_conflict {
1004                in_conflict_with.push(*other_txn_id);
1005            }
1006
1007            // Check: we wrote → other read (me →rw→ other)
1008            // T_me wrote a key that T_other read (rw-dependency outbound)
1009            //
1010            // Bloom-accelerated: Use our write_bloom against their read_set
1011            let mut has_out_conflict = false;
1012            for key in other_read_set.iter() {
1013                if txn.write_bloom.may_contain(key) {
1014                    // Bloom says maybe - do exact check
1015                    if txn.write_set.contains(key) {
1016                        has_out_conflict = true;
1017                        break;
1018                    }
1019                }
1020            }
1021            if has_out_conflict {
1022                out_conflict_to.push(*other_txn_id);
1023            }
1024        }
1025
1026        // Dangerous structure: we have both incoming AND outgoing rw-edges
1027        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1028        //
1029        // Conservative check: if both exist, abort
1030        // A more precise check would verify the cycle path, but this is safe
1031        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1032            return false; // SSI violation - abort
1033        }
1034
1035        true
1036    }
1037
1038    /// Track a committed transaction for future SSI validation
1039    ///
1040    /// Only tracks transactions that have both reads AND writes, since SSI
1041    /// only detects rw-antidependency cycles. Pure read or pure write
1042    /// transactions can't form cycles.
1043    ///
1044    /// ## Optimization: Zero-Copy Transfer
1045    ///
1046    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1047    /// phase regression. The caller should use mem::take() to transfer ownership.
1048    fn track_commit_owned(
1049        &self,
1050        txn_id: u64,
1051        commit_ts: u64,
1052        read_bloom: SsiBloomFilter,
1053        write_bloom: SsiBloomFilter,
1054        read_set: HashSet<InlineKey>,
1055        write_set: HashSet<InlineKey>,
1056    ) {
1057        // Optimization: Only track mixed read-write transactions
1058        // Pure reads can't create outgoing rw-edges
1059        // Pure writes can't create incoming rw-edges
1060        if read_set.is_empty() || write_set.is_empty() {
1061            return; // Skip tracking - can't form SSI cycle
1062        }
1063
1064        // Add to recent commits with bloom filters for fast SSI pre-filtering
1065        // No cloning needed - we take ownership
1066        self.recent_commits.insert(
1067            txn_id,
1068            (
1069                commit_ts,
1070                read_bloom,
1071                write_bloom,
1072                read_set,
1073                write_set,
1074            ),
1075        );
1076
1077        // Lazy pruning: only prune when we're significantly over capacity
1078        // Avoids pruning overhead on every commit
1079        if self.recent_commits.len() > self.max_recent_commits * 2 {
1080            // Remove entries with lowest commit_ts
1081            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1082            self.recent_commits
1083                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1084        }
1085    }
1086
1087    /// Legacy track_commit that clones - kept for compatibility
1088    #[allow(dead_code)]
1089    fn track_commit(
1090        &self,
1091        txn_id: u64,
1092        commit_ts: u64,
1093        read_bloom: SsiBloomFilter,
1094        write_bloom: SsiBloomFilter,
1095        read_set: &HashSet<InlineKey>,
1096        write_set: &HashSet<InlineKey>,
1097    ) {
1098        if read_set.is_empty() || write_set.is_empty() {
1099            return;
1100        }
1101        self.recent_commits.insert(
1102            txn_id,
1103            (
1104                commit_ts,
1105                read_bloom,
1106                write_bloom,
1107                read_set.clone(),
1108                write_set.clone(),
1109            ),
1110        );
1111    }
1112
1113    /// Abort transaction
1114    pub fn abort(&self, txn_id: u64) {
1115        self.active_txns.remove(&txn_id);
1116        self.update_min_active_ts();
1117    }
1118
1119    /// Get minimum active snapshot timestamp
1120    pub fn min_active_snapshot(&self) -> u64 {
1121        self.min_active_ts.load(Ordering::SeqCst)
1122    }
1123
1124    /// Get count of active transactions
1125    pub fn active_transaction_count(&self) -> usize {
1126        self.active_txns.len()
1127    }
1128
1129    fn update_min_active_ts(&self) {
1130        let min = self
1131            .active_txns
1132            .iter()
1133            .map(|entry| entry.value().snapshot_ts)
1134            .min()
1135            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1136        self.min_active_ts.store(min, Ordering::SeqCst);
1137    }
1138}
1139
1140/// Epoch-based dirty list for O(expired) GC instead of O(n)
1141///
1142/// Instead of scanning ALL version chains, we track which keys have versions
1143/// created in each epoch. GC only needs to visit keys from old epochs.
1144struct EpochDirtyList {
1145    /// Ring buffer of epoch -> dirty keys
1146    /// Index = epoch % EPOCH_RING_SIZE
1147    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1148    /// Current epoch
1149    current_epoch: AtomicU64,
1150}
1151
1152const EPOCH_RING_SIZE: usize = 4;
1153
1154impl EpochDirtyList {
1155    fn new() -> Self {
1156        Self {
1157            epochs: [
1158                parking_lot::Mutex::new(Vec::new()),
1159                parking_lot::Mutex::new(Vec::new()),
1160                parking_lot::Mutex::new(Vec::new()),
1161                parking_lot::Mutex::new(Vec::new()),
1162            ],
1163            current_epoch: AtomicU64::new(0),
1164        }
1165    }
1166
1167    /// Record a version created in the current epoch
1168    #[inline]
1169    fn record_version(&self, key: Vec<u8>) {
1170        let epoch = self.current_epoch.load(Ordering::Relaxed);
1171        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1172        self.epochs[idx].lock().push(key);
1173    }
1174
1175    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1176    ///
1177    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1178    /// For 100 writes: ~100x fewer mutex operations.
1179    #[inline]
1180    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1181        let epoch = self.current_epoch.load(Ordering::Relaxed);
1182        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1183        let mut guard = self.epochs[idx].lock();
1184        guard.extend(keys);
1185    }
1186
1187    /// Advance to next epoch, returning old epoch's dirty keys
1188    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1189        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1190        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1191
1192        // Drain the old epoch's dirty list
1193        let mut guard = self.epochs[old_idx].lock();
1194        let keys = std::mem::take(&mut *guard);
1195        (old_epoch, keys)
1196    }
1197
1198    /// Get current epoch
1199    #[allow(dead_code)]
1200    fn current(&self) -> u64 {
1201        self.current_epoch.load(Ordering::Relaxed)
1202    }
1203}
1204
1205// ============================================================================
1206// Streaming Scan Iterator
1207// ============================================================================
1208
/// Streaming iterator for range scans over `[start, end)`
/// 
/// Yields `(key, value)` pairs one at a time, without materializing the full
/// result set of values.
///
/// Values are cloned lazily, per `next()` call. Memory is not O(1) on every
/// path: the deferred-index path collects all matching *keys* up front (O(K)
/// keys). The SkipMap and unordered paths pull keys lazily from the
/// underlying map.
struct ScanRangeIterator<'a> {
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound
    start: Vec<u8>,
    /// Exclusive upper bound; an empty vector means "unbounded"
    end: Vec<u8>,
    /// Snapshot timestamp that reads are evaluated at
    snapshot_ts: u64,
    /// Transaction whose own uncommitted writes should also be visible
    current_txn_id: Option<u64>,
    /// Prefer the memtable's ordered index (deferred or SkipMap) over a full scan
    use_ordered: bool,
    // We use Option to defer initialization; at most one of the two is populated
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set on the first `next()` call, once the underlying iterator is built
    initialized: bool,
}
1226
1227impl<'a> Iterator for ScanRangeIterator<'a> {
1228    type Item = (Vec<u8>, Vec<u8>);
1229    
1230    fn next(&mut self) -> Option<Self::Item> {
1231        // Lazy initialization on first call
1232        if !self.initialized {
1233            self.initialized = true;
1234            
1235            if self.use_ordered {
1236                // Try deferred index first (after compaction, it uses a SkipMap internally)
1237                if let Some(ref def_idx) = self.memtable.deferred_index {
1238                    let start = self.start.clone();
1239                    let end = self.end.clone();
1240                    let snapshot_ts = self.snapshot_ts;
1241                    let current_txn_id = self.current_txn_id;
1242                    let data = &self.memtable.data;
1243                    
1244                    // Collect keys from deferred index (already sorted after compact)
1245                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1246                        def_idx.range_from(&start).collect()
1247                    } else {
1248                        def_idx.range(&start, &end).collect()
1249                    };
1250                    
1251                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1252                        keys.into_iter()
1253                            .filter_map(move |key| {
1254                                if let Some(chain) = data.get(&key)
1255                                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1256                                    && let Some(value) = &v.value
1257                                {
1258                                    Some((key, value.clone()))
1259                                } else {
1260                                    None
1261                                }
1262                            })
1263                    );
1264                    self.ordered_iter = Some(iter);
1265                } else if let Some(ref idx) = self.memtable.ordered_index {
1266                    let start = self.start.clone();
1267                    let end = self.end.clone();
1268                    let snapshot_ts = self.snapshot_ts;
1269                    let current_txn_id = self.current_txn_id;
1270                    let data = &self.memtable.data;
1271                    
1272                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1273                        Box::new(
1274                            idx.range(start..)
1275                                .filter_map(move |entry| {
1276                                    let key = entry.key();
1277                                    if let Some(chain) = data.get(key)
1278                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1279                                        && let Some(value) = &v.value
1280                                    {
1281                                        Some((key.clone(), value.clone()))
1282                                    } else {
1283                                        None
1284                                    }
1285                                })
1286                        )
1287                    } else {
1288                        Box::new(
1289                            idx.range(start..end)
1290                                .filter_map(move |entry| {
1291                                    let key = entry.key();
1292                                    if let Some(chain) = data.get(key)
1293                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1294                                        && let Some(value) = &v.value
1295                                    {
1296                                        Some((key.clone(), value.clone()))
1297                                    } else {
1298                                        None
1299                                    }
1300                                })
1301                        )
1302                    };
1303                    self.ordered_iter = Some(iter);
1304                }
1305            } else {
1306                // Unordered full scan
1307                let start = self.start.clone();
1308                let end = self.end.clone();
1309                let snapshot_ts = self.snapshot_ts;
1310                let current_txn_id = self.current_txn_id;
1311                
1312                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1313                    self.memtable.data.iter()
1314                        .filter_map(move |entry| {
1315                            let key = entry.key();
1316                            
1317                            if key.as_slice() < start.as_slice() {
1318                                return None;
1319                            }
1320                            if !end.is_empty() && key.as_slice() >= end.as_slice() {
1321                                return None;
1322                            }
1323                            
1324                            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1325                                && let Some(value) = &v.value
1326                            {
1327                                Some((key.clone(), value.clone()))
1328                            } else {
1329                                None
1330                            }
1331                        })
1332                );
1333                self.unordered_iter = Some(iter);
1334            }
1335        }
1336        
1337        // Get next from appropriate iterator
1338        if let Some(ref mut iter) = self.ordered_iter {
1339            iter.next()
1340        } else if let Some(ref mut iter) = self.unordered_iter {
1341            iter.next()
1342        } else {
1343            None
1344        }
1345    }
1346}
1347
/// MemTable with MVCC support
///
/// Keys map to version chains in a sharded `DashMap`. Concurrent per-key
/// access contends on a shard rather than on a single table-wide write lock.
///
/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
///
/// Optionally maintains an ordered key index for prefix/range scans:
/// - Deferred sorted index: O(1) append to a hot buffer on write; sorting
///   happens on demand at scan time (O(N log N), amortized across many writes)
/// - SkipMap: O(log N) insert on write
/// - Neither: scans fall back to a full O(N) map scan
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for efficient prefix/range scans (optional)
    /// O(1) insert to hot buffer, O(N log N) sort on first scan
    /// When None, scan_prefix will fall back to O(N) DashMap iteration
    deferred_index: Option<DeferredSortedIndex>,
    /// Legacy SkipMap for compatibility (used when deferred=false)
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether deferred sorting (true) or an immediate SkipMap (false) was
    /// requested at construction. The index actually in use is whichever of
    /// `deferred_index` / `ordered_index` is `Some`; this flag is not read at
    /// runtime (hence `allow(dead_code)`).
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes (sum of key and value lengths added by writes)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1374
1375impl Default for MvccMemTable {
1376    fn default() -> Self {
1377        Self::new()
1378    }
1379}
1380
1381impl MvccMemTable {
1382    pub fn new() -> Self {
1383        Self::with_ordered_index(true)
1384    }
1385
1386    /// Create memtable with optional ordered index
1387    ///
1388    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1389    /// but scan_prefix becomes O(N) instead of O(log N + K)
1390    ///
1391    /// Uses deferred sorting by default for better write performance:
1392    /// - Writes: O(1) append to hot buffer
1393    /// - Scans: O(N log N) sort-on-demand
1394    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1395        Self::with_index_mode(enable_ordered_index, true)
1396    }
1397
1398    /// Create memtable with fine-grained control over indexing
1399    ///
1400    /// # Arguments
1401    /// * `enable_ordered_index` - Whether to maintain an ordered index
1402    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1403    ///                    If false, use SkipMap (O(log N) writes)
1404    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1405        Self {
1406            data: DashMap::new(),
1407            deferred_index: if enable_ordered_index && use_deferred {
1408                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1409                    max_unsorted_entries: 10_000, // Compact every 10K writes
1410                    enabled: true,
1411                }))
1412            } else {
1413                None
1414            },
1415            ordered_index: if enable_ordered_index && !use_deferred {
1416                Some(SkipMap::new())
1417            } else {
1418                None
1419            },
1420            use_deferred,
1421            size_bytes: AtomicU64::new(0),
1422            dirty_list: EpochDirtyList::new(),
1423        }
1424    }
1425
1426    /// Write a key-value pair (uncommitted)
1427    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1428        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1429        let key_len = key.len();
1430
1431        // Track this key in the current epoch's dirty list for GC
1432        self.dirty_list.record_version(key.clone());
1433
1434        // Insert into ordered index for prefix scans (if enabled)
1435        // Deferred: O(1) append to hot buffer
1436        // SkipMap: O(log N) insert
1437        if let Some(ref idx) = self.deferred_index {
1438            idx.insert(key.clone());
1439        } else if let Some(ref idx) = self.ordered_index {
1440            idx.insert(key.clone(), ());
1441        }
1442
1443        // Use entry API for atomic get-or-insert
1444        let mut entry = self.data.entry(key).or_default();
1445
1446        // Check for write-write conflict
1447        if entry.has_write_conflict(txn_id) {
1448            return Err(SochDBError::Internal(
1449                "Write-write conflict detected".into(),
1450            ));
1451        }
1452        entry.add_uncommitted(value, txn_id);
1453        self.size_bytes
1454            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1455
1456        Ok(())
1457    }
1458
1459    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1460    ///
1461    /// Optimizations applied (Rec 3: MVCC Batching):
1462    /// - Batched dirty list tracking: single lock acquire for all keys
1463    /// - Deferred index: O(1) append per key
1464    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1465        let mut total_size = 0u64;
1466
1467        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1468        self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1469
1470        for (key, value) in writes {
1471            // Insert into ordered index (if enabled)
1472            // Deferred: O(1) append, SkipMap: O(log N)
1473            if let Some(ref idx) = self.deferred_index {
1474                idx.insert(key.clone());
1475            } else if let Some(ref idx) = self.ordered_index {
1476                idx.insert(key.clone(), ());
1477            }
1478
1479            let mut entry = self.data.entry(key.clone()).or_default();
1480
1481            if entry.has_write_conflict(txn_id) {
1482                return Err(SochDBError::Internal(
1483                    "Write-write conflict detected".into(),
1484                ));
1485            }
1486
1487            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1488            entry.add_uncommitted(value.clone(), txn_id);
1489            total_size += (key.len() + value_size) as u64;
1490        }
1491
1492        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1493        Ok(())
1494    }
1495
1496    /// Read at snapshot timestamp, with optional current txn to see own writes
1497    pub fn read(
1498        &self,
1499        key: &[u8],
1500        snapshot_ts: u64,
1501        current_txn_id: Option<u64>,
1502    ) -> Option<Vec<u8>> {
1503        self.data.get(key).and_then(|chain| {
1504            chain
1505                .read_at(snapshot_ts, current_txn_id)
1506                .and_then(|v| v.value.clone())
1507        })
1508    }
1509
1510    /// Commit all versions for a transaction
1511    ///
1512    /// Only updates the keys that were written by this transaction (tracked in write_set).
1513    /// Accepts InlineKey for zero-allocation MVCC tracking.
1514    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1515        // Only iterate over keys we know were written - O(k) instead of O(n)
1516        for key in write_set {
1517            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1518                chain.commit(txn_id, commit_ts);
1519            }
1520        }
1521    }
1522
1523    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1524    #[allow(dead_code)]
1525    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1526        for mut entry in self.data.iter_mut() {
1527            entry.value_mut().commit(txn_id, commit_ts);
1528        }
1529    }
1530
1531    /// Abort all versions for a transaction
1532    pub fn abort(&self, txn_id: u64) {
1533        for mut entry in self.data.iter_mut() {
1534            entry.value_mut().abort(txn_id);
1535        }
1536    }
1537
1538    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1539    ///
1540    /// ## Performance
1541    ///
1542    /// When ordered_index is enabled: O(log N + K) complexity
1543    /// - O(log N) to seek to the first key with prefix
1544    /// - O(K) to iterate matching keys
1545    ///
1546    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1547    /// 
1548    /// ## Optimizations Applied
1549    /// 
1550    /// - Pre-allocates result vector based on expected output size
1551    /// - Uses batch-friendly iteration patterns
1552    /// - Minimizes allocations during iteration
1553    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1554    pub fn scan_prefix(
1555        &self,
1556        prefix: &[u8],
1557        snapshot_ts: u64,
1558        current_txn_id: Option<u64>,
1559    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1560        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1561        let estimated_size = (self.data.len() / 10).max(64);
1562        let mut results = Vec::with_capacity(estimated_size);
1563
1564        if let Some(ref idx) = self.deferred_index {
1565            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1566            for key in idx.range_from(prefix) {
1567                // Stop when we've passed the prefix range
1568                if !key.starts_with(prefix) {
1569                    break;
1570                }
1571
1572                // O(1) lookup in DashMap for version chain
1573                if let Some(chain) = self.data.get(&key)
1574                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1575                    && let Some(value) = &v.value
1576                {
1577                    results.push((key, value.clone()));
1578                }
1579            }
1580        } else if let Some(ref idx) = self.ordered_index {
1581            // Fast path: O(log N) seek to first key >= prefix
1582            for entry in idx.range(prefix.to_vec()..) {
1583                let key = entry.key();
1584
1585                // Stop when we've passed the prefix range
1586                if !key.starts_with(prefix) {
1587                    break;
1588                }
1589
1590                // O(1) lookup in DashMap for version chain
1591                if let Some(chain) = self.data.get(key)
1592                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1593                    && let Some(value) = &v.value
1594                {
1595                    results.push((key.clone(), value.clone()));
1596                }
1597            }
1598        } else {
1599            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1600            // Optimized with batch-friendly iteration
1601            for entry in self.data.iter() {
1602                let key = entry.key();
1603                if !key.starts_with(prefix) {
1604                    continue;
1605                }
1606                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1607                    && let Some(value) = &v.value
1608                {
1609                    results.push((key.clone(), value.clone()));
1610                }
1611            }
1612        }
1613
1614        results
1615    }
1616
1617    /// Optimized full scan with batch allocation
1618    /// 
1619    /// For use when scanning entire tables/namespaces.
1620    /// Pre-allocates based on actual data size.
1621    pub fn scan_all(
1622        &self,
1623        snapshot_ts: u64,
1624        current_txn_id: Option<u64>,
1625    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1626        let mut results = Vec::with_capacity(self.data.len());
1627
1628        for entry in self.data.iter() {
1629            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1630                && let Some(value) = &v.value
1631            {
1632                results.push((entry.key().clone(), value.clone()));
1633            }
1634        }
1635
1636        results
1637    }
1638
1639    /// Streaming scan iterator for very large datasets
1640    /// 
1641    /// Returns an iterator that yields (key, value) pairs without
1642    /// materializing the entire result set in memory.
1643    pub fn scan_prefix_iter<'a>(
1644        &'a self,
1645        prefix: &'a [u8],
1646        snapshot_ts: u64,
1647        current_txn_id: Option<u64>,
1648    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1649        self.data.iter().filter_map(move |entry| {
1650            let key = entry.key();
1651            if !key.starts_with(prefix) {
1652                return None;
1653            }
1654            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1655                && let Some(value) = &v.value
1656            {
1657                Some((key.clone(), value.clone()))
1658            } else {
1659                None
1660            }
1661        })
1662    }
1663
1664    /// Scan range
1665    pub fn scan_range(
1666        &self,
1667        start: &[u8],
1668        end: &[u8],
1669        snapshot_ts: u64,
1670        current_txn_id: Option<u64>,
1671    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1672        let mut results = Vec::new();
1673
1674        if let Some(ref idx) = self.deferred_index {
1675            // Deferred index path: sort-on-scan
1676            if end.is_empty() {
1677                for key in idx.range_from(start) {
1678                    if let Some(chain) = self.data.get(&key)
1679                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1680                        && let Some(value) = &v.value
1681                    {
1682                        results.push((key, value.clone()));
1683                    }
1684                }
1685            } else {
1686                for key in idx.range(start, end) {
1687                    if let Some(chain) = self.data.get(&key)
1688                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1689                        && let Some(value) = &v.value
1690                    {
1691                        results.push((key, value.clone()));
1692                    }
1693                }
1694            }
1695        } else if let Some(ref idx) = self.ordered_index {
1696            // Use range scan on SkipMap
1697            if end.is_empty() {
1698                // Unbounded end
1699                for entry in idx.range(start.to_vec()..) {
1700                    let key = entry.key();
1701                    if let Some(chain) = self.data.get(key)
1702                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1703                        && let Some(value) = &v.value
1704                    {
1705                        results.push((key.clone(), value.clone()));
1706                    }
1707                }
1708            } else {
1709                for entry in idx.range(start.to_vec()..end.to_vec()) {
1710                    let key = entry.key();
1711                    if let Some(chain) = self.data.get(key)
1712                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1713                        && let Some(value) = &v.value
1714                    {
1715                        results.push((key.clone(), value.clone()));
1716                    }
1717                }
1718            }
1719        } else {
1720            // Fallback to full scan if no ordered index
1721            for entry in self.data.iter() {
1722                let key = entry.key();
1723
1724                if key.as_slice() < start {
1725                    continue;
1726                }
1727                if !end.is_empty() && key.as_slice() >= end {
1728                    continue;
1729                }
1730
1731                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1732                    && let Some(value) = &v.value
1733                {
1734                    results.push((key.clone(), value.clone()));
1735                }
1736            }
1737        }
1738
1739        results
1740    }
1741
1742    /// Streaming range scan iterator for very large datasets
1743    /// 
1744    /// Returns an iterator that yields (key, value) pairs without
1745    /// materializing the entire result set in memory. Uses the ordered
1746    /// index when available for O(log N + K) complexity.
1747    /// 
1748    /// ## Zero-Allocation Design
1749    /// 
1750    /// While the iterator itself cannot avoid allocations for returned
1751    /// values (since the caller needs ownership), it avoids:
1752    /// - Pre-materializing all results
1753    /// - Intermediate buffers
1754    /// - Repeated key comparisons for already-visited entries
1755    /// 
1756    /// ## Usage
1757    /// 
1758    /// ```ignore
1759    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1760    ///     // Process each result as it arrives
1761    ///     // Memory usage is O(1) per iteration, not O(N) total
1762    /// }
1763    /// ```
1764    pub fn scan_range_iter<'a>(
1765        &'a self,
1766        start: &'a [u8],
1767        end: &'a [u8],
1768        snapshot_ts: u64,
1769        current_txn_id: Option<u64>,
1770    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1771        // Compact deferred index before scanning if needed
1772        if let Some(ref idx) = self.deferred_index {
1773            idx.compact();
1774        }
1775        
1776        // Use either ordered index or full scan
1777        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1778        
1779        // Create iterator based on availability of ordered index
1780        ScanRangeIterator {
1781            memtable: self,
1782            start: start.to_vec(),
1783            end: end.to_vec(),
1784            snapshot_ts,
1785            current_txn_id,
1786            use_ordered,
1787            ordered_iter: None,
1788            unordered_iter: None,
1789            initialized: false,
1790        }
1791    }
1792
1793    /// Get approximate size
1794    pub fn size(&self) -> u64 {
1795        self.size_bytes.load(Ordering::Relaxed)
1796    }
1797
1798    /// Garbage collect old versions using epoch-based dirty list
1799    ///
1800    /// O(expired_versions) instead of O(all_versions)
1801    /// Only visits keys that had versions created in the old epoch.
1802    pub fn gc(&self, min_active_ts: u64) -> usize {
1803        // Advance epoch and get the dirty keys from the old epoch
1804        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1805
1806        if dirty_keys.is_empty() {
1807            return 0;
1808        }
1809
1810        let mut gc_count = 0;
1811
1812        // Only visit keys that were modified in the old epoch
1813        // Use a HashSet to deduplicate keys that were written multiple times
1814        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1815
1816        for key in unique_keys {
1817            if let Some(mut entry) = self.data.get_mut(&key) {
1818                let before = entry.value().version_count();
1819                entry.value_mut().gc(min_active_ts);
1820                gc_count += before.saturating_sub(entry.value().version_count());
1821            }
1822        }
1823
1824        gc_count
1825    }
1826
1827    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1828    #[allow(dead_code)]
1829    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1830        let mut gc_count = 0;
1831
1832        for mut entry in self.data.iter_mut() {
1833            let before = entry.value().version_count();
1834            entry.value_mut().gc(min_active_ts);
1835            gc_count += before.saturating_sub(entry.value().version_count());
1836        }
1837
1838        gc_count
1839    }
1840}
1841
1842// ============================================================================
1843// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1844// ============================================================================
1845
1846use crate::key_buffer::ArenaKeyHandle;
1847
1848/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1849struct ArenaEpochDirtyList {
1850    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1851    current_epoch: AtomicU64,
1852}
1853
1854impl ArenaEpochDirtyList {
1855    fn new() -> Self {
1856        Self {
1857            epochs: [
1858                parking_lot::Mutex::new(Vec::new()),
1859                parking_lot::Mutex::new(Vec::new()),
1860                parking_lot::Mutex::new(Vec::new()),
1861                parking_lot::Mutex::new(Vec::new()),
1862            ],
1863            current_epoch: AtomicU64::new(0),
1864        }
1865    }
1866
1867    #[inline]
1868    fn record_version(&self, key: ArenaKeyHandle) {
1869        let epoch = self.current_epoch.load(Ordering::Relaxed);
1870        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1871        self.epochs[idx].lock().push(key);
1872    }
1873
1874    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1875        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1876        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1877        let mut guard = self.epochs[old_idx].lock();
1878        let keys = std::mem::take(&mut *guard);
1879        (old_epoch, keys)
1880    }
1881}
1882
/// Arena-backed MVCC MemTable with optimized key storage.
///
/// Keys are stored as `ArenaKeyHandle` instead of `Vec<u8>`. A write builds
/// one handle and hands clones of it to the dirty list, the ordered index and
/// `data`, instead of cloning the key bytes three times:
///
/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
/// - After: 1 × ArenaKeyHandle creation plus 3 handle clones
///
/// Values are still owned `Vec<u8>`s. Reads also build a temporary
/// `ArenaKeyHandle` per lookup (see `read`).
///
/// ## Performance
///
/// NOTE(review): the following are design estimates, not measured here —
/// confirm with benchmarks and against `ArenaKeyHandle`'s actual layout:
/// - handle clones are O(1) copies of a ~16-byte handle
/// - heap allocations for the key drop from 3 to 1 per write
/// - 20-30% expected throughput gain on write-heavy workloads
pub struct ArenaMvccMemTable {
    /// Key -> VersionChain, keyed by arena handle.
    data: DashMap<ArenaKeyHandle, VersionChain>,
    /// Ordered key index for prefix/range scans; `None` when disabled, in
    /// which case scans walk `data` instead.
    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
    /// Approximate size in bytes (key + value lengths added by writes;
    /// `abort` and `gc` do not decrement it).
    size_bytes: AtomicU64,
    /// Epoch-based dirty list (arena-backed) telling `gc` which keys to visit.
    dirty_list: ArenaEpochDirtyList,
}
1907
1908impl ArenaMvccMemTable {
1909    pub fn new() -> Self {
1910        Self::with_ordered_index(true)
1911    }
1912
1913    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1914        Self {
1915            data: DashMap::new(),
1916            ordered_index: if enable_ordered_index {
1917                Some(SkipMap::new())
1918            } else {
1919                None
1920            },
1921            size_bytes: AtomicU64::new(0),
1922            dirty_list: ArenaEpochDirtyList::new(),
1923        }
1924    }
1925
1926    /// Write a key-value pair using arena key handle
1927    ///
1928    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
1929    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
1930    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1931        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1932        let key_len = key.len();
1933
1934        // Create ONE ArenaKeyHandle - this is the only allocation for the key
1935        let key_handle = ArenaKeyHandle::new(key);
1936
1937        // Track in dirty list (O(1) copy of 16-byte handle)
1938        self.dirty_list.record_version(key_handle.clone());
1939
1940        // Insert into ordered index (O(1) copy of 16-byte handle)
1941        if let Some(ref idx) = self.ordered_index {
1942            idx.insert(key_handle.clone(), ());
1943        }
1944
1945        // Use entry API with the handle
1946        let mut entry = self.data.entry(key_handle).or_default();
1947
1948        if entry.has_write_conflict(txn_id) {
1949            return Err(SochDBError::Internal(
1950                "Write-write conflict detected".into(),
1951            ));
1952        }
1953        entry.add_uncommitted(value, txn_id);
1954        self.size_bytes
1955            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1956
1957        Ok(())
1958    }
1959
1960    /// Write batch using arena key handles
1961    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1962        let mut total_size = 0u64;
1963
1964        for (key, value) in writes {
1965            let key_handle = ArenaKeyHandle::new(key);
1966
1967            self.dirty_list.record_version(key_handle.clone());
1968
1969            if let Some(ref idx) = self.ordered_index {
1970                idx.insert(key_handle.clone(), ());
1971            }
1972
1973            let mut entry = self.data.entry(key_handle).or_default();
1974
1975            if entry.has_write_conflict(txn_id) {
1976                return Err(SochDBError::Internal(
1977                    "Write-write conflict detected".into(),
1978                ));
1979            }
1980
1981            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1982            entry.add_uncommitted(value.clone(), txn_id);
1983            total_size += (key.len() + value_size) as u64;
1984        }
1985
1986        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1987        Ok(())
1988    }
1989
1990    /// Read at snapshot timestamp
1991    pub fn read(
1992        &self,
1993        key: &[u8],
1994        snapshot_ts: u64,
1995        current_txn_id: Option<u64>,
1996    ) -> Option<Vec<u8>> {
1997        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
1998        let key_handle = ArenaKeyHandle::new(key);
1999        self.data.get(&key_handle).and_then(|chain| {
2000            chain
2001                .read_at(snapshot_ts, current_txn_id)
2002                .and_then(|v| v.value.clone())
2003        })
2004    }
2005
2006    /// Commit transaction
2007    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2008        for key in write_set {
2009            let key_handle = ArenaKeyHandle::new(key.as_slice());
2010            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2011                chain.commit(txn_id, commit_ts);
2012            }
2013        }
2014    }
2015
2016    /// Abort transaction
2017    pub fn abort(&self, txn_id: u64) {
2018        for mut entry in self.data.iter_mut() {
2019            entry.value_mut().abort(txn_id);
2020        }
2021    }
2022
2023    /// Scan prefix
2024    pub fn scan_prefix(
2025        &self,
2026        prefix: &[u8],
2027        snapshot_ts: u64,
2028        current_txn_id: Option<u64>,
2029    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2030        let mut results = Vec::new();
2031        let prefix_handle = ArenaKeyHandle::new(prefix);
2032
2033        if let Some(ref idx) = self.ordered_index {
2034            for entry in idx.range(prefix_handle..) {
2035                let key = entry.key();
2036
2037                if !key.as_bytes().starts_with(prefix) {
2038                    break;
2039                }
2040
2041                if let Some(chain) = self.data.get(key)
2042                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2043                    && let Some(value) = &v.value
2044                {
2045                    results.push((key.as_bytes().to_vec(), value.clone()));
2046                }
2047            }
2048        } else {
2049            for entry in self.data.iter() {
2050                let key = entry.key();
2051                if !key.as_bytes().starts_with(prefix) {
2052                    continue;
2053                }
2054                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2055                    && let Some(value) = &v.value
2056                {
2057                    results.push((key.as_bytes().to_vec(), value.clone()));
2058                }
2059            }
2060        }
2061
2062        results
2063    }
2064
2065    /// Get approximate size
2066    pub fn size(&self) -> u64 {
2067        self.size_bytes.load(Ordering::Relaxed)
2068    }
2069
2070    /// Garbage collect old versions
2071    pub fn gc(&self, min_active_ts: u64) -> usize {
2072        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2073
2074        if dirty_keys.is_empty() {
2075            return 0;
2076        }
2077
2078        let mut gc_count = 0;
2079        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2080
2081        for key in unique_keys {
2082            if let Some(mut entry) = self.data.get_mut(&key) {
2083                let before = entry.value().version_count();
2084                entry.value_mut().gc(min_active_ts);
2085                gc_count += before.saturating_sub(entry.value().version_count());
2086            }
2087        }
2088
2089        gc_count
2090    }
2091}
2092
2093impl Default for ArenaMvccMemTable {
2094    fn default() -> Self {
2095        Self::new()
2096    }
2097}
2098
2099// ============================================================================
// MemTableKind - Unified MemTable Abstraction (enum dispatch)
2101// ============================================================================
2102
/// Selects which memtable implementation `MemTableKind::new` builds.
///
/// `Standard` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable (`MvccMemTable`), which supports deferred sorting.
    /// Best for: general workloads, balanced read/write.
    #[default]
    Standard,
    /// Arena-backed memtable (`ArenaMvccMemTable`) with fewer key allocations.
    /// Best for: write-heavy workloads, large keys.
    Arena,
}
2120
/// Unified memtable abstraction using enum dispatch.
///
/// This pattern provides:
/// - Static dispatch: each call is a `match` on the variant, with no vtable
/// - Type-safe switching between implementations
/// - Easy extension with further memtable types
///
/// ## Why Enum over Trait Object?
///
/// - Hot path performance: an enum match is a single branch vs vtable indirection
/// - Cache friendliness: no pointer chasing
/// - Inlining: the compiler can inline through enum dispatch
///
/// (One exception: `MemTableKind::scan_range_iter` returns a boxed
/// `dyn Iterator`, so that iterator is dynamically dispatched.)
pub enum MemTableKind {
    /// Standard MVCC memtable.
    Standard(MvccMemTable),
    /// Arena-backed MVCC memtable keyed by `ArenaKeyHandle`.
    Arena(ArenaMvccMemTable),
}
2137
2138impl MemTableKind {
2139    /// Create a new memtable of the specified type
2140    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2141        match kind {
2142            MemTableType::Standard => {
2143                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2144            }
2145            MemTableType::Arena => {
2146                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2147            }
2148        }
2149    }
2150
2151    /// Write a key-value pair
2152    #[inline]
2153    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2154        match self {
2155            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2156            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2157        }
2158    }
2159
2160    /// Write batch of key-value pairs
2161    #[inline]
2162    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2163        match self {
2164            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2165            MemTableKind::Arena(m) => {
2166                // Convert to arena-compatible format
2167                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2168                    .iter()
2169                    .map(|(k, v)| (k.as_slice(), v.clone()))
2170                    .collect();
2171                m.write_batch(&arena_writes, txn_id)
2172            }
2173        }
2174    }
2175
2176    /// Read at snapshot timestamp
2177    #[inline]
2178    pub fn read(
2179        &self,
2180        key: &[u8],
2181        snapshot_ts: u64,
2182        current_txn_id: Option<u64>,
2183    ) -> Option<Vec<u8>> {
2184        match self {
2185            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2186            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2187        }
2188    }
2189
2190    /// Commit transaction
2191    #[inline]
2192    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2193        match self {
2194            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2195            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2196        }
2197    }
2198
2199    /// Abort transaction
2200    #[inline]
2201    pub fn abort(&self, txn_id: u64) {
2202        match self {
2203            MemTableKind::Standard(m) => m.abort(txn_id),
2204            MemTableKind::Arena(m) => m.abort(txn_id),
2205        }
2206    }
2207
2208    /// Scan prefix
2209    #[inline]
2210    pub fn scan_prefix(
2211        &self,
2212        prefix: &[u8],
2213        snapshot_ts: u64,
2214        current_txn_id: Option<u64>,
2215    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2216        match self {
2217            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2218            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2219        }
2220    }
2221
2222    /// Scan range
2223    #[inline]
2224    pub fn scan_range(
2225        &self,
2226        start: &[u8],
2227        end: &[u8],
2228        snapshot_ts: u64,
2229        current_txn_id: Option<u64>,
2230    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2231        match self {
2232            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2233            MemTableKind::Arena(m) => {
2234                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2235                let mut results = Vec::new();
2236                if let Some(ref idx) = m.ordered_index {
2237                    let start_handle = ArenaKeyHandle::new(start);
2238                    let end_handle = ArenaKeyHandle::new(end);
2239                    
2240                    if end.is_empty() {
2241                        for entry in idx.range(start_handle..) {
2242                            let key = entry.key();
2243                            if let Some(chain) = m.data.get(key)
2244                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2245                                && let Some(value) = &v.value
2246                            {
2247                                results.push((key.as_bytes().to_vec(), value.clone()));
2248                            }
2249                        }
2250                    } else {
2251                        for entry in idx.range(start_handle..end_handle) {
2252                            let key = entry.key();
2253                            if let Some(chain) = m.data.get(key)
2254                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2255                                && let Some(value) = &v.value
2256                            {
2257                                results.push((key.as_bytes().to_vec(), value.clone()));
2258                            }
2259                        }
2260                    }
2261                } else {
2262                    for entry in m.data.iter() {
2263                        let key = entry.key();
2264                        let key_bytes = key.as_bytes();
2265                        if key_bytes < start {
2266                            continue;
2267                        }
2268                        if !end.is_empty() && key_bytes >= end {
2269                            continue;
2270                        }
2271                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2272                            && let Some(value) = &v.value
2273                        {
2274                            results.push((key_bytes.to_vec(), value.clone()));
2275                        }
2276                    }
2277                }
2278                results
2279            }
2280        }
2281    }
2282
2283    /// Scan range iterator (returns collected results for now)
2284    #[inline]
2285    pub fn scan_range_iter<'a>(
2286        &'a self,
2287        start: &'a [u8],
2288        end: &'a [u8],
2289        snapshot_ts: u64,
2290        current_txn_id: Option<u64>,
2291    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2292        match self {
2293            MemTableKind::Standard(m) => {
2294                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2295            }
2296            MemTableKind::Arena(_) => {
2297                // Arena version returns collected results as iterator
2298                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2299                Box::new(results.into_iter())
2300            }
2301        }
2302    }
2303
2304    /// Get approximate size
2305    #[inline]
2306    pub fn size(&self) -> u64 {
2307        match self {
2308            MemTableKind::Standard(m) => m.size(),
2309            MemTableKind::Arena(m) => m.size(),
2310        }
2311    }
2312
2313    /// Garbage collect old versions
2314    #[inline]
2315    pub fn gc(&self, min_active_ts: u64) -> usize {
2316        match self {
2317            MemTableKind::Standard(m) => m.gc(min_active_ts),
2318            MemTableKind::Arena(m) => m.gc(min_active_ts),
2319        }
2320    }
2321
2322    /// Get the kind of memtable
2323    pub fn kind(&self) -> MemTableType {
2324        match self {
2325            MemTableKind::Standard(_) => MemTableType::Standard,
2326            MemTableKind::Arena(_) => MemTableType::Arena,
2327        }
2328    }
2329}
2330
/// Durable storage engine with full ACID support.
///
/// Combines a write-ahead log, an MVCC manager and an in-memory memtable.
/// Struct fields drop in declaration order, so keep `db_lock` last: the
/// directory lock is then released only after every other field is dropped.
pub struct DurableStorage {
    /// Path to the storage directory.
    path: PathBuf,
    /// Write-ahead log.
    wal: Arc<TxnWal>,
    /// MVCC manager (transactions and snapshots).
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena).
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes.
    /// Key: txn_id; value: a TxnWalBuffer that accumulates writes in memory.
    /// At commit the buffer is flushed to the WAL with a single lock acquisition.
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional).
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state.
    needs_recovery: AtomicU64, // 1 = needs recovery; used as a flag
    /// Last checkpoint LSN.
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous):
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit).
    sync_mode: AtomicU64,
    /// Commits since the last sync (for NORMAL mode).
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode (Little's Law): exponential moving
    /// average of the arrival rate, in requests/sec × 1000 for fixed-point precision.
    arrival_rate_ema: AtomicU64,
    /// Timestamp of the last commit, in microseconds.
    last_commit_us: AtomicU64,
    /// Estimated fsync latency, in microseconds.
    fsync_latency_us: AtomicU64,
    /// Exclusive database-directory lock (None = no locking). Never read; it is
    /// held only so the lock lives as long as this struct.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2367
2368impl DurableStorage {
2369    /// Open or create durable storage at path
2370    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2371        Self::open_with_config(path, true)
2372    }
2373
2374    /// Open with configurable ordered index
2375    ///
2376    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2377    /// but scan_prefix becomes O(N) instead of O(log N + K)
2378    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2379        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2380    }
2381
2382    /// Open with arena-backed memtable for write-heavy workloads
2383    ///
2384    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2385    /// Best for workloads with:
2386    /// - High write throughput
2387    /// - Large keys (reduces allocation overhead)
2388    /// - Minimal concurrent reads during writes
2389    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2390        Self::open_with_full_config(path, true, MemTableType::Arena)
2391    }
2392
2393    /// Open with full configuration options
2394    ///
2395    /// # Arguments
2396    /// * `path` - Storage directory path
2397    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2398    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2399    ///
2400    /// # Locking
2401    ///
2402    /// Acquires an exclusive advisory lock on the database directory.
2403    /// This prevents concurrent multi-process access which would corrupt data.
2404    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2405    pub fn open_with_full_config<P: AsRef<Path>>(
2406        path: P,
2407        enable_ordered_index: bool,
2408        memtable_type: MemTableType,
2409    ) -> Result<Self> {
2410        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2411    }
2412
2413    /// Open without locking (for testing crash recovery scenarios)
2414    ///
2415    /// # Safety
2416    /// This should ONLY be used in tests that simulate crashes by forgetting
2417    /// the storage instance. In production, always use `open_with_full_config`.
2418    #[cfg(test)]
2419    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2420        Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2421    }
2422
2423    fn open_with_full_config_internal<P: AsRef<Path>>(
2424        path: P,
2425        enable_ordered_index: bool,
2426        memtable_type: MemTableType,
2427        acquire_lock: bool,
2428    ) -> Result<Self> {
2429        let path = path.as_ref().to_path_buf();
2430        std::fs::create_dir_all(&path)?;
2431
2432        // Acquire exclusive lock on database directory (unless disabled for testing)
2433        let db_lock = if acquire_lock {
2434            Some(crate::lock::DatabaseLock::acquire(&path)
2435                .map_err(|e| SochDBError::LockError(e.to_string()))?)
2436        } else {
2437            None
2438        };
2439
2440        let wal_path = path.join("wal.log");
2441        let wal = Arc::new(TxnWal::new(&wal_path)?);
2442
2443        let storage = Self {
2444            path,
2445            wal: wal.clone(),
2446            mvcc: Arc::new(MvccManager::new()),
2447            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2448            txn_write_buffers: DashMap::new(),
2449            group_commit: None,
2450            needs_recovery: AtomicU64::new(0),
2451            last_checkpoint_lsn: AtomicU64::new(0),
2452            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2453            commits_since_sync: AtomicU64::new(0),
2454            // Adaptive batch sizing (Little's Law)
2455            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2456            last_commit_us: AtomicU64::new(0),
2457            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2458            db_lock,
2459        };
2460
2461        // Check if recovery needed
2462        if storage.check_recovery_needed()? {
2463            storage.needs_recovery.store(1, Ordering::SeqCst);
2464        }
2465
2466        Ok(storage)
2467    }
2468
2469    /// Open with group commit enabled
2470    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2471        Self::open_with_group_commit_and_config(path, true)
2472    }
2473
2474    /// Open with group commit and configurable ordered index
2475    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2476        path: P,
2477        enable_ordered_index: bool,
2478    ) -> Result<Self> {
2479        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2480
2481        let wal = storage.wal.clone();
2482        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2483            // Write all commit records WITHOUT flushing (batch them)
2484            for &txn_id in txn_ids {
2485                let entry = TxnWalEntry::txn_commit(txn_id);
2486                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2487            }
2488
2489            // Then do a SINGLE flush + fsync for the entire batch
2490            wal.flush().map_err(|e| e.to_string())?;
2491            wal.sync().map_err(|e| e.to_string())?;
2492
2493            // Return commit timestamp
2494            Ok(std::time::SystemTime::now()
2495                .duration_since(std::time::UNIX_EPOCH)
2496                .unwrap()
2497                .as_micros() as u64)
2498        });
2499
2500        storage.group_commit = Some(Arc::new(gc));
2501        Ok(storage)
2502    }
2503
2504    /// Open with IndexPolicy for automatic memtable/index configuration
2505    ///
2506    /// This is the recommended constructor for new code. The policy determines:
2507    /// - Whether to use ordered index (ScanOptimized only)
2508    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2509    /// - Default settings optimized for the workload pattern
2510    ///
2511    /// # Arguments
2512    /// * `path` - Storage directory path
2513    /// * `policy` - Index policy determining write/scan tradeoffs
2514    /// * `group_commit` - Whether to enable group commit for throughput
2515    pub fn open_with_policy<P: AsRef<Path>>(
2516        path: P,
2517        policy: crate::index_policy::IndexPolicy,
2518        group_commit: bool,
2519    ) -> Result<Self> {
2520        use crate::index_policy::IndexPolicy;
2521        
2522        // Derive configuration from policy
2523        let (enable_ordered_index, memtable_type) = match policy {
2524            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2525                // Write-heavy: no ordered index, use arena for reduced allocations
2526                (false, MemTableType::Arena)
2527            }
2528            IndexPolicy::Balanced => {
2529                // Mixed OLTP: deferred sorting (already implemented in Standard)
2530                (true, MemTableType::Standard)
2531            }
2532            IndexPolicy::ScanOptimized => {
2533                // Scan-heavy: maintain ordered index
2534                (true, MemTableType::Standard)
2535            }
2536        };
2537
2538        if group_commit {
2539            let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2540            
2541            let wal = storage.wal.clone();
2542            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2543                for &txn_id in txn_ids {
2544                    let entry = TxnWalEntry::txn_commit(txn_id);
2545                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2546                }
2547                wal.flush().map_err(|e| e.to_string())?;
2548                wal.sync().map_err(|e| e.to_string())?;
2549                Ok(std::time::SystemTime::now()
2550                    .duration_since(std::time::UNIX_EPOCH)
2551                    .unwrap()
2552                    .as_micros() as u64)
2553            });
2554            storage.group_commit = Some(Arc::new(gc));
2555            Ok(storage)
2556        } else {
2557            Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2558        }
2559    }
2560
2561    /// Get the memtable type being used
2562    pub fn memtable_type(&self) -> MemTableType {
2563        self.memtable.kind()
2564    }
2565
2566    /// Check if recovery is needed (dirty shutdown detection)
2567    ///
2568    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2569    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2570    /// but committed data still needs to be loaded from WAL into the memtable.
2571    fn check_recovery_needed(&self) -> Result<bool> {
2572        let marker_path = self.path.join(".clean_shutdown");
2573        if marker_path.exists() {
2574            // Clean shutdown - remove marker
2575            std::fs::remove_file(&marker_path)?;
2576        }
2577        // ALWAYS need recovery to rebuild memtable from WAL
2578        // The memtable is in-memory only and needs to be restored on every startup
2579        Ok(true)
2580    }
2581
2582    /// Perform crash recovery
2583    pub fn recover(&self) -> Result<RecoveryStats> {
2584        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2585            return Ok(RecoveryStats::default());
2586        }
2587
2588        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2589
2590        // Apply committed writes to memtable
2591        let recovery_txn_id = self.wal.alloc_txn_id();
2592        let commit_ts = self.mvcc.alloc_commit_ts();
2593
2594        // Collect keys being written for efficient commit
2595        let mut write_set: HashSet<InlineKey> = HashSet::new();
2596        for (key, value) in &writes {
2597            write_set.insert(SmallVec::from_slice(key));
2598            self.memtable
2599                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2600        }
2601        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2602
2603        self.needs_recovery.store(0, Ordering::SeqCst);
2604
2605        Ok(RecoveryStats {
2606            transactions_recovered: txn_count,
2607            writes_recovered: writes.len(),
2608            commit_ts,
2609        })
2610    }
2611
2612    /// Begin a new transaction
2613    pub fn begin_transaction(&self) -> Result<u64> {
2614        let txn_id = self.wal.begin_transaction()?;
2615        self.mvcc.begin(txn_id);
2616        Ok(txn_id)
2617    }
2618
2619    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2620    ///
2621    /// This enables mode-aware optimizations:
2622    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2623    /// - WriteOnly: Skip read tracking, faster bulk inserts
2624    /// - ReadWrite: Full SSI for serializable isolation
2625    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2626        let txn_id = self.wal.begin_transaction()?;
2627        self.mvcc.begin_with_mode(txn_id, mode);
2628        Ok(txn_id)
2629    }
2630
2631    /// Read a key within a transaction
2632    #[inline]
2633    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2634        // Fast path: get just snapshot_ts without cloning whole transaction
2635        let snapshot_ts = self
2636            .mvcc
2637            .get_snapshot_ts(txn_id)
2638            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2639
2640        // Record read for SSI (skipped for read-only transactions)
2641        self.mvcc.record_read(txn_id, key);
2642
2643        // Read at snapshot timestamp, seeing own uncommitted writes
2644        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2645    }
2646
2647    /// Write a key-value pair within a transaction
2648    ///
2649    /// Writes are buffered and only flushed to disk on commit.
2650    /// This provides ~10× better throughput for batched inserts.
2651    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2652        // Use the zero-allocation path internally
2653        self.write_refs(txn_id, &key, &value)?;
2654
2655        Ok(())
2656    }
2657
2658    /// Write from references - zero allocation hot path
2659    ///
2660    /// Avoids cloning key/value by writing to WAL from refs directly,
2661    /// then only allocating once for memtable storage.
2662    #[inline]
2663    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2664        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2665        self.mvcc.record_write(txn_id, key);
2666
2667        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2668        // This reduces lock contention from O(writes) to O(1) per transaction
2669        self.txn_write_buffers
2670            .entry(txn_id)
2671            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2672            .append(key, value);
2673
2674        // Write to memtable (needs owned key/value for storage)
2675        self.memtable
2676            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2677
2678        Ok(())
2679    }
2680
2681    /// Delete a key within a transaction
2682    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2683        // Record write (uses InlineKey - zero allocation for small keys)
2684        self.mvcc.record_write(txn_id, &key);
2685
2686        // Buffer tombstone in memory - NO WAL lock taken!
2687        self.txn_write_buffers
2688            .entry(txn_id)
2689            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2690            .append(&key, &[]); // Empty value = tombstone
2691
2692        // Write tombstone to memtable
2693        self.memtable.write(key, None, txn_id)?;
2694
2695        Ok(())
2696    }
2697
2698    /// Batch write multiple key-value pairs with reduced overhead
2699    ///
2700    /// This API amortizes fixed costs over the batch:
2701    /// - Single DashMap entry lookup for TxnWalBuffer
2702    /// - Single MVCC write set update
2703    /// - Batch memtable operations
2704    ///
2705    /// Performance: ~2-3x faster than individual write_refs calls
2706    /// for batches of 100+ entries.
2707    ///
2708    /// # Arguments
2709    /// * `txn_id` - Transaction ID
2710    /// * `writes` - Slice of (key, value) pairs
2711    #[inline]
2712    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2713        if writes.is_empty() {
2714            return Ok(());
2715        }
2716
2717        // Single DashMap access for entire batch
2718        let mut buffer_entry = self
2719            .txn_write_buffers
2720            .entry(txn_id)
2721            .or_insert_with(|| TxnWalBuffer::new(txn_id));
2722
2723        // Batch operations with reduced per-row overhead
2724        for (key, value) in writes {
2725            // Record write for MVCC
2726            self.mvcc.record_write(txn_id, key);
2727
2728            // Append to WAL buffer
2729            buffer_entry.append(key, value);
2730        }
2731        drop(buffer_entry);
2732
2733        // Batch write to memtable
2734        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2735            .iter()
2736            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2737            .collect();
2738        self.memtable.write_batch(&owned_writes, txn_id)?;
2739
2740        Ok(())
2741    }
2742
2743    /// Commit a transaction
2744    ///
2745    /// With sync_mode:
2746    /// - 0 (OFF): No sync, risk of data loss
2747    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2748    /// - 2 (FULL): Sync every commit (safest, slowest)
2749    pub fn commit(&self, txn_id: u64) -> Result<u64> {
2750        // First, flush all buffered writes to WAL with SINGLE lock acquisition
2751        // This is the key optimization: O(1) lock instead of O(writes) locks
2752        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2753            && !buffer.is_empty()
2754        {
2755            // Flush entire buffer to WAL with one lock
2756            self.wal.flush_buffer(&buffer)?;
2757        }
2758
2759        // Use group commit if available, otherwise direct commit
2760        if let Some(gc) = &self.group_commit {
2761            // Submit to group commit and wait for result
2762            // This batches multiple commits into a single fsync
2763            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2764
2765            // Get commit timestamp and write_set from MVCC
2766            let (commit_ts, write_set) = self
2767                .mvcc
2768                .commit(txn_id)
2769                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2770
2771            // Commit in memtable (O(k) where k = keys written)
2772            self.memtable.commit(txn_id, commit_ts, &write_set);
2773
2774            Ok(commit_ts)
2775        } else {
2776            // Direct commit path with adaptive sync (Little's Law)
2777            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2778            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2779
2780            // Update arrival rate for adaptive batching
2781            self.update_arrival_rate();
2782
2783            // Write commit record (no flush yet - BufWriter will buffer it)
2784            let entry = TxnWalEntry::txn_commit(txn_id);
2785            self.wal.append_no_flush(&entry)?;
2786
2787            // Determine if we should sync/flush based on mode
2788            let should_sync = match sync_mode {
2789                0 => false,                                      // OFF: never sync
2790                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
2791                _ => true,                                       // FULL: always sync
2792            };
2793
2794            if should_sync {
2795                // Measure fsync latency for adaptive tuning
2796                let start = std::time::Instant::now();
2797                self.wal.flush()?;
2798                self.wal.sync()?;
2799                let latency_us = start.elapsed().as_micros() as u64;
2800
2801                // Update fsync latency estimate (EMA with α = 0.1)
2802                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2803                let new_latency = (old_latency * 9 + latency_us) / 10;
2804                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2805
2806                self.commits_since_sync.store(0, Ordering::Relaxed);
2807            }
2808
2809            // Get commit timestamp and write_set from MVCC
2810            let (commit_ts, write_set) = self
2811                .mvcc
2812                .commit(txn_id)
2813                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2814
2815            // Commit in memtable (O(k) where k = keys written)
2816            self.memtable.commit(txn_id, commit_ts, &write_set);
2817
2818            Ok(commit_ts)
2819        }
2820    }
2821
2822    /// Update arrival rate using exponential moving average
2823    #[inline]
2824    fn update_arrival_rate(&self) {
2825        let now_us = std::time::SystemTime::now()
2826            .duration_since(std::time::UNIX_EPOCH)
2827            .unwrap()
2828            .as_micros() as u64;
2829
2830        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2831
2832        if last > 0 {
2833            let delta_us = now_us.saturating_sub(last);
2834            if delta_us > 0 && delta_us < 10_000_000 {
2835                // Ignore gaps > 10s
2836                // Rate = 1_000_000 / delta_us (requests/sec)
2837                // Stored as rate × 1000 for precision
2838                let instant_rate = 1_000_000_000 / delta_us;
2839
2840                // EMA with α = 0.1
2841                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2842                let new_rate = (old_rate * 9 + instant_rate) / 10;
2843                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2844            }
2845        }
2846    }
2847
2848    /// Compute optimal batch threshold using Little's Law
2849    ///
2850    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
2851    /// Returns the number of commits to batch before fsync
2852    #[inline]
2853    fn adaptive_batch_threshold(&self) -> u64 {
2854        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
2855        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
2856
2857        if lambda <= 0.0 || tau <= 0.0 {
2858            return 100; // Fallback to fixed threshold
2859        }
2860
2861        // Little's Law: W* = sqrt(2 × τ × λ)
2862        // This minimizes total time = wait_time + fsync_overhead
2863        let n_opt = (2.0 * tau * lambda).sqrt();
2864
2865        // Clamp between 1 and 1000
2866        (n_opt as u64).clamp(1, 1000)
2867    }
2868
2869    /// Set synchronous mode
2870    ///
2871    /// - 0: OFF - No fsync (risk of data loss)
2872    /// - 1: NORMAL - Periodic fsync (balanced)
2873    /// - 2: FULL - Fsync every commit (safest)
2874    pub fn set_sync_mode(&self, mode: u64) {
2875        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2876    }
2877
2878    /// Force a group commit flush (useful for benchmarking or testing)
2879    pub fn flush_group_commit(&self) {
2880        if let Some(gc) = &self.group_commit {
2881            gc.flush_batch();
2882        }
2883    }
2884
2885    /// Abort a transaction
2886    pub fn abort(&self, txn_id: u64) -> Result<()> {
2887        // Discard buffered writes (no need to write to WAL)
2888        self.txn_write_buffers.remove(&txn_id);
2889
2890        // Write abort to WAL
2891        self.wal.abort_transaction(txn_id)?;
2892
2893        // Abort in MVCC
2894        self.mvcc.abort(txn_id);
2895
2896        // Abort in memtable
2897        self.memtable.abort(txn_id);
2898
2899        Ok(())
2900    }
2901
2902    /// Scan keys with prefix
2903    #[inline]
2904    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2905        // Fast path: get just snapshot_ts without cloning whole transaction
2906        let snapshot_ts = self
2907            .mvcc
2908            .get_snapshot_ts(txn_id)
2909            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2910
2911        // Note: Scan doesn't record individual key reads for SSI (too expensive)
2912        // SSI conflicts are tracked at the prefix level if needed
2913        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
2914    }
2915
2916    /// Scan keys in range
2917    #[inline]
2918    pub fn scan_range(
2919        &self,
2920        txn_id: u64,
2921        start: &[u8],
2922        end: &[u8],
2923    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2924        let snapshot_ts = self
2925            .mvcc
2926            .get_snapshot_ts(txn_id)
2927            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2928
2929        Ok(self
2930            .memtable
2931            .scan_range(start, end, snapshot_ts, Some(txn_id)))
2932    }
2933
2934    /// Streaming scan for very large result sets
2935    /// 
2936    /// Returns an iterator that yields (key, value) pairs without
2937    /// materializing the entire result set in memory.
2938    #[inline]
2939    pub fn scan_range_iter<'a>(
2940        &'a self,
2941        txn_id: u64,
2942        start: &'a [u8],
2943        end: &'a [u8],
2944    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
2945        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
2946        self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
2947    }
2948
2949    /// Force fsync to disk
2950    pub fn fsync(&self) -> Result<()> {
2951        self.wal.sync()
2952    }
2953
2954    /// Write checkpoint
2955    pub fn checkpoint(&self) -> Result<u64> {
2956        let txn_id = 0; // System transaction
2957        let entry = TxnWalEntry::checkpoint(txn_id);
2958        let lsn = self.wal.append_sync(&entry)?;
2959        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
2960        Ok(lsn)
2961    }
2962
2963    /// Get storage statistics
2964    pub fn stats(&self) -> StorageStats {
2965        // Get WAL size from the WAL manager
2966        let wal_size = self.wal.size_bytes();
2967
2968        // Get active transaction count from MVCC
2969        let active_txns = self.mvcc.active_transaction_count();
2970
2971        StorageStats {
2972            memtable_size_bytes: self.memtable.size(),
2973            wal_size_bytes: wal_size,
2974            active_transactions: active_txns,
2975            min_active_snapshot: self.mvcc.min_active_snapshot(),
2976            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
2977        }
2978    }
2979
2980    /// Garbage collect old versions
2981    pub fn gc(&self) -> usize {
2982        let min_ts = self.mvcc.min_active_snapshot();
2983        self.memtable.gc(min_ts)
2984    }
2985
2986    /// Clean shutdown
2987    pub fn shutdown(&self) -> Result<()> {
2988        // Sync WAL
2989        self.fsync()?;
2990
2991        // Write clean shutdown marker
2992        let marker_path = self.path.join(".clean_shutdown");
2993        std::fs::write(&marker_path, b"clean")?;
2994
2995        Ok(())
2996    }
2997}
2998
2999impl Drop for DurableStorage {
3000    fn drop(&mut self) {
3001        let _ = self.shutdown();
3002    }
3003}
3004
/// Summary of what `DurableStorage::recover` replayed from the WAL.
#[derive(Default, Debug)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// Commit timestamp at which all recovered writes were published
    /// (0 when no recovery ran).
    pub commit_ts: u64,
}
3012
/// Point-in-time counters describing a `DurableStorage` instance.
#[derive(Default, Debug)]
pub struct StorageStats {
    /// Size of the memtable in bytes, as reported by the memtable.
    pub memtable_size_bytes: u64,
    /// Size of the WAL in bytes.
    pub wal_size_bytes: u64,
    /// Number of transactions MVCC currently considers active.
    pub active_transactions: usize,
    /// Oldest snapshot timestamp still held by an active transaction.
    pub min_active_snapshot: u64,
    /// LSN of the most recent checkpoint record (0 if none written yet).
    pub last_checkpoint_lsn: u64,
}
3022
3023#[cfg(test)]
3024mod tests {
3025    use super::*;
3026    use tempfile::tempdir;
3027
3028    #[test]
3029    fn test_basic_transaction() {
3030        let dir = tempdir().unwrap();
3031        let storage = DurableStorage::open(dir.path()).unwrap();
3032
3033        // Begin transaction
3034        let txn_id = storage.begin_transaction().unwrap();
3035
3036        // Write data
3037        storage
3038            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3039            .unwrap();
3040        storage
3041            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3042            .unwrap();
3043
3044        // Read back (within same transaction)
3045        let v1 = storage.read(txn_id, b"key1").unwrap();
3046        assert_eq!(v1, Some(b"value1".to_vec()));
3047
3048        // Commit
3049        let commit_ts = storage.commit(txn_id).unwrap();
3050        assert!(commit_ts > 0);
3051
3052        // Read in new transaction
3053        let txn2 = storage.begin_transaction().unwrap();
3054        let v1 = storage.read(txn2, b"key1").unwrap();
3055        assert_eq!(v1, Some(b"value1".to_vec()));
3056        storage.abort(txn2).unwrap();
3057    }
3058
3059    #[test]
3060    fn test_snapshot_isolation() {
3061        let dir = tempdir().unwrap();
3062        let storage = DurableStorage::open(dir.path()).unwrap();
3063
3064        // T1: Write initial value
3065        let t1 = storage.begin_transaction().unwrap();
3066        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3067        storage.commit(t1).unwrap();
3068
3069        // T2: Start reading (snapshot at this point)
3070        let t2 = storage.begin_transaction().unwrap();
3071
3072        // T3: Update the value
3073        let t3 = storage.begin_transaction().unwrap();
3074        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3075        storage.commit(t3).unwrap();
3076
3077        // T2 should still see v1 (snapshot isolation)
3078        let v = storage.read(t2, b"key").unwrap();
3079        assert_eq!(v, Some(b"v1".to_vec()));
3080
3081        // New transaction should see v2
3082        let t4 = storage.begin_transaction().unwrap();
3083        let v = storage.read(t4, b"key").unwrap();
3084        assert_eq!(v, Some(b"v2".to_vec()));
3085
3086        storage.abort(t2).unwrap();
3087        storage.abort(t4).unwrap();
3088    }
3089
3090    #[test]
3091    fn test_abort_transaction() {
3092        let dir = tempdir().unwrap();
3093        let storage = DurableStorage::open(dir.path()).unwrap();
3094
3095        // Write initial value
3096        let t1 = storage.begin_transaction().unwrap();
3097        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3098        storage.commit(t1).unwrap();
3099
3100        // Start transaction that will abort
3101        let t2 = storage.begin_transaction().unwrap();
3102        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3103        storage.abort(t2).unwrap();
3104
3105        // New transaction should see v1 (aborted changes not visible)
3106        let t3 = storage.begin_transaction().unwrap();
3107        let v = storage.read(t3, b"key").unwrap();
3108        assert_eq!(v, Some(b"v1".to_vec()));
3109        storage.abort(t3).unwrap();
3110    }
3111
3112    #[test]
3113    fn test_crash_recovery() {
3114        let dir = tempdir().unwrap();
3115
3116        // Phase 1: Write data and commit
3117        {
3118            // Use open_without_lock for crash simulation tests
3119            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3120
3121            // Set sync mode to FULL to ensure data is synced before "crash"
3122            storage.set_sync_mode(2); // FULL: sync every commit
3123
3124            let txn = storage.begin_transaction().unwrap();
3125            storage
3126                .write(txn, b"persist".to_vec(), b"data".to_vec())
3127                .unwrap();
3128            storage.commit(txn).unwrap();
3129
3130            // Simulate crash (no clean shutdown)
3131            std::mem::forget(storage);
3132        }
3133
3134        // Phase 2: Reopen and recover
3135        {
3136            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3137            let stats = storage.recover().unwrap();
3138            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3139
3140            // Data should be recovered
3141            let txn = storage.begin_transaction().unwrap();
3142            let v = storage.read(txn, b"persist").unwrap();
3143            assert_eq!(v, Some(b"data".to_vec()));
3144            storage.abort(txn).unwrap();
3145        }
3146    }
3147
3148    #[test]
3149    fn test_scan_prefix() {
3150        let dir = tempdir().unwrap();
3151        let storage = DurableStorage::open(dir.path()).unwrap();
3152
3153        let txn = storage.begin_transaction().unwrap();
3154        storage
3155            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3156            .unwrap();
3157        storage
3158            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3159            .unwrap();
3160        storage
3161            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3162            .unwrap();
3163        storage.commit(txn).unwrap();
3164
3165        let txn2 = storage.begin_transaction().unwrap();
3166        let users = storage.scan(txn2, b"user:").unwrap();
3167        assert_eq!(users.len(), 2);
3168        storage.abort(txn2).unwrap();
3169    }
3170
3171    #[test]
3172    fn test_delete() {
3173        let dir = tempdir().unwrap();
3174        let storage = DurableStorage::open(dir.path()).unwrap();
3175
3176        // Insert
3177        let t1 = storage.begin_transaction().unwrap();
3178        storage
3179            .write(t1, b"key".to_vec(), b"value".to_vec())
3180            .unwrap();
3181        storage.commit(t1).unwrap();
3182
3183        // Verify exists
3184        let t2 = storage.begin_transaction().unwrap();
3185        assert!(storage.read(t2, b"key").unwrap().is_some());
3186        storage.abort(t2).unwrap();
3187
3188        // Delete
3189        let t3 = storage.begin_transaction().unwrap();
3190        storage.delete(t3, b"key".to_vec()).unwrap();
3191        storage.commit(t3).unwrap();
3192
3193        // Verify deleted
3194        let t4 = storage.begin_transaction().unwrap();
3195        assert!(storage.read(t4, b"key").unwrap().is_none());
3196        storage.abort(t4).unwrap();
3197    }
3198
3199    #[test]
3200    fn test_gc() {
3201        let dir = tempdir().unwrap();
3202        let storage = DurableStorage::open(dir.path()).unwrap();
3203
3204        // Create multiple versions
3205        for i in 0..10 {
3206            let txn = storage.begin_transaction().unwrap();
3207            storage
3208                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3209                .unwrap();
3210            storage.commit(txn).unwrap();
3211        }
3212
3213        // GC should reclaim old versions
3214        let gc_count = storage.gc();
3215        // At least some versions should be collected
3216        // (exact count depends on implementation)
3217        let _ = gc_count; // gc_count is usize, always >= 0
3218    }
3219
3220    #[test]
3221    fn test_group_commit() {
3222        use std::sync::Arc;
3223        use std::thread;
3224
3225        let dir = tempdir().unwrap();
3226        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3227
3228        // Spawn multiple threads to commit concurrently
3229        let mut handles = vec![];
3230        for i in 0..4 {
3231            let storage = Arc::clone(&storage);
3232            handles.push(thread::spawn(move || {
3233                let txn = storage.begin_transaction().unwrap();
3234                storage
3235                    .write(
3236                        txn,
3237                        format!("key{}", i).into_bytes(),
3238                        format!("val{}", i).into_bytes(),
3239                    )
3240                    .unwrap();
3241                storage.commit(txn).unwrap()
3242            }));
3243        }
3244
3245        // Wait for all commits
3246        let mut commit_times = vec![];
3247        for h in handles {
3248            commit_times.push(h.join().unwrap());
3249        }
3250
3251        // All commits should succeed
3252        assert!(commit_times.iter().all(|&ts| ts > 0));
3253
3254        // Verify data persisted
3255        let txn = storage.begin_transaction().unwrap();
3256        for i in 0..4 {
3257            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3258            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3259        }
3260        storage.abort(txn).unwrap();
3261    }
3262
3263    // ==================== ArenaMvccMemTable Tests ====================
3264
3265    #[test]
3266    fn test_arena_memtable_basic_write_read() {
3267        let memtable = ArenaMvccMemTable::new();
3268
3269        // Write some values
3270        memtable
3271            .write(b"key1", Some(b"value1".to_vec()), 1)
3272            .unwrap();
3273        memtable
3274            .write(b"key2", Some(b"value2".to_vec()), 1)
3275            .unwrap();
3276
3277        // Read them back (uncommitted, so need txn_id match)
3278        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3279        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3280        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3281    }
3282
3283    #[test]
3284    fn test_arena_memtable_update() {
3285        let memtable = ArenaMvccMemTable::new();
3286
3287        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3288        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3289
3290        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3291    }
3292
3293    #[test]
3294    fn test_arena_memtable_delete() {
3295        let memtable = ArenaMvccMemTable::new();
3296
3297        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3298        memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3299
3300        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3301    }
3302
3303    #[test]
3304    fn test_arena_memtable_scan_prefix() {
3305        let memtable = ArenaMvccMemTable::new();
3306
3307        memtable
3308            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3309            .unwrap();
3310        memtable
3311            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3312            .unwrap();
3313        memtable
3314            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3315            .unwrap();
3316        memtable
3317            .write(b"order:1", Some(b"order_data".to_vec()), 1)
3318            .unwrap();
3319
3320        // Create a write set and commit
3321        let mut write_set = HashSet::new();
3322        write_set.insert(InlineKey::from_slice(b"user:1:name"));
3323        write_set.insert(InlineKey::from_slice(b"user:1:email"));
3324        write_set.insert(InlineKey::from_slice(b"user:2:name"));
3325        write_set.insert(InlineKey::from_slice(b"order:1"));
3326        memtable.commit(1, 10, &write_set);
3327
3328        // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3329        let results = memtable.scan_prefix(b"user:1:", 11, None);
3330        assert_eq!(results.len(), 2);
3331
3332        // Scan for all users
3333        let results = memtable.scan_prefix(b"user:", 11, None);
3334        assert_eq!(results.len(), 3);
3335    }
3336
3337    #[test]
3338    fn test_arena_memtable_write_batch() {
3339        let memtable = ArenaMvccMemTable::new();
3340
3341        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3342            (b"k1", Some(b"v1".to_vec())),
3343            (b"k2", Some(b"v2".to_vec())),
3344            (b"k3", Some(b"v3".to_vec())),
3345        ];
3346
3347        memtable.write_batch(&writes, 1).unwrap();
3348
3349        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3350        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3351        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3352    }
3353
3354    #[test]
3355    fn test_arena_memtable_gc() {
3356        let memtable = ArenaMvccMemTable::new();
3357
3358        // Write multiple versions
3359        for i in 0..10 {
3360            memtable
3361                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3362                .unwrap();
3363
3364            let mut write_set = HashSet::new();
3365            write_set.insert(InlineKey::from_slice(b"key"));
3366            memtable.commit(i + 1, (i + 1) * 10, &write_set);
3367        }
3368
3369        // GC old versions
3370        let gc_count = memtable.gc(90);
3371        let _ = gc_count; // gc_count is usize, always >= 0
3372    }
3373
3374    #[test]
3375    fn test_arena_memtable_size_tracking() {
3376        let memtable = ArenaMvccMemTable::new();
3377
3378        assert_eq!(memtable.size(), 0);
3379
3380        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3381
3382        assert!(memtable.size() > 0);
3383    }
3384
3385    #[test]
3386    fn test_arena_memtable_abort() {
3387        let memtable = ArenaMvccMemTable::new();
3388
3389        memtable
3390            .write(b"key", Some(b"uncommitted".to_vec()), 1)
3391            .unwrap();
3392
3393        // Visible to same txn
3394        assert_eq!(
3395            memtable.read(b"key", 0, Some(1)),
3396            Some(b"uncommitted".to_vec())
3397        );
3398
3399        // Not visible to other txns
3400        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3401
3402        // Abort
3403        memtable.abort(1);
3404
3405        // No longer visible
3406        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3407    }
3408
3409    // ========================================================================
3410    // MemTableKind Tests - Unified Abstraction
3411    // ========================================================================
3412
3413    #[test]
3414    fn test_memtable_kind_standard() {
3415        let memtable = MemTableKind::new(MemTableType::Standard, true);
3416        assert_eq!(memtable.kind(), MemTableType::Standard);
3417
3418        // Write and read
3419        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3420        
3421        // Commit transaction at ts=100
3422        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3423        memtable.commit(1, 100, &write_set);
3424        
3425        // Read after commit - snapshot_ts must be > commit_ts for visibility
3426        let v = memtable.read(b"key1", 101, None);
3427        assert_eq!(v, Some(b"value1".to_vec()));
3428    }
3429
3430    #[test]
3431    fn test_memtable_kind_arena() {
3432        let memtable = MemTableKind::new(MemTableType::Arena, true);
3433        assert_eq!(memtable.kind(), MemTableType::Arena);
3434
3435        // Write and read
3436        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3437        
3438        // Commit at ts=100
3439        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3440        memtable.commit(1, 100, &write_set);
3441        
3442        // Read after commit - snapshot_ts > commit_ts
3443        let v = memtable.read(b"key1", 101, None);
3444        assert_eq!(v, Some(b"value1".to_vec()));
3445    }
3446
3447    #[test]
3448    fn test_memtable_kind_scan_range() {
3449        // Test both implementations have consistent behavior
3450        for kind in [MemTableType::Standard, MemTableType::Arena] {
3451            let memtable = MemTableKind::new(kind, true);
3452
3453            // Write some data
3454            for i in 0..5 {
3455                let key = format!("key{}", i);
3456                let value = format!("value{}", i);
3457                memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
3458            }
3459
3460            // Commit all at ts=100
3461            let write_set: HashSet<InlineKey> = (0..5)
3462                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3463                .collect();
3464            memtable.commit(1, 100, &write_set);
3465
3466            // Scan range with snapshot_ts > commit_ts
3467            let results = memtable.scan_range(b"key1", b"key4", 101, None);
3468            assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
3469        }
3470    }
3471
3472    #[test]
3473    fn test_durable_storage_arena() {
3474        let dir = tempdir().unwrap();
3475        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3476        
3477        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3478
3479        // Basic transaction should work the same
3480        let txn_id = storage.begin_transaction().unwrap();
3481        storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
3482        storage.commit(txn_id).unwrap();
3483
3484        let txn2 = storage.begin_transaction().unwrap();
3485        let v = storage.read(txn2, b"key1").unwrap();
3486        assert_eq!(v, Some(b"value1".to_vec()));
3487        storage.abort(txn2).unwrap();
3488    }
3489
3490    #[test]
3491    fn test_durable_storage_full_config() {
3492        let dir = tempdir().unwrap();
3493        
3494        // Test with Arena and ordered index enabled
3495        let storage = DurableStorage::open_with_full_config(
3496            dir.path(),
3497            true,
3498            MemTableType::Arena,
3499        ).unwrap();
3500        
3501        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3502
3503        // Write multiple keys
3504        let txn = storage.begin_transaction().unwrap();
3505        for i in 0..10 {
3506            let key = format!("key{:02}", i);
3507            let value = format!("value{}", i);
3508            storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
3509        }
3510        storage.commit(txn).unwrap();
3511
3512        // Scan should work (uses scan method for prefix)
3513        let txn2 = storage.begin_transaction().unwrap();
3514        let results = storage.scan(txn2, b"key0").unwrap();
3515        assert_eq!(results.len(), 10); // key00 through key09
3516        storage.abort(txn2).unwrap();
3517    }
3518}