Skip to main content

sochdb_storage/
durable_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! This module wires together all storage components into a production-ready
21//! durable storage engine:
22//!
23//! - WAL (txn_wal.rs) for durability
24//! - Group Commit for throughput
25//! - MVCC for isolation
26//! - LSCS for columnar efficiency
27//!
28//! ## Architecture
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │                      DurableStorage                              │
33//! ├─────────────────────────────────────────────────────────────────┤
34//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
35//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
36//! │  │             │    │             │    └─────────────────────┘ │
37//! │  │ ┌─────────┐ │    └─────────────┘                            │
38//! │  │ │Snapshots│ │                                                │
39//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
40//! │  │ ┌─────────┐ │    │              MemTable                    ││
41//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
42//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
43//! │  └─────────────┘                                                │
44//! │                      ┌─────────────────────────────────────────┐│
45//! │                      │              LSCS (SST)                  ││
46//! │                      │  Immutable columnar segments             ││
47//! │                      └─────────────────────────────────────────┘│
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Concurrency
52//!
53//! - Writers: Serialize through WAL, use MVCC for conflict detection
54//! - Readers: Lock-free reads at snapshot timestamp
55//! - Commits: Batched through GroupCommit for throughput
56
57use std::collections::HashSet;
58use std::path::{Path, PathBuf};
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62use dashmap::DashMap;
63use smallvec::SmallVec;
64
65use crossbeam_skiplist::SkipMap;
66
67use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
68use crate::group_commit::EventDrivenGroupCommit;
69use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
70use sochdb_core::{Result, SochDBError};
71
72// =============================================================================
73// SSI Bloom Filter - Fast Conflict Pre-Filtering
74// =============================================================================
75
/// Compact Bloom filter used to pre-screen SSI conflicts between transactions.
///
/// The filter cheaply answers "could these key sets overlap?". A positive
/// answer only means an exact check is still required (false positives are
/// tolerated); a negative answer must be trustworthy, so false negatives are
/// never acceptable.
///
/// ## Sizing
///
/// At a 1% false-positive target, 1000 keys need roughly 9600 bits
/// (about 1.2 KB per transaction) probed with 7 hash functions.
///
/// ## Lazy allocation
///
/// `bits` stays `None` until the first insert, so a transaction that never
/// adds a key pays no heap allocation for its filter.
#[derive(Clone, Debug)]
pub struct SsiBloomFilter {
    /// Backing bit vector, 64 bits per word; `None` until the first insert.
    bits: Option<Vec<u64>>,
    /// Expected number of items; determines the size of `bits` once allocated.
    expected_capacity: usize,
    /// Number of probe positions set (on insert) or tested (on lookup) per key.
    num_hashes: u32,
}
101
102impl SsiBloomFilter {
103    /// Optimal number of bits per item for 1% false positive rate
104    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
105    const BITS_PER_ITEM: f64 = 9.6;
106
107    /// Optimal number of hash functions for 1% false positive rate
108    /// k = (m/n) × ln(2) ≈ 7
109    const DEFAULT_NUM_HASHES: u32 = 7;
110
111    /// Minimum capacity to avoid tiny filters
112    const MIN_CAPACITY: usize = 64;
113
114    /// Create a new bloom filter for expected item count (lazy allocation)
115    ///
116    /// Configured for ~1% false positive rate.
117    /// The bit vector is not allocated until first insert.
118    #[inline]
119    pub fn new(expected_items: usize) -> Self {
120        Self {
121            bits: None,
122            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
123            num_hashes: Self::DEFAULT_NUM_HASHES,
124        }
125    }
126
127    /// Create with specific capacity in words (for memory-constrained scenarios)
128    pub fn with_word_capacity(words: usize) -> Self {
129        Self {
130            bits: None,
131            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
132            num_hashes: Self::DEFAULT_NUM_HASHES,
133        }
134    }
135
136    /// Ensure bits are allocated (lazy initialization)
137    #[inline]
138    fn ensure_allocated(&mut self) {
139        if self.bits.is_none() {
140            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
141            let num_words = num_bits.div_ceil(64);
142            self.bits = Some(vec![0u64; num_words]);
143        }
144    }
145
146    /// Add a key to the filter - O(k) where k = num_hashes
147    #[inline]
148    pub fn insert(&mut self, key: &[u8]) {
149        self.ensure_allocated();
150        let bits = self.bits.as_mut().unwrap();
151        let num_bits = bits.len() * 64;
152        if num_bits == 0 {
153            return;
154        }
155
156        // Use two hash functions to simulate k hash functions
157        // h(i) = h1 + i * h2 (double hashing technique)
158        let h1 = Self::hash1(key);
159        let h2 = Self::hash2(key);
160
161        for i in 0..self.num_hashes {
162            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
163            let bit_idx = (h as usize) % num_bits;
164            let word_idx = bit_idx / 64;
165            let bit_pos = bit_idx % 64;
166            bits[word_idx] |= 1 << bit_pos;
167        }
168    }
169
170    /// Check if a key might be present - O(k)
171    ///
172    /// Returns:
173    /// - false: Key is definitely NOT in the set (or filter not initialized)
174    /// - true: Key MIGHT be in the set (needs exact check)
175    #[inline]
176    pub fn may_contain(&self, key: &[u8]) -> bool {
177        let bits = match &self.bits {
178            Some(b) => b,
179            None => return false, // Uninitialized = empty
180        };
181        let num_bits = bits.len() * 64;
182        if num_bits == 0 {
183            return false;
184        }
185
186        let h1 = Self::hash1(key);
187        let h2 = Self::hash2(key);
188
189        for i in 0..self.num_hashes {
190            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
191            let bit_idx = (h as usize) % num_bits;
192            let word_idx = bit_idx / 64;
193            let bit_pos = bit_idx % 64;
194            if bits[word_idx] & (1 << bit_pos) == 0 {
195                return false; // Definitely not present
196            }
197        }
198        true // Might be present
199    }
200
201    /// Check if this filter might intersect with another
202    ///
203    /// Fast O(m/64) check using bitwise AND of all words.
204    /// If no bits are shared, sets are definitely disjoint.
205    #[inline]
206    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
207        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
208            (Some(s), Some(o)) => (s, o),
209            _ => return false, // Either uninitialized = no intersection
210        };
211        let min_len = self_bits.len().min(other_bits.len());
212        for i in 0..min_len {
213            if self_bits[i] & other_bits[i] != 0 {
214                return true; // Might intersect
215            }
216        }
217        false // Definitely disjoint
218    }
219
220    /// First hash function (using built-in hasher)
221    #[inline]
222    fn hash1(key: &[u8]) -> u64 {
223        use std::collections::hash_map::DefaultHasher;
224        use std::hash::{Hash, Hasher};
225        let mut hasher = DefaultHasher::new();
226        key.hash(&mut hasher);
227        hasher.finish()
228    }
229
230    /// Second hash function (using twox-hash for independence)
231    #[inline]
232    fn hash2(key: &[u8]) -> u64 {
233        twox_hash::xxh3::hash64(key)
234    }
235
236    /// Get the memory size in bytes
237    pub fn size_bytes(&self) -> usize {
238        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
239    }
240
241    /// Check if the filter is empty
242    pub fn is_empty(&self) -> bool {
243        match &self.bits {
244            Some(bits) => bits.iter().all(|&w| w == 0),
245            None => true,
246        }
247    }
248}
249
250/// Type alias for inline key storage - keys up to 32 bytes stored on stack
251/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
252pub type InlineKey = SmallVec<[u8; 32]>;
253
/// One version of a key's value in the MVCC store.
#[derive(Debug, Clone)]
pub struct Version {
    /// The stored value; `None` marks a tombstone.
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that wrote this version.
    pub txn_id: u64,
    /// Commit timestamp; `0` means the version is still uncommitted.
    pub commit_ts: u64,
}

// ============================================================================
// Optimized VersionChain with Binary Search (Task 1: mm.md)
// ============================================================================

/// All versions of a single key, organised for fast snapshot reads.
///
/// Committed versions are kept in a vector sorted by `commit_ts` in
/// DESCENDING order (newest first), so the version visible to a snapshot is
/// found with one binary search. The single pending (uncommitted) version is
/// held in a separate slot and checked in O(1).
///
/// Read cost is therefore O(1) for the pending slot plus O(log v) for the
/// committed list, where v is the number of committed versions.
#[derive(Debug, Default)]
pub struct VersionChain {
    /// Committed versions, sorted by `commit_ts` descending (newest first).
    /// Every `partition_point` call in this type relies on that ordering.
    committed: Vec<Version>,
    /// The one pending version for this key, if any. Only a single slot
    /// exists, so at most one uncommitted version is held at a time.
    uncommitted: Option<Version>,
}

impl VersionChain {
    /// Creates a chain with no versions.
    #[inline]
    pub fn new() -> Self {
        Self {
            committed: Vec::new(),
            uncommitted: None,
        }
    }

    /// Records an uncommitted write of `value` (`None` = tombstone) by
    /// `txn_id`. O(1).
    ///
    /// - If the pending slot already belongs to `txn_id`, only its value is
    ///   replaced.
    /// - Otherwise the slot is (over)written with a fresh version. When the
    ///   slot belonged to a *different* transaction this silently discards
    ///   that transaction's pending write: callers are expected to call
    ///   [`Self::has_write_conflict`] first.
    #[inline]
    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
        match &mut self.uncommitted {
            Some(pending) if pending.txn_id == txn_id => pending.value = value,
            // Empty slot, or a slot owned by another transaction (a write
            // conflict the caller should have detected beforehand).
            _ => {
                self.uncommitted = Some(Version {
                    value,
                    txn_id,
                    commit_ts: 0,
                });
            }
        }
    }

    /// Promotes the pending version written by `txn_id` to committed with
    /// timestamp `commit_ts`.
    ///
    /// Returns `true` if the pending slot belonged to `txn_id` and was
    /// committed, `false` otherwise (the chain is left untouched).
    ///
    /// The insertion point is located with an O(log v) binary search; the
    /// `Vec::insert` itself shifts the elements after that point, which is
    /// O(v) when the new version is the newest (inserted at the front).
    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
        let owned_by_txn = matches!(
            &self.uncommitted,
            Some(pending) if pending.txn_id == txn_id && pending.commit_ts == 0
        );
        if !owned_by_txn {
            return false;
        }
        let Some(mut version) = self.uncommitted.take() else {
            return false;
        };
        version.commit_ts = commit_ts;

        // Keep `committed` sorted descending: insert before the first entry
        // whose commit_ts is not greater than the new one.
        let insert_pos = self
            .committed
            .partition_point(|existing| existing.commit_ts > commit_ts);
        self.committed.insert(insert_pos, version);
        true
    }

    /// Discards the pending version if it was written by `txn_id`. O(1).
    ///
    /// A pending version owned by another transaction is left in place.
    #[inline]
    pub fn abort(&mut self, txn_id: u64) {
        if matches!(&self.uncommitted, Some(pending) if pending.txn_id == txn_id) {
            self.uncommitted = None;
        }
    }

    /// Returns the version visible to a reader at `snapshot_ts`.
    ///
    /// 1. If `current_txn_id` is given and owns the pending slot, that
    ///    uncommitted version is returned (read-your-own-writes). O(1).
    /// 2. Otherwise the newest committed version with
    ///    `commit_ts < snapshot_ts` (strictly less) is returned, found by an
    ///    O(log v) binary search.
    ///
    /// Returns `None` when no version is visible. A returned version may be a
    /// tombstone (`value == None`).
    #[inline]
    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
        if let (Some(txn_id), Some(pending)) = (current_txn_id, self.uncommitted.as_ref()) {
            if pending.txn_id == txn_id {
                return Some(pending);
            }
        }

        // `committed` is sorted descending, so the entries that are too new
        // (commit_ts >= snapshot_ts) form a prefix; the element right after
        // that prefix is the newest visible one.
        let idx = self
            .committed
            .partition_point(|v| v.commit_ts >= snapshot_ts);
        self.committed.get(idx)
    }

    /// Returns `true` if a transaction other than `my_txn_id` holds the
    /// pending slot. O(1).
    #[inline]
    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
        self.uncommitted
            .as_ref()
            .is_some_and(|pending| pending.txn_id != my_txn_id)
    }

    /// Drops committed versions that no reader at or after `min_active_ts`
    /// can see.
    ///
    /// Kept:
    /// 1. every version with `commit_ts >= min_active_ts`, and
    /// 2. one anchor: the newest version with `commit_ts < min_active_ts`.
    ///
    /// The anchor must be strictly older than `min_active_ts` because
    /// [`Self::read_at`] only exposes versions with `commit_ts < snapshot_ts`:
    /// a reader whose snapshot equals `min_active_ts` sees exactly that
    /// anchor. (Anchoring on `commit_ts <= min_active_ts` would discard the
    /// version such a reader needs.)
    ///
    /// The pending version is never touched; chains with at most one
    /// committed version are left as they are.
    pub fn gc(&mut self, min_active_ts: u64) {
        if self.committed.len() <= 1 {
            return;
        }

        // Descending order: versions with commit_ts >= min_active_ts form a
        // prefix of length `split_idx`.
        let split_idx = self
            .committed
            .partition_point(|v| v.commit_ts >= min_active_ts);

        // Keep the prefix plus one anchor version, if an older one exists.
        let keep_count = (split_idx + 1).min(self.committed.len());
        self.committed.truncate(keep_count);
    }

    /// Total number of versions held (committed plus the pending one, if any).
    #[inline]
    pub fn version_count(&self) -> usize {
        self.committed.len() + usize::from(self.uncommitted.is_some())
    }

    /// Legacy test helper: all versions as one vector, committed versions
    /// first (newest to oldest) followed by the pending version.
    #[cfg(test)]
    pub fn versions(&self) -> Vec<Version> {
        self.committed
            .iter()
            .chain(self.uncommitted.iter())
            .cloned()
            .collect()
    }
}
453
454// =============================================================================
455// Pre-sizing Constants to Avoid HashSet Resize Overhead
456// =============================================================================
457
/// Initial capacity reserved for a transaction's `write_set`.
///
/// Sized with typical OLTP transactions (10-50 keys) in mind, so the set
/// rarely has to grow mid-transaction; growth on `write_set.insert()` was
/// reported as an +11% regression.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity reserved for a transaction's `read_set`.
///
/// Larger than the write-set default because workloads are usually
/// read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
466
467/// Transaction state for MVCC
468#[derive(Debug, Clone)]
469pub struct MvccTransaction {
470    /// Transaction ID
471    pub txn_id: u64,
472    /// Snapshot timestamp (reads see commits before this)
473    pub snapshot_ts: u64,
474    /// Keys written by this transaction - uses SmallVec for inline storage
475    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
476    pub write_set: HashSet<InlineKey>,
477    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
478    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
479    pub read_set: HashSet<InlineKey>,
480    /// Bloom filter for write set - fast SSI pre-filtering
481    pub write_bloom: SsiBloomFilter,
482    /// Bloom filter for read set - fast SSI pre-filtering
483    pub read_bloom: SsiBloomFilter,
484    /// Transaction state
485    pub state: TxnState,
486    /// Transaction mode for SSI optimization (Recommendation 9)
487    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
488    pub mode: TransactionMode,
489}
490
491impl MvccTransaction {
492    /// Create a new transaction with pre-sized collections
493    /// 
494    /// This avoids HashSet resize overhead during the transaction
495    /// which was causing +11% regression on write_set.insert().
496    #[inline]
497    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
498        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
499    }
500    
501    /// Create a read-only transaction (SSI bypass - 2.6x faster)
502    ///
503    /// Read-only transactions skip all SSI tracking:
504    /// - No read_set allocation
505    /// - No read_bloom allocation  
506    /// - No commit validation
507    ///
508    /// ## Performance
509    /// 
510    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
511    #[inline]
512    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
513        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
514    }
515    
516    /// Create a write-only transaction (partial SSI bypass)
517    ///
518    /// Write-only transactions skip read tracking:
519    /// - No read_set tracking
520    /// - No read_bloom inserts
521    /// - Still needs write_set for commit
522    #[inline]
523    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
524        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
525    }
526    
527    /// Create transaction with specific mode
528    #[inline]
529    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
530        // Optimize allocation based on mode
531        let (write_capacity, read_capacity) = match mode {
532            TransactionMode::ReadOnly => (0, 0),  // No tracking needed
533            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
534            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
535        };
536        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
537    }
538
539    /// Create with custom capacities for expected workload
540    /// 
541    /// Use this when you know the transaction will write many keys
542    /// to avoid resize overhead entirely.
543    #[inline]
544    pub fn with_capacity(
545        txn_id: u64,
546        snapshot_ts: u64,
547        write_capacity: usize,
548        read_capacity: usize,
549        mode: TransactionMode,
550    ) -> Self {
551        Self {
552            txn_id,
553            snapshot_ts,
554            write_set: HashSet::with_capacity(write_capacity),
555            read_set: HashSet::with_capacity(read_capacity),
556            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
557            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
558            state: TxnState::Active,
559            mode,
560        }
561    }
562
563    /// Check if this is a read-only transaction
564    #[inline]
565    pub fn is_read_only(&self) -> bool {
566        self.write_set.is_empty()
567    }
568
569    /// Check if this is a single-key write transaction
570    #[inline]
571    pub fn is_single_key_write(&self) -> bool {
572        self.write_set.len() == 1 && self.read_set.len() <= 1
573    }
574}
575
/// Lifecycle state of a transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// The transaction is in progress; this is the state given to every
    /// newly constructed `MvccTransaction`.
    Active,
    /// The transaction has committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
583
584// =============================================================================
585// Transaction Mode for SSI Bypass (Recommendation 9)
586// =============================================================================
587
/// Access mode declared when a transaction begins, used to trim SSI work.
///
/// Classifying transactions up front lets most of them skip the expensive
/// parts of SSI tracking. Intended behaviour and estimated commit cost:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// Estimate for read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Read-only: skips ALL SSI tracking.
    /// Rationale: without writes it creates no outgoing edges, so it cannot
    /// form an rw-antidependency cycle; read_set, read_bloom and commit
    /// validation can all be skipped.
    ReadOnly,

    /// Write-only: skips read tracking.
    /// Rationale: without reads from concurrent writers it gets no incoming
    /// rw-edges; only write_set and write_bloom are maintained.
    WriteOnly,

    /// Read-write (the default): complete SSI tracking.
    /// May have both incoming and outgoing rw-edges and therefore needs full
    /// validation at commit time.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Returns `true` if reads must be tracked (only `ReadWrite`).
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Returns `true` if writes must be tracked (`WriteOnly` and `ReadWrite`).
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly => false,
        }
    }

    /// Returns `true` if commit must run SSI validation (only `ReadWrite`).
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }
}
649
/// Kind of dependency recorded between two transactions for SSI.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, and T2 writes X afterwards.
    ReadWrite,
    /// Write-read conflict: T1 writes X, and T2 reads X afterwards.
    WriteRead,
}
658
659/// SSI conflict edge for dangerous structure detection
660#[derive(Debug, Clone)]
661pub struct ConflictEdge {
662    /// Source transaction
663    pub from_txn: u64,
664    /// Target transaction
665    pub to_txn: u64,
666    /// Type of conflict
667    pub conflict_type: ConflictType,
668}
669
670/// MVCC Manager with SSI support
671///
672/// Uses DashMap for lock-free per-transaction access.
673/// Implements Serializable Snapshot Isolation (SSI) with
674/// dangerous structure detection for rw-antidependency cycles.
675#[allow(clippy::type_complexity)]
676pub struct MvccManager {
677    /// Active transactions (sharded for concurrent access)
678    active_txns: DashMap<u64, MvccTransaction>,
679    /// Current timestamp counter
680    ts_counter: AtomicU64,
681    /// Minimum active snapshot timestamp (for GC)
682    min_active_ts: AtomicU64,
683    /// Recently committed transactions for SSI validation
684    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
685    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
686    recent_commits: DashMap<
687        u64,
688        (
689            u64,
690            SsiBloomFilter,
691            SsiBloomFilter,
692            HashSet<InlineKey>,
693            HashSet<InlineKey>,
694        ),
695    >,
696    /// Max recent commits to track
697    max_recent_commits: usize,
698}
699
700impl Default for MvccManager {
701    fn default() -> Self {
702        Self::new()
703    }
704}
705
706impl MvccManager {
707    pub fn new() -> Self {
708        Self {
709            active_txns: DashMap::new(),
710            ts_counter: AtomicU64::new(1),
711            min_active_ts: AtomicU64::new(0),
712            recent_commits: DashMap::new(),
713            max_recent_commits: 1000, // Track last 1000 commits for SSI
714        }
715    }
716
717    /// Begin a new transaction with snapshot isolation
718    /// 
719    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
720    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
721        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
722    }
723    
724    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
725    ///
726    /// Read-only transactions skip all SSI tracking, reducing
727    /// per-read overhead from ~82ns to ~32ns.
728    ///
729    /// ## Safety
730    ///
731    /// Caller must ensure no writes are performed. Attempting to
732    /// write in a read-only transaction will still succeed but
733    /// won't be tracked for SSI validation.
734    #[inline]
735    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
736        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
737    }
738    
739    /// Begin a write-only transaction (partial SSI bypass)
740    ///
741    /// Write-only transactions skip read tracking, reducing overhead
742    /// for insert-heavy workloads.
743    #[inline]
744    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
745        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
746    }
747    
748    /// Begin a transaction with specific mode
749    ///
750    /// This is the core transaction creation method that all other
751    /// begin_* methods delegate to.
752    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
753        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
754
755        // Create transaction with mode-optimized allocations
756        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
757
758        self.active_txns.insert(txn_id, txn.clone());
759        self.update_min_active_ts();
760
761        txn
762    }
763
764    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
765    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
766        self.active_txns.get(&txn_id).map(|t| t.clone())
767    }
768
769    /// Fast path: get just the snapshot timestamp without cloning
770    /// This is the hot path for reads - avoids cloning bloom filters
771    #[inline]
772    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
773        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
774    }
775
776    /// Record a read (for SSI) - uses inline key storage + bloom filter
777    ///
778    /// ## SSI Bypass (Recommendation 9)
779    ///
780    /// For ReadOnly mode transactions, this is a no-op (instant return).
781    /// For WriteOnly mode transactions, this is a no-op.
782    /// Only ReadWrite mode transactions track reads for SSI.
783    ///
784    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
785    #[inline]
786    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
787        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
788            // SSI Bypass: Skip tracking for read-only and write-only modes
789            if !txn.mode.tracks_reads() {
790                return;
791            }
792            
793            // Only track reads if within reasonable bounds
794            if txn.read_set.len() < 10000 {
795                txn.read_set.insert(SmallVec::from_slice(key));
796                txn.read_bloom.insert(key);
797            }
798        }
799    }
800
801    /// Record a write - uses inline key storage + bloom filter
802    ///
803    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
804    /// The mode only affects SSI tracking, not write capability.
805    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
806        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
807            txn.write_set.insert(SmallVec::from_slice(key));
808            txn.write_bloom.insert(key);
809        }
810    }
811
812    /// Allocate commit timestamp
813    pub fn alloc_commit_ts(&self) -> u64 {
814        self.ts_counter.fetch_add(1, Ordering::SeqCst)
815    }
816
817    /// Commit transaction with SSI validation
818    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
819    /// Returns None if SSI validation fails (dangerous structure detected)
820    ///
821    /// ## SSI Bypass (Recommendation 9)
822    ///
823    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
824    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
825    /// For ReadWrite mode: Full validation (~50ns commit)
826    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
827        // Get transaction before removing
828        let txn = self.active_txns.get(&txn_id)?.clone();
829
830        // SSI Bypass: Skip validation for ReadOnly transactions
831        // ReadOnly can never form rw-antidependency cycles
832        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
833            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
834            // For ReadWrite: check SSI validation result
835            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
836                // Abort on SSI violation
837                self.active_txns.remove(&txn_id);
838                self.update_min_active_ts();
839                return None;
840            }
841        }
842
843        let commit_ts = self.alloc_commit_ts();
844
845        // Extract write_set and remove transaction - takes ownership
846        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
847
848        // OPTIMIZATION: Only track ReadWrite transactions for SSI
849        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
850        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite 
851            && !removed_txn.read_set.is_empty() 
852            && !removed_txn.write_set.is_empty();
853        
854        if needs_ssi_tracking {
855            // Need to clone write_set since we return it AND track it
856            let write_set_for_return = removed_txn.write_set.clone();
857            
858            self.track_commit_owned(
859                txn_id,
860                commit_ts,
861                removed_txn.read_bloom,
862                removed_txn.write_bloom,
863                removed_txn.read_set,
864                removed_txn.write_set,
865            );
866
867            self.update_min_active_ts();
868            Some((commit_ts, write_set_for_return))
869        } else {
870            // Fast path: no SSI tracking needed, avoid clone entirely
871            self.update_min_active_ts();
872            Some((commit_ts, removed_txn.write_set))
873        }
874    }
875
876    /// Validate SSI constraints for a committing transaction
877    ///
878    /// ## Transaction Classification (Task 3: Optimistic MVCC)
879    ///
880    /// Transactions are classified and routed through appropriate fast paths:
881    ///
882    /// | Class      | Criteria                      | Validation Cost |
883    /// |------------|-------------------------------|-----------------|
884    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
885    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
886    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
887    /// | General    | full SSI check                | ~50 ns         |
888    ///
889    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
890    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
891    ///
892    /// Detects "dangerous structures" - rw-antidependency cycles:
893    /// - T1 reads X (snapshot sees old value)
894    /// - T2 writes X (concurrent write)  
895    /// - T2 reads Y (snapshot sees old value)
896    /// - T1 writes Y (concurrent write)
897    ///
898    /// If T1 → rw → T2 → rw → T1 exists, abort T1
899    #[inline]
900    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
901        // =================================================================
902        // Fast Path 1: Read-only transactions (0 ns)
903        // =================================================================
904        // Read-only transactions can never form rw-antidependency cycles
905        // because they have no writes to create outgoing rw-edges
906        if txn.write_set.is_empty() {
907            return true;
908        }
909
910        // =================================================================
911        // Fast Path 2: No recent commits to check (0 ns)
912        // =================================================================
913        if self.recent_commits.is_empty() {
914            return true;
915        }
916
917        // =================================================================
918        // Fast Path 3: Single-key write transactions (0 ns)
919        // =================================================================
920        // A single-key write transaction cannot form a dangerous cycle:
921        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
922        //   AND T2 to read what T1 wrote
923        // - With only one key in write_set, the same key would need to be
924        //   in both read_set AND write_set of both transactions
925        // - This is already prevented by our conflict detection (write-write)
926        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
927            return true;
928        }
929
930        let my_snapshot = txn.snapshot_ts;
931
932        // =================================================================
933        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
934        // =================================================================
935        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
936        // with any concurrent transaction's read_bloom AND vice versa,
937        // there can be no rw-antidependency
938        let mut any_may_intersect = false;
939        for entry in self.recent_commits.iter() {
940            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
941            
942            // Only check concurrent transactions
943            if *other_commit_ts <= my_snapshot {
944                continue;
945            }
946
947            // Check bloom filter intersection (O(m/64) per filter)
948            // If our writes may intersect their reads OR their writes may intersect our reads
949            if txn.write_bloom.may_intersect(other_read_bloom) 
950                || other_write_bloom.may_intersect(&txn.read_bloom) 
951            {
952                any_may_intersect = true;
953                break;
954            }
955        }
956
957        // No bloom intersection means definitely disjoint - no SSI conflict possible
958        if !any_may_intersect {
959            return true;
960        }
961
962        // =================================================================
963        // Full SSI Validation (~50 ns)
964        // =================================================================
965        // Check for rw-conflicts with recently committed transactions
966        // An rw-conflict exists if:
967        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
968        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
969
970        let mut in_conflict_with: Vec<u64> = Vec::new();
971        let mut out_conflict_to: Vec<u64> = Vec::new();
972
973        for entry in self.recent_commits.iter() {
974            let (
975                other_txn_id,
976                (
977                    other_commit_ts,
978                    _other_read_bloom,
979                    other_write_bloom,
980                    other_read_set,
981                    other_write_set,
982                ),
983            ) = entry.pair();
984
985            // Only consider transactions that committed after our snapshot started
986            // (concurrent transactions)
987            if *other_commit_ts <= my_snapshot {
988                continue;
989            }
990
991            // Check: other wrote → we read (other →rw→ me)
992            // T_other wrote a key that T_me read (rw-dependency inbound)
993            //
994            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
995            // Only do expensive HashSet intersection if bloom says "maybe conflict"
996            let mut has_in_conflict = false;
997            for key in txn.read_set.iter() {
998                if other_write_bloom.may_contain(key) {
999                    // Bloom says maybe - do exact check
1000                    if other_write_set.contains(key) {
1001                        has_in_conflict = true;
1002                        break;
1003                    }
1004                }
1005            }
1006            if has_in_conflict {
1007                in_conflict_with.push(*other_txn_id);
1008            }
1009
1010            // Check: we wrote → other read (me →rw→ other)
1011            // T_me wrote a key that T_other read (rw-dependency outbound)
1012            //
1013            // Bloom-accelerated: Use our write_bloom against their read_set
1014            let mut has_out_conflict = false;
1015            for key in other_read_set.iter() {
1016                if txn.write_bloom.may_contain(key) {
1017                    // Bloom says maybe - do exact check
1018                    if txn.write_set.contains(key) {
1019                        has_out_conflict = true;
1020                        break;
1021                    }
1022                }
1023            }
1024            if has_out_conflict {
1025                out_conflict_to.push(*other_txn_id);
1026            }
1027        }
1028
1029        // Dangerous structure: we have both incoming AND outgoing rw-edges
1030        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1031        //
1032        // Conservative check: if both exist, abort
1033        // A more precise check would verify the cycle path, but this is safe
1034        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1035            return false; // SSI violation - abort
1036        }
1037
1038        true
1039    }
1040
1041    /// Track a committed transaction for future SSI validation
1042    ///
1043    /// Only tracks transactions that have both reads AND writes, since SSI
1044    /// only detects rw-antidependency cycles. Pure read or pure write
1045    /// transactions can't form cycles.
1046    ///
1047    /// ## Optimization: Zero-Copy Transfer
1048    ///
1049    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1050    /// phase regression. The caller should use mem::take() to transfer ownership.
1051    fn track_commit_owned(
1052        &self,
1053        txn_id: u64,
1054        commit_ts: u64,
1055        read_bloom: SsiBloomFilter,
1056        write_bloom: SsiBloomFilter,
1057        read_set: HashSet<InlineKey>,
1058        write_set: HashSet<InlineKey>,
1059    ) {
1060        // Optimization: Only track mixed read-write transactions
1061        // Pure reads can't create outgoing rw-edges
1062        // Pure writes can't create incoming rw-edges
1063        if read_set.is_empty() || write_set.is_empty() {
1064            return; // Skip tracking - can't form SSI cycle
1065        }
1066
1067        // Add to recent commits with bloom filters for fast SSI pre-filtering
1068        // No cloning needed - we take ownership
1069        self.recent_commits.insert(
1070            txn_id,
1071            (
1072                commit_ts,
1073                read_bloom,
1074                write_bloom,
1075                read_set,
1076                write_set,
1077            ),
1078        );
1079
1080        // Lazy pruning: only prune when we're significantly over capacity
1081        // Avoids pruning overhead on every commit
1082        if self.recent_commits.len() > self.max_recent_commits * 2 {
1083            // Remove entries with lowest commit_ts
1084            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1085            self.recent_commits
1086                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1087        }
1088    }
1089
1090    /// Legacy track_commit that clones - kept for compatibility
1091    #[allow(dead_code)]
1092    fn track_commit(
1093        &self,
1094        txn_id: u64,
1095        commit_ts: u64,
1096        read_bloom: SsiBloomFilter,
1097        write_bloom: SsiBloomFilter,
1098        read_set: &HashSet<InlineKey>,
1099        write_set: &HashSet<InlineKey>,
1100    ) {
1101        if read_set.is_empty() || write_set.is_empty() {
1102            return;
1103        }
1104        self.recent_commits.insert(
1105            txn_id,
1106            (
1107                commit_ts,
1108                read_bloom,
1109                write_bloom,
1110                read_set.clone(),
1111                write_set.clone(),
1112            ),
1113        );
1114    }
1115
1116    /// Abort transaction
1117    pub fn abort(&self, txn_id: u64) {
1118        self.active_txns.remove(&txn_id);
1119        self.update_min_active_ts();
1120    }
1121
1122    /// Get minimum active snapshot timestamp
1123    pub fn min_active_snapshot(&self) -> u64 {
1124        self.min_active_ts.load(Ordering::SeqCst)
1125    }
1126
1127    /// Get count of active transactions
1128    pub fn active_transaction_count(&self) -> usize {
1129        self.active_txns.len()
1130    }
1131
1132    fn update_min_active_ts(&self) {
1133        let min = self
1134            .active_txns
1135            .iter()
1136            .map(|entry| entry.value().snapshot_ts)
1137            .min()
1138            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1139        self.min_active_ts.store(min, Ordering::SeqCst);
1140    }
1141}
1142
1143/// Epoch-based dirty list for O(expired) GC instead of O(n)
1144///
1145/// Instead of scanning ALL version chains, we track which keys have versions
1146/// created in each epoch. GC only needs to visit keys from old epochs.
1147struct EpochDirtyList {
1148    /// Ring buffer of epoch -> dirty keys
1149    /// Index = epoch % EPOCH_RING_SIZE
1150    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1151    /// Current epoch
1152    current_epoch: AtomicU64,
1153}
1154
1155const EPOCH_RING_SIZE: usize = 4;
1156
1157impl EpochDirtyList {
1158    fn new() -> Self {
1159        Self {
1160            epochs: [
1161                parking_lot::Mutex::new(Vec::new()),
1162                parking_lot::Mutex::new(Vec::new()),
1163                parking_lot::Mutex::new(Vec::new()),
1164                parking_lot::Mutex::new(Vec::new()),
1165            ],
1166            current_epoch: AtomicU64::new(0),
1167        }
1168    }
1169
1170    /// Record a version created in the current epoch
1171    #[inline]
1172    fn record_version(&self, key: Vec<u8>) {
1173        let epoch = self.current_epoch.load(Ordering::Relaxed);
1174        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1175        self.epochs[idx].lock().push(key);
1176    }
1177
1178    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1179    ///
1180    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1181    /// For 100 writes: ~100x fewer mutex operations.
1182    #[inline]
1183    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1184        let epoch = self.current_epoch.load(Ordering::Relaxed);
1185        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1186        let mut guard = self.epochs[idx].lock();
1187        guard.extend(keys);
1188    }
1189
1190    /// Advance to next epoch, returning old epoch's dirty keys
1191    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1192        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1193        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1194
1195        // Drain the old epoch's dirty list
1196        let mut guard = self.epochs[old_idx].lock();
1197        let keys = std::mem::take(&mut *guard);
1198        (old_epoch, keys)
1199    }
1200
1201    /// Get current epoch
1202    #[allow(dead_code)]
1203    fn current(&self) -> u64 {
1204        self.current_epoch.load(Ordering::Relaxed)
1205    }
1206}
1207
1208// ============================================================================
1209// Streaming Scan Iterator
1210// ============================================================================
1211
/// Streaming iterator for range scans
///
/// Yields `(key, value)` results one at a time without materializing the
/// full result set of values. The underlying source iterator is built lazily
/// on the first call to `next()`.
///
/// Note: when the memtable's deferred index is used, the matching *keys* are
/// collected into a vector up front; values are still looked up one at a
/// time as the iterator advances.
struct ScanRangeIterator<'a> {
    /// Memtable being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the key range.
    start: Vec<u8>,
    /// Exclusive upper bound of the key range; empty means "no upper bound".
    end: Vec<u8>,
    /// Snapshot timestamp passed to every version-chain read.
    snapshot_ts: u64,
    /// Optional reading transaction id, passed to every version-chain read.
    current_txn_id: Option<u64>,
    /// When true, scan via the memtable's deferred or ordered index (if one
    /// exists); when false, do a full unordered scan of the data map.
    use_ordered: bool,
    // We use Option to defer initialization
    /// Source iterator for the index-backed path (set on first `next()`).
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Source iterator for the full-scan path (set on first `next()`).
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Whether the lazy initialization in `next()` has already run.
    initialized: bool,
}
1229
1230impl<'a> Iterator for ScanRangeIterator<'a> {
1231    type Item = (Vec<u8>, Vec<u8>);
1232    
1233    fn next(&mut self) -> Option<Self::Item> {
1234        // Lazy initialization on first call
1235        if !self.initialized {
1236            self.initialized = true;
1237            
1238            if self.use_ordered {
1239                // Try deferred index first (after compaction, it uses a SkipMap internally)
1240                if let Some(ref def_idx) = self.memtable.deferred_index {
1241                    let start = self.start.clone();
1242                    let end = self.end.clone();
1243                    let snapshot_ts = self.snapshot_ts;
1244                    let current_txn_id = self.current_txn_id;
1245                    let data = &self.memtable.data;
1246                    
1247                    // Collect keys from deferred index (already sorted after compact)
1248                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1249                        def_idx.range_from(&start).collect()
1250                    } else {
1251                        def_idx.range(&start, &end).collect()
1252                    };
1253                    
1254                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1255                        keys.into_iter()
1256                            .filter_map(move |key| {
1257                                if let Some(chain) = data.get(&key)
1258                                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1259                                    && let Some(value) = &v.value
1260                                {
1261                                    Some((key, value.clone()))
1262                                } else {
1263                                    None
1264                                }
1265                            })
1266                    );
1267                    self.ordered_iter = Some(iter);
1268                } else if let Some(ref idx) = self.memtable.ordered_index {
1269                    let start = self.start.clone();
1270                    let end = self.end.clone();
1271                    let snapshot_ts = self.snapshot_ts;
1272                    let current_txn_id = self.current_txn_id;
1273                    let data = &self.memtable.data;
1274                    
1275                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1276                        Box::new(
1277                            idx.range(start..)
1278                                .filter_map(move |entry| {
1279                                    let key = entry.key();
1280                                    if let Some(chain) = data.get(key)
1281                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1282                                        && let Some(value) = &v.value
1283                                    {
1284                                        Some((key.clone(), value.clone()))
1285                                    } else {
1286                                        None
1287                                    }
1288                                })
1289                        )
1290                    } else {
1291                        Box::new(
1292                            idx.range(start..end)
1293                                .filter_map(move |entry| {
1294                                    let key = entry.key();
1295                                    if let Some(chain) = data.get(key)
1296                                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1297                                        && let Some(value) = &v.value
1298                                    {
1299                                        Some((key.clone(), value.clone()))
1300                                    } else {
1301                                        None
1302                                    }
1303                                })
1304                        )
1305                    };
1306                    self.ordered_iter = Some(iter);
1307                }
1308            } else {
1309                // Unordered full scan
1310                let start = self.start.clone();
1311                let end = self.end.clone();
1312                let snapshot_ts = self.snapshot_ts;
1313                let current_txn_id = self.current_txn_id;
1314                
1315                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1316                    self.memtable.data.iter()
1317                        .filter_map(move |entry| {
1318                            let key = entry.key();
1319                            
1320                            if key.as_slice() < start.as_slice() {
1321                                return None;
1322                            }
1323                            if !end.is_empty() && key.as_slice() >= end.as_slice() {
1324                                return None;
1325                            }
1326                            
1327                            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1328                                && let Some(value) = &v.value
1329                            {
1330                                Some((key.clone(), value.clone()))
1331                            } else {
1332                                None
1333                            }
1334                        })
1335                );
1336                self.unordered_iter = Some(iter);
1337            }
1338        }
1339        
1340        // Get next from appropriate iterator
1341        if let Some(ref mut iter) = self.ordered_iter {
1342            iter.next()
1343        } else if let Some(ref mut iter) = self.unordered_iter {
1344            iter.next()
1345        } else {
1346            None
1347        }
1348    }
1349}
1350
/// MemTable with MVCC support
///
/// Stores one version chain per key in a DashMap, so writers to different
/// keys do not contend on a single global write lock.
///
/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
/// Optionally maintains one of two sorted key indexes for efficient scans:
/// - Deferred index: O(1) append on write, sort-on-demand at scan time
/// - SkipMap: ordered insert on every write
///
/// At most one of the two indexes is populated; which one (if any) is decided
/// at construction time.
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for efficient prefix/range scans (optional)
    /// O(1) insert to hot buffer, O(N log N) sort on first scan
    /// When None, scan_prefix will fall back to O(N) DashMap iteration
    deferred_index: Option<DeferredSortedIndex>,
    /// Legacy SkipMap for compatibility (used when deferred=false)
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether to use deferred sorting (true) or immediate SkipMap (false)
    /// Stored as passed to the constructor, even when no index is enabled.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes (sum of key + value lengths of writes)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1377
1378impl Default for MvccMemTable {
1379    fn default() -> Self {
1380        Self::new()
1381    }
1382}
1383
1384impl MvccMemTable {
1385    pub fn new() -> Self {
1386        Self::with_ordered_index(true)
1387    }
1388
1389    /// Create memtable with optional ordered index
1390    ///
1391    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1392    /// but scan_prefix becomes O(N) instead of O(log N + K)
1393    ///
1394    /// Uses deferred sorting by default for better write performance:
1395    /// - Writes: O(1) append to hot buffer
1396    /// - Scans: O(N log N) sort-on-demand
1397    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1398        Self::with_index_mode(enable_ordered_index, true)
1399    }
1400
1401    /// Create memtable with fine-grained control over indexing
1402    ///
1403    /// # Arguments
1404    /// * `enable_ordered_index` - Whether to maintain an ordered index
1405    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1406    ///                    If false, use SkipMap (O(log N) writes)
1407    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1408        Self {
1409            data: DashMap::new(),
1410            deferred_index: if enable_ordered_index && use_deferred {
1411                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1412                    max_unsorted_entries: 10_000, // Compact every 10K writes
1413                    enabled: true,
1414                }))
1415            } else {
1416                None
1417            },
1418            ordered_index: if enable_ordered_index && !use_deferred {
1419                Some(SkipMap::new())
1420            } else {
1421                None
1422            },
1423            use_deferred,
1424            size_bytes: AtomicU64::new(0),
1425            dirty_list: EpochDirtyList::new(),
1426        }
1427    }
1428
1429    /// Write a key-value pair (uncommitted)
1430    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1431        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1432        let key_len = key.len();
1433
1434        // Track this key in the current epoch's dirty list for GC
1435        self.dirty_list.record_version(key.clone());
1436
1437        // Insert into ordered index for prefix scans (if enabled)
1438        // Deferred: O(1) append to hot buffer
1439        // SkipMap: O(log N) insert
1440        if let Some(ref idx) = self.deferred_index {
1441            idx.insert(key.clone());
1442        } else if let Some(ref idx) = self.ordered_index {
1443            idx.insert(key.clone(), ());
1444        }
1445
1446        // Use entry API for atomic get-or-insert
1447        let mut entry = self.data.entry(key).or_default();
1448
1449        // Check for write-write conflict
1450        if entry.has_write_conflict(txn_id) {
1451            return Err(SochDBError::Internal(
1452                "Write-write conflict detected".into(),
1453            ));
1454        }
1455        entry.add_uncommitted(value, txn_id);
1456        self.size_bytes
1457            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1458
1459        Ok(())
1460    }
1461
1462    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1463    ///
1464    /// Optimizations applied (Rec 3: MVCC Batching):
1465    /// - Batched dirty list tracking: single lock acquire for all keys
1466    /// - Deferred index: O(1) append per key
1467    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1468        let mut total_size = 0u64;
1469
1470        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1471        self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1472
1473        for (key, value) in writes {
1474            // Insert into ordered index (if enabled)
1475            // Deferred: O(1) append, SkipMap: O(log N)
1476            if let Some(ref idx) = self.deferred_index {
1477                idx.insert(key.clone());
1478            } else if let Some(ref idx) = self.ordered_index {
1479                idx.insert(key.clone(), ());
1480            }
1481
1482            let mut entry = self.data.entry(key.clone()).or_default();
1483
1484            if entry.has_write_conflict(txn_id) {
1485                return Err(SochDBError::Internal(
1486                    "Write-write conflict detected".into(),
1487                ));
1488            }
1489
1490            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1491            entry.add_uncommitted(value.clone(), txn_id);
1492            total_size += (key.len() + value_size) as u64;
1493        }
1494
1495        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1496        Ok(())
1497    }
1498
1499    /// Read at snapshot timestamp, with optional current txn to see own writes
1500    pub fn read(
1501        &self,
1502        key: &[u8],
1503        snapshot_ts: u64,
1504        current_txn_id: Option<u64>,
1505    ) -> Option<Vec<u8>> {
1506        self.data.get(key).and_then(|chain| {
1507            chain
1508                .read_at(snapshot_ts, current_txn_id)
1509                .and_then(|v| v.value.clone())
1510        })
1511    }
1512
1513    /// Commit all versions for a transaction
1514    ///
1515    /// Only updates the keys that were written by this transaction (tracked in write_set).
1516    /// Accepts InlineKey for zero-allocation MVCC tracking.
1517    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1518        // Only iterate over keys we know were written - O(k) instead of O(n)
1519        for key in write_set {
1520            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1521                chain.commit(txn_id, commit_ts);
1522            }
1523        }
1524    }
1525
1526    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1527    #[allow(dead_code)]
1528    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1529        for mut entry in self.data.iter_mut() {
1530            entry.value_mut().commit(txn_id, commit_ts);
1531        }
1532    }
1533
1534    /// Abort all versions for a transaction
1535    pub fn abort(&self, txn_id: u64) {
1536        for mut entry in self.data.iter_mut() {
1537            entry.value_mut().abort(txn_id);
1538        }
1539    }
1540
1541    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1542    ///
1543    /// ## Performance
1544    ///
1545    /// When ordered_index is enabled: O(log N + K) complexity
1546    /// - O(log N) to seek to the first key with prefix
1547    /// - O(K) to iterate matching keys
1548    ///
1549    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1550    /// 
1551    /// ## Optimizations Applied
1552    /// 
1553    /// - Pre-allocates result vector based on expected output size
1554    /// - Uses batch-friendly iteration patterns
1555    /// - Minimizes allocations during iteration
1556    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1557    pub fn scan_prefix(
1558        &self,
1559        prefix: &[u8],
1560        snapshot_ts: u64,
1561        current_txn_id: Option<u64>,
1562    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1563        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1564        let estimated_size = (self.data.len() / 10).max(64);
1565        let mut results = Vec::with_capacity(estimated_size);
1566
1567        if let Some(ref idx) = self.deferred_index {
1568            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1569            for key in idx.range_from(prefix) {
1570                // Stop when we've passed the prefix range
1571                if !key.starts_with(prefix) {
1572                    break;
1573                }
1574
1575                // O(1) lookup in DashMap for version chain
1576                if let Some(chain) = self.data.get(&key)
1577                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1578                    && let Some(value) = &v.value
1579                {
1580                    results.push((key, value.clone()));
1581                }
1582            }
1583        } else if let Some(ref idx) = self.ordered_index {
1584            // Fast path: O(log N) seek to first key >= prefix
1585            for entry in idx.range(prefix.to_vec()..) {
1586                let key = entry.key();
1587
1588                // Stop when we've passed the prefix range
1589                if !key.starts_with(prefix) {
1590                    break;
1591                }
1592
1593                // O(1) lookup in DashMap for version chain
1594                if let Some(chain) = self.data.get(key)
1595                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1596                    && let Some(value) = &v.value
1597                {
1598                    results.push((key.clone(), value.clone()));
1599                }
1600            }
1601        } else {
1602            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1603            // Optimized with batch-friendly iteration
1604            for entry in self.data.iter() {
1605                let key = entry.key();
1606                if !key.starts_with(prefix) {
1607                    continue;
1608                }
1609                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1610                    && let Some(value) = &v.value
1611                {
1612                    results.push((key.clone(), value.clone()));
1613                }
1614            }
1615        }
1616
1617        results
1618    }
1619
1620    /// Optimized full scan with batch allocation
1621    /// 
1622    /// For use when scanning entire tables/namespaces.
1623    /// Pre-allocates based on actual data size.
1624    pub fn scan_all(
1625        &self,
1626        snapshot_ts: u64,
1627        current_txn_id: Option<u64>,
1628    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1629        let mut results = Vec::with_capacity(self.data.len());
1630
1631        for entry in self.data.iter() {
1632            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1633                && let Some(value) = &v.value
1634            {
1635                results.push((entry.key().clone(), value.clone()));
1636            }
1637        }
1638
1639        results
1640    }
1641
1642    /// Streaming scan iterator for very large datasets
1643    /// 
1644    /// Returns an iterator that yields (key, value) pairs without
1645    /// materializing the entire result set in memory.
1646    pub fn scan_prefix_iter<'a>(
1647        &'a self,
1648        prefix: &'a [u8],
1649        snapshot_ts: u64,
1650        current_txn_id: Option<u64>,
1651    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1652        self.data.iter().filter_map(move |entry| {
1653            let key = entry.key();
1654            if !key.starts_with(prefix) {
1655                return None;
1656            }
1657            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1658                && let Some(value) = &v.value
1659            {
1660                Some((key.clone(), value.clone()))
1661            } else {
1662                None
1663            }
1664        })
1665    }
1666
1667    /// Scan range
1668    pub fn scan_range(
1669        &self,
1670        start: &[u8],
1671        end: &[u8],
1672        snapshot_ts: u64,
1673        current_txn_id: Option<u64>,
1674    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1675        let mut results = Vec::new();
1676
1677        if let Some(ref idx) = self.deferred_index {
1678            // Deferred index path: sort-on-scan
1679            if end.is_empty() {
1680                for key in idx.range_from(start) {
1681                    if let Some(chain) = self.data.get(&key)
1682                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1683                        && let Some(value) = &v.value
1684                    {
1685                        results.push((key, value.clone()));
1686                    }
1687                }
1688            } else {
1689                for key in idx.range(start, end) {
1690                    if let Some(chain) = self.data.get(&key)
1691                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1692                        && let Some(value) = &v.value
1693                    {
1694                        results.push((key, value.clone()));
1695                    }
1696                }
1697            }
1698        } else if let Some(ref idx) = self.ordered_index {
1699            // Use range scan on SkipMap
1700            if end.is_empty() {
1701                // Unbounded end
1702                for entry in idx.range(start.to_vec()..) {
1703                    let key = entry.key();
1704                    if let Some(chain) = self.data.get(key)
1705                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1706                        && let Some(value) = &v.value
1707                    {
1708                        results.push((key.clone(), value.clone()));
1709                    }
1710                }
1711            } else {
1712                for entry in idx.range(start.to_vec()..end.to_vec()) {
1713                    let key = entry.key();
1714                    if let Some(chain) = self.data.get(key)
1715                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1716                        && let Some(value) = &v.value
1717                    {
1718                        results.push((key.clone(), value.clone()));
1719                    }
1720                }
1721            }
1722        } else {
1723            // Fallback to full scan if no ordered index
1724            for entry in self.data.iter() {
1725                let key = entry.key();
1726
1727                if key.as_slice() < start {
1728                    continue;
1729                }
1730                if !end.is_empty() && key.as_slice() >= end {
1731                    continue;
1732                }
1733
1734                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1735                    && let Some(value) = &v.value
1736                {
1737                    results.push((key.clone(), value.clone()));
1738                }
1739            }
1740        }
1741
1742        results
1743    }
1744
1745    /// Streaming range scan iterator for very large datasets
1746    /// 
1747    /// Returns an iterator that yields (key, value) pairs without
1748    /// materializing the entire result set in memory. Uses the ordered
1749    /// index when available for O(log N + K) complexity.
1750    /// 
1751    /// ## Zero-Allocation Design
1752    /// 
1753    /// While the iterator itself cannot avoid allocations for returned
1754    /// values (since the caller needs ownership), it avoids:
1755    /// - Pre-materializing all results
1756    /// - Intermediate buffers
1757    /// - Repeated key comparisons for already-visited entries
1758    /// 
1759    /// ## Usage
1760    /// 
1761    /// ```ignore
1762    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1763    ///     // Process each result as it arrives
1764    ///     // Memory usage is O(1) per iteration, not O(N) total
1765    /// }
1766    /// ```
1767    pub fn scan_range_iter<'a>(
1768        &'a self,
1769        start: &'a [u8],
1770        end: &'a [u8],
1771        snapshot_ts: u64,
1772        current_txn_id: Option<u64>,
1773    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1774        // Compact deferred index before scanning if needed
1775        if let Some(ref idx) = self.deferred_index {
1776            idx.compact();
1777        }
1778        
1779        // Use either ordered index or full scan
1780        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1781        
1782        // Create iterator based on availability of ordered index
1783        ScanRangeIterator {
1784            memtable: self,
1785            start: start.to_vec(),
1786            end: end.to_vec(),
1787            snapshot_ts,
1788            current_txn_id,
1789            use_ordered,
1790            ordered_iter: None,
1791            unordered_iter: None,
1792            initialized: false,
1793        }
1794    }
1795
1796    /// Get approximate size
1797    pub fn size(&self) -> u64 {
1798        self.size_bytes.load(Ordering::Relaxed)
1799    }
1800
1801    /// Garbage collect old versions using epoch-based dirty list
1802    ///
1803    /// O(expired_versions) instead of O(all_versions)
1804    /// Only visits keys that had versions created in the old epoch.
1805    pub fn gc(&self, min_active_ts: u64) -> usize {
1806        // Advance epoch and get the dirty keys from the old epoch
1807        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1808
1809        if dirty_keys.is_empty() {
1810            return 0;
1811        }
1812
1813        let mut gc_count = 0;
1814
1815        // Only visit keys that were modified in the old epoch
1816        // Use a HashSet to deduplicate keys that were written multiple times
1817        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1818
1819        for key in unique_keys {
1820            if let Some(mut entry) = self.data.get_mut(&key) {
1821                let before = entry.value().version_count();
1822                entry.value_mut().gc(min_active_ts);
1823                gc_count += before.saturating_sub(entry.value().version_count());
1824            }
1825        }
1826
1827        gc_count
1828    }
1829
1830    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1831    #[allow(dead_code)]
1832    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1833        let mut gc_count = 0;
1834
1835        for mut entry in self.data.iter_mut() {
1836            let before = entry.value().version_count();
1837            entry.value_mut().gc(min_active_ts);
1838            gc_count += before.saturating_sub(entry.value().version_count());
1839        }
1840
1841        gc_count
1842    }
1843}
1844
1845// ============================================================================
1846// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1847// ============================================================================
1848
1849use crate::key_buffer::ArenaKeyHandle;
1850
1851/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1852struct ArenaEpochDirtyList {
1853    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1854    current_epoch: AtomicU64,
1855}
1856
1857impl ArenaEpochDirtyList {
1858    fn new() -> Self {
1859        Self {
1860            epochs: [
1861                parking_lot::Mutex::new(Vec::new()),
1862                parking_lot::Mutex::new(Vec::new()),
1863                parking_lot::Mutex::new(Vec::new()),
1864                parking_lot::Mutex::new(Vec::new()),
1865            ],
1866            current_epoch: AtomicU64::new(0),
1867        }
1868    }
1869
1870    #[inline]
1871    fn record_version(&self, key: ArenaKeyHandle) {
1872        let epoch = self.current_epoch.load(Ordering::Relaxed);
1873        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1874        self.epochs[idx].lock().push(key);
1875    }
1876
1877    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1878        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1879        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1880        let mut guard = self.epochs[old_idx].lock();
1881        let keys = std::mem::take(&mut *guard);
1882        (old_epoch, keys)
1883    }
1884}
1885
1886/// Arena-backed MVCC MemTable with optimized key storage
1887///
1888/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1889/// reducing per-write allocations from 3 to 1:
1890///
1891/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1892/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1893///
1894/// ## Performance
1895///
1896/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1897/// by reducing:
1898/// - Heap allocations: 3 → 1 per write
1899/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
1900pub struct ArenaMvccMemTable {
1901    /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
1902    data: DashMap<ArenaKeyHandle, VersionChain>,
1903    /// Ordered index for prefix scans
1904    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
1905    /// Approximate size in bytes
1906    size_bytes: AtomicU64,
1907    /// Epoch-based dirty list (arena-backed)
1908    dirty_list: ArenaEpochDirtyList,
1909}
1910
1911impl ArenaMvccMemTable {
1912    pub fn new() -> Self {
1913        Self::with_ordered_index(true)
1914    }
1915
1916    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1917        Self {
1918            data: DashMap::new(),
1919            ordered_index: if enable_ordered_index {
1920                Some(SkipMap::new())
1921            } else {
1922                None
1923            },
1924            size_bytes: AtomicU64::new(0),
1925            dirty_list: ArenaEpochDirtyList::new(),
1926        }
1927    }
1928
1929    /// Write a key-value pair using arena key handle
1930    ///
1931    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
1932    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
1933    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1934        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1935        let key_len = key.len();
1936
1937        // Create ONE ArenaKeyHandle - this is the only allocation for the key
1938        let key_handle = ArenaKeyHandle::new(key);
1939
1940        // Track in dirty list (O(1) copy of 16-byte handle)
1941        self.dirty_list.record_version(key_handle.clone());
1942
1943        // Insert into ordered index (O(1) copy of 16-byte handle)
1944        if let Some(ref idx) = self.ordered_index {
1945            idx.insert(key_handle.clone(), ());
1946        }
1947
1948        // Use entry API with the handle
1949        let mut entry = self.data.entry(key_handle).or_default();
1950
1951        if entry.has_write_conflict(txn_id) {
1952            return Err(SochDBError::Internal(
1953                "Write-write conflict detected".into(),
1954            ));
1955        }
1956        entry.add_uncommitted(value, txn_id);
1957        self.size_bytes
1958            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1959
1960        Ok(())
1961    }
1962
1963    /// Write batch using arena key handles
1964    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1965        let mut total_size = 0u64;
1966
1967        for (key, value) in writes {
1968            let key_handle = ArenaKeyHandle::new(key);
1969
1970            self.dirty_list.record_version(key_handle.clone());
1971
1972            if let Some(ref idx) = self.ordered_index {
1973                idx.insert(key_handle.clone(), ());
1974            }
1975
1976            let mut entry = self.data.entry(key_handle).or_default();
1977
1978            if entry.has_write_conflict(txn_id) {
1979                return Err(SochDBError::Internal(
1980                    "Write-write conflict detected".into(),
1981                ));
1982            }
1983
1984            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1985            entry.add_uncommitted(value.clone(), txn_id);
1986            total_size += (key.len() + value_size) as u64;
1987        }
1988
1989        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1990        Ok(())
1991    }
1992
1993    /// Read at snapshot timestamp
1994    pub fn read(
1995        &self,
1996        key: &[u8],
1997        snapshot_ts: u64,
1998        current_txn_id: Option<u64>,
1999    ) -> Option<Vec<u8>> {
2000        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2001        let key_handle = ArenaKeyHandle::new(key);
2002        self.data.get(&key_handle).and_then(|chain| {
2003            chain
2004                .read_at(snapshot_ts, current_txn_id)
2005                .and_then(|v| v.value.clone())
2006        })
2007    }
2008
2009    /// Commit transaction
2010    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2011        for key in write_set {
2012            let key_handle = ArenaKeyHandle::new(key.as_slice());
2013            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2014                chain.commit(txn_id, commit_ts);
2015            }
2016        }
2017    }
2018
2019    /// Abort transaction
2020    pub fn abort(&self, txn_id: u64) {
2021        for mut entry in self.data.iter_mut() {
2022            entry.value_mut().abort(txn_id);
2023        }
2024    }
2025
2026    /// Scan prefix
2027    pub fn scan_prefix(
2028        &self,
2029        prefix: &[u8],
2030        snapshot_ts: u64,
2031        current_txn_id: Option<u64>,
2032    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2033        let mut results = Vec::new();
2034        let prefix_handle = ArenaKeyHandle::new(prefix);
2035
2036        if let Some(ref idx) = self.ordered_index {
2037            for entry in idx.range(prefix_handle..) {
2038                let key = entry.key();
2039
2040                if !key.as_bytes().starts_with(prefix) {
2041                    break;
2042                }
2043
2044                if let Some(chain) = self.data.get(key)
2045                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2046                    && let Some(value) = &v.value
2047                {
2048                    results.push((key.as_bytes().to_vec(), value.clone()));
2049                }
2050            }
2051        } else {
2052            for entry in self.data.iter() {
2053                let key = entry.key();
2054                if !key.as_bytes().starts_with(prefix) {
2055                    continue;
2056                }
2057                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2058                    && let Some(value) = &v.value
2059                {
2060                    results.push((key.as_bytes().to_vec(), value.clone()));
2061                }
2062            }
2063        }
2064
2065        results
2066    }
2067
2068    /// Get approximate size
2069    pub fn size(&self) -> u64 {
2070        self.size_bytes.load(Ordering::Relaxed)
2071    }
2072
2073    /// Garbage collect old versions
2074    pub fn gc(&self, min_active_ts: u64) -> usize {
2075        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2076
2077        if dirty_keys.is_empty() {
2078            return 0;
2079        }
2080
2081        let mut gc_count = 0;
2082        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2083
2084        for key in unique_keys {
2085            if let Some(mut entry) = self.data.get_mut(&key) {
2086                let before = entry.value().version_count();
2087                entry.value_mut().gc(min_active_ts);
2088                gc_count += before.saturating_sub(entry.value().version_count());
2089            }
2090        }
2091
2092        gc_count
2093    }
2094}
2095
2096impl Default for ArenaMvccMemTable {
2097    fn default() -> Self {
2098        Self::new()
2099    }
2100}
2101
2102// ============================================================================
2103// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2104// ============================================================================
2105
/// Configuration for memtable type selection
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting
    /// Best for: general workloads, balanced read/write
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations
    /// Best for: write-heavy workloads, large keys
    Arena,
}
2123
2124/// Unified memtable abstraction using enum dispatch
2125///
2126/// This pattern provides:
2127/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2128/// - Type-safe switching between implementations
2129/// - Easy extensibility for future memtable types
2130///
2131/// ## Why Enum over Trait Object?
2132///
2133/// - Hot path performance: enum match is a single branch vs vtable indirection
2134/// - Cache friendliness: no pointer chasing
2135/// - Inlining: compiler can inline through enum dispatch
2136pub enum MemTableKind {
2137    Standard(MvccMemTable),
2138    Arena(ArenaMvccMemTable),
2139}
2140
2141impl MemTableKind {
2142    /// Create a new memtable of the specified type
2143    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2144        match kind {
2145            MemTableType::Standard => {
2146                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2147            }
2148            MemTableType::Arena => {
2149                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2150            }
2151        }
2152    }
2153
2154    /// Write a key-value pair
2155    #[inline]
2156    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2157        match self {
2158            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2159            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2160        }
2161    }
2162
2163    /// Write batch of key-value pairs
2164    #[inline]
2165    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2166        match self {
2167            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2168            MemTableKind::Arena(m) => {
2169                // Convert to arena-compatible format
2170                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2171                    .iter()
2172                    .map(|(k, v)| (k.as_slice(), v.clone()))
2173                    .collect();
2174                m.write_batch(&arena_writes, txn_id)
2175            }
2176        }
2177    }
2178
2179    /// Read at snapshot timestamp
2180    #[inline]
2181    pub fn read(
2182        &self,
2183        key: &[u8],
2184        snapshot_ts: u64,
2185        current_txn_id: Option<u64>,
2186    ) -> Option<Vec<u8>> {
2187        match self {
2188            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2189            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2190        }
2191    }
2192
2193    /// Commit transaction
2194    #[inline]
2195    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2196        match self {
2197            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2198            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2199        }
2200    }
2201
2202    /// Abort transaction
2203    #[inline]
2204    pub fn abort(&self, txn_id: u64) {
2205        match self {
2206            MemTableKind::Standard(m) => m.abort(txn_id),
2207            MemTableKind::Arena(m) => m.abort(txn_id),
2208        }
2209    }
2210
2211    /// Scan prefix
2212    #[inline]
2213    pub fn scan_prefix(
2214        &self,
2215        prefix: &[u8],
2216        snapshot_ts: u64,
2217        current_txn_id: Option<u64>,
2218    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2219        match self {
2220            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2221            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2222        }
2223    }
2224
2225    /// Scan range
2226    #[inline]
2227    pub fn scan_range(
2228        &self,
2229        start: &[u8],
2230        end: &[u8],
2231        snapshot_ts: u64,
2232        current_txn_id: Option<u64>,
2233    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2234        match self {
2235            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2236            MemTableKind::Arena(m) => {
2237                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2238                let mut results = Vec::new();
2239                if let Some(ref idx) = m.ordered_index {
2240                    let start_handle = ArenaKeyHandle::new(start);
2241                    let end_handle = ArenaKeyHandle::new(end);
2242                    
2243                    if end.is_empty() {
2244                        for entry in idx.range(start_handle..) {
2245                            let key = entry.key();
2246                            if let Some(chain) = m.data.get(key)
2247                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2248                                && let Some(value) = &v.value
2249                            {
2250                                results.push((key.as_bytes().to_vec(), value.clone()));
2251                            }
2252                        }
2253                    } else {
2254                        for entry in idx.range(start_handle..end_handle) {
2255                            let key = entry.key();
2256                            if let Some(chain) = m.data.get(key)
2257                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2258                                && let Some(value) = &v.value
2259                            {
2260                                results.push((key.as_bytes().to_vec(), value.clone()));
2261                            }
2262                        }
2263                    }
2264                } else {
2265                    for entry in m.data.iter() {
2266                        let key = entry.key();
2267                        let key_bytes = key.as_bytes();
2268                        if key_bytes < start {
2269                            continue;
2270                        }
2271                        if !end.is_empty() && key_bytes >= end {
2272                            continue;
2273                        }
2274                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2275                            && let Some(value) = &v.value
2276                        {
2277                            results.push((key_bytes.to_vec(), value.clone()));
2278                        }
2279                    }
2280                }
2281                results
2282            }
2283        }
2284    }
2285
2286    /// Scan range iterator (returns collected results for now)
2287    #[inline]
2288    pub fn scan_range_iter<'a>(
2289        &'a self,
2290        start: &'a [u8],
2291        end: &'a [u8],
2292        snapshot_ts: u64,
2293        current_txn_id: Option<u64>,
2294    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2295        match self {
2296            MemTableKind::Standard(m) => {
2297                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2298            }
2299            MemTableKind::Arena(_) => {
2300                // Arena version returns collected results as iterator
2301                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2302                Box::new(results.into_iter())
2303            }
2304        }
2305    }
2306
2307    /// Get approximate size
2308    #[inline]
2309    pub fn size(&self) -> u64 {
2310        match self {
2311            MemTableKind::Standard(m) => m.size(),
2312            MemTableKind::Arena(m) => m.size(),
2313        }
2314    }
2315
2316    /// Garbage collect old versions
2317    #[inline]
2318    pub fn gc(&self, min_active_ts: u64) -> usize {
2319        match self {
2320            MemTableKind::Standard(m) => m.gc(min_active_ts),
2321            MemTableKind::Arena(m) => m.gc(min_active_ts),
2322        }
2323    }
2324
2325    /// Get the kind of memtable
2326    pub fn kind(&self) -> MemTableType {
2327        match self {
2328            MemTableKind::Standard(_) => MemTableType::Standard,
2329            MemTableKind::Arena(_) => MemTableType::Arena,
2330        }
2331    }
2332}
2333
/// Durable storage engine with full ACID support
///
/// Bundles the write-ahead log, the MVCC manager, the in-memory memtable and
/// the commit/sync bookkeeping into one handle. Most counters are atomics so
/// they can be updated through a shared reference.
pub struct DurableStorage {
    /// Path to storage directory
    path: PathBuf,
    /// Write-ahead log
    wal: Arc<TxnWal>,
    /// MVCC manager
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
    /// At commit, buffer is flushed to WAL with single lock acquisition
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional; `None` when group commit is not in use)
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state, stored as an integer flag
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode (Little's Law)
    /// Arrival rate in requests/sec × 1000 for precision
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking)
    // NOTE(review): the field is never read (hence the allow); presumably it
    // is held only so the lock lives as long as this struct — confirm against
    // `crate::lock::DatabaseLock`.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2370
2371impl DurableStorage {
2372    /// Open or create durable storage at path
2373    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2374        Self::open_with_config(path, true)
2375    }
2376
2377    /// Open with configurable ordered index
2378    ///
2379    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2380    /// but scan_prefix becomes O(N) instead of O(log N + K)
2381    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2382        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2383    }
2384
2385    /// Open with arena-backed memtable for write-heavy workloads
2386    ///
2387    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2388    /// Best for workloads with:
2389    /// - High write throughput
2390    /// - Large keys (reduces allocation overhead)
2391    /// - Minimal concurrent reads during writes
2392    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2393        Self::open_with_full_config(path, true, MemTableType::Arena)
2394    }
2395
2396    /// Open with full configuration options
2397    ///
2398    /// # Arguments
2399    /// * `path` - Storage directory path
2400    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2401    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2402    ///
2403    /// # Locking
2404    ///
2405    /// Acquires an exclusive advisory lock on the database directory.
2406    /// This prevents concurrent multi-process access which would corrupt data.
2407    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2408    pub fn open_with_full_config<P: AsRef<Path>>(
2409        path: P,
2410        enable_ordered_index: bool,
2411        memtable_type: MemTableType,
2412    ) -> Result<Self> {
2413        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2414    }
2415
2416    /// Open without locking (for testing crash recovery scenarios)
2417    ///
2418    /// # Safety
2419    /// This should ONLY be used in tests that simulate crashes by forgetting
2420    /// the storage instance. In production, always use `open_with_full_config`.
2421    #[cfg(test)]
2422    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2423        Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2424    }
2425
2426    fn open_with_full_config_internal<P: AsRef<Path>>(
2427        path: P,
2428        enable_ordered_index: bool,
2429        memtable_type: MemTableType,
2430        acquire_lock: bool,
2431    ) -> Result<Self> {
2432        let path = path.as_ref().to_path_buf();
2433        std::fs::create_dir_all(&path)?;
2434
2435        // Acquire exclusive lock on database directory (unless disabled for testing)
2436        let db_lock = if acquire_lock {
2437            Some(crate::lock::DatabaseLock::acquire(&path)
2438                .map_err(|e| SochDBError::LockError(e.to_string()))?)
2439        } else {
2440            None
2441        };
2442
2443        let wal_path = path.join("wal.log");
2444        let wal = Arc::new(TxnWal::new(&wal_path)?);
2445
2446        let storage = Self {
2447            path,
2448            wal: wal.clone(),
2449            mvcc: Arc::new(MvccManager::new()),
2450            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2451            txn_write_buffers: DashMap::new(),
2452            group_commit: None,
2453            needs_recovery: AtomicU64::new(0),
2454            last_checkpoint_lsn: AtomicU64::new(0),
2455            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2456            commits_since_sync: AtomicU64::new(0),
2457            // Adaptive batch sizing (Little's Law)
2458            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2459            last_commit_us: AtomicU64::new(0),
2460            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2461            db_lock,
2462        };
2463
2464        // Check if recovery needed
2465        if storage.check_recovery_needed()? {
2466            storage.needs_recovery.store(1, Ordering::SeqCst);
2467        }
2468
2469        Ok(storage)
2470    }
2471
2472    /// Open with group commit enabled
2473    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2474        Self::open_with_group_commit_and_config(path, true)
2475    }
2476
2477    /// Open with group commit and configurable ordered index
2478    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2479        path: P,
2480        enable_ordered_index: bool,
2481    ) -> Result<Self> {
2482        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2483
2484        let wal = storage.wal.clone();
2485        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2486            // Write all commit records WITHOUT flushing (batch them)
2487            for &txn_id in txn_ids {
2488                let entry = TxnWalEntry::txn_commit(txn_id);
2489                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2490            }
2491
2492            // Then do a SINGLE flush + fsync for the entire batch
2493            wal.flush().map_err(|e| e.to_string())?;
2494            wal.sync().map_err(|e| e.to_string())?;
2495
2496            // Return commit timestamp
2497            Ok(std::time::SystemTime::now()
2498                .duration_since(std::time::UNIX_EPOCH)
2499                .unwrap()
2500                .as_micros() as u64)
2501        });
2502
2503        storage.group_commit = Some(Arc::new(gc));
2504        Ok(storage)
2505    }
2506
2507    /// Open with IndexPolicy for automatic memtable/index configuration
2508    ///
2509    /// This is the recommended constructor for new code. The policy determines:
2510    /// - Whether to use ordered index (ScanOptimized only)
2511    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2512    /// - Default settings optimized for the workload pattern
2513    ///
2514    /// # Arguments
2515    /// * `path` - Storage directory path
2516    /// * `policy` - Index policy determining write/scan tradeoffs
2517    /// * `group_commit` - Whether to enable group commit for throughput
2518    pub fn open_with_policy<P: AsRef<Path>>(
2519        path: P,
2520        policy: crate::index_policy::IndexPolicy,
2521        group_commit: bool,
2522    ) -> Result<Self> {
2523        use crate::index_policy::IndexPolicy;
2524        
2525        // Derive configuration from policy
2526        let (enable_ordered_index, memtable_type) = match policy {
2527            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2528                // Write-heavy: no ordered index, use arena for reduced allocations
2529                (false, MemTableType::Arena)
2530            }
2531            IndexPolicy::Balanced => {
2532                // Mixed OLTP: deferred sorting (already implemented in Standard)
2533                (true, MemTableType::Standard)
2534            }
2535            IndexPolicy::ScanOptimized => {
2536                // Scan-heavy: maintain ordered index
2537                (true, MemTableType::Standard)
2538            }
2539        };
2540
2541        if group_commit {
2542            let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2543            
2544            let wal = storage.wal.clone();
2545            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2546                for &txn_id in txn_ids {
2547                    let entry = TxnWalEntry::txn_commit(txn_id);
2548                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2549                }
2550                wal.flush().map_err(|e| e.to_string())?;
2551                wal.sync().map_err(|e| e.to_string())?;
2552                Ok(std::time::SystemTime::now()
2553                    .duration_since(std::time::UNIX_EPOCH)
2554                    .unwrap()
2555                    .as_micros() as u64)
2556            });
2557            storage.group_commit = Some(Arc::new(gc));
2558            Ok(storage)
2559        } else {
2560            Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2561        }
2562    }
2563
2564    /// Open storage for concurrent mode (multi-reader, single-writer)
2565    ///
2566    /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2567    /// Coordination is handled by the concurrent MVCC layer instead.
2568    ///
2569    /// # Safety
2570    ///
2571    /// This must ONLY be called from `Database::open_concurrent()` which
2572    /// manages the concurrent MVCC coordination. Direct use will cause
2573    /// data corruption.
2574    pub fn open_for_concurrent<P: AsRef<Path>>(
2575        path: P,
2576        policy: crate::index_policy::IndexPolicy,
2577    ) -> Result<Self> {
2578        use crate::index_policy::IndexPolicy;
2579        
2580        let (enable_ordered_index, memtable_type) = match policy {
2581            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2582                (false, MemTableType::Arena)
2583            }
2584            IndexPolicy::Balanced => {
2585                (true, MemTableType::Standard)
2586            }
2587            IndexPolicy::ScanOptimized => {
2588                (true, MemTableType::Standard)
2589            }
2590        };
2591
2592        // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2593        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2594    }
2595
2596    /// Get the memtable type being used
2597    pub fn memtable_type(&self) -> MemTableType {
2598        self.memtable.kind()
2599    }
2600
2601    /// Check if recovery is needed (dirty shutdown detection)
2602    ///
2603    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2604    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2605    /// but committed data still needs to be loaded from WAL into the memtable.
2606    fn check_recovery_needed(&self) -> Result<bool> {
2607        let marker_path = self.path.join(".clean_shutdown");
2608        if marker_path.exists() {
2609            // Clean shutdown - remove marker
2610            std::fs::remove_file(&marker_path)?;
2611        }
2612        // ALWAYS need recovery to rebuild memtable from WAL
2613        // The memtable is in-memory only and needs to be restored on every startup
2614        Ok(true)
2615    }
2616
2617    /// Perform crash recovery
2618    pub fn recover(&self) -> Result<RecoveryStats> {
2619        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2620            return Ok(RecoveryStats::default());
2621        }
2622
2623        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2624
2625        // Apply committed writes to memtable
2626        let recovery_txn_id = self.wal.alloc_txn_id();
2627        let commit_ts = self.mvcc.alloc_commit_ts();
2628
2629        // Collect keys being written for efficient commit
2630        let mut write_set: HashSet<InlineKey> = HashSet::new();
2631        for (key, value) in &writes {
2632            write_set.insert(SmallVec::from_slice(key));
2633            self.memtable
2634                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2635        }
2636        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2637
2638        self.needs_recovery.store(0, Ordering::SeqCst);
2639
2640        Ok(RecoveryStats {
2641            transactions_recovered: txn_count,
2642            writes_recovered: writes.len(),
2643            commit_ts,
2644        })
2645    }
2646
2647    /// Begin a new transaction
2648    pub fn begin_transaction(&self) -> Result<u64> {
2649        let txn_id = self.wal.begin_transaction()?;
2650        self.mvcc.begin(txn_id);
2651        Ok(txn_id)
2652    }
2653
2654    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2655    ///
2656    /// This enables mode-aware optimizations:
2657    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2658    /// - WriteOnly: Skip read tracking, faster bulk inserts
2659    /// - ReadWrite: Full SSI for serializable isolation
2660    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2661        let txn_id = self.wal.begin_transaction()?;
2662        self.mvcc.begin_with_mode(txn_id, mode);
2663        Ok(txn_id)
2664    }
2665
2666    /// Read a key within a transaction
2667    #[inline]
2668    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2669        // Fast path: get just snapshot_ts without cloning whole transaction
2670        let snapshot_ts = self
2671            .mvcc
2672            .get_snapshot_ts(txn_id)
2673            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2674
2675        // Record read for SSI (skipped for read-only transactions)
2676        self.mvcc.record_read(txn_id, key);
2677
2678        // Read at snapshot timestamp, seeing own uncommitted writes
2679        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2680    }
2681
2682    /// Write a key-value pair within a transaction
2683    ///
2684    /// Writes are buffered and only flushed to disk on commit.
2685    /// This provides ~10× better throughput for batched inserts.
2686    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2687        // Use the zero-allocation path internally
2688        self.write_refs(txn_id, &key, &value)?;
2689
2690        Ok(())
2691    }
2692
2693    /// Write from references - zero allocation hot path
2694    ///
2695    /// Avoids cloning key/value by writing to WAL from refs directly,
2696    /// then only allocating once for memtable storage.
2697    #[inline]
2698    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2699        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2700        self.mvcc.record_write(txn_id, key);
2701
2702        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2703        // This reduces lock contention from O(writes) to O(1) per transaction
2704        self.txn_write_buffers
2705            .entry(txn_id)
2706            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2707            .append(key, value);
2708
2709        // Write to memtable (needs owned key/value for storage)
2710        self.memtable
2711            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2712
2713        Ok(())
2714    }
2715
2716    /// Delete a key within a transaction
2717    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2718        // Record write (uses InlineKey - zero allocation for small keys)
2719        self.mvcc.record_write(txn_id, &key);
2720
2721        // Buffer tombstone in memory - NO WAL lock taken!
2722        self.txn_write_buffers
2723            .entry(txn_id)
2724            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2725            .append(&key, &[]); // Empty value = tombstone
2726
2727        // Write tombstone to memtable
2728        self.memtable.write(key, None, txn_id)?;
2729
2730        Ok(())
2731    }
2732
2733    /// Batch write multiple key-value pairs with reduced overhead
2734    ///
2735    /// This API amortizes fixed costs over the batch:
2736    /// - Single DashMap entry lookup for TxnWalBuffer
2737    /// - Single MVCC write set update
2738    /// - Batch memtable operations
2739    ///
2740    /// Performance: ~2-3x faster than individual write_refs calls
2741    /// for batches of 100+ entries.
2742    ///
2743    /// # Arguments
2744    /// * `txn_id` - Transaction ID
2745    /// * `writes` - Slice of (key, value) pairs
2746    #[inline]
2747    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2748        if writes.is_empty() {
2749            return Ok(());
2750        }
2751
2752        // Single DashMap access for entire batch
2753        let mut buffer_entry = self
2754            .txn_write_buffers
2755            .entry(txn_id)
2756            .or_insert_with(|| TxnWalBuffer::new(txn_id));
2757
2758        // Batch operations with reduced per-row overhead
2759        for (key, value) in writes {
2760            // Record write for MVCC
2761            self.mvcc.record_write(txn_id, key);
2762
2763            // Append to WAL buffer
2764            buffer_entry.append(key, value);
2765        }
2766        drop(buffer_entry);
2767
2768        // Batch write to memtable
2769        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2770            .iter()
2771            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2772            .collect();
2773        self.memtable.write_batch(&owned_writes, txn_id)?;
2774
2775        Ok(())
2776    }
2777
2778    /// Commit a transaction
2779    ///
2780    /// With sync_mode:
2781    /// - 0 (OFF): No sync, risk of data loss
2782    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2783    /// - 2 (FULL): Sync every commit (safest, slowest)
2784    pub fn commit(&self, txn_id: u64) -> Result<u64> {
2785        // First, flush all buffered writes to WAL with SINGLE lock acquisition
2786        // This is the key optimization: O(1) lock instead of O(writes) locks
2787        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2788            && !buffer.is_empty()
2789        {
2790            // Flush entire buffer to WAL with one lock
2791            self.wal.flush_buffer(&buffer)?;
2792        }
2793
2794        // Use group commit if available, otherwise direct commit
2795        if let Some(gc) = &self.group_commit {
2796            // Submit to group commit and wait for result
2797            // This batches multiple commits into a single fsync
2798            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2799
2800            // Get commit timestamp and write_set from MVCC
2801            let (commit_ts, write_set) = self
2802                .mvcc
2803                .commit(txn_id)
2804                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2805
2806            // Commit in memtable (O(k) where k = keys written)
2807            self.memtable.commit(txn_id, commit_ts, &write_set);
2808
2809            Ok(commit_ts)
2810        } else {
2811            // Direct commit path with adaptive sync (Little's Law)
2812            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2813            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2814
2815            // Update arrival rate for adaptive batching
2816            self.update_arrival_rate();
2817
2818            // Write commit record (no flush yet - BufWriter will buffer it)
2819            let entry = TxnWalEntry::txn_commit(txn_id);
2820            self.wal.append_no_flush(&entry)?;
2821
2822            // Determine if we should sync/flush based on mode
2823            let should_sync = match sync_mode {
2824                0 => false,                                      // OFF: never sync
2825                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
2826                _ => true,                                       // FULL: always sync
2827            };
2828
2829            if should_sync {
2830                // Measure fsync latency for adaptive tuning
2831                let start = std::time::Instant::now();
2832                self.wal.flush()?;
2833                self.wal.sync()?;
2834                let latency_us = start.elapsed().as_micros() as u64;
2835
2836                // Update fsync latency estimate (EMA with α = 0.1)
2837                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2838                let new_latency = (old_latency * 9 + latency_us) / 10;
2839                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2840
2841                self.commits_since_sync.store(0, Ordering::Relaxed);
2842            }
2843
2844            // Get commit timestamp and write_set from MVCC
2845            let (commit_ts, write_set) = self
2846                .mvcc
2847                .commit(txn_id)
2848                .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2849
2850            // Commit in memtable (O(k) where k = keys written)
2851            self.memtable.commit(txn_id, commit_ts, &write_set);
2852
2853            Ok(commit_ts)
2854        }
2855    }
2856
2857    /// Update arrival rate using exponential moving average
2858    #[inline]
2859    fn update_arrival_rate(&self) {
2860        let now_us = std::time::SystemTime::now()
2861            .duration_since(std::time::UNIX_EPOCH)
2862            .unwrap()
2863            .as_micros() as u64;
2864
2865        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2866
2867        if last > 0 {
2868            let delta_us = now_us.saturating_sub(last);
2869            if delta_us > 0 && delta_us < 10_000_000 {
2870                // Ignore gaps > 10s
2871                // Rate = 1_000_000 / delta_us (requests/sec)
2872                // Stored as rate × 1000 for precision
2873                let instant_rate = 1_000_000_000 / delta_us;
2874
2875                // EMA with α = 0.1
2876                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2877                let new_rate = (old_rate * 9 + instant_rate) / 10;
2878                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2879            }
2880        }
2881    }
2882
2883    /// Compute optimal batch threshold using Little's Law
2884    ///
2885    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
2886    /// Returns the number of commits to batch before fsync
2887    #[inline]
2888    fn adaptive_batch_threshold(&self) -> u64 {
2889        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
2890        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
2891
2892        if lambda <= 0.0 || tau <= 0.0 {
2893            return 100; // Fallback to fixed threshold
2894        }
2895
2896        // Little's Law: W* = sqrt(2 × τ × λ)
2897        // This minimizes total time = wait_time + fsync_overhead
2898        let n_opt = (2.0 * tau * lambda).sqrt();
2899
2900        // Clamp between 1 and 1000
2901        (n_opt as u64).clamp(1, 1000)
2902    }
2903
2904    /// Set synchronous mode
2905    ///
2906    /// - 0: OFF - No fsync (risk of data loss)
2907    /// - 1: NORMAL - Periodic fsync (balanced)
2908    /// - 2: FULL - Fsync every commit (safest)
2909    pub fn set_sync_mode(&self, mode: u64) {
2910        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2911    }
2912
2913    /// Force a group commit flush (useful for benchmarking or testing)
2914    pub fn flush_group_commit(&self) {
2915        if let Some(gc) = &self.group_commit {
2916            gc.flush_batch();
2917        }
2918    }
2919
2920    /// Abort a transaction
2921    pub fn abort(&self, txn_id: u64) -> Result<()> {
2922        // Discard buffered writes (no need to write to WAL)
2923        self.txn_write_buffers.remove(&txn_id);
2924
2925        // Write abort to WAL
2926        self.wal.abort_transaction(txn_id)?;
2927
2928        // Abort in MVCC
2929        self.mvcc.abort(txn_id);
2930
2931        // Abort in memtable
2932        self.memtable.abort(txn_id);
2933
2934        Ok(())
2935    }
2936
2937    /// Scan keys with prefix
2938    #[inline]
2939    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2940        // Fast path: get just snapshot_ts without cloning whole transaction
2941        let snapshot_ts = self
2942            .mvcc
2943            .get_snapshot_ts(txn_id)
2944            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2945
2946        // Note: Scan doesn't record individual key reads for SSI (too expensive)
2947        // SSI conflicts are tracked at the prefix level if needed
2948        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
2949    }
2950
2951    /// Scan keys in range
2952    #[inline]
2953    pub fn scan_range(
2954        &self,
2955        txn_id: u64,
2956        start: &[u8],
2957        end: &[u8],
2958    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2959        let snapshot_ts = self
2960            .mvcc
2961            .get_snapshot_ts(txn_id)
2962            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2963
2964        Ok(self
2965            .memtable
2966            .scan_range(start, end, snapshot_ts, Some(txn_id)))
2967    }
2968
2969    /// Streaming scan for very large result sets
2970    /// 
2971    /// Returns an iterator that yields (key, value) pairs without
2972    /// materializing the entire result set in memory.
2973    #[inline]
2974    pub fn scan_range_iter<'a>(
2975        &'a self,
2976        txn_id: u64,
2977        start: &'a [u8],
2978        end: &'a [u8],
2979    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
2980        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
2981        self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
2982    }
2983
2984    /// Force fsync to disk
2985    /// Flush the WAL's in-memory buffer to the OS
2986    ///
2987    /// This ensures all buffered writes are pushed from the BufWriter
2988    /// into the OS page cache. Call this before `fsync()` to ensure
2989    /// all data is durable.
2990    pub fn flush_wal(&self) -> Result<()> {
2991        self.wal.flush()
2992    }
2993
2994    /// Force sync the WAL to disk (fsync)
2995    pub fn fsync(&self) -> Result<()> {
2996        self.wal.sync()
2997    }
2998
2999    /// Write checkpoint
3000    pub fn checkpoint(&self) -> Result<u64> {
3001        let txn_id = 0; // System transaction
3002        let entry = TxnWalEntry::checkpoint(txn_id);
3003        let lsn = self.wal.append_sync(&entry)?;
3004        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3005        Ok(lsn)
3006    }
3007
3008    /// Truncate the WAL file after checkpoint.
3009    ///
3010    /// This physically truncates the WAL file to 0 bytes, reclaiming disk
3011    /// space. The in-memory memtable retains all data for the current
3012    /// session, but a crash after truncation will result in data loss
3013    /// since the WAL is the only persistence mechanism for DurableStorage.
3014    ///
3015    /// Call after `checkpoint()` when WAL durability across restarts is
3016    /// not required (e.g. desktop telemetry viewers, caches).
3017    pub fn truncate_wal(&self) -> Result<()> {
3018        self.wal.truncate()
3019    }
3020
3021    /// Get storage statistics
3022    pub fn stats(&self) -> StorageStats {
3023        // Get WAL size from the WAL manager
3024        let wal_size = self.wal.size_bytes();
3025
3026        // Get active transaction count from MVCC
3027        let active_txns = self.mvcc.active_transaction_count();
3028
3029        StorageStats {
3030            memtable_size_bytes: self.memtable.size(),
3031            wal_size_bytes: wal_size,
3032            active_transactions: active_txns,
3033            min_active_snapshot: self.mvcc.min_active_snapshot(),
3034            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3035        }
3036    }
3037
3038    /// Garbage collect old versions
3039    pub fn gc(&self) -> usize {
3040        let min_ts = self.mvcc.min_active_snapshot();
3041        self.memtable.gc(min_ts)
3042    }
3043
3044    /// Clean shutdown
3045    pub fn shutdown(&self) -> Result<()> {
3046        // Sync WAL
3047        self.fsync()?;
3048
3049        // Write clean shutdown marker
3050        let marker_path = self.path.join(".clean_shutdown");
3051        std::fs::write(&marker_path, b"clean")?;
3052
3053        Ok(())
3054    }
3055}
3056
3057impl Drop for DurableStorage {
3058    fn drop(&mut self) {
3059        let _ = self.shutdown();
3060    }
3061}
3062
/// Recovery statistics
///
/// Returned by `DurableStorage::recover`. The default value (all zeros) is
/// what `recover` returns when no recovery was needed.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Number of transactions reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// Commit timestamp assigned to the recovered writes.
    pub commit_ts: u64,
}
3070
/// Storage statistics
///
/// Point-in-time figures produced by `DurableStorage::stats`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Size reported by the memtable, in bytes.
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL, in bytes.
    pub wal_size_bytes: u64,
    /// Number of transactions the MVCC manager considers active.
    pub active_transactions: usize,
    /// Smallest snapshot timestamp still held by an active transaction.
    pub min_active_snapshot: u64,
    /// LSN of the most recent checkpoint written in this session (0 if none).
    pub last_checkpoint_lsn: u64,
}
3080
3081#[cfg(test)]
3082mod tests {
3083    use super::*;
3084    use tempfile::tempdir;
3085
3086    #[test]
3087    fn test_basic_transaction() {
3088        let dir = tempdir().unwrap();
3089        let storage = DurableStorage::open(dir.path()).unwrap();
3090
3091        // Begin transaction
3092        let txn_id = storage.begin_transaction().unwrap();
3093
3094        // Write data
3095        storage
3096            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3097            .unwrap();
3098        storage
3099            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3100            .unwrap();
3101
3102        // Read back (within same transaction)
3103        let v1 = storage.read(txn_id, b"key1").unwrap();
3104        assert_eq!(v1, Some(b"value1".to_vec()));
3105
3106        // Commit
3107        let commit_ts = storage.commit(txn_id).unwrap();
3108        assert!(commit_ts > 0);
3109
3110        // Read in new transaction
3111        let txn2 = storage.begin_transaction().unwrap();
3112        let v1 = storage.read(txn2, b"key1").unwrap();
3113        assert_eq!(v1, Some(b"value1".to_vec()));
3114        storage.abort(txn2).unwrap();
3115    }
3116
3117    #[test]
3118    fn test_snapshot_isolation() {
3119        let dir = tempdir().unwrap();
3120        let storage = DurableStorage::open(dir.path()).unwrap();
3121
3122        // T1: Write initial value
3123        let t1 = storage.begin_transaction().unwrap();
3124        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3125        storage.commit(t1).unwrap();
3126
3127        // T2: Start reading (snapshot at this point)
3128        let t2 = storage.begin_transaction().unwrap();
3129
3130        // T3: Update the value
3131        let t3 = storage.begin_transaction().unwrap();
3132        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3133        storage.commit(t3).unwrap();
3134
3135        // T2 should still see v1 (snapshot isolation)
3136        let v = storage.read(t2, b"key").unwrap();
3137        assert_eq!(v, Some(b"v1".to_vec()));
3138
3139        // New transaction should see v2
3140        let t4 = storage.begin_transaction().unwrap();
3141        let v = storage.read(t4, b"key").unwrap();
3142        assert_eq!(v, Some(b"v2".to_vec()));
3143
3144        storage.abort(t2).unwrap();
3145        storage.abort(t4).unwrap();
3146    }
3147
3148    #[test]
3149    fn test_abort_transaction() {
3150        let dir = tempdir().unwrap();
3151        let storage = DurableStorage::open(dir.path()).unwrap();
3152
3153        // Write initial value
3154        let t1 = storage.begin_transaction().unwrap();
3155        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3156        storage.commit(t1).unwrap();
3157
3158        // Start transaction that will abort
3159        let t2 = storage.begin_transaction().unwrap();
3160        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3161        storage.abort(t2).unwrap();
3162
3163        // New transaction should see v1 (aborted changes not visible)
3164        let t3 = storage.begin_transaction().unwrap();
3165        let v = storage.read(t3, b"key").unwrap();
3166        assert_eq!(v, Some(b"v1".to_vec()));
3167        storage.abort(t3).unwrap();
3168    }
3169
3170    #[test]
3171    fn test_crash_recovery() {
3172        let dir = tempdir().unwrap();
3173
3174        // Phase 1: Write data and commit
3175        {
3176            // Use open_without_lock for crash simulation tests
3177            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3178
3179            // Set sync mode to FULL to ensure data is synced before "crash"
3180            storage.set_sync_mode(2); // FULL: sync every commit
3181
3182            let txn = storage.begin_transaction().unwrap();
3183            storage
3184                .write(txn, b"persist".to_vec(), b"data".to_vec())
3185                .unwrap();
3186            storage.commit(txn).unwrap();
3187
3188            // Simulate crash (no clean shutdown)
3189            std::mem::forget(storage);
3190        }
3191
3192        // Phase 2: Reopen and recover
3193        {
3194            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3195            let stats = storage.recover().unwrap();
3196            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3197
3198            // Data should be recovered
3199            let txn = storage.begin_transaction().unwrap();
3200            let v = storage.read(txn, b"persist").unwrap();
3201            assert_eq!(v, Some(b"data".to_vec()));
3202            storage.abort(txn).unwrap();
3203        }
3204    }
3205
3206    #[test]
3207    fn test_scan_prefix() {
3208        let dir = tempdir().unwrap();
3209        let storage = DurableStorage::open(dir.path()).unwrap();
3210
3211        let txn = storage.begin_transaction().unwrap();
3212        storage
3213            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3214            .unwrap();
3215        storage
3216            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3217            .unwrap();
3218        storage
3219            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3220            .unwrap();
3221        storage.commit(txn).unwrap();
3222
3223        let txn2 = storage.begin_transaction().unwrap();
3224        let users = storage.scan(txn2, b"user:").unwrap();
3225        assert_eq!(users.len(), 2);
3226        storage.abort(txn2).unwrap();
3227    }
3228
3229    #[test]
3230    fn test_delete() {
3231        let dir = tempdir().unwrap();
3232        let storage = DurableStorage::open(dir.path()).unwrap();
3233
3234        // Insert
3235        let t1 = storage.begin_transaction().unwrap();
3236        storage
3237            .write(t1, b"key".to_vec(), b"value".to_vec())
3238            .unwrap();
3239        storage.commit(t1).unwrap();
3240
3241        // Verify exists
3242        let t2 = storage.begin_transaction().unwrap();
3243        assert!(storage.read(t2, b"key").unwrap().is_some());
3244        storage.abort(t2).unwrap();
3245
3246        // Delete
3247        let t3 = storage.begin_transaction().unwrap();
3248        storage.delete(t3, b"key".to_vec()).unwrap();
3249        storage.commit(t3).unwrap();
3250
3251        // Verify deleted
3252        let t4 = storage.begin_transaction().unwrap();
3253        assert!(storage.read(t4, b"key").unwrap().is_none());
3254        storage.abort(t4).unwrap();
3255    }
3256
3257    #[test]
3258    fn test_gc() {
3259        let dir = tempdir().unwrap();
3260        let storage = DurableStorage::open(dir.path()).unwrap();
3261
3262        // Create multiple versions
3263        for i in 0..10 {
3264            let txn = storage.begin_transaction().unwrap();
3265            storage
3266                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3267                .unwrap();
3268            storage.commit(txn).unwrap();
3269        }
3270
3271        // GC should reclaim old versions
3272        let gc_count = storage.gc();
3273        // At least some versions should be collected
3274        // (exact count depends on implementation)
3275        let _ = gc_count; // gc_count is usize, always >= 0
3276    }
3277
3278    #[test]
3279    fn test_group_commit() {
3280        use std::sync::Arc;
3281        use std::thread;
3282
3283        let dir = tempdir().unwrap();
3284        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3285
3286        // Spawn multiple threads to commit concurrently
3287        let mut handles = vec![];
3288        for i in 0..4 {
3289            let storage = Arc::clone(&storage);
3290            handles.push(thread::spawn(move || {
3291                let txn = storage.begin_transaction().unwrap();
3292                storage
3293                    .write(
3294                        txn,
3295                        format!("key{}", i).into_bytes(),
3296                        format!("val{}", i).into_bytes(),
3297                    )
3298                    .unwrap();
3299                storage.commit(txn).unwrap()
3300            }));
3301        }
3302
3303        // Wait for all commits
3304        let mut commit_times = vec![];
3305        for h in handles {
3306            commit_times.push(h.join().unwrap());
3307        }
3308
3309        // All commits should succeed
3310        assert!(commit_times.iter().all(|&ts| ts > 0));
3311
3312        // Verify data persisted
3313        let txn = storage.begin_transaction().unwrap();
3314        for i in 0..4 {
3315            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3316            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3317        }
3318        storage.abort(txn).unwrap();
3319    }
3320
3321    // ==================== ArenaMvccMemTable Tests ====================
3322
3323    #[test]
3324    fn test_arena_memtable_basic_write_read() {
3325        let memtable = ArenaMvccMemTable::new();
3326
3327        // Write some values
3328        memtable
3329            .write(b"key1", Some(b"value1".to_vec()), 1)
3330            .unwrap();
3331        memtable
3332            .write(b"key2", Some(b"value2".to_vec()), 1)
3333            .unwrap();
3334
3335        // Read them back (uncommitted, so need txn_id match)
3336        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3337        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3338        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3339    }
3340
3341    #[test]
3342    fn test_arena_memtable_update() {
3343        let memtable = ArenaMvccMemTable::new();
3344
3345        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3346        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3347
3348        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3349    }
3350
3351    #[test]
3352    fn test_arena_memtable_delete() {
3353        let memtable = ArenaMvccMemTable::new();
3354
3355        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3356        memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3357
3358        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3359    }
3360
3361    #[test]
3362    fn test_arena_memtable_scan_prefix() {
3363        let memtable = ArenaMvccMemTable::new();
3364
3365        memtable
3366            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3367            .unwrap();
3368        memtable
3369            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3370            .unwrap();
3371        memtable
3372            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3373            .unwrap();
3374        memtable
3375            .write(b"order:1", Some(b"order_data".to_vec()), 1)
3376            .unwrap();
3377
3378        // Create a write set and commit
3379        let mut write_set = HashSet::new();
3380        write_set.insert(InlineKey::from_slice(b"user:1:name"));
3381        write_set.insert(InlineKey::from_slice(b"user:1:email"));
3382        write_set.insert(InlineKey::from_slice(b"user:2:name"));
3383        write_set.insert(InlineKey::from_slice(b"order:1"));
3384        memtable.commit(1, 10, &write_set);
3385
3386        // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3387        let results = memtable.scan_prefix(b"user:1:", 11, None);
3388        assert_eq!(results.len(), 2);
3389
3390        // Scan for all users
3391        let results = memtable.scan_prefix(b"user:", 11, None);
3392        assert_eq!(results.len(), 3);
3393    }
3394
3395    #[test]
3396    fn test_arena_memtable_write_batch() {
3397        let memtable = ArenaMvccMemTable::new();
3398
3399        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3400            (b"k1", Some(b"v1".to_vec())),
3401            (b"k2", Some(b"v2".to_vec())),
3402            (b"k3", Some(b"v3".to_vec())),
3403        ];
3404
3405        memtable.write_batch(&writes, 1).unwrap();
3406
3407        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3408        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3409        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3410    }
3411
3412    #[test]
3413    fn test_arena_memtable_gc() {
3414        let memtable = ArenaMvccMemTable::new();
3415
3416        // Write multiple versions
3417        for i in 0..10 {
3418            memtable
3419                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3420                .unwrap();
3421
3422            let mut write_set = HashSet::new();
3423            write_set.insert(InlineKey::from_slice(b"key"));
3424            memtable.commit(i + 1, (i + 1) * 10, &write_set);
3425        }
3426
3427        // GC old versions
3428        let gc_count = memtable.gc(90);
3429        let _ = gc_count; // gc_count is usize, always >= 0
3430    }
3431
3432    #[test]
3433    fn test_arena_memtable_size_tracking() {
3434        let memtable = ArenaMvccMemTable::new();
3435
3436        assert_eq!(memtable.size(), 0);
3437
3438        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3439
3440        assert!(memtable.size() > 0);
3441    }
3442
3443    #[test]
3444    fn test_arena_memtable_abort() {
3445        let memtable = ArenaMvccMemTable::new();
3446
3447        memtable
3448            .write(b"key", Some(b"uncommitted".to_vec()), 1)
3449            .unwrap();
3450
3451        // Visible to same txn
3452        assert_eq!(
3453            memtable.read(b"key", 0, Some(1)),
3454            Some(b"uncommitted".to_vec())
3455        );
3456
3457        // Not visible to other txns
3458        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3459
3460        // Abort
3461        memtable.abort(1);
3462
3463        // No longer visible
3464        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3465    }
3466
3467    // ========================================================================
3468    // MemTableKind Tests - Unified Abstraction
3469    // ========================================================================
3470
3471    #[test]
3472    fn test_memtable_kind_standard() {
3473        let memtable = MemTableKind::new(MemTableType::Standard, true);
3474        assert_eq!(memtable.kind(), MemTableType::Standard);
3475
3476        // Write and read
3477        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3478        
3479        // Commit transaction at ts=100
3480        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3481        memtable.commit(1, 100, &write_set);
3482        
3483        // Read after commit - snapshot_ts must be > commit_ts for visibility
3484        let v = memtable.read(b"key1", 101, None);
3485        assert_eq!(v, Some(b"value1".to_vec()));
3486    }
3487
3488    #[test]
3489    fn test_memtable_kind_arena() {
3490        let memtable = MemTableKind::new(MemTableType::Arena, true);
3491        assert_eq!(memtable.kind(), MemTableType::Arena);
3492
3493        // Write and read
3494        memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3495        
3496        // Commit at ts=100
3497        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3498        memtable.commit(1, 100, &write_set);
3499        
3500        // Read after commit - snapshot_ts > commit_ts
3501        let v = memtable.read(b"key1", 101, None);
3502        assert_eq!(v, Some(b"value1".to_vec()));
3503    }
3504
3505    #[test]
3506    fn test_memtable_kind_scan_range() {
3507        // Test both implementations have consistent behavior
3508        for kind in [MemTableType::Standard, MemTableType::Arena] {
3509            let memtable = MemTableKind::new(kind, true);
3510
3511            // Write some data
3512            for i in 0..5 {
3513                let key = format!("key{}", i);
3514                let value = format!("value{}", i);
3515                memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
3516            }
3517
3518            // Commit all at ts=100
3519            let write_set: HashSet<InlineKey> = (0..5)
3520                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3521                .collect();
3522            memtable.commit(1, 100, &write_set);
3523
3524            // Scan range with snapshot_ts > commit_ts
3525            let results = memtable.scan_range(b"key1", b"key4", 101, None);
3526            assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
3527        }
3528    }
3529
3530    #[test]
3531    fn test_durable_storage_arena() {
3532        let dir = tempdir().unwrap();
3533        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3534        
3535        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3536
3537        // Basic transaction should work the same
3538        let txn_id = storage.begin_transaction().unwrap();
3539        storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
3540        storage.commit(txn_id).unwrap();
3541
3542        let txn2 = storage.begin_transaction().unwrap();
3543        let v = storage.read(txn2, b"key1").unwrap();
3544        assert_eq!(v, Some(b"value1".to_vec()));
3545        storage.abort(txn2).unwrap();
3546    }
3547
3548    #[test]
3549    fn test_durable_storage_full_config() {
3550        let dir = tempdir().unwrap();
3551        
3552        // Test with Arena and ordered index enabled
3553        let storage = DurableStorage::open_with_full_config(
3554            dir.path(),
3555            true,
3556            MemTableType::Arena,
3557        ).unwrap();
3558        
3559        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3560
3561        // Write multiple keys
3562        let txn = storage.begin_transaction().unwrap();
3563        for i in 0..10 {
3564            let key = format!("key{:02}", i);
3565            let value = format!("value{}", i);
3566            storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
3567        }
3568        storage.commit(txn).unwrap();
3569
3570        // Scan should work (uses scan method for prefix)
3571        let txn2 = storage.begin_transaction().unwrap();
3572        let results = storage.scan(txn2, b"key0").unwrap();
3573        assert_eq!(results.len(), 10); // key00 through key09
3574        storage.abort(txn2).unwrap();
3575    }
3576}