Skip to main content

sochdb_storage/
durable_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! Wires together the live storage components into a durable engine:
21//!
22//! - WAL (txn_wal.rs) for crash-consistent durability + recovery
23//! - Group Commit for throughput
24//! - MVCC for isolation
25//! - LSCS for columnar efficiency
26//!
27//! Truth-in-capabilities: the live path provides crash-consistent WAL recovery,
28//! but NOT at-rest encryption, point-in-time recovery, ARIES checkpointing, or
29//! WAL fencing — those modules exist but are quarantined behind the empty,
30//! non-default `experimental` feature and are unwired. Query
31//! [`crate::durability_capabilities`] rather than relying on prose like
32//! "production-grade".
33//!
34//! ## Architecture
35//!
36//! ```text
37//! ┌─────────────────────────────────────────────────────────────────┐
38//! │                      DurableStorage                              │
39//! ├─────────────────────────────────────────────────────────────────┤
40//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
41//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
42//! │  │             │    │             │    └─────────────────────┘ │
43//! │  │ ┌─────────┐ │    └─────────────┘                            │
44//! │  │ │Snapshots│ │                                                │
45//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
46//! │  │ ┌─────────┐ │    │              MemTable                    ││
47//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
48//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
49//! │  └─────────────┘                                                │
50//! │                      ┌─────────────────────────────────────────┐│
51//! │                      │              LSCS (SST)                  ││
52//! │                      │  Immutable columnar segments             ││
53//! │                      └─────────────────────────────────────────┘│
54//! └─────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Concurrency
58//!
59//! - Writers: Serialize through WAL, use MVCC for conflict detection
60//! - Readers: Lock-free reads at snapshot timestamp
61//! - Commits: Batched through GroupCommit for throughput
62//!
63//! ## Isolation Contract
64//!
65//! The live write path (`MvccManager`, used by `DurableStorage`) provides
66//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
67//! plain Snapshot Isolation (SI):
68//!
69//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
70//!   fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
71//!   invisible to it — see `test_snapshot_isolation`. This alone is SI and is
72//!   vulnerable to **write skew** (two transactions each read a set the other
73//!   writes, both commit, and no serial order reproduces the result).
74//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
75//!   recently-committed concurrent transactions for rw-antidependency edges:
76//!   an inbound edge (another txn wrote a key we read, `T_other →rw→ T_me`) and
77//!   an outbound edge (we wrote a key another txn read, `T_me →rw→ T_other`).
78//!   When a transaction sits on **both** an inbound and an outbound rw-edge it
79//!   is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
80//!   structure"), so it is aborted. This is the conservative safe subset of the
81//!   SSI test: it may abort some serializable schedules (false positives) but
82//!   **never admits a non-serializable one** (no false negatives), so the
83//!   externally observable isolation level is Serializable.
84//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
85//!   never participate in validation; they always observe a serializable
86//!   snapshot and never abort.
87//!
88//! ### MVCC garbage collection is snapshot-safe
89//!
90//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
91//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
92//! `snapshot_ts` across all still-active transactions — into
93//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
94//! retains every version with `commit_ts > min_active_ts` **plus one anchor
95//! version at or below the watermark**, so any in-flight reader can still
96//! resolve the correct version for its snapshot. A version is only freed once
97//! **no active snapshot can observe it**. The watermark is recomputed on every
98//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
99//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
100
101use std::collections::HashSet;
102use std::path::{Path, PathBuf};
103use std::sync::Arc;
104use std::sync::atomic::{AtomicU64, Ordering};
105
106use dashmap::DashMap;
107use smallvec::SmallVec;
108
109use crossbeam_skiplist::SkipMap;
110
111use crate::DurabilityCapabilities;
112use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
113use crate::encryption::EncryptionKey;
114use crate::group_commit::EventDrivenGroupCommit;
115use crate::keyring;
116use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
117use sochdb_core::version_chain::{
118    BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
119    VisibilityContext, WriteConflictDetection,
120};
121use sochdb_core::{Result, SochDBError};
122
123// =============================================================================
124// SSI Bloom Filter - Fast Conflict Pre-Filtering
125// =============================================================================
126
/// Compact Bloom filter used to pre-screen SSI conflict checks.
///
/// Answers "could these two transactions touch a common key?" cheaply.
/// A positive answer only means an exact comparison is still needed (false
/// positives are tolerated); a negative answer must always be trustworthy
/// (false negatives are not allowed).
///
/// ## Sizing
///
/// Targeting a 1% false positive rate for 1000 keys gives:
/// - m = ~9600 bits ≈ 1.2 KB per transaction
/// - k = 7 hash functions
///
/// ## Lazy allocation
///
/// The bit vector stays `None` until the first insert, so a filter that never
/// receives a key costs no heap allocation.
#[derive(Clone, Debug)]
pub struct SsiBloomFilter {
    /// Backing bit vector, 64 bits per word; `None` until the first insert
    bits: Option<Vec<u64>>,
    /// Expected number of keys; drives the size of the lazy allocation
    expected_capacity: usize,
    /// Number of probe positions set/tested per key
    num_hashes: u32,
}
152
153impl SsiBloomFilter {
154    /// Optimal number of bits per item for 1% false positive rate
155    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
156    const BITS_PER_ITEM: f64 = 9.6;
157
158    /// Optimal number of hash functions for 1% false positive rate
159    /// k = (m/n) × ln(2) ≈ 7
160    const DEFAULT_NUM_HASHES: u32 = 7;
161
162    /// Minimum capacity to avoid tiny filters
163    const MIN_CAPACITY: usize = 64;
164
165    /// Create a new bloom filter for expected item count (lazy allocation)
166    ///
167    /// Configured for ~1% false positive rate.
168    /// The bit vector is not allocated until first insert.
169    #[inline]
170    pub fn new(expected_items: usize) -> Self {
171        Self {
172            bits: None,
173            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
174            num_hashes: Self::DEFAULT_NUM_HASHES,
175        }
176    }
177
178    /// Create with specific capacity in words (for memory-constrained scenarios)
179    pub fn with_word_capacity(words: usize) -> Self {
180        Self {
181            bits: None,
182            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
183            num_hashes: Self::DEFAULT_NUM_HASHES,
184        }
185    }
186
187    /// Ensure bits are allocated (lazy initialization)
188    #[inline]
189    fn ensure_allocated(&mut self) {
190        if self.bits.is_none() {
191            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
192            let num_words = num_bits.div_ceil(64);
193            self.bits = Some(vec![0u64; num_words]);
194        }
195    }
196
197    /// Add a key to the filter - O(k) where k = num_hashes
198    #[inline]
199    pub fn insert(&mut self, key: &[u8]) {
200        self.ensure_allocated();
201        let bits = self.bits.as_mut().unwrap();
202        let num_bits = bits.len() * 64;
203        if num_bits == 0 {
204            return;
205        }
206
207        // Use two hash functions to simulate k hash functions
208        // h(i) = h1 + i * h2 (double hashing technique)
209        let h1 = Self::hash1(key);
210        let h2 = Self::hash2(key);
211
212        for i in 0..self.num_hashes {
213            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
214            let bit_idx = (h as usize) % num_bits;
215            let word_idx = bit_idx / 64;
216            let bit_pos = bit_idx % 64;
217            bits[word_idx] |= 1 << bit_pos;
218        }
219    }
220
221    /// Check if a key might be present - O(k)
222    ///
223    /// Returns:
224    /// - false: Key is definitely NOT in the set (or filter not initialized)
225    /// - true: Key MIGHT be in the set (needs exact check)
226    #[inline]
227    pub fn may_contain(&self, key: &[u8]) -> bool {
228        let bits = match &self.bits {
229            Some(b) => b,
230            None => return false, // Uninitialized = empty
231        };
232        let num_bits = bits.len() * 64;
233        if num_bits == 0 {
234            return false;
235        }
236
237        let h1 = Self::hash1(key);
238        let h2 = Self::hash2(key);
239
240        for i in 0..self.num_hashes {
241            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
242            let bit_idx = (h as usize) % num_bits;
243            let word_idx = bit_idx / 64;
244            let bit_pos = bit_idx % 64;
245            if bits[word_idx] & (1 << bit_pos) == 0 {
246                return false; // Definitely not present
247            }
248        }
249        true // Might be present
250    }
251
252    /// Check if this filter might intersect with another
253    ///
254    /// Fast O(m/64) check using bitwise AND of all words.
255    /// If no bits are shared, sets are definitely disjoint.
256    #[inline]
257    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
258        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
259            (Some(s), Some(o)) => (s, o),
260            _ => return false, // Either uninitialized = no intersection
261        };
262        let min_len = self_bits.len().min(other_bits.len());
263        for i in 0..min_len {
264            if self_bits[i] & other_bits[i] != 0 {
265                return true; // Might intersect
266            }
267        }
268        false // Definitely disjoint
269    }
270
271    /// First hash function (using built-in hasher)
272    #[inline]
273    fn hash1(key: &[u8]) -> u64 {
274        use std::collections::hash_map::DefaultHasher;
275        use std::hash::{Hash, Hasher};
276        let mut hasher = DefaultHasher::new();
277        key.hash(&mut hasher);
278        hasher.finish()
279    }
280
281    /// Second hash function (using twox-hash for independence)
282    #[inline]
283    fn hash2(key: &[u8]) -> u64 {
284        twox_hash::xxh3::hash64(key)
285    }
286
287    /// Get the memory size in bytes
288    pub fn size_bytes(&self) -> usize {
289        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
290    }
291
292    /// Check if the filter is empty
293    pub fn is_empty(&self) -> bool {
294        match &self.bits {
295            Some(bits) => bits.iter().all(|&w| w == 0),
296            None => true,
297        }
298    }
299}
300
301/// Type alias for inline key storage - keys up to 32 bytes stored on stack
302/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
303pub type InlineKey = SmallVec<[u8; 32]>;
304
/// One version of a key's value, tagged with the transaction that wrote it
#[derive(Debug, Clone)]
pub struct Version {
    /// Payload of this version; `None` marks a tombstone
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that created this version
    pub txn_id: u64,
    /// Commit timestamp; 0 while the version is still uncommitted
    pub commit_ts: u64,
}
315
316// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
317impl ChainEntry for Version {
318    #[inline]
319    fn commit_ts(&self) -> u64 {
320        self.commit_ts
321    }
322    #[inline]
323    fn txn_id(&self) -> u64 {
324        self.txn_id
325    }
326    #[inline]
327    fn set_commit_ts(&mut self, ts: u64) {
328        self.commit_ts = ts;
329    }
330}
331
332// ============================================================================
333// Optimized VersionChain with Binary Search (Task 1: mm.md)
334// ============================================================================
335
336/// Multi-version data for a single key with O(log v) read complexity
337///
338/// ## Optimization: Binary Search with Sorted Commit Ordering
339///
340/// Separates committed versions (sorted descending by commit_ts) from
341/// uncommitted version (single optional slot per transaction).
342///
343/// **Before:** O(v) linear scan + O(v) max computation = O(v)
344/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
345///
346/// For v=10 versions: 3.3x speedup
347/// For v=100 versions: 7x speedup
348///
349/// ## Rec 11: Consolidated
350///
351/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
352/// eliminating duplication with `mvcc_concurrent::VersionChain`.
353#[derive(Debug, Default)]
354pub struct VersionChain {
355    /// Consolidated binary-search chain (Rec 11)
356    inner: BinarySearchChain<Version>,
357}
358
359impl VersionChain {
360    /// Create a new empty version chain
361    #[inline]
362    pub fn new() -> Self {
363        Self {
364            inner: BinarySearchChain::new(),
365        }
366    }
367
368    /// Add a new uncommitted version
369    /// If there's already an uncommitted version from this txn, update it in place
370    ///
371    /// O(1) - just updates the uncommitted slot
372    #[inline]
373    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
374        match self.inner.uncommitted_mut() {
375            Some(v) if v.txn_id == txn_id => {
376                // Update in place - O(1)
377                v.value = value;
378            }
379            _ => {
380                // New or different txn — set the slot
381                self.inner.set_uncommitted(Version {
382                    value,
383                    txn_id,
384                    commit_ts: 0,
385                });
386            }
387        }
388    }
389
390    /// Commit a version - moves from uncommitted slot to sorted committed list
391    ///
392    /// O(log v) - inserts into sorted position using binary search
393    #[inline]
394    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
395        self.inner.commit(txn_id, commit_ts)
396    }
397
398    /// Abort a version (remove uncommitted version for txn)
399    ///
400    /// O(1) - just clears the uncommitted slot if it matches
401    #[inline]
402    pub fn abort(&mut self, txn_id: u64) {
403        self.inner.abort(txn_id);
404    }
405
406    /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
407    ///
408    /// ## Complexity: O(1) + O(log v) = O(log v)
409    #[inline]
410    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
411        self.inner.read_at(snapshot_ts, current_txn_id)
412    }
413
414    /// Check if there's an uncommitted version by another transaction
415    ///
416    /// O(1) - just checks the uncommitted slot
417    #[inline]
418    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
419        self.inner.has_write_conflict(my_txn_id)
420    }
421
422    /// Garbage collect old versions
423    pub fn gc(&mut self, min_active_ts: u64) {
424        self.inner.gc_by_ts(min_active_ts);
425    }
426
427    /// Get total version count (committed + uncommitted)
428    #[inline]
429    pub fn version_count(&self) -> usize {
430        self.inner.version_count()
431    }
432
433    // Legacy compatibility: get versions vec (for tests)
434    #[cfg(test)]
435    pub fn versions(&self) -> Vec<Version> {
436        let mut result = self.inner.committed_versions().to_vec();
437        if let Some(v) = self.inner.uncommitted() {
438            result.push(v.clone());
439        }
440        result
441    }
442}
443
444// =============================================================================
445// Rec 6: Unified Version Chain Trait Implementations
446// =============================================================================
447
448impl MvccVersionChain for VersionChain {
449    type Value = Option<Vec<u8>>;
450
451    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
452        // Delegate to BinarySearchChain, then project to value field
453        self.inner
454            .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
455            .map(|v| &v.value)
456    }
457
458    fn get_latest(&self) -> Option<&Self::Value> {
459        self.inner.latest().map(|v| &v.value)
460    }
461
462    fn version_count(&self) -> usize {
463        self.inner.version_count()
464    }
465}
466
467impl MvccVersionChainMut for VersionChain {
468    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
469        self.add_uncommitted(value, txn_id);
470    }
471
472    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
473        self.inner.commit(txn_id, commit_ts)
474    }
475
476    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
477        // Insert a tombstone (None value) as uncommitted
478        self.add_uncommitted(None, txn_id);
479        true
480    }
481
482    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
483        let before = self.inner.committed_count();
484        self.inner.gc_by_ts(min_visible_ts);
485        let removed = before - self.inner.committed_count();
486        (removed, removed * std::mem::size_of::<Version>())
487    }
488}
489
490impl WriteConflictDetection for VersionChain {
491    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
492        self.has_write_conflict(txn_id)
493    }
494}
495
496// =============================================================================
497// Pre-sizing Constants to Avoid HashSet Resize Overhead
498// =============================================================================
499
/// Initial capacity of a transaction's write_set HashSet
/// Chosen for typical OLTP transactions (10-50 keys) so the set does not
/// resize mid-transaction (the resize overhead caused a +11% regression)
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity of a transaction's read_set HashSet
/// Larger than the write_set capacity because workloads tend to be read-heavy
const READ_SET_INITIAL_CAPACITY: usize = 64;
508
509/// Transaction state for MVCC
510#[derive(Debug, Clone)]
511pub struct MvccTransaction {
512    /// Transaction ID
513    pub txn_id: u64,
514    /// Snapshot timestamp (reads see commits before this)
515    pub snapshot_ts: u64,
516    /// Keys written by this transaction - uses SmallVec for inline storage
517    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
518    pub write_set: HashSet<InlineKey>,
519    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
520    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
521    pub read_set: HashSet<InlineKey>,
522    /// Bloom filter for write set - fast SSI pre-filtering
523    pub write_bloom: SsiBloomFilter,
524    /// Bloom filter for read set - fast SSI pre-filtering
525    pub read_bloom: SsiBloomFilter,
526    /// Transaction state
527    pub state: TxnState,
528    /// Transaction mode for SSI optimization (Recommendation 9)
529    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
530    pub mode: TransactionMode,
531}
532
533impl MvccTransaction {
534    /// Create a new transaction with pre-sized collections
535    ///
536    /// This avoids HashSet resize overhead during the transaction
537    /// which was causing +11% regression on write_set.insert().
538    #[inline]
539    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
540        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
541    }
542
543    /// Create a read-only transaction (SSI bypass - 2.6x faster)
544    ///
545    /// Read-only transactions skip all SSI tracking:
546    /// - No read_set allocation
547    /// - No read_bloom allocation  
548    /// - No commit validation
549    ///
550    /// ## Performance
551    ///
552    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
553    #[inline]
554    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
555        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
556    }
557
558    /// Create a write-only transaction (partial SSI bypass)
559    ///
560    /// Write-only transactions skip read tracking:
561    /// - No read_set tracking
562    /// - No read_bloom inserts
563    /// - Still needs write_set for commit
564    #[inline]
565    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
566        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
567    }
568
569    /// Create transaction with specific mode
570    #[inline]
571    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
572        // Optimize allocation based on mode
573        let (write_capacity, read_capacity) = match mode {
574            TransactionMode::ReadOnly => (0, 0), // No tracking needed
575            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
576            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
577        };
578        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
579    }
580
581    /// Create with custom capacities for expected workload
582    ///
583    /// Use this when you know the transaction will write many keys
584    /// to avoid resize overhead entirely.
585    #[inline]
586    pub fn with_capacity(
587        txn_id: u64,
588        snapshot_ts: u64,
589        write_capacity: usize,
590        read_capacity: usize,
591        mode: TransactionMode,
592    ) -> Self {
593        Self {
594            txn_id,
595            snapshot_ts,
596            write_set: HashSet::with_capacity(write_capacity),
597            read_set: HashSet::with_capacity(read_capacity),
598            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
599            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
600            state: TxnState::Active,
601            mode,
602        }
603    }
604
605    /// Check if this is a read-only transaction
606    #[inline]
607    pub fn is_read_only(&self) -> bool {
608        self.write_set.is_empty()
609    }
610
611    /// Check if this is a single-key write transaction
612    #[inline]
613    pub fn is_single_key_write(&self) -> bool {
614        self.write_set.len() == 1 && self.read_set.len() <= 1
615    }
616}
617
/// Lifecycle state of a transaction
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// Still in progress
    Active,
    /// Finished by committing
    Committed,
    /// Finished by aborting
    Aborted,
}
625
626// =============================================================================
627// Transaction Mode for SSI Bypass (Recommendation 9)
628// =============================================================================
629
/// How much SSI tracking a transaction needs
///
/// Classifying a transaction when it begins lets the engine skip expensive
/// SSI tracking for most transactions:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None             | None               | ~10 ns          |
/// | WriteOnly | None             | Full               | ~30 ns          |
/// | ReadWrite | Full             | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Read-only transaction - skips ALL SSI tracking
    /// It performs no writes, so it creates no outgoing edges and cannot be
    /// part of an rw-antidependency cycle
    /// read_set, read_bloom, and commit validation can all be skipped
    ReadOnly,

    /// Write-only transaction - skips read tracking
    /// It performs no reads from concurrent writers, so it has no incoming
    /// rw-edges
    /// Only write_set and write_bloom are tracked
    WriteOnly,

    /// Full read-write transaction (default) - complete SSI tracking
    /// Can have both incoming and outgoing rw-edges
    /// Needs full validation at commit time
    #[default]
    ReadWrite,
}
671
672impl TransactionMode {
673    /// Check if this mode requires read tracking
674    #[inline]
675    pub fn tracks_reads(&self) -> bool {
676        matches!(self, TransactionMode::ReadWrite)
677    }
678
679    /// Check if this mode requires write tracking
680    #[inline]
681    pub fn tracks_writes(&self) -> bool {
682        matches!(
683            self,
684            TransactionMode::WriteOnly | TransactionMode::ReadWrite
685        )
686    }
687
688    /// Check if commit needs SSI validation
689    #[inline]
690    pub fn needs_ssi_validation(&self) -> bool {
691        matches!(self, TransactionMode::ReadWrite)
692    }
693}
694
/// Kind of SSI conflict edge between two transactions
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, then T2 writes X
    ReadWrite,
    /// Write-read conflict: T1 writes X, then T2 reads X
    WriteRead,
}
703
704/// SSI conflict edge for dangerous structure detection
705#[derive(Debug, Clone)]
706pub struct ConflictEdge {
707    /// Source transaction
708    pub from_txn: u64,
709    /// Target transaction
710    pub to_txn: u64,
711    /// Type of conflict
712    pub conflict_type: ConflictType,
713}
714
715/// MVCC Manager with SSI support
716///
717/// Uses DashMap for lock-free per-transaction access.
718/// Implements Serializable Snapshot Isolation (SSI) with
719/// dangerous structure detection for rw-antidependency cycles.
720#[allow(clippy::type_complexity)]
721pub struct MvccManager {
722    /// Active transactions (sharded for concurrent access)
723    active_txns: DashMap<u64, MvccTransaction>,
724    /// Current timestamp counter
725    ts_counter: AtomicU64,
726    /// Minimum active snapshot timestamp (for GC)
727    min_active_ts: AtomicU64,
728    /// Recently committed transactions for SSI validation
729    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
730    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
731    recent_commits: DashMap<
732        u64,
733        (
734            u64,
735            SsiBloomFilter,
736            SsiBloomFilter,
737            HashSet<InlineKey>,
738            HashSet<InlineKey>,
739        ),
740    >,
741    /// Max recent commits to track
742    max_recent_commits: usize,
743}
744
745impl Default for MvccManager {
746    fn default() -> Self {
747        Self::new()
748    }
749}
750
751impl MvccManager {
752    pub fn new() -> Self {
753        Self {
754            active_txns: DashMap::new(),
755            ts_counter: AtomicU64::new(1),
756            min_active_ts: AtomicU64::new(0),
757            recent_commits: DashMap::new(),
758            max_recent_commits: 1000, // Track last 1000 commits for SSI
759        }
760    }
761
762    /// Begin a new transaction with snapshot isolation
763    ///
764    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
765    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
766        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
767    }
768
769    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
770    ///
771    /// Read-only transactions skip all SSI tracking, reducing
772    /// per-read overhead from ~82ns to ~32ns.
773    ///
774    /// ## Safety
775    ///
776    /// Caller must ensure no writes are performed. Attempting to
777    /// write in a read-only transaction will still succeed but
778    /// won't be tracked for SSI validation.
779    #[inline]
780    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
781        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
782    }
783
784    /// Begin a write-only transaction (partial SSI bypass)
785    ///
786    /// Write-only transactions skip read tracking, reducing overhead
787    /// for insert-heavy workloads.
788    #[inline]
789    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
790        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
791    }
792
793    /// Begin a transaction with specific mode
794    ///
795    /// This is the core transaction creation method that all other
796    /// begin_* methods delegate to.
797    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
798        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
799
800        // Create transaction with mode-optimized allocations
801        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
802
803        self.active_txns.insert(txn_id, txn.clone());
804        self.update_min_active_ts();
805
806        txn
807    }
808
809    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
810    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
811        self.active_txns.get(&txn_id).map(|t| t.clone())
812    }
813
814    /// Fast path: get just the snapshot timestamp without cloning
815    /// This is the hot path for reads - avoids cloning bloom filters
816    #[inline]
817    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
818        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
819    }
820
821    /// Record a read (for SSI) - uses inline key storage + bloom filter
822    ///
823    /// ## SSI Bypass (Recommendation 9)
824    ///
825    /// For ReadOnly mode transactions, this is a no-op (instant return).
826    /// For WriteOnly mode transactions, this is a no-op.
827    /// Only ReadWrite mode transactions track reads for SSI.
828    ///
829    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
830    #[inline]
831    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
832        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
833            // SSI Bypass: Skip tracking for read-only and write-only modes
834            if !txn.mode.tracks_reads() {
835                return;
836            }
837
838            // Only track reads if within reasonable bounds
839            if txn.read_set.len() < 10000 {
840                txn.read_set.insert(SmallVec::from_slice(key));
841                txn.read_bloom.insert(key);
842            }
843        }
844    }
845
846    /// Record a write - uses inline key storage + bloom filter
847    ///
848    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
849    /// The mode only affects SSI tracking, not write capability.
850    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
851        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
852            txn.write_set.insert(SmallVec::from_slice(key));
853            txn.write_bloom.insert(key);
854        }
855    }
856
857    /// Allocate commit timestamp
858    pub fn alloc_commit_ts(&self) -> u64 {
859        self.ts_counter.fetch_add(1, Ordering::SeqCst)
860    }
861
862    /// Commit transaction with SSI validation
863    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
864    /// Returns None if SSI validation fails (dangerous structure detected)
865    ///
866    /// ## SSI Bypass (Recommendation 9)
867    ///
868    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
869    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
870    /// For ReadWrite mode: Full validation (~50ns commit)
871    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
872        // Get transaction before removing
873        let txn = self.active_txns.get(&txn_id)?.clone();
874
875        // SSI Bypass: Skip validation for ReadOnly transactions
876        // ReadOnly can never form rw-antidependency cycles
877        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
878            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
879            // For ReadWrite: check SSI validation result
880            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
881                // Abort on SSI violation
882                self.active_txns.remove(&txn_id);
883                self.update_min_active_ts();
884                return None;
885            }
886        }
887
888        let commit_ts = self.alloc_commit_ts();
889
890        // Extract write_set and remove transaction - takes ownership
891        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
892
893        // OPTIMIZATION: Only track ReadWrite transactions for SSI
894        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
895        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
896            && !removed_txn.read_set.is_empty()
897            && !removed_txn.write_set.is_empty();
898
899        if needs_ssi_tracking {
900            // Need to clone write_set since we return it AND track it
901            let write_set_for_return = removed_txn.write_set.clone();
902
903            self.track_commit_owned(
904                txn_id,
905                commit_ts,
906                removed_txn.read_bloom,
907                removed_txn.write_bloom,
908                removed_txn.read_set,
909                removed_txn.write_set,
910            );
911
912            self.update_min_active_ts();
913            Some((commit_ts, write_set_for_return))
914        } else {
915            // Fast path: no SSI tracking needed, avoid clone entirely
916            self.update_min_active_ts();
917            Some((commit_ts, removed_txn.write_set))
918        }
919    }
920
921    /// Validate SSI constraints for a committing transaction
922    ///
923    /// ## Transaction Classification (Task 3: Optimistic MVCC)
924    ///
925    /// Transactions are classified and routed through appropriate fast paths:
926    ///
927    /// | Class      | Criteria                      | Validation Cost |
928    /// |------------|-------------------------------|-----------------|
929    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
930    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
931    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
932    /// | General    | full SSI check                | ~50 ns         |
933    ///
934    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
935    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
936    ///
937    /// Detects "dangerous structures" - rw-antidependency cycles:
938    /// - T1 reads X (snapshot sees old value)
939    /// - T2 writes X (concurrent write)  
940    /// - T2 reads Y (snapshot sees old value)
941    /// - T1 writes Y (concurrent write)
942    ///
943    /// If T1 → rw → T2 → rw → T1 exists, abort T1
944    #[inline]
945    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
946        // =================================================================
947        // Fast Path 1: Read-only transactions (0 ns)
948        // =================================================================
949        // Read-only transactions can never form rw-antidependency cycles
950        // because they have no writes to create outgoing rw-edges
951        if txn.write_set.is_empty() {
952            return true;
953        }
954
955        // =================================================================
956        // Fast Path 2: No recent commits to check (0 ns)
957        // =================================================================
958        if self.recent_commits.is_empty() {
959            return true;
960        }
961
962        // =================================================================
963        // Fast Path 3: Single-key write transactions (0 ns)
964        // =================================================================
965        // A single-key write transaction cannot form a dangerous cycle:
966        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
967        //   AND T2 to read what T1 wrote
968        // - With only one key in write_set, the same key would need to be
969        //   in both read_set AND write_set of both transactions
970        // - This is already prevented by our conflict detection (write-write)
971        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
972            return true;
973        }
974
975        let my_snapshot = txn.snapshot_ts;
976
977        // =================================================================
978        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
979        // =================================================================
980        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
981        // with any concurrent transaction's read_bloom AND vice versa,
982        // there can be no rw-antidependency
983        let mut any_may_intersect = false;
984        for entry in self.recent_commits.iter() {
985            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
986
987            // Only check concurrent transactions.
988            //
989            // A committed transaction is *concurrent* with us iff its writes are
990            // invisible to our snapshot. Read visibility is strict
991            // (`commit_ts < snapshot_ts`), so a transaction with
992            // `commit_ts >= my_snapshot` is invisible and must be validated
993            // against. Using `<` here (not `<=`) keeps this window consistent
994            // with `read_at`; a `<=` would skip a boundary transaction whose
995            // `commit_ts == my_snapshot` and miss a genuine write-skew.
996            if *other_commit_ts < my_snapshot {
997                continue;
998            }
999
1000            // Check bloom filter intersection (O(m/64) per filter)
1001            // If our writes may intersect their reads OR their writes may intersect our reads
1002            if txn.write_bloom.may_intersect(other_read_bloom)
1003                || other_write_bloom.may_intersect(&txn.read_bloom)
1004            {
1005                any_may_intersect = true;
1006                break;
1007            }
1008        }
1009
1010        // No bloom intersection means definitely disjoint - no SSI conflict possible
1011        if !any_may_intersect {
1012            return true;
1013        }
1014
1015        // =================================================================
1016        // Full SSI Validation (~50 ns)
1017        // =================================================================
1018        // Check for rw-conflicts with recently committed transactions
1019        // An rw-conflict exists if:
1020        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1021        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1022
1023        let mut in_conflict_with: Vec<u64> = Vec::new();
1024        let mut out_conflict_to: Vec<u64> = Vec::new();
1025
1026        for entry in self.recent_commits.iter() {
1027            let (
1028                other_txn_id,
1029                (
1030                    other_commit_ts,
1031                    _other_read_bloom,
1032                    other_write_bloom,
1033                    other_read_set,
1034                    other_write_set,
1035                ),
1036            ) = entry.pair();
1037
1038            // Only consider transactions concurrent with us: those whose writes
1039            // are invisible to our snapshot. Read visibility is strict
1040            // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1041            // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1042            if *other_commit_ts < my_snapshot {
1043                continue;
1044            }
1045
1046            // Check: other wrote → we read (other →rw→ me)
1047            // T_other wrote a key that T_me read (rw-dependency inbound)
1048            //
1049            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1050            // Only do expensive HashSet intersection if bloom says "maybe conflict"
1051            let mut has_in_conflict = false;
1052            for key in txn.read_set.iter() {
1053                if other_write_bloom.may_contain(key) {
1054                    // Bloom says maybe - do exact check
1055                    if other_write_set.contains(key) {
1056                        has_in_conflict = true;
1057                        break;
1058                    }
1059                }
1060            }
1061            if has_in_conflict {
1062                in_conflict_with.push(*other_txn_id);
1063            }
1064
1065            // Check: we wrote → other read (me →rw→ other)
1066            // T_me wrote a key that T_other read (rw-dependency outbound)
1067            //
1068            // Bloom-accelerated: Use our write_bloom against their read_set
1069            let mut has_out_conflict = false;
1070            for key in other_read_set.iter() {
1071                if txn.write_bloom.may_contain(key) {
1072                    // Bloom says maybe - do exact check
1073                    if txn.write_set.contains(key) {
1074                        has_out_conflict = true;
1075                        break;
1076                    }
1077                }
1078            }
1079            if has_out_conflict {
1080                out_conflict_to.push(*other_txn_id);
1081            }
1082        }
1083
1084        // Dangerous structure: we have both incoming AND outgoing rw-edges
1085        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1086        //
1087        // Conservative check: if both exist, abort
1088        // A more precise check would verify the cycle path, but this is safe
1089        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1090            return false; // SSI violation - abort
1091        }
1092
1093        true
1094    }
1095
1096    /// Track a committed transaction for future SSI validation
1097    ///
1098    /// Only tracks transactions that have both reads AND writes, since SSI
1099    /// only detects rw-antidependency cycles. Pure read or pure write
1100    /// transactions can't form cycles.
1101    ///
1102    /// ## Optimization: Zero-Copy Transfer
1103    ///
1104    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1105    /// phase regression. The caller should use mem::take() to transfer ownership.
1106    fn track_commit_owned(
1107        &self,
1108        txn_id: u64,
1109        commit_ts: u64,
1110        read_bloom: SsiBloomFilter,
1111        write_bloom: SsiBloomFilter,
1112        read_set: HashSet<InlineKey>,
1113        write_set: HashSet<InlineKey>,
1114    ) {
1115        // Optimization: Only track mixed read-write transactions
1116        // Pure reads can't create outgoing rw-edges
1117        // Pure writes can't create incoming rw-edges
1118        if read_set.is_empty() || write_set.is_empty() {
1119            return; // Skip tracking - can't form SSI cycle
1120        }
1121
1122        // Add to recent commits with bloom filters for fast SSI pre-filtering
1123        // No cloning needed - we take ownership
1124        self.recent_commits.insert(
1125            txn_id,
1126            (commit_ts, read_bloom, write_bloom, read_set, write_set),
1127        );
1128
1129        // Lazy pruning: only prune when we're significantly over capacity
1130        // Avoids pruning overhead on every commit
1131        if self.recent_commits.len() > self.max_recent_commits * 2 {
1132            // Remove entries with lowest commit_ts
1133            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1134            self.recent_commits
1135                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1136        }
1137    }
1138
1139    /// Legacy track_commit that clones - kept for compatibility
1140    #[allow(dead_code)]
1141    fn track_commit(
1142        &self,
1143        txn_id: u64,
1144        commit_ts: u64,
1145        read_bloom: SsiBloomFilter,
1146        write_bloom: SsiBloomFilter,
1147        read_set: &HashSet<InlineKey>,
1148        write_set: &HashSet<InlineKey>,
1149    ) {
1150        if read_set.is_empty() || write_set.is_empty() {
1151            return;
1152        }
1153        self.recent_commits.insert(
1154            txn_id,
1155            (
1156                commit_ts,
1157                read_bloom,
1158                write_bloom,
1159                read_set.clone(),
1160                write_set.clone(),
1161            ),
1162        );
1163    }
1164
1165    /// Abort transaction
1166    pub fn abort(&self, txn_id: u64) {
1167        self.active_txns.remove(&txn_id);
1168        self.update_min_active_ts();
1169    }
1170
1171    /// Get minimum active snapshot timestamp
1172    pub fn min_active_snapshot(&self) -> u64 {
1173        self.min_active_ts.load(Ordering::SeqCst)
1174    }
1175
1176    /// Get count of active transactions
1177    pub fn active_transaction_count(&self) -> usize {
1178        self.active_txns.len()
1179    }
1180
1181    fn update_min_active_ts(&self) {
1182        let min = self
1183            .active_txns
1184            .iter()
1185            .map(|entry| entry.value().snapshot_ts)
1186            .min()
1187            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1188        self.min_active_ts.store(min, Ordering::SeqCst);
1189    }
1190}
1191
1192/// Epoch-based dirty list for O(expired) GC instead of O(n)
1193///
1194/// Instead of scanning ALL version chains, we track which keys have versions
1195/// created in each epoch. GC only needs to visit keys from old epochs.
1196struct EpochDirtyList {
1197    /// Ring buffer of epoch -> dirty keys
1198    /// Index = epoch % EPOCH_RING_SIZE
1199    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1200    /// Current epoch
1201    current_epoch: AtomicU64,
1202}
1203
1204const EPOCH_RING_SIZE: usize = 4;
1205
1206impl EpochDirtyList {
1207    fn new() -> Self {
1208        Self {
1209            epochs: [
1210                parking_lot::Mutex::new(Vec::new()),
1211                parking_lot::Mutex::new(Vec::new()),
1212                parking_lot::Mutex::new(Vec::new()),
1213                parking_lot::Mutex::new(Vec::new()),
1214            ],
1215            current_epoch: AtomicU64::new(0),
1216        }
1217    }
1218
1219    /// Record a version created in the current epoch
1220    #[inline]
1221    fn record_version(&self, key: Vec<u8>) {
1222        let epoch = self.current_epoch.load(Ordering::Relaxed);
1223        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1224        self.epochs[idx].lock().push(key);
1225    }
1226
1227    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1228    ///
1229    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1230    /// For 100 writes: ~100x fewer mutex operations.
1231    #[inline]
1232    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1233        let epoch = self.current_epoch.load(Ordering::Relaxed);
1234        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1235        let mut guard = self.epochs[idx].lock();
1236        guard.extend(keys);
1237    }
1238
1239    /// Advance to next epoch, returning old epoch's dirty keys
1240    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1241        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1242        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1243
1244        // Drain the old epoch's dirty list
1245        let mut guard = self.epochs[old_idx].lock();
1246        let keys = std::mem::take(&mut *guard);
1247        (old_epoch, keys)
1248    }
1249
1250    /// Get current epoch
1251    #[allow(dead_code)]
1252    fn current(&self) -> u64 {
1253        self.current_epoch.load(Ordering::Relaxed)
1254    }
1255}
1256
1257// ============================================================================
1258// Streaming Scan Iterator
1259// ============================================================================
1260
/// Streaming iterator for range scans
///
/// Yields `(key, value)` pairs one at a time; values are resolved lazily,
/// per item, against the version chain visible at `snapshot_ts`.
///
/// Note on memory: the SkipMap and unordered paths walk the underlying map
/// directly, but the deferred-index path first collects every key in the
/// requested range into a `Vec` before resolving values.
struct ScanRangeIterator<'a> {
    /// Memtable being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the scan.
    start: Vec<u8>,
    /// Exclusive upper bound of the scan; an empty vector means "no upper bound".
    end: Vec<u8>,
    /// Snapshot timestamp passed to `read_at` for every key.
    snapshot_ts: u64,
    /// Optional transaction id passed to `read_at` alongside the snapshot.
    current_txn_id: Option<u64>,
    /// Selects the index-backed scan (`true`) or the full map scan (`false`).
    /// When `true` and the memtable has neither index, no source iterator is
    /// built and the scan yields nothing.
    use_ordered: bool,
    // We use Option to defer initialization
    /// Source iterator for the index-backed scan, built on the first `next()`.
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Source iterator for the full map scan, built on the first `next()`.
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set once the first `next()` call has attempted to build a source iterator.
    initialized: bool,
}
1278
1279impl<'a> Iterator for ScanRangeIterator<'a> {
1280    type Item = (Vec<u8>, Vec<u8>);
1281
1282    fn next(&mut self) -> Option<Self::Item> {
1283        // Lazy initialization on first call
1284        if !self.initialized {
1285            self.initialized = true;
1286
1287            if self.use_ordered {
1288                // Try deferred index first (after compaction, it uses a SkipMap internally)
1289                if let Some(ref def_idx) = self.memtable.deferred_index {
1290                    let start = self.start.clone();
1291                    let end = self.end.clone();
1292                    let snapshot_ts = self.snapshot_ts;
1293                    let current_txn_id = self.current_txn_id;
1294                    let data = &self.memtable.data;
1295
1296                    // Collect keys from deferred index (already sorted after compact)
1297                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1298                        def_idx.range_from(&start).collect()
1299                    } else {
1300                        def_idx.range(&start, &end).collect()
1301                    };
1302
1303                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1304                        Box::new(keys.into_iter().filter_map(move |key| {
1305                            if let Some(chain) = data.get(&key)
1306                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1307                                && let Some(value) = &v.value
1308                            {
1309                                Some((key, value.clone()))
1310                            } else {
1311                                None
1312                            }
1313                        }));
1314                    self.ordered_iter = Some(iter);
1315                } else if let Some(ref idx) = self.memtable.ordered_index {
1316                    let start = self.start.clone();
1317                    let end = self.end.clone();
1318                    let snapshot_ts = self.snapshot_ts;
1319                    let current_txn_id = self.current_txn_id;
1320                    let data = &self.memtable.data;
1321
1322                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1323                    {
1324                        Box::new(idx.range(start..).filter_map(move |entry| {
1325                            let key = entry.key();
1326                            if let Some(chain) = data.get(key)
1327                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1328                                && let Some(value) = &v.value
1329                            {
1330                                Some((key.clone(), value.clone()))
1331                            } else {
1332                                None
1333                            }
1334                        }))
1335                    } else {
1336                        Box::new(idx.range(start..end).filter_map(move |entry| {
1337                            let key = entry.key();
1338                            if let Some(chain) = data.get(key)
1339                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1340                                && let Some(value) = &v.value
1341                            {
1342                                Some((key.clone(), value.clone()))
1343                            } else {
1344                                None
1345                            }
1346                        }))
1347                    };
1348                    self.ordered_iter = Some(iter);
1349                }
1350            } else {
1351                // Unordered full scan
1352                let start = self.start.clone();
1353                let end = self.end.clone();
1354                let snapshot_ts = self.snapshot_ts;
1355                let current_txn_id = self.current_txn_id;
1356
1357                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1358                    Box::new(self.memtable.data.iter().filter_map(move |entry| {
1359                        let key = entry.key();
1360
1361                        if key.as_slice() < start.as_slice() {
1362                            return None;
1363                        }
1364                        if !end.is_empty() && key.as_slice() >= end.as_slice() {
1365                            return None;
1366                        }
1367
1368                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1369                            && let Some(value) = &v.value
1370                        {
1371                            Some((key.clone(), value.clone()))
1372                        } else {
1373                            None
1374                        }
1375                    }));
1376                self.unordered_iter = Some(iter);
1377            }
1378        }
1379
1380        // Get next from appropriate iterator
1381        if let Some(ref mut iter) = self.ordered_iter {
1382            iter.next()
1383        } else if let Some(ref mut iter) = self.unordered_iter {
1384            iter.next()
1385        } else {
1386            None
1387        }
1388    }
1389}
1390
/// MemTable with MVCC support
///
/// Stores one `VersionChain` per key in a `DashMap`, so access is
/// coordinated per key by the map rather than through a single table-wide
/// write lock.
///
/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
/// Maintains a deferred sorted index for efficient scans:
/// - Writes: O(1) append to hot buffer
/// - Scans: O(N log N) sort-on-demand (amortized across many writes)
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for efficient prefix/range scans (optional)
    /// O(1) insert to hot buffer, O(N log N) sort on first scan
    /// When None, scan_prefix will fall back to O(N) DashMap iteration
    /// Populated only when an ordered index is requested in deferred mode.
    deferred_index: Option<DeferredSortedIndex>,
    /// Legacy SkipMap for compatibility (used when deferred=false)
    /// Populated only when an ordered index is requested in non-deferred mode,
    /// so at most one of the two indexes is ever present.
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether to use deferred sorting (true) or immediate SkipMap (false)
    /// Stored as given to the constructor, even when no index is enabled.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes
    /// Grows by key length plus value length on each successful write.
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1417
1418impl Default for MvccMemTable {
1419    fn default() -> Self {
1420        Self::new()
1421    }
1422}
1423
1424impl MvccMemTable {
1425    pub fn new() -> Self {
1426        Self::with_ordered_index(true)
1427    }
1428
1429    /// Create memtable with optional ordered index
1430    ///
1431    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1432    /// but scan_prefix becomes O(N) instead of O(log N + K)
1433    ///
1434    /// Uses deferred sorting by default for better write performance:
1435    /// - Writes: O(1) append to hot buffer
1436    /// - Scans: O(N log N) sort-on-demand
1437    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1438        Self::with_index_mode(enable_ordered_index, true)
1439    }
1440
1441    /// Create memtable with fine-grained control over indexing
1442    ///
1443    /// # Arguments
1444    /// * `enable_ordered_index` - Whether to maintain an ordered index
1445    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1446    ///                    If false, use SkipMap (O(log N) writes)
1447    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1448        Self {
1449            data: DashMap::new(),
1450            deferred_index: if enable_ordered_index && use_deferred {
1451                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1452                    max_unsorted_entries: 10_000, // Compact every 10K writes
1453                    enabled: true,
1454                }))
1455            } else {
1456                None
1457            },
1458            ordered_index: if enable_ordered_index && !use_deferred {
1459                Some(SkipMap::new())
1460            } else {
1461                None
1462            },
1463            use_deferred,
1464            size_bytes: AtomicU64::new(0),
1465            dirty_list: EpochDirtyList::new(),
1466        }
1467    }
1468
1469    /// Write a key-value pair (uncommitted)
1470    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1471        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1472        let key_len = key.len();
1473
1474        // Track this key in the current epoch's dirty list for GC
1475        self.dirty_list.record_version(key.clone());
1476
1477        // Insert into ordered index for prefix scans (if enabled)
1478        // Deferred: O(1) append to hot buffer
1479        // SkipMap: O(log N) insert
1480        if let Some(ref idx) = self.deferred_index {
1481            idx.insert(key.clone());
1482        } else if let Some(ref idx) = self.ordered_index {
1483            idx.insert(key.clone(), ());
1484        }
1485
1486        // Use entry API for atomic get-or-insert
1487        let mut entry = self.data.entry(key).or_default();
1488
1489        // Check for write-write conflict
1490        if entry.has_write_conflict(txn_id) {
1491            return Err(SochDBError::Internal(
1492                "Write-write conflict detected".into(),
1493            ));
1494        }
1495        entry.add_uncommitted(value, txn_id);
1496        self.size_bytes
1497            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1498
1499        Ok(())
1500    }
1501
1502    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1503    ///
1504    /// Optimizations applied (Rec 3: MVCC Batching):
1505    /// - Batched dirty list tracking: single lock acquire for all keys
1506    /// - Deferred index: O(1) append per key
1507    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1508        let mut total_size = 0u64;
1509
1510        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1511        self.dirty_list
1512            .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1513
1514        for (key, value) in writes {
1515            // Insert into ordered index (if enabled)
1516            // Deferred: O(1) append, SkipMap: O(log N)
1517            if let Some(ref idx) = self.deferred_index {
1518                idx.insert(key.clone());
1519            } else if let Some(ref idx) = self.ordered_index {
1520                idx.insert(key.clone(), ());
1521            }
1522
1523            let mut entry = self.data.entry(key.clone()).or_default();
1524
1525            if entry.has_write_conflict(txn_id) {
1526                return Err(SochDBError::Internal(
1527                    "Write-write conflict detected".into(),
1528                ));
1529            }
1530
1531            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1532            entry.add_uncommitted(value.clone(), txn_id);
1533            total_size += (key.len() + value_size) as u64;
1534        }
1535
1536        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1537        Ok(())
1538    }
1539
1540    /// Read at snapshot timestamp, with optional current txn to see own writes
1541    pub fn read(
1542        &self,
1543        key: &[u8],
1544        snapshot_ts: u64,
1545        current_txn_id: Option<u64>,
1546    ) -> Option<Vec<u8>> {
1547        self.data.get(key).and_then(|chain| {
1548            chain
1549                .read_at(snapshot_ts, current_txn_id)
1550                .and_then(|v| v.value.clone())
1551        })
1552    }
1553
1554    /// Commit all versions for a transaction
1555    ///
1556    /// Only updates the keys that were written by this transaction (tracked in write_set).
1557    /// Accepts InlineKey for zero-allocation MVCC tracking.
1558    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1559        // Only iterate over keys we know were written - O(k) instead of O(n)
1560        for key in write_set {
1561            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1562                chain.commit(txn_id, commit_ts);
1563            }
1564        }
1565    }
1566
1567    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1568    #[allow(dead_code)]
1569    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1570        for mut entry in self.data.iter_mut() {
1571            entry.value_mut().commit(txn_id, commit_ts);
1572        }
1573    }
1574
1575    /// Abort all versions for a transaction
1576    pub fn abort(&self, txn_id: u64) {
1577        for mut entry in self.data.iter_mut() {
1578            entry.value_mut().abort(txn_id);
1579        }
1580    }
1581
1582    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1583    ///
1584    /// ## Performance
1585    ///
1586    /// When ordered_index is enabled: O(log N + K) complexity
1587    /// - O(log N) to seek to the first key with prefix
1588    /// - O(K) to iterate matching keys
1589    ///
1590    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1591    ///
1592    /// ## Optimizations Applied
1593    ///
1594    /// - Pre-allocates result vector based on expected output size
1595    /// - Uses batch-friendly iteration patterns
1596    /// - Minimizes allocations during iteration
1597    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1598    pub fn scan_prefix(
1599        &self,
1600        prefix: &[u8],
1601        snapshot_ts: u64,
1602        current_txn_id: Option<u64>,
1603    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1604        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1605        let estimated_size = (self.data.len() / 10).max(64);
1606        let mut results = Vec::with_capacity(estimated_size);
1607
1608        if let Some(ref idx) = self.deferred_index {
1609            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1610            for key in idx.range_from(prefix) {
1611                // Stop when we've passed the prefix range
1612                if !key.starts_with(prefix) {
1613                    break;
1614                }
1615
1616                // O(1) lookup in DashMap for version chain
1617                if let Some(chain) = self.data.get(&key)
1618                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1619                    && let Some(value) = &v.value
1620                {
1621                    results.push((key, value.clone()));
1622                }
1623            }
1624        } else if let Some(ref idx) = self.ordered_index {
1625            // Fast path: O(log N) seek to first key >= prefix
1626            for entry in idx.range(prefix.to_vec()..) {
1627                let key = entry.key();
1628
1629                // Stop when we've passed the prefix range
1630                if !key.starts_with(prefix) {
1631                    break;
1632                }
1633
1634                // O(1) lookup in DashMap for version chain
1635                if let Some(chain) = self.data.get(key)
1636                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1637                    && let Some(value) = &v.value
1638                {
1639                    results.push((key.clone(), value.clone()));
1640                }
1641            }
1642        } else {
1643            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1644            // Optimized with batch-friendly iteration
1645            for entry in self.data.iter() {
1646                let key = entry.key();
1647                if !key.starts_with(prefix) {
1648                    continue;
1649                }
1650                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1651                    && let Some(value) = &v.value
1652                {
1653                    results.push((key.clone(), value.clone()));
1654                }
1655            }
1656        }
1657
1658        results
1659    }
1660
1661    /// Optimized full scan with batch allocation
1662    ///
1663    /// For use when scanning entire tables/namespaces.
1664    /// Pre-allocates based on actual data size.
1665    pub fn scan_all(
1666        &self,
1667        snapshot_ts: u64,
1668        current_txn_id: Option<u64>,
1669    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1670        let mut results = Vec::with_capacity(self.data.len());
1671
1672        for entry in self.data.iter() {
1673            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1674                && let Some(value) = &v.value
1675            {
1676                results.push((entry.key().clone(), value.clone()));
1677            }
1678        }
1679
1680        results
1681    }
1682
1683    /// Streaming scan iterator for very large datasets
1684    ///
1685    /// Returns an iterator that yields (key, value) pairs without
1686    /// materializing the entire result set in memory.
1687    pub fn scan_prefix_iter<'a>(
1688        &'a self,
1689        prefix: &'a [u8],
1690        snapshot_ts: u64,
1691        current_txn_id: Option<u64>,
1692    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1693        self.data.iter().filter_map(move |entry| {
1694            let key = entry.key();
1695            if !key.starts_with(prefix) {
1696                return None;
1697            }
1698            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1699                && let Some(value) = &v.value
1700            {
1701                Some((key.clone(), value.clone()))
1702            } else {
1703                None
1704            }
1705        })
1706    }
1707
1708    /// Scan range
1709    pub fn scan_range(
1710        &self,
1711        start: &[u8],
1712        end: &[u8],
1713        snapshot_ts: u64,
1714        current_txn_id: Option<u64>,
1715    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1716        let mut results = Vec::new();
1717
1718        if let Some(ref idx) = self.deferred_index {
1719            // Deferred index path: sort-on-scan
1720            if end.is_empty() {
1721                for key in idx.range_from(start) {
1722                    if let Some(chain) = self.data.get(&key)
1723                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724                        && let Some(value) = &v.value
1725                    {
1726                        results.push((key, value.clone()));
1727                    }
1728                }
1729            } else {
1730                for key in idx.range(start, end) {
1731                    if let Some(chain) = self.data.get(&key)
1732                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1733                        && let Some(value) = &v.value
1734                    {
1735                        results.push((key, value.clone()));
1736                    }
1737                }
1738            }
1739        } else if let Some(ref idx) = self.ordered_index {
1740            // Use range scan on SkipMap
1741            if end.is_empty() {
1742                // Unbounded end
1743                for entry in idx.range(start.to_vec()..) {
1744                    let key = entry.key();
1745                    if let Some(chain) = self.data.get(key)
1746                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1747                        && let Some(value) = &v.value
1748                    {
1749                        results.push((key.clone(), value.clone()));
1750                    }
1751                }
1752            } else {
1753                for entry in idx.range(start.to_vec()..end.to_vec()) {
1754                    let key = entry.key();
1755                    if let Some(chain) = self.data.get(key)
1756                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1757                        && let Some(value) = &v.value
1758                    {
1759                        results.push((key.clone(), value.clone()));
1760                    }
1761                }
1762            }
1763        } else {
1764            // Fallback to full scan if no ordered index
1765            for entry in self.data.iter() {
1766                let key = entry.key();
1767
1768                if key.as_slice() < start {
1769                    continue;
1770                }
1771                if !end.is_empty() && key.as_slice() >= end {
1772                    continue;
1773                }
1774
1775                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1776                    && let Some(value) = &v.value
1777                {
1778                    results.push((key.clone(), value.clone()));
1779                }
1780            }
1781        }
1782
1783        results
1784    }
1785
1786    /// Streaming range scan iterator for very large datasets
1787    ///
1788    /// Returns an iterator that yields (key, value) pairs without
1789    /// materializing the entire result set in memory. Uses the ordered
1790    /// index when available for O(log N + K) complexity.
1791    ///
1792    /// ## Zero-Allocation Design
1793    ///
1794    /// While the iterator itself cannot avoid allocations for returned
1795    /// values (since the caller needs ownership), it avoids:
1796    /// - Pre-materializing all results
1797    /// - Intermediate buffers
1798    /// - Repeated key comparisons for already-visited entries
1799    ///
1800    /// ## Usage
1801    ///
1802    /// ```ignore
1803    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1804    ///     // Process each result as it arrives
1805    ///     // Memory usage is O(1) per iteration, not O(N) total
1806    /// }
1807    /// ```
1808    pub fn scan_range_iter<'a>(
1809        &'a self,
1810        start: &'a [u8],
1811        end: &'a [u8],
1812        snapshot_ts: u64,
1813        current_txn_id: Option<u64>,
1814    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1815        // Compact deferred index before scanning if needed
1816        if let Some(ref idx) = self.deferred_index {
1817            idx.compact();
1818        }
1819
1820        // Use either ordered index or full scan
1821        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1822
1823        // Create iterator based on availability of ordered index
1824        ScanRangeIterator {
1825            memtable: self,
1826            start: start.to_vec(),
1827            end: end.to_vec(),
1828            snapshot_ts,
1829            current_txn_id,
1830            use_ordered,
1831            ordered_iter: None,
1832            unordered_iter: None,
1833            initialized: false,
1834        }
1835    }
1836
    /// Approximate number of bytes accounted to this memtable.
    ///
    /// Returns the current value of the `size_bytes` counter, which the write
    /// path bumps by `key.len() + value.len()` for each version it adds.
    pub fn size(&self) -> u64 {
        // Relaxed load: the value is a plain counter read and is not used to
        // synchronize with any other memory operation.
        self.size_bytes.load(Ordering::Relaxed)
    }
1841
1842    /// Garbage collect old versions using epoch-based dirty list
1843    ///
1844    /// O(expired_versions) instead of O(all_versions)
1845    /// Only visits keys that had versions created in the old epoch.
1846    pub fn gc(&self, min_active_ts: u64) -> usize {
1847        // Advance epoch and get the dirty keys from the old epoch
1848        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1849
1850        if dirty_keys.is_empty() {
1851            return 0;
1852        }
1853
1854        let mut gc_count = 0;
1855
1856        // Only visit keys that were modified in the old epoch
1857        // Use a HashSet to deduplicate keys that were written multiple times
1858        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1859
1860        for key in unique_keys {
1861            if let Some(mut entry) = self.data.get_mut(&key) {
1862                let before = entry.value().version_count();
1863                entry.value_mut().gc(min_active_ts);
1864                gc_count += before.saturating_sub(entry.value().version_count());
1865            }
1866        }
1867
1868        gc_count
1869    }
1870
1871    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1872    #[allow(dead_code)]
1873    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1874        let mut gc_count = 0;
1875
1876        for mut entry in self.data.iter_mut() {
1877            let before = entry.value().version_count();
1878            entry.value_mut().gc(min_active_ts);
1879            gc_count += before.saturating_sub(entry.value().version_count());
1880        }
1881
1882        gc_count
1883    }
1884}
1885
1886// =============================================================================
1887// Rec 11: Unified MvccStore Implementation for MvccMemTable
1888// =============================================================================
1889
1890impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1891    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1892        self.read(key, snapshot_ts, txn_id)
1893    }
1894
1895    fn mvcc_put(
1896        &self,
1897        key: &[u8],
1898        value: Option<Vec<u8>>,
1899        txn_id: u64,
1900    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1901        let mut entry = self.data.entry(key.to_vec()).or_default();
1902        if entry.has_write_conflict(txn_id) {
1903            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1904        }
1905        entry.add_uncommitted(value, txn_id);
1906        Ok(())
1907    }
1908
1909    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1910        if let Some(mut chain) = self.data.get_mut(key) {
1911            return chain.commit(txn_id, commit_ts);
1912        }
1913        false
1914    }
1915
1916    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1917        if let Some(mut chain) = self.data.get_mut(key) {
1918            chain.abort(txn_id);
1919        }
1920    }
1921
1922    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1923        self.data
1924            .get(key)
1925            .map(|chain| chain.has_write_conflict(txn_id))
1926            .unwrap_or(false)
1927    }
1928
1929    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1930        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1931        for mut entry in self.data.iter_mut() {
1932            stats.keys_scanned += 1;
1933            let before = entry.value().version_count();
1934            entry.value_mut().gc(min_ts);
1935            stats.versions_removed += before.saturating_sub(entry.value().version_count());
1936        }
1937        stats
1938    }
1939
1940    fn mvcc_key_count(&self) -> usize {
1941        self.data.len()
1942    }
1943}
1944
1945// ============================================================================
1946// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1947// ============================================================================
1948
1949use crate::key_buffer::ArenaKeyHandle;
1950
/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
///
/// Key handles recorded during an epoch are appended to that epoch's slot in a
/// small ring of mutex-protected vectors; advancing the epoch drains the slot
/// of the epoch being retired.
struct ArenaEpochDirtyList {
    /// Ring of per-epoch buckets holding the key handles recorded in that epoch.
    // NOTE(review): the ring length is the literal 4 here, while the impl
    // picks a slot with `% EPOCH_RING_SIZE`. Presumably the constant is 4 —
    // confirm, since a larger constant would index out of bounds.
    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
    /// Current epoch number; bumped by one each time the epoch is advanced.
    current_epoch: AtomicU64,
}
1956
1957impl ArenaEpochDirtyList {
1958    fn new() -> Self {
1959        Self {
1960            epochs: [
1961                parking_lot::Mutex::new(Vec::new()),
1962                parking_lot::Mutex::new(Vec::new()),
1963                parking_lot::Mutex::new(Vec::new()),
1964                parking_lot::Mutex::new(Vec::new()),
1965            ],
1966            current_epoch: AtomicU64::new(0),
1967        }
1968    }
1969
1970    #[inline]
1971    fn record_version(&self, key: ArenaKeyHandle) {
1972        let epoch = self.current_epoch.load(Ordering::Relaxed);
1973        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1974        self.epochs[idx].lock().push(key);
1975    }
1976
1977    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1978        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1979        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1980        let mut guard = self.epochs[old_idx].lock();
1981        let keys = std::mem::take(&mut *guard);
1982        (old_epoch, keys)
1983    }
1984}
1985
/// Arena-backed MVCC MemTable with optimized key storage
///
/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys. Each
/// write builds ONE handle from the key bytes and then stores clones of that
/// handle in the dirty list, the ordered index and the data map, instead of
/// cloning a `Vec<u8>` for each of the three.
///
/// ## Performance
///
/// NOTE(review): the benefit depends on `ArenaKeyHandle` being cheap to clone
/// (earlier comments describe it as a 16-byte handle) and was previously
/// quoted as a 20-30% write-throughput gain. Neither figure is established by
/// the code in this file — confirm against `key_buffer` and a benchmark.
pub struct ArenaMvccMemTable {
    /// Key -> VersionChain, keyed by arena handle
    data: DashMap<ArenaKeyHandle, VersionChain>,
    /// Optional sorted key index used by prefix and range scans; `None` when
    /// the table was built with the ordered index disabled
    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
    /// Approximate size in bytes (sum of key + value lengths of added versions)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list (arena-backed) that tells `gc` which keys to visit
    dirty_list: ArenaEpochDirtyList,
}
2010
2011impl ArenaMvccMemTable {
2012    pub fn new() -> Self {
2013        Self::with_ordered_index(true)
2014    }
2015
2016    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2017        Self {
2018            data: DashMap::new(),
2019            ordered_index: if enable_ordered_index {
2020                Some(SkipMap::new())
2021            } else {
2022                None
2023            },
2024            size_bytes: AtomicU64::new(0),
2025            dirty_list: ArenaEpochDirtyList::new(),
2026        }
2027    }
2028
2029    /// Write a key-value pair using arena key handle
2030    ///
2031    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2032    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2033    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2034        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2035        let key_len = key.len();
2036
2037        // Create ONE ArenaKeyHandle - this is the only allocation for the key
2038        let key_handle = ArenaKeyHandle::new(key);
2039
2040        // Track in dirty list (O(1) copy of 16-byte handle)
2041        self.dirty_list.record_version(key_handle.clone());
2042
2043        // Insert into ordered index (O(1) copy of 16-byte handle)
2044        if let Some(ref idx) = self.ordered_index {
2045            idx.insert(key_handle.clone(), ());
2046        }
2047
2048        // Use entry API with the handle
2049        let mut entry = self.data.entry(key_handle).or_default();
2050
2051        if entry.has_write_conflict(txn_id) {
2052            return Err(SochDBError::Internal(
2053                "Write-write conflict detected".into(),
2054            ));
2055        }
2056        entry.add_uncommitted(value, txn_id);
2057        self.size_bytes
2058            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2059
2060        Ok(())
2061    }
2062
2063    /// Write batch using arena key handles
2064    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2065        let mut total_size = 0u64;
2066
2067        for (key, value) in writes {
2068            let key_handle = ArenaKeyHandle::new(key);
2069
2070            self.dirty_list.record_version(key_handle.clone());
2071
2072            if let Some(ref idx) = self.ordered_index {
2073                idx.insert(key_handle.clone(), ());
2074            }
2075
2076            let mut entry = self.data.entry(key_handle).or_default();
2077
2078            if entry.has_write_conflict(txn_id) {
2079                return Err(SochDBError::Internal(
2080                    "Write-write conflict detected".into(),
2081                ));
2082            }
2083
2084            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2085            entry.add_uncommitted(value.clone(), txn_id);
2086            total_size += (key.len() + value_size) as u64;
2087        }
2088
2089        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2090        Ok(())
2091    }
2092
2093    /// Read at snapshot timestamp
2094    pub fn read(
2095        &self,
2096        key: &[u8],
2097        snapshot_ts: u64,
2098        current_txn_id: Option<u64>,
2099    ) -> Option<Vec<u8>> {
2100        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2101        let key_handle = ArenaKeyHandle::new(key);
2102        self.data.get(&key_handle).and_then(|chain| {
2103            chain
2104                .read_at(snapshot_ts, current_txn_id)
2105                .and_then(|v| v.value.clone())
2106        })
2107    }
2108
2109    /// Commit transaction
2110    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2111        for key in write_set {
2112            let key_handle = ArenaKeyHandle::new(key.as_slice());
2113            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2114                chain.commit(txn_id, commit_ts);
2115            }
2116        }
2117    }
2118
2119    /// Abort transaction
2120    pub fn abort(&self, txn_id: u64) {
2121        for mut entry in self.data.iter_mut() {
2122            entry.value_mut().abort(txn_id);
2123        }
2124    }
2125
2126    /// Scan prefix
2127    pub fn scan_prefix(
2128        &self,
2129        prefix: &[u8],
2130        snapshot_ts: u64,
2131        current_txn_id: Option<u64>,
2132    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2133        let mut results = Vec::new();
2134        let prefix_handle = ArenaKeyHandle::new(prefix);
2135
2136        if let Some(ref idx) = self.ordered_index {
2137            for entry in idx.range(prefix_handle..) {
2138                let key = entry.key();
2139
2140                if !key.as_bytes().starts_with(prefix) {
2141                    break;
2142                }
2143
2144                if let Some(chain) = self.data.get(key)
2145                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2146                    && let Some(value) = &v.value
2147                {
2148                    results.push((key.as_bytes().to_vec(), value.clone()));
2149                }
2150            }
2151        } else {
2152            for entry in self.data.iter() {
2153                let key = entry.key();
2154                if !key.as_bytes().starts_with(prefix) {
2155                    continue;
2156                }
2157                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2158                    && let Some(value) = &v.value
2159                {
2160                    results.push((key.as_bytes().to_vec(), value.clone()));
2161                }
2162            }
2163        }
2164
2165        results
2166    }
2167
2168    /// Get approximate size
2169    pub fn size(&self) -> u64 {
2170        self.size_bytes.load(Ordering::Relaxed)
2171    }
2172
2173    /// Garbage collect old versions
2174    pub fn gc(&self, min_active_ts: u64) -> usize {
2175        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2176
2177        if dirty_keys.is_empty() {
2178            return 0;
2179        }
2180
2181        let mut gc_count = 0;
2182        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2183
2184        for key in unique_keys {
2185            if let Some(mut entry) = self.data.get_mut(&key) {
2186                let before = entry.value().version_count();
2187                entry.value_mut().gc(min_active_ts);
2188                gc_count += before.saturating_sub(entry.value().version_count());
2189            }
2190        }
2191
2192        gc_count
2193    }
2194}
2195
/// `Default` builds the same table as [`ArenaMvccMemTable::new`].
impl Default for ArenaMvccMemTable {
    fn default() -> Self {
        // `new()` enables the ordered index.
        Self::new()
    }
}
2201
2202// ============================================================================
2203// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2204// ============================================================================
2205
/// Configuration for memtable type selection
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting
    /// Best for: general workloads, balanced read/write
    ///
    /// This is the default, since the standard memtable now has deferred
    /// sorting.
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations
    /// Best for: write-heavy workloads, large keys
    Arena,
}
2223
/// Unified memtable abstraction using enum dispatch
///
/// Wraps one of the concrete memtable implementations and forwards each
/// operation to it with a `match`, so callers hold a single type regardless
/// of which implementation was selected.
///
/// This pattern provides:
/// - Static dispatch (no vtable) for the forwarded operations
/// - Type-safe switching between implementations
/// - Easy extensibility for future memtable types
///
/// ## Why Enum over Trait Object?
///
/// - Hot path performance: enum match is a single branch vs vtable indirection
/// - Cache friendliness: no pointer chasing
/// - Inlining: compiler can inline through enum dispatch
pub enum MemTableKind {
    /// Backed by [`MvccMemTable`] (selected by `MemTableType::Standard`)
    Standard(MvccMemTable),
    /// Backed by [`ArenaMvccMemTable`] (selected by `MemTableType::Arena`)
    Arena(ArenaMvccMemTable),
}
2240
2241impl MemTableKind {
2242    /// Create a new memtable of the specified type
2243    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2244        match kind {
2245            MemTableType::Standard => {
2246                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2247            }
2248            MemTableType::Arena => {
2249                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2250            }
2251        }
2252    }
2253
2254    /// Write a key-value pair
2255    #[inline]
2256    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2257        match self {
2258            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2259            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2260        }
2261    }
2262
2263    /// Write batch of key-value pairs
2264    #[inline]
2265    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2266        match self {
2267            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2268            MemTableKind::Arena(m) => {
2269                // Convert to arena-compatible format
2270                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2271                    .iter()
2272                    .map(|(k, v)| (k.as_slice(), v.clone()))
2273                    .collect();
2274                m.write_batch(&arena_writes, txn_id)
2275            }
2276        }
2277    }
2278
2279    /// Read at snapshot timestamp
2280    #[inline]
2281    pub fn read(
2282        &self,
2283        key: &[u8],
2284        snapshot_ts: u64,
2285        current_txn_id: Option<u64>,
2286    ) -> Option<Vec<u8>> {
2287        match self {
2288            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2289            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2290        }
2291    }
2292
2293    /// Commit transaction
2294    #[inline]
2295    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2296        match self {
2297            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2298            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2299        }
2300    }
2301
2302    /// Abort transaction
2303    #[inline]
2304    pub fn abort(&self, txn_id: u64) {
2305        match self {
2306            MemTableKind::Standard(m) => m.abort(txn_id),
2307            MemTableKind::Arena(m) => m.abort(txn_id),
2308        }
2309    }
2310
2311    /// Scan prefix
2312    #[inline]
2313    pub fn scan_prefix(
2314        &self,
2315        prefix: &[u8],
2316        snapshot_ts: u64,
2317        current_txn_id: Option<u64>,
2318    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2319        match self {
2320            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2321            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2322        }
2323    }
2324
2325    /// Scan range
2326    #[inline]
2327    pub fn scan_range(
2328        &self,
2329        start: &[u8],
2330        end: &[u8],
2331        snapshot_ts: u64,
2332        current_txn_id: Option<u64>,
2333    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2334        match self {
2335            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2336            MemTableKind::Arena(m) => {
2337                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2338                let mut results = Vec::new();
2339                if let Some(ref idx) = m.ordered_index {
2340                    let start_handle = ArenaKeyHandle::new(start);
2341                    let end_handle = ArenaKeyHandle::new(end);
2342
2343                    if end.is_empty() {
2344                        for entry in idx.range(start_handle..) {
2345                            let key = entry.key();
2346                            if let Some(chain) = m.data.get(key)
2347                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2348                                && let Some(value) = &v.value
2349                            {
2350                                results.push((key.as_bytes().to_vec(), value.clone()));
2351                            }
2352                        }
2353                    } else {
2354                        for entry in idx.range(start_handle..end_handle) {
2355                            let key = entry.key();
2356                            if let Some(chain) = m.data.get(key)
2357                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2358                                && let Some(value) = &v.value
2359                            {
2360                                results.push((key.as_bytes().to_vec(), value.clone()));
2361                            }
2362                        }
2363                    }
2364                } else {
2365                    for entry in m.data.iter() {
2366                        let key = entry.key();
2367                        let key_bytes = key.as_bytes();
2368                        if key_bytes < start {
2369                            continue;
2370                        }
2371                        if !end.is_empty() && key_bytes >= end {
2372                            continue;
2373                        }
2374                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2375                            && let Some(value) = &v.value
2376                        {
2377                            results.push((key_bytes.to_vec(), value.clone()));
2378                        }
2379                    }
2380                }
2381                results
2382            }
2383        }
2384    }
2385
2386    /// Scan range iterator (returns collected results for now)
2387    #[inline]
2388    pub fn scan_range_iter<'a>(
2389        &'a self,
2390        start: &'a [u8],
2391        end: &'a [u8],
2392        snapshot_ts: u64,
2393        current_txn_id: Option<u64>,
2394    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2395        match self {
2396            MemTableKind::Standard(m) => {
2397                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2398            }
2399            MemTableKind::Arena(_) => {
2400                // Arena version returns collected results as iterator
2401                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2402                Box::new(results.into_iter())
2403            }
2404        }
2405    }
2406
2407    /// Get approximate size
2408    #[inline]
2409    pub fn size(&self) -> u64 {
2410        match self {
2411            MemTableKind::Standard(m) => m.size(),
2412            MemTableKind::Arena(m) => m.size(),
2413        }
2414    }
2415
2416    /// Garbage collect old versions
2417    #[inline]
2418    pub fn gc(&self, min_active_ts: u64) -> usize {
2419        match self {
2420            MemTableKind::Standard(m) => m.gc(min_active_ts),
2421            MemTableKind::Arena(m) => m.gc(min_active_ts),
2422        }
2423    }
2424
2425    /// Get the kind of memtable
2426    pub fn kind(&self) -> MemTableType {
2427        match self {
2428            MemTableKind::Standard(_) => MemTableType::Standard,
2429            MemTableKind::Arena(_) => MemTableType::Arena,
2430        }
2431    }
2432}
2433
2434/// Durable storage engine with full ACID support
2435pub struct DurableStorage {
2436    /// Path to storage directory
2437    path: PathBuf,
2438    /// Write-ahead log
2439    wal: Arc<TxnWal>,
2440    /// MVCC manager
2441    mvcc: Arc<MvccManager>,
2442    /// In-memory data (unified abstraction over Standard/Arena)
2443    memtable: Arc<MemTableKind>,
2444    /// Per-transaction WAL buffers for batched writes
2445    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
2446    /// At commit, buffer is flushed to WAL with single lock acquisition
2447    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
2448    /// Group commit buffer (optional)
2449    group_commit: Option<Arc<EventDrivenGroupCommit>>,
2450    /// Recovery state
2451    needs_recovery: AtomicU64, // 1 = needs recovery
2452    /// Last checkpoint LSN
2453    last_checkpoint_lsn: AtomicU64,
2454    /// Synchronous mode (like SQLite's PRAGMA synchronous)
2455    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
2456    sync_mode: AtomicU64,
2457    /// Commits since last sync (for NORMAL mode)
2458    commits_since_sync: AtomicU64,
2459    /// Adaptive batch sizing for NORMAL mode (Little's Law)
2460    /// Arrival rate in requests/sec × 1000 for precision
2461    arrival_rate_ema: AtomicU64,
2462    /// Last commit timestamp in microseconds
2463    last_commit_us: AtomicU64,
2464    /// Estimated fsync latency in microseconds
2465    fsync_latency_us: AtomicU64,
2466    /// Database lock for exclusive access (None = no locking)
2467    #[allow(dead_code)]
2468    db_lock: Option<crate::lock::DatabaseLock>,
2469    /// Whether at-rest encryption is active for this instance (drives the live
2470    /// per-instance durability matrix). Set from the resolved keyring at open.
2471    at_rest_encrypted: bool,
2472}
2473
2474/// Encryption configuration for opening a [`DurableStorage`].
2475///
2476/// `disabled()` (the default for the legacy open variants) keeps the database
2477/// plaintext and byte-compatible with pre-encryption binaries. `with_kek()`
2478/// supplies the Key-Encryption-Key — the operator secret that *wraps* a
2479/// per-database data key; it is never used verbatim as the cipher key (see
2480/// [`crate::keyring`]). A wrong/missing KEK for an encrypted database fails
2481/// closed at open (the DB will refuse to open, never silently read as plaintext).
2482pub struct StorageEncryption {
2483    /// The KEK. `None` ⇒ plaintext database.
2484    pub kek: Option<EncryptionKey>,
2485    /// Human-readable identifier for the key source (e.g. "env:SOCHDB_ENCRYPTION_KEY",
2486    /// "embedded", "kms:..."). Bound into the keyring for provenance.
2487    pub source_id: String,
2488}
2489
2490impl StorageEncryption {
2491    /// Plaintext (no encryption) — the default.
2492    pub fn disabled() -> Self {
2493        Self {
2494            kek: None,
2495            source_id: "none".to_string(),
2496        }
2497    }
2498
2499    /// Encrypt at rest under the given KEK.
2500    pub fn with_kek(kek: EncryptionKey, source_id: impl Into<String>) -> Self {
2501        Self {
2502            kek: Some(kek),
2503            source_id: source_id.into(),
2504        }
2505    }
2506
2507    /// Whether a key is configured (i.e. encryption is requested).
2508    pub fn is_enabled(&self) -> bool {
2509        self.kek.is_some()
2510    }
2511}
2512
2513impl DurableStorage {
2514    /// Open or create durable storage at path
2515    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2516        Self::open_with_config(path, true)
2517    }
2518
2519    /// Open with configurable ordered index
2520    ///
2521    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2522    /// but scan_prefix becomes O(N) instead of O(log N + K)
2523    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2524        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2525    }
2526
2527    /// Open with arena-backed memtable for write-heavy workloads
2528    ///
2529    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2530    /// Best for workloads with:
2531    /// - High write throughput
2532    /// - Large keys (reduces allocation overhead)
2533    /// - Minimal concurrent reads during writes
2534    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2535        Self::open_with_full_config(path, true, MemTableType::Arena)
2536    }
2537
2538    /// Open with full configuration options
2539    ///
2540    /// # Arguments
2541    /// * `path` - Storage directory path
2542    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2543    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2544    ///
2545    /// # Locking
2546    ///
2547    /// Acquires an exclusive advisory lock on the database directory.
2548    /// This prevents concurrent multi-process access which would corrupt data.
2549    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2550    pub fn open_with_full_config<P: AsRef<Path>>(
2551        path: P,
2552        enable_ordered_index: bool,
2553        memtable_type: MemTableType,
2554    ) -> Result<Self> {
2555        Self::open_with_full_config_internal(
2556            path,
2557            enable_ordered_index,
2558            memtable_type,
2559            true,
2560            StorageEncryption::disabled(),
2561        )
2562    }
2563
2564    /// Open with at-rest encryption configured via [`StorageEncryption`].
2565    ///
2566    /// With `StorageEncryption::disabled()` this is identical to
2567    /// [`Self::open_with_full_config`]. With a KEK, the database is opened (or
2568    /// created) encrypted: a per-DB data key is generated and wrapped into the
2569    /// keyring on first open, and a wrong/missing key on a subsequent open fails
2570    /// closed. Reads are unaffected (the live read path is in-memory); only the
2571    /// WAL write/recovery path pays the AEAD cost.
2572    pub fn open_with_encryption<P: AsRef<Path>>(
2573        path: P,
2574        enable_ordered_index: bool,
2575        memtable_type: MemTableType,
2576        encryption: StorageEncryption,
2577    ) -> Result<Self> {
2578        Self::open_with_full_config_internal(
2579            path,
2580            enable_ordered_index,
2581            memtable_type,
2582            true,
2583            encryption,
2584        )
2585    }
2586
2587    /// The live, per-instance durability capabilities of THIS database.
2588    ///
2589    /// Unlike the build-level [`crate::durability_capabilities`] (which reports
2590    /// defaults), this reflects the actual resolved state — notably whether
2591    /// at-rest encryption is active for this opened instance.
2592    pub fn durability_capabilities(&self) -> DurabilityCapabilities {
2593        DurabilityCapabilities {
2594            crash_recovery: true,
2595            at_rest_encryption: self.at_rest_encrypted,
2596            // PITR / ARIES / fencing remain unwired on the live path (landing
2597            // incrementally in Task 3B).
2598            point_in_time_recovery: false,
2599            aries_checkpoint: false,
2600            wal_fencing: false,
2601        }
2602    }
2603
2604    /// Whether at-rest encryption is active for this instance.
2605    pub fn is_encrypted(&self) -> bool {
2606        self.at_rest_encrypted
2607    }
2608
2609    /// Open without locking (for testing crash recovery scenarios)
2610    ///
2611    /// # Safety
2612    /// This should ONLY be used in tests that simulate crashes by forgetting
2613    /// the storage instance. In production, always use `open_with_full_config`.
2614    #[cfg(test)]
2615    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2616        Self::open_with_full_config_internal(
2617            path,
2618            true,
2619            MemTableType::Standard,
2620            false,
2621            StorageEncryption::disabled(),
2622        )
2623    }
2624
2625    /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2626    ///
2627    /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2628    /// temporary directory that is automatically cleaned up when the
2629    /// `EphemeralHandle` is dropped. This ensures test and production code paths
2630    /// are identical — bugs found in tests are guaranteed to reproduce in production.
2631    ///
2632    /// # Returns
2633    /// An `EphemeralHandle` that owns both the storage and the temp directory.
2634    /// Access the storage via `handle.storage()` or `Deref` coercion.
2635    ///
2636    /// # Example
2637    /// ```ignore
2638    /// let handle = DurableStorage::open_ephemeral()?;
2639    /// let txn = handle.begin_transaction()?;
2640    /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2641    /// handle.commit(txn)?;
2642    /// // temp directory cleaned up when `handle` drops
2643    /// ```
2644    pub fn open_ephemeral() -> Result<EphemeralHandle> {
2645        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2646        let storage = Self::open_with_full_config_internal(
2647            tmp.path(),
2648            true,
2649            MemTableType::Standard,
2650            false, // No lock needed for ephemeral
2651            StorageEncryption::disabled(),
2652        )?;
2653        Ok(EphemeralHandle {
2654            storage,
2655            _tmpdir: tmp,
2656        })
2657    }
2658
2659    /// Open an ephemeral DurableStorage with group commit enabled.
2660    ///
2661    /// Same as `open_ephemeral()` but with group commit for higher throughput.
2662    pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2663        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2664        let mut storage = Self::open_with_full_config_internal(
2665            tmp.path(),
2666            true,
2667            MemTableType::Standard,
2668            false,
2669            StorageEncryption::disabled(),
2670        )?;
2671
2672        let wal = storage.wal.clone();
2673        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2674            for &txn_id in txn_ids {
2675                let entry = TxnWalEntry::txn_commit(txn_id);
2676                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2677            }
2678            wal.flush().map_err(|e| e.to_string())?;
2679            wal.sync().map_err(|e| e.to_string())?;
2680            Ok(std::time::SystemTime::now()
2681                .duration_since(std::time::UNIX_EPOCH)
2682                .unwrap()
2683                .as_micros() as u64)
2684        });
2685        storage.group_commit = Some(Arc::new(gc));
2686
2687        Ok(EphemeralHandle {
2688            storage,
2689            _tmpdir: tmp,
2690        })
2691    }
2692
2693    fn open_with_full_config_internal<P: AsRef<Path>>(
2694        path: P,
2695        enable_ordered_index: bool,
2696        memtable_type: MemTableType,
2697        acquire_lock: bool,
2698        encryption: StorageEncryption,
2699    ) -> Result<Self> {
2700        let path = path.as_ref().to_path_buf();
2701        std::fs::create_dir_all(&path)?;
2702
2703        // Acquire exclusive lock on database directory (unless disabled for testing)
2704        let db_lock = if acquire_lock {
2705            Some(
2706                crate::lock::DatabaseLock::acquire(&path)
2707                    .map_err(|e| SochDBError::LockError(e.to_string()))?,
2708            )
2709        } else {
2710            None
2711        };
2712
2713        let wal_path = path.join("wal.log");
2714
2715        // Resolve at-rest encryption from the keyring BEFORE touching the WAL.
2716        // - A fresh DB (no wal.log yet) with a KEK ⇒ create an encrypted keyring.
2717        // - An existing plaintext DB with a KEK ⇒ refused (must migrate explicitly).
2718        // - An encrypted DB with a wrong/missing KEK ⇒ refused fail-closed.
2719        let is_new_db = !wal_path.exists();
2720        let enc_state = keyring::load_or_init(
2721            &path,
2722            encryption.kek.as_ref(),
2723            &encryption.source_id,
2724            is_new_db,
2725        )?;
2726        let at_rest_encrypted = enc_state.is_encrypted();
2727        let wal = Arc::new(TxnWal::new_with_encryption(
2728            &wal_path,
2729            enc_state.engine(),
2730            enc_state.db_uuid(),
2731            enc_state.key_epoch(),
2732        )?);
2733
2734        let storage = Self {
2735            path,
2736            wal: wal.clone(),
2737            mvcc: Arc::new(MvccManager::new()),
2738            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2739            txn_write_buffers: DashMap::new(),
2740            group_commit: None,
2741            needs_recovery: AtomicU64::new(0),
2742            last_checkpoint_lsn: AtomicU64::new(0),
2743            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2744            commits_since_sync: AtomicU64::new(0),
2745            // Adaptive batch sizing (Little's Law)
2746            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2747            last_commit_us: AtomicU64::new(0),
2748            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2749            db_lock,
2750            at_rest_encrypted,
2751        };
2752
2753        // Check if recovery needed
2754        if storage.check_recovery_needed()? {
2755            storage.needs_recovery.store(1, Ordering::SeqCst);
2756        }
2757
2758        Ok(storage)
2759    }
2760
2761    /// Open with group commit enabled
2762    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2763        Self::open_with_group_commit_and_config(path, true)
2764    }
2765
2766    /// Open with group commit and configurable ordered index
2767    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2768        path: P,
2769        enable_ordered_index: bool,
2770    ) -> Result<Self> {
2771        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2772
2773        let wal = storage.wal.clone();
2774        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2775            // Write all commit records WITHOUT flushing (batch them)
2776            for &txn_id in txn_ids {
2777                let entry = TxnWalEntry::txn_commit(txn_id);
2778                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2779            }
2780
2781            // Then do a SINGLE flush + fsync for the entire batch
2782            wal.flush().map_err(|e| e.to_string())?;
2783            wal.sync().map_err(|e| e.to_string())?;
2784
2785            // Return commit timestamp
2786            Ok(std::time::SystemTime::now()
2787                .duration_since(std::time::UNIX_EPOCH)
2788                .unwrap()
2789                .as_micros() as u64)
2790        });
2791
2792        storage.group_commit = Some(Arc::new(gc));
2793        Ok(storage)
2794    }
2795
2796    /// Open with IndexPolicy for automatic memtable/index configuration
2797    ///
2798    /// This is the recommended constructor for new code. The policy determines:
2799    /// - Whether to use ordered index (ScanOptimized only)
2800    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2801    /// - Default settings optimized for the workload pattern
2802    ///
2803    /// # Arguments
2804    /// * `path` - Storage directory path
2805    /// * `policy` - Index policy determining write/scan tradeoffs
2806    /// * `group_commit` - Whether to enable group commit for throughput
2807    pub fn open_with_policy<P: AsRef<Path>>(
2808        path: P,
2809        policy: crate::index_policy::IndexPolicy,
2810        group_commit: bool,
2811    ) -> Result<Self> {
2812        use crate::index_policy::IndexPolicy;
2813
2814        // Derive configuration from policy
2815        let (enable_ordered_index, memtable_type) = match policy {
2816            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2817                // Write-heavy: no ordered index, use arena for reduced allocations
2818                (false, MemTableType::Arena)
2819            }
2820            IndexPolicy::Balanced => {
2821                // Mixed OLTP: deferred sorting (already implemented in Standard)
2822                (true, MemTableType::Standard)
2823            }
2824            IndexPolicy::ScanOptimized => {
2825                // Scan-heavy: maintain ordered index
2826                (true, MemTableType::Standard)
2827            }
2828        };
2829
2830        if group_commit {
2831            let mut storage =
2832                Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2833
2834            let wal = storage.wal.clone();
2835            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2836                for &txn_id in txn_ids {
2837                    let entry = TxnWalEntry::txn_commit(txn_id);
2838                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2839                }
2840                wal.flush().map_err(|e| e.to_string())?;
2841                wal.sync().map_err(|e| e.to_string())?;
2842                Ok(std::time::SystemTime::now()
2843                    .duration_since(std::time::UNIX_EPOCH)
2844                    .unwrap()
2845                    .as_micros() as u64)
2846            });
2847            storage.group_commit = Some(Arc::new(gc));
2848            Ok(storage)
2849        } else {
2850            Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2851        }
2852    }
2853
2854    /// Open storage for concurrent mode (multi-reader, single-writer)
2855    ///
2856    /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2857    /// Coordination is handled by the concurrent MVCC layer instead.
2858    ///
2859    /// # Safety
2860    ///
2861    /// This must ONLY be called from `Database::open_concurrent()` which
2862    /// manages the concurrent MVCC coordination. Direct use will cause
2863    /// data corruption.
2864    pub fn open_for_concurrent<P: AsRef<Path>>(
2865        path: P,
2866        policy: crate::index_policy::IndexPolicy,
2867    ) -> Result<Self> {
2868        use crate::index_policy::IndexPolicy;
2869
2870        let (enable_ordered_index, memtable_type) = match policy {
2871            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2872            IndexPolicy::Balanced => (true, MemTableType::Standard),
2873            IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2874        };
2875
2876        // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2877        Self::open_with_full_config_internal(
2878            path,
2879            enable_ordered_index,
2880            memtable_type,
2881            false,
2882            StorageEncryption::disabled(),
2883        )
2884    }
2885
2886    /// Get the memtable type being used
2887    pub fn memtable_type(&self) -> MemTableType {
2888        self.memtable.kind()
2889    }
2890
2891    /// Check if recovery is needed (dirty shutdown detection)
2892    ///
2893    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2894    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2895    /// but committed data still needs to be loaded from WAL into the memtable.
2896    fn check_recovery_needed(&self) -> Result<bool> {
2897        let marker_path = self.path.join(".clean_shutdown");
2898        if marker_path.exists() {
2899            // Clean shutdown - remove marker
2900            std::fs::remove_file(&marker_path)?;
2901        }
2902        // ALWAYS need recovery to rebuild memtable from WAL
2903        // The memtable is in-memory only and needs to be restored on every startup
2904        Ok(true)
2905    }
2906
2907    /// Perform crash recovery
2908    pub fn recover(&self) -> Result<RecoveryStats> {
2909        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2910            return Ok(RecoveryStats::default());
2911        }
2912
2913        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2914
2915        // Apply committed writes to memtable
2916        let recovery_txn_id = self.wal.alloc_txn_id();
2917        let commit_ts = self.mvcc.alloc_commit_ts();
2918
2919        // Collect keys being written for efficient commit
2920        let mut write_set: HashSet<InlineKey> = HashSet::new();
2921        for (key, value) in &writes {
2922            write_set.insert(SmallVec::from_slice(key));
2923            self.memtable
2924                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2925        }
2926        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2927
2928        self.needs_recovery.store(0, Ordering::SeqCst);
2929
2930        Ok(RecoveryStats {
2931            transactions_recovered: txn_count,
2932            writes_recovered: writes.len(),
2933            commit_ts,
2934        })
2935    }
2936
2937    /// Begin a new transaction
2938    pub fn begin_transaction(&self) -> Result<u64> {
2939        let txn_id = self.wal.begin_transaction()?;
2940        self.mvcc.begin(txn_id);
2941        Ok(txn_id)
2942    }
2943
2944    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2945    ///
2946    /// This enables mode-aware optimizations:
2947    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2948    /// - WriteOnly: Skip read tracking, faster bulk inserts
2949    /// - ReadWrite: Full SSI for serializable isolation
2950    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2951        let txn_id = self.wal.begin_transaction()?;
2952        self.mvcc.begin_with_mode(txn_id, mode);
2953        Ok(txn_id)
2954    }
2955
2956    /// Begin a read-only transaction without any WAL records.
2957    ///
2958    /// This is a performance-critical optimization that eliminates two WAL
2959    /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
2960    /// transactions have no state to recover, WAL records are unnecessary.
2961    ///
2962    /// Callers MUST use `abort_read_only_fast()` to clean up.
2963    #[inline]
2964    pub fn begin_read_only_fast(&self) -> u64 {
2965        let txn_id = self.wal.alloc_txn_id();
2966        self.mvcc.begin_read_only(txn_id);
2967        txn_id
2968    }
2969
2970    /// Abort a fast read-only transaction.
2971    ///
2972    /// O(1) cleanup: only removes MVCC state. No WAL write, no memtable scan.
2973    #[inline]
2974    pub fn abort_read_only_fast(&self, txn_id: u64) {
2975        self.mvcc.abort(txn_id);
2976    }
2977
2978    /// Read a key WITHOUT any MVCC transaction tracking.
2979    ///
2980    /// Uses the current global timestamp to see all committed writes.
2981    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
2982    /// Only safe for single-threaded access (no concurrent writes).
2983    #[inline]
2984    pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2985        let snapshot_ts = self
2986            .mvcc
2987            .ts_counter
2988            .load(std::sync::atomic::Ordering::Relaxed);
2989        self.memtable.read(key, snapshot_ts, None)
2990    }
2991
2992    /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
2993    ///
2994    /// Uses the current global timestamp. Only safe for single-threaded access.
2995    #[inline]
2996    pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2997        let snapshot_ts = self
2998            .mvcc
2999            .ts_counter
3000            .load(std::sync::atomic::Ordering::Relaxed);
3001        self.memtable.scan_prefix(prefix, snapshot_ts, None)
3002    }
3003
3004    /// Read a key within a transaction
3005    #[inline]
3006    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
3007        // Fast path: get just snapshot_ts without cloning whole transaction
3008        let snapshot_ts = self
3009            .mvcc
3010            .get_snapshot_ts(txn_id)
3011            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3012
3013        // Record read for SSI (skipped for read-only transactions)
3014        self.mvcc.record_read(txn_id, key);
3015
3016        // Read at snapshot timestamp, seeing own uncommitted writes
3017        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
3018    }
3019
3020    /// Write a key-value pair within a transaction
3021    ///
3022    /// Writes are buffered and only flushed to disk on commit.
3023    /// This provides ~10× better throughput for batched inserts.
3024    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
3025        // Use the zero-allocation path internally
3026        self.write_refs(txn_id, &key, &value)?;
3027
3028        Ok(())
3029    }
3030
3031    /// Write from references - zero allocation hot path
3032    ///
3033    /// Avoids cloning key/value by writing to WAL from refs directly,
3034    /// then only allocating once for memtable storage.
3035    #[inline]
3036    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
3037        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
3038        self.mvcc.record_write(txn_id, key);
3039
3040        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
3041        // This reduces lock contention from O(writes) to O(1) per transaction
3042        self.txn_write_buffers
3043            .entry(txn_id)
3044            .or_insert_with(|| TxnWalBuffer::new(txn_id))
3045            .append(key, value);
3046
3047        // Write to memtable (needs owned key/value for storage)
3048        self.memtable
3049            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
3050
3051        Ok(())
3052    }
3053
3054    /// Delete a key within a transaction
3055    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
3056        // Record write (uses InlineKey - zero allocation for small keys)
3057        self.mvcc.record_write(txn_id, &key);
3058
3059        // Buffer tombstone in memory - NO WAL lock taken!
3060        self.txn_write_buffers
3061            .entry(txn_id)
3062            .or_insert_with(|| TxnWalBuffer::new(txn_id))
3063            .append(&key, &[]); // Empty value = tombstone
3064
3065        // Write tombstone to memtable
3066        self.memtable.write(key, None, txn_id)?;
3067
3068        Ok(())
3069    }
3070
3071    /// Batch write multiple key-value pairs with reduced overhead
3072    ///
3073    /// This API amortizes fixed costs over the batch:
3074    /// - Single DashMap entry lookup for TxnWalBuffer
3075    /// - Single MVCC write set update
3076    /// - Batch memtable operations
3077    ///
3078    /// Performance: ~2-3x faster than individual write_refs calls
3079    /// for batches of 100+ entries.
3080    ///
3081    /// # Arguments
3082    /// * `txn_id` - Transaction ID
3083    /// * `writes` - Slice of (key, value) pairs
3084    #[inline]
3085    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
3086        if writes.is_empty() {
3087            return Ok(());
3088        }
3089
3090        // Single DashMap access for entire batch
3091        let mut buffer_entry = self
3092            .txn_write_buffers
3093            .entry(txn_id)
3094            .or_insert_with(|| TxnWalBuffer::new(txn_id));
3095
3096        // Batch operations with reduced per-row overhead
3097        for (key, value) in writes {
3098            // Record write for MVCC
3099            self.mvcc.record_write(txn_id, key);
3100
3101            // Append to WAL buffer
3102            buffer_entry.append(key, value);
3103        }
3104        drop(buffer_entry);
3105
3106        // Batch write to memtable
3107        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
3108            .iter()
3109            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
3110            .collect();
3111        self.memtable.write_batch(&owned_writes, txn_id)?;
3112
3113        Ok(())
3114    }
3115
3116    /// Commit a transaction
3117    ///
3118    /// With sync_mode:
3119    /// - 0 (OFF): No sync, risk of data loss
3120    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
3121    /// - 2 (FULL): Sync every commit (safest, slowest)
    pub fn commit(&self, txn_id: u64) -> Result<u64> {
        // Step 1 — flush all buffered DATA writes to the WAL with a SINGLE lock
        // acquisition (O(1) lock instead of O(writes) locks). Flushing data
        // records before validation is safe: ARIES recovery only treats them as
        // winners if a *durable commit record* for this transaction also exists,
        // and that record is written below — strictly after validation succeeds.
        //
        // NOTE(review): the buffer is removed from `txn_write_buffers` here,
        // before validation. `abort` uses the presence of that buffer to decide
        // whether to write a WAL abort record and clean the memtable, so an
        // `abort` issued after a failed `commit` skips both. Confirm that
        // `mvcc.commit` (or the caller) cleans up on the failure path.
        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
            && !buffer.is_empty()
        {
            // Flush entire buffer to WAL with one lock
            self.wal.flush_buffer(&buffer)?;
        }

        // ====================================================================
        // Task 1 — Linearizable commit invariant:
        //
        //   A commit record must NEVER become durable unless the transaction
        //   has already passed SSI validation and its write set is final.
        //
        // We therefore VALIDATE (and freeze the write set) *before* making the
        // commit record durable. `mvcc.commit` performs SSI validation and
        // returns `None` on a dangerous structure (serialization conflict) or
        // if the transaction no longer exists. Writing/fsyncing the commit
        // record before this point would let a crash resurrect a transaction
        // that the live system rejected — the classic "committed-after-crash,
        // aborted-before-crash" non-linearizability bug.
        // ====================================================================
        let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
            SochDBError::Validation(
                "transaction aborted: SSI validation failed (serialization conflict) \
                 or transaction not found"
                    .into(),
            )
        })?;

        // Step 3 — validation passed and the write set is frozen. Only now may
        // the commit record become durable.
        //
        // NOTE(review): any `?` failure from here on returns after MVCC has
        // already accepted the commit but before `memtable.commit` publishes
        // the writes — confirm how callers are expected to handle that state.
        if let Some(gc) = &self.group_commit {
            // Submit the *validated* commit intent to group commit and wait.
            // This batches multiple validated commits into a single fsync.
            // The group-commit error is wrapped in `SochDBError::Internal`.
            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
        } else {
            // Direct commit path: the sync decision depends on `sync_mode`.
            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
            // `fetch_add` returns the value *before* the increment, so `commits`
            // counts the commits recorded since the last sync, excluding this one.
            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);

            // Update arrival rate for adaptive batching
            self.update_arrival_rate();

            // Write commit record (no flush yet - BufWriter will buffer it)
            let entry = TxnWalEntry::txn_commit(txn_id);
            self.wal.append_no_flush(&entry)?;

            // Determine if we should sync/flush based on mode
            let should_sync = match sync_mode {
                0 => false,                                      // OFF: never sync
                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
                _ => true,                                       // FULL: always sync
            };

            if should_sync {
                // Measure the combined flush + fsync latency for adaptive tuning
                let start = std::time::Instant::now();
                self.wal.flush()?;
                self.wal.sync()?;
                let latency_us = start.elapsed().as_micros() as u64;

                // Update fsync latency estimate (EMA with α = 0.1). The
                // load/store pair is not an atomic read-modify-write, so
                // concurrent committers may overwrite each other's sample.
                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
                let new_latency = (old_latency * 9 + latency_us) / 10;
                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);

                // Start a new batch window.
                self.commits_since_sync.store(0, Ordering::Relaxed);
            }
        }

        // Commit record is durable (or buffered per sync mode) for a validated
        // transaction — publish the writes to the memtable. (O(k), k = keys.)
        self.memtable.commit(txn_id, commit_ts, &write_set);

        Ok(commit_ts)
    }
3204
3205    /// Update arrival rate using exponential moving average
3206    #[inline]
3207    fn update_arrival_rate(&self) {
3208        let now_us = std::time::SystemTime::now()
3209            .duration_since(std::time::UNIX_EPOCH)
3210            .unwrap()
3211            .as_micros() as u64;
3212
3213        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3214
3215        if last > 0 {
3216            let delta_us = now_us.saturating_sub(last);
3217            if delta_us > 0 && delta_us < 10_000_000 {
3218                // Ignore gaps > 10s
3219                // Rate = 1_000_000 / delta_us (requests/sec)
3220                // Stored as rate × 1000 for precision
3221                let instant_rate = 1_000_000_000 / delta_us;
3222
3223                // EMA with α = 0.1
3224                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3225                let new_rate = (old_rate * 9 + instant_rate) / 10;
3226                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3227            }
3228        }
3229    }
3230
3231    /// Compute optimal batch threshold using Little's Law
3232    ///
3233    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
3234    /// Returns the number of commits to batch before fsync
3235    #[inline]
3236    fn adaptive_batch_threshold(&self) -> u64 {
3237        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
3238        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
3239
3240        if lambda <= 0.0 || tau <= 0.0 {
3241            return 100; // Fallback to fixed threshold
3242        }
3243
3244        // Little's Law: W* = sqrt(2 × τ × λ)
3245        // This minimizes total time = wait_time + fsync_overhead
3246        let n_opt = (2.0 * tau * lambda).sqrt();
3247
3248        // Clamp between 1 and 1000
3249        (n_opt as u64).clamp(1, 1000)
3250    }
3251
3252    /// Set synchronous mode
3253    ///
3254    /// - 0: OFF - No fsync (risk of data loss)
3255    /// - 1: NORMAL - Periodic fsync (balanced)
3256    /// - 2: FULL - Fsync every commit (safest)
3257    pub fn set_sync_mode(&self, mode: u64) {
3258        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
3259    }
3260
3261    /// Force a group commit flush (useful for benchmarking or testing)
3262    pub fn flush_group_commit(&self) {
3263        if let Some(gc) = &self.group_commit {
3264            gc.flush_batch();
3265        }
3266    }
3267
3268    /// Abort a transaction
3269    ///
3270    /// Performance: O(1) for read-only transactions (no writes to clean up).
3271    /// For write transactions, O(N) memtable scan is required to remove
3272    /// uncommitted versions.
3273    pub fn abort(&self, txn_id: u64) -> Result<()> {
3274        // Check if transaction had any buffered writes.
3275        // Read-only transactions never populate txn_write_buffers,
3276        // so this returns None — allowing us to skip the O(N) memtable scan.
3277        let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3278
3279        if had_writes {
3280            // Write abort record to WAL (only needed if data was written)
3281            self.wal.abort_transaction(txn_id)?;
3282            // Clean up uncommitted memtable entries
3283            self.memtable.abort(txn_id);
3284        }
3285
3286        // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3287        self.mvcc.abort(txn_id);
3288
3289        Ok(())
3290    }
3291
3292    /// Scan keys with prefix
3293    #[inline]
3294    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3295        // Fast path: get just snapshot_ts without cloning whole transaction
3296        let snapshot_ts = self
3297            .mvcc
3298            .get_snapshot_ts(txn_id)
3299            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3300
3301        // Note: Scan doesn't record individual key reads for SSI (too expensive)
3302        // SSI conflicts are tracked at the prefix level if needed
3303        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3304    }
3305
3306    /// Scan keys in range
3307    #[inline]
3308    pub fn scan_range(
3309        &self,
3310        txn_id: u64,
3311        start: &[u8],
3312        end: &[u8],
3313    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3314        let snapshot_ts = self
3315            .mvcc
3316            .get_snapshot_ts(txn_id)
3317            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3318
3319        Ok(self
3320            .memtable
3321            .scan_range(start, end, snapshot_ts, Some(txn_id)))
3322    }
3323
3324    /// Streaming scan for very large result sets
3325    ///
3326    /// Returns an iterator that yields (key, value) pairs without
3327    /// materializing the entire result set in memory.
3328    #[inline]
3329    pub fn scan_range_iter<'a>(
3330        &'a self,
3331        txn_id: u64,
3332        start: &'a [u8],
3333        end: &'a [u8],
3334    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3335        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3336        self.memtable
3337            .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3338    }
3339
    /// Flush the WAL's in-memory buffer to the OS
    ///
    /// Delegates to the WAL's `flush()`. This pushes buffered writes from the
    /// BufWriter into the OS page cache; it does NOT by itself force them to
    /// stable storage. Call this before [`fsync`](Self::fsync) when all data
    /// written so far must become durable.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the WAL flush.
    pub fn flush_wal(&self) -> Result<()> {
        self.wal.flush()
    }
3349
    /// Force sync the WAL to disk (fsync)
    ///
    /// Delegates to the WAL's `sync()`. The commit path calls the WAL's
    /// `flush()` before `sync()`; callers that need buffered records to be
    /// included should likewise call [`flush_wal`](Self::flush_wal) first.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the WAL sync.
    pub fn fsync(&self) -> Result<()> {
        self.wal.sync()
    }
3354
3355    /// Write checkpoint
3356    pub fn checkpoint(&self) -> Result<u64> {
3357        let txn_id = 0; // System transaction
3358        let entry = TxnWalEntry::checkpoint(txn_id);
3359        let lsn = self.wal.append_sync(&entry)?;
3360        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3361        Ok(lsn)
3362    }
3363
    /// Truncate the WAL file after checkpoint.
    ///
    /// This physically truncates the WAL file to 0 bytes, reclaiming disk
    /// space. The in-memory memtable retains all data for the current
    /// session, but a crash after truncation will result in data loss
    /// since the WAL is the only persistence mechanism for DurableStorage.
    ///
    /// Call after `checkpoint()` when WAL durability across restarts is
    /// not required (e.g. desktop telemetry viewers, caches).
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the WAL's `truncate()`.
    pub fn truncate_wal(&self) -> Result<()> {
        self.wal.truncate()
    }
3376
3377    /// Get storage statistics
3378    pub fn stats(&self) -> StorageStats {
3379        // Get WAL size from the WAL manager
3380        let wal_size = self.wal.size_bytes();
3381
3382        // Get active transaction count from MVCC
3383        let active_txns = self.mvcc.active_transaction_count();
3384
3385        StorageStats {
3386            memtable_size_bytes: self.memtable.size(),
3387            wal_size_bytes: wal_size,
3388            active_transactions: active_txns,
3389            min_active_snapshot: self.mvcc.min_active_snapshot(),
3390            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3391        }
3392    }
3393
3394    /// Garbage collect old versions
3395    pub fn gc(&self) -> usize {
3396        let min_ts = self.mvcc.min_active_snapshot();
3397        self.memtable.gc(min_ts)
3398    }
3399
3400    /// Clean shutdown
3401    pub fn shutdown(&self) -> Result<()> {
3402        // Sync WAL
3403        self.fsync()?;
3404
3405        // Write clean shutdown marker
3406        let marker_path = self.path.join(".clean_shutdown");
3407        std::fs::write(&marker_path, b"clean")?;
3408
3409        Ok(())
3410    }
3411}
3412
/// Best-effort clean shutdown when the storage goes out of scope.
impl Drop for DurableStorage {
    fn drop(&mut self) {
        // `drop` cannot return an error, so a failed shutdown is deliberately
        // ignored here. Call `shutdown()` explicitly to observe failures.
        let _ = self.shutdown();
    }
}
3418
3419// =============================================================================
3420// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3421// =============================================================================
3422
/// Owns a `DurableStorage` instance backed by a temporary directory.
///
/// The temp directory is automatically cleaned up when this handle is dropped.
/// Access the underlying storage via `Deref` coercion or `.storage()`.
///
/// # Why this exists
///
/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
/// used in production). This dual-engine architecture meant bugs could surface
/// only in production because the test path exercised different code.
///
/// `EphemeralHandle` eliminates this divergence: tests use the exact same
/// `DurableStorage` engine as production, just backed by a temp directory.
pub struct EphemeralHandle {
    // The engine under test. Declared first so it is dropped before the temp
    // directory (Rust drops fields in declaration order); its `Drop` runs
    // `shutdown()`, which writes a marker file into the storage directory.
    storage: DurableStorage,
    // Keeps the backing temp directory alive for as long as the handle exists.
    _tmpdir: tempfile::TempDir,
}
3441
3442impl EphemeralHandle {
3443    /// Get a reference to the underlying storage
3444    pub fn storage(&self) -> &DurableStorage {
3445        &self.storage
3446    }
3447
3448    /// Get a mutable reference to the underlying storage
3449    pub fn storage_mut(&mut self) -> &mut DurableStorage {
3450        &mut self.storage
3451    }
3452
3453    /// Consume the handle and return the storage and temp directory.
3454    ///
3455    /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
3456    /// the `TempDir` alive for the lifetime of the storage.
3457    pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
3458        (self.storage, self._tmpdir)
3459    }
3460}
3461
3462impl std::ops::Deref for EphemeralHandle {
3463    type Target = DurableStorage;
3464    fn deref(&self) -> &DurableStorage {
3465        &self.storage
3466    }
3467}
3468
3469impl std::ops::DerefMut for EphemeralHandle {
3470    fn deref_mut(&mut self) -> &mut DurableStorage {
3471        &mut self.storage
3472    }
3473}
3474
/// Recovery statistics
///
/// `Default` yields all-zero counters.
// NOTE(review): the code that fills these fields is not in this part of the
// file; the per-field notes below are inferred from the names — confirm
// against `recover()`.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    // Presumably the number of transactions replayed during recovery.
    pub transactions_recovered: usize,
    // Presumably the number of individual writes replayed during recovery.
    pub writes_recovered: usize,
    // Presumably a commit timestamp established by recovery — TODO confirm which.
    pub commit_ts: u64,
}
3482
/// Storage statistics
///
/// Snapshot of engine counters as returned by `DurableStorage::stats()`.
#[derive(Debug, Default)]
pub struct StorageStats {
    // Value of the memtable's `size()`.
    pub memtable_size_bytes: u64,
    // Value of the WAL's `size_bytes()`.
    pub wal_size_bytes: u64,
    // Value of MVCC's `active_transaction_count()`.
    pub active_transactions: usize,
    // Value of MVCC's `min_active_snapshot()`; `gc()` uses the same watermark.
    pub min_active_snapshot: u64,
    // LSN returned by the most recent `checkpoint()` call.
    pub last_checkpoint_lsn: u64,
}
3492
3493#[cfg(test)]
3494mod tests {
3495    use super::*;
3496    use tempfile::tempdir;
3497
3498    #[test]
3499    fn test_basic_transaction() {
3500        let dir = tempdir().unwrap();
3501        let storage = DurableStorage::open(dir.path()).unwrap();
3502
3503        // Begin transaction
3504        let txn_id = storage.begin_transaction().unwrap();
3505
3506        // Write data
3507        storage
3508            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3509            .unwrap();
3510        storage
3511            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3512            .unwrap();
3513
3514        // Read back (within same transaction)
3515        let v1 = storage.read(txn_id, b"key1").unwrap();
3516        assert_eq!(v1, Some(b"value1".to_vec()));
3517
3518        // Commit
3519        let commit_ts = storage.commit(txn_id).unwrap();
3520        assert!(commit_ts > 0);
3521
3522        // Read in new transaction
3523        let txn2 = storage.begin_transaction().unwrap();
3524        let v1 = storage.read(txn2, b"key1").unwrap();
3525        assert_eq!(v1, Some(b"value1".to_vec()));
3526        storage.abort(txn2).unwrap();
3527    }
3528
3529    #[test]
3530    fn test_snapshot_isolation() {
3531        let dir = tempdir().unwrap();
3532        let storage = DurableStorage::open(dir.path()).unwrap();
3533
3534        // T1: Write initial value
3535        let t1 = storage.begin_transaction().unwrap();
3536        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3537        storage.commit(t1).unwrap();
3538
3539        // T2: Start reading (snapshot at this point)
3540        let t2 = storage.begin_transaction().unwrap();
3541
3542        // T3: Update the value
3543        let t3 = storage.begin_transaction().unwrap();
3544        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3545        storage.commit(t3).unwrap();
3546
3547        // T2 should still see v1 (snapshot isolation)
3548        let v = storage.read(t2, b"key").unwrap();
3549        assert_eq!(v, Some(b"v1".to_vec()));
3550
3551        // New transaction should see v2
3552        let t4 = storage.begin_transaction().unwrap();
3553        let v = storage.read(t4, b"key").unwrap();
3554        assert_eq!(v, Some(b"v2".to_vec()));
3555
3556        storage.abort(t2).unwrap();
3557        storage.abort(t4).unwrap();
3558    }
3559
3560    #[test]
3561    fn test_gc_preserves_versions_for_active_snapshot() {
3562        // GC must not free a version that an in-flight reader's snapshot can
3563        // still observe. The low-water-mark (min_active_snapshot) is the oldest
3564        // live reader; any version visible to it must survive GC.
3565        let dir = tempdir().unwrap();
3566        let storage = DurableStorage::open(dir.path()).unwrap();
3567
3568        // Seed an initial committed version v1.
3569        let t1 = storage.begin_transaction().unwrap();
3570        storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
3571        storage.commit(t1).unwrap();
3572
3573        // Open a long-lived snapshot reader BEFORE any newer writes. Its
3574        // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
3575        let reader = storage.begin_transaction().unwrap();
3576        assert_eq!(
3577            storage.read(reader, b"k").unwrap(),
3578            Some(b"v1".to_vec()),
3579            "reader's snapshot must see v1"
3580        );
3581
3582        // Produce several newer committed versions while the reader is active.
3583        for v in ["v2", "v3", "v4"] {
3584            let w = storage.begin_transaction().unwrap();
3585            storage
3586                .write(w, b"k".to_vec(), v.as_bytes().to_vec())
3587                .unwrap();
3588            storage.commit(w).unwrap();
3589        }
3590
3591        // Run GC. Because `reader` is still active, the watermark is pinned at
3592        // its snapshot and the version it needs (v1) must NOT be freed.
3593        let _freed = storage.gc();
3594        assert_eq!(
3595            storage.read(reader, b"k").unwrap(),
3596            Some(b"v1".to_vec()),
3597            "GC must not free a version still visible to an active snapshot"
3598        );
3599
3600        // A fresh transaction sees the latest committed version.
3601        let fresh = storage.begin_transaction().unwrap();
3602        assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
3603        storage.abort(fresh).unwrap();
3604
3605        // Release the old reader; the watermark can now advance.
3606        storage.abort(reader).unwrap();
3607
3608        // After the old snapshot is gone, GC may reclaim superseded versions,
3609        // and new readers still resolve the latest value correctly.
3610        let _freed2 = storage.gc();
3611        let after = storage.begin_transaction().unwrap();
3612        assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
3613        storage.abort(after).unwrap();
3614    }
3615
3616    #[test]
3617    fn test_ssi_detects_write_skew() {
3618        // Classic write-skew: two concurrent transactions each read what the
3619        // other writes (disjoint write keys). Under plain SI both would commit,
3620        // violating serializability. Under SSI the pivot transaction (with both
3621        // an inbound and an outbound rw-edge) must be aborted, so the two
3622        // commits cannot both succeed.
3623        let dir = tempdir().unwrap();
3624        let storage = DurableStorage::open(dir.path()).unwrap();
3625
3626        // Seed two keys.
3627        let seed = storage.begin_transaction().unwrap();
3628        storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
3629        storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
3630        storage.commit(seed).unwrap();
3631
3632        // T1 reads x and y, then writes x.
3633        let t1 = storage.begin_transaction().unwrap();
3634        let _ = storage.read(t1, b"x").unwrap();
3635        let _ = storage.read(t1, b"y").unwrap();
3636
3637        // T2 reads x and y, then writes y. (Concurrent with T1.)
3638        let t2 = storage.begin_transaction().unwrap();
3639        let _ = storage.read(t2, b"x").unwrap();
3640        let _ = storage.read(t2, b"y").unwrap();
3641
3642        storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
3643        storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();
3644
3645        // First committer wins.
3646        let c1 = storage.commit(t1);
3647        assert!(c1.is_ok(), "first committer should succeed: {c1:?}");
3648
3649        // The second committer forms a dangerous rw-structure with T1 and must
3650        // be rejected to preserve serializability.
3651        let c2 = storage.commit(t2);
3652        assert!(
3653            c2.is_err(),
3654            "SSI must abort the write-skew pivot, but commit succeeded"
3655        );
3656    }
3657
3658    #[test]
3659    fn test_abort_transaction() {
3660        let dir = tempdir().unwrap();
3661        let storage = DurableStorage::open(dir.path()).unwrap();
3662
3663        // Write initial value
3664        let t1 = storage.begin_transaction().unwrap();
3665        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3666        storage.commit(t1).unwrap();
3667
3668        // Start transaction that will abort
3669        let t2 = storage.begin_transaction().unwrap();
3670        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3671        storage.abort(t2).unwrap();
3672
3673        // New transaction should see v1 (aborted changes not visible)
3674        let t3 = storage.begin_transaction().unwrap();
3675        let v = storage.read(t3, b"key").unwrap();
3676        assert_eq!(v, Some(b"v1".to_vec()));
3677        storage.abort(t3).unwrap();
3678    }
3679
3680    #[test]
3681    fn test_crash_recovery() {
3682        let dir = tempdir().unwrap();
3683
3684        // Phase 1: Write data and commit
3685        {
3686            // Use open_without_lock for crash simulation tests
3687            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3688
3689            // Set sync mode to FULL to ensure data is synced before "crash"
3690            storage.set_sync_mode(2); // FULL: sync every commit
3691
3692            let txn = storage.begin_transaction().unwrap();
3693            storage
3694                .write(txn, b"persist".to_vec(), b"data".to_vec())
3695                .unwrap();
3696            storage.commit(txn).unwrap();
3697
3698            // Simulate crash (no clean shutdown)
3699            std::mem::forget(storage);
3700        }
3701
3702        // Phase 2: Reopen and recover
3703        {
3704            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3705            let stats = storage.recover().unwrap();
3706            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3707
3708            // Data should be recovered
3709            let txn = storage.begin_transaction().unwrap();
3710            let v = storage.read(txn, b"persist").unwrap();
3711            assert_eq!(v, Some(b"data".to_vec()));
3712            storage.abort(txn).unwrap();
3713        }
3714    }
3715
3716    /// At-rest encryption end-to-end through the live DurableStorage engine
3717    /// (Task 3B): an encrypted DB round-trips committed data, never leaks
3718    /// plaintext to the WAL file, flips the per-instance durability matrix, and
3719    /// fails CLOSED on a wrong or missing key (never a silent plaintext/empty
3720    /// open).
3721    #[test]
3722    fn test_at_rest_encryption_end_to_end() {
3723        let dir = tempdir().unwrap();
3724        let kek = || EncryptionKey::new([0xABu8; 32]);
3725
3726        // Phase 1: create an encrypted DB, write committed data, clean close.
3727        {
3728            let storage = DurableStorage::open_with_encryption(
3729                dir.path(),
3730                true,
3731                MemTableType::Standard,
3732                StorageEncryption::with_kek(kek(), "test"),
3733            )
3734            .unwrap();
3735            assert!(storage.is_encrypted());
3736            assert!(storage.durability_capabilities().at_rest_encryption);
3737            storage.set_sync_mode(2);
3738            let t = storage.begin_transaction().unwrap();
3739            storage
3740                .write(t, b"secret-key".to_vec(), b"secret-value".to_vec())
3741                .unwrap();
3742            storage.commit(t).unwrap();
3743        } // clean drop releases the file lock
3744
3745        // The keyring exists and the WAL bytes do not leak the plaintext record.
3746        assert!(dir.path().join("keyring.json").exists());
3747        let raw = std::fs::read(dir.path().join("wal.log")).unwrap();
3748        assert!(!contains(&raw, b"secret-value"), "value leaked to WAL");
3749        assert!(!contains(&raw, b"secret-key"), "key leaked to WAL");
3750
3751        // Phase 2: reopen with the CORRECT key, recover, read back.
3752        {
3753            let storage = DurableStorage::open_with_encryption(
3754                dir.path(),
3755                true,
3756                MemTableType::Standard,
3757                StorageEncryption::with_kek(kek(), "test"),
3758            )
3759            .unwrap();
3760            storage.recover().unwrap();
3761            let t = storage.begin_transaction().unwrap();
3762            assert_eq!(
3763                storage.read(t, b"secret-key").unwrap(),
3764                Some(b"secret-value".to_vec()),
3765                "committed encrypted data must round-trip"
3766            );
3767            storage.abort(t).unwrap();
3768        }
3769
3770        // Phase 3: WRONG key must fail closed (keyring canary), not open empty.
3771        {
3772            let wrong = DurableStorage::open_with_encryption(
3773                dir.path(),
3774                true,
3775                MemTableType::Standard,
3776                StorageEncryption::with_kek(EncryptionKey::new([0x00u8; 32]), "test"),
3777            );
3778            assert!(wrong.is_err(), "wrong KEK must fail closed");
3779        }
3780
3781        // Phase 4: opening an encrypted DB with NO key must fail closed.
3782        {
3783            let no_key = DurableStorage::open_with_encryption(
3784                dir.path(),
3785                true,
3786                MemTableType::Standard,
3787                StorageEncryption::disabled(),
3788            );
3789            assert!(
3790                no_key.is_err(),
3791                "encrypted DB opened without key must fail closed"
3792            );
3793        }
3794    }
3795
3796    /// A plaintext DB reports at_rest_encryption=false on the live matrix.
3797    #[test]
3798    fn test_plaintext_db_reports_unencrypted() {
3799        let dir = tempdir().unwrap();
3800        let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3801        assert!(!storage.is_encrypted());
3802        assert!(!storage.durability_capabilities().at_rest_encryption);
3803        assert!(storage.durability_capabilities().crash_recovery);
3804        assert!(!dir.path().join("keyring.json").exists());
3805    }
3806
3807    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
3808        haystack.windows(needle.len()).any(|w| w == needle)
3809    }
3810
3811    /// Crash-atomicity invariant (Task 4 — completes the Task 1 single-writer
3812    /// contract). `test_crash_recovery` proves committed data survives a crash;
3813    /// this proves the other half: recovery must NOT resurrect aborted or
3814    /// in-flight (never-committed) writes. Together they assert the live
3815    /// single-writer engine's commit is atomic AND durable across a crash.
3816    #[test]
3817    fn test_crash_recovery_atomicity() {
3818        let dir = tempdir().unwrap();
3819
3820        // Phase 1: one committed write, one aborted, one in-flight at crash.
3821        {
3822            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3823            storage.set_sync_mode(2); // FULL: fsync every commit before the "crash"
3824
3825            // Committed — must survive.
3826            let t1 = storage.begin_transaction().unwrap();
3827            storage
3828                .write(t1, b"committed".to_vec(), b"durable".to_vec())
3829                .unwrap();
3830            storage.commit(t1).unwrap();
3831
3832            // Aborted — must NOT be resurrected.
3833            let t2 = storage.begin_transaction().unwrap();
3834            storage
3835                .write(t2, b"aborted".to_vec(), b"rolledback".to_vec())
3836                .unwrap();
3837            storage.abort(t2).unwrap();
3838
3839            // In-flight (never committed) at crash time — must NOT be resurrected.
3840            let t3 = storage.begin_transaction().unwrap();
3841            storage
3842                .write(t3, b"inflight".to_vec(), b"lost".to_vec())
3843                .unwrap();
3844
3845            // Simulate a crash: skip Drop / clean shutdown.
3846            std::mem::forget(storage);
3847        }
3848
3849        // Phase 2: reopen + recover; assert atomicity.
3850        {
3851            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3852            storage.recover().unwrap();
3853
3854            let t = storage.begin_transaction().unwrap();
3855            assert_eq!(
3856                storage.read(t, b"committed").unwrap(),
3857                Some(b"durable".to_vec()),
3858                "committed write must survive the crash"
3859            );
3860            assert_eq!(
3861                storage.read(t, b"aborted").unwrap(),
3862                None,
3863                "aborted write must not be resurrected by recovery"
3864            );
3865            assert_eq!(
3866                storage.read(t, b"inflight").unwrap(),
3867                None,
3868                "uncommitted in-flight write must not be resurrected by recovery"
3869            );
3870            storage.abort(t).unwrap();
3871        }
3872    }
3873
3874    #[test]
3875    fn test_scan_prefix() {
3876        let dir = tempdir().unwrap();
3877        let storage = DurableStorage::open(dir.path()).unwrap();
3878
3879        let txn = storage.begin_transaction().unwrap();
3880        storage
3881            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3882            .unwrap();
3883        storage
3884            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3885            .unwrap();
3886        storage
3887            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3888            .unwrap();
3889        storage.commit(txn).unwrap();
3890
3891        let txn2 = storage.begin_transaction().unwrap();
3892        let users = storage.scan(txn2, b"user:").unwrap();
3893        assert_eq!(users.len(), 2);
3894        storage.abort(txn2).unwrap();
3895    }
3896
3897    #[test]
3898    fn test_delete() {
3899        let dir = tempdir().unwrap();
3900        let storage = DurableStorage::open(dir.path()).unwrap();
3901
3902        // Insert
3903        let t1 = storage.begin_transaction().unwrap();
3904        storage
3905            .write(t1, b"key".to_vec(), b"value".to_vec())
3906            .unwrap();
3907        storage.commit(t1).unwrap();
3908
3909        // Verify exists
3910        let t2 = storage.begin_transaction().unwrap();
3911        assert!(storage.read(t2, b"key").unwrap().is_some());
3912        storage.abort(t2).unwrap();
3913
3914        // Delete
3915        let t3 = storage.begin_transaction().unwrap();
3916        storage.delete(t3, b"key".to_vec()).unwrap();
3917        storage.commit(t3).unwrap();
3918
3919        // Verify deleted
3920        let t4 = storage.begin_transaction().unwrap();
3921        assert!(storage.read(t4, b"key").unwrap().is_none());
3922        storage.abort(t4).unwrap();
3923    }
3924
#[test]
fn test_gc() {
    // Commits ten successive versions of one key, runs GC, and checks that
    // GC did not destroy the newest committed value.
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Create multiple versions of the same key, one per committed transaction.
    for i in 0..10 {
        let txn = storage.begin_transaction().unwrap();
        storage
            .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
            .unwrap();
        storage.commit(txn).unwrap();
    }

    // How many versions get reclaimed is implementation-defined, so the
    // count itself is not asserted on.
    let _reclaimed = storage.gc();

    // Whatever GC reclaimed, the latest committed version must survive and
    // remain visible to a transaction started afterwards.
    let reader = storage.begin_transaction().unwrap();
    assert_eq!(
        storage.read(reader, b"key").unwrap(),
        Some(b"v9".to_vec()),
        "GC must not reclaim the newest committed version"
    );
    storage.abort(reader).unwrap();
}
3945
#[test]
fn test_group_commit() {
    use std::sync::Arc;
    use std::thread;

    // Four threads each commit one key through a storage opened with
    // `open_with_group_commit`; afterwards every key must be readable.
    let tmp = tempdir().unwrap();
    let shared = Arc::new(DurableStorage::open_with_group_commit(tmp.path()).unwrap());

    // Start every writer before joining any of them so the commits can
    // overlap. `collect` drives the whole iterator, so all threads are
    // spawned here.
    let workers: Vec<_> = (0..4)
        .map(|n| {
            let db = Arc::clone(&shared);
            thread::spawn(move || {
                let txn = db.begin_transaction().unwrap();
                let key = format!("key{}", n).into_bytes();
                let val = format!("val{}", n).into_bytes();
                db.write(txn, key, val).unwrap();
                db.commit(txn).unwrap()
            })
        })
        .collect();

    // Gather the value each thread's commit returned.
    let commit_stamps: Vec<_> = workers
        .into_iter()
        .map(|worker| worker.join().unwrap())
        .collect();

    // Every commit must have produced a non-zero result.
    assert!(commit_stamps.iter().all(|&ts| ts > 0));

    // Each writer's key/value pair must be readable from a fresh transaction.
    let reader = shared.begin_transaction().unwrap();
    for n in 0..4 {
        let key = format!("key{}", n);
        let found = shared.read(reader, key.as_bytes()).unwrap();
        assert_eq!(found, Some(format!("val{}", n).into_bytes()));
    }
    shared.abort(reader).unwrap();
}
3988
3989    // ==================== ArenaMvccMemTable Tests ====================
3990
#[test]
fn test_arena_memtable_basic_write_read() {
    // Uncommitted writes by txn 1 must be readable when the read is issued
    // with the same txn id; a key that was never written reads as `None`.
    let table = ArenaMvccMemTable::new();

    let inserted: [(&[u8], &[u8]); 2] = [(b"key1", b"value1"), (b"key2", b"value2")];
    for (key, value) in inserted {
        table.write(key, Some(value.to_vec()), 1).unwrap();
    }

    // Nothing is committed yet, so visibility relies on passing Some(1).
    let expectations: [(&[u8], Option<Vec<u8>>); 3] = [
        (b"key1", Some(b"value1".to_vec())),
        (b"key2", Some(b"value2".to_vec())),
        (b"key3", None),
    ];
    for (key, expected) in expectations {
        assert_eq!(table.read(key, 0, Some(1)), expected);
    }
}
4008
#[test]
fn test_arena_memtable_update() {
    // A second write to the same key by the same txn must replace what that
    // txn reads back.
    let table = ArenaMvccMemTable::new();

    let versions: [&[u8]; 2] = [b"v1", b"v2"];
    for version in versions {
        table.write(b"key", Some(version.to_vec()), 1).unwrap();
    }

    let observed = table.read(b"key", 0, Some(1));
    assert_eq!(observed, Some(b"v2".to_vec()));
}
4018
#[test]
fn test_arena_memtable_delete() {
    // Writing `None` for a key is how this test expresses a delete; after
    // it, the writing txn must read the key as absent.
    let table = ArenaMvccMemTable::new();

    let steps: [Option<Vec<u8>>; 2] = [Some(b"value".to_vec()), None];
    for step in steps {
        table.write(b"key", step, 1).unwrap();
    }

    let observed = table.read(b"key", 0, Some(1));
    assert_eq!(observed, None);
}
4028
#[test]
fn test_arena_memtable_scan_prefix() {
    // Writes three `user:` keys and one `order:` key under txn 1, commits
    // them at ts=10, then checks how many rows each prefix scan returns.
    let table = ArenaMvccMemTable::new();

    let rows: [(&[u8], &[u8]); 4] = [
        (b"user:1:name", b"Alice"),
        (b"user:1:email", b"alice@test.com"),
        (b"user:2:name", b"Bob"),
        (b"order:1", b"order_data"),
    ];
    for (key, value) in rows {
        table.write(key, Some(value.to_vec()), 1).unwrap();
    }

    // The commit call takes the set of keys the transaction wrote.
    let committed_keys: HashSet<_> = rows
        .iter()
        .map(|&(key, _)| InlineKey::from_slice(key))
        .collect();
    table.commit(1, 10, &committed_keys);

    // Scans use snapshot_ts=11 (> commit_ts) so the committed rows are visible:
    // two rows under "user:1:", three under "user:".
    let cases: [(&[u8], usize); 2] = [(b"user:1:", 2), (b"user:", 3)];
    for (prefix, expected_rows) in cases {
        let hits = table.scan_prefix(prefix, 11, None);
        assert_eq!(hits.len(), expected_rows);
    }
}
4062
#[test]
fn test_arena_memtable_write_batch() {
    // A batch of three writes under txn 1 must all be readable by txn 1.
    let table = ArenaMvccMemTable::new();

    let pairs: [(&[u8], &[u8]); 3] = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")];
    let batch: Vec<(&[u8], Option<Vec<u8>>)> = pairs
        .iter()
        .map(|&(key, value)| (key, Some(value.to_vec())))
        .collect();

    table.write_batch(&batch, 1).unwrap();

    // Read each key back through the single-key read path.
    for (key, value) in pairs {
        assert_eq!(table.read(key, 0, Some(1)), Some(value.to_vec()));
    }
}
4079
#[test]
fn test_arena_memtable_gc() {
    // Commits ten versions of one key (txn i+1 at commit ts (i+1)*10), runs
    // GC with argument 90, and checks the newest version is still readable.
    let memtable = ArenaMvccMemTable::new();

    // Write multiple versions, each in its own committed transaction.
    for i in 0..10 {
        memtable
            .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
            .unwrap();

        let mut write_set = HashSet::new();
        write_set.insert(InlineKey::from_slice(b"key"));
        memtable.commit(i + 1, (i + 1) * 10, &write_set);
    }

    // How many versions get reclaimed is implementation-defined, so the
    // count itself is not asserted on.
    let _reclaimed = memtable.gc(90);

    // The last version was committed at ts=100, above the GC argument, so
    // it must still be visible to a snapshot taken after that commit.
    assert_eq!(
        memtable.read(b"key", 101, None),
        Some(b"v9".to_vec()),
        "GC must not reclaim the newest committed version"
    );
}
4099
#[test]
fn test_arena_memtable_size_tracking() {
    // `size()` must report zero for a new memtable and become positive
    // after one write.
    let table = ArenaMvccMemTable::new();

    let size_when_empty = table.size();
    assert_eq!(size_when_empty, 0);

    table.write(b"key", Some(b"value".to_vec()), 1).unwrap();

    let size_after_write = table.size();
    assert!(size_after_write > 0);
}
4110
#[test]
fn test_arena_memtable_abort() {
    // An uncommitted write is visible only to its own txn, and disappears
    // for that txn as well once the txn is aborted.
    let table = ArenaMvccMemTable::new();
    let pending = b"uncommitted".to_vec();

    table.write(b"key", Some(pending.clone()), 1).unwrap();

    // The writing transaction sees its own pending value.
    let seen_by_owner = table.read(b"key", 0, Some(1));
    assert_eq!(seen_by_owner, Some(pending));

    // A different transaction id does not.
    let seen_by_other = table.read(b"key", 0, Some(2));
    assert_eq!(seen_by_other, None);

    // Roll back txn 1.
    table.abort(1);

    // After the abort, even txn 1 reads the key as absent.
    let seen_after_abort = table.read(b"key", 0, Some(1));
    assert_eq!(seen_after_abort, None);
}
4134
4135    // ========================================================================
4136    // MemTableKind Tests - Unified Abstraction
4137    // ========================================================================
4138
#[test]
fn test_memtable_kind_standard() {
    // A `MemTableKind` built as Standard must report that kind and serve a
    // write -> commit -> read round trip.
    let table = MemTableKind::new(MemTableType::Standard, true);
    assert_eq!(table.kind(), MemTableType::Standard);

    // Stage one key under txn 1.
    table
        .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
        .unwrap();

    // Commit txn 1 at ts=100, handing over the set of keys it wrote.
    let mut committed_keys = HashSet::new();
    committed_keys.insert(InlineKey::from_slice(b"key1"));
    table.commit(1, 100, &committed_keys);

    // Read at snapshot_ts=101: it must be > commit_ts for the value to be visible.
    let observed = table.read(b"key1", 101, None);
    assert_eq!(observed, Some(b"value1".to_vec()));
}
4157
#[test]
fn test_memtable_kind_arena() {
    // A `MemTableKind` built as Arena must report that kind and serve a
    // write -> commit -> read round trip.
    let table = MemTableKind::new(MemTableType::Arena, true);
    assert_eq!(table.kind(), MemTableType::Arena);

    // Stage one key under txn 1.
    table
        .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
        .unwrap();

    // Commit txn 1 at ts=100, handing over the set of keys it wrote.
    let mut committed_keys = HashSet::new();
    committed_keys.insert(InlineKey::from_slice(b"key1"));
    table.commit(1, 100, &committed_keys);

    // Read at snapshot_ts=101 (> commit_ts) so the committed value is visible.
    let observed = table.read(b"key1", 101, None);
    assert_eq!(observed, Some(b"value1".to_vec()));
}
4176
#[test]
fn test_memtable_kind_scan_range() {
    // Runs the same range-scan scenario against both memtable kinds and
    // expects the same number of results from each.
    for kind in [MemTableType::Standard, MemTableType::Arena] {
        let table = MemTableKind::new(kind, true);

        // Stage key0..key4 -> value0..value4 under txn 1.
        let keys: Vec<Vec<u8>> = (0..5)
            .map(|i| format!("key{}", i).into_bytes())
            .collect();
        for (i, key) in keys.iter().enumerate() {
            let value = format!("value{}", i).into_bytes();
            table.write(key.clone(), Some(value), 1).unwrap();
        }

        // Commit all five keys at ts=100.
        let committed_keys: HashSet<InlineKey> = keys
            .iter()
            .map(|key| InlineKey::from_slice(key.as_slice()))
            .collect();
        table.commit(1, 100, &committed_keys);

        // Scan from "key1" up to "key4" at snapshot_ts=101 (> commit_ts).
        let in_range = table.scan_range(b"key1", b"key4", 101, None);
        let found = in_range.len();
        assert_eq!(
            found, 3,
            "kind={:?} should have 3 results (key1, key2, key3)",
            kind
        );
    }
}
4208
#[test]
fn test_durable_storage_arena() {
    // Storage opened via `open_with_arena` must report the Arena memtable
    // type and still serve an ordinary write/commit/read cycle.
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open_with_arena(tmp.path()).unwrap();

    assert_eq!(db.memtable_type(), MemTableType::Arena);

    // Commit a single key.
    let writer = db.begin_transaction().unwrap();
    db.write(writer, b"key1".to_vec(), b"value1".to_vec())
        .unwrap();
    db.commit(writer).unwrap();

    // Read it back from a second transaction, then release that transaction.
    let reader = db.begin_transaction().unwrap();
    let observed = db.read(reader, b"key1").unwrap();
    assert_eq!(observed, Some(b"value1".to_vec()));
    db.abort(reader).unwrap();
}
4228
#[test]
fn test_durable_storage_full_config() {
    // Opens storage through `open_with_full_config` with the Arena memtable
    // (the `true` flag enables the ordered index, per the original test's
    // comment), then checks a prefix scan over ten committed keys.
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open_with_full_config(tmp.path(), true, MemTableType::Arena).unwrap();

    assert_eq!(db.memtable_type(), MemTableType::Arena);

    // Write key00..key09 inside a single transaction.
    let writer = db.begin_transaction().unwrap();
    (0..10).for_each(|i| {
        let key = format!("key{:02}", i).into_bytes();
        let value = format!("value{}", i).into_bytes();
        db.write(writer, key, value).unwrap();
    });
    db.commit(writer).unwrap();

    // Every key written above starts with "key0", so the scan must return
    // all ten of them.
    let reader = db.begin_transaction().unwrap();
    let matches = db.scan(reader, b"key0").unwrap();
    assert_eq!(matches.len(), 10); // key00 through key09
    db.abort(reader).unwrap();
}
4256}