sochdb_storage/durable_storage.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! Wires together the live storage components into a durable engine:
21//!
22//! - WAL (txn_wal.rs) for crash-consistent durability + recovery
23//! - Group Commit for throughput
24//! - MVCC for isolation
25//! - LSCS for columnar efficiency
26//!
27//! Truth-in-capabilities: the live path provides crash-consistent WAL recovery,
28//! but NOT at-rest encryption, point-in-time recovery, ARIES checkpointing, or
29//! WAL fencing — those modules exist but are quarantined behind the empty,
30//! non-default `experimental` feature and are unwired. Query
31//! [`crate::durability_capabilities`] rather than relying on prose like
32//! "production-grade".
33//!
34//! ## Architecture
35//!
36//! ```text
37//! ┌─────────────────────────────────────────────────────────────────┐
38//! │ DurableStorage │
39//! ├─────────────────────────────────────────────────────────────────┤
40//! │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
41//! │ │ MvccManager │ │ GroupCommit │───▶│ TxnWal (fsync) │ │
42//! │ │ │ │ │ └─────────────────────┘ │
43//! │ │ ┌─────────┐ │ └─────────────┘ │
44//! │ │ │Snapshots│ │ │
45//! │ │ └─────────┘ │ ┌─────────────────────────────────────────┐│
46//! │ │ ┌─────────┐ │ │ MemTable ││
47//! │ │ │ Txn Map │ │ │ (key → (value, txn_id, version)) ││
48//! │ │ └─────────┘ │ └─────────────────────────────────────────┘│
49//! │ └─────────────┘ │
50//! │ ┌─────────────────────────────────────────┐│
51//! │ │ LSCS (SST) ││
52//! │ │ Immutable columnar segments ││
53//! │ └─────────────────────────────────────────┘│
54//! └─────────────────────────────────────────────────────────────────┘
55//! ```
56//!
57//! ## Concurrency
58//!
59//! - Writers: Serialize through WAL, use MVCC for conflict detection
60//! - Readers: Lock-free reads at snapshot timestamp
61//! - Commits: Batched through GroupCommit for throughput
62//!
63//! ## Isolation Contract
64//!
65//! The live write path (`MvccManager`, used by `DurableStorage`) provides
66//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
67//! plain Snapshot Isolation (SI):
68//!
69//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
70//! fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
71//! invisible to it — see `test_snapshot_isolation`. This alone is SI and is
72//! vulnerable to **write skew** (two transactions each read a set the other
73//! writes, both commit, and no serial order reproduces the result).
74//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
75//! recently-committed concurrent transactions for rw-antidependency edges:
76//! an inbound edge (another txn wrote a key we read, `T_other →rw→ T_me`) and
77//! an outbound edge (we wrote a key another txn read, `T_me →rw→ T_other`).
78//! When a transaction sits on **both** an inbound and an outbound rw-edge it
79//! is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
80//! structure"), so it is aborted. This is the conservative safe subset of the
81//! SSI test: it may abort some serializable schedules (false positives) but
82//! **never admits a non-serializable one** (no false negatives), so the
83//! externally observable isolation level is Serializable.
84//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
85//! never participate in validation; they always observe a serializable
86//! snapshot and never abort.
87//!
88//! ### MVCC garbage collection is snapshot-safe
89//!
90//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
91//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
92//! `snapshot_ts` across all still-active transactions — into
93//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
94//! retains every version with `commit_ts > min_active_ts` **plus one anchor
95//! version at or below the watermark**, so any in-flight reader can still
96//! resolve the correct version for its snapshot. A version is only freed once
97//! **no active snapshot can observe it**. The watermark is recomputed on every
98//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
99//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
100
101use std::collections::HashSet;
102use std::path::{Path, PathBuf};
103use std::sync::Arc;
104use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
105
106use dashmap::DashMap;
107use smallvec::SmallVec;
108
109use crossbeam_skiplist::SkipMap;
110
111use crate::DurabilityCapabilities;
112use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
113use crate::encryption::EncryptionKey;
114use crate::group_commit::EventDrivenGroupCommit;
115use crate::keyring;
116use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
117use sochdb_core::version_chain::{
118 BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
119 VisibilityContext, WriteConflictDetection,
120};
121use sochdb_core::{Result, SochDBError};
122
123// =============================================================================
124// SSI Bloom Filter - Fast Conflict Pre-Filtering
125// =============================================================================
126
/// Probabilistic key set used to pre-filter SSI conflict checks.
///
/// It answers "could these two transactions touch a common key?" cheaply.
/// A positive answer only means an exact check is needed, so false positives
/// are tolerated. A negative answer must always be correct: false negatives
/// are never allowed.
///
/// ## Sizing
///
/// At a 1% false-positive target, 1000 keys need roughly 9600 bits
/// (about 1.2 KB per transaction) and 7 hash functions.
///
/// ## Lazy allocation
///
/// No bit storage exists until the first insert. Transactions that never
/// record a key, such as read-only ones, pay nothing for the filter.
#[derive(Debug, Clone)]
pub struct SsiBloomFilter {
    /// Backing storage, 64 bits per word; `None` until the first insert.
    bits: Option<Vec<u64>>,
    /// Item count used to size `bits` when it is first allocated.
    expected_capacity: usize,
    /// Number of probe positions each key sets / tests.
    num_hashes: u32,
}
152
153impl SsiBloomFilter {
154 /// Optimal number of bits per item for 1% false positive rate
155 /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
156 const BITS_PER_ITEM: f64 = 9.6;
157
158 /// Optimal number of hash functions for 1% false positive rate
159 /// k = (m/n) × ln(2) ≈ 7
160 const DEFAULT_NUM_HASHES: u32 = 7;
161
162 /// Minimum capacity to avoid tiny filters
163 const MIN_CAPACITY: usize = 64;
164
165 /// Create a new bloom filter for expected item count (lazy allocation)
166 ///
167 /// Configured for ~1% false positive rate.
168 /// The bit vector is not allocated until first insert.
169 #[inline]
170 pub fn new(expected_items: usize) -> Self {
171 Self {
172 bits: None,
173 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
174 num_hashes: Self::DEFAULT_NUM_HASHES,
175 }
176 }
177
178 /// Create with specific capacity in words (for memory-constrained scenarios)
179 pub fn with_word_capacity(words: usize) -> Self {
180 Self {
181 bits: None,
182 expected_capacity: words.max(1) * 64 / 10, // Approx items from words
183 num_hashes: Self::DEFAULT_NUM_HASHES,
184 }
185 }
186
187 /// Ensure bits are allocated (lazy initialization)
188 #[inline]
189 fn ensure_allocated(&mut self) {
190 if self.bits.is_none() {
191 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
192 let num_words = num_bits.div_ceil(64);
193 self.bits = Some(vec![0u64; num_words]);
194 }
195 }
196
197 /// Add a key to the filter - O(k) where k = num_hashes
198 #[inline]
199 pub fn insert(&mut self, key: &[u8]) {
200 self.ensure_allocated();
201 let bits = self.bits.as_mut().unwrap();
202 let num_bits = bits.len() * 64;
203 if num_bits == 0 {
204 return;
205 }
206
207 // Use two hash functions to simulate k hash functions
208 // h(i) = h1 + i * h2 (double hashing technique)
209 let h1 = Self::hash1(key);
210 let h2 = Self::hash2(key);
211
212 for i in 0..self.num_hashes {
213 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
214 let bit_idx = (h as usize) % num_bits;
215 let word_idx = bit_idx / 64;
216 let bit_pos = bit_idx % 64;
217 bits[word_idx] |= 1 << bit_pos;
218 }
219 }
220
221 /// Check if a key might be present - O(k)
222 ///
223 /// Returns:
224 /// - false: Key is definitely NOT in the set (or filter not initialized)
225 /// - true: Key MIGHT be in the set (needs exact check)
226 #[inline]
227 pub fn may_contain(&self, key: &[u8]) -> bool {
228 let bits = match &self.bits {
229 Some(b) => b,
230 None => return false, // Uninitialized = empty
231 };
232 let num_bits = bits.len() * 64;
233 if num_bits == 0 {
234 return false;
235 }
236
237 let h1 = Self::hash1(key);
238 let h2 = Self::hash2(key);
239
240 for i in 0..self.num_hashes {
241 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
242 let bit_idx = (h as usize) % num_bits;
243 let word_idx = bit_idx / 64;
244 let bit_pos = bit_idx % 64;
245 if bits[word_idx] & (1 << bit_pos) == 0 {
246 return false; // Definitely not present
247 }
248 }
249 true // Might be present
250 }
251
252 /// Check if this filter might intersect with another
253 ///
254 /// Fast O(m/64) check using bitwise AND of all words.
255 /// If no bits are shared, sets are definitely disjoint.
256 #[inline]
257 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
258 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
259 (Some(s), Some(o)) => (s, o),
260 _ => return false, // Either uninitialized = no intersection
261 };
262 let min_len = self_bits.len().min(other_bits.len());
263 for i in 0..min_len {
264 if self_bits[i] & other_bits[i] != 0 {
265 return true; // Might intersect
266 }
267 }
268 false // Definitely disjoint
269 }
270
271 /// First hash function (using built-in hasher)
272 #[inline]
273 fn hash1(key: &[u8]) -> u64 {
274 use std::collections::hash_map::DefaultHasher;
275 use std::hash::{Hash, Hasher};
276 let mut hasher = DefaultHasher::new();
277 key.hash(&mut hasher);
278 hasher.finish()
279 }
280
281 /// Second hash function (using twox-hash for independence)
282 #[inline]
283 fn hash2(key: &[u8]) -> u64 {
284 twox_hash::xxh3::hash64(key)
285 }
286
287 /// Get the memory size in bytes
288 pub fn size_bytes(&self) -> usize {
289 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
290 }
291
292 /// Check if the filter is empty
293 pub fn is_empty(&self) -> bool {
294 match &self.bits {
295 Some(bits) => bits.iter().all(|&w| w == 0),
296 None => true,
297 }
298 }
299}
300
301/// Type alias for inline key storage - keys up to 32 bytes stored on stack
302/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
303pub type InlineKey = SmallVec<[u8; 32]>;
304
/// A single version of a key's value within a version chain.
#[derive(Debug, Clone)]
pub struct Version {
    /// Stored bytes; `None` marks a deletion (tombstone).
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that wrote this version.
    pub txn_id: u64,
    /// Timestamp at which the writer committed; `0` while uncommitted.
    pub commit_ts: u64,
}
315
316// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
317impl ChainEntry for Version {
318 #[inline]
319 fn commit_ts(&self) -> u64 {
320 self.commit_ts
321 }
322 #[inline]
323 fn txn_id(&self) -> u64 {
324 self.txn_id
325 }
326 #[inline]
327 fn set_commit_ts(&mut self, ts: u64) {
328 self.commit_ts = ts;
329 }
330}
331
332// ============================================================================
333// Optimized VersionChain with Binary Search (Task 1: mm.md)
334// ============================================================================
335
336/// Multi-version data for a single key with O(log v) read complexity
337///
338/// ## Optimization: Binary Search with Sorted Commit Ordering
339///
340/// Separates committed versions (sorted descending by commit_ts) from
341/// uncommitted version (single optional slot per transaction).
342///
343/// **Before:** O(v) linear scan + O(v) max computation = O(v)
344/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
345///
346/// For v=10 versions: 3.3x speedup
347/// For v=100 versions: 7x speedup
348///
349/// ## Rec 11: Consolidated
350///
351/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
352/// eliminating duplication with `mvcc_concurrent::VersionChain`.
353#[derive(Debug, Default)]
354pub struct VersionChain {
355 /// Consolidated binary-search chain (Rec 11)
356 inner: BinarySearchChain<Version>,
357}
358
359impl VersionChain {
360 /// Create a new empty version chain
361 #[inline]
362 pub fn new() -> Self {
363 Self {
364 inner: BinarySearchChain::new(),
365 }
366 }
367
368 /// Add a new uncommitted version
369 /// If there's already an uncommitted version from this txn, update it in place
370 ///
371 /// O(1) - just updates the uncommitted slot
372 #[inline]
373 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
374 match self.inner.uncommitted_mut() {
375 Some(v) if v.txn_id == txn_id => {
376 // Update in place - O(1)
377 v.value = value;
378 }
379 _ => {
380 // New or different txn — set the slot
381 self.inner.set_uncommitted(Version {
382 value,
383 txn_id,
384 commit_ts: 0,
385 });
386 }
387 }
388 }
389
390 /// Commit a version - moves from uncommitted slot to sorted committed list
391 ///
392 /// O(log v) - inserts into sorted position using binary search
393 #[inline]
394 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
395 self.inner.commit(txn_id, commit_ts)
396 }
397
398 /// Abort a version (remove uncommitted version for txn)
399 ///
400 /// O(1) - just clears the uncommitted slot if it matches
401 #[inline]
402 pub fn abort(&mut self, txn_id: u64) {
403 self.inner.abort(txn_id);
404 }
405
406 /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
407 ///
408 /// ## Complexity: O(1) + O(log v) = O(log v)
409 #[inline]
410 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
411 self.inner.read_at(snapshot_ts, current_txn_id)
412 }
413
414 /// Check if there's an uncommitted version by another transaction
415 ///
416 /// O(1) - just checks the uncommitted slot
417 #[inline]
418 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
419 self.inner.has_write_conflict(my_txn_id)
420 }
421
422 /// Garbage collect old versions
423 pub fn gc(&mut self, min_active_ts: u64) {
424 self.inner.gc_by_ts(min_active_ts);
425 }
426
427 /// Get total version count (committed + uncommitted)
428 #[inline]
429 pub fn version_count(&self) -> usize {
430 self.inner.version_count()
431 }
432
433 // Legacy compatibility: get versions vec (for tests)
434 #[cfg(test)]
435 pub fn versions(&self) -> Vec<Version> {
436 let mut result = self.inner.committed_versions().to_vec();
437 if let Some(v) = self.inner.uncommitted() {
438 result.push(v.clone());
439 }
440 result
441 }
442}
443
444// =============================================================================
445// Rec 6: Unified Version Chain Trait Implementations
446// =============================================================================
447
448impl MvccVersionChain for VersionChain {
449 type Value = Option<Vec<u8>>;
450
451 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
452 // Delegate to BinarySearchChain, then project to value field
453 self.inner
454 .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
455 .map(|v| &v.value)
456 }
457
458 fn get_latest(&self) -> Option<&Self::Value> {
459 self.inner.latest().map(|v| &v.value)
460 }
461
462 fn version_count(&self) -> usize {
463 self.inner.version_count()
464 }
465}
466
467impl MvccVersionChainMut for VersionChain {
468 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
469 self.add_uncommitted(value, txn_id);
470 }
471
472 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
473 self.inner.commit(txn_id, commit_ts)
474 }
475
476 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
477 // Insert a tombstone (None value) as uncommitted
478 self.add_uncommitted(None, txn_id);
479 true
480 }
481
482 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
483 let before = self.inner.committed_count();
484 self.inner.gc_by_ts(min_visible_ts);
485 let removed = before - self.inner.committed_count();
486 (removed, removed * std::mem::size_of::<Version>())
487 }
488}
489
490impl WriteConflictDetection for VersionChain {
491 fn has_write_conflict(&self, txn_id: TxnId) -> bool {
492 self.has_write_conflict(txn_id)
493 }
494}
495
496// =============================================================================
497// Pre-sizing Constants to Avoid HashSet Resize Overhead
498// =============================================================================
499
/// Starting capacity of a transaction's `write_set`.
///
/// Chosen for typical OLTP transactions (10-50 keys). Pre-sizing avoids the
/// resize overhead that previously showed up as a +11% regression.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Starting capacity of a transaction's `read_set`.
///
/// Larger than the write-set default because workloads usually read more
/// keys than they write.
const READ_SET_INITIAL_CAPACITY: usize = 64;
508
509/// Transaction state for MVCC
510#[derive(Debug, Clone)]
511pub struct MvccTransaction {
512 /// Transaction ID
513 pub txn_id: u64,
514 /// Snapshot timestamp (reads see commits before this)
515 pub snapshot_ts: u64,
516 /// Keys written by this transaction - uses SmallVec for inline storage
517 /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
518 pub write_set: HashSet<InlineKey>,
519 /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
520 /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
521 pub read_set: HashSet<InlineKey>,
522 /// Bloom filter for write set - fast SSI pre-filtering
523 pub write_bloom: SsiBloomFilter,
524 /// Bloom filter for read set - fast SSI pre-filtering
525 pub read_bloom: SsiBloomFilter,
526 /// Transaction state
527 pub state: TxnState,
528 /// Transaction mode for SSI optimization (Recommendation 9)
529 /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
530 pub mode: TransactionMode,
531}
532
533impl MvccTransaction {
534 /// Create a new transaction with pre-sized collections
535 ///
536 /// This avoids HashSet resize overhead during the transaction
537 /// which was causing +11% regression on write_set.insert().
538 #[inline]
539 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
540 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
541 }
542
543 /// Create a read-only transaction (SSI bypass - 2.6x faster)
544 ///
545 /// Read-only transactions skip all SSI tracking:
546 /// - No read_set allocation
547 /// - No read_bloom allocation
548 /// - No commit validation
549 ///
550 /// ## Performance
551 ///
552 /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
553 #[inline]
554 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
555 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
556 }
557
558 /// Create a write-only transaction (partial SSI bypass)
559 ///
560 /// Write-only transactions skip read tracking:
561 /// - No read_set tracking
562 /// - No read_bloom inserts
563 /// - Still needs write_set for commit
564 #[inline]
565 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
566 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
567 }
568
569 /// Create transaction with specific mode
570 #[inline]
571 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
572 // Optimize allocation based on mode
573 let (write_capacity, read_capacity) = match mode {
574 TransactionMode::ReadOnly => (0, 0), // No tracking needed
575 TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
576 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
577 };
578 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
579 }
580
581 /// Create with custom capacities for expected workload
582 ///
583 /// Use this when you know the transaction will write many keys
584 /// to avoid resize overhead entirely.
585 #[inline]
586 pub fn with_capacity(
587 txn_id: u64,
588 snapshot_ts: u64,
589 write_capacity: usize,
590 read_capacity: usize,
591 mode: TransactionMode,
592 ) -> Self {
593 Self {
594 txn_id,
595 snapshot_ts,
596 write_set: HashSet::with_capacity(write_capacity),
597 read_set: HashSet::with_capacity(read_capacity),
598 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
599 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
600 state: TxnState::Active,
601 mode,
602 }
603 }
604
605 /// Check if this is a read-only transaction
606 #[inline]
607 pub fn is_read_only(&self) -> bool {
608 self.write_set.is_empty()
609 }
610
611 /// Check if this is a single-key write transaction
612 #[inline]
613 pub fn is_single_key_write(&self) -> bool {
614 self.write_set.len() == 1 && self.read_set.len() <= 1
615 }
616}
617
/// Lifecycle state of an MVCC transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TxnState {
    /// Begun and not yet committed or aborted.
    Active,
    /// Committed.
    Committed,
    /// Rolled back.
    Aborted,
}
625
626// =============================================================================
627// Transaction Mode for SSI Bypass (Recommendation 9)
628// =============================================================================
629
/// How much SSI tracking a transaction performs (Recommendation 9).
///
/// Classifying a transaction when it begins lets most transactions skip the
/// expensive parts of SSI tracking:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// ## Cost model
///
/// Read-only transactions are typically ~90% of the workload:
///
/// ```text
/// Full tracking: T_txn = T_begin + N × (T_read + T_record) + T_commit
///                      = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly:      T_txn = T_begin_ro + N × T_read + T_commit_ro
///                      = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum TransactionMode {
    /// No SSI tracking at all. With no writes there are no outgoing
    /// rw-edges, so the read set, read Bloom filter and commit validation
    /// are all skipped.
    ReadOnly,

    /// Write tracking only. With no reads there are no incoming rw-edges
    /// from concurrent writers, so only `write_set` and `write_bloom` are
    /// maintained.
    WriteOnly,

    /// Full SSI tracking (the default). Both incoming and outgoing rw-edges
    /// are possible, so commit-time validation is required.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Whether reads must be recorded for SSI (only `ReadWrite`).
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Whether writes must be recorded for SSI (`WriteOnly` and `ReadWrite`).
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::ReadOnly => false,
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
        }
    }

    /// Whether commit must run SSI validation (only `ReadWrite`).
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }
}
694
/// Kind of dependency an SSI conflict edge represents.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConflictType {
    /// Read-write: T1 reads X, then T2 writes X.
    ReadWrite,
    /// Write-read: T1 writes X, then T2 reads X.
    WriteRead,
}

/// Directed conflict edge between two transactions, used when looking for
/// dangerous structures.
#[derive(Debug, Clone)]
pub struct ConflictEdge {
    /// Transaction at the tail of the edge.
    pub from_txn: u64,
    /// Transaction at the head of the edge.
    pub to_txn: u64,
    /// What kind of dependency the edge records.
    pub conflict_type: ConflictType,
}
714
715/// MVCC Manager with SSI support
716///
717/// Uses DashMap for lock-free per-transaction access.
718/// Implements Serializable Snapshot Isolation (SSI) with
719/// dangerous structure detection for rw-antidependency cycles.
720#[allow(clippy::type_complexity)]
721pub struct MvccManager {
722 /// Active transactions (sharded for concurrent access)
723 active_txns: DashMap<u64, MvccTransaction>,
724 /// Current timestamp counter
725 ts_counter: AtomicU64,
726 /// Minimum active snapshot timestamp (for GC)
727 min_active_ts: AtomicU64,
728 /// Recently committed transactions for SSI validation
729 /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
730 /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
731 recent_commits: DashMap<
732 u64,
733 (
734 u64,
735 SsiBloomFilter,
736 SsiBloomFilter,
737 HashSet<InlineKey>,
738 HashSet<InlineKey>,
739 ),
740 >,
741 /// Max recent commits to track
742 max_recent_commits: usize,
743}
744
745impl Default for MvccManager {
746 fn default() -> Self {
747 Self::new()
748 }
749}
750
751impl MvccManager {
752 pub fn new() -> Self {
753 Self {
754 active_txns: DashMap::new(),
755 ts_counter: AtomicU64::new(1),
756 min_active_ts: AtomicU64::new(0),
757 recent_commits: DashMap::new(),
758 max_recent_commits: 1000, // Track last 1000 commits for SSI
759 }
760 }
761
762 /// Begin a new transaction with snapshot isolation
763 ///
764 /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
765 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
766 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
767 }
768
769 /// Begin a read-only transaction (SSI bypass - 2.6x faster)
770 ///
771 /// Read-only transactions skip all SSI tracking, reducing
772 /// per-read overhead from ~82ns to ~32ns.
773 ///
774 /// ## Safety
775 ///
776 /// Caller must ensure no writes are performed. Attempting to
777 /// write in a read-only transaction will still succeed but
778 /// won't be tracked for SSI validation.
779 #[inline]
780 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
781 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
782 }
783
784 /// Begin a write-only transaction (partial SSI bypass)
785 ///
786 /// Write-only transactions skip read tracking, reducing overhead
787 /// for insert-heavy workloads.
788 #[inline]
789 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
790 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
791 }
792
793 /// Begin a transaction with specific mode
794 ///
795 /// This is the core transaction creation method that all other
796 /// begin_* methods delegate to.
797 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
798 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
799
800 // Create transaction with mode-optimized allocations
801 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
802
803 self.active_txns.insert(txn_id, txn.clone());
804 self.update_min_active_ts();
805
806 txn
807 }
808
809 /// Get transaction if active (clones - use get_snapshot_ts for hot path)
810 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
811 self.active_txns.get(&txn_id).map(|t| t.clone())
812 }
813
814 /// Fast path: get just the snapshot timestamp without cloning
815 /// This is the hot path for reads - avoids cloning bloom filters
816 #[inline]
817 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
818 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
819 }
820
821 /// Record a read (for SSI) - uses inline key storage + bloom filter
822 ///
823 /// ## SSI Bypass (Recommendation 9)
824 ///
825 /// For ReadOnly mode transactions, this is a no-op (instant return).
826 /// For WriteOnly mode transactions, this is a no-op.
827 /// Only ReadWrite mode transactions track reads for SSI.
828 ///
829 /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
830 #[inline]
831 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
832 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
833 // SSI Bypass: Skip tracking for read-only and write-only modes
834 if !txn.mode.tracks_reads() {
835 return;
836 }
837
838 // Only track reads if within reasonable bounds
839 if txn.read_set.len() < 10000 {
840 txn.read_set.insert(SmallVec::from_slice(key));
841 txn.read_bloom.insert(key);
842 }
843 }
844 }
845
846 /// Record a write - uses inline key storage + bloom filter
847 ///
848 /// Note: Even ReadOnly transactions can record writes (mode is a hint).
849 /// The mode only affects SSI tracking, not write capability.
850 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
851 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
852 txn.write_set.insert(SmallVec::from_slice(key));
853 txn.write_bloom.insert(key);
854 }
855 }
856
857 /// Allocate commit timestamp
858 pub fn alloc_commit_ts(&self) -> u64 {
859 self.ts_counter.fetch_add(1, Ordering::SeqCst)
860 }
861
862 /// Commit transaction with SSI validation
863 /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
864 /// Returns None if SSI validation fails (dangerous structure detected)
865 ///
866 /// ## SSI Bypass (Recommendation 9)
867 ///
868 /// For ReadOnly mode: Skip validation entirely (~10ns commit)
869 /// For WriteOnly mode: Skip read-based validation (~30ns commit)
870 /// For ReadWrite mode: Full validation (~50ns commit)
871 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
872 // Get transaction before removing
873 let txn = self.active_txns.get(&txn_id)?.clone();
874
875 // SSI Bypass: Skip validation for ReadOnly transactions
876 // ReadOnly can never form rw-antidependency cycles
877 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
878 // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
879 // For ReadWrite: check SSI validation result
880 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
881 // Abort on SSI violation
882 self.active_txns.remove(&txn_id);
883 self.update_min_active_ts();
884 return None;
885 }
886 }
887
888 let commit_ts = self.alloc_commit_ts();
889
890 // Extract write_set and remove transaction - takes ownership
891 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
892
893 // OPTIMIZATION: Only track ReadWrite transactions for SSI
894 // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
895 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
896 && !removed_txn.read_set.is_empty()
897 && !removed_txn.write_set.is_empty();
898
899 if needs_ssi_tracking {
900 // Need to clone write_set since we return it AND track it
901 let write_set_for_return = removed_txn.write_set.clone();
902
903 self.track_commit_owned(
904 txn_id,
905 commit_ts,
906 removed_txn.read_bloom,
907 removed_txn.write_bloom,
908 removed_txn.read_set,
909 removed_txn.write_set,
910 );
911
912 self.update_min_active_ts();
913 Some((commit_ts, write_set_for_return))
914 } else {
915 // Fast path: no SSI tracking needed, avoid clone entirely
916 self.update_min_active_ts();
917 Some((commit_ts, removed_txn.write_set))
918 }
919 }
920
921 /// Validate SSI constraints for a committing transaction
922 ///
923 /// ## Transaction Classification (Task 3: Optimistic MVCC)
924 ///
925 /// Transactions are classified and routed through appropriate fast paths:
926 ///
927 /// | Class | Criteria | Validation Cost |
928 /// |------------|-------------------------------|-----------------|
929 /// | ReadOnly | write_set.is_empty() | 0 ns |
930 /// | SingleKey | write_set.len() == 1 | 0 ns |
931 /// | Disjoint | bloom filters don't intersect | ~10 ns |
932 /// | General | full SSI check | ~50 ns |
933 ///
934 /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
935 /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
936 ///
937 /// Detects "dangerous structures" - rw-antidependency cycles:
938 /// - T1 reads X (snapshot sees old value)
939 /// - T2 writes X (concurrent write)
940 /// - T2 reads Y (snapshot sees old value)
941 /// - T1 writes Y (concurrent write)
942 ///
943 /// If T1 → rw → T2 → rw → T1 exists, abort T1
944 #[inline]
945 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
946 // =================================================================
947 // Fast Path 1: Read-only transactions (0 ns)
948 // =================================================================
949 // Read-only transactions can never form rw-antidependency cycles
950 // because they have no writes to create outgoing rw-edges
951 if txn.write_set.is_empty() {
952 return true;
953 }
954
955 // =================================================================
956 // Fast Path 2: No recent commits to check (0 ns)
957 // =================================================================
958 if self.recent_commits.is_empty() {
959 return true;
960 }
961
962 // =================================================================
963 // Fast Path 3: Single-key write transactions (0 ns)
964 // =================================================================
965 // A single-key write transaction cannot form a dangerous cycle:
966 // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
967 // AND T2 to read what T1 wrote
968 // - With only one key in write_set, the same key would need to be
969 // in both read_set AND write_set of both transactions
970 // - This is already prevented by our conflict detection (write-write)
971 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
972 return true;
973 }
974
975 let my_snapshot = txn.snapshot_ts;
976
977 // =================================================================
978 // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
979 // =================================================================
980 // Pre-filter using bloom filters: if our write_bloom doesn't intersect
981 // with any concurrent transaction's read_bloom AND vice versa,
982 // there can be no rw-antidependency
983 let mut any_may_intersect = false;
984 for entry in self.recent_commits.iter() {
985 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
986
987 // Only check concurrent transactions.
988 //
989 // A committed transaction is *concurrent* with us iff its writes are
990 // invisible to our snapshot. Read visibility is strict
991 // (`commit_ts < snapshot_ts`), so a transaction with
992 // `commit_ts >= my_snapshot` is invisible and must be validated
993 // against. Using `<` here (not `<=`) keeps this window consistent
994 // with `read_at`; a `<=` would skip a boundary transaction whose
995 // `commit_ts == my_snapshot` and miss a genuine write-skew.
996 if *other_commit_ts < my_snapshot {
997 continue;
998 }
999
1000 // Check bloom filter intersection (O(m/64) per filter)
1001 // If our writes may intersect their reads OR their writes may intersect our reads
1002 if txn.write_bloom.may_intersect(other_read_bloom)
1003 || other_write_bloom.may_intersect(&txn.read_bloom)
1004 {
1005 any_may_intersect = true;
1006 break;
1007 }
1008 }
1009
1010 // No bloom intersection means definitely disjoint - no SSI conflict possible
1011 if !any_may_intersect {
1012 return true;
1013 }
1014
1015 // =================================================================
1016 // Full SSI Validation (~50 ns)
1017 // =================================================================
1018 // Check for rw-conflicts with recently committed transactions
1019 // An rw-conflict exists if:
1020 // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1021 // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1022
1023 let mut in_conflict_with: Vec<u64> = Vec::new();
1024 let mut out_conflict_to: Vec<u64> = Vec::new();
1025
1026 for entry in self.recent_commits.iter() {
1027 let (
1028 other_txn_id,
1029 (
1030 other_commit_ts,
1031 _other_read_bloom,
1032 other_write_bloom,
1033 other_read_set,
1034 other_write_set,
1035 ),
1036 ) = entry.pair();
1037
1038 // Only consider transactions concurrent with us: those whose writes
1039 // are invisible to our snapshot. Read visibility is strict
1040 // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1041 // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1042 if *other_commit_ts < my_snapshot {
1043 continue;
1044 }
1045
1046 // Check: other wrote → we read (other →rw→ me)
1047 // T_other wrote a key that T_me read (rw-dependency inbound)
1048 //
1049 // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1050 // Only do expensive HashSet intersection if bloom says "maybe conflict"
1051 let mut has_in_conflict = false;
1052 for key in txn.read_set.iter() {
1053 if other_write_bloom.may_contain(key) {
1054 // Bloom says maybe - do exact check
1055 if other_write_set.contains(key) {
1056 has_in_conflict = true;
1057 break;
1058 }
1059 }
1060 }
1061 if has_in_conflict {
1062 in_conflict_with.push(*other_txn_id);
1063 }
1064
1065 // Check: we wrote → other read (me →rw→ other)
1066 // T_me wrote a key that T_other read (rw-dependency outbound)
1067 //
1068 // Bloom-accelerated: Use our write_bloom against their read_set
1069 let mut has_out_conflict = false;
1070 for key in other_read_set.iter() {
1071 if txn.write_bloom.may_contain(key) {
1072 // Bloom says maybe - do exact check
1073 if txn.write_set.contains(key) {
1074 has_out_conflict = true;
1075 break;
1076 }
1077 }
1078 }
1079 if has_out_conflict {
1080 out_conflict_to.push(*other_txn_id);
1081 }
1082 }
1083
1084 // Dangerous structure: we have both incoming AND outgoing rw-edges
1085 // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1086 //
1087 // Conservative check: if both exist, abort
1088 // A more precise check would verify the cycle path, but this is safe
1089 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1090 return false; // SSI violation - abort
1091 }
1092
1093 true
1094 }
1095
1096 /// Track a committed transaction for future SSI validation
1097 ///
1098 /// Only tracks transactions that have both reads AND writes, since SSI
1099 /// only detects rw-antidependency cycles. Pure read or pure write
1100 /// transactions can't form cycles.
1101 ///
1102 /// ## Optimization: Zero-Copy Transfer
1103 ///
1104 /// Takes ownership of sets instead of cloning to avoid the +15% commit
1105 /// phase regression. The caller should use mem::take() to transfer ownership.
1106 fn track_commit_owned(
1107 &self,
1108 txn_id: u64,
1109 commit_ts: u64,
1110 read_bloom: SsiBloomFilter,
1111 write_bloom: SsiBloomFilter,
1112 read_set: HashSet<InlineKey>,
1113 write_set: HashSet<InlineKey>,
1114 ) {
1115 // Optimization: Only track mixed read-write transactions
1116 // Pure reads can't create outgoing rw-edges
1117 // Pure writes can't create incoming rw-edges
1118 if read_set.is_empty() || write_set.is_empty() {
1119 return; // Skip tracking - can't form SSI cycle
1120 }
1121
1122 // Add to recent commits with bloom filters for fast SSI pre-filtering
1123 // No cloning needed - we take ownership
1124 self.recent_commits.insert(
1125 txn_id,
1126 (commit_ts, read_bloom, write_bloom, read_set, write_set),
1127 );
1128
1129 // Lazy pruning: only prune when we're significantly over capacity
1130 // Avoids pruning overhead on every commit
1131 if self.recent_commits.len() > self.max_recent_commits * 2 {
1132 // Remove entries with lowest commit_ts
1133 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1134 self.recent_commits
1135 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1136 }
1137 }
1138
1139 /// Legacy track_commit that clones - kept for compatibility
1140 #[allow(dead_code)]
1141 fn track_commit(
1142 &self,
1143 txn_id: u64,
1144 commit_ts: u64,
1145 read_bloom: SsiBloomFilter,
1146 write_bloom: SsiBloomFilter,
1147 read_set: &HashSet<InlineKey>,
1148 write_set: &HashSet<InlineKey>,
1149 ) {
1150 if read_set.is_empty() || write_set.is_empty() {
1151 return;
1152 }
1153 self.recent_commits.insert(
1154 txn_id,
1155 (
1156 commit_ts,
1157 read_bloom,
1158 write_bloom,
1159 read_set.clone(),
1160 write_set.clone(),
1161 ),
1162 );
1163 }
1164
1165 /// Abort transaction
1166 pub fn abort(&self, txn_id: u64) {
1167 self.active_txns.remove(&txn_id);
1168 self.update_min_active_ts();
1169 }
1170
1171 /// Get minimum active snapshot timestamp
1172 pub fn min_active_snapshot(&self) -> u64 {
1173 self.min_active_ts.load(Ordering::SeqCst)
1174 }
1175
1176 /// Get count of active transactions
1177 pub fn active_transaction_count(&self) -> usize {
1178 self.active_txns.len()
1179 }
1180
1181 fn update_min_active_ts(&self) {
1182 let min = self
1183 .active_txns
1184 .iter()
1185 .map(|entry| entry.value().snapshot_ts)
1186 .min()
1187 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1188 self.min_active_ts.store(min, Ordering::SeqCst);
1189 }
1190}
1191
1192/// Epoch-based dirty list for O(expired) GC instead of O(n)
1193///
1194/// Instead of scanning ALL version chains, we track which keys have versions
1195/// created in each epoch. GC only needs to visit keys from old epochs.
1196struct EpochDirtyList {
1197 /// Ring buffer of epoch -> dirty keys
1198 /// Index = epoch % EPOCH_RING_SIZE
1199 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1200 /// Current epoch
1201 current_epoch: AtomicU64,
1202}
1203
1204const EPOCH_RING_SIZE: usize = 4;
1205
1206impl EpochDirtyList {
1207 fn new() -> Self {
1208 Self {
1209 epochs: [
1210 parking_lot::Mutex::new(Vec::new()),
1211 parking_lot::Mutex::new(Vec::new()),
1212 parking_lot::Mutex::new(Vec::new()),
1213 parking_lot::Mutex::new(Vec::new()),
1214 ],
1215 current_epoch: AtomicU64::new(0),
1216 }
1217 }
1218
1219 /// Record a version created in the current epoch
1220 #[inline]
1221 fn record_version(&self, key: Vec<u8>) {
1222 let epoch = self.current_epoch.load(Ordering::Relaxed);
1223 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1224 self.epochs[idx].lock().push(key);
1225 }
1226
1227 /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1228 ///
1229 /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1230 /// For 100 writes: ~100x fewer mutex operations.
1231 #[inline]
1232 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1233 let epoch = self.current_epoch.load(Ordering::Relaxed);
1234 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1235 let mut guard = self.epochs[idx].lock();
1236 guard.extend(keys);
1237 }
1238
1239 /// Advance to next epoch, returning old epoch's dirty keys
1240 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1241 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1242 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1243
1244 // Drain the old epoch's dirty list
1245 let mut guard = self.epochs[old_idx].lock();
1246 let keys = std::mem::take(&mut *guard);
1247 (old_epoch, keys)
1248 }
1249
1250 /// Get current epoch
1251 #[allow(dead_code)]
1252 fn current(&self) -> u64 {
1253 self.current_epoch.load(Ordering::Relaxed)
1254 }
1255}
1256
1257// ============================================================================
1258// Streaming Scan Iterator
1259// ============================================================================
1260
1261/// Streaming iterator for range scans
1262///
1263/// Yields results one at a time without materializing the full result set.
1264/// This enables processing of very large result sets with O(1) memory per
1265/// iteration instead of O(N) for the entire result set.
1266struct ScanRangeIterator<'a> {
1267 memtable: &'a MvccMemTable,
1268 start: Vec<u8>,
1269 end: Vec<u8>,
1270 snapshot_ts: u64,
1271 current_txn_id: Option<u64>,
1272 use_ordered: bool,
1273 // We use Option to defer initialization
1274 ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1275 unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1276 initialized: bool,
1277}
1278
1279impl<'a> Iterator for ScanRangeIterator<'a> {
1280 type Item = (Vec<u8>, Vec<u8>);
1281
1282 fn next(&mut self) -> Option<Self::Item> {
1283 // Lazy initialization on first call
1284 if !self.initialized {
1285 self.initialized = true;
1286
1287 if self.use_ordered {
1288 // Try deferred index first (after compaction, it uses a SkipMap internally)
1289 if let Some(ref def_idx) = self.memtable.deferred_index {
1290 let start = self.start.clone();
1291 let end = self.end.clone();
1292 let snapshot_ts = self.snapshot_ts;
1293 let current_txn_id = self.current_txn_id;
1294 let data = &self.memtable.data;
1295
1296 // Collect keys from deferred index (already sorted after compact)
1297 let keys: Vec<Vec<u8>> = if end.is_empty() {
1298 def_idx.range_from(&start).collect()
1299 } else {
1300 def_idx.range(&start, &end).collect()
1301 };
1302
1303 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1304 Box::new(keys.into_iter().filter_map(move |key| {
1305 if let Some(chain) = data.get(&key)
1306 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1307 && let Some(value) = &v.value
1308 {
1309 Some((key, value.clone()))
1310 } else {
1311 None
1312 }
1313 }));
1314 self.ordered_iter = Some(iter);
1315 } else if let Some(ref idx) = self.memtable.ordered_index {
1316 let start = self.start.clone();
1317 let end = self.end.clone();
1318 let snapshot_ts = self.snapshot_ts;
1319 let current_txn_id = self.current_txn_id;
1320 let data = &self.memtable.data;
1321
1322 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1323 {
1324 Box::new(idx.range(start..).filter_map(move |entry| {
1325 let key = entry.key();
1326 if let Some(chain) = data.get(key)
1327 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1328 && let Some(value) = &v.value
1329 {
1330 Some((key.clone(), value.clone()))
1331 } else {
1332 None
1333 }
1334 }))
1335 } else {
1336 Box::new(idx.range(start..end).filter_map(move |entry| {
1337 let key = entry.key();
1338 if let Some(chain) = data.get(key)
1339 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1340 && let Some(value) = &v.value
1341 {
1342 Some((key.clone(), value.clone()))
1343 } else {
1344 None
1345 }
1346 }))
1347 };
1348 self.ordered_iter = Some(iter);
1349 }
1350 } else {
1351 // Unordered full scan
1352 let start = self.start.clone();
1353 let end = self.end.clone();
1354 let snapshot_ts = self.snapshot_ts;
1355 let current_txn_id = self.current_txn_id;
1356
1357 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1358 Box::new(self.memtable.data.iter().filter_map(move |entry| {
1359 let key = entry.key();
1360
1361 if key.as_slice() < start.as_slice() {
1362 return None;
1363 }
1364 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1365 return None;
1366 }
1367
1368 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1369 && let Some(value) = &v.value
1370 {
1371 Some((key.clone(), value.clone()))
1372 } else {
1373 None
1374 }
1375 }));
1376 self.unordered_iter = Some(iter);
1377 }
1378 }
1379
1380 // Get next from appropriate iterator
1381 if let Some(ref mut iter) = self.ordered_iter {
1382 iter.next()
1383 } else if let Some(ref mut iter) = self.unordered_iter {
1384 iter.next()
1385 } else {
1386 None
1387 }
1388 }
1389}
1390
1391/// MemTable with MVCC support
1392///
1393/// Uses DashMap for lock-free concurrent access per key.
1394/// This eliminates the global write lock bottleneck.
1395///
1396/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
1397/// Maintains a deferred sorted index for efficient scans:
1398/// - Writes: O(1) append to hot buffer
1399/// - Scans: O(N log N) sort-on-demand (amortized across many writes)
1400pub struct MvccMemTable {
1401 /// Key -> VersionChain (sharded for concurrent access)
1402 data: DashMap<Vec<u8>, VersionChain>,
1403 /// Deferred sorted index for efficient prefix/range scans (optional)
1404 /// O(1) insert to hot buffer, O(N log N) sort on first scan
1405 /// When None, scan_prefix will fall back to O(N) DashMap iteration
1406 deferred_index: Option<DeferredSortedIndex>,
1407 /// Legacy SkipMap for compatibility (used when deferred=false)
1408 ordered_index: Option<SkipMap<Vec<u8>, ()>>,
1409 /// Whether to use deferred sorting (true) or immediate SkipMap (false)
1410 #[allow(dead_code)]
1411 use_deferred: bool,
1412 /// Approximate size in bytes
1413 size_bytes: AtomicU64,
1414 /// Epoch-based dirty list for efficient GC
1415 dirty_list: EpochDirtyList,
1416}
1417
1418impl Default for MvccMemTable {
1419 fn default() -> Self {
1420 Self::new()
1421 }
1422}
1423
1424impl MvccMemTable {
1425 pub fn new() -> Self {
1426 Self::with_ordered_index(true)
1427 }
1428
1429 /// Create memtable with optional ordered index
1430 ///
1431 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1432 /// but scan_prefix becomes O(N) instead of O(log N + K)
1433 ///
1434 /// Uses deferred sorting by default for better write performance:
1435 /// - Writes: O(1) append to hot buffer
1436 /// - Scans: O(N log N) sort-on-demand
1437 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1438 Self::with_index_mode(enable_ordered_index, true)
1439 }
1440
1441 /// Create memtable with fine-grained control over indexing
1442 ///
1443 /// # Arguments
1444 /// * `enable_ordered_index` - Whether to maintain an ordered index
1445 /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1446 /// If false, use SkipMap (O(log N) writes)
1447 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1448 Self {
1449 data: DashMap::new(),
1450 deferred_index: if enable_ordered_index && use_deferred {
1451 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1452 max_unsorted_entries: 10_000, // Compact every 10K writes
1453 enabled: true,
1454 }))
1455 } else {
1456 None
1457 },
1458 ordered_index: if enable_ordered_index && !use_deferred {
1459 Some(SkipMap::new())
1460 } else {
1461 None
1462 },
1463 use_deferred,
1464 size_bytes: AtomicU64::new(0),
1465 dirty_list: EpochDirtyList::new(),
1466 }
1467 }
1468
1469 /// Write a key-value pair (uncommitted)
1470 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1471 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1472 let key_len = key.len();
1473
1474 // Track this key in the current epoch's dirty list for GC
1475 self.dirty_list.record_version(key.clone());
1476
1477 // Insert into ordered index for prefix scans (if enabled)
1478 // Deferred: O(1) append to hot buffer
1479 // SkipMap: O(log N) insert
1480 if let Some(ref idx) = self.deferred_index {
1481 idx.insert(key.clone());
1482 } else if let Some(ref idx) = self.ordered_index {
1483 idx.insert(key.clone(), ());
1484 }
1485
1486 // Use entry API for atomic get-or-insert
1487 let mut entry = self.data.entry(key).or_default();
1488
1489 // Check for write-write conflict
1490 if entry.has_write_conflict(txn_id) {
1491 return Err(SochDBError::Internal(
1492 "Write-write conflict detected".into(),
1493 ));
1494 }
1495 entry.add_uncommitted(value, txn_id);
1496 self.size_bytes
1497 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1498
1499 Ok(())
1500 }
1501
1502 /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1503 ///
1504 /// Optimizations applied (Rec 3: MVCC Batching):
1505 /// - Batched dirty list tracking: single lock acquire for all keys
1506 /// - Deferred index: O(1) append per key
1507 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1508 let mut total_size = 0u64;
1509
1510 // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1511 self.dirty_list
1512 .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1513
1514 for (key, value) in writes {
1515 // Insert into ordered index (if enabled)
1516 // Deferred: O(1) append, SkipMap: O(log N)
1517 if let Some(ref idx) = self.deferred_index {
1518 idx.insert(key.clone());
1519 } else if let Some(ref idx) = self.ordered_index {
1520 idx.insert(key.clone(), ());
1521 }
1522
1523 let mut entry = self.data.entry(key.clone()).or_default();
1524
1525 if entry.has_write_conflict(txn_id) {
1526 return Err(SochDBError::Internal(
1527 "Write-write conflict detected".into(),
1528 ));
1529 }
1530
1531 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1532 entry.add_uncommitted(value.clone(), txn_id);
1533 total_size += (key.len() + value_size) as u64;
1534 }
1535
1536 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1537 Ok(())
1538 }
1539
1540 /// Read at snapshot timestamp, with optional current txn to see own writes
1541 pub fn read(
1542 &self,
1543 key: &[u8],
1544 snapshot_ts: u64,
1545 current_txn_id: Option<u64>,
1546 ) -> Option<Vec<u8>> {
1547 self.data.get(key).and_then(|chain| {
1548 chain
1549 .read_at(snapshot_ts, current_txn_id)
1550 .and_then(|v| v.value.clone())
1551 })
1552 }
1553
1554 /// Commit all versions for a transaction
1555 ///
1556 /// Only updates the keys that were written by this transaction (tracked in write_set).
1557 /// Accepts InlineKey for zero-allocation MVCC tracking.
1558 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1559 // Only iterate over keys we know were written - O(k) instead of O(n)
1560 for key in write_set {
1561 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1562 chain.commit(txn_id, commit_ts);
1563 }
1564 }
1565 }
1566
1567 /// Legacy commit method (iterates all keys) - kept for backward compatibility
1568 #[allow(dead_code)]
1569 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1570 for mut entry in self.data.iter_mut() {
1571 entry.value_mut().commit(txn_id, commit_ts);
1572 }
1573 }
1574
1575 /// Abort all versions for a transaction
1576 pub fn abort(&self, txn_id: u64) {
1577 for mut entry in self.data.iter_mut() {
1578 entry.value_mut().abort(txn_id);
1579 }
1580 }
1581
1582 /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1583 ///
1584 /// ## Performance
1585 ///
1586 /// When ordered_index is enabled: O(log N + K) complexity
1587 /// - O(log N) to seek to the first key with prefix
1588 /// - O(K) to iterate matching keys
1589 ///
1590 /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1591 ///
1592 /// ## Optimizations Applied
1593 ///
1594 /// - Pre-allocates result vector based on expected output size
1595 /// - Uses batch-friendly iteration patterns
1596 /// - Minimizes allocations during iteration
1597 /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1598 pub fn scan_prefix(
1599 &self,
1600 prefix: &[u8],
1601 snapshot_ts: u64,
1602 current_txn_id: Option<u64>,
1603 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1604 // Estimate result size for pre-allocation (use 10% of total as heuristic)
1605 let estimated_size = (self.data.len() / 10).max(64);
1606 let mut results = Vec::with_capacity(estimated_size);
1607
1608 if let Some(ref idx) = self.deferred_index {
1609 // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1610 for key in idx.range_from(prefix) {
1611 // Stop when we've passed the prefix range
1612 if !key.starts_with(prefix) {
1613 break;
1614 }
1615
1616 // O(1) lookup in DashMap for version chain
1617 if let Some(chain) = self.data.get(&key)
1618 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1619 && let Some(value) = &v.value
1620 {
1621 results.push((key, value.clone()));
1622 }
1623 }
1624 } else if let Some(ref idx) = self.ordered_index {
1625 // Fast path: O(log N) seek to first key >= prefix
1626 for entry in idx.range(prefix.to_vec()..) {
1627 let key = entry.key();
1628
1629 // Stop when we've passed the prefix range
1630 if !key.starts_with(prefix) {
1631 break;
1632 }
1633
1634 // O(1) lookup in DashMap for version chain
1635 if let Some(chain) = self.data.get(key)
1636 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1637 && let Some(value) = &v.value
1638 {
1639 results.push((key.clone(), value.clone()));
1640 }
1641 }
1642 } else {
1643 // Fallback: O(N) full DashMap scan when ordered_index is disabled
1644 // Optimized with batch-friendly iteration
1645 for entry in self.data.iter() {
1646 let key = entry.key();
1647 if !key.starts_with(prefix) {
1648 continue;
1649 }
1650 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1651 && let Some(value) = &v.value
1652 {
1653 results.push((key.clone(), value.clone()));
1654 }
1655 }
1656 }
1657
1658 results
1659 }
1660
1661 /// Optimized full scan with batch allocation
1662 ///
1663 /// For use when scanning entire tables/namespaces.
1664 /// Pre-allocates based on actual data size.
1665 pub fn scan_all(
1666 &self,
1667 snapshot_ts: u64,
1668 current_txn_id: Option<u64>,
1669 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1670 let mut results = Vec::with_capacity(self.data.len());
1671
1672 for entry in self.data.iter() {
1673 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1674 && let Some(value) = &v.value
1675 {
1676 results.push((entry.key().clone(), value.clone()));
1677 }
1678 }
1679
1680 results
1681 }
1682
1683 /// Streaming scan iterator for very large datasets
1684 ///
1685 /// Returns an iterator that yields (key, value) pairs without
1686 /// materializing the entire result set in memory.
1687 pub fn scan_prefix_iter<'a>(
1688 &'a self,
1689 prefix: &'a [u8],
1690 snapshot_ts: u64,
1691 current_txn_id: Option<u64>,
1692 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1693 self.data.iter().filter_map(move |entry| {
1694 let key = entry.key();
1695 if !key.starts_with(prefix) {
1696 return None;
1697 }
1698 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1699 && let Some(value) = &v.value
1700 {
1701 Some((key.clone(), value.clone()))
1702 } else {
1703 None
1704 }
1705 })
1706 }
1707
1708 /// Scan range
1709 pub fn scan_range(
1710 &self,
1711 start: &[u8],
1712 end: &[u8],
1713 snapshot_ts: u64,
1714 current_txn_id: Option<u64>,
1715 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1716 let mut results = Vec::new();
1717
1718 if let Some(ref idx) = self.deferred_index {
1719 // Deferred index path: sort-on-scan
1720 if end.is_empty() {
1721 for key in idx.range_from(start) {
1722 if let Some(chain) = self.data.get(&key)
1723 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724 && let Some(value) = &v.value
1725 {
1726 results.push((key, value.clone()));
1727 }
1728 }
1729 } else {
1730 for key in idx.range(start, end) {
1731 if let Some(chain) = self.data.get(&key)
1732 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1733 && let Some(value) = &v.value
1734 {
1735 results.push((key, value.clone()));
1736 }
1737 }
1738 }
1739 } else if let Some(ref idx) = self.ordered_index {
1740 // Use range scan on SkipMap
1741 if end.is_empty() {
1742 // Unbounded end
1743 for entry in idx.range(start.to_vec()..) {
1744 let key = entry.key();
1745 if let Some(chain) = self.data.get(key)
1746 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1747 && let Some(value) = &v.value
1748 {
1749 results.push((key.clone(), value.clone()));
1750 }
1751 }
1752 } else {
1753 for entry in idx.range(start.to_vec()..end.to_vec()) {
1754 let key = entry.key();
1755 if let Some(chain) = self.data.get(key)
1756 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1757 && let Some(value) = &v.value
1758 {
1759 results.push((key.clone(), value.clone()));
1760 }
1761 }
1762 }
1763 } else {
1764 // Fallback to full scan if no ordered index
1765 for entry in self.data.iter() {
1766 let key = entry.key();
1767
1768 if key.as_slice() < start {
1769 continue;
1770 }
1771 if !end.is_empty() && key.as_slice() >= end {
1772 continue;
1773 }
1774
1775 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1776 && let Some(value) = &v.value
1777 {
1778 results.push((key.clone(), value.clone()));
1779 }
1780 }
1781 }
1782
1783 results
1784 }
1785
1786 /// Streaming range scan iterator for very large datasets
1787 ///
1788 /// Returns an iterator that yields (key, value) pairs without
1789 /// materializing the entire result set in memory. Uses the ordered
1790 /// index when available for O(log N + K) complexity.
1791 ///
1792 /// ## Zero-Allocation Design
1793 ///
1794 /// While the iterator itself cannot avoid allocations for returned
1795 /// values (since the caller needs ownership), it avoids:
1796 /// - Pre-materializing all results
1797 /// - Intermediate buffers
1798 /// - Repeated key comparisons for already-visited entries
1799 ///
1800 /// ## Usage
1801 ///
1802 /// ```ignore
1803 /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1804 /// // Process each result as it arrives
1805 /// // Memory usage is O(1) per iteration, not O(N) total
1806 /// }
1807 /// ```
1808 pub fn scan_range_iter<'a>(
1809 &'a self,
1810 start: &'a [u8],
1811 end: &'a [u8],
1812 snapshot_ts: u64,
1813 current_txn_id: Option<u64>,
1814 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1815 // Compact deferred index before scanning if needed
1816 if let Some(ref idx) = self.deferred_index {
1817 idx.compact();
1818 }
1819
1820 // Use either ordered index or full scan
1821 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1822
1823 // Create iterator based on availability of ordered index
1824 ScanRangeIterator {
1825 memtable: self,
1826 start: start.to_vec(),
1827 end: end.to_vec(),
1828 snapshot_ts,
1829 current_txn_id,
1830 use_ordered,
1831 ordered_iter: None,
1832 unordered_iter: None,
1833 initialized: false,
1834 }
1835 }
1836
1837 /// Get approximate size
1838 pub fn size(&self) -> u64 {
1839 self.size_bytes.load(Ordering::Relaxed)
1840 }
1841
1842 /// Garbage collect old versions using epoch-based dirty list
1843 ///
1844 /// O(expired_versions) instead of O(all_versions)
1845 /// Only visits keys that had versions created in the old epoch.
1846 pub fn gc(&self, min_active_ts: u64) -> usize {
1847 // Advance epoch and get the dirty keys from the old epoch
1848 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1849
1850 if dirty_keys.is_empty() {
1851 return 0;
1852 }
1853
1854 let mut gc_count = 0;
1855
1856 // Only visit keys that were modified in the old epoch
1857 // Use a HashSet to deduplicate keys that were written multiple times
1858 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1859
1860 for key in unique_keys {
1861 if let Some(mut entry) = self.data.get_mut(&key) {
1862 let before = entry.value().version_count();
1863 entry.value_mut().gc(min_active_ts);
1864 gc_count += before.saturating_sub(entry.value().version_count());
1865 }
1866 }
1867
1868 gc_count
1869 }
1870
1871 /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1872 #[allow(dead_code)]
1873 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1874 let mut gc_count = 0;
1875
1876 for mut entry in self.data.iter_mut() {
1877 let before = entry.value().version_count();
1878 entry.value_mut().gc(min_active_ts);
1879 gc_count += before.saturating_sub(entry.value().version_count());
1880 }
1881
1882 gc_count
1883 }
1884}
1885
1886// =============================================================================
1887// Rec 11: Unified MvccStore Implementation for MvccMemTable
1888// =============================================================================
1889
1890impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1891 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1892 self.read(key, snapshot_ts, txn_id)
1893 }
1894
1895 fn mvcc_put(
1896 &self,
1897 key: &[u8],
1898 value: Option<Vec<u8>>,
1899 txn_id: u64,
1900 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1901 let mut entry = self.data.entry(key.to_vec()).or_default();
1902 if entry.has_write_conflict(txn_id) {
1903 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1904 }
1905 entry.add_uncommitted(value, txn_id);
1906 Ok(())
1907 }
1908
1909 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1910 if let Some(mut chain) = self.data.get_mut(key) {
1911 return chain.commit(txn_id, commit_ts);
1912 }
1913 false
1914 }
1915
1916 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1917 if let Some(mut chain) = self.data.get_mut(key) {
1918 chain.abort(txn_id);
1919 }
1920 }
1921
1922 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1923 self.data
1924 .get(key)
1925 .map(|chain| chain.has_write_conflict(txn_id))
1926 .unwrap_or(false)
1927 }
1928
1929 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1930 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1931 for mut entry in self.data.iter_mut() {
1932 stats.keys_scanned += 1;
1933 let before = entry.value().version_count();
1934 entry.value_mut().gc(min_ts);
1935 stats.versions_removed += before.saturating_sub(entry.value().version_count());
1936 }
1937 stats
1938 }
1939
1940 fn mvcc_key_count(&self) -> usize {
1941 self.data.len()
1942 }
1943}
1944
1945// ============================================================================
1946// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1947// ============================================================================
1948
1949use crate::key_buffer::ArenaKeyHandle;
1950
1951/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1952struct ArenaEpochDirtyList {
1953 epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1954 current_epoch: AtomicU64,
1955}
1956
1957impl ArenaEpochDirtyList {
1958 fn new() -> Self {
1959 Self {
1960 epochs: [
1961 parking_lot::Mutex::new(Vec::new()),
1962 parking_lot::Mutex::new(Vec::new()),
1963 parking_lot::Mutex::new(Vec::new()),
1964 parking_lot::Mutex::new(Vec::new()),
1965 ],
1966 current_epoch: AtomicU64::new(0),
1967 }
1968 }
1969
1970 #[inline]
1971 fn record_version(&self, key: ArenaKeyHandle) {
1972 let epoch = self.current_epoch.load(Ordering::Relaxed);
1973 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1974 self.epochs[idx].lock().push(key);
1975 }
1976
1977 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1978 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1979 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1980 let mut guard = self.epochs[old_idx].lock();
1981 let keys = std::mem::take(&mut *guard);
1982 (old_epoch, keys)
1983 }
1984}
1985
/// Arena-backed MVCC MemTable with optimized key storage.
///
/// Keys are stored as `ArenaKeyHandle` instead of `Vec<u8>`. A write builds one
/// handle and clones it into the dirty list, the ordered index (when enabled)
/// and the data map. It does not copy the key bytes into three separate
/// `Vec<u8>`s:
///
/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
/// - After: 1 × ArenaKeyHandle creation, then handle clones
///
/// ## Performance
///
/// NOTE(review): the figures below are design targets, not measurements
/// reproduced here. Re-benchmark before relying on them:
/// - Heap allocations: 3 → 1 per write (assumes cloning a handle does not allocate)
/// - Bytes copied: 3L → L + 48 bytes (L = key length; assumes a 16-byte handle)
/// - Expected throughput gain on write-heavy workloads: 20-30%
pub struct ArenaMvccMemTable {
    /// Key -> version chain.
    data: DashMap<ArenaKeyHandle, VersionChain>,
    /// Optional ordered key index used for prefix/range scans; `None` makes
    /// scans fall back to iterating `data`.
    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
    /// Approximate size in bytes (sum of key + value lengths written); not
    /// decremented by `gc` or `abort`.
    size_bytes: AtomicU64,
    /// Keys written per epoch; drained by `gc` (arena-backed).
    dirty_list: ArenaEpochDirtyList,
}
2010
2011impl ArenaMvccMemTable {
2012 pub fn new() -> Self {
2013 Self::with_ordered_index(true)
2014 }
2015
2016 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2017 Self {
2018 data: DashMap::new(),
2019 ordered_index: if enable_ordered_index {
2020 Some(SkipMap::new())
2021 } else {
2022 None
2023 },
2024 size_bytes: AtomicU64::new(0),
2025 dirty_list: ArenaEpochDirtyList::new(),
2026 }
2027 }
2028
2029 /// Write a key-value pair using arena key handle
2030 ///
2031 /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2032 /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2033 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2034 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2035 let key_len = key.len();
2036
2037 // Create ONE ArenaKeyHandle - this is the only allocation for the key
2038 let key_handle = ArenaKeyHandle::new(key);
2039
2040 // Track in dirty list (O(1) copy of 16-byte handle)
2041 self.dirty_list.record_version(key_handle.clone());
2042
2043 // Insert into ordered index (O(1) copy of 16-byte handle)
2044 if let Some(ref idx) = self.ordered_index {
2045 idx.insert(key_handle.clone(), ());
2046 }
2047
2048 // Use entry API with the handle
2049 let mut entry = self.data.entry(key_handle).or_default();
2050
2051 if entry.has_write_conflict(txn_id) {
2052 return Err(SochDBError::Internal(
2053 "Write-write conflict detected".into(),
2054 ));
2055 }
2056 entry.add_uncommitted(value, txn_id);
2057 self.size_bytes
2058 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2059
2060 Ok(())
2061 }
2062
2063 /// Write batch using arena key handles
2064 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2065 let mut total_size = 0u64;
2066
2067 for (key, value) in writes {
2068 let key_handle = ArenaKeyHandle::new(key);
2069
2070 self.dirty_list.record_version(key_handle.clone());
2071
2072 if let Some(ref idx) = self.ordered_index {
2073 idx.insert(key_handle.clone(), ());
2074 }
2075
2076 let mut entry = self.data.entry(key_handle).or_default();
2077
2078 if entry.has_write_conflict(txn_id) {
2079 return Err(SochDBError::Internal(
2080 "Write-write conflict detected".into(),
2081 ));
2082 }
2083
2084 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2085 entry.add_uncommitted(value.clone(), txn_id);
2086 total_size += (key.len() + value_size) as u64;
2087 }
2088
2089 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2090 Ok(())
2091 }
2092
2093 /// Read at snapshot timestamp
2094 pub fn read(
2095 &self,
2096 key: &[u8],
2097 snapshot_ts: u64,
2098 current_txn_id: Option<u64>,
2099 ) -> Option<Vec<u8>> {
2100 // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2101 let key_handle = ArenaKeyHandle::new(key);
2102 self.data.get(&key_handle).and_then(|chain| {
2103 chain
2104 .read_at(snapshot_ts, current_txn_id)
2105 .and_then(|v| v.value.clone())
2106 })
2107 }
2108
2109 /// Commit transaction
2110 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2111 for key in write_set {
2112 let key_handle = ArenaKeyHandle::new(key.as_slice());
2113 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2114 chain.commit(txn_id, commit_ts);
2115 }
2116 }
2117 }
2118
2119 /// Abort transaction
2120 pub fn abort(&self, txn_id: u64) {
2121 for mut entry in self.data.iter_mut() {
2122 entry.value_mut().abort(txn_id);
2123 }
2124 }
2125
2126 /// Scan prefix
2127 pub fn scan_prefix(
2128 &self,
2129 prefix: &[u8],
2130 snapshot_ts: u64,
2131 current_txn_id: Option<u64>,
2132 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2133 let mut results = Vec::new();
2134 let prefix_handle = ArenaKeyHandle::new(prefix);
2135
2136 if let Some(ref idx) = self.ordered_index {
2137 for entry in idx.range(prefix_handle..) {
2138 let key = entry.key();
2139
2140 if !key.as_bytes().starts_with(prefix) {
2141 break;
2142 }
2143
2144 if let Some(chain) = self.data.get(key)
2145 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2146 && let Some(value) = &v.value
2147 {
2148 results.push((key.as_bytes().to_vec(), value.clone()));
2149 }
2150 }
2151 } else {
2152 for entry in self.data.iter() {
2153 let key = entry.key();
2154 if !key.as_bytes().starts_with(prefix) {
2155 continue;
2156 }
2157 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2158 && let Some(value) = &v.value
2159 {
2160 results.push((key.as_bytes().to_vec(), value.clone()));
2161 }
2162 }
2163 }
2164
2165 results
2166 }
2167
2168 /// Get approximate size
2169 pub fn size(&self) -> u64 {
2170 self.size_bytes.load(Ordering::Relaxed)
2171 }
2172
2173 /// Garbage collect old versions
2174 pub fn gc(&self, min_active_ts: u64) -> usize {
2175 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2176
2177 if dirty_keys.is_empty() {
2178 return 0;
2179 }
2180
2181 let mut gc_count = 0;
2182 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2183
2184 for key in unique_keys {
2185 if let Some(mut entry) = self.data.get_mut(&key) {
2186 let before = entry.value().version_count();
2187 entry.value_mut().gc(min_active_ts);
2188 gc_count += before.saturating_sub(entry.value().version_count());
2189 }
2190 }
2191
2192 gc_count
2193 }
2194}
2195
2196impl Default for ArenaMvccMemTable {
2197 fn default() -> Self {
2198 Self::new()
2199 }
2200}
2201
2202// ============================================================================
// MemTableKind - Unified MemTable Abstraction (enum dispatch)
2204// ============================================================================
2205
/// Configuration for memtable type selection.
///
/// Defaults to [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting (the default).
    /// Best for: general workloads, balanced read/write
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations
    /// Best for: write-heavy workloads, large keys
    Arena,
}
2223
/// Unified memtable abstraction using enum dispatch
///
/// This pattern provides:
/// - No vtable for the per-operation methods: each call is a `match` over the variants
/// - Type-safe switching between implementations
/// - Easy extensibility for future memtable types (add a variant plus match arms)
///
/// ## Why Enum over Trait Object?
///
/// - Hot path performance: enum match is a single branch vs vtable indirection
/// - No extra indirection: the implementation is stored inline in the enum
/// - Inlining: compiler can inline through enum dispatch
///
/// NOTE(review): `scan_range_iter` still returns a boxed `dyn Iterator`, so that
/// one method does use dynamic dispatch.
pub enum MemTableKind {
    /// Standard MVCC memtable (selected by `MemTableType::Standard`).
    Standard(MvccMemTable),
    /// Arena-backed MVCC memtable (selected by `MemTableType::Arena`).
    Arena(ArenaMvccMemTable),
}
2240
2241impl MemTableKind {
2242 /// Create a new memtable of the specified type
2243 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2244 match kind {
2245 MemTableType::Standard => {
2246 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2247 }
2248 MemTableType::Arena => {
2249 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2250 }
2251 }
2252 }
2253
2254 /// Write a key-value pair
2255 #[inline]
2256 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2257 match self {
2258 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2259 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2260 }
2261 }
2262
2263 /// Write batch of key-value pairs
2264 #[inline]
2265 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2266 match self {
2267 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2268 MemTableKind::Arena(m) => {
2269 // Convert to arena-compatible format
2270 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2271 .iter()
2272 .map(|(k, v)| (k.as_slice(), v.clone()))
2273 .collect();
2274 m.write_batch(&arena_writes, txn_id)
2275 }
2276 }
2277 }
2278
2279 /// Read at snapshot timestamp
2280 #[inline]
2281 pub fn read(
2282 &self,
2283 key: &[u8],
2284 snapshot_ts: u64,
2285 current_txn_id: Option<u64>,
2286 ) -> Option<Vec<u8>> {
2287 match self {
2288 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2289 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2290 }
2291 }
2292
2293 /// Commit transaction
2294 #[inline]
2295 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2296 match self {
2297 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2298 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2299 }
2300 }
2301
2302 /// Abort transaction
2303 #[inline]
2304 pub fn abort(&self, txn_id: u64) {
2305 match self {
2306 MemTableKind::Standard(m) => m.abort(txn_id),
2307 MemTableKind::Arena(m) => m.abort(txn_id),
2308 }
2309 }
2310
2311 /// Scan prefix
2312 #[inline]
2313 pub fn scan_prefix(
2314 &self,
2315 prefix: &[u8],
2316 snapshot_ts: u64,
2317 current_txn_id: Option<u64>,
2318 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2319 match self {
2320 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2321 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2322 }
2323 }
2324
2325 /// Scan range
2326 #[inline]
2327 pub fn scan_range(
2328 &self,
2329 start: &[u8],
2330 end: &[u8],
2331 snapshot_ts: u64,
2332 current_txn_id: Option<u64>,
2333 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2334 match self {
2335 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2336 MemTableKind::Arena(m) => {
2337 // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2338 let mut results = Vec::new();
2339 if let Some(ref idx) = m.ordered_index {
2340 let start_handle = ArenaKeyHandle::new(start);
2341 let end_handle = ArenaKeyHandle::new(end);
2342
2343 if end.is_empty() {
2344 for entry in idx.range(start_handle..) {
2345 let key = entry.key();
2346 if let Some(chain) = m.data.get(key)
2347 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2348 && let Some(value) = &v.value
2349 {
2350 results.push((key.as_bytes().to_vec(), value.clone()));
2351 }
2352 }
2353 } else {
2354 for entry in idx.range(start_handle..end_handle) {
2355 let key = entry.key();
2356 if let Some(chain) = m.data.get(key)
2357 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2358 && let Some(value) = &v.value
2359 {
2360 results.push((key.as_bytes().to_vec(), value.clone()));
2361 }
2362 }
2363 }
2364 } else {
2365 for entry in m.data.iter() {
2366 let key = entry.key();
2367 let key_bytes = key.as_bytes();
2368 if key_bytes < start {
2369 continue;
2370 }
2371 if !end.is_empty() && key_bytes >= end {
2372 continue;
2373 }
2374 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2375 && let Some(value) = &v.value
2376 {
2377 results.push((key_bytes.to_vec(), value.clone()));
2378 }
2379 }
2380 }
2381 results
2382 }
2383 }
2384 }
2385
2386 /// Scan range iterator (returns collected results for now)
2387 #[inline]
2388 pub fn scan_range_iter<'a>(
2389 &'a self,
2390 start: &'a [u8],
2391 end: &'a [u8],
2392 snapshot_ts: u64,
2393 current_txn_id: Option<u64>,
2394 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2395 match self {
2396 MemTableKind::Standard(m) => {
2397 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2398 }
2399 MemTableKind::Arena(_) => {
2400 // Arena version returns collected results as iterator
2401 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2402 Box::new(results.into_iter())
2403 }
2404 }
2405 }
2406
2407 /// Get approximate size
2408 #[inline]
2409 pub fn size(&self) -> u64 {
2410 match self {
2411 MemTableKind::Standard(m) => m.size(),
2412 MemTableKind::Arena(m) => m.size(),
2413 }
2414 }
2415
2416 /// Garbage collect old versions
2417 #[inline]
2418 pub fn gc(&self, min_active_ts: u64) -> usize {
2419 match self {
2420 MemTableKind::Standard(m) => m.gc(min_active_ts),
2421 MemTableKind::Arena(m) => m.gc(min_active_ts),
2422 }
2423 }
2424
2425 /// Get the kind of memtable
2426 pub fn kind(&self) -> MemTableType {
2427 match self {
2428 MemTableKind::Standard(_) => MemTableType::Standard,
2429 MemTableKind::Arena(_) => MemTableType::Arena,
2430 }
2431 }
2432}
2433
/// Durable storage engine: write-ahead log, MVCC manager and in-memory memtable.
///
/// Which durability features are live for a given instance is reported by
/// `durability_capabilities()`; prefer that over prose descriptions.
///
/// NOTE(review): struct fields drop in declaration order, so `db_lock` is
/// dropped after `wal`, `memtable` and `group_commit`. Keep it near the end if
/// fields are reordered.
pub struct DurableStorage {
    /// Path to the storage directory (holds `wal.log`, and `wal.manifest` when
    /// point-in-time recovery is enabled).
    path: PathBuf,
    /// Write-ahead log. A clone of the `Arc` is moved into the group-commit
    /// closure when group commit is enabled.
    wal: Arc<TxnWal>,
    /// MVCC manager
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
    /// At commit, buffer is flushed to WAL with single lock acquisition
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer; `None` unless opened through a group-commit constructor.
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Set to 1 at open when `check_recovery_needed()` reports pending recovery.
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN. Seeded from `wal.manifest` at open when present,
    /// otherwise starts at 0.
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    /// Opened as 1 (NORMAL).
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode (Little's Law)
    /// Arrival rate in requests/sec × 1000 for precision (initially 1000 req/s).
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds (initially 5 ms)
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking, e.g. tests and
    /// ephemeral instances). Not read in the visible code; it is kept so the
    /// lock lives as long as the storage, which is why `dead_code` is allowed.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
    /// Whether at-rest encryption is active for this instance (drives the live
    /// per-instance durability matrix). Set from the resolved keyring at open.
    at_rest_encrypted: bool,
    /// Whether Point-in-Time Recovery is enabled for this database (Task 3B PITR
    /// phase 1). Derived from the presence of `wal.manifest` at open (the manifest
    /// is the single source of truth), or set by `enable_point_in_time_recovery`.
    /// When enabled, the destructive `truncate_wal()` is forbidden (segment
    /// sealing is the PITR-safe replacement, landing in a later phase) so the WAL
    /// record ordinal stays a stable, durable monotonic LSN across restarts.
    pitr_enabled: AtomicBool,
}
2480
/// Encryption configuration for opening a [`DurableStorage`].
///
/// - `disabled()` (the default for the legacy open variants) keeps the database
///   plaintext and byte-compatible with pre-encryption binaries.
/// - `with_kek()` supplies the Key-Encryption-Key. This is the operator secret
///   that *wraps* a per-database data key; it is never used verbatim as the
///   cipher key (see [`crate::keyring`]).
///
/// A wrong or missing KEK for an encrypted database fails closed at open: the
/// DB refuses to open and is never silently read as plaintext.
///
/// NOTE(review): no `Debug` is derived, presumably so the KEK cannot be printed
/// by accident; keep it that way.
pub struct StorageEncryption {
    /// The KEK. `None` ⇒ plaintext database.
    pub kek: Option<EncryptionKey>,
    /// Human-readable identifier for the key source (e.g. "env:SOCHDB_ENCRYPTION_KEY",
    /// "embedded", "kms:..."). Bound into the keyring for provenance.
    pub source_id: String,
}
2496
2497impl StorageEncryption {
2498 /// Plaintext (no encryption) — the default.
2499 pub fn disabled() -> Self {
2500 Self {
2501 kek: None,
2502 source_id: "none".to_string(),
2503 }
2504 }
2505
2506 /// Encrypt at rest under the given KEK.
2507 pub fn with_kek(kek: EncryptionKey, source_id: impl Into<String>) -> Self {
2508 Self {
2509 kek: Some(kek),
2510 source_id: source_id.into(),
2511 }
2512 }
2513
2514 /// Whether a key is configured (i.e. encryption is requested).
2515 pub fn is_enabled(&self) -> bool {
2516 self.kek.is_some()
2517 }
2518}
2519
2520impl DurableStorage {
2521 /// Open or create durable storage at path
2522 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2523 Self::open_with_config(path, true)
2524 }
2525
2526 /// Open with configurable ordered index
2527 ///
2528 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2529 /// but scan_prefix becomes O(N) instead of O(log N + K)
2530 pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2531 Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2532 }
2533
2534 /// Open with arena-backed memtable for write-heavy workloads
2535 ///
2536 /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2537 /// Best for workloads with:
2538 /// - High write throughput
2539 /// - Large keys (reduces allocation overhead)
2540 /// - Minimal concurrent reads during writes
2541 pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2542 Self::open_with_full_config(path, true, MemTableType::Arena)
2543 }
2544
2545 /// Open with full configuration options
2546 ///
2547 /// # Arguments
2548 /// * `path` - Storage directory path
2549 /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2550 /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2551 ///
2552 /// # Locking
2553 ///
2554 /// Acquires an exclusive advisory lock on the database directory.
2555 /// This prevents concurrent multi-process access which would corrupt data.
2556 /// If another process has the database open, returns `Err(DatabaseLocked)`.
2557 pub fn open_with_full_config<P: AsRef<Path>>(
2558 path: P,
2559 enable_ordered_index: bool,
2560 memtable_type: MemTableType,
2561 ) -> Result<Self> {
2562 Self::open_with_full_config_internal(
2563 path,
2564 enable_ordered_index,
2565 memtable_type,
2566 true,
2567 StorageEncryption::disabled(),
2568 )
2569 }
2570
2571 /// Open with at-rest encryption configured via [`StorageEncryption`].
2572 ///
2573 /// With `StorageEncryption::disabled()` this is identical to
2574 /// [`Self::open_with_full_config`]. With a KEK, the database is opened (or
2575 /// created) encrypted: a per-DB data key is generated and wrapped into the
2576 /// keyring on first open, and a wrong/missing key on a subsequent open fails
2577 /// closed. Reads are unaffected (the live read path is in-memory); only the
2578 /// WAL write/recovery path pays the AEAD cost.
2579 pub fn open_with_encryption<P: AsRef<Path>>(
2580 path: P,
2581 enable_ordered_index: bool,
2582 memtable_type: MemTableType,
2583 encryption: StorageEncryption,
2584 ) -> Result<Self> {
2585 Self::open_with_full_config_internal(
2586 path,
2587 enable_ordered_index,
2588 memtable_type,
2589 true,
2590 encryption,
2591 )
2592 }
2593
2594 /// The live, per-instance durability capabilities of THIS database.
2595 ///
2596 /// Unlike the build-level [`crate::durability_capabilities`] (which reports
2597 /// defaults), this reflects the actual resolved state — notably whether
2598 /// at-rest encryption is active for this opened instance.
2599 pub fn durability_capabilities(&self) -> DurabilityCapabilities {
2600 DurabilityCapabilities {
2601 crash_recovery: true,
2602 at_rest_encryption: self.at_rest_encrypted,
2603 // Point-in-time recovery is live (recover_to) when PITR is enabled:
2604 // the WAL is fully retained and can be replayed to an LSN/timestamp.
2605 point_in_time_recovery: self.pitr_enabled.load(Ordering::SeqCst),
2606 // ARIES / fencing remain unwired on the live path.
2607 aries_checkpoint: false,
2608 wal_fencing: false,
2609 }
2610 }
2611
2612 /// Whether at-rest encryption is active for this instance.
2613 pub fn is_encrypted(&self) -> bool {
2614 self.at_rest_encrypted
2615 }
2616
2617 /// Open without locking (for testing crash recovery scenarios)
2618 ///
2619 /// # Safety
2620 /// This should ONLY be used in tests that simulate crashes by forgetting
2621 /// the storage instance. In production, always use `open_with_full_config`.
2622 #[cfg(test)]
2623 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2624 Self::open_with_full_config_internal(
2625 path,
2626 true,
2627 MemTableType::Standard,
2628 false,
2629 StorageEncryption::disabled(),
2630 )
2631 }
2632
2633 /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2634 ///
2635 /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2636 /// temporary directory that is automatically cleaned up when the
2637 /// `EphemeralHandle` is dropped. This ensures test and production code paths
2638 /// are identical — bugs found in tests are guaranteed to reproduce in production.
2639 ///
2640 /// # Returns
2641 /// An `EphemeralHandle` that owns both the storage and the temp directory.
2642 /// Access the storage via `handle.storage()` or `Deref` coercion.
2643 ///
2644 /// # Example
2645 /// ```ignore
2646 /// let handle = DurableStorage::open_ephemeral()?;
2647 /// let txn = handle.begin_transaction()?;
2648 /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2649 /// handle.commit(txn)?;
2650 /// // temp directory cleaned up when `handle` drops
2651 /// ```
2652 pub fn open_ephemeral() -> Result<EphemeralHandle> {
2653 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2654 let storage = Self::open_with_full_config_internal(
2655 tmp.path(),
2656 true,
2657 MemTableType::Standard,
2658 false, // No lock needed for ephemeral
2659 StorageEncryption::disabled(),
2660 )?;
2661 Ok(EphemeralHandle {
2662 storage,
2663 _tmpdir: tmp,
2664 })
2665 }
2666
2667 /// Open an ephemeral DurableStorage with group commit enabled.
2668 ///
2669 /// Same as `open_ephemeral()` but with group commit for higher throughput.
2670 pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2671 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2672 let mut storage = Self::open_with_full_config_internal(
2673 tmp.path(),
2674 true,
2675 MemTableType::Standard,
2676 false,
2677 StorageEncryption::disabled(),
2678 )?;
2679
2680 let wal = storage.wal.clone();
2681 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2682 for &txn_id in txn_ids {
2683 let entry = TxnWalEntry::txn_commit(txn_id);
2684 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2685 }
2686 wal.flush().map_err(|e| e.to_string())?;
2687 wal.sync().map_err(|e| e.to_string())?;
2688 Ok(std::time::SystemTime::now()
2689 .duration_since(std::time::UNIX_EPOCH)
2690 .unwrap()
2691 .as_micros() as u64)
2692 });
2693 storage.group_commit = Some(Arc::new(gc));
2694
2695 Ok(EphemeralHandle {
2696 storage,
2697 _tmpdir: tmp,
2698 })
2699 }
2700
    /// Shared implementation behind every `open*` constructor.
    ///
    /// Steps, in order:
    /// 1. Create `path` if needed. When `acquire_lock` is set, take the exclusive
    ///    directory lock; a failure maps to `SochDBError::LockError`.
    /// 2. Resolve at-rest encryption from the keyring (fail-closed on a wrong or
    ///    missing KEK), then open `wal.log` with the resulting engine.
    /// 3. If `wal.manifest` exists, mark PITR enabled and seed
    ///    `last_checkpoint_lsn` from it.
    /// 4. Build the instance, then set `needs_recovery` to 1 if
    ///    `check_recovery_needed()` reports it.
    ///
    /// This only *flags* recovery; it does not replay the WAL itself.
    ///
    /// # Errors
    /// Propagates errors from directory creation, locking, the keyring, opening
    /// the WAL, loading the manifest, and the recovery check.
    fn open_with_full_config_internal<P: AsRef<Path>>(
        path: P,
        enable_ordered_index: bool,
        memtable_type: MemTableType,
        acquire_lock: bool,
        encryption: StorageEncryption,
    ) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;

        // Acquire exclusive lock on database directory (unless disabled for testing).
        // Done before any file in the directory is read.
        let db_lock = if acquire_lock {
            Some(
                crate::lock::DatabaseLock::acquire(&path)
                    .map_err(|e| SochDBError::LockError(e.to_string()))?,
            )
        } else {
            None
        };

        let wal_path = path.join("wal.log");

        // Resolve at-rest encryption from the keyring BEFORE touching the WAL.
        // - A fresh DB (no wal.log yet) with a KEK ⇒ create an encrypted keyring.
        // - An existing plaintext DB with a KEK ⇒ refused (must migrate explicitly).
        // - An encrypted DB with a wrong/missing KEK ⇒ refused fail-closed.
        // `is_new_db` is sampled before the WAL is opened below; opening it
        // presumably creates `wal.log`.
        let is_new_db = !wal_path.exists();
        let enc_state = keyring::load_or_init(
            &path,
            encryption.kek.as_ref(),
            &encryption.source_id,
            is_new_db,
        )?;
        let at_rest_encrypted = enc_state.is_encrypted();
        let wal = Arc::new(TxnWal::new_with_encryption(
            &wal_path,
            enc_state.engine(),
            enc_state.db_uuid(),
            enc_state.key_epoch(),
        )?);

        // PITR anchor: the presence of wal.manifest is the single source of truth
        // that this DB is PITR-enabled. If present, seed last_checkpoint_lsn from
        // it (it is otherwise in-memory and lost on restart).
        let (pitr_enabled, initial_checkpoint_lsn) =
            if crate::wal_manifest::WalManifest::exists(&path) {
                let m = crate::wal_manifest::WalManifest::load(&path)?;
                (true, m.last_checkpoint_lsn)
            } else {
                (false, 0)
            };

        let storage = Self {
            path,
            wal: wal.clone(),
            mvcc: Arc::new(MvccManager::new()),
            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
            txn_write_buffers: DashMap::new(),
            group_commit: None,
            needs_recovery: AtomicU64::new(0),
            last_checkpoint_lsn: AtomicU64::new(initial_checkpoint_lsn),
            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
            commits_since_sync: AtomicU64::new(0),
            // Adaptive batch sizing (Little's Law)
            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
            last_commit_us: AtomicU64::new(0),
            fsync_latency_us: AtomicU64::new(5000), // 5ms default
            db_lock,
            at_rest_encrypted,
            pitr_enabled: AtomicBool::new(pitr_enabled),
        };

        // Check if recovery needed; the flag is recorded here, and replay is
        // left to other code.
        if storage.check_recovery_needed()? {
            storage.needs_recovery.store(1, Ordering::SeqCst);
        }

        Ok(storage)
    }
2780
2781 /// Open with group commit enabled
2782 pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2783 Self::open_with_group_commit_and_config(path, true)
2784 }
2785
2786 /// Open with group commit and configurable ordered index
2787 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2788 path: P,
2789 enable_ordered_index: bool,
2790 ) -> Result<Self> {
2791 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2792
2793 let wal = storage.wal.clone();
2794 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2795 // Write all commit records WITHOUT flushing (batch them)
2796 for &txn_id in txn_ids {
2797 let entry = TxnWalEntry::txn_commit(txn_id);
2798 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2799 }
2800
2801 // Then do a SINGLE flush + fsync for the entire batch
2802 wal.flush().map_err(|e| e.to_string())?;
2803 wal.sync().map_err(|e| e.to_string())?;
2804
2805 // Return commit timestamp
2806 Ok(std::time::SystemTime::now()
2807 .duration_since(std::time::UNIX_EPOCH)
2808 .unwrap()
2809 .as_micros() as u64)
2810 });
2811
2812 storage.group_commit = Some(Arc::new(gc));
2813 Ok(storage)
2814 }
2815
2816 /// Open with IndexPolicy for automatic memtable/index configuration
2817 ///
2818 /// This is the recommended constructor for new code. The policy determines:
2819 /// - Whether to use ordered index (ScanOptimized only)
2820 /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2821 /// - Default settings optimized for the workload pattern
2822 ///
2823 /// # Arguments
2824 /// * `path` - Storage directory path
2825 /// * `policy` - Index policy determining write/scan tradeoffs
2826 /// * `group_commit` - Whether to enable group commit for throughput
2827 pub fn open_with_policy<P: AsRef<Path>>(
2828 path: P,
2829 policy: crate::index_policy::IndexPolicy,
2830 group_commit: bool,
2831 ) -> Result<Self> {
2832 Self::open_with_policy_encrypted(path, policy, group_commit, StorageEncryption::disabled())
2833 }
2834
2835 /// Policy-based open with at-rest encryption configured.
2836 ///
2837 /// Identical to [`Self::open_with_policy`] but threads a [`StorageEncryption`]
2838 /// down to the keyring/WAL so the embedded `Database` kernel can open (or
2839 /// create) an encrypted database. `StorageEncryption::disabled()` is exactly
2840 /// the plaintext behaviour.
2841 pub fn open_with_policy_encrypted<P: AsRef<Path>>(
2842 path: P,
2843 policy: crate::index_policy::IndexPolicy,
2844 group_commit: bool,
2845 encryption: StorageEncryption,
2846 ) -> Result<Self> {
2847 use crate::index_policy::IndexPolicy;
2848
2849 // Derive configuration from policy
2850 let (enable_ordered_index, memtable_type) = match policy {
2851 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2852 // Write-heavy: no ordered index, use arena for reduced allocations
2853 (false, MemTableType::Arena)
2854 }
2855 IndexPolicy::Balanced => {
2856 // Mixed OLTP: deferred sorting (already implemented in Standard)
2857 (true, MemTableType::Standard)
2858 }
2859 IndexPolicy::ScanOptimized => {
2860 // Scan-heavy: maintain ordered index
2861 (true, MemTableType::Standard)
2862 }
2863 };
2864
2865 if group_commit {
2866 let mut storage =
2867 Self::open_with_encryption(path, enable_ordered_index, memtable_type, encryption)?;
2868
2869 let wal = storage.wal.clone();
2870 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2871 for &txn_id in txn_ids {
2872 let entry = TxnWalEntry::txn_commit(txn_id);
2873 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2874 }
2875 wal.flush().map_err(|e| e.to_string())?;
2876 wal.sync().map_err(|e| e.to_string())?;
2877 Ok(std::time::SystemTime::now()
2878 .duration_since(std::time::UNIX_EPOCH)
2879 .unwrap()
2880 .as_micros() as u64)
2881 });
2882 storage.group_commit = Some(Arc::new(gc));
2883 Ok(storage)
2884 } else {
2885 Self::open_with_encryption(path, enable_ordered_index, memtable_type, encryption)
2886 }
2887 }
2888
2889 /// Open storage for concurrent mode (multi-reader, single-writer)
2890 ///
2891 /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2892 /// Coordination is handled by the concurrent MVCC layer instead.
2893 ///
2894 /// # Safety
2895 ///
2896 /// This must ONLY be called from `Database::open_concurrent()` which
2897 /// manages the concurrent MVCC coordination. Direct use will cause
2898 /// data corruption.
2899 pub fn open_for_concurrent<P: AsRef<Path>>(
2900 path: P,
2901 policy: crate::index_policy::IndexPolicy,
2902 ) -> Result<Self> {
2903 Self::open_for_concurrent_encrypted(path, policy, StorageEncryption::disabled())
2904 }
2905
2906 /// Concurrent-mode open with at-rest encryption configured.
2907 ///
2908 /// Identical to [`Self::open_for_concurrent`] but threads a
2909 /// [`StorageEncryption`] through, so an encrypted database can also be opened
2910 /// in concurrent (multi-reader) mode rather than failing closed for lack of a
2911 /// key channel.
2912 pub fn open_for_concurrent_encrypted<P: AsRef<Path>>(
2913 path: P,
2914 policy: crate::index_policy::IndexPolicy,
2915 encryption: StorageEncryption,
2916 ) -> Result<Self> {
2917 use crate::index_policy::IndexPolicy;
2918
2919 let (enable_ordered_index, memtable_type) = match policy {
2920 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2921 IndexPolicy::Balanced => (true, MemTableType::Standard),
2922 IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2923 };
2924
2925 // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2926 Self::open_with_full_config_internal(
2927 path,
2928 enable_ordered_index,
2929 memtable_type,
2930 false,
2931 encryption,
2932 )
2933 }
2934
2935 /// Get the memtable type being used
2936 pub fn memtable_type(&self) -> MemTableType {
2937 self.memtable.kind()
2938 }
2939
2940 /// Check if recovery is needed (dirty shutdown detection)
2941 ///
2942 /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2943 /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2944 /// but committed data still needs to be loaded from WAL into the memtable.
2945 fn check_recovery_needed(&self) -> Result<bool> {
2946 let marker_path = self.path.join(".clean_shutdown");
2947 if marker_path.exists() {
2948 // Clean shutdown - remove marker
2949 std::fs::remove_file(&marker_path)?;
2950 }
2951 // ALWAYS need recovery to rebuild memtable from WAL
2952 // The memtable is in-memory only and needs to be restored on every startup
2953 Ok(true)
2954 }
2955
2956 /// Perform crash recovery
2957 pub fn recover(&self) -> Result<RecoveryStats> {
2958 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2959 return Ok(RecoveryStats::default());
2960 }
2961
2962 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2963
2964 // Apply committed writes to memtable
2965 let recovery_txn_id = self.wal.alloc_txn_id();
2966 let commit_ts = self.mvcc.alloc_commit_ts();
2967
2968 // Collect keys being written for efficient commit
2969 let mut write_set: HashSet<InlineKey> = HashSet::new();
2970 for (key, value) in &writes {
2971 write_set.insert(SmallVec::from_slice(key));
2972 self.memtable
2973 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2974 }
2975 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2976
2977 self.needs_recovery.store(0, Ordering::SeqCst);
2978
2979 Ok(RecoveryStats {
2980 transactions_recovered: txn_count,
2981 writes_recovered: writes.len(),
2982 commit_ts,
2983 })
2984 }
2985
2986 /// Point-in-Time Recovery: rebuild the in-memory state as of `target`.
2987 ///
2988 /// This is the PITR analogue of [`Self::recover`] — call it on a FRESH open
2989 /// INSTEAD of `recover()` (not in addition to it), to materialize the
2990 /// database as it existed at a chosen LSN or commit timestamp. Because PITR
2991 /// mode never truncates the WAL, the full history is retained and replayed up
2992 /// to (and stopping at) the target, with transaction atomicity preserved
2993 /// (a transaction is applied only if its commit is within the target).
2994 ///
2995 /// Requires PITR to be enabled (the WAL must be fully retained); returns an
2996 /// error otherwise, since a truncated WAL cannot honor an arbitrary target.
2997 pub fn recover_to(&self, target: crate::txn_wal::RecoveryTarget) -> Result<RecoveryStats> {
2998 if !self.pitr_enabled.load(Ordering::SeqCst) {
2999 return Err(SochDBError::InvalidArgument(
3000 "recover_to requires Point-in-Time Recovery to be enabled \
3001 (the full WAL must be retained); call enable_point_in_time_recovery first"
3002 .to_string(),
3003 ));
3004 }
3005
3006 // Single-shot recovery on a fresh open: refuse if state was already
3007 // rebuilt (by recover() or a prior recover_to()). Re-applying would layer
3008 // a stale set over the point-in-time state under a newer commit_ts and
3009 // silently corrupt it (recover()/recover_to() both clear needs_recovery).
3010 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
3011 return Err(SochDBError::InvalidArgument(
3012 "recover_to must be the sole recovery on a fresh open, but state \
3013 was already recovered; reopen the database and call recover_to first"
3014 .to_string(),
3015 ));
3016 }
3017
3018 // Make the on-disk WAL match current_lsn() before replaying. Under the
3019 // default NORMAL sync mode the commit record(s) may still sit in the
3020 // BufWriter, while current_lsn() counts the in-memory sequence — so
3021 // without this flush+fsync a captured-LSN cut would silently drop
3022 // committed-but-unflushed records (replay reads a fresh on-disk handle).
3023 self.wal.flush()?;
3024 self.wal.sync()?;
3025
3026 let (writes, txn_count) = self.wal.replay_to_target(target)?;
3027
3028 // Apply the bounded set of committed writes to the (fresh) memtable,
3029 // mirroring recover().
3030 let recovery_txn_id = self.wal.alloc_txn_id();
3031 let commit_ts = self.mvcc.alloc_commit_ts();
3032 let mut write_set: HashSet<InlineKey> = HashSet::new();
3033 for (key, value) in &writes {
3034 write_set.insert(SmallVec::from_slice(key));
3035 self.memtable
3036 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
3037 }
3038 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
3039
3040 self.needs_recovery.store(0, Ordering::SeqCst);
3041
3042 Ok(RecoveryStats {
3043 transactions_recovered: txn_count,
3044 writes_recovered: writes.len(),
3045 commit_ts,
3046 })
3047 }
3048
3049 /// Begin a new transaction
3050 pub fn begin_transaction(&self) -> Result<u64> {
3051 let txn_id = self.wal.begin_transaction()?;
3052 self.mvcc.begin(txn_id);
3053 Ok(txn_id)
3054 }
3055
3056 /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
3057 ///
3058 /// This enables mode-aware optimizations:
3059 /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
3060 /// - WriteOnly: Skip read tracking, faster bulk inserts
3061 /// - ReadWrite: Full SSI for serializable isolation
3062 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
3063 let txn_id = self.wal.begin_transaction()?;
3064 self.mvcc.begin_with_mode(txn_id, mode);
3065 Ok(txn_id)
3066 }
3067
3068 /// Begin a read-only transaction without any WAL records.
3069 ///
3070 /// This is a performance-critical optimization that eliminates two WAL
3071 /// mutex acquisitions per read (TxnBegin + TxnAbort). Since read-only
3072 /// transactions have no state to recover, WAL records are unnecessary.
3073 ///
3074 /// Callers MUST use `abort_read_only_fast()` to clean up.
3075 #[inline]
3076 pub fn begin_read_only_fast(&self) -> u64 {
3077 let txn_id = self.wal.alloc_txn_id();
3078 self.mvcc.begin_read_only(txn_id);
3079 txn_id
3080 }
3081
3082 /// Abort a fast read-only transaction.
3083 ///
3084 /// O(1) cleanup: only removes MVCC state. No WAL write, no memtable scan.
3085 #[inline]
3086 pub fn abort_read_only_fast(&self, txn_id: u64) {
3087 self.mvcc.abort(txn_id);
3088 }
3089
3090 /// Read a key WITHOUT any MVCC transaction tracking.
3091 ///
3092 /// Uses the current global timestamp to see all committed writes.
3093 /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
3094 /// Only safe for single-threaded access (no concurrent writes).
3095 #[inline]
3096 pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
3097 let snapshot_ts = self
3098 .mvcc
3099 .ts_counter
3100 .load(std::sync::atomic::Ordering::Relaxed);
3101 self.memtable.read(key, snapshot_ts, None)
3102 }
3103
3104 /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
3105 ///
3106 /// Uses the current global timestamp. Only safe for single-threaded access.
3107 #[inline]
3108 pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
3109 let snapshot_ts = self
3110 .mvcc
3111 .ts_counter
3112 .load(std::sync::atomic::Ordering::Relaxed);
3113 self.memtable.scan_prefix(prefix, snapshot_ts, None)
3114 }
3115
3116 /// Read a key within a transaction
3117 #[inline]
3118 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
3119 // Fast path: get just snapshot_ts without cloning whole transaction
3120 let snapshot_ts = self
3121 .mvcc
3122 .get_snapshot_ts(txn_id)
3123 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3124
3125 // Record read for SSI (skipped for read-only transactions)
3126 self.mvcc.record_read(txn_id, key);
3127
3128 // Read at snapshot timestamp, seeing own uncommitted writes
3129 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
3130 }
3131
3132 /// Write a key-value pair within a transaction
3133 ///
3134 /// Writes are buffered and only flushed to disk on commit.
3135 /// This provides ~10× better throughput for batched inserts.
3136 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
3137 // Use the zero-allocation path internally
3138 self.write_refs(txn_id, &key, &value)?;
3139
3140 Ok(())
3141 }
3142
3143 /// Write from references - zero allocation hot path
3144 ///
3145 /// Avoids cloning key/value by writing to WAL from refs directly,
3146 /// then only allocating once for memtable storage.
3147 #[inline]
3148 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
3149 // Record write for MVCC (uses InlineKey - zero allocation for small keys)
3150 self.mvcc.record_write(txn_id, key);
3151
3152 // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
3153 // This reduces lock contention from O(writes) to O(1) per transaction
3154 self.txn_write_buffers
3155 .entry(txn_id)
3156 .or_insert_with(|| TxnWalBuffer::new(txn_id))
3157 .append(key, value);
3158
3159 // Write to memtable (needs owned key/value for storage)
3160 self.memtable
3161 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
3162
3163 Ok(())
3164 }
3165
3166 /// Delete a key within a transaction
3167 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
3168 // Record write (uses InlineKey - zero allocation for small keys)
3169 self.mvcc.record_write(txn_id, &key);
3170
3171 // Buffer tombstone in memory - NO WAL lock taken!
3172 self.txn_write_buffers
3173 .entry(txn_id)
3174 .or_insert_with(|| TxnWalBuffer::new(txn_id))
3175 .append(&key, &[]); // Empty value = tombstone
3176
3177 // Write tombstone to memtable
3178 self.memtable.write(key, None, txn_id)?;
3179
3180 Ok(())
3181 }
3182
3183 /// Batch write multiple key-value pairs with reduced overhead
3184 ///
3185 /// This API amortizes fixed costs over the batch:
3186 /// - Single DashMap entry lookup for TxnWalBuffer
3187 /// - Single MVCC write set update
3188 /// - Batch memtable operations
3189 ///
3190 /// Performance: ~2-3x faster than individual write_refs calls
3191 /// for batches of 100+ entries.
3192 ///
3193 /// # Arguments
3194 /// * `txn_id` - Transaction ID
3195 /// * `writes` - Slice of (key, value) pairs
3196 #[inline]
3197 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
3198 if writes.is_empty() {
3199 return Ok(());
3200 }
3201
3202 // Single DashMap access for entire batch
3203 let mut buffer_entry = self
3204 .txn_write_buffers
3205 .entry(txn_id)
3206 .or_insert_with(|| TxnWalBuffer::new(txn_id));
3207
3208 // Batch operations with reduced per-row overhead
3209 for (key, value) in writes {
3210 // Record write for MVCC
3211 self.mvcc.record_write(txn_id, key);
3212
3213 // Append to WAL buffer
3214 buffer_entry.append(key, value);
3215 }
3216 drop(buffer_entry);
3217
3218 // Batch write to memtable
3219 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
3220 .iter()
3221 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
3222 .collect();
3223 self.memtable.write_batch(&owned_writes, txn_id)?;
3224
3225 Ok(())
3226 }
3227
3228 /// Commit a transaction
3229 ///
3230 /// With sync_mode:
3231 /// - 0 (OFF): No sync, risk of data loss
3232 /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
3233 /// - 2 (FULL): Sync every commit (safest, slowest)
3234 pub fn commit(&self, txn_id: u64) -> Result<u64> {
3235 // First, flush all buffered DATA writes to the WAL with a SINGLE lock
3236 // acquisition (O(1) lock instead of O(writes) locks). Flushing data
3237 // records before validation is safe: ARIES recovery only treats them as
3238 // winners if a *durable commit record* for this transaction also exists,
3239 // and that record is written below — strictly after validation succeeds.
3240 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
3241 && !buffer.is_empty()
3242 {
3243 // Flush entire buffer to WAL with one lock
3244 self.wal.flush_buffer(&buffer)?;
3245 }
3246
3247 // ====================================================================
3248 // Task 1 — Linearizable commit invariant:
3249 //
3250 // A commit record must NEVER become durable unless the transaction
3251 // has already passed SSI validation and its write set is final.
3252 //
3253 // We therefore VALIDATE (and freeze the write set) *before* making the
3254 // commit record durable. `mvcc.commit` performs SSI validation and
3255 // returns `None` on a dangerous structure (serialization conflict) or
3256 // if the transaction no longer exists. Writing/fsyncing the commit
3257 // record before this point would let a crash resurrect a transaction
3258 // that the live system rejected — the classic "committed-after-crash,
3259 // aborted-before-crash" non-linearizability bug.
3260 // ====================================================================
3261 let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
3262 SochDBError::Validation(
3263 "transaction aborted: SSI validation failed (serialization conflict) \
3264 or transaction not found"
3265 .into(),
3266 )
3267 })?;
3268
3269 // Validation passed and the write set is frozen. Only now may the
3270 // commit record become durable.
3271 if let Some(gc) = &self.group_commit {
3272 // Submit the *validated* commit intent to group commit and wait.
3273 // This batches multiple validated commits into a single fsync.
3274 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
3275 } else {
3276 // Direct commit path with adaptive sync (Little's Law)
3277 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
3278 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
3279
3280 // Update arrival rate for adaptive batching
3281 self.update_arrival_rate();
3282
3283 // Write commit record (no flush yet - BufWriter will buffer it)
3284 let entry = TxnWalEntry::txn_commit(txn_id);
3285 self.wal.append_no_flush(&entry)?;
3286
3287 // Determine if we should sync/flush based on mode
3288 let should_sync = match sync_mode {
3289 0 => false, // OFF: never sync
3290 1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
3291 _ => true, // FULL: always sync
3292 };
3293
3294 if should_sync {
3295 // Measure fsync latency for adaptive tuning
3296 let start = std::time::Instant::now();
3297 self.wal.flush()?;
3298 self.wal.sync()?;
3299 let latency_us = start.elapsed().as_micros() as u64;
3300
3301 // Update fsync latency estimate (EMA with α = 0.1)
3302 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
3303 let new_latency = (old_latency * 9 + latency_us) / 10;
3304 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
3305
3306 self.commits_since_sync.store(0, Ordering::Relaxed);
3307 }
3308 }
3309
3310 // Commit record is durable (or buffered per sync mode) for a validated
3311 // transaction — publish the writes to the memtable. (O(k), k = keys.)
3312 self.memtable.commit(txn_id, commit_ts, &write_set);
3313
3314 Ok(commit_ts)
3315 }
3316
3317 /// Update arrival rate using exponential moving average
3318 #[inline]
3319 fn update_arrival_rate(&self) {
3320 let now_us = std::time::SystemTime::now()
3321 .duration_since(std::time::UNIX_EPOCH)
3322 .unwrap()
3323 .as_micros() as u64;
3324
3325 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3326
3327 if last > 0 {
3328 let delta_us = now_us.saturating_sub(last);
3329 if delta_us > 0 && delta_us < 10_000_000 {
3330 // Ignore gaps > 10s
3331 // Rate = 1_000_000 / delta_us (requests/sec)
3332 // Stored as rate × 1000 for precision
3333 let instant_rate = 1_000_000_000 / delta_us;
3334
3335 // EMA with α = 0.1
3336 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3337 let new_rate = (old_rate * 9 + instant_rate) / 10;
3338 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3339 }
3340 }
3341 }
3342
3343 /// Compute optimal batch threshold using Little's Law
3344 ///
3345 /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
3346 /// Returns the number of commits to batch before fsync
3347 #[inline]
3348 fn adaptive_batch_threshold(&self) -> u64 {
3349 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
3350 let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
3351
3352 if lambda <= 0.0 || tau <= 0.0 {
3353 return 100; // Fallback to fixed threshold
3354 }
3355
3356 // Little's Law: W* = sqrt(2 × τ × λ)
3357 // This minimizes total time = wait_time + fsync_overhead
3358 let n_opt = (2.0 * tau * lambda).sqrt();
3359
3360 // Clamp between 1 and 1000
3361 (n_opt as u64).clamp(1, 1000)
3362 }
3363
3364 /// Set synchronous mode
3365 ///
3366 /// - 0: OFF - No fsync (risk of data loss)
3367 /// - 1: NORMAL - Periodic fsync (balanced)
3368 /// - 2: FULL - Fsync every commit (safest)
3369 pub fn set_sync_mode(&self, mode: u64) {
3370 self.sync_mode.store(mode.min(2), Ordering::Relaxed);
3371 }
3372
3373 /// Force a group commit flush (useful for benchmarking or testing)
3374 pub fn flush_group_commit(&self) {
3375 if let Some(gc) = &self.group_commit {
3376 gc.flush_batch();
3377 }
3378 }
3379
3380 /// Abort a transaction
3381 ///
3382 /// Performance: O(1) for read-only transactions (no writes to clean up).
3383 /// For write transactions, O(N) memtable scan is required to remove
3384 /// uncommitted versions.
3385 pub fn abort(&self, txn_id: u64) -> Result<()> {
3386 // Check if transaction had any buffered writes.
3387 // Read-only transactions never populate txn_write_buffers,
3388 // so this returns None — allowing us to skip the O(N) memtable scan.
3389 let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3390
3391 if had_writes {
3392 // Write abort record to WAL (only needed if data was written)
3393 self.wal.abort_transaction(txn_id)?;
3394 // Clean up uncommitted memtable entries
3395 self.memtable.abort(txn_id);
3396 }
3397
3398 // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3399 self.mvcc.abort(txn_id);
3400
3401 Ok(())
3402 }
3403
3404 /// Scan keys with prefix
3405 #[inline]
3406 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3407 // Fast path: get just snapshot_ts without cloning whole transaction
3408 let snapshot_ts = self
3409 .mvcc
3410 .get_snapshot_ts(txn_id)
3411 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3412
3413 // Note: Scan doesn't record individual key reads for SSI (too expensive)
3414 // SSI conflicts are tracked at the prefix level if needed
3415 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3416 }
3417
3418 /// Scan keys in range
3419 #[inline]
3420 pub fn scan_range(
3421 &self,
3422 txn_id: u64,
3423 start: &[u8],
3424 end: &[u8],
3425 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3426 let snapshot_ts = self
3427 .mvcc
3428 .get_snapshot_ts(txn_id)
3429 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3430
3431 Ok(self
3432 .memtable
3433 .scan_range(start, end, snapshot_ts, Some(txn_id)))
3434 }
3435
3436 /// Streaming scan for very large result sets
3437 ///
3438 /// Returns an iterator that yields (key, value) pairs without
3439 /// materializing the entire result set in memory.
3440 #[inline]
3441 pub fn scan_range_iter<'a>(
3442 &'a self,
3443 txn_id: u64,
3444 start: &'a [u8],
3445 end: &'a [u8],
3446 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3447 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3448 self.memtable
3449 .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3450 }
3451
3452 /// Force fsync to disk
3453 /// Flush the WAL's in-memory buffer to the OS
3454 ///
3455 /// This ensures all buffered writes are pushed from the BufWriter
3456 /// into the OS page cache. Call this before `fsync()` to ensure
3457 /// all data is durable.
3458 pub fn flush_wal(&self) -> Result<()> {
3459 self.wal.flush()
3460 }
3461
3462 /// Force sync the WAL to disk (fsync)
3463 pub fn fsync(&self) -> Result<()> {
3464 self.wal.sync()
3465 }
3466
3467 /// Write checkpoint
3468 pub fn checkpoint(&self) -> Result<u64> {
3469 let txn_id = 0; // System transaction
3470 let entry = TxnWalEntry::checkpoint(txn_id);
3471 let lsn = self.wal.append_sync(&entry)?;
3472 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3473 // PITR: persist the checkpoint LSN to the durable manifest so it survives
3474 // restart. This piggybacks the fsync that append_sync already performed;
3475 // the manifest write is itself crash-safe (temp + fsync + atomic rename).
3476 // No-op (and no manifest write) when PITR is not enabled.
3477 if self.pitr_enabled.load(Ordering::SeqCst) {
3478 self.persist_pitr_manifest(lsn)?;
3479 }
3480 Ok(lsn)
3481 }
3482
3483 /// The current durable monotonic LSN (the WAL record ordinal). In PITR mode
3484 /// the WAL is never truncated, so this is stable and monotonic across
3485 /// restarts (`recover_state` rebuilds it by re-counting on reopen).
3486 pub fn current_lsn(&self) -> u64 {
3487 self.wal.sequence()
3488 }
3489
3490 /// Whether Point-in-Time Recovery is enabled for this database.
3491 pub fn is_pitr_enabled(&self) -> bool {
3492 self.pitr_enabled.load(Ordering::SeqCst)
3493 }
3494
3495 /// Enable Point-in-Time Recovery for this database (one-way, explicit opt-in).
3496 ///
3497 /// Writes the durable `wal.manifest` whose presence marks the DB PITR-enabled
3498 /// on every future open. Once enabled, the destructive [`Self::truncate_wal`]
3499 /// is forbidden so the WAL record ordinal stays a stable durable LSN (segment
3500 /// sealing — the PITR-safe space-reclaim — lands in a later phase). Idempotent.
3501 pub fn enable_point_in_time_recovery(&self) -> Result<()> {
3502 if self.pitr_enabled.load(Ordering::SeqCst) {
3503 return Ok(()); // already enabled
3504 }
3505 // Persist the durable manifest FIRST; only flip the in-memory flag after
3506 // the durable write succeeds. Otherwise a failed manifest write would
3507 // leave `durability_capabilities()` reporting PITR (and the truncate
3508 // guard active) with no durable anchor on disk.
3509 let lsn = self.last_checkpoint_lsn.load(Ordering::SeqCst);
3510 if crate::wal_manifest::WalManifest::exists(&self.path) {
3511 self.persist_pitr_manifest(lsn)?;
3512 } else {
3513 crate::wal_manifest::WalManifest::new(lsn).write_atomic(&self.path)?;
3514 }
3515 self.pitr_enabled.store(true, Ordering::SeqCst);
3516 Ok(())
3517 }
3518
3519 /// Persist the PITR manifest with the given checkpoint LSN, preserving the
3520 /// existing db identity if a manifest is already present.
3521 fn persist_pitr_manifest(&self, lsn: u64) -> Result<()> {
3522 let manifest = match crate::wal_manifest::WalManifest::load(&self.path) {
3523 Ok(mut m) => {
3524 m.last_checkpoint_lsn = lsn;
3525 m
3526 }
3527 Err(_) => crate::wal_manifest::WalManifest::new(lsn),
3528 };
3529 manifest.write_atomic(&self.path)
3530 }
3531
3532 /// Truncate the WAL file after checkpoint.
3533 ///
3534 /// This physically truncates the WAL file to 0 bytes, reclaiming disk
3535 /// space. The in-memory memtable retains all data for the current
3536 /// session, but a crash after truncation will result in data loss
3537 /// since the WAL is the only persistence mechanism for DurableStorage.
3538 ///
3539 /// Call after `checkpoint()` when WAL durability across restarts is
3540 /// not required (e.g. desktop telemetry viewers, caches).
3541 ///
3542 /// Refused when PITR is enabled: truncation resets the WAL record ordinal,
3543 /// which would break the durable monotonic LSN that PITR anchors on. The
3544 /// PITR-safe way to reclaim space is segment sealing (a later phase).
3545 pub fn truncate_wal(&self) -> Result<()> {
3546 if self.pitr_enabled.load(Ordering::SeqCst) {
3547 return Err(SochDBError::InvalidArgument(
3548 "truncate_wal is disabled while Point-in-Time Recovery is enabled \
3549 (it would reset the durable LSN); use segment sealing to reclaim space"
3550 .to_string(),
3551 ));
3552 }
3553 self.wal.truncate()
3554 }
3555
3556 /// Get storage statistics
3557 pub fn stats(&self) -> StorageStats {
3558 // Get WAL size from the WAL manager
3559 let wal_size = self.wal.size_bytes();
3560
3561 // Get active transaction count from MVCC
3562 let active_txns = self.mvcc.active_transaction_count();
3563
3564 StorageStats {
3565 memtable_size_bytes: self.memtable.size(),
3566 wal_size_bytes: wal_size,
3567 active_transactions: active_txns,
3568 min_active_snapshot: self.mvcc.min_active_snapshot(),
3569 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3570 }
3571 }
3572
3573 /// Garbage collect old versions
3574 pub fn gc(&self) -> usize {
3575 let min_ts = self.mvcc.min_active_snapshot();
3576 self.memtable.gc(min_ts)
3577 }
3578
3579 /// Clean shutdown
3580 pub fn shutdown(&self) -> Result<()> {
3581 // Sync WAL
3582 self.fsync()?;
3583
3584 // Write clean shutdown marker
3585 let marker_path = self.path.join(".clean_shutdown");
3586 std::fs::write(&marker_path, b"clean")?;
3587
3588 Ok(())
3589 }
3590}
3591
3592impl Drop for DurableStorage {
3593 fn drop(&mut self) {
3594 let _ = self.shutdown();
3595 }
3596}
3597
3598// =============================================================================
3599// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3600// =============================================================================
3601
/// Owns a `DurableStorage` instance backed by a temporary directory.
///
/// The temp directory is automatically cleaned up when this handle is dropped.
/// Access the underlying storage via `Deref` coercion or `.storage()`.
///
/// # Why this exists
///
/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
/// used in production). This dual-engine architecture meant bugs could surface
/// only in production because the test path exercised different code.
///
/// `EphemeralHandle` eliminates this divergence: tests use the exact same
/// `DurableStorage` engine as production, just backed by a temp directory.
pub struct EphemeralHandle {
    // Declared before `_tmpdir` on purpose. Struct fields drop in declaration
    // order, so the storage is torn down first: its `Drop` runs `shutdown()`,
    // which writes the `.clean_shutdown` marker into its directory. Only then
    // is the temporary directory removed. Do not reorder these fields.
    storage: DurableStorage,
    // Keeps the temp directory alive for the handle's lifetime; it is
    // deleted when this field is dropped.
    _tmpdir: tempfile::TempDir,
}
3620
3621impl EphemeralHandle {
3622 /// Get a reference to the underlying storage
3623 pub fn storage(&self) -> &DurableStorage {
3624 &self.storage
3625 }
3626
3627 /// Get a mutable reference to the underlying storage
3628 pub fn storage_mut(&mut self) -> &mut DurableStorage {
3629 &mut self.storage
3630 }
3631
3632 /// Consume the handle and return the storage and temp directory.
3633 ///
3634 /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
3635 /// the `TempDir` alive for the lifetime of the storage.
3636 pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
3637 (self.storage, self._tmpdir)
3638 }
3639}
3640
3641impl std::ops::Deref for EphemeralHandle {
3642 type Target = DurableStorage;
3643 fn deref(&self) -> &DurableStorage {
3644 &self.storage
3645 }
3646}
3647
3648impl std::ops::DerefMut for EphemeralHandle {
3649 fn deref_mut(&mut self) -> &mut DurableStorage {
3650 &mut self.storage
3651 }
3652}
3653
/// Recovery statistics
///
/// Returned by `DurableStorage::recover` and `DurableStorage::recover_to`.
/// The `Default` value (all zeros) is what `recover` returns when no recovery
/// was pending.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of replayed (key, value) writes applied to the memtable.
    pub writes_recovered: usize,
    /// The single commit timestamp under which all recovered writes were
    /// committed.
    pub commit_ts: u64,
}
3661
/// Storage statistics
///
/// Point-in-time snapshot produced by `DurableStorage::stats`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Memtable size as reported by the memtable's `size()` (bytes, per the
    /// field name — TODO confirm units against the memtable implementation).
    pub memtable_size_bytes: u64,
    /// WAL size as reported by the WAL manager's `size_bytes()`.
    pub wal_size_bytes: u64,
    /// Number of transactions MVCC currently reports as active.
    pub active_transactions: usize,
    /// MVCC's minimum active snapshot timestamp; also the horizon
    /// `DurableStorage::gc` passes to the memtable.
    pub min_active_snapshot: u64,
    /// LSN of the most recent `checkpoint()` recorded in this session (or the
    /// value it was initialized with at open — TODO confirm initial value).
    pub last_checkpoint_lsn: u64,
}
3671
3672#[cfg(test)]
3673mod tests {
3674 use super::*;
3675 use tempfile::tempdir;
3676
    /// Single-transaction round trip: a transaction reads its own uncommitted
    /// writes, commit yields a non-zero timestamp, and a later transaction
    /// observes the committed value.
    #[test]
    fn test_basic_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Begin transaction
        let txn_id = storage.begin_transaction().unwrap();

        // Write data
        storage
            .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
            .unwrap();
        storage
            .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
            .unwrap();

        // Read back (within same transaction): read-your-own-writes before commit.
        let v1 = storage.read(txn_id, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));

        // Commit
        let commit_ts = storage.commit(txn_id).unwrap();
        assert!(commit_ts > 0);

        // Read in new transaction
        let txn2 = storage.begin_transaction().unwrap();
        let v1 = storage.read(txn2, b"key1").unwrap();
        assert_eq!(v1, Some(b"value1".to_vec()));
        // The reader wrote nothing; abort it so it does not stay active.
        storage.abort(txn2).unwrap();
    }
3707
    /// Snapshot isolation: a transaction keeps reading the value visible at
    /// its start even after a later transaction commits an update, while
    /// transactions begun after that commit see the new value.
    #[test]
    fn test_snapshot_isolation() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // T1: Write initial value
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // T2: Start reading (snapshot at this point)
        let t2 = storage.begin_transaction().unwrap();

        // T3: Update the value
        let t3 = storage.begin_transaction().unwrap();
        storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.commit(t3).unwrap();

        // T2 should still see v1 (snapshot isolation)
        let v = storage.read(t2, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));

        // New transaction should see v2
        let t4 = storage.begin_transaction().unwrap();
        let v = storage.read(t4, b"key").unwrap();
        assert_eq!(v, Some(b"v2".to_vec()));

        // Release both read-only transactions.
        storage.abort(t2).unwrap();
        storage.abort(t4).unwrap();
    }
3738
    /// GC must respect the active-snapshot watermark: versions still visible
    /// to a live reader survive GC, and the latest value stays readable both
    /// before and after the reader is released.
    #[test]
    fn test_gc_preserves_versions_for_active_snapshot() {
        // GC must not free a version that an in-flight reader's snapshot can
        // still observe. The low-water-mark (min_active_snapshot) is the oldest
        // live reader; any version visible to it must survive GC.
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Seed an initial committed version v1.
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // Open a long-lived snapshot reader BEFORE any newer writes. Its
        // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
        let reader = storage.begin_transaction().unwrap();
        assert_eq!(
            storage.read(reader, b"k").unwrap(),
            Some(b"v1".to_vec()),
            "reader's snapshot must see v1"
        );

        // Produce several newer committed versions while the reader is active.
        for v in ["v2", "v3", "v4"] {
            let w = storage.begin_transaction().unwrap();
            storage
                .write(w, b"k".to_vec(), v.as_bytes().to_vec())
                .unwrap();
            storage.commit(w).unwrap();
        }

        // Run GC. Because `reader` is still active, the watermark is pinned at
        // its snapshot and the version it needs (v1) must NOT be freed.
        let _freed = storage.gc();
        assert_eq!(
            storage.read(reader, b"k").unwrap(),
            Some(b"v1".to_vec()),
            "GC must not free a version still visible to an active snapshot"
        );

        // A fresh transaction sees the latest committed version.
        let fresh = storage.begin_transaction().unwrap();
        assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
        storage.abort(fresh).unwrap();

        // Release the old reader; the watermark can now advance.
        storage.abort(reader).unwrap();

        // After the old snapshot is gone, GC may reclaim superseded versions,
        // and new readers still resolve the latest value correctly.
        let _freed2 = storage.gc();
        let after = storage.begin_transaction().unwrap();
        assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
        storage.abort(after).unwrap();
    }
3794
    /// Serializable snapshot isolation (SSI) must reject write skew: of two
    /// concurrent transactions with crossing read/write sets, only the first
    /// committer may succeed.
    #[test]
    fn test_ssi_detects_write_skew() {
        // Classic write-skew: two concurrent transactions each read what the
        // other writes (disjoint write keys). Under plain SI both would commit,
        // violating serializability. Under SSI the pivot transaction (with both
        // an inbound and an outbound rw-edge) must be aborted, so the two
        // commits cannot both succeed.
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Seed two keys.
        let seed = storage.begin_transaction().unwrap();
        storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
        storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
        storage.commit(seed).unwrap();

        // T1 reads x and y, then writes x.
        let t1 = storage.begin_transaction().unwrap();
        let _ = storage.read(t1, b"x").unwrap();
        let _ = storage.read(t1, b"y").unwrap();

        // T2 reads x and y, then writes y. (Concurrent with T1.)
        let t2 = storage.begin_transaction().unwrap();
        let _ = storage.read(t2, b"x").unwrap();
        let _ = storage.read(t2, b"y").unwrap();

        // Both writes happen only after both transactions have read, which
        // creates the crossing rw-dependencies (T1 read y / T2 writes y, and
        // T2 read x / T1 writes x).
        storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
        storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();

        // First committer wins.
        let c1 = storage.commit(t1);
        assert!(c1.is_ok(), "first committer should succeed: {c1:?}");

        // The second committer forms a dangerous rw-structure with T1 and must
        // be rejected to preserve serializability.
        let c2 = storage.commit(t2);
        assert!(
            c2.is_err(),
            "SSI must abort the write-skew pivot, but commit succeeded"
        );
    }
3836
    /// Aborting a transaction discards its writes: later readers still see
    /// the last committed value.
    #[test]
    fn test_abort_transaction() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open(dir.path()).unwrap();

        // Write initial value
        let t1 = storage.begin_transaction().unwrap();
        storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
        storage.commit(t1).unwrap();

        // Start transaction that will abort
        let t2 = storage.begin_transaction().unwrap();
        storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
        storage.abort(t2).unwrap();

        // New transaction should see v1 (aborted changes not visible)
        let t3 = storage.begin_transaction().unwrap();
        let v = storage.read(t3, b"key").unwrap();
        assert_eq!(v, Some(b"v1".to_vec()));
        storage.abort(t3).unwrap();
    }
3858
    /// Durability across a simulated crash: a commit fsynced under FULL sync
    /// mode is replayed by `recover()` after reopening the same directory.
    #[test]
    fn test_crash_recovery() {
        let dir = tempdir().unwrap();

        // Phase 1: Write data and commit
        {
            // Use open_without_lock for crash simulation tests
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();

            // Set sync mode to FULL to ensure data is synced before "crash"
            storage.set_sync_mode(2); // FULL: sync every commit

            let txn = storage.begin_transaction().unwrap();
            storage
                .write(txn, b"persist".to_vec(), b"data".to_vec())
                .unwrap();
            storage.commit(txn).unwrap();

            // Simulate crash (no clean shutdown). `mem::forget` skips `Drop`,
            // so any clean-shutdown work the engine performs on drop never runs.
            std::mem::forget(storage);
        }

        // Phase 2: Reopen and recover
        {
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
            let stats = storage.recover().unwrap();
            assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);

            // Data should be recovered
            let txn = storage.begin_transaction().unwrap();
            let v = storage.read(txn, b"persist").unwrap();
            assert_eq!(v, Some(b"data".to_vec()));
            storage.abort(txn).unwrap();
        }
    }
3894
    /// At-rest encryption end-to-end through the live DurableStorage engine
    /// (Task 3B): an encrypted DB round-trips committed data, never leaks
    /// plaintext to the WAL file, flips the per-instance durability matrix, and
    /// fails CLOSED on a wrong or missing key (never a silent plaintext/empty
    /// open).
    #[test]
    fn test_at_rest_encryption_end_to_end() {
        let dir = tempdir().unwrap();
        // Closure so every phase gets a fresh key value built from the same bytes.
        let kek = || EncryptionKey::new([0xABu8; 32]);

        // Phase 1: create an encrypted DB, write committed data, clean close.
        {
            let storage = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                StorageEncryption::with_kek(kek(), "test"),
            )
            .unwrap();
            assert!(storage.is_encrypted());
            assert!(storage.durability_capabilities().at_rest_encryption);
            storage.set_sync_mode(2);
            let t = storage.begin_transaction().unwrap();
            storage
                .write(t, b"secret-key".to_vec(), b"secret-value".to_vec())
                .unwrap();
            storage.commit(t).unwrap();
        } // clean drop releases the file lock

        // The keyring exists and the WAL bytes do not leak the plaintext record.
        assert!(dir.path().join("keyring.json").exists());
        let raw = std::fs::read(dir.path().join("wal.log")).unwrap();
        assert!(!contains(&raw, b"secret-value"), "value leaked to WAL");
        assert!(!contains(&raw, b"secret-key"), "key leaked to WAL");

        // Phase 2: reopen with the CORRECT key, recover, read back.
        {
            let storage = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                StorageEncryption::with_kek(kek(), "test"),
            )
            .unwrap();
            storage.recover().unwrap();
            let t = storage.begin_transaction().unwrap();
            assert_eq!(
                storage.read(t, b"secret-key").unwrap(),
                Some(b"secret-value".to_vec()),
                "committed encrypted data must round-trip"
            );
            storage.abort(t).unwrap();
        }

        // Phase 3: WRONG key must fail closed (keyring canary), not open empty.
        {
            let wrong = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                StorageEncryption::with_kek(EncryptionKey::new([0x00u8; 32]), "test"),
            );
            assert!(wrong.is_err(), "wrong KEK must fail closed");
        }

        // Phase 4: opening an encrypted DB with NO key must fail closed.
        {
            let no_key = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                StorageEncryption::disabled(),
            );
            assert!(
                no_key.is_err(),
                "encrypted DB opened without key must fail closed"
            );
        }
    }
3974
    /// A plaintext DB reports at_rest_encryption=false on the live matrix.
    ///
    /// It still reports crash recovery, and no keyring file is created.
    #[test]
    fn test_plaintext_db_reports_unencrypted() {
        let dir = tempdir().unwrap();
        let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
        assert!(!storage.is_encrypted());
        assert!(!storage.durability_capabilities().at_rest_encryption);
        assert!(storage.durability_capabilities().crash_recovery);
        assert!(!dir.path().join("keyring.json").exists());
    }
3985
3986 fn contains(haystack: &[u8], needle: &[u8]) -> bool {
3987 haystack.windows(needle.len()).any(|w| w == needle)
3988 }
3989
    /// PITR phase 1 — durable monotonic LSN anchor.
    ///
    /// Enabling PITR writes the durable manifest; the WAL record ordinal (LSN)
    /// and the last-checkpoint LSN then survive a reopen, and the destructive
    /// truncate is refused so the anchor can never reset. A non-PITR DB is
    /// completely unaffected (no manifest, truncate works).
    #[test]
    fn test_pitr_durable_lsn_and_truncate_guard() {
        let dir = tempdir().unwrap();

        // Default DB: not PITR, no manifest, truncate allowed.
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            assert!(!s.is_pitr_enabled());
            assert!(!s.durability_capabilities().point_in_time_recovery);
            assert!(s.truncate_wal().is_ok());
        }
        assert!(!dir.path().join("wal.manifest").exists());

        // Enable PITR, write+checkpoint, capture the LSN.
        // Declared outside the scope so the values outlive the first handle.
        let lsn_before;
        let ckpt_lsn;
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.enable_point_in_time_recovery().unwrap();
            assert!(s.is_pitr_enabled());

            let t = s.begin_transaction().unwrap();
            s.write(t, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            s.commit(t).unwrap();
            ckpt_lsn = s.checkpoint().unwrap();
            lsn_before = s.current_lsn();
            assert!(lsn_before > 0);

            // truncate is refused while PITR is on.
            assert!(
                s.truncate_wal().is_err(),
                "truncate must be refused in PITR mode"
            );
        }
        assert!(dir.path().join("wal.manifest").exists());

        // Reopen: PITR auto-detected from the manifest; LSN did NOT reset.
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            assert!(
                s.is_pitr_enabled(),
                "PITR must be auto-detected from manifest"
            );
            s.recover().unwrap();
            assert_eq!(
                s.current_lsn(),
                lsn_before,
                "durable LSN must survive reopen (not reset to 0/record-recount drift)"
            );
            assert_eq!(
                s.stats().last_checkpoint_lsn,
                ckpt_lsn,
                "last_checkpoint_lsn must be restored from the manifest"
            );
            // Committed data still readable.
            let t = s.begin_transaction().unwrap();
            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1".to_vec()));
            s.abort(t).unwrap();
        }
    }
4056
    /// PITR phase 2 — END-TO-END restore to a point in time.
    ///
    /// Enable PITR, commit two transactions, then on fresh reopens
    /// `recover_to(target)` materializes the exact historical state: an LSN cut
    /// between the two transactions sees only the first; the full LSN / a
    /// far-future timestamp sees both; timestamp 0 sees nothing. Transaction
    /// atomicity is preserved at the cut.
    #[test]
    fn test_pitr_recover_to_point_in_time() {
        use crate::txn_wal::RecoveryTarget;

        let dir = tempdir().unwrap();

        // Build history: txn1 sets k1=v1; txn2 overwrites k1=v1b and adds k2=v2.
        let (lsn_after_txn1, lsn_after_txn2);
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.enable_point_in_time_recovery().unwrap();

            let t1 = s.begin_transaction().unwrap();
            s.write(t1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            s.commit(t1).unwrap();
            lsn_after_txn1 = s.current_lsn();

            let t2 = s.begin_transaction().unwrap();
            s.write(t2, b"k1".to_vec(), b"v1b".to_vec()).unwrap();
            s.write(t2, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            s.commit(t2).unwrap();
            lsn_after_txn2 = s.current_lsn();

            s.checkpoint().unwrap();
        }
        assert!(lsn_after_txn2 > lsn_after_txn1);

        // Each restore below runs on a FRESH handle: recover_to must be the
        // only recovery performed on a given open.

        // Restore to the cut between txn1 and txn2: only txn1's effect is visible.
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.recover_to(RecoveryTarget::Lsn(lsn_after_txn1)).unwrap();
            let t = s.begin_transaction().unwrap();
            assert_eq!(
                s.read(t, b"k1").unwrap(),
                Some(b"v1".to_vec()),
                "txn1 value"
            );
            assert_eq!(
                s.read(t, b"k2").unwrap(),
                None,
                "txn2 must NOT be present at the cut"
            );
            s.abort(t).unwrap();
        }

        // Restore to the full LSN: both transactions visible (txn2 wins on k1).
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.recover_to(RecoveryTarget::Lsn(lsn_after_txn2)).unwrap();
            let t = s.begin_transaction().unwrap();
            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1b".to_vec()));
            assert_eq!(s.read(t, b"k2").unwrap(), Some(b"v2".to_vec()));
            s.abort(t).unwrap();
        }

        // Timestamp bounds: MAX => everything; 0 => nothing.
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.recover_to(RecoveryTarget::Timestamp(u64::MAX)).unwrap();
            let t = s.begin_transaction().unwrap();
            assert_eq!(s.read(t, b"k1").unwrap(), Some(b"v1b".to_vec()));
            assert_eq!(s.read(t, b"k2").unwrap(), Some(b"v2".to_vec()));
            s.abort(t).unwrap();
        }
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.recover_to(RecoveryTarget::Timestamp(0)).unwrap();
            let t = s.begin_transaction().unwrap();
            assert_eq!(s.read(t, b"k1").unwrap(), None, "no commit is <= ts 0");
            s.abort(t).unwrap();
        }

        // The capability matrix now reports PITR live for this DB.
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            assert!(s.durability_capabilities().point_in_time_recovery);
        }
    }
4142
    /// recover_to is refused on a non-PITR database (the WAL may be truncated, so
    /// an arbitrary target cannot be honored).
    ///
    /// The capability matrix must also keep reporting PITR as unavailable.
    #[test]
    fn test_recover_to_refused_without_pitr() {
        use crate::txn_wal::RecoveryTarget;
        let dir = tempdir().unwrap();
        let s = DurableStorage::open_without_lock(dir.path()).unwrap();
        assert!(s.recover_to(RecoveryTarget::Lsn(1)).is_err());
        assert!(!s.durability_capabilities().point_in_time_recovery);
    }
4153
    /// Regression (review HIGH): under the DEFAULT NORMAL sync mode a commit
    /// record may sit unflushed in the BufWriter while current_lsn() counts it.
    /// recover_to MUST flush+fsync before replaying, or a same-process restore to
    /// the captured LSN silently drops the committed-but-unflushed tail.
    #[test]
    fn test_recover_to_flushes_before_replay() {
        use crate::txn_wal::RecoveryTarget;
        let dir = tempdir().unwrap();
        let s = DurableStorage::open_without_lock(dir.path()).unwrap(); // NORMAL sync
        s.enable_point_in_time_recovery().unwrap();
        let t = s.begin_transaction().unwrap();
        s.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
        s.commit(t).unwrap(); // commit record likely unflushed under NORMAL
        let lsn = s.current_lsn();
        // No checkpoint, SAME process: replay reads a fresh on-disk handle.
        let stats = s.recover_to(RecoveryTarget::Lsn(lsn)).unwrap();
        assert!(
            stats.writes_recovered >= 1,
            "committed-but-unflushed data must be recovered (flush before replay)"
        );
    }
4175
    /// Regression (review MEDIUM): recover_to must be the SOLE recovery on a
    /// fresh open. After recover() (or a prior recover_to) it must refuse, rather
    /// than silently layer a stale set over the point-in-time state.
    #[test]
    fn test_recover_to_refuses_after_recovery() {
        use crate::txn_wal::RecoveryTarget;
        let dir = tempdir().unwrap();
        {
            let s = DurableStorage::open_without_lock(dir.path()).unwrap();
            s.enable_point_in_time_recovery().unwrap();
            let t = s.begin_transaction().unwrap();
            s.write(t, b"k".to_vec(), b"v".to_vec()).unwrap();
            s.commit(t).unwrap();
            s.checkpoint().unwrap();
        }
        let s = DurableStorage::open_without_lock(dir.path()).unwrap();
        s.recover().unwrap(); // full recovery first
        assert!(
            s.recover_to(RecoveryTarget::Lsn(1)).is_err(),
            "recover_to after recover() must refuse (would double-apply)"
        );
        // And a second recover_to after a first also refuses.
        // `s` is still alive here; opening `s2` alongside it works because
        // open_without_lock skips the file lock.
        let s2 = DurableStorage::open_without_lock(dir.path()).unwrap();
        s2.recover_to(RecoveryTarget::Lsn(u64::MAX)).unwrap();
        assert!(s2.recover_to(RecoveryTarget::Lsn(1)).is_err());
    }
4202
    /// recover_to over an ENCRYPTED WAL: the bounded replay decrypts correctly
    /// (review LOW: the encrypted bounded path was previously untested).
    #[test]
    fn test_pitr_recover_to_encrypted() {
        use crate::encryption::EncryptionKey;
        use crate::txn_wal::RecoveryTarget;

        let dir = tempdir().unwrap();
        let kek = [0x9Fu8; 32];
        // Builds a fresh encryption config from the same KEK for every open.
        let mk = || StorageEncryption::with_kek(EncryptionKey::new(kek), "test");

        let lsn_after_txn1;
        {
            let s = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                mk(),
            )
            .unwrap();
            s.enable_point_in_time_recovery().unwrap();
            let t1 = s.begin_transaction().unwrap();
            s.write(t1, b"k1".to_vec(), b"v1".to_vec()).unwrap();
            s.commit(t1).unwrap();
            lsn_after_txn1 = s.current_lsn();
            let t2 = s.begin_transaction().unwrap();
            s.write(t2, b"k2".to_vec(), b"v2".to_vec()).unwrap();
            s.commit(t2).unwrap();
            s.checkpoint().unwrap();
            s.shutdown().ok();
        }

        // Reopen encrypted, restore to the cut between the two txns.
        let s =
            DurableStorage::open_with_encryption(dir.path(), true, MemTableType::Standard, mk())
                .unwrap();
        s.recover_to(RecoveryTarget::Lsn(lsn_after_txn1)).unwrap();
        let t = s.begin_transaction().unwrap();
        assert_eq!(
            s.read(t, b"k1").unwrap(),
            Some(b"v1".to_vec()),
            "encrypted bounded replay must decrypt the in-window record"
        );
        assert_eq!(s.read(t, b"k2").unwrap(), None, "txn2 is past the cut");
        s.abort(t).unwrap();
    }
4249
    /// PITR composes with at-rest encryption (the manifest anchor is independent
    /// of the keyring; an encrypted PITR DB round-trips and stays fail-closed).
    ///
    /// Both `keyring.json` and `wal.manifest` must exist after the first
    /// session, and a reopen must auto-detect both features.
    #[test]
    fn test_pitr_with_encryption() {
        use crate::encryption::EncryptionKey;

        let dir = tempdir().unwrap();
        let kek = [0x2Bu8; 32];

        {
            let s = DurableStorage::open_with_encryption(
                dir.path(),
                true,
                MemTableType::Standard,
                StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
            )
            .unwrap();
            s.enable_point_in_time_recovery().unwrap();
            let t = s.begin_transaction().unwrap();
            s.write(t, b"ek".to_vec(), b"ev".to_vec()).unwrap();
            s.commit(t).unwrap();
            s.checkpoint().unwrap();
            s.shutdown().ok();
        }
        assert!(dir.path().join("keyring.json").exists());
        assert!(dir.path().join("wal.manifest").exists());

        // Reopen encrypted + PITR.
        let s = DurableStorage::open_with_encryption(
            dir.path(),
            true,
            MemTableType::Standard,
            StorageEncryption::with_kek(EncryptionKey::new(kek), "test"),
        )
        .unwrap();
        assert!(s.is_pitr_enabled());
        assert!(s.is_encrypted());
        s.recover().unwrap();
        let t = s.begin_transaction().unwrap();
        assert_eq!(s.read(t, b"ek").unwrap(), Some(b"ev".to_vec()));
        s.abort(t).unwrap();
    }
4292
    /// Crash-atomicity on an ENCRYPTED database: a simulated crash (forget) on an
    /// encrypted WAL must, on reopen with the correct key, replay committed data
    /// (decrypted via the crypto-aware recovery path) while NOT resurrecting
    /// aborted/in-flight writes — and a wrong key after the crash fails closed.
    /// Combines the at-rest-encryption + crash-atomicity guarantees.
    #[test]
    fn test_encrypted_crash_recovery_atomicity() {
        use crate::encryption::EncryptionKey;
        let dir = tempdir().unwrap();
        let kek = || StorageEncryption::with_kek(EncryptionKey::new([0xC1u8; 32]), "test");
        // open encrypted WITHOUT the file lock so a forget()+reopen works in-test.
        let open = |enc: StorageEncryption| {
            DurableStorage::open_with_full_config_internal(
                dir.path(),
                true,
                MemTableType::Standard,
                false,
                enc,
            )
        };

        {
            let storage = open(kek()).unwrap();
            assert!(storage.is_encrypted());
            storage.set_sync_mode(2); // FULL: fsync each commit before the "crash"
            let t1 = storage.begin_transaction().unwrap();
            storage
                .write(t1, b"committed".to_vec(), b"durable".to_vec())
                .unwrap();
            storage.commit(t1).unwrap();
            let t2 = storage.begin_transaction().unwrap();
            storage
                .write(t2, b"aborted".to_vec(), b"x".to_vec())
                .unwrap();
            storage.abort(t2).unwrap();
            // t3 is deliberately left open: it is in flight at crash time.
            let t3 = storage.begin_transaction().unwrap();
            storage
                .write(t3, b"inflight".to_vec(), b"y".to_vec())
                .unwrap();
            std::mem::forget(storage); // crash: skip clean shutdown
        }

        // Reopen with the CORRECT key: committed survives, others do not.
        {
            let storage = open(kek()).unwrap();
            storage.recover().unwrap();
            let t = storage.begin_transaction().unwrap();
            assert_eq!(
                storage.read(t, b"committed").unwrap(),
                Some(b"durable".to_vec()),
                "committed encrypted write must survive the crash"
            );
            assert_eq!(storage.read(t, b"aborted").unwrap(), None);
            assert_eq!(storage.read(t, b"inflight").unwrap(), None);
            storage.abort(t).unwrap();
        }

        // Wrong key after the crash must fail closed (keyring canary).
        assert!(
            open(StorageEncryption::with_kek(
                EncryptionKey::new([0u8; 32]),
                "test"
            ))
            .is_err(),
            "wrong key after crash must fail closed"
        );
    }
4360
    /// Crash-atomicity invariant (Task 4 — completes the Task 1 single-writer
    /// contract). `test_crash_recovery` proves committed data survives a crash;
    /// this proves the other half: recovery must NOT resurrect aborted or
    /// in-flight (never-committed) writes. Together they assert the live
    /// single-writer engine's commit is atomic AND durable across a crash.
    #[test]
    fn test_crash_recovery_atomicity() {
        let dir = tempdir().unwrap();

        // Phase 1: one committed write, one aborted, one in-flight at crash.
        {
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
            storage.set_sync_mode(2); // FULL: fsync every commit before the "crash"

            // Committed — must survive.
            let t1 = storage.begin_transaction().unwrap();
            storage
                .write(t1, b"committed".to_vec(), b"durable".to_vec())
                .unwrap();
            storage.commit(t1).unwrap();

            // Aborted — must NOT be resurrected.
            let t2 = storage.begin_transaction().unwrap();
            storage
                .write(t2, b"aborted".to_vec(), b"rolledback".to_vec())
                .unwrap();
            storage.abort(t2).unwrap();

            // In-flight (never committed) at crash time — must NOT be resurrected.
            let t3 = storage.begin_transaction().unwrap();
            storage
                .write(t3, b"inflight".to_vec(), b"lost".to_vec())
                .unwrap();

            // Simulate a crash: skip Drop / clean shutdown.
            std::mem::forget(storage);
        }

        // Phase 2: reopen + recover; assert atomicity.
        {
            let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
            storage.recover().unwrap();

            let t = storage.begin_transaction().unwrap();
            assert_eq!(
                storage.read(t, b"committed").unwrap(),
                Some(b"durable".to_vec()),
                "committed write must survive the crash"
            );
            assert_eq!(
                storage.read(t, b"aborted").unwrap(),
                None,
                "aborted write must not be resurrected by recovery"
            );
            assert_eq!(
                storage.read(t, b"inflight").unwrap(),
                None,
                "uncommitted in-flight write must not be resurrected by recovery"
            );
            storage.abort(t).unwrap();
        }
    }
4423
4424 #[test]
4425 fn test_scan_prefix() {
4426 let dir = tempdir().unwrap();
4427 let storage = DurableStorage::open(dir.path()).unwrap();
4428
4429 let txn = storage.begin_transaction().unwrap();
4430 storage
4431 .write(txn, b"user:1".to_vec(), b"alice".to_vec())
4432 .unwrap();
4433 storage
4434 .write(txn, b"user:2".to_vec(), b"bob".to_vec())
4435 .unwrap();
4436 storage
4437 .write(txn, b"order:1".to_vec(), b"order1".to_vec())
4438 .unwrap();
4439 storage.commit(txn).unwrap();
4440
4441 let txn2 = storage.begin_transaction().unwrap();
4442 let users = storage.scan(txn2, b"user:").unwrap();
4443 assert_eq!(users.len(), 2);
4444 storage.abort(txn2).unwrap();
4445 }
4446
4447 #[test]
4448 fn test_delete() {
4449 let dir = tempdir().unwrap();
4450 let storage = DurableStorage::open(dir.path()).unwrap();
4451
4452 // Insert
4453 let t1 = storage.begin_transaction().unwrap();
4454 storage
4455 .write(t1, b"key".to_vec(), b"value".to_vec())
4456 .unwrap();
4457 storage.commit(t1).unwrap();
4458
4459 // Verify exists
4460 let t2 = storage.begin_transaction().unwrap();
4461 assert!(storage.read(t2, b"key").unwrap().is_some());
4462 storage.abort(t2).unwrap();
4463
4464 // Delete
4465 let t3 = storage.begin_transaction().unwrap();
4466 storage.delete(t3, b"key".to_vec()).unwrap();
4467 storage.commit(t3).unwrap();
4468
4469 // Verify deleted
4470 let t4 = storage.begin_transaction().unwrap();
4471 assert!(storage.read(t4, b"key").unwrap().is_none());
4472 storage.abort(t4).unwrap();
4473 }
4474
4475 #[test]
4476 fn test_gc() {
4477 let dir = tempdir().unwrap();
4478 let storage = DurableStorage::open(dir.path()).unwrap();
4479
4480 // Create multiple versions
4481 for i in 0..10 {
4482 let txn = storage.begin_transaction().unwrap();
4483 storage
4484 .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
4485 .unwrap();
4486 storage.commit(txn).unwrap();
4487 }
4488
4489 // GC should reclaim old versions
4490 let gc_count = storage.gc();
4491 // At least some versions should be collected
4492 // (exact count depends on implementation)
4493 let _ = gc_count; // gc_count is usize, always >= 0
4494 }
4495
4496 #[test]
4497 fn test_group_commit() {
4498 use std::sync::Arc;
4499 use std::thread;
4500
4501 let dir = tempdir().unwrap();
4502 let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
4503
4504 // Spawn multiple threads to commit concurrently
4505 let mut handles = vec![];
4506 for i in 0..4 {
4507 let storage = Arc::clone(&storage);
4508 handles.push(thread::spawn(move || {
4509 let txn = storage.begin_transaction().unwrap();
4510 storage
4511 .write(
4512 txn,
4513 format!("key{}", i).into_bytes(),
4514 format!("val{}", i).into_bytes(),
4515 )
4516 .unwrap();
4517 storage.commit(txn).unwrap()
4518 }));
4519 }
4520
4521 // Wait for all commits
4522 let mut commit_times = vec![];
4523 for h in handles {
4524 commit_times.push(h.join().unwrap());
4525 }
4526
4527 // All commits should succeed
4528 assert!(commit_times.iter().all(|&ts| ts > 0));
4529
4530 // Verify data persisted
4531 let txn = storage.begin_transaction().unwrap();
4532 for i in 0..4 {
4533 let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
4534 assert_eq!(val, Some(format!("val{}", i).into_bytes()));
4535 }
4536 storage.abort(txn).unwrap();
4537 }
4538
4539 // ==================== ArenaMvccMemTable Tests ====================
4540
4541 #[test]
4542 fn test_arena_memtable_basic_write_read() {
4543 let memtable = ArenaMvccMemTable::new();
4544
4545 // Write some values
4546 memtable
4547 .write(b"key1", Some(b"value1".to_vec()), 1)
4548 .unwrap();
4549 memtable
4550 .write(b"key2", Some(b"value2".to_vec()), 1)
4551 .unwrap();
4552
4553 // Read them back (uncommitted, so need txn_id match)
4554 assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
4555 assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
4556 assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
4557 }
4558
4559 #[test]
4560 fn test_arena_memtable_update() {
4561 let memtable = ArenaMvccMemTable::new();
4562
4563 memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
4564 memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
4565
4566 assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
4567 }
4568
4569 #[test]
4570 fn test_arena_memtable_delete() {
4571 let memtable = ArenaMvccMemTable::new();
4572
4573 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4574 memtable.write(b"key", None, 1).unwrap(); // Delete = None value
4575
4576 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4577 }
4578
4579 #[test]
4580 fn test_arena_memtable_scan_prefix() {
4581 let memtable = ArenaMvccMemTable::new();
4582
4583 memtable
4584 .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
4585 .unwrap();
4586 memtable
4587 .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
4588 .unwrap();
4589 memtable
4590 .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
4591 .unwrap();
4592 memtable
4593 .write(b"order:1", Some(b"order_data".to_vec()), 1)
4594 .unwrap();
4595
4596 // Create a write set and commit
4597 let mut write_set = HashSet::new();
4598 write_set.insert(InlineKey::from_slice(b"user:1:name"));
4599 write_set.insert(InlineKey::from_slice(b"user:1:email"));
4600 write_set.insert(InlineKey::from_slice(b"user:2:name"));
4601 write_set.insert(InlineKey::from_slice(b"order:1"));
4602 memtable.commit(1, 10, &write_set);
4603
4604 // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
4605 let results = memtable.scan_prefix(b"user:1:", 11, None);
4606 assert_eq!(results.len(), 2);
4607
4608 // Scan for all users
4609 let results = memtable.scan_prefix(b"user:", 11, None);
4610 assert_eq!(results.len(), 3);
4611 }
4612
4613 #[test]
4614 fn test_arena_memtable_write_batch() {
4615 let memtable = ArenaMvccMemTable::new();
4616
4617 let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
4618 (b"k1", Some(b"v1".to_vec())),
4619 (b"k2", Some(b"v2".to_vec())),
4620 (b"k3", Some(b"v3".to_vec())),
4621 ];
4622
4623 memtable.write_batch(&writes, 1).unwrap();
4624
4625 assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
4626 assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
4627 assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
4628 }
4629
4630 #[test]
4631 fn test_arena_memtable_gc() {
4632 let memtable = ArenaMvccMemTable::new();
4633
4634 // Write multiple versions
4635 for i in 0..10 {
4636 memtable
4637 .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
4638 .unwrap();
4639
4640 let mut write_set = HashSet::new();
4641 write_set.insert(InlineKey::from_slice(b"key"));
4642 memtable.commit(i + 1, (i + 1) * 10, &write_set);
4643 }
4644
4645 // GC old versions
4646 let gc_count = memtable.gc(90);
4647 let _ = gc_count; // gc_count is usize, always >= 0
4648 }
4649
4650 #[test]
4651 fn test_arena_memtable_size_tracking() {
4652 let memtable = ArenaMvccMemTable::new();
4653
4654 assert_eq!(memtable.size(), 0);
4655
4656 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
4657
4658 assert!(memtable.size() > 0);
4659 }
4660
4661 #[test]
4662 fn test_arena_memtable_abort() {
4663 let memtable = ArenaMvccMemTable::new();
4664
4665 memtable
4666 .write(b"key", Some(b"uncommitted".to_vec()), 1)
4667 .unwrap();
4668
4669 // Visible to same txn
4670 assert_eq!(
4671 memtable.read(b"key", 0, Some(1)),
4672 Some(b"uncommitted".to_vec())
4673 );
4674
4675 // Not visible to other txns
4676 assert_eq!(memtable.read(b"key", 0, Some(2)), None);
4677
4678 // Abort
4679 memtable.abort(1);
4680
4681 // No longer visible
4682 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
4683 }
4684
4685 // ========================================================================
4686 // MemTableKind Tests - Unified Abstraction
4687 // ========================================================================
4688
4689 #[test]
4690 fn test_memtable_kind_standard() {
4691 let memtable = MemTableKind::new(MemTableType::Standard, true);
4692 assert_eq!(memtable.kind(), MemTableType::Standard);
4693
4694 // Write and read
4695 memtable
4696 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4697 .unwrap();
4698
4699 // Commit transaction at ts=100
4700 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4701 memtable.commit(1, 100, &write_set);
4702
4703 // Read after commit - snapshot_ts must be > commit_ts for visibility
4704 let v = memtable.read(b"key1", 101, None);
4705 assert_eq!(v, Some(b"value1".to_vec()));
4706 }
4707
4708 #[test]
4709 fn test_memtable_kind_arena() {
4710 let memtable = MemTableKind::new(MemTableType::Arena, true);
4711 assert_eq!(memtable.kind(), MemTableType::Arena);
4712
4713 // Write and read
4714 memtable
4715 .write(b"key1".to_vec(), Some(b"value1".to_vec()), 1)
4716 .unwrap();
4717
4718 // Commit at ts=100
4719 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
4720 memtable.commit(1, 100, &write_set);
4721
4722 // Read after commit - snapshot_ts > commit_ts
4723 let v = memtable.read(b"key1", 101, None);
4724 assert_eq!(v, Some(b"value1".to_vec()));
4725 }
4726
4727 #[test]
4728 fn test_memtable_kind_scan_range() {
4729 // Test both implementations have consistent behavior
4730 for kind in [MemTableType::Standard, MemTableType::Arena] {
4731 let memtable = MemTableKind::new(kind, true);
4732
4733 // Write some data
4734 for i in 0..5 {
4735 let key = format!("key{}", i);
4736 let value = format!("value{}", i);
4737 memtable
4738 .write(key.into_bytes(), Some(value.into_bytes()), 1)
4739 .unwrap();
4740 }
4741
4742 // Commit all at ts=100
4743 let write_set: HashSet<InlineKey> = (0..5)
4744 .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
4745 .collect();
4746 memtable.commit(1, 100, &write_set);
4747
4748 // Scan range with snapshot_ts > commit_ts
4749 let results = memtable.scan_range(b"key1", b"key4", 101, None);
4750 assert_eq!(
4751 results.len(),
4752 3,
4753 "kind={:?} should have 3 results (key1, key2, key3)",
4754 kind
4755 );
4756 }
4757 }
4758
4759 #[test]
4760 fn test_durable_storage_arena() {
4761 let dir = tempdir().unwrap();
4762 let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
4763
4764 assert_eq!(storage.memtable_type(), MemTableType::Arena);
4765
4766 // Basic transaction should work the same
4767 let txn_id = storage.begin_transaction().unwrap();
4768 storage
4769 .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
4770 .unwrap();
4771 storage.commit(txn_id).unwrap();
4772
4773 let txn2 = storage.begin_transaction().unwrap();
4774 let v = storage.read(txn2, b"key1").unwrap();
4775 assert_eq!(v, Some(b"value1".to_vec()));
4776 storage.abort(txn2).unwrap();
4777 }
4778
4779 #[test]
4780 fn test_durable_storage_full_config() {
4781 let dir = tempdir().unwrap();
4782
4783 // Test with Arena and ordered index enabled
4784 let storage =
4785 DurableStorage::open_with_full_config(dir.path(), true, MemTableType::Arena).unwrap();
4786
4787 assert_eq!(storage.memtable_type(), MemTableType::Arena);
4788
4789 // Write multiple keys
4790 let txn = storage.begin_transaction().unwrap();
4791 for i in 0..10 {
4792 let key = format!("key{:02}", i);
4793 let value = format!("value{}", i);
4794 storage
4795 .write(txn, key.into_bytes(), value.into_bytes())
4796 .unwrap();
4797 }
4798 storage.commit(txn).unwrap();
4799
4800 // Scan should work (uses scan method for prefix)
4801 let txn2 = storage.begin_transaction().unwrap();
4802 let results = storage.scan(txn2, b"key0").unwrap();
4803 assert_eq!(results.len(), 10); // key00 through key09
4804 storage.abort(txn2).unwrap();
4805 }
4806}