sochdb_storage/durable_storage.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Durable Storage Layer
19//!
20//! This module wires together all storage components into a production-ready
21//! durable storage engine:
22//!
23//! - WAL (txn_wal.rs) for durability
24//! - Group Commit for throughput
25//! - MVCC for isolation
26//! - LSCS for columnar efficiency
27//!
28//! ## Architecture
29//!
30//! ```text
31//! ┌─────────────────────────────────────────────────────────────────┐
32//! │ DurableStorage │
33//! ├─────────────────────────────────────────────────────────────────┤
34//! │ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
35//! │ │ MvccManager │ │ GroupCommit │───▶│ TxnWal (fsync) │ │
36//! │ │ │ │ │ └─────────────────────┘ │
37//! │ │ ┌─────────┐ │ └─────────────┘ │
38//! │ │ │Snapshots│ │ │
39//! │ │ └─────────┘ │ ┌─────────────────────────────────────────┐│
40//! │ │ ┌─────────┐ │ │ MemTable ││
41//! │ │ │ Txn Map │ │ │ (key → (value, txn_id, version)) ││
42//! │ │ └─────────┘ │ └─────────────────────────────────────────┘│
43//! │ └─────────────┘ │
44//! │ ┌─────────────────────────────────────────┐│
45//! │ │ LSCS (SST) ││
46//! │ │ Immutable columnar segments ││
47//! │ └─────────────────────────────────────────┘│
48//! └─────────────────────────────────────────────────────────────────┘
49//! ```
50//!
51//! ## Concurrency
52//!
53//! - Writers: Serialize through WAL, use MVCC for conflict detection
54//! - Readers: Lock-free reads at snapshot timestamp
55//! - Commits: Batched through GroupCommit for throughput
56
57use std::collections::HashSet;
58use std::path::{Path, PathBuf};
59use std::sync::Arc;
60use std::sync::atomic::{AtomicU64, Ordering};
61
62use dashmap::DashMap;
63use smallvec::SmallVec;
64
65use crossbeam_skiplist::SkipMap;
66
67use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
68use crate::group_commit::EventDrivenGroupCommit;
69use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
70use sochdb_core::{Result, SochDBError};
71
72// =============================================================================
73// SSI Bloom Filter - Fast Conflict Pre-Filtering
74// =============================================================================
75
76/// Space-efficient Bloom filter for SSI conflict detection
77///
78/// Used to quickly determine if two transactions MIGHT have conflicting keys.
79/// False positives are acceptable (leads to unnecessary exact checks),
80/// but false negatives are not allowed.
81///
82/// ## Configuration
83///
84/// For 1000 keys with 1% false positive rate:
85/// - m = ~9600 bits ≈ 1.2 KB per transaction
86/// - k = 7 hash functions
87///
88/// ## Lazy Initialization
89///
90/// The bit vector is lazily initialized on first insert to avoid
91/// allocation overhead for read-only transactions.
92#[derive(Clone, Debug)]
93pub struct SsiBloomFilter {
94 /// Bit vector (each u64 holds 64 bits) - lazily initialized
95 bits: Option<Vec<u64>>,
96 /// Expected capacity (used for lazy init sizing)
97 expected_capacity: usize,
98 /// Number of hash functions to use
99 num_hashes: u32,
100}
101
102impl SsiBloomFilter {
103 /// Optimal number of bits per item for 1% false positive rate
104 /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
105 const BITS_PER_ITEM: f64 = 9.6;
106
107 /// Optimal number of hash functions for 1% false positive rate
108 /// k = (m/n) × ln(2) ≈ 7
109 const DEFAULT_NUM_HASHES: u32 = 7;
110
111 /// Minimum capacity to avoid tiny filters
112 const MIN_CAPACITY: usize = 64;
113
114 /// Create a new bloom filter for expected item count (lazy allocation)
115 ///
116 /// Configured for ~1% false positive rate.
117 /// The bit vector is not allocated until first insert.
118 #[inline]
119 pub fn new(expected_items: usize) -> Self {
120 Self {
121 bits: None,
122 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
123 num_hashes: Self::DEFAULT_NUM_HASHES,
124 }
125 }
126
127 /// Create with specific capacity in words (for memory-constrained scenarios)
128 pub fn with_word_capacity(words: usize) -> Self {
129 Self {
130 bits: None,
131 expected_capacity: words.max(1) * 64 / 10, // Approx items from words
132 num_hashes: Self::DEFAULT_NUM_HASHES,
133 }
134 }
135
136 /// Ensure bits are allocated (lazy initialization)
137 #[inline]
138 fn ensure_allocated(&mut self) {
139 if self.bits.is_none() {
140 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
141 let num_words = num_bits.div_ceil(64);
142 self.bits = Some(vec![0u64; num_words]);
143 }
144 }
145
146 /// Add a key to the filter - O(k) where k = num_hashes
147 #[inline]
148 pub fn insert(&mut self, key: &[u8]) {
149 self.ensure_allocated();
150 let bits = self.bits.as_mut().unwrap();
151 let num_bits = bits.len() * 64;
152 if num_bits == 0 {
153 return;
154 }
155
156 // Use two hash functions to simulate k hash functions
157 // h(i) = h1 + i * h2 (double hashing technique)
158 let h1 = Self::hash1(key);
159 let h2 = Self::hash2(key);
160
161 for i in 0..self.num_hashes {
162 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
163 let bit_idx = (h as usize) % num_bits;
164 let word_idx = bit_idx / 64;
165 let bit_pos = bit_idx % 64;
166 bits[word_idx] |= 1 << bit_pos;
167 }
168 }
169
170 /// Check if a key might be present - O(k)
171 ///
172 /// Returns:
173 /// - false: Key is definitely NOT in the set (or filter not initialized)
174 /// - true: Key MIGHT be in the set (needs exact check)
175 #[inline]
176 pub fn may_contain(&self, key: &[u8]) -> bool {
177 let bits = match &self.bits {
178 Some(b) => b,
179 None => return false, // Uninitialized = empty
180 };
181 let num_bits = bits.len() * 64;
182 if num_bits == 0 {
183 return false;
184 }
185
186 let h1 = Self::hash1(key);
187 let h2 = Self::hash2(key);
188
189 for i in 0..self.num_hashes {
190 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
191 let bit_idx = (h as usize) % num_bits;
192 let word_idx = bit_idx / 64;
193 let bit_pos = bit_idx % 64;
194 if bits[word_idx] & (1 << bit_pos) == 0 {
195 return false; // Definitely not present
196 }
197 }
198 true // Might be present
199 }
200
201 /// Check if this filter might intersect with another
202 ///
203 /// Fast O(m/64) check using bitwise AND of all words.
204 /// If no bits are shared, sets are definitely disjoint.
205 #[inline]
206 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
207 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
208 (Some(s), Some(o)) => (s, o),
209 _ => return false, // Either uninitialized = no intersection
210 };
211 let min_len = self_bits.len().min(other_bits.len());
212 for i in 0..min_len {
213 if self_bits[i] & other_bits[i] != 0 {
214 return true; // Might intersect
215 }
216 }
217 false // Definitely disjoint
218 }
219
220 /// First hash function (using built-in hasher)
221 #[inline]
222 fn hash1(key: &[u8]) -> u64 {
223 use std::collections::hash_map::DefaultHasher;
224 use std::hash::{Hash, Hasher};
225 let mut hasher = DefaultHasher::new();
226 key.hash(&mut hasher);
227 hasher.finish()
228 }
229
230 /// Second hash function (using twox-hash for independence)
231 #[inline]
232 fn hash2(key: &[u8]) -> u64 {
233 twox_hash::xxh3::hash64(key)
234 }
235
236 /// Get the memory size in bytes
237 pub fn size_bytes(&self) -> usize {
238 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
239 }
240
241 /// Check if the filter is empty
242 pub fn is_empty(&self) -> bool {
243 match &self.bits {
244 Some(bits) => bits.iter().all(|&w| w == 0),
245 None => true,
246 }
247 }
248}
249
250/// Type alias for inline key storage - keys up to 32 bytes stored on stack
251/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
252pub type InlineKey = SmallVec<[u8; 32]>;
253
254/// Version of a key-value pair
255#[derive(Debug, Clone)]
256pub struct Version {
257 /// The value (None = tombstone)
258 pub value: Option<Vec<u8>>,
259 /// Transaction that created this version
260 pub txn_id: u64,
261 /// Commit timestamp (0 = uncommitted)
262 pub commit_ts: u64,
263}
264
265// ============================================================================
266// Optimized VersionChain with Binary Search (Task 1: mm.md)
267// ============================================================================
268
269/// Multi-version data for a single key with O(log v) read complexity
270///
271/// ## Optimization: Binary Search with Sorted Commit Ordering
272///
273/// Separates committed versions (sorted descending by commit_ts) from
274/// uncommitted version (single optional slot per transaction).
275///
276/// **Before:** O(v) linear scan + O(v) max computation = O(v)
277/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
278///
279/// For v=10 versions: 3.3x speedup
280/// For v=100 versions: 7x speedup
281#[derive(Debug, Default)]
282pub struct VersionChain {
283 /// Committed versions sorted by commit_ts DESCENDING (newest first)
284 /// This ordering enables efficient binary search using partition_point
285 committed: Vec<Version>,
286 /// Single uncommitted version slot (at most one per transaction writing this key)
287 uncommitted: Option<Version>,
288}
289
290impl VersionChain {
291 /// Create a new empty version chain
292 #[inline]
293 pub fn new() -> Self {
294 Self {
295 committed: Vec::new(),
296 uncommitted: None,
297 }
298 }
299
300 /// Add a new uncommitted version
301 /// If there's already an uncommitted version from this txn, update it in place
302 ///
303 /// O(1) - just updates the uncommitted slot
304 #[inline]
305 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
306 match &mut self.uncommitted {
307 Some(v) if v.txn_id == txn_id => {
308 // Update in place - O(1)
309 v.value = value;
310 }
311 Some(_) => {
312 // Different transaction - this is a write conflict!
313 // The caller should have checked has_write_conflict first
314 // For safety, we overwrite (this will be caught at commit)
315 self.uncommitted = Some(Version {
316 value,
317 txn_id,
318 commit_ts: 0,
319 });
320 }
321 None => {
322 // New uncommitted version - O(1)
323 self.uncommitted = Some(Version {
324 value,
325 txn_id,
326 commit_ts: 0,
327 });
328 }
329 }
330 }
331
332 /// Commit a version - moves from uncommitted slot to sorted committed list
333 ///
334 /// O(log v) - inserts into sorted position using binary search
335 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
336 if let Some(ref mut v) = self.uncommitted {
337 if v.txn_id == txn_id && v.commit_ts == 0 {
338 v.commit_ts = commit_ts;
339 let committed_version = self.uncommitted.take().unwrap();
340
341 // Insert into sorted position (descending by commit_ts)
342 // Use partition_point to find insertion point in O(log v)
343 let insert_pos = self.committed.partition_point(|existing| existing.commit_ts > commit_ts);
344 self.committed.insert(insert_pos, committed_version);
345
346 return true;
347 }
348 }
349 false
350 }
351
352 /// Abort a version (remove uncommitted version for txn)
353 ///
354 /// O(1) - just clears the uncommitted slot if it matches
355 #[inline]
356 pub fn abort(&mut self, txn_id: u64) {
357 if let Some(ref v) = self.uncommitted {
358 if v.txn_id == txn_id {
359 self.uncommitted = None;
360 }
361 }
362 }
363
364 /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
365 /// Returns the most recent committed version visible at snapshot_ts,
366 /// or an uncommitted version if it belongs to current_txn_id.
367 ///
368 /// ## Complexity: O(1) + O(log v) = O(log v)
369 ///
370 /// 1. O(1) check for uncommitted version from current transaction
371 /// 2. O(log v) binary search for most recent visible committed version
372 ///
373 /// Snapshot isolation: we see commits with commit_ts < snapshot_ts (strictly less)
374 #[inline]
375 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
376 // O(1): Check uncommitted version from current transaction
377 if let Some(txn_id) = current_txn_id {
378 if let Some(ref v) = self.uncommitted {
379 if v.txn_id == txn_id {
380 return Some(v);
381 }
382 }
383 }
384
385 // O(log v): Binary search for first version with commit_ts < snapshot_ts
386 // Since committed is sorted descending by commit_ts, we find the first
387 // version where commit_ts < snapshot_ts (the newest visible version)
388 //
389 // partition_point returns the first index where predicate is false
390 // We want first index where commit_ts < snapshot_ts
391 let idx = self.committed.partition_point(|v| v.commit_ts >= snapshot_ts);
392
393 // The version at idx (if exists) is the newest with commit_ts < snapshot_ts
394 self.committed.get(idx)
395 }
396
397 /// Check if there's an uncommitted version by another transaction
398 ///
399 /// O(1) - just checks the uncommitted slot
400 #[inline]
401 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
402 if let Some(ref v) = self.uncommitted {
403 return v.txn_id != my_txn_id;
404 }
405 false
406 }
407
408 /// Garbage collect old versions
409 ///
410 /// Keeps only versions that might be visible to active transactions,
411 /// plus one committed version before min_active_ts for new snapshots.
412 pub fn gc(&mut self, min_active_ts: u64) {
413 // Uncommitted version is always kept (will be committed or aborted)
414
415 if self.committed.len() <= 1 {
416 return;
417 }
418
419 // Find versions to keep:
420 // 1. All versions with commit_ts > min_active_ts (visible to active txns)
421 // 2. One version with commit_ts <= min_active_ts (newest anchor point)
422
423 // Since committed is sorted descending, find split point
424 let split_idx = self.committed.partition_point(|v| v.commit_ts > min_active_ts);
425
426 // Keep all versions before split_idx (commit_ts > min_active_ts)
427 // Plus one version at split_idx if it exists (anchor point)
428 let keep_count = if split_idx < self.committed.len() {
429 split_idx + 1 // Keep one anchor version
430 } else {
431 split_idx
432 };
433
434 self.committed.truncate(keep_count);
435 }
436
437 /// Get total version count (committed + uncommitted)
438 #[inline]
439 pub fn version_count(&self) -> usize {
440 self.committed.len() + if self.uncommitted.is_some() { 1 } else { 0 }
441 }
442
443 // Legacy compatibility: get versions vec (for tests)
444 #[cfg(test)]
445 pub fn versions(&self) -> Vec<Version> {
446 let mut result = self.committed.clone();
447 if let Some(ref v) = self.uncommitted {
448 result.push(v.clone());
449 }
450 result
451 }
452}
453
454// =============================================================================
455// Pre-sizing Constants to Avoid HashSet Resize Overhead
456// =============================================================================
457
458/// Default capacity for write_set HashSet
459/// Sized for typical OLTP transactions (10-50 keys)
460/// Avoids resize overhead that caused +11% regression
461const WRITE_SET_INITIAL_CAPACITY: usize = 32;
462
463/// Default capacity for read_set HashSet
464/// Typically larger than write_set due to read-heavy patterns
465const READ_SET_INITIAL_CAPACITY: usize = 64;
466
467/// Transaction state for MVCC
468#[derive(Debug, Clone)]
469pub struct MvccTransaction {
470 /// Transaction ID
471 pub txn_id: u64,
472 /// Snapshot timestamp (reads see commits before this)
473 pub snapshot_ts: u64,
474 /// Keys written by this transaction - uses SmallVec for inline storage
475 /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
476 pub write_set: HashSet<InlineKey>,
477 /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
478 /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
479 pub read_set: HashSet<InlineKey>,
480 /// Bloom filter for write set - fast SSI pre-filtering
481 pub write_bloom: SsiBloomFilter,
482 /// Bloom filter for read set - fast SSI pre-filtering
483 pub read_bloom: SsiBloomFilter,
484 /// Transaction state
485 pub state: TxnState,
486 /// Transaction mode for SSI optimization (Recommendation 9)
487 /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
488 pub mode: TransactionMode,
489}
490
491impl MvccTransaction {
492 /// Create a new transaction with pre-sized collections
493 ///
494 /// This avoids HashSet resize overhead during the transaction
495 /// which was causing +11% regression on write_set.insert().
496 #[inline]
497 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
498 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
499 }
500
501 /// Create a read-only transaction (SSI bypass - 2.6x faster)
502 ///
503 /// Read-only transactions skip all SSI tracking:
504 /// - No read_set allocation
505 /// - No read_bloom allocation
506 /// - No commit validation
507 ///
508 /// ## Performance
509 ///
510 /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
511 #[inline]
512 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
513 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
514 }
515
516 /// Create a write-only transaction (partial SSI bypass)
517 ///
518 /// Write-only transactions skip read tracking:
519 /// - No read_set tracking
520 /// - No read_bloom inserts
521 /// - Still needs write_set for commit
522 #[inline]
523 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
524 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
525 }
526
527 /// Create transaction with specific mode
528 #[inline]
529 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
530 // Optimize allocation based on mode
531 let (write_capacity, read_capacity) = match mode {
532 TransactionMode::ReadOnly => (0, 0), // No tracking needed
533 TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
534 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
535 };
536 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
537 }
538
539 /// Create with custom capacities for expected workload
540 ///
541 /// Use this when you know the transaction will write many keys
542 /// to avoid resize overhead entirely.
543 #[inline]
544 pub fn with_capacity(
545 txn_id: u64,
546 snapshot_ts: u64,
547 write_capacity: usize,
548 read_capacity: usize,
549 mode: TransactionMode,
550 ) -> Self {
551 Self {
552 txn_id,
553 snapshot_ts,
554 write_set: HashSet::with_capacity(write_capacity),
555 read_set: HashSet::with_capacity(read_capacity),
556 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
557 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
558 state: TxnState::Active,
559 mode,
560 }
561 }
562
563 /// Check if this is a read-only transaction
564 #[inline]
565 pub fn is_read_only(&self) -> bool {
566 self.write_set.is_empty()
567 }
568
569 /// Check if this is a single-key write transaction
570 #[inline]
571 pub fn is_single_key_write(&self) -> bool {
572 self.write_set.len() == 1 && self.read_set.len() <= 1
573 }
574}
575
576/// Transaction state
577#[derive(Debug, Clone, Copy, PartialEq, Eq)]
578pub enum TxnState {
579 Active,
580 Committed,
581 Aborted,
582}
583
584// =============================================================================
585// Transaction Mode for SSI Bypass (Recommendation 9)
586// =============================================================================
587
588/// Transaction mode for SSI optimization
589///
590/// By classifying transactions at begin time, we can skip expensive SSI
591/// tracking for the majority of transactions:
592///
593/// | Mode | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
594/// |-----------|-------------------|--------------------|-----------------|
595/// | ReadOnly | None | None | ~10 ns |
596/// | WriteOnly | None | Full | ~30 ns |
597/// | ReadWrite | Full | Full | ~50 ns |
598///
599/// ## Performance Analysis
600///
601/// For read-only transactions (typically 90% of workload):
602/// ```text
603/// Current: T_txn = T_begin + N × (T_read + T_record) + T_commit
604/// = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
605///
606/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
607/// = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
608///
609/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
610/// ```
611#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
612pub enum TransactionMode {
613 /// Read-only transaction - skips ALL SSI tracking
614 /// Cannot form rw-antidependency cycles (no writes to create outgoing edges)
615 /// Safe to skip read_set, read_bloom, and commit validation entirely
616 ReadOnly,
617
618 /// Write-only transaction - skips read tracking
619 /// Cannot form incoming rw-edges (no reads from concurrent writers)
620 /// Only needs write_set and write_bloom tracking
621 WriteOnly,
622
623 /// Full read-write transaction (default) - complete SSI tracking
624 /// May form both incoming and outgoing rw-edges
625 /// Requires full validation at commit time
626 #[default]
627 ReadWrite,
628}
629
630impl TransactionMode {
631 /// Check if this mode requires read tracking
632 #[inline]
633 pub fn tracks_reads(&self) -> bool {
634 matches!(self, TransactionMode::ReadWrite)
635 }
636
637 /// Check if this mode requires write tracking
638 #[inline]
639 pub fn tracks_writes(&self) -> bool {
640 matches!(self, TransactionMode::WriteOnly | TransactionMode::ReadWrite)
641 }
642
643 /// Check if commit needs SSI validation
644 #[inline]
645 pub fn needs_ssi_validation(&self) -> bool {
646 matches!(self, TransactionMode::ReadWrite)
647 }
648}
649
650/// SSI conflict edge type
651#[derive(Debug, Clone, Copy, PartialEq, Eq)]
652pub enum ConflictType {
653 /// Read-write conflict: T1 reads X, then T2 writes X
654 ReadWrite,
655 /// Write-read conflict: T1 writes X, then T2 reads X
656 WriteRead,
657}
658
659/// SSI conflict edge for dangerous structure detection
660#[derive(Debug, Clone)]
661pub struct ConflictEdge {
662 /// Source transaction
663 pub from_txn: u64,
664 /// Target transaction
665 pub to_txn: u64,
666 /// Type of conflict
667 pub conflict_type: ConflictType,
668}
669
670/// MVCC Manager with SSI support
671///
672/// Uses DashMap for lock-free per-transaction access.
673/// Implements Serializable Snapshot Isolation (SSI) with
674/// dangerous structure detection for rw-antidependency cycles.
675#[allow(clippy::type_complexity)]
676pub struct MvccManager {
677 /// Active transactions (sharded for concurrent access)
678 active_txns: DashMap<u64, MvccTransaction>,
679 /// Current timestamp counter
680 ts_counter: AtomicU64,
681 /// Minimum active snapshot timestamp (for GC)
682 min_active_ts: AtomicU64,
683 /// Recently committed transactions for SSI validation
684 /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
685 /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
686 recent_commits: DashMap<
687 u64,
688 (
689 u64,
690 SsiBloomFilter,
691 SsiBloomFilter,
692 HashSet<InlineKey>,
693 HashSet<InlineKey>,
694 ),
695 >,
696 /// Max recent commits to track
697 max_recent_commits: usize,
698}
699
700impl Default for MvccManager {
701 fn default() -> Self {
702 Self::new()
703 }
704}
705
706impl MvccManager {
707 pub fn new() -> Self {
708 Self {
709 active_txns: DashMap::new(),
710 ts_counter: AtomicU64::new(1),
711 min_active_ts: AtomicU64::new(0),
712 recent_commits: DashMap::new(),
713 max_recent_commits: 1000, // Track last 1000 commits for SSI
714 }
715 }
716
717 /// Begin a new transaction with snapshot isolation
718 ///
719 /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
720 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
721 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
722 }
723
724 /// Begin a read-only transaction (SSI bypass - 2.6x faster)
725 ///
726 /// Read-only transactions skip all SSI tracking, reducing
727 /// per-read overhead from ~82ns to ~32ns.
728 ///
729 /// ## Safety
730 ///
731 /// Caller must ensure no writes are performed. Attempting to
732 /// write in a read-only transaction will still succeed but
733 /// won't be tracked for SSI validation.
734 #[inline]
735 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
736 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
737 }
738
739 /// Begin a write-only transaction (partial SSI bypass)
740 ///
741 /// Write-only transactions skip read tracking, reducing overhead
742 /// for insert-heavy workloads.
743 #[inline]
744 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
745 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
746 }
747
748 /// Begin a transaction with specific mode
749 ///
750 /// This is the core transaction creation method that all other
751 /// begin_* methods delegate to.
752 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
753 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
754
755 // Create transaction with mode-optimized allocations
756 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
757
758 self.active_txns.insert(txn_id, txn.clone());
759 self.update_min_active_ts();
760
761 txn
762 }
763
764 /// Get transaction if active (clones - use get_snapshot_ts for hot path)
765 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
766 self.active_txns.get(&txn_id).map(|t| t.clone())
767 }
768
769 /// Fast path: get just the snapshot timestamp without cloning
770 /// This is the hot path for reads - avoids cloning bloom filters
771 #[inline]
772 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
773 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
774 }
775
776 /// Record a read (for SSI) - uses inline key storage + bloom filter
777 ///
778 /// ## SSI Bypass (Recommendation 9)
779 ///
780 /// For ReadOnly mode transactions, this is a no-op (instant return).
781 /// For WriteOnly mode transactions, this is a no-op.
782 /// Only ReadWrite mode transactions track reads for SSI.
783 ///
784 /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
785 #[inline]
786 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
787 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
788 // SSI Bypass: Skip tracking for read-only and write-only modes
789 if !txn.mode.tracks_reads() {
790 return;
791 }
792
793 // Only track reads if within reasonable bounds
794 if txn.read_set.len() < 10000 {
795 txn.read_set.insert(SmallVec::from_slice(key));
796 txn.read_bloom.insert(key);
797 }
798 }
799 }
800
801 /// Record a write - uses inline key storage + bloom filter
802 ///
803 /// Note: Even ReadOnly transactions can record writes (mode is a hint).
804 /// The mode only affects SSI tracking, not write capability.
805 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
806 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
807 txn.write_set.insert(SmallVec::from_slice(key));
808 txn.write_bloom.insert(key);
809 }
810 }
811
812 /// Allocate commit timestamp
813 pub fn alloc_commit_ts(&self) -> u64 {
814 self.ts_counter.fetch_add(1, Ordering::SeqCst)
815 }
816
817 /// Commit transaction with SSI validation
818 /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
819 /// Returns None if SSI validation fails (dangerous structure detected)
820 ///
821 /// ## SSI Bypass (Recommendation 9)
822 ///
823 /// For ReadOnly mode: Skip validation entirely (~10ns commit)
824 /// For WriteOnly mode: Skip read-based validation (~30ns commit)
825 /// For ReadWrite mode: Full validation (~50ns commit)
826 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
827 // Get transaction before removing
828 let txn = self.active_txns.get(&txn_id)?.clone();
829
830 // SSI Bypass: Skip validation for ReadOnly transactions
831 // ReadOnly can never form rw-antidependency cycles
832 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
833 // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
834 // For ReadWrite: check SSI validation result
835 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
836 // Abort on SSI violation
837 self.active_txns.remove(&txn_id);
838 self.update_min_active_ts();
839 return None;
840 }
841 }
842
843 let commit_ts = self.alloc_commit_ts();
844
845 // Extract write_set and remove transaction - takes ownership
846 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
847
848 // OPTIMIZATION: Only track ReadWrite transactions for SSI
849 // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
850 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
851 && !removed_txn.read_set.is_empty()
852 && !removed_txn.write_set.is_empty();
853
854 if needs_ssi_tracking {
855 // Need to clone write_set since we return it AND track it
856 let write_set_for_return = removed_txn.write_set.clone();
857
858 self.track_commit_owned(
859 txn_id,
860 commit_ts,
861 removed_txn.read_bloom,
862 removed_txn.write_bloom,
863 removed_txn.read_set,
864 removed_txn.write_set,
865 );
866
867 self.update_min_active_ts();
868 Some((commit_ts, write_set_for_return))
869 } else {
870 // Fast path: no SSI tracking needed, avoid clone entirely
871 self.update_min_active_ts();
872 Some((commit_ts, removed_txn.write_set))
873 }
874 }
875
876 /// Validate SSI constraints for a committing transaction
877 ///
878 /// ## Transaction Classification (Task 3: Optimistic MVCC)
879 ///
880 /// Transactions are classified and routed through appropriate fast paths:
881 ///
882 /// | Class | Criteria | Validation Cost |
883 /// |------------|-------------------------------|-----------------|
884 /// | ReadOnly | write_set.is_empty() | 0 ns |
885 /// | SingleKey | write_set.len() == 1 | 0 ns |
886 /// | Disjoint | bloom filters don't intersect | ~10 ns |
887 /// | General | full SSI check | ~50 ns |
888 ///
889 /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
890 /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
891 ///
892 /// Detects "dangerous structures" - rw-antidependency cycles:
893 /// - T1 reads X (snapshot sees old value)
894 /// - T2 writes X (concurrent write)
895 /// - T2 reads Y (snapshot sees old value)
896 /// - T1 writes Y (concurrent write)
897 ///
898 /// If T1 → rw → T2 → rw → T1 exists, abort T1
899 #[inline]
900 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
901 // =================================================================
902 // Fast Path 1: Read-only transactions (0 ns)
903 // =================================================================
904 // Read-only transactions can never form rw-antidependency cycles
905 // because they have no writes to create outgoing rw-edges
906 if txn.write_set.is_empty() {
907 return true;
908 }
909
910 // =================================================================
911 // Fast Path 2: No recent commits to check (0 ns)
912 // =================================================================
913 if self.recent_commits.is_empty() {
914 return true;
915 }
916
917 // =================================================================
918 // Fast Path 3: Single-key write transactions (0 ns)
919 // =================================================================
920 // A single-key write transaction cannot form a dangerous cycle:
921 // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
922 // AND T2 to read what T1 wrote
923 // - With only one key in write_set, the same key would need to be
924 // in both read_set AND write_set of both transactions
925 // - This is already prevented by our conflict detection (write-write)
926 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
927 return true;
928 }
929
930 let my_snapshot = txn.snapshot_ts;
931
932 // =================================================================
933 // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
934 // =================================================================
935 // Pre-filter using bloom filters: if our write_bloom doesn't intersect
936 // with any concurrent transaction's read_bloom AND vice versa,
937 // there can be no rw-antidependency
938 let mut any_may_intersect = false;
939 for entry in self.recent_commits.iter() {
940 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
941
942 // Only check concurrent transactions
943 if *other_commit_ts <= my_snapshot {
944 continue;
945 }
946
947 // Check bloom filter intersection (O(m/64) per filter)
948 // If our writes may intersect their reads OR their writes may intersect our reads
949 if txn.write_bloom.may_intersect(other_read_bloom)
950 || other_write_bloom.may_intersect(&txn.read_bloom)
951 {
952 any_may_intersect = true;
953 break;
954 }
955 }
956
957 // No bloom intersection means definitely disjoint - no SSI conflict possible
958 if !any_may_intersect {
959 return true;
960 }
961
962 // =================================================================
963 // Full SSI Validation (~50 ns)
964 // =================================================================
965 // Check for rw-conflicts with recently committed transactions
966 // An rw-conflict exists if:
967 // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
968 // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
969
970 let mut in_conflict_with: Vec<u64> = Vec::new();
971 let mut out_conflict_to: Vec<u64> = Vec::new();
972
973 for entry in self.recent_commits.iter() {
974 let (
975 other_txn_id,
976 (
977 other_commit_ts,
978 _other_read_bloom,
979 other_write_bloom,
980 other_read_set,
981 other_write_set,
982 ),
983 ) = entry.pair();
984
985 // Only consider transactions that committed after our snapshot started
986 // (concurrent transactions)
987 if *other_commit_ts <= my_snapshot {
988 continue;
989 }
990
991 // Check: other wrote → we read (other →rw→ me)
992 // T_other wrote a key that T_me read (rw-dependency inbound)
993 //
994 // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
995 // Only do expensive HashSet intersection if bloom says "maybe conflict"
996 let mut has_in_conflict = false;
997 for key in txn.read_set.iter() {
998 if other_write_bloom.may_contain(key) {
999 // Bloom says maybe - do exact check
1000 if other_write_set.contains(key) {
1001 has_in_conflict = true;
1002 break;
1003 }
1004 }
1005 }
1006 if has_in_conflict {
1007 in_conflict_with.push(*other_txn_id);
1008 }
1009
1010 // Check: we wrote → other read (me →rw→ other)
1011 // T_me wrote a key that T_other read (rw-dependency outbound)
1012 //
1013 // Bloom-accelerated: Use our write_bloom against their read_set
1014 let mut has_out_conflict = false;
1015 for key in other_read_set.iter() {
1016 if txn.write_bloom.may_contain(key) {
1017 // Bloom says maybe - do exact check
1018 if txn.write_set.contains(key) {
1019 has_out_conflict = true;
1020 break;
1021 }
1022 }
1023 }
1024 if has_out_conflict {
1025 out_conflict_to.push(*other_txn_id);
1026 }
1027 }
1028
1029 // Dangerous structure: we have both incoming AND outgoing rw-edges
1030 // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1031 //
1032 // Conservative check: if both exist, abort
1033 // A more precise check would verify the cycle path, but this is safe
1034 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1035 return false; // SSI violation - abort
1036 }
1037
1038 true
1039 }
1040
1041 /// Track a committed transaction for future SSI validation
1042 ///
1043 /// Only tracks transactions that have both reads AND writes, since SSI
1044 /// only detects rw-antidependency cycles. Pure read or pure write
1045 /// transactions can't form cycles.
1046 ///
1047 /// ## Optimization: Zero-Copy Transfer
1048 ///
1049 /// Takes ownership of sets instead of cloning to avoid the +15% commit
1050 /// phase regression. The caller should use mem::take() to transfer ownership.
1051 fn track_commit_owned(
1052 &self,
1053 txn_id: u64,
1054 commit_ts: u64,
1055 read_bloom: SsiBloomFilter,
1056 write_bloom: SsiBloomFilter,
1057 read_set: HashSet<InlineKey>,
1058 write_set: HashSet<InlineKey>,
1059 ) {
1060 // Optimization: Only track mixed read-write transactions
1061 // Pure reads can't create outgoing rw-edges
1062 // Pure writes can't create incoming rw-edges
1063 if read_set.is_empty() || write_set.is_empty() {
1064 return; // Skip tracking - can't form SSI cycle
1065 }
1066
1067 // Add to recent commits with bloom filters for fast SSI pre-filtering
1068 // No cloning needed - we take ownership
1069 self.recent_commits.insert(
1070 txn_id,
1071 (
1072 commit_ts,
1073 read_bloom,
1074 write_bloom,
1075 read_set,
1076 write_set,
1077 ),
1078 );
1079
1080 // Lazy pruning: only prune when we're significantly over capacity
1081 // Avoids pruning overhead on every commit
1082 if self.recent_commits.len() > self.max_recent_commits * 2 {
1083 // Remove entries with lowest commit_ts
1084 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1085 self.recent_commits
1086 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1087 }
1088 }
1089
1090 /// Legacy track_commit that clones - kept for compatibility
1091 #[allow(dead_code)]
1092 fn track_commit(
1093 &self,
1094 txn_id: u64,
1095 commit_ts: u64,
1096 read_bloom: SsiBloomFilter,
1097 write_bloom: SsiBloomFilter,
1098 read_set: &HashSet<InlineKey>,
1099 write_set: &HashSet<InlineKey>,
1100 ) {
1101 if read_set.is_empty() || write_set.is_empty() {
1102 return;
1103 }
1104 self.recent_commits.insert(
1105 txn_id,
1106 (
1107 commit_ts,
1108 read_bloom,
1109 write_bloom,
1110 read_set.clone(),
1111 write_set.clone(),
1112 ),
1113 );
1114 }
1115
1116 /// Abort transaction
1117 pub fn abort(&self, txn_id: u64) {
1118 self.active_txns.remove(&txn_id);
1119 self.update_min_active_ts();
1120 }
1121
1122 /// Get minimum active snapshot timestamp
1123 pub fn min_active_snapshot(&self) -> u64 {
1124 self.min_active_ts.load(Ordering::SeqCst)
1125 }
1126
1127 /// Get count of active transactions
1128 pub fn active_transaction_count(&self) -> usize {
1129 self.active_txns.len()
1130 }
1131
1132 fn update_min_active_ts(&self) {
1133 let min = self
1134 .active_txns
1135 .iter()
1136 .map(|entry| entry.value().snapshot_ts)
1137 .min()
1138 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1139 self.min_active_ts.store(min, Ordering::SeqCst);
1140 }
1141}
1142
1143/// Epoch-based dirty list for O(expired) GC instead of O(n)
1144///
1145/// Instead of scanning ALL version chains, we track which keys have versions
1146/// created in each epoch. GC only needs to visit keys from old epochs.
1147struct EpochDirtyList {
1148 /// Ring buffer of epoch -> dirty keys
1149 /// Index = epoch % EPOCH_RING_SIZE
1150 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1151 /// Current epoch
1152 current_epoch: AtomicU64,
1153}
1154
1155const EPOCH_RING_SIZE: usize = 4;
1156
1157impl EpochDirtyList {
1158 fn new() -> Self {
1159 Self {
1160 epochs: [
1161 parking_lot::Mutex::new(Vec::new()),
1162 parking_lot::Mutex::new(Vec::new()),
1163 parking_lot::Mutex::new(Vec::new()),
1164 parking_lot::Mutex::new(Vec::new()),
1165 ],
1166 current_epoch: AtomicU64::new(0),
1167 }
1168 }
1169
1170 /// Record a version created in the current epoch
1171 #[inline]
1172 fn record_version(&self, key: Vec<u8>) {
1173 let epoch = self.current_epoch.load(Ordering::Relaxed);
1174 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1175 self.epochs[idx].lock().push(key);
1176 }
1177
1178 /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1179 ///
1180 /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1181 /// For 100 writes: ~100x fewer mutex operations.
1182 #[inline]
1183 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1184 let epoch = self.current_epoch.load(Ordering::Relaxed);
1185 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1186 let mut guard = self.epochs[idx].lock();
1187 guard.extend(keys);
1188 }
1189
1190 /// Advance to next epoch, returning old epoch's dirty keys
1191 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1192 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1193 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1194
1195 // Drain the old epoch's dirty list
1196 let mut guard = self.epochs[old_idx].lock();
1197 let keys = std::mem::take(&mut *guard);
1198 (old_epoch, keys)
1199 }
1200
1201 /// Get current epoch
1202 #[allow(dead_code)]
1203 fn current(&self) -> u64 {
1204 self.current_epoch.load(Ordering::Relaxed)
1205 }
1206}
1207
1208// ============================================================================
1209// Streaming Scan Iterator
1210// ============================================================================
1211
1212/// Streaming iterator for range scans
1213///
1214/// Yields results one at a time without materializing the full result set.
1215/// This enables processing of very large result sets with O(1) memory per
1216/// iteration instead of O(N) for the entire result set.
1217struct ScanRangeIterator<'a> {
1218 memtable: &'a MvccMemTable,
1219 start: Vec<u8>,
1220 end: Vec<u8>,
1221 snapshot_ts: u64,
1222 current_txn_id: Option<u64>,
1223 use_ordered: bool,
1224 // We use Option to defer initialization
1225 ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1226 unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1227 initialized: bool,
1228}
1229
1230impl<'a> Iterator for ScanRangeIterator<'a> {
1231 type Item = (Vec<u8>, Vec<u8>);
1232
1233 fn next(&mut self) -> Option<Self::Item> {
1234 // Lazy initialization on first call
1235 if !self.initialized {
1236 self.initialized = true;
1237
1238 if self.use_ordered {
1239 // Try deferred index first (after compaction, it uses a SkipMap internally)
1240 if let Some(ref def_idx) = self.memtable.deferred_index {
1241 let start = self.start.clone();
1242 let end = self.end.clone();
1243 let snapshot_ts = self.snapshot_ts;
1244 let current_txn_id = self.current_txn_id;
1245 let data = &self.memtable.data;
1246
1247 // Collect keys from deferred index (already sorted after compact)
1248 let keys: Vec<Vec<u8>> = if end.is_empty() {
1249 def_idx.range_from(&start).collect()
1250 } else {
1251 def_idx.range(&start, &end).collect()
1252 };
1253
1254 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1255 keys.into_iter()
1256 .filter_map(move |key| {
1257 if let Some(chain) = data.get(&key)
1258 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1259 && let Some(value) = &v.value
1260 {
1261 Some((key, value.clone()))
1262 } else {
1263 None
1264 }
1265 })
1266 );
1267 self.ordered_iter = Some(iter);
1268 } else if let Some(ref idx) = self.memtable.ordered_index {
1269 let start = self.start.clone();
1270 let end = self.end.clone();
1271 let snapshot_ts = self.snapshot_ts;
1272 let current_txn_id = self.current_txn_id;
1273 let data = &self.memtable.data;
1274
1275 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1276 Box::new(
1277 idx.range(start..)
1278 .filter_map(move |entry| {
1279 let key = entry.key();
1280 if let Some(chain) = data.get(key)
1281 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1282 && let Some(value) = &v.value
1283 {
1284 Some((key.clone(), value.clone()))
1285 } else {
1286 None
1287 }
1288 })
1289 )
1290 } else {
1291 Box::new(
1292 idx.range(start..end)
1293 .filter_map(move |entry| {
1294 let key = entry.key();
1295 if let Some(chain) = data.get(key)
1296 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1297 && let Some(value) = &v.value
1298 {
1299 Some((key.clone(), value.clone()))
1300 } else {
1301 None
1302 }
1303 })
1304 )
1305 };
1306 self.ordered_iter = Some(iter);
1307 }
1308 } else {
1309 // Unordered full scan
1310 let start = self.start.clone();
1311 let end = self.end.clone();
1312 let snapshot_ts = self.snapshot_ts;
1313 let current_txn_id = self.current_txn_id;
1314
1315 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1316 self.memtable.data.iter()
1317 .filter_map(move |entry| {
1318 let key = entry.key();
1319
1320 if key.as_slice() < start.as_slice() {
1321 return None;
1322 }
1323 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1324 return None;
1325 }
1326
1327 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1328 && let Some(value) = &v.value
1329 {
1330 Some((key.clone(), value.clone()))
1331 } else {
1332 None
1333 }
1334 })
1335 );
1336 self.unordered_iter = Some(iter);
1337 }
1338 }
1339
1340 // Get next from appropriate iterator
1341 if let Some(ref mut iter) = self.ordered_iter {
1342 iter.next()
1343 } else if let Some(ref mut iter) = self.unordered_iter {
1344 iter.next()
1345 } else {
1346 None
1347 }
1348 }
1349}
1350
1351/// MemTable with MVCC support
1352///
1353/// Uses DashMap for lock-free concurrent access per key.
1354/// This eliminates the global write lock bottleneck.
1355///
1356/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
1357/// Maintains a deferred sorted index for efficient scans:
1358/// - Writes: O(1) append to hot buffer
1359/// - Scans: O(N log N) sort-on-demand (amortized across many writes)
1360pub struct MvccMemTable {
1361 /// Key -> VersionChain (sharded for concurrent access)
1362 data: DashMap<Vec<u8>, VersionChain>,
1363 /// Deferred sorted index for efficient prefix/range scans (optional)
1364 /// O(1) insert to hot buffer, O(N log N) sort on first scan
1365 /// When None, scan_prefix will fall back to O(N) DashMap iteration
1366 deferred_index: Option<DeferredSortedIndex>,
1367 /// Legacy SkipMap for compatibility (used when deferred=false)
1368 ordered_index: Option<SkipMap<Vec<u8>, ()>>,
1369 /// Whether to use deferred sorting (true) or immediate SkipMap (false)
1370 #[allow(dead_code)]
1371 use_deferred: bool,
1372 /// Approximate size in bytes
1373 size_bytes: AtomicU64,
1374 /// Epoch-based dirty list for efficient GC
1375 dirty_list: EpochDirtyList,
1376}
1377
1378impl Default for MvccMemTable {
1379 fn default() -> Self {
1380 Self::new()
1381 }
1382}
1383
1384impl MvccMemTable {
1385 pub fn new() -> Self {
1386 Self::with_ordered_index(true)
1387 }
1388
1389 /// Create memtable with optional ordered index
1390 ///
1391 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1392 /// but scan_prefix becomes O(N) instead of O(log N + K)
1393 ///
1394 /// Uses deferred sorting by default for better write performance:
1395 /// - Writes: O(1) append to hot buffer
1396 /// - Scans: O(N log N) sort-on-demand
1397 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1398 Self::with_index_mode(enable_ordered_index, true)
1399 }
1400
1401 /// Create memtable with fine-grained control over indexing
1402 ///
1403 /// # Arguments
1404 /// * `enable_ordered_index` - Whether to maintain an ordered index
1405 /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1406 /// If false, use SkipMap (O(log N) writes)
1407 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1408 Self {
1409 data: DashMap::new(),
1410 deferred_index: if enable_ordered_index && use_deferred {
1411 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1412 max_unsorted_entries: 10_000, // Compact every 10K writes
1413 enabled: true,
1414 }))
1415 } else {
1416 None
1417 },
1418 ordered_index: if enable_ordered_index && !use_deferred {
1419 Some(SkipMap::new())
1420 } else {
1421 None
1422 },
1423 use_deferred,
1424 size_bytes: AtomicU64::new(0),
1425 dirty_list: EpochDirtyList::new(),
1426 }
1427 }
1428
1429 /// Write a key-value pair (uncommitted)
1430 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1431 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1432 let key_len = key.len();
1433
1434 // Track this key in the current epoch's dirty list for GC
1435 self.dirty_list.record_version(key.clone());
1436
1437 // Insert into ordered index for prefix scans (if enabled)
1438 // Deferred: O(1) append to hot buffer
1439 // SkipMap: O(log N) insert
1440 if let Some(ref idx) = self.deferred_index {
1441 idx.insert(key.clone());
1442 } else if let Some(ref idx) = self.ordered_index {
1443 idx.insert(key.clone(), ());
1444 }
1445
1446 // Use entry API for atomic get-or-insert
1447 let mut entry = self.data.entry(key).or_default();
1448
1449 // Check for write-write conflict
1450 if entry.has_write_conflict(txn_id) {
1451 return Err(SochDBError::Internal(
1452 "Write-write conflict detected".into(),
1453 ));
1454 }
1455 entry.add_uncommitted(value, txn_id);
1456 self.size_bytes
1457 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1458
1459 Ok(())
1460 }
1461
1462 /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1463 ///
1464 /// Optimizations applied (Rec 3: MVCC Batching):
1465 /// - Batched dirty list tracking: single lock acquire for all keys
1466 /// - Deferred index: O(1) append per key
1467 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1468 let mut total_size = 0u64;
1469
1470 // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1471 self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1472
1473 for (key, value) in writes {
1474 // Insert into ordered index (if enabled)
1475 // Deferred: O(1) append, SkipMap: O(log N)
1476 if let Some(ref idx) = self.deferred_index {
1477 idx.insert(key.clone());
1478 } else if let Some(ref idx) = self.ordered_index {
1479 idx.insert(key.clone(), ());
1480 }
1481
1482 let mut entry = self.data.entry(key.clone()).or_default();
1483
1484 if entry.has_write_conflict(txn_id) {
1485 return Err(SochDBError::Internal(
1486 "Write-write conflict detected".into(),
1487 ));
1488 }
1489
1490 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1491 entry.add_uncommitted(value.clone(), txn_id);
1492 total_size += (key.len() + value_size) as u64;
1493 }
1494
1495 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1496 Ok(())
1497 }
1498
1499 /// Read at snapshot timestamp, with optional current txn to see own writes
1500 pub fn read(
1501 &self,
1502 key: &[u8],
1503 snapshot_ts: u64,
1504 current_txn_id: Option<u64>,
1505 ) -> Option<Vec<u8>> {
1506 self.data.get(key).and_then(|chain| {
1507 chain
1508 .read_at(snapshot_ts, current_txn_id)
1509 .and_then(|v| v.value.clone())
1510 })
1511 }
1512
1513 /// Commit all versions for a transaction
1514 ///
1515 /// Only updates the keys that were written by this transaction (tracked in write_set).
1516 /// Accepts InlineKey for zero-allocation MVCC tracking.
1517 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1518 // Only iterate over keys we know were written - O(k) instead of O(n)
1519 for key in write_set {
1520 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1521 chain.commit(txn_id, commit_ts);
1522 }
1523 }
1524 }
1525
1526 /// Legacy commit method (iterates all keys) - kept for backward compatibility
1527 #[allow(dead_code)]
1528 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1529 for mut entry in self.data.iter_mut() {
1530 entry.value_mut().commit(txn_id, commit_ts);
1531 }
1532 }
1533
1534 /// Abort all versions for a transaction
1535 pub fn abort(&self, txn_id: u64) {
1536 for mut entry in self.data.iter_mut() {
1537 entry.value_mut().abort(txn_id);
1538 }
1539 }
1540
1541 /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1542 ///
1543 /// ## Performance
1544 ///
1545 /// When ordered_index is enabled: O(log N + K) complexity
1546 /// - O(log N) to seek to the first key with prefix
1547 /// - O(K) to iterate matching keys
1548 ///
1549 /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1550 ///
1551 /// ## Optimizations Applied
1552 ///
1553 /// - Pre-allocates result vector based on expected output size
1554 /// - Uses batch-friendly iteration patterns
1555 /// - Minimizes allocations during iteration
1556 /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1557 pub fn scan_prefix(
1558 &self,
1559 prefix: &[u8],
1560 snapshot_ts: u64,
1561 current_txn_id: Option<u64>,
1562 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1563 // Estimate result size for pre-allocation (use 10% of total as heuristic)
1564 let estimated_size = (self.data.len() / 10).max(64);
1565 let mut results = Vec::with_capacity(estimated_size);
1566
1567 if let Some(ref idx) = self.deferred_index {
1568 // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1569 for key in idx.range_from(prefix) {
1570 // Stop when we've passed the prefix range
1571 if !key.starts_with(prefix) {
1572 break;
1573 }
1574
1575 // O(1) lookup in DashMap for version chain
1576 if let Some(chain) = self.data.get(&key)
1577 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1578 && let Some(value) = &v.value
1579 {
1580 results.push((key, value.clone()));
1581 }
1582 }
1583 } else if let Some(ref idx) = self.ordered_index {
1584 // Fast path: O(log N) seek to first key >= prefix
1585 for entry in idx.range(prefix.to_vec()..) {
1586 let key = entry.key();
1587
1588 // Stop when we've passed the prefix range
1589 if !key.starts_with(prefix) {
1590 break;
1591 }
1592
1593 // O(1) lookup in DashMap for version chain
1594 if let Some(chain) = self.data.get(key)
1595 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1596 && let Some(value) = &v.value
1597 {
1598 results.push((key.clone(), value.clone()));
1599 }
1600 }
1601 } else {
1602 // Fallback: O(N) full DashMap scan when ordered_index is disabled
1603 // Optimized with batch-friendly iteration
1604 for entry in self.data.iter() {
1605 let key = entry.key();
1606 if !key.starts_with(prefix) {
1607 continue;
1608 }
1609 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1610 && let Some(value) = &v.value
1611 {
1612 results.push((key.clone(), value.clone()));
1613 }
1614 }
1615 }
1616
1617 results
1618 }
1619
1620 /// Optimized full scan with batch allocation
1621 ///
1622 /// For use when scanning entire tables/namespaces.
1623 /// Pre-allocates based on actual data size.
1624 pub fn scan_all(
1625 &self,
1626 snapshot_ts: u64,
1627 current_txn_id: Option<u64>,
1628 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1629 let mut results = Vec::with_capacity(self.data.len());
1630
1631 for entry in self.data.iter() {
1632 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1633 && let Some(value) = &v.value
1634 {
1635 results.push((entry.key().clone(), value.clone()));
1636 }
1637 }
1638
1639 results
1640 }
1641
1642 /// Streaming scan iterator for very large datasets
1643 ///
1644 /// Returns an iterator that yields (key, value) pairs without
1645 /// materializing the entire result set in memory.
1646 pub fn scan_prefix_iter<'a>(
1647 &'a self,
1648 prefix: &'a [u8],
1649 snapshot_ts: u64,
1650 current_txn_id: Option<u64>,
1651 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1652 self.data.iter().filter_map(move |entry| {
1653 let key = entry.key();
1654 if !key.starts_with(prefix) {
1655 return None;
1656 }
1657 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1658 && let Some(value) = &v.value
1659 {
1660 Some((key.clone(), value.clone()))
1661 } else {
1662 None
1663 }
1664 })
1665 }
1666
1667 /// Scan range
1668 pub fn scan_range(
1669 &self,
1670 start: &[u8],
1671 end: &[u8],
1672 snapshot_ts: u64,
1673 current_txn_id: Option<u64>,
1674 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1675 let mut results = Vec::new();
1676
1677 if let Some(ref idx) = self.deferred_index {
1678 // Deferred index path: sort-on-scan
1679 if end.is_empty() {
1680 for key in idx.range_from(start) {
1681 if let Some(chain) = self.data.get(&key)
1682 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1683 && let Some(value) = &v.value
1684 {
1685 results.push((key, value.clone()));
1686 }
1687 }
1688 } else {
1689 for key in idx.range(start, end) {
1690 if let Some(chain) = self.data.get(&key)
1691 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1692 && let Some(value) = &v.value
1693 {
1694 results.push((key, value.clone()));
1695 }
1696 }
1697 }
1698 } else if let Some(ref idx) = self.ordered_index {
1699 // Use range scan on SkipMap
1700 if end.is_empty() {
1701 // Unbounded end
1702 for entry in idx.range(start.to_vec()..) {
1703 let key = entry.key();
1704 if let Some(chain) = self.data.get(key)
1705 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1706 && let Some(value) = &v.value
1707 {
1708 results.push((key.clone(), value.clone()));
1709 }
1710 }
1711 } else {
1712 for entry in idx.range(start.to_vec()..end.to_vec()) {
1713 let key = entry.key();
1714 if let Some(chain) = self.data.get(key)
1715 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1716 && let Some(value) = &v.value
1717 {
1718 results.push((key.clone(), value.clone()));
1719 }
1720 }
1721 }
1722 } else {
1723 // Fallback to full scan if no ordered index
1724 for entry in self.data.iter() {
1725 let key = entry.key();
1726
1727 if key.as_slice() < start {
1728 continue;
1729 }
1730 if !end.is_empty() && key.as_slice() >= end {
1731 continue;
1732 }
1733
1734 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1735 && let Some(value) = &v.value
1736 {
1737 results.push((key.clone(), value.clone()));
1738 }
1739 }
1740 }
1741
1742 results
1743 }
1744
1745 /// Streaming range scan iterator for very large datasets
1746 ///
1747 /// Returns an iterator that yields (key, value) pairs without
1748 /// materializing the entire result set in memory. Uses the ordered
1749 /// index when available for O(log N + K) complexity.
1750 ///
1751 /// ## Zero-Allocation Design
1752 ///
1753 /// While the iterator itself cannot avoid allocations for returned
1754 /// values (since the caller needs ownership), it avoids:
1755 /// - Pre-materializing all results
1756 /// - Intermediate buffers
1757 /// - Repeated key comparisons for already-visited entries
1758 ///
1759 /// ## Usage
1760 ///
1761 /// ```ignore
1762 /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1763 /// // Process each result as it arrives
1764 /// // Memory usage is O(1) per iteration, not O(N) total
1765 /// }
1766 /// ```
1767 pub fn scan_range_iter<'a>(
1768 &'a self,
1769 start: &'a [u8],
1770 end: &'a [u8],
1771 snapshot_ts: u64,
1772 current_txn_id: Option<u64>,
1773 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1774 // Compact deferred index before scanning if needed
1775 if let Some(ref idx) = self.deferred_index {
1776 idx.compact();
1777 }
1778
1779 // Use either ordered index or full scan
1780 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1781
1782 // Create iterator based on availability of ordered index
1783 ScanRangeIterator {
1784 memtable: self,
1785 start: start.to_vec(),
1786 end: end.to_vec(),
1787 snapshot_ts,
1788 current_txn_id,
1789 use_ordered,
1790 ordered_iter: None,
1791 unordered_iter: None,
1792 initialized: false,
1793 }
1794 }
1795
1796 /// Get approximate size
1797 pub fn size(&self) -> u64 {
1798 self.size_bytes.load(Ordering::Relaxed)
1799 }
1800
1801 /// Garbage collect old versions using epoch-based dirty list
1802 ///
1803 /// O(expired_versions) instead of O(all_versions)
1804 /// Only visits keys that had versions created in the old epoch.
1805 pub fn gc(&self, min_active_ts: u64) -> usize {
1806 // Advance epoch and get the dirty keys from the old epoch
1807 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1808
1809 if dirty_keys.is_empty() {
1810 return 0;
1811 }
1812
1813 let mut gc_count = 0;
1814
1815 // Only visit keys that were modified in the old epoch
1816 // Use a HashSet to deduplicate keys that were written multiple times
1817 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1818
1819 for key in unique_keys {
1820 if let Some(mut entry) = self.data.get_mut(&key) {
1821 let before = entry.value().version_count();
1822 entry.value_mut().gc(min_active_ts);
1823 gc_count += before.saturating_sub(entry.value().version_count());
1824 }
1825 }
1826
1827 gc_count
1828 }
1829
1830 /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1831 #[allow(dead_code)]
1832 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1833 let mut gc_count = 0;
1834
1835 for mut entry in self.data.iter_mut() {
1836 let before = entry.value().version_count();
1837 entry.value_mut().gc(min_active_ts);
1838 gc_count += before.saturating_sub(entry.value().version_count());
1839 }
1840
1841 gc_count
1842 }
1843}
1844
1845// ============================================================================
1846// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1847// ============================================================================
1848
1849use crate::key_buffer::ArenaKeyHandle;
1850
1851/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1852struct ArenaEpochDirtyList {
1853 epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1854 current_epoch: AtomicU64,
1855}
1856
1857impl ArenaEpochDirtyList {
1858 fn new() -> Self {
1859 Self {
1860 epochs: [
1861 parking_lot::Mutex::new(Vec::new()),
1862 parking_lot::Mutex::new(Vec::new()),
1863 parking_lot::Mutex::new(Vec::new()),
1864 parking_lot::Mutex::new(Vec::new()),
1865 ],
1866 current_epoch: AtomicU64::new(0),
1867 }
1868 }
1869
1870 #[inline]
1871 fn record_version(&self, key: ArenaKeyHandle) {
1872 let epoch = self.current_epoch.load(Ordering::Relaxed);
1873 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1874 self.epochs[idx].lock().push(key);
1875 }
1876
1877 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1878 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1879 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1880 let mut guard = self.epochs[old_idx].lock();
1881 let keys = std::mem::take(&mut *guard);
1882 (old_epoch, keys)
1883 }
1884}
1885
1886/// Arena-backed MVCC MemTable with optimized key storage
1887///
1888/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1889/// reducing per-write allocations from 3 to 1:
1890///
1891/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1892/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1893///
1894/// ## Performance
1895///
1896/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1897/// by reducing:
1898/// - Heap allocations: 3 → 1 per write
1899/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
1900pub struct ArenaMvccMemTable {
1901 /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
1902 data: DashMap<ArenaKeyHandle, VersionChain>,
1903 /// Ordered index for prefix scans
1904 ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
1905 /// Approximate size in bytes
1906 size_bytes: AtomicU64,
1907 /// Epoch-based dirty list (arena-backed)
1908 dirty_list: ArenaEpochDirtyList,
1909}
1910
1911impl ArenaMvccMemTable {
1912 pub fn new() -> Self {
1913 Self::with_ordered_index(true)
1914 }
1915
1916 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1917 Self {
1918 data: DashMap::new(),
1919 ordered_index: if enable_ordered_index {
1920 Some(SkipMap::new())
1921 } else {
1922 None
1923 },
1924 size_bytes: AtomicU64::new(0),
1925 dirty_list: ArenaEpochDirtyList::new(),
1926 }
1927 }
1928
1929 /// Write a key-value pair using arena key handle
1930 ///
1931 /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
1932 /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
1933 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1934 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1935 let key_len = key.len();
1936
1937 // Create ONE ArenaKeyHandle - this is the only allocation for the key
1938 let key_handle = ArenaKeyHandle::new(key);
1939
1940 // Track in dirty list (O(1) copy of 16-byte handle)
1941 self.dirty_list.record_version(key_handle.clone());
1942
1943 // Insert into ordered index (O(1) copy of 16-byte handle)
1944 if let Some(ref idx) = self.ordered_index {
1945 idx.insert(key_handle.clone(), ());
1946 }
1947
1948 // Use entry API with the handle
1949 let mut entry = self.data.entry(key_handle).or_default();
1950
1951 if entry.has_write_conflict(txn_id) {
1952 return Err(SochDBError::Internal(
1953 "Write-write conflict detected".into(),
1954 ));
1955 }
1956 entry.add_uncommitted(value, txn_id);
1957 self.size_bytes
1958 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1959
1960 Ok(())
1961 }
1962
1963 /// Write batch using arena key handles
1964 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1965 let mut total_size = 0u64;
1966
1967 for (key, value) in writes {
1968 let key_handle = ArenaKeyHandle::new(key);
1969
1970 self.dirty_list.record_version(key_handle.clone());
1971
1972 if let Some(ref idx) = self.ordered_index {
1973 idx.insert(key_handle.clone(), ());
1974 }
1975
1976 let mut entry = self.data.entry(key_handle).or_default();
1977
1978 if entry.has_write_conflict(txn_id) {
1979 return Err(SochDBError::Internal(
1980 "Write-write conflict detected".into(),
1981 ));
1982 }
1983
1984 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1985 entry.add_uncommitted(value.clone(), txn_id);
1986 total_size += (key.len() + value_size) as u64;
1987 }
1988
1989 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1990 Ok(())
1991 }
1992
1993 /// Read at snapshot timestamp
1994 pub fn read(
1995 &self,
1996 key: &[u8],
1997 snapshot_ts: u64,
1998 current_txn_id: Option<u64>,
1999 ) -> Option<Vec<u8>> {
2000 // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
2001 let key_handle = ArenaKeyHandle::new(key);
2002 self.data.get(&key_handle).and_then(|chain| {
2003 chain
2004 .read_at(snapshot_ts, current_txn_id)
2005 .and_then(|v| v.value.clone())
2006 })
2007 }
2008
2009 /// Commit transaction
2010 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2011 for key in write_set {
2012 let key_handle = ArenaKeyHandle::new(key.as_slice());
2013 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2014 chain.commit(txn_id, commit_ts);
2015 }
2016 }
2017 }
2018
2019 /// Abort transaction
2020 pub fn abort(&self, txn_id: u64) {
2021 for mut entry in self.data.iter_mut() {
2022 entry.value_mut().abort(txn_id);
2023 }
2024 }
2025
2026 /// Scan prefix
2027 pub fn scan_prefix(
2028 &self,
2029 prefix: &[u8],
2030 snapshot_ts: u64,
2031 current_txn_id: Option<u64>,
2032 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2033 let mut results = Vec::new();
2034 let prefix_handle = ArenaKeyHandle::new(prefix);
2035
2036 if let Some(ref idx) = self.ordered_index {
2037 for entry in idx.range(prefix_handle..) {
2038 let key = entry.key();
2039
2040 if !key.as_bytes().starts_with(prefix) {
2041 break;
2042 }
2043
2044 if let Some(chain) = self.data.get(key)
2045 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2046 && let Some(value) = &v.value
2047 {
2048 results.push((key.as_bytes().to_vec(), value.clone()));
2049 }
2050 }
2051 } else {
2052 for entry in self.data.iter() {
2053 let key = entry.key();
2054 if !key.as_bytes().starts_with(prefix) {
2055 continue;
2056 }
2057 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2058 && let Some(value) = &v.value
2059 {
2060 results.push((key.as_bytes().to_vec(), value.clone()));
2061 }
2062 }
2063 }
2064
2065 results
2066 }
2067
2068 /// Get approximate size
2069 pub fn size(&self) -> u64 {
2070 self.size_bytes.load(Ordering::Relaxed)
2071 }
2072
2073 /// Garbage collect old versions
2074 pub fn gc(&self, min_active_ts: u64) -> usize {
2075 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2076
2077 if dirty_keys.is_empty() {
2078 return 0;
2079 }
2080
2081 let mut gc_count = 0;
2082 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2083
2084 for key in unique_keys {
2085 if let Some(mut entry) = self.data.get_mut(&key) {
2086 let before = entry.value().version_count();
2087 entry.value_mut().gc(min_active_ts);
2088 gc_count += before.saturating_sub(entry.value().version_count());
2089 }
2090 }
2091
2092 gc_count
2093 }
2094}
2095
2096impl Default for ArenaMvccMemTable {
2097 fn default() -> Self {
2098 Self::new()
2099 }
2100}
2101
2102// ============================================================================
2103// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2104// ============================================================================
2105
2106/// Configuration for memtable type selection
2107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2108pub enum MemTableType {
2109 /// Standard MVCC memtable with deferred sorting
2110 /// Best for: general workloads, balanced read/write
2111 Standard,
2112 /// Arena-backed memtable with reduced allocations
2113 /// Best for: write-heavy workloads, large keys
2114 Arena,
2115}
2116
2117impl Default for MemTableType {
2118 fn default() -> Self {
2119 // Default to Standard which now has deferred sorting
2120 MemTableType::Standard
2121 }
2122}
2123
2124/// Unified memtable abstraction using enum dispatch
2125///
2126/// This pattern provides:
2127/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2128/// - Type-safe switching between implementations
2129/// - Easy extensibility for future memtable types
2130///
2131/// ## Why Enum over Trait Object?
2132///
2133/// - Hot path performance: enum match is a single branch vs vtable indirection
2134/// - Cache friendliness: no pointer chasing
2135/// - Inlining: compiler can inline through enum dispatch
2136pub enum MemTableKind {
2137 Standard(MvccMemTable),
2138 Arena(ArenaMvccMemTable),
2139}
2140
2141impl MemTableKind {
2142 /// Create a new memtable of the specified type
2143 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2144 match kind {
2145 MemTableType::Standard => {
2146 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2147 }
2148 MemTableType::Arena => {
2149 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2150 }
2151 }
2152 }
2153
2154 /// Write a key-value pair
2155 #[inline]
2156 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2157 match self {
2158 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2159 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2160 }
2161 }
2162
2163 /// Write batch of key-value pairs
2164 #[inline]
2165 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2166 match self {
2167 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2168 MemTableKind::Arena(m) => {
2169 // Convert to arena-compatible format
2170 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2171 .iter()
2172 .map(|(k, v)| (k.as_slice(), v.clone()))
2173 .collect();
2174 m.write_batch(&arena_writes, txn_id)
2175 }
2176 }
2177 }
2178
2179 /// Read at snapshot timestamp
2180 #[inline]
2181 pub fn read(
2182 &self,
2183 key: &[u8],
2184 snapshot_ts: u64,
2185 current_txn_id: Option<u64>,
2186 ) -> Option<Vec<u8>> {
2187 match self {
2188 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2189 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2190 }
2191 }
2192
2193 /// Commit transaction
2194 #[inline]
2195 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2196 match self {
2197 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2198 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2199 }
2200 }
2201
2202 /// Abort transaction
2203 #[inline]
2204 pub fn abort(&self, txn_id: u64) {
2205 match self {
2206 MemTableKind::Standard(m) => m.abort(txn_id),
2207 MemTableKind::Arena(m) => m.abort(txn_id),
2208 }
2209 }
2210
2211 /// Scan prefix
2212 #[inline]
2213 pub fn scan_prefix(
2214 &self,
2215 prefix: &[u8],
2216 snapshot_ts: u64,
2217 current_txn_id: Option<u64>,
2218 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2219 match self {
2220 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2221 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2222 }
2223 }
2224
2225 /// Scan range
2226 #[inline]
2227 pub fn scan_range(
2228 &self,
2229 start: &[u8],
2230 end: &[u8],
2231 snapshot_ts: u64,
2232 current_txn_id: Option<u64>,
2233 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2234 match self {
2235 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2236 MemTableKind::Arena(m) => {
2237 // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2238 let mut results = Vec::new();
2239 if let Some(ref idx) = m.ordered_index {
2240 let start_handle = ArenaKeyHandle::new(start);
2241 let end_handle = ArenaKeyHandle::new(end);
2242
2243 if end.is_empty() {
2244 for entry in idx.range(start_handle..) {
2245 let key = entry.key();
2246 if let Some(chain) = m.data.get(key)
2247 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2248 && let Some(value) = &v.value
2249 {
2250 results.push((key.as_bytes().to_vec(), value.clone()));
2251 }
2252 }
2253 } else {
2254 for entry in idx.range(start_handle..end_handle) {
2255 let key = entry.key();
2256 if let Some(chain) = m.data.get(key)
2257 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2258 && let Some(value) = &v.value
2259 {
2260 results.push((key.as_bytes().to_vec(), value.clone()));
2261 }
2262 }
2263 }
2264 } else {
2265 for entry in m.data.iter() {
2266 let key = entry.key();
2267 let key_bytes = key.as_bytes();
2268 if key_bytes < start {
2269 continue;
2270 }
2271 if !end.is_empty() && key_bytes >= end {
2272 continue;
2273 }
2274 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2275 && let Some(value) = &v.value
2276 {
2277 results.push((key_bytes.to_vec(), value.clone()));
2278 }
2279 }
2280 }
2281 results
2282 }
2283 }
2284 }
2285
2286 /// Scan range iterator (returns collected results for now)
2287 #[inline]
2288 pub fn scan_range_iter<'a>(
2289 &'a self,
2290 start: &'a [u8],
2291 end: &'a [u8],
2292 snapshot_ts: u64,
2293 current_txn_id: Option<u64>,
2294 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2295 match self {
2296 MemTableKind::Standard(m) => {
2297 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2298 }
2299 MemTableKind::Arena(_) => {
2300 // Arena version returns collected results as iterator
2301 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2302 Box::new(results.into_iter())
2303 }
2304 }
2305 }
2306
2307 /// Get approximate size
2308 #[inline]
2309 pub fn size(&self) -> u64 {
2310 match self {
2311 MemTableKind::Standard(m) => m.size(),
2312 MemTableKind::Arena(m) => m.size(),
2313 }
2314 }
2315
2316 /// Garbage collect old versions
2317 #[inline]
2318 pub fn gc(&self, min_active_ts: u64) -> usize {
2319 match self {
2320 MemTableKind::Standard(m) => m.gc(min_active_ts),
2321 MemTableKind::Arena(m) => m.gc(min_active_ts),
2322 }
2323 }
2324
2325 /// Get the kind of memtable
2326 pub fn kind(&self) -> MemTableType {
2327 match self {
2328 MemTableKind::Standard(_) => MemTableType::Standard,
2329 MemTableKind::Arena(_) => MemTableType::Arena,
2330 }
2331 }
2332}
2333
2334/// Durable storage engine with full ACID support
2335pub struct DurableStorage {
2336 /// Path to storage directory
2337 path: PathBuf,
2338 /// Write-ahead log
2339 wal: Arc<TxnWal>,
2340 /// MVCC manager
2341 mvcc: Arc<MvccManager>,
2342 /// In-memory data (unified abstraction over Standard/Arena)
2343 memtable: Arc<MemTableKind>,
2344 /// Per-transaction WAL buffers for batched writes
2345 /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
2346 /// At commit, buffer is flushed to WAL with single lock acquisition
2347 txn_write_buffers: DashMap<u64, TxnWalBuffer>,
2348 /// Group commit buffer (optional)
2349 group_commit: Option<Arc<EventDrivenGroupCommit>>,
2350 /// Recovery state
2351 needs_recovery: AtomicU64, // 1 = needs recovery
2352 /// Last checkpoint LSN
2353 last_checkpoint_lsn: AtomicU64,
2354 /// Synchronous mode (like SQLite's PRAGMA synchronous)
2355 /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
2356 sync_mode: AtomicU64,
2357 /// Commits since last sync (for NORMAL mode)
2358 commits_since_sync: AtomicU64,
2359 /// Adaptive batch sizing for NORMAL mode (Little's Law)
2360 /// Arrival rate in requests/sec × 1000 for precision
2361 arrival_rate_ema: AtomicU64,
2362 /// Last commit timestamp in microseconds
2363 last_commit_us: AtomicU64,
2364 /// Estimated fsync latency in microseconds
2365 fsync_latency_us: AtomicU64,
2366 /// Database lock for exclusive access (None = no locking)
2367 #[allow(dead_code)]
2368 db_lock: Option<crate::lock::DatabaseLock>,
2369}
2370
2371impl DurableStorage {
2372 /// Open or create durable storage at path
2373 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2374 Self::open_with_config(path, true)
2375 }
2376
2377 /// Open with configurable ordered index
2378 ///
2379 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2380 /// but scan_prefix becomes O(N) instead of O(log N + K)
2381 pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2382 Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2383 }
2384
2385 /// Open with arena-backed memtable for write-heavy workloads
2386 ///
2387 /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2388 /// Best for workloads with:
2389 /// - High write throughput
2390 /// - Large keys (reduces allocation overhead)
2391 /// - Minimal concurrent reads during writes
2392 pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2393 Self::open_with_full_config(path, true, MemTableType::Arena)
2394 }
2395
2396 /// Open with full configuration options
2397 ///
2398 /// # Arguments
2399 /// * `path` - Storage directory path
2400 /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2401 /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2402 ///
2403 /// # Locking
2404 ///
2405 /// Acquires an exclusive advisory lock on the database directory.
2406 /// This prevents concurrent multi-process access which would corrupt data.
2407 /// If another process has the database open, returns `Err(DatabaseLocked)`.
2408 pub fn open_with_full_config<P: AsRef<Path>>(
2409 path: P,
2410 enable_ordered_index: bool,
2411 memtable_type: MemTableType,
2412 ) -> Result<Self> {
2413 Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, true)
2414 }
2415
2416 /// Open without locking (for testing crash recovery scenarios)
2417 ///
2418 /// # Safety
2419 /// This should ONLY be used in tests that simulate crashes by forgetting
2420 /// the storage instance. In production, always use `open_with_full_config`.
2421 #[cfg(test)]
2422 pub fn open_without_lock<P: AsRef<Path>>(path: P) -> Result<Self> {
2423 Self::open_with_full_config_internal(path, true, MemTableType::Standard, false)
2424 }
2425
2426 fn open_with_full_config_internal<P: AsRef<Path>>(
2427 path: P,
2428 enable_ordered_index: bool,
2429 memtable_type: MemTableType,
2430 acquire_lock: bool,
2431 ) -> Result<Self> {
2432 let path = path.as_ref().to_path_buf();
2433 std::fs::create_dir_all(&path)?;
2434
2435 // Acquire exclusive lock on database directory (unless disabled for testing)
2436 let db_lock = if acquire_lock {
2437 Some(crate::lock::DatabaseLock::acquire(&path)
2438 .map_err(|e| SochDBError::LockError(e.to_string()))?)
2439 } else {
2440 None
2441 };
2442
2443 let wal_path = path.join("wal.log");
2444 let wal = Arc::new(TxnWal::new(&wal_path)?);
2445
2446 let storage = Self {
2447 path,
2448 wal: wal.clone(),
2449 mvcc: Arc::new(MvccManager::new()),
2450 memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2451 txn_write_buffers: DashMap::new(),
2452 group_commit: None,
2453 needs_recovery: AtomicU64::new(0),
2454 last_checkpoint_lsn: AtomicU64::new(0),
2455 sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2456 commits_since_sync: AtomicU64::new(0),
2457 // Adaptive batch sizing (Little's Law)
2458 arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2459 last_commit_us: AtomicU64::new(0),
2460 fsync_latency_us: AtomicU64::new(5000), // 5ms default
2461 db_lock,
2462 };
2463
2464 // Check if recovery needed
2465 if storage.check_recovery_needed()? {
2466 storage.needs_recovery.store(1, Ordering::SeqCst);
2467 }
2468
2469 Ok(storage)
2470 }
2471
2472 /// Open with group commit enabled
2473 pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2474 Self::open_with_group_commit_and_config(path, true)
2475 }
2476
2477 /// Open with group commit and configurable ordered index
2478 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2479 path: P,
2480 enable_ordered_index: bool,
2481 ) -> Result<Self> {
2482 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2483
2484 let wal = storage.wal.clone();
2485 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2486 // Write all commit records WITHOUT flushing (batch them)
2487 for &txn_id in txn_ids {
2488 let entry = TxnWalEntry::txn_commit(txn_id);
2489 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2490 }
2491
2492 // Then do a SINGLE flush + fsync for the entire batch
2493 wal.flush().map_err(|e| e.to_string())?;
2494 wal.sync().map_err(|e| e.to_string())?;
2495
2496 // Return commit timestamp
2497 Ok(std::time::SystemTime::now()
2498 .duration_since(std::time::UNIX_EPOCH)
2499 .unwrap()
2500 .as_micros() as u64)
2501 });
2502
2503 storage.group_commit = Some(Arc::new(gc));
2504 Ok(storage)
2505 }
2506
2507 /// Open with IndexPolicy for automatic memtable/index configuration
2508 ///
2509 /// This is the recommended constructor for new code. The policy determines:
2510 /// - Whether to use ordered index (ScanOptimized only)
2511 /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2512 /// - Default settings optimized for the workload pattern
2513 ///
2514 /// # Arguments
2515 /// * `path` - Storage directory path
2516 /// * `policy` - Index policy determining write/scan tradeoffs
2517 /// * `group_commit` - Whether to enable group commit for throughput
2518 pub fn open_with_policy<P: AsRef<Path>>(
2519 path: P,
2520 policy: crate::index_policy::IndexPolicy,
2521 group_commit: bool,
2522 ) -> Result<Self> {
2523 use crate::index_policy::IndexPolicy;
2524
2525 // Derive configuration from policy
2526 let (enable_ordered_index, memtable_type) = match policy {
2527 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2528 // Write-heavy: no ordered index, use arena for reduced allocations
2529 (false, MemTableType::Arena)
2530 }
2531 IndexPolicy::Balanced => {
2532 // Mixed OLTP: deferred sorting (already implemented in Standard)
2533 (true, MemTableType::Standard)
2534 }
2535 IndexPolicy::ScanOptimized => {
2536 // Scan-heavy: maintain ordered index
2537 (true, MemTableType::Standard)
2538 }
2539 };
2540
2541 if group_commit {
2542 let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2543
2544 let wal = storage.wal.clone();
2545 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2546 for &txn_id in txn_ids {
2547 let entry = TxnWalEntry::txn_commit(txn_id);
2548 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2549 }
2550 wal.flush().map_err(|e| e.to_string())?;
2551 wal.sync().map_err(|e| e.to_string())?;
2552 Ok(std::time::SystemTime::now()
2553 .duration_since(std::time::UNIX_EPOCH)
2554 .unwrap()
2555 .as_micros() as u64)
2556 });
2557 storage.group_commit = Some(Arc::new(gc));
2558 Ok(storage)
2559 } else {
2560 Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2561 }
2562 }
2563
2564 /// Open storage for concurrent mode (multi-reader, single-writer)
2565 ///
2566 /// This method opens the storage WITHOUT acquiring the exclusive file lock.
2567 /// Coordination is handled by the concurrent MVCC layer instead.
2568 ///
2569 /// # Safety
2570 ///
2571 /// This must ONLY be called from `Database::open_concurrent()` which
2572 /// manages the concurrent MVCC coordination. Direct use will cause
2573 /// data corruption.
2574 pub fn open_for_concurrent<P: AsRef<Path>>(
2575 path: P,
2576 policy: crate::index_policy::IndexPolicy,
2577 ) -> Result<Self> {
2578 use crate::index_policy::IndexPolicy;
2579
2580 let (enable_ordered_index, memtable_type) = match policy {
2581 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2582 (false, MemTableType::Arena)
2583 }
2584 IndexPolicy::Balanced => {
2585 (true, MemTableType::Standard)
2586 }
2587 IndexPolicy::ScanOptimized => {
2588 (true, MemTableType::Standard)
2589 }
2590 };
2591
2592 // Open WITHOUT exclusive file lock (concurrent MVCC handles coordination)
2593 Self::open_with_full_config_internal(path, enable_ordered_index, memtable_type, false)
2594 }
2595
2596 /// Get the memtable type being used
2597 pub fn memtable_type(&self) -> MemTableType {
2598 self.memtable.kind()
2599 }
2600
2601 /// Check if recovery is needed (dirty shutdown detection)
2602 ///
2603 /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2604 /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2605 /// but committed data still needs to be loaded from WAL into the memtable.
2606 fn check_recovery_needed(&self) -> Result<bool> {
2607 let marker_path = self.path.join(".clean_shutdown");
2608 if marker_path.exists() {
2609 // Clean shutdown - remove marker
2610 std::fs::remove_file(&marker_path)?;
2611 }
2612 // ALWAYS need recovery to rebuild memtable from WAL
2613 // The memtable is in-memory only and needs to be restored on every startup
2614 Ok(true)
2615 }
2616
2617 /// Perform crash recovery
2618 pub fn recover(&self) -> Result<RecoveryStats> {
2619 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2620 return Ok(RecoveryStats::default());
2621 }
2622
2623 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2624
2625 // Apply committed writes to memtable
2626 let recovery_txn_id = self.wal.alloc_txn_id();
2627 let commit_ts = self.mvcc.alloc_commit_ts();
2628
2629 // Collect keys being written for efficient commit
2630 let mut write_set: HashSet<InlineKey> = HashSet::new();
2631 for (key, value) in &writes {
2632 write_set.insert(SmallVec::from_slice(key));
2633 self.memtable
2634 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2635 }
2636 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2637
2638 self.needs_recovery.store(0, Ordering::SeqCst);
2639
2640 Ok(RecoveryStats {
2641 transactions_recovered: txn_count,
2642 writes_recovered: writes.len(),
2643 commit_ts,
2644 })
2645 }
2646
2647 /// Begin a new transaction
2648 pub fn begin_transaction(&self) -> Result<u64> {
2649 let txn_id = self.wal.begin_transaction()?;
2650 self.mvcc.begin(txn_id);
2651 Ok(txn_id)
2652 }
2653
2654 /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2655 ///
2656 /// This enables mode-aware optimizations:
2657 /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2658 /// - WriteOnly: Skip read tracking, faster bulk inserts
2659 /// - ReadWrite: Full SSI for serializable isolation
2660 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2661 let txn_id = self.wal.begin_transaction()?;
2662 self.mvcc.begin_with_mode(txn_id, mode);
2663 Ok(txn_id)
2664 }
2665
2666 /// Read a key within a transaction
2667 #[inline]
2668 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2669 // Fast path: get just snapshot_ts without cloning whole transaction
2670 let snapshot_ts = self
2671 .mvcc
2672 .get_snapshot_ts(txn_id)
2673 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2674
2675 // Record read for SSI (skipped for read-only transactions)
2676 self.mvcc.record_read(txn_id, key);
2677
2678 // Read at snapshot timestamp, seeing own uncommitted writes
2679 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2680 }
2681
2682 /// Write a key-value pair within a transaction
2683 ///
2684 /// Writes are buffered and only flushed to disk on commit.
2685 /// This provides ~10× better throughput for batched inserts.
2686 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2687 // Use the zero-allocation path internally
2688 self.write_refs(txn_id, &key, &value)?;
2689
2690 Ok(())
2691 }
2692
2693 /// Write from references - zero allocation hot path
2694 ///
2695 /// Avoids cloning key/value by writing to WAL from refs directly,
2696 /// then only allocating once for memtable storage.
2697 #[inline]
2698 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2699 // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2700 self.mvcc.record_write(txn_id, key);
2701
2702 // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2703 // This reduces lock contention from O(writes) to O(1) per transaction
2704 self.txn_write_buffers
2705 .entry(txn_id)
2706 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2707 .append(key, value);
2708
2709 // Write to memtable (needs owned key/value for storage)
2710 self.memtable
2711 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2712
2713 Ok(())
2714 }
2715
2716 /// Delete a key within a transaction
2717 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2718 // Record write (uses InlineKey - zero allocation for small keys)
2719 self.mvcc.record_write(txn_id, &key);
2720
2721 // Buffer tombstone in memory - NO WAL lock taken!
2722 self.txn_write_buffers
2723 .entry(txn_id)
2724 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2725 .append(&key, &[]); // Empty value = tombstone
2726
2727 // Write tombstone to memtable
2728 self.memtable.write(key, None, txn_id)?;
2729
2730 Ok(())
2731 }
2732
2733 /// Batch write multiple key-value pairs with reduced overhead
2734 ///
2735 /// This API amortizes fixed costs over the batch:
2736 /// - Single DashMap entry lookup for TxnWalBuffer
2737 /// - Single MVCC write set update
2738 /// - Batch memtable operations
2739 ///
2740 /// Performance: ~2-3x faster than individual write_refs calls
2741 /// for batches of 100+ entries.
2742 ///
2743 /// # Arguments
2744 /// * `txn_id` - Transaction ID
2745 /// * `writes` - Slice of (key, value) pairs
2746 #[inline]
2747 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2748 if writes.is_empty() {
2749 return Ok(());
2750 }
2751
2752 // Single DashMap access for entire batch
2753 let mut buffer_entry = self
2754 .txn_write_buffers
2755 .entry(txn_id)
2756 .or_insert_with(|| TxnWalBuffer::new(txn_id));
2757
2758 // Batch operations with reduced per-row overhead
2759 for (key, value) in writes {
2760 // Record write for MVCC
2761 self.mvcc.record_write(txn_id, key);
2762
2763 // Append to WAL buffer
2764 buffer_entry.append(key, value);
2765 }
2766 drop(buffer_entry);
2767
2768 // Batch write to memtable
2769 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2770 .iter()
2771 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2772 .collect();
2773 self.memtable.write_batch(&owned_writes, txn_id)?;
2774
2775 Ok(())
2776 }
2777
2778 /// Commit a transaction
2779 ///
2780 /// With sync_mode:
2781 /// - 0 (OFF): No sync, risk of data loss
2782 /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2783 /// - 2 (FULL): Sync every commit (safest, slowest)
2784 pub fn commit(&self, txn_id: u64) -> Result<u64> {
2785 // First, flush all buffered writes to WAL with SINGLE lock acquisition
2786 // This is the key optimization: O(1) lock instead of O(writes) locks
2787 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2788 && !buffer.is_empty()
2789 {
2790 // Flush entire buffer to WAL with one lock
2791 self.wal.flush_buffer(&buffer)?;
2792 }
2793
2794 // Use group commit if available, otherwise direct commit
2795 if let Some(gc) = &self.group_commit {
2796 // Submit to group commit and wait for result
2797 // This batches multiple commits into a single fsync
2798 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2799
2800 // Get commit timestamp and write_set from MVCC
2801 let (commit_ts, write_set) = self
2802 .mvcc
2803 .commit(txn_id)
2804 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2805
2806 // Commit in memtable (O(k) where k = keys written)
2807 self.memtable.commit(txn_id, commit_ts, &write_set);
2808
2809 Ok(commit_ts)
2810 } else {
2811 // Direct commit path with adaptive sync (Little's Law)
2812 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2813 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2814
2815 // Update arrival rate for adaptive batching
2816 self.update_arrival_rate();
2817
2818 // Write commit record (no flush yet - BufWriter will buffer it)
2819 let entry = TxnWalEntry::txn_commit(txn_id);
2820 self.wal.append_no_flush(&entry)?;
2821
2822 // Determine if we should sync/flush based on mode
2823 let should_sync = match sync_mode {
2824 0 => false, // OFF: never sync
2825 1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
2826 _ => true, // FULL: always sync
2827 };
2828
2829 if should_sync {
2830 // Measure fsync latency for adaptive tuning
2831 let start = std::time::Instant::now();
2832 self.wal.flush()?;
2833 self.wal.sync()?;
2834 let latency_us = start.elapsed().as_micros() as u64;
2835
2836 // Update fsync latency estimate (EMA with α = 0.1)
2837 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2838 let new_latency = (old_latency * 9 + latency_us) / 10;
2839 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2840
2841 self.commits_since_sync.store(0, Ordering::Relaxed);
2842 }
2843
2844 // Get commit timestamp and write_set from MVCC
2845 let (commit_ts, write_set) = self
2846 .mvcc
2847 .commit(txn_id)
2848 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2849
2850 // Commit in memtable (O(k) where k = keys written)
2851 self.memtable.commit(txn_id, commit_ts, &write_set);
2852
2853 Ok(commit_ts)
2854 }
2855 }
2856
2857 /// Update arrival rate using exponential moving average
2858 #[inline]
2859 fn update_arrival_rate(&self) {
2860 let now_us = std::time::SystemTime::now()
2861 .duration_since(std::time::UNIX_EPOCH)
2862 .unwrap()
2863 .as_micros() as u64;
2864
2865 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2866
2867 if last > 0 {
2868 let delta_us = now_us.saturating_sub(last);
2869 if delta_us > 0 && delta_us < 10_000_000 {
2870 // Ignore gaps > 10s
2871 // Rate = 1_000_000 / delta_us (requests/sec)
2872 // Stored as rate × 1000 for precision
2873 let instant_rate = 1_000_000_000 / delta_us;
2874
2875 // EMA with α = 0.1
2876 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2877 let new_rate = (old_rate * 9 + instant_rate) / 10;
2878 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2879 }
2880 }
2881 }
2882
2883 /// Compute optimal batch threshold using Little's Law
2884 ///
2885 /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
2886 /// Returns the number of commits to batch before fsync
2887 #[inline]
2888 fn adaptive_batch_threshold(&self) -> u64 {
2889 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
2890 let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
2891
2892 if lambda <= 0.0 || tau <= 0.0 {
2893 return 100; // Fallback to fixed threshold
2894 }
2895
2896 // Little's Law: W* = sqrt(2 × τ × λ)
2897 // This minimizes total time = wait_time + fsync_overhead
2898 let n_opt = (2.0 * tau * lambda).sqrt();
2899
2900 // Clamp between 1 and 1000
2901 (n_opt as u64).clamp(1, 1000)
2902 }
2903
2904 /// Set synchronous mode
2905 ///
2906 /// - 0: OFF - No fsync (risk of data loss)
2907 /// - 1: NORMAL - Periodic fsync (balanced)
2908 /// - 2: FULL - Fsync every commit (safest)
2909 pub fn set_sync_mode(&self, mode: u64) {
2910 self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2911 }
2912
2913 /// Force a group commit flush (useful for benchmarking or testing)
2914 pub fn flush_group_commit(&self) {
2915 if let Some(gc) = &self.group_commit {
2916 gc.flush_batch();
2917 }
2918 }
2919
2920 /// Abort a transaction
2921 pub fn abort(&self, txn_id: u64) -> Result<()> {
2922 // Discard buffered writes (no need to write to WAL)
2923 self.txn_write_buffers.remove(&txn_id);
2924
2925 // Write abort to WAL
2926 self.wal.abort_transaction(txn_id)?;
2927
2928 // Abort in MVCC
2929 self.mvcc.abort(txn_id);
2930
2931 // Abort in memtable
2932 self.memtable.abort(txn_id);
2933
2934 Ok(())
2935 }
2936
2937 /// Scan keys with prefix
2938 #[inline]
2939 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2940 // Fast path: get just snapshot_ts without cloning whole transaction
2941 let snapshot_ts = self
2942 .mvcc
2943 .get_snapshot_ts(txn_id)
2944 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2945
2946 // Note: Scan doesn't record individual key reads for SSI (too expensive)
2947 // SSI conflicts are tracked at the prefix level if needed
2948 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
2949 }
2950
2951 /// Scan keys in range
2952 #[inline]
2953 pub fn scan_range(
2954 &self,
2955 txn_id: u64,
2956 start: &[u8],
2957 end: &[u8],
2958 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2959 let snapshot_ts = self
2960 .mvcc
2961 .get_snapshot_ts(txn_id)
2962 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2963
2964 Ok(self
2965 .memtable
2966 .scan_range(start, end, snapshot_ts, Some(txn_id)))
2967 }
2968
2969 /// Streaming scan for very large result sets
2970 ///
2971 /// Returns an iterator that yields (key, value) pairs without
2972 /// materializing the entire result set in memory.
2973 #[inline]
2974 pub fn scan_range_iter<'a>(
2975 &'a self,
2976 txn_id: u64,
2977 start: &'a [u8],
2978 end: &'a [u8],
2979 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
2980 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
2981 self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
2982 }
2983
2984 /// Force fsync to disk
2985 pub fn fsync(&self) -> Result<()> {
2986 self.wal.sync()
2987 }
2988
2989 /// Write checkpoint
2990 pub fn checkpoint(&self) -> Result<u64> {
2991 let txn_id = 0; // System transaction
2992 let entry = TxnWalEntry::checkpoint(txn_id);
2993 let lsn = self.wal.append_sync(&entry)?;
2994 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
2995 Ok(lsn)
2996 }
2997
2998 /// Get storage statistics
2999 pub fn stats(&self) -> StorageStats {
3000 // Get WAL size from the WAL manager
3001 let wal_size = self.wal.size_bytes();
3002
3003 // Get active transaction count from MVCC
3004 let active_txns = self.mvcc.active_transaction_count();
3005
3006 StorageStats {
3007 memtable_size_bytes: self.memtable.size(),
3008 wal_size_bytes: wal_size,
3009 active_transactions: active_txns,
3010 min_active_snapshot: self.mvcc.min_active_snapshot(),
3011 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
3012 }
3013 }
3014
3015 /// Garbage collect old versions
3016 pub fn gc(&self) -> usize {
3017 let min_ts = self.mvcc.min_active_snapshot();
3018 self.memtable.gc(min_ts)
3019 }
3020
3021 /// Clean shutdown
3022 pub fn shutdown(&self) -> Result<()> {
3023 // Sync WAL
3024 self.fsync()?;
3025
3026 // Write clean shutdown marker
3027 let marker_path = self.path.join(".clean_shutdown");
3028 std::fs::write(&marker_path, b"clean")?;
3029
3030 Ok(())
3031 }
3032}
3033
3034impl Drop for DurableStorage {
3035 fn drop(&mut self) {
3036 let _ = self.shutdown();
3037 }
3038}
3039
3040/// Recovery statistics
3041#[derive(Debug, Default)]
3042pub struct RecoveryStats {
3043 pub transactions_recovered: usize,
3044 pub writes_recovered: usize,
3045 pub commit_ts: u64,
3046}
3047
3048/// Storage statistics
3049#[derive(Debug, Default)]
3050pub struct StorageStats {
3051 pub memtable_size_bytes: u64,
3052 pub wal_size_bytes: u64,
3053 pub active_transactions: usize,
3054 pub min_active_snapshot: u64,
3055 pub last_checkpoint_lsn: u64,
3056}
3057
3058#[cfg(test)]
3059mod tests {
3060 use super::*;
3061 use tempfile::tempdir;
3062
3063 #[test]
3064 fn test_basic_transaction() {
3065 let dir = tempdir().unwrap();
3066 let storage = DurableStorage::open(dir.path()).unwrap();
3067
3068 // Begin transaction
3069 let txn_id = storage.begin_transaction().unwrap();
3070
3071 // Write data
3072 storage
3073 .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3074 .unwrap();
3075 storage
3076 .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3077 .unwrap();
3078
3079 // Read back (within same transaction)
3080 let v1 = storage.read(txn_id, b"key1").unwrap();
3081 assert_eq!(v1, Some(b"value1".to_vec()));
3082
3083 // Commit
3084 let commit_ts = storage.commit(txn_id).unwrap();
3085 assert!(commit_ts > 0);
3086
3087 // Read in new transaction
3088 let txn2 = storage.begin_transaction().unwrap();
3089 let v1 = storage.read(txn2, b"key1").unwrap();
3090 assert_eq!(v1, Some(b"value1".to_vec()));
3091 storage.abort(txn2).unwrap();
3092 }
3093
3094 #[test]
3095 fn test_snapshot_isolation() {
3096 let dir = tempdir().unwrap();
3097 let storage = DurableStorage::open(dir.path()).unwrap();
3098
3099 // T1: Write initial value
3100 let t1 = storage.begin_transaction().unwrap();
3101 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3102 storage.commit(t1).unwrap();
3103
3104 // T2: Start reading (snapshot at this point)
3105 let t2 = storage.begin_transaction().unwrap();
3106
3107 // T3: Update the value
3108 let t3 = storage.begin_transaction().unwrap();
3109 storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3110 storage.commit(t3).unwrap();
3111
3112 // T2 should still see v1 (snapshot isolation)
3113 let v = storage.read(t2, b"key").unwrap();
3114 assert_eq!(v, Some(b"v1".to_vec()));
3115
3116 // New transaction should see v2
3117 let t4 = storage.begin_transaction().unwrap();
3118 let v = storage.read(t4, b"key").unwrap();
3119 assert_eq!(v, Some(b"v2".to_vec()));
3120
3121 storage.abort(t2).unwrap();
3122 storage.abort(t4).unwrap();
3123 }
3124
3125 #[test]
3126 fn test_abort_transaction() {
3127 let dir = tempdir().unwrap();
3128 let storage = DurableStorage::open(dir.path()).unwrap();
3129
3130 // Write initial value
3131 let t1 = storage.begin_transaction().unwrap();
3132 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3133 storage.commit(t1).unwrap();
3134
3135 // Start transaction that will abort
3136 let t2 = storage.begin_transaction().unwrap();
3137 storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3138 storage.abort(t2).unwrap();
3139
3140 // New transaction should see v1 (aborted changes not visible)
3141 let t3 = storage.begin_transaction().unwrap();
3142 let v = storage.read(t3, b"key").unwrap();
3143 assert_eq!(v, Some(b"v1".to_vec()));
3144 storage.abort(t3).unwrap();
3145 }
3146
3147 #[test]
3148 fn test_crash_recovery() {
3149 let dir = tempdir().unwrap();
3150
3151 // Phase 1: Write data and commit
3152 {
3153 // Use open_without_lock for crash simulation tests
3154 let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3155
3156 // Set sync mode to FULL to ensure data is synced before "crash"
3157 storage.set_sync_mode(2); // FULL: sync every commit
3158
3159 let txn = storage.begin_transaction().unwrap();
3160 storage
3161 .write(txn, b"persist".to_vec(), b"data".to_vec())
3162 .unwrap();
3163 storage.commit(txn).unwrap();
3164
3165 // Simulate crash (no clean shutdown)
3166 std::mem::forget(storage);
3167 }
3168
3169 // Phase 2: Reopen and recover
3170 {
3171 let storage = DurableStorage::open_without_lock(dir.path()).unwrap();
3172 let stats = storage.recover().unwrap();
3173 assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3174
3175 // Data should be recovered
3176 let txn = storage.begin_transaction().unwrap();
3177 let v = storage.read(txn, b"persist").unwrap();
3178 assert_eq!(v, Some(b"data".to_vec()));
3179 storage.abort(txn).unwrap();
3180 }
3181 }
3182
3183 #[test]
3184 fn test_scan_prefix() {
3185 let dir = tempdir().unwrap();
3186 let storage = DurableStorage::open(dir.path()).unwrap();
3187
3188 let txn = storage.begin_transaction().unwrap();
3189 storage
3190 .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3191 .unwrap();
3192 storage
3193 .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3194 .unwrap();
3195 storage
3196 .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3197 .unwrap();
3198 storage.commit(txn).unwrap();
3199
3200 let txn2 = storage.begin_transaction().unwrap();
3201 let users = storage.scan(txn2, b"user:").unwrap();
3202 assert_eq!(users.len(), 2);
3203 storage.abort(txn2).unwrap();
3204 }
3205
3206 #[test]
3207 fn test_delete() {
3208 let dir = tempdir().unwrap();
3209 let storage = DurableStorage::open(dir.path()).unwrap();
3210
3211 // Insert
3212 let t1 = storage.begin_transaction().unwrap();
3213 storage
3214 .write(t1, b"key".to_vec(), b"value".to_vec())
3215 .unwrap();
3216 storage.commit(t1).unwrap();
3217
3218 // Verify exists
3219 let t2 = storage.begin_transaction().unwrap();
3220 assert!(storage.read(t2, b"key").unwrap().is_some());
3221 storage.abort(t2).unwrap();
3222
3223 // Delete
3224 let t3 = storage.begin_transaction().unwrap();
3225 storage.delete(t3, b"key".to_vec()).unwrap();
3226 storage.commit(t3).unwrap();
3227
3228 // Verify deleted
3229 let t4 = storage.begin_transaction().unwrap();
3230 assert!(storage.read(t4, b"key").unwrap().is_none());
3231 storage.abort(t4).unwrap();
3232 }
3233
3234 #[test]
3235 fn test_gc() {
3236 let dir = tempdir().unwrap();
3237 let storage = DurableStorage::open(dir.path()).unwrap();
3238
3239 // Create multiple versions
3240 for i in 0..10 {
3241 let txn = storage.begin_transaction().unwrap();
3242 storage
3243 .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3244 .unwrap();
3245 storage.commit(txn).unwrap();
3246 }
3247
3248 // GC should reclaim old versions
3249 let gc_count = storage.gc();
3250 // At least some versions should be collected
3251 // (exact count depends on implementation)
3252 let _ = gc_count; // gc_count is usize, always >= 0
3253 }
3254
3255 #[test]
3256 fn test_group_commit() {
3257 use std::sync::Arc;
3258 use std::thread;
3259
3260 let dir = tempdir().unwrap();
3261 let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3262
3263 // Spawn multiple threads to commit concurrently
3264 let mut handles = vec![];
3265 for i in 0..4 {
3266 let storage = Arc::clone(&storage);
3267 handles.push(thread::spawn(move || {
3268 let txn = storage.begin_transaction().unwrap();
3269 storage
3270 .write(
3271 txn,
3272 format!("key{}", i).into_bytes(),
3273 format!("val{}", i).into_bytes(),
3274 )
3275 .unwrap();
3276 storage.commit(txn).unwrap()
3277 }));
3278 }
3279
3280 // Wait for all commits
3281 let mut commit_times = vec![];
3282 for h in handles {
3283 commit_times.push(h.join().unwrap());
3284 }
3285
3286 // All commits should succeed
3287 assert!(commit_times.iter().all(|&ts| ts > 0));
3288
3289 // Verify data persisted
3290 let txn = storage.begin_transaction().unwrap();
3291 for i in 0..4 {
3292 let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3293 assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3294 }
3295 storage.abort(txn).unwrap();
3296 }
3297
3298 // ==================== ArenaMvccMemTable Tests ====================
3299
3300 #[test]
3301 fn test_arena_memtable_basic_write_read() {
3302 let memtable = ArenaMvccMemTable::new();
3303
3304 // Write some values
3305 memtable
3306 .write(b"key1", Some(b"value1".to_vec()), 1)
3307 .unwrap();
3308 memtable
3309 .write(b"key2", Some(b"value2".to_vec()), 1)
3310 .unwrap();
3311
3312 // Read them back (uncommitted, so need txn_id match)
3313 assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3314 assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3315 assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3316 }
3317
3318 #[test]
3319 fn test_arena_memtable_update() {
3320 let memtable = ArenaMvccMemTable::new();
3321
3322 memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3323 memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3324
3325 assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3326 }
3327
3328 #[test]
3329 fn test_arena_memtable_delete() {
3330 let memtable = ArenaMvccMemTable::new();
3331
3332 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3333 memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3334
3335 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3336 }
3337
3338 #[test]
3339 fn test_arena_memtable_scan_prefix() {
3340 let memtable = ArenaMvccMemTable::new();
3341
3342 memtable
3343 .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3344 .unwrap();
3345 memtable
3346 .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3347 .unwrap();
3348 memtable
3349 .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3350 .unwrap();
3351 memtable
3352 .write(b"order:1", Some(b"order_data".to_vec()), 1)
3353 .unwrap();
3354
3355 // Create a write set and commit
3356 let mut write_set = HashSet::new();
3357 write_set.insert(InlineKey::from_slice(b"user:1:name"));
3358 write_set.insert(InlineKey::from_slice(b"user:1:email"));
3359 write_set.insert(InlineKey::from_slice(b"user:2:name"));
3360 write_set.insert(InlineKey::from_slice(b"order:1"));
3361 memtable.commit(1, 10, &write_set);
3362
3363 // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3364 let results = memtable.scan_prefix(b"user:1:", 11, None);
3365 assert_eq!(results.len(), 2);
3366
3367 // Scan for all users
3368 let results = memtable.scan_prefix(b"user:", 11, None);
3369 assert_eq!(results.len(), 3);
3370 }
3371
3372 #[test]
3373 fn test_arena_memtable_write_batch() {
3374 let memtable = ArenaMvccMemTable::new();
3375
3376 let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3377 (b"k1", Some(b"v1".to_vec())),
3378 (b"k2", Some(b"v2".to_vec())),
3379 (b"k3", Some(b"v3".to_vec())),
3380 ];
3381
3382 memtable.write_batch(&writes, 1).unwrap();
3383
3384 assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3385 assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3386 assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3387 }
3388
3389 #[test]
3390 fn test_arena_memtable_gc() {
3391 let memtable = ArenaMvccMemTable::new();
3392
3393 // Write multiple versions
3394 for i in 0..10 {
3395 memtable
3396 .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3397 .unwrap();
3398
3399 let mut write_set = HashSet::new();
3400 write_set.insert(InlineKey::from_slice(b"key"));
3401 memtable.commit(i + 1, (i + 1) * 10, &write_set);
3402 }
3403
3404 // GC old versions
3405 let gc_count = memtable.gc(90);
3406 let _ = gc_count; // gc_count is usize, always >= 0
3407 }
3408
3409 #[test]
3410 fn test_arena_memtable_size_tracking() {
3411 let memtable = ArenaMvccMemTable::new();
3412
3413 assert_eq!(memtable.size(), 0);
3414
3415 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3416
3417 assert!(memtable.size() > 0);
3418 }
3419
3420 #[test]
3421 fn test_arena_memtable_abort() {
3422 let memtable = ArenaMvccMemTable::new();
3423
3424 memtable
3425 .write(b"key", Some(b"uncommitted".to_vec()), 1)
3426 .unwrap();
3427
3428 // Visible to same txn
3429 assert_eq!(
3430 memtable.read(b"key", 0, Some(1)),
3431 Some(b"uncommitted".to_vec())
3432 );
3433
3434 // Not visible to other txns
3435 assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3436
3437 // Abort
3438 memtable.abort(1);
3439
3440 // No longer visible
3441 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3442 }
3443
3444 // ========================================================================
3445 // MemTableKind Tests - Unified Abstraction
3446 // ========================================================================
3447
3448 #[test]
3449 fn test_memtable_kind_standard() {
3450 let memtable = MemTableKind::new(MemTableType::Standard, true);
3451 assert_eq!(memtable.kind(), MemTableType::Standard);
3452
3453 // Write and read
3454 memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3455
3456 // Commit transaction at ts=100
3457 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3458 memtable.commit(1, 100, &write_set);
3459
3460 // Read after commit - snapshot_ts must be > commit_ts for visibility
3461 let v = memtable.read(b"key1", 101, None);
3462 assert_eq!(v, Some(b"value1".to_vec()));
3463 }
3464
3465 #[test]
3466 fn test_memtable_kind_arena() {
3467 let memtable = MemTableKind::new(MemTableType::Arena, true);
3468 assert_eq!(memtable.kind(), MemTableType::Arena);
3469
3470 // Write and read
3471 memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3472
3473 // Commit at ts=100
3474 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3475 memtable.commit(1, 100, &write_set);
3476
3477 // Read after commit - snapshot_ts > commit_ts
3478 let v = memtable.read(b"key1", 101, None);
3479 assert_eq!(v, Some(b"value1".to_vec()));
3480 }
3481
3482 #[test]
3483 fn test_memtable_kind_scan_range() {
3484 // Test both implementations have consistent behavior
3485 for kind in [MemTableType::Standard, MemTableType::Arena] {
3486 let memtable = MemTableKind::new(kind, true);
3487
3488 // Write some data
3489 for i in 0..5 {
3490 let key = format!("key{}", i);
3491 let value = format!("value{}", i);
3492 memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
3493 }
3494
3495 // Commit all at ts=100
3496 let write_set: HashSet<InlineKey> = (0..5)
3497 .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3498 .collect();
3499 memtable.commit(1, 100, &write_set);
3500
3501 // Scan range with snapshot_ts > commit_ts
3502 let results = memtable.scan_range(b"key1", b"key4", 101, None);
3503 assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
3504 }
3505 }
3506
3507 #[test]
3508 fn test_durable_storage_arena() {
3509 let dir = tempdir().unwrap();
3510 let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3511
3512 assert_eq!(storage.memtable_type(), MemTableType::Arena);
3513
3514 // Basic transaction should work the same
3515 let txn_id = storage.begin_transaction().unwrap();
3516 storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
3517 storage.commit(txn_id).unwrap();
3518
3519 let txn2 = storage.begin_transaction().unwrap();
3520 let v = storage.read(txn2, b"key1").unwrap();
3521 assert_eq!(v, Some(b"value1".to_vec()));
3522 storage.abort(txn2).unwrap();
3523 }
3524
3525 #[test]
3526 fn test_durable_storage_full_config() {
3527 let dir = tempdir().unwrap();
3528
3529 // Test with Arena and ordered index enabled
3530 let storage = DurableStorage::open_with_full_config(
3531 dir.path(),
3532 true,
3533 MemTableType::Arena,
3534 ).unwrap();
3535
3536 assert_eq!(storage.memtable_type(), MemTableType::Arena);
3537
3538 // Write multiple keys
3539 let txn = storage.begin_transaction().unwrap();
3540 for i in 0..10 {
3541 let key = format!("key{:02}", i);
3542 let value = format!("value{}", i);
3543 storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
3544 }
3545 storage.commit(txn).unwrap();
3546
3547 // Scan should work (uses scan method for prefix)
3548 let txn2 = storage.begin_transaction().unwrap();
3549 let results = storage.scan(txn2, b"key0").unwrap();
3550 assert_eq!(results.len(), 10); // key00 through key09
3551 storage.abort(txn2).unwrap();
3552 }
3553}