Skip to main content

sochdb_storage/
durable_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! This module wires together all storage components into a production-ready
21//! durable storage engine:
22//!
23//! - WAL (txn_wal.rs) for durability
24//! - Group Commit for throughput
25//! - MVCC for isolation
26//! - LSCS for columnar efficiency
27//!
28//! ## Architecture
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │                      DurableStorage                              │
33//! ├─────────────────────────────────────────────────────────────────┤
34//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
35//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
36//! │  │             │    │             │    └─────────────────────┘ │
37//! │  │ ┌─────────┐ │    └─────────────┘                            │
38//! │  │ │Snapshots│ │                                                │
39//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
40//! │  │ ┌─────────┐ │    │              MemTable                    ││
41//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
42//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
43//! │  └─────────────┘                                                │
44//! │                      ┌─────────────────────────────────────────┐│
45//! │                      │              LSCS (SST)                  ││
46//! │                      │  Immutable columnar segments             ││
47//! │                      └─────────────────────────────────────────┘│
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Concurrency
52//!
53//! - Writers: Serialize through WAL, use MVCC for conflict detection
54//! - Readers: Lock-free reads at snapshot timestamp
55//! - Commits: Batched through GroupCommit for throughput
56//!
57//! ## Isolation Contract
58//!
59//! The live write path (`MvccManager`, used by `DurableStorage`) provides
60//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
61//! plain Snapshot Isolation (SI):
62//!
63//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
64//!   fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
65//!   invisible to it — see `test_snapshot_isolation`. This alone is SI and is
66//!   vulnerable to **write skew** (two transactions each read a set the other
67//!   writes, both commit, and no serial order reproduces the result).
68//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
69//!   recently-committed concurrent transactions for rw-antidependency edges:
70//!   an inbound edge (another txn wrote a key we read, `T_other →rw→ T_me`) and
71//!   an outbound edge (we wrote a key another txn read, `T_me →rw→ T_other`).
72//!   When a transaction sits on **both** an inbound and an outbound rw-edge it
73//!   is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
74//!   structure"), so it is aborted. This is the conservative safe subset of the
75//!   SSI test: it may abort some serializable schedules (false positives) but
76//!   **never admits a non-serializable one** (no false negatives), so the
77//!   externally observable isolation level is Serializable.
78//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
79//!   never participate in validation; they always observe a serializable
80//!   snapshot and never abort.
81//!
82//! ### MVCC garbage collection is snapshot-safe
83//!
84//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
85//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
86//! `snapshot_ts` across all still-active transactions — into
87//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
88//! retains every version with `commit_ts > min_active_ts` **plus one anchor
89//! version at or below the watermark**, so any in-flight reader can still
90//! resolve the correct version for its snapshot. A version is only freed once
91//! **no active snapshot can observe it**. The watermark is recomputed on every
92//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
93//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
94
95use std::collections::HashSet;
96use std::path::{Path, PathBuf};
97use std::sync::Arc;
98use std::sync::atomic::{AtomicU64, Ordering};
99
100use dashmap::DashMap;
101use smallvec::SmallVec;
102
103use crossbeam_skiplist::SkipMap;
104
105use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
106use crate::group_commit::EventDrivenGroupCommit;
107use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
108use sochdb_core::version_chain::{
109    BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
110    VisibilityContext, WriteConflictDetection,
111};
112use sochdb_core::{Result, SochDBError};
113
114// =============================================================================
115// SSI Bloom Filter - Fast Conflict Pre-Filtering
116// =============================================================================
117
/// Compact Bloom filter used as a cheap pre-check during SSI conflict detection.
///
/// It answers "could these two transactions have touched a common key?"
/// without comparing the exact key sets. A positive answer merely triggers the
/// exact check, so false positives only cost extra work; false negatives are
/// not allowed.
///
/// ## Sizing
///
/// Targeting a 1% false-positive rate for 1000 keys:
/// - m = ~9600 bits ≈ 1.2 KB per transaction
/// - k = 7 hash functions
///
/// ## Lazy allocation
///
/// The bit vector stays `None` until the first insert, so a transaction that
/// never inserts a key pays no allocation for its filter.
#[derive(Debug, Clone)]
pub struct SsiBloomFilter {
    /// Backing bit vector, 64 bits per word; `None` until the first insert.
    bits: Option<Vec<u64>>,
    /// Expected item count; determines the size of the lazily allocated vector.
    expected_capacity: usize,
    /// Number of bit positions set/probed per key.
    num_hashes: u32,
}
143
144impl SsiBloomFilter {
145    /// Optimal number of bits per item for 1% false positive rate
146    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
147    const BITS_PER_ITEM: f64 = 9.6;
148
149    /// Optimal number of hash functions for 1% false positive rate
150    /// k = (m/n) × ln(2) ≈ 7
151    const DEFAULT_NUM_HASHES: u32 = 7;
152
153    /// Minimum capacity to avoid tiny filters
154    const MIN_CAPACITY: usize = 64;
155
156    /// Create a new bloom filter for expected item count (lazy allocation)
157    ///
158    /// Configured for ~1% false positive rate.
159    /// The bit vector is not allocated until first insert.
160    #[inline]
161    pub fn new(expected_items: usize) -> Self {
162        Self {
163            bits: None,
164            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
165            num_hashes: Self::DEFAULT_NUM_HASHES,
166        }
167    }
168
169    /// Create with specific capacity in words (for memory-constrained scenarios)
170    pub fn with_word_capacity(words: usize) -> Self {
171        Self {
172            bits: None,
173            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
174            num_hashes: Self::DEFAULT_NUM_HASHES,
175        }
176    }
177
178    /// Ensure bits are allocated (lazy initialization)
179    #[inline]
180    fn ensure_allocated(&mut self) {
181        if self.bits.is_none() {
182            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
183            let num_words = num_bits.div_ceil(64);
184            self.bits = Some(vec![0u64; num_words]);
185        }
186    }
187
188    /// Add a key to the filter - O(k) where k = num_hashes
189    #[inline]
190    pub fn insert(&mut self, key: &[u8]) {
191        self.ensure_allocated();
192        let bits = self.bits.as_mut().unwrap();
193        let num_bits = bits.len() * 64;
194        if num_bits == 0 {
195            return;
196        }
197
198        // Use two hash functions to simulate k hash functions
199        // h(i) = h1 + i * h2 (double hashing technique)
200        let h1 = Self::hash1(key);
201        let h2 = Self::hash2(key);
202
203        for i in 0..self.num_hashes {
204            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
205            let bit_idx = (h as usize) % num_bits;
206            let word_idx = bit_idx / 64;
207            let bit_pos = bit_idx % 64;
208            bits[word_idx] |= 1 << bit_pos;
209        }
210    }
211
212    /// Check if a key might be present - O(k)
213    ///
214    /// Returns:
215    /// - false: Key is definitely NOT in the set (or filter not initialized)
216    /// - true: Key MIGHT be in the set (needs exact check)
217    #[inline]
218    pub fn may_contain(&self, key: &[u8]) -> bool {
219        let bits = match &self.bits {
220            Some(b) => b,
221            None => return false, // Uninitialized = empty
222        };
223        let num_bits = bits.len() * 64;
224        if num_bits == 0 {
225            return false;
226        }
227
228        let h1 = Self::hash1(key);
229        let h2 = Self::hash2(key);
230
231        for i in 0..self.num_hashes {
232            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
233            let bit_idx = (h as usize) % num_bits;
234            let word_idx = bit_idx / 64;
235            let bit_pos = bit_idx % 64;
236            if bits[word_idx] & (1 << bit_pos) == 0 {
237                return false; // Definitely not present
238            }
239        }
240        true // Might be present
241    }
242
243    /// Check if this filter might intersect with another
244    ///
245    /// Fast O(m/64) check using bitwise AND of all words.
246    /// If no bits are shared, sets are definitely disjoint.
247    #[inline]
248    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
249        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
250            (Some(s), Some(o)) => (s, o),
251            _ => return false, // Either uninitialized = no intersection
252        };
253        let min_len = self_bits.len().min(other_bits.len());
254        for i in 0..min_len {
255            if self_bits[i] & other_bits[i] != 0 {
256                return true; // Might intersect
257            }
258        }
259        false // Definitely disjoint
260    }
261
262    /// First hash function (using built-in hasher)
263    #[inline]
264    fn hash1(key: &[u8]) -> u64 {
265        use std::collections::hash_map::DefaultHasher;
266        use std::hash::{Hash, Hasher};
267        let mut hasher = DefaultHasher::new();
268        key.hash(&mut hasher);
269        hasher.finish()
270    }
271
272    /// Second hash function (using twox-hash for independence)
273    #[inline]
274    fn hash2(key: &[u8]) -> u64 {
275        twox_hash::xxh3::hash64(key)
276    }
277
278    /// Get the memory size in bytes
279    pub fn size_bytes(&self) -> usize {
280        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
281    }
282
283    /// Check if the filter is empty
284    pub fn is_empty(&self) -> bool {
285        match &self.bits {
286            Some(bits) => bits.iter().all(|&w| w == 0),
287            None => true,
288        }
289    }
290}
291
292/// Type alias for inline key storage - keys up to 32 bytes stored on stack
293/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
294pub type InlineKey = SmallVec<[u8; 32]>;
295
/// One version of a key's value, tagged with the transaction that wrote it.
#[derive(Clone, Debug)]
pub struct Version {
    /// Payload of this version; `None` marks a tombstone.
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that created this version.
    pub txn_id: u64,
    /// Commit timestamp; `0` means the version is not committed yet.
    pub commit_ts: u64,
}
306
307// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
308impl ChainEntry for Version {
309    #[inline]
310    fn commit_ts(&self) -> u64 {
311        self.commit_ts
312    }
313    #[inline]
314    fn txn_id(&self) -> u64 {
315        self.txn_id
316    }
317    #[inline]
318    fn set_commit_ts(&mut self, ts: u64) {
319        self.commit_ts = ts;
320    }
321}
322
323// ============================================================================
324// Optimized VersionChain with Binary Search (Task 1: mm.md)
325// ============================================================================
326
327/// Multi-version data for a single key with O(log v) read complexity
328///
329/// ## Optimization: Binary Search with Sorted Commit Ordering
330///
331/// Separates committed versions (sorted descending by commit_ts) from
332/// uncommitted version (single optional slot per transaction).
333///
334/// **Before:** O(v) linear scan + O(v) max computation = O(v)
335/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
336///
337/// For v=10 versions: 3.3x speedup
338/// For v=100 versions: 7x speedup
339///
340/// ## Rec 11: Consolidated
341///
342/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
343/// eliminating duplication with `mvcc_concurrent::VersionChain`.
344#[derive(Debug, Default)]
345pub struct VersionChain {
346    /// Consolidated binary-search chain (Rec 11)
347    inner: BinarySearchChain<Version>,
348}
349
350impl VersionChain {
351    /// Create a new empty version chain
352    #[inline]
353    pub fn new() -> Self {
354        Self {
355            inner: BinarySearchChain::new(),
356        }
357    }
358
359    /// Add a new uncommitted version
360    /// If there's already an uncommitted version from this txn, update it in place
361    ///
362    /// O(1) - just updates the uncommitted slot
363    #[inline]
364    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
365        match self.inner.uncommitted_mut() {
366            Some(v) if v.txn_id == txn_id => {
367                // Update in place - O(1)
368                v.value = value;
369            }
370            _ => {
371                // New or different txn — set the slot
372                self.inner.set_uncommitted(Version {
373                    value,
374                    txn_id,
375                    commit_ts: 0,
376                });
377            }
378        }
379    }
380
381    /// Commit a version - moves from uncommitted slot to sorted committed list
382    ///
383    /// O(log v) - inserts into sorted position using binary search
384    #[inline]
385    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
386        self.inner.commit(txn_id, commit_ts)
387    }
388
389    /// Abort a version (remove uncommitted version for txn)
390    ///
391    /// O(1) - just clears the uncommitted slot if it matches
392    #[inline]
393    pub fn abort(&mut self, txn_id: u64) {
394        self.inner.abort(txn_id);
395    }
396
397    /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
398    ///
399    /// ## Complexity: O(1) + O(log v) = O(log v)
400    #[inline]
401    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
402        self.inner.read_at(snapshot_ts, current_txn_id)
403    }
404
405    /// Check if there's an uncommitted version by another transaction
406    ///
407    /// O(1) - just checks the uncommitted slot
408    #[inline]
409    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
410        self.inner.has_write_conflict(my_txn_id)
411    }
412
413    /// Garbage collect old versions
414    pub fn gc(&mut self, min_active_ts: u64) {
415        self.inner.gc_by_ts(min_active_ts);
416    }
417
418    /// Get total version count (committed + uncommitted)
419    #[inline]
420    pub fn version_count(&self) -> usize {
421        self.inner.version_count()
422    }
423
424    // Legacy compatibility: get versions vec (for tests)
425    #[cfg(test)]
426    pub fn versions(&self) -> Vec<Version> {
427        let mut result = self.inner.committed_versions().to_vec();
428        if let Some(v) = self.inner.uncommitted() {
429            result.push(v.clone());
430        }
431        result
432    }
433}
434
435// =============================================================================
436// Rec 6: Unified Version Chain Trait Implementations
437// =============================================================================
438
439impl MvccVersionChain for VersionChain {
440    type Value = Option<Vec<u8>>;
441
442    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
443        // Delegate to BinarySearchChain, then project to value field
444        self.inner
445            .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
446            .map(|v| &v.value)
447    }
448
449    fn get_latest(&self) -> Option<&Self::Value> {
450        self.inner.latest().map(|v| &v.value)
451    }
452
453    fn version_count(&self) -> usize {
454        self.inner.version_count()
455    }
456}
457
458impl MvccVersionChainMut for VersionChain {
459    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
460        self.add_uncommitted(value, txn_id);
461    }
462
463    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
464        self.inner.commit(txn_id, commit_ts)
465    }
466
467    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
468        // Insert a tombstone (None value) as uncommitted
469        self.add_uncommitted(None, txn_id);
470        true
471    }
472
473    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
474        let before = self.inner.committed_count();
475        self.inner.gc_by_ts(min_visible_ts);
476        let removed = before - self.inner.committed_count();
477        (removed, removed * std::mem::size_of::<Version>())
478    }
479}
480
481impl WriteConflictDetection for VersionChain {
482    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
483        self.has_write_conflict(txn_id)
484    }
485}
486
487// =============================================================================
488// Pre-sizing Constants to Avoid HashSet Resize Overhead
489// =============================================================================
490
/// Initial capacity of a transaction's `write_set`.
///
/// Sized for typical OLTP transactions (10-50 keys) so the set does not have
/// to grow mid-transaction; resizing caused a +11% regression.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity of a transaction's `read_set`.
///
/// Larger than the write-set capacity because workloads are typically
/// read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
499
500/// Transaction state for MVCC
501#[derive(Debug, Clone)]
502pub struct MvccTransaction {
503    /// Transaction ID
504    pub txn_id: u64,
505    /// Snapshot timestamp (reads see commits before this)
506    pub snapshot_ts: u64,
507    /// Keys written by this transaction - uses SmallVec for inline storage
508    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
509    pub write_set: HashSet<InlineKey>,
510    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
511    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
512    pub read_set: HashSet<InlineKey>,
513    /// Bloom filter for write set - fast SSI pre-filtering
514    pub write_bloom: SsiBloomFilter,
515    /// Bloom filter for read set - fast SSI pre-filtering
516    pub read_bloom: SsiBloomFilter,
517    /// Transaction state
518    pub state: TxnState,
519    /// Transaction mode for SSI optimization (Recommendation 9)
520    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
521    pub mode: TransactionMode,
522}
523
524impl MvccTransaction {
525    /// Create a new transaction with pre-sized collections
526    ///
527    /// This avoids HashSet resize overhead during the transaction
528    /// which was causing +11% regression on write_set.insert().
529    #[inline]
530    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
531        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
532    }
533
534    /// Create a read-only transaction (SSI bypass - 2.6x faster)
535    ///
536    /// Read-only transactions skip all SSI tracking:
537    /// - No read_set allocation
538    /// - No read_bloom allocation  
539    /// - No commit validation
540    ///
541    /// ## Performance
542    ///
543    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
544    #[inline]
545    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
546        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
547    }
548
549    /// Create a write-only transaction (partial SSI bypass)
550    ///
551    /// Write-only transactions skip read tracking:
552    /// - No read_set tracking
553    /// - No read_bloom inserts
554    /// - Still needs write_set for commit
555    #[inline]
556    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
557        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
558    }
559
560    /// Create transaction with specific mode
561    #[inline]
562    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
563        // Optimize allocation based on mode
564        let (write_capacity, read_capacity) = match mode {
565            TransactionMode::ReadOnly => (0, 0), // No tracking needed
566            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
567            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
568        };
569        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
570    }
571
572    /// Create with custom capacities for expected workload
573    ///
574    /// Use this when you know the transaction will write many keys
575    /// to avoid resize overhead entirely.
576    #[inline]
577    pub fn with_capacity(
578        txn_id: u64,
579        snapshot_ts: u64,
580        write_capacity: usize,
581        read_capacity: usize,
582        mode: TransactionMode,
583    ) -> Self {
584        Self {
585            txn_id,
586            snapshot_ts,
587            write_set: HashSet::with_capacity(write_capacity),
588            read_set: HashSet::with_capacity(read_capacity),
589            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
590            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
591            state: TxnState::Active,
592            mode,
593        }
594    }
595
596    /// Check if this is a read-only transaction
597    #[inline]
598    pub fn is_read_only(&self) -> bool {
599        self.write_set.is_empty()
600    }
601
602    /// Check if this is a single-key write transaction
603    #[inline]
604    pub fn is_single_key_write(&self) -> bool {
605        self.write_set.len() == 1 && self.read_set.len() <= 1
606    }
607}
608
/// Lifecycle state of an MVCC transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnState {
    /// The transaction is in progress.
    Active,
    /// The transaction has committed.
    Committed,
    /// The transaction has been aborted.
    Aborted,
}
616
617// =============================================================================
618// Transaction Mode for SSI Bypass (Recommendation 9)
619// =============================================================================
620
/// How much SSI bookkeeping a transaction performs.
///
/// Declaring the mode when a transaction begins lets most transactions skip
/// the expensive parts of SSI tracking:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None             | None               | ~10 ns          |
/// | WriteOnly | None             | Full               | ~30 ns          |
/// | ReadWrite | Full             | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TransactionMode {
    /// Read-only transaction - skips ALL SSI tracking.
    /// With no writes it creates no outgoing rw-edges, so it cannot form an
    /// rw-antidependency cycle; read_set, read_bloom and commit validation
    /// are skipped entirely.
    ReadOnly,

    /// Write-only transaction - skips read tracking.
    /// With no reads from concurrent writers it has no incoming rw-edges;
    /// only write_set and write_bloom are maintained.
    WriteOnly,

    /// Full read-write transaction (default) - complete SSI tracking.
    /// May have both incoming and outgoing rw-edges, so it is fully
    /// validated at commit time.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Whether reads must be recorded for SSI (only in `ReadWrite`).
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Whether writes must be recorded (`WriteOnly` and `ReadWrite`).
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly => false,
        }
    }

    /// Whether commit must run SSI validation (only in `ReadWrite`).
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }
}
685
/// Kind of dependency an SSI conflict edge represents.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, then T2 writes X
    ReadWrite,
    /// Write-read conflict: T1 writes X, then T2 reads X
    WriteRead,
}

/// Directed conflict edge between two transactions, used for dangerous
/// structure detection.
#[derive(Clone, Debug)]
pub struct ConflictEdge {
    /// Transaction the edge starts from.
    pub from_txn: u64,
    /// Transaction the edge points to.
    pub to_txn: u64,
    /// Kind of conflict this edge records.
    pub conflict_type: ConflictType,
}
705
706/// MVCC Manager with SSI support
707///
708/// Uses DashMap for lock-free per-transaction access.
709/// Implements Serializable Snapshot Isolation (SSI) with
710/// dangerous structure detection for rw-antidependency cycles.
711#[allow(clippy::type_complexity)]
712pub struct MvccManager {
713    /// Active transactions (sharded for concurrent access)
714    active_txns: DashMap<u64, MvccTransaction>,
715    /// Current timestamp counter
716    ts_counter: AtomicU64,
717    /// Minimum active snapshot timestamp (for GC)
718    min_active_ts: AtomicU64,
719    /// Recently committed transactions for SSI validation
720    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
721    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
722    recent_commits: DashMap<
723        u64,
724        (
725            u64,
726            SsiBloomFilter,
727            SsiBloomFilter,
728            HashSet<InlineKey>,
729            HashSet<InlineKey>,
730        ),
731    >,
732    /// Max recent commits to track
733    max_recent_commits: usize,
734}
735
736impl Default for MvccManager {
737    fn default() -> Self {
738        Self::new()
739    }
740}
741
742impl MvccManager {
743    pub fn new() -> Self {
744        Self {
745            active_txns: DashMap::new(),
746            ts_counter: AtomicU64::new(1),
747            min_active_ts: AtomicU64::new(0),
748            recent_commits: DashMap::new(),
749            max_recent_commits: 1000, // Track last 1000 commits for SSI
750        }
751    }
752
753    /// Begin a new transaction with snapshot isolation
754    ///
755    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
756    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
757        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
758    }
759
760    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
761    ///
762    /// Read-only transactions skip all SSI tracking, reducing
763    /// per-read overhead from ~82ns to ~32ns.
764    ///
765    /// ## Safety
766    ///
767    /// Caller must ensure no writes are performed. Attempting to
768    /// write in a read-only transaction will still succeed but
769    /// won't be tracked for SSI validation.
770    #[inline]
771    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
772        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
773    }
774
775    /// Begin a write-only transaction (partial SSI bypass)
776    ///
777    /// Write-only transactions skip read tracking, reducing overhead
778    /// for insert-heavy workloads.
779    #[inline]
780    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
781        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
782    }
783
784    /// Begin a transaction with specific mode
785    ///
786    /// This is the core transaction creation method that all other
787    /// begin_* methods delegate to.
788    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
789        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
790
791        // Create transaction with mode-optimized allocations
792        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
793
794        self.active_txns.insert(txn_id, txn.clone());
795        self.update_min_active_ts();
796
797        txn
798    }
799
800    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
801    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
802        self.active_txns.get(&txn_id).map(|t| t.clone())
803    }
804
805    /// Fast path: get just the snapshot timestamp without cloning
806    /// This is the hot path for reads - avoids cloning bloom filters
807    #[inline]
808    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
809        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
810    }
811
812    /// Record a read (for SSI) - uses inline key storage + bloom filter
813    ///
814    /// ## SSI Bypass (Recommendation 9)
815    ///
816    /// For ReadOnly mode transactions, this is a no-op (instant return).
817    /// For WriteOnly mode transactions, this is a no-op.
818    /// Only ReadWrite mode transactions track reads for SSI.
819    ///
820    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
821    #[inline]
822    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
823        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
824            // SSI Bypass: Skip tracking for read-only and write-only modes
825            if !txn.mode.tracks_reads() {
826                return;
827            }
828
829            // Only track reads if within reasonable bounds
830            if txn.read_set.len() < 10000 {
831                txn.read_set.insert(SmallVec::from_slice(key));
832                txn.read_bloom.insert(key);
833            }
834        }
835    }
836
837    /// Record a write - uses inline key storage + bloom filter
838    ///
839    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
840    /// The mode only affects SSI tracking, not write capability.
841    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
842        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
843            txn.write_set.insert(SmallVec::from_slice(key));
844            txn.write_bloom.insert(key);
845        }
846    }
847
848    /// Allocate commit timestamp
849    pub fn alloc_commit_ts(&self) -> u64 {
850        self.ts_counter.fetch_add(1, Ordering::SeqCst)
851    }
852
853    /// Commit transaction with SSI validation
854    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
855    /// Returns None if SSI validation fails (dangerous structure detected)
856    ///
857    /// ## SSI Bypass (Recommendation 9)
858    ///
859    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
860    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
861    /// For ReadWrite mode: Full validation (~50ns commit)
862    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
863        // Get transaction before removing
864        let txn = self.active_txns.get(&txn_id)?.clone();
865
866        // SSI Bypass: Skip validation for ReadOnly transactions
867        // ReadOnly can never form rw-antidependency cycles
868        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
869            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
870            // For ReadWrite: check SSI validation result
871            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
872                // Abort on SSI violation
873                self.active_txns.remove(&txn_id);
874                self.update_min_active_ts();
875                return None;
876            }
877        }
878
879        let commit_ts = self.alloc_commit_ts();
880
881        // Extract write_set and remove transaction - takes ownership
882        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
883
884        // OPTIMIZATION: Only track ReadWrite transactions for SSI
885        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
886        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
887            && !removed_txn.read_set.is_empty()
888            && !removed_txn.write_set.is_empty();
889
890        if needs_ssi_tracking {
891            // Need to clone write_set since we return it AND track it
892            let write_set_for_return = removed_txn.write_set.clone();
893
894            self.track_commit_owned(
895                txn_id,
896                commit_ts,
897                removed_txn.read_bloom,
898                removed_txn.write_bloom,
899                removed_txn.read_set,
900                removed_txn.write_set,
901            );
902
903            self.update_min_active_ts();
904            Some((commit_ts, write_set_for_return))
905        } else {
906            // Fast path: no SSI tracking needed, avoid clone entirely
907            self.update_min_active_ts();
908            Some((commit_ts, removed_txn.write_set))
909        }
910    }
911
912    /// Validate SSI constraints for a committing transaction
913    ///
914    /// ## Transaction Classification (Task 3: Optimistic MVCC)
915    ///
916    /// Transactions are classified and routed through appropriate fast paths:
917    ///
918    /// | Class      | Criteria                      | Validation Cost |
919    /// |------------|-------------------------------|-----------------|
920    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
921    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
922    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
923    /// | General    | full SSI check                | ~50 ns         |
924    ///
925    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
926    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
927    ///
928    /// Detects "dangerous structures" - rw-antidependency cycles:
929    /// - T1 reads X (snapshot sees old value)
930    /// - T2 writes X (concurrent write)  
931    /// - T2 reads Y (snapshot sees old value)
932    /// - T1 writes Y (concurrent write)
933    ///
934    /// If T1 → rw → T2 → rw → T1 exists, abort T1
935    #[inline]
936    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
937        // =================================================================
938        // Fast Path 1: Read-only transactions (0 ns)
939        // =================================================================
940        // Read-only transactions can never form rw-antidependency cycles
941        // because they have no writes to create outgoing rw-edges
942        if txn.write_set.is_empty() {
943            return true;
944        }
945
946        // =================================================================
947        // Fast Path 2: No recent commits to check (0 ns)
948        // =================================================================
949        if self.recent_commits.is_empty() {
950            return true;
951        }
952
953        // =================================================================
954        // Fast Path 3: Single-key write transactions (0 ns)
955        // =================================================================
956        // A single-key write transaction cannot form a dangerous cycle:
957        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
958        //   AND T2 to read what T1 wrote
959        // - With only one key in write_set, the same key would need to be
960        //   in both read_set AND write_set of both transactions
961        // - This is already prevented by our conflict detection (write-write)
962        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
963            return true;
964        }
965
966        let my_snapshot = txn.snapshot_ts;
967
968        // =================================================================
969        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
970        // =================================================================
971        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
972        // with any concurrent transaction's read_bloom AND vice versa,
973        // there can be no rw-antidependency
974        let mut any_may_intersect = false;
975        for entry in self.recent_commits.iter() {
976            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
977
978            // Only check concurrent transactions.
979            //
980            // A committed transaction is *concurrent* with us iff its writes are
981            // invisible to our snapshot. Read visibility is strict
982            // (`commit_ts < snapshot_ts`), so a transaction with
983            // `commit_ts >= my_snapshot` is invisible and must be validated
984            // against. Using `<` here (not `<=`) keeps this window consistent
985            // with `read_at`; a `<=` would skip a boundary transaction whose
986            // `commit_ts == my_snapshot` and miss a genuine write-skew.
987            if *other_commit_ts < my_snapshot {
988                continue;
989            }
990
991            // Check bloom filter intersection (O(m/64) per filter)
992            // If our writes may intersect their reads OR their writes may intersect our reads
993            if txn.write_bloom.may_intersect(other_read_bloom)
994                || other_write_bloom.may_intersect(&txn.read_bloom)
995            {
996                any_may_intersect = true;
997                break;
998            }
999        }
1000
1001        // No bloom intersection means definitely disjoint - no SSI conflict possible
1002        if !any_may_intersect {
1003            return true;
1004        }
1005
1006        // =================================================================
1007        // Full SSI Validation (~50 ns)
1008        // =================================================================
1009        // Check for rw-conflicts with recently committed transactions
1010        // An rw-conflict exists if:
1011        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1012        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1013
1014        let mut in_conflict_with: Vec<u64> = Vec::new();
1015        let mut out_conflict_to: Vec<u64> = Vec::new();
1016
1017        for entry in self.recent_commits.iter() {
1018            let (
1019                other_txn_id,
1020                (
1021                    other_commit_ts,
1022                    _other_read_bloom,
1023                    other_write_bloom,
1024                    other_read_set,
1025                    other_write_set,
1026                ),
1027            ) = entry.pair();
1028
1029            // Only consider transactions concurrent with us: those whose writes
1030            // are invisible to our snapshot. Read visibility is strict
1031            // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1032            // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1033            if *other_commit_ts < my_snapshot {
1034                continue;
1035            }
1036
1037            // Check: other wrote → we read (other →rw→ me)
1038            // T_other wrote a key that T_me read (rw-dependency inbound)
1039            //
1040            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1041            // Only do expensive HashSet intersection if bloom says "maybe conflict"
1042            let mut has_in_conflict = false;
1043            for key in txn.read_set.iter() {
1044                if other_write_bloom.may_contain(key) {
1045                    // Bloom says maybe - do exact check
1046                    if other_write_set.contains(key) {
1047                        has_in_conflict = true;
1048                        break;
1049                    }
1050                }
1051            }
1052            if has_in_conflict {
1053                in_conflict_with.push(*other_txn_id);
1054            }
1055
1056            // Check: we wrote → other read (me →rw→ other)
1057            // T_me wrote a key that T_other read (rw-dependency outbound)
1058            //
1059            // Bloom-accelerated: Use our write_bloom against their read_set
1060            let mut has_out_conflict = false;
1061            for key in other_read_set.iter() {
1062                if txn.write_bloom.may_contain(key) {
1063                    // Bloom says maybe - do exact check
1064                    if txn.write_set.contains(key) {
1065                        has_out_conflict = true;
1066                        break;
1067                    }
1068                }
1069            }
1070            if has_out_conflict {
1071                out_conflict_to.push(*other_txn_id);
1072            }
1073        }
1074
1075        // Dangerous structure: we have both incoming AND outgoing rw-edges
1076        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1077        //
1078        // Conservative check: if both exist, abort
1079        // A more precise check would verify the cycle path, but this is safe
1080        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1081            return false; // SSI violation - abort
1082        }
1083
1084        true
1085    }
1086
1087    /// Track a committed transaction for future SSI validation
1088    ///
1089    /// Only tracks transactions that have both reads AND writes, since SSI
1090    /// only detects rw-antidependency cycles. Pure read or pure write
1091    /// transactions can't form cycles.
1092    ///
1093    /// ## Optimization: Zero-Copy Transfer
1094    ///
1095    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1096    /// phase regression. The caller should use mem::take() to transfer ownership.
1097    fn track_commit_owned(
1098        &self,
1099        txn_id: u64,
1100        commit_ts: u64,
1101        read_bloom: SsiBloomFilter,
1102        write_bloom: SsiBloomFilter,
1103        read_set: HashSet<InlineKey>,
1104        write_set: HashSet<InlineKey>,
1105    ) {
1106        // Optimization: Only track mixed read-write transactions
1107        // Pure reads can't create outgoing rw-edges
1108        // Pure writes can't create incoming rw-edges
1109        if read_set.is_empty() || write_set.is_empty() {
1110            return; // Skip tracking - can't form SSI cycle
1111        }
1112
1113        // Add to recent commits with bloom filters for fast SSI pre-filtering
1114        // No cloning needed - we take ownership
1115        self.recent_commits.insert(
1116            txn_id,
1117            (commit_ts, read_bloom, write_bloom, read_set, write_set),
1118        );
1119
1120        // Lazy pruning: only prune when we're significantly over capacity
1121        // Avoids pruning overhead on every commit
1122        if self.recent_commits.len() > self.max_recent_commits * 2 {
1123            // Remove entries with lowest commit_ts
1124            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1125            self.recent_commits
1126                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1127        }
1128    }
1129
1130    /// Legacy track_commit that clones - kept for compatibility
1131    #[allow(dead_code)]
1132    fn track_commit(
1133        &self,
1134        txn_id: u64,
1135        commit_ts: u64,
1136        read_bloom: SsiBloomFilter,
1137        write_bloom: SsiBloomFilter,
1138        read_set: &HashSet<InlineKey>,
1139        write_set: &HashSet<InlineKey>,
1140    ) {
1141        if read_set.is_empty() || write_set.is_empty() {
1142            return;
1143        }
1144        self.recent_commits.insert(
1145            txn_id,
1146            (
1147                commit_ts,
1148                read_bloom,
1149                write_bloom,
1150                read_set.clone(),
1151                write_set.clone(),
1152            ),
1153        );
1154    }
1155
1156    /// Abort transaction
1157    pub fn abort(&self, txn_id: u64) {
1158        self.active_txns.remove(&txn_id);
1159        self.update_min_active_ts();
1160    }
1161
1162    /// Get minimum active snapshot timestamp
1163    pub fn min_active_snapshot(&self) -> u64 {
1164        self.min_active_ts.load(Ordering::SeqCst)
1165    }
1166
1167    /// Get count of active transactions
1168    pub fn active_transaction_count(&self) -> usize {
1169        self.active_txns.len()
1170    }
1171
1172    fn update_min_active_ts(&self) {
1173        let min = self
1174            .active_txns
1175            .iter()
1176            .map(|entry| entry.value().snapshot_ts)
1177            .min()
1178            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1179        self.min_active_ts.store(min, Ordering::SeqCst);
1180    }
1181}
1182
1183/// Epoch-based dirty list for O(expired) GC instead of O(n)
1184///
1185/// Instead of scanning ALL version chains, we track which keys have versions
1186/// created in each epoch. GC only needs to visit keys from old epochs.
1187struct EpochDirtyList {
1188    /// Ring buffer of epoch -> dirty keys
1189    /// Index = epoch % EPOCH_RING_SIZE
1190    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1191    /// Current epoch
1192    current_epoch: AtomicU64,
1193}
1194
1195const EPOCH_RING_SIZE: usize = 4;
1196
1197impl EpochDirtyList {
1198    fn new() -> Self {
1199        Self {
1200            epochs: [
1201                parking_lot::Mutex::new(Vec::new()),
1202                parking_lot::Mutex::new(Vec::new()),
1203                parking_lot::Mutex::new(Vec::new()),
1204                parking_lot::Mutex::new(Vec::new()),
1205            ],
1206            current_epoch: AtomicU64::new(0),
1207        }
1208    }
1209
1210    /// Record a version created in the current epoch
1211    #[inline]
1212    fn record_version(&self, key: Vec<u8>) {
1213        let epoch = self.current_epoch.load(Ordering::Relaxed);
1214        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1215        self.epochs[idx].lock().push(key);
1216    }
1217
1218    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1219    ///
1220    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1221    /// For 100 writes: ~100x fewer mutex operations.
1222    #[inline]
1223    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1224        let epoch = self.current_epoch.load(Ordering::Relaxed);
1225        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1226        let mut guard = self.epochs[idx].lock();
1227        guard.extend(keys);
1228    }
1229
1230    /// Advance to next epoch, returning old epoch's dirty keys
1231    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1232        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1233        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1234
1235        // Drain the old epoch's dirty list
1236        let mut guard = self.epochs[old_idx].lock();
1237        let keys = std::mem::take(&mut *guard);
1238        (old_epoch, keys)
1239    }
1240
1241    /// Get current epoch
1242    #[allow(dead_code)]
1243    fn current(&self) -> u64 {
1244        self.current_epoch.load(Ordering::Relaxed)
1245    }
1246}
1247
1248// ============================================================================
1249// Streaming Scan Iterator
1250// ============================================================================
1251
1252/// Streaming iterator for range scans
1253///
1254/// Yields results one at a time without materializing the full result set.
1255/// This enables processing of very large result sets with O(1) memory per
1256/// iteration instead of O(N) for the entire result set.
1257struct ScanRangeIterator<'a> {
1258    memtable: &'a MvccMemTable,
1259    start: Vec<u8>,
1260    end: Vec<u8>,
1261    snapshot_ts: u64,
1262    current_txn_id: Option<u64>,
1263    use_ordered: bool,
1264    // We use Option to defer initialization
1265    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1266    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1267    initialized: bool,
1268}
1269
1270impl<'a> Iterator for ScanRangeIterator<'a> {
1271    type Item = (Vec<u8>, Vec<u8>);
1272
1273    fn next(&mut self) -> Option<Self::Item> {
1274        // Lazy initialization on first call
1275        if !self.initialized {
1276            self.initialized = true;
1277
1278            if self.use_ordered {
1279                // Try deferred index first (after compaction, it uses a SkipMap internally)
1280                if let Some(ref def_idx) = self.memtable.deferred_index {
1281                    let start = self.start.clone();
1282                    let end = self.end.clone();
1283                    let snapshot_ts = self.snapshot_ts;
1284                    let current_txn_id = self.current_txn_id;
1285                    let data = &self.memtable.data;
1286
1287                    // Collect keys from deferred index (already sorted after compact)
1288                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1289                        def_idx.range_from(&start).collect()
1290                    } else {
1291                        def_idx.range(&start, &end).collect()
1292                    };
1293
1294                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1295                        Box::new(keys.into_iter().filter_map(move |key| {
1296                            if let Some(chain) = data.get(&key)
1297                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1298                                && let Some(value) = &v.value
1299                            {
1300                                Some((key, value.clone()))
1301                            } else {
1302                                None
1303                            }
1304                        }));
1305                    self.ordered_iter = Some(iter);
1306                } else if let Some(ref idx) = self.memtable.ordered_index {
1307                    let start = self.start.clone();
1308                    let end = self.end.clone();
1309                    let snapshot_ts = self.snapshot_ts;
1310                    let current_txn_id = self.current_txn_id;
1311                    let data = &self.memtable.data;
1312
1313                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1314                    {
1315                        Box::new(idx.range(start..).filter_map(move |entry| {
1316                            let key = entry.key();
1317                            if let Some(chain) = data.get(key)
1318                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1319                                && let Some(value) = &v.value
1320                            {
1321                                Some((key.clone(), value.clone()))
1322                            } else {
1323                                None
1324                            }
1325                        }))
1326                    } else {
1327                        Box::new(idx.range(start..end).filter_map(move |entry| {
1328                            let key = entry.key();
1329                            if let Some(chain) = data.get(key)
1330                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1331                                && let Some(value) = &v.value
1332                            {
1333                                Some((key.clone(), value.clone()))
1334                            } else {
1335                                None
1336                            }
1337                        }))
1338                    };
1339                    self.ordered_iter = Some(iter);
1340                }
1341            } else {
1342                // Unordered full scan
1343                let start = self.start.clone();
1344                let end = self.end.clone();
1345                let snapshot_ts = self.snapshot_ts;
1346                let current_txn_id = self.current_txn_id;
1347
1348                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1349                    Box::new(self.memtable.data.iter().filter_map(move |entry| {
1350                        let key = entry.key();
1351
1352                        if key.as_slice() < start.as_slice() {
1353                            return None;
1354                        }
1355                        if !end.is_empty() && key.as_slice() >= end.as_slice() {
1356                            return None;
1357                        }
1358
1359                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1360                            && let Some(value) = &v.value
1361                        {
1362                            Some((key.clone(), value.clone()))
1363                        } else {
1364                            None
1365                        }
1366                    }));
1367                self.unordered_iter = Some(iter);
1368            }
1369        }
1370
1371        // Get next from appropriate iterator
1372        if let Some(ref mut iter) = self.ordered_iter {
1373            iter.next()
1374        } else if let Some(ref mut iter) = self.unordered_iter {
1375            iter.next()
1376        } else {
1377            None
1378        }
1379    }
1380}
1381
/// MemTable with MVCC support.
///
/// Every key maps to a `VersionChain` stored in a `DashMap`, so writers to
/// different keys do not contend on one global write lock.
///
/// Alongside the data map the table optionally keeps one ordered key index
/// for range/prefix scans (at most one of the two index fields is `Some`,
/// see `with_index_mode`), an approximate byte counter, and an epoch-based
/// dirty list that records which keys received new versions so GC can
/// visit only those keys.
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for prefix/range scans (optional).
    /// Per the original notes: O(1) insert to a hot buffer, sorted on scan.
    /// `None` when ordered indexing is disabled or SkipMap mode is used.
    deferred_index: Option<DeferredSortedIndex>,
    /// SkipMap key index, used instead of the deferred index when the
    /// table is built with `use_deferred == false`.
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether deferred sorting (true) or an immediate SkipMap (false) was
    /// requested at construction time. Stored but not read in this chunk,
    /// hence the `dead_code` allowance.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes: the sum of key and value lengths of the
    /// writes accepted so far.
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1408
1409impl Default for MvccMemTable {
1410    fn default() -> Self {
1411        Self::new()
1412    }
1413}
1414
1415impl MvccMemTable {
1416    pub fn new() -> Self {
1417        Self::with_ordered_index(true)
1418    }
1419
1420    /// Create memtable with optional ordered index
1421    ///
1422    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1423    /// but scan_prefix becomes O(N) instead of O(log N + K)
1424    ///
1425    /// Uses deferred sorting by default for better write performance:
1426    /// - Writes: O(1) append to hot buffer
1427    /// - Scans: O(N log N) sort-on-demand
1428    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1429        Self::with_index_mode(enable_ordered_index, true)
1430    }
1431
1432    /// Create memtable with fine-grained control over indexing
1433    ///
1434    /// # Arguments
1435    /// * `enable_ordered_index` - Whether to maintain an ordered index
1436    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1437    ///                    If false, use SkipMap (O(log N) writes)
1438    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1439        Self {
1440            data: DashMap::new(),
1441            deferred_index: if enable_ordered_index && use_deferred {
1442                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1443                    max_unsorted_entries: 10_000, // Compact every 10K writes
1444                    enabled: true,
1445                }))
1446            } else {
1447                None
1448            },
1449            ordered_index: if enable_ordered_index && !use_deferred {
1450                Some(SkipMap::new())
1451            } else {
1452                None
1453            },
1454            use_deferred,
1455            size_bytes: AtomicU64::new(0),
1456            dirty_list: EpochDirtyList::new(),
1457        }
1458    }
1459
1460    /// Write a key-value pair (uncommitted)
1461    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1462        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1463        let key_len = key.len();
1464
1465        // Track this key in the current epoch's dirty list for GC
1466        self.dirty_list.record_version(key.clone());
1467
1468        // Insert into ordered index for prefix scans (if enabled)
1469        // Deferred: O(1) append to hot buffer
1470        // SkipMap: O(log N) insert
1471        if let Some(ref idx) = self.deferred_index {
1472            idx.insert(key.clone());
1473        } else if let Some(ref idx) = self.ordered_index {
1474            idx.insert(key.clone(), ());
1475        }
1476
1477        // Use entry API for atomic get-or-insert
1478        let mut entry = self.data.entry(key).or_default();
1479
1480        // Check for write-write conflict
1481        if entry.has_write_conflict(txn_id) {
1482            return Err(SochDBError::Internal(
1483                "Write-write conflict detected".into(),
1484            ));
1485        }
1486        entry.add_uncommitted(value, txn_id);
1487        self.size_bytes
1488            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1489
1490        Ok(())
1491    }
1492
1493    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1494    ///
1495    /// Optimizations applied (Rec 3: MVCC Batching):
1496    /// - Batched dirty list tracking: single lock acquire for all keys
1497    /// - Deferred index: O(1) append per key
1498    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1499        let mut total_size = 0u64;
1500
1501        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1502        self.dirty_list
1503            .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1504
1505        for (key, value) in writes {
1506            // Insert into ordered index (if enabled)
1507            // Deferred: O(1) append, SkipMap: O(log N)
1508            if let Some(ref idx) = self.deferred_index {
1509                idx.insert(key.clone());
1510            } else if let Some(ref idx) = self.ordered_index {
1511                idx.insert(key.clone(), ());
1512            }
1513
1514            let mut entry = self.data.entry(key.clone()).or_default();
1515
1516            if entry.has_write_conflict(txn_id) {
1517                return Err(SochDBError::Internal(
1518                    "Write-write conflict detected".into(),
1519                ));
1520            }
1521
1522            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1523            entry.add_uncommitted(value.clone(), txn_id);
1524            total_size += (key.len() + value_size) as u64;
1525        }
1526
1527        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1528        Ok(())
1529    }
1530
1531    /// Read at snapshot timestamp, with optional current txn to see own writes
1532    pub fn read(
1533        &self,
1534        key: &[u8],
1535        snapshot_ts: u64,
1536        current_txn_id: Option<u64>,
1537    ) -> Option<Vec<u8>> {
1538        self.data.get(key).and_then(|chain| {
1539            chain
1540                .read_at(snapshot_ts, current_txn_id)
1541                .and_then(|v| v.value.clone())
1542        })
1543    }
1544
1545    /// Commit all versions for a transaction
1546    ///
1547    /// Only updates the keys that were written by this transaction (tracked in write_set).
1548    /// Accepts InlineKey for zero-allocation MVCC tracking.
1549    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1550        // Only iterate over keys we know were written - O(k) instead of O(n)
1551        for key in write_set {
1552            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1553                chain.commit(txn_id, commit_ts);
1554            }
1555        }
1556    }
1557
1558    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1559    #[allow(dead_code)]
1560    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1561        for mut entry in self.data.iter_mut() {
1562            entry.value_mut().commit(txn_id, commit_ts);
1563        }
1564    }
1565
1566    /// Abort all versions for a transaction
1567    pub fn abort(&self, txn_id: u64) {
1568        for mut entry in self.data.iter_mut() {
1569            entry.value_mut().abort(txn_id);
1570        }
1571    }
1572
1573    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1574    ///
1575    /// ## Performance
1576    ///
1577    /// When ordered_index is enabled: O(log N + K) complexity
1578    /// - O(log N) to seek to the first key with prefix
1579    /// - O(K) to iterate matching keys
1580    ///
1581    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1582    ///
1583    /// ## Optimizations Applied
1584    ///
1585    /// - Pre-allocates result vector based on expected output size
1586    /// - Uses batch-friendly iteration patterns
1587    /// - Minimizes allocations during iteration
1588    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1589    pub fn scan_prefix(
1590        &self,
1591        prefix: &[u8],
1592        snapshot_ts: u64,
1593        current_txn_id: Option<u64>,
1594    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1595        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1596        let estimated_size = (self.data.len() / 10).max(64);
1597        let mut results = Vec::with_capacity(estimated_size);
1598
1599        if let Some(ref idx) = self.deferred_index {
1600            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1601            for key in idx.range_from(prefix) {
1602                // Stop when we've passed the prefix range
1603                if !key.starts_with(prefix) {
1604                    break;
1605                }
1606
1607                // O(1) lookup in DashMap for version chain
1608                if let Some(chain) = self.data.get(&key)
1609                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1610                    && let Some(value) = &v.value
1611                {
1612                    results.push((key, value.clone()));
1613                }
1614            }
1615        } else if let Some(ref idx) = self.ordered_index {
1616            // Fast path: O(log N) seek to first key >= prefix
1617            for entry in idx.range(prefix.to_vec()..) {
1618                let key = entry.key();
1619
1620                // Stop when we've passed the prefix range
1621                if !key.starts_with(prefix) {
1622                    break;
1623                }
1624
1625                // O(1) lookup in DashMap for version chain
1626                if let Some(chain) = self.data.get(key)
1627                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1628                    && let Some(value) = &v.value
1629                {
1630                    results.push((key.clone(), value.clone()));
1631                }
1632            }
1633        } else {
1634            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1635            // Optimized with batch-friendly iteration
1636            for entry in self.data.iter() {
1637                let key = entry.key();
1638                if !key.starts_with(prefix) {
1639                    continue;
1640                }
1641                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1642                    && let Some(value) = &v.value
1643                {
1644                    results.push((key.clone(), value.clone()));
1645                }
1646            }
1647        }
1648
1649        results
1650    }
1651
1652    /// Optimized full scan with batch allocation
1653    ///
1654    /// For use when scanning entire tables/namespaces.
1655    /// Pre-allocates based on actual data size.
1656    pub fn scan_all(
1657        &self,
1658        snapshot_ts: u64,
1659        current_txn_id: Option<u64>,
1660    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1661        let mut results = Vec::with_capacity(self.data.len());
1662
1663        for entry in self.data.iter() {
1664            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1665                && let Some(value) = &v.value
1666            {
1667                results.push((entry.key().clone(), value.clone()));
1668            }
1669        }
1670
1671        results
1672    }
1673
1674    /// Streaming scan iterator for very large datasets
1675    ///
1676    /// Returns an iterator that yields (key, value) pairs without
1677    /// materializing the entire result set in memory.
1678    pub fn scan_prefix_iter<'a>(
1679        &'a self,
1680        prefix: &'a [u8],
1681        snapshot_ts: u64,
1682        current_txn_id: Option<u64>,
1683    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1684        self.data.iter().filter_map(move |entry| {
1685            let key = entry.key();
1686            if !key.starts_with(prefix) {
1687                return None;
1688            }
1689            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1690                && let Some(value) = &v.value
1691            {
1692                Some((key.clone(), value.clone()))
1693            } else {
1694                None
1695            }
1696        })
1697    }
1698
1699    /// Scan range
1700    pub fn scan_range(
1701        &self,
1702        start: &[u8],
1703        end: &[u8],
1704        snapshot_ts: u64,
1705        current_txn_id: Option<u64>,
1706    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1707        let mut results = Vec::new();
1708
1709        if let Some(ref idx) = self.deferred_index {
1710            // Deferred index path: sort-on-scan
1711            if end.is_empty() {
1712                for key in idx.range_from(start) {
1713                    if let Some(chain) = self.data.get(&key)
1714                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1715                        && let Some(value) = &v.value
1716                    {
1717                        results.push((key, value.clone()));
1718                    }
1719                }
1720            } else {
1721                for key in idx.range(start, end) {
1722                    if let Some(chain) = self.data.get(&key)
1723                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724                        && let Some(value) = &v.value
1725                    {
1726                        results.push((key, value.clone()));
1727                    }
1728                }
1729            }
1730        } else if let Some(ref idx) = self.ordered_index {
1731            // Use range scan on SkipMap
1732            if end.is_empty() {
1733                // Unbounded end
1734                for entry in idx.range(start.to_vec()..) {
1735                    let key = entry.key();
1736                    if let Some(chain) = self.data.get(key)
1737                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1738                        && let Some(value) = &v.value
1739                    {
1740                        results.push((key.clone(), value.clone()));
1741                    }
1742                }
1743            } else {
1744                for entry in idx.range(start.to_vec()..end.to_vec()) {
1745                    let key = entry.key();
1746                    if let Some(chain) = self.data.get(key)
1747                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1748                        && let Some(value) = &v.value
1749                    {
1750                        results.push((key.clone(), value.clone()));
1751                    }
1752                }
1753            }
1754        } else {
1755            // Fallback to full scan if no ordered index
1756            for entry in self.data.iter() {
1757                let key = entry.key();
1758
1759                if key.as_slice() < start {
1760                    continue;
1761                }
1762                if !end.is_empty() && key.as_slice() >= end {
1763                    continue;
1764                }
1765
1766                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1767                    && let Some(value) = &v.value
1768                {
1769                    results.push((key.clone(), value.clone()));
1770                }
1771            }
1772        }
1773
1774        results
1775    }
1776
1777    /// Streaming range scan iterator for very large datasets
1778    ///
1779    /// Returns an iterator that yields (key, value) pairs without
1780    /// materializing the entire result set in memory. Uses the ordered
1781    /// index when available for O(log N + K) complexity.
1782    ///
1783    /// ## Zero-Allocation Design
1784    ///
1785    /// While the iterator itself cannot avoid allocations for returned
1786    /// values (since the caller needs ownership), it avoids:
1787    /// - Pre-materializing all results
1788    /// - Intermediate buffers
1789    /// - Repeated key comparisons for already-visited entries
1790    ///
1791    /// ## Usage
1792    ///
1793    /// ```ignore
1794    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1795    ///     // Process each result as it arrives
1796    ///     // Memory usage is O(1) per iteration, not O(N) total
1797    /// }
1798    /// ```
1799    pub fn scan_range_iter<'a>(
1800        &'a self,
1801        start: &'a [u8],
1802        end: &'a [u8],
1803        snapshot_ts: u64,
1804        current_txn_id: Option<u64>,
1805    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1806        // Compact deferred index before scanning if needed
1807        if let Some(ref idx) = self.deferred_index {
1808            idx.compact();
1809        }
1810
1811        // Use either ordered index or full scan
1812        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1813
1814        // Create iterator based on availability of ordered index
1815        ScanRangeIterator {
1816            memtable: self,
1817            start: start.to_vec(),
1818            end: end.to_vec(),
1819            snapshot_ts,
1820            current_txn_id,
1821            use_ordered,
1822            ordered_iter: None,
1823            unordered_iter: None,
1824            initialized: false,
1825        }
1826    }
1827
1828    /// Get approximate size
1829    pub fn size(&self) -> u64 {
1830        self.size_bytes.load(Ordering::Relaxed)
1831    }
1832
1833    /// Garbage collect old versions using epoch-based dirty list
1834    ///
1835    /// O(expired_versions) instead of O(all_versions)
1836    /// Only visits keys that had versions created in the old epoch.
1837    pub fn gc(&self, min_active_ts: u64) -> usize {
1838        // Advance epoch and get the dirty keys from the old epoch
1839        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1840
1841        if dirty_keys.is_empty() {
1842            return 0;
1843        }
1844
1845        let mut gc_count = 0;
1846
1847        // Only visit keys that were modified in the old epoch
1848        // Use a HashSet to deduplicate keys that were written multiple times
1849        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1850
1851        for key in unique_keys {
1852            if let Some(mut entry) = self.data.get_mut(&key) {
1853                let before = entry.value().version_count();
1854                entry.value_mut().gc(min_active_ts);
1855                gc_count += before.saturating_sub(entry.value().version_count());
1856            }
1857        }
1858
1859        gc_count
1860    }
1861
1862    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1863    #[allow(dead_code)]
1864    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1865        let mut gc_count = 0;
1866
1867        for mut entry in self.data.iter_mut() {
1868            let before = entry.value().version_count();
1869            entry.value_mut().gc(min_active_ts);
1870            gc_count += before.saturating_sub(entry.value().version_count());
1871        }
1872
1873        gc_count
1874    }
1875}
1876
1877// =============================================================================
1878// Rec 11: Unified MvccStore Implementation for MvccMemTable
1879// =============================================================================
1880
1881impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1882    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1883        self.read(key, snapshot_ts, txn_id)
1884    }
1885
1886    fn mvcc_put(
1887        &self,
1888        key: &[u8],
1889        value: Option<Vec<u8>>,
1890        txn_id: u64,
1891    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1892        let mut entry = self.data.entry(key.to_vec()).or_default();
1893        if entry.has_write_conflict(txn_id) {
1894            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1895        }
1896        entry.add_uncommitted(value, txn_id);
1897        Ok(())
1898    }
1899
1900    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1901        if let Some(mut chain) = self.data.get_mut(key) {
1902            return chain.commit(txn_id, commit_ts);
1903        }
1904        false
1905    }
1906
1907    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1908        if let Some(mut chain) = self.data.get_mut(key) {
1909            chain.abort(txn_id);
1910        }
1911    }
1912
1913    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1914        self.data
1915            .get(key)
1916            .map(|chain| chain.has_write_conflict(txn_id))
1917            .unwrap_or(false)
1918    }
1919
1920    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1921        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1922        for mut entry in self.data.iter_mut() {
1923            stats.keys_scanned += 1;
1924            let before = entry.value().version_count();
1925            entry.value_mut().gc(min_ts);
1926            stats.versions_removed += before.saturating_sub(entry.value().version_count());
1927        }
1928        stats
1929    }
1930
1931    fn mvcc_key_count(&self) -> usize {
1932        self.data.len()
1933    }
1934}
1935
1936// ============================================================================
1937// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1938// ============================================================================
1939
1940use crate::key_buffer::ArenaKeyHandle;
1941
1942/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1943struct ArenaEpochDirtyList {
1944    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1945    current_epoch: AtomicU64,
1946}
1947
1948impl ArenaEpochDirtyList {
1949    fn new() -> Self {
1950        Self {
1951            epochs: [
1952                parking_lot::Mutex::new(Vec::new()),
1953                parking_lot::Mutex::new(Vec::new()),
1954                parking_lot::Mutex::new(Vec::new()),
1955                parking_lot::Mutex::new(Vec::new()),
1956            ],
1957            current_epoch: AtomicU64::new(0),
1958        }
1959    }
1960
1961    #[inline]
1962    fn record_version(&self, key: ArenaKeyHandle) {
1963        let epoch = self.current_epoch.load(Ordering::Relaxed);
1964        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1965        self.epochs[idx].lock().push(key);
1966    }
1967
1968    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1969        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1970        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1971        let mut guard = self.epochs[old_idx].lock();
1972        let keys = std::mem::take(&mut *guard);
1973        (old_epoch, keys)
1974    }
1975}
1976
1977/// Arena-backed MVCC MemTable with optimized key storage
1978///
1979/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1980/// reducing per-write allocations from 3 to 1:
1981///
1982/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1983/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1984///
1985/// ## Performance
1986///
1987/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1988/// by reducing:
1989/// - Heap allocations: 3 → 1 per write
1990/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
1991pub struct ArenaMvccMemTable {
1992    /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
1993    data: DashMap<ArenaKeyHandle, VersionChain>,
1994    /// Ordered index for prefix scans
1995    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
1996    /// Approximate size in bytes
1997    size_bytes: AtomicU64,
1998    /// Epoch-based dirty list (arena-backed)
1999    dirty_list: ArenaEpochDirtyList,
2000}
2001
2002impl ArenaMvccMemTable {
2003    pub fn new() -> Self {
2004        Self::with_ordered_index(true)
2005    }
2006
2007    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2008        Self {
2009            data: DashMap::new(),
2010            ordered_index: if enable_ordered_index {
2011                Some(SkipMap::new())
2012            } else {
2013                None
2014            },
2015            size_bytes: AtomicU64::new(0),
2016            dirty_list: ArenaEpochDirtyList::new(),
2017        }
2018    }
2019
2020    /// Write a key-value pair using arena key handle
2021    ///
2022    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2023    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2024    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2025        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2026        let key_len = key.len();
2027
2028        // Create ONE ArenaKeyHandle - this is the only allocation for the key
2029        let key_handle = ArenaKeyHandle::new(key);
2030
2031        // Track in dirty list (O(1) copy of 16-byte handle)
2032        self.dirty_list.record_version(key_handle.clone());
2033
2034        // Insert into ordered index (O(1) copy of 16-byte handle)
2035        if let Some(ref idx) = self.ordered_index {
2036            idx.insert(key_handle.clone(), ());
2037        }
2038
2039        // Use entry API with the handle
2040        let mut entry = self.data.entry(key_handle).or_default();
2041
2042        if entry.has_write_conflict(txn_id) {
2043            return Err(SochDBError::Internal(
2044                "Write-write conflict detected".into(),
2045            ));
2046        }
2047        entry.add_uncommitted(value, txn_id);
2048        self.size_bytes
2049            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2050
2051        Ok(())
2052    }
2053
2054    /// Write batch using arena key handles
2055    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2056        let mut total_size = 0u64;
2057
2058        for (key, value) in writes {
2059            let key_handle = ArenaKeyHandle::new(key);
2060
2061            self.dirty_list.record_version(key_handle.clone());
2062
2063            if let Some(ref idx) = self.ordered_index {
2064                idx.insert(key_handle.clone(), ());
2065            }
2066
2067            let mut entry = self.data.entry(key_handle).or_default();
2068
2069            if entry.has_write_conflict(txn_id) {
2070                return Err(SochDBError::Internal(
2071                    "Write-write conflict detected".into(),
2072                ));
2073            }
2074
2075            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2076            entry.add_uncommitted(value.clone(), txn_id);
2077            total_size += (key.len() + value_size) as u64;
2078        }
2079
2080        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2081        Ok(())
2082    }
2083
2084    /// Read at snapshot timestamp
2085    pub fn read(
2086        &self,
2087        key: &[u8],
2088        snapshot_ts: u64,
2089        current_txn_id: Option<u64>,
2090    ) -> Option<Vec<u8>> {
2091        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2092        let key_handle = ArenaKeyHandle::new(key);
2093        self.data.get(&key_handle).and_then(|chain| {
2094            chain
2095                .read_at(snapshot_ts, current_txn_id)
2096                .and_then(|v| v.value.clone())
2097        })
2098    }
2099
2100    /// Commit transaction
2101    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2102        for key in write_set {
2103            let key_handle = ArenaKeyHandle::new(key.as_slice());
2104            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2105                chain.commit(txn_id, commit_ts);
2106            }
2107        }
2108    }
2109
2110    /// Abort transaction
2111    pub fn abort(&self, txn_id: u64) {
2112        for mut entry in self.data.iter_mut() {
2113            entry.value_mut().abort(txn_id);
2114        }
2115    }
2116
2117    /// Scan prefix
2118    pub fn scan_prefix(
2119        &self,
2120        prefix: &[u8],
2121        snapshot_ts: u64,
2122        current_txn_id: Option<u64>,
2123    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2124        let mut results = Vec::new();
2125        let prefix_handle = ArenaKeyHandle::new(prefix);
2126
2127        if let Some(ref idx) = self.ordered_index {
2128            for entry in idx.range(prefix_handle..) {
2129                let key = entry.key();
2130
2131                if !key.as_bytes().starts_with(prefix) {
2132                    break;
2133                }
2134
2135                if let Some(chain) = self.data.get(key)
2136                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2137                    && let Some(value) = &v.value
2138                {
2139                    results.push((key.as_bytes().to_vec(), value.clone()));
2140                }
2141            }
2142        } else {
2143            for entry in self.data.iter() {
2144                let key = entry.key();
2145                if !key.as_bytes().starts_with(prefix) {
2146                    continue;
2147                }
2148                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2149                    && let Some(value) = &v.value
2150                {
2151                    results.push((key.as_bytes().to_vec(), value.clone()));
2152                }
2153            }
2154        }
2155
2156        results
2157    }
2158
2159    /// Get approximate size
2160    pub fn size(&self) -> u64 {
2161        self.size_bytes.load(Ordering::Relaxed)
2162    }
2163
2164    /// Garbage collect old versions
2165    pub fn gc(&self, min_active_ts: u64) -> usize {
2166        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2167
2168        if dirty_keys.is_empty() {
2169            return 0;
2170        }
2171
2172        let mut gc_count = 0;
2173        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2174
2175        for key in unique_keys {
2176            if let Some(mut entry) = self.data.get_mut(&key) {
2177                let before = entry.value().version_count();
2178                entry.value_mut().gc(min_active_ts);
2179                gc_count += before.saturating_sub(entry.value().version_count());
2180            }
2181        }
2182
2183        gc_count
2184    }
2185}
2186
2187impl Default for ArenaMvccMemTable {
2188    fn default() -> Self {
2189        Self::new()
2190    }
2191}
2192
2193// ============================================================================
2194// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2195// ============================================================================
2196
/// Selects which memtable implementation a storage instance uses.
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting.
    /// Best for: general workloads, balanced read/write.
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations.
    /// Best for: write-heavy workloads, large keys.
    Arena,
}
2214
2215/// Unified memtable abstraction using enum dispatch
2216///
2217/// This pattern provides:
2218/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2219/// - Type-safe switching between implementations
2220/// - Easy extensibility for future memtable types
2221///
2222/// ## Why Enum over Trait Object?
2223///
2224/// - Hot path performance: enum match is a single branch vs vtable indirection
2225/// - Cache friendliness: no pointer chasing
2226/// - Inlining: compiler can inline through enum dispatch
2227pub enum MemTableKind {
2228    Standard(MvccMemTable),
2229    Arena(ArenaMvccMemTable),
2230}
2231
2232impl MemTableKind {
2233    /// Create a new memtable of the specified type
2234    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2235        match kind {
2236            MemTableType::Standard => {
2237                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2238            }
2239            MemTableType::Arena => {
2240                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2241            }
2242        }
2243    }
2244
2245    /// Write a key-value pair
2246    #[inline]
2247    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2248        match self {
2249            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2250            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2251        }
2252    }
2253
2254    /// Write batch of key-value pairs
2255    #[inline]
2256    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2257        match self {
2258            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2259            MemTableKind::Arena(m) => {
2260                // Convert to arena-compatible format
2261                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2262                    .iter()
2263                    .map(|(k, v)| (k.as_slice(), v.clone()))
2264                    .collect();
2265                m.write_batch(&arena_writes, txn_id)
2266            }
2267        }
2268    }
2269
2270    /// Read at snapshot timestamp
2271    #[inline]
2272    pub fn read(
2273        &self,
2274        key: &[u8],
2275        snapshot_ts: u64,
2276        current_txn_id: Option<u64>,
2277    ) -> Option<Vec<u8>> {
2278        match self {
2279            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2280            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2281        }
2282    }
2283
2284    /// Commit transaction
2285    #[inline]
2286    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2287        match self {
2288            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2289            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2290        }
2291    }
2292
2293    /// Abort transaction
2294    #[inline]
2295    pub fn abort(&self, txn_id: u64) {
2296        match self {
2297            MemTableKind::Standard(m) => m.abort(txn_id),
2298            MemTableKind::Arena(m) => m.abort(txn_id),
2299        }
2300    }
2301
2302    /// Scan prefix
2303    #[inline]
2304    pub fn scan_prefix(
2305        &self,
2306        prefix: &[u8],
2307        snapshot_ts: u64,
2308        current_txn_id: Option<u64>,
2309    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2310        match self {
2311            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2312            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2313        }
2314    }
2315
2316    /// Scan range
2317    #[inline]
2318    pub fn scan_range(
2319        &self,
2320        start: &[u8],
2321        end: &[u8],
2322        snapshot_ts: u64,
2323        current_txn_id: Option<u64>,
2324    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2325        match self {
2326            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2327            MemTableKind::Arena(m) => {
2328                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2329                let mut results = Vec::new();
2330                if let Some(ref idx) = m.ordered_index {
2331                    let start_handle = ArenaKeyHandle::new(start);
2332                    let end_handle = ArenaKeyHandle::new(end);
2333
2334                    if end.is_empty() {
2335                        for entry in idx.range(start_handle..) {
2336                            let key = entry.key();
2337                            if let Some(chain) = m.data.get(key)
2338                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2339                                && let Some(value) = &v.value
2340                            {
2341                                results.push((key.as_bytes().to_vec(), value.clone()));
2342                            }
2343                        }
2344                    } else {
2345                        for entry in idx.range(start_handle..end_handle) {
2346                            let key = entry.key();
2347                            if let Some(chain) = m.data.get(key)
2348                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2349                                && let Some(value) = &v.value
2350                            {
2351                                results.push((key.as_bytes().to_vec(), value.clone()));
2352                            }
2353                        }
2354                    }
2355                } else {
2356                    for entry in m.data.iter() {
2357                        let key = entry.key();
2358                        let key_bytes = key.as_bytes();
2359                        if key_bytes < start {
2360                            continue;
2361                        }
2362                        if !end.is_empty() && key_bytes >= end {
2363                            continue;
2364                        }
2365                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2366                            && let Some(value) = &v.value
2367                        {
2368                            results.push((key_bytes.to_vec(), value.clone()));
2369                        }
2370                    }
2371                }
2372                results
2373            }
2374        }
2375    }
2376
2377    /// Scan range iterator (returns collected results for now)
2378    #[inline]
2379    pub fn scan_range_iter<'a>(
2380        &'a self,
2381        start: &'a [u8],
2382        end: &'a [u8],
2383        snapshot_ts: u64,
2384        current_txn_id: Option<u64>,
2385    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2386        match self {
2387            MemTableKind::Standard(m) => {
2388                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2389            }
2390            MemTableKind::Arena(_) => {
2391                // Arena version returns collected results as iterator
2392                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2393                Box::new(results.into_iter())
2394            }
2395        }
2396    }
2397
2398    /// Get approximate size
2399    #[inline]
2400    pub fn size(&self) -> u64 {
2401        match self {
2402            MemTableKind::Standard(m) => m.size(),
2403            MemTableKind::Arena(m) => m.size(),
2404        }
2405    }
2406
2407    /// Garbage collect old versions
2408    #[inline]
2409    pub fn gc(&self, min_active_ts: u64) -> usize {
2410        match self {
2411            MemTableKind::Standard(m) => m.gc(min_active_ts),
2412            MemTableKind::Arena(m) => m.gc(min_active_ts),
2413        }
2414    }
2415
2416    /// Get the kind of memtable
2417    pub fn kind(&self) -> MemTableType {
2418        match self {
2419            MemTableKind::Standard(_) => MemTableType::Standard,
2420            MemTableKind::Arena(_) => MemTableType::Arena,
2421        }
2422    }
2423}
2424
/// Durable storage engine with full ACID support.
///
/// Ties together the transaction WAL, the MVCC manager and the in-memory
/// memtable. Writes are buffered per transaction and reach the WAL at commit;
/// the memtable is rebuilt from the WAL by `recover()` after opening.
pub struct DurableStorage {
    /// Path to the storage directory (holds `wal.log` and the
    /// `.clean_shutdown` marker).
    path: PathBuf,
    /// Write-ahead log
    wal: Arc<TxnWal>,
    /// MVCC manager
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes.
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory.
    /// The buffer is removed from this map by `commit` (which flushes it to
    /// the WAL) or by `abort` (which discards it).
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional); when `None`, `commit` uses the direct
    /// path governed by `sync_mode`.
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state flag: set to 1 when opened, cleared by `recover()`.
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode.
    /// Exponential moving average of the arrival rate, stored as
    /// requests/sec × 1000 for precision.
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds (0 until the first commit on the
    /// direct path).
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds (moving average, seeded at 5 ms)
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking).
    /// Never read after construction; it is stored only so the lock value
    /// lives as long as this struct, hence the `dead_code` allowance.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2461
2462impl DurableStorage {
2463    /// Open or create durable storage at path
2464    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2465        Self::open_with_config(path, true)
2466    }
2467
2468    /// Open with configurable ordered index
2469    ///
2470    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2471    /// but scan_prefix becomes O(N) instead of O(log N + K)
2472    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2473        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2474    }
2475
2476    /// Open with arena-backed memtable for write-heavy workloads
2477    ///
2478    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2479    /// Best for workloads with:
2480    /// - High write throughput
2481    /// - Large keys (reduces allocation overhead)
2482    /// - Minimal concurrent reads during writes
2483    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2484        Self::open_with_full_config(path, true, MemTableType::Arena)
2485    }
2486
2487    /// Open with full configuration options
2488    ///
2489    /// # Arguments
2490    /// * `path` - Storage directory path
2491    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2492    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2493    ///
2494    /// # Locking
2495    ///
2496    /// Acquires an exclusive advisory lock on the database directory.
2497    /// This prevents concurrent multi-process access which would corrupt data.
2498    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2499    pub fn open_with_full_config<P: AsRef<Path>>(
2500        path: P,
2501        enable_ordered_index: bool,
2502        memtable_type: MemTableType,
2503    ) -> Result<Self> {
2504        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2505    }
2506
2507    /// Open without locking (for testing crash recovery scenarios)
2508    ///
2509    /// # Safety
2510    /// This should ONLY be used in tests that simulate crashes by forgetting
2511    /// the storage instance. In production, always use `open_with_full_config`.
2512    #[cfg(test)]
2513    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2514        Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2515    }
2516
2517    /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2518    ///
2519    /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2520    /// temporary directory that is automatically cleaned up when the
2521    /// `EphemeralHandle` is dropped. This ensures test and production code paths
2522    /// are identical — bugs found in tests are guaranteed to reproduce in production.
2523    ///
2524    /// # Returns
2525    /// An `EphemeralHandle` that owns both the storage and the temp directory.
2526    /// Access the storage via `handle.storage()` or `Deref` coercion.
2527    ///
2528    /// # Example
2529    /// ```ignore
2530    /// let handle = DurableStorage::open_ephemeral()?;
2531    /// let txn = handle.begin_transaction()?;
2532    /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2533    /// handle.commit(txn)?;
2534    /// // temp directory cleaned up when `handle` drops
2535    /// ```
2536    pub fn open_ephemeral() -> Result<EphemeralHandle> {
2537        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2538        let storage = Self::open_with_full_config_internal(
2539            tmp.path(),
2540            true,
2541            MemTableType::Standard,
2542            false, // No lock needed for ephemeral
2543        )?;
2544        Ok(EphemeralHandle {
2545            storage,
2546            _tmpdir: tmp,
2547        })
2548    }
2549
2550    /// Open an ephemeral DurableStorage with group commit enabled.
2551    ///
2552    /// Same as `open_ephemeral()` but with group commit for higher throughput.
2553    pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2554        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2555        let mut storage =
2556            Self::open_with_full_config_internal(tmp.path(), true, MemTableType::Standard, false)?;
2557
2558        let wal = storage.wal.clone();
2559        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2560            for &txn_id in txn_ids {
2561                let entry = TxnWalEntry::txn_commit(txn_id);
2562                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2563            }
2564            wal.flush().map_err(|e| e.to_string())?;
2565            wal.sync().map_err(|e| e.to_string())?;
2566            Ok(std::time::SystemTime::now()
2567                .duration_since(std::time::UNIX_EPOCH)
2568                .unwrap()
2569                .as_micros() as u64)
2570        });
2571        storage.group_commit = Some(Arc::new(gc));
2572
2573        Ok(EphemeralHandle {
2574            storage,
2575            _tmpdir: tmp,
2576        })
2577    }
2578
2579    fn open_with_full_config_internal<P: AsRef<Path>>(
2580        path: P,
2581        enable_ordered_index: bool,
2582        memtable_type: MemTableType,
2583        acquire_lock: bool,
2584    ) -> Result<Self> {
2585        let path = path.as_ref().to_path_buf();
2586        std::fs::create_dir_all(&path)?;
2587
2588        // Acquire exclusive lock on database directory (unless disabled for testing)
2589        let db_lock = if acquire_lock {
2590            Some(
2591                crate::lock::DatabaseLock::acquire(&path)
2592                    .map_err(|e| SochDBError::LockError(e.to_string()))?,
2593            )
2594        } else {
2595            None
2596        };
2597
2598        let wal_path = path.join("wal.log");
2599        let wal = Arc::new(TxnWal::new(&wal_path)?);
2600
2601        let storage = Self {
2602            path,
2603            wal: wal.clone(),
2604            mvcc: Arc::new(MvccManager::new()),
2605            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2606            txn_write_buffers: DashMap::new(),
2607            group_commit: None,
2608            needs_recovery: AtomicU64::new(0),
2609            last_checkpoint_lsn: AtomicU64::new(0),
2610            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2611            commits_since_sync: AtomicU64::new(0),
2612            // Adaptive batch sizing (Little's Law)
2613            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2614            last_commit_us: AtomicU64::new(0),
2615            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2616            db_lock,
2617        };
2618
2619        // Check if recovery needed
2620        if storage.check_recovery_needed()? {
2621            storage.needs_recovery.store(1, Ordering::SeqCst);
2622        }
2623
2624        Ok(storage)
2625    }
2626
2627    /// Open with group commit enabled
2628    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2629        Self::open_with_group_commit_and_config(path, true)
2630    }
2631
2632    /// Open with group commit and configurable ordered index
2633    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2634        path: P,
2635        enable_ordered_index: bool,
2636    ) -> Result<Self> {
2637        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2638
2639        let wal = storage.wal.clone();
2640        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2641            // Write all commit records WITHOUT flushing (batch them)
2642            for &txn_id in txn_ids {
2643                let entry = TxnWalEntry::txn_commit(txn_id);
2644                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2645            }
2646
2647            // Then do a SINGLE flush + fsync for the entire batch
2648            wal.flush().map_err(|e| e.to_string())?;
2649            wal.sync().map_err(|e| e.to_string())?;
2650
2651            // Return commit timestamp
2652            Ok(std::time::SystemTime::now()
2653                .duration_since(std::time::UNIX_EPOCH)
2654                .unwrap()
2655                .as_micros() as u64)
2656        });
2657
2658        storage.group_commit = Some(Arc::new(gc));
2659        Ok(storage)
2660    }
2661
2662    /// Open with IndexPolicy for automatic memtable/index configuration
2663    ///
2664    /// This is the recommended constructor for new code. The policy determines:
2665    /// - Whether to use ordered index (ScanOptimized only)
2666    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2667    /// - Default settings optimized for the workload pattern
2668    ///
2669    /// # Arguments
2670    /// * `path` - Storage directory path
2671    /// * `policy` - Index policy determining write/scan tradeoffs
2672    /// * `group_commit` - Whether to enable group commit for throughput
2673    pub fn open_with_policy<P: AsRef<Path>>(
2674        path: P,
2675        policy: crate::index_policy::IndexPolicy,
2676        group_commit: bool,
2677    ) -> Result<Self> {
2678        use crate::index_policy::IndexPolicy;
2679
2680        // Derive configuration from policy
2681        let (enable_ordered_index, memtable_type) = match policy {
2682            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2683                // Write-heavy: no ordered index, use arena for reduced allocations
2684                (false, MemTableType::Arena)
2685            }
2686            IndexPolicy::Balanced => {
2687                // Mixed OLTP: deferred sorting (already implemented in Standard)
2688                (true, MemTableType::Standard)
2689            }
2690            IndexPolicy::ScanOptimized => {
2691                // Scan-heavy: maintain ordered index
2692                (true, MemTableType::Standard)
2693            }
2694        };
2695
2696        if group_commit {
2697            let mut storage =
2698                Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2699
2700            let wal = storage.wal.clone();
2701            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2702                for &txn_id in txn_ids {
2703                    let entry = TxnWalEntry::txn_commit(txn_id);
2704                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2705                }
2706                wal.flush().map_err(|e| e.to_string())?;
2707                wal.sync().map_err(|e| e.to_string())?;
2708                Ok(std::time::SystemTime::now()
2709                    .duration_since(std::time::UNIX_EPOCH)
2710                    .unwrap()
2711                    .as_micros() as u64)
2712            });
2713            storage.group_commit = Some(Arc::new(gc));
2714            Ok(storage)
2715        } else {
2716            Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2717        }
2718    }
2719
2720    /// Open storage for concurrent mode (multi-reader, single-writer)
2721    ///
2722    /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2723    /// Coordination is handled by the concurrent MVCC layer instead.
2724    ///
2725    /// # Safety
2726    ///
2727    /// This must ONLY be called from `Database::open_concurrent()` which
2728    /// manages the concurrent MVCC coordination. Direct use will cause
2729    /// data corruption.
2730    pub fn open_for_concurrent<P: AsRef<Path>>(
2731        path: P,
2732        policy: crate::index_policy::IndexPolicy,
2733    ) -> Result<Self> {
2734        use crate::index_policy::IndexPolicy;
2735
2736        let (enable_ordered_index, memtable_type) = match policy {
2737            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2738            IndexPolicy::Balanced => (true, MemTableType::Standard),
2739            IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2740        };
2741
2742        // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2743        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2744    }
2745
2746    /// Get the memtable type being used
2747    pub fn memtable_type(&self) -> MemTableType {
2748        self.memtable.kind()
2749    }
2750
2751    /// Check if recovery is needed (dirty shutdown detection)
2752    ///
2753    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2754    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2755    /// but committed data still needs to be loaded from WAL into the memtable.
2756    fn check_recovery_needed(&self) -> Result<bool> {
2757        let marker_path = self.path.join(".clean_shutdown");
2758        if marker_path.exists() {
2759            // Clean shutdown - remove marker
2760            std::fs::remove_file(&marker_path)?;
2761        }
2762        // ALWAYS need recovery to rebuild memtable from WAL
2763        // The memtable is in-memory only and needs to be restored on every startup
2764        Ok(true)
2765    }
2766
2767    /// Perform crash recovery
2768    pub fn recover(&self) -> Result<RecoveryStats> {
2769        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2770            return Ok(RecoveryStats::default());
2771        }
2772
2773        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2774
2775        // Apply committed writes to memtable
2776        let recovery_txn_id = self.wal.alloc_txn_id();
2777        let commit_ts = self.mvcc.alloc_commit_ts();
2778
2779        // Collect keys being written for efficient commit
2780        let mut write_set: HashSet<InlineKey> = HashSet::new();
2781        for (key, value) in &writes {
2782            write_set.insert(SmallVec::from_slice(key));
2783            self.memtable
2784                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2785        }
2786        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2787
2788        self.needs_recovery.store(0, Ordering::SeqCst);
2789
2790        Ok(RecoveryStats {
2791            transactions_recovered: txn_count,
2792            writes_recovered: writes.len(),
2793            commit_ts,
2794        })
2795    }
2796
2797    /// Begin a new transaction
2798    pub fn begin_transaction(&self) -> Result<u64> {
2799        let txn_id = self.wal.begin_transaction()?;
2800        self.mvcc.begin(txn_id);
2801        Ok(txn_id)
2802    }
2803
2804    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2805    ///
2806    /// This enables mode-aware optimizations:
2807    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2808    /// - WriteOnly: Skip read tracking, faster bulk inserts
2809    /// - ReadWrite: Full SSI for serializable isolation
2810    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2811        let txn_id = self.wal.begin_transaction()?;
2812        self.mvcc.begin_with_mode(txn_id, mode);
2813        Ok(txn_id)
2814    }
2815
2816    /// Begin a read-only transaction without any WAL records.
2817    ///
2818    /// This is a performance-critical optimization that eliminates two WAL
2819    /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
2820    /// transactions have no state to recover, WAL records are unnecessary.
2821    ///
2822    /// Callers MUST use `abort_read_only_fast()` to clean up.
2823    #[inline]
2824    pub fn begin_read_only_fast(&self) -> u64 {
2825        let txn_id = self.wal.alloc_txn_id();
2826        self.mvcc.begin_read_only(txn_id);
2827        txn_id
2828    }
2829
2830    /// Abort a fast read-only transaction.
2831    ///
2832    /// O(1) cleanup: only removes MVCC state. No WAL write, no memtable scan.
2833    #[inline]
2834    pub fn abort_read_only_fast(&self, txn_id: u64) {
2835        self.mvcc.abort(txn_id);
2836    }
2837
2838    /// Read a key WITHOUT any MVCC transaction tracking.
2839    ///
2840    /// Uses the current global timestamp to see all committed writes.
2841    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
2842    /// Only safe for single-threaded access (no concurrent writes).
2843    #[inline]
2844    pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2845        let snapshot_ts = self
2846            .mvcc
2847            .ts_counter
2848            .load(std::sync::atomic::Ordering::Relaxed);
2849        self.memtable.read(key, snapshot_ts, None)
2850    }
2851
2852    /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
2853    ///
2854    /// Uses the current global timestamp. Only safe for single-threaded access.
2855    #[inline]
2856    pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2857        let snapshot_ts = self
2858            .mvcc
2859            .ts_counter
2860            .load(std::sync::atomic::Ordering::Relaxed);
2861        self.memtable.scan_prefix(prefix, snapshot_ts, None)
2862    }
2863
2864    /// Read a key within a transaction
2865    #[inline]
2866    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2867        // Fast path: get just snapshot_ts without cloning whole transaction
2868        let snapshot_ts = self
2869            .mvcc
2870            .get_snapshot_ts(txn_id)
2871            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2872
2873        // Record read for SSI (skipped for read-only transactions)
2874        self.mvcc.record_read(txn_id, key);
2875
2876        // Read at snapshot timestamp, seeing own uncommitted writes
2877        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2878    }
2879
2880    /// Write a key-value pair within a transaction
2881    ///
2882    /// Writes are buffered and only flushed to disk on commit.
2883    /// This provides ~10× better throughput for batched inserts.
2884    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2885        // Use the zero-allocation path internally
2886        self.write_refs(txn_id, &key, &value)?;
2887
2888        Ok(())
2889    }
2890
2891    /// Write from references - zero allocation hot path
2892    ///
2893    /// Avoids cloning key/value by writing to WAL from refs directly,
2894    /// then only allocating once for memtable storage.
2895    #[inline]
2896    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2897        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2898        self.mvcc.record_write(txn_id, key);
2899
2900        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2901        // This reduces lock contention from O(writes) to O(1) per transaction
2902        self.txn_write_buffers
2903            .entry(txn_id)
2904            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2905            .append(key, value);
2906
2907        // Write to memtable (needs owned key/value for storage)
2908        self.memtable
2909            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2910
2911        Ok(())
2912    }
2913
2914    /// Delete a key within a transaction
2915    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2916        // Record write (uses InlineKey - zero allocation for small keys)
2917        self.mvcc.record_write(txn_id, &key);
2918
2919        // Buffer tombstone in memory - NO WAL lock taken!
2920        self.txn_write_buffers
2921            .entry(txn_id)
2922            .or_insert_with(|| TxnWalBuffer::new(txn_id))
2923            .append(&key, &[]); // Empty value = tombstone
2924
2925        // Write tombstone to memtable
2926        self.memtable.write(key, None, txn_id)?;
2927
2928        Ok(())
2929    }
2930
2931    /// Batch write multiple key-value pairs with reduced overhead
2932    ///
2933    /// This API amortizes fixed costs over the batch:
2934    /// - Single DashMap entry lookup for TxnWalBuffer
2935    /// - Single MVCC write set update
2936    /// - Batch memtable operations
2937    ///
2938    /// Performance: ~2-3x faster than individual write_refs calls
2939    /// for batches of 100+ entries.
2940    ///
2941    /// # Arguments
2942    /// * `txn_id` - Transaction ID
2943    /// * `writes` - Slice of (key, value) pairs
2944    #[inline]
2945    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2946        if writes.is_empty() {
2947            return Ok(());
2948        }
2949
2950        // Single DashMap access for entire batch
2951        let mut buffer_entry = self
2952            .txn_write_buffers
2953            .entry(txn_id)
2954            .or_insert_with(|| TxnWalBuffer::new(txn_id));
2955
2956        // Batch operations with reduced per-row overhead
2957        for (key, value) in writes {
2958            // Record write for MVCC
2959            self.mvcc.record_write(txn_id, key);
2960
2961            // Append to WAL buffer
2962            buffer_entry.append(key, value);
2963        }
2964        drop(buffer_entry);
2965
2966        // Batch write to memtable
2967        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2968            .iter()
2969            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2970            .collect();
2971        self.memtable.write_batch(&owned_writes, txn_id)?;
2972
2973        Ok(())
2974    }
2975
2976    /// Commit a transaction
2977    ///
2978    /// With sync_mode:
2979    /// - 0 (OFF): No sync, risk of data loss
2980    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2981    /// - 2 (FULL): Sync every commit (safest, slowest)
2982    pub fn commit(&self, txn_id: u64) -> Result<u64> {
2983        // First, flush all buffered DATA writes to the WAL with a SINGLE lock
2984        // acquisition (O(1) lock instead of O(writes) locks). Flushing data
2985        // records before validation is safe: ARIES recovery only treats them as
2986        // winners if a *durable commit record* for this transaction also exists,
2987        // and that record is written below — strictly after validation succeeds.
2988        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2989            && !buffer.is_empty()
2990        {
2991            // Flush entire buffer to WAL with one lock
2992            self.wal.flush_buffer(&buffer)?;
2993        }
2994
2995        // ====================================================================
2996        // Task 1 — Linearizable commit invariant:
2997        //
2998        //   A commit record must NEVER become durable unless the transaction
2999        //   has already passed SSI validation and its write set is final.
3000        //
3001        // We therefore VALIDATE (and freeze the write set) *before* making the
3002        // commit record durable. `mvcc.commit` performs SSI validation and
3003        // returns `None` on a dangerous structure (serialization conflict) or
3004        // if the transaction no longer exists. Writing/fsyncing the commit
3005        // record before this point would let a crash resurrect a transaction
3006        // that the live system rejected — the classic "committed-after-crash,
3007        // aborted-before-crash" non-linearizability bug.
3008        // ====================================================================
3009        let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
3010            SochDBError::Validation(
3011                "transaction aborted: SSI validation failed (serialization conflict) \
3012                 or transaction not found"
3013                    .into(),
3014            )
3015        })?;
3016
3017        // Validation passed and the write set is frozen. Only now may the
3018        // commit record become durable.
3019        if let Some(gc) = &self.group_commit {
3020            // Submit the *validated* commit intent to group commit and wait.
3021            // This batches multiple validated commits into a single fsync.
3022            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
3023        } else {
3024            // Direct commit path with adaptive sync (Little's Law)
3025            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
3026            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
3027
3028            // Update arrival rate for adaptive batching
3029            self.update_arrival_rate();
3030
3031            // Write commit record (no flush yet - BufWriter will buffer it)
3032            let entry = TxnWalEntry::txn_commit(txn_id);
3033            self.wal.append_no_flush(&entry)?;
3034
3035            // Determine if we should sync/flush based on mode
3036            let should_sync = match sync_mode {
3037                0 => false,                                      // OFF: never sync
3038                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
3039                _ => true,                                       // FULL: always sync
3040            };
3041
3042            if should_sync {
3043                // Measure fsync latency for adaptive tuning
3044                let start = std::time::Instant::now();
3045                self.wal.flush()?;
3046                self.wal.sync()?;
3047                let latency_us = start.elapsed().as_micros() as u64;
3048
3049                // Update fsync latency estimate (EMA with α = 0.1)
3050                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
3051                let new_latency = (old_latency * 9 + latency_us) / 10;
3052                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
3053
3054                self.commits_since_sync.store(0, Ordering::Relaxed);
3055            }
3056        }
3057
3058        // Commit record is durable (or buffered per sync mode) for a validated
3059        // transaction — publish the writes to the memtable. (O(k), k = keys.)
3060        self.memtable.commit(txn_id, commit_ts, &write_set);
3061
3062        Ok(commit_ts)
3063    }
3064
3065    /// Update arrival rate using exponential moving average
3066    #[inline]
3067    fn update_arrival_rate(&self) {
3068        let now_us = std::time::SystemTime::now()
3069            .duration_since(std::time::UNIX_EPOCH)
3070            .unwrap()
3071            .as_micros() as u64;
3072
3073        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3074
3075        if last > 0 {
3076            let delta_us = now_us.saturating_sub(last);
3077            if delta_us > 0 && delta_us < 10_000_000 {
3078                // Ignore gaps > 10s
3079                // Rate = 1_000_000 / delta_us (requests/sec)
3080                // Stored as rate × 1000 for precision
3081                let instant_rate = 1_000_000_000 / delta_us;
3082
3083                // EMA with α = 0.1
3084                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3085                let new_rate = (old_rate * 9 + instant_rate) / 10;
3086                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3087            }
3088        }
3089    }
3090
3091    /// Compute optimal batch threshold using Little's Law
3092    ///
3093    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
3094    /// Returns the number of commits to batch before fsync
3095    #[inline]
3096    fn adaptive_batch_threshold(&self) -> u64 {
3097        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
3098        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
3099
3100        if lambda <= 0.0 || tau <= 0.0 {
3101            return 100; // Fallback to fixed threshold
3102        }
3103
3104        // Little's Law: W* = sqrt(2 × τ × λ)
3105        // This minimizes total time = wait_time + fsync_overhead
3106        let n_opt = (2.0 * tau * lambda).sqrt();
3107
3108        // Clamp between 1 and 1000
3109        (n_opt as u64).clamp(1, 1000)
3110    }
3111
3112    /// Set synchronous mode
3113    ///
3114    /// - 0: OFF - No fsync (risk of data loss)
3115    /// - 1: NORMAL - Periodic fsync (balanced)
3116    /// - 2: FULL - Fsync every commit (safest)
3117    pub fn set_sync_mode(&self, mode: u64) {
3118        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
3119    }
3120
3121    /// Force a group commit flush (useful for benchmarking or testing)
3122    pub fn flush_group_commit(&self) {
3123        if let Some(gc) = &self.group_commit {
3124            gc.flush_batch();
3125        }
3126    }
3127
3128    /// Abort a transaction
3129    ///
3130    /// Performance: O(1) for read-only transactions (no writes to clean up).
3131    /// For write transactions, O(N) memtable scan is required to remove
3132    /// uncommitted versions.
3133    pub fn abort(&self, txn_id: u64) -> Result<()> {
3134        // Check if transaction had any buffered writes.
3135        // Read-only transactions never populate txn_write_buffers,
3136        // so this returns None — allowing us to skip the O(N) memtable scan.
3137        let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3138
3139        if had_writes {
3140            // Write abort record to WAL (only needed if data was written)
3141            self.wal.abort_transaction(txn_id)?;
3142            // Clean up uncommitted memtable entries
3143            self.memtable.abort(txn_id);
3144        }
3145
3146        // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3147        self.mvcc.abort(txn_id);
3148
3149        Ok(())
3150    }
3151
3152    /// Scan keys with prefix
3153    #[inline]
3154    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3155        // Fast path: get just snapshot_ts without cloning whole transaction
3156        let snapshot_ts = self
3157            .mvcc
3158            .get_snapshot_ts(txn_id)
3159            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3160
3161        // Note: Scan doesn't record individual key reads for SSI (too expensive)
3162        // SSI conflicts are tracked at the prefix level if needed
3163        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3164    }
3165
3166    /// Scan keys in range
3167    #[inline]
3168    pub fn scan_range(
3169        &self,
3170        txn_id: u64,
3171        start: &[u8],
3172        end: &[u8],
3173    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3174        let snapshot_ts = self
3175            .mvcc
3176            .get_snapshot_ts(txn_id)
3177            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3178
3179        Ok(self
3180            .memtable
3181            .scan_range(start, end, snapshot_ts, Some(txn_id)))
3182    }
3183
3184    /// Streaming scan for very large result sets
3185    ///
3186    /// Returns an iterator that yields (key, value) pairs without
3187    /// materializing the entire result set in memory.
3188    #[inline]
3189    pub fn scan_range_iter<'a>(
3190        &'a self,
3191        txn_id: u64,
3192        start: &'a [u8],
3193        end: &'a [u8],
3194    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3195        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3196        self.memtable
3197            .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3198    }
3199
    /// Flush the WAL's in-memory buffer to the OS.
    ///
    /// This ensures all buffered writes are pushed from the BufWriter
    /// into the OS page cache. Call this before `fsync()` to ensure
    /// all data is durable.
    ///
    /// Returns whatever `wal.flush()` returns.
    pub fn flush_wal(&self) -> Result<()> {
        self.wal.flush()
    }
3209
    /// Force sync the WAL to disk (fsync).
    ///
    /// Thin delegate to `wal.sync()`; its result is returned unchanged.
    pub fn fsync(&self) -> Result<()> {
        self.wal.sync()
    }
3214
3215    /// Write checkpoint
3216    pub fn checkpoint(&self) -> Result<u64> {
3217        let txn_id = 0; // System transaction
3218        let entry = TxnWalEntry::checkpoint(txn_id);
3219        let lsn = self.wal.append_sync(&entry)?;
3220        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3221        Ok(lsn)
3222    }
3223
    /// Truncate the WAL file after checkpoint.
    ///
    /// This physically truncates the WAL file to 0 bytes, reclaiming disk
    /// space. The in-memory memtable retains all data for the current
    /// session, but a crash after truncation will result in data loss
    /// since the WAL is the only persistence mechanism for DurableStorage.
    ///
    /// Call after `checkpoint()` when WAL durability across restarts is
    /// not required (e.g. desktop telemetry viewers, caches).
    ///
    /// Thin delegate to `wal.truncate()`; its result is returned unchanged.
    pub fn truncate_wal(&self) -> Result<()> {
        self.wal.truncate()
    }
3236
3237    /// Get storage statistics
3238    pub fn stats(&self) -> StorageStats {
3239        // Get WAL size from the WAL manager
3240        let wal_size = self.wal.size_bytes();
3241
3242        // Get active transaction count from MVCC
3243        let active_txns = self.mvcc.active_transaction_count();
3244
3245        StorageStats {
3246            memtable_size_bytes: self.memtable.size(),
3247            wal_size_bytes: wal_size,
3248            active_transactions: active_txns,
3249            min_active_snapshot: self.mvcc.min_active_snapshot(),
3250            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3251        }
3252    }
3253
3254    /// Garbage collect old versions
3255    pub fn gc(&self) -> usize {
3256        let min_ts = self.mvcc.min_active_snapshot();
3257        self.memtable.gc(min_ts)
3258    }
3259
3260    /// Clean shutdown
3261    pub fn shutdown(&self) -> Result<()> {
3262        // Sync WAL
3263        self.fsync()?;
3264
3265        // Write clean shutdown marker
3266        let marker_path = self.path.join(".clean_shutdown");
3267        std::fs::write(&marker_path, b"clean")?;
3268
3269        Ok(())
3270    }
3271}
3272
/// Runs a best-effort `shutdown()` when the storage is dropped.
impl Drop for DurableStorage {
    fn drop(&mut self) {
        // `drop` cannot return an error, so a failed shutdown is deliberately
        // ignored here. Call `shutdown()` explicitly to observe failures.
        let _ = self.shutdown();
    }
}
3278
3279// =============================================================================
3280// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3281// =============================================================================
3282
/// Owns a `DurableStorage` instance backed by a temporary directory.
///
/// The temp directory is automatically cleaned up when this handle is dropped.
/// Access the underlying storage via `Deref` coercion or `.storage()`.
///
/// # Why this exists
///
/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
/// used in production). This dual-engine architecture meant bugs could surface
/// only in production because the test path exercised different code.
///
/// `EphemeralHandle` eliminates this divergence: tests use the exact same
/// `DurableStorage` engine as production, just backed by a temp directory.
pub struct EphemeralHandle {
    // The storage engine. Declared before `_tmpdir` so that it is dropped
    // first (struct fields drop in declaration order); its `Drop` runs
    // `shutdown()`, presumably while the directory still exists — TODO confirm
    // that the storage path lives inside `_tmpdir`.
    storage: DurableStorage,
    // Keeps the temporary directory alive for as long as the handle exists.
    _tmpdir: tempfile::TempDir,
}
3301
3302impl EphemeralHandle {
3303    /// Get a reference to the underlying storage
3304    pub fn storage(&self) -> &DurableStorage {
3305        &self.storage
3306    }
3307
3308    /// Get a mutable reference to the underlying storage
3309    pub fn storage_mut(&mut self) -> &mut DurableStorage {
3310        &mut self.storage
3311    }
3312
3313    /// Consume the handle and return the storage and temp directory.
3314    ///
3315    /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
3316    /// the `TempDir` alive for the lifetime of the storage.
3317    pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
3318        (self.storage, self._tmpdir)
3319    }
3320}
3321
3322impl std::ops::Deref for EphemeralHandle {
3323    type Target = DurableStorage;
3324    fn deref(&self) -> &DurableStorage {
3325        &self.storage
3326    }
3327}
3328
3329impl std::ops::DerefMut for EphemeralHandle {
3330    fn deref_mut(&mut self) -> &mut DurableStorage {
3331        &mut self.storage
3332    }
3333}
3334
/// Recovery statistics
///
/// NOTE(review): the code that fills this struct is not part of this section;
/// the field descriptions below are inferred from the names — confirm against
/// `recover()`.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    /// Number of transactions recovered (presumably those replayed from the WAL).
    pub transactions_recovered: usize,
    /// Number of individual writes recovered.
    pub writes_recovered: usize,
    /// A commit timestamp associated with recovery (presumably the highest one
    /// seen — TODO confirm).
    pub commit_ts: u64,
}
3342
/// Storage statistics
///
/// Produced by `DurableStorage::stats()`; each field mirrors one value read
/// there.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Value reported by the memtable's `size()`.
    pub memtable_size_bytes: u64,
    /// Value reported by the WAL's `size_bytes()`.
    pub wal_size_bytes: u64,
    /// Value reported by the MVCC manager's `active_transaction_count()`.
    pub active_transactions: usize,
    /// Value reported by the MVCC manager's `min_active_snapshot()`.
    pub min_active_snapshot: u64,
    /// LSN stored by the most recent `checkpoint()` call (0 by `Default`).
    pub last_checkpoint_lsn: u64,
}
3352
3353#[cfg(test)]
3354mod tests {
3355    use super::*;
3356    use tempfile::tempdir;
3357
3358    #[test]
3359    fn test_basic_transaction() {
3360        let dir = tempdir().unwrap();
3361        let storage = DurableStorage::open(dir.path()).unwrap();
3362
3363        // Begin transaction
3364        let txn_id = storage.begin_transaction().unwrap();
3365
3366        // Write data
3367        storage
3368            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3369            .unwrap();
3370        storage
3371            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3372            .unwrap();
3373
3374        // Read back (within same transaction)
3375        let v1 = storage.read(txn_id, b"key1").unwrap();
3376        assert_eq!(v1, Some(b"value1".to_vec()));
3377
3378        // Commit
3379        let commit_ts = storage.commit(txn_id).unwrap();
3380        assert!(commit_ts > 0);
3381
3382        // Read in new transaction
3383        let txn2 = storage.begin_transaction().unwrap();
3384        let v1 = storage.read(txn2, b"key1").unwrap();
3385        assert_eq!(v1, Some(b"value1".to_vec()));
3386        storage.abort(txn2).unwrap();
3387    }
3388
3389    #[test]
3390    fn test_snapshot_isolation() {
3391        let dir = tempdir().unwrap();
3392        let storage = DurableStorage::open(dir.path()).unwrap();
3393
3394        // T1: Write initial value
3395        let t1 = storage.begin_transaction().unwrap();
3396        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3397        storage.commit(t1).unwrap();
3398
3399        // T2: Start reading (snapshot at this point)
3400        let t2 = storage.begin_transaction().unwrap();
3401
3402        // T3: Update the value
3403        let t3 = storage.begin_transaction().unwrap();
3404        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3405        storage.commit(t3).unwrap();
3406
3407        // T2 should still see v1 (snapshot isolation)
3408        let v = storage.read(t2, b"key").unwrap();
3409        assert_eq!(v, Some(b"v1".to_vec()));
3410
3411        // New transaction should see v2
3412        let t4 = storage.begin_transaction().unwrap();
3413        let v = storage.read(t4, b"key").unwrap();
3414        assert_eq!(v, Some(b"v2".to_vec()));
3415
3416        storage.abort(t2).unwrap();
3417        storage.abort(t4).unwrap();
3418    }
3419
3420    #[test]
3421    fn test_gc_preserves_versions_for_active_snapshot() {
3422        // GC must not free a version that an in-flight reader's snapshot can
3423        // still observe. The low-water-mark (min_active_snapshot) is the oldest
3424        // live reader; any version visible to it must survive GC.
3425        let dir = tempdir().unwrap();
3426        let storage = DurableStorage::open(dir.path()).unwrap();
3427
3428        // Seed an initial committed version v1.
3429        let t1 = storage.begin_transaction().unwrap();
3430        storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
3431        storage.commit(t1).unwrap();
3432
3433        // Open a long-lived snapshot reader BEFORE any newer writes. Its
3434        // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
3435        let reader = storage.begin_transaction().unwrap();
3436        assert_eq!(
3437            storage.read(reader, b"k").unwrap(),
3438            Some(b"v1".to_vec()),
3439            "reader's snapshot must see v1"
3440        );
3441
3442        // Produce several newer committed versions while the reader is active.
3443        for v in ["v2", "v3", "v4"] {
3444            let w = storage.begin_transaction().unwrap();
3445            storage
3446                .write(w, b"k".to_vec(), v.as_bytes().to_vec())
3447                .unwrap();
3448            storage.commit(w).unwrap();
3449        }
3450
3451        // Run GC. Because `reader` is still active, the watermark is pinned at
3452        // its snapshot and the version it needs (v1) must NOT be freed.
3453        let _freed = storage.gc();
3454        assert_eq!(
3455            storage.read(reader, b"k").unwrap(),
3456            Some(b"v1".to_vec()),
3457            "GC must not free a version still visible to an active snapshot"
3458        );
3459
3460        // A fresh transaction sees the latest committed version.
3461        let fresh = storage.begin_transaction().unwrap();
3462        assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
3463        storage.abort(fresh).unwrap();
3464
3465        // Release the old reader; the watermark can now advance.
3466        storage.abort(reader).unwrap();
3467
3468        // After the old snapshot is gone, GC may reclaim superseded versions,
3469        // and new readers still resolve the latest value correctly.
3470        let _freed2 = storage.gc();
3471        let after = storage.begin_transaction().unwrap();
3472        assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
3473        storage.abort(after).unwrap();
3474    }
3475
3476    #[test]
3477    fn test_ssi_detects_write_skew() {
3478        // Classic write-skew: two concurrent transactions each read what the
3479        // other writes (disjoint write keys). Under plain SI both would commit,
3480        // violating serializability. Under SSI the pivot transaction (with both
3481        // an inbound and an outbound rw-edge) must be aborted, so the two
3482        // commits cannot both succeed.
3483        let dir = tempdir().unwrap();
3484        let storage = DurableStorage::open(dir.path()).unwrap();
3485
3486        // Seed two keys.
3487        let seed = storage.begin_transaction().unwrap();
3488        storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
3489        storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
3490        storage.commit(seed).unwrap();
3491
3492        // T1 reads x and y, then writes x.
3493        let t1 = storage.begin_transaction().unwrap();
3494        let _ = storage.read(t1, b"x").unwrap();
3495        let _ = storage.read(t1, b"y").unwrap();
3496
3497        // T2 reads x and y, then writes y. (Concurrent with T1.)
3498        let t2 = storage.begin_transaction().unwrap();
3499        let _ = storage.read(t2, b"x").unwrap();
3500        let _ = storage.read(t2, b"y").unwrap();
3501
3502        storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
3503        storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();
3504
3505        // First committer wins.
3506        let c1 = storage.commit(t1);
3507        assert!(c1.is_ok(), "first committer should succeed: {c1:?}");
3508
3509        // The second committer forms a dangerous rw-structure with T1 and must
3510        // be rejected to preserve serializability.
3511        let c2 = storage.commit(t2);
3512        assert!(
3513            c2.is_err(),
3514            "SSI must abort the write-skew pivot, but commit succeeded"
3515        );
3516    }
3517
3518    #[test]
3519    fn test_abort_transaction() {
3520        let dir = tempdir().unwrap();
3521        let storage = DurableStorage::open(dir.path()).unwrap();
3522
3523        // Write initial value
3524        let t1 = storage.begin_transaction().unwrap();
3525        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3526        storage.commit(t1).unwrap();
3527
3528        // Start transaction that will abort
3529        let t2 = storage.begin_transaction().unwrap();
3530        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3531        storage.abort(t2).unwrap();
3532
3533        // New transaction should see v1 (aborted changes not visible)
3534        let t3 = storage.begin_transaction().unwrap();
3535        let v = storage.read(t3, b"key").unwrap();
3536        assert_eq!(v, Some(b"v1".to_vec()));
3537        storage.abort(t3).unwrap();
3538    }
3539
3540    #[test]
3541    fn test_crash_recovery() {
3542        let dir = tempdir().unwrap();
3543
3544        // Phase 1: Write data and commit
3545        {
3546            // Use open_without_lock for crash simulation tests
3547            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3548
3549            // Set sync mode to FULL to ensure data is synced before "crash"
3550            storage.set_sync_mode(2); // FULL: sync every commit
3551
3552            let txn = storage.begin_transaction().unwrap();
3553            storage
3554                .write(txn, b"persist".to_vec(), b"data".to_vec())
3555                .unwrap();
3556            storage.commit(txn).unwrap();
3557
3558            // Simulate crash (no clean shutdown)
3559            std::mem::forget(storage);
3560        }
3561
3562        // Phase 2: Reopen and recover
3563        {
3564            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3565            let stats = storage.recover().unwrap();
3566            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3567
3568            // Data should be recovered
3569            let txn = storage.begin_transaction().unwrap();
3570            let v = storage.read(txn, b"persist").unwrap();
3571            assert_eq!(v, Some(b"data".to_vec()));
3572            storage.abort(txn).unwrap();
3573        }
3574    }
3575
3576    #[test]
3577    fn test_scan_prefix() {
3578        let dir = tempdir().unwrap();
3579        let storage = DurableStorage::open(dir.path()).unwrap();
3580
3581        let txn = storage.begin_transaction().unwrap();
3582        storage
3583            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3584            .unwrap();
3585        storage
3586            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3587            .unwrap();
3588        storage
3589            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3590            .unwrap();
3591        storage.commit(txn).unwrap();
3592
3593        let txn2 = storage.begin_transaction().unwrap();
3594        let users = storage.scan(txn2, b"user:").unwrap();
3595        assert_eq!(users.len(), 2);
3596        storage.abort(txn2).unwrap();
3597    }
3598
3599    #[test]
3600    fn test_delete() {
3601        let dir = tempdir().unwrap();
3602        let storage = DurableStorage::open(dir.path()).unwrap();
3603
3604        // Insert
3605        let t1 = storage.begin_transaction().unwrap();
3606        storage
3607            .write(t1, b"key".to_vec(), b"value".to_vec())
3608            .unwrap();
3609        storage.commit(t1).unwrap();
3610
3611        // Verify exists
3612        let t2 = storage.begin_transaction().unwrap();
3613        assert!(storage.read(t2, b"key").unwrap().is_some());
3614        storage.abort(t2).unwrap();
3615
3616        // Delete
3617        let t3 = storage.begin_transaction().unwrap();
3618        storage.delete(t3, b"key".to_vec()).unwrap();
3619        storage.commit(t3).unwrap();
3620
3621        // Verify deleted
3622        let t4 = storage.begin_transaction().unwrap();
3623        assert!(storage.read(t4, b"key").unwrap().is_none());
3624        storage.abort(t4).unwrap();
3625    }
3626
3627    #[test]
3628    fn test_gc() {
3629        let dir = tempdir().unwrap();
3630        let storage = DurableStorage::open(dir.path()).unwrap();
3631
3632        // Create multiple versions
3633        for i in 0..10 {
3634            let txn = storage.begin_transaction().unwrap();
3635            storage
3636                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3637                .unwrap();
3638            storage.commit(txn).unwrap();
3639        }
3640
3641        // GC should reclaim old versions
3642        let gc_count = storage.gc();
3643        // At least some versions should be collected
3644        // (exact count depends on implementation)
3645        let _ = gc_count; // gc_count is usize, always >= 0
3646    }
3647
3648    #[test]
3649    fn test_group_commit() {
3650        use std::sync::Arc;
3651        use std::thread;
3652
3653        let dir = tempdir().unwrap();
3654        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3655
3656        // Spawn multiple threads to commit concurrently
3657        let mut handles = vec![];
3658        for i in 0..4 {
3659            let storage = Arc::clone(&storage);
3660            handles.push(thread::spawn(move || {
3661                let txn = storage.begin_transaction().unwrap();
3662                storage
3663                    .write(
3664                        txn,
3665                        format!("key{}", i).into_bytes(),
3666                        format!("val{}", i).into_bytes(),
3667                    )
3668                    .unwrap();
3669                storage.commit(txn).unwrap()
3670            }));
3671        }
3672
3673        // Wait for all commits
3674        let mut commit_times = vec![];
3675        for h in handles {
3676            commit_times.push(h.join().unwrap());
3677        }
3678
3679        // All commits should succeed
3680        assert!(commit_times.iter().all(|&ts| ts > 0));
3681
3682        // Verify data persisted
3683        let txn = storage.begin_transaction().unwrap();
3684        for i in 0..4 {
3685            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3686            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3687        }
3688        storage.abort(txn).unwrap();
3689    }
3690
3691    // ==================== ArenaMvccMemTable Tests ====================
3692
3693    #[test]
3694    fn test_arena_memtable_basic_write_read() {
3695        let memtable = ArenaMvccMemTable::new();
3696
3697        // Write some values
3698        memtable
3699            .write(b"key1", Some(b"value1".to_vec()), 1)
3700            .unwrap();
3701        memtable
3702            .write(b"key2", Some(b"value2".to_vec()), 1)
3703            .unwrap();
3704
3705        // Read them back (uncommitted, so need txn_id match)
3706        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3707        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3708        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3709    }
3710
3711    #[test]
3712    fn test_arena_memtable_update() {
3713        let memtable = ArenaMvccMemTable::new();
3714
3715        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3716        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3717
3718        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3719    }
3720
3721    #[test]
3722    fn test_arena_memtable_delete() {
3723        let memtable = ArenaMvccMemTable::new();
3724
3725        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3726        memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3727
3728        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3729    }
3730
3731    #[test]
3732    fn test_arena_memtable_scan_prefix() {
3733        let memtable = ArenaMvccMemTable::new();
3734
3735        memtable
3736            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3737            .unwrap();
3738        memtable
3739            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3740            .unwrap();
3741        memtable
3742            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3743            .unwrap();
3744        memtable
3745            .write(b"order:1", Some(b"order_data".to_vec()), 1)
3746            .unwrap();
3747
3748        // Create a write set and commit
3749        let mut write_set = HashSet::new();
3750        write_set.insert(InlineKey::from_slice(b"user:1:name"));
3751        write_set.insert(InlineKey::from_slice(b"user:1:email"));
3752        write_set.insert(InlineKey::from_slice(b"user:2:name"));
3753        write_set.insert(InlineKey::from_slice(b"order:1"));
3754        memtable.commit(1, 10, &write_set);
3755
3756        // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3757        let results = memtable.scan_prefix(b"user:1:", 11, None);
3758        assert_eq!(results.len(), 2);
3759
3760        // Scan for all users
3761        let results = memtable.scan_prefix(b"user:", 11, None);
3762        assert_eq!(results.len(), 3);
3763    }
3764
3765    #[test]
3766    fn test_arena_memtable_write_batch() {
3767        let memtable = ArenaMvccMemTable::new();
3768
3769        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3770            (b"k1", Some(b"v1".to_vec())),
3771            (b"k2", Some(b"v2".to_vec())),
3772            (b"k3", Some(b"v3".to_vec())),
3773        ];
3774
3775        memtable.write_batch(&writes, 1).unwrap();
3776
3777        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3778        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3779        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3780    }
3781
3782    #[test]
3783    fn test_arena_memtable_gc() {
3784        let memtable = ArenaMvccMemTable::new();
3785
3786        // Write multiple versions
3787        for i in 0..10 {
3788            memtable
3789                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3790                .unwrap();
3791
3792            let mut write_set = HashSet::new();
3793            write_set.insert(InlineKey::from_slice(b"key"));
3794            memtable.commit(i + 1, (i + 1) * 10, &write_set);
3795        }
3796
3797        // GC old versions
3798        let gc_count = memtable.gc(90);
3799        let _ = gc_count; // gc_count is usize, always >= 0
3800    }
3801
3802    #[test]
3803    fn test_arena_memtable_size_tracking() {
3804        let memtable = ArenaMvccMemTable::new();
3805
3806        assert_eq!(memtable.size(), 0);
3807
3808        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3809
3810        assert!(memtable.size() > 0);
3811    }
3812
3813    #[test]
3814    fn test_arena_memtable_abort() {
3815        let memtable = ArenaMvccMemTable::new();
3816
3817        memtable
3818            .write(b"key", Some(b"uncommitted".to_vec()), 1)
3819            .unwrap();
3820
3821        // Visible to same txn
3822        assert_eq!(
3823            memtable.read(b"key", 0, Some(1)),
3824            Some(b"uncommitted".to_vec())
3825        );
3826
3827        // Not visible to other txns
3828        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3829
3830        // Abort
3831        memtable.abort(1);
3832
3833        // No longer visible
3834        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3835    }
3836
3837    // ========================================================================
3838    // MemTableKind Tests - Unified Abstraction
3839    // ========================================================================
3840
3841    #[test]
3842    fn test_memtable_kind_standard() {
3843        let memtable = MemTableKind::new(MemTableType::Standard, true);
3844        assert_eq!(memtable.kind(), MemTableType::Standard);
3845
3846        // Write and read
3847        memtable
3848            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
3849            .unwrap();
3850
3851        // Commit transaction at ts=100
3852        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3853        memtable.commit(1, 100, &write_set);
3854
3855        // Read after commit - snapshot_ts must be > commit_ts for visibility
3856        let v = memtable.read(b"key1", 101, None);
3857        assert_eq!(v, Some(b"value1".to_vec()));
3858    }
3859
3860    #[test]
3861    fn test_memtable_kind_arena() {
3862        let memtable = MemTableKind::new(MemTableType::Arena, true);
3863        assert_eq!(memtable.kind(), MemTableType::Arena);
3864
3865        // Write and read
3866        memtable
3867            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
3868            .unwrap();
3869
3870        // Commit at ts=100
3871        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3872        memtable.commit(1, 100, &write_set);
3873
3874        // Read after commit - snapshot_ts > commit_ts
3875        let v = memtable.read(b"key1", 101, None);
3876        assert_eq!(v, Some(b"value1".to_vec()));
3877    }
3878
3879    #[test]
3880    fn test_memtable_kind_scan_range() {
3881        // Test both implementations have consistent behavior
3882        for kind in [MemTableType::Standard, MemTableType::Arena] {
3883            let memtable = MemTableKind::new(kind, true);
3884
3885            // Write some data
3886            for i in 0..5 {
3887                let key = format!("key{}", i);
3888                let value = format!("value{}", i);
3889                memtable
3890                    .write(key.into_bytes(), Some(value.into_bytes()), 1)
3891                    .unwrap();
3892            }
3893
3894            // Commit all at ts=100
3895            let write_set: HashSet<InlineKey> = (0..5)
3896                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3897                .collect();
3898            memtable.commit(1, 100, &write_set);
3899
3900            // Scan range with snapshot_ts > commit_ts
3901            let results = memtable.scan_range(b"key1", b"key4", 101, None);
3902            assert_eq!(
3903                results.len(),
3904                3,
3905                "kind={:?} should have 3 results (key1, key2, key3)",
3906                kind
3907            );
3908        }
3909    }
3910
3911    #[test]
3912    fn test_durable_storage_arena() {
3913        let dir = tempdir().unwrap();
3914        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3915
3916        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3917
3918        // Basic transaction should work the same
3919        let txn_id = storage.begin_transaction().unwrap();
3920        storage
3921            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3922            .unwrap();
3923        storage.commit(txn_id).unwrap();
3924
3925        let txn2 = storage.begin_transaction().unwrap();
3926        let v = storage.read(txn2, b"key1").unwrap();
3927        assert_eq!(v, Some(b"value1".to_vec()));
3928        storage.abort(txn2).unwrap();
3929    }
3930
3931    #[test]
3932    fn test_durable_storage_full_config() {
3933        let dir = tempdir().unwrap();
3934
3935        // Test with Arena and ordered index enabled
3936        let storage =
3937            DurableStorage::open_with_full_config(dir.path(), true, MemTableType::Arena).unwrap();
3938
3939        assert_eq!(storage.memtable_type(), MemTableType::Arena);
3940
3941        // Write multiple keys
3942        let txn = storage.begin_transaction().unwrap();
3943        for i in 0..10 {
3944            let key = format!("key{:02}", i);
3945            let value = format!("value{}", i);
3946            storage
3947                .write(txn, key.into_bytes(), value.into_bytes())
3948                .unwrap();
3949        }
3950        storage.commit(txn).unwrap();
3951
3952        // Scan should work (uses scan method for prefix)
3953        let txn2 = storage.begin_transaction().unwrap();
3954        let results = storage.scan(txn2, b"key0").unwrap();
3955        assert_eq!(results.len(), 10); // key00 through key09
3956        storage.abort(txn2).unwrap();
3957    }
3958}