Skip to main content

sochdb_storage/
durable_storage.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! Wires together the live storage components into a durable engine:
21//!
22//! - WAL (txn_wal.rs) for crash-consistent durability + recovery
23//! - Group Commit for throughput
24//! - MVCC for isolation
25//! - LSCS for columnar efficiency
26//!
27//! Truth-in-capabilities: the live path provides crash-consistent WAL recovery,
28//! but NOT at-rest encryption, point-in-time recovery, ARIES checkpointing, or
29//! WAL fencing — those modules exist but are quarantined behind the empty,
30//! non-default `experimental` feature and are unwired. Query
31//! [`crate::durability_capabilities`] rather than relying on prose like
32//! "production-grade".
33//!
34//! ## Architecture
35//!
36//! ```text
37//! ┌─────────────────────────────────────────────────────────────────┐
38//! │                      DurableStorage                              │
39//! ├─────────────────────────────────────────────────────────────────┤
40//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐ │
41//! │  │ MvccManager │    │ GroupCommit │───▶│ TxnWal (fsync)      │ │
42//! │  │             │    │             │    └─────────────────────┘ │
43//! │  │ ┌─────────┐ │    └─────────────┘                            │
44//! │  │ │Snapshots│ │                                                │
45//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐│
46//! │  │ ┌─────────┐ │    │              MemTable                    ││
47//! │  │ │ Txn Map │ │    │  (key → (value, txn_id, version))       ││
48//! │  │ └─────────┘ │    └─────────────────────────────────────────┘│
49//! │  └─────────────┘                                                │
50//! │                      ┌─────────────────────────────────────────┐│
51//! │                      │              LSCS (SST)                  ││
52//! │                      │  Immutable columnar segments             ││
53//! │                      └─────────────────────────────────────────┘│
54//! └─────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Concurrency
58//!
59//! - Writers: Serialize through WAL, use MVCC for conflict detection
60//! - Readers: Lock-free reads at snapshot timestamp
61//! - Commits: Batched through GroupCommit for throughput
62//!
63//! ## Isolation Contract
64//!
65//! The live write path (`MvccManager`, used by `DurableStorage`) provides
66//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
67//! plain Snapshot Isolation (SI):
68//!
69//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
70//!   fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
71//!   invisible to it — see `test_snapshot_isolation`. This alone is SI and is
72//!   vulnerable to **write skew** (two transactions each read a set the other
73//!   writes, both commit, and no serial order reproduces the result).
74//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
75//!   recently-committed concurrent transactions for rw-antidependency edges:
76//!   an inbound edge (another txn wrote a key we read, `T_other →rw→ T_me`) and
77//!   an outbound edge (we wrote a key another txn read, `T_me →rw→ T_other`).
78//!   When a transaction sits on **both** an inbound and an outbound rw-edge it
79//!   is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
80//!   structure"), so it is aborted. This is the conservative safe subset of the
81//!   SSI test: it may abort some serializable schedules (false positives) but
82//!   **never admits a non-serializable one** (no false negatives), so the
83//!   externally observable isolation level is Serializable.
84//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
85//!   never participate in validation; they always observe a serializable
86//!   snapshot and never abort.
87//!
88//! ### MVCC garbage collection is snapshot-safe
89//!
90//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
91//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
92//! `snapshot_ts` across all still-active transactions — into
93//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
94//! retains every version with `commit_ts > min_active_ts` **plus one anchor
95//! version at or below the watermark**, so any in-flight reader can still
96//! resolve the correct version for its snapshot. A version is only freed once
97//! **no active snapshot can observe it**. The watermark is recomputed on every
98//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
99//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
100
101use std::collections::HashSet;
102use std::path::{Path, PathBuf};
103use std::sync::Arc;
104use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
105
106use dashmap::DashMap;
107use smallvec::SmallVec;
108
109use crossbeam_skiplist::SkipMap;
110
111use crate::DurabilityCapabilities;
112use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
113use crate::encryption::EncryptionKey;
114use crate::group_commit::EventDrivenGroupCommit;
115use crate::keyring;
116use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
117use sochdb_core::version_chain::{
118    BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
119    VisibilityContext, WriteConflictDetection,
120};
121use sochdb_core::{Result, SochDBError};
122
123// =============================================================================
124// SSI Bloom Filter - Fast Conflict Pre-Filtering
125// =============================================================================
126
127/// Space-efficient Bloom filter for SSI conflict detection
128///
129/// Used to quickly determine if two transactions MIGHT have conflicting keys.
130/// False positives are acceptable (leads to unnecessary exact checks),
131/// but false negatives are not allowed.
132///
133/// ## Configuration
134///
135/// For 1000 keys with 1% false positive rate:
136/// - m = ~9600 bits ≈ 1.2 KB per transaction
137/// - k = 7 hash functions
138///
139/// ## Lazy Initialization
140///
141/// The bit vector is lazily initialized on first insert to avoid
142/// allocation overhead for read-only transactions.
143#[derive(Clone, Debug)]
144pub struct SsiBloomFilter {
145    /// Bit vector (each u64 holds 64 bits) - lazily initialized
146    bits: Option<Vec<u64>>,
147    /// Expected capacity (used for lazy init sizing)
148    expected_capacity: usize,
149    /// Number of hash functions to use
150    num_hashes: u32,
151}
152
153impl SsiBloomFilter {
154    /// Optimal number of bits per item for 1% false positive rate
155    /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
156    const BITS_PER_ITEM: f64 = 9.6;
157
158    /// Optimal number of hash functions for 1% false positive rate
159    /// k = (m/n) × ln(2) ≈ 7
160    const DEFAULT_NUM_HASHES: u32 = 7;
161
162    /// Minimum capacity to avoid tiny filters
163    const MIN_CAPACITY: usize = 64;
164
165    /// Create a new bloom filter for expected item count (lazy allocation)
166    ///
167    /// Configured for ~1% false positive rate.
168    /// The bit vector is not allocated until first insert.
169    #[inline]
170    pub fn new(expected_items: usize) -> Self {
171        Self {
172            bits: None,
173            expected_capacity: expected_items.max(Self::MIN_CAPACITY),
174            num_hashes: Self::DEFAULT_NUM_HASHES,
175        }
176    }
177
178    /// Create with specific capacity in words (for memory-constrained scenarios)
179    pub fn with_word_capacity(words: usize) -> Self {
180        Self {
181            bits: None,
182            expected_capacity: words.max(1) * 64 / 10, // Approx items from words
183            num_hashes: Self::DEFAULT_NUM_HASHES,
184        }
185    }
186
187    /// Ensure bits are allocated (lazy initialization)
188    #[inline]
189    fn ensure_allocated(&mut self) {
190        if self.bits.is_none() {
191            let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
192            let num_words = num_bits.div_ceil(64);
193            self.bits = Some(vec![0u64; num_words]);
194        }
195    }
196
197    /// Add a key to the filter - O(k) where k = num_hashes
198    #[inline]
199    pub fn insert(&mut self, key: &[u8]) {
200        self.ensure_allocated();
201        let bits = self.bits.as_mut().unwrap();
202        let num_bits = bits.len() * 64;
203        if num_bits == 0 {
204            return;
205        }
206
207        // Use two hash functions to simulate k hash functions
208        // h(i) = h1 + i * h2 (double hashing technique)
209        let h1 = Self::hash1(key);
210        let h2 = Self::hash2(key);
211
212        for i in 0..self.num_hashes {
213            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
214            let bit_idx = (h as usize) % num_bits;
215            let word_idx = bit_idx / 64;
216            let bit_pos = bit_idx % 64;
217            bits[word_idx] |= 1 << bit_pos;
218        }
219    }
220
221    /// Check if a key might be present - O(k)
222    ///
223    /// Returns:
224    /// - false: Key is definitely NOT in the set (or filter not initialized)
225    /// - true: Key MIGHT be in the set (needs exact check)
226    #[inline]
227    pub fn may_contain(&self, key: &[u8]) -> bool {
228        let bits = match &self.bits {
229            Some(b) => b,
230            None => return false, // Uninitialized = empty
231        };
232        let num_bits = bits.len() * 64;
233        if num_bits == 0 {
234            return false;
235        }
236
237        let h1 = Self::hash1(key);
238        let h2 = Self::hash2(key);
239
240        for i in 0..self.num_hashes {
241            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
242            let bit_idx = (h as usize) % num_bits;
243            let word_idx = bit_idx / 64;
244            let bit_pos = bit_idx % 64;
245            if bits[word_idx] & (1 << bit_pos) == 0 {
246                return false; // Definitely not present
247            }
248        }
249        true // Might be present
250    }
251
252    /// Check if this filter might intersect with another
253    ///
254    /// Fast O(m/64) check using bitwise AND of all words.
255    /// If no bits are shared, sets are definitely disjoint.
256    #[inline]
257    pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
258        let (self_bits, other_bits) = match (&self.bits, &other.bits) {
259            (Some(s), Some(o)) => (s, o),
260            _ => return false, // Either uninitialized = no intersection
261        };
262        let min_len = self_bits.len().min(other_bits.len());
263        for i in 0..min_len {
264            if self_bits[i] & other_bits[i] != 0 {
265                return true; // Might intersect
266            }
267        }
268        false // Definitely disjoint
269    }
270
271    /// First hash function (using built-in hasher)
272    #[inline]
273    fn hash1(key: &[u8]) -> u64 {
274        use std::collections::hash_map::DefaultHasher;
275        use std::hash::{Hash, Hasher};
276        let mut hasher = DefaultHasher::new();
277        key.hash(&mut hasher);
278        hasher.finish()
279    }
280
281    /// Second hash function (using twox-hash for independence)
282    #[inline]
283    fn hash2(key: &[u8]) -> u64 {
284        twox_hash::xxh3::hash64(key)
285    }
286
287    /// Get the memory size in bytes
288    pub fn size_bytes(&self) -> usize {
289        self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
290    }
291
292    /// Check if the filter is empty
293    pub fn is_empty(&self) -> bool {
294        match &self.bits {
295            Some(bits) => bits.iter().all(|&w| w == 0),
296            None => true,
297        }
298    }
299}
300
301/// Type alias for inline key storage - keys up to 32 bytes stored on stack
302/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
303pub type InlineKey = SmallVec<[u8; 32]>;
304
305/// Version of a key-value pair
306#[derive(Debug, Clone)]
307pub struct Version {
308    /// The value (None = tombstone)
309    pub value: Option<Vec<u8>>,
310    /// Transaction that created this version
311    pub txn_id: u64,
312    /// Commit timestamp (0 = uncommitted)
313    pub commit_ts: u64,
314}
315
316// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
317impl ChainEntry for Version {
318    #[inline]
319    fn commit_ts(&self) -> u64 {
320        self.commit_ts
321    }
322    #[inline]
323    fn txn_id(&self) -> u64 {
324        self.txn_id
325    }
326    #[inline]
327    fn set_commit_ts(&mut self, ts: u64) {
328        self.commit_ts = ts;
329    }
330}
331
332// ============================================================================
333// Optimized VersionChain with Binary Search (Task 1: mm.md)
334// ============================================================================
335
336/// Multi-version data for a single key with O(log v) read complexity
337///
338/// ## Optimization: Binary Search with Sorted Commit Ordering
339///
340/// Separates committed versions (sorted descending by commit_ts) from
341/// uncommitted version (single optional slot per transaction).
342///
343/// **Before:** O(v) linear scan + O(v) max computation = O(v)
344/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
345///
346/// For v=10 versions: 3.3x speedup
347/// For v=100 versions: 7x speedup
348///
349/// ## Rec 11: Consolidated
350///
351/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
352/// eliminating duplication with `mvcc_concurrent::VersionChain`.
353#[derive(Debug, Default)]
354pub struct VersionChain {
355    /// Consolidated binary-search chain (Rec 11)
356    inner: BinarySearchChain<Version>,
357}
358
359impl VersionChain {
360    /// Create a new empty version chain
361    #[inline]
362    pub fn new() -> Self {
363        Self {
364            inner: BinarySearchChain::new(),
365        }
366    }
367
368    /// Add a new uncommitted version
369    /// If there's already an uncommitted version from this txn, update it in place
370    ///
371    /// O(1) - just updates the uncommitted slot
372    #[inline]
373    pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
374        match self.inner.uncommitted_mut() {
375            Some(v) if v.txn_id == txn_id => {
376                // Update in place - O(1)
377                v.value = value;
378            }
379            _ => {
380                // New or different txn — set the slot
381                self.inner.set_uncommitted(Version {
382                    value,
383                    txn_id,
384                    commit_ts: 0,
385                });
386            }
387        }
388    }
389
390    /// Commit a version - moves from uncommitted slot to sorted committed list
391    ///
392    /// O(log v) - inserts into sorted position using binary search
393    #[inline]
394    pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
395        self.inner.commit(txn_id, commit_ts)
396    }
397
398    /// Abort a version (remove uncommitted version for txn)
399    ///
400    /// O(1) - just clears the uncommitted slot if it matches
401    #[inline]
402    pub fn abort(&mut self, txn_id: u64) {
403        self.inner.abort(txn_id);
404    }
405
406    /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
407    ///
408    /// ## Complexity: O(1) + O(log v) = O(log v)
409    #[inline]
410    pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
411        self.inner.read_at(snapshot_ts, current_txn_id)
412    }
413
414    /// Check if there's an uncommitted version by another transaction
415    ///
416    /// O(1) - just checks the uncommitted slot
417    #[inline]
418    pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
419        self.inner.has_write_conflict(my_txn_id)
420    }
421
422    /// Garbage collect old versions
423    pub fn gc(&mut self, min_active_ts: u64) {
424        self.inner.gc_by_ts(min_active_ts);
425    }
426
427    /// Get total version count (committed + uncommitted)
428    #[inline]
429    pub fn version_count(&self) -> usize {
430        self.inner.version_count()
431    }
432
433    // Legacy compatibility: get versions vec (for tests)
434    #[cfg(test)]
435    pub fn versions(&self) -> Vec<Version> {
436        let mut result = self.inner.committed_versions().to_vec();
437        if let Some(v) = self.inner.uncommitted() {
438            result.push(v.clone());
439        }
440        result
441    }
442}
443
444// =============================================================================
445// Rec 6: Unified Version Chain Trait Implementations
446// =============================================================================
447
448impl MvccVersionChain for VersionChain {
449    type Value = Option<Vec<u8>>;
450
451    fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
452        // Delegate to BinarySearchChain, then project to value field
453        self.inner
454            .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
455            .map(|v| &v.value)
456    }
457
458    fn get_latest(&self) -> Option<&Self::Value> {
459        self.inner.latest().map(|v| &v.value)
460    }
461
462    fn version_count(&self) -> usize {
463        self.inner.version_count()
464    }
465}
466
467impl MvccVersionChainMut for VersionChain {
468    fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
469        self.add_uncommitted(value, txn_id);
470    }
471
472    fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
473        self.inner.commit(txn_id, commit_ts)
474    }
475
476    fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
477        // Insert a tombstone (None value) as uncommitted
478        self.add_uncommitted(None, txn_id);
479        true
480    }
481
482    fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
483        let before = self.inner.committed_count();
484        self.inner.gc_by_ts(min_visible_ts);
485        let removed = before - self.inner.committed_count();
486        (removed, removed * std::mem::size_of::<Version>())
487    }
488}
489
490impl WriteConflictDetection for VersionChain {
491    fn has_write_conflict(&self, txn_id: TxnId) -> bool {
492        self.has_write_conflict(txn_id)
493    }
494}
495
496// =============================================================================
497// Pre-sizing Constants to Avoid HashSet Resize Overhead
498// =============================================================================
499
500/// Default capacity for write_set HashSet
501/// Sized for typical OLTP transactions (10-50 keys)
502/// Avoids resize overhead that caused +11% regression
503const WRITE_SET_INITIAL_CAPACITY: usize = 32;
504
505/// Default capacity for read_set HashSet  
506/// Typically larger than write_set due to read-heavy patterns
507const READ_SET_INITIAL_CAPACITY: usize = 64;
508
509/// Transaction state for MVCC
510#[derive(Debug, Clone)]
511pub struct MvccTransaction {
512    /// Transaction ID
513    pub txn_id: u64,
514    /// Snapshot timestamp (reads see commits before this)
515    pub snapshot_ts: u64,
516    /// Keys written by this transaction - uses SmallVec for inline storage
517    /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
518    pub write_set: HashSet<InlineKey>,
519    /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
520    /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
521    pub read_set: HashSet<InlineKey>,
522    /// Bloom filter for write set - fast SSI pre-filtering
523    pub write_bloom: SsiBloomFilter,
524    /// Bloom filter for read set - fast SSI pre-filtering
525    pub read_bloom: SsiBloomFilter,
526    /// Transaction state
527    pub state: TxnState,
528    /// Transaction mode for SSI optimization (Recommendation 9)
529    /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
530    pub mode: TransactionMode,
531}
532
533impl MvccTransaction {
534    /// Create a new transaction with pre-sized collections
535    ///
536    /// This avoids HashSet resize overhead during the transaction
537    /// which was causing +11% regression on write_set.insert().
538    #[inline]
539    pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
540        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
541    }
542
543    /// Create a read-only transaction (SSI bypass - 2.6x faster)
544    ///
545    /// Read-only transactions skip all SSI tracking:
546    /// - No read_set allocation
547    /// - No read_bloom allocation  
548    /// - No commit validation
549    ///
550    /// ## Performance
551    ///
552    /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
553    #[inline]
554    pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
555        Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
556    }
557
558    /// Create a write-only transaction (partial SSI bypass)
559    ///
560    /// Write-only transactions skip read tracking:
561    /// - No read_set tracking
562    /// - No read_bloom inserts
563    /// - Still needs write_set for commit
564    #[inline]
565    pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
566        Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
567    }
568
569    /// Create transaction with specific mode
570    #[inline]
571    pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
572        // Optimize allocation based on mode
573        let (write_capacity, read_capacity) = match mode {
574            TransactionMode::ReadOnly => (0, 0), // No tracking needed
575            TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
576            TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
577        };
578        Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
579    }
580
581    /// Create with custom capacities for expected workload
582    ///
583    /// Use this when you know the transaction will write many keys
584    /// to avoid resize overhead entirely.
585    #[inline]
586    pub fn with_capacity(
587        txn_id: u64,
588        snapshot_ts: u64,
589        write_capacity: usize,
590        read_capacity: usize,
591        mode: TransactionMode,
592    ) -> Self {
593        Self {
594            txn_id,
595            snapshot_ts,
596            write_set: HashSet::with_capacity(write_capacity),
597            read_set: HashSet::with_capacity(read_capacity),
598            write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
599            read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
600            state: TxnState::Active,
601            mode,
602        }
603    }
604
605    /// Check if this is a read-only transaction
606    #[inline]
607    pub fn is_read_only(&self) -> bool {
608        self.write_set.is_empty()
609    }
610
611    /// Check if this is a single-key write transaction
612    #[inline]
613    pub fn is_single_key_write(&self) -> bool {
614        self.write_set.len() == 1 && self.read_set.len() <= 1
615    }
616}
617
618/// Transaction state
619#[derive(Debug, Clone, Copy, PartialEq, Eq)]
620pub enum TxnState {
621    Active,
622    Committed,
623    Aborted,
624}
625
626// =============================================================================
627// Transaction Mode for SSI Bypass (Recommendation 9)
628// =============================================================================
629
630/// Transaction mode for SSI optimization
631///
632/// By classifying transactions at begin time, we can skip expensive SSI
633/// tracking for the majority of transactions:
634///
635/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
636/// |-----------|-------------------|--------------------|-----------------|
637/// | ReadOnly  | None             | None               | ~10 ns          |
638/// | WriteOnly | None             | Full               | ~30 ns          |
639/// | ReadWrite | Full             | Full               | ~50 ns          |
640///
641/// ## Performance Analysis
642///
643/// For read-only transactions (typically 90% of workload):
644/// ```text
645/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
646///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
647///
648/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
649///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
650///
651/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
652/// ```
653#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
654pub enum TransactionMode {
655    /// Read-only transaction - skips ALL SSI tracking
656    /// Cannot form rw-antidependency cycles (no writes to create outgoing edges)
657    /// Safe to skip read_set, read_bloom, and commit validation entirely
658    ReadOnly,
659
660    /// Write-only transaction - skips read tracking
661    /// Cannot form incoming rw-edges (no reads from concurrent writers)
662    /// Only needs write_set and write_bloom tracking
663    WriteOnly,
664
665    /// Full read-write transaction (default) - complete SSI tracking
666    /// May form both incoming and outgoing rw-edges
667    /// Requires full validation at commit time
668    #[default]
669    ReadWrite,
670}
671
672impl TransactionMode {
673    /// Check if this mode requires read tracking
674    #[inline]
675    pub fn tracks_reads(&self) -> bool {
676        matches!(self, TransactionMode::ReadWrite)
677    }
678
679    /// Check if this mode requires write tracking
680    #[inline]
681    pub fn tracks_writes(&self) -> bool {
682        matches!(
683            self,
684            TransactionMode::WriteOnly | TransactionMode::ReadWrite
685        )
686    }
687
688    /// Check if commit needs SSI validation
689    #[inline]
690    pub fn needs_ssi_validation(&self) -> bool {
691        matches!(self, TransactionMode::ReadWrite)
692    }
693}
694
695/// SSI conflict edge type
696#[derive(Debug, Clone, Copy, PartialEq, Eq)]
697pub enum ConflictType {
698    /// Read-write conflict: T1 reads X, then T2 writes X
699    ReadWrite,
700    /// Write-read conflict: T1 writes X, then T2 reads X  
701    WriteRead,
702}
703
704/// SSI conflict edge for dangerous structure detection
705#[derive(Debug, Clone)]
706pub struct ConflictEdge {
707    /// Source transaction
708    pub from_txn: u64,
709    /// Target transaction
710    pub to_txn: u64,
711    /// Type of conflict
712    pub conflict_type: ConflictType,
713}
714
715/// MVCC Manager with SSI support
716///
717/// Uses DashMap for lock-free per-transaction access.
718/// Implements Serializable Snapshot Isolation (SSI) with
719/// dangerous structure detection for rw-antidependency cycles.
720#[allow(clippy::type_complexity)]
721pub struct MvccManager {
722    /// Active transactions (sharded for concurrent access)
723    active_txns: DashMap<u64, MvccTransaction>,
724    /// Current timestamp counter
725    ts_counter: AtomicU64,
726    /// Minimum active snapshot timestamp (for GC)
727    min_active_ts: AtomicU64,
728    /// Recently committed transactions for SSI validation
729    /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
730    /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
731    recent_commits: DashMap<
732        u64,
733        (
734            u64,
735            SsiBloomFilter,
736            SsiBloomFilter,
737            HashSet<InlineKey>,
738            HashSet<InlineKey>,
739        ),
740    >,
741    /// Max recent commits to track
742    max_recent_commits: usize,
743}
744
745impl Default for MvccManager {
746    fn default() -> Self {
747        Self::new()
748    }
749}
750
751impl MvccManager {
752    pub fn new() -> Self {
753        Self {
754            active_txns: DashMap::new(),
755            ts_counter: AtomicU64::new(1),
756            min_active_ts: AtomicU64::new(0),
757            recent_commits: DashMap::new(),
758            max_recent_commits: 1000, // Track last 1000 commits for SSI
759        }
760    }
761
762    /// Begin a new transaction with snapshot isolation
763    ///
764    /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
765    pub fn begin(&self, txn_id: u64) -> MvccTransaction {
766        self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
767    }
768
769    /// Begin a read-only transaction (SSI bypass - 2.6x faster)
770    ///
771    /// Read-only transactions skip all SSI tracking, reducing
772    /// per-read overhead from ~82ns to ~32ns.
773    ///
774    /// ## Safety
775    ///
776    /// Caller must ensure no writes are performed. Attempting to
777    /// write in a read-only transaction will still succeed but
778    /// won't be tracked for SSI validation.
779    #[inline]
780    pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
781        self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
782    }
783
784    /// Begin a write-only transaction (partial SSI bypass)
785    ///
786    /// Write-only transactions skip read tracking, reducing overhead
787    /// for insert-heavy workloads.
788    #[inline]
789    pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
790        self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
791    }
792
793    /// Begin a transaction with specific mode
794    ///
795    /// This is the core transaction creation method that all other
796    /// begin_* methods delegate to.
797    pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
798        let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
799
800        // Create transaction with mode-optimized allocations
801        let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
802
803        self.active_txns.insert(txn_id, txn.clone());
804        self.update_min_active_ts();
805
806        txn
807    }
808
809    /// Get transaction if active (clones - use get_snapshot_ts for hot path)
810    pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
811        self.active_txns.get(&txn_id).map(|t| t.clone())
812    }
813
814    /// Fast path: get just the snapshot timestamp without cloning
815    /// This is the hot path for reads - avoids cloning bloom filters
816    #[inline]
817    pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
818        self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
819    }
820
821    /// Record a read (for SSI) - uses inline key storage + bloom filter
822    ///
823    /// ## SSI Bypass (Recommendation 9)
824    ///
825    /// For ReadOnly mode transactions, this is a no-op (instant return).
826    /// For WriteOnly mode transactions, this is a no-op.
827    /// Only ReadWrite mode transactions track reads for SSI.
828    ///
829    /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
830    #[inline]
831    pub fn record_read(&self, txn_id: u64, key: &[u8]) {
832        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
833            // SSI Bypass: Skip tracking for read-only and write-only modes
834            if !txn.mode.tracks_reads() {
835                return;
836            }
837
838            // Only track reads if within reasonable bounds
839            if txn.read_set.len() < 10000 {
840                txn.read_set.insert(SmallVec::from_slice(key));
841                txn.read_bloom.insert(key);
842            }
843        }
844    }
845
846    /// Record a write - uses inline key storage + bloom filter
847    ///
848    /// Note: Even ReadOnly transactions can record writes (mode is a hint).
849    /// The mode only affects SSI tracking, not write capability.
850    pub fn record_write(&self, txn_id: u64, key: &[u8]) {
851        if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
852            txn.write_set.insert(SmallVec::from_slice(key));
853            txn.write_bloom.insert(key);
854        }
855    }
856
857    /// Allocate commit timestamp
858    pub fn alloc_commit_ts(&self) -> u64 {
859        self.ts_counter.fetch_add(1, Ordering::SeqCst)
860    }
861
862    /// Commit transaction with SSI validation
863    /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
864    /// Returns None if SSI validation fails (dangerous structure detected)
865    ///
866    /// ## SSI Bypass (Recommendation 9)
867    ///
868    /// For ReadOnly mode: Skip validation entirely (~10ns commit)
869    /// For WriteOnly mode: Skip read-based validation (~30ns commit)
870    /// For ReadWrite mode: Full validation (~50ns commit)
871    pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
872        // Get transaction before removing
873        let txn = self.active_txns.get(&txn_id)?.clone();
874
875        // SSI Bypass: Skip validation for ReadOnly transactions
876        // ReadOnly can never form rw-antidependency cycles
877        if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
878            // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
879            // For ReadWrite: check SSI validation result
880            if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
881                // Abort on SSI violation
882                self.active_txns.remove(&txn_id);
883                self.update_min_active_ts();
884                return None;
885            }
886        }
887
888        let commit_ts = self.alloc_commit_ts();
889
890        // Extract write_set and remove transaction - takes ownership
891        let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
892
893        // OPTIMIZATION: Only track ReadWrite transactions for SSI
894        // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
895        let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
896            && !removed_txn.read_set.is_empty()
897            && !removed_txn.write_set.is_empty();
898
899        if needs_ssi_tracking {
900            // Need to clone write_set since we return it AND track it
901            let write_set_for_return = removed_txn.write_set.clone();
902
903            self.track_commit_owned(
904                txn_id,
905                commit_ts,
906                removed_txn.read_bloom,
907                removed_txn.write_bloom,
908                removed_txn.read_set,
909                removed_txn.write_set,
910            );
911
912            self.update_min_active_ts();
913            Some((commit_ts, write_set_for_return))
914        } else {
915            // Fast path: no SSI tracking needed, avoid clone entirely
916            self.update_min_active_ts();
917            Some((commit_ts, removed_txn.write_set))
918        }
919    }
920
921    /// Validate SSI constraints for a committing transaction
922    ///
923    /// ## Transaction Classification (Task 3: Optimistic MVCC)
924    ///
925    /// Transactions are classified and routed through appropriate fast paths:
926    ///
927    /// | Class      | Criteria                      | Validation Cost |
928    /// |------------|-------------------------------|-----------------|
929    /// | ReadOnly   | write_set.is_empty()          | 0 ns           |
930    /// | SingleKey  | write_set.len() == 1          | 0 ns           |
931    /// | Disjoint   | bloom filters don't intersect | ~10 ns         |
932    /// | General    | full SSI check                | ~50 ns         |
933    ///
934    /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
935    /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
936    ///
937    /// Detects "dangerous structures" - rw-antidependency cycles:
938    /// - T1 reads X (snapshot sees old value)
939    /// - T2 writes X (concurrent write)  
940    /// - T2 reads Y (snapshot sees old value)
941    /// - T1 writes Y (concurrent write)
942    ///
943    /// If T1 → rw → T2 → rw → T1 exists, abort T1
944    #[inline]
945    fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
946        // =================================================================
947        // Fast Path 1: Read-only transactions (0 ns)
948        // =================================================================
949        // Read-only transactions can never form rw-antidependency cycles
950        // because they have no writes to create outgoing rw-edges
951        if txn.write_set.is_empty() {
952            return true;
953        }
954
955        // =================================================================
956        // Fast Path 2: No recent commits to check (0 ns)
957        // =================================================================
958        if self.recent_commits.is_empty() {
959            return true;
960        }
961
962        // =================================================================
963        // Fast Path 3: Single-key write transactions (0 ns)
964        // =================================================================
965        // A single-key write transaction cannot form a dangerous cycle:
966        // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
967        //   AND T2 to read what T1 wrote
968        // - With only one key in write_set, the same key would need to be
969        //   in both read_set AND write_set of both transactions
970        // - This is already prevented by our conflict detection (write-write)
971        if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
972            return true;
973        }
974
975        let my_snapshot = txn.snapshot_ts;
976
977        // =================================================================
978        // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
979        // =================================================================
980        // Pre-filter using bloom filters: if our write_bloom doesn't intersect
981        // with any concurrent transaction's read_bloom AND vice versa,
982        // there can be no rw-antidependency
983        let mut any_may_intersect = false;
984        for entry in self.recent_commits.iter() {
985            let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
986
987            // Only check concurrent transactions.
988            //
989            // A committed transaction is *concurrent* with us iff its writes are
990            // invisible to our snapshot. Read visibility is strict
991            // (`commit_ts < snapshot_ts`), so a transaction with
992            // `commit_ts >= my_snapshot` is invisible and must be validated
993            // against. Using `<` here (not `<=`) keeps this window consistent
994            // with `read_at`; a `<=` would skip a boundary transaction whose
995            // `commit_ts == my_snapshot` and miss a genuine write-skew.
996            if *other_commit_ts < my_snapshot {
997                continue;
998            }
999
1000            // Check bloom filter intersection (O(m/64) per filter)
1001            // If our writes may intersect their reads OR their writes may intersect our reads
1002            if txn.write_bloom.may_intersect(other_read_bloom)
1003                || other_write_bloom.may_intersect(&txn.read_bloom)
1004            {
1005                any_may_intersect = true;
1006                break;
1007            }
1008        }
1009
1010        // No bloom intersection means definitely disjoint - no SSI conflict possible
1011        if !any_may_intersect {
1012            return true;
1013        }
1014
1015        // =================================================================
1016        // Full SSI Validation (~50 ns)
1017        // =================================================================
1018        // Check for rw-conflicts with recently committed transactions
1019        // An rw-conflict exists if:
1020        // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1021        // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1022
1023        let mut in_conflict_with: Vec<u64> = Vec::new();
1024        let mut out_conflict_to: Vec<u64> = Vec::new();
1025
1026        for entry in self.recent_commits.iter() {
1027            let (
1028                other_txn_id,
1029                (
1030                    other_commit_ts,
1031                    _other_read_bloom,
1032                    other_write_bloom,
1033                    other_read_set,
1034                    other_write_set,
1035                ),
1036            ) = entry.pair();
1037
1038            // Only consider transactions concurrent with us: those whose writes
1039            // are invisible to our snapshot. Read visibility is strict
1040            // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1041            // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1042            if *other_commit_ts < my_snapshot {
1043                continue;
1044            }
1045
1046            // Check: other wrote → we read (other →rw→ me)
1047            // T_other wrote a key that T_me read (rw-dependency inbound)
1048            //
1049            // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1050            // Only do expensive HashSet intersection if bloom says "maybe conflict"
1051            let mut has_in_conflict = false;
1052            for key in txn.read_set.iter() {
1053                if other_write_bloom.may_contain(key) {
1054                    // Bloom says maybe - do exact check
1055                    if other_write_set.contains(key) {
1056                        has_in_conflict = true;
1057                        break;
1058                    }
1059                }
1060            }
1061            if has_in_conflict {
1062                in_conflict_with.push(*other_txn_id);
1063            }
1064
1065            // Check: we wrote → other read (me →rw→ other)
1066            // T_me wrote a key that T_other read (rw-dependency outbound)
1067            //
1068            // Bloom-accelerated: Use our write_bloom against their read_set
1069            let mut has_out_conflict = false;
1070            for key in other_read_set.iter() {
1071                if txn.write_bloom.may_contain(key) {
1072                    // Bloom says maybe - do exact check
1073                    if txn.write_set.contains(key) {
1074                        has_out_conflict = true;
1075                        break;
1076                    }
1077                }
1078            }
1079            if has_out_conflict {
1080                out_conflict_to.push(*other_txn_id);
1081            }
1082        }
1083
1084        // Dangerous structure: we have both incoming AND outgoing rw-edges
1085        // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1086        //
1087        // Conservative check: if both exist, abort
1088        // A more precise check would verify the cycle path, but this is safe
1089        if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1090            return false; // SSI violation - abort
1091        }
1092
1093        true
1094    }
1095
1096    /// Track a committed transaction for future SSI validation
1097    ///
1098    /// Only tracks transactions that have both reads AND writes, since SSI
1099    /// only detects rw-antidependency cycles. Pure read or pure write
1100    /// transactions can't form cycles.
1101    ///
1102    /// ## Optimization: Zero-Copy Transfer
1103    ///
1104    /// Takes ownership of sets instead of cloning to avoid the +15% commit
1105    /// phase regression. The caller should use mem::take() to transfer ownership.
1106    fn track_commit_owned(
1107        &self,
1108        txn_id: u64,
1109        commit_ts: u64,
1110        read_bloom: SsiBloomFilter,
1111        write_bloom: SsiBloomFilter,
1112        read_set: HashSet<InlineKey>,
1113        write_set: HashSet<InlineKey>,
1114    ) {
1115        // Optimization: Only track mixed read-write transactions
1116        // Pure reads can't create outgoing rw-edges
1117        // Pure writes can't create incoming rw-edges
1118        if read_set.is_empty() || write_set.is_empty() {
1119            return; // Skip tracking - can't form SSI cycle
1120        }
1121
1122        // Add to recent commits with bloom filters for fast SSI pre-filtering
1123        // No cloning needed - we take ownership
1124        self.recent_commits.insert(
1125            txn_id,
1126            (commit_ts, read_bloom, write_bloom, read_set, write_set),
1127        );
1128
1129        // Lazy pruning: only prune when we're significantly over capacity
1130        // Avoids pruning overhead on every commit
1131        if self.recent_commits.len() > self.max_recent_commits * 2 {
1132            // Remove entries with lowest commit_ts
1133            let min_active = self.min_active_ts.load(Ordering::Relaxed);
1134            self.recent_commits
1135                .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1136        }
1137    }
1138
1139    /// Legacy track_commit that clones - kept for compatibility
1140    #[allow(dead_code)]
1141    fn track_commit(
1142        &self,
1143        txn_id: u64,
1144        commit_ts: u64,
1145        read_bloom: SsiBloomFilter,
1146        write_bloom: SsiBloomFilter,
1147        read_set: &HashSet<InlineKey>,
1148        write_set: &HashSet<InlineKey>,
1149    ) {
1150        if read_set.is_empty() || write_set.is_empty() {
1151            return;
1152        }
1153        self.recent_commits.insert(
1154            txn_id,
1155            (
1156                commit_ts,
1157                read_bloom,
1158                write_bloom,
1159                read_set.clone(),
1160                write_set.clone(),
1161            ),
1162        );
1163    }
1164
1165    /// Abort transaction
1166    pub fn abort(&self, txn_id: u64) {
1167        self.active_txns.remove(&txn_id);
1168        self.update_min_active_ts();
1169    }
1170
1171    /// Get minimum active snapshot timestamp
1172    pub fn min_active_snapshot(&self) -> u64 {
1173        self.min_active_ts.load(Ordering::SeqCst)
1174    }
1175
1176    /// Get count of active transactions
1177    pub fn active_transaction_count(&self) -> usize {
1178        self.active_txns.len()
1179    }
1180
1181    fn update_min_active_ts(&self) {
1182        let min = self
1183            .active_txns
1184            .iter()
1185            .map(|entry| entry.value().snapshot_ts)
1186            .min()
1187            .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1188        self.min_active_ts.store(min, Ordering::SeqCst);
1189    }
1190}
1191
1192/// Epoch-based dirty list for O(expired) GC instead of O(n)
1193///
1194/// Instead of scanning ALL version chains, we track which keys have versions
1195/// created in each epoch. GC only needs to visit keys from old epochs.
1196struct EpochDirtyList {
1197    /// Ring buffer of epoch -> dirty keys
1198    /// Index = epoch % EPOCH_RING_SIZE
1199    epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1200    /// Current epoch
1201    current_epoch: AtomicU64,
1202}
1203
1204const EPOCH_RING_SIZE: usize = 4;
1205
1206impl EpochDirtyList {
1207    fn new() -> Self {
1208        Self {
1209            epochs: [
1210                parking_lot::Mutex::new(Vec::new()),
1211                parking_lot::Mutex::new(Vec::new()),
1212                parking_lot::Mutex::new(Vec::new()),
1213                parking_lot::Mutex::new(Vec::new()),
1214            ],
1215            current_epoch: AtomicU64::new(0),
1216        }
1217    }
1218
1219    /// Record a version created in the current epoch
1220    #[inline]
1221    fn record_version(&self, key: Vec<u8>) {
1222        let epoch = self.current_epoch.load(Ordering::Relaxed);
1223        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1224        self.epochs[idx].lock().push(key);
1225    }
1226
1227    /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1228    ///
1229    /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1230    /// For 100 writes: ~100x fewer mutex operations.
1231    #[inline]
1232    fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1233        let epoch = self.current_epoch.load(Ordering::Relaxed);
1234        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1235        let mut guard = self.epochs[idx].lock();
1236        guard.extend(keys);
1237    }
1238
1239    /// Advance to next epoch, returning old epoch's dirty keys
1240    fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1241        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1242        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1243
1244        // Drain the old epoch's dirty list
1245        let mut guard = self.epochs[old_idx].lock();
1246        let keys = std::mem::take(&mut *guard);
1247        (old_epoch, keys)
1248    }
1249
1250    /// Get current epoch
1251    #[allow(dead_code)]
1252    fn current(&self) -> u64 {
1253        self.current_epoch.load(Ordering::Relaxed)
1254    }
1255}
1256
1257// ============================================================================
1258// Streaming Scan Iterator
1259// ============================================================================
1260
1261/// Streaming iterator for range scans
1262///
1263/// Yields results one at a time without materializing the full result set.
1264/// This enables processing of very large result sets with O(1) memory per
1265/// iteration instead of O(N) for the entire result set.
1266struct ScanRangeIterator<'a> {
1267    memtable: &'a MvccMemTable,
1268    start: Vec<u8>,
1269    end: Vec<u8>,
1270    snapshot_ts: u64,
1271    current_txn_id: Option<u64>,
1272    use_ordered: bool,
1273    // We use Option to defer initialization
1274    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1275    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1276    initialized: bool,
1277}
1278
1279impl<'a> Iterator for ScanRangeIterator<'a> {
1280    type Item = (Vec<u8>, Vec<u8>);
1281
1282    fn next(&mut self) -> Option<Self::Item> {
1283        // Lazy initialization on first call
1284        if !self.initialized {
1285            self.initialized = true;
1286
1287            if self.use_ordered {
1288                // Try deferred index first (after compaction, it uses a SkipMap internally)
1289                if let Some(ref def_idx) = self.memtable.deferred_index {
1290                    let start = self.start.clone();
1291                    let end = self.end.clone();
1292                    let snapshot_ts = self.snapshot_ts;
1293                    let current_txn_id = self.current_txn_id;
1294                    let data = &self.memtable.data;
1295
1296                    // Collect keys from deferred index (already sorted after compact)
1297                    let keys: Vec<Vec<u8>> = if end.is_empty() {
1298                        def_idx.range_from(&start).collect()
1299                    } else {
1300                        def_idx.range(&start, &end).collect()
1301                    };
1302
1303                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1304                        Box::new(keys.into_iter().filter_map(move |key| {
1305                            if let Some(chain) = data.get(&key)
1306                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1307                                && let Some(value) = &v.value
1308                            {
1309                                Some((key, value.clone()))
1310                            } else {
1311                                None
1312                            }
1313                        }));
1314                    self.ordered_iter = Some(iter);
1315                } else if let Some(ref idx) = self.memtable.ordered_index {
1316                    let start = self.start.clone();
1317                    let end = self.end.clone();
1318                    let snapshot_ts = self.snapshot_ts;
1319                    let current_txn_id = self.current_txn_id;
1320                    let data = &self.memtable.data;
1321
1322                    let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1323                    {
1324                        Box::new(idx.range(start..).filter_map(move |entry| {
1325                            let key = entry.key();
1326                            if let Some(chain) = data.get(key)
1327                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1328                                && let Some(value) = &v.value
1329                            {
1330                                Some((key.clone(), value.clone()))
1331                            } else {
1332                                None
1333                            }
1334                        }))
1335                    } else {
1336                        Box::new(idx.range(start..end).filter_map(move |entry| {
1337                            let key = entry.key();
1338                            if let Some(chain) = data.get(key)
1339                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1340                                && let Some(value) = &v.value
1341                            {
1342                                Some((key.clone(), value.clone()))
1343                            } else {
1344                                None
1345                            }
1346                        }))
1347                    };
1348                    self.ordered_iter = Some(iter);
1349                }
1350            } else {
1351                // Unordered full scan
1352                let start = self.start.clone();
1353                let end = self.end.clone();
1354                let snapshot_ts = self.snapshot_ts;
1355                let current_txn_id = self.current_txn_id;
1356
1357                let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1358                    Box::new(self.memtable.data.iter().filter_map(move |entry| {
1359                        let key = entry.key();
1360
1361                        if key.as_slice() < start.as_slice() {
1362                            return None;
1363                        }
1364                        if !end.is_empty() && key.as_slice() >= end.as_slice() {
1365                            return None;
1366                        }
1367
1368                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1369                            && let Some(value) = &v.value
1370                        {
1371                            Some((key.clone(), value.clone()))
1372                        } else {
1373                            None
1374                        }
1375                    }));
1376                self.unordered_iter = Some(iter);
1377            }
1378        }
1379
1380        // Get next from appropriate iterator
1381        if let Some(ref mut iter) = self.ordered_iter {
1382            iter.next()
1383        } else if let Some(ref mut iter) = self.unordered_iter {
1384            iter.next()
1385        } else {
1386            None
1387        }
1388    }
1389}
1390
1391/// MemTable with MVCC support
1392///
1393/// Uses DashMap for lock-free concurrent access per key.
1394/// This eliminates the global write lock bottleneck.
1395///
1396/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
1397/// Maintains a deferred sorted index for efficient scans:
1398/// - Writes: O(1) append to hot buffer
1399/// - Scans: O(N log N) sort-on-demand (amortized across many writes)
1400pub struct MvccMemTable {
1401    /// Key -> VersionChain (sharded for concurrent access)
1402    data: DashMap<Vec<u8>, VersionChain>,
1403    /// Deferred sorted index for efficient prefix/range scans (optional)
1404    /// O(1) insert to hot buffer, O(N log N) sort on first scan
1405    /// When None, scan_prefix will fall back to O(N) DashMap iteration
1406    deferred_index: Option<DeferredSortedIndex>,
1407    /// Legacy SkipMap for compatibility (used when deferred=false)
1408    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
1409    /// Whether to use deferred sorting (true) or immediate SkipMap (false)
1410    #[allow(dead_code)]
1411    use_deferred: bool,
1412    /// Approximate size in bytes
1413    size_bytes: AtomicU64,
1414    /// Epoch-based dirty list for efficient GC
1415    dirty_list: EpochDirtyList,
1416}
1417
1418impl Default for MvccMemTable {
1419    fn default() -> Self {
1420        Self::new()
1421    }
1422}
1423
1424impl MvccMemTable {
1425    pub fn new() -> Self {
1426        Self::with_ordered_index(true)
1427    }
1428
1429    /// Create memtable with optional ordered index
1430    ///
1431    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1432    /// but scan_prefix becomes O(N) instead of O(log N + K)
1433    ///
1434    /// Uses deferred sorting by default for better write performance:
1435    /// - Writes: O(1) append to hot buffer
1436    /// - Scans: O(N log N) sort-on-demand
1437    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1438        Self::with_index_mode(enable_ordered_index, true)
1439    }
1440
1441    /// Create memtable with fine-grained control over indexing
1442    ///
1443    /// # Arguments
1444    /// * `enable_ordered_index` - Whether to maintain an ordered index
1445    /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1446    ///                    If false, use SkipMap (O(log N) writes)
1447    pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1448        Self {
1449            data: DashMap::new(),
1450            deferred_index: if enable_ordered_index && use_deferred {
1451                Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1452                    max_unsorted_entries: 10_000, // Compact every 10K writes
1453                    enabled: true,
1454                }))
1455            } else {
1456                None
1457            },
1458            ordered_index: if enable_ordered_index && !use_deferred {
1459                Some(SkipMap::new())
1460            } else {
1461                None
1462            },
1463            use_deferred,
1464            size_bytes: AtomicU64::new(0),
1465            dirty_list: EpochDirtyList::new(),
1466        }
1467    }
1468
1469    /// Write a key-value pair (uncommitted)
1470    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1471        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1472        let key_len = key.len();
1473
1474        // Track this key in the current epoch's dirty list for GC
1475        self.dirty_list.record_version(key.clone());
1476
1477        // Insert into ordered index for prefix scans (if enabled)
1478        // Deferred: O(1) append to hot buffer
1479        // SkipMap: O(log N) insert
1480        if let Some(ref idx) = self.deferred_index {
1481            idx.insert(key.clone());
1482        } else if let Some(ref idx) = self.ordered_index {
1483            idx.insert(key.clone(), ());
1484        }
1485
1486        // Use entry API for atomic get-or-insert
1487        let mut entry = self.data.entry(key).or_default();
1488
1489        // Check for write-write conflict
1490        if entry.has_write_conflict(txn_id) {
1491            return Err(SochDBError::Internal(
1492                "Write-write conflict detected".into(),
1493            ));
1494        }
1495        entry.add_uncommitted(value, txn_id);
1496        self.size_bytes
1497            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1498
1499        Ok(())
1500    }
1501
1502    /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1503    ///
1504    /// Optimizations applied (Rec 3: MVCC Batching):
1505    /// - Batched dirty list tracking: single lock acquire for all keys
1506    /// - Deferred index: O(1) append per key
1507    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1508        let mut total_size = 0u64;
1509
1510        // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1511        self.dirty_list
1512            .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1513
1514        for (key, value) in writes {
1515            // Insert into ordered index (if enabled)
1516            // Deferred: O(1) append, SkipMap: O(log N)
1517            if let Some(ref idx) = self.deferred_index {
1518                idx.insert(key.clone());
1519            } else if let Some(ref idx) = self.ordered_index {
1520                idx.insert(key.clone(), ());
1521            }
1522
1523            let mut entry = self.data.entry(key.clone()).or_default();
1524
1525            if entry.has_write_conflict(txn_id) {
1526                return Err(SochDBError::Internal(
1527                    "Write-write conflict detected".into(),
1528                ));
1529            }
1530
1531            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1532            entry.add_uncommitted(value.clone(), txn_id);
1533            total_size += (key.len() + value_size) as u64;
1534        }
1535
1536        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1537        Ok(())
1538    }
1539
1540    /// Read at snapshot timestamp, with optional current txn to see own writes
1541    pub fn read(
1542        &self,
1543        key: &[u8],
1544        snapshot_ts: u64,
1545        current_txn_id: Option<u64>,
1546    ) -> Option<Vec<u8>> {
1547        self.data.get(key).and_then(|chain| {
1548            chain
1549                .read_at(snapshot_ts, current_txn_id)
1550                .and_then(|v| v.value.clone())
1551        })
1552    }
1553
1554    /// Commit all versions for a transaction
1555    ///
1556    /// Only updates the keys that were written by this transaction (tracked in write_set).
1557    /// Accepts InlineKey for zero-allocation MVCC tracking.
1558    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1559        // Only iterate over keys we know were written - O(k) instead of O(n)
1560        for key in write_set {
1561            if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1562                chain.commit(txn_id, commit_ts);
1563            }
1564        }
1565    }
1566
1567    /// Legacy commit method (iterates all keys) - kept for backward compatibility
1568    #[allow(dead_code)]
1569    pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1570        for mut entry in self.data.iter_mut() {
1571            entry.value_mut().commit(txn_id, commit_ts);
1572        }
1573    }
1574
1575    /// Abort all versions for a transaction
1576    pub fn abort(&self, txn_id: u64) {
1577        for mut entry in self.data.iter_mut() {
1578            entry.value_mut().abort(txn_id);
1579        }
1580    }
1581
1582    /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1583    ///
1584    /// ## Performance
1585    ///
1586    /// When ordered_index is enabled: O(log N + K) complexity
1587    /// - O(log N) to seek to the first key with prefix
1588    /// - O(K) to iterate matching keys
1589    ///
1590    /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1591    ///
1592    /// ## Optimizations Applied
1593    ///
1594    /// - Pre-allocates result vector based on expected output size
1595    /// - Uses batch-friendly iteration patterns
1596    /// - Minimizes allocations during iteration
1597    /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1598    pub fn scan_prefix(
1599        &self,
1600        prefix: &[u8],
1601        snapshot_ts: u64,
1602        current_txn_id: Option<u64>,
1603    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1604        // Estimate result size for pre-allocation (use 10% of total as heuristic)
1605        let estimated_size = (self.data.len() / 10).max(64);
1606        let mut results = Vec::with_capacity(estimated_size);
1607
1608        if let Some(ref idx) = self.deferred_index {
1609            // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1610            for key in idx.range_from(prefix) {
1611                // Stop when we've passed the prefix range
1612                if !key.starts_with(prefix) {
1613                    break;
1614                }
1615
1616                // O(1) lookup in DashMap for version chain
1617                if let Some(chain) = self.data.get(&key)
1618                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1619                    && let Some(value) = &v.value
1620                {
1621                    results.push((key, value.clone()));
1622                }
1623            }
1624        } else if let Some(ref idx) = self.ordered_index {
1625            // Fast path: O(log N) seek to first key >= prefix
1626            for entry in idx.range(prefix.to_vec()..) {
1627                let key = entry.key();
1628
1629                // Stop when we've passed the prefix range
1630                if !key.starts_with(prefix) {
1631                    break;
1632                }
1633
1634                // O(1) lookup in DashMap for version chain
1635                if let Some(chain) = self.data.get(key)
1636                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1637                    && let Some(value) = &v.value
1638                {
1639                    results.push((key.clone(), value.clone()));
1640                }
1641            }
1642        } else {
1643            // Fallback: O(N) full DashMap scan when ordered_index is disabled
1644            // Optimized with batch-friendly iteration
1645            for entry in self.data.iter() {
1646                let key = entry.key();
1647                if !key.starts_with(prefix) {
1648                    continue;
1649                }
1650                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1651                    && let Some(value) = &v.value
1652                {
1653                    results.push((key.clone(), value.clone()));
1654                }
1655            }
1656        }
1657
1658        results
1659    }
1660
1661    /// Optimized full scan with batch allocation
1662    ///
1663    /// For use when scanning entire tables/namespaces.
1664    /// Pre-allocates based on actual data size.
1665    pub fn scan_all(
1666        &self,
1667        snapshot_ts: u64,
1668        current_txn_id: Option<u64>,
1669    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1670        let mut results = Vec::with_capacity(self.data.len());
1671
1672        for entry in self.data.iter() {
1673            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1674                && let Some(value) = &v.value
1675            {
1676                results.push((entry.key().clone(), value.clone()));
1677            }
1678        }
1679
1680        results
1681    }
1682
1683    /// Streaming scan iterator for very large datasets
1684    ///
1685    /// Returns an iterator that yields (key, value) pairs without
1686    /// materializing the entire result set in memory.
1687    pub fn scan_prefix_iter<'a>(
1688        &'a self,
1689        prefix: &'a [u8],
1690        snapshot_ts: u64,
1691        current_txn_id: Option<u64>,
1692    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1693        self.data.iter().filter_map(move |entry| {
1694            let key = entry.key();
1695            if !key.starts_with(prefix) {
1696                return None;
1697            }
1698            if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1699                && let Some(value) = &v.value
1700            {
1701                Some((key.clone(), value.clone()))
1702            } else {
1703                None
1704            }
1705        })
1706    }
1707
1708    /// Scan range
1709    pub fn scan_range(
1710        &self,
1711        start: &[u8],
1712        end: &[u8],
1713        snapshot_ts: u64,
1714        current_txn_id: Option<u64>,
1715    ) -> Vec<(Vec<u8>, Vec<u8>)> {
1716        let mut results = Vec::new();
1717
1718        if let Some(ref idx) = self.deferred_index {
1719            // Deferred index path: sort-on-scan
1720            if end.is_empty() {
1721                for key in idx.range_from(start) {
1722                    if let Some(chain) = self.data.get(&key)
1723                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724                        && let Some(value) = &v.value
1725                    {
1726                        results.push((key, value.clone()));
1727                    }
1728                }
1729            } else {
1730                for key in idx.range(start, end) {
1731                    if let Some(chain) = self.data.get(&key)
1732                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1733                        && let Some(value) = &v.value
1734                    {
1735                        results.push((key, value.clone()));
1736                    }
1737                }
1738            }
1739        } else if let Some(ref idx) = self.ordered_index {
1740            // Use range scan on SkipMap
1741            if end.is_empty() {
1742                // Unbounded end
1743                for entry in idx.range(start.to_vec()..) {
1744                    let key = entry.key();
1745                    if let Some(chain) = self.data.get(key)
1746                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1747                        && let Some(value) = &v.value
1748                    {
1749                        results.push((key.clone(), value.clone()));
1750                    }
1751                }
1752            } else {
1753                for entry in idx.range(start.to_vec()..end.to_vec()) {
1754                    let key = entry.key();
1755                    if let Some(chain) = self.data.get(key)
1756                        && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1757                        && let Some(value) = &v.value
1758                    {
1759                        results.push((key.clone(), value.clone()));
1760                    }
1761                }
1762            }
1763        } else {
1764            // Fallback to full scan if no ordered index
1765            for entry in self.data.iter() {
1766                let key = entry.key();
1767
1768                if key.as_slice() < start {
1769                    continue;
1770                }
1771                if !end.is_empty() && key.as_slice() >= end {
1772                    continue;
1773                }
1774
1775                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1776                    && let Some(value) = &v.value
1777                {
1778                    results.push((key.clone(), value.clone()));
1779                }
1780            }
1781        }
1782
1783        results
1784    }
1785
1786    /// Streaming range scan iterator for very large datasets
1787    ///
1788    /// Returns an iterator that yields (key, value) pairs without
1789    /// materializing the entire result set in memory. Uses the ordered
1790    /// index when available for O(log N + K) complexity.
1791    ///
1792    /// ## Zero-Allocation Design
1793    ///
1794    /// While the iterator itself cannot avoid allocations for returned
1795    /// values (since the caller needs ownership), it avoids:
1796    /// - Pre-materializing all results
1797    /// - Intermediate buffers
1798    /// - Repeated key comparisons for already-visited entries
1799    ///
1800    /// ## Usage
1801    ///
1802    /// ```ignore
1803    /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1804    ///     // Process each result as it arrives
1805    ///     // Memory usage is O(1) per iteration, not O(N) total
1806    /// }
1807    /// ```
1808    pub fn scan_range_iter<'a>(
1809        &'a self,
1810        start: &'a [u8],
1811        end: &'a [u8],
1812        snapshot_ts: u64,
1813        current_txn_id: Option<u64>,
1814    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1815        // Compact deferred index before scanning if needed
1816        if let Some(ref idx) = self.deferred_index {
1817            idx.compact();
1818        }
1819
1820        // Use either ordered index or full scan
1821        let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1822
1823        // Create iterator based on availability of ordered index
1824        ScanRangeIterator {
1825            memtable: self,
1826            start: start.to_vec(),
1827            end: end.to_vec(),
1828            snapshot_ts,
1829            current_txn_id,
1830            use_ordered,
1831            ordered_iter: None,
1832            unordered_iter: None,
1833            initialized: false,
1834        }
1835    }
1836
1837    /// Get approximate size
1838    pub fn size(&self) -> u64 {
1839        self.size_bytes.load(Ordering::Relaxed)
1840    }
1841
1842    /// Garbage collect old versions using epoch-based dirty list
1843    ///
1844    /// O(expired_versions) instead of O(all_versions)
1845    /// Only visits keys that had versions created in the old epoch.
1846    pub fn gc(&self, min_active_ts: u64) -> usize {
1847        // Advance epoch and get the dirty keys from the old epoch
1848        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1849
1850        if dirty_keys.is_empty() {
1851            return 0;
1852        }
1853
1854        let mut gc_count = 0;
1855
1856        // Only visit keys that were modified in the old epoch
1857        // Use a HashSet to deduplicate keys that were written multiple times
1858        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1859
1860        for key in unique_keys {
1861            if let Some(mut entry) = self.data.get_mut(&key) {
1862                let before = entry.value().version_count();
1863                entry.value_mut().gc(min_active_ts);
1864                gc_count += before.saturating_sub(entry.value().version_count());
1865            }
1866        }
1867
1868        gc_count
1869    }
1870
1871    /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1872    #[allow(dead_code)]
1873    pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1874        let mut gc_count = 0;
1875
1876        for mut entry in self.data.iter_mut() {
1877            let before = entry.value().version_count();
1878            entry.value_mut().gc(min_active_ts);
1879            gc_count += before.saturating_sub(entry.value().version_count());
1880        }
1881
1882        gc_count
1883    }
1884}
1885
1886// =============================================================================
1887// Rec 11: Unified MvccStore Implementation for MvccMemTable
1888// =============================================================================
1889
1890impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1891    fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1892        self.read(key, snapshot_ts, txn_id)
1893    }
1894
1895    fn mvcc_put(
1896        &self,
1897        key: &[u8],
1898        value: Option<Vec<u8>>,
1899        txn_id: u64,
1900    ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1901        let mut entry = self.data.entry(key.to_vec()).or_default();
1902        if entry.has_write_conflict(txn_id) {
1903            return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1904        }
1905        entry.add_uncommitted(value, txn_id);
1906        Ok(())
1907    }
1908
1909    fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1910        if let Some(mut chain) = self.data.get_mut(key) {
1911            return chain.commit(txn_id, commit_ts);
1912        }
1913        false
1914    }
1915
1916    fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1917        if let Some(mut chain) = self.data.get_mut(key) {
1918            chain.abort(txn_id);
1919        }
1920    }
1921
1922    fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1923        self.data
1924            .get(key)
1925            .map(|chain| chain.has_write_conflict(txn_id))
1926            .unwrap_or(false)
1927    }
1928
1929    fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1930        let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1931        for mut entry in self.data.iter_mut() {
1932            stats.keys_scanned += 1;
1933            let before = entry.value().version_count();
1934            entry.value_mut().gc(min_ts);
1935            stats.versions_removed += before.saturating_sub(entry.value().version_count());
1936        }
1937        stats
1938    }
1939
1940    fn mvcc_key_count(&self) -> usize {
1941        self.data.len()
1942    }
1943}
1944
1945// ============================================================================
1946// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1947// ============================================================================
1948
1949use crate::key_buffer::ArenaKeyHandle;
1950
1951/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1952struct ArenaEpochDirtyList {
1953    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1954    current_epoch: AtomicU64,
1955}
1956
1957impl ArenaEpochDirtyList {
1958    fn new() -> Self {
1959        Self {
1960            epochs: [
1961                parking_lot::Mutex::new(Vec::new()),
1962                parking_lot::Mutex::new(Vec::new()),
1963                parking_lot::Mutex::new(Vec::new()),
1964                parking_lot::Mutex::new(Vec::new()),
1965            ],
1966            current_epoch: AtomicU64::new(0),
1967        }
1968    }
1969
1970    #[inline]
1971    fn record_version(&self, key: ArenaKeyHandle) {
1972        let epoch = self.current_epoch.load(Ordering::Relaxed);
1973        let idx = (epoch as usize) % EPOCH_RING_SIZE;
1974        self.epochs[idx].lock().push(key);
1975    }
1976
1977    fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1978        let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1979        let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1980        let mut guard = self.epochs[old_idx].lock();
1981        let keys = std::mem::take(&mut *guard);
1982        (old_epoch, keys)
1983    }
1984}
1985
1986/// Arena-backed MVCC MemTable with optimized key storage
1987///
1988/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1989/// reducing per-write allocations from 3 to 1:
1990///
1991/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1992/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1993///
1994/// ## Performance
1995///
1996/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1997/// by reducing:
1998/// - Heap allocations: 3 → 1 per write
1999/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
2000pub struct ArenaMvccMemTable {
2001    /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
2002    data: DashMap<ArenaKeyHandle, VersionChain>,
2003    /// Ordered index for prefix scans
2004    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
2005    /// Approximate size in bytes
2006    size_bytes: AtomicU64,
2007    /// Epoch-based dirty list (arena-backed)
2008    dirty_list: ArenaEpochDirtyList,
2009}
2010
2011impl ArenaMvccMemTable {
2012    pub fn new() -> Self {
2013        Self::with_ordered_index(true)
2014    }
2015
2016    pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2017        Self {
2018            data: DashMap::new(),
2019            ordered_index: if enable_ordered_index {
2020                Some(SkipMap::new())
2021            } else {
2022                None
2023            },
2024            size_bytes: AtomicU64::new(0),
2025            dirty_list: ArenaEpochDirtyList::new(),
2026        }
2027    }
2028
2029    /// Write a key-value pair using arena key handle
2030    ///
2031    /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2032    /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2033    pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2034        let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2035        let key_len = key.len();
2036
2037        // Create ONE ArenaKeyHandle - this is the only allocation for the key
2038        let key_handle = ArenaKeyHandle::new(key);
2039
2040        // Track in dirty list (O(1) copy of 16-byte handle)
2041        self.dirty_list.record_version(key_handle.clone());
2042
2043        // Insert into ordered index (O(1) copy of 16-byte handle)
2044        if let Some(ref idx) = self.ordered_index {
2045            idx.insert(key_handle.clone(), ());
2046        }
2047
2048        // Use entry API with the handle
2049        let mut entry = self.data.entry(key_handle).or_default();
2050
2051        if entry.has_write_conflict(txn_id) {
2052            return Err(SochDBError::Internal(
2053                "Write-write conflict detected".into(),
2054            ));
2055        }
2056        entry.add_uncommitted(value, txn_id);
2057        self.size_bytes
2058            .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2059
2060        Ok(())
2061    }
2062
2063    /// Write batch using arena key handles
2064    pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2065        let mut total_size = 0u64;
2066
2067        for (key, value) in writes {
2068            let key_handle = ArenaKeyHandle::new(key);
2069
2070            self.dirty_list.record_version(key_handle.clone());
2071
2072            if let Some(ref idx) = self.ordered_index {
2073                idx.insert(key_handle.clone(), ());
2074            }
2075
2076            let mut entry = self.data.entry(key_handle).or_default();
2077
2078            if entry.has_write_conflict(txn_id) {
2079                return Err(SochDBError::Internal(
2080                    "Write-write conflict detected".into(),
2081                ));
2082            }
2083
2084            let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2085            entry.add_uncommitted(value.clone(), txn_id);
2086            total_size += (key.len() + value_size) as u64;
2087        }
2088
2089        self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2090        Ok(())
2091    }
2092
2093    /// Read at snapshot timestamp
2094    pub fn read(
2095        &self,
2096        key: &[u8],
2097        snapshot_ts: u64,
2098        current_txn_id: Option<u64>,
2099    ) -> Option<Vec<u8>> {
2100        // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2101        let key_handle = ArenaKeyHandle::new(key);
2102        self.data.get(&key_handle).and_then(|chain| {
2103            chain
2104                .read_at(snapshot_ts, current_txn_id)
2105                .and_then(|v| v.value.clone())
2106        })
2107    }
2108
2109    /// Commit transaction
2110    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2111        for key in write_set {
2112            let key_handle = ArenaKeyHandle::new(key.as_slice());
2113            if let Some(mut chain) = self.data.get_mut(&key_handle) {
2114                chain.commit(txn_id, commit_ts);
2115            }
2116        }
2117    }
2118
2119    /// Abort transaction
2120    pub fn abort(&self, txn_id: u64) {
2121        for mut entry in self.data.iter_mut() {
2122            entry.value_mut().abort(txn_id);
2123        }
2124    }
2125
2126    /// Scan prefix
2127    pub fn scan_prefix(
2128        &self,
2129        prefix: &[u8],
2130        snapshot_ts: u64,
2131        current_txn_id: Option<u64>,
2132    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2133        let mut results = Vec::new();
2134        let prefix_handle = ArenaKeyHandle::new(prefix);
2135
2136        if let Some(ref idx) = self.ordered_index {
2137            for entry in idx.range(prefix_handle..) {
2138                let key = entry.key();
2139
2140                if !key.as_bytes().starts_with(prefix) {
2141                    break;
2142                }
2143
2144                if let Some(chain) = self.data.get(key)
2145                    && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2146                    && let Some(value) = &v.value
2147                {
2148                    results.push((key.as_bytes().to_vec(), value.clone()));
2149                }
2150            }
2151        } else {
2152            for entry in self.data.iter() {
2153                let key = entry.key();
2154                if !key.as_bytes().starts_with(prefix) {
2155                    continue;
2156                }
2157                if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2158                    && let Some(value) = &v.value
2159                {
2160                    results.push((key.as_bytes().to_vec(), value.clone()));
2161                }
2162            }
2163        }
2164
2165        results
2166    }
2167
2168    /// Get approximate size
2169    pub fn size(&self) -> u64 {
2170        self.size_bytes.load(Ordering::Relaxed)
2171    }
2172
2173    /// Garbage collect old versions
2174    pub fn gc(&self, min_active_ts: u64) -> usize {
2175        let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2176
2177        if dirty_keys.is_empty() {
2178            return 0;
2179        }
2180
2181        let mut gc_count = 0;
2182        let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2183
2184        for key in unique_keys {
2185            if let Some(mut entry) = self.data.get_mut(&key) {
2186                let before = entry.value().version_count();
2187                entry.value_mut().gc(min_active_ts);
2188                gc_count += before.saturating_sub(entry.value().version_count());
2189            }
2190        }
2191
2192        gc_count
2193    }
2194}
2195
2196impl Default for ArenaMvccMemTable {
2197    fn default() -> Self {
2198        Self::new()
2199    }
2200}
2201
2202// ============================================================================
2203// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2204// ============================================================================
2205
2206/// Configuration for memtable type selection
2207#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2208pub enum MemTableType {
2209    /// Standard MVCC memtable with deferred sorting
2210    /// Best for: general workloads, balanced read/write
2211    Standard,
2212    /// Arena-backed memtable with reduced allocations
2213    /// Best for: write-heavy workloads, large keys
2214    Arena,
2215}
2216
2217impl Default for MemTableType {
2218    fn default() -> Self {
2219        // Default to Standard which now has deferred sorting
2220        MemTableType::Standard
2221    }
2222}
2223
2224/// Unified memtable abstraction using enum dispatch
2225///
2226/// This pattern provides:
2227/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2228/// - Type-safe switching between implementations
2229/// - Easy extensibility for future memtable types
2230///
2231/// ## Why Enum over Trait Object?
2232///
2233/// - Hot path performance: enum match is a single branch vs vtable indirection
2234/// - Cache friendliness: no pointer chasing
2235/// - Inlining: compiler can inline through enum dispatch
2236pub enum MemTableKind {
2237    Standard(MvccMemTable),
2238    Arena(ArenaMvccMemTable),
2239}
2240
2241impl MemTableKind {
2242    /// Create a new memtable of the specified type
2243    pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2244        match kind {
2245            MemTableType::Standard => {
2246                MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2247            }
2248            MemTableType::Arena => {
2249                MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2250            }
2251        }
2252    }
2253
2254    /// Write a key-value pair
2255    #[inline]
2256    pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2257        match self {
2258            MemTableKind::Standard(m) => m.write(key, value, txn_id),
2259            MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2260        }
2261    }
2262
2263    /// Write batch of key-value pairs
2264    #[inline]
2265    pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2266        match self {
2267            MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2268            MemTableKind::Arena(m) => {
2269                // Convert to arena-compatible format
2270                let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2271                    .iter()
2272                    .map(|(k, v)| (k.as_slice(), v.clone()))
2273                    .collect();
2274                m.write_batch(&arena_writes, txn_id)
2275            }
2276        }
2277    }
2278
2279    /// Read at snapshot timestamp
2280    #[inline]
2281    pub fn read(
2282        &self,
2283        key: &[u8],
2284        snapshot_ts: u64,
2285        current_txn_id: Option<u64>,
2286    ) -> Option<Vec<u8>> {
2287        match self {
2288            MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2289            MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2290        }
2291    }
2292
2293    /// Commit transaction
2294    #[inline]
2295    pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2296        match self {
2297            MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2298            MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2299        }
2300    }
2301
2302    /// Abort transaction
2303    #[inline]
2304    pub fn abort(&self, txn_id: u64) {
2305        match self {
2306            MemTableKind::Standard(m) => m.abort(txn_id),
2307            MemTableKind::Arena(m) => m.abort(txn_id),
2308        }
2309    }
2310
2311    /// Scan prefix
2312    #[inline]
2313    pub fn scan_prefix(
2314        &self,
2315        prefix: &[u8],
2316        snapshot_ts: u64,
2317        current_txn_id: Option<u64>,
2318    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2319        match self {
2320            MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2321            MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2322        }
2323    }
2324
2325    /// Scan range
2326    #[inline]
2327    pub fn scan_range(
2328        &self,
2329        start: &[u8],
2330        end: &[u8],
2331        snapshot_ts: u64,
2332        current_txn_id: Option<u64>,
2333    ) -> Vec<(Vec<u8>, Vec<u8>)> {
2334        match self {
2335            MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2336            MemTableKind::Arena(m) => {
2337                // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2338                let mut results = Vec::new();
2339                if let Some(ref idx) = m.ordered_index {
2340                    let start_handle = ArenaKeyHandle::new(start);
2341                    let end_handle = ArenaKeyHandle::new(end);
2342
2343                    if end.is_empty() {
2344                        for entry in idx.range(start_handle..) {
2345                            let key = entry.key();
2346                            if let Some(chain) = m.data.get(key)
2347                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2348                                && let Some(value) = &v.value
2349                            {
2350                                results.push((key.as_bytes().to_vec(), value.clone()));
2351                            }
2352                        }
2353                    } else {
2354                        for entry in idx.range(start_handle..end_handle) {
2355                            let key = entry.key();
2356                            if let Some(chain) = m.data.get(key)
2357                                && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2358                                && let Some(value) = &v.value
2359                            {
2360                                results.push((key.as_bytes().to_vec(), value.clone()));
2361                            }
2362                        }
2363                    }
2364                } else {
2365                    for entry in m.data.iter() {
2366                        let key = entry.key();
2367                        let key_bytes = key.as_bytes();
2368                        if key_bytes < start {
2369                            continue;
2370                        }
2371                        if !end.is_empty() && key_bytes >= end {
2372                            continue;
2373                        }
2374                        if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2375                            && let Some(value) = &v.value
2376                        {
2377                            results.push((key_bytes.to_vec(), value.clone()));
2378                        }
2379                    }
2380                }
2381                results
2382            }
2383        }
2384    }
2385
2386    /// Scan range iterator (returns collected results for now)
2387    #[inline]
2388    pub fn scan_range_iter<'a>(
2389        &'a self,
2390        start: &'a [u8],
2391        end: &'a [u8],
2392        snapshot_ts: u64,
2393        current_txn_id: Option<u64>,
2394    ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2395        match self {
2396            MemTableKind::Standard(m) => {
2397                Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2398            }
2399            MemTableKind::Arena(_) => {
2400                // Arena version returns collected results as iterator
2401                let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2402                Box::new(results.into_iter())
2403            }
2404        }
2405    }
2406
2407    /// Get approximate size
2408    #[inline]
2409    pub fn size(&self) -> u64 {
2410        match self {
2411            MemTableKind::Standard(m) => m.size(),
2412            MemTableKind::Arena(m) => m.size(),
2413        }
2414    }
2415
2416    /// Garbage collect old versions
2417    #[inline]
2418    pub fn gc(&self, min_active_ts: u64) -> usize {
2419        match self {
2420            MemTableKind::Standard(m) => m.gc(min_active_ts),
2421            MemTableKind::Arena(m) => m.gc(min_active_ts),
2422        }
2423    }
2424
2425    /// Get the kind of memtable
2426    pub fn kind(&self) -> MemTableType {
2427        match self {
2428            MemTableKind::Standard(_) => MemTableType::Standard,
2429            MemTableKind::Arena(_) => MemTableType::Arena,
2430        }
2431    }
2432}
2433
2434/// Durable storage engine with full ACID support
2435pub struct DurableStorage {
2436    /// Path to storage directory
2437    path: PathBuf,
2438    /// Write-ahead log
2439    wal: Arc<TxnWal>,
2440    /// MVCC manager
2441    mvcc: Arc<MvccManager>,
2442    /// In-memory data (unified abstraction over Standard/Arena)
2443    memtable: Arc<MemTableKind>,
2444    /// Per-transaction WAL buffers for batched writes
2445    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
2446    /// At commit, buffer is flushed to WAL with single lock acquisition
2447    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
2448    /// Group commit buffer (optional)
2449    group_commit: Option<Arc<EventDrivenGroupCommit>>,
2450    /// Recovery state
2451    needs_recovery: AtomicU64, // 1 = needs recovery
2452    /// Last checkpoint LSN
2453    last_checkpoint_lsn: AtomicU64,
2454    /// Synchronous mode (like SQLite's PRAGMA synchronous)
2455    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
2456    sync_mode: AtomicU64,
2457    /// Commits since last sync (for NORMAL mode)
2458    commits_since_sync: AtomicU64,
2459    /// Adaptive batch sizing for NORMAL mode (Little's Law)
2460    /// Arrival rate in requests/sec × 1000 for precision
2461    arrival_rate_ema: AtomicU64,
2462    /// Last commit timestamp in microseconds
2463    last_commit_us: AtomicU64,
2464    /// Estimated fsync latency in microseconds
2465    fsync_latency_us: AtomicU64,
2466    /// Database lock for exclusive access (None = no locking)
2467    #[allow(dead_code)]
2468    db_lock: Option<crate::lock::DatabaseLock>,
2469    /// Whether at-rest encryption is active for this instance (drives the live
2470    /// per-instance durability matrix). Set from the resolved keyring at open.
2471    at_rest_encrypted: bool,
2472    /// Whether Point-in-Time Recovery is enabled for this database (Task 3B PITR
2473    /// phase 1). Derived from the presence of `wal.manifest` at open (the manifest
2474    /// is the single source of truth), or set by `enable_point_in_time_recovery`.
2475    /// When enabled, the destructive `truncate_wal()` is forbidden (segment
2476    /// sealing is the PITR-safe replacement, landing in a later phase) so the WAL
2477    /// record ordinal stays a stable, durable monotonic LSN across restarts.
2478    pitr_enabled: AtomicBool,
2479}
2480
2481/// Encryption configuration for opening a [`DurableStorage`].
2482///
2483/// `disabled()` (the default for the legacy open variants) keeps the database
2484/// plaintext and byte-compatible with pre-encryption binaries. `with_kek()`
2485/// supplies the Key-Encryption-Key — the operator secret that *wraps* a
2486/// per-database data key; it is never used verbatim as the cipher key (see
2487/// [`crate::keyring`]). A wrong/missing KEK for an encrypted database fails
2488/// closed at open (the DB will refuse to open, never silently read as plaintext).
2489pub struct StorageEncryption {
2490    /// The KEK. `None` ⇒ plaintext database.
2491    pub kek: Option<EncryptionKey>,
2492    /// Human-readable identifier for the key source (e.g. "env:SOCHDB_ENCRYPTION_KEY",
2493    /// "embedded", "kms:..."). Bound into the keyring for provenance.
2494    pub source_id: String,
2495}
2496
2497impl StorageEncryption {
2498    /// Plaintext (no encryption) — the default.
2499    pub fn disabled() -> Self {
2500        Self {
2501            kek: None,
2502            source_id: "none".to_string(),
2503        }
2504    }
2505
2506    /// Encrypt at rest under the given KEK.
2507    pub fn with_kek(kek: EncryptionKey, source_id: impl Into<String>) -> Self {
2508        Self {
2509            kek: Some(kek),
2510            source_id: source_id.into(),
2511        }
2512    }
2513
2514    /// Whether a key is configured (i.e. encryption is requested).
2515    pub fn is_enabled(&self) -> bool {
2516        self.kek.is_some()
2517    }
2518}
2519
2520impl DurableStorage {
2521    /// Open or create durable storage at path
2522    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2523        Self::open_with_config(path, true)
2524    }
2525
2526    /// Open with configurable ordered index
2527    ///
2528    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2529    /// but scan_prefix becomes O(N) instead of O(log N + K)
2530    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2531        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2532    }
2533
2534    /// Open with arena-backed memtable for write-heavy workloads
2535    ///
2536    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2537    /// Best for workloads with:
2538    /// - High write throughput
2539    /// - Large keys (reduces allocation overhead)
2540    /// - Minimal concurrent reads during writes
2541    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2542        Self::open_with_full_config(path, true, MemTableType::Arena)
2543    }
2544
2545    /// Open with full configuration options
2546    ///
2547    /// # Arguments
2548    /// * `path` - Storage directory path
2549    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2550    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2551    ///
2552    /// # Locking
2553    ///
2554    /// Acquires an exclusive advisory lock on the database directory.
2555    /// This prevents concurrent multi-process access which would corrupt data.
2556    /// If another process has the database open, returns `Err(DatabaseLocked)`.
2557    pub fn open_with_full_config<P: AsRef<Path>>(
2558        path: P,
2559        enable_ordered_index: bool,
2560        memtable_type: MemTableType,
2561    ) -> Result<Self> {
2562        Self::open_with_full_config_internal(
2563            path,
2564            enable_ordered_index,
2565            memtable_type,
2566            true,
2567            StorageEncryption::disabled(),
2568        )
2569    }
2570
2571    /// Open with at-rest encryption configured via [`StorageEncryption`].
2572    ///
2573    /// With `StorageEncryption::disabled()` this is identical to
2574    /// [`Self::open_with_full_config`]. With a KEK, the database is opened (or
2575    /// created) encrypted: a per-DB data key is generated and wrapped into the
2576    /// keyring on first open, and a wrong/missing key on a subsequent open fails
2577    /// closed. Reads are unaffected (the live read path is in-memory); only the
2578    /// WAL write/recovery path pays the AEAD cost.
2579    pub fn open_with_encryption<P: AsRef<Path>>(
2580        path: P,
2581        enable_ordered_index: bool,
2582        memtable_type: MemTableType,
2583        encryption: StorageEncryption,
2584    ) -> Result<Self> {
2585        Self::open_with_full_config_internal(
2586            path,
2587            enable_ordered_index,
2588            memtable_type,
2589            true,
2590            encryption,
2591        )
2592    }
2593
2594    /// The live, per-instance durability capabilities of THIS database.
2595    ///
2596    /// Unlike the build-level [`crate::durability_capabilities`] (which reports
2597    /// defaults), this reflects the actual resolved state — notably whether
2598    /// at-rest encryption is active for this opened instance.
2599    pub fn durability_capabilities(&self) -> DurabilityCapabilities {
2600        DurabilityCapabilities {
2601            crash_recovery: true,
2602            at_rest_encryption: self.at_rest_encrypted,
2603            // Point-in-time recovery is live (recover_to) when PITR is enabled:
2604            // the WAL is fully retained and can be replayed to an LSN/timestamp.
2605            point_in_time_recovery: self.pitr_enabled.load(Ordering::SeqCst),
2606            // ARIES / fencing remain unwired on the live path.
2607            aries_checkpoint: false,
2608            wal_fencing: false,
2609        }
2610    }
2611
2612    /// Whether at-rest encryption is active for this instance.
2613    pub fn is_encrypted(&self) -> bool {
2614        self.at_rest_encrypted
2615    }
2616
2617    /// Open without locking (for testing crash recovery scenarios)
2618    ///
2619    /// # Safety
2620    /// This should ONLY be used in tests that simulate crashes by forgetting
2621    /// the storage instance. In production, always use `open_with_full_config`.
2622    #[cfg(test)]
2623    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2624        Self::open_with_full_config_internal(
2625            path,
2626            true,
2627            MemTableType::Standard,
2628            false,
2629            StorageEncryption::disabled(),
2630        )
2631    }
2632
2633    /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2634    ///
2635    /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2636    /// temporary directory that is automatically cleaned up when the
2637    /// `EphemeralHandle` is dropped. This ensures test and production code paths
2638    /// are identical — bugs found in tests are guaranteed to reproduce in production.
2639    ///
2640    /// # Returns
2641    /// An `EphemeralHandle` that owns both the storage and the temp directory.
2642    /// Access the storage via `handle.storage()` or `Deref` coercion.
2643    ///
2644    /// # Example
2645    /// ```ignore
2646    /// let handle = DurableStorage::open_ephemeral()?;
2647    /// let txn = handle.begin_transaction()?;
2648    /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2649    /// handle.commit(txn)?;
2650    /// // temp directory cleaned up when `handle` drops
2651    /// ```
2652    pub fn open_ephemeral() -> Result<EphemeralHandle> {
2653        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2654        let storage = Self::open_with_full_config_internal(
2655            tmp.path(),
2656            true,
2657            MemTableType::Standard,
2658            false, // No lock needed for ephemeral
2659            StorageEncryption::disabled(),
2660        )?;
2661        Ok(EphemeralHandle {
2662            storage,
2663            _tmpdir: tmp,
2664        })
2665    }
2666
2667    /// Open an ephemeral DurableStorage with group commit enabled.
2668    ///
2669    /// Same as `open_ephemeral()` but with group commit for higher throughput.
2670    pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2671        let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2672        let mut storage = Self::open_with_full_config_internal(
2673            tmp.path(),
2674            true,
2675            MemTableType::Standard,
2676            false,
2677            StorageEncryption::disabled(),
2678        )?;
2679
2680        let wal = storage.wal.clone();
2681        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2682            for &txn_id in txn_ids {
2683                let entry = TxnWalEntry::txn_commit(txn_id);
2684                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2685            }
2686            wal.flush().map_err(|e| e.to_string())?;
2687            wal.sync().map_err(|e| e.to_string())?;
2688            Ok(std::time::SystemTime::now()
2689                .duration_since(std::time::UNIX_EPOCH)
2690                .unwrap()
2691                .as_micros() as u64)
2692        });
2693        storage.group_commit = Some(Arc::new(gc));
2694
2695        Ok(EphemeralHandle {
2696            storage,
2697            _tmpdir: tmp,
2698        })
2699    }
2700
2701    fn open_with_full_config_internal<P: AsRef<Path>>(
2702        path: P,
2703        enable_ordered_index: bool,
2704        memtable_type: MemTableType,
2705        acquire_lock: bool,
2706        encryption: StorageEncryption,
2707    ) -> Result<Self> {
2708        let path = path.as_ref().to_path_buf();
2709        std::fs::create_dir_all(&path)?;
2710
2711        // Acquire exclusive lock on database directory (unless disabled for testing)
2712        let db_lock = if acquire_lock {
2713            Some(
2714                crate::lock::DatabaseLock::acquire(&path)
2715                    .map_err(|e| SochDBError::LockError(e.to_string()))?,
2716            )
2717        } else {
2718            None
2719        };
2720
2721        let wal_path = path.join("wal.log");
2722
2723        // Resolve at-rest encryption from the keyring BEFORE touching the WAL.
2724        // - A fresh DB (no wal.log yet) with a KEK ⇒ create an encrypted keyring.
2725        // - An existing plaintext DB with a KEK ⇒ refused (must migrate explicitly).
2726        // - An encrypted DB with a wrong/missing KEK ⇒ refused fail-closed.
2727        let is_new_db = !wal_path.exists();
2728        let enc_state = keyring::load_or_init(
2729            &path,
2730            encryption.kek.as_ref(),
2731            &encryption.source_id,
2732            is_new_db,
2733        )?;
2734        let at_rest_encrypted = enc_state.is_encrypted();
2735        let wal = Arc::new(TxnWal::new_with_encryption(
2736            &wal_path,
2737            enc_state.engine(),
2738            enc_state.db_uuid(),
2739            enc_state.key_epoch(),
2740        )?);
2741
2742        // PITR anchor: the presence of wal.manifest is the single source of truth
2743        // that this DB is PITR-enabled. If present, seed last_checkpoint_lsn from
2744        // it (it is otherwise in-memory and lost on restart).
2745        let (pitr_enabled, initial_checkpoint_lsn) =
2746            if crate::wal_manifest::WalManifest::exists(&path) {
2747                let m = crate::wal_manifest::WalManifest::load(&path)?;
2748                (true, m.last_checkpoint_lsn)
2749            } else {
2750                (false, 0)
2751            };
2752
2753        let storage = Self {
2754            path,
2755            wal: wal.clone(),
2756            mvcc: Arc::new(MvccManager::new()),
2757            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2758            txn_write_buffers: DashMap::new(),
2759            group_commit: None,
2760            needs_recovery: AtomicU64::new(0),
2761            last_checkpoint_lsn: AtomicU64::new(initial_checkpoint_lsn),
2762            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2763            commits_since_sync: AtomicU64::new(0),
2764            // Adaptive batch sizing (Little's Law)
2765            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2766            last_commit_us: AtomicU64::new(0),
2767            fsync_latency_us: AtomicU64::new(5000), // 5ms default
2768            db_lock,
2769            at_rest_encrypted,
2770            pitr_enabled: AtomicBool::new(pitr_enabled),
2771        };
2772
2773        // Check if recovery needed
2774        if storage.check_recovery_needed()? {
2775            storage.needs_recovery.store(1, Ordering::SeqCst);
2776        }
2777
2778        Ok(storage)
2779    }
2780
2781    /// Open with group commit enabled
2782    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2783        Self::open_with_group_commit_and_config(path, true)
2784    }
2785
2786    /// Open with group commit and configurable ordered index
2787    pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2788        path: P,
2789        enable_ordered_index: bool,
2790    ) -> Result<Self> {
2791        let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2792
2793        let wal = storage.wal.clone();
2794        let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2795            // Write all commit records WITHOUT flushing (batch them)
2796            for &txn_id in txn_ids {
2797                let entry = TxnWalEntry::txn_commit(txn_id);
2798                wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2799            }
2800
2801            // Then do a SINGLE flush + fsync for the entire batch
2802            wal.flush().map_err(|e| e.to_string())?;
2803            wal.sync().map_err(|e| e.to_string())?;
2804
2805            // Return commit timestamp
2806            Ok(std::time::SystemTime::now()
2807                .duration_since(std::time::UNIX_EPOCH)
2808                .unwrap()
2809                .as_micros() as u64)
2810        });
2811
2812        storage.group_commit = Some(Arc::new(gc));
2813        Ok(storage)
2814    }
2815
2816    /// Open with IndexPolicy for automatic memtable/index configuration
2817    ///
2818    /// This is the recommended constructor for new code. The policy determines:
2819    /// - Whether to use ordered index (ScanOptimized only)
2820    /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2821    /// - Default settings optimized for the workload pattern
2822    ///
2823    /// # Arguments
2824    /// * `path` - Storage directory path
2825    /// * `policy` - Index policy determining write/scan tradeoffs
2826    /// * `group_commit` - Whether to enable group commit for throughput
2827    pub fn open_with_policy<P: AsRef<Path>>(
2828        path: P,
2829        policy: crate::index_policy::IndexPolicy,
2830        group_commit: bool,
2831    ) -> Result<Self> {
2832        Self::open_with_policy_encrypted(path, policy, group_commit, StorageEncryption::disabled())
2833    }
2834
2835    /// Policy-based open with at-rest encryption configured.
2836    ///
2837    /// Identical to [`Self::open_with_policy`] but threads a [`StorageEncryption`]
2838    /// down to the keyring/WAL so the embedded `Database` kernel can open (or
2839    /// create) an encrypted database. `StorageEncryption::disabled()` is exactly
2840    /// the plaintext behaviour.
2841    pub fn open_with_policy_encrypted<P: AsRef<Path>>(
2842        path: P,
2843        policy: crate::index_policy::IndexPolicy,
2844        group_commit: bool,
2845        encryption: StorageEncryption,
2846    ) -> Result<Self> {
2847        use crate::index_policy::IndexPolicy;
2848
2849        // Derive configuration from policy
2850        let (enable_ordered_index, memtable_type) = match policy {
2851            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2852                // Write-heavy: no ordered index, use arena for reduced allocations
2853                (false, MemTableType::Arena)
2854            }
2855            IndexPolicy::Balanced => {
2856                // Mixed OLTP: deferred sorting (already implemented in Standard)
2857                (true, MemTableType::Standard)
2858            }
2859            IndexPolicy::ScanOptimized => {
2860                // Scan-heavy: maintain ordered index
2861                (true, MemTableType::Standard)
2862            }
2863        };
2864
2865        if group_commit {
2866            let mut storage =
2867                Self::open_with_encryption(path, enable_ordered_index, memtable_type, encryption)?;
2868
2869            let wal = storage.wal.clone();
2870            let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2871                for &txn_id in txn_ids {
2872                    let entry = TxnWalEntry::txn_commit(txn_id);
2873                    wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2874                }
2875                wal.flush().map_err(|e| e.to_string())?;
2876                wal.sync().map_err(|e| e.to_string())?;
2877                Ok(std::time::SystemTime::now()
2878                    .duration_since(std::time::UNIX_EPOCH)
2879                    .unwrap()
2880                    .as_micros() as u64)
2881            });
2882            storage.group_commit = Some(Arc::new(gc));
2883            Ok(storage)
2884        } else {
2885            Self::open_with_encryption(path, enable_ordered_index, memtable_type, encryption)
2886        }
2887    }
2888
2889    /// Open storage for concurrent mode (multi-reader, single-writer)
2890    ///
2891    /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2892    /// Coordination is handled by the concurrent MVCC layer instead.
2893    ///
2894    /// # Safety
2895    ///
2896    /// This must ONLY be called from `Database::open_concurrent()` which
2897    /// manages the concurrent MVCC coordination. Direct use will cause
2898    /// data corruption.
2899    pub fn open_for_concurrent<P: AsRef<Path>>(
2900        path: P,
2901        policy: crate::index_policy::IndexPolicy,
2902    ) -> Result<Self> {
2903        Self::open_for_concurrent_encrypted(path, policy, StorageEncryption::disabled())
2904    }
2905
2906    /// Concurrent-mode open with at-rest encryption configured.
2907    ///
2908    /// Identical to [`Self::open_for_concurrent`] but threads a
2909    /// [`StorageEncryption`] through, so an encrypted database can also be opened
2910    /// in concurrent (multi-reader) mode rather than failing closed for lack of a
2911    /// key channel.
2912    pub fn open_for_concurrent_encrypted<P: AsRef<Path>>(
2913        path: P,
2914        policy: crate::index_policy::IndexPolicy,
2915        encryption: StorageEncryption,
2916    ) -> Result<Self> {
2917        use crate::index_policy::IndexPolicy;
2918
2919        let (enable_ordered_index, memtable_type) = match policy {
2920            IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2921            IndexPolicy::Balanced => (true, MemTableType::Standard),
2922            IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2923        };
2924
2925        // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2926        Self::open_with_full_config_internal(
2927            path,
2928            enable_ordered_index,
2929            memtable_type,
2930            false,
2931            encryption,
2932        )
2933    }
2934
2935    /// Get the memtable type being used
2936    pub fn memtable_type(&self) -> MemTableType {
2937        self.memtable.kind()
2938    }
2939
2940    /// Check if recovery is needed (dirty shutdown detection)
2941    ///
2942    /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2943    /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2944    /// but committed data still needs to be loaded from WAL into the memtable.
2945    fn check_recovery_needed(&self) -> Result<bool> {
2946        let marker_path = self.path.join(".clean_shutdown");
2947        if marker_path.exists() {
2948            // Clean shutdown - remove marker
2949            std::fs::remove_file(&marker_path)?;
2950        }
2951        // ALWAYS need recovery to rebuild memtable from WAL
2952        // The memtable is in-memory only and needs to be restored on every startup
2953        Ok(true)
2954    }
2955
2956    /// Perform crash recovery
2957    pub fn recover(&self) -> Result<RecoveryStats> {
2958        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2959            return Ok(RecoveryStats::default());
2960        }
2961
2962        let (writes, txn_count) = self.wal.replay_for_recovery()?;
2963
2964        // Apply committed writes to memtable
2965        let recovery_txn_id = self.wal.alloc_txn_id();
2966        let commit_ts = self.mvcc.alloc_commit_ts();
2967
2968        // Collect keys being written for efficient commit
2969        let mut write_set: HashSet<InlineKey> = HashSet::new();
2970        for (key, value) in &writes {
2971            write_set.insert(SmallVec::from_slice(key));
2972            self.memtable
2973                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2974        }
2975        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2976
2977        self.needs_recovery.store(0, Ordering::SeqCst);
2978
2979        Ok(RecoveryStats {
2980            transactions_recovered: txn_count,
2981            writes_recovered: writes.len(),
2982            commit_ts,
2983        })
2984    }
2985
2986    /// Point-in-Time Recovery: rebuild the in-memory state as of `target`.
2987    ///
2988    /// This is the PITR analogue of [`Self::recover`] — call it on a FRESH open
2989    /// INSTEAD of `recover()` (not in addition to it), to materialize the
2990    /// database as it existed at a chosen LSN or commit timestamp. Because PITR
2991    /// mode never truncates the WAL, the full history is retained and replayed up
2992    /// to (and stopping at) the target, with transaction atomicity preserved
2993    /// (a transaction is applied only if its commit is within the target).
2994    ///
2995    /// Requires PITR to be enabled (the WAL must be fully retained); returns an
2996    /// error otherwise, since a truncated WAL cannot honor an arbitrary target.
2997    pub fn recover_to(&self, target: crate::txn_wal::RecoveryTarget) -> Result<RecoveryStats> {
2998        if !self.pitr_enabled.load(Ordering::SeqCst) {
2999            return Err(SochDBError::InvalidArgument(
3000                "recover_to requires Point-in-Time Recovery to be enabled \
3001                 (the full WAL must be retained); call enable_point_in_time_recovery first"
3002                    .to_string(),
3003            ));
3004        }
3005
3006        // Single-shot recovery on a fresh open: refuse if state was already
3007        // rebuilt (by recover() or a prior recover_to()). Re-applying would layer
3008        // a stale set over the point-in-time state under a newer commit_ts and
3009        // silently corrupt it (recover()/recover_to() both clear needs_recovery).
3010        if self.needs_recovery.load(Ordering::SeqCst) == 0 {
3011            return Err(SochDBError::InvalidArgument(
3012                "recover_to must be the sole recovery on a fresh open, but state \
3013                 was already recovered; reopen the database and call recover_to first"
3014                    .to_string(),
3015            ));
3016        }
3017
3018        // Make the on-disk WAL match current_lsn() before replaying. Under the
3019        // default NORMAL sync mode the commit record(s) may still sit in the
3020        // BufWriter, while current_lsn() counts the in-memory sequence — so
3021        // without this flush+fsync a captured-LSN cut would silently drop
3022        // committed-but-unflushed records (replay reads a fresh on-disk handle).
3023        self.wal.flush()?;
3024        self.wal.sync()?;
3025
3026        let (writes, txn_count) = self.wal.replay_to_target(target)?;
3027
3028        // Apply the bounded set of committed writes to the (fresh) memtable,
3029        // mirroring recover().
3030        let recovery_txn_id = self.wal.alloc_txn_id();
3031        let commit_ts = self.mvcc.alloc_commit_ts();
3032        let mut write_set: HashSet<InlineKey> = HashSet::new();
3033        for (key, value) in &writes {
3034            write_set.insert(SmallVec::from_slice(key));
3035            self.memtable
3036                .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
3037        }
3038        self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
3039
3040        self.needs_recovery.store(0, Ordering::SeqCst);
3041
3042        Ok(RecoveryStats {
3043            transactions_recovered: txn_count,
3044            writes_recovered: writes.len(),
3045            commit_ts,
3046        })
3047    }
3048
3049    /// Begin a new transaction
3050    pub fn begin_transaction(&self) -> Result<u64> {
3051        let txn_id = self.wal.begin_transaction()?;
3052        self.mvcc.begin(txn_id);
3053        Ok(txn_id)
3054    }
3055
3056    /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
3057    ///
3058    /// This enables mode-aware optimizations:
3059    /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
3060    /// - WriteOnly: Skip read tracking, faster bulk inserts
3061    /// - ReadWrite: Full SSI for serializable isolation
3062    pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
3063        let txn_id = self.wal.begin_transaction()?;
3064        self.mvcc.begin_with_mode(txn_id, mode);
3065        Ok(txn_id)
3066    }
3067
3068    /// Begin a read-only transaction without any WAL records.
3069    ///
3070    /// This is a performance-critical optimization that eliminates two WAL
3071    /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
3072    /// transactions have no state to recover, WAL records are unnecessary.
3073    ///
3074    /// Callers MUST use `abort_read_only_fast()` to clean up.
3075    #[inline]
3076    pub fn begin_read_only_fast(&self) -> u64 {
3077        let txn_id = self.wal.alloc_txn_id();
3078        self.mvcc.begin_read_only(txn_id);
3079        txn_id
3080    }
3081
3082    /// Abort a fast read-only transaction.
3083    ///
3084    /// O(1) cleanup: only removes MVCC state. No WAL write, no memtable scan.
3085    #[inline]
3086    pub fn abort_read_only_fast(&self, txn_id: u64) {
3087        self.mvcc.abort(txn_id);
3088    }
3089
3090    /// Read a key WITHOUT any MVCC transaction tracking.
3091    ///
3092    /// Uses the current global timestamp to see all committed writes.
3093    /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
3094    /// Only safe for single-threaded access (no concurrent writes).
3095    #[inline]
3096    pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
3097        let snapshot_ts = self
3098            .mvcc
3099            .ts_counter
3100            .load(std::sync::atomic::Ordering::Relaxed);
3101        self.memtable.read(key, snapshot_ts, None)
3102    }
3103
3104    /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
3105    ///
3106    /// Uses the current global timestamp. Only safe for single-threaded access.
3107    #[inline]
3108    pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
3109        let snapshot_ts = self
3110            .mvcc
3111            .ts_counter
3112            .load(std::sync::atomic::Ordering::Relaxed);
3113        self.memtable.scan_prefix(prefix, snapshot_ts, None)
3114    }
3115
3116    /// Read a key within a transaction
3117    #[inline]
3118    pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
3119        // Fast path: get just snapshot_ts without cloning whole transaction
3120        let snapshot_ts = self
3121            .mvcc
3122            .get_snapshot_ts(txn_id)
3123            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3124
3125        // Record read for SSI (skipped for read-only transactions)
3126        self.mvcc.record_read(txn_id, key);
3127
3128        // Read at snapshot timestamp, seeing own uncommitted writes
3129        Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
3130    }
3131
3132    /// Write a key-value pair within a transaction
3133    ///
3134    /// Writes are buffered and only flushed to disk on commit.
3135    /// This provides ~10× better throughput for batched inserts.
3136    pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
3137        // Use the zero-allocation path internally
3138        self.write_refs(txn_id, &key, &value)?;
3139
3140        Ok(())
3141    }
3142
3143    /// Write from references - zero allocation hot path
3144    ///
3145    /// Avoids cloning key/value by writing to WAL from refs directly,
3146    /// then only allocating once for memtable storage.
3147    #[inline]
3148    pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
3149        // Record write for MVCC (uses InlineKey - zero allocation for small keys)
3150        self.mvcc.record_write(txn_id, key);
3151
3152        // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
3153        // This reduces lock contention from O(writes) to O(1) per transaction
3154        self.txn_write_buffers
3155            .entry(txn_id)
3156            .or_insert_with(|| TxnWalBuffer::new(txn_id))
3157            .append(key, value);
3158
3159        // Write to memtable (needs owned key/value for storage)
3160        self.memtable
3161            .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
3162
3163        Ok(())
3164    }
3165
3166    /// Delete a key within a transaction
3167    pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
3168        // Record write (uses InlineKey - zero allocation for small keys)
3169        self.mvcc.record_write(txn_id, &key);
3170
3171        // Buffer tombstone in memory - NO WAL lock taken!
3172        self.txn_write_buffers
3173            .entry(txn_id)
3174            .or_insert_with(|| TxnWalBuffer::new(txn_id))
3175            .append(&key, &[]); // Empty value = tombstone
3176
3177        // Write tombstone to memtable
3178        self.memtable.write(key, None, txn_id)?;
3179
3180        Ok(())
3181    }
3182
3183    /// Batch write multiple key-value pairs with reduced overhead
3184    ///
3185    /// This API amortizes fixed costs over the batch:
3186    /// - Single DashMap entry lookup for TxnWalBuffer
3187    /// - Single MVCC write set update
3188    /// - Batch memtable operations
3189    ///
3190    /// Performance: ~2-3x faster than individual write_refs calls
3191    /// for batches of 100+ entries.
3192    ///
3193    /// # Arguments
3194    /// * `txn_id` - Transaction ID
3195    /// * `writes` - Slice of (key, value) pairs
3196    #[inline]
3197    pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
3198        if writes.is_empty() {
3199            return Ok(());
3200        }
3201
3202        // Single DashMap access for entire batch
3203        let mut buffer_entry = self
3204            .txn_write_buffers
3205            .entry(txn_id)
3206            .or_insert_with(|| TxnWalBuffer::new(txn_id));
3207
3208        // Batch operations with reduced per-row overhead
3209        for (key, value) in writes {
3210            // Record write for MVCC
3211            self.mvcc.record_write(txn_id, key);
3212
3213            // Append to WAL buffer
3214            buffer_entry.append(key, value);
3215        }
3216        drop(buffer_entry);
3217
3218        // Batch write to memtable
3219        let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
3220            .iter()
3221            .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
3222            .collect();
3223        self.memtable.write_batch(&owned_writes, txn_id)?;
3224
3225        Ok(())
3226    }
3227
3228    /// Commit a transaction
3229    ///
3230    /// With sync_mode:
3231    /// - 0 (OFF): No sync, risk of data loss
3232    /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
3233    /// - 2 (FULL): Sync every commit (safest, slowest)
3234    pub fn commit(&self, txn_id: u64) -> Result<u64> {
3235        // First, flush all buffered DATA writes to the WAL with a SINGLE lock
3236        // acquisition (O(1) lock instead of O(writes) locks). Flushing data
3237        // records before validation is safe: ARIES recovery only treats them as
3238        // winners if a *durable commit record* for this transaction also exists,
3239        // and that record is written below — strictly after validation succeeds.
3240        if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
3241            && !buffer.is_empty()
3242        {
3243            // Flush entire buffer to WAL with one lock
3244            self.wal.flush_buffer(&buffer)?;
3245        }
3246
3247        // ====================================================================
3248        // Task 1 — Linearizable commit invariant:
3249        //
3250        //   A commit record must NEVER become durable unless the transaction
3251        //   has already passed SSI validation and its write set is final.
3252        //
3253        // We therefore VALIDATE (and freeze the write set) *before* making the
3254        // commit record durable. `mvcc.commit` performs SSI validation and
3255        // returns `None` on a dangerous structure (serialization conflict) or
3256        // if the transaction no longer exists. Writing/fsyncing the commit
3257        // record before this point would let a crash resurrect a transaction
3258        // that the live system rejected — the classic "committed-after-crash,
3259        // aborted-before-crash" non-linearizability bug.
3260        // ====================================================================
3261        let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
3262            SochDBError::Validation(
3263                "transaction aborted: SSI validation failed (serialization conflict) \
3264                 or transaction not found"
3265                    .into(),
3266            )
3267        })?;
3268
3269        // Validation passed and the write set is frozen. Only now may the
3270        // commit record become durable.
3271        if let Some(gc) = &self.group_commit {
3272            // Submit the *validated* commit intent to group commit and wait.
3273            // This batches multiple validated commits into a single fsync.
3274            gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
3275        } else {
3276            // Direct commit path with adaptive sync (Little's Law)
3277            let sync_mode = self.sync_mode.load(Ordering::Relaxed);
3278            let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
3279
3280            // Update arrival rate for adaptive batching
3281            self.update_arrival_rate();
3282
3283            // Write commit record (no flush yet - BufWriter will buffer it)
3284            let entry = TxnWalEntry::txn_commit(txn_id);
3285            self.wal.append_no_flush(&entry)?;
3286
3287            // Determine if we should sync/flush based on mode
3288            let should_sync = match sync_mode {
3289                0 => false,                                      // OFF: never sync
3290                1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
3291                _ => true,                                       // FULL: always sync
3292            };
3293
3294            if should_sync {
3295                // Measure fsync latency for adaptive tuning
3296                let start = std::time::Instant::now();
3297                self.wal.flush()?;
3298                self.wal.sync()?;
3299                let latency_us = start.elapsed().as_micros() as u64;
3300
3301                // Update fsync latency estimate (EMA with α = 0.1)
3302                let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
3303                let new_latency = (old_latency * 9 + latency_us) / 10;
3304                self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
3305
3306                self.commits_since_sync.store(0, Ordering::Relaxed);
3307            }
3308        }
3309
3310        // Commit record is durable (or buffered per sync mode) for a validated
3311        // transaction — publish the writes to the memtable. (O(k), k = keys.)
3312        self.memtable.commit(txn_id, commit_ts, &write_set);
3313
3314        Ok(commit_ts)
3315    }
3316
3317    /// Update arrival rate using exponential moving average
3318    #[inline]
3319    fn update_arrival_rate(&self) {
3320        let now_us = std::time::SystemTime::now()
3321            .duration_since(std::time::UNIX_EPOCH)
3322            .unwrap()
3323            .as_micros() as u64;
3324
3325        let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3326
3327        if last > 0 {
3328            let delta_us = now_us.saturating_sub(last);
3329            if delta_us > 0 && delta_us < 10_000_000 {
3330                // Ignore gaps > 10s
3331                // Rate = 1_000_000 / delta_us (requests/sec)
3332                // Stored as rate × 1000 for precision
3333                let instant_rate = 1_000_000_000 / delta_us;
3334
3335                // EMA with α = 0.1
3336                let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3337                let new_rate = (old_rate * 9 + instant_rate) / 10;
3338                self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3339            }
3340        }
3341    }
3342
3343    /// Compute optimal batch threshold using Little's Law
3344    ///
3345    /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
3346    /// Returns the number of commits to batch before fsync
3347    #[inline]
3348    fn adaptive_batch_threshold(&self) -> u64 {
3349        let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
3350        let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
3351
3352        if lambda <= 0.0 || tau <= 0.0 {
3353            return 100; // Fallback to fixed threshold
3354        }
3355
3356        // Little's Law: W* = sqrt(2 × τ × λ)
3357        // This minimizes total time = wait_time + fsync_overhead
3358        let n_opt = (2.0 * tau * lambda).sqrt();
3359
3360        // Clamp between 1 and 1000
3361        (n_opt as u64).clamp(1, 1000)
3362    }
3363
3364    /// Set synchronous mode
3365    ///
3366    /// - 0: OFF - No fsync (risk of data loss)
3367    /// - 1: NORMAL - Periodic fsync (balanced)
3368    /// - 2: FULL - Fsync every commit (safest)
3369    pub fn set_sync_mode(&self, mode: u64) {
3370        self.sync_mode.store(mode.min(2), Ordering::Relaxed);
3371    }
3372
3373    /// Force a group commit flush (useful for benchmarking or testing)
3374    pub fn flush_group_commit(&self) {
3375        if let Some(gc) = &self.group_commit {
3376            gc.flush_batch();
3377        }
3378    }
3379
3380    /// Abort a transaction
3381    ///
3382    /// Performance: O(1) for read-only transactions (no writes to clean up).
3383    /// For write transactions, O(N) memtable scan is required to remove
3384    /// uncommitted versions.
3385    pub fn abort(&self, txn_id: u64) -> Result<()> {
3386        // Check if transaction had any buffered writes.
3387        // Read-only transactions never populate txn_write_buffers,
3388        // so this returns None — allowing us to skip the O(N) memtable scan.
3389        let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3390
3391        if had_writes {
3392            // Write abort record to WAL (only needed if data was written)
3393            self.wal.abort_transaction(txn_id)?;
3394            // Clean up uncommitted memtable entries
3395            self.memtable.abort(txn_id);
3396        }
3397
3398        // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3399        self.mvcc.abort(txn_id);
3400
3401        Ok(())
3402    }
3403
3404    /// Scan keys with prefix
3405    #[inline]
3406    pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3407        // Fast path: get just snapshot_ts without cloning whole transaction
3408        let snapshot_ts = self
3409            .mvcc
3410            .get_snapshot_ts(txn_id)
3411            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3412
3413        // Note: Scan doesn't record individual key reads for SSI (too expensive)
3414        // SSI conflicts are tracked at the prefix level if needed
3415        Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3416    }
3417
3418    /// Scan keys in range
3419    #[inline]
3420    pub fn scan_range(
3421        &self,
3422        txn_id: u64,
3423        start: &[u8],
3424        end: &[u8],
3425    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3426        let snapshot_ts = self
3427            .mvcc
3428            .get_snapshot_ts(txn_id)
3429            .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3430
3431        Ok(self
3432            .memtable
3433            .scan_range(start, end, snapshot_ts, Some(txn_id)))
3434    }
3435
3436    /// Streaming scan for very large result sets
3437    ///
3438    /// Returns an iterator that yields (key, value) pairs without
3439    /// materializing the entire result set in memory.
3440    #[inline]
3441    pub fn scan_range_iter<'a>(
3442        &'a self,
3443        txn_id: u64,
3444        start: &'a [u8],
3445        end: &'a [u8],
3446    ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3447        let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3448        self.memtable
3449            .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3450    }
3451
3452    /// Force fsync to disk
3453    /// Flush the WAL's in-memory buffer to the OS
3454    ///
3455    /// This ensures all buffered writes are pushed from the BufWriter
3456    /// into the OS page cache. Call this before `fsync()` to ensure
3457    /// all data is durable.
3458    pub fn flush_wal(&self) -> Result<()> {
3459        self.wal.flush()
3460    }
3461
3462    /// Force sync the WAL to disk (fsync)
3463    pub fn fsync(&self) -> Result<()> {
3464        self.wal.sync()
3465    }
3466
3467    /// Write checkpoint
3468    pub fn checkpoint(&self) -> Result<u64> {
3469        let txn_id = 0; // System transaction
3470        let entry = TxnWalEntry::checkpoint(txn_id);
3471        let lsn = self.wal.append_sync(&entry)?;
3472        self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3473        // PITR: persist the checkpoint LSN to the durable manifest so it survives
3474        // restart. This piggybacks the fsync that append_sync already performed;
3475        // the manifest write is itself crash-safe (temp + fsync + atomic rename).
3476        // No-op (and no manifest write) when PITR is not enabled.
3477        if self.pitr_enabled.load(Ordering::SeqCst) {
3478            self.persist_pitr_manifest(lsn)?;
3479        }
3480        Ok(lsn)
3481    }
3482
3483    /// The current durable monotonic LSN (the WAL record ordinal). In PITR mode
3484    /// the WAL is never truncated, so this is stable and monotonic across
3485    /// restarts (`recover_state` rebuilds it by re-counting on reopen).
3486    pub fn current_lsn(&self) -> u64 {
3487        self.wal.sequence()
3488    }
3489
3490    /// Whether Point-in-Time Recovery is enabled for this database.
3491    pub fn is_pitr_enabled(&self) -> bool {
3492        self.pitr_enabled.load(Ordering::SeqCst)
3493    }
3494
3495    /// Enable Point-in-Time Recovery for this database (one-way, explicit opt-in).
3496    ///
3497    /// Writes the durable `wal.manifest` whose presence marks the DB PITR-enabled
3498    /// on every future open. Once enabled, the destructive [`Self::truncate_wal`]
3499    /// is forbidden so the WAL record ordinal stays a stable durable LSN (segment
3500    /// sealing — the PITR-safe space-reclaim — lands in a later phase). Idempotent.
3501    pub fn enable_point_in_time_recovery(&self) -> Result<()> {
3502        if self.pitr_enabled.load(Ordering::SeqCst) {
3503            return Ok(()); // already enabled
3504        }
3505        // Persist the durable manifest FIRST; only flip the in-memory flag after
3506        // the durable write succeeds. Otherwise a failed manifest write would
3507        // leave `durability_capabilities()` reporting PITR (and the truncate
3508        // guard active) with no durable anchor on disk.
3509        let lsn = self.last_checkpoint_lsn.load(Ordering::SeqCst);
3510        if crate::wal_manifest::WalManifest::exists(&self.path) {
3511            self.persist_pitr_manifest(lsn)?;
3512        } else {
3513            crate::wal_manifest::WalManifest::new(lsn).write_atomic(&self.path)?;
3514        }
3515        self.pitr_enabled.store(true, Ordering::SeqCst);
3516        Ok(())
3517    }
3518
3519    /// Persist the PITR manifest with the given checkpoint LSN, preserving the
3520    /// existing db identity if a manifest is already present.
3521    fn persist_pitr_manifest(&self, lsn: u64) -> Result<()> {
3522        let manifest = match crate::wal_manifest::WalManifest::load(&self.path) {
3523            Ok(mut m) => {
3524                m.last_checkpoint_lsn = lsn;
3525                m
3526            }
3527            Err(_) => crate::wal_manifest::WalManifest::new(lsn),
3528        };
3529        manifest.write_atomic(&self.path)
3530    }
3531
3532    /// Truncate the WAL file after checkpoint.
3533    ///
3534    /// This physically truncates the WAL file to 0 bytes, reclaiming disk
3535    /// space. The in-memory memtable retains all data for the current
3536    /// session, but a crash after truncation will result in data loss
3537    /// since the WAL is the only persistence mechanism for DurableStorage.
3538    ///
3539    /// Call after `checkpoint()` when WAL durability across restarts is
3540    /// not required (e.g. desktop telemetry viewers, caches).
3541    ///
3542    /// Refused when PITR is enabled: truncation resets the WAL record ordinal,
3543    /// which would break the durable monotonic LSN that PITR anchors on. The
3544    /// PITR-safe way to reclaim space is segment sealing (a later phase).
3545    pub fn truncate_wal(&self) -> Result<()> {
3546        if self.pitr_enabled.load(Ordering::SeqCst) {
3547            return Err(SochDBError::InvalidArgument(
3548                "truncate_wal is disabled while Point-in-Time Recovery is enabled \
3549                 (it would reset the durable LSN); use segment sealing to reclaim space"
3550                    .to_string(),
3551            ));
3552        }
3553        self.wal.truncate()
3554    }
3555
3556    /// Get storage statistics
3557    pub fn stats(&self) -> StorageStats {
3558        // Get WAL size from the WAL manager
3559        let wal_size = self.wal.size_bytes();
3560
3561        // Get active transaction count from MVCC
3562        let active_txns = self.mvcc.active_transaction_count();
3563
3564        StorageStats {
3565            memtable_size_bytes: self.memtable.size(),
3566            wal_size_bytes: wal_size,
3567            active_transactions: active_txns,
3568            min_active_snapshot: self.mvcc.min_active_snapshot(),
3569            last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3570        }
3571    }
3572
3573    /// Garbage collect old versions
3574    pub fn gc(&self) -> usize {
3575        let min_ts = self.mvcc.min_active_snapshot();
3576        self.memtable.gc(min_ts)
3577    }
3578
3579    /// Clean shutdown
3580    pub fn shutdown(&self) -> Result<()> {
3581        // Sync WAL
3582        self.fsync()?;
3583
3584        // Write clean shutdown marker
3585        let marker_path = self.path.join(".clean_shutdown");
3586        std::fs::write(&marker_path, b"clean")?;
3587
3588        Ok(())
3589    }
3590}
3591
3592impl Drop for DurableStorage {
3593    fn drop(&mut self) {
3594        let _ = self.shutdown();
3595    }
3596}
3597
3598// =============================================================================
3599// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3600// =============================================================================
3601
3602/// Owns a `DurableStorage` instance backed by a temporary directory.
3603///
3604/// The temp directory is automatically cleaned up when this handle is dropped.
3605/// Access the underlying storage via `Deref` coercion or `.storage()`.
3606///
3607/// # Why this exists
3608///
3609/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
3610/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
3611/// used in production). This dual-engine architecture meant bugs could surface
3612/// only in production because the test path exercised different code.
3613///
3614/// `EphemeralHandle` eliminates this divergence: tests use the exact same
3615/// `DurableStorage` engine as production, just backed by a temp directory.
3616pub struct EphemeralHandle {
3617    storage: DurableStorage,
3618    _tmpdir: tempfile::TempDir,
3619}
3620
3621impl EphemeralHandle {
3622    /// Get a reference to the underlying storage
3623    pub fn storage(&self) -> &DurableStorage {
3624        &self.storage
3625    }
3626
3627    /// Get a mutable reference to the underlying storage
3628    pub fn storage_mut(&mut self) -> &mut DurableStorage {
3629        &mut self.storage
3630    }
3631
3632    /// Consume the handle and return the storage and temp directory.
3633    ///
3634    /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
3635    /// the `TempDir` alive for the lifetime of the storage.
3636    pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
3637        (self.storage, self._tmpdir)
3638    }
3639}
3640
3641impl std::ops::Deref for EphemeralHandle {
3642    type Target = DurableStorage;
3643    fn deref(&self) -> &DurableStorage {
3644        &self.storage
3645    }
3646}
3647
3648impl std::ops::DerefMut for EphemeralHandle {
3649    fn deref_mut(&mut self) -> &mut DurableStorage {
3650        &mut self.storage
3651    }
3652}
3653
3654/// Recovery statistics
3655#[derive(Debug, Default)]
3656pub struct RecoveryStats {
3657    pub transactions_recovered: usize,
3658    pub writes_recovered: usize,
3659    pub commit_ts: u64,
3660}
3661
3662/// Storage statistics
3663#[derive(Debug, Default)]
3664pub struct StorageStats {
3665    pub memtable_size_bytes: u64,
3666    pub wal_size_bytes: u64,
3667    pub active_transactions: usize,
3668    pub min_active_snapshot: u64,
3669    pub last_checkpoint_lsn: u64,
3670}
3671
3672#[cfg(test)]
3673mod tests {
3674    use super::*;
3675    use tempfile::tempdir;
3676
3677    #[test]
3678    fn test_basic_transaction() {
3679        let dir = tempdir().unwrap();
3680        let storage = DurableStorage::open(dir.path()).unwrap();
3681
3682        // Begin transaction
3683        let txn_id = storage.begin_transaction().unwrap();
3684
3685        // Write data
3686        storage
3687            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3688            .unwrap();
3689        storage
3690            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3691            .unwrap();
3692
3693        // Read back (within same transaction)
3694        let v1 = storage.read(txn_id, b"key1").unwrap();
3695        assert_eq!(v1, Some(b"value1".to_vec()));
3696
3697        // Commit
3698        let commit_ts = storage.commit(txn_id).unwrap();
3699        assert!(commit_ts > 0);
3700
3701        // Read in new transaction
3702        let txn2 = storage.begin_transaction().unwrap();
3703        let v1 = storage.read(txn2, b"key1").unwrap();
3704        assert_eq!(v1, Some(b"value1".to_vec()));
3705        storage.abort(txn2).unwrap();
3706    }
3707
3708    #[test]
3709    fn test_snapshot_isolation() {
3710        let dir = tempdir().unwrap();
3711        let storage = DurableStorage::open(dir.path()).unwrap();
3712
3713        // T1: Write initial value
3714        let t1 = storage.begin_transaction().unwrap();
3715        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3716        storage.commit(t1).unwrap();
3717
3718        // T2: Start reading (snapshot at this point)
3719        let t2 = storage.begin_transaction().unwrap();
3720
3721        // T3: Update the value
3722        let t3 = storage.begin_transaction().unwrap();
3723        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3724        storage.commit(t3).unwrap();
3725
3726        // T2 should still see v1 (snapshot isolation)
3727        let v = storage.read(t2, b"key").unwrap();
3728        assert_eq!(v, Some(b"v1".to_vec()));
3729
3730        // New transaction should see v2
3731        let t4 = storage.begin_transaction().unwrap();
3732        let v = storage.read(t4, b"key").unwrap();
3733        assert_eq!(v, Some(b"v2".to_vec()));
3734
3735        storage.abort(t2).unwrap();
3736        storage.abort(t4).unwrap();
3737    }
3738
3739    #[test]
3740    fn test_gc_preserves_versions_for_active_snapshot() {
3741        // GC must not free a version that an in-flight reader's snapshot can
3742        // still observe. The low-water-mark (min_active_snapshot) is the oldest
3743        // live reader; any version visible to it must survive GC.
3744        let dir = tempdir().unwrap();
3745        let storage = DurableStorage::open(dir.path()).unwrap();
3746
3747        // Seed an initial committed version v1.
3748        let t1 = storage.begin_transaction().unwrap();
3749        storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
3750        storage.commit(t1).unwrap();
3751
3752        // Open a long-lived snapshot reader BEFORE any newer writes. Its
3753        // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
3754        let reader = storage.begin_transaction().unwrap();
3755        assert_eq!(
3756            storage.read(reader, b"k").unwrap(),
3757            Some(b"v1".to_vec()),
3758            "reader's snapshot must see v1"
3759        );
3760
3761        // Produce several newer committed versions while the reader is active.
3762        for v in ["v2", "v3", "v4"] {
3763            let w = storage.begin_transaction().unwrap();
3764            storage
3765                .write(w, b"k".to_vec(), v.as_bytes().to_vec())
3766                .unwrap();
3767            storage.commit(w).unwrap();
3768        }
3769
3770        // Run GC. Because `reader` is still active, the watermark is pinned at
3771        // its snapshot and the version it needs (v1) must NOT be freed.
3772        let _freed = storage.gc();
3773        assert_eq!(
3774            storage.read(reader, b"k").unwrap(),
3775            Some(b"v1".to_vec()),
3776            "GC must not free a version still visible to an active snapshot"
3777        );
3778
3779        // A fresh transaction sees the latest committed version.
3780        let fresh = storage.begin_transaction().unwrap();
3781        assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
3782        storage.abort(fresh).unwrap();
3783
3784        // Release the old reader; the watermark can now advance.
3785        storage.abort(reader).unwrap();
3786
3787        // After the old snapshot is gone, GC may reclaim superseded versions,
3788        // and new readers still resolve the latest value correctly.
3789        let _freed2 = storage.gc();
3790        let after = storage.begin_transaction().unwrap();
3791        assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
3792        storage.abort(after).unwrap();
3793    }
3794
3795    #[test]
3796    fn test_ssi_detects_write_skew() {
3797        // Classic write-skew: two concurrent transactions each read what the
3798        // other writes (disjoint write keys). Under plain SI both would commit,
3799        // violating serializability. Under SSI the pivot transaction (with both
3800        // an inbound and an outbound rw-edge) must be aborted, so the two
3801        // commits cannot both succeed.
3802        let dir = tempdir().unwrap();
3803        let storage = DurableStorage::open(dir.path()).unwrap();
3804
3805        // Seed two keys.
3806        let seed = storage.begin_transaction().unwrap();
3807        storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
3808        storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
3809        storage.commit(seed).unwrap();
3810
3811        // T1 reads x and y, then writes x.
3812        let t1 = storage.begin_transaction().unwrap();
3813        let _ = storage.read(t1, b"x").unwrap();
3814        let _ = storage.read(t1, b"y").unwrap();
3815
3816        // T2 reads x and y, then writes y. (Concurrent with T1.)
3817        let t2 = storage.begin_transaction().unwrap();
3818        let _ = storage.read(t2, b"x").unwrap();
3819        let _ = storage.read(t2, b"y").unwrap();
3820
3821        storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
3822        storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();
3823
3824        // First committer wins.
3825        let c1 = storage.commit(t1);
3826        assert!(c1.is_ok(), "first committer should succeed: {c1:?}");
3827
3828        // The second committer forms a dangerous rw-structure with T1 and must
3829        // be rejected to preserve serializability.
3830        let c2 = storage.commit(t2);
3831        assert!(
3832            c2.is_err(),
3833            "SSI must abort the write-skew pivot, but commit succeeded"
3834        );
3835    }
3836
3837    #[test]
3838    fn test_abort_transaction() {
3839        let dir = tempdir().unwrap();
3840        let storage = DurableStorage::open(dir.path()).unwrap();
3841
3842        // Write initial value
3843        let t1 = storage.begin_transaction().unwrap();
3844        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3845        storage.commit(t1).unwrap();
3846
3847        // Start transaction that will abort
3848        let t2 = storage.begin_transaction().unwrap();
3849        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3850        storage.abort(t2).unwrap();
3851
3852        // New transaction should see v1 (aborted changes not visible)
3853        let t3 = storage.begin_transaction().unwrap();
3854        let v = storage.read(t3, b"key").unwrap();
3855        assert_eq!(v, Some(b"v1".to_vec()));
3856        storage.abort(t3).unwrap();
3857    }
3858
3859    #[test]
3860    fn test_crash_recovery() {
3861        let dir = tempdir().unwrap();
3862
3863        // Phase 1: Write data and commit
3864        {
3865            // Use open_without_lock for crash simulation tests
3866            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3867
3868            // Set sync mode to FULL to ensure data is synced before "crash"
3869            storage.set_sync_mode(2); // FULL: sync every commit
3870
3871            let txn = storage.begin_transaction().unwrap();
3872            storage
3873                .write(txn, b"persist".to_vec(), b"data".to_vec())
3874                .unwrap();
3875            storage.commit(txn).unwrap();
3876
3877            // Simulate crash (no clean shutdown)
3878            std::mem::forget(storage);
3879        }
3880
3881        // Phase 2: Reopen and recover
3882        {
3883            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3884            let stats = storage.recover().unwrap();
3885            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3886
3887            // Data should be recovered
3888            let txn = storage.begin_transaction().unwrap();
3889            let v = storage.read(txn, b"persist").unwrap();
3890            assert_eq!(v, Some(b"data".to_vec()));
3891            storage.abort(txn).unwrap();
3892        }
3893    }
3894
3895    /// At-rest encryption end-to-end through the live DurableStorage engine
3896    /// (Task 3B): an encrypted DB round-trips committed data, never leaks
3897    /// plaintext to the WAL file, flips the per-instance durability matrix, and
3898    /// fails CLOSED on a wrong or missing key (never a silent plaintext/empty
3899    /// open).
3900    #[test]
3901    fn test_at_rest_encryption_end_to_end() {
3902        let dir = tempdir().unwrap();
3903        let kek = || EncryptionKey::new([0xABu8; 32]);
3904
3905        // Phase 1: create an encrypted DB, write committed data, clean close.
3906        {
3907            let storage = DurableStorage::open_with_encryption(
3908                dir.path(),
3909                true,
3910                MemTableType::Standard,
3911                StorageEncryption::with_kek(kek(), "test"),
3912            )
3913            .unwrap();
3914            assert!(storage.is_encrypted());
3915            assert!(storage.durability_capabilities().at_rest_encryption);
3916            storage.set_sync_mode(2);
3917            let t = storage.begin_transaction().unwrap();
3918            storage
3919                .write(t, b"secret-key".to_vec(), b"secret-value".to_vec())
3920                .unwrap();
3921            storage.commit(t).unwrap();
3922        } // clean drop releases the file lock
3923
3924        // The keyring exists and the WAL bytes do not leak the plaintext record.
3925        assert!(dir.path().join("keyring.json").exists());
3926        let raw = std::fs::read(dir.path().join("wal.log")).unwrap();
3927        assert!(!contains(&raw, b"secret-value"), "value leaked to WAL");
3928        assert!(!contains(&raw, b"secret-key"), "key leaked to WAL");
3929
3930        // Phase 2: reopen with the CORRECT key, recover, read back.
3931        {
3932            let storage = DurableStorage::open_with_encryption(
3933                dir.path(),
3934                true,
3935                MemTableType::Standard,
3936                StorageEncryption::with_kek(kek(), "test"),
3937            )
3938            .unwrap();
3939            storage.recover().unwrap();
3940            let t = storage.begin_transaction().unwrap();
3941            assert_eq!(
3942                storage.read(t, b"secret-key").unwrap(),
3943                Some(b"secret-value".to_vec()),
3944                "committed encrypted data must round-trip"
3945            );
3946            storage.abort(t).unwrap();
3947        }
3948
3949        // Phase 3: WRONG key must fail closed (keyring canary), not open empty.
3950        {
3951            let wrong = DurableStorage::open_with_encryption(
3952                dir.path(),
3953                true,
3954                MemTableType::Standard,
3955                StorageEncryption::with_kek(EncryptionKey::new([0x00u8; 32]), "test"),
3956            );
3957            assert!(wrong.is_err(), "wrong KEK must fail closed");
3958        }
3959
3960        // Phase 4: opening an encrypted DB with NO key must fail closed.
3961        {
3962            let no_key = DurableStorage::open_with_encryption(
3963                dir.path(),
3964                true,
3965                MemTableType::Standard,
3966                StorageEncryption::disabled(),
3967            );
3968            assert!(
3969                no_key.is_err(),
3970                "encrypted DB opened without key must fail closed"
3971            );
3972        }
3973    }
3974
3975    /// A plaintext DB reports at_rest_encryption=false on the live matrix.
3976    #[test]
3977    fn test_plaintext_db_reports_unencrypted() {
3978        let dir = tempdir().unwrap();
3979        let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3980        assert!(!storage.is_encrypted());
3981        assert!(!storage.durability_capabilities().at_rest_encryption);
3982        assert!(storage.durability_capabilities().crash_recovery);
3983        assert!(!dir.path().join("keyring.json").exists());
3984    }
3985
3986    fn contains(haystack: &[u8], needle: &[u8]) -> bool {
3987        haystack.windows(needle.len()).any(|w| w == needle)
3988    }
3989
3990    /// PITR phase 1 — durable monotonic LSN anchor.
3991    ///
3992    /// Enabling PITR writes the durable manifest; the WAL record ordinal (LSN)
3993    /// and the last-checkpoint LSN then survive a reopen, and the destructive
3994    /// truncate is refused so the anchor can never reset. A non-PITR DB is
3995    /// completely unaffected (no manifest, truncate works).
3996    #[test]
3997    fn test_pitr_durable_lsn_and_truncate_guard() {
3998        let dir = tempdir().unwrap();
3999
4000        // Default DB: not PITR, no manifest, truncate allowed.
4001        {
4002            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4003            assert!(!s.is_pitr_enabled());
4004            assert!(!s.durability_capabilities().point_in_time_recovery);
4005            assert!(s.truncate_wal().is_ok());
4006        }
4007        assert!(!dir.path().join("wal.manifest").exists());
4008
4009        // Enable PITR, write+checkpoint, capture the LSN.
4010        let lsn_before;
4011        let ckpt_lsn;
4012        {
4013            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4014            s.enable_point_in_time_recovery().unwrap();
4015            assert!(s.is_pitr_enabled());
4016
4017            let t = s.begin_transaction().unwrap();
4018            s.write(t, b"k1".to_vec(), b"v1".to_vec()).unwrap();
4019            s.commit(t).unwrap();
4020            ckpt_lsn = s.checkpoint().unwrap();
4021            lsn_before = s.current_lsn();
4022            assert!(lsn_before > 0);
4023
4024            // truncate is refused while PITR is on.
4025            assert!(
4026                s.truncate_wal().is_err(),
4027                "truncate must be refused in PITR mode"
4028            );
4029        }
4030        assert!(dir.path().join("wal.manifest").exists());
4031
4032        // Reopen: PITR auto-detected from the manifest; LSN did NOT reset.
4033        {
4034            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4035            assert!(
4036                s.is_pitr_enabled(),
4037                "PITR must be auto-detected from manifest"
4038            );
4039            s.recover().unwrap();
4040            assert_eq!(
4041                s.current_lsn(),
4042                lsn_before,
4043                "durable LSN must survive reopen (not reset to 0/record-recount drift)"
4044            );
4045            assert_eq!(
4046                s.stats().last_checkpoint_lsn,
4047                ckpt_lsn,
4048                "last_checkpoint_lsn must be restored from the manifest"
4049            );
4050            // Committed data still readable.
4051            let t = s.begin_transaction().unwrap();
4052            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1".to_vec()));
4053            s.abort(t).unwrap();
4054        }
4055    }
4056
4057    /// PITR phase 2 — END-TO-END restore to a point in time.
4058    ///
4059    /// Enable PITR, commit two transactions, then on fresh reopens
4060    /// `recover_to(target)` materializes the exact historical state: an LSN cut
4061    /// between the two transactions sees only the first; the full LSN / a
4062    /// far-future timestamp sees both; timestamp 0 sees nothing. Transaction
4063    /// atomicity is preserved at the cut.
4064    #[test]
4065    fn test_pitr_recover_to_point_in_time() {
4066        use crate::txn_wal::RecoveryTarget;
4067
4068        let dir = tempdir().unwrap();
4069
4070        // Build history: txn1 sets k1=v1; txn2 overwrites k1=v1b and adds k2=v2.
4071        let (lsn_after_txn1, lsn_after_txn2);
4072        {
4073            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4074            s.enable_point_in_time_recovery().unwrap();
4075
4076            let t1 = s.begin_transaction().unwrap();
4077            s.write(t1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
4078            s.commit(t1).unwrap();
4079            lsn_after_txn1 = s.current_lsn();
4080
4081            let t2 = s.begin_transaction().unwrap();
4082            s.write(t2, b"k1".to_vec(), b"v1b".to_vec()).unwrap();
4083            s.write(t2, b"k2".to_vec(), b"v2".to_vec()).unwrap();
4084            s.commit(t2).unwrap();
4085            lsn_after_txn2 = s.current_lsn();
4086
4087            s.checkpoint().unwrap();
4088        }
4089        assert!(lsn_after_txn2 > lsn_after_txn1);
4090
4091        // Restore to the cut between txn1 and txn2: only txn1's effect is visible.
4092        {
4093            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4094            s.recover_to(RecoveryTarget::Lsn(lsn_after_txn1)).unwrap();
4095            let t = s.begin_transaction().unwrap();
4096            assert_eq!(
4097                s.read(t, b"k1").unwrap(),
4098                Some(b"v1".to_vec()),
4099                "txn1 value"
4100            );
4101            assert_eq!(
4102                s.read(t, b"k2").unwrap(),
4103                None,
4104                "txn2 must NOT be present at the cut"
4105            );
4106            s.abort(t).unwrap();
4107        }
4108
4109        // Restore to the full LSN: both transactions visible (txn2 wins on k1).
4110        {
4111            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4112            s.recover_to(RecoveryTarget::Lsn(lsn_after_txn2)).unwrap();
4113            let t = s.begin_transaction().unwrap();
4114            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1b".to_vec()));
4115            assert_eq!(s.read(t, b"k2").unwrap(), Some(b"v2".to_vec()));
4116            s.abort(t).unwrap();
4117        }
4118
4119        // Timestamp bounds: MAX => everything; 0 => nothing.
4120        {
4121            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4122            s.recover_to(RecoveryTarget::Timestamp(u64::MAX)).unwrap();
4123            let t = s.begin_transaction().unwrap();
4124            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1b".to_vec()));
4125            assert_eq!(s.read(t, b"k2").unwrap(), Some(b"v2".to_vec()));
4126            s.abort(t).unwrap();
4127        }
4128        {
4129            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4130            s.recover_to(RecoveryTarget::Timestamp(0)).unwrap();
4131            let t = s.begin_transaction().unwrap();
4132            assert_eq!(s.read(t, b"k1").unwrap(), None, "no commit is <= ts 0");
4133            s.abort(t).unwrap();
4134        }
4135
4136        // The capability matrix now reports PITR live for this DB.
4137        {
4138            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4139            assert!(s.durability_capabilities().point_in_time_recovery);
4140        }
4141    }
4142
4143    /// recover_to is refused on a non-PITR database (the WAL may be truncated, so
4144    /// an arbitrary target cannot be honored).
4145    #[test]
4146    fn test_recover_to_refused_without_pitr() {
4147        use crate::txn_wal::RecoveryTarget;
4148        let dir = tempdir().unwrap();
4149        let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4150        assert!(s.recover_to(RecoveryTarget::Lsn(1)).is_err());
4151        assert!(!s.durability_capabilities().point_in_time_recovery);
4152    }
4153
4154    /// Regression (review HIGH): under the DEFAULT NORMAL sync mode a commit
4155    /// record may sit unflushed in the BufWriter while current_lsn() counts it.
4156    /// recover_to MUST flush+fsync before replaying, or a same-process restore to
4157    /// the captured LSN silently drops the committed-but-unflushed tail.
4158    #[test]
4159    fn test_recover_to_flushes_before_replay() {
4160        use crate::txn_wal::RecoveryTarget;
4161        let dir = tempdir().unwrap();
4162        let s = DurableStorage::open_without_lock(dir.path()).unwrap(); // NORMAL sync
4163        s.enable_point_in_time_recovery().unwrap();
4164        let t = s.begin_transaction().unwrap();
4165        s.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
4166        s.commit(t).unwrap(); // commit record likely unflushed under NORMAL
4167        let lsn = s.current_lsn();
4168        // No checkpoint, SAME process: replay reads a fresh on-disk handle.
4169        let stats = s.recover_to(RecoveryTarget::Lsn(lsn)).unwrap();
4170        assert!(
4171            stats.writes_recovered >= 1,
4172            "committed-but-unflushed data must be recovered (flush before replay)"
4173        );
4174    }
4175
4176    /// Regression (review MEDIUM): recover_to must be the SOLE recovery on a
4177    /// fresh open. After recover() (or a prior recover_to) it must refuse, rather
4178    /// than silently layer a stale set over the point-in-time state.
4179    #[test]
4180    fn test_recover_to_refuses_after_recovery() {
4181        use crate::txn_wal::RecoveryTarget;
4182        let dir = tempdir().unwrap();
4183        {
4184            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4185            s.enable_point_in_time_recovery().unwrap();
4186            let t = s.begin_transaction().unwrap();
4187            s.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
4188            s.commit(t).unwrap();
4189            s.checkpoint().unwrap();
4190        }
4191        let s = DurableStorage::open_without_lock(dir.path()).unwrap();
4192        s.recover().unwrap(); // full recovery first
4193        assert!(
4194            s.recover_to(RecoveryTarget::Lsn(1)).is_err(),
4195            "recover_to after recover() must refuse (would double-apply)"
4196        );
4197        // And a second recover_to after a first also refuses.
4198        let s2 = DurableStorage::open_without_lock(dir.path()).unwrap();
4199        s2.recover_to(RecoveryTarget::Lsn(u64::MAX)).unwrap();
4200        assert!(s2.recover_to(RecoveryTarget::Lsn(1)).is_err());
4201    }
4202
4203    /// recover_to over an ENCRYPTED WAL: the bounded replay decrypts correctly
4204    /// (review LOW: the encrypted bounded path was previously untested).
4205    #[test]
4206    fn test_pitr_recover_to_encrypted() {
4207        use crate::encryption::EncryptionKey;
4208        use crate::txn_wal::RecoveryTarget;
4209
4210        let dir = tempdir().unwrap();
4211        let kek = [0x9Fu8; 32];
4212        let mk = || StorageEncryption::with_kek(EncryptionKey::new(kek), "test");
4213
4214        let lsn_after_txn1;
4215        {
4216            let s = DurableStorage::open_with_encryption(
4217                dir.path(),
4218                true,
4219                MemTableType::Standard,
4220                mk(),
4221            )
4222            .unwrap();
4223            s.enable_point_in_time_recovery().unwrap();
4224            let t1 = s.begin_transaction().unwrap();
4225            s.write(t1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
4226            s.commit(t1).unwrap();
4227            lsn_after_txn1 = s.current_lsn();
4228            let t2 = s.begin_transaction().unwrap();
4229            s.write(t2, b"k2".to_vec(), b"v2".to_vec()).unwrap();
4230            s.commit(t2).unwrap();
4231            s.checkpoint().unwrap();
4232            s.shutdown().ok();
4233        }
4234
4235        // Reopen encrypted, restore to the cut between the two txns.
4236        let s =
4237            DurableStorage::open_with_encryption(dir.path(), true, MemTableType::Standard, mk())
4238                .unwrap();
4239        s.recover_to(RecoveryTarget::Lsn(lsn_after_txn1)).unwrap();
4240        let t = s.begin_transaction().unwrap();
4241        assert_eq!(
4242            s.read(t, b"k1").unwrap(),
4243            Some(b"v1".to_vec()),
4244            "encrypted bounded replay must decrypt the in-window record"
4245        );
4246        assert_eq!(s.read(t, b"k2").unwrap(), None, "txn2 is past the cut");
4247        s.abort(t).unwrap();
4248    }
4249
4250    /// PITR composes with at-rest encryption (the manifest anchor is independent
4251    /// of the keyring; an encrypted PITR DB round-trips and stays fail-closed).
4252    #[test]
4253    fn test_pitr_with_encryption() {
4254        use crate::encryption::EncryptionKey;
4255
4256        let dir = tempdir().unwrap();
4257        let kek = [0x2Bu8; 32];
4258
4259        {
4260            let s = DurableStorage::open_with_encryption(
4261                dir.path(),
4262                true,
4263                MemTableType::Standard,
4264                StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
4265            )
4266            .unwrap();
4267            s.enable_point_in_time_recovery().unwrap();
4268            let t = s.begin_transaction().unwrap();
4269            s.write(t, b"ek".to_vec(), b"ev".to_vec()).unwrap();
4270            s.commit(t).unwrap();
4271            s.checkpoint().unwrap();
4272            s.shutdown().ok();
4273        }
4274        assert!(dir.path().join("keyring.json").exists());
4275        assert!(dir.path().join("wal.manifest").exists());
4276
4277        // Reopen encrypted + PITR.
4278        let s = DurableStorage::open_with_encryption(
4279            dir.path(),
4280            true,
4281            MemTableType::Standard,
4282            StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
4283        )
4284        .unwrap();
4285        assert!(s.is_pitr_enabled());
4286        assert!(s.is_encrypted());
4287        s.recover().unwrap();
4288        let t = s.begin_transaction().unwrap();
4289        assert_eq!(s.read(t, b"ek").unwrap(), Some(b"ev".to_vec()));
4290        s.abort(t).unwrap();
4291    }
4292
4293    /// Crash-atomicity on an ENCRYPTED database: a simulated crash (forget) on an
4294    /// encrypted WAL must, on reopen with the correct key, replay committed data
4295    /// (decrypted via the crypto-aware recovery path) while NOT resurrecting
4296    /// aborted/in-flight writes — and a wrong key after the crash fails closed.
4297    /// Combines the at-rest-encryption + crash-atomicity guarantees.
4298    #[test]
4299    fn test_encrypted_crash_recovery_atomicity() {
4300        use crate::encryption::EncryptionKey;
4301        let dir = tempdir().unwrap();
4302        let kek = || StorageEncryption::with_kek(EncryptionKey::new([0xC1u8; 32]), "test");
4303        // open encrypted WITHOUT the file lock so a forget()+reopen works in-test.
4304        let open = |enc: StorageEncryption| {
4305            DurableStorage::open_with_full_config_internal(
4306                dir.path(),
4307                true,
4308                MemTableType::Standard,
4309                false,
4310                enc,
4311            )
4312        };
4313
4314        {
4315            let storage = open(kek()).unwrap();
4316            assert!(storage.is_encrypted());
4317            storage.set_sync_mode(2); // FULL: fsync each commit before the "crash"
4318            let t1 = storage.begin_transaction().unwrap();
4319            storage
4320                .write(t1, b"committed".to_vec(), b"durable".to_vec())
4321                .unwrap();
4322            storage.commit(t1).unwrap();
4323            let t2 = storage.begin_transaction().unwrap();
4324            storage
4325                .write(t2, b"aborted".to_vec(), b"x".to_vec())
4326                .unwrap();
4327            storage.abort(t2).unwrap();
4328            let t3 = storage.begin_transaction().unwrap();
4329            storage
4330                .write(t3, b"inflight".to_vec(), b"y".to_vec())
4331                .unwrap();
4332            std::mem::forget(storage); // crash: skip clean shutdown
4333        }
4334
4335        // Reopen with the CORRECT key: committed survives, others do not.
4336        {
4337            let storage = open(kek()).unwrap();
4338            storage.recover().unwrap();
4339            let t = storage.begin_transaction().unwrap();
4340            assert_eq!(
4341                storage.read(t, b"committed").unwrap(),
4342                Some(b"durable".to_vec()),
4343                "committed encrypted write must survive the crash"
4344            );
4345            assert_eq!(storage.read(t, b"aborted").unwrap(), None);
4346            assert_eq!(storage.read(t, b"inflight").unwrap(), None);
4347            storage.abort(t).unwrap();
4348        }
4349
4350        // Wrong key after the crash must fail closed (keyring canary).
4351        assert!(
4352            open(StorageEncryption::with_kek(
4353                EncryptionKey::new([0u8; 32]),
4354                "test"
4355            ))
4356            .is_err(),
4357            "wrong key after crash must fail closed"
4358        );
4359    }
4360
4361    /// Crash-atomicity invariant (Task 4 — completes the Task 1 single-writer
4362    /// contract). `test_crash_recovery` proves committed data survives a crash;
4363    /// this proves the other half: recovery must NOT resurrect aborted or
4364    /// in-flight (never-committed) writes. Together they assert the live
4365    /// single-writer engine's commit is atomic AND durable across a crash.
4366    #[test]
4367    fn test_crash_recovery_atomicity() {
4368        let dir = tempdir().unwrap();
4369
4370        // Phase 1: one committed write, one aborted, one in-flight at crash.
4371        {
4372            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
4373            storage.set_sync_mode(2); // FULL: fsync every commit before the "crash"
4374
4375            // Committed — must survive.
4376            let t1 = storage.begin_transaction().unwrap();
4377            storage
4378                .write(t1, b"committed".to_vec(), b"durable".to_vec())
4379                .unwrap();
4380            storage.commit(t1).unwrap();
4381
4382            // Aborted — must NOT be resurrected.
4383            let t2 = storage.begin_transaction().unwrap();
4384            storage
4385                .write(t2, b"aborted".to_vec(), b"rolledback".to_vec())
4386                .unwrap();
4387            storage.abort(t2).unwrap();
4388
4389            // In-flight (never committed) at crash time — must NOT be resurrected.
4390            let t3 = storage.begin_transaction().unwrap();
4391            storage
4392                .write(t3, b"inflight".to_vec(), b"lost".to_vec())
4393                .unwrap();
4394
4395            // Simulate a crash: skip Drop / clean shutdown.
4396            std::mem::forget(storage);
4397        }
4398
4399        // Phase 2: reopen + recover; assert atomicity.
4400        {
4401            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
4402            storage.recover().unwrap();
4403
4404            let t = storage.begin_transaction().unwrap();
4405            assert_eq!(
4406                storage.read(t, b"committed").unwrap(),
4407                Some(b"durable".to_vec()),
4408                "committed write must survive the crash"
4409            );
4410            assert_eq!(
4411                storage.read(t, b"aborted").unwrap(),
4412                None,
4413                "aborted write must not be resurrected by recovery"
4414            );
4415            assert_eq!(
4416                storage.read(t, b"inflight").unwrap(),
4417                None,
4418                "uncommitted in-flight write must not be resurrected by recovery"
4419            );
4420            storage.abort(t).unwrap();
4421        }
4422    }
4423
4424    #[test]
4425    fn test_scan_prefix() {
4426        let dir = tempdir().unwrap();
4427        let storage = DurableStorage::open(dir.path()).unwrap();
4428
4429        let txn = storage.begin_transaction().unwrap();
4430        storage
4431            .write(txn, b"user:1".to_vec(), b"alice".to_vec())
4432            .unwrap();
4433        storage
4434            .write(txn, b"user:2".to_vec(), b"bob".to_vec())
4435            .unwrap();
4436        storage
4437            .write(txn, b"order:1".to_vec(), b"order1".to_vec())
4438            .unwrap();
4439        storage.commit(txn).unwrap();
4440
4441        let txn2 = storage.begin_transaction().unwrap();
4442        let users = storage.scan(txn2, b"user:").unwrap();
4443        assert_eq!(users.len(), 2);
4444        storage.abort(txn2).unwrap();
4445    }
4446
4447    #[test]
4448    fn test_delete() {
4449        let dir = tempdir().unwrap();
4450        let storage = DurableStorage::open(dir.path()).unwrap();
4451
4452        // Insert
4453        let t1 = storage.begin_transaction().unwrap();
4454        storage
4455            .write(t1, b"key".to_vec(), b"value".to_vec())
4456            .unwrap();
4457        storage.commit(t1).unwrap();
4458
4459        // Verify exists
4460        let t2 = storage.begin_transaction().unwrap();
4461        assert!(storage.read(t2, b"key").unwrap().is_some());
4462        storage.abort(t2).unwrap();
4463
4464        // Delete
4465        let t3 = storage.begin_transaction().unwrap();
4466        storage.delete(t3, b"key".to_vec()).unwrap();
4467        storage.commit(t3).unwrap();
4468
4469        // Verify deleted
4470        let t4 = storage.begin_transaction().unwrap();
4471        assert!(storage.read(t4, b"key").unwrap().is_none());
4472        storage.abort(t4).unwrap();
4473    }
4474
4475    #[test]
4476    fn test_gc() {
4477        let dir = tempdir().unwrap();
4478        let storage = DurableStorage::open(dir.path()).unwrap();
4479
4480        // Create multiple versions
4481        for i in 0..10 {
4482            let txn = storage.begin_transaction().unwrap();
4483            storage
4484                .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
4485                .unwrap();
4486            storage.commit(txn).unwrap();
4487        }
4488
4489        // GC should reclaim old versions
4490        let gc_count = storage.gc();
4491        // At least some versions should be collected
4492        // (exact count depends on implementation)
4493        let _ = gc_count; // gc_count is usize, always >= 0
4494    }
4495
4496    #[test]
4497    fn test_group_commit() {
4498        use std::sync::Arc;
4499        use std::thread;
4500
4501        let dir = tempdir().unwrap();
4502        let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
4503
4504        // Spawn multiple threads to commit concurrently
4505        let mut handles = vec![];
4506        for i in 0..4 {
4507            let storage = Arc::clone(&storage);
4508            handles.push(thread::spawn(move || {
4509                let txn = storage.begin_transaction().unwrap();
4510                storage
4511                    .write(
4512                        txn,
4513                        format!("key{}", i).into_bytes(),
4514                        format!("val{}", i).into_bytes(),
4515                    )
4516                    .unwrap();
4517                storage.commit(txn).unwrap()
4518            }));
4519        }
4520
4521        // Wait for all commits
4522        let mut commit_times = vec![];
4523        for h in handles {
4524            commit_times.push(h.join().unwrap());
4525        }
4526
4527        // All commits should succeed
4528        assert!(commit_times.iter().all(|&ts| ts > 0));
4529
4530        // Verify data persisted
4531        let txn = storage.begin_transaction().unwrap();
4532        for i in 0..4 {
4533            let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
4534            assert_eq!(val, Some(format!("val{}", i).into_bytes()));
4535        }
4536        storage.abort(txn).unwrap();
4537    }
4538
4539    // ==================== ArenaMvccMemTable Tests ====================
4540
4541    #[test]
4542    fn test_arena_memtable_basic_write_read() {
4543        let memtable = ArenaMvccMemTable::new();
4544
4545        // Write some values
4546        memtable
4547            .write(b"key1", Some(b"value1".to_vec()), 1)
4548            .unwrap();
4549        memtable
4550            .write(b"key2", Some(b"value2".to_vec()), 1)
4551            .unwrap();
4552
4553        // Read them back (uncommitted, so need txn_id match)
4554        assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
4555        assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
4556        assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
4557    }
4558
4559    #[test]
4560    fn test_arena_memtable_update() {
4561        let memtable = ArenaMvccMemTable::new();
4562
4563        memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
4564        memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
4565
4566        assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
4567    }
4568
4569    #[test]
4570    fn test_arena_memtable_delete() {
4571        let memtable = ArenaMvccMemTable::new();
4572
4573        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4574        memtable.write(b"key", None, 1).unwrap(); // Delete = None value
4575
4576        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4577    }
4578
4579    #[test]
4580    fn test_arena_memtable_scan_prefix() {
4581        let memtable = ArenaMvccMemTable::new();
4582
4583        memtable
4584            .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
4585            .unwrap();
4586        memtable
4587            .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
4588            .unwrap();
4589        memtable
4590            .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
4591            .unwrap();
4592        memtable
4593            .write(b"order:1", Some(b"order_data".to_vec()), 1)
4594            .unwrap();
4595
4596        // Create a write set and commit
4597        let mut write_set = HashSet::new();
4598        write_set.insert(InlineKey::from_slice(b"user:1:name"));
4599        write_set.insert(InlineKey::from_slice(b"user:1:email"));
4600        write_set.insert(InlineKey::from_slice(b"user:2:name"));
4601        write_set.insert(InlineKey::from_slice(b"order:1"));
4602        memtable.commit(1, 10, &write_set);
4603
4604        // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
4605        let results = memtable.scan_prefix(b"user:1:", 11, None);
4606        assert_eq!(results.len(), 2);
4607
4608        // Scan for all users
4609        let results = memtable.scan_prefix(b"user:", 11, None);
4610        assert_eq!(results.len(), 3);
4611    }
4612
4613    #[test]
4614    fn test_arena_memtable_write_batch() {
4615        let memtable = ArenaMvccMemTable::new();
4616
4617        let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
4618            (b"k1", Some(b"v1".to_vec())),
4619            (b"k2", Some(b"v2".to_vec())),
4620            (b"k3", Some(b"v3".to_vec())),
4621        ];
4622
4623        memtable.write_batch(&writes, 1).unwrap();
4624
4625        assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
4626        assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
4627        assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
4628    }
4629
4630    #[test]
4631    fn test_arena_memtable_gc() {
4632        let memtable = ArenaMvccMemTable::new();
4633
4634        // Write multiple versions
4635        for i in 0..10 {
4636            memtable
4637                .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
4638                .unwrap();
4639
4640            let mut write_set = HashSet::new();
4641            write_set.insert(InlineKey::from_slice(b"key"));
4642            memtable.commit(i + 1, (i + 1) * 10, &write_set);
4643        }
4644
4645        // GC old versions
4646        let gc_count = memtable.gc(90);
4647        let _ = gc_count; // gc_count is usize, always >= 0
4648    }
4649
4650    #[test]
4651    fn test_arena_memtable_size_tracking() {
4652        let memtable = ArenaMvccMemTable::new();
4653
4654        assert_eq!(memtable.size(), 0);
4655
4656        memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4657
4658        assert!(memtable.size() > 0);
4659    }
4660
4661    #[test]
4662    fn test_arena_memtable_abort() {
4663        let memtable = ArenaMvccMemTable::new();
4664
4665        memtable
4666            .write(b"key", Some(b"uncommitted".to_vec()), 1)
4667            .unwrap();
4668
4669        // Visible to same txn
4670        assert_eq!(
4671            memtable.read(b"key", 0, Some(1)),
4672            Some(b"uncommitted".to_vec())
4673        );
4674
4675        // Not visible to other txns
4676        assert_eq!(memtable.read(b"key", 0, Some(2)), None);
4677
4678        // Abort
4679        memtable.abort(1);
4680
4681        // No longer visible
4682        assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4683    }
4684
4685    // ========================================================================
4686    // MemTableKind Tests - Unified Abstraction
4687    // ========================================================================
4688
4689    #[test]
4690    fn test_memtable_kind_standard() {
4691        let memtable = MemTableKind::new(MemTableType::Standard, true);
4692        assert_eq!(memtable.kind(), MemTableType::Standard);
4693
4694        // Write and read
4695        memtable
4696            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4697            .unwrap();
4698
4699        // Commit transaction at ts=100
4700        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4701        memtable.commit(1, 100, &write_set);
4702
4703        // Read after commit - snapshot_ts must be > commit_ts for visibility
4704        let v = memtable.read(b"key1", 101, None);
4705        assert_eq!(v, Some(b"value1".to_vec()));
4706    }
4707
4708    #[test]
4709    fn test_memtable_kind_arena() {
4710        let memtable = MemTableKind::new(MemTableType::Arena, true);
4711        assert_eq!(memtable.kind(), MemTableType::Arena);
4712
4713        // Write and read
4714        memtable
4715            .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4716            .unwrap();
4717
4718        // Commit at ts=100
4719        let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4720        memtable.commit(1, 100, &write_set);
4721
4722        // Read after commit - snapshot_ts > commit_ts
4723        let v = memtable.read(b"key1", 101, None);
4724        assert_eq!(v, Some(b"value1".to_vec()));
4725    }
4726
4727    #[test]
4728    fn test_memtable_kind_scan_range() {
4729        // Test both implementations have consistent behavior
4730        for kind in [MemTableType::Standard, MemTableType::Arena] {
4731            let memtable = MemTableKind::new(kind, true);
4732
4733            // Write some data
4734            for i in 0..5 {
4735                let key = format!("key{}", i);
4736                let value = format!("value{}", i);
4737                memtable
4738                    .write(key.into_bytes(), Some(value.into_bytes()), 1)
4739                    .unwrap();
4740            }
4741
4742            // Commit all at ts=100
4743            let write_set: HashSet<InlineKey> = (0..5)
4744                .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
4745                .collect();
4746            memtable.commit(1, 100, &write_set);
4747
4748            // Scan range with snapshot_ts > commit_ts
4749            let results = memtable.scan_range(b"key1", b"key4", 101, None);
4750            assert_eq!(
4751                results.len(),
4752                3,
4753                "kind={:?} should have 3 results (key1, key2, key3)",
4754                kind
4755            );
4756        }
4757    }
4758
4759    #[test]
4760    fn test_durable_storage_arena() {
4761        let dir = tempdir().unwrap();
4762        let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
4763
4764        assert_eq!(storage.memtable_type(), MemTableType::Arena);
4765
4766        // Basic transaction should work the same
4767        let txn_id = storage.begin_transaction().unwrap();
4768        storage
4769            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
4770            .unwrap();
4771        storage.commit(txn_id).unwrap();
4772
4773        let txn2 = storage.begin_transaction().unwrap();
4774        let v = storage.read(txn2, b"key1").unwrap();
4775        assert_eq!(v, Some(b"value1".to_vec()));
4776        storage.abort(txn2).unwrap();
4777    }
4778
4779    #[test]
4780    fn test_durable_storage_full_config() {
4781        let dir = tempdir().unwrap();
4782
4783        // Test with Arena and ordered index enabled
4784        let storage =
4785            DurableStorage::open_with_full_config(dir.path(), true, MemTableType::Arena).unwrap();
4786
4787        assert_eq!(storage.memtable_type(), MemTableType::Arena);
4788
4789        // Write multiple keys
4790        let txn = storage.begin_transaction().unwrap();
4791        for i in 0..10 {
4792            let key = format!("key{:02}", i);
4793            let value = format!("value{}", i);
4794            storage
4795                .write(txn, key.into_bytes(), value.into_bytes())
4796                .unwrap();
4797        }
4798        storage.commit(txn).unwrap();
4799
4800        // Scan should work (uses scan method for prefix)
4801        let txn2 = storage.begin_transaction().unwrap();
4802        let results = storage.scan(txn2, b"key0").unwrap();
4803        assert_eq!(results.len(), 10); // key00 through key09
4804        storage.abort(txn2).unwrap();
4805    }
4806}