sochdb_storage/durable_storage.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! Wires together the live storage components into a durable engine:
21//!
22//! - WAL (txn_wal.rs) for crash-consistent durability + recovery
23//! - Group Commit for throughput
24//! - MVCC for isolation
25//! - LSCS for columnar efficiency
26//!
27//! Truth-in-capabilities: the live path provides crash-consistent WAL recovery,
28//! but NOT at-rest encryption, point-in-time recovery, ARIES checkpointing, or
29//! WAL fencing — those modules exist but are quarantined behind the empty,
30//! non-default `experimental` feature and are unwired. Query
31//! [`crate::durability_capabilities`] rather than relying on prose like
32//! "production-grade".
33//!
34//! ## Architecture
35//!
36//! ```text
//! ┌──────────────────────────────────────────────────────────────────┐
//! │                          DurableStorage                          │
//! ├──────────────────────────────────────────────────────────────────┤
//! │ ┌─────────────┐  ┌─────────────┐    ┌──────────────────────┐     │
//! │ │ MvccManager │  │ GroupCommit │───▶│    TxnWal (fsync)    │     │
//! │ │             │  │             │    └──────────────────────┘     │
//! │ │ ┌─────────┐ │  └─────────────┘                                 │
//! │ │ │Snapshots│ │                                                  │
//! │ │ └─────────┘ │  ┌─────────────────────────────────────────────┐ │
//! │ │ ┌─────────┐ │  │                  MemTable                   │ │
//! │ │ │ Txn Map │ │  │      (key → (value, txn_id, version))       │ │
//! │ │ └─────────┘ │  └─────────────────────────────────────────────┘ │
//! │ └─────────────┘                                                  │
//! │                  ┌─────────────────────────────────────────────┐ │
//! │                  │                 LSCS (SST)                  │ │
//! │                  │         Immutable columnar segments         │ │
//! │                  └─────────────────────────────────────────────┘ │
//! └──────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Concurrency
58//!
59//! - Writers: Serialize through WAL, use MVCC for conflict detection
60//! - Readers: Lock-free reads at snapshot timestamp
61//! - Commits: Batched through GroupCommit for throughput
62//!
63//! ## Isolation Contract
64//!
65//! The live write path (`MvccManager`, used by `DurableStorage`) provides
66//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
67//! plain Snapshot Isolation (SI):
68//!
69//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
70//! fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
71//! invisible to it — see `test_snapshot_isolation`. This alone is SI and is
72//! vulnerable to **write skew** (two transactions each read a set the other
73//! writes, both commit, and no serial order reproduces the result).
74//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
75//! recently-committed concurrent transactions for rw-antidependency edges:
//!   an outbound edge (a concurrent txn wrote a key we read, `T_me →rw→ T_other`)
//!   and an inbound edge (a concurrent txn read a key we wrote, `T_other →rw→ T_me`).
78//! When a transaction sits on **both** an inbound and an outbound rw-edge it
79//! is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
80//! structure"), so it is aborted. This is the conservative safe subset of the
81//! SSI test: it may abort some serializable schedules (false positives) but
82//! **never admits a non-serializable one** (no false negatives), so the
83//! externally observable isolation level is Serializable.
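//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
//!   never participate in validation; they read a consistent snapshot and never
//!   abort. Because their reads are never recorded, other transactions cannot
//!   see the rw-edges they create, so the read-only anomaly (Fekete, O'Neil &
//!   O'Neil, 2004) is not detected by this validation scheme.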
87//!
88//! ### MVCC garbage collection is snapshot-safe
89//!
90//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
91//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
92//! `snapshot_ts` across all still-active transactions — into
93//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
94//! retains every version with `commit_ts > min_active_ts` **plus one anchor
95//! version at or below the watermark**, so any in-flight reader can still
96//! resolve the correct version for its snapshot. A version is only freed once
97//! **no active snapshot can observe it**. The watermark is recomputed on every
98//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
99//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
100
101use std::collections::HashSet;
102use std::path::{Path, PathBuf};
103use std::sync::Arc;
104use std::sync::atomic::{AtomicU64, Ordering};
105
106use dashmap::DashMap;
107use smallvec::SmallVec;
108
109use crossbeam_skiplist::SkipMap;
110
111use crate::DurabilityCapabilities;
112use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
113use crate::encryption::EncryptionKey;
114use crate::group_commit::EventDrivenGroupCommit;
115use crate::keyring;
116use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
117use sochdb_core::version_chain::{
118 BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
119 VisibilityContext, WriteConflictDetection,
120};
121use sochdb_core::{Result, SochDBError};
122
123// =============================================================================
124// SSI Bloom Filter - Fast Conflict Pre-Filtering
125// =============================================================================
126
/// Bloom filter used to pre-screen SSI conflict checks.
///
/// It answers "might these keys overlap?" cheaply, so the exact `HashSet`
/// comparison only has to run when the answer is "maybe". A false positive
/// merely costs one extra exact check. A false negative would let a conflict
/// slip through unnoticed, so the filter must never produce one.
///
/// ## Sizing
///
/// Targeting a 1% false-positive rate for 1000 keys gives:
/// - m ≈ 9600 bits (about 1.2 KB per transaction)
/// - k = 7 hash functions
///
/// ## Lazy allocation
///
/// No bit storage exists until the first insert. Transactions that never
/// record a key (e.g. read-only ones) pay nothing for the filter.
#[derive(Clone, Debug)]
pub struct SsiBloomFilter {
    /// Packed bit array, 64 bits per word; `None` until the first insert.
    bits: Option<Vec<u64>>,
    /// Item count the bit array is sized for once it is allocated.
    expected_capacity: usize,
    /// How many bit positions each key sets (on insert) or probes (on lookup).
    num_hashes: u32,
}
152
153impl SsiBloomFilter {
154 /// Optimal number of bits per item for 1% false positive rate
155 /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
156 const BITS_PER_ITEM: f64 = 9.6;
157
158 /// Optimal number of hash functions for 1% false positive rate
159 /// k = (m/n) × ln(2) ≈ 7
160 const DEFAULT_NUM_HASHES: u32 = 7;
161
162 /// Minimum capacity to avoid tiny filters
163 const MIN_CAPACITY: usize = 64;
164
165 /// Create a new bloom filter for expected item count (lazy allocation)
166 ///
167 /// Configured for ~1% false positive rate.
168 /// The bit vector is not allocated until first insert.
169 #[inline]
170 pub fn new(expected_items: usize) -> Self {
171 Self {
172 bits: None,
173 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
174 num_hashes: Self::DEFAULT_NUM_HASHES,
175 }
176 }
177
178 /// Create with specific capacity in words (for memory-constrained scenarios)
179 pub fn with_word_capacity(words: usize) -> Self {
180 Self {
181 bits: None,
182 expected_capacity: words.max(1) * 64 / 10, // Approx items from words
183 num_hashes: Self::DEFAULT_NUM_HASHES,
184 }
185 }
186
187 /// Ensure bits are allocated (lazy initialization)
188 #[inline]
189 fn ensure_allocated(&mut self) {
190 if self.bits.is_none() {
191 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
192 let num_words = num_bits.div_ceil(64);
193 self.bits = Some(vec![0u64; num_words]);
194 }
195 }
196
197 /// Add a key to the filter - O(k) where k = num_hashes
198 #[inline]
199 pub fn insert(&mut self, key: &[u8]) {
200 self.ensure_allocated();
201 let bits = self.bits.as_mut().unwrap();
202 let num_bits = bits.len() * 64;
203 if num_bits == 0 {
204 return;
205 }
206
207 // Use two hash functions to simulate k hash functions
208 // h(i) = h1 + i * h2 (double hashing technique)
209 let h1 = Self::hash1(key);
210 let h2 = Self::hash2(key);
211
212 for i in 0..self.num_hashes {
213 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
214 let bit_idx = (h as usize) % num_bits;
215 let word_idx = bit_idx / 64;
216 let bit_pos = bit_idx % 64;
217 bits[word_idx] |= 1 << bit_pos;
218 }
219 }
220
221 /// Check if a key might be present - O(k)
222 ///
223 /// Returns:
224 /// - false: Key is definitely NOT in the set (or filter not initialized)
225 /// - true: Key MIGHT be in the set (needs exact check)
226 #[inline]
227 pub fn may_contain(&self, key: &[u8]) -> bool {
228 let bits = match &self.bits {
229 Some(b) => b,
230 None => return false, // Uninitialized = empty
231 };
232 let num_bits = bits.len() * 64;
233 if num_bits == 0 {
234 return false;
235 }
236
237 let h1 = Self::hash1(key);
238 let h2 = Self::hash2(key);
239
240 for i in 0..self.num_hashes {
241 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
242 let bit_idx = (h as usize) % num_bits;
243 let word_idx = bit_idx / 64;
244 let bit_pos = bit_idx % 64;
245 if bits[word_idx] & (1 << bit_pos) == 0 {
246 return false; // Definitely not present
247 }
248 }
249 true // Might be present
250 }
251
252 /// Check if this filter might intersect with another
253 ///
254 /// Fast O(m/64) check using bitwise AND of all words.
255 /// If no bits are shared, sets are definitely disjoint.
256 #[inline]
257 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
258 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
259 (Some(s), Some(o)) => (s, o),
260 _ => return false, // Either uninitialized = no intersection
261 };
262 let min_len = self_bits.len().min(other_bits.len());
263 for i in 0..min_len {
264 if self_bits[i] & other_bits[i] != 0 {
265 return true; // Might intersect
266 }
267 }
268 false // Definitely disjoint
269 }
270
271 /// First hash function (using built-in hasher)
272 #[inline]
273 fn hash1(key: &[u8]) -> u64 {
274 use std::collections::hash_map::DefaultHasher;
275 use std::hash::{Hash, Hasher};
276 let mut hasher = DefaultHasher::new();
277 key.hash(&mut hasher);
278 hasher.finish()
279 }
280
281 /// Second hash function (using twox-hash for independence)
282 #[inline]
283 fn hash2(key: &[u8]) -> u64 {
284 twox_hash::xxh3::hash64(key)
285 }
286
287 /// Get the memory size in bytes
288 pub fn size_bytes(&self) -> usize {
289 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
290 }
291
292 /// Check if the filter is empty
293 pub fn is_empty(&self) -> bool {
294 match &self.bits {
295 Some(bits) => bits.iter().all(|&w| w == 0),
296 None => true,
297 }
298 }
299}
300
301/// Type alias for inline key storage - keys up to 32 bytes stored on stack
302/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
303pub type InlineKey = SmallVec<[u8; 32]>;
304
/// One version of a key's value inside a version chain.
#[derive(Debug, Clone)]
pub struct Version {
    /// Stored value; `None` marks a deletion (tombstone).
    pub value: Option<Vec<u8>>,
    /// Id of the transaction that wrote this version.
    pub txn_id: u64,
    /// Timestamp at which the writer committed; 0 while still uncommitted.
    pub commit_ts: u64,
}
315
316// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
317impl ChainEntry for Version {
318 #[inline]
319 fn commit_ts(&self) -> u64 {
320 self.commit_ts
321 }
322 #[inline]
323 fn txn_id(&self) -> u64 {
324 self.txn_id
325 }
326 #[inline]
327 fn set_commit_ts(&mut self, ts: u64) {
328 self.commit_ts = ts;
329 }
330}
331
332// ============================================================================
333// Optimized VersionChain with Binary Search (Task 1: mm.md)
334// ============================================================================
335
336/// Multi-version data for a single key with O(log v) read complexity
337///
338/// ## Optimization: Binary Search with Sorted Commit Ordering
339///
340/// Separates committed versions (sorted descending by commit_ts) from
341/// uncommitted version (single optional slot per transaction).
342///
343/// **Before:** O(v) linear scan + O(v) max computation = O(v)
344/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
345///
346/// For v=10 versions: 3.3x speedup
347/// For v=100 versions: 7x speedup
348///
349/// ## Rec 11: Consolidated
350///
351/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
352/// eliminating duplication with `mvcc_concurrent::VersionChain`.
353#[derive(Debug, Default)]
354pub struct VersionChain {
355 /// Consolidated binary-search chain (Rec 11)
356 inner: BinarySearchChain<Version>,
357}
358
359impl VersionChain {
360 /// Create a new empty version chain
361 #[inline]
362 pub fn new() -> Self {
363 Self {
364 inner: BinarySearchChain::new(),
365 }
366 }
367
368 /// Add a new uncommitted version
369 /// If there's already an uncommitted version from this txn, update it in place
370 ///
371 /// O(1) - just updates the uncommitted slot
372 #[inline]
373 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
374 match self.inner.uncommitted_mut() {
375 Some(v) if v.txn_id == txn_id => {
376 // Update in place - O(1)
377 v.value = value;
378 }
379 _ => {
380 // New or different txn — set the slot
381 self.inner.set_uncommitted(Version {
382 value,
383 txn_id,
384 commit_ts: 0,
385 });
386 }
387 }
388 }
389
390 /// Commit a version - moves from uncommitted slot to sorted committed list
391 ///
392 /// O(log v) - inserts into sorted position using binary search
393 #[inline]
394 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
395 self.inner.commit(txn_id, commit_ts)
396 }
397
398 /// Abort a version (remove uncommitted version for txn)
399 ///
400 /// O(1) - just clears the uncommitted slot if it matches
401 #[inline]
402 pub fn abort(&mut self, txn_id: u64) {
403 self.inner.abort(txn_id);
404 }
405
406 /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
407 ///
408 /// ## Complexity: O(1) + O(log v) = O(log v)
409 #[inline]
410 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
411 self.inner.read_at(snapshot_ts, current_txn_id)
412 }
413
414 /// Check if there's an uncommitted version by another transaction
415 ///
416 /// O(1) - just checks the uncommitted slot
417 #[inline]
418 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
419 self.inner.has_write_conflict(my_txn_id)
420 }
421
422 /// Garbage collect old versions
423 pub fn gc(&mut self, min_active_ts: u64) {
424 self.inner.gc_by_ts(min_active_ts);
425 }
426
427 /// Get total version count (committed + uncommitted)
428 #[inline]
429 pub fn version_count(&self) -> usize {
430 self.inner.version_count()
431 }
432
433 // Legacy compatibility: get versions vec (for tests)
434 #[cfg(test)]
435 pub fn versions(&self) -> Vec<Version> {
436 let mut result = self.inner.committed_versions().to_vec();
437 if let Some(v) = self.inner.uncommitted() {
438 result.push(v.clone());
439 }
440 result
441 }
442}
443
444// =============================================================================
445// Rec 6: Unified Version Chain Trait Implementations
446// =============================================================================
447
448impl MvccVersionChain for VersionChain {
449 type Value = Option<Vec<u8>>;
450
451 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
452 // Delegate to BinarySearchChain, then project to value field
453 self.inner
454 .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
455 .map(|v| &v.value)
456 }
457
458 fn get_latest(&self) -> Option<&Self::Value> {
459 self.inner.latest().map(|v| &v.value)
460 }
461
462 fn version_count(&self) -> usize {
463 self.inner.version_count()
464 }
465}
466
467impl MvccVersionChainMut for VersionChain {
468 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
469 self.add_uncommitted(value, txn_id);
470 }
471
472 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
473 self.inner.commit(txn_id, commit_ts)
474 }
475
476 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
477 // Insert a tombstone (None value) as uncommitted
478 self.add_uncommitted(None, txn_id);
479 true
480 }
481
482 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
483 let before = self.inner.committed_count();
484 self.inner.gc_by_ts(min_visible_ts);
485 let removed = before - self.inner.committed_count();
486 (removed, removed * std::mem::size_of::<Version>())
487 }
488}
489
490impl WriteConflictDetection for VersionChain {
491 fn has_write_conflict(&self, txn_id: TxnId) -> bool {
492 self.has_write_conflict(txn_id)
493 }
494}
495
496// =============================================================================
497// Pre-sizing Constants to Avoid HashSet Resize Overhead
498// =============================================================================
499
/// Initial `write_set` capacity for modes that track writes (`WriteOnly`,
/// `ReadWrite`); `ReadOnly` starts at 0.
///
/// Aimed at typical OLTP transactions. Pre-sizing avoids the resize overhead
/// that previously caused a +11% regression on `write_set.insert()`.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial `read_set` capacity for `ReadWrite` transactions (other modes
/// start at 0).
///
/// Larger than the write-set capacity because workloads are typically
/// read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
508
509/// Transaction state for MVCC
510#[derive(Debug, Clone)]
511pub struct MvccTransaction {
512 /// Transaction ID
513 pub txn_id: u64,
514 /// Snapshot timestamp (reads see commits before this)
515 pub snapshot_ts: u64,
516 /// Keys written by this transaction - uses SmallVec for inline storage
517 /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
518 pub write_set: HashSet<InlineKey>,
519 /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
520 /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
521 pub read_set: HashSet<InlineKey>,
522 /// Bloom filter for write set - fast SSI pre-filtering
523 pub write_bloom: SsiBloomFilter,
524 /// Bloom filter for read set - fast SSI pre-filtering
525 pub read_bloom: SsiBloomFilter,
526 /// Transaction state
527 pub state: TxnState,
528 /// Transaction mode for SSI optimization (Recommendation 9)
529 /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
530 pub mode: TransactionMode,
531}
532
533impl MvccTransaction {
534 /// Create a new transaction with pre-sized collections
535 ///
536 /// This avoids HashSet resize overhead during the transaction
537 /// which was causing +11% regression on write_set.insert().
538 #[inline]
539 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
540 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
541 }
542
543 /// Create a read-only transaction (SSI bypass - 2.6x faster)
544 ///
545 /// Read-only transactions skip all SSI tracking:
546 /// - No read_set allocation
547 /// - No read_bloom allocation
548 /// - No commit validation
549 ///
550 /// ## Performance
551 ///
552 /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
553 #[inline]
554 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
555 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
556 }
557
558 /// Create a write-only transaction (partial SSI bypass)
559 ///
560 /// Write-only transactions skip read tracking:
561 /// - No read_set tracking
562 /// - No read_bloom inserts
563 /// - Still needs write_set for commit
564 #[inline]
565 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
566 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
567 }
568
569 /// Create transaction with specific mode
570 #[inline]
571 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
572 // Optimize allocation based on mode
573 let (write_capacity, read_capacity) = match mode {
574 TransactionMode::ReadOnly => (0, 0), // No tracking needed
575 TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
576 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
577 };
578 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
579 }
580
581 /// Create with custom capacities for expected workload
582 ///
583 /// Use this when you know the transaction will write many keys
584 /// to avoid resize overhead entirely.
585 #[inline]
586 pub fn with_capacity(
587 txn_id: u64,
588 snapshot_ts: u64,
589 write_capacity: usize,
590 read_capacity: usize,
591 mode: TransactionMode,
592 ) -> Self {
593 Self {
594 txn_id,
595 snapshot_ts,
596 write_set: HashSet::with_capacity(write_capacity),
597 read_set: HashSet::with_capacity(read_capacity),
598 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
599 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
600 state: TxnState::Active,
601 mode,
602 }
603 }
604
605 /// Check if this is a read-only transaction
606 #[inline]
607 pub fn is_read_only(&self) -> bool {
608 self.write_set.is_empty()
609 }
610
611 /// Check if this is a single-key write transaction
612 #[inline]
613 pub fn is_single_key_write(&self) -> bool {
614 self.write_set.len() == 1 && self.read_set.len() <= 1
615 }
616}
617
/// Lifecycle state of an MVCC transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// Begun and not yet finished.
    Active,
    /// Finished successfully.
    Committed,
    /// Rolled back.
    Aborted,
}
625
626// =============================================================================
627// Transaction Mode for SSI Bypass (Recommendation 9)
628// =============================================================================
629
/// Transaction mode for SSI optimization
///
/// Classifying transactions at begin time lets us skip expensive SSI
/// tracking for the majority of transactions:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// (The mode is a hint: `MvccManager::record_write` still records writes
/// for every mode, but only `ReadWrite` commits are SSI-validated.)
///
/// ## rw-antidependency direction
///
/// `T_a →rw→ T_b` means `T_a` read a key and a concurrent `T_b` wrote a
/// newer version of it.
///
/// - A transaction with no writes can only have *outgoing* rw-edges.
/// - A transaction with no reads can only have *incoming* rw-edges.
///
/// The pivot of a dangerous structure needs both kinds, so neither mode can
/// be a pivot.
///
/// NOTE(review): not being a pivot is not the same as not participating in
/// a dangerous structure. A read-only transaction can be its `T_in` (the
/// "read-only transaction anomaly", Fekete, O'Neil & O'Neil 2004). Skipping
/// its read tracking means such cycles may go undetected unless validation
/// accounts for them elsewhere — confirm.
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current: T_txn = T_begin + N × (T_read + T_record) + T_commit
/// = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
/// = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// Read-only transaction: skips ALL SSI tracking.
    /// It has no writes, so it can have no incoming rw-edges and cannot be a
    /// pivot. It skips read_set, read_bloom and commit validation entirely.
    ReadOnly,

    /// Write-only transaction: skips read tracking.
    /// It has no reads, so it can have no outgoing rw-edges and cannot be a
    /// pivot. Only write_set and write_bloom are tracked.
    WriteOnly,

    /// Full read-write transaction (default): complete SSI tracking.
    /// It may have both incoming and outgoing rw-edges, so it requires full
    /// validation at commit time.
    #[default]
    ReadWrite,
}
671
672impl TransactionMode {
673 /// Check if this mode requires read tracking
674 #[inline]
675 pub fn tracks_reads(&self) -> bool {
676 matches!(self, TransactionMode::ReadWrite)
677 }
678
679 /// Check if this mode requires write tracking
680 #[inline]
681 pub fn tracks_writes(&self) -> bool {
682 matches!(
683 self,
684 TransactionMode::WriteOnly | TransactionMode::ReadWrite
685 )
686 }
687
688 /// Check if commit needs SSI validation
689 #[inline]
690 pub fn needs_ssi_validation(&self) -> bool {
691 matches!(self, TransactionMode::ReadWrite)
692 }
693}
694
/// Kind of dependency an SSI conflict edge represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// T1 reads X, after which T2 writes X (read-write conflict).
    ReadWrite,
    /// T1 writes X, after which T2 reads X (write-read conflict).
    WriteRead,
}
703
704/// SSI conflict edge for dangerous structure detection
705#[derive(Debug, Clone)]
706pub struct ConflictEdge {
707 /// Source transaction
708 pub from_txn: u64,
709 /// Target transaction
710 pub to_txn: u64,
711 /// Type of conflict
712 pub conflict_type: ConflictType,
713}
714
715/// MVCC Manager with SSI support
716///
717/// Uses DashMap for lock-free per-transaction access.
718/// Implements Serializable Snapshot Isolation (SSI) with
719/// dangerous structure detection for rw-antidependency cycles.
720#[allow(clippy::type_complexity)]
721pub struct MvccManager {
722 /// Active transactions (sharded for concurrent access)
723 active_txns: DashMap<u64, MvccTransaction>,
724 /// Current timestamp counter
725 ts_counter: AtomicU64,
726 /// Minimum active snapshot timestamp (for GC)
727 min_active_ts: AtomicU64,
728 /// Recently committed transactions for SSI validation
729 /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
730 /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
731 recent_commits: DashMap<
732 u64,
733 (
734 u64,
735 SsiBloomFilter,
736 SsiBloomFilter,
737 HashSet<InlineKey>,
738 HashSet<InlineKey>,
739 ),
740 >,
741 /// Max recent commits to track
742 max_recent_commits: usize,
743}
744
745impl Default for MvccManager {
746 fn default() -> Self {
747 Self::new()
748 }
749}
750
751impl MvccManager {
752 pub fn new() -> Self {
753 Self {
754 active_txns: DashMap::new(),
755 ts_counter: AtomicU64::new(1),
756 min_active_ts: AtomicU64::new(0),
757 recent_commits: DashMap::new(),
758 max_recent_commits: 1000, // Track last 1000 commits for SSI
759 }
760 }
761
762 /// Begin a new transaction with snapshot isolation
763 ///
764 /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
765 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
766 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
767 }
768
769 /// Begin a read-only transaction (SSI bypass - 2.6x faster)
770 ///
771 /// Read-only transactions skip all SSI tracking, reducing
772 /// per-read overhead from ~82ns to ~32ns.
773 ///
774 /// ## Safety
775 ///
776 /// Caller must ensure no writes are performed. Attempting to
777 /// write in a read-only transaction will still succeed but
778 /// won't be tracked for SSI validation.
779 #[inline]
780 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
781 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
782 }
783
784 /// Begin a write-only transaction (partial SSI bypass)
785 ///
786 /// Write-only transactions skip read tracking, reducing overhead
787 /// for insert-heavy workloads.
788 #[inline]
789 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
790 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
791 }
792
793 /// Begin a transaction with specific mode
794 ///
795 /// This is the core transaction creation method that all other
796 /// begin_* methods delegate to.
797 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
798 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
799
800 // Create transaction with mode-optimized allocations
801 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
802
803 self.active_txns.insert(txn_id, txn.clone());
804 self.update_min_active_ts();
805
806 txn
807 }
808
809 /// Get transaction if active (clones - use get_snapshot_ts for hot path)
810 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
811 self.active_txns.get(&txn_id).map(|t| t.clone())
812 }
813
814 /// Fast path: get just the snapshot timestamp without cloning
815 /// This is the hot path for reads - avoids cloning bloom filters
816 #[inline]
817 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
818 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
819 }
820
821 /// Record a read (for SSI) - uses inline key storage + bloom filter
822 ///
823 /// ## SSI Bypass (Recommendation 9)
824 ///
825 /// For ReadOnly mode transactions, this is a no-op (instant return).
826 /// For WriteOnly mode transactions, this is a no-op.
827 /// Only ReadWrite mode transactions track reads for SSI.
828 ///
829 /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
830 #[inline]
831 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
832 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
833 // SSI Bypass: Skip tracking for read-only and write-only modes
834 if !txn.mode.tracks_reads() {
835 return;
836 }
837
838 // Only track reads if within reasonable bounds
839 if txn.read_set.len() < 10000 {
840 txn.read_set.insert(SmallVec::from_slice(key));
841 txn.read_bloom.insert(key);
842 }
843 }
844 }
845
846 /// Record a write - uses inline key storage + bloom filter
847 ///
848 /// Note: Even ReadOnly transactions can record writes (mode is a hint).
849 /// The mode only affects SSI tracking, not write capability.
850 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
851 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
852 txn.write_set.insert(SmallVec::from_slice(key));
853 txn.write_bloom.insert(key);
854 }
855 }
856
857 /// Allocate commit timestamp
858 pub fn alloc_commit_ts(&self) -> u64 {
859 self.ts_counter.fetch_add(1, Ordering::SeqCst)
860 }
861
862 /// Commit transaction with SSI validation
863 /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
864 /// Returns None if SSI validation fails (dangerous structure detected)
865 ///
866 /// ## SSI Bypass (Recommendation 9)
867 ///
868 /// For ReadOnly mode: Skip validation entirely (~10ns commit)
869 /// For WriteOnly mode: Skip read-based validation (~30ns commit)
870 /// For ReadWrite mode: Full validation (~50ns commit)
871 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
872 // Get transaction before removing
873 let txn = self.active_txns.get(&txn_id)?.clone();
874
875 // SSI Bypass: Skip validation for ReadOnly transactions
876 // ReadOnly can never form rw-antidependency cycles
877 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
878 // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
879 // For ReadWrite: check SSI validation result
880 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
881 // Abort on SSI violation
882 self.active_txns.remove(&txn_id);
883 self.update_min_active_ts();
884 return None;
885 }
886 }
887
888 let commit_ts = self.alloc_commit_ts();
889
890 // Extract write_set and remove transaction - takes ownership
891 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
892
893 // OPTIMIZATION: Only track ReadWrite transactions for SSI
894 // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
895 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
896 && !removed_txn.read_set.is_empty()
897 && !removed_txn.write_set.is_empty();
898
899 if needs_ssi_tracking {
900 // Need to clone write_set since we return it AND track it
901 let write_set_for_return = removed_txn.write_set.clone();
902
903 self.track_commit_owned(
904 txn_id,
905 commit_ts,
906 removed_txn.read_bloom,
907 removed_txn.write_bloom,
908 removed_txn.read_set,
909 removed_txn.write_set,
910 );
911
912 self.update_min_active_ts();
913 Some((commit_ts, write_set_for_return))
914 } else {
915 // Fast path: no SSI tracking needed, avoid clone entirely
916 self.update_min_active_ts();
917 Some((commit_ts, removed_txn.write_set))
918 }
919 }
920
921 /// Validate SSI constraints for a committing transaction
922 ///
923 /// ## Transaction Classification (Task 3: Optimistic MVCC)
924 ///
925 /// Transactions are classified and routed through appropriate fast paths:
926 ///
927 /// | Class | Criteria | Validation Cost |
928 /// |------------|-------------------------------|-----------------|
929 /// | ReadOnly | write_set.is_empty() | 0 ns |
930 /// | SingleKey | write_set.len() == 1 | 0 ns |
931 /// | Disjoint | bloom filters don't intersect | ~10 ns |
932 /// | General | full SSI check | ~50 ns |
933 ///
934 /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
935 /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
936 ///
937 /// Detects "dangerous structures" - rw-antidependency cycles:
938 /// - T1 reads X (snapshot sees old value)
939 /// - T2 writes X (concurrent write)
940 /// - T2 reads Y (snapshot sees old value)
941 /// - T1 writes Y (concurrent write)
942 ///
943 /// If T1 → rw → T2 → rw → T1 exists, abort T1
944 #[inline]
945 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
946 // =================================================================
947 // Fast Path 1: Read-only transactions (0 ns)
948 // =================================================================
949 // Read-only transactions can never form rw-antidependency cycles
950 // because they have no writes to create outgoing rw-edges
951 if txn.write_set.is_empty() {
952 return true;
953 }
954
955 // =================================================================
956 // Fast Path 2: No recent commits to check (0 ns)
957 // =================================================================
958 if self.recent_commits.is_empty() {
959 return true;
960 }
961
962 // =================================================================
963 // Fast Path 3: Single-key write transactions (0 ns)
964 // =================================================================
965 // A single-key write transaction cannot form a dangerous cycle:
966 // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
967 // AND T2 to read what T1 wrote
968 // - With only one key in write_set, the same key would need to be
969 // in both read_set AND write_set of both transactions
970 // - This is already prevented by our conflict detection (write-write)
971 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
972 return true;
973 }
974
975 let my_snapshot = txn.snapshot_ts;
976
977 // =================================================================
978 // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
979 // =================================================================
980 // Pre-filter using bloom filters: if our write_bloom doesn't intersect
981 // with any concurrent transaction's read_bloom AND vice versa,
982 // there can be no rw-antidependency
983 let mut any_may_intersect = false;
984 for entry in self.recent_commits.iter() {
985 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
986
987 // Only check concurrent transactions.
988 //
989 // A committed transaction is *concurrent* with us iff its writes are
990 // invisible to our snapshot. Read visibility is strict
991 // (`commit_ts < snapshot_ts`), so a transaction with
992 // `commit_ts >= my_snapshot` is invisible and must be validated
993 // against. Using `<` here (not `<=`) keeps this window consistent
994 // with `read_at`; a `<=` would skip a boundary transaction whose
995 // `commit_ts == my_snapshot` and miss a genuine write-skew.
996 if *other_commit_ts < my_snapshot {
997 continue;
998 }
999
1000 // Check bloom filter intersection (O(m/64) per filter)
1001 // If our writes may intersect their reads OR their writes may intersect our reads
1002 if txn.write_bloom.may_intersect(other_read_bloom)
1003 || other_write_bloom.may_intersect(&txn.read_bloom)
1004 {
1005 any_may_intersect = true;
1006 break;
1007 }
1008 }
1009
1010 // No bloom intersection means definitely disjoint - no SSI conflict possible
1011 if !any_may_intersect {
1012 return true;
1013 }
1014
1015 // =================================================================
1016 // Full SSI Validation (~50 ns)
1017 // =================================================================
1018 // Check for rw-conflicts with recently committed transactions
1019 // An rw-conflict exists if:
1020 // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1021 // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1022
1023 let mut in_conflict_with: Vec<u64> = Vec::new();
1024 let mut out_conflict_to: Vec<u64> = Vec::new();
1025
1026 for entry in self.recent_commits.iter() {
1027 let (
1028 other_txn_id,
1029 (
1030 other_commit_ts,
1031 _other_read_bloom,
1032 other_write_bloom,
1033 other_read_set,
1034 other_write_set,
1035 ),
1036 ) = entry.pair();
1037
1038 // Only consider transactions concurrent with us: those whose writes
1039 // are invisible to our snapshot. Read visibility is strict
1040 // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1041 // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1042 if *other_commit_ts < my_snapshot {
1043 continue;
1044 }
1045
1046 // Check: other wrote → we read (other →rw→ me)
1047 // T_other wrote a key that T_me read (rw-dependency inbound)
1048 //
1049 // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1050 // Only do expensive HashSet intersection if bloom says "maybe conflict"
1051 let mut has_in_conflict = false;
1052 for key in txn.read_set.iter() {
1053 if other_write_bloom.may_contain(key) {
1054 // Bloom says maybe - do exact check
1055 if other_write_set.contains(key) {
1056 has_in_conflict = true;
1057 break;
1058 }
1059 }
1060 }
1061 if has_in_conflict {
1062 in_conflict_with.push(*other_txn_id);
1063 }
1064
1065 // Check: we wrote → other read (me →rw→ other)
1066 // T_me wrote a key that T_other read (rw-dependency outbound)
1067 //
1068 // Bloom-accelerated: Use our write_bloom against their read_set
1069 let mut has_out_conflict = false;
1070 for key in other_read_set.iter() {
1071 if txn.write_bloom.may_contain(key) {
1072 // Bloom says maybe - do exact check
1073 if txn.write_set.contains(key) {
1074 has_out_conflict = true;
1075 break;
1076 }
1077 }
1078 }
1079 if has_out_conflict {
1080 out_conflict_to.push(*other_txn_id);
1081 }
1082 }
1083
1084 // Dangerous structure: we have both incoming AND outgoing rw-edges
1085 // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1086 //
1087 // Conservative check: if both exist, abort
1088 // A more precise check would verify the cycle path, but this is safe
1089 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1090 return false; // SSI violation - abort
1091 }
1092
1093 true
1094 }
1095
1096 /// Track a committed transaction for future SSI validation
1097 ///
1098 /// Only tracks transactions that have both reads AND writes, since SSI
1099 /// only detects rw-antidependency cycles. Pure read or pure write
1100 /// transactions can't form cycles.
1101 ///
1102 /// ## Optimization: Zero-Copy Transfer
1103 ///
1104 /// Takes ownership of sets instead of cloning to avoid the +15% commit
1105 /// phase regression. The caller should use mem::take() to transfer ownership.
1106 fn track_commit_owned(
1107 &self,
1108 txn_id: u64,
1109 commit_ts: u64,
1110 read_bloom: SsiBloomFilter,
1111 write_bloom: SsiBloomFilter,
1112 read_set: HashSet<InlineKey>,
1113 write_set: HashSet<InlineKey>,
1114 ) {
1115 // Optimization: Only track mixed read-write transactions
1116 // Pure reads can't create outgoing rw-edges
1117 // Pure writes can't create incoming rw-edges
1118 if read_set.is_empty() || write_set.is_empty() {
1119 return; // Skip tracking - can't form SSI cycle
1120 }
1121
1122 // Add to recent commits with bloom filters for fast SSI pre-filtering
1123 // No cloning needed - we take ownership
1124 self.recent_commits.insert(
1125 txn_id,
1126 (commit_ts, read_bloom, write_bloom, read_set, write_set),
1127 );
1128
1129 // Lazy pruning: only prune when we're significantly over capacity
1130 // Avoids pruning overhead on every commit
1131 if self.recent_commits.len() > self.max_recent_commits * 2 {
1132 // Remove entries with lowest commit_ts
1133 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1134 self.recent_commits
1135 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1136 }
1137 }
1138
1139 /// Legacy track_commit that clones - kept for compatibility
1140 #[allow(dead_code)]
1141 fn track_commit(
1142 &self,
1143 txn_id: u64,
1144 commit_ts: u64,
1145 read_bloom: SsiBloomFilter,
1146 write_bloom: SsiBloomFilter,
1147 read_set: &HashSet<InlineKey>,
1148 write_set: &HashSet<InlineKey>,
1149 ) {
1150 if read_set.is_empty() || write_set.is_empty() {
1151 return;
1152 }
1153 self.recent_commits.insert(
1154 txn_id,
1155 (
1156 commit_ts,
1157 read_bloom,
1158 write_bloom,
1159 read_set.clone(),
1160 write_set.clone(),
1161 ),
1162 );
1163 }
1164
1165 /// Abort transaction
1166 pub fn abort(&self, txn_id: u64) {
1167 self.active_txns.remove(&txn_id);
1168 self.update_min_active_ts();
1169 }
1170
1171 /// Get minimum active snapshot timestamp
1172 pub fn min_active_snapshot(&self) -> u64 {
1173 self.min_active_ts.load(Ordering::SeqCst)
1174 }
1175
1176 /// Get count of active transactions
1177 pub fn active_transaction_count(&self) -> usize {
1178 self.active_txns.len()
1179 }
1180
1181 fn update_min_active_ts(&self) {
1182 let min = self
1183 .active_txns
1184 .iter()
1185 .map(|entry| entry.value().snapshot_ts)
1186 .min()
1187 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1188 self.min_active_ts.store(min, Ordering::SeqCst);
1189 }
1190}
1191
1192/// Epoch-based dirty list for O(expired) GC instead of O(n)
1193///
1194/// Instead of scanning ALL version chains, we track which keys have versions
1195/// created in each epoch. GC only needs to visit keys from old epochs.
1196struct EpochDirtyList {
1197 /// Ring buffer of epoch -> dirty keys
1198 /// Index = epoch % EPOCH_RING_SIZE
1199 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1200 /// Current epoch
1201 current_epoch: AtomicU64,
1202}
1203
1204const EPOCH_RING_SIZE: usize = 4;
1205
1206impl EpochDirtyList {
1207 fn new() -> Self {
1208 Self {
1209 epochs: [
1210 parking_lot::Mutex::new(Vec::new()),
1211 parking_lot::Mutex::new(Vec::new()),
1212 parking_lot::Mutex::new(Vec::new()),
1213 parking_lot::Mutex::new(Vec::new()),
1214 ],
1215 current_epoch: AtomicU64::new(0),
1216 }
1217 }
1218
1219 /// Record a version created in the current epoch
1220 #[inline]
1221 fn record_version(&self, key: Vec<u8>) {
1222 let epoch = self.current_epoch.load(Ordering::Relaxed);
1223 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1224 self.epochs[idx].lock().push(key);
1225 }
1226
1227 /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1228 ///
1229 /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1230 /// For 100 writes: ~100x fewer mutex operations.
1231 #[inline]
1232 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1233 let epoch = self.current_epoch.load(Ordering::Relaxed);
1234 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1235 let mut guard = self.epochs[idx].lock();
1236 guard.extend(keys);
1237 }
1238
1239 /// Advance to next epoch, returning old epoch's dirty keys
1240 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1241 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1242 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1243
1244 // Drain the old epoch's dirty list
1245 let mut guard = self.epochs[old_idx].lock();
1246 let keys = std::mem::take(&mut *guard);
1247 (old_epoch, keys)
1248 }
1249
1250 /// Get current epoch
1251 #[allow(dead_code)]
1252 fn current(&self) -> u64 {
1253 self.current_epoch.load(Ordering::Relaxed)
1254 }
1255}
1256
1257// ============================================================================
1258// Streaming Scan Iterator
1259// ============================================================================
1260
/// Streaming iterator for range scans, returned by `MvccMemTable::scan_range_iter`.
///
/// Results are produced one at a time from `next()` rather than collected into
/// a single result `Vec`. When the memtable uses the deferred sorted index,
/// however, all matching *keys* are collected into a `Vec` on the first
/// `next()` call. Only the per-key visibility lookup and value clone are lazy
/// on that path, so memory is O(matching keys), not O(1).
struct ScanRangeIterator<'a> {
    /// Memtable being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the scan.
    start: Vec<u8>,
    /// Exclusive upper bound of the scan; an empty vector means unbounded.
    end: Vec<u8>,
    /// Snapshot timestamp passed to `read_at` for visibility.
    snapshot_ts: u64,
    /// Transaction whose own uncommitted writes should be visible, if any.
    current_txn_id: Option<u64>,
    /// Drive the scan from an ordered index (deferred or SkipMap) instead of a
    /// full DashMap iteration.
    use_ordered: bool,
    // Both sources are built lazily on the first `next()`; at most one is Some.
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set once the lazy initialization in `next()` has run.
    initialized: bool,
}
1278
1279impl<'a> Iterator for ScanRangeIterator<'a> {
1280 type Item = (Vec<u8>, Vec<u8>);
1281
1282 fn next(&mut self) -> Option<Self::Item> {
1283 // Lazy initialization on first call
1284 if !self.initialized {
1285 self.initialized = true;
1286
1287 if self.use_ordered {
1288 // Try deferred index first (after compaction, it uses a SkipMap internally)
1289 if let Some(ref def_idx) = self.memtable.deferred_index {
1290 let start = self.start.clone();
1291 let end = self.end.clone();
1292 let snapshot_ts = self.snapshot_ts;
1293 let current_txn_id = self.current_txn_id;
1294 let data = &self.memtable.data;
1295
1296 // Collect keys from deferred index (already sorted after compact)
1297 let keys: Vec<Vec<u8>> = if end.is_empty() {
1298 def_idx.range_from(&start).collect()
1299 } else {
1300 def_idx.range(&start, &end).collect()
1301 };
1302
1303 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1304 Box::new(keys.into_iter().filter_map(move |key| {
1305 if let Some(chain) = data.get(&key)
1306 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1307 && let Some(value) = &v.value
1308 {
1309 Some((key, value.clone()))
1310 } else {
1311 None
1312 }
1313 }));
1314 self.ordered_iter = Some(iter);
1315 } else if let Some(ref idx) = self.memtable.ordered_index {
1316 let start = self.start.clone();
1317 let end = self.end.clone();
1318 let snapshot_ts = self.snapshot_ts;
1319 let current_txn_id = self.current_txn_id;
1320 let data = &self.memtable.data;
1321
1322 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1323 {
1324 Box::new(idx.range(start..).filter_map(move |entry| {
1325 let key = entry.key();
1326 if let Some(chain) = data.get(key)
1327 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1328 && let Some(value) = &v.value
1329 {
1330 Some((key.clone(), value.clone()))
1331 } else {
1332 None
1333 }
1334 }))
1335 } else {
1336 Box::new(idx.range(start..end).filter_map(move |entry| {
1337 let key = entry.key();
1338 if let Some(chain) = data.get(key)
1339 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1340 && let Some(value) = &v.value
1341 {
1342 Some((key.clone(), value.clone()))
1343 } else {
1344 None
1345 }
1346 }))
1347 };
1348 self.ordered_iter = Some(iter);
1349 }
1350 } else {
1351 // Unordered full scan
1352 let start = self.start.clone();
1353 let end = self.end.clone();
1354 let snapshot_ts = self.snapshot_ts;
1355 let current_txn_id = self.current_txn_id;
1356
1357 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1358 Box::new(self.memtable.data.iter().filter_map(move |entry| {
1359 let key = entry.key();
1360
1361 if key.as_slice() < start.as_slice() {
1362 return None;
1363 }
1364 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1365 return None;
1366 }
1367
1368 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1369 && let Some(value) = &v.value
1370 {
1371 Some((key.clone(), value.clone()))
1372 } else {
1373 None
1374 }
1375 }));
1376 self.unordered_iter = Some(iter);
1377 }
1378 }
1379
1380 // Get next from appropriate iterator
1381 if let Some(ref mut iter) = self.ordered_iter {
1382 iter.next()
1383 } else if let Some(ref mut iter) = self.unordered_iter {
1384 iter.next()
1385 } else {
1386 None
1387 }
1388 }
1389}
1390
/// MemTable with MVCC support
///
/// Stores one `VersionChain` per key in a `DashMap`. The map is sharded, so
/// writers to different keys do not serialize on a single global write lock.
/// It is not lock-free: concurrent access to the same shard still contends.
///
/// Uses epoch-based dirty tracking (`EpochDirtyList`) so GC visits only keys
/// recorded since the previous GC pass instead of every chain.
///
/// Ordered access for prefix/range scans comes from at most one of two
/// optional indexes, chosen in `with_index_mode`:
/// - `deferred_index`: writes append to a hot buffer; sorting is deferred to scan time
/// - `ordered_index`: a `SkipMap` kept sorted on every write
///
/// With neither index present, scans fall back to a full DashMap iteration.
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for prefix/range scans (optional).
    /// Present only for `with_index_mode(true, true)` (the `new()` default).
    /// When no index is present, `scan_prefix` falls back to O(N) DashMap iteration.
    deferred_index: Option<DeferredSortedIndex>,
    /// SkipMap ordered index, used instead of `deferred_index` when deferred
    /// sorting is disabled (`with_index_mode(true, false)`).
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether deferred sorting was requested at construction. Appears unread
    /// after construction; which index is `Some` is what the code checks.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes: running sum of key + value lengths written.
    /// NOTE(review): only incremented in the code visible here; abort/gc do not subtract.
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1417
1418impl Default for MvccMemTable {
1419 fn default() -> Self {
1420 Self::new()
1421 }
1422}
1423
1424impl MvccMemTable {
1425 pub fn new() -> Self {
1426 Self::with_ordered_index(true)
1427 }
1428
1429 /// Create memtable with optional ordered index
1430 ///
1431 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1432 /// but scan_prefix becomes O(N) instead of O(log N + K)
1433 ///
1434 /// Uses deferred sorting by default for better write performance:
1435 /// - Writes: O(1) append to hot buffer
1436 /// - Scans: O(N log N) sort-on-demand
1437 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1438 Self::with_index_mode(enable_ordered_index, true)
1439 }
1440
1441 /// Create memtable with fine-grained control over indexing
1442 ///
1443 /// # Arguments
1444 /// * `enable_ordered_index` - Whether to maintain an ordered index
1445 /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1446 /// If false, use SkipMap (O(log N) writes)
1447 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1448 Self {
1449 data: DashMap::new(),
1450 deferred_index: if enable_ordered_index && use_deferred {
1451 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1452 max_unsorted_entries: 10_000, // Compact every 10K writes
1453 enabled: true,
1454 }))
1455 } else {
1456 None
1457 },
1458 ordered_index: if enable_ordered_index && !use_deferred {
1459 Some(SkipMap::new())
1460 } else {
1461 None
1462 },
1463 use_deferred,
1464 size_bytes: AtomicU64::new(0),
1465 dirty_list: EpochDirtyList::new(),
1466 }
1467 }
1468
1469 /// Write a key-value pair (uncommitted)
1470 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1471 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1472 let key_len = key.len();
1473
1474 // Track this key in the current epoch's dirty list for GC
1475 self.dirty_list.record_version(key.clone());
1476
1477 // Insert into ordered index for prefix scans (if enabled)
1478 // Deferred: O(1) append to hot buffer
1479 // SkipMap: O(log N) insert
1480 if let Some(ref idx) = self.deferred_index {
1481 idx.insert(key.clone());
1482 } else if let Some(ref idx) = self.ordered_index {
1483 idx.insert(key.clone(), ());
1484 }
1485
1486 // Use entry API for atomic get-or-insert
1487 let mut entry = self.data.entry(key).or_default();
1488
1489 // Check for write-write conflict
1490 if entry.has_write_conflict(txn_id) {
1491 return Err(SochDBError::Internal(
1492 "Write-write conflict detected".into(),
1493 ));
1494 }
1495 entry.add_uncommitted(value, txn_id);
1496 self.size_bytes
1497 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1498
1499 Ok(())
1500 }
1501
1502 /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1503 ///
1504 /// Optimizations applied (Rec 3: MVCC Batching):
1505 /// - Batched dirty list tracking: single lock acquire for all keys
1506 /// - Deferred index: O(1) append per key
1507 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1508 let mut total_size = 0u64;
1509
1510 // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1511 self.dirty_list
1512 .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1513
1514 for (key, value) in writes {
1515 // Insert into ordered index (if enabled)
1516 // Deferred: O(1) append, SkipMap: O(log N)
1517 if let Some(ref idx) = self.deferred_index {
1518 idx.insert(key.clone());
1519 } else if let Some(ref idx) = self.ordered_index {
1520 idx.insert(key.clone(), ());
1521 }
1522
1523 let mut entry = self.data.entry(key.clone()).or_default();
1524
1525 if entry.has_write_conflict(txn_id) {
1526 return Err(SochDBError::Internal(
1527 "Write-write conflict detected".into(),
1528 ));
1529 }
1530
1531 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1532 entry.add_uncommitted(value.clone(), txn_id);
1533 total_size += (key.len() + value_size) as u64;
1534 }
1535
1536 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1537 Ok(())
1538 }
1539
1540 /// Read at snapshot timestamp, with optional current txn to see own writes
1541 pub fn read(
1542 &self,
1543 key: &[u8],
1544 snapshot_ts: u64,
1545 current_txn_id: Option<u64>,
1546 ) -> Option<Vec<u8>> {
1547 self.data.get(key).and_then(|chain| {
1548 chain
1549 .read_at(snapshot_ts, current_txn_id)
1550 .and_then(|v| v.value.clone())
1551 })
1552 }
1553
1554 /// Commit all versions for a transaction
1555 ///
1556 /// Only updates the keys that were written by this transaction (tracked in write_set).
1557 /// Accepts InlineKey for zero-allocation MVCC tracking.
1558 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1559 // Only iterate over keys we know were written - O(k) instead of O(n)
1560 for key in write_set {
1561 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1562 chain.commit(txn_id, commit_ts);
1563 }
1564 }
1565 }
1566
1567 /// Legacy commit method (iterates all keys) - kept for backward compatibility
1568 #[allow(dead_code)]
1569 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1570 for mut entry in self.data.iter_mut() {
1571 entry.value_mut().commit(txn_id, commit_ts);
1572 }
1573 }
1574
1575 /// Abort all versions for a transaction
1576 pub fn abort(&self, txn_id: u64) {
1577 for mut entry in self.data.iter_mut() {
1578 entry.value_mut().abort(txn_id);
1579 }
1580 }
1581
1582 /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1583 ///
1584 /// ## Performance
1585 ///
1586 /// When ordered_index is enabled: O(log N + K) complexity
1587 /// - O(log N) to seek to the first key with prefix
1588 /// - O(K) to iterate matching keys
1589 ///
1590 /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1591 ///
1592 /// ## Optimizations Applied
1593 ///
1594 /// - Pre-allocates result vector based on expected output size
1595 /// - Uses batch-friendly iteration patterns
1596 /// - Minimizes allocations during iteration
1597 /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1598 pub fn scan_prefix(
1599 &self,
1600 prefix: &[u8],
1601 snapshot_ts: u64,
1602 current_txn_id: Option<u64>,
1603 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1604 // Estimate result size for pre-allocation (use 10% of total as heuristic)
1605 let estimated_size = (self.data.len() / 10).max(64);
1606 let mut results = Vec::with_capacity(estimated_size);
1607
1608 if let Some(ref idx) = self.deferred_index {
1609 // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1610 for key in idx.range_from(prefix) {
1611 // Stop when we've passed the prefix range
1612 if !key.starts_with(prefix) {
1613 break;
1614 }
1615
1616 // O(1) lookup in DashMap for version chain
1617 if let Some(chain) = self.data.get(&key)
1618 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1619 && let Some(value) = &v.value
1620 {
1621 results.push((key, value.clone()));
1622 }
1623 }
1624 } else if let Some(ref idx) = self.ordered_index {
1625 // Fast path: O(log N) seek to first key >= prefix
1626 for entry in idx.range(prefix.to_vec()..) {
1627 let key = entry.key();
1628
1629 // Stop when we've passed the prefix range
1630 if !key.starts_with(prefix) {
1631 break;
1632 }
1633
1634 // O(1) lookup in DashMap for version chain
1635 if let Some(chain) = self.data.get(key)
1636 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1637 && let Some(value) = &v.value
1638 {
1639 results.push((key.clone(), value.clone()));
1640 }
1641 }
1642 } else {
1643 // Fallback: O(N) full DashMap scan when ordered_index is disabled
1644 // Optimized with batch-friendly iteration
1645 for entry in self.data.iter() {
1646 let key = entry.key();
1647 if !key.starts_with(prefix) {
1648 continue;
1649 }
1650 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1651 && let Some(value) = &v.value
1652 {
1653 results.push((key.clone(), value.clone()));
1654 }
1655 }
1656 }
1657
1658 results
1659 }
1660
1661 /// Optimized full scan with batch allocation
1662 ///
1663 /// For use when scanning entire tables/namespaces.
1664 /// Pre-allocates based on actual data size.
1665 pub fn scan_all(
1666 &self,
1667 snapshot_ts: u64,
1668 current_txn_id: Option<u64>,
1669 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1670 let mut results = Vec::with_capacity(self.data.len());
1671
1672 for entry in self.data.iter() {
1673 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1674 && let Some(value) = &v.value
1675 {
1676 results.push((entry.key().clone(), value.clone()));
1677 }
1678 }
1679
1680 results
1681 }
1682
1683 /// Streaming scan iterator for very large datasets
1684 ///
1685 /// Returns an iterator that yields (key, value) pairs without
1686 /// materializing the entire result set in memory.
1687 pub fn scan_prefix_iter<'a>(
1688 &'a self,
1689 prefix: &'a [u8],
1690 snapshot_ts: u64,
1691 current_txn_id: Option<u64>,
1692 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1693 self.data.iter().filter_map(move |entry| {
1694 let key = entry.key();
1695 if !key.starts_with(prefix) {
1696 return None;
1697 }
1698 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1699 && let Some(value) = &v.value
1700 {
1701 Some((key.clone(), value.clone()))
1702 } else {
1703 None
1704 }
1705 })
1706 }
1707
1708 /// Scan range
1709 pub fn scan_range(
1710 &self,
1711 start: &[u8],
1712 end: &[u8],
1713 snapshot_ts: u64,
1714 current_txn_id: Option<u64>,
1715 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1716 let mut results = Vec::new();
1717
1718 if let Some(ref idx) = self.deferred_index {
1719 // Deferred index path: sort-on-scan
1720 if end.is_empty() {
1721 for key in idx.range_from(start) {
1722 if let Some(chain) = self.data.get(&key)
1723 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724 && let Some(value) = &v.value
1725 {
1726 results.push((key, value.clone()));
1727 }
1728 }
1729 } else {
1730 for key in idx.range(start, end) {
1731 if let Some(chain) = self.data.get(&key)
1732 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1733 && let Some(value) = &v.value
1734 {
1735 results.push((key, value.clone()));
1736 }
1737 }
1738 }
1739 } else if let Some(ref idx) = self.ordered_index {
1740 // Use range scan on SkipMap
1741 if end.is_empty() {
1742 // Unbounded end
1743 for entry in idx.range(start.to_vec()..) {
1744 let key = entry.key();
1745 if let Some(chain) = self.data.get(key)
1746 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1747 && let Some(value) = &v.value
1748 {
1749 results.push((key.clone(), value.clone()));
1750 }
1751 }
1752 } else {
1753 for entry in idx.range(start.to_vec()..end.to_vec()) {
1754 let key = entry.key();
1755 if let Some(chain) = self.data.get(key)
1756 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1757 && let Some(value) = &v.value
1758 {
1759 results.push((key.clone(), value.clone()));
1760 }
1761 }
1762 }
1763 } else {
1764 // Fallback to full scan if no ordered index
1765 for entry in self.data.iter() {
1766 let key = entry.key();
1767
1768 if key.as_slice() < start {
1769 continue;
1770 }
1771 if !end.is_empty() && key.as_slice() >= end {
1772 continue;
1773 }
1774
1775 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1776 && let Some(value) = &v.value
1777 {
1778 results.push((key.clone(), value.clone()));
1779 }
1780 }
1781 }
1782
1783 results
1784 }
1785
1786 /// Streaming range scan iterator for very large datasets
1787 ///
1788 /// Returns an iterator that yields (key, value) pairs without
1789 /// materializing the entire result set in memory. Uses the ordered
1790 /// index when available for O(log N + K) complexity.
1791 ///
1792 /// ## Zero-Allocation Design
1793 ///
1794 /// While the iterator itself cannot avoid allocations for returned
1795 /// values (since the caller needs ownership), it avoids:
1796 /// - Pre-materializing all results
1797 /// - Intermediate buffers
1798 /// - Repeated key comparisons for already-visited entries
1799 ///
1800 /// ## Usage
1801 ///
1802 /// ```ignore
1803 /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1804 /// // Process each result as it arrives
1805 /// // Memory usage is O(1) per iteration, not O(N) total
1806 /// }
1807 /// ```
1808 pub fn scan_range_iter<'a>(
1809 &'a self,
1810 start: &'a [u8],
1811 end: &'a [u8],
1812 snapshot_ts: u64,
1813 current_txn_id: Option<u64>,
1814 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1815 // Compact deferred index before scanning if needed
1816 if let Some(ref idx) = self.deferred_index {
1817 idx.compact();
1818 }
1819
1820 // Use either ordered index or full scan
1821 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1822
1823 // Create iterator based on availability of ordered index
1824 ScanRangeIterator {
1825 memtable: self,
1826 start: start.to_vec(),
1827 end: end.to_vec(),
1828 snapshot_ts,
1829 current_txn_id,
1830 use_ordered,
1831 ordered_iter: None,
1832 unordered_iter: None,
1833 initialized: false,
1834 }
1835 }
1836
1837 /// Get approximate size
1838 pub fn size(&self) -> u64 {
1839 self.size_bytes.load(Ordering::Relaxed)
1840 }
1841
1842 /// Garbage collect old versions using epoch-based dirty list
1843 ///
1844 /// O(expired_versions) instead of O(all_versions)
1845 /// Only visits keys that had versions created in the old epoch.
1846 pub fn gc(&self, min_active_ts: u64) -> usize {
1847 // Advance epoch and get the dirty keys from the old epoch
1848 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1849
1850 if dirty_keys.is_empty() {
1851 return 0;
1852 }
1853
1854 let mut gc_count = 0;
1855
1856 // Only visit keys that were modified in the old epoch
1857 // Use a HashSet to deduplicate keys that were written multiple times
1858 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1859
1860 for key in unique_keys {
1861 if let Some(mut entry) = self.data.get_mut(&key) {
1862 let before = entry.value().version_count();
1863 entry.value_mut().gc(min_active_ts);
1864 gc_count += before.saturating_sub(entry.value().version_count());
1865 }
1866 }
1867
1868 gc_count
1869 }
1870
1871 /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1872 #[allow(dead_code)]
1873 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1874 let mut gc_count = 0;
1875
1876 for mut entry in self.data.iter_mut() {
1877 let before = entry.value().version_count();
1878 entry.value_mut().gc(min_active_ts);
1879 gc_count += before.saturating_sub(entry.value().version_count());
1880 }
1881
1882 gc_count
1883 }
1884}
1885
1886// =============================================================================
1887// Rec 11: Unified MvccStore Implementation for MvccMemTable
1888// =============================================================================
1889
1890impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1891 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1892 self.read(key, snapshot_ts, txn_id)
1893 }
1894
1895 fn mvcc_put(
1896 &self,
1897 key: &[u8],
1898 value: Option<Vec<u8>>,
1899 txn_id: u64,
1900 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1901 let mut entry = self.data.entry(key.to_vec()).or_default();
1902 if entry.has_write_conflict(txn_id) {
1903 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1904 }
1905 entry.add_uncommitted(value, txn_id);
1906 Ok(())
1907 }
1908
1909 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1910 if let Some(mut chain) = self.data.get_mut(key) {
1911 return chain.commit(txn_id, commit_ts);
1912 }
1913 false
1914 }
1915
1916 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1917 if let Some(mut chain) = self.data.get_mut(key) {
1918 chain.abort(txn_id);
1919 }
1920 }
1921
1922 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1923 self.data
1924 .get(key)
1925 .map(|chain| chain.has_write_conflict(txn_id))
1926 .unwrap_or(false)
1927 }
1928
1929 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1930 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1931 for mut entry in self.data.iter_mut() {
1932 stats.keys_scanned += 1;
1933 let before = entry.value().version_count();
1934 entry.value_mut().gc(min_ts);
1935 stats.versions_removed += before.saturating_sub(entry.value().version_count());
1936 }
1937 stats
1938 }
1939
1940 fn mvcc_key_count(&self) -> usize {
1941 self.data.len()
1942 }
1943}
1944
1945// ============================================================================
1946// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1947// ============================================================================
1948
1949use crate::key_buffer::ArenaKeyHandle;
1950
1951/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1952struct ArenaEpochDirtyList {
1953 epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1954 current_epoch: AtomicU64,
1955}
1956
1957impl ArenaEpochDirtyList {
1958 fn new() -> Self {
1959 Self {
1960 epochs: [
1961 parking_lot::Mutex::new(Vec::new()),
1962 parking_lot::Mutex::new(Vec::new()),
1963 parking_lot::Mutex::new(Vec::new()),
1964 parking_lot::Mutex::new(Vec::new()),
1965 ],
1966 current_epoch: AtomicU64::new(0),
1967 }
1968 }
1969
1970 #[inline]
1971 fn record_version(&self, key: ArenaKeyHandle) {
1972 let epoch = self.current_epoch.load(Ordering::Relaxed);
1973 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1974 self.epochs[idx].lock().push(key);
1975 }
1976
1977 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1978 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1979 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1980 let mut guard = self.epochs[old_idx].lock();
1981 let keys = std::mem::take(&mut *guard);
1982 (old_epoch, keys)
1983 }
1984}
1985
1986/// Arena-backed MVCC MemTable with optimized key storage
1987///
1988/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1989/// reducing per-write allocations from 3 to 1:
1990///
1991/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1992/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1993///
1994/// ## Performance
1995///
1996/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1997/// by reducing:
1998/// - Heap allocations: 3 → 1 per write
1999/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
2000pub struct ArenaMvccMemTable {
2001 /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
2002 data: DashMap<ArenaKeyHandle, VersionChain>,
2003 /// Ordered index for prefix scans
2004 ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
2005 /// Approximate size in bytes
2006 size_bytes: AtomicU64,
2007 /// Epoch-based dirty list (arena-backed)
2008 dirty_list: ArenaEpochDirtyList,
2009}
2010
2011impl ArenaMvccMemTable {
2012 pub fn new() -> Self {
2013 Self::with_ordered_index(true)
2014 }
2015
2016 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2017 Self {
2018 data: DashMap::new(),
2019 ordered_index: if enable_ordered_index {
2020 Some(SkipMap::new())
2021 } else {
2022 None
2023 },
2024 size_bytes: AtomicU64::new(0),
2025 dirty_list: ArenaEpochDirtyList::new(),
2026 }
2027 }
2028
2029 /// Write a key-value pair using arena key handle
2030 ///
2031 /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2032 /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2033 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2034 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2035 let key_len = key.len();
2036
2037 // Create ONE ArenaKeyHandle - this is the only allocation for the key
2038 let key_handle = ArenaKeyHandle::new(key);
2039
2040 // Track in dirty list (O(1) copy of 16-byte handle)
2041 self.dirty_list.record_version(key_handle.clone());
2042
2043 // Insert into ordered index (O(1) copy of 16-byte handle)
2044 if let Some(ref idx) = self.ordered_index {
2045 idx.insert(key_handle.clone(), ());
2046 }
2047
2048 // Use entry API with the handle
2049 let mut entry = self.data.entry(key_handle).or_default();
2050
2051 if entry.has_write_conflict(txn_id) {
2052 return Err(SochDBError::Internal(
2053 "Write-write conflict detected".into(),
2054 ));
2055 }
2056 entry.add_uncommitted(value, txn_id);
2057 self.size_bytes
2058 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2059
2060 Ok(())
2061 }
2062
2063 /// Write batch using arena key handles
2064 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2065 let mut total_size = 0u64;
2066
2067 for (key, value) in writes {
2068 let key_handle = ArenaKeyHandle::new(key);
2069
2070 self.dirty_list.record_version(key_handle.clone());
2071
2072 if let Some(ref idx) = self.ordered_index {
2073 idx.insert(key_handle.clone(), ());
2074 }
2075
2076 let mut entry = self.data.entry(key_handle).or_default();
2077
2078 if entry.has_write_conflict(txn_id) {
2079 return Err(SochDBError::Internal(
2080 "Write-write conflict detected".into(),
2081 ));
2082 }
2083
2084 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2085 entry.add_uncommitted(value.clone(), txn_id);
2086 total_size += (key.len() + value_size) as u64;
2087 }
2088
2089 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2090 Ok(())
2091 }
2092
2093 /// Read at snapshot timestamp
2094 pub fn read(
2095 &self,
2096 key: &[u8],
2097 snapshot_ts: u64,
2098 current_txn_id: Option<u64>,
2099 ) -> Option<Vec<u8>> {
2100 // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2101 let key_handle = ArenaKeyHandle::new(key);
2102 self.data.get(&key_handle).and_then(|chain| {
2103 chain
2104 .read_at(snapshot_ts, current_txn_id)
2105 .and_then(|v| v.value.clone())
2106 })
2107 }
2108
2109 /// Commit transaction
2110 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2111 for key in write_set {
2112 let key_handle = ArenaKeyHandle::new(key.as_slice());
2113 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2114 chain.commit(txn_id, commit_ts);
2115 }
2116 }
2117 }
2118
2119 /// Abort transaction
2120 pub fn abort(&self, txn_id: u64) {
2121 for mut entry in self.data.iter_mut() {
2122 entry.value_mut().abort(txn_id);
2123 }
2124 }
2125
2126 /// Scan prefix
2127 pub fn scan_prefix(
2128 &self,
2129 prefix: &[u8],
2130 snapshot_ts: u64,
2131 current_txn_id: Option<u64>,
2132 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2133 let mut results = Vec::new();
2134 let prefix_handle = ArenaKeyHandle::new(prefix);
2135
2136 if let Some(ref idx) = self.ordered_index {
2137 for entry in idx.range(prefix_handle..) {
2138 let key = entry.key();
2139
2140 if !key.as_bytes().starts_with(prefix) {
2141 break;
2142 }
2143
2144 if let Some(chain) = self.data.get(key)
2145 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2146 && let Some(value) = &v.value
2147 {
2148 results.push((key.as_bytes().to_vec(), value.clone()));
2149 }
2150 }
2151 } else {
2152 for entry in self.data.iter() {
2153 let key = entry.key();
2154 if !key.as_bytes().starts_with(prefix) {
2155 continue;
2156 }
2157 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2158 && let Some(value) = &v.value
2159 {
2160 results.push((key.as_bytes().to_vec(), value.clone()));
2161 }
2162 }
2163 }
2164
2165 results
2166 }
2167
2168 /// Get approximate size
2169 pub fn size(&self) -> u64 {
2170 self.size_bytes.load(Ordering::Relaxed)
2171 }
2172
2173 /// Garbage collect old versions
2174 pub fn gc(&self, min_active_ts: u64) -> usize {
2175 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2176
2177 if dirty_keys.is_empty() {
2178 return 0;
2179 }
2180
2181 let mut gc_count = 0;
2182 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2183
2184 for key in unique_keys {
2185 if let Some(mut entry) = self.data.get_mut(&key) {
2186 let before = entry.value().version_count();
2187 entry.value_mut().gc(min_active_ts);
2188 gc_count += before.saturating_sub(entry.value().version_count());
2189 }
2190 }
2191
2192 gc_count
2193 }
2194}
2195
2196impl Default for ArenaMvccMemTable {
2197 fn default() -> Self {
2198 Self::new()
2199 }
2200}
2201
2202// ============================================================================
// MemTableKind - Unified MemTable Abstraction (enum dispatch)
2204// ============================================================================
2205
/// Configuration for memtable type selection.
///
/// `Standard` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting (the default).
    /// Best for: general workloads, balanced read/write
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations
    /// Best for: write-heavy workloads, large keys
    Arena,
}
2223
2224/// Unified memtable abstraction using enum dispatch
2225///
2226/// This pattern provides:
2227/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2228/// - Type-safe switching between implementations
2229/// - Easy extensibility for future memtable types
2230///
2231/// ## Why Enum over Trait Object?
2232///
2233/// - Hot path performance: enum match is a single branch vs vtable indirection
2234/// - Cache friendliness: no pointer chasing
2235/// - Inlining: compiler can inline through enum dispatch
2236pub enum MemTableKind {
2237 Standard(MvccMemTable),
2238 Arena(ArenaMvccMemTable),
2239}
2240
2241impl MemTableKind {
2242 /// Create a new memtable of the specified type
2243 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2244 match kind {
2245 MemTableType::Standard => {
2246 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2247 }
2248 MemTableType::Arena => {
2249 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2250 }
2251 }
2252 }
2253
2254 /// Write a key-value pair
2255 #[inline]
2256 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2257 match self {
2258 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2259 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2260 }
2261 }
2262
2263 /// Write batch of key-value pairs
2264 #[inline]
2265 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2266 match self {
2267 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2268 MemTableKind::Arena(m) => {
2269 // Convert to arena-compatible format
2270 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2271 .iter()
2272 .map(|(k, v)| (k.as_slice(), v.clone()))
2273 .collect();
2274 m.write_batch(&arena_writes, txn_id)
2275 }
2276 }
2277 }
2278
2279 /// Read at snapshot timestamp
2280 #[inline]
2281 pub fn read(
2282 &self,
2283 key: &[u8],
2284 snapshot_ts: u64,
2285 current_txn_id: Option<u64>,
2286 ) -> Option<Vec<u8>> {
2287 match self {
2288 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2289 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2290 }
2291 }
2292
2293 /// Commit transaction
2294 #[inline]
2295 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2296 match self {
2297 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2298 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2299 }
2300 }
2301
2302 /// Abort transaction
2303 #[inline]
2304 pub fn abort(&self, txn_id: u64) {
2305 match self {
2306 MemTableKind::Standard(m) => m.abort(txn_id),
2307 MemTableKind::Arena(m) => m.abort(txn_id),
2308 }
2309 }
2310
2311 /// Scan prefix
2312 #[inline]
2313 pub fn scan_prefix(
2314 &self,
2315 prefix: &[u8],
2316 snapshot_ts: u64,
2317 current_txn_id: Option<u64>,
2318 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2319 match self {
2320 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2321 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2322 }
2323 }
2324
2325 /// Scan range
2326 #[inline]
2327 pub fn scan_range(
2328 &self,
2329 start: &[u8],
2330 end: &[u8],
2331 snapshot_ts: u64,
2332 current_txn_id: Option<u64>,
2333 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2334 match self {
2335 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2336 MemTableKind::Arena(m) => {
2337 // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2338 let mut results = Vec::new();
2339 if let Some(ref idx) = m.ordered_index {
2340 let start_handle = ArenaKeyHandle::new(start);
2341 let end_handle = ArenaKeyHandle::new(end);
2342
2343 if end.is_empty() {
2344 for entry in idx.range(start_handle..) {
2345 let key = entry.key();
2346 if let Some(chain) = m.data.get(key)
2347 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2348 && let Some(value) = &v.value
2349 {
2350 results.push((key.as_bytes().to_vec(), value.clone()));
2351 }
2352 }
2353 } else {
2354 for entry in idx.range(start_handle..end_handle) {
2355 let key = entry.key();
2356 if let Some(chain) = m.data.get(key)
2357 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2358 && let Some(value) = &v.value
2359 {
2360 results.push((key.as_bytes().to_vec(), value.clone()));
2361 }
2362 }
2363 }
2364 } else {
2365 for entry in m.data.iter() {
2366 let key = entry.key();
2367 let key_bytes = key.as_bytes();
2368 if key_bytes < start {
2369 continue;
2370 }
2371 if !end.is_empty() && key_bytes >= end {
2372 continue;
2373 }
2374 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2375 && let Some(value) = &v.value
2376 {
2377 results.push((key_bytes.to_vec(), value.clone()));
2378 }
2379 }
2380 }
2381 results
2382 }
2383 }
2384 }
2385
2386 /// Scan range iterator (returns collected results for now)
2387 #[inline]
2388 pub fn scan_range_iter<'a>(
2389 &'a self,
2390 start: &'a [u8],
2391 end: &'a [u8],
2392 snapshot_ts: u64,
2393 current_txn_id: Option<u64>,
2394 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2395 match self {
2396 MemTableKind::Standard(m) => {
2397 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2398 }
2399 MemTableKind::Arena(_) => {
2400 // Arena version returns collected results as iterator
2401 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2402 Box::new(results.into_iter())
2403 }
2404 }
2405 }
2406
2407 /// Get approximate size
2408 #[inline]
2409 pub fn size(&self) -> u64 {
2410 match self {
2411 MemTableKind::Standard(m) => m.size(),
2412 MemTableKind::Arena(m) => m.size(),
2413 }
2414 }
2415
2416 /// Garbage collect old versions
2417 #[inline]
2418 pub fn gc(&self, min_active_ts: u64) -> usize {
2419 match self {
2420 MemTableKind::Standard(m) => m.gc(min_active_ts),
2421 MemTableKind::Arena(m) => m.gc(min_active_ts),
2422 }
2423 }
2424
2425 /// Get the kind of memtable
2426 pub fn kind(&self) -> MemTableType {
2427 match self {
2428 MemTableKind::Standard(_) => MemTableType::Standard,
2429 MemTableKind::Arena(_) => MemTableType::Arena,
2430 }
2431 }
2432}
2433
/// Durable storage engine: MVCC transactions over an in-memory memtable, made
/// durable by a write-ahead log with crash recovery.
///
/// Which durability features are actually wired for an opened instance is
/// reported by [`DurableStorage::durability_capabilities`], not implied by this
/// summary.
pub struct DurableStorage {
    /// Path to storage directory
    path: PathBuf,
    /// Write-ahead log
    wal: Arc<TxnWal>,
    /// MVCC manager
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
    /// At commit, buffer is flushed to WAL with single lock acquisition
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional). `None` unless a group-commit
    /// constructor installs one after the base open.
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state: set to 1 at open when `check_recovery_needed` reports
    /// that recovery is required, otherwise 0.
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN (0 at open)
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    /// Initialised to 1 (NORMAL) at open.
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode (Little's Law)
    /// Arrival rate in requests/sec × 1000 for precision
    /// (seeded with 1_000_000, i.e. 1000 req/s, at open)
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds (seeded with 5000 = 5 ms)
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking).
    /// Never read; it is held only so the lock lives as long as the storage
    /// (presumably released by `DatabaseLock`'s drop). Fields drop in
    /// declaration order, so keeping this after `wal` releases the lock only
    /// after this struct's WAL handle is dropped.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
    /// Whether at-rest encryption is active for this instance (drives the live
    /// per-instance durability matrix). Set from the resolved keyring at open.
    at_rest_encrypted: bool,
}
2473
2474/// Encryption configuration for opening a [`DurableStorage`].
2475///
2476/// `disabled()` (the default for the legacy open variants) keeps the database
2477/// plaintext and byte-compatible with pre-encryption binaries. `with_kek()`
2478/// supplies the Key-Encryption-Key — the operator secret that *wraps* a
2479/// per-database data key; it is never used verbatim as the cipher key (see
2480/// [`crate::keyring`]). A wrong/missing KEK for an encrypted database fails
2481/// closed at open (the DB will refuse to open, never silently read as plaintext).
2482pub struct StorageEncryption {
2483 /// The KEK. `None` ⇒ plaintext database.
2484 pub kek: Option<EncryptionKey>,
2485 /// Human-readable identifier for the key source (e.g. "env:SOCHDB_ENCRYPTION_KEY",
2486 /// "embedded", "kms:..."). Bound into the keyring for provenance.
2487 pub source_id: String,
2488}
2489
2490impl StorageEncryption {
2491 /// Plaintext (no encryption) — the default.
2492 pub fn disabled() -> Self {
2493 Self {
2494 kek: None,
2495 source_id: "none".to_string(),
2496 }
2497 }
2498
2499 /// Encrypt at rest under the given KEK.
2500 pub fn with_kek(kek: EncryptionKey, source_id: impl Into<String>) -> Self {
2501 Self {
2502 kek: Some(kek),
2503 source_id: source_id.into(),
2504 }
2505 }
2506
2507 /// Whether a key is configured (i.e. encryption is requested).
2508 pub fn is_enabled(&self) -> bool {
2509 self.kek.is_some()
2510 }
2511}
2512
2513impl DurableStorage {
2514 /// Open or create durable storage at path
2515 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2516 Self::open_with_config(path, true)
2517 }
2518
2519 /// Open with configurable ordered index
2520 ///
2521 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2522 /// but scan_prefix becomes O(N) instead of O(log N + K)
2523 pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2524 Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2525 }
2526
2527 /// Open with arena-backed memtable for write-heavy workloads
2528 ///
2529 /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2530 /// Best for workloads with:
2531 /// - High write throughput
2532 /// - Large keys (reduces allocation overhead)
2533 /// - Minimal concurrent reads during writes
2534 pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2535 Self::open_with_full_config(path, true, MemTableType::Arena)
2536 }
2537
2538 /// Open with full configuration options
2539 ///
2540 /// # Arguments
2541 /// * `path` - Storage directory path
2542 /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2543 /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2544 ///
2545 /// # Locking
2546 ///
2547 /// Acquires an exclusive advisory lock on the database directory.
2548 /// This prevents concurrent multi-process access which would corrupt data.
2549 /// If another process has the database open, returns `Err(DatabaseLocked)`.
2550 pub fn open_with_full_config<P: AsRef<Path>>(
2551 path: P,
2552 enable_ordered_index: bool,
2553 memtable_type: MemTableType,
2554 ) -> Result<Self> {
2555 Self::open_with_full_config_internal(
2556 path,
2557 enable_ordered_index,
2558 memtable_type,
2559 true,
2560 StorageEncryption::disabled(),
2561 )
2562 }
2563
2564 /// Open with at-rest encryption configured via [`StorageEncryption`].
2565 ///
2566 /// With `StorageEncryption::disabled()` this is identical to
2567 /// [`Self::open_with_full_config`]. With a KEK, the database is opened (or
2568 /// created) encrypted: a per-DB data key is generated and wrapped into the
2569 /// keyring on first open, and a wrong/missing key on a subsequent open fails
2570 /// closed. Reads are unaffected (the live read path is in-memory); only the
2571 /// WAL write/recovery path pays the AEAD cost.
2572 pub fn open_with_encryption<P: AsRef<Path>>(
2573 path: P,
2574 enable_ordered_index: bool,
2575 memtable_type: MemTableType,
2576 encryption: StorageEncryption,
2577 ) -> Result<Self> {
2578 Self::open_with_full_config_internal(
2579 path,
2580 enable_ordered_index,
2581 memtable_type,
2582 true,
2583 encryption,
2584 )
2585 }
2586
2587 /// The live, per-instance durability capabilities of THIS database.
2588 ///
2589 /// Unlike the build-level [`crate::durability_capabilities`] (which reports
2590 /// defaults), this reflects the actual resolved state — notably whether
2591 /// at-rest encryption is active for this opened instance.
2592 pub fn durability_capabilities(&self) -> DurabilityCapabilities {
2593 DurabilityCapabilities {
2594 crash_recovery: true,
2595 at_rest_encryption: self.at_rest_encrypted,
2596 // PITR / ARIES / fencing remain unwired on the live path (landing
2597 // incrementally in Task 3B).
2598 point_in_time_recovery: false,
2599 aries_checkpoint: false,
2600 wal_fencing: false,
2601 }
2602 }
2603
    /// Whether at-rest encryption is active for this instance.
    ///
    /// Resolved from the keyring when the database was opened. It is the same
    /// flag reported as `at_rest_encryption` by `durability_capabilities`.
    pub fn is_encrypted(&self) -> bool {
        self.at_rest_encrypted
    }
2608
2609 /// Open without locking (for testing crash recovery scenarios)
2610 ///
2611 /// # Safety
2612 /// This should ONLY be used in tests that simulate crashes by forgetting
2613 /// the storage instance. In production, always use `open_with_full_config`.
2614 #[cfg(test)]
2615 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2616 Self::open_with_full_config_internal(
2617 path,
2618 true,
2619 MemTableType::Standard,
2620 false,
2621 StorageEncryption::disabled(),
2622 )
2623 }
2624
2625 /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2626 ///
2627 /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2628 /// temporary directory that is automatically cleaned up when the
2629 /// `EphemeralHandle` is dropped. This ensures test and production code paths
2630 /// are identical — bugs found in tests are guaranteed to reproduce in production.
2631 ///
2632 /// # Returns
2633 /// An `EphemeralHandle` that owns both the storage and the temp directory.
2634 /// Access the storage via `handle.storage()` or `Deref` coercion.
2635 ///
2636 /// # Example
2637 /// ```ignore
2638 /// let handle = DurableStorage::open_ephemeral()?;
2639 /// let txn = handle.begin_transaction()?;
2640 /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2641 /// handle.commit(txn)?;
2642 /// // temp directory cleaned up when `handle` drops
2643 /// ```
2644 pub fn open_ephemeral() -> Result<EphemeralHandle> {
2645 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2646 let storage = Self::open_with_full_config_internal(
2647 tmp.path(),
2648 true,
2649 MemTableType::Standard,
2650 false, // No lock needed for ephemeral
2651 StorageEncryption::disabled(),
2652 )?;
2653 Ok(EphemeralHandle {
2654 storage,
2655 _tmpdir: tmp,
2656 })
2657 }
2658
2659 /// Open an ephemeral DurableStorage with group commit enabled.
2660 ///
2661 /// Same as `open_ephemeral()` but with group commit for higher throughput.
2662 pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2663 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2664 let mut storage = Self::open_with_full_config_internal(
2665 tmp.path(),
2666 true,
2667 MemTableType::Standard,
2668 false,
2669 StorageEncryption::disabled(),
2670 )?;
2671
2672 let wal = storage.wal.clone();
2673 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2674 for &txn_id in txn_ids {
2675 let entry = TxnWalEntry::txn_commit(txn_id);
2676 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2677 }
2678 wal.flush().map_err(|e| e.to_string())?;
2679 wal.sync().map_err(|e| e.to_string())?;
2680 Ok(std::time::SystemTime::now()
2681 .duration_since(std::time::UNIX_EPOCH)
2682 .unwrap()
2683 .as_micros() as u64)
2684 });
2685 storage.group_commit = Some(Arc::new(gc));
2686
2687 Ok(EphemeralHandle {
2688 storage,
2689 _tmpdir: tmp,
2690 })
2691 }
2692
2693 fn open_with_full_config_internal<P: AsRef<Path>>(
2694 path: P,
2695 enable_ordered_index: bool,
2696 memtable_type: MemTableType,
2697 acquire_lock: bool,
2698 encryption: StorageEncryption,
2699 ) -> Result<Self> {
2700 let path = path.as_ref().to_path_buf();
2701 std::fs::create_dir_all(&path)?;
2702
2703 // Acquire exclusive lock on database directory (unless disabled for testing)
2704 let db_lock = if acquire_lock {
2705 Some(
2706 crate::lock::DatabaseLock::acquire(&path)
2707 .map_err(|e| SochDBError::LockError(e.to_string()))?,
2708 )
2709 } else {
2710 None
2711 };
2712
2713 let wal_path = path.join("wal.log");
2714
2715 // Resolve at-rest encryption from the keyring BEFORE touching the WAL.
2716 // - A fresh DB (no wal.log yet) with a KEK ⇒ create an encrypted keyring.
2717 // - An existing plaintext DB with a KEK ⇒ refused (must migrate explicitly).
2718 // - An encrypted DB with a wrong/missing KEK ⇒ refused fail-closed.
2719 let is_new_db = !wal_path.exists();
2720 let enc_state = keyring::load_or_init(
2721 &path,
2722 encryption.kek.as_ref(),
2723 &encryption.source_id,
2724 is_new_db,
2725 )?;
2726 let at_rest_encrypted = enc_state.is_encrypted();
2727 let wal = Arc::new(TxnWal::new_with_encryption(
2728 &wal_path,
2729 enc_state.engine(),
2730 enc_state.db_uuid(),
2731 enc_state.key_epoch(),
2732 )?);
2733
2734 let storage = Self {
2735 path,
2736 wal: wal.clone(),
2737 mvcc: Arc::new(MvccManager::new()),
2738 memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2739 txn_write_buffers: DashMap::new(),
2740 group_commit: None,
2741 needs_recovery: AtomicU64::new(0),
2742 last_checkpoint_lsn: AtomicU64::new(0),
2743 sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2744 commits_since_sync: AtomicU64::new(0),
2745 // Adaptive batch sizing (Little's Law)
2746 arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2747 last_commit_us: AtomicU64::new(0),
2748 fsync_latency_us: AtomicU64::new(5000), // 5ms default
2749 db_lock,
2750 at_rest_encrypted,
2751 };
2752
2753 // Check if recovery needed
2754 if storage.check_recovery_needed()? {
2755 storage.needs_recovery.store(1, Ordering::SeqCst);
2756 }
2757
2758 Ok(storage)
2759 }
2760
2761 /// Open with group commit enabled
2762 pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2763 Self::open_with_group_commit_and_config(path, true)
2764 }
2765
2766 /// Open with group commit and configurable ordered index
2767 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2768 path: P,
2769 enable_ordered_index: bool,
2770 ) -> Result<Self> {
2771 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2772
2773 let wal = storage.wal.clone();
2774 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2775 // Write all commit records WITHOUT flushing (batch them)
2776 for &txn_id in txn_ids {
2777 let entry = TxnWalEntry::txn_commit(txn_id);
2778 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2779 }
2780
2781 // Then do a SINGLE flush + fsync for the entire batch
2782 wal.flush().map_err(|e| e.to_string())?;
2783 wal.sync().map_err(|e| e.to_string())?;
2784
2785 // Return commit timestamp
2786 Ok(std::time::SystemTime::now()
2787 .duration_since(std::time::UNIX_EPOCH)
2788 .unwrap()
2789 .as_micros() as u64)
2790 });
2791
2792 storage.group_commit = Some(Arc::new(gc));
2793 Ok(storage)
2794 }
2795
2796 /// Open with IndexPolicy for automatic memtable/index configuration
2797 ///
2798 /// This is the recommended constructor for new code. The policy determines:
2799 /// - Whether to use ordered index (ScanOptimized only)
2800 /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2801 /// - Default settings optimized for the workload pattern
2802 ///
2803 /// # Arguments
2804 /// * `path` - Storage directory path
2805 /// * `policy` - Index policy determining write/scan tradeoffs
2806 /// * `group_commit` - Whether to enable group commit for throughput
2807 pub fn open_with_policy<P: AsRef<Path>>(
2808 path: P,
2809 policy: crate::index_policy::IndexPolicy,
2810 group_commit: bool,
2811 ) -> Result<Self> {
2812 use crate::index_policy::IndexPolicy;
2813
2814 // Derive configuration from policy
2815 let (enable_ordered_index, memtable_type) = match policy {
2816 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2817 // Write-heavy: no ordered index, use arena for reduced allocations
2818 (false, MemTableType::Arena)
2819 }
2820 IndexPolicy::Balanced => {
2821 // Mixed OLTP: deferred sorting (already implemented in Standard)
2822 (true, MemTableType::Standard)
2823 }
2824 IndexPolicy::ScanOptimized => {
2825 // Scan-heavy: maintain ordered index
2826 (true, MemTableType::Standard)
2827 }
2828 };
2829
2830 if group_commit {
2831 let mut storage =
2832 Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2833
2834 let wal = storage.wal.clone();
2835 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2836 for &txn_id in txn_ids {
2837 let entry = TxnWalEntry::txn_commit(txn_id);
2838 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2839 }
2840 wal.flush().map_err(|e| e.to_string())?;
2841 wal.sync().map_err(|e| e.to_string())?;
2842 Ok(std::time::SystemTime::now()
2843 .duration_since(std::time::UNIX_EPOCH)
2844 .unwrap()
2845 .as_micros() as u64)
2846 });
2847 storage.group_commit = Some(Arc::new(gc));
2848 Ok(storage)
2849 } else {
2850 Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2851 }
2852 }
2853
2854 /// Open storage for concurrent mode (multi-reader, single-writer)
2855 ///
2856 /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2857 /// Coordination is handled by the concurrent MVCC layer instead.
2858 ///
2859 /// # Safety
2860 ///
2861 /// This must ONLY be called from `Database::open_concurrent()` which
2862 /// manages the concurrent MVCC coordination. Direct use will cause
2863 /// data corruption.
2864 pub fn open_for_concurrent<P: AsRef<Path>>(
2865 path: P,
2866 policy: crate::index_policy::IndexPolicy,
2867 ) -> Result<Self> {
2868 use crate::index_policy::IndexPolicy;
2869
2870 let (enable_ordered_index, memtable_type) = match policy {
2871 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2872 IndexPolicy::Balanced => (true, MemTableType::Standard),
2873 IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2874 };
2875
2876 // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2877 Self::open_with_full_config_internal(
2878 path,
2879 enable_ordered_index,
2880 memtable_type,
2881 false,
2882 StorageEncryption::disabled(),
2883 )
2884 }
2885
2886 /// Get the memtable type being used
2887 pub fn memtable_type(&self) -> MemTableType {
2888 self.memtable.kind()
2889 }
2890
2891 /// Check if recovery is needed (dirty shutdown detection)
2892 ///
2893 /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2894 /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2895 /// but committed data still needs to be loaded from WAL into the memtable.
2896 fn check_recovery_needed(&self) -> Result<bool> {
2897 let marker_path = self.path.join(".clean_shutdown");
2898 if marker_path.exists() {
2899 // Clean shutdown - remove marker
2900 std::fs::remove_file(&marker_path)?;
2901 }
2902 // ALWAYS need recovery to rebuild memtable from WAL
2903 // The memtable is in-memory only and needs to be restored on every startup
2904 Ok(true)
2905 }
2906
2907 /// Perform crash recovery
2908 pub fn recover(&self) -> Result<RecoveryStats> {
2909 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2910 return Ok(RecoveryStats::default());
2911 }
2912
2913 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2914
2915 // Apply committed writes to memtable
2916 let recovery_txn_id = self.wal.alloc_txn_id();
2917 let commit_ts = self.mvcc.alloc_commit_ts();
2918
2919 // Collect keys being written for efficient commit
2920 let mut write_set: HashSet<InlineKey> = HashSet::new();
2921 for (key, value) in &writes {
2922 write_set.insert(SmallVec::from_slice(key));
2923 self.memtable
2924 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2925 }
2926 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2927
2928 self.needs_recovery.store(0, Ordering::SeqCst);
2929
2930 Ok(RecoveryStats {
2931 transactions_recovered: txn_count,
2932 writes_recovered: writes.len(),
2933 commit_ts,
2934 })
2935 }
2936
2937 /// Begin a new transaction
2938 pub fn begin_transaction(&self) -> Result<u64> {
2939 let txn_id = self.wal.begin_transaction()?;
2940 self.mvcc.begin(txn_id);
2941 Ok(txn_id)
2942 }
2943
2944 /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2945 ///
2946 /// This enables mode-aware optimizations:
2947 /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2948 /// - WriteOnly: Skip read tracking, faster bulk inserts
2949 /// - ReadWrite: Full SSI for serializable isolation
2950 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2951 let txn_id = self.wal.begin_transaction()?;
2952 self.mvcc.begin_with_mode(txn_id, mode);
2953 Ok(txn_id)
2954 }
2955
2956 /// Begin a read-only transaction without any WAL records.
2957 ///
2958 /// This is a performance-critical optimization that eliminates two WAL
2959 /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
2960 /// transactions have no state to recover, WAL records are unnecessary.
2961 ///
2962 /// Callers MUST use `abort_read_only_fast()` to clean up.
2963 #[inline]
2964 pub fn begin_read_only_fast(&self) -> u64 {
2965 let txn_id = self.wal.alloc_txn_id();
2966 self.mvcc.begin_read_only(txn_id);
2967 txn_id
2968 }
2969
2970 /// Abort a fast read-only transaction.
2971 ///
2972 /// O(1) cleanup: only removes MVCC state. No WAL write, no memtable scan.
2973 #[inline]
2974 pub fn abort_read_only_fast(&self, txn_id: u64) {
2975 self.mvcc.abort(txn_id);
2976 }
2977
2978 /// Read a key WITHOUT any MVCC transaction tracking.
2979 ///
2980 /// Uses the current global timestamp to see all committed writes.
2981 /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
2982 /// Only safe for single-threaded access (no concurrent writes).
2983 #[inline]
2984 pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2985 let snapshot_ts = self
2986 .mvcc
2987 .ts_counter
2988 .load(std::sync::atomic::Ordering::Relaxed);
2989 self.memtable.read(key, snapshot_ts, None)
2990 }
2991
2992 /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
2993 ///
2994 /// Uses the current global timestamp. Only safe for single-threaded access.
2995 #[inline]
2996 pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2997 let snapshot_ts = self
2998 .mvcc
2999 .ts_counter
3000 .load(std::sync::atomic::Ordering::Relaxed);
3001 self.memtable.scan_prefix(prefix, snapshot_ts, None)
3002 }
3003
3004 /// Read a key within a transaction
3005 #[inline]
3006 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
3007 // Fast path: get just snapshot_ts without cloning whole transaction
3008 let snapshot_ts = self
3009 .mvcc
3010 .get_snapshot_ts(txn_id)
3011 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3012
3013 // Record read for SSI (skipped for read-only transactions)
3014 self.mvcc.record_read(txn_id, key);
3015
3016 // Read at snapshot timestamp, seeing own uncommitted writes
3017 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
3018 }
3019
3020 /// Write a key-value pair within a transaction
3021 ///
3022 /// Writes are buffered and only flushed to disk on commit.
3023 /// This provides ~10× better throughput for batched inserts.
3024 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
3025 // Use the zero-allocation path internally
3026 self.write_refs(txn_id, &key, &value)?;
3027
3028 Ok(())
3029 }
3030
3031 /// Write from references - zero allocation hot path
3032 ///
3033 /// Avoids cloning key/value by writing to WAL from refs directly,
3034 /// then only allocating once for memtable storage.
3035 #[inline]
3036 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
3037 // Record write for MVCC (uses InlineKey - zero allocation for small keys)
3038 self.mvcc.record_write(txn_id, key);
3039
3040 // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
3041 // This reduces lock contention from O(writes) to O(1) per transaction
3042 self.txn_write_buffers
3043 .entry(txn_id)
3044 .or_insert_with(|| TxnWalBuffer::new(txn_id))
3045 .append(key, value);
3046
3047 // Write to memtable (needs owned key/value for storage)
3048 self.memtable
3049 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
3050
3051 Ok(())
3052 }
3053
3054 /// Delete a key within a transaction
3055 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
3056 // Record write (uses InlineKey - zero allocation for small keys)
3057 self.mvcc.record_write(txn_id, &key);
3058
3059 // Buffer tombstone in memory - NO WAL lock taken!
3060 self.txn_write_buffers
3061 .entry(txn_id)
3062 .or_insert_with(|| TxnWalBuffer::new(txn_id))
3063 .append(&key, &[]); // Empty value = tombstone
3064
3065 // Write tombstone to memtable
3066 self.memtable.write(key, None, txn_id)?;
3067
3068 Ok(())
3069 }
3070
3071 /// Batch write multiple key-value pairs with reduced overhead
3072 ///
3073 /// This API amortizes fixed costs over the batch:
3074 /// - Single DashMap entry lookup for TxnWalBuffer
3075 /// - Single MVCC write set update
3076 /// - Batch memtable operations
3077 ///
3078 /// Performance: ~2-3x faster than individual write_refs calls
3079 /// for batches of 100+ entries.
3080 ///
3081 /// # Arguments
3082 /// * `txn_id` - Transaction ID
3083 /// * `writes` - Slice of (key, value) pairs
3084 #[inline]
3085 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
3086 if writes.is_empty() {
3087 return Ok(());
3088 }
3089
3090 // Single DashMap access for entire batch
3091 let mut buffer_entry = self
3092 .txn_write_buffers
3093 .entry(txn_id)
3094 .or_insert_with(|| TxnWalBuffer::new(txn_id));
3095
3096 // Batch operations with reduced per-row overhead
3097 for (key, value) in writes {
3098 // Record write for MVCC
3099 self.mvcc.record_write(txn_id, key);
3100
3101 // Append to WAL buffer
3102 buffer_entry.append(key, value);
3103 }
3104 drop(buffer_entry);
3105
3106 // Batch write to memtable
3107 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
3108 .iter()
3109 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
3110 .collect();
3111 self.memtable.write_batch(&owned_writes, txn_id)?;
3112
3113 Ok(())
3114 }
3115
3116 /// Commit a transaction
3117 ///
3118 /// With sync_mode:
3119 /// - 0 (OFF): No sync, risk of data loss
3120 /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
3121 /// - 2 (FULL): Sync every commit (safest, slowest)
3122 pub fn commit(&self, txn_id: u64) -> Result<u64> {
3123 // First, flush all buffered DATA writes to the WAL with a SINGLE lock
3124 // acquisition (O(1) lock instead of O(writes) locks). Flushing data
3125 // records before validation is safe: ARIES recovery only treats them as
3126 // winners if a *durable commit record* for this transaction also exists,
3127 // and that record is written below — strictly after validation succeeds.
3128 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
3129 && !buffer.is_empty()
3130 {
3131 // Flush entire buffer to WAL with one lock
3132 self.wal.flush_buffer(&buffer)?;
3133 }
3134
3135 // ====================================================================
3136 // Task 1 — Linearizable commit invariant:
3137 //
3138 // A commit record must NEVER become durable unless the transaction
3139 // has already passed SSI validation and its write set is final.
3140 //
3141 // We therefore VALIDATE (and freeze the write set) *before* making the
3142 // commit record durable. `mvcc.commit` performs SSI validation and
3143 // returns `None` on a dangerous structure (serialization conflict) or
3144 // if the transaction no longer exists. Writing/fsyncing the commit
3145 // record before this point would let a crash resurrect a transaction
3146 // that the live system rejected — the classic "committed-after-crash,
3147 // aborted-before-crash" non-linearizability bug.
3148 // ====================================================================
3149 let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
3150 SochDBError::Validation(
3151 "transaction aborted: SSI validation failed (serialization conflict) \
3152 or transaction not found"
3153 .into(),
3154 )
3155 })?;
3156
3157 // Validation passed and the write set is frozen. Only now may the
3158 // commit record become durable.
3159 if let Some(gc) = &self.group_commit {
3160 // Submit the *validated* commit intent to group commit and wait.
3161 // This batches multiple validated commits into a single fsync.
3162 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
3163 } else {
3164 // Direct commit path with adaptive sync (Little's Law)
3165 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
3166 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
3167
3168 // Update arrival rate for adaptive batching
3169 self.update_arrival_rate();
3170
3171 // Write commit record (no flush yet - BufWriter will buffer it)
3172 let entry = TxnWalEntry::txn_commit(txn_id);
3173 self.wal.append_no_flush(&entry)?;
3174
3175 // Determine if we should sync/flush based on mode
3176 let should_sync = match sync_mode {
3177 0 => false, // OFF: never sync
3178 1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
3179 _ => true, // FULL: always sync
3180 };
3181
3182 if should_sync {
3183 // Measure fsync latency for adaptive tuning
3184 let start = std::time::Instant::now();
3185 self.wal.flush()?;
3186 self.wal.sync()?;
3187 let latency_us = start.elapsed().as_micros() as u64;
3188
3189 // Update fsync latency estimate (EMA with α = 0.1)
3190 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
3191 let new_latency = (old_latency * 9 + latency_us) / 10;
3192 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
3193
3194 self.commits_since_sync.store(0, Ordering::Relaxed);
3195 }
3196 }
3197
3198 // Commit record is durable (or buffered per sync mode) for a validated
3199 // transaction — publish the writes to the memtable. (O(k), k = keys.)
3200 self.memtable.commit(txn_id, commit_ts, &write_set);
3201
3202 Ok(commit_ts)
3203 }
3204
3205 /// Update arrival rate using exponential moving average
3206 #[inline]
3207 fn update_arrival_rate(&self) {
3208 let now_us = std::time::SystemTime::now()
3209 .duration_since(std::time::UNIX_EPOCH)
3210 .unwrap()
3211 .as_micros() as u64;
3212
3213 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3214
3215 if last > 0 {
3216 let delta_us = now_us.saturating_sub(last);
3217 if delta_us > 0 && delta_us < 10_000_000 {
3218 // Ignore gaps > 10s
3219 // Rate = 1_000_000 / delta_us (requests/sec)
3220 // Stored as rate × 1000 for precision
3221 let instant_rate = 1_000_000_000 / delta_us;
3222
3223 // EMA with α = 0.1
3224 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3225 let new_rate = (old_rate * 9 + instant_rate) / 10;
3226 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3227 }
3228 }
3229 }
3230
3231 /// Compute optimal batch threshold using Little's Law
3232 ///
3233 /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
3234 /// Returns the number of commits to batch before fsync
3235 #[inline]
3236 fn adaptive_batch_threshold(&self) -> u64 {
3237 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
3238 let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
3239
3240 if lambda <= 0.0 || tau <= 0.0 {
3241 return 100; // Fallback to fixed threshold
3242 }
3243
3244 // Little's Law: W* = sqrt(2 × τ × λ)
3245 // This minimizes total time = wait_time + fsync_overhead
3246 let n_opt = (2.0 * tau * lambda).sqrt();
3247
3248 // Clamp between 1 and 1000
3249 (n_opt as u64).clamp(1, 1000)
3250 }
3251
3252 /// Set synchronous mode
3253 ///
3254 /// - 0: OFF - No fsync (risk of data loss)
3255 /// - 1: NORMAL - Periodic fsync (balanced)
3256 /// - 2: FULL - Fsync every commit (safest)
3257 pub fn set_sync_mode(&self, mode: u64) {
3258 self.sync_mode.store(mode.min(2), Ordering::Relaxed);
3259 }
3260
3261 /// Force a group commit flush (useful for benchmarking or testing)
3262 pub fn flush_group_commit(&self) {
3263 if let Some(gc) = &self.group_commit {
3264 gc.flush_batch();
3265 }
3266 }
3267
3268 /// Abort a transaction
3269 ///
3270 /// Performance: O(1) for read-only transactions (no writes to clean up).
3271 /// For write transactions, O(N) memtable scan is required to remove
3272 /// uncommitted versions.
3273 pub fn abort(&self, txn_id: u64) -> Result<()> {
3274 // Check if transaction had any buffered writes.
3275 // Read-only transactions never populate txn_write_buffers,
3276 // so this returns None — allowing us to skip the O(N) memtable scan.
3277 let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3278
3279 if had_writes {
3280 // Write abort record to WAL (only needed if data was written)
3281 self.wal.abort_transaction(txn_id)?;
3282 // Clean up uncommitted memtable entries
3283 self.memtable.abort(txn_id);
3284 }
3285
3286 // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3287 self.mvcc.abort(txn_id);
3288
3289 Ok(())
3290 }
3291
3292 /// Scan keys with prefix
3293 #[inline]
3294 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3295 // Fast path: get just snapshot_ts without cloning whole transaction
3296 let snapshot_ts = self
3297 .mvcc
3298 .get_snapshot_ts(txn_id)
3299 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3300
3301 // Note: Scan doesn't record individual key reads for SSI (too expensive)
3302 // SSI conflicts are tracked at the prefix level if needed
3303 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3304 }
3305
3306 /// Scan keys in range
3307 #[inline]
3308 pub fn scan_range(
3309 &self,
3310 txn_id: u64,
3311 start: &[u8],
3312 end: &[u8],
3313 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3314 let snapshot_ts = self
3315 .mvcc
3316 .get_snapshot_ts(txn_id)
3317 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3318
3319 Ok(self
3320 .memtable
3321 .scan_range(start, end, snapshot_ts, Some(txn_id)))
3322 }
3323
3324 /// Streaming scan for very large result sets
3325 ///
3326 /// Returns an iterator that yields (key, value) pairs without
3327 /// materializing the entire result set in memory.
3328 #[inline]
3329 pub fn scan_range_iter<'a>(
3330 &'a self,
3331 txn_id: u64,
3332 start: &'a [u8],
3333 end: &'a [u8],
3334 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3335 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3336 self.memtable
3337 .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3338 }
3339
3340 /// Force fsync to disk
3341 /// Flush the WAL's in-memory buffer to the OS
3342 ///
3343 /// This ensures all buffered writes are pushed from the BufWriter
3344 /// into the OS page cache. Call this before `fsync()` to ensure
3345 /// all data is durable.
3346 pub fn flush_wal(&self) -> Result<()> {
3347 self.wal.flush()
3348 }
3349
3350 /// Force sync the WAL to disk (fsync)
3351 pub fn fsync(&self) -> Result<()> {
3352 self.wal.sync()
3353 }
3354
3355 /// Write checkpoint
3356 pub fn checkpoint(&self) -> Result<u64> {
3357 let txn_id = 0; // System transaction
3358 let entry = TxnWalEntry::checkpoint(txn_id);
3359 let lsn = self.wal.append_sync(&entry)?;
3360 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3361 Ok(lsn)
3362 }
3363
3364 /// Truncate the WAL file after checkpoint.
3365 ///
3366 /// This physically truncates the WAL file to 0 bytes, reclaiming disk
3367 /// space. The in-memory memtable retains all data for the current
3368 /// session, but a crash after truncation will result in data loss
3369 /// since the WAL is the only persistence mechanism for DurableStorage.
3370 ///
3371 /// Call after `checkpoint()` when WAL durability across restarts is
3372 /// not required (e.g. desktop telemetry viewers, caches).
3373 pub fn truncate_wal(&self) -> Result<()> {
3374 self.wal.truncate()
3375 }
3376
3377 /// Get storage statistics
3378 pub fn stats(&self) -> StorageStats {
3379 // Get WAL size from the WAL manager
3380 let wal_size = self.wal.size_bytes();
3381
3382 // Get active transaction count from MVCC
3383 let active_txns = self.mvcc.active_transaction_count();
3384
3385 StorageStats {
3386 memtable_size_bytes: self.memtable.size(),
3387 wal_size_bytes: wal_size,
3388 active_transactions: active_txns,
3389 min_active_snapshot: self.mvcc.min_active_snapshot(),
3390 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3391 }
3392 }
3393
3394 /// Garbage collect old versions
3395 pub fn gc(&self) -> usize {
3396 let min_ts = self.mvcc.min_active_snapshot();
3397 self.memtable.gc(min_ts)
3398 }
3399
3400 /// Clean shutdown
3401 pub fn shutdown(&self) -> Result<()> {
3402 // Sync WAL
3403 self.fsync()?;
3404
3405 // Write clean shutdown marker
3406 let marker_path = self.path.join(".clean_shutdown");
3407 std::fs::write(&marker_path, b"clean")?;
3408
3409 Ok(())
3410 }
3411}
3412
3413impl Drop for DurableStorage {
3414 fn drop(&mut self) {
3415 let _ = self.shutdown();
3416 }
3417}
3418
3419// =============================================================================
3420// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3421// =============================================================================
3422
/// Owns a `DurableStorage` instance backed by a temporary directory.
///
/// The temp directory is automatically cleaned up when this handle is dropped.
/// Access the underlying storage via `Deref` coercion or `.storage()`.
///
/// # Why this exists
///
/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
/// used in production). This dual-engine architecture meant bugs could surface
/// only in production because the test path exercised different code.
///
/// `EphemeralHandle` eliminates this divergence: tests use the exact same
/// `DurableStorage` engine as production, just backed by a temp directory.
pub struct EphemeralHandle {
    // Field order matters: Rust drops struct fields in declaration order, so
    // `storage` is dropped first. Its `Drop` runs `shutdown()`, which writes
    // a `.clean_shutdown` marker into its data directory, presumably this
    // temp dir. That must happen before `_tmpdir` deletes the directory.
    storage: DurableStorage,
    // Held only to keep the temporary directory alive; removed when dropped.
    _tmpdir: tempfile::TempDir,
}
3441
3442impl EphemeralHandle {
3443 /// Get a reference to the underlying storage
3444 pub fn storage(&self) -> &DurableStorage {
3445 &self.storage
3446 }
3447
3448 /// Get a mutable reference to the underlying storage
3449 pub fn storage_mut(&mut self) -> &mut DurableStorage {
3450 &mut self.storage
3451 }
3452
3453 /// Consume the handle and return the storage and temp directory.
3454 ///
3455 /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
3456 /// the `TempDir` alive for the lifetime of the storage.
3457 pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
3458 (self.storage, self._tmpdir)
3459 }
3460}
3461
3462impl std::ops::Deref for EphemeralHandle {
3463 type Target = DurableStorage;
3464 fn deref(&self) -> &DurableStorage {
3465 &self.storage
3466 }
3467}
3468
3469impl std::ops::DerefMut for EphemeralHandle {
3470 fn deref_mut(&mut self) -> &mut DurableStorage {
3471 &mut self.storage
3472 }
3473}
3474
/// Statistics returned by `DurableStorage::recover`.
///
/// All fields are zero when recovery was skipped because it had already run.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of replayed writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// Commit timestamp under which all recovered writes were published.
    pub commit_ts: u64,
}
3482
/// Point-in-time counters returned by `DurableStorage::stats`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Memtable size in bytes, as reported by the memtable.
    pub memtable_size_bytes: u64,
    /// WAL size in bytes, as reported by the WAL manager.
    pub wal_size_bytes: u64,
    /// Number of transactions MVCC currently tracks as active.
    pub active_transactions: usize,
    /// Oldest snapshot among active transactions; this is the watermark `gc()` uses.
    pub min_active_snapshot: u64,
    /// LSN of the most recent `checkpoint()` record written by this instance.
    pub last_checkpoint_lsn: u64,
}
3492
3493#[cfg(test)]
3494mod tests {
3495 use super::*;
3496 use tempfile::tempdir;
3497
    /// Single-transaction round trip: writes are visible to their own
    /// transaction before commit, and to a fresh transaction after commit.
    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Begin transaction
        let txn_id = storage.begin_transaction().unwrap();

        // Write data
        storage
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        storage
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();

        // Read back (within same transaction, before commit)
        let v1 = storage.read(txn_id, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));

        // Commit; the returned commit timestamp must be non-zero
        let commit_ts = storage.commit(txn_id).unwrap();
        assert!(commit_ts > 0);

        // Read in new transaction
        let txn2 = storage.begin_transaction().unwrap();
        let v1 = storage.read(txn2, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));
        storage.abort(txn2).unwrap();
    }
3528
    /// A transaction keeps reading from the snapshot taken when it began, even
    /// after a later transaction commits a newer value for the same key.
    #[test]
    fn test_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // T1: Write initial value
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // T2: Start reading (snapshot at this point)
        let t2 = storage.begin_transaction().unwrap();

        // T3: Update the value
        let t3 = storage.begin_transaction().unwrap();
        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.commit(t3).unwrap();

        // T2 should still see v1 (snapshot isolation)
        let v = storage.read(t2, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));

        // New transaction should see v2
        let t4 = storage.begin_transaction().unwrap();
        let v = storage.read(t4, b"key").unwrap();
        assert_eq!(v, Some(b"v2".to_vec()));

        storage.abort(t2).unwrap();
        storage.abort(t4).unwrap();
    }
3559
    /// `gc()` must respect the MVCC low-water-mark: versions still visible to
    /// an active snapshot survive, and the latest value stays readable once
    /// that snapshot is released.
    #[test]
    fn test_gc_preserves_versions_for_active_snapshot() {
        // GC must not free a version that an in-flight reader's snapshot can
        // still observe. The low-water-mark (min_active_snapshot) is the oldest
        // live reader; any version visible to it must survive GC.
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Seed an initial committed version v1.
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // Open a long-lived snapshot reader BEFORE any newer writes. Its
        // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
        let reader = storage.begin_transaction().unwrap();
        assert_eq!(
            storage.read(reader, b"k").unwrap(),
            Some(b"v1".to_vec()),
            "reader's snapshot must see v1"
        );

        // Produce several newer committed versions while the reader is active.
        for v in ["v2", "v3", "v4"] {
            let w = storage.begin_transaction().unwrap();
            storage
                .write(w, b"k".to_vec(), v.as_bytes().to_vec())
                .unwrap();
            storage.commit(w).unwrap();
        }

        // Run GC. Because `reader` is still active, the watermark is pinned at
        // its snapshot and the version it needs (v1) must NOT be freed.
        let _freed = storage.gc();
        assert_eq!(
            storage.read(reader, b"k").unwrap(),
            Some(b"v1".to_vec()),
            "GC must not free a version still visible to an active snapshot"
        );

        // A fresh transaction sees the latest committed version.
        let fresh = storage.begin_transaction().unwrap();
        assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
        storage.abort(fresh).unwrap();

        // Release the old reader; the watermark can now advance.
        storage.abort(reader).unwrap();

        // After the old snapshot is gone, GC may reclaim superseded versions,
        // and new readers still resolve the latest value correctly.
        let _freed2 = storage.gc();
        let after = storage.begin_transaction().unwrap();
        assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
        storage.abort(after).unwrap();
    }
3615
    /// Write-skew between two concurrent read-write transactions must be
    /// rejected by SSI at commit time: the first committer succeeds, the
    /// second `commit` returns an error.
    #[test]
    fn test_ssi_detects_write_skew() {
        // Classic write-skew: two concurrent transactions each read what the
        // other writes (disjoint write keys). Under plain SI both would commit,
        // violating serializability. Under SSI the pivot transaction (with both
        // an inbound and an outbound rw-edge) must be aborted, so the two
        // commits cannot both succeed.
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Seed two keys.
        let seed = storage.begin_transaction().unwrap();
        storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
        storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
        storage.commit(seed).unwrap();

        // T1 reads x and y, then writes x.
        let t1 = storage.begin_transaction().unwrap();
        let _ = storage.read(t1, b"x").unwrap();
        let _ = storage.read(t1, b"y").unwrap();

        // T2 reads x and y, then writes y. (Concurrent with T1.)
        let t2 = storage.begin_transaction().unwrap();
        let _ = storage.read(t2, b"x").unwrap();
        let _ = storage.read(t2, b"y").unwrap();

        storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
        storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();

        // First committer wins.
        let c1 = storage.commit(t1);
        assert!(c1.is_ok(), "first committer should succeed: {c1:?}");

        // The second committer forms a dangerous rw-structure with T1 and must
        // be rejected to preserve serializability.
        let c2 = storage.commit(t2);
        assert!(
            c2.is_err(),
            "SSI must abort the write-skew pivot, but commit succeeded"
        );
    }
3657
    /// Writes of an aborted transaction are discarded; later readers keep
    /// seeing the last committed value.
    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Write initial value
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // Start transaction that will abort
        let t2 = storage.begin_transaction().unwrap();
        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.abort(t2).unwrap();

        // New transaction should see v1 (aborted changes not visible)
        let t3 = storage.begin_transaction().unwrap();
        let v = storage.read(t3, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));
        storage.abort(t3).unwrap();
    }
3679
    /// Data committed under FULL sync mode survives a simulated crash and is
    /// restored by `recover()`. `mem::forget` skips `Drop`, so `shutdown()`
    /// never runs.
    #[test]
    fn test_crash_recovery() {
        let dir = tempdir().unwrap();

        // Phase 1: Write data and commit
        {
            // Use open_without_lock for crash simulation tests
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();

            // Set sync mode to FULL to ensure data is synced before "crash"
            storage.set_sync_mode(2); // FULL: sync every commit

            let txn = storage.begin_transaction().unwrap();
            storage
                .write(txn, b"persist".to_vec(), b"data".to_vec())
                .unwrap();
            storage.commit(txn).unwrap();

            // Simulate crash (no clean shutdown): Drop/shutdown is skipped
            std::mem::forget(storage);
        }

        // Phase 2: Reopen and recover
        {
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
            let stats = storage.recover().unwrap();
            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);

            // Data should be recovered
            let txn = storage.begin_transaction().unwrap();
            let v = storage.read(txn, b"persist").unwrap();
            assert_eq!(v, Some(b"data".to_vec()));
            storage.abort(txn).unwrap();
        }
    }
3715
3716 /// At-rest encryption end-to-end through the live DurableStorage engine
3717 /// (Task 3B): an encrypted DB round-trips committed data, never leaks
3718 /// plaintext to the WAL file, flips the per-instance durability matrix, and
3719 /// fails CLOSED on a wrong or missing key (never a silent plaintext/empty
3720 /// open).
3721 #[test]
3722 fn test_at_rest_encryption_end_to_end() {
3723 let dir = tempdir().unwrap();
3724 let kek = || EncryptionKey::new([0xABu8; 32]);
3725
3726 // Phase 1: create an encrypted DB, write committed data, clean close.
3727 {
3728 let storage = DurableStorage::open_with_encryption(
3729 dir.path(),
3730 true,
3731 MemTableType::Standard,
3732 StorageEncryption::with_kek(kek(), "test"),
3733 )
3734 .unwrap();
3735 assert!(storage.is_encrypted());
3736 assert!(storage.durability_capabilities().at_rest_encryption);
3737 storage.set_sync_mode(2);
3738 let t = storage.begin_transaction().unwrap();
3739 storage
3740 .write(t, b"secret-key".to_vec(), b"secret-value".to_vec())
3741 .unwrap();
3742 storage.commit(t).unwrap();
3743 } // clean drop releases the file lock
3744
3745 // The keyring exists and the WAL bytes do not leak the plaintext record.
3746 assert!(dir.path().join("keyring.json").exists());
3747 let raw = std::fs::read(dir.path().join("wal.log")).unwrap();
3748 assert!(!contains(&raw, b"secret-value"), "value leaked to WAL");
3749 assert!(!contains(&raw, b"secret-key"), "key leaked to WAL");
3750
3751 // Phase 2: reopen with the CORRECT key, recover, read back.
3752 {
3753 let storage = DurableStorage::open_with_encryption(
3754 dir.path(),
3755 true,
3756 MemTableType::Standard,
3757 StorageEncryption::with_kek(kek(), "test"),
3758 )
3759 .unwrap();
3760 storage.recover().unwrap();
3761 let t = storage.begin_transaction().unwrap();
3762 assert_eq!(
3763 storage.read(t, b"secret-key").unwrap(),
3764 Some(b"secret-value".to_vec()),
3765 "committed encrypted data must round-trip"
3766 );
3767 storage.abort(t).unwrap();
3768 }
3769
3770 // Phase 3: WRONG key must fail closed (keyring canary), not open empty.
3771 {
3772 let wrong = DurableStorage::open_with_encryption(
3773 dir.path(),
3774 true,
3775 MemTableType::Standard,
3776 StorageEncryption::with_kek(EncryptionKey::new([0x00u8; 32]), "test"),
3777 );
3778 assert!(wrong.is_err(), "wrong KEK must fail closed");
3779 }
3780
3781 // Phase 4: opening an encrypted DB with NO key must fail closed.
3782 {
3783 let no_key = DurableStorage::open_with_encryption(
3784 dir.path(),
3785 true,
3786 MemTableType::Standard,
3787 StorageEncryption::disabled(),
3788 );
3789 assert!(
3790 no_key.is_err(),
3791 "encrypted DB opened without key must fail closed"
3792 );
3793 }
3794 }
3795
3796 /// A plaintext DB reports at_rest_encryption=false on the live matrix.
3797 #[test]
3798 fn test_plaintext_db_reports_unencrypted() {
3799 let dir = tempdir().unwrap();
3800 let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3801 assert!(!storage.is_encrypted());
3802 assert!(!storage.durability_capabilities().at_rest_encryption);
3803 assert!(storage.durability_capabilities().crash_recovery);
3804 assert!(!dir.path().join("keyring.json").exists());
3805 }
3806
3807 fn contains(haystack: &[u8], needle: &[u8]) -> bool {
3808 haystack.windows(needle.len()).any(|w| w == needle)
3809 }
3810
3811 /// Crash-atomicity invariant (Task 4 — completes the Task 1 single-writer
3812 /// contract). `test_crash_recovery` proves committed data survives a crash;
3813 /// this proves the other half: recovery must NOT resurrect aborted or
3814 /// in-flight (never-committed) writes. Together they assert the live
3815 /// single-writer engine's commit is atomic AND durable across a crash.
3816 #[test]
3817 fn test_crash_recovery_atomicity() {
3818 let dir = tempdir().unwrap();
3819
3820 // Phase 1: one committed write, one aborted, one in-flight at crash.
3821 {
3822 let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3823 storage.set_sync_mode(2); // FULL: fsync every commit before the "crash"
3824
3825 // Committed — must survive.
3826 let t1 = storage.begin_transaction().unwrap();
3827 storage
3828 .write(t1, b"committed".to_vec(), b"durable".to_vec())
3829 .unwrap();
3830 storage.commit(t1).unwrap();
3831
3832 // Aborted — must NOT be resurrected.
3833 let t2 = storage.begin_transaction().unwrap();
3834 storage
3835 .write(t2, b"aborted".to_vec(), b"rolledback".to_vec())
3836 .unwrap();
3837 storage.abort(t2).unwrap();
3838
3839 // In-flight (never committed) at crash time — must NOT be resurrected.
3840 let t3 = storage.begin_transaction().unwrap();
3841 storage
3842 .write(t3, b"inflight".to_vec(), b"lost".to_vec())
3843 .unwrap();
3844
3845 // Simulate a crash: skip Drop / clean shutdown.
3846 std::mem::forget(storage);
3847 }
3848
3849 // Phase 2: reopen + recover; assert atomicity.
3850 {
3851 let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3852 storage.recover().unwrap();
3853
3854 let t = storage.begin_transaction().unwrap();
3855 assert_eq!(
3856 storage.read(t, b"committed").unwrap(),
3857 Some(b"durable".to_vec()),
3858 "committed write must survive the crash"
3859 );
3860 assert_eq!(
3861 storage.read(t, b"aborted").unwrap(),
3862 None,
3863 "aborted write must not be resurrected by recovery"
3864 );
3865 assert_eq!(
3866 storage.read(t, b"inflight").unwrap(),
3867 None,
3868 "uncommitted in-flight write must not be resurrected by recovery"
3869 );
3870 storage.abort(t).unwrap();
3871 }
3872 }
3873
3874 #[test]
3875 fn test_scan_prefix() {
3876 let dir = tempdir().unwrap();
3877 let storage = DurableStorage::open(dir.path()).unwrap();
3878
3879 let txn = storage.begin_transaction().unwrap();
3880 storage
3881 .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3882 .unwrap();
3883 storage
3884 .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3885 .unwrap();
3886 storage
3887 .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3888 .unwrap();
3889 storage.commit(txn).unwrap();
3890
3891 let txn2 = storage.begin_transaction().unwrap();
3892 let users = storage.scan(txn2, b"user:").unwrap();
3893 assert_eq!(users.len(), 2);
3894 storage.abort(txn2).unwrap();
3895 }
3896
3897 #[test]
3898 fn test_delete() {
3899 let dir = tempdir().unwrap();
3900 let storage = DurableStorage::open(dir.path()).unwrap();
3901
3902 // Insert
3903 let t1 = storage.begin_transaction().unwrap();
3904 storage
3905 .write(t1, b"key".to_vec(), b"value".to_vec())
3906 .unwrap();
3907 storage.commit(t1).unwrap();
3908
3909 // Verify exists
3910 let t2 = storage.begin_transaction().unwrap();
3911 assert!(storage.read(t2, b"key").unwrap().is_some());
3912 storage.abort(t2).unwrap();
3913
3914 // Delete
3915 let t3 = storage.begin_transaction().unwrap();
3916 storage.delete(t3, b"key".to_vec()).unwrap();
3917 storage.commit(t3).unwrap();
3918
3919 // Verify deleted
3920 let t4 = storage.begin_transaction().unwrap();
3921 assert!(storage.read(t4, b"key").unwrap().is_none());
3922 storage.abort(t4).unwrap();
3923 }
3924
3925 #[test]
3926 fn test_gc() {
3927 let dir = tempdir().unwrap();
3928 let storage = DurableStorage::open(dir.path()).unwrap();
3929
3930 // Create multiple versions
3931 for i in 0..10 {
3932 let txn = storage.begin_transaction().unwrap();
3933 storage
3934 .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3935 .unwrap();
3936 storage.commit(txn).unwrap();
3937 }
3938
3939 // GC should reclaim old versions
3940 let gc_count = storage.gc();
3941 // At least some versions should be collected
3942 // (exact count depends on implementation)
3943 let _ = gc_count; // gc_count is usize, always >= 0
3944 }
3945
3946 #[test]
3947 fn test_group_commit() {
3948 use std::sync::Arc;
3949 use std::thread;
3950
3951 let dir = tempdir().unwrap();
3952 let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3953
3954 // Spawn multiple threads to commit concurrently
3955 let mut handles = vec![];
3956 for i in 0..4 {
3957 let storage = Arc::clone(&storage);
3958 handles.push(thread::spawn(move || {
3959 let txn = storage.begin_transaction().unwrap();
3960 storage
3961 .write(
3962 txn,
3963 format!("key{}", i).into_bytes(),
3964 format!("val{}", i).into_bytes(),
3965 )
3966 .unwrap();
3967 storage.commit(txn).unwrap()
3968 }));
3969 }
3970
3971 // Wait for all commits
3972 let mut commit_times = vec![];
3973 for h in handles {
3974 commit_times.push(h.join().unwrap());
3975 }
3976
3977 // All commits should succeed
3978 assert!(commit_times.iter().all(|&ts| ts > 0));
3979
3980 // Verify data persisted
3981 let txn = storage.begin_transaction().unwrap();
3982 for i in 0..4 {
3983 let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3984 assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3985 }
3986 storage.abort(txn).unwrap();
3987 }
3988
3989 // ==================== ArenaMvccMemTable Tests ====================
3990
3991 #[test]
3992 fn test_arena_memtable_basic_write_read() {
3993 let memtable = ArenaMvccMemTable::new();
3994
3995 // Write some values
3996 memtable
3997 .write(b"key1", Some(b"value1".to_vec()), 1)
3998 .unwrap();
3999 memtable
4000 .write(b"key2", Some(b"value2".to_vec()), 1)
4001 .unwrap();
4002
4003 // Read them back (uncommitted, so need txn_id match)
4004 assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
4005 assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
4006 assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
4007 }
4008
4009 #[test]
4010 fn test_arena_memtable_update() {
4011 let memtable = ArenaMvccMemTable::new();
4012
4013 memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
4014 memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
4015
4016 assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
4017 }
4018
4019 #[test]
4020 fn test_arena_memtable_delete() {
4021 let memtable = ArenaMvccMemTable::new();
4022
4023 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4024 memtable.write(b"key", None, 1).unwrap(); // Delete = None value
4025
4026 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4027 }
4028
4029 #[test]
4030 fn test_arena_memtable_scan_prefix() {
4031 let memtable = ArenaMvccMemTable::new();
4032
4033 memtable
4034 .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
4035 .unwrap();
4036 memtable
4037 .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
4038 .unwrap();
4039 memtable
4040 .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
4041 .unwrap();
4042 memtable
4043 .write(b"order:1", Some(b"order_data".to_vec()), 1)
4044 .unwrap();
4045
4046 // Create a write set and commit
4047 let mut write_set = HashSet::new();
4048 write_set.insert(InlineKey::from_slice(b"user:1:name"));
4049 write_set.insert(InlineKey::from_slice(b"user:1:email"));
4050 write_set.insert(InlineKey::from_slice(b"user:2:name"));
4051 write_set.insert(InlineKey::from_slice(b"order:1"));
4052 memtable.commit(1, 10, &write_set);
4053
4054 // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
4055 let results = memtable.scan_prefix(b"user:1:", 11, None);
4056 assert_eq!(results.len(), 2);
4057
4058 // Scan for all users
4059 let results = memtable.scan_prefix(b"user:", 11, None);
4060 assert_eq!(results.len(), 3);
4061 }
4062
4063 #[test]
4064 fn test_arena_memtable_write_batch() {
4065 let memtable = ArenaMvccMemTable::new();
4066
4067 let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
4068 (b"k1", Some(b"v1".to_vec())),
4069 (b"k2", Some(b"v2".to_vec())),
4070 (b"k3", Some(b"v3".to_vec())),
4071 ];
4072
4073 memtable.write_batch(&writes, 1).unwrap();
4074
4075 assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
4076 assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
4077 assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
4078 }
4079
4080 #[test]
4081 fn test_arena_memtable_gc() {
4082 let memtable = ArenaMvccMemTable::new();
4083
4084 // Write multiple versions
4085 for i in 0..10 {
4086 memtable
4087 .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
4088 .unwrap();
4089
4090 let mut write_set = HashSet::new();
4091 write_set.insert(InlineKey::from_slice(b"key"));
4092 memtable.commit(i + 1, (i + 1) * 10, &write_set);
4093 }
4094
4095 // GC old versions
4096 let gc_count = memtable.gc(90);
4097 let _ = gc_count; // gc_count is usize, always >= 0
4098 }
4099
4100 #[test]
4101 fn test_arena_memtable_size_tracking() {
4102 let memtable = ArenaMvccMemTable::new();
4103
4104 assert_eq!(memtable.size(), 0);
4105
4106 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4107
4108 assert!(memtable.size() > 0);
4109 }
4110
4111 #[test]
4112 fn test_arena_memtable_abort() {
4113 let memtable = ArenaMvccMemTable::new();
4114
4115 memtable
4116 .write(b"key", Some(b"uncommitted".to_vec()), 1)
4117 .unwrap();
4118
4119 // Visible to same txn
4120 assert_eq!(
4121 memtable.read(b"key", 0, Some(1)),
4122 Some(b"uncommitted".to_vec())
4123 );
4124
4125 // Not visible to other txns
4126 assert_eq!(memtable.read(b"key", 0, Some(2)), None);
4127
4128 // Abort
4129 memtable.abort(1);
4130
4131 // No longer visible
4132 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4133 }
4134
4135 // ========================================================================
4136 // MemTableKind Tests - Unified Abstraction
4137 // ========================================================================
4138
4139 #[test]
4140 fn test_memtable_kind_standard() {
4141 let memtable = MemTableKind::new(MemTableType::Standard, true);
4142 assert_eq!(memtable.kind(), MemTableType::Standard);
4143
4144 // Write and read
4145 memtable
4146 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4147 .unwrap();
4148
4149 // Commit transaction at ts=100
4150 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4151 memtable.commit(1, 100, &write_set);
4152
4153 // Read after commit - snapshot_ts must be > commit_ts for visibility
4154 let v = memtable.read(b"key1", 101, None);
4155 assert_eq!(v, Some(b"value1".to_vec()));
4156 }
4157
4158 #[test]
4159 fn test_memtable_kind_arena() {
4160 let memtable = MemTableKind::new(MemTableType::Arena, true);
4161 assert_eq!(memtable.kind(), MemTableType::Arena);
4162
4163 // Write and read
4164 memtable
4165 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4166 .unwrap();
4167
4168 // Commit at ts=100
4169 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4170 memtable.commit(1, 100, &write_set);
4171
4172 // Read after commit - snapshot_ts > commit_ts
4173 let v = memtable.read(b"key1", 101, None);
4174 assert_eq!(v, Some(b"value1".to_vec()));
4175 }
4176
4177 #[test]
4178 fn test_memtable_kind_scan_range() {
4179 // Test both implementations have consistent behavior
4180 for kind in [MemTableType::Standard, MemTableType::Arena] {
4181 let memtable = MemTableKind::new(kind, true);
4182
4183 // Write some data
4184 for i in 0..5 {
4185 let key = format!("key{}", i);
4186 let value = format!("value{}", i);
4187 memtable
4188 .write(key.into_bytes(), Some(value.into_bytes()), 1)
4189 .unwrap();
4190 }
4191
4192 // Commit all at ts=100
4193 let write_set: HashSet<InlineKey> = (0..5)
4194 .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
4195 .collect();
4196 memtable.commit(1, 100, &write_set);
4197
4198 // Scan range with snapshot_ts > commit_ts
4199 let results = memtable.scan_range(b"key1", b"key4", 101, None);
4200 assert_eq!(
4201 results.len(),
4202 3,
4203 "kind={:?} should have 3 results (key1, key2, key3)",
4204 kind
4205 );
4206 }
4207 }
4208
4209 #[test]
4210 fn test_durable_storage_arena() {
4211 let dir = tempdir().unwrap();
4212 let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
4213
4214 assert_eq!(storage.memtable_type(), MemTableType::Arena);
4215
4216 // Basic transaction should work the same
4217 let txn_id = storage.begin_transaction().unwrap();
4218 storage
4219 .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
4220 .unwrap();
4221 storage.commit(txn_id).unwrap();
4222
4223 let txn2 = storage.begin_transaction().unwrap();
4224 let v = storage.read(txn2, b"key1").unwrap();
4225 assert_eq!(v, Some(b"value1".to_vec()));
4226 storage.abort(txn2).unwrap();
4227 }
4228
4229 #[test]
4230 fn test_durable_storage_full_config() {
4231 let dir = tempdir().unwrap();
4232
4233 // Test with Arena and ordered index enabled
4234 let storage =
4235 DurableStorage::open_with_full_config(dir.path(), true, MemTableType::Arena).unwrap();
4236
4237 assert_eq!(storage.memtable_type(), MemTableType::Arena);
4238
4239 // Write multiple keys
4240 let txn = storage.begin_transaction().unwrap();
4241 for i in 0..10 {
4242 let key = format!("key{:02}", i);
4243 let value = format!("value{}", i);
4244 storage
4245 .write(txn, key.into_bytes(), value.into_bytes())
4246 .unwrap();
4247 }
4248 storage.commit(txn).unwrap();
4249
4250 // Scan should work (uses scan method for prefix)
4251 let txn2 = storage.begin_transaction().unwrap();
4252 let results = storage.scan(txn2, b"key0").unwrap();
4253 assert_eq!(results.len(), 10); // key00 through key09
4254 storage.abort(txn2).unwrap();
4255 }
4256}