sochdb_storage/durable_storage.rs
1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Durable Storage Layer
16//!
17//! This module wires together all storage components into a production-ready
18//! durable storage engine:
19//!
20//! - WAL (txn_wal.rs) for durability
21//! - Group Commit for throughput
22//! - MVCC for isolation
23//! - LSCS for columnar efficiency
24//!
25//! ## Architecture
26//!
27//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                         DurableStorage                          │
//! ├─────────────────────────────────────────────────────────────────┤
//! │  ┌─────────────┐    ┌─────────────┐    ┌─────────────────────┐  │
//! │  │ MvccManager │    │ GroupCommit │───▶│   TxnWal (fsync)    │  │
//! │  │             │    │             │    └─────────────────────┘  │
//! │  │ ┌─────────┐ │    └─────────────┘                             │
//! │  │ │Snapshots│ │                                                │
//! │  │ └─────────┘ │    ┌─────────────────────────────────────────┐ │
//! │  │ ┌─────────┐ │    │                MemTable                 │ │
//! │  │ │ Txn Map │ │    │    (key → (value, txn_id, version))     │ │
//! │  │ └─────────┘ │    └─────────────────────────────────────────┘ │
//! │  └─────────────┘                                                │
//! │                     ┌─────────────────────────────────────────┐ │
//! │                     │               LSCS (SST)                │ │
//! │                     │       Immutable columnar segments       │ │
//! │                     └─────────────────────────────────────────┘ │
//! └─────────────────────────────────────────────────────────────────┘
46//! ```
47//!
48//! ## Concurrency
49//!
50//! - Writers: Serialize through WAL, use MVCC for conflict detection
51//! - Readers: Lock-free reads at snapshot timestamp
52//! - Commits: Batched through GroupCommit for throughput
53
54use std::collections::HashSet;
55use std::path::{Path, PathBuf};
56use std::sync::Arc;
57use std::sync::atomic::{AtomicU64, Ordering};
58
59use dashmap::DashMap;
60use smallvec::SmallVec;
61
62use crossbeam_skiplist::SkipMap;
63
64use crate::deferred_index::{DeferredSortedIndex, DeferredIndexConfig};
65use crate::group_commit::EventDrivenGroupCommit;
66use crate::txn_wal::{TxnWal, TxnWalBuffer, TxnWalEntry};
67use sochdb_core::{Result, SochDBError};
68
69// =============================================================================
70// SSI Bloom Filter - Fast Conflict Pre-Filtering
71// =============================================================================
72
73/// Space-efficient Bloom filter for SSI conflict detection
74///
75/// Used to quickly determine if two transactions MIGHT have conflicting keys.
76/// False positives are acceptable (leads to unnecessary exact checks),
77/// but false negatives are not allowed.
78///
79/// ## Configuration
80///
81/// For 1000 keys with 1% false positive rate:
82/// - m = ~9600 bits ≈ 1.2 KB per transaction
83/// - k = 7 hash functions
84///
85/// ## Lazy Initialization
86///
87/// The bit vector is lazily initialized on first insert to avoid
88/// allocation overhead for read-only transactions.
89#[derive(Clone, Debug)]
90pub struct SsiBloomFilter {
91 /// Bit vector (each u64 holds 64 bits) - lazily initialized
92 bits: Option<Vec<u64>>,
93 /// Expected capacity (used for lazy init sizing)
94 expected_capacity: usize,
95 /// Number of hash functions to use
96 num_hashes: u32,
97}
98
99impl SsiBloomFilter {
100 /// Optimal number of bits per item for 1% false positive rate
101 /// m/n = -ln(p) / (ln(2))² ≈ 9.6 for p = 0.01
102 const BITS_PER_ITEM: f64 = 9.6;
103
104 /// Optimal number of hash functions for 1% false positive rate
105 /// k = (m/n) × ln(2) ≈ 7
106 const DEFAULT_NUM_HASHES: u32 = 7;
107
108 /// Minimum capacity to avoid tiny filters
109 const MIN_CAPACITY: usize = 64;
110
111 /// Create a new bloom filter for expected item count (lazy allocation)
112 ///
113 /// Configured for ~1% false positive rate.
114 /// The bit vector is not allocated until first insert.
115 #[inline]
116 pub fn new(expected_items: usize) -> Self {
117 Self {
118 bits: None,
119 expected_capacity: expected_items.max(Self::MIN_CAPACITY),
120 num_hashes: Self::DEFAULT_NUM_HASHES,
121 }
122 }
123
124 /// Create with specific capacity in words (for memory-constrained scenarios)
125 pub fn with_word_capacity(words: usize) -> Self {
126 Self {
127 bits: None,
128 expected_capacity: words.max(1) * 64 / 10, // Approx items from words
129 num_hashes: Self::DEFAULT_NUM_HASHES,
130 }
131 }
132
133 /// Ensure bits are allocated (lazy initialization)
134 #[inline]
135 fn ensure_allocated(&mut self) {
136 if self.bits.is_none() {
137 let num_bits = ((self.expected_capacity as f64) * Self::BITS_PER_ITEM).ceil() as usize;
138 let num_words = num_bits.div_ceil(64);
139 self.bits = Some(vec![0u64; num_words]);
140 }
141 }
142
143 /// Add a key to the filter - O(k) where k = num_hashes
144 #[inline]
145 pub fn insert(&mut self, key: &[u8]) {
146 self.ensure_allocated();
147 let bits = self.bits.as_mut().unwrap();
148 let num_bits = bits.len() * 64;
149 if num_bits == 0 {
150 return;
151 }
152
153 // Use two hash functions to simulate k hash functions
154 // h(i) = h1 + i * h2 (double hashing technique)
155 let h1 = Self::hash1(key);
156 let h2 = Self::hash2(key);
157
158 for i in 0..self.num_hashes {
159 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
160 let bit_idx = (h as usize) % num_bits;
161 let word_idx = bit_idx / 64;
162 let bit_pos = bit_idx % 64;
163 bits[word_idx] |= 1 << bit_pos;
164 }
165 }
166
167 /// Check if a key might be present - O(k)
168 ///
169 /// Returns:
170 /// - false: Key is definitely NOT in the set (or filter not initialized)
171 /// - true: Key MIGHT be in the set (needs exact check)
172 #[inline]
173 pub fn may_contain(&self, key: &[u8]) -> bool {
174 let bits = match &self.bits {
175 Some(b) => b,
176 None => return false, // Uninitialized = empty
177 };
178 let num_bits = bits.len() * 64;
179 if num_bits == 0 {
180 return false;
181 }
182
183 let h1 = Self::hash1(key);
184 let h2 = Self::hash2(key);
185
186 for i in 0..self.num_hashes {
187 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
188 let bit_idx = (h as usize) % num_bits;
189 let word_idx = bit_idx / 64;
190 let bit_pos = bit_idx % 64;
191 if bits[word_idx] & (1 << bit_pos) == 0 {
192 return false; // Definitely not present
193 }
194 }
195 true // Might be present
196 }
197
198 /// Check if this filter might intersect with another
199 ///
200 /// Fast O(m/64) check using bitwise AND of all words.
201 /// If no bits are shared, sets are definitely disjoint.
202 #[inline]
203 pub fn may_intersect(&self, other: &SsiBloomFilter) -> bool {
204 let (self_bits, other_bits) = match (&self.bits, &other.bits) {
205 (Some(s), Some(o)) => (s, o),
206 _ => return false, // Either uninitialized = no intersection
207 };
208 let min_len = self_bits.len().min(other_bits.len());
209 for i in 0..min_len {
210 if self_bits[i] & other_bits[i] != 0 {
211 return true; // Might intersect
212 }
213 }
214 false // Definitely disjoint
215 }
216
217 /// First hash function (using built-in hasher)
218 #[inline]
219 fn hash1(key: &[u8]) -> u64 {
220 use std::collections::hash_map::DefaultHasher;
221 use std::hash::{Hash, Hasher};
222 let mut hasher = DefaultHasher::new();
223 key.hash(&mut hasher);
224 hasher.finish()
225 }
226
227 /// Second hash function (using twox-hash for independence)
228 #[inline]
229 fn hash2(key: &[u8]) -> u64 {
230 twox_hash::xxh3::hash64(key)
231 }
232
233 /// Get the memory size in bytes
234 pub fn size_bytes(&self) -> usize {
235 self.bits.as_ref().map(|b| b.len() * 8).unwrap_or(0) + std::mem::size_of::<Self>()
236 }
237
238 /// Check if the filter is empty
239 pub fn is_empty(&self) -> bool {
240 match &self.bits {
241 Some(bits) => bits.iter().all(|&w| w == 0),
242 None => true,
243 }
244 }
245}
246
247/// Type alias for inline key storage - keys up to 32 bytes stored on stack
248/// This eliminates heap allocation for typical keys like "users/12345" (12 bytes)
249pub type InlineKey = SmallVec<[u8; 32]>;
250
/// A single version of a key's value, tagged with the transaction that wrote it.
#[derive(Clone, Debug)]
pub struct Version {
    /// Stored bytes; `None` marks a tombstone (deleted key)
    pub value: Option<Vec<u8>>,
    /// Id of the transaction that created this version
    pub txn_id: u64,
    /// Commit timestamp; `0` means the version is still uncommitted
    pub commit_ts: u64,
}
261
262// ============================================================================
263// Optimized VersionChain with Binary Search (Task 1: mm.md)
264// ============================================================================
265
266/// Multi-version data for a single key with O(log v) read complexity
267///
268/// ## Optimization: Binary Search with Sorted Commit Ordering
269///
270/// Separates committed versions (sorted descending by commit_ts) from
271/// uncommitted version (single optional slot per transaction).
272///
273/// **Before:** O(v) linear scan + O(v) max computation = O(v)
274/// **After:** O(1) uncommitted check + O(log v) binary search = O(log v)
275///
276/// For v=10 versions: 3.3x speedup
277/// For v=100 versions: 7x speedup
278#[derive(Debug, Default)]
279pub struct VersionChain {
280 /// Committed versions sorted by commit_ts DESCENDING (newest first)
281 /// This ordering enables efficient binary search using partition_point
282 committed: Vec<Version>,
283 /// Single uncommitted version slot (at most one per transaction writing this key)
284 uncommitted: Option<Version>,
285}
286
287impl VersionChain {
288 /// Create a new empty version chain
289 #[inline]
290 pub fn new() -> Self {
291 Self {
292 committed: Vec::new(),
293 uncommitted: None,
294 }
295 }
296
297 /// Add a new uncommitted version
298 /// If there's already an uncommitted version from this txn, update it in place
299 ///
300 /// O(1) - just updates the uncommitted slot
301 #[inline]
302 pub fn add_uncommitted(&mut self, value: Option<Vec<u8>>, txn_id: u64) {
303 match &mut self.uncommitted {
304 Some(v) if v.txn_id == txn_id => {
305 // Update in place - O(1)
306 v.value = value;
307 }
308 Some(_) => {
309 // Different transaction - this is a write conflict!
310 // The caller should have checked has_write_conflict first
311 // For safety, we overwrite (this will be caught at commit)
312 self.uncommitted = Some(Version {
313 value,
314 txn_id,
315 commit_ts: 0,
316 });
317 }
318 None => {
319 // New uncommitted version - O(1)
320 self.uncommitted = Some(Version {
321 value,
322 txn_id,
323 commit_ts: 0,
324 });
325 }
326 }
327 }
328
329 /// Commit a version - moves from uncommitted slot to sorted committed list
330 ///
331 /// O(log v) - inserts into sorted position using binary search
332 pub fn commit(&mut self, txn_id: u64, commit_ts: u64) -> bool {
333 if let Some(ref mut v) = self.uncommitted {
334 if v.txn_id == txn_id && v.commit_ts == 0 {
335 v.commit_ts = commit_ts;
336 let committed_version = self.uncommitted.take().unwrap();
337
338 // Insert into sorted position (descending by commit_ts)
339 // Use partition_point to find insertion point in O(log v)
340 let insert_pos = self.committed.partition_point(|existing| existing.commit_ts > commit_ts);
341 self.committed.insert(insert_pos, committed_version);
342
343 return true;
344 }
345 }
346 false
347 }
348
349 /// Abort a version (remove uncommitted version for txn)
350 ///
351 /// O(1) - just clears the uncommitted slot if it matches
352 #[inline]
353 pub fn abort(&mut self, txn_id: u64) {
354 if let Some(ref v) = self.uncommitted {
355 if v.txn_id == txn_id {
356 self.uncommitted = None;
357 }
358 }
359 }
360
361 /// Read at a snapshot timestamp, optionally seeing own uncommitted writes
362 /// Returns the most recent committed version visible at snapshot_ts,
363 /// or an uncommitted version if it belongs to current_txn_id.
364 ///
365 /// ## Complexity: O(1) + O(log v) = O(log v)
366 ///
367 /// 1. O(1) check for uncommitted version from current transaction
368 /// 2. O(log v) binary search for most recent visible committed version
369 ///
370 /// Snapshot isolation: we see commits with commit_ts < snapshot_ts (strictly less)
371 #[inline]
372 pub fn read_at(&self, snapshot_ts: u64, current_txn_id: Option<u64>) -> Option<&Version> {
373 // O(1): Check uncommitted version from current transaction
374 if let Some(txn_id) = current_txn_id {
375 if let Some(ref v) = self.uncommitted {
376 if v.txn_id == txn_id {
377 return Some(v);
378 }
379 }
380 }
381
382 // O(log v): Binary search for first version with commit_ts < snapshot_ts
383 // Since committed is sorted descending by commit_ts, we find the first
384 // version where commit_ts < snapshot_ts (the newest visible version)
385 //
386 // partition_point returns the first index where predicate is false
387 // We want first index where commit_ts < snapshot_ts
388 let idx = self.committed.partition_point(|v| v.commit_ts >= snapshot_ts);
389
390 // The version at idx (if exists) is the newest with commit_ts < snapshot_ts
391 self.committed.get(idx)
392 }
393
394 /// Check if there's an uncommitted version by another transaction
395 ///
396 /// O(1) - just checks the uncommitted slot
397 #[inline]
398 pub fn has_write_conflict(&self, my_txn_id: u64) -> bool {
399 if let Some(ref v) = self.uncommitted {
400 return v.txn_id != my_txn_id;
401 }
402 false
403 }
404
405 /// Garbage collect old versions
406 ///
407 /// Keeps only versions that might be visible to active transactions,
408 /// plus one committed version before min_active_ts for new snapshots.
409 pub fn gc(&mut self, min_active_ts: u64) {
410 // Uncommitted version is always kept (will be committed or aborted)
411
412 if self.committed.len() <= 1 {
413 return;
414 }
415
416 // Find versions to keep:
417 // 1. All versions with commit_ts > min_active_ts (visible to active txns)
418 // 2. One version with commit_ts <= min_active_ts (newest anchor point)
419
420 // Since committed is sorted descending, find split point
421 let split_idx = self.committed.partition_point(|v| v.commit_ts > min_active_ts);
422
423 // Keep all versions before split_idx (commit_ts > min_active_ts)
424 // Plus one version at split_idx if it exists (anchor point)
425 let keep_count = if split_idx < self.committed.len() {
426 split_idx + 1 // Keep one anchor version
427 } else {
428 split_idx
429 };
430
431 self.committed.truncate(keep_count);
432 }
433
434 /// Get total version count (committed + uncommitted)
435 #[inline]
436 pub fn version_count(&self) -> usize {
437 self.committed.len() + if self.uncommitted.is_some() { 1 } else { 0 }
438 }
439
440 // Legacy compatibility: get versions vec (for tests)
441 #[cfg(test)]
442 pub fn versions(&self) -> Vec<Version> {
443 let mut result = self.committed.clone();
444 if let Some(ref v) = self.uncommitted {
445 result.push(v.clone());
446 }
447 result
448 }
449}
450
451// =============================================================================
452// Pre-sizing Constants to Avoid HashSet Resize Overhead
453// =============================================================================
454
/// Initial capacity of a transaction's `write_set`.
/// Chosen for typical OLTP transactions (10-50 keys) so the set does not
/// have to grow mid-transaction (resizing caused a +11% regression).
const WRITE_SET_INITIAL_CAPACITY: usize = 32;

/// Initial capacity of a transaction's `read_set`.
/// Larger than the write set because workloads are typically read-heavy.
const READ_SET_INITIAL_CAPACITY: usize = 64;
463
464/// Transaction state for MVCC
465#[derive(Debug, Clone)]
466pub struct MvccTransaction {
467 /// Transaction ID
468 pub txn_id: u64,
469 /// Snapshot timestamp (reads see commits before this)
470 pub snapshot_ts: u64,
471 /// Keys written by this transaction - uses SmallVec for inline storage
472 /// Pre-sized to WRITE_SET_INITIAL_CAPACITY to avoid resize overhead
473 pub write_set: HashSet<InlineKey>,
474 /// Keys read by this transaction (for SSI validation) - uses SmallVec for inline storage
475 /// Pre-sized to READ_SET_INITIAL_CAPACITY to avoid resize overhead
476 pub read_set: HashSet<InlineKey>,
477 /// Bloom filter for write set - fast SSI pre-filtering
478 pub write_bloom: SsiBloomFilter,
479 /// Bloom filter for read set - fast SSI pre-filtering
480 pub read_bloom: SsiBloomFilter,
481 /// Transaction state
482 pub state: TxnState,
483 /// Transaction mode for SSI optimization (Recommendation 9)
484 /// ReadOnly/WriteOnly modes skip SSI tracking for 2.6x improvement
485 pub mode: TransactionMode,
486}
487
488impl MvccTransaction {
489 /// Create a new transaction with pre-sized collections
490 ///
491 /// This avoids HashSet resize overhead during the transaction
492 /// which was causing +11% regression on write_set.insert().
493 #[inline]
494 pub fn new(txn_id: u64, snapshot_ts: u64) -> Self {
495 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadWrite)
496 }
497
498 /// Create a read-only transaction (SSI bypass - 2.6x faster)
499 ///
500 /// Read-only transactions skip all SSI tracking:
501 /// - No read_set allocation
502 /// - No read_bloom allocation
503 /// - No commit validation
504 ///
505 /// ## Performance
506 ///
507 /// For N=100 reads: 8350ns → 3230ns (2.6× improvement)
508 #[inline]
509 pub fn read_only(txn_id: u64, snapshot_ts: u64) -> Self {
510 Self::with_mode(txn_id, snapshot_ts, TransactionMode::ReadOnly)
511 }
512
513 /// Create a write-only transaction (partial SSI bypass)
514 ///
515 /// Write-only transactions skip read tracking:
516 /// - No read_set tracking
517 /// - No read_bloom inserts
518 /// - Still needs write_set for commit
519 #[inline]
520 pub fn write_only(txn_id: u64, snapshot_ts: u64) -> Self {
521 Self::with_mode(txn_id, snapshot_ts, TransactionMode::WriteOnly)
522 }
523
524 /// Create transaction with specific mode
525 #[inline]
526 pub fn with_mode(txn_id: u64, snapshot_ts: u64, mode: TransactionMode) -> Self {
527 // Optimize allocation based on mode
528 let (write_capacity, read_capacity) = match mode {
529 TransactionMode::ReadOnly => (0, 0), // No tracking needed
530 TransactionMode::WriteOnly => (WRITE_SET_INITIAL_CAPACITY, 0),
531 TransactionMode::ReadWrite => (WRITE_SET_INITIAL_CAPACITY, READ_SET_INITIAL_CAPACITY),
532 };
533 Self::with_capacity(txn_id, snapshot_ts, write_capacity, read_capacity, mode)
534 }
535
536 /// Create with custom capacities for expected workload
537 ///
538 /// Use this when you know the transaction will write many keys
539 /// to avoid resize overhead entirely.
540 #[inline]
541 pub fn with_capacity(
542 txn_id: u64,
543 snapshot_ts: u64,
544 write_capacity: usize,
545 read_capacity: usize,
546 mode: TransactionMode,
547 ) -> Self {
548 Self {
549 txn_id,
550 snapshot_ts,
551 write_set: HashSet::with_capacity(write_capacity),
552 read_set: HashSet::with_capacity(read_capacity),
553 write_bloom: SsiBloomFilter::new(write_capacity.max(1)),
554 read_bloom: SsiBloomFilter::new(read_capacity.max(1)),
555 state: TxnState::Active,
556 mode,
557 }
558 }
559
560 /// Check if this is a read-only transaction
561 #[inline]
562 pub fn is_read_only(&self) -> bool {
563 self.write_set.is_empty()
564 }
565
566 /// Check if this is a single-key write transaction
567 #[inline]
568 pub fn is_single_key_write(&self) -> bool {
569 self.write_set.len() == 1 && self.read_set.len() <= 1
570 }
571}
572
/// Lifecycle state of a transaction.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TxnState {
    /// Still running
    Active,
    /// Finished successfully
    Committed,
    /// Rolled back
    Aborted,
}
580
581// =============================================================================
582// Transaction Mode for SSI Bypass (Recommendation 9)
583// =============================================================================
584
/// Transaction mode for SSI optimization.
///
/// Declaring the mode when a transaction begins lets the manager skip SSI
/// bookkeeping the transaction cannot need:
///
/// | Mode      | SSI Read Tracking | SSI Write Tracking | Commit Overhead |
/// |-----------|-------------------|--------------------|-----------------|
/// | ReadOnly  | None              | None               | ~10 ns          |
/// | WriteOnly | None              | Full               | ~30 ns          |
/// | ReadWrite | Full              | Full               | ~50 ns          |
///
/// ## Performance Analysis
///
/// For read-only transactions (typically 90% of workload):
/// ```text
/// Current:  T_txn = T_begin + N × (T_read + T_record) + T_commit
///                 = 100ns + N × (32ns + 50ns) + 50ns = 150ns + 82ns × N
///
/// ReadOnly: T_txn = T_begin_ro + N × T_read + T_commit_ro
///                 = 20ns + N × 32ns + 10ns = 30ns + 32ns × N
///
/// For N=100 reads: 8350ns → 3230ns (2.6× faster)
/// ```
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum TransactionMode {
    /// Read-only transaction - skips ALL SSI tracking
    /// (no read_set, no read_bloom, no commit validation).
    ReadOnly,

    /// Write-only transaction - skips read tracking;
    /// only write_set and write_bloom are maintained.
    WriteOnly,

    /// Full read-write transaction (default) - complete SSI tracking
    /// and full validation at commit time.
    #[default]
    ReadWrite,
}

impl TransactionMode {
    /// Whether reads must be recorded for this mode (only `ReadWrite`)
    #[inline]
    pub fn tracks_reads(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }

    /// Whether writes must be recorded for this mode (`WriteOnly` and `ReadWrite`)
    #[inline]
    pub fn tracks_writes(&self) -> bool {
        match self {
            TransactionMode::WriteOnly | TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly => false,
        }
    }

    /// Whether commit must run SSI validation for this mode (only `ReadWrite`)
    #[inline]
    pub fn needs_ssi_validation(&self) -> bool {
        match self {
            TransactionMode::ReadWrite => true,
            TransactionMode::ReadOnly | TransactionMode::WriteOnly => false,
        }
    }
}
646
/// Kind of dependency recorded between two transactions for SSI.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConflictType {
    /// Read-write conflict: T1 reads X, then T2 writes X
    ReadWrite,
    /// Write-read conflict: T1 writes X, then T2 reads X
    WriteRead,
}

/// Directed conflict edge between two transactions, used for
/// dangerous-structure detection.
#[derive(Clone, Debug)]
pub struct ConflictEdge {
    /// Transaction the edge starts at
    pub from_txn: u64,
    /// Transaction the edge points to
    pub to_txn: u64,
    /// Kind of conflict this edge represents
    pub conflict_type: ConflictType,
}
666
667/// MVCC Manager with SSI support
668///
669/// Uses DashMap for lock-free per-transaction access.
670/// Implements Serializable Snapshot Isolation (SSI) with
671/// dangerous structure detection for rw-antidependency cycles.
672#[allow(clippy::type_complexity)]
673pub struct MvccManager {
674 /// Active transactions (sharded for concurrent access)
675 active_txns: DashMap<u64, MvccTransaction>,
676 /// Current timestamp counter
677 ts_counter: AtomicU64,
678 /// Minimum active snapshot timestamp (for GC)
679 min_active_ts: AtomicU64,
680 /// Recently committed transactions for SSI validation
681 /// Maps txn_id -> (commit_ts, read_bloom, write_bloom, read_set, write_set)
682 /// Bloom filters enable fast O(m/64) pre-filtering before O(n) exact checks
683 recent_commits: DashMap<
684 u64,
685 (
686 u64,
687 SsiBloomFilter,
688 SsiBloomFilter,
689 HashSet<InlineKey>,
690 HashSet<InlineKey>,
691 ),
692 >,
693 /// Max recent commits to track
694 max_recent_commits: usize,
695}
696
697impl Default for MvccManager {
698 fn default() -> Self {
699 Self::new()
700 }
701}
702
703impl MvccManager {
704 pub fn new() -> Self {
705 Self {
706 active_txns: DashMap::new(),
707 ts_counter: AtomicU64::new(1),
708 min_active_ts: AtomicU64::new(0),
709 recent_commits: DashMap::new(),
710 max_recent_commits: 1000, // Track last 1000 commits for SSI
711 }
712 }
713
714 /// Begin a new transaction with snapshot isolation
715 ///
716 /// Uses pre-sized HashSets to avoid resize overhead (+11% regression fix)
717 pub fn begin(&self, txn_id: u64) -> MvccTransaction {
718 self.begin_with_mode(txn_id, TransactionMode::ReadWrite)
719 }
720
721 /// Begin a read-only transaction (SSI bypass - 2.6x faster)
722 ///
723 /// Read-only transactions skip all SSI tracking, reducing
724 /// per-read overhead from ~82ns to ~32ns.
725 ///
726 /// ## Safety
727 ///
728 /// Caller must ensure no writes are performed. Attempting to
729 /// write in a read-only transaction will still succeed but
730 /// won't be tracked for SSI validation.
731 #[inline]
732 pub fn begin_read_only(&self, txn_id: u64) -> MvccTransaction {
733 self.begin_with_mode(txn_id, TransactionMode::ReadOnly)
734 }
735
736 /// Begin a write-only transaction (partial SSI bypass)
737 ///
738 /// Write-only transactions skip read tracking, reducing overhead
739 /// for insert-heavy workloads.
740 #[inline]
741 pub fn begin_write_only(&self, txn_id: u64) -> MvccTransaction {
742 self.begin_with_mode(txn_id, TransactionMode::WriteOnly)
743 }
744
745 /// Begin a transaction with specific mode
746 ///
747 /// This is the core transaction creation method that all other
748 /// begin_* methods delegate to.
749 pub fn begin_with_mode(&self, txn_id: u64, mode: TransactionMode) -> MvccTransaction {
750 let snapshot_ts = self.ts_counter.load(Ordering::SeqCst);
751
752 // Create transaction with mode-optimized allocations
753 let txn = MvccTransaction::with_mode(txn_id, snapshot_ts, mode);
754
755 self.active_txns.insert(txn_id, txn.clone());
756 self.update_min_active_ts();
757
758 txn
759 }
760
761 /// Get transaction if active (clones - use get_snapshot_ts for hot path)
762 pub fn get(&self, txn_id: u64) -> Option<MvccTransaction> {
763 self.active_txns.get(&txn_id).map(|t| t.clone())
764 }
765
766 /// Fast path: get just the snapshot timestamp without cloning
767 /// This is the hot path for reads - avoids cloning bloom filters
768 #[inline]
769 pub fn get_snapshot_ts(&self, txn_id: u64) -> Option<u64> {
770 self.active_txns.get(&txn_id).map(|t| t.snapshot_ts)
771 }
772
773 /// Record a read (for SSI) - uses inline key storage + bloom filter
774 ///
775 /// ## SSI Bypass (Recommendation 9)
776 ///
777 /// For ReadOnly mode transactions, this is a no-op (instant return).
778 /// For WriteOnly mode transactions, this is a no-op.
779 /// Only ReadWrite mode transactions track reads for SSI.
780 ///
781 /// This reduces per-read overhead from ~50ns to ~0ns for read-only txns.
782 #[inline]
783 pub fn record_read(&self, txn_id: u64, key: &[u8]) {
784 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
785 // SSI Bypass: Skip tracking for read-only and write-only modes
786 if !txn.mode.tracks_reads() {
787 return;
788 }
789
790 // Only track reads if within reasonable bounds
791 if txn.read_set.len() < 10000 {
792 txn.read_set.insert(SmallVec::from_slice(key));
793 txn.read_bloom.insert(key);
794 }
795 }
796 }
797
798 /// Record a write - uses inline key storage + bloom filter
799 ///
800 /// Note: Even ReadOnly transactions can record writes (mode is a hint).
801 /// The mode only affects SSI tracking, not write capability.
802 pub fn record_write(&self, txn_id: u64, key: &[u8]) {
803 if let Some(mut txn) = self.active_txns.get_mut(&txn_id) {
804 txn.write_set.insert(SmallVec::from_slice(key));
805 txn.write_bloom.insert(key);
806 }
807 }
808
809 /// Allocate commit timestamp
810 pub fn alloc_commit_ts(&self) -> u64 {
811 self.ts_counter.fetch_add(1, Ordering::SeqCst)
812 }
813
814 /// Commit transaction with SSI validation
815 /// Returns (commit_ts, write_set) so the memtable can be updated efficiently
816 /// Returns None if SSI validation fails (dangerous structure detected)
817 ///
818 /// ## SSI Bypass (Recommendation 9)
819 ///
820 /// For ReadOnly mode: Skip validation entirely (~10ns commit)
821 /// For WriteOnly mode: Skip read-based validation (~30ns commit)
822 /// For ReadWrite mode: Full validation (~50ns commit)
823 pub fn commit(&self, txn_id: u64) -> Option<(u64, HashSet<InlineKey>)> {
824 // Get transaction before removing
825 let txn = self.active_txns.get(&txn_id)?.clone();
826
827 // SSI Bypass: Skip validation for ReadOnly transactions
828 // ReadOnly can never form rw-antidependency cycles
829 if txn.mode != TransactionMode::ReadWrite || !self.validate_ssi(&txn) {
830 // For ReadOnly/WriteOnly: always valid (mode check short-circuits)
831 // For ReadWrite: check SSI validation result
832 if txn.mode == TransactionMode::ReadWrite && !self.validate_ssi(&txn) {
833 // Abort on SSI violation
834 self.active_txns.remove(&txn_id);
835 self.update_min_active_ts();
836 return None;
837 }
838 }
839
840 let commit_ts = self.alloc_commit_ts();
841
842 // Extract write_set and remove transaction - takes ownership
843 let (_, removed_txn) = self.active_txns.remove(&txn_id)?;
844
845 // OPTIMIZATION: Only track ReadWrite transactions for SSI
846 // ReadOnly/WriteOnly can't form complete rw-antidependency cycles
847 let needs_ssi_tracking = removed_txn.mode == TransactionMode::ReadWrite
848 && !removed_txn.read_set.is_empty()
849 && !removed_txn.write_set.is_empty();
850
851 if needs_ssi_tracking {
852 // Need to clone write_set since we return it AND track it
853 let write_set_for_return = removed_txn.write_set.clone();
854
855 self.track_commit_owned(
856 txn_id,
857 commit_ts,
858 removed_txn.read_bloom,
859 removed_txn.write_bloom,
860 removed_txn.read_set,
861 removed_txn.write_set,
862 );
863
864 self.update_min_active_ts();
865 Some((commit_ts, write_set_for_return))
866 } else {
867 // Fast path: no SSI tracking needed, avoid clone entirely
868 self.update_min_active_ts();
869 Some((commit_ts, removed_txn.write_set))
870 }
871 }
872
873 /// Validate SSI constraints for a committing transaction
874 ///
875 /// ## Transaction Classification (Task 3: Optimistic MVCC)
876 ///
877 /// Transactions are classified and routed through appropriate fast paths:
878 ///
879 /// | Class | Criteria | Validation Cost |
880 /// |------------|-------------------------------|-----------------|
881 /// | ReadOnly | write_set.is_empty() | 0 ns |
882 /// | SingleKey | write_set.len() == 1 | 0 ns |
883 /// | Disjoint | bloom filters don't intersect | ~10 ns |
884 /// | General | full SSI check | ~50 ns |
885 ///
886 /// Expected distribution: ~60% read-only, ~25% single-key, ~10% disjoint, ~5% general
887 /// Weighted average: ~8 ns vs 50 ns baseline (6x improvement)
888 ///
889 /// Detects "dangerous structures" - rw-antidependency cycles:
890 /// - T1 reads X (snapshot sees old value)
891 /// - T2 writes X (concurrent write)
892 /// - T2 reads Y (snapshot sees old value)
893 /// - T1 writes Y (concurrent write)
894 ///
895 /// If T1 → rw → T2 → rw → T1 exists, abort T1
896 #[inline]
897 fn validate_ssi(&self, txn: &MvccTransaction) -> bool {
898 // =================================================================
899 // Fast Path 1: Read-only transactions (0 ns)
900 // =================================================================
901 // Read-only transactions can never form rw-antidependency cycles
902 // because they have no writes to create outgoing rw-edges
903 if txn.write_set.is_empty() {
904 return true;
905 }
906
907 // =================================================================
908 // Fast Path 2: No recent commits to check (0 ns)
909 // =================================================================
910 if self.recent_commits.is_empty() {
911 return true;
912 }
913
914 // =================================================================
915 // Fast Path 3: Single-key write transactions (0 ns)
916 // =================================================================
917 // A single-key write transaction cannot form a dangerous cycle:
918 // - For a cycle T1 →rw→ T2 →rw→ T1, we need T1 to read what T2 wrote
919 // AND T2 to read what T1 wrote
920 // - With only one key in write_set, the same key would need to be
921 // in both read_set AND write_set of both transactions
922 // - This is already prevented by our conflict detection (write-write)
923 if txn.write_set.len() == 1 && txn.read_set.len() <= 1 {
924 return true;
925 }
926
927 let my_snapshot = txn.snapshot_ts;
928
929 // =================================================================
930 // Fast Path 4: Disjoint transactions using Bloom filters (~10 ns)
931 // =================================================================
932 // Pre-filter using bloom filters: if our write_bloom doesn't intersect
933 // with any concurrent transaction's read_bloom AND vice versa,
934 // there can be no rw-antidependency
935 let mut any_may_intersect = false;
936 for entry in self.recent_commits.iter() {
937 let (_, (other_commit_ts, other_read_bloom, other_write_bloom, _, _)) = entry.pair();
938
939 // Only check concurrent transactions
940 if *other_commit_ts <= my_snapshot {
941 continue;
942 }
943
944 // Check bloom filter intersection (O(m/64) per filter)
945 // If our writes may intersect their reads OR their writes may intersect our reads
946 if txn.write_bloom.may_intersect(other_read_bloom)
947 || other_write_bloom.may_intersect(&txn.read_bloom)
948 {
949 any_may_intersect = true;
950 break;
951 }
952 }
953
954 // No bloom intersection means definitely disjoint - no SSI conflict possible
955 if !any_may_intersect {
956 return true;
957 }
958
959 // =================================================================
960 // Full SSI Validation (~50 ns)
961 // =================================================================
962 // Check for rw-conflicts with recently committed transactions
963 // An rw-conflict exists if:
964 // - T_other wrote to a key that T_me read (T_other →rw→ T_me)
965 // - T_me wrote to a key that T_other read (T_me →rw→ T_other)
966
967 let mut in_conflict_with: Vec<u64> = Vec::new();
968 let mut out_conflict_to: Vec<u64> = Vec::new();
969
970 for entry in self.recent_commits.iter() {
971 let (
972 other_txn_id,
973 (
974 other_commit_ts,
975 _other_read_bloom,
976 other_write_bloom,
977 other_read_set,
978 other_write_set,
979 ),
980 ) = entry.pair();
981
982 // Only consider transactions that committed after our snapshot started
983 // (concurrent transactions)
984 if *other_commit_ts <= my_snapshot {
985 continue;
986 }
987
988 // Check: other wrote → we read (other →rw→ me)
989 // T_other wrote a key that T_me read (rw-dependency inbound)
990 //
991 // Bloom-accelerated: First check bloom filter for fast rejection (O(m/64))
992 // Only do expensive HashSet intersection if bloom says "maybe conflict"
993 let mut has_in_conflict = false;
994 for key in txn.read_set.iter() {
995 if other_write_bloom.may_contain(key) {
996 // Bloom says maybe - do exact check
997 if other_write_set.contains(key) {
998 has_in_conflict = true;
999 break;
1000 }
1001 }
1002 }
1003 if has_in_conflict {
1004 in_conflict_with.push(*other_txn_id);
1005 }
1006
1007 // Check: we wrote → other read (me →rw→ other)
1008 // T_me wrote a key that T_other read (rw-dependency outbound)
1009 //
1010 // Bloom-accelerated: Use our write_bloom against their read_set
1011 let mut has_out_conflict = false;
1012 for key in other_read_set.iter() {
1013 if txn.write_bloom.may_contain(key) {
1014 // Bloom says maybe - do exact check
1015 if txn.write_set.contains(key) {
1016 has_out_conflict = true;
1017 break;
1018 }
1019 }
1020 }
1021 if has_out_conflict {
1022 out_conflict_to.push(*other_txn_id);
1023 }
1024 }
1025
1026 // Dangerous structure: we have both incoming AND outgoing rw-edges
1027 // This creates a potential cycle: T1 →rw→ T_me →rw→ T2
1028 //
1029 // Conservative check: if both exist, abort
1030 // A more precise check would verify the cycle path, but this is safe
1031 if !in_conflict_with.is_empty() && !out_conflict_to.is_empty() {
1032 return false; // SSI violation - abort
1033 }
1034
1035 true
1036 }
1037
1038 /// Track a committed transaction for future SSI validation
1039 ///
1040 /// Only tracks transactions that have both reads AND writes, since SSI
1041 /// only detects rw-antidependency cycles. Pure read or pure write
1042 /// transactions can't form cycles.
1043 ///
1044 /// ## Optimization: Zero-Copy Transfer
1045 ///
1046 /// Takes ownership of sets instead of cloning to avoid the +15% commit
1047 /// phase regression. The caller should use mem::take() to transfer ownership.
1048 fn track_commit_owned(
1049 &self,
1050 txn_id: u64,
1051 commit_ts: u64,
1052 read_bloom: SsiBloomFilter,
1053 write_bloom: SsiBloomFilter,
1054 read_set: HashSet<InlineKey>,
1055 write_set: HashSet<InlineKey>,
1056 ) {
1057 // Optimization: Only track mixed read-write transactions
1058 // Pure reads can't create outgoing rw-edges
1059 // Pure writes can't create incoming rw-edges
1060 if read_set.is_empty() || write_set.is_empty() {
1061 return; // Skip tracking - can't form SSI cycle
1062 }
1063
1064 // Add to recent commits with bloom filters for fast SSI pre-filtering
1065 // No cloning needed - we take ownership
1066 self.recent_commits.insert(
1067 txn_id,
1068 (
1069 commit_ts,
1070 read_bloom,
1071 write_bloom,
1072 read_set,
1073 write_set,
1074 ),
1075 );
1076
1077 // Lazy pruning: only prune when we're significantly over capacity
1078 // Avoids pruning overhead on every commit
1079 if self.recent_commits.len() > self.max_recent_commits * 2 {
1080 // Remove entries with lowest commit_ts
1081 let min_active = self.min_active_ts.load(Ordering::Relaxed);
1082 self.recent_commits
1083 .retain(|_, (ts, _, _, _, _)| *ts >= min_active);
1084 }
1085 }
1086
1087 /// Legacy track_commit that clones - kept for compatibility
1088 #[allow(dead_code)]
1089 fn track_commit(
1090 &self,
1091 txn_id: u64,
1092 commit_ts: u64,
1093 read_bloom: SsiBloomFilter,
1094 write_bloom: SsiBloomFilter,
1095 read_set: &HashSet<InlineKey>,
1096 write_set: &HashSet<InlineKey>,
1097 ) {
1098 if read_set.is_empty() || write_set.is_empty() {
1099 return;
1100 }
1101 self.recent_commits.insert(
1102 txn_id,
1103 (
1104 commit_ts,
1105 read_bloom,
1106 write_bloom,
1107 read_set.clone(),
1108 write_set.clone(),
1109 ),
1110 );
1111 }
1112
1113 /// Abort transaction
1114 pub fn abort(&self, txn_id: u64) {
1115 self.active_txns.remove(&txn_id);
1116 self.update_min_active_ts();
1117 }
1118
1119 /// Get minimum active snapshot timestamp
1120 pub fn min_active_snapshot(&self) -> u64 {
1121 self.min_active_ts.load(Ordering::SeqCst)
1122 }
1123
1124 /// Get count of active transactions
1125 pub fn active_transaction_count(&self) -> usize {
1126 self.active_txns.len()
1127 }
1128
1129 fn update_min_active_ts(&self) {
1130 let min = self
1131 .active_txns
1132 .iter()
1133 .map(|entry| entry.value().snapshot_ts)
1134 .min()
1135 .unwrap_or_else(|| self.ts_counter.load(Ordering::SeqCst));
1136 self.min_active_ts.store(min, Ordering::SeqCst);
1137 }
1138}
1139
1140/// Epoch-based dirty list for O(expired) GC instead of O(n)
1141///
1142/// Instead of scanning ALL version chains, we track which keys have versions
1143/// created in each epoch. GC only needs to visit keys from old epochs.
1144struct EpochDirtyList {
1145 /// Ring buffer of epoch -> dirty keys
1146 /// Index = epoch % EPOCH_RING_SIZE
1147 epochs: [parking_lot::Mutex<Vec<Vec<u8>>>; 4],
1148 /// Current epoch
1149 current_epoch: AtomicU64,
1150}
1151
1152const EPOCH_RING_SIZE: usize = 4;
1153
1154impl EpochDirtyList {
1155 fn new() -> Self {
1156 Self {
1157 epochs: [
1158 parking_lot::Mutex::new(Vec::new()),
1159 parking_lot::Mutex::new(Vec::new()),
1160 parking_lot::Mutex::new(Vec::new()),
1161 parking_lot::Mutex::new(Vec::new()),
1162 ],
1163 current_epoch: AtomicU64::new(0),
1164 }
1165 }
1166
1167 /// Record a version created in the current epoch
1168 #[inline]
1169 fn record_version(&self, key: Vec<u8>) {
1170 let epoch = self.current_epoch.load(Ordering::Relaxed);
1171 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1172 self.epochs[idx].lock().push(key);
1173 }
1174
1175 /// Record multiple versions in a single lock acquisition (Rec 3: MVCC Batching)
1176 ///
1177 /// Performance: Single lock acquire vs N lock acquires for batch of N writes.
1178 /// For 100 writes: ~100x fewer mutex operations.
1179 #[inline]
1180 fn record_versions_batch(&self, keys: impl IntoIterator<Item = Vec<u8>>) {
1181 let epoch = self.current_epoch.load(Ordering::Relaxed);
1182 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1183 let mut guard = self.epochs[idx].lock();
1184 guard.extend(keys);
1185 }
1186
1187 /// Advance to next epoch, returning old epoch's dirty keys
1188 fn advance_epoch(&self) -> (u64, Vec<Vec<u8>>) {
1189 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1190 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1191
1192 // Drain the old epoch's dirty list
1193 let mut guard = self.epochs[old_idx].lock();
1194 let keys = std::mem::take(&mut *guard);
1195 (old_epoch, keys)
1196 }
1197
1198 /// Get current epoch
1199 #[allow(dead_code)]
1200 fn current(&self) -> u64 {
1201 self.current_epoch.load(Ordering::Relaxed)
1202 }
1203}
1204
1205// ============================================================================
1206// Streaming Scan Iterator
1207// ============================================================================
1208
1209/// Streaming iterator for range scans
1210///
1211/// Yields results one at a time without materializing the full result set.
1212/// This enables processing of very large result sets with O(1) memory per
1213/// iteration instead of O(N) for the entire result set.
1214struct ScanRangeIterator<'a> {
1215 memtable: &'a MvccMemTable,
1216 start: Vec<u8>,
1217 end: Vec<u8>,
1218 snapshot_ts: u64,
1219 current_txn_id: Option<u64>,
1220 use_ordered: bool,
1221 // We use Option to defer initialization
1222 ordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1223 unordered_iter: Option<Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a>>,
1224 initialized: bool,
1225}
1226
1227impl<'a> Iterator for ScanRangeIterator<'a> {
1228 type Item = (Vec<u8>, Vec<u8>);
1229
1230 fn next(&mut self) -> Option<Self::Item> {
1231 // Lazy initialization on first call
1232 if !self.initialized {
1233 self.initialized = true;
1234
1235 if self.use_ordered {
1236 // Try deferred index first (after compaction, it uses a SkipMap internally)
1237 if let Some(ref def_idx) = self.memtable.deferred_index {
1238 let start = self.start.clone();
1239 let end = self.end.clone();
1240 let snapshot_ts = self.snapshot_ts;
1241 let current_txn_id = self.current_txn_id;
1242 let data = &self.memtable.data;
1243
1244 // Collect keys from deferred index (already sorted after compact)
1245 let keys: Vec<Vec<u8>> = if end.is_empty() {
1246 def_idx.range_from(&start).collect()
1247 } else {
1248 def_idx.range(&start, &end).collect()
1249 };
1250
1251 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1252 keys.into_iter()
1253 .filter_map(move |key| {
1254 if let Some(chain) = data.get(&key)
1255 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1256 && let Some(value) = &v.value
1257 {
1258 Some((key, value.clone()))
1259 } else {
1260 None
1261 }
1262 })
1263 );
1264 self.ordered_iter = Some(iter);
1265 } else if let Some(ref idx) = self.memtable.ordered_index {
1266 let start = self.start.clone();
1267 let end = self.end.clone();
1268 let snapshot_ts = self.snapshot_ts;
1269 let current_txn_id = self.current_txn_id;
1270 let data = &self.memtable.data;
1271
1272 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = if end.is_empty() {
1273 Box::new(
1274 idx.range(start..)
1275 .filter_map(move |entry| {
1276 let key = entry.key();
1277 if let Some(chain) = data.get(key)
1278 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1279 && let Some(value) = &v.value
1280 {
1281 Some((key.clone(), value.clone()))
1282 } else {
1283 None
1284 }
1285 })
1286 )
1287 } else {
1288 Box::new(
1289 idx.range(start..end)
1290 .filter_map(move |entry| {
1291 let key = entry.key();
1292 if let Some(chain) = data.get(key)
1293 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1294 && let Some(value) = &v.value
1295 {
1296 Some((key.clone(), value.clone()))
1297 } else {
1298 None
1299 }
1300 })
1301 )
1302 };
1303 self.ordered_iter = Some(iter);
1304 }
1305 } else {
1306 // Unordered full scan
1307 let start = self.start.clone();
1308 let end = self.end.clone();
1309 let snapshot_ts = self.snapshot_ts;
1310 let current_txn_id = self.current_txn_id;
1311
1312 let iter: Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> = Box::new(
1313 self.memtable.data.iter()
1314 .filter_map(move |entry| {
1315 let key = entry.key();
1316
1317 if key.as_slice() < start.as_slice() {
1318 return None;
1319 }
1320 if !end.is_empty() && key.as_slice() >= end.as_slice() {
1321 return None;
1322 }
1323
1324 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1325 && let Some(value) = &v.value
1326 {
1327 Some((key.clone(), value.clone()))
1328 } else {
1329 None
1330 }
1331 })
1332 );
1333 self.unordered_iter = Some(iter);
1334 }
1335 }
1336
1337 // Get next from appropriate iterator
1338 if let Some(ref mut iter) = self.ordered_iter {
1339 iter.next()
1340 } else if let Some(ref mut iter) = self.unordered_iter {
1341 iter.next()
1342 } else {
1343 None
1344 }
1345 }
1346}
1347
1348/// MemTable with MVCC support
1349///
1350/// Uses DashMap for lock-free concurrent access per key.
1351/// This eliminates the global write lock bottleneck.
1352///
1353/// Uses epoch-based dirty tracking for O(expired) GC instead of O(n) full scan.
1354/// Maintains a deferred sorted index for efficient scans:
1355/// - Writes: O(1) append to hot buffer
1356/// - Scans: O(N log N) sort-on-demand (amortized across many writes)
1357pub struct MvccMemTable {
1358 /// Key -> VersionChain (sharded for concurrent access)
1359 data: DashMap<Vec<u8>, VersionChain>,
1360 /// Deferred sorted index for efficient prefix/range scans (optional)
1361 /// O(1) insert to hot buffer, O(N log N) sort on first scan
1362 /// When None, scan_prefix will fall back to O(N) DashMap iteration
1363 deferred_index: Option<DeferredSortedIndex>,
1364 /// Legacy SkipMap for compatibility (used when deferred=false)
1365 ordered_index: Option<SkipMap<Vec<u8>, ()>>,
1366 /// Whether to use deferred sorting (true) or immediate SkipMap (false)
1367 #[allow(dead_code)]
1368 use_deferred: bool,
1369 /// Approximate size in bytes
1370 size_bytes: AtomicU64,
1371 /// Epoch-based dirty list for efficient GC
1372 dirty_list: EpochDirtyList,
1373}
1374
1375impl Default for MvccMemTable {
1376 fn default() -> Self {
1377 Self::new()
1378 }
1379}
1380
1381impl MvccMemTable {
1382 pub fn new() -> Self {
1383 Self::with_ordered_index(true)
1384 }
1385
1386 /// Create memtable with optional ordered index
1387 ///
1388 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
1389 /// but scan_prefix becomes O(N) instead of O(log N + K)
1390 ///
1391 /// Uses deferred sorting by default for better write performance:
1392 /// - Writes: O(1) append to hot buffer
1393 /// - Scans: O(N log N) sort-on-demand
1394 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1395 Self::with_index_mode(enable_ordered_index, true)
1396 }
1397
1398 /// Create memtable with fine-grained control over indexing
1399 ///
1400 /// # Arguments
1401 /// * `enable_ordered_index` - Whether to maintain an ordered index
1402 /// * `use_deferred` - If true, use deferred sorting (O(1) writes, sort-on-scan)
1403 /// If false, use SkipMap (O(log N) writes)
1404 pub fn with_index_mode(enable_ordered_index: bool, use_deferred: bool) -> Self {
1405 Self {
1406 data: DashMap::new(),
1407 deferred_index: if enable_ordered_index && use_deferred {
1408 Some(DeferredSortedIndex::with_config(DeferredIndexConfig {
1409 max_unsorted_entries: 10_000, // Compact every 10K writes
1410 enabled: true,
1411 }))
1412 } else {
1413 None
1414 },
1415 ordered_index: if enable_ordered_index && !use_deferred {
1416 Some(SkipMap::new())
1417 } else {
1418 None
1419 },
1420 use_deferred,
1421 size_bytes: AtomicU64::new(0),
1422 dirty_list: EpochDirtyList::new(),
1423 }
1424 }
1425
1426 /// Write a key-value pair (uncommitted)
1427 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1428 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1429 let key_len = key.len();
1430
1431 // Track this key in the current epoch's dirty list for GC
1432 self.dirty_list.record_version(key.clone());
1433
1434 // Insert into ordered index for prefix scans (if enabled)
1435 // Deferred: O(1) append to hot buffer
1436 // SkipMap: O(log N) insert
1437 if let Some(ref idx) = self.deferred_index {
1438 idx.insert(key.clone());
1439 } else if let Some(ref idx) = self.ordered_index {
1440 idx.insert(key.clone(), ());
1441 }
1442
1443 // Use entry API for atomic get-or-insert
1444 let mut entry = self.data.entry(key).or_default();
1445
1446 // Check for write-write conflict
1447 if entry.has_write_conflict(txn_id) {
1448 return Err(SochDBError::Internal(
1449 "Write-write conflict detected".into(),
1450 ));
1451 }
1452 entry.add_uncommitted(value, txn_id);
1453 self.size_bytes
1454 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1455
1456 Ok(())
1457 }
1458
1459 /// Write multiple key-value pairs (uncommitted) - more efficient than individual writes
1460 ///
1461 /// Optimizations applied (Rec 3: MVCC Batching):
1462 /// - Batched dirty list tracking: single lock acquire for all keys
1463 /// - Deferred index: O(1) append per key
1464 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1465 let mut total_size = 0u64;
1466
1467 // Rec 3: Batch MVCC tracking - single lock acquire for all keys
1468 self.dirty_list.record_versions_batch(writes.iter().map(|(k, _)| k.clone()));
1469
1470 for (key, value) in writes {
1471 // Insert into ordered index (if enabled)
1472 // Deferred: O(1) append, SkipMap: O(log N)
1473 if let Some(ref idx) = self.deferred_index {
1474 idx.insert(key.clone());
1475 } else if let Some(ref idx) = self.ordered_index {
1476 idx.insert(key.clone(), ());
1477 }
1478
1479 let mut entry = self.data.entry(key.clone()).or_default();
1480
1481 if entry.has_write_conflict(txn_id) {
1482 return Err(SochDBError::Internal(
1483 "Write-write conflict detected".into(),
1484 ));
1485 }
1486
1487 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1488 entry.add_uncommitted(value.clone(), txn_id);
1489 total_size += (key.len() + value_size) as u64;
1490 }
1491
1492 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1493 Ok(())
1494 }
1495
1496 /// Read at snapshot timestamp, with optional current txn to see own writes
1497 pub fn read(
1498 &self,
1499 key: &[u8],
1500 snapshot_ts: u64,
1501 current_txn_id: Option<u64>,
1502 ) -> Option<Vec<u8>> {
1503 self.data.get(key).and_then(|chain| {
1504 chain
1505 .read_at(snapshot_ts, current_txn_id)
1506 .and_then(|v| v.value.clone())
1507 })
1508 }
1509
1510 /// Commit all versions for a transaction
1511 ///
1512 /// Only updates the keys that were written by this transaction (tracked in write_set).
1513 /// Accepts InlineKey for zero-allocation MVCC tracking.
1514 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
1515 // Only iterate over keys we know were written - O(k) instead of O(n)
1516 for key in write_set {
1517 if let Some(mut chain) = self.data.get_mut(key.as_slice()) {
1518 chain.commit(txn_id, commit_ts);
1519 }
1520 }
1521 }
1522
1523 /// Legacy commit method (iterates all keys) - kept for backward compatibility
1524 #[allow(dead_code)]
1525 pub fn commit_all(&self, txn_id: u64, commit_ts: u64) {
1526 for mut entry in self.data.iter_mut() {
1527 entry.value_mut().commit(txn_id, commit_ts);
1528 }
1529 }
1530
1531 /// Abort all versions for a transaction
1532 pub fn abort(&self, txn_id: u64) {
1533 for mut entry in self.data.iter_mut() {
1534 entry.value_mut().abort(txn_id);
1535 }
1536 }
1537
1538 /// Scan keys with prefix at snapshot (without seeing uncommitted from other txns)
1539 ///
1540 /// ## Performance
1541 ///
1542 /// When ordered_index is enabled: O(log N + K) complexity
1543 /// - O(log N) to seek to the first key with prefix
1544 /// - O(K) to iterate matching keys
1545 ///
1546 /// When ordered_index is disabled: O(N) full DashMap scan (fallback)
1547 ///
1548 /// ## Optimizations Applied
1549 ///
1550 /// - Pre-allocates result vector based on expected output size
1551 /// - Uses batch-friendly iteration patterns
1552 /// - Minimizes allocations during iteration
1553 /// - Deferred index: compacts hot buffer on first scan for sorted iteration
1554 pub fn scan_prefix(
1555 &self,
1556 prefix: &[u8],
1557 snapshot_ts: u64,
1558 current_txn_id: Option<u64>,
1559 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1560 // Estimate result size for pre-allocation (use 10% of total as heuristic)
1561 let estimated_size = (self.data.len() / 10).max(64);
1562 let mut results = Vec::with_capacity(estimated_size);
1563
1564 if let Some(ref idx) = self.deferred_index {
1565 // Deferred index path: sort-on-scan (compacts hot buffer if needed)
1566 for key in idx.range_from(prefix) {
1567 // Stop when we've passed the prefix range
1568 if !key.starts_with(prefix) {
1569 break;
1570 }
1571
1572 // O(1) lookup in DashMap for version chain
1573 if let Some(chain) = self.data.get(&key)
1574 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1575 && let Some(value) = &v.value
1576 {
1577 results.push((key, value.clone()));
1578 }
1579 }
1580 } else if let Some(ref idx) = self.ordered_index {
1581 // Fast path: O(log N) seek to first key >= prefix
1582 for entry in idx.range(prefix.to_vec()..) {
1583 let key = entry.key();
1584
1585 // Stop when we've passed the prefix range
1586 if !key.starts_with(prefix) {
1587 break;
1588 }
1589
1590 // O(1) lookup in DashMap for version chain
1591 if let Some(chain) = self.data.get(key)
1592 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1593 && let Some(value) = &v.value
1594 {
1595 results.push((key.clone(), value.clone()));
1596 }
1597 }
1598 } else {
1599 // Fallback: O(N) full DashMap scan when ordered_index is disabled
1600 // Optimized with batch-friendly iteration
1601 for entry in self.data.iter() {
1602 let key = entry.key();
1603 if !key.starts_with(prefix) {
1604 continue;
1605 }
1606 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1607 && let Some(value) = &v.value
1608 {
1609 results.push((key.clone(), value.clone()));
1610 }
1611 }
1612 }
1613
1614 results
1615 }
1616
1617 /// Optimized full scan with batch allocation
1618 ///
1619 /// For use when scanning entire tables/namespaces.
1620 /// Pre-allocates based on actual data size.
1621 pub fn scan_all(
1622 &self,
1623 snapshot_ts: u64,
1624 current_txn_id: Option<u64>,
1625 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1626 let mut results = Vec::with_capacity(self.data.len());
1627
1628 for entry in self.data.iter() {
1629 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1630 && let Some(value) = &v.value
1631 {
1632 results.push((entry.key().clone(), value.clone()));
1633 }
1634 }
1635
1636 results
1637 }
1638
1639 /// Streaming scan iterator for very large datasets
1640 ///
1641 /// Returns an iterator that yields (key, value) pairs without
1642 /// materializing the entire result set in memory.
1643 pub fn scan_prefix_iter<'a>(
1644 &'a self,
1645 prefix: &'a [u8],
1646 snapshot_ts: u64,
1647 current_txn_id: Option<u64>,
1648 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1649 self.data.iter().filter_map(move |entry| {
1650 let key = entry.key();
1651 if !key.starts_with(prefix) {
1652 return None;
1653 }
1654 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1655 && let Some(value) = &v.value
1656 {
1657 Some((key.clone(), value.clone()))
1658 } else {
1659 None
1660 }
1661 })
1662 }
1663
1664 /// Scan range
1665 pub fn scan_range(
1666 &self,
1667 start: &[u8],
1668 end: &[u8],
1669 snapshot_ts: u64,
1670 current_txn_id: Option<u64>,
1671 ) -> Vec<(Vec<u8>, Vec<u8>)> {
1672 let mut results = Vec::new();
1673
1674 if let Some(ref idx) = self.deferred_index {
1675 // Deferred index path: sort-on-scan
1676 if end.is_empty() {
1677 for key in idx.range_from(start) {
1678 if let Some(chain) = self.data.get(&key)
1679 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1680 && let Some(value) = &v.value
1681 {
1682 results.push((key, value.clone()));
1683 }
1684 }
1685 } else {
1686 for key in idx.range(start, end) {
1687 if let Some(chain) = self.data.get(&key)
1688 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1689 && let Some(value) = &v.value
1690 {
1691 results.push((key, value.clone()));
1692 }
1693 }
1694 }
1695 } else if let Some(ref idx) = self.ordered_index {
1696 // Use range scan on SkipMap
1697 if end.is_empty() {
1698 // Unbounded end
1699 for entry in idx.range(start.to_vec()..) {
1700 let key = entry.key();
1701 if let Some(chain) = self.data.get(key)
1702 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1703 && let Some(value) = &v.value
1704 {
1705 results.push((key.clone(), value.clone()));
1706 }
1707 }
1708 } else {
1709 for entry in idx.range(start.to_vec()..end.to_vec()) {
1710 let key = entry.key();
1711 if let Some(chain) = self.data.get(key)
1712 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
1713 && let Some(value) = &v.value
1714 {
1715 results.push((key.clone(), value.clone()));
1716 }
1717 }
1718 }
1719 } else {
1720 // Fallback to full scan if no ordered index
1721 for entry in self.data.iter() {
1722 let key = entry.key();
1723
1724 if key.as_slice() < start {
1725 continue;
1726 }
1727 if !end.is_empty() && key.as_slice() >= end {
1728 continue;
1729 }
1730
1731 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
1732 && let Some(value) = &v.value
1733 {
1734 results.push((key.clone(), value.clone()));
1735 }
1736 }
1737 }
1738
1739 results
1740 }
1741
1742 /// Streaming range scan iterator for very large datasets
1743 ///
1744 /// Returns an iterator that yields (key, value) pairs without
1745 /// materializing the entire result set in memory. Uses the ordered
1746 /// index when available for O(log N + K) complexity.
1747 ///
1748 /// ## Zero-Allocation Design
1749 ///
1750 /// While the iterator itself cannot avoid allocations for returned
1751 /// values (since the caller needs ownership), it avoids:
1752 /// - Pre-materializing all results
1753 /// - Intermediate buffers
1754 /// - Repeated key comparisons for already-visited entries
1755 ///
1756 /// ## Usage
1757 ///
1758 /// ```ignore
1759 /// for (key, value) in memtable.scan_range_iter(b"start", b"end", ts, txn) {
1760 /// // Process each result as it arrives
1761 /// // Memory usage is O(1) per iteration, not O(N) total
1762 /// }
1763 /// ```
1764 pub fn scan_range_iter<'a>(
1765 &'a self,
1766 start: &'a [u8],
1767 end: &'a [u8],
1768 snapshot_ts: u64,
1769 current_txn_id: Option<u64>,
1770 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
1771 // Compact deferred index before scanning if needed
1772 if let Some(ref idx) = self.deferred_index {
1773 idx.compact();
1774 }
1775
1776 // Use either ordered index or full scan
1777 let use_ordered = self.ordered_index.is_some() || self.deferred_index.is_some();
1778
1779 // Create iterator based on availability of ordered index
1780 ScanRangeIterator {
1781 memtable: self,
1782 start: start.to_vec(),
1783 end: end.to_vec(),
1784 snapshot_ts,
1785 current_txn_id,
1786 use_ordered,
1787 ordered_iter: None,
1788 unordered_iter: None,
1789 initialized: false,
1790 }
1791 }
1792
1793 /// Get approximate size
1794 pub fn size(&self) -> u64 {
1795 self.size_bytes.load(Ordering::Relaxed)
1796 }
1797
1798 /// Garbage collect old versions using epoch-based dirty list
1799 ///
1800 /// O(expired_versions) instead of O(all_versions)
1801 /// Only visits keys that had versions created in the old epoch.
1802 pub fn gc(&self, min_active_ts: u64) -> usize {
1803 // Advance epoch and get the dirty keys from the old epoch
1804 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
1805
1806 if dirty_keys.is_empty() {
1807 return 0;
1808 }
1809
1810 let mut gc_count = 0;
1811
1812 // Only visit keys that were modified in the old epoch
1813 // Use a HashSet to deduplicate keys that were written multiple times
1814 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
1815
1816 for key in unique_keys {
1817 if let Some(mut entry) = self.data.get_mut(&key) {
1818 let before = entry.value().version_count();
1819 entry.value_mut().gc(min_active_ts);
1820 gc_count += before.saturating_sub(entry.value().version_count());
1821 }
1822 }
1823
1824 gc_count
1825 }
1826
1827 /// Legacy full-scan GC (for testing or when epoch-based tracking isn't available)
1828 #[allow(dead_code)]
1829 pub fn gc_full_scan(&self, min_active_ts: u64) -> usize {
1830 let mut gc_count = 0;
1831
1832 for mut entry in self.data.iter_mut() {
1833 let before = entry.value().version_count();
1834 entry.value_mut().gc(min_active_ts);
1835 gc_count += before.saturating_sub(entry.value().version_count());
1836 }
1837
1838 gc_count
1839 }
1840}
1841
1842// ============================================================================
1843// ArenaMvccMemTable - Arena-Backed MVCC MemTable with Reduced Allocations
1844// ============================================================================
1845
1846use crate::key_buffer::ArenaKeyHandle;
1847
1848/// Epoch-based dirty list using ArenaKeyHandle for reduced allocations
1849struct ArenaEpochDirtyList {
1850 epochs: [parking_lot::Mutex<Vec<ArenaKeyHandle>>; 4],
1851 current_epoch: AtomicU64,
1852}
1853
1854impl ArenaEpochDirtyList {
1855 fn new() -> Self {
1856 Self {
1857 epochs: [
1858 parking_lot::Mutex::new(Vec::new()),
1859 parking_lot::Mutex::new(Vec::new()),
1860 parking_lot::Mutex::new(Vec::new()),
1861 parking_lot::Mutex::new(Vec::new()),
1862 ],
1863 current_epoch: AtomicU64::new(0),
1864 }
1865 }
1866
1867 #[inline]
1868 fn record_version(&self, key: ArenaKeyHandle) {
1869 let epoch = self.current_epoch.load(Ordering::Relaxed);
1870 let idx = (epoch as usize) % EPOCH_RING_SIZE;
1871 self.epochs[idx].lock().push(key);
1872 }
1873
1874 fn advance_epoch(&self) -> (u64, Vec<ArenaKeyHandle>) {
1875 let old_epoch = self.current_epoch.fetch_add(1, Ordering::SeqCst);
1876 let old_idx = (old_epoch as usize) % EPOCH_RING_SIZE;
1877 let mut guard = self.epochs[old_idx].lock();
1878 let keys = std::mem::take(&mut *guard);
1879 (old_epoch, keys)
1880 }
1881}
1882
1883/// Arena-backed MVCC MemTable with optimized key storage
1884///
1885/// This version uses `ArenaKeyHandle` instead of `Vec<u8>` for keys,
1886/// reducing per-write allocations from 3 to 1:
1887///
1888/// - Before: 3 × Vec<u8> clones per write (dirty_list, ordered_index, data)
1889/// - After: 1 × ArenaKeyHandle creation, 3 × O(1) copies (16 bytes each)
1890///
1891/// ## Performance
1892///
1893/// Expected improvement: 20-30% throughput increase on write-heavy workloads
1894/// by reducing:
1895/// - Heap allocations: 3 → 1 per write
1896/// - Bytes copied: 3L → L + 48 bytes (where L = key length)
1897pub struct ArenaMvccMemTable {
1898 /// Key -> VersionChain (uses ArenaKeyHandle for O(1) hash)
1899 data: DashMap<ArenaKeyHandle, VersionChain>,
1900 /// Ordered index for prefix scans
1901 ordered_index: Option<SkipMap<ArenaKeyHandle, ()>>,
1902 /// Approximate size in bytes
1903 size_bytes: AtomicU64,
1904 /// Epoch-based dirty list (arena-backed)
1905 dirty_list: ArenaEpochDirtyList,
1906}
1907
1908impl ArenaMvccMemTable {
1909 pub fn new() -> Self {
1910 Self::with_ordered_index(true)
1911 }
1912
1913 pub fn with_ordered_index(enable_ordered_index: bool) -> Self {
1914 Self {
1915 data: DashMap::new(),
1916 ordered_index: if enable_ordered_index {
1917 Some(SkipMap::new())
1918 } else {
1919 None
1920 },
1921 size_bytes: AtomicU64::new(0),
1922 dirty_list: ArenaEpochDirtyList::new(),
1923 }
1924 }
1925
1926 /// Write a key-value pair using arena key handle
1927 ///
1928 /// Only creates ONE ArenaKeyHandle, then copies it (16 bytes) to each location.
1929 /// This is much cheaper than cloning Vec<u8> which requires heap allocation.
1930 pub fn write(&self, key: &[u8], value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
1931 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1932 let key_len = key.len();
1933
1934 // Create ONE ArenaKeyHandle - this is the only allocation for the key
1935 let key_handle = ArenaKeyHandle::new(key);
1936
1937 // Track in dirty list (O(1) copy of 16-byte handle)
1938 self.dirty_list.record_version(key_handle.clone());
1939
1940 // Insert into ordered index (O(1) copy of 16-byte handle)
1941 if let Some(ref idx) = self.ordered_index {
1942 idx.insert(key_handle.clone(), ());
1943 }
1944
1945 // Use entry API with the handle
1946 let mut entry = self.data.entry(key_handle).or_default();
1947
1948 if entry.has_write_conflict(txn_id) {
1949 return Err(SochDBError::Internal(
1950 "Write-write conflict detected".into(),
1951 ));
1952 }
1953 entry.add_uncommitted(value, txn_id);
1954 self.size_bytes
1955 .fetch_add((key_len + value_size) as u64, Ordering::Relaxed);
1956
1957 Ok(())
1958 }
1959
1960 /// Write batch using arena key handles
1961 pub fn write_batch(&self, writes: &[(&[u8], Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
1962 let mut total_size = 0u64;
1963
1964 for (key, value) in writes {
1965 let key_handle = ArenaKeyHandle::new(key);
1966
1967 self.dirty_list.record_version(key_handle.clone());
1968
1969 if let Some(ref idx) = self.ordered_index {
1970 idx.insert(key_handle.clone(), ());
1971 }
1972
1973 let mut entry = self.data.entry(key_handle).or_default();
1974
1975 if entry.has_write_conflict(txn_id) {
1976 return Err(SochDBError::Internal(
1977 "Write-write conflict detected".into(),
1978 ));
1979 }
1980
1981 let value_size = value.as_ref().map(|v| v.len()).unwrap_or(0);
1982 entry.add_uncommitted(value.clone(), txn_id);
1983 total_size += (key.len() + value_size) as u64;
1984 }
1985
1986 self.size_bytes.fetch_add(total_size, Ordering::Relaxed);
1987 Ok(())
1988 }
1989
1990 /// Read at snapshot timestamp
1991 pub fn read(
1992 &self,
1993 key: &[u8],
1994 snapshot_ts: u64,
1995 current_txn_id: Option<u64>,
1996 ) -> Option<Vec<u8>> {
1997 // Create temporary handle for lookup (uses pre-computed hash for O(1) lookup)
1998 let key_handle = ArenaKeyHandle::new(key);
1999 self.data.get(&key_handle).and_then(|chain| {
2000 chain
2001 .read_at(snapshot_ts, current_txn_id)
2002 .and_then(|v| v.value.clone())
2003 })
2004 }
2005
2006 /// Commit transaction
2007 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2008 for key in write_set {
2009 let key_handle = ArenaKeyHandle::new(key.as_slice());
2010 if let Some(mut chain) = self.data.get_mut(&key_handle) {
2011 chain.commit(txn_id, commit_ts);
2012 }
2013 }
2014 }
2015
2016 /// Abort transaction
2017 pub fn abort(&self, txn_id: u64) {
2018 for mut entry in self.data.iter_mut() {
2019 entry.value_mut().abort(txn_id);
2020 }
2021 }
2022
2023 /// Scan prefix
2024 pub fn scan_prefix(
2025 &self,
2026 prefix: &[u8],
2027 snapshot_ts: u64,
2028 current_txn_id: Option<u64>,
2029 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2030 let mut results = Vec::new();
2031 let prefix_handle = ArenaKeyHandle::new(prefix);
2032
2033 if let Some(ref idx) = self.ordered_index {
2034 for entry in idx.range(prefix_handle..) {
2035 let key = entry.key();
2036
2037 if !key.as_bytes().starts_with(prefix) {
2038 break;
2039 }
2040
2041 if let Some(chain) = self.data.get(key)
2042 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2043 && let Some(value) = &v.value
2044 {
2045 results.push((key.as_bytes().to_vec(), value.clone()));
2046 }
2047 }
2048 } else {
2049 for entry in self.data.iter() {
2050 let key = entry.key();
2051 if !key.as_bytes().starts_with(prefix) {
2052 continue;
2053 }
2054 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2055 && let Some(value) = &v.value
2056 {
2057 results.push((key.as_bytes().to_vec(), value.clone()));
2058 }
2059 }
2060 }
2061
2062 results
2063 }
2064
2065 /// Get approximate size
2066 pub fn size(&self) -> u64 {
2067 self.size_bytes.load(Ordering::Relaxed)
2068 }
2069
2070 /// Garbage collect old versions
2071 pub fn gc(&self, min_active_ts: u64) -> usize {
2072 let (_old_epoch, dirty_keys) = self.dirty_list.advance_epoch();
2073
2074 if dirty_keys.is_empty() {
2075 return 0;
2076 }
2077
2078 let mut gc_count = 0;
2079 let unique_keys: std::collections::HashSet<_> = dirty_keys.into_iter().collect();
2080
2081 for key in unique_keys {
2082 if let Some(mut entry) = self.data.get_mut(&key) {
2083 let before = entry.value().version_count();
2084 entry.value_mut().gc(min_active_ts);
2085 gc_count += before.saturating_sub(entry.value().version_count());
2086 }
2087 }
2088
2089 gc_count
2090 }
2091}
2092
2093impl Default for ArenaMvccMemTable {
2094 fn default() -> Self {
2095 Self::new()
2096 }
2097}
2098
2099// ============================================================================
2100// MemTableKind - Unified MemTable Abstraction (Principal Engineer Pattern)
2101// ============================================================================
2102
/// Selects which memtable implementation a storage instance uses.
///
/// The default is [`MemTableType::Standard`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum MemTableType {
    /// Standard MVCC memtable with deferred sorting.
    /// Best for: general workloads, balanced read/write.
    #[default]
    Standard,
    /// Arena-backed memtable with reduced allocations.
    /// Best for: write-heavy workloads, large keys.
    Arena,
}
2120
2121/// Unified memtable abstraction using enum dispatch
2122///
2123/// This pattern provides:
2124/// - Zero-cost abstraction (no vtable, no dynamic dispatch)
2125/// - Type-safe switching between implementations
2126/// - Easy extensibility for future memtable types
2127///
2128/// ## Why Enum over Trait Object?
2129///
2130/// - Hot path performance: enum match is a single branch vs vtable indirection
2131/// - Cache friendliness: no pointer chasing
2132/// - Inlining: compiler can inline through enum dispatch
2133pub enum MemTableKind {
2134 Standard(MvccMemTable),
2135 Arena(ArenaMvccMemTable),
2136}
2137
2138impl MemTableKind {
2139 /// Create a new memtable of the specified type
2140 pub fn new(kind: MemTableType, enable_ordered_index: bool) -> Self {
2141 match kind {
2142 MemTableType::Standard => {
2143 MemTableKind::Standard(MvccMemTable::with_ordered_index(enable_ordered_index))
2144 }
2145 MemTableType::Arena => {
2146 MemTableKind::Arena(ArenaMvccMemTable::with_ordered_index(enable_ordered_index))
2147 }
2148 }
2149 }
2150
2151 /// Write a key-value pair
2152 #[inline]
2153 pub fn write(&self, key: Vec<u8>, value: Option<Vec<u8>>, txn_id: u64) -> Result<()> {
2154 match self {
2155 MemTableKind::Standard(m) => m.write(key, value, txn_id),
2156 MemTableKind::Arena(m) => m.write(&key, value, txn_id),
2157 }
2158 }
2159
2160 /// Write batch of key-value pairs
2161 #[inline]
2162 pub fn write_batch(&self, writes: &[(Vec<u8>, Option<Vec<u8>>)], txn_id: u64) -> Result<()> {
2163 match self {
2164 MemTableKind::Standard(m) => m.write_batch(writes, txn_id),
2165 MemTableKind::Arena(m) => {
2166 // Convert to arena-compatible format
2167 let arena_writes: Vec<(&[u8], Option<Vec<u8>>)> = writes
2168 .iter()
2169 .map(|(k, v)| (k.as_slice(), v.clone()))
2170 .collect();
2171 m.write_batch(&arena_writes, txn_id)
2172 }
2173 }
2174 }
2175
2176 /// Read at snapshot timestamp
2177 #[inline]
2178 pub fn read(
2179 &self,
2180 key: &[u8],
2181 snapshot_ts: u64,
2182 current_txn_id: Option<u64>,
2183 ) -> Option<Vec<u8>> {
2184 match self {
2185 MemTableKind::Standard(m) => m.read(key, snapshot_ts, current_txn_id),
2186 MemTableKind::Arena(m) => m.read(key, snapshot_ts, current_txn_id),
2187 }
2188 }
2189
2190 /// Commit transaction
2191 #[inline]
2192 pub fn commit(&self, txn_id: u64, commit_ts: u64, write_set: &HashSet<InlineKey>) {
2193 match self {
2194 MemTableKind::Standard(m) => m.commit(txn_id, commit_ts, write_set),
2195 MemTableKind::Arena(m) => m.commit(txn_id, commit_ts, write_set),
2196 }
2197 }
2198
2199 /// Abort transaction
2200 #[inline]
2201 pub fn abort(&self, txn_id: u64) {
2202 match self {
2203 MemTableKind::Standard(m) => m.abort(txn_id),
2204 MemTableKind::Arena(m) => m.abort(txn_id),
2205 }
2206 }
2207
2208 /// Scan prefix
2209 #[inline]
2210 pub fn scan_prefix(
2211 &self,
2212 prefix: &[u8],
2213 snapshot_ts: u64,
2214 current_txn_id: Option<u64>,
2215 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2216 match self {
2217 MemTableKind::Standard(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2218 MemTableKind::Arena(m) => m.scan_prefix(prefix, snapshot_ts, current_txn_id),
2219 }
2220 }
2221
2222 /// Scan range
2223 #[inline]
2224 pub fn scan_range(
2225 &self,
2226 start: &[u8],
2227 end: &[u8],
2228 snapshot_ts: u64,
2229 current_txn_id: Option<u64>,
2230 ) -> Vec<(Vec<u8>, Vec<u8>)> {
2231 match self {
2232 MemTableKind::Standard(m) => m.scan_range(start, end, snapshot_ts, current_txn_id),
2233 MemTableKind::Arena(m) => {
2234 // ArenaMvccMemTable doesn't have scan_range, use scan_prefix fallback
2235 let mut results = Vec::new();
2236 if let Some(ref idx) = m.ordered_index {
2237 let start_handle = ArenaKeyHandle::new(start);
2238 let end_handle = ArenaKeyHandle::new(end);
2239
2240 if end.is_empty() {
2241 for entry in idx.range(start_handle..) {
2242 let key = entry.key();
2243 if let Some(chain) = m.data.get(key)
2244 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2245 && let Some(value) = &v.value
2246 {
2247 results.push((key.as_bytes().to_vec(), value.clone()));
2248 }
2249 }
2250 } else {
2251 for entry in idx.range(start_handle..end_handle) {
2252 let key = entry.key();
2253 if let Some(chain) = m.data.get(key)
2254 && let Some(v) = chain.read_at(snapshot_ts, current_txn_id)
2255 && let Some(value) = &v.value
2256 {
2257 results.push((key.as_bytes().to_vec(), value.clone()));
2258 }
2259 }
2260 }
2261 } else {
2262 for entry in m.data.iter() {
2263 let key = entry.key();
2264 let key_bytes = key.as_bytes();
2265 if key_bytes < start {
2266 continue;
2267 }
2268 if !end.is_empty() && key_bytes >= end {
2269 continue;
2270 }
2271 if let Some(v) = entry.value().read_at(snapshot_ts, current_txn_id)
2272 && let Some(value) = &v.value
2273 {
2274 results.push((key_bytes.to_vec(), value.clone()));
2275 }
2276 }
2277 }
2278 results
2279 }
2280 }
2281 }
2282
2283 /// Scan range iterator (returns collected results for now)
2284 #[inline]
2285 pub fn scan_range_iter<'a>(
2286 &'a self,
2287 start: &'a [u8],
2288 end: &'a [u8],
2289 snapshot_ts: u64,
2290 current_txn_id: Option<u64>,
2291 ) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
2292 match self {
2293 MemTableKind::Standard(m) => {
2294 Box::new(m.scan_range_iter(start, end, snapshot_ts, current_txn_id))
2295 }
2296 MemTableKind::Arena(_) => {
2297 // Arena version returns collected results as iterator
2298 let results = self.scan_range(start, end, snapshot_ts, current_txn_id);
2299 Box::new(results.into_iter())
2300 }
2301 }
2302 }
2303
2304 /// Get approximate size
2305 #[inline]
2306 pub fn size(&self) -> u64 {
2307 match self {
2308 MemTableKind::Standard(m) => m.size(),
2309 MemTableKind::Arena(m) => m.size(),
2310 }
2311 }
2312
2313 /// Garbage collect old versions
2314 #[inline]
2315 pub fn gc(&self, min_active_ts: u64) -> usize {
2316 match self {
2317 MemTableKind::Standard(m) => m.gc(min_active_ts),
2318 MemTableKind::Arena(m) => m.gc(min_active_ts),
2319 }
2320 }
2321
2322 /// Get the kind of memtable
2323 pub fn kind(&self) -> MemTableType {
2324 match self {
2325 MemTableKind::Standard(_) => MemTableType::Standard,
2326 MemTableKind::Arena(_) => MemTableType::Arena,
2327 }
2328 }
2329}
2330
2331/// Durable storage engine with full ACID support
2332pub struct DurableStorage {
2333 /// Path to storage directory
2334 path: PathBuf,
2335 /// Write-ahead log
2336 wal: Arc<TxnWal>,
2337 /// MVCC manager
2338 mvcc: Arc<MvccManager>,
2339 /// In-memory data (unified abstraction over Standard/Arena)
2340 memtable: Arc<MemTableKind>,
2341 /// Per-transaction WAL buffers for batched writes
2342 /// Key: txn_id, Value: TxnWalBuffer that accumulates writes in memory
2343 /// At commit, buffer is flushed to WAL with single lock acquisition
2344 txn_write_buffers: DashMap<u64, TxnWalBuffer>,
2345 /// Group commit buffer (optional)
2346 group_commit: Option<Arc<EventDrivenGroupCommit>>,
2347 /// Recovery state
2348 needs_recovery: AtomicU64, // 1 = needs recovery
2349 /// Last checkpoint LSN
2350 last_checkpoint_lsn: AtomicU64,
2351 /// Synchronous mode (like SQLite's PRAGMA synchronous)
2352 /// 0 = OFF, 1 = NORMAL (periodic sync), 2 = FULL (sync every commit)
2353 sync_mode: AtomicU64,
2354 /// Commits since last sync (for NORMAL mode)
2355 commits_since_sync: AtomicU64,
2356 /// Adaptive batch sizing for NORMAL mode (Little's Law)
2357 /// Arrival rate in requests/sec × 1000 for precision
2358 arrival_rate_ema: AtomicU64,
2359 /// Last commit timestamp in microseconds
2360 last_commit_us: AtomicU64,
2361 /// Estimated fsync latency in microseconds
2362 fsync_latency_us: AtomicU64,
2363}
2364
2365impl DurableStorage {
2366 /// Open or create durable storage at path
2367 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
2368 Self::open_with_config(path, true)
2369 }
2370
2371 /// Open with configurable ordered index
2372 ///
2373 /// When `enable_ordered_index` is false, saves ~134 ns/op on writes
2374 /// but scan_prefix becomes O(N) instead of O(log N + K)
2375 pub fn open_with_config<P: AsRef<Path>>(path: P, enable_ordered_index: bool) -> Result<Self> {
2376 Self::open_with_full_config(path, enable_ordered_index, MemTableType::Standard)
2377 }
2378
2379 /// Open with arena-backed memtable for write-heavy workloads
2380 ///
2381 /// Uses ArenaMvccMemTable which reduces per-write allocations from 3 to 1.
2382 /// Best for workloads with:
2383 /// - High write throughput
2384 /// - Large keys (reduces allocation overhead)
2385 /// - Minimal concurrent reads during writes
2386 pub fn open_with_arena<P: AsRef<Path>>(path: P) -> Result<Self> {
2387 Self::open_with_full_config(path, true, MemTableType::Arena)
2388 }
2389
2390 /// Open with full configuration options
2391 ///
2392 /// # Arguments
2393 /// * `path` - Storage directory path
2394 /// * `enable_ordered_index` - Enable ordered index for O(log N) scans
2395 /// * `memtable_type` - Type of memtable to use (Standard or Arena)
2396 pub fn open_with_full_config<P: AsRef<Path>>(
2397 path: P,
2398 enable_ordered_index: bool,
2399 memtable_type: MemTableType,
2400 ) -> Result<Self> {
2401 let path = path.as_ref().to_path_buf();
2402 std::fs::create_dir_all(&path)?;
2403
2404 let wal_path = path.join("wal.log");
2405 let wal = Arc::new(TxnWal::new(&wal_path)?);
2406
2407 let storage = Self {
2408 path,
2409 wal: wal.clone(),
2410 mvcc: Arc::new(MvccManager::new()),
2411 memtable: Arc::new(MemTableKind::new(memtable_type, enable_ordered_index)),
2412 txn_write_buffers: DashMap::new(),
2413 group_commit: None,
2414 needs_recovery: AtomicU64::new(0),
2415 last_checkpoint_lsn: AtomicU64::new(0),
2416 sync_mode: AtomicU64::new(1), // Default: NORMAL (like SQLite)
2417 commits_since_sync: AtomicU64::new(0),
2418 // Adaptive batch sizing (Little's Law)
2419 arrival_rate_ema: AtomicU64::new(1_000_000), // 1000 req/s × 1000 initial
2420 last_commit_us: AtomicU64::new(0),
2421 fsync_latency_us: AtomicU64::new(5000), // 5ms default
2422 };
2423
2424 // Check if recovery needed
2425 if storage.check_recovery_needed()? {
2426 storage.needs_recovery.store(1, Ordering::SeqCst);
2427 }
2428
2429 Ok(storage)
2430 }
2431
2432 /// Open with group commit enabled
2433 pub fn open_with_group_commit<P: AsRef<Path>>(path: P) -> Result<Self> {
2434 Self::open_with_group_commit_and_config(path, true)
2435 }
2436
2437 /// Open with group commit and configurable ordered index
2438 pub fn open_with_group_commit_and_config<P: AsRef<Path>>(
2439 path: P,
2440 enable_ordered_index: bool,
2441 ) -> Result<Self> {
2442 let mut storage = Self::open_with_config(path, enable_ordered_index)?;
2443
2444 let wal = storage.wal.clone();
2445 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2446 // Write all commit records WITHOUT flushing (batch them)
2447 for &txn_id in txn_ids {
2448 let entry = TxnWalEntry::txn_commit(txn_id);
2449 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2450 }
2451
2452 // Then do a SINGLE flush + fsync for the entire batch
2453 wal.flush().map_err(|e| e.to_string())?;
2454 wal.sync().map_err(|e| e.to_string())?;
2455
2456 // Return commit timestamp
2457 Ok(std::time::SystemTime::now()
2458 .duration_since(std::time::UNIX_EPOCH)
2459 .unwrap()
2460 .as_micros() as u64)
2461 });
2462
2463 storage.group_commit = Some(Arc::new(gc));
2464 Ok(storage)
2465 }
2466
2467 /// Open with IndexPolicy for automatic memtable/index configuration
2468 ///
2469 /// This is the recommended constructor for new code. The policy determines:
2470 /// - Whether to use ordered index (ScanOptimized only)
2471 /// - Whether to use arena-backed memtable (WriteOptimized, AppendOnly)
2472 /// - Default settings optimized for the workload pattern
2473 ///
2474 /// # Arguments
2475 /// * `path` - Storage directory path
2476 /// * `policy` - Index policy determining write/scan tradeoffs
2477 /// * `group_commit` - Whether to enable group commit for throughput
2478 pub fn open_with_policy<P: AsRef<Path>>(
2479 path: P,
2480 policy: crate::index_policy::IndexPolicy,
2481 group_commit: bool,
2482 ) -> Result<Self> {
2483 use crate::index_policy::IndexPolicy;
2484
2485 // Derive configuration from policy
2486 let (enable_ordered_index, memtable_type) = match policy {
2487 IndexPolicy::WriteOptimized | IndexPolicy::AppendOnly => {
2488 // Write-heavy: no ordered index, use arena for reduced allocations
2489 (false, MemTableType::Arena)
2490 }
2491 IndexPolicy::Balanced => {
2492 // Mixed OLTP: deferred sorting (already implemented in Standard)
2493 (true, MemTableType::Standard)
2494 }
2495 IndexPolicy::ScanOptimized => {
2496 // Scan-heavy: maintain ordered index
2497 (true, MemTableType::Standard)
2498 }
2499 };
2500
2501 if group_commit {
2502 let mut storage = Self::open_with_full_config(path, enable_ordered_index, memtable_type)?;
2503
2504 let wal = storage.wal.clone();
2505 let gc = EventDrivenGroupCommit::new(move |txn_ids: &[u64]| {
2506 for &txn_id in txn_ids {
2507 let entry = TxnWalEntry::txn_commit(txn_id);
2508 wal.append_no_flush(&entry).map_err(|e| e.to_string())?;
2509 }
2510 wal.flush().map_err(|e| e.to_string())?;
2511 wal.sync().map_err(|e| e.to_string())?;
2512 Ok(std::time::SystemTime::now()
2513 .duration_since(std::time::UNIX_EPOCH)
2514 .unwrap()
2515 .as_micros() as u64)
2516 });
2517 storage.group_commit = Some(Arc::new(gc));
2518 Ok(storage)
2519 } else {
2520 Self::open_with_full_config(path, enable_ordered_index, memtable_type)
2521 }
2522 }
2523
2524 /// Get the memtable type being used
2525 pub fn memtable_type(&self) -> MemTableType {
2526 self.memtable.kind()
2527 }
2528
2529 /// Check if recovery is needed (dirty shutdown detection)
2530 ///
2531 /// Note: Recovery must ALWAYS run to rebuild the in-memory memtable from WAL.
2532 /// The clean_shutdown marker only tells us if there might be uncommitted transactions,
2533 /// but committed data still needs to be loaded from WAL into the memtable.
2534 fn check_recovery_needed(&self) -> Result<bool> {
2535 let marker_path = self.path.join(".clean_shutdown");
2536 if marker_path.exists() {
2537 // Clean shutdown - remove marker
2538 std::fs::remove_file(&marker_path)?;
2539 }
2540 // ALWAYS need recovery to rebuild memtable from WAL
2541 // The memtable is in-memory only and needs to be restored on every startup
2542 Ok(true)
2543 }
2544
2545 /// Perform crash recovery
2546 pub fn recover(&self) -> Result<RecoveryStats> {
2547 if self.needs_recovery.load(Ordering::SeqCst) == 0 {
2548 return Ok(RecoveryStats::default());
2549 }
2550
2551 let (writes, txn_count) = self.wal.replay_for_recovery()?;
2552
2553 // Apply committed writes to memtable
2554 let recovery_txn_id = self.wal.alloc_txn_id();
2555 let commit_ts = self.mvcc.alloc_commit_ts();
2556
2557 // Collect keys being written for efficient commit
2558 let mut write_set: HashSet<InlineKey> = HashSet::new();
2559 for (key, value) in &writes {
2560 write_set.insert(SmallVec::from_slice(key));
2561 self.memtable
2562 .write(key.clone(), Some(value.clone()), recovery_txn_id)?;
2563 }
2564 self.memtable.commit(recovery_txn_id, commit_ts, &write_set);
2565
2566 self.needs_recovery.store(0, Ordering::SeqCst);
2567
2568 Ok(RecoveryStats {
2569 transactions_recovered: txn_count,
2570 writes_recovered: writes.len(),
2571 commit_ts,
2572 })
2573 }
2574
2575 /// Begin a new transaction
2576 pub fn begin_transaction(&self) -> Result<u64> {
2577 let txn_id = self.wal.begin_transaction()?;
2578 self.mvcc.begin(txn_id);
2579 Ok(txn_id)
2580 }
2581
2582 /// Begin a transaction with a specific mode (ReadOnly/WriteOnly/ReadWrite)
2583 ///
2584 /// This enables mode-aware optimizations:
2585 /// - ReadOnly: Skip SSI tracking, 2.6x faster reads
2586 /// - WriteOnly: Skip read tracking, faster bulk inserts
2587 /// - ReadWrite: Full SSI for serializable isolation
2588 pub fn begin_with_mode(&self, mode: TransactionMode) -> Result<u64> {
2589 let txn_id = self.wal.begin_transaction()?;
2590 self.mvcc.begin_with_mode(txn_id, mode);
2591 Ok(txn_id)
2592 }
2593
2594 /// Read a key within a transaction
2595 #[inline]
2596 pub fn read(&self, txn_id: u64, key: &[u8]) -> Result<Option<Vec<u8>>> {
2597 // Fast path: get just snapshot_ts without cloning whole transaction
2598 let snapshot_ts = self
2599 .mvcc
2600 .get_snapshot_ts(txn_id)
2601 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2602
2603 // Record read for SSI (skipped for read-only transactions)
2604 self.mvcc.record_read(txn_id, key);
2605
2606 // Read at snapshot timestamp, seeing own uncommitted writes
2607 Ok(self.memtable.read(key, snapshot_ts, Some(txn_id)))
2608 }
2609
2610 /// Write a key-value pair within a transaction
2611 ///
2612 /// Writes are buffered and only flushed to disk on commit.
2613 /// This provides ~10× better throughput for batched inserts.
2614 pub fn write(&self, txn_id: u64, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
2615 // Use the zero-allocation path internally
2616 self.write_refs(txn_id, &key, &value)?;
2617
2618 Ok(())
2619 }
2620
2621 /// Write from references - zero allocation hot path
2622 ///
2623 /// Avoids cloning key/value by writing to WAL from refs directly,
2624 /// then only allocating once for memtable storage.
2625 #[inline]
2626 pub fn write_refs(&self, txn_id: u64, key: &[u8], value: &[u8]) -> Result<()> {
2627 // Record write for MVCC (uses InlineKey - zero allocation for small keys)
2628 self.mvcc.record_write(txn_id, key);
2629
2630 // Buffer writes in memory using TxnWalBuffer - NO WAL lock taken!
2631 // This reduces lock contention from O(writes) to O(1) per transaction
2632 self.txn_write_buffers
2633 .entry(txn_id)
2634 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2635 .append(key, value);
2636
2637 // Write to memtable (needs owned key/value for storage)
2638 self.memtable
2639 .write(key.to_vec(), Some(value.to_vec()), txn_id)?;
2640
2641 Ok(())
2642 }
2643
2644 /// Delete a key within a transaction
2645 pub fn delete(&self, txn_id: u64, key: Vec<u8>) -> Result<()> {
2646 // Record write (uses InlineKey - zero allocation for small keys)
2647 self.mvcc.record_write(txn_id, &key);
2648
2649 // Buffer tombstone in memory - NO WAL lock taken!
2650 self.txn_write_buffers
2651 .entry(txn_id)
2652 .or_insert_with(|| TxnWalBuffer::new(txn_id))
2653 .append(&key, &[]); // Empty value = tombstone
2654
2655 // Write tombstone to memtable
2656 self.memtable.write(key, None, txn_id)?;
2657
2658 Ok(())
2659 }
2660
2661 /// Batch write multiple key-value pairs with reduced overhead
2662 ///
2663 /// This API amortizes fixed costs over the batch:
2664 /// - Single DashMap entry lookup for TxnWalBuffer
2665 /// - Single MVCC write set update
2666 /// - Batch memtable operations
2667 ///
2668 /// Performance: ~2-3x faster than individual write_refs calls
2669 /// for batches of 100+ entries.
2670 ///
2671 /// # Arguments
2672 /// * `txn_id` - Transaction ID
2673 /// * `writes` - Slice of (key, value) pairs
2674 #[inline]
2675 pub fn write_batch_refs(&self, txn_id: u64, writes: &[(&[u8], &[u8])]) -> Result<()> {
2676 if writes.is_empty() {
2677 return Ok(());
2678 }
2679
2680 // Single DashMap access for entire batch
2681 let mut buffer_entry = self
2682 .txn_write_buffers
2683 .entry(txn_id)
2684 .or_insert_with(|| TxnWalBuffer::new(txn_id));
2685
2686 // Batch operations with reduced per-row overhead
2687 for (key, value) in writes {
2688 // Record write for MVCC
2689 self.mvcc.record_write(txn_id, key);
2690
2691 // Append to WAL buffer
2692 buffer_entry.append(key, value);
2693 }
2694 drop(buffer_entry);
2695
2696 // Batch write to memtable
2697 let owned_writes: Vec<(Vec<u8>, Option<Vec<u8>>)> = writes
2698 .iter()
2699 .map(|(k, v)| (k.to_vec(), Some(v.to_vec())))
2700 .collect();
2701 self.memtable.write_batch(&owned_writes, txn_id)?;
2702
2703 Ok(())
2704 }
2705
2706 /// Commit a transaction
2707 ///
2708 /// With sync_mode:
2709 /// - 0 (OFF): No sync, risk of data loss
2710 /// - 1 (NORMAL): Adaptive sync using Little's Law: W* = √(τ/λ)
2711 /// - 2 (FULL): Sync every commit (safest, slowest)
2712 pub fn commit(&self, txn_id: u64) -> Result<u64> {
2713 // First, flush all buffered writes to WAL with SINGLE lock acquisition
2714 // This is the key optimization: O(1) lock instead of O(writes) locks
2715 if let Some((_, buffer)) = self.txn_write_buffers.remove(&txn_id)
2716 && !buffer.is_empty()
2717 {
2718 // Flush entire buffer to WAL with one lock
2719 self.wal.flush_buffer(&buffer)?;
2720 }
2721
2722 // Use group commit if available, otherwise direct commit
2723 if let Some(gc) = &self.group_commit {
2724 // Submit to group commit and wait for result
2725 // This batches multiple commits into a single fsync
2726 gc.submit_and_wait(txn_id).map_err(SochDBError::Internal)?;
2727
2728 // Get commit timestamp and write_set from MVCC
2729 let (commit_ts, write_set) = self
2730 .mvcc
2731 .commit(txn_id)
2732 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2733
2734 // Commit in memtable (O(k) where k = keys written)
2735 self.memtable.commit(txn_id, commit_ts, &write_set);
2736
2737 Ok(commit_ts)
2738 } else {
2739 // Direct commit path with adaptive sync (Little's Law)
2740 let sync_mode = self.sync_mode.load(Ordering::Relaxed);
2741 let commits = self.commits_since_sync.fetch_add(1, Ordering::Relaxed);
2742
2743 // Update arrival rate for adaptive batching
2744 self.update_arrival_rate();
2745
2746 // Write commit record (no flush yet - BufWriter will buffer it)
2747 let entry = TxnWalEntry::txn_commit(txn_id);
2748 self.wal.append_no_flush(&entry)?;
2749
2750 // Determine if we should sync/flush based on mode
2751 let should_sync = match sync_mode {
2752 0 => false, // OFF: never sync
2753 1 => commits >= self.adaptive_batch_threshold(), // NORMAL: adaptive
2754 _ => true, // FULL: always sync
2755 };
2756
2757 if should_sync {
2758 // Measure fsync latency for adaptive tuning
2759 let start = std::time::Instant::now();
2760 self.wal.flush()?;
2761 self.wal.sync()?;
2762 let latency_us = start.elapsed().as_micros() as u64;
2763
2764 // Update fsync latency estimate (EMA with α = 0.1)
2765 let old_latency = self.fsync_latency_us.load(Ordering::Relaxed);
2766 let new_latency = (old_latency * 9 + latency_us) / 10;
2767 self.fsync_latency_us.store(new_latency, Ordering::Relaxed);
2768
2769 self.commits_since_sync.store(0, Ordering::Relaxed);
2770 }
2771
2772 // Get commit timestamp and write_set from MVCC
2773 let (commit_ts, write_set) = self
2774 .mvcc
2775 .commit(txn_id)
2776 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2777
2778 // Commit in memtable (O(k) where k = keys written)
2779 self.memtable.commit(txn_id, commit_ts, &write_set);
2780
2781 Ok(commit_ts)
2782 }
2783 }
2784
2785 /// Update arrival rate using exponential moving average
2786 #[inline]
2787 fn update_arrival_rate(&self) {
2788 let now_us = std::time::SystemTime::now()
2789 .duration_since(std::time::UNIX_EPOCH)
2790 .unwrap()
2791 .as_micros() as u64;
2792
2793 let last = self.last_commit_us.swap(now_us, Ordering::Relaxed);
2794
2795 if last > 0 {
2796 let delta_us = now_us.saturating_sub(last);
2797 if delta_us > 0 && delta_us < 10_000_000 {
2798 // Ignore gaps > 10s
2799 // Rate = 1_000_000 / delta_us (requests/sec)
2800 // Stored as rate × 1000 for precision
2801 let instant_rate = 1_000_000_000 / delta_us;
2802
2803 // EMA with α = 0.1
2804 let old_rate = self.arrival_rate_ema.load(Ordering::Relaxed);
2805 let new_rate = (old_rate * 9 + instant_rate) / 10;
2806 self.arrival_rate_ema.store(new_rate, Ordering::Relaxed);
2807 }
2808 }
2809 }
2810
2811 /// Compute optimal batch threshold using Little's Law
2812 ///
2813 /// W* = √(τ / λ) where τ = fsync latency, λ = arrival rate
2814 /// Returns the number of commits to batch before fsync
2815 #[inline]
2816 fn adaptive_batch_threshold(&self) -> u64 {
2817 let lambda = self.arrival_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0; // req/s
2818 let tau = self.fsync_latency_us.load(Ordering::Relaxed) as f64 / 1_000_000.0; // seconds
2819
2820 if lambda <= 0.0 || tau <= 0.0 {
2821 return 100; // Fallback to fixed threshold
2822 }
2823
2824 // Little's Law: W* = sqrt(2 × τ × λ)
2825 // This minimizes total time = wait_time + fsync_overhead
2826 let n_opt = (2.0 * tau * lambda).sqrt();
2827
2828 // Clamp between 1 and 1000
2829 (n_opt as u64).clamp(1, 1000)
2830 }
2831
2832 /// Set synchronous mode
2833 ///
2834 /// - 0: OFF - No fsync (risk of data loss)
2835 /// - 1: NORMAL - Periodic fsync (balanced)
2836 /// - 2: FULL - Fsync every commit (safest)
2837 pub fn set_sync_mode(&self, mode: u64) {
2838 self.sync_mode.store(mode.min(2), Ordering::Relaxed);
2839 }
2840
2841 /// Force a group commit flush (useful for benchmarking or testing)
2842 pub fn flush_group_commit(&self) {
2843 if let Some(gc) = &self.group_commit {
2844 gc.flush_batch();
2845 }
2846 }
2847
2848 /// Abort a transaction
2849 pub fn abort(&self, txn_id: u64) -> Result<()> {
2850 // Discard buffered writes (no need to write to WAL)
2851 self.txn_write_buffers.remove(&txn_id);
2852
2853 // Write abort to WAL
2854 self.wal.abort_transaction(txn_id)?;
2855
2856 // Abort in MVCC
2857 self.mvcc.abort(txn_id);
2858
2859 // Abort in memtable
2860 self.memtable.abort(txn_id);
2861
2862 Ok(())
2863 }
2864
2865 /// Scan keys with prefix
2866 #[inline]
2867 pub fn scan(&self, txn_id: u64, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2868 // Fast path: get just snapshot_ts without cloning whole transaction
2869 let snapshot_ts = self
2870 .mvcc
2871 .get_snapshot_ts(txn_id)
2872 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2873
2874 // Note: Scan doesn't record individual key reads for SSI (too expensive)
2875 // SSI conflicts are tracked at the prefix level if needed
2876 Ok(self.memtable.scan_prefix(prefix, snapshot_ts, Some(txn_id)))
2877 }
2878
2879 /// Scan keys in range
2880 #[inline]
2881 pub fn scan_range(
2882 &self,
2883 txn_id: u64,
2884 start: &[u8],
2885 end: &[u8],
2886 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
2887 let snapshot_ts = self
2888 .mvcc
2889 .get_snapshot_ts(txn_id)
2890 .ok_or_else(|| SochDBError::Internal("Transaction not found".into()))?;
2891
2892 Ok(self
2893 .memtable
2894 .scan_range(start, end, snapshot_ts, Some(txn_id)))
2895 }
2896
2897 /// Streaming scan for very large result sets
2898 ///
2899 /// Returns an iterator that yields (key, value) pairs without
2900 /// materializing the entire result set in memory.
2901 #[inline]
2902 pub fn scan_range_iter<'a>(
2903 &'a self,
2904 txn_id: u64,
2905 start: &'a [u8],
2906 end: &'a [u8],
2907 ) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
2908 let snapshot_ts = self.mvcc.get_snapshot_ts(txn_id).unwrap_or(0);
2909 self.memtable.scan_range_iter(start, end, snapshot_ts, Some(txn_id))
2910 }
2911
2912 /// Force fsync to disk
2913 pub fn fsync(&self) -> Result<()> {
2914 self.wal.sync()
2915 }
2916
2917 /// Write checkpoint
2918 pub fn checkpoint(&self) -> Result<u64> {
2919 let txn_id = 0; // System transaction
2920 let entry = TxnWalEntry::checkpoint(txn_id);
2921 let lsn = self.wal.append_sync(&entry)?;
2922 self.last_checkpoint_lsn.store(lsn, Ordering::SeqCst);
2923 Ok(lsn)
2924 }
2925
2926 /// Get storage statistics
2927 pub fn stats(&self) -> StorageStats {
2928 // Get WAL size from the WAL manager
2929 let wal_size = self.wal.size_bytes();
2930
2931 // Get active transaction count from MVCC
2932 let active_txns = self.mvcc.active_transaction_count();
2933
2934 StorageStats {
2935 memtable_size_bytes: self.memtable.size(),
2936 wal_size_bytes: wal_size,
2937 active_transactions: active_txns,
2938 min_active_snapshot: self.mvcc.min_active_snapshot(),
2939 last_checkpoint_lsn: self.last_checkpoint_lsn.load(Ordering::SeqCst),
2940 }
2941 }
2942
2943 /// Garbage collect old versions
2944 pub fn gc(&self) -> usize {
2945 let min_ts = self.mvcc.min_active_snapshot();
2946 self.memtable.gc(min_ts)
2947 }
2948
2949 /// Clean shutdown
2950 pub fn shutdown(&self) -> Result<()> {
2951 // Sync WAL
2952 self.fsync()?;
2953
2954 // Write clean shutdown marker
2955 let marker_path = self.path.join(".clean_shutdown");
2956 std::fs::write(&marker_path, b"clean")?;
2957
2958 Ok(())
2959 }
2960}
2961
2962impl Drop for DurableStorage {
2963 fn drop(&mut self) {
2964 let _ = self.shutdown();
2965 }
2966}
2967
/// Recovery statistics
///
/// Plain-data summary of a recovery pass. Derives `Clone`, `PartialEq` and
/// `Eq` so results can be copied and compared directly (e.g. in tests).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct RecoveryStats {
    /// Number of transactions recovered.
    pub transactions_recovered: usize,
    /// Number of writes recovered.
    pub writes_recovered: usize,
    /// Commit timestamp associated with the recovery.
    pub commit_ts: u64,
}
2975
/// Storage statistics
///
/// Plain-data summary of the storage engine's state. Derives `Clone`,
/// `PartialEq` and `Eq` so two summaries can be copied and compared directly.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StorageStats {
    /// Size reported by the memtable.
    pub memtable_size_bytes: u64,
    /// Size reported by the WAL.
    pub wal_size_bytes: u64,
    /// Number of currently active transactions.
    pub active_transactions: usize,
    /// Minimum active snapshot reported by MVCC.
    pub min_active_snapshot: u64,
    /// LSN of the most recent checkpoint record.
    pub last_checkpoint_lsn: u64,
}
2985
2986#[cfg(test)]
2987mod tests {
2988 use super::*;
2989 use tempfile::tempdir;
2990
2991 #[test]
2992 fn test_basic_transaction() {
2993 let dir = tempdir().unwrap();
2994 let storage = DurableStorage::open(dir.path()).unwrap();
2995
2996 // Begin transaction
2997 let txn_id = storage.begin_transaction().unwrap();
2998
2999 // Write data
3000 storage
3001 .write(txn_id, b"key1".to_vec(), b"value1".to_vec())
3002 .unwrap();
3003 storage
3004 .write(txn_id, b"key2".to_vec(), b"value2".to_vec())
3005 .unwrap();
3006
3007 // Read back (within same transaction)
3008 let v1 = storage.read(txn_id, b"key1").unwrap();
3009 assert_eq!(v1, Some(b"value1".to_vec()));
3010
3011 // Commit
3012 let commit_ts = storage.commit(txn_id).unwrap();
3013 assert!(commit_ts > 0);
3014
3015 // Read in new transaction
3016 let txn2 = storage.begin_transaction().unwrap();
3017 let v1 = storage.read(txn2, b"key1").unwrap();
3018 assert_eq!(v1, Some(b"value1".to_vec()));
3019 storage.abort(txn2).unwrap();
3020 }
3021
3022 #[test]
3023 fn test_snapshot_isolation() {
3024 let dir = tempdir().unwrap();
3025 let storage = DurableStorage::open(dir.path()).unwrap();
3026
3027 // T1: Write initial value
3028 let t1 = storage.begin_transaction().unwrap();
3029 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3030 storage.commit(t1).unwrap();
3031
3032 // T2: Start reading (snapshot at this point)
3033 let t2 = storage.begin_transaction().unwrap();
3034
3035 // T3: Update the value
3036 let t3 = storage.begin_transaction().unwrap();
3037 storage.write(t3, b"key".to_vec(), b"v2".to_vec()).unwrap();
3038 storage.commit(t3).unwrap();
3039
3040 // T2 should still see v1 (snapshot isolation)
3041 let v = storage.read(t2, b"key").unwrap();
3042 assert_eq!(v, Some(b"v1".to_vec()));
3043
3044 // New transaction should see v2
3045 let t4 = storage.begin_transaction().unwrap();
3046 let v = storage.read(t4, b"key").unwrap();
3047 assert_eq!(v, Some(b"v2".to_vec()));
3048
3049 storage.abort(t2).unwrap();
3050 storage.abort(t4).unwrap();
3051 }
3052
3053 #[test]
3054 fn test_abort_transaction() {
3055 let dir = tempdir().unwrap();
3056 let storage = DurableStorage::open(dir.path()).unwrap();
3057
3058 // Write initial value
3059 let t1 = storage.begin_transaction().unwrap();
3060 storage.write(t1, b"key".to_vec(), b"v1".to_vec()).unwrap();
3061 storage.commit(t1).unwrap();
3062
3063 // Start transaction that will abort
3064 let t2 = storage.begin_transaction().unwrap();
3065 storage.write(t2, b"key".to_vec(), b"v2".to_vec()).unwrap();
3066 storage.abort(t2).unwrap();
3067
3068 // New transaction should see v1 (aborted changes not visible)
3069 let t3 = storage.begin_transaction().unwrap();
3070 let v = storage.read(t3, b"key").unwrap();
3071 assert_eq!(v, Some(b"v1".to_vec()));
3072 storage.abort(t3).unwrap();
3073 }
3074
3075 #[test]
3076 fn test_crash_recovery() {
3077 let dir = tempdir().unwrap();
3078
3079 // Phase 1: Write data and commit
3080 {
3081 let storage = DurableStorage::open(dir.path()).unwrap();
3082
3083 // Set sync mode to FULL to ensure data is synced before "crash"
3084 storage.set_sync_mode(2); // FULL: sync every commit
3085
3086 let txn = storage.begin_transaction().unwrap();
3087 storage
3088 .write(txn, b"persist".to_vec(), b"data".to_vec())
3089 .unwrap();
3090 storage.commit(txn).unwrap();
3091
3092 // Simulate crash (no clean shutdown)
3093 std::mem::forget(storage);
3094 }
3095
3096 // Phase 2: Reopen and recover
3097 {
3098 let storage = DurableStorage::open(dir.path()).unwrap();
3099 let stats = storage.recover().unwrap();
3100 assert!(stats.transactions_recovered > 0 || stats.writes_recovered > 0);
3101
3102 // Data should be recovered
3103 let txn = storage.begin_transaction().unwrap();
3104 let v = storage.read(txn, b"persist").unwrap();
3105 assert_eq!(v, Some(b"data".to_vec()));
3106 storage.abort(txn).unwrap();
3107 }
3108 }
3109
3110 #[test]
3111 fn test_scan_prefix() {
3112 let dir = tempdir().unwrap();
3113 let storage = DurableStorage::open(dir.path()).unwrap();
3114
3115 let txn = storage.begin_transaction().unwrap();
3116 storage
3117 .write(txn, b"user:1".to_vec(), b"alice".to_vec())
3118 .unwrap();
3119 storage
3120 .write(txn, b"user:2".to_vec(), b"bob".to_vec())
3121 .unwrap();
3122 storage
3123 .write(txn, b"order:1".to_vec(), b"order1".to_vec())
3124 .unwrap();
3125 storage.commit(txn).unwrap();
3126
3127 let txn2 = storage.begin_transaction().unwrap();
3128 let users = storage.scan(txn2, b"user:").unwrap();
3129 assert_eq!(users.len(), 2);
3130 storage.abort(txn2).unwrap();
3131 }
3132
3133 #[test]
3134 fn test_delete() {
3135 let dir = tempdir().unwrap();
3136 let storage = DurableStorage::open(dir.path()).unwrap();
3137
3138 // Insert
3139 let t1 = storage.begin_transaction().unwrap();
3140 storage
3141 .write(t1, b"key".to_vec(), b"value".to_vec())
3142 .unwrap();
3143 storage.commit(t1).unwrap();
3144
3145 // Verify exists
3146 let t2 = storage.begin_transaction().unwrap();
3147 assert!(storage.read(t2, b"key").unwrap().is_some());
3148 storage.abort(t2).unwrap();
3149
3150 // Delete
3151 let t3 = storage.begin_transaction().unwrap();
3152 storage.delete(t3, b"key".to_vec()).unwrap();
3153 storage.commit(t3).unwrap();
3154
3155 // Verify deleted
3156 let t4 = storage.begin_transaction().unwrap();
3157 assert!(storage.read(t4, b"key").unwrap().is_none());
3158 storage.abort(t4).unwrap();
3159 }
3160
3161 #[test]
3162 fn test_gc() {
3163 let dir = tempdir().unwrap();
3164 let storage = DurableStorage::open(dir.path()).unwrap();
3165
3166 // Create multiple versions
3167 for i in 0..10 {
3168 let txn = storage.begin_transaction().unwrap();
3169 storage
3170 .write(txn, b"key".to_vec(), format!("v{}", i).into_bytes())
3171 .unwrap();
3172 storage.commit(txn).unwrap();
3173 }
3174
3175 // GC should reclaim old versions
3176 let gc_count = storage.gc();
3177 // At least some versions should be collected
3178 // (exact count depends on implementation)
3179 let _ = gc_count; // gc_count is usize, always >= 0
3180 }
3181
3182 #[test]
3183 fn test_group_commit() {
3184 use std::sync::Arc;
3185 use std::thread;
3186
3187 let dir = tempdir().unwrap();
3188 let storage = Arc::new(DurableStorage::open_with_group_commit(dir.path()).unwrap());
3189
3190 // Spawn multiple threads to commit concurrently
3191 let mut handles = vec![];
3192 for i in 0..4 {
3193 let storage = Arc::clone(&storage);
3194 handles.push(thread::spawn(move || {
3195 let txn = storage.begin_transaction().unwrap();
3196 storage
3197 .write(
3198 txn,
3199 format!("key{}", i).into_bytes(),
3200 format!("val{}", i).into_bytes(),
3201 )
3202 .unwrap();
3203 storage.commit(txn).unwrap()
3204 }));
3205 }
3206
3207 // Wait for all commits
3208 let mut commit_times = vec![];
3209 for h in handles {
3210 commit_times.push(h.join().unwrap());
3211 }
3212
3213 // All commits should succeed
3214 assert!(commit_times.iter().all(|&ts| ts > 0));
3215
3216 // Verify data persisted
3217 let txn = storage.begin_transaction().unwrap();
3218 for i in 0..4 {
3219 let val = storage.read(txn, format!("key{}", i).as_bytes()).unwrap();
3220 assert_eq!(val, Some(format!("val{}", i).into_bytes()));
3221 }
3222 storage.abort(txn).unwrap();
3223 }
3224
3225 // ==================== ArenaMvccMemTable Tests ====================
3226
3227 #[test]
3228 fn test_arena_memtable_basic_write_read() {
3229 let memtable = ArenaMvccMemTable::new();
3230
3231 // Write some values
3232 memtable
3233 .write(b"key1", Some(b"value1".to_vec()), 1)
3234 .unwrap();
3235 memtable
3236 .write(b"key2", Some(b"value2".to_vec()), 1)
3237 .unwrap();
3238
3239 // Read them back (uncommitted, so need txn_id match)
3240 assert_eq!(memtable.read(b"key1", 0, Some(1)), Some(b"value1".to_vec()));
3241 assert_eq!(memtable.read(b"key2", 0, Some(1)), Some(b"value2".to_vec()));
3242 assert_eq!(memtable.read(b"key3", 0, Some(1)), None);
3243 }
3244
3245 #[test]
3246 fn test_arena_memtable_update() {
3247 let memtable = ArenaMvccMemTable::new();
3248
3249 memtable.write(b"key", Some(b"v1".to_vec()), 1).unwrap();
3250 memtable.write(b"key", Some(b"v2".to_vec()), 1).unwrap();
3251
3252 assert_eq!(memtable.read(b"key", 0, Some(1)), Some(b"v2".to_vec()));
3253 }
3254
3255 #[test]
3256 fn test_arena_memtable_delete() {
3257 let memtable = ArenaMvccMemTable::new();
3258
3259 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3260 memtable.write(b"key", None, 1).unwrap(); // Delete = None value
3261
3262 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3263 }
3264
3265 #[test]
3266 fn test_arena_memtable_scan_prefix() {
3267 let memtable = ArenaMvccMemTable::new();
3268
3269 memtable
3270 .write(b"user:1:name", Some(b"Alice".to_vec()), 1)
3271 .unwrap();
3272 memtable
3273 .write(b"user:1:email", Some(b"alice@test.com".to_vec()), 1)
3274 .unwrap();
3275 memtable
3276 .write(b"user:2:name", Some(b"Bob".to_vec()), 1)
3277 .unwrap();
3278 memtable
3279 .write(b"order:1", Some(b"order_data".to_vec()), 1)
3280 .unwrap();
3281
3282 // Create a write set and commit
3283 let mut write_set = HashSet::new();
3284 write_set.insert(InlineKey::from_slice(b"user:1:name"));
3285 write_set.insert(InlineKey::from_slice(b"user:1:email"));
3286 write_set.insert(InlineKey::from_slice(b"user:2:name"));
3287 write_set.insert(InlineKey::from_slice(b"order:1"));
3288 memtable.commit(1, 10, &write_set);
3289
3290 // Scan for user:1:* (snapshot_ts > commit_ts to see committed data)
3291 let results = memtable.scan_prefix(b"user:1:", 11, None);
3292 assert_eq!(results.len(), 2);
3293
3294 // Scan for all users
3295 let results = memtable.scan_prefix(b"user:", 11, None);
3296 assert_eq!(results.len(), 3);
3297 }
3298
3299 #[test]
3300 fn test_arena_memtable_write_batch() {
3301 let memtable = ArenaMvccMemTable::new();
3302
3303 let writes: Vec<(&[u8], Option<Vec<u8>>)> = vec![
3304 (b"k1", Some(b"v1".to_vec())),
3305 (b"k2", Some(b"v2".to_vec())),
3306 (b"k3", Some(b"v3".to_vec())),
3307 ];
3308
3309 memtable.write_batch(&writes, 1).unwrap();
3310
3311 assert_eq!(memtable.read(b"k1", 0, Some(1)), Some(b"v1".to_vec()));
3312 assert_eq!(memtable.read(b"k2", 0, Some(1)), Some(b"v2".to_vec()));
3313 assert_eq!(memtable.read(b"k3", 0, Some(1)), Some(b"v3".to_vec()));
3314 }
3315
3316 #[test]
3317 fn test_arena_memtable_gc() {
3318 let memtable = ArenaMvccMemTable::new();
3319
3320 // Write multiple versions
3321 for i in 0..10 {
3322 memtable
3323 .write(b"key", Some(format!("v{}", i).into_bytes()), i + 1)
3324 .unwrap();
3325
3326 let mut write_set = HashSet::new();
3327 write_set.insert(InlineKey::from_slice(b"key"));
3328 memtable.commit(i + 1, (i + 1) * 10, &write_set);
3329 }
3330
3331 // GC old versions
3332 let gc_count = memtable.gc(90);
3333 let _ = gc_count; // gc_count is usize, always >= 0
3334 }
3335
3336 #[test]
3337 fn test_arena_memtable_size_tracking() {
3338 let memtable = ArenaMvccMemTable::new();
3339
3340 assert_eq!(memtable.size(), 0);
3341
3342 memtable.write(b"key", Some(b"value".to_vec()), 1).unwrap();
3343
3344 assert!(memtable.size() > 0);
3345 }
3346
3347 #[test]
3348 fn test_arena_memtable_abort() {
3349 let memtable = ArenaMvccMemTable::new();
3350
3351 memtable
3352 .write(b"key", Some(b"uncommitted".to_vec()), 1)
3353 .unwrap();
3354
3355 // Visible to same txn
3356 assert_eq!(
3357 memtable.read(b"key", 0, Some(1)),
3358 Some(b"uncommitted".to_vec())
3359 );
3360
3361 // Not visible to other txns
3362 assert_eq!(memtable.read(b"key", 0, Some(2)), None);
3363
3364 // Abort
3365 memtable.abort(1);
3366
3367 // No longer visible
3368 assert_eq!(memtable.read(b"key", 0, Some(1)), None);
3369 }
3370
3371 // ========================================================================
3372 // MemTableKind Tests - Unified Abstraction
3373 // ========================================================================
3374
3375 #[test]
3376 fn test_memtable_kind_standard() {
3377 let memtable = MemTableKind::new(MemTableType::Standard, true);
3378 assert_eq!(memtable.kind(), MemTableType::Standard);
3379
3380 // Write and read
3381 memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3382
3383 // Commit transaction at ts=100
3384 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3385 memtable.commit(1, 100, &write_set);
3386
3387 // Read after commit - snapshot_ts must be > commit_ts for visibility
3388 let v = memtable.read(b"key1", 101, None);
3389 assert_eq!(v, Some(b"value1".to_vec()));
3390 }
3391
3392 #[test]
3393 fn test_memtable_kind_arena() {
3394 let memtable = MemTableKind::new(MemTableType::Arena, true);
3395 assert_eq!(memtable.kind(), MemTableType::Arena);
3396
3397 // Write and read
3398 memtable.write(b"key1".to_vec(), Some(b"value1".to_vec()), 1).unwrap();
3399
3400 // Commit at ts=100
3401 let write_set = std::iter::once(InlineKey::from_slice(b"key1")).collect();
3402 memtable.commit(1, 100, &write_set);
3403
3404 // Read after commit - snapshot_ts > commit_ts
3405 let v = memtable.read(b"key1", 101, None);
3406 assert_eq!(v, Some(b"value1".to_vec()));
3407 }
3408
3409 #[test]
3410 fn test_memtable_kind_scan_range() {
3411 // Test both implementations have consistent behavior
3412 for kind in [MemTableType::Standard, MemTableType::Arena] {
3413 let memtable = MemTableKind::new(kind, true);
3414
3415 // Write some data
3416 for i in 0..5 {
3417 let key = format!("key{}", i);
3418 let value = format!("value{}", i);
3419 memtable.write(key.into_bytes(), Some(value.into_bytes()), 1).unwrap();
3420 }
3421
3422 // Commit all at ts=100
3423 let write_set: HashSet<InlineKey> = (0..5)
3424 .map(|i| InlineKey::from_slice(format!("key{}", i).as_bytes()))
3425 .collect();
3426 memtable.commit(1, 100, &write_set);
3427
3428 // Scan range with snapshot_ts > commit_ts
3429 let results = memtable.scan_range(b"key1", b"key4", 101, None);
3430 assert_eq!(results.len(), 3, "kind={:?} should have 3 results (key1, key2, key3)", kind);
3431 }
3432 }
3433
3434 #[test]
3435 fn test_durable_storage_arena() {
3436 let dir = tempdir().unwrap();
3437 let storage = DurableStorage::open_with_arena(dir.path()).unwrap();
3438
3439 assert_eq!(storage.memtable_type(), MemTableType::Arena);
3440
3441 // Basic transaction should work the same
3442 let txn_id = storage.begin_transaction().unwrap();
3443 storage.write(txn_id, b"key1".to_vec(), b"value1".to_vec()).unwrap();
3444 storage.commit(txn_id).unwrap();
3445
3446 let txn2 = storage.begin_transaction().unwrap();
3447 let v = storage.read(txn2, b"key1").unwrap();
3448 assert_eq!(v, Some(b"value1".to_vec()));
3449 storage.abort(txn2).unwrap();
3450 }
3451
3452 #[test]
3453 fn test_durable_storage_full_config() {
3454 let dir = tempdir().unwrap();
3455
3456 // Test with Arena and ordered index enabled
3457 let storage = DurableStorage::open_with_full_config(
3458 dir.path(),
3459 true,
3460 MemTableType::Arena,
3461 ).unwrap();
3462
3463 assert_eq!(storage.memtable_type(), MemTableType::Arena);
3464
3465 // Write multiple keys
3466 let txn = storage.begin_transaction().unwrap();
3467 for i in 0..10 {
3468 let key = format!("key{:02}", i);
3469 let value = format!("value{}", i);
3470 storage.write(txn, key.into_bytes(), value.into_bytes()).unwrap();
3471 }
3472 storage.commit(txn).unwrap();
3473
3474 // Scan should work (uses scan method for prefix)
3475 let txn2 = storage.begin_transaction().unwrap();
3476 let results = storage.scan(txn2, b"key0").unwrap();
3477 assert_eq!(results.len(), 10); // key00 through key09
3478 storage.abort(txn2).unwrap();
3479 }
3480}