sochdb_storage/durable_storage.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! This module wires together all storage components into a production-ready
21//! durable storage engine:
22//!
23//! - WAL (txn_wal.rs) for durability
24//! - Group Commit for throughput
25//! - MVCC for isolation
26//! - LSCS for columnar efficiency
27//!
28//! ## Architecture
29//!
30//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                         DurableStorage                          │
//! ├─────────────────────────────────────────────────────────────────┤
//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐  │
//! │  │ MvccManager │    │ GroupCommit │───▶│   TxnWal (fsync)    │  │
//! │  │             │    │             │    └─────────────────────┘  │
//! │  │ ┌─────────┐ │    └─────────────┘                             │
//! │  │ │Snapshots│ │                                                │
//! │  │ └─────────┘ │     ┌─────────────────────────────────────────┐│
//! │  │ ┌─────────┐ │     │                MemTable                 ││
//! │  │ │ Txn Map │ │     │    (key → (value, txn_id, version))     ││
//! │  │ └─────────┘ │     └─────────────────────────────────────────┘│
//! │  └─────────────┘                                                │
//! │                      ┌─────────────────────────────────────────┐│
//! │                      │               LSCS (SST)                ││
//! │                      │       Immutable columnar segments       ││
//! │                      └─────────────────────────────────────────┘│
//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Concurrency
52//!
53//! - Writers: Serialize through WAL, use MVCC for conflict detection
54//! - Readers: Lock-free reads at snapshot timestamp
55//! - Commits: Batched through GroupCommit for throughput
56//!
57//! ## Isolation Contract
58//!
59//! The live write path (`MvccManager`, used by `DurableStorage`) provides
60//! **Serializable Snapshot Isolation (SSI)**, which is strictly stronger than
61//! plain Snapshot Isolation (SI):
62//!
63//! - **Snapshot reads.** Every transaction reads from a consistent snapshot
64//! fixed at `begin_transaction()` (its `snapshot_ts`). Concurrent commits are
65//! invisible to it — see `test_snapshot_isolation`. This alone is SI and is
66//! vulnerable to **write skew** (two transactions each read a set the other
67//! writes, both commit, and no serial order reproduces the result).
68//! - **Write-skew prevention.** On commit, `MvccManager::validate_ssi` inspects
69//! recently-committed concurrent transactions for rw-antidependency edges:
70//! an inbound edge (another txn wrote a key we read, `T_other →rw→ T_me`) and
71//! an outbound edge (we wrote a key another txn read, `T_me →rw→ T_other`).
72//! When a transaction sits on **both** an inbound and an outbound rw-edge it
73//! is the pivot of a potential dependency cycle (Cahill/Fekete "dangerous
74//! structure"), so it is aborted. This is the conservative safe subset of the
75//! SSI test: it may abort some serializable schedules (false positives) but
76//! **never admits a non-serializable one** (no false negatives), so the
77//! externally observable isolation level is Serializable.
78//! - **Read-only transactions** (`begin_read_only`) skip read-set tracking and
79//! never participate in validation; they always observe a serializable
80//! snapshot and never abort.
81//!
82//! ### MVCC garbage collection is snapshot-safe
83//!
84//! Old versions are pruned by `DurableStorage::gc()`, which feeds the
85//! **low-water-mark** `MvccManager::min_active_snapshot()` — the minimum
86//! `snapshot_ts` across all still-active transactions — into
87//! `MvccMemTable::gc()` and then `BinarySearchChain::gc_by_ts()`. The chain
88//! retains every version with `commit_ts > min_active_ts` **plus one anchor
89//! version at or below the watermark**, so any in-flight reader can still
90//! resolve the correct version for its snapshot. A version is only freed once
91//! **no active snapshot can observe it**. The watermark is recomputed on every
92//! `begin`/`commit`/`abort`, so it is monotonic with respect to the oldest live
93//! reader. See `test_gc_preserves_versions_for_active_snapshot`.
94
95use std::collections::HashSet;
96use std::path::{Path, PathBuf};
97use std::sync::Arc;
98use std::sync::atomic::{AtomicU64, Ordering};
99
100use dashmap::DashMap;
101use smallvec::SmallVec;
102
103use crossbeam_skiplist::SkipMap;
104
105use crate::deferred_index::{DeferredIndexConfig, DeferredSortedIndex};
106use crate::group_commit::EventDrivenGroupCommit;
107use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
108use sochdb_core::version_chain::{
109 BinarySearchChain, ChainEntry, MvccVersionChain, MvccVersionChainMut, Timestamp, TxnId,
110 VisibilityContext, WriteConflictDetection,
111};
112use sochdb_core::{Result, SochDBError};
113
114// =============================================================================
115// SSI Bloom Filter - Fast Conflict Pre-Filtering
116// =============================================================================
117
118/// Space-efficient Bloom filter for SSI conflict detection
119///
120/// Used to quickly determine if two transactions MIGHT have conflicting keys.
121/// False positives are acceptable (leads to unnecessary exact checks),
122/// but false negatives are not allowed.
123///
124/// ## Configuration
125///
126/// For 1000 keys with 1% false positive rate:
127/// - m = ~9600 bits ≈ 1.2 KB per transaction
128/// - k = 7 hash functions
129///
130/// ## Lazy Initialization
131///
132/// The bit vector is lazily initialized on first insert to avoid
133/// allocation overhead for read-only transactions.
134#[derive(Clone, Debug)]
135pub struct SsiBloomFilter {
136 /// Bit vector (each u64 holds 64 bits) - lazily initialized
137 bits: Option<Vec<u64>>,
138 /// Expected capacity (used for lazy init sizing)
139 expected_capacity: usize,
140 /// Number of hash functions to use
141 num_hashes: u32,
142}
143
144impl SsiBloomFilter {
145 /// Optimal number of bits per item for 1% false positive rate
146 /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
147 const BITS_PER_ITEM: f64 = 9.6;
148
149 /// Optimal number of hash functions for 1% false positive rate
150 /// k = (m/n) × ln(2) ≈ 7
151 const DEFAULT_NUM_HASHES: u32 = 7;
152
153 /// Minimum capacity to avoid tiny filters
154 const MIN_CAPACITY: usize = 64;
155
156 /// Create a new bloom filter for expected item count (lazy allocation)
157 ///
158 /// Configured for ~1% false positive rate.
159 /// The bit vector is not allocated until first insert.
160 #[inline]
161 pub fn new(expected_items: usize) -> Self {
162 Self {
163 bits: None,
164 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
165 num_hashes: Self::DEFAULT_NUM_HASHES,
166 }
167 }
168
169 /// Create with specific capacity in words (for memory-constrained scenarios)
170 pub fn with_word_capacity(words: usize) -> Self {
171 Self {
172 bits: None,
173 expected_capacity: words.max(1) * 64 / 10, // Approx items from words
174 num_hashes: Self::DEFAULT_NUM_HASHES,
175 }
176 }
177
178 /// Ensure bits are allocated (lazy initialization)
179 #[inline]
180 fn ensure_allocated(&mut self) {
181 if self.bits.is_none() {
182 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
183 let num_words = num_bits.div_ceil(64);
184 self.bits = Some(vec![0u64; num_words]);
185 }
186 }
187
188 /// Add a key to the filter - O(k) where k = num_hashes
189 #[inline]
190 pub fn insert(&mut self, key: &[u8]) {
191 self.ensure_allocated();
192 let bits = self.bits.as_mut().unwrap();
193 let num_bits = bits.len() * 64;
194 if num_bits == 0 {
195 return;
196 }
197
198 // Use two hash functions to simulate k hash functions
199 // h(i) = h1 + i * h2 (double hashing technique)
200 let h1 = Self::hash1(key);
201 let h2 = Self::hash2(key);
202
203 for i in 0..self.num_hashes {
204 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
205 let bit_idx = (h as usize) % num_bits;
206 let word_idx = bit_idx / 64;
207 let bit_pos = bit_idx % 64;
208 bits[word_idx] |= 1 << bit_pos;
209 }
210 }
211
212 /// Check if a key might be present - O(k)
213 ///
214 /// Returns:
215 /// - false: Key is definitely NOT in the set (or filter not initialized)
216 /// - true: Key MIGHT be in the set (needs exact check)
217 #[inline]
218 pub fn may_contain(&self, key: &[u8]) -> bool {
219 let bits = match &self.bits {
220 Some(b) => b,
221 None => return false, // Uninitialized = empty
222 };
223 let num_bits = bits.len() * 64;
224 if num_bits == 0 {
225 return false;
226 }
227
228 let h1 = Self::hash1(key);
229 let h2 = Self::hash2(key);
230
231 for i in 0..self.num_hashes {
232 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
233 let bit_idx = (h as usize) % num_bits;
234 let word_idx = bit_idx / 64;
235 let bit_pos = bit_idx % 64;
236 if bits[word_idx] & (1 << bit_pos) == 0 {
237 return false; // Definitely not present
238 }
239 }
240 true // Might be present
241 }
242
243 /// Check if this filter might intersect with another
244 ///
245 /// Fast O(m/64) check using bitwise AND of all words.
246 /// If no bits are shared, sets are definitely disjoint.
247 #[inline]
248 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
249 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
250 (Some(s), Some(o)) => (s, o),
251 _ => return false, // Either uninitialized = no intersection
252 };
253 let min_len = self_bits.len().min(other_bits.len());
254 for i in 0..min_len {
255 if self_bits[i] & other_bits[i] != 0 {
256 return true; // Might intersect
257 }
258 }
259 false // Definitely disjoint
260 }
261
262 /// First hash function (using built-in hasher)
263 #[inline]
264 fn hash1(key: &[u8]) -> u64 {
265 use std::collections::hash_map::DefaultHasher;
266 use std::hash::{Hash, Hasher};
267 let mut hasher = DefaultHasher::new();
268 key.hash(&mut hasher);
269 hasher.finish()
270 }
271
272 /// Second hash function (using twox-hash for independence)
273 #[inline]
274 fn hash2(key: &[u8]) -> u64 {
275 twox_hash::xxh3::hash64(key)
276 }
277
278 /// Get the memory size in bytes
279 pub fn size_bytes(&self) -> usize {
280 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
281 }
282
283 /// Check if the filter is empty
284 pub fn is_empty(&self) -> bool {
285 match &self.bits {
286 Some(bits) => bits.iter().all(|&w| w == 0),
287 None => true,
288 }
289 }
290}
291
292/// Type alias for inline key storage - keys up to 32 bytes stored on stack
293/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
294pub type InlineKey = SmallVec<[u8; 32]>;
295
/// One version of a key's value in the MVCC store.
#[derive(Clone, Debug)]
pub struct Version {
    /// Stored payload; `None` marks a tombstone.
    pub value: Option<Vec<u8>>,
    /// ID of the transaction that created this version.
    pub txn_id: u64,
    /// Commit timestamp; `0` means the version is not committed yet.
    pub commit_ts: u64,
}
306
307// Rec 11: Implement ChainEntry so BinarySearchChain<Version> works
308impl ChainEntry for Version {
309 #[inline]
310 fn commit_ts(&self) -> u64 {
311 self.commit_ts
312 }
313 #[inline]
314 fn txn_id(&self) -> u64 {
315 self.txn_id
316 }
317 #[inline]
318 fn set_commit_ts(&mut self, ts: u64) {
319 self.commit_ts = ts;
320 }
321}
322
323// ============================================================================
324// Optimized VersionChain with Binary Search (Task 1: mm.md)
325// ============================================================================
326
327/// Multi-version data for a single key with O(log v) read complexity
328///
329/// ## Optimization: Binary Search with Sorted Commit Ordering
330///
331/// Separates committed versions (sorted descending by commit_ts) from
332/// uncommitted version (single optional slot per transaction).
333///
334/// **Before:** O(v) linear scan + O(v) max computation = O(v)
335/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
336///
337/// For v=10 versions: 3.3x speedup
338/// For v=100 versions: 7x speedup
339///
340/// ## Rec 11: Consolidated
341///
342/// Delegates binary-search logic to `BinarySearchChain<Version>` from sochdb-core,
343/// eliminating duplication with `mvcc_concurrent::VersionChain`.
344#[derive(Debug, Default)]
345pub struct VersionChain {
346 /// Consolidated binary-search chain (Rec 11)
347 inner: BinarySearchChain<Version>,
348}
349
350impl VersionChain {
351 /// Create a new empty version chain
352 #[inline]
353 pub fn new() -> Self {
354 Self {
355 inner: BinarySearchChain::new(),
356 }
357 }
358
359 /// Add a new uncommitted version
360 /// If there's already an uncommitted version from this txn, update it in place
361 ///
362 /// O(1) - just updates the uncommitted slot
363 #[inline]
364 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
365 match self.inner.uncommitted_mut() {
366 Some(v) if v.txn_id == txn_id => {
367 // Update in place - O(1)
368 v.value = value;
369 }
370 _ => {
371 // New or different txn — set the slot
372 self.inner.set_uncommitted(Version {
373 value,
374 txn_id,
375 commit_ts: 0,
376 });
377 }
378 }
379 }
380
381 /// Commit a version - moves from uncommitted slot to sorted committed list
382 ///
383 /// O(log v) - inserts into sorted position using binary search
384 #[inline]
385 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
386 self.inner.commit(txn_id, commit_ts)
387 }
388
389 /// Abort a version (remove uncommitted version for txn)
390 ///
391 /// O(1) - just clears the uncommitted slot if it matches
392 #[inline]
393 pub fn abort(&mut self, txn_id: u64) {
394 self.inner.abort(txn_id);
395 }
396
397 /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
398 ///
399 /// ## Complexity: O(1) + O(log v) = O(log v)
400 #[inline]
401 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
402 self.inner.read_at(snapshot_ts, current_txn_id)
403 }
404
405 /// Check if there's an uncommitted version by another transaction
406 ///
407 /// O(1) - just checks the uncommitted slot
408 #[inline]
409 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
410 self.inner.has_write_conflict(my_txn_id)
411 }
412
413 /// Garbage collect old versions
414 pub fn gc(&mut self, min_active_ts: u64) {
415 self.inner.gc_by_ts(min_active_ts);
416 }
417
418 /// Get total version count (committed + uncommitted)
419 #[inline]
420 pub fn version_count(&self) -> usize {
421 self.inner.version_count()
422 }
423
424 // Legacy compatibility: get versions vec (for tests)
425 #[cfg(test)]
426 pub fn versions(&self) -> Vec<Version> {
427 let mut result = self.inner.committed_versions().to_vec();
428 if let Some(v) = self.inner.uncommitted() {
429 result.push(v.clone());
430 }
431 result
432 }
433}
434
435// =============================================================================
436// Rec 6: Unified Version Chain Trait Implementations
437// =============================================================================
438
439impl MvccVersionChain for VersionChain {
440 type Value = Option<Vec<u8>>;
441
442 fn get_visible(&self, ctx: &VisibilityContext) -> Option<&Self::Value> {
443 // Delegate to BinarySearchChain, then project to value field
444 self.inner
445 .read_at(ctx.snapshot_ts, Some(ctx.reader_txn_id))
446 .map(|v| &v.value)
447 }
448
449 fn get_latest(&self) -> Option<&Self::Value> {
450 self.inner.latest().map(|v| &v.value)
451 }
452
453 fn version_count(&self) -> usize {
454 self.inner.version_count()
455 }
456}
457
458impl MvccVersionChainMut for VersionChain {
459 fn add_uncommitted(&mut self, value: Self::Value, txn_id: TxnId) {
460 self.add_uncommitted(value, txn_id);
461 }
462
463 fn commit_version(&mut self, txn_id: TxnId, commit_ts: Timestamp) -> bool {
464 self.inner.commit(txn_id, commit_ts)
465 }
466
467 fn delete_version(&mut self, txn_id: TxnId, _delete_ts: Timestamp) -> bool {
468 // Insert a tombstone (None value) as uncommitted
469 self.add_uncommitted(None, txn_id);
470 true
471 }
472
473 fn gc(&mut self, min_visible_ts: Timestamp) -> (usize, usize) {
474 let before = self.inner.committed_count();
475 self.inner.gc_by_ts(min_visible_ts);
476 let removed = before - self.inner.committed_count();
477 (removed, removed * std::mem::size_of::<Version>())
478 }
479}
480
481impl WriteConflictDetection for VersionChain {
482 fn has_write_conflict(&self, txn_id: TxnId) -> bool {
483 self.has_write_conflict(txn_id)
484 }
485}
486
487// =============================================================================
488// Pre-sizing Constants to Avoid HashSet Resize Overhead
489// =============================================================================
490
/// Initial capacity of a transaction's `write_set`.
///
/// Sized for typical OLTP transactions (10-50 keys) so the set does not have
/// to grow during the transaction; resizing caused a +11% regression.
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity of a transaction's `read_set`.
///
/// Larger than the write-set capacity because workloads are typically
/// read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
499
500/// Transaction state for MVCC
501#[derive(Debug, Clone)]
502pub struct MvccTransaction {
503 /// Transaction ID
504 pub txn_id: u64,
505 /// Snapshot timestamp (reads see commits before this)
506 pub snapshot_ts: u64,
507 /// Keys written by this transaction - uses SmallVec for inline storage
508 /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
509 pub write_set: HashSet<InlineKey>,
510 /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
511 /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
512 pub read_set: HashSet<InlineKey>,
513 /// Bloom filter for write set - fast SSI pre-filtering
514 pub write_bloom: SsiBloomFilter,
515 /// Bloom filter for read set - fast SSI pre-filtering
516 pub read_bloom: SsiBloomFilter,
517 /// Transaction state
518 pub state: TxnState,
519 /// Transaction mode for SSI optimization (Recommendation 9)
520 /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
521 pub mode: TransactionMode,
522}
523
524impl MvccTransaction {
525 /// Create a new transaction with pre-sized collections
526 ///
527 /// This avoids HashSet resize overhead during the transaction
528 /// which was causing +11% regression on write_set.insert().
529 #[inline]
530 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
531 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
532 }
533
534 /// Create a read-only transaction (SSI bypass - 2.6x faster)
535 ///
536 /// Read-only transactions skip all SSI tracking:
537 /// - No read_set allocation
538 /// - No read_bloom allocation
539 /// - No commit validation
540 ///
541 /// ## Performance
542 ///
543 /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
544 #[inline]
545 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
546 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
547 }
548
549 /// Create a write-only transaction (partial SSI bypass)
550 ///
551 /// Write-only transactions skip read tracking:
552 /// - No read_set tracking
553 /// - No read_bloom inserts
554 /// - Still needs write_set for commit
555 #[inline]
556 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
557 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
558 }
559
560 /// Create transaction with specific mode
561 #[inline]
562 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
563 // Optimize allocation based on mode
564 let (write_capacity, read_capacity) = match mode {
565 TransactionMode::ReadOnly => (0, 0), // No tracking needed
566 TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
567 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
568 };
569 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
570 }
571
572 /// Create with custom capacities for expected workload
573 ///
574 /// Use this when you know the transaction will write many keys
575 /// to avoid resize overhead entirely.
576 #[inline]
577 pub fn with_capacity(
578 txn_id: u64,
579 snapshot_ts: u64,
580 write_capacity: usize,
581 read_capacity: usize,
582 mode: TransactionMode,
583 ) -> Self {
584 Self {
585 txn_id,
586 snapshot_ts,
587 write_set: HashSet::with_capacity(write_capacity),
588 read_set: HashSet::with_capacity(read_capacity),
589 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
590 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
591 state: TxnState::Active,
592 mode,
593 }
594 }
595
596 /// Check if this is a read-only transaction
597 #[inline]
598 pub fn is_read_only(&self) -> bool {
599 self.write_set.is_empty()
600 }
601
602 /// Check if this is a single-key write transaction
603 #[inline]
604 pub fn is_single_key_write(&self) -> bool {
605 self.write_set.len() == 1 && self.read_set.len() <= 1
606 }
607}
608
/// Lifecycle state of an MVCC transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnState {
    /// The transaction is in progress.
    Active,
    /// The transaction committed.
    Committed,
    /// The transaction was aborted.
    Aborted,
}
616
617// =============================================================================
618// Transaction Mode for SSI Bypass (Recommendation 9)
619// =============================================================================
620
/// Transaction mode for SSI optimization
///
/// Classifying a transaction when it begins lets most transactions skip the
/// expensive parts of SSI tracking:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TransactionMode {
    /// Read-only transaction - skips ALL SSI tracking.
    /// The design treats it as unable to complete an rw-antidependency cycle
    /// (it has no writes to create outgoing edges), so read_set, read_bloom
    /// and commit validation are skipped entirely.
    ReadOnly,

    /// Write-only transaction - skips read tracking.
    /// The design treats it as unable to receive incoming rw-edges (it has no
    /// reads), so only write_set and write_bloom are maintained.
    WriteOnly,

    /// Full read-write transaction (default) - complete SSI tracking.
    /// May form both incoming and outgoing rw-edges and is validated at
    /// commit time.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Returns `true` if reads must be recorded (only `ReadWrite`).
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Returns `true` if writes must be recorded (`WriteOnly` and `ReadWrite`).
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly => false,
        }
    }

    /// Returns `true` if commit must run SSI validation (only `ReadWrite`).
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }
}
685
/// Kind of dependency edge between two transactions in SSI conflict tracking.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, then T2 writes X
    ReadWrite,
    /// Write-read conflict: T1 writes X, then T2 reads X
    WriteRead,
}
694
695/// SSI conflict edge for dangerous structure detection
696#[derive(Debug, Clone)]
697pub struct ConflictEdge {
698 /// Source transaction
699 pub from_txn: u64,
700 /// Target transaction
701 pub to_txn: u64,
702 /// Type of conflict
703 pub conflict_type: ConflictType,
704}
705
706/// MVCC Manager with SSI support
707///
708/// Uses DashMap for lock-free per-transaction access.
709/// Implements Serializable Snapshot Isolation (SSI) with
710/// dangerous structure detection for rw-antidependency cycles.
711#[allow(clippy::type_complexity)]
712pub struct MvccManager {
713 /// Active transactions (sharded for concurrent access)
714 active_txns: DashMap<u64, MvccTransaction>,
715 /// Current timestamp counter
716 ts_counter: AtomicU64,
717 /// Minimum active snapshot timestamp (for GC)
718 min_active_ts: AtomicU64,
719 /// Recently committed transactions for SSI validation
720 /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
721 /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
722 recent_commits: DashMap<
723 u64,
724 (
725 u64,
726 SsiBloomFilter,
727 SsiBloomFilter,
728 HashSet<InlineKey>,
729 HashSet<InlineKey>,
730 ),
731 >,
732 /// Max recent commits to track
733 max_recent_commits: usize,
734}
735
736impl Default for MvccManager {
737 fn default() -> Self {
738 Self::new()
739 }
740}
741
742impl MvccManager {
743 pub fn new() -> Self {
744 Self {
745 active_txns: DashMap::new(),
746 ts_counter: AtomicU64::new(1),
747 min_active_ts: AtomicU64::new(0),
748 recent_commits: DashMap::new(),
749 max_recent_commits: 1000, // Track last 1000 commits for SSI
750 }
751 }
752
753 /// Begin a new transaction with snapshot isolation
754 ///
755 /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
756 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
757 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
758 }
759
760 /// Begin a read-only transaction (SSI bypass - 2.6x faster)
761 ///
762 /// Read-only transactions skip all SSI tracking, reducing
763 /// per-read overhead from ~82ns to ~32ns.
764 ///
765 /// ## Safety
766 ///
767 /// Caller must ensure no writes are performed. Attempting to
768 /// write in a read-only transaction will still succeed but
769 /// won't be tracked for SSI validation.
770 #[inline]
771 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
772 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
773 }
774
775 /// Begin a write-only transaction (partial SSI bypass)
776 ///
777 /// Write-only transactions skip read tracking, reducing overhead
778 /// for insert-heavy workloads.
779 #[inline]
780 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
781 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
782 }
783
784 /// Begin a transaction with specific mode
785 ///
786 /// This is the core transaction creation method that all other
787 /// begin_* methods delegate to.
788 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
789 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
790
791 // Create transaction with mode-optimized allocations
792 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
793
794 self.active_txns.insert(txn_id, txn.clone());
795 self.update_min_active_ts();
796
797 txn
798 }
799
800 /// Get transaction if active (clones - use get_snapshot_ts for hot path)
801 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
802 self.active_txns.get(&txn_id).map(|t| t.clone())
803 }
804
805 /// Fast path: get just the snapshot timestamp without cloning
806 /// This is the hot path for reads - avoids cloning bloom filters
807 #[inline]
808 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
809 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
810 }
811
812 /// Record a read (for SSI) - uses inline key storage + bloom filter
813 ///
814 /// ## SSI Bypass (Recommendation 9)
815 ///
816 /// For ReadOnly mode transactions, this is a no-op (instant return).
817 /// For WriteOnly mode transactions, this is a no-op.
818 /// Only ReadWrite mode transactions track reads for SSI.
819 ///
820 /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
821 #[inline]
822 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
823 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
824 // SSI Bypass: Skip tracking for read-only and write-only modes
825 if !txn.mode.tracks_reads() {
826 return;
827 }
828
829 // Only track reads if within reasonable bounds
830 if txn.read_set.len() < 10000 {
831 txn.read_set.insert(SmallVec::from_slice(key));
832 txn.read_bloom.insert(key);
833 }
834 }
835 }
836
837 /// Record a write - uses inline key storage + bloom filter
838 ///
839 /// Note: Even ReadOnly transactions can record writes (mode is a hint).
840 /// The mode only affects SSI tracking, not write capability.
841 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
842 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
843 txn.write_set.insert(SmallVec::from_slice(key));
844 txn.write_bloom.insert(key);
845 }
846 }
847
848 /// Allocate commit timestamp
849 pub fn alloc_commit_ts(&self) -> u64 {
850 self.ts_counter.fetch_add(1, Ordering::SeqCst)
851 }
852
853 /// Commit transaction with SSI validation
854 /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
855 /// Returns None if SSI validation fails (dangerous structure detected)
856 ///
857 /// ## SSI Bypass (Recommendation 9)
858 ///
859 /// For ReadOnly mode: Skip validation entirely (~10ns commit)
860 /// For WriteOnly mode: Skip read-based validation (~30ns commit)
861 /// For ReadWrite mode: Full validation (~50ns commit)
862 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
863 // Get transaction before removing
864 let txn = self.active_txns.get(&txn_id)?.clone();
865
866 // SSI Bypass: Skip validation for ReadOnly transactions
867 // ReadOnly can never form rw-antidependency cycles
868 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
869 // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
870 // For ReadWrite: check SSI validation result
871 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
872 // Abort on SSI violation
873 self.active_txns.remove(&txn_id);
874 self.update_min_active_ts();
875 return None;
876 }
877 }
878
879 let commit_ts = self.alloc_commit_ts();
880
881 // Extract write_set and remove transaction - takes ownership
882 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
883
884 // OPTIMIZATION: Only track ReadWrite transactions for SSI
885 // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
886 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
887 && !removed_txn.read_set.is_empty()
888 && !removed_txn.write_set.is_empty();
889
890 if needs_ssi_tracking {
891 // Need to clone write_set since we return it AND track it
892 let write_set_for_return = removed_txn.write_set.clone();
893
894 self.track_commit_owned(
895 txn_id,
896 commit_ts,
897 removed_txn.read_bloom,
898 removed_txn.write_bloom,
899 removed_txn.read_set,
900 removed_txn.write_set,
901 );
902
903 self.update_min_active_ts();
904 Some((commit_ts, write_set_for_return))
905 } else {
906 // Fast path: no SSI tracking needed, avoid clone entirely
907 self.update_min_active_ts();
908 Some((commit_ts, removed_txn.write_set))
909 }
910 }
911
912 /// Validate SSI constraints for a committing transaction
913 ///
914 /// ## Transaction Classification (Task 3: Optimistic MVCC)
915 ///
916 /// Transactions are classified and routed through appropriate fast paths:
917 ///
918 /// | Class | Criteria | Validation Cost |
919 /// |------------|-------------------------------|-----------------|
920 /// | ReadOnly | write_set.is_empty() | 0 ns |
921 /// | SingleKey | write_set.len() == 1 | 0 ns |
922 /// | Disjoint | bloom filters don't intersect | ~10 ns |
923 /// | General | full SSI check | ~50 ns |
924 ///
925 /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
926 /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
927 ///
928 /// Detects "dangerous structures" - rw-antidependency cycles:
929 /// - T1 reads X (snapshot sees old value)
930 /// - T2 writes X (concurrent write)
931 /// - T2 reads Y (snapshot sees old value)
932 /// - T1 writes Y (concurrent write)
933 ///
934 /// If T1 → rw → T2 → rw → T1 exists, abort T1
935 #[inline]
936 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
937 // =================================================================
938 // Fast Path 1: Read-only transactions (0 ns)
939 // =================================================================
940 // Read-only transactions can never form rw-antidependency cycles
941 // because they have no writes to create outgoing rw-edges
942 if txn.write_set.is_empty() {
943 return true;
944 }
945
946 // =================================================================
947 // Fast Path 2: No recent commits to check (0 ns)
948 // =================================================================
949 if self.recent_commits.is_empty() {
950 return true;
951 }
952
953 // =================================================================
954 // Fast Path 3: Single-key write transactions (0 ns)
955 // =================================================================
956 // A single-key write transaction cannot form a dangerous cycle:
957 // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
958 // AND T2 to read what T1 wrote
959 // - With only one key in write_set, the same key would need to be
960 // in both read_set AND write_set of both transactions
961 // - This is already prevented by our conflict detection (write-write)
962 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
963 return true;
964 }
965
966 let my_snapshot = txn.snapshot_ts;
967
968 // =================================================================
969 // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
970 // =================================================================
971 // Pre-filter using bloom filters: if our write_bloom doesn't intersect
972 // with any concurrent transaction's read_bloom AND vice versa,
973 // there can be no rw-antidependency
974 let mut any_may_intersect = false;
975 for entry in self.recent_commits.iter() {
976 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
977
978 // Only check concurrent transactions.
979 //
980 // A committed transaction is *concurrent* with us iff its writes are
981 // invisible to our snapshot. Read visibility is strict
982 // (`commit_ts < snapshot_ts`), so a transaction with
983 // `commit_ts >= my_snapshot` is invisible and must be validated
984 // against. Using `<` here (not `<=`) keeps this window consistent
985 // with `read_at`; a `<=` would skip a boundary transaction whose
986 // `commit_ts == my_snapshot` and miss a genuine write-skew.
987 if *other_commit_ts < my_snapshot {
988 continue;
989 }
990
991 // Check bloom filter intersection (O(m/64) per filter)
992 // If our writes may intersect their reads OR their writes may intersect our reads
993 if txn.write_bloom.may_intersect(other_read_bloom)
994 || other_write_bloom.may_intersect(&txn.read_bloom)
995 {
996 any_may_intersect = true;
997 break;
998 }
999 }
1000
1001 // No bloom intersection means definitely disjoint - no SSI conflict possible
1002 if !any_may_intersect {
1003 return true;
1004 }
1005
1006 // =================================================================
1007 // Full SSI Validation (~50 ns)
1008 // =================================================================
1009 // Check for rw-conflicts with recently committed transactions
1010 // An rw-conflict exists if:
1011 // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
1012 // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
1013
1014 let mut in_conflict_with: Vec<u64> = Vec::new();
1015 let mut out_conflict_to: Vec<u64> = Vec::new();
1016
1017 for entry in self.recent_commits.iter() {
1018 let (
1019 other_txn_id,
1020 (
1021 other_commit_ts,
1022 _other_read_bloom,
1023 other_write_bloom,
1024 other_read_set,
1025 other_write_set,
1026 ),
1027 ) = entry.pair();
1028
1029 // Only consider transactions concurrent with us: those whose writes
1030 // are invisible to our snapshot. Read visibility is strict
1031 // (`commit_ts < snapshot_ts`), so `commit_ts >= my_snapshot` means
1032 // concurrent. `<` (not `<=`) keeps this consistent with `read_at`.
1033 if *other_commit_ts < my_snapshot {
1034 continue;
1035 }
1036
1037 // Check: other wrote → we read (other →rw→ me)
1038 // T_other wrote a key that T_me read (rw-dependency inbound)
1039 //
1040 // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
1041 // Only do expensive HashSet intersection if bloom says "maybe conflict"
1042 let mut has_in_conflict = false;
1043 for key in txn.read_set.iter() {
1044 if other_write_bloom.may_contain(key) {
1045 // Bloom says maybe - do exact check
1046 if other_write_set.contains(key) {
1047 has_in_conflict = true;
1048 break;
1049 }
1050 }
1051 }
1052 if has_in_conflict {
1053 in_conflict_with.push(*other_txn_id);
1054 }
1055
1056 // Check: we wrote → other read (me →rw→ other)
1057 // T_me wrote a key that T_other read (rw-dependency outbound)
1058 //
1059 // Bloom-accelerated: Use our write_bloom against their read_set
1060 let mut has_out_conflict = false;
1061 for key in other_read_set.iter() {
1062 if txn.write_bloom.may_contain(key) {
1063 // Bloom says maybe - do exact check
1064 if txn.write_set.contains(key) {
1065 has_out_conflict = true;
1066 break;
1067 }
1068 }
1069 }
1070 if has_out_conflict {
1071 out_conflict_to.push(*other_txn_id);
1072 }
1073 }
1074
1075 // Dangerous structure: we have both incoming AND outgoing rw-edges
1076 // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1077 //
1078 // Conservative check: if both exist, abort
1079 // A more precise check would verify the cycle path, but this is safe
1080 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1081 return false; // SSI violation - abort
1082 }
1083
1084 true
1085 }
1086
1087 /// Track a committed transaction for future SSI validation
1088 ///
1089 /// Only tracks transactions that have both reads AND writes, since SSI
1090 /// only detects rw-antidependency cycles. Pure read or pure write
1091 /// transactions can't form cycles.
1092 ///
1093 /// ## Optimization: Zero-Copy Transfer
1094 ///
1095 /// Takes ownership of sets instead of cloning to avoid the +15% commit
1096 /// phase regression. The caller should use mem::take() to transfer ownership.
1097 fn track_commit_owned(
1098 &self,
1099 txn_id: u64,
1100 commit_ts: u64,
1101 read_bloom: SsiBloomFilter,
1102 write_bloom: SsiBloomFilter,
1103 read_set: HashSet<InlineKey>,
1104 write_set: HashSet<InlineKey>,
1105 ) {
1106 // Optimization: Only track mixed read-write transactions
1107 // Pure reads can't create outgoing rw-edges
1108 // Pure writes can't create incoming rw-edges
1109 if read_set.is_empty() || write_set.is_empty() {
1110 return; // Skip tracking - can't form SSI cycle
1111 }
1112
1113 // Add to recent commits with bloom filters for fast SSI pre-filtering
1114 // No cloning needed - we take ownership
1115 self.recent_commits.insert(
1116 txn_id,
1117 (commit_ts, read_bloom, write_bloom, read_set, write_set),
1118 );
1119
1120 // Lazy pruning: only prune when we're significantly over capacity
1121 // Avoids pruning overhead on every commit
1122 if self.recent_commits.len() > self.max_recent_commits * 2 {
1123 // Remove entries with lowest commit_ts
1124 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1125 self.recent_commits
1126 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1127 }
1128 }
1129
1130 /// Legacy track_commit that clones - kept for compatibility
1131 #[allow(dead_code)]
1132 fn track_commit(
1133 &self,
1134 txn_id: u64,
1135 commit_ts: u64,
1136 read_bloom: SsiBloomFilter,
1137 write_bloom: SsiBloomFilter,
1138 read_set: &HashSet<InlineKey>,
1139 write_set: &HashSet<InlineKey>,
1140 ) {
1141 if read_set.is_empty() || write_set.is_empty() {
1142 return;
1143 }
1144 self.recent_commits.insert(
1145 txn_id,
1146 (
1147 commit_ts,
1148 read_bloom,
1149 write_bloom,
1150 read_set.clone(),
1151 write_set.clone(),
1152 ),
1153 );
1154 }
1155
1156 /// Abort transaction
1157 pub fn abort(&self, txn_id: u64) {
1158 self.active_txns.remove(&txn_id);
1159 self.update_min_active_ts();
1160 }
1161
1162 /// Get minimum active snapshot timestamp
1163 pub fn min_active_snapshot(&self) -> u64 {
1164 self.min_active_ts.load(Ordering::SeqCst)
1165 }
1166
1167 /// Get count of active transactions
1168 pub fn active_transaction_count(&self) -> usize {
1169 self.active_txns.len()
1170 }
1171
1172 fn update_min_active_ts(&self) {
1173 let min = self
1174 .active_txns
1175 .iter()
1176 .map(|entry| entry.value().snapshot_ts)
1177 .min()
1178 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1179 self.min_active_ts.store(min, Ordering::SeqCst);
1180 }
1181}
1182
1183/// Epoch-based dirty list for O(expired) GC instead of O(n)
1184///
1185/// Instead of scanning ALL version chains, we track which keys have versions
1186/// created in each epoch. GC only needs to visit keys from old epochs.
1187struct EpochDirtyList {
1188 /// Ring buffer of epoch -> dirty keys
1189 /// Index = epoch % EPOCH_RING_SIZE
1190 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1191 /// Current epoch
1192 current_epoch: AtomicU64,
1193}
1194
1195const EPOCH_RING_SIZE: usize = 4;
1196
1197impl EpochDirtyList {
1198 fn new() -> Self {
1199 Self {
1200 epochs: [
1201 parking_lot::Mutex::new(Vec::new()),
1202 parking_lot::Mutex::new(Vec::new()),
1203 parking_lot::Mutex::new(Vec::new()),
1204 parking_lot::Mutex::new(Vec::new()),
1205 ],
1206 current_epoch: AtomicU64::new(0),
1207 }
1208 }
1209
1210 /// Record a version created in the current epoch
1211 #[inline]
1212 fn record_version(&self, key: Vec<u8>) {
1213 let epoch = self.current_epoch.load(Ordering::Relaxed);
1214 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1215 self.epochs[idx].lock().push(key);
1216 }
1217
1218 /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1219 ///
1220 /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1221 /// For 100 writes: ~100x fewer mutex operations.
1222 #[inline]
1223 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1224 let epoch = self.current_epoch.load(Ordering::Relaxed);
1225 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1226 let mut guard = self.epochs[idx].lock();
1227 guard.extend(keys);
1228 }
1229
1230 /// Advance to next epoch, returning old epoch's dirty keys
1231 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1232 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1233 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1234
1235 // Drain the old epoch's dirty list
1236 let mut guard = self.epochs[old_idx].lock();
1237 let keys = std::mem::take(&mut *guard);
1238 (old_epoch, keys)
1239 }
1240
1241 /// Get current epoch
1242 #[allow(dead_code)]
1243 fn current(&self) -> u64 {
1244 self.current_epoch.load(Ordering::Relaxed)
1245 }
1246}
1247
1248// ============================================================================
1249// Streaming Scan Iterator
1250// ============================================================================
1251
/// Streaming iterator for range scans
///
/// Yields `(key, value)` pairs one at a time instead of materializing the
/// full result set. The underlying iterator is built lazily on the first
/// call to `next`.
///
/// Note: when the memtable uses its deferred index, the matching *keys* are
/// collected up front on first use; values are still looked up one at a time.
struct ScanRangeIterator<'a> {
    /// Memtable being scanned.
    memtable: &'a MvccMemTable,
    /// Inclusive lower bound of the scan.
    start: Vec<u8>,
    /// Exclusive upper bound of the scan; empty means unbounded.
    end: Vec<u8>,
    /// Snapshot timestamp passed to `read_at` for every key.
    snapshot_ts: u64,
    /// Transaction id passed to `read_at` alongside the snapshot.
    current_txn_id: Option<u64>,
    /// Whether to drive the scan from an ordered index (true) or from a full
    /// pass over the key map (false).
    use_ordered: bool,
    // Both iterators start as `None` and at most one is populated on the
    // first `next` call.
    /// Iterator backed by the deferred or SkipMap index.
    ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Iterator backed by a full pass over the key map.
    unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
    /// Set once the lazy initialization has run.
    initialized: bool,
}
1269
1270impl<'a> Iterator for ScanRangeIterator<'a> {
1271 type Item = (Vec<u8>, Vec<u8>);
1272
1273 fn next(&mut self) -> Option<Self::Item> {
1274 // Lazy initialization on first call
1275 if !self.initialized {
1276 self.initialized = true;
1277
1278 if self.use_ordered {
1279 // Try deferred index first (after compaction, it uses a SkipMap internally)
1280 if let Some(ref def_idx) = self.memtable.deferred_index {
1281 let start = self.start.clone();
1282 let end = self.end.clone();
1283 let snapshot_ts = self.snapshot_ts;
1284 let current_txn_id = self.current_txn_id;
1285 let data = &self.memtable.data;
1286
1287 // Collect keys from deferred index (already sorted after compact)
1288 let keys: Vec<Vec<u8>> = if end.is_empty() {
1289 def_idx.range_from(&start).collect()
1290 } else {
1291 def_idx.range(&start, &end).collect()
1292 };
1293
1294 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1295 Box::new(keys.into_iter().filter_map(move |key| {
1296 if let Some(chain) = data.get(&key)
1297 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1298 && let Some(value) = &v.value
1299 {
1300 Some((key, value.clone()))
1301 } else {
1302 None
1303 }
1304 }));
1305 self.ordered_iter = Some(iter);
1306 } else if let Some(ref idx) = self.memtable.ordered_index {
1307 let start = self.start.clone();
1308 let end = self.end.clone();
1309 let snapshot_ts = self.snapshot_ts;
1310 let current_txn_id = self.current_txn_id;
1311 let data = &self.memtable.data;
1312
1313 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty()
1314 {
1315 Box::new(idx.range(start..).filter_map(move |entry| {
1316 let key = entry.key();
1317 if let Some(chain) = data.get(key)
1318 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1319 && let Some(value) = &v.value
1320 {
1321 Some((key.clone(), value.clone()))
1322 } else {
1323 None
1324 }
1325 }))
1326 } else {
1327 Box::new(idx.range(start..end).filter_map(move |entry| {
1328 let key = entry.key();
1329 if let Some(chain) = data.get(key)
1330 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1331 && let Some(value) = &v.value
1332 {
1333 Some((key.clone(), value.clone()))
1334 } else {
1335 None
1336 }
1337 }))
1338 };
1339 self.ordered_iter = Some(iter);
1340 }
1341 } else {
1342 // Unordered full scan
1343 let start = self.start.clone();
1344 let end = self.end.clone();
1345 let snapshot_ts = self.snapshot_ts;
1346 let current_txn_id = self.current_txn_id;
1347
1348 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> =
1349 Box::new(self.memtable.data.iter().filter_map(move |entry| {
1350 let key = entry.key();
1351
1352 if key.as_slice() < start.as_slice() {
1353 return None;
1354 }
1355 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1356 return None;
1357 }
1358
1359 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1360 && let Some(value) = &v.value
1361 {
1362 Some((key.clone(), value.clone()))
1363 } else {
1364 None
1365 }
1366 }));
1367 self.unordered_iter = Some(iter);
1368 }
1369 }
1370
1371 // Get next from appropriate iterator
1372 if let Some(ref mut iter) = self.ordered_iter {
1373 iter.next()
1374 } else if let Some(ref mut iter) = self.unordered_iter {
1375 iter.next()
1376 } else {
1377 None
1378 }
1379 }
1380}
1381
/// MemTable with MVCC support
///
/// Stores one `VersionChain` per key in a `DashMap`, so writers to different
/// keys do not contend on a single global lock.
///
/// Uses epoch-based dirty tracking so GC only visits recently written keys.
/// Optionally maintains an ordered key index for prefix/range scans, in one
/// of two flavours: a deferred sorted index or a `SkipMap`.
pub struct MvccMemTable {
    /// Key -> VersionChain (sharded for concurrent access)
    data: DashMap<Vec<u8>, VersionChain>,
    /// Deferred sorted index for prefix/range scans (optional).
    /// When `None`, scans use `ordered_index` if present, and otherwise fall
    /// back to a full pass over `data`.
    deferred_index: Option<DeferredSortedIndex>,
    /// SkipMap-based ordered index (optional); only populated when an ordered
    /// index is requested with deferred sorting turned off.
    ordered_index: Option<SkipMap<Vec<u8>, ()>>,
    /// Whether deferred sorting (true) or an immediate SkipMap (false) was
    /// requested at construction time.
    #[allow(dead_code)]
    use_deferred: bool,
    /// Approximate size in bytes (sum of key + value lengths of writes)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list for efficient GC
    dirty_list: EpochDirtyList,
}
1408
1409impl Default for MvccMemTable {
1410 fn default() -> Self {
1411 Self::new()
1412 }
1413}
1414
1415impl MvccMemTable {
1416 pub fn new() -> Self {
1417 Self::with_ordered_index(true)
1418 }
1419
1420 /// Create memtable with optional ordered index
1421 ///
1422 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1423 /// but scan_prefix becomes O(N) instead of O(log N + K)
1424 ///
1425 /// Uses deferred sorting by default for better write performance:
1426 /// - Writes: O(1) append to hot buffer
1427 /// - Scans: O(N log N) sort-on-demand
1428 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1429 Self::with_index_mode(enable_ordered_index, true)
1430 }
1431
1432 /// Create memtable with fine-grained control over indexing
1433 ///
1434 /// # Arguments
1435 /// * `enable_ordered_index` - Whether to maintain an ordered index
1436 /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1437 /// If false, use SkipMap (O(log N) writes)
1438 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1439 Self {
1440 data: DashMap::new(),
1441 deferred_index: if enable_ordered_index && use_deferred {
1442 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1443 max_unsorted_entries: 10_000, // Compact every 10K writes
1444 enabled: true,
1445 }))
1446 } else {
1447 None
1448 },
1449 ordered_index: if enable_ordered_index && !use_deferred {
1450 Some(SkipMap::new())
1451 } else {
1452 None
1453 },
1454 use_deferred,
1455 size_bytes: AtomicU64::new(0),
1456 dirty_list: EpochDirtyList::new(),
1457 }
1458 }
1459
1460 /// Write a key-value pair (uncommitted)
1461 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1462 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1463 let key_len = key.len();
1464
1465 // Track this key in the current epoch's dirty list for GC
1466 self.dirty_list.record_version(key.clone());
1467
1468 // Insert into ordered index for prefix scans (if enabled)
1469 // Deferred: O(1) append to hot buffer
1470 // SkipMap: O(log N) insert
1471 if let Some(ref idx) = self.deferred_index {
1472 idx.insert(key.clone());
1473 } else if let Some(ref idx) = self.ordered_index {
1474 idx.insert(key.clone(), ());
1475 }
1476
1477 // Use entry API for atomic get-or-insert
1478 let mut entry = self.data.entry(key).or_default();
1479
1480 // Check for write-write conflict
1481 if entry.has_write_conflict(txn_id) {
1482 return Err(SochDBError::Internal(
1483 "Write-write conflict detected".into(),
1484 ));
1485 }
1486 entry.add_uncommitted(value, txn_id);
1487 self.size_bytes
1488 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1489
1490 Ok(())
1491 }
1492
1493 /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1494 ///
1495 /// Optimizations applied (Rec 3: MVCC Batching):
1496 /// - Batched dirty list tracking: single lock acquire for all keys
1497 /// - Deferred index: O(1) append per key
1498 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1499 let mut total_size = 0u64;
1500
1501 // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1502 self.dirty_list
1503 .record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1504
1505 for (key, value) in writes {
1506 // Insert into ordered index (if enabled)
1507 // Deferred: O(1) append, SkipMap: O(log N)
1508 if let Some(ref idx) = self.deferred_index {
1509 idx.insert(key.clone());
1510 } else if let Some(ref idx) = self.ordered_index {
1511 idx.insert(key.clone(), ());
1512 }
1513
1514 let mut entry = self.data.entry(key.clone()).or_default();
1515
1516 if entry.has_write_conflict(txn_id) {
1517 return Err(SochDBError::Internal(
1518 "Write-write conflict detected".into(),
1519 ));
1520 }
1521
1522 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1523 entry.add_uncommitted(value.clone(), txn_id);
1524 total_size += (key.len() + value_size) as u64;
1525 }
1526
1527 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1528 Ok(())
1529 }
1530
1531 /// Read at snapshot timestamp, with optional current txn to see own writes
1532 pub fn read(
1533 &self,
1534 key: &[u8],
1535 snapshot_ts: u64,
1536 current_txn_id: Option<u64>,
1537 ) -> Option<Vec<u8>> {
1538 self.data.get(key).and_then(|chain| {
1539 chain
1540 .read_at(snapshot_ts, current_txn_id)
1541 .and_then(|v| v.value.clone())
1542 })
1543 }
1544
1545 /// Commit all versions for a transaction
1546 ///
1547 /// Only updates the keys that were written by this transaction (tracked in write_set).
1548 /// Accepts InlineKey for zero-allocation MVCC tracking.
1549 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1550 // Only iterate over keys we know were written - O(k) instead of O(n)
1551 for key in write_set {
1552 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1553 chain.commit(txn_id, commit_ts);
1554 }
1555 }
1556 }
1557
1558 /// Legacy commit method (iterates all keys) - kept for backward compatibility
1559 #[allow(dead_code)]
1560 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1561 for mut entry in self.data.iter_mut() {
1562 entry.value_mut().commit(txn_id, commit_ts);
1563 }
1564 }
1565
1566 /// Abort all versions for a transaction
1567 pub fn abort(&self, txn_id: u64) {
1568 for mut entry in self.data.iter_mut() {
1569 entry.value_mut().abort(txn_id);
1570 }
1571 }
1572
1573 /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1574 ///
1575 /// ## Performance
1576 ///
1577 /// When ordered_index is enabled: O(log N + K) complexity
1578 /// - O(log N) to seek to the first key with prefix
1579 /// - O(K) to iterate matching keys
1580 ///
1581 /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1582 ///
1583 /// ## Optimizations Applied
1584 ///
1585 /// - Pre-allocates result vector based on expected output size
1586 /// - Uses batch-friendly iteration patterns
1587 /// - Minimizes allocations during iteration
1588 /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1589 pub fn scan_prefix(
1590 &self,
1591 prefix: &[u8],
1592 snapshot_ts: u64,
1593 current_txn_id: Option<u64>,
1594 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1595 // Estimate result size for pre-allocation (use 10% of total as heuristic)
1596 let estimated_size = (self.data.len() / 10).max(64);
1597 let mut results = Vec::with_capacity(estimated_size);
1598
1599 if let Some(ref idx) = self.deferred_index {
1600 // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1601 for key in idx.range_from(prefix) {
1602 // Stop when we've passed the prefix range
1603 if !key.starts_with(prefix) {
1604 break;
1605 }
1606
1607 // O(1) lookup in DashMap for version chain
1608 if let Some(chain) = self.data.get(&key)
1609 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1610 && let Some(value) = &v.value
1611 {
1612 results.push((key, value.clone()));
1613 }
1614 }
1615 } else if let Some(ref idx) = self.ordered_index {
1616 // Fast path: O(log N) seek to first key >= prefix
1617 for entry in idx.range(prefix.to_vec()..) {
1618 let key = entry.key();
1619
1620 // Stop when we've passed the prefix range
1621 if !key.starts_with(prefix) {
1622 break;
1623 }
1624
1625 // O(1) lookup in DashMap for version chain
1626 if let Some(chain) = self.data.get(key)
1627 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1628 && let Some(value) = &v.value
1629 {
1630 results.push((key.clone(), value.clone()));
1631 }
1632 }
1633 } else {
1634 // Fallback: O(N) full DashMap scan when ordered_index is disabled
1635 // Optimized with batch-friendly iteration
1636 for entry in self.data.iter() {
1637 let key = entry.key();
1638 if !key.starts_with(prefix) {
1639 continue;
1640 }
1641 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1642 && let Some(value) = &v.value
1643 {
1644 results.push((key.clone(), value.clone()));
1645 }
1646 }
1647 }
1648
1649 results
1650 }
1651
1652 /// Optimized full scan with batch allocation
1653 ///
1654 /// For use when scanning entire tables/namespaces.
1655 /// Pre-allocates based on actual data size.
1656 pub fn scan_all(
1657 &self,
1658 snapshot_ts: u64,
1659 current_txn_id: Option<u64>,
1660 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1661 let mut results = Vec::with_capacity(self.data.len());
1662
1663 for entry in self.data.iter() {
1664 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1665 && let Some(value) = &v.value
1666 {
1667 results.push((entry.key().clone(), value.clone()));
1668 }
1669 }
1670
1671 results
1672 }
1673
1674 /// Streaming scan iterator for very large datasets
1675 ///
1676 /// Returns an iterator that yields (key, value) pairs without
1677 /// materializing the entire result set in memory.
1678 pub fn scan_prefix_iter<'a>(
1679 &'a self,
1680 prefix: &'a [u8],
1681 snapshot_ts: u64,
1682 current_txn_id: Option<u64>,
1683 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1684 self.data.iter().filter_map(move |entry| {
1685 let key = entry.key();
1686 if !key.starts_with(prefix) {
1687 return None;
1688 }
1689 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1690 && let Some(value) = &v.value
1691 {
1692 Some((key.clone(), value.clone()))
1693 } else {
1694 None
1695 }
1696 })
1697 }
1698
1699 /// Scan range
1700 pub fn scan_range(
1701 &self,
1702 start: &[u8],
1703 end: &[u8],
1704 snapshot_ts: u64,
1705 current_txn_id: Option<u64>,
1706 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1707 let mut results = Vec::new();
1708
1709 if let Some(ref idx) = self.deferred_index {
1710 // Deferred index path: sort-on-scan
1711 if end.is_empty() {
1712 for key in idx.range_from(start) {
1713 if let Some(chain) = self.data.get(&key)
1714 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1715 && let Some(value) = &v.value
1716 {
1717 results.push((key, value.clone()));
1718 }
1719 }
1720 } else {
1721 for key in idx.range(start, end) {
1722 if let Some(chain) = self.data.get(&key)
1723 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1724 && let Some(value) = &v.value
1725 {
1726 results.push((key, value.clone()));
1727 }
1728 }
1729 }
1730 } else if let Some(ref idx) = self.ordered_index {
1731 // Use range scan on SkipMap
1732 if end.is_empty() {
1733 // Unbounded end
1734 for entry in idx.range(start.to_vec()..) {
1735 let key = entry.key();
1736 if let Some(chain) = self.data.get(key)
1737 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1738 && let Some(value) = &v.value
1739 {
1740 results.push((key.clone(), value.clone()));
1741 }
1742 }
1743 } else {
1744 for entry in idx.range(start.to_vec()..end.to_vec()) {
1745 let key = entry.key();
1746 if let Some(chain) = self.data.get(key)
1747 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1748 && let Some(value) = &v.value
1749 {
1750 results.push((key.clone(), value.clone()));
1751 }
1752 }
1753 }
1754 } else {
1755 // Fallback to full scan if no ordered index
1756 for entry in self.data.iter() {
1757 let key = entry.key();
1758
1759 if key.as_slice() < start {
1760 continue;
1761 }
1762 if !end.is_empty() && key.as_slice() >= end {
1763 continue;
1764 }
1765
1766 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1767 && let Some(value) = &v.value
1768 {
1769 results.push((key.clone(), value.clone()));
1770 }
1771 }
1772 }
1773
1774 results
1775 }
1776
1777 /// Streaming range scan iterator for very large datasets
1778 ///
1779 /// Returns an iterator that yields (key, value) pairs without
1780 /// materializing the entire result set in memory. Uses the ordered
1781 /// index when available for O(log N + K) complexity.
1782 ///
1783 /// ## Zero-Allocation Design
1784 ///
1785 /// While the iterator itself cannot avoid allocations for returned
1786 /// values (since the caller needs ownership), it avoids:
1787 /// - Pre-materializing all results
1788 /// - Intermediate buffers
1789 /// - Repeated key comparisons for already-visited entries
1790 ///
1791 /// ## Usage
1792 ///
1793 /// ```ignore
1794 /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1795 /// // Process each result as it arrives
1796 /// // Memory usage is O(1) per iteration, not O(N) total
1797 /// }
1798 /// ```
1799 pub fn scan_range_iter<'a>(
1800 &'a self,
1801 start: &'a [u8],
1802 end: &'a [u8],
1803 snapshot_ts: u64,
1804 current_txn_id: Option<u64>,
1805 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1806 // Compact deferred index before scanning if needed
1807 if let Some(ref idx) = self.deferred_index {
1808 idx.compact();
1809 }
1810
1811 // Use either ordered index or full scan
1812 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1813
1814 // Create iterator based on availability of ordered index
1815 ScanRangeIterator {
1816 memtable: self,
1817 start: start.to_vec(),
1818 end: end.to_vec(),
1819 snapshot_ts,
1820 current_txn_id,
1821 use_ordered,
1822 ordered_iter: None,
1823 unordered_iter: None,
1824 initialized: false,
1825 }
1826 }
1827
1828 /// Get approximate size
1829 pub fn size(&self) -> u64 {
1830 self.size_bytes.load(Ordering::Relaxed)
1831 }
1832
1833 /// Garbage collect old versions using epoch-based dirty list
1834 ///
1835 /// O(expired_versions) instead of O(all_versions)
1836 /// Only visits keys that had versions created in the old epoch.
1837 pub fn gc(&self, min_active_ts: u64) -> usize {
1838 // Advance epoch and get the dirty keys from the old epoch
1839 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1840
1841 if dirty_keys.is_empty() {
1842 return 0;
1843 }
1844
1845 let mut gc_count = 0;
1846
1847 // Only visit keys that were modified in the old epoch
1848 // Use a HashSet to deduplicate keys that were written multiple times
1849 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1850
1851 for key in unique_keys {
1852 if let Some(mut entry) = self.data.get_mut(&key) {
1853 let before = entry.value().version_count();
1854 entry.value_mut().gc(min_active_ts);
1855 gc_count += before.saturating_sub(entry.value().version_count());
1856 }
1857 }
1858
1859 gc_count
1860 }
1861
1862 /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1863 #[allow(dead_code)]
1864 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1865 let mut gc_count = 0;
1866
1867 for mut entry in self.data.iter_mut() {
1868 let before = entry.value().version_count();
1869 entry.value_mut().gc(min_active_ts);
1870 gc_count += before.saturating_sub(entry.value().version_count());
1871 }
1872
1873 gc_count
1874 }
1875}
1876
1877// =============================================================================
1878// Rec 11: Unified MvccStore Implementation for MvccMemTable
1879// =============================================================================
1880
1881impl sochdb_core::version_chain::MvccStore for MvccMemTable {
1882 fn mvcc_get(&self, key: &[u8], snapshot_ts: u64, txn_id: Option<u64>) -> Option<Vec<u8>> {
1883 self.read(key, snapshot_ts, txn_id)
1884 }
1885
1886 fn mvcc_put(
1887 &self,
1888 key: &[u8],
1889 value: Option<Vec<u8>>,
1890 txn_id: u64,
1891 ) -> std::result::Result<(), sochdb_core::version_chain::MvccStoreError> {
1892 let mut entry = self.data.entry(key.to_vec()).or_default();
1893 if entry.has_write_conflict(txn_id) {
1894 return Err(sochdb_core::version_chain::MvccStoreError::WriteConflict);
1895 }
1896 entry.add_uncommitted(value, txn_id);
1897 Ok(())
1898 }
1899
1900 fn mvcc_commit_key(&self, key: &[u8], txn_id: u64, commit_ts: u64) -> bool {
1901 if let Some(mut chain) = self.data.get_mut(key) {
1902 return chain.commit(txn_id, commit_ts);
1903 }
1904 false
1905 }
1906
1907 fn mvcc_abort_key(&self, key: &[u8], txn_id: u64) {
1908 if let Some(mut chain) = self.data.get_mut(key) {
1909 chain.abort(txn_id);
1910 }
1911 }
1912
1913 fn mvcc_has_conflict(&self, key: &[u8], txn_id: u64) -> bool {
1914 self.data
1915 .get(key)
1916 .map(|chain| chain.has_write_conflict(txn_id))
1917 .unwrap_or(false)
1918 }
1919
1920 fn mvcc_gc(&self, min_ts: u64) -> sochdb_core::version_chain::MvccGcStats {
1921 let mut stats = sochdb_core::version_chain::MvccGcStats::default();
1922 for mut entry in self.data.iter_mut() {
1923 stats.keys_scanned += 1;
1924 let before = entry.value().version_count();
1925 entry.value_mut().gc(min_ts);
1926 stats.versions_removed += before.saturating_sub(entry.value().version_count());
1927 }
1928 stats
1929 }
1930
1931 fn mvcc_key_count(&self) -> usize {
1932 self.data.len()
1933 }
1934}
1935
1936// ============================================================================
1937// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1938// ============================================================================
1939
1940use crate::key_buffer::ArenaKeyHandle;
1941
/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
///
/// Keys recorded while an epoch is current are appended to that epoch's slot;
/// advancing the epoch drains the slot of the epoch that just ended.
struct ArenaEpochDirtyList {
    /// Ring of per-epoch key lists (4 slots), each guarded by its own mutex.
    epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
    /// Current epoch number; selects which slot receives new keys.
    current_epoch: AtomicU64,
}
1947
1948impl ArenaEpochDirtyList {
1949 fn new() -> Self {
1950 Self {
1951 epochs: [
1952 parking_lot::Mutex::new(Vec::new()),
1953 parking_lot::Mutex::new(Vec::new()),
1954 parking_lot::Mutex::new(Vec::new()),
1955 parking_lot::Mutex::new(Vec::new()),
1956 ],
1957 current_epoch: AtomicU64::new(0),
1958 }
1959 }
1960
1961 #[inline]
1962 fn record_version(&self, key: ArenaKeyHandle) {
1963 let epoch = self.current_epoch.load(Ordering::Relaxed);
1964 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1965 self.epochs[idx].lock().push(key);
1966 }
1967
1968 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1969 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1970 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1971 let mut guard = self.epochs[old_idx].lock();
1972 let keys = std::mem::take(&mut *guard);
1973 (old_epoch, keys)
1974 }
1975}
1976
/// Arena-backed MVCC MemTable with optimized key storage
///
/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
/// reducing per-write allocations from 3 to 1:
///
/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
///
/// ## Performance
///
/// Expected improvement: 20-30% throughput increase on write-heavy workloads
/// by reducing:
/// - Heap allocations: 3 → 1 per write
/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
pub struct ArenaMvccMemTable {
    /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
    data: DashMap<ArenaKeyHandle, VersionChain>,
    /// Ordered index for prefix scans; `None` when the table was built with
    /// the ordered index disabled, in which case scans iterate `data`.
    ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
    /// Approximate size in bytes (sum of key and value lengths of accepted writes)
    size_bytes: AtomicU64,
    /// Epoch-based dirty list (arena-backed), consumed by `gc`
    dirty_list: ArenaEpochDirtyList,
}
2001
2002impl ArenaMvccMemTable {
    /// Create an empty memtable with the ordered index enabled.
    pub fn new() -> Self {
        Self::with_ordered_index(true)
    }
2006
2007 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
2008 Self {
2009 data: DashMap::new(),
2010 ordered_index: if enable_ordered_index {
2011 Some(SkipMap::new())
2012 } else {
2013 None
2014 },
2015 size_bytes: AtomicU64::new(0),
2016 dirty_list: ArenaEpochDirtyList::new(),
2017 }
2018 }
2019
2020 /// Write a key-value pair using arena key handle
2021 ///
2022 /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
2023 /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
2024 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2025 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2026 let key_len = key.len();
2027
2028 // Create ONE ArenaKeyHandle - this is the only allocation for the key
2029 let key_handle = ArenaKeyHandle::new(key);
2030
2031 // Track in dirty list (O(1) copy of 16-byte handle)
2032 self.dirty_list.record_version(key_handle.clone());
2033
2034 // Insert into ordered index (O(1) copy of 16-byte handle)
2035 if let Some(ref idx) = self.ordered_index {
2036 idx.insert(key_handle.clone(), ());
2037 }
2038
2039 // Use entry API with the handle
2040 let mut entry = self.data.entry(key_handle).or_default();
2041
2042 if entry.has_write_conflict(txn_id) {
2043 return Err(SochDBError::Internal(
2044 "Write-write conflict detected".into(),
2045 ));
2046 }
2047 entry.add_uncommitted(value, txn_id);
2048 self.size_bytes
2049 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
2050
2051 Ok(())
2052 }
2053
2054 /// Write batch using arena key handles
2055 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2056 let mut total_size = 0u64;
2057
2058 for (key, value) in writes {
2059 let key_handle = ArenaKeyHandle::new(key);
2060
2061 self.dirty_list.record_version(key_handle.clone());
2062
2063 if let Some(ref idx) = self.ordered_index {
2064 idx.insert(key_handle.clone(), ());
2065 }
2066
2067 let mut entry = self.data.entry(key_handle).or_default();
2068
2069 if entry.has_write_conflict(txn_id) {
2070 return Err(SochDBError::Internal(
2071 "Write-write conflict detected".into(),
2072 ));
2073 }
2074
2075 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
2076 entry.add_uncommitted(value.clone(), txn_id);
2077 total_size += (key.len() + value_size) as u64;
2078 }
2079
2080 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
2081 Ok(())
2082 }
2083
2084 /// Read at snapshot timestamp
2085 pub fn read(
2086 &self,
2087 key: &[u8],
2088 snapshot_ts: u64,
2089 current_txn_id: Option<u64>,
2090 ) -> Option<Vec<u8>> {
2091 // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2092 let key_handle = ArenaKeyHandle::new(key);
2093 self.data.get(&key_handle).and_then(|chain| {
2094 chain
2095 .read_at(snapshot_ts, current_txn_id)
2096 .and_then(|v| v.value.clone())
2097 })
2098 }
2099
2100 /// Commit transaction
2101 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2102 for key in write_set {
2103 let key_handle = ArenaKeyHandle::new(key.as_slice());
2104 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2105 chain.commit(txn_id, commit_ts);
2106 }
2107 }
2108 }
2109
2110 /// Abort transaction
2111 pub fn abort(&self, txn_id: u64) {
2112 for mut entry in self.data.iter_mut() {
2113 entry.value_mut().abort(txn_id);
2114 }
2115 }
2116
2117 /// Scan prefix
2118 pub fn scan_prefix(
2119 &self,
2120 prefix: &[u8],
2121 snapshot_ts: u64,
2122 current_txn_id: Option<u64>,
2123 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2124 let mut results = Vec::new();
2125 let prefix_handle = ArenaKeyHandle::new(prefix);
2126
2127 if let Some(ref idx) = self.ordered_index {
2128 for entry in idx.range(prefix_handle..) {
2129 let key = entry.key();
2130
2131 if !key.as_bytes().starts_with(prefix) {
2132 break;
2133 }
2134
2135 if let Some(chain) = self.data.get(key)
2136 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2137 && let Some(value) = &v.value
2138 {
2139 results.push((key.as_bytes().to_vec(), value.clone()));
2140 }
2141 }
2142 } else {
2143 for entry in self.data.iter() {
2144 let key = entry.key();
2145 if !key.as_bytes().starts_with(prefix) {
2146 continue;
2147 }
2148 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2149 && let Some(value) = &v.value
2150 {
2151 results.push((key.as_bytes().to_vec(), value.clone()));
2152 }
2153 }
2154 }
2155
2156 results
2157 }
2158
    /// Approximate size of the memtable contents in bytes.
    ///
    /// Returns the current value of the `size_bytes` counter, read with a
    /// relaxed atomic load.
    pub fn size(&self) -> u64 {
        self.size_bytes.load(Ordering::Relaxed)
    }
2163
2164 /// Garbage collect old versions
2165 pub fn gc(&self, min_active_ts: u64) -> usize {
2166 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2167
2168 if dirty_keys.is_empty() {
2169 return 0;
2170 }
2171
2172 let mut gc_count = 0;
2173 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2174
2175 for key in unique_keys {
2176 if let Some(mut entry) = self.data.get_mut(&key) {
2177 let before = entry.value().version_count();
2178 entry.value_mut().gc(min_active_ts);
2179 gc_count += before.saturating_sub(entry.value().version_count());
2180 }
2181 }
2182
2183 gc_count
2184 }
2185}
2186
/// The default table is the one produced by [`ArenaMvccMemTable::new`].
impl Default for ArenaMvccMemTable {
    fn default() -> Self {
        Self::new()
    }
}
2192
2193// ============================================================================
2194// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2195// ============================================================================
2196
/// Configuration for memtable type selection
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting
    /// Best for: general workloads, balanced read/write
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations
    /// Best for: write-heavy workloads, large keys
    Arena,
}
2214
/// Unified memtable abstraction using enum dispatch
///
/// This pattern provides:
/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
/// - Type-safe switching between implementations
/// - Easy extensibility for future memtable types
///
/// ## Why Enum over Trait Object?
///
/// - Hot path performance: enum match is a single branch vs vtable indirection
/// - Cache friendliness: no pointer chasing
/// - Inlining: compiler can inline through enum dispatch
pub enum MemTableKind {
    /// Backed by [`MvccMemTable`]; corresponds to `MemTableType::Standard`.
    Standard(MvccMemTable),
    /// Backed by [`ArenaMvccMemTable`]; corresponds to `MemTableType::Arena`.
    Arena(ArenaMvccMemTable),
}
2231
2232impl MemTableKind {
2233 /// Create a new memtable of the specified type
2234 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2235 match kind {
2236 MemTableType::Standard => {
2237 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2238 }
2239 MemTableType::Arena => {
2240 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2241 }
2242 }
2243 }
2244
2245 /// Write a key-value pair
2246 #[inline]
2247 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2248 match self {
2249 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2250 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2251 }
2252 }
2253
2254 /// Write batch of key-value pairs
2255 #[inline]
2256 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2257 match self {
2258 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2259 MemTableKind::Arena(m) => {
2260 // Convert to arena-compatible format
2261 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2262 .iter()
2263 .map(|(k, v)| (k.as_slice(), v.clone()))
2264 .collect();
2265 m.write_batch(&arena_writes, txn_id)
2266 }
2267 }
2268 }
2269
2270 /// Read at snapshot timestamp
2271 #[inline]
2272 pub fn read(
2273 &self,
2274 key: &[u8],
2275 snapshot_ts: u64,
2276 current_txn_id: Option<u64>,
2277 ) -> Option<Vec<u8>> {
2278 match self {
2279 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2280 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2281 }
2282 }
2283
2284 /// Commit transaction
2285 #[inline]
2286 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2287 match self {
2288 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2289 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2290 }
2291 }
2292
2293 /// Abort transaction
2294 #[inline]
2295 pub fn abort(&self, txn_id: u64) {
2296 match self {
2297 MemTableKind::Standard(m) => m.abort(txn_id),
2298 MemTableKind::Arena(m) => m.abort(txn_id),
2299 }
2300 }
2301
2302 /// Scan prefix
2303 #[inline]
2304 pub fn scan_prefix(
2305 &self,
2306 prefix: &[u8],
2307 snapshot_ts: u64,
2308 current_txn_id: Option<u64>,
2309 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2310 match self {
2311 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2312 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2313 }
2314 }
2315
2316 /// Scan range
2317 #[inline]
2318 pub fn scan_range(
2319 &self,
2320 start: &[u8],
2321 end: &[u8],
2322 snapshot_ts: u64,
2323 current_txn_id: Option<u64>,
2324 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2325 match self {
2326 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2327 MemTableKind::Arena(m) => {
2328 // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2329 let mut results = Vec::new();
2330 if let Some(ref idx) = m.ordered_index {
2331 let start_handle = ArenaKeyHandle::new(start);
2332 let end_handle = ArenaKeyHandle::new(end);
2333
2334 if end.is_empty() {
2335 for entry in idx.range(start_handle..) {
2336 let key = entry.key();
2337 if let Some(chain) = m.data.get(key)
2338 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2339 && let Some(value) = &v.value
2340 {
2341 results.push((key.as_bytes().to_vec(), value.clone()));
2342 }
2343 }
2344 } else {
2345 for entry in idx.range(start_handle..end_handle) {
2346 let key = entry.key();
2347 if let Some(chain) = m.data.get(key)
2348 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2349 && let Some(value) = &v.value
2350 {
2351 results.push((key.as_bytes().to_vec(), value.clone()));
2352 }
2353 }
2354 }
2355 } else {
2356 for entry in m.data.iter() {
2357 let key = entry.key();
2358 let key_bytes = key.as_bytes();
2359 if key_bytes < start {
2360 continue;
2361 }
2362 if !end.is_empty() && key_bytes >= end {
2363 continue;
2364 }
2365 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2366 && let Some(value) = &v.value
2367 {
2368 results.push((key_bytes.to_vec(), value.clone()));
2369 }
2370 }
2371 }
2372 results
2373 }
2374 }
2375 }
2376
2377 /// Scan range iterator (returns collected results for now)
2378 #[inline]
2379 pub fn scan_range_iter<'a>(
2380 &'a self,
2381 start: &'a [u8],
2382 end: &'a [u8],
2383 snapshot_ts: u64,
2384 current_txn_id: Option<u64>,
2385 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2386 match self {
2387 MemTableKind::Standard(m) => {
2388 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2389 }
2390 MemTableKind::Arena(_) => {
2391 // Arena version returns collected results as iterator
2392 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2393 Box::new(results.into_iter())
2394 }
2395 }
2396 }
2397
2398 /// Get approximate size
2399 #[inline]
2400 pub fn size(&self) -> u64 {
2401 match self {
2402 MemTableKind::Standard(m) => m.size(),
2403 MemTableKind::Arena(m) => m.size(),
2404 }
2405 }
2406
2407 /// Garbage collect old versions
2408 #[inline]
2409 pub fn gc(&self, min_active_ts: u64) -> usize {
2410 match self {
2411 MemTableKind::Standard(m) => m.gc(min_active_ts),
2412 MemTableKind::Arena(m) => m.gc(min_active_ts),
2413 }
2414 }
2415
2416 /// Get the kind of memtable
2417 pub fn kind(&self) -> MemTableType {
2418 match self {
2419 MemTableKind::Standard(_) => MemTableType::Standard,
2420 MemTableKind::Arena(_) => MemTableType::Arena,
2421 }
2422 }
2423}
2424
/// Durable storage engine with full ACID support
///
/// Owns the WAL, the MVCC manager and the in-memory memtable for one storage
/// directory, plus the counters used for sync and recovery bookkeeping.
pub struct DurableStorage {
    /// Path to storage directory
    path: PathBuf,
    /// Write-ahead log
    wal: Arc<TxnWal>,
    /// MVCC manager
    mvcc: Arc<MvccManager>,
    /// In-memory data (unified abstraction over Standard/Arena)
    memtable: Arc<MemTableKind>,
    /// Per-transaction WAL buffers for batched writes
    /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
    /// At commit, buffer is flushed to WAL with single lock acquisition
    txn_write_buffers: DashMap<u64, TxnWalBuffer>,
    /// Group commit buffer (optional); `None` unless a group-commit
    /// constructor installed one
    group_commit: Option<Arc<EventDrivenGroupCommit>>,
    /// Recovery state
    needs_recovery: AtomicU64, // 1 = needs recovery
    /// Last checkpoint LSN
    last_checkpoint_lsn: AtomicU64,
    /// Synchronous mode (like SQLite's PRAGMA synchronous)
    /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
    sync_mode: AtomicU64,
    /// Commits since last sync (for NORMAL mode)
    commits_since_sync: AtomicU64,
    /// Adaptive batch sizing for NORMAL mode (Little's Law)
    /// Arrival rate in requests/sec × 1000 for precision
    arrival_rate_ema: AtomicU64,
    /// Last commit timestamp in microseconds
    last_commit_us: AtomicU64,
    /// Estimated fsync latency in microseconds
    fsync_latency_us: AtomicU64,
    /// Database lock for exclusive access (None = no locking)
    /// Held only for its lifetime; never read, hence the `dead_code` allowance.
    #[allow(dead_code)]
    db_lock: Option<crate::lock::DatabaseLock>,
}
2461
2462impl DurableStorage {
    /// Open or create durable storage at path
    ///
    /// Equivalent to `open_with_config(path, true)`, i.e. with the ordered
    /// index enabled.
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_config(path, true)
    }
2467
    /// Open with configurable ordered index
    ///
    /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
    /// but scan_prefix becomes O(N) instead of O(log N + K)
    ///
    /// Always uses the `MemTableType::Standard` memtable.
    pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
        Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
    }
2475
    /// Open with arena-backed memtable for write-heavy workloads
    ///
    /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
    /// Best for workloads with:
    /// - High write throughput
    /// - Large keys (reduces allocation overhead)
    /// - Minimal concurrent reads during writes
    ///
    /// The ordered index is enabled.
    pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_full_config(path, true, MemTableType::Arena)
    }
2486
    /// Open with full configuration options
    ///
    /// # Arguments
    /// * `path` - Storage directory path
    /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
    /// * `memtable_type` - Type of memtable to use (Standard or Arena)
    ///
    /// # Locking
    ///
    /// Acquires an exclusive advisory lock on the database directory.
    /// This prevents concurrent multi-process access which would corrupt data.
    /// If the lock cannot be acquired, the failure is returned as
    /// `SochDBError::LockError`.
    pub fn open_with_full_config<P: AsRef<Path>>(
        path: P,
        enable_ordered_index: bool,
        memtable_type: MemTableType,
    ) -> Result<Self> {
        // `true` = acquire the directory lock.
        Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
    }
2506
    /// Open without locking (for testing crash recovery scenarios)
    ///
    /// Uses the standard memtable with the ordered index enabled and skips
    /// the directory lock.
    ///
    /// # Safety
    /// This should ONLY be used in tests that simulate crashes by forgetting
    /// the storage instance. In production, always use `open_with_full_config`.
    #[cfg(test)]
    pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
    }
2516
2517 /// Open an ephemeral (in-memory-like) DurableStorage backed by a temp directory.
2518 ///
2519 /// Uses the full DurableStorage engine (WAL, MVCC, SSI) but writes to a
2520 /// temporary directory that is automatically cleaned up when the
2521 /// `EphemeralHandle` is dropped. This ensures test and production code paths
2522 /// are identical — bugs found in tests are guaranteed to reproduce in production.
2523 ///
2524 /// # Returns
2525 /// An `EphemeralHandle` that owns both the storage and the temp directory.
2526 /// Access the storage via `handle.storage()` or `Deref` coercion.
2527 ///
2528 /// # Example
2529 /// ```ignore
2530 /// let handle = DurableStorage::open_ephemeral()?;
2531 /// let txn = handle.begin_transaction()?;
2532 /// handle.write(txn, b"key".to_vec(), b"value".to_vec())?;
2533 /// handle.commit(txn)?;
2534 /// // temp directory cleaned up when `handle` drops
2535 /// ```
2536 pub fn open_ephemeral() -> Result<EphemeralHandle> {
2537 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2538 let storage = Self::open_with_full_config_internal(
2539 tmp.path(),
2540 true,
2541 MemTableType::Standard,
2542 false, // No lock needed for ephemeral
2543 )?;
2544 Ok(EphemeralHandle {
2545 storage,
2546 _tmpdir: tmp,
2547 })
2548 }
2549
2550 /// Open an ephemeral DurableStorage with group commit enabled.
2551 ///
2552 /// Same as `open_ephemeral()` but with group commit for higher throughput.
2553 pub fn open_ephemeral_with_group_commit() -> Result<EphemeralHandle> {
2554 let tmp = tempfile::tempdir().map_err(|e| SochDBError::Io(e))?;
2555 let mut storage =
2556 Self::open_with_full_config_internal(tmp.path(), true, MemTableType::Standard, false)?;
2557
2558 let wal = storage.wal.clone();
2559 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2560 for &txn_id in txn_ids {
2561 let entry = TxnWalEntry::txn_commit(txn_id);
2562 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2563 }
2564 wal.flush().map_err(|e| e.to_string())?;
2565 wal.sync().map_err(|e| e.to_string())?;
2566 Ok(std::time::SystemTime::now()
2567 .duration_since(std::time::UNIX_EPOCH)
2568 .unwrap()
2569 .as_micros() as u64)
2570 });
2571 storage.group_commit = Some(Arc::new(gc));
2572
2573 Ok(EphemeralHandle {
2574 storage,
2575 _tmpdir: tmp,
2576 })
2577 }
2578
    /// Shared constructor behind every `open_*` variant.
    ///
    /// Steps, in order: create the storage directory if needed, optionally
    /// acquire the directory lock, open the WAL at `<path>/wal.log`, build the
    /// storage with an empty memtable of the requested type, and finally set
    /// the recovery flag according to `check_recovery_needed`.
    ///
    /// # Errors
    /// Propagates directory-creation and WAL-open failures; a failed lock
    /// acquisition is reported as `SochDBError::LockError`.
    fn open_with_full_config_internal<P: AsRef<Path>>(
        path: P,
        enable_ordered_index: bool,
        memtable_type: MemTableType,
        acquire_lock: bool,
    ) -> Result<Self> {
        let path = path.as_ref().to_path_buf();
        std::fs::create_dir_all(&path)?;

        // Acquire exclusive lock on database directory (unless disabled for testing)
        let db_lock = if acquire_lock {
            Some(
                crate::lock::DatabaseLock::acquire(&path)
                    .map_err(|e| SochDBError::LockError(e.to_string()))?,
            )
        } else {
            None
        };

        let wal_path = path.join("wal.log");
        let wal = Arc::new(TxnWal::new(&wal_path)?);

        let storage = Self {
            path,
            wal: wal.clone(),
            mvcc: Arc::new(MvccManager::new()),
            memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
            txn_write_buffers: DashMap::new(),
            // Group commit is installed afterwards by the constructors that want it.
            group_commit: None,
            needs_recovery: AtomicU64::new(0),
            last_checkpoint_lsn: AtomicU64::new(0),
            sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
            commits_since_sync: AtomicU64::new(0),
            // Adaptive batch sizing (Little's Law)
            arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
            last_commit_us: AtomicU64::new(0),
            fsync_latency_us: AtomicU64::new(5000), // 5ms default
            db_lock,
        };

        // Check if recovery needed
        if storage.check_recovery_needed()? {
            storage.needs_recovery.store(1, Ordering::SeqCst);
        }

        Ok(storage)
    }
2626
    /// Open with group commit enabled
    ///
    /// Equivalent to `open_with_group_commit_and_config(path, true)`, i.e.
    /// with the ordered index enabled.
    pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_group_commit_and_config(path, true)
    }
2631
2632 /// Open with group commit and configurable ordered index
2633 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2634 path: P,
2635 enable_ordered_index: bool,
2636 ) -> Result<Self> {
2637 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2638
2639 let wal = storage.wal.clone();
2640 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2641 // Write all commit records WITHOUT flushing (batch them)
2642 for &txn_id in txn_ids {
2643 let entry = TxnWalEntry::txn_commit(txn_id);
2644 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2645 }
2646
2647 // Then do a SINGLE flush + fsync for the entire batch
2648 wal.flush().map_err(|e| e.to_string())?;
2649 wal.sync().map_err(|e| e.to_string())?;
2650
2651 // Return commit timestamp
2652 Ok(std::time::SystemTime::now()
2653 .duration_since(std::time::UNIX_EPOCH)
2654 .unwrap()
2655 .as_micros() as u64)
2656 });
2657
2658 storage.group_commit = Some(Arc::new(gc));
2659 Ok(storage)
2660 }
2661
2662 /// Open with IndexPolicy for automatic memtable/index configuration
2663 ///
2664 /// This is the recommended constructor for new code. The policy determines:
2665 /// - Whether to use ordered index (ScanOptimized only)
2666 /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2667 /// - Default settings optimized for the workload pattern
2668 ///
2669 /// # Arguments
2670 /// * `path` - Storage directory path
2671 /// * `policy` - Index policy determining write/scan tradeoffs
2672 /// * `group_commit` - Whether to enable group commit for throughput
2673 pub fn open_with_policy<P: AsRef<Path>>(
2674 path: P,
2675 policy: crate::index_policy::IndexPolicy,
2676 group_commit: bool,
2677 ) -> Result<Self> {
2678 use crate::index_policy::IndexPolicy;
2679
2680 // Derive configuration from policy
2681 let (enable_ordered_index, memtable_type) = match policy {
2682 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2683 // Write-heavy: no ordered index, use arena for reduced allocations
2684 (false, MemTableType::Arena)
2685 }
2686 IndexPolicy::Balanced => {
2687 // Mixed OLTP: deferred sorting (already implemented in Standard)
2688 (true, MemTableType::Standard)
2689 }
2690 IndexPolicy::ScanOptimized => {
2691 // Scan-heavy: maintain ordered index
2692 (true, MemTableType::Standard)
2693 }
2694 };
2695
2696 if group_commit {
2697 let mut storage =
2698 Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2699
2700 let wal = storage.wal.clone();
2701 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2702 for &txn_id in txn_ids {
2703 let entry = TxnWalEntry::txn_commit(txn_id);
2704 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2705 }
2706 wal.flush().map_err(|e| e.to_string())?;
2707 wal.sync().map_err(|e| e.to_string())?;
2708 Ok(std::time::SystemTime::now()
2709 .duration_since(std::time::UNIX_EPOCH)
2710 .unwrap()
2711 .as_micros() as u64)
2712 });
2713 storage.group_commit = Some(Arc::new(gc));
2714 Ok(storage)
2715 } else {
2716 Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2717 }
2718 }
2719
2720 /// Open storage for concurrent mode (multi-reader, single-writer)
2721 ///
2722 /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2723 /// Coordination is handled by the concurrent MVCC layer instead.
2724 ///
2725 /// # Safety
2726 ///
2727 /// This must ONLY be called from `Database::open_concurrent()` which
2728 /// manages the concurrent MVCC coordination. Direct use will cause
2729 /// data corruption.
2730 pub fn open_for_concurrent<P: AsRef<Path>>(
2731 path: P,
2732 policy: crate::index_policy::IndexPolicy,
2733 ) -> Result<Self> {
2734 use crate::index_policy::IndexPolicy;
2735
2736 let (enable_ordered_index, memtable_type) = match policy {
2737 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => (false, MemTableType::Arena),
2738 IndexPolicy::Balanced => (true, MemTableType::Standard),
2739 IndexPolicy::ScanOptimized => (true, MemTableType::Standard),
2740 };
2741
2742 // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2743 Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2744 }
2745
    /// Get the memtable type being used
    ///
    /// Reported by the memtable itself via `MemTableKind::kind`.
    pub fn memtable_type(&self) -> MemTableType {
        self.memtable.kind()
    }
2750
2751 /// Check if recovery is needed (dirty shutdown detection)
2752 ///
2753 /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2754 /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2755 /// but committed data still needs to be loaded from WAL into the memtable.
2756 fn check_recovery_needed(&self) -> Result<bool> {
2757 let marker_path = self.path.join(".clean_shutdown");
2758 if marker_path.exists() {
2759 // Clean shutdown - remove marker
2760 std::fs::remove_file(&marker_path)?;
2761 }
2762 // ALWAYS need recovery to rebuild memtable from WAL
2763 // The memtable is in-memory only and needs to be restored on every startup
2764 Ok(true)
2765 }
2766
2767 /// Perform crash recovery
2768 pub fn recover(&self) -> Result<RecoveryStats> {
2769 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2770 return Ok(RecoveryStats::default());
2771 }
2772
2773 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2774
2775 // Apply committed writes to memtable
2776 let recovery_txn_id = self.wal.alloc_txn_id();
2777 let commit_ts = self.mvcc.alloc_commit_ts();
2778
2779 // Collect keys being written for efficient commit
2780 let mut write_set: HashSet<InlineKey> = HashSet::new();
2781 for (key, value) in &writes {
2782 write_set.insert(SmallVec::from_slice(key));
2783 self.memtable
2784 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2785 }
2786 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2787
2788 self.needs_recovery.store(0, Ordering::SeqCst);
2789
2790 Ok(RecoveryStats {
2791 transactions_recovered: txn_count,
2792 writes_recovered: writes.len(),
2793 commit_ts,
2794 })
2795 }
2796
2797 /// Begin a new transaction
2798 pub fn begin_transaction(&self) -> Result<u64> {
2799 let txn_id = self.wal.begin_transaction()?;
2800 self.mvcc.begin(txn_id);
2801 Ok(txn_id)
2802 }
2803
2804 /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2805 ///
2806 /// This enables mode-aware optimizations:
2807 /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2808 /// - WriteOnly: Skip read tracking, faster bulk inserts
2809 /// - ReadWrite: Full SSI for serializable isolation
2810 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2811 let txn_id = self.wal.begin_transaction()?;
2812 self.mvcc.begin_with_mode(txn_id, mode);
2813 Ok(txn_id)
2814 }
2815
/// Begin a read-only transaction without any WAL records.
///
/// Only a transaction id is allocated (`wal.alloc_txn_id`) and registered
/// with MVCC as read-only; unlike `begin_transaction`, the WAL's
/// `begin_transaction` is never called. The original author's rationale:
/// this eliminates two WAL mutex acquisitions per read (TxnBegin +
/// TxnAbort), and read-only transactions have no state to recover.
///
/// Callers MUST use `abort_read_only_fast()` to clean up.
#[inline]
pub fn begin_read_only_fast(&self) -> u64 {
    let txn_id = self.wal.alloc_txn_id();
    self.mvcc.begin_read_only(txn_id);
    txn_id
}
2829
/// Abort a fast read-only transaction started with `begin_read_only_fast`.
///
/// Only the MVCC state is released (`mvcc.abort`). No WAL record is written
/// and the memtable and write buffers are not touched, so this must not be
/// used for a transaction that has issued writes.
#[inline]
pub fn abort_read_only_fast(&self, txn_id: u64) {
    self.mvcc.abort(txn_id);
}
2837
2838 /// Read a key WITHOUT any MVCC transaction tracking.
2839 ///
2840 /// Uses the current global timestamp to see all committed writes.
2841 /// Bypasses: begin/abort, active_txns DashMap, record_read, stats.
2842 /// Only safe for single-threaded access (no concurrent writes).
2843 #[inline]
2844 pub fn read_latest(&self, key: &[u8]) -> Option<Vec<u8>> {
2845 let snapshot_ts = self
2846 .mvcc
2847 .ts_counter
2848 .load(std::sync::atomic::Ordering::Relaxed);
2849 self.memtable.read(key, snapshot_ts, None)
2850 }
2851
2852 /// Scan keys with a prefix WITHOUT any MVCC transaction tracking.
2853 ///
2854 /// Uses the current global timestamp. Only safe for single-threaded access.
2855 #[inline]
2856 pub fn scan_latest(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)> {
2857 let snapshot_ts = self
2858 .mvcc
2859 .ts_counter
2860 .load(std::sync::atomic::Ordering::Relaxed);
2861 self.memtable.scan_prefix(prefix, snapshot_ts, None)
2862 }
2863
2864 /// Read a key within a transaction
2865 #[inline]
2866 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2867 // Fast path: get just snapshot_ts without cloning whole transaction
2868 let snapshot_ts = self
2869 .mvcc
2870 .get_snapshot_ts(txn_id)
2871 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2872
2873 // Record read for SSI (skipped for read-only transactions)
2874 self.mvcc.record_read(txn_id, key);
2875
2876 // Read at snapshot timestamp, seeing own uncommitted writes
2877 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2878 }
2879
2880 /// Write a key-value pair within a transaction
2881 ///
2882 /// Writes are buffered and only flushed to disk on commit.
2883 /// This provides ~10× better throughput for batched inserts.
2884 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2885 // Use the zero-allocation path internally
2886 self.write_refs(txn_id, &key, &value)?;
2887
2888 Ok(())
2889 }
2890
2891 /// Write from references - zero allocation hot path
2892 ///
2893 /// Avoids cloning key/value by writing to WAL from refs directly,
2894 /// then only allocating once for memtable storage.
2895 #[inline]
2896 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2897 // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2898 self.mvcc.record_write(txn_id, key);
2899
2900 // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2901 // This reduces lock contention from O(writes) to O(1) per transaction
2902 self.txn_write_buffers
2903 .entry(txn_id)
2904 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2905 .append(key, value);
2906
2907 // Write to memtable (needs owned key/value for storage)
2908 self.memtable
2909 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2910
2911 Ok(())
2912 }
2913
2914 /// Delete a key within a transaction
2915 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2916 // Record write (uses InlineKey - zero allocation for small keys)
2917 self.mvcc.record_write(txn_id, &key);
2918
2919 // Buffer tombstone in memory - NO WAL lock taken!
2920 self.txn_write_buffers
2921 .entry(txn_id)
2922 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2923 .append(&key, &[]); // Empty value = tombstone
2924
2925 // Write tombstone to memtable
2926 self.memtable.write(key, None, txn_id)?;
2927
2928 Ok(())
2929 }
2930
2931 /// Batch write multiple key-value pairs with reduced overhead
2932 ///
2933 /// This API amortizes fixed costs over the batch:
2934 /// - Single DashMap entry lookup for TxnWalBuffer
2935 /// - Single MVCC write set update
2936 /// - Batch memtable operations
2937 ///
2938 /// Performance: ~2-3x faster than individual write_refs calls
2939 /// for batches of 100+ entries.
2940 ///
2941 /// # Arguments
2942 /// * `txn_id` - Transaction ID
2943 /// * `writes` - Slice of (key, value) pairs
2944 #[inline]
2945 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2946 if writes.is_empty() {
2947 return Ok(());
2948 }
2949
2950 // Single DashMap access for entire batch
2951 let mut buffer_entry = self
2952 .txn_write_buffers
2953 .entry(txn_id)
2954 .or_insert_with(|| TxnWalBuffer::new(txn_id));
2955
2956 // Batch operations with reduced per-row overhead
2957 for (key, value) in writes {
2958 // Record write for MVCC
2959 self.mvcc.record_write(txn_id, key);
2960
2961 // Append to WAL buffer
2962 buffer_entry.append(key, value);
2963 }
2964 drop(buffer_entry);
2965
2966 // Batch write to memtable
2967 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2968 .iter()
2969 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2970 .collect();
2971 self.memtable.write_batch(&owned_writes, txn_id)?;
2972
2973 Ok(())
2974 }
2975
2976 /// Commit a transaction
2977 ///
2978 /// With sync_mode:
2979 /// - 0 (OFF): No sync, risk of data loss
2980 /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2981 /// - 2 (FULL): Sync every commit (safest, slowest)
2982 pub fn commit(&self, txn_id: u64) -> Result<u64> {
2983 // First, flush all buffered DATA writes to the WAL with a SINGLE lock
2984 // acquisition (O(1) lock instead of O(writes) locks). Flushing data
2985 // records before validation is safe: ARIES recovery only treats them as
2986 // winners if a *durable commit record* for this transaction also exists,
2987 // and that record is written below — strictly after validation succeeds.
2988 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2989 && !buffer.is_empty()
2990 {
2991 // Flush entire buffer to WAL with one lock
2992 self.wal.flush_buffer(&buffer)?;
2993 }
2994
2995 // ====================================================================
2996 // Task 1 — Linearizable commit invariant:
2997 //
2998 // A commit record must NEVER become durable unless the transaction
2999 // has already passed SSI validation and its write set is final.
3000 //
3001 // We therefore VALIDATE (and freeze the write set) *before* making the
3002 // commit record durable. `mvcc.commit` performs SSI validation and
3003 // returns `None` on a dangerous structure (serialization conflict) or
3004 // if the transaction no longer exists. Writing/fsyncing the commit
3005 // record before this point would let a crash resurrect a transaction
3006 // that the live system rejected — the classic "committed-after-crash,
3007 // aborted-before-crash" non-linearizability bug.
3008 // ====================================================================
3009 let (commit_ts, write_set) = self.mvcc.commit(txn_id).ok_or_else(|| {
3010 SochDBError::Validation(
3011 "transaction aborted: SSI validation failed (serialization conflict) \
3012 or transaction not found"
3013 .into(),
3014 )
3015 })?;
3016
3017 // Validation passed and the write set is frozen. Only now may the
3018 // commit record become durable.
3019 if let Some(gc) = &self.group_commit {
3020 // Submit the *validated* commit intent to group commit and wait.
3021 // This batches multiple validated commits into a single fsync.
3022 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
3023 } else {
3024 // Direct commit path with adaptive sync (Little's Law)
3025 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
3026 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
3027
3028 // Update arrival rate for adaptive batching
3029 self.update_arrival_rate();
3030
3031 // Write commit record (no flush yet - BufWriter will buffer it)
3032 let entry = TxnWalEntry::txn_commit(txn_id);
3033 self.wal.append_no_flush(&entry)?;
3034
3035 // Determine if we should sync/flush based on mode
3036 let should_sync = match sync_mode {
3037 0 => false, // OFF: never sync
3038 1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
3039 _ => true, // FULL: always sync
3040 };
3041
3042 if should_sync {
3043 // Measure fsync latency for adaptive tuning
3044 let start = std::time::Instant::now();
3045 self.wal.flush()?;
3046 self.wal.sync()?;
3047 let latency_us = start.elapsed().as_micros() as u64;
3048
3049 // Update fsync latency estimate (EMA with α = 0.1)
3050 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
3051 let new_latency = (old_latency * 9 + latency_us) / 10;
3052 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
3053
3054 self.commits_since_sync.store(0, Ordering::Relaxed);
3055 }
3056 }
3057
3058 // Commit record is durable (or buffered per sync mode) for a validated
3059 // transaction — publish the writes to the memtable. (O(k), k = keys.)
3060 self.memtable.commit(txn_id, commit_ts, &write_set);
3061
3062 Ok(commit_ts)
3063 }
3064
3065 /// Update arrival rate using exponential moving average
3066 #[inline]
3067 fn update_arrival_rate(&self) {
3068 let now_us = std::time::SystemTime::now()
3069 .duration_since(std::time::UNIX_EPOCH)
3070 .unwrap()
3071 .as_micros() as u64;
3072
3073 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
3074
3075 if last > 0 {
3076 let delta_us = now_us.saturating_sub(last);
3077 if delta_us > 0 && delta_us < 10_000_000 {
3078 // Ignore gaps > 10s
3079 // Rate = 1_000_000 / delta_us (requests/sec)
3080 // Stored as rate × 1000 for precision
3081 let instant_rate = 1_000_000_000 / delta_us;
3082
3083 // EMA with α = 0.1
3084 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
3085 let new_rate = (old_rate * 9 + instant_rate) / 10;
3086 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
3087 }
3088 }
3089 }
3090
/// Compute the number of commits to batch before an fsync.
///
/// N* = √(2 × τ × λ), where τ is the estimated fsync latency in seconds
/// (`fsync_latency_us` / 1e6) and λ is the estimated arrival rate in
/// requests/second (`arrival_rate_ema` / 1000, since the EMA is stored
/// × 1000). The result is truncated to an integer and clamped to
/// `[1, 1000]`. When either estimate is not yet positive, a fixed
/// threshold of 100 is returned.
///
/// NOTE(review): earlier docs described this as "Little's Law:
/// W* = √(τ / λ)"; the formula documented here is the one the code
/// actually evaluates — confirm which was intended.
#[inline]
fn adaptive_batch_threshold(&self) -> u64 {
    let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
    let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds

    if lambda <= 0.0 || tau <= 0.0 {
        return 100; // Fallback to fixed threshold
    }

    // N* = sqrt(2 × τ × λ)
    // Intended to balance time spent waiting for a batch against fsync cost.
    let n_opt = (2.0 * tau * lambda).sqrt();

    // Float-to-int cast truncates toward zero; clamp between 1 and 1000
    (n_opt as u64).clamp(1, 1000)
}
3111
/// Set synchronous mode
///
/// - 0: OFF - No fsync (risk of data loss)
/// - 1: NORMAL - Periodic fsync (balanced)
/// - 2: FULL - Fsync every commit (safest)
///
/// Values above 2 are capped to 2 (FULL). The mode is consulted by the
/// direct (non-group-commit) path of `commit`.
pub fn set_sync_mode(&self, mode: u64) {
    self.sync_mode.store(mode.min(2), Ordering::Relaxed);
}
3120
/// Force a group commit flush (useful for benchmarking or testing)
///
/// Calls `flush_batch` on the group-commit component when one is
/// configured; does nothing otherwise.
pub fn flush_group_commit(&self) {
    if let Some(gc) = &self.group_commit {
        gc.flush_batch();
    }
}
3127
3128 /// Abort a transaction
3129 ///
3130 /// Performance: O(1) for read-only transactions (no writes to clean up).
3131 /// For write transactions, O(N) memtable scan is required to remove
3132 /// uncommitted versions.
3133 pub fn abort(&self, txn_id: u64) -> Result<()> {
3134 // Check if transaction had any buffered writes.
3135 // Read-only transactions never populate txn_write_buffers,
3136 // so this returns None — allowing us to skip the O(N) memtable scan.
3137 let had_writes = self.txn_write_buffers.remove(&txn_id).is_some();
3138
3139 if had_writes {
3140 // Write abort record to WAL (only needed if data was written)
3141 self.wal.abort_transaction(txn_id)?;
3142 // Clean up uncommitted memtable entries
3143 self.memtable.abort(txn_id);
3144 }
3145
3146 // MVCC cleanup is always O(1) — just removes from active_txns DashMap
3147 self.mvcc.abort(txn_id);
3148
3149 Ok(())
3150 }
3151
3152 /// Scan keys with prefix
3153 #[inline]
3154 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3155 // Fast path: get just snapshot_ts without cloning whole transaction
3156 let snapshot_ts = self
3157 .mvcc
3158 .get_snapshot_ts(txn_id)
3159 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3160
3161 // Note: Scan doesn't record individual key reads for SSI (too expensive)
3162 // SSI conflicts are tracked at the prefix level if needed
3163 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
3164 }
3165
3166 /// Scan keys in range
3167 #[inline]
3168 pub fn scan_range(
3169 &self,
3170 txn_id: u64,
3171 start: &[u8],
3172 end: &[u8],
3173 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
3174 let snapshot_ts = self
3175 .mvcc
3176 .get_snapshot_ts(txn_id)
3177 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
3178
3179 Ok(self
3180 .memtable
3181 .scan_range(start, end, snapshot_ts, Some(txn_id)))
3182 }
3183
3184 /// Streaming scan for very large result sets
3185 ///
3186 /// Returns an iterator that yields (key, value) pairs without
3187 /// materializing the entire result set in memory.
3188 #[inline]
3189 pub fn scan_range_iter<'a>(
3190 &'a self,
3191 txn_id: u64,
3192 start: &'a [u8],
3193 end: &'a [u8],
3194 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
3195 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
3196 self.memtable
3197 .scan_range_iter(start, end, snapshot_ts, Some(txn_id))
3198 }
3199
/// Flush the WAL's in-memory buffer to the OS
///
/// This ensures all buffered writes are pushed from the BufWriter
/// into the OS page cache. It does NOT by itself make the data durable;
/// call this before `fsync()` so that the subsequent sync covers all
/// buffered data.
///
/// # Errors
/// Propagates any error returned by the WAL flush.
pub fn flush_wal(&self) -> Result<()> {
    self.wal.flush()
}
3209
/// Force sync the WAL to disk (fsync)
///
/// Delegates to the WAL's `sync`. Call `flush_wal()` first so that data
/// still sitting in the WAL's in-memory buffer is included.
///
/// # Errors
/// Propagates any error returned by the WAL sync.
pub fn fsync(&self) -> Result<()> {
    self.wal.sync()
}
3214
3215 /// Write checkpoint
3216 pub fn checkpoint(&self) -> Result<u64> {
3217 let txn_id = 0; // System transaction
3218 let entry = TxnWalEntry::checkpoint(txn_id);
3219 let lsn = self.wal.append_sync(&entry)?;
3220 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
3221 Ok(lsn)
3222 }
3223
/// Truncate the WAL file after checkpoint.
///
/// This physically truncates the WAL file to 0 bytes, reclaiming disk
/// space. The in-memory memtable retains all data for the current
/// session, but a crash after truncation will result in data loss
/// since the WAL is the only persistence mechanism for DurableStorage.
///
/// Call after `checkpoint()` when WAL durability across restarts is
/// not required (e.g. desktop telemetry viewers, caches).
///
/// # Errors
/// Propagates any error returned by the WAL's `truncate`.
pub fn truncate_wal(&self) -> Result<()> {
    self.wal.truncate()
}
3236
3237 /// Get storage statistics
3238 pub fn stats(&self) -> StorageStats {
3239 // Get WAL size from the WAL manager
3240 let wal_size = self.wal.size_bytes();
3241
3242 // Get active transaction count from MVCC
3243 let active_txns = self.mvcc.active_transaction_count();
3244
3245 StorageStats {
3246 memtable_size_bytes: self.memtable.size(),
3247 wal_size_bytes: wal_size,
3248 active_transactions: active_txns,
3249 min_active_snapshot: self.mvcc.min_active_snapshot(),
3250 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3251 }
3252 }
3253
3254 /// Garbage collect old versions
3255 pub fn gc(&self) -> usize {
3256 let min_ts = self.mvcc.min_active_snapshot();
3257 self.memtable.gc(min_ts)
3258 }
3259
3260 /// Clean shutdown
3261 pub fn shutdown(&self) -> Result<()> {
3262 // Sync WAL
3263 self.fsync()?;
3264
3265 // Write clean shutdown marker
3266 let marker_path = self.path.join(".clean_shutdown");
3267 std::fs::write(&marker_path, b"clean")?;
3268
3269 Ok(())
3270 }
3271}
3272
/// Dropping the storage attempts a clean shutdown.
impl Drop for DurableStorage {
    fn drop(&mut self) {
        // Best effort: `drop` cannot report failure, so a shutdown error is
        // deliberately discarded here. Call `shutdown()` explicitly to
        // observe errors.
        let _ = self.shutdown();
    }
}
3278
3279// =============================================================================
3280// EphemeralHandle - Temp-directory-backed DurableStorage for testing
3281// =============================================================================
3282
/// Owns a `DurableStorage` instance backed by a temporary directory.
///
/// The temp directory is automatically cleaned up when this handle is dropped.
/// Access the underlying storage via `Deref` coercion or `.storage()`.
///
/// # Why this exists
///
/// SochDB previously had two storage engines — `LscsStorage` (BTreeMap-backed,
/// in-memory WAL, used by tests) and `DurableStorage` (SkipMap-backed, real WAL,
/// used in production). This dual-engine architecture meant bugs could surface
/// only in production because the test path exercised different code.
///
/// `EphemeralHandle` eliminates this divergence: tests use the exact same
/// `DurableStorage` engine as production, just backed by a temp directory.
pub struct EphemeralHandle {
    // Field order is significant: Rust drops fields in declaration order,
    // so the storage (whose `Drop` runs `shutdown`) is dropped before the
    // temp directory is removed. Do not reorder.
    storage: DurableStorage,
    // Held only to keep the directory alive for as long as the storage.
    _tmpdir: tempfile::TempDir,
}
3301
/// Accessors for the wrapped storage and its backing temp directory.
impl EphemeralHandle {
    /// Get a reference to the underlying storage
    pub fn storage(&self) -> &DurableStorage {
        &self.storage
    }

    /// Get a mutable reference to the underlying storage
    pub fn storage_mut(&mut self) -> &mut DurableStorage {
        &mut self.storage
    }

    /// Consume the handle and return the storage and temp directory.
    ///
    /// Useful when you need an `Arc<DurableStorage>` — the caller must keep
    /// the `TempDir` alive for the lifetime of the storage, and should drop
    /// the storage before the `TempDir`.
    pub fn into_parts(self) -> (DurableStorage, tempfile::TempDir) {
        (self.storage, self._tmpdir)
    }
}
3321
/// Lets an `EphemeralHandle` be used wherever a `&DurableStorage` is
/// expected (e.g. `handle.begin_transaction()`).
impl std::ops::Deref for EphemeralHandle {
    type Target = DurableStorage;
    fn deref(&self) -> &DurableStorage {
        &self.storage
    }
}
3328
/// Mutable counterpart of the `Deref` impl: exposes `&mut DurableStorage`.
impl std::ops::DerefMut for EphemeralHandle {
    fn deref_mut(&mut self) -> &mut DurableStorage {
        &mut self.storage
    }
}
3334
/// Recovery statistics
///
/// Returned by `DurableStorage::recover`. The `Default` value (all zeros)
/// is what `recover` returns when it exits early without replaying.
#[derive(Debug, Default)]
pub struct RecoveryStats {
    /// Transaction count reported by the WAL replay.
    pub transactions_recovered: usize,
    /// Number of writes re-applied to the memtable.
    pub writes_recovered: usize,
    /// Commit timestamp under which the recovered writes were committed.
    pub commit_ts: u64,
}
3342
/// Storage statistics
///
/// Point-in-time values gathered by `DurableStorage::stats`.
#[derive(Debug, Default)]
pub struct StorageStats {
    /// Size reported by the memtable (`memtable.size()`).
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL (`wal.size_bytes()`).
    pub wal_size_bytes: u64,
    /// Number of transactions MVCC currently considers active.
    pub active_transactions: usize,
    /// Minimum active snapshot reported by MVCC; also used as the GC watermark.
    pub min_active_snapshot: u64,
    /// LSN recorded by the most recent `checkpoint()` call.
    pub last_checkpoint_lsn: u64,
}
3352
3353#[cfg(test)]
3354mod tests {
3355 use super::*;
3356 use tempfile::tempdir;
3357
/// Write two keys, read one back inside the writing transaction, commit,
/// then confirm a fresh transaction sees the committed value.
#[test]
fn test_basic_transaction() {
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open(tmp.path()).unwrap();

    // Writer transaction: two puts.
    let writer = db.begin_transaction().unwrap();
    for (k, v) in [("key1", "value1"), ("key2", "value2")] {
        db.write(writer, k.as_bytes().to_vec(), v.as_bytes().to_vec())
            .unwrap();
    }

    // A transaction observes its own uncommitted write.
    let own_view = db.read(writer, b"key1").unwrap();
    assert_eq!(own_view, Some(b"value1".to_vec()));

    // Commit yields a non-zero timestamp.
    let commit_ts = db.commit(writer).unwrap();
    assert!(commit_ts > 0);

    // A later transaction observes the committed value.
    let reader = db.begin_transaction().unwrap();
    let committed_view = db.read(reader, b"key1").unwrap();
    assert_eq!(committed_view, Some(b"value1".to_vec()));
    db.abort(reader).unwrap();
}
3388
/// A transaction keeps reading the value that was committed when it began,
/// even after a later transaction commits an update to the same key.
/// The interleaving of begin/commit calls below is the point of the test.
#[test]
fn test_snapshot_isolation() {
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // T1: Write initial value
    let t1 = storage.begin_transaction().unwrap();
    storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
    storage.commit(t1).unwrap();

    // T2: Start reading (snapshot at this point)
    let t2 = storage.begin_transaction().unwrap();

    // T3: Update the value (begins and commits while T2 is still open)
    let t3 = storage.begin_transaction().unwrap();
    storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
    storage.commit(t3).unwrap();

    // T2 should still see v1 (snapshot isolation)
    let v = storage.read(t2, b"key").unwrap();
    assert_eq!(v, Some(b"v1".to_vec()));

    // New transaction should see v2
    let t4 = storage.begin_transaction().unwrap();
    let v = storage.read(t4, b"key").unwrap();
    assert_eq!(v, Some(b"v2".to_vec()));

    // Release both readers.
    storage.abort(t2).unwrap();
    storage.abort(t4).unwrap();
}
3419
/// GC run while an old reader is active must leave that reader's view
/// intact; after the reader is released, GC may run again and the latest
/// value must still be readable.
#[test]
fn test_gc_preserves_versions_for_active_snapshot() {
    // GC must not free a version that an in-flight reader's snapshot can
    // still observe. The low-water-mark (min_active_snapshot) is the oldest
    // live reader; any version visible to it must survive GC.
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Seed an initial committed version v1.
    let t1 = storage.begin_transaction().unwrap();
    storage.write(t1, b"k".to_vec(), b"v1".to_vec()).unwrap();
    storage.commit(t1).unwrap();

    // Open a long-lived snapshot reader BEFORE any newer writes. Its
    // snapshot_ts pins the GC watermark so v1 cannot be reclaimed.
    let reader = storage.begin_transaction().unwrap();
    assert_eq!(
        storage.read(reader, b"k").unwrap(),
        Some(b"v1".to_vec()),
        "reader's snapshot must see v1"
    );

    // Produce several newer committed versions while the reader is active.
    for v in ["v2", "v3", "v4"] {
        let w = storage.begin_transaction().unwrap();
        storage
            .write(w, b"k".to_vec(), v.as_bytes().to_vec())
            .unwrap();
        storage.commit(w).unwrap();
    }

    // Run GC. Because `reader` is still active, the watermark is pinned at
    // its snapshot and the version it needs (v1) must NOT be freed.
    // The freed count itself is not asserted.
    let _freed = storage.gc();
    assert_eq!(
        storage.read(reader, b"k").unwrap(),
        Some(b"v1".to_vec()),
        "GC must not free a version still visible to an active snapshot"
    );

    // A fresh transaction sees the latest committed version.
    let fresh = storage.begin_transaction().unwrap();
    assert_eq!(storage.read(fresh, b"k").unwrap(), Some(b"v4".to_vec()));
    storage.abort(fresh).unwrap();

    // Release the old reader; the watermark can now advance.
    storage.abort(reader).unwrap();

    // After the old snapshot is gone, GC may reclaim superseded versions,
    // and new readers still resolve the latest value correctly.
    let _freed2 = storage.gc();
    let after = storage.begin_transaction().unwrap();
    assert_eq!(storage.read(after, b"k").unwrap(), Some(b"v4".to_vec()));
    storage.abort(after).unwrap();
}
3475
/// Two overlapping transactions read both keys and write disjoint keys;
/// the first commit must succeed and the second must be rejected.
#[test]
fn test_ssi_detects_write_skew() {
    // Classic write-skew: two concurrent transactions each read what the
    // other writes (disjoint write keys). Under plain SI both would commit,
    // violating serializability. Under SSI the pivot transaction (with both
    // an inbound and an outbound rw-edge) must be aborted, so the two
    // commits cannot both succeed.
    let dir = tempdir().unwrap();
    let storage = DurableStorage::open(dir.path()).unwrap();

    // Seed two keys.
    let seed = storage.begin_transaction().unwrap();
    storage.write(seed, b"x".to_vec(), b"0".to_vec()).unwrap();
    storage.write(seed, b"y".to_vec(), b"0".to_vec()).unwrap();
    storage.commit(seed).unwrap();

    // T1 reads x and y, then writes x.
    let t1 = storage.begin_transaction().unwrap();
    let _ = storage.read(t1, b"x").unwrap();
    let _ = storage.read(t1, b"y").unwrap();

    // T2 reads x and y, then writes y. (Concurrent with T1.)
    let t2 = storage.begin_transaction().unwrap();
    let _ = storage.read(t2, b"x").unwrap();
    let _ = storage.read(t2, b"y").unwrap();

    // Both writes happen after both transactions have done their reads.
    storage.write(t1, b"x".to_vec(), b"1".to_vec()).unwrap();
    storage.write(t2, b"y".to_vec(), b"1".to_vec()).unwrap();

    // First committer wins.
    let c1 = storage.commit(t1);
    assert!(c1.is_ok(), "first committer should succeed: {c1:?}");

    // The second committer forms a dangerous rw-structure with T1 and must
    // be rejected to preserve serializability.
    let c2 = storage.commit(t2);
    assert!(
        c2.is_err(),
        "SSI must abort the write-skew pivot, but commit succeeded"
    );
}
3517
/// An aborted overwrite must not be visible to later transactions.
#[test]
fn test_abort_transaction() {
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open(tmp.path()).unwrap();

    // Commit the baseline value.
    let baseline = db.begin_transaction().unwrap();
    db.write(baseline, b"key".to_vec(), b"v1".to_vec()).unwrap();
    db.commit(baseline).unwrap();

    // Overwrite in a second transaction, then roll it back.
    let doomed = db.begin_transaction().unwrap();
    db.write(doomed, b"key".to_vec(), b"v2".to_vec()).unwrap();
    db.abort(doomed).unwrap();

    // The baseline value is what a new reader observes.
    let reader = db.begin_transaction().unwrap();
    let seen = db.read(reader, b"key").unwrap();
    assert_eq!(seen, Some(b"v1".to_vec()));
    db.abort(reader).unwrap();
}
3539
/// Commit with FULL sync, leak the storage to simulate a crash (no `Drop`,
/// hence no clean-shutdown marker), reopen, run recovery and verify the
/// committed value is readable.
#[test]
fn test_crash_recovery() {
    let dir = tempdir().unwrap();

    // Phase 1: Write data and commit
    {
        // Use open_without_lock for crash simulation tests
        let storage = DurableStorage::open_without_lock(dir.path()).unwrap();

        // Set sync mode to FULL to ensure data is synced before "crash"
        storage.set_sync_mode(2); // FULL: sync every commit

        let txn = storage.begin_transaction().unwrap();
        storage
            .write(txn, b"persist".to_vec(), b"data".to_vec())
            .unwrap();
        storage.commit(txn).unwrap();

        // Simulate crash (no clean shutdown): `forget` skips `Drop`, so
        // `shutdown()` never runs for this instance.
        std::mem::forget(storage);
    }

    // Phase 2: Reopen and recover
    {
        let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
        let stats = storage.recover().unwrap();
        // Recovery must have replayed something.
        assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);

        // Data should be recovered
        let txn = storage.begin_transaction().unwrap();
        let v = storage.read(txn, b"persist").unwrap();
        assert_eq!(v, Some(b"data".to_vec()));
        storage.abort(txn).unwrap();
    }
}
3575
/// A prefix scan returns only the keys that share the prefix.
#[test]
fn test_scan_prefix() {
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open(tmp.path()).unwrap();

    // Two `user:` keys and one unrelated key, committed together.
    let writer = db.begin_transaction().unwrap();
    let rows = [
        ("user:1", "alice"),
        ("user:2", "bob"),
        ("order:1", "order1"),
    ];
    for (k, v) in rows {
        db.write(writer, k.as_bytes().to_vec(), v.as_bytes().to_vec())
            .unwrap();
    }
    db.commit(writer).unwrap();

    // Only the two `user:` rows match.
    let reader = db.begin_transaction().unwrap();
    let users = db.scan(reader, b"user:").unwrap();
    assert_eq!(users.len(), 2);
    db.abort(reader).unwrap();
}
3598
/// A committed delete hides a previously committed value.
#[test]
fn test_delete() {
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open(tmp.path()).unwrap();

    // Insert and commit.
    let inserter = db.begin_transaction().unwrap();
    db.write(inserter, b"key".to_vec(), b"value".to_vec())
        .unwrap();
    db.commit(inserter).unwrap();

    // The key is present before the delete.
    let before = db.begin_transaction().unwrap();
    let found = db.read(before, b"key").unwrap();
    assert!(found.is_some());
    db.abort(before).unwrap();

    // Delete and commit.
    let deleter = db.begin_transaction().unwrap();
    db.delete(deleter, b"key".to_vec()).unwrap();
    db.commit(deleter).unwrap();

    // The key is gone afterwards.
    let after = db.begin_transaction().unwrap();
    let found = db.read(after, b"key").unwrap();
    assert!(found.is_none());
    db.abort(after).unwrap();
}
3626
/// Smoke test: GC runs without panicking after ten committed versions of
/// the same key. The reclaimed count is implementation-dependent and is
/// therefore not asserted.
#[test]
fn test_gc() {
    let tmp = tempdir().unwrap();
    let db = DurableStorage::open(tmp.path()).unwrap();

    // Ten successive committed versions of one key.
    (0..10).for_each(|i| {
        let txn = db.begin_transaction().unwrap();
        let payload = format!("v{}", i).into_bytes();
        db.write(txn, b"key".to_vec(), payload).unwrap();
        db.commit(txn).unwrap();
    });

    // Only exercise the call; any usize result is acceptable.
    let _reclaimed = db.gc();
}
3647
/// Four threads each commit one key through a group-commit-enabled
/// storage; every commit must succeed and every value must be readable
/// afterwards.
#[test]
fn test_group_commit() {
    use std::sync::Arc;
    use std::thread;

    let dir = tempdir().unwrap();
    let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());

    // Spawn multiple threads to commit concurrently
    let mut handles = vec![];
    for i in 0..4 {
        let storage = Arc::clone(&storage);
        handles.push(thread::spawn(move || {
            // Each thread writes its own distinct key, so there are no
            // write-write conflicts between threads.
            let txn = storage.begin_transaction().unwrap();
            storage
                .write(
                    txn,
                    format!("key{}", i).into_bytes(),
                    format!("val{}", i).into_bytes(),
                )
                .unwrap();
            storage.commit(txn).unwrap()
        }));
    }

    // Wait for all commits
    let mut commit_times = vec![];
    for h in handles {
        commit_times.push(h.join().unwrap());
    }

    // All commits should succeed
    assert!(commit_times.iter().all(|&ts| ts > 0));

    // Verify the data is readable from a fresh transaction
    let txn = storage.begin_transaction().unwrap();
    for i in 0..4 {
        let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
        assert_eq!(val, Some(format!("val{}", i).into_bytes()));
    }
    storage.abort(txn).unwrap();
}
3690
3691 // ==================== ArenaMvccMemTable Tests ====================
3692
/// Uncommitted writes are readable by the transaction that made them, and
/// an unwritten key reads as `None`.
#[test]
fn test_arena_memtable_basic_write_read() {
    let table = ArenaMvccMemTable::new();

    // Two uncommitted writes under txn 1.
    table.write(b"key1", Some(b"value1".to_vec()), 1).unwrap();
    table.write(b"key2", Some(b"value2".to_vec()), 1).unwrap();

    // Reads pass txn 1 as the owner, so its uncommitted data is visible.
    let first = table.read(b"key1", 0, Some(1));
    let second = table.read(b"key2", 0, Some(1));
    let missing = table.read(b"key3", 0, Some(1));

    assert_eq!(first, Some(b"value1".to_vec()));
    assert_eq!(second, Some(b"value2".to_vec()));
    assert_eq!(missing, None);
}
3710
/// A second write to the same key by the same transaction wins.
#[test]
fn test_arena_memtable_update() {
    let table = ArenaMvccMemTable::new();

    table.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
    table.write(b"key", Some(b"v2".to_vec()), 1).unwrap();

    let latest = table.read(b"key", 0, Some(1));
    assert_eq!(latest, Some(b"v2".to_vec()));
}
3720
/// Writing `None` after a value makes the key read as absent.
#[test]
fn test_arena_memtable_delete() {
    let table = ArenaMvccMemTable::new();

    table.write(b"key", Some(b"value".to_vec()), 1).unwrap();
    // A `None` value is the deletion marker.
    table.write(b"key", None, 1).unwrap();

    let after_delete = table.read(b"key", 0, Some(1));
    assert_eq!(after_delete, None);
}
3730
/// Committed entries are returned by prefix scans at a later snapshot, and
/// only entries matching the prefix are included.
#[test]
fn test_arena_memtable_scan_prefix() {
    let table = ArenaMvccMemTable::new();

    // Three `user:` entries and one `order:` entry, all under txn 1.
    table
        .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
        .unwrap();
    table
        .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
        .unwrap();
    table
        .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
        .unwrap();
    table
        .write(b"order:1", Some(b"order_data".to_vec()), 1)
        .unwrap();

    // Commit txn 1 at timestamp 10 with the full write set.
    let written: [&[u8]; 4] = [b"user:1:name", b"user:1:email", b"user:2:name", b"order:1"];
    let write_set: HashSet<InlineKey> = written
        .iter()
        .map(|key| InlineKey::from_slice(key))
        .collect();
    table.commit(1, 10, &write_set);

    // Snapshot 11 is after the commit, so the data is visible.
    let user_one = table.scan_prefix(b"user:1:", 11, None);
    assert_eq!(user_one.len(), 2);

    let all_users = table.scan_prefix(b"user:", 11, None);
    assert_eq!(all_users.len(), 3);
}
3764
/// Writes three key/value pairs through `write_batch` and reads each one back
/// from the writing transaction.
#[test]
fn test_arena_memtable_write_batch() {
    let table = ArenaMvccMemTable::new();

    // Single source of truth for both the batch and the expectations.
    let pairs: [(&[u8], &[u8]); 3] = [(b"k1", b"v1"), (b"k2", b"v2"), (b"k3", b"v3")];

    let batch: Vec<(&[u8], Option<Vec<u8>>)> = pairs
        .iter()
        .map(|&(key, value)| (key, Some(value.to_vec())))
        .collect();
    table.write_batch(&batch, 1).unwrap();

    // The batch is uncommitted, so reads go through txn 1.
    for &(key, value) in &pairs {
        assert_eq!(table.read(key, 0, Some(1)), Some(value.to_vec()));
    }
}
3781
/// Commits ten versions of one key, runs GC, and checks that the newest
/// version is still readable afterwards.
///
/// The previous form of this test discarded the GC result and asserted
/// nothing, so it could only fail on a panic.
#[test]
fn test_arena_memtable_gc() {
    let memtable = ArenaMvccMemTable::new();

    // Write multiple versions: txn ids 1..=10, commit timestamps 10, 20, ..., 100.
    for i in 0..10 {
        let txn_id = i + 1;
        let commit_ts = txn_id * 10;

        memtable
            .write(b"key", Some(format!("v{}", i).into_bytes()), txn_id)
            .unwrap();

        let mut write_set = HashSet::new();
        write_set.insert(InlineKey::from_slice(b"key"));
        memtable.commit(txn_id, commit_ts, &write_set);
    }

    // GC old versions. The returned count is not asserted on because its exact
    // meaning is defined by the implementation.
    let _gc_count = memtable.gc(90);

    // The newest version was committed at ts=100, above the GC watermark of 90,
    // so it must survive and be visible to a snapshot taken after its commit.
    assert_eq!(memtable.read(b"key", 101, None), Some(b"v9".to_vec()));
}
3801
/// `size()` reports zero for a fresh memtable and a positive value after one write.
#[test]
fn test_arena_memtable_size_tracking() {
    let table = ArenaMvccMemTable::new();

    let size_when_empty = table.size();
    assert_eq!(size_when_empty, 0);

    table.write(b"key", Some(b"value".to_vec()), 1).unwrap();

    let size_after_write = table.size();
    assert!(size_after_write > 0);
}
3812
/// An uncommitted write is visible only to its own transaction, and to no one
/// once that transaction is aborted.
#[test]
fn test_arena_memtable_abort() {
    let table = ArenaMvccMemTable::new();
    let (writer, bystander) = (1, 2);
    let payload = b"uncommitted".to_vec();

    table.write(b"key", Some(payload.clone()), writer).unwrap();

    // The writing transaction sees its own pending value...
    let own_view = table.read(b"key", 0, Some(writer));
    assert_eq!(own_view, Some(payload));

    // ...while a different transaction does not.
    let foreign_view = table.read(b"key", 0, Some(bystander));
    assert_eq!(foreign_view, None);

    table.abort(writer);

    // After the abort even the writer no longer sees the value.
    let after_abort = table.read(b"key", 0, Some(writer));
    assert_eq!(after_abort, None);
}
3836
3837 // ========================================================================
3838 // MemTableKind Tests - Unified Abstraction
3839 // ========================================================================
3840
/// Write/commit/read round trip through `MemTableKind` built with
/// `MemTableType::Standard`.
#[test]
fn test_memtable_kind_standard() {
    let table = MemTableKind::new(MemTableType::Standard, true);
    assert_eq!(table.kind(), MemTableType::Standard);

    let key: &[u8] = b"key1";
    let value = b"value1".to_vec();

    // Pending write by txn 1.
    table.write(key.to_vec(), Some(value.clone()), 1).unwrap();

    // Commit txn 1 at ts=100 with a single-key write set.
    let mut write_set = HashSet::new();
    write_set.insert(InlineKey::from_slice(key));
    table.commit(1, 100, &write_set);

    // snapshot_ts=101 is past the commit timestamp, so the value is visible.
    let fetched = table.read(key, 101, None);
    assert_eq!(fetched, Some(value));
}
3859
/// Write/commit/read round trip through `MemTableKind` built with
/// `MemTableType::Arena`.
#[test]
fn test_memtable_kind_arena() {
    let table = MemTableKind::new(MemTableType::Arena, true);
    assert_eq!(table.kind(), MemTableType::Arena);

    let key: &[u8] = b"key1";
    let expected = Some(b"value1".to_vec());

    // Pending write by txn 1.
    table.write(key.to_vec(), expected.clone(), 1).unwrap();

    // Commit txn 1 at ts=100; the write set holds just this key.
    let write_set: HashSet<InlineKey> =
        Some(InlineKey::from_slice(key)).into_iter().collect();
    table.commit(1, 100, &write_set);

    // Read with snapshot_ts > commit_ts.
    assert_eq!(table.read(key, 101, None), expected);
}
3878
/// Runs the same range scan against both memtable kinds and expects the same
/// number of results from each.
#[test]
fn test_memtable_kind_scan_range() {
    let kinds = [MemTableType::Standard, MemTableType::Arena];

    for kind in kinds {
        let table = MemTableKind::new(kind, true);

        // Write key0..key4 under txn 1, collecting the write set as we go.
        let mut write_set: HashSet<InlineKey> = HashSet::new();
        for i in 0..5 {
            let key = format!("key{}", i).into_bytes();
            let value = format!("value{}", i).into_bytes();
            write_set.insert(InlineKey::from_slice(key.as_slice()));
            table.write(key, Some(value), 1).unwrap();
        }

        // Commit everything at ts=100.
        table.commit(1, 100, &write_set);

        // Scan [key1, key4) at snapshot_ts=101 (> commit_ts).
        let hits = table.scan_range(b"key1", b"key4", 101, None);
        assert_eq!(
            hits.len(),
            3,
            "kind={:?} should have 3 results (key1, key2, key3)",
            kind
        );
    }
}
3910
/// Opens storage via `open_with_arena` and checks that a committed write is
/// readable from a later transaction.
#[test]
fn test_durable_storage_arena() {
    let tmp = tempdir().unwrap();
    let storage = DurableStorage::open_with_arena(tmp.path()).unwrap();

    assert_eq!(storage.memtable_type(), MemTableType::Arena);

    let value = b"value1".to_vec();

    // First transaction: write one key and commit.
    let writer = storage.begin_transaction().unwrap();
    storage.write(writer, b"key1".to_vec(), value.clone()).unwrap();
    storage.commit(writer).unwrap();

    // Second transaction: read the committed key, then abort.
    let reader = storage.begin_transaction().unwrap();
    let fetched = storage.read(reader, b"key1").unwrap();
    assert_eq!(fetched, Some(value));
    storage.abort(reader).unwrap();
}
3930
/// Opens storage via `open_with_full_config` with the Arena memtable, commits
/// ten keys, and checks that a prefix scan returns all of them.
#[test]
fn test_durable_storage_full_config() {
    let tmp = tempdir().unwrap();

    // Arena memtable with the boolean option (ordered index) switched on.
    let storage =
        DurableStorage::open_with_full_config(tmp.path(), true, MemTableType::Arena).unwrap();
    assert_eq!(storage.memtable_type(), MemTableType::Arena);

    // Write key00..key09 in a single transaction.
    let writer = storage.begin_transaction().unwrap();
    (0..10).for_each(|i| {
        let key = format!("key{:02}", i).into_bytes();
        let value = format!("value{}", i).into_bytes();
        storage.write(writer, key, value).unwrap();
    });
    storage.commit(writer).unwrap();

    // Every key written above starts with "key0", so the scan should return all ten.
    let reader = storage.begin_transaction().unwrap();
    let hits = storage.scan(reader, b"key0").unwrap();
    assert_eq!(hits.len(), 10); // key00 through key09
    storage.abort(reader).unwrap();
}
3957 }
3958}