sochdb_storage/version_set.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SuperVersion Metadata + Copy-on-Write Version Set
19//!
20//! This module implements RocksDB-style SuperVersion metadata management for
21//! near lock-free reads. The key insight is that read paths only need a consistent
22//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
23//! exclusive access to the underlying data structures.
24//!
25//! ## Problem Analysis
26//!
27//! Previous implementation suffered from lock contention:
28//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
29//! - Each level switch may acquire additional locks
30//! - Compaction/flush modifies metadata while readers hold references
31//! - Lock-order complexity leads to deadlock potential
32//!
33//! ## Solution: SuperVersion + ArcSwap
34//!
35//! ```text
36//! ┌─────────────────────────────────────────────────────────────────┐
37//! │ VersionSet │
38//! │ ┌─────────────────────────────────────────────────────────────┐│
39//! │ │ current: ArcSwap<SuperVersion> ◄─── Atomic swap (O(1)) ││
40//! │ └─────────────────────────────────────────────────────────────┘│
41//! │ │ │
42//! │ ▼ │
43//! │ ┌─────────────────────────────────────────────────────────────┐│
44//! │ │ SuperVersion ││
45//! │ │ ┌─────────────┐ ┌──────────────┐ ┌───────────────────┐ ││
46//! │ │ │ memtable │ │ immutables │ │ level_files │ ││
47//! │ │ │ Arc<...> │ │ Arc<Vec<..>> │ │ Arc<Vec<Level>> │ ││
48//! │ │ └─────────────┘ └──────────────┘ └───────────────────┘ ││
49//! │ └─────────────────────────────────────────────────────────────┘│
50//! └─────────────────────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## Read Path (Lock-Free)
54//!
55//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
56//! 2. Search memtable → immutables → levels using sv references
57//! 3. Release sv when done (Arc drop)
58//!
59//! No locks acquired. Readers are completely decoupled from writers.
60//!
61//! ## Write Path (Copy-on-Write)
62//!
63//! 1. Clone current SuperVersion's inner Arcs
64//! 2. Modify the new copy (e.g., add new immutable memtable)
65//! 3. Atomically swap new SuperVersion into place
66//! 4. Old SuperVersion kept alive by existing readers (Arc)
67//!
68//! ## Complexity Analysis
69//!
70//! | Operation | Old (with locks) | New (SuperVersion) |
71//! |--------------------|----------------------|---------------------------|
72//! | Read acquire | O(1) but contended | O(1) atomic load |
73//! | Read release | O(1) unlock | O(1) Arc decrement |
//! | Metadata update | O(1) + lock wait | O(len of cloned Vec) copy (levels/immutables) |
75//! | Concurrent reads | Serialized on RwLock | Truly parallel |
76//!
77//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
78
79use arc_swap::{ArcSwap, Guard};
80use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
81use parking_lot::Mutex;
82use std::collections::BTreeMap;
83use std::path::PathBuf;
84use std::sync::atomic::{AtomicU64, Ordering};
85use std::sync::Arc;
86
87// =============================================================================
88// File Metadata
89// =============================================================================
90
91/// Metadata for an SSTable file
92#[derive(Debug, Clone)]
93pub struct FileMetadata {
94 /// Unique file number
95 pub file_number: u64,
96 /// File size in bytes
97 pub file_size: u64,
98 /// Smallest key in file (for range queries)
99 pub smallest_key: Vec<u8>,
100 /// Largest key in file (for range queries)
101 pub largest_key: Vec<u8>,
102 /// Number of entries in file
103 pub num_entries: u64,
104 /// Minimum sequence number in file
105 pub min_seqno: u64,
106 /// Maximum sequence number in file
107 pub max_seqno: u64,
108 /// Path to the file
109 pub path: PathBuf,
110 /// Bloom filter (if loaded)
111 pub bloom_filter: Option<Arc<BloomFilterHandle>>,
112 /// Whether file is being compacted
113 pub being_compacted: bool,
114}
115
116impl FileMetadata {
117 /// Check if key might be in this file using bloom filter
118 #[inline]
119 pub fn may_contain(&self, key: &[u8]) -> bool {
120 match &self.bloom_filter {
121 Some(bf) => bf.may_contain(key),
122 None => true, // No filter = must check file
123 }
124 }
125
126 /// Check if a key range overlaps with this file
127 #[inline]
128 pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
129 if end.is_empty() {
130 // Unbounded end - overlaps if smallest_key <= end conceptually
131 self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
132 } else {
133 // Standard range overlap check
134 self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
135 }
136 }
137}
138
139/// Handle to a bloom filter (may be memory-mapped or in-memory)
140#[derive(Debug)]
141pub struct BloomFilterHandle {
142 /// The bloom filter bits
143 bits: Vec<u64>,
144 /// Number of hash functions
145 num_hashes: u32,
146}
147
148impl BloomFilterHandle {
149 /// Create a new bloom filter handle
150 pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
151 Self { bits, num_hashes }
152 }
153
154 /// Check if key may be present
155 #[inline]
156 pub fn may_contain(&self, key: &[u8]) -> bool {
157 if self.bits.is_empty() {
158 return true;
159 }
160 let num_bits = self.bits.len() * 64;
161 let h1 = Self::hash1(key);
162 let h2 = Self::hash2(key);
163
164 for i in 0..self.num_hashes {
165 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
166 let bit_idx = (h as usize) % num_bits;
167 let word_idx = bit_idx / 64;
168 let bit_pos = bit_idx % 64;
169 if self.bits[word_idx] & (1 << bit_pos) == 0 {
170 return false;
171 }
172 }
173 true
174 }
175
176 #[inline]
177 fn hash1(key: &[u8]) -> u64 {
178 use std::collections::hash_map::DefaultHasher;
179 use std::hash::{Hash, Hasher};
180 let mut hasher = DefaultHasher::new();
181 key.hash(&mut hasher);
182 hasher.finish()
183 }
184
185 #[inline]
186 fn hash2(key: &[u8]) -> u64 {
187 twox_hash::xxh3::hash64(key)
188 }
189}
190
191// =============================================================================
192// Level Metadata
193// =============================================================================
194
195/// Metadata for a single level in the LSM tree
196#[derive(Debug, Clone)]
197pub struct LevelMetadata {
198 /// Level number (0 = unsorted, 1+ = sorted)
199 pub level: u32,
200 /// Files in this level (sorted by smallest_key for L1+)
201 pub files: Vec<Arc<FileMetadata>>,
202 /// Total size of files in this level
203 pub total_size: u64,
204 /// Target size for this level (for compaction decisions)
205 pub target_size: u64,
206 /// Compaction score (size / target_size)
207 pub compaction_score: f64,
208}
209
210impl LevelMetadata {
211 /// Create a new empty level
212 pub fn new(level: u32, target_size: u64) -> Self {
213 Self {
214 level,
215 files: Vec::new(),
216 total_size: 0,
217 target_size,
218 compaction_score: 0.0,
219 }
220 }
221
222 /// Find files that may contain a key using binary search (for L1+)
223 pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
224 if self.level == 0 {
225 // L0 is unsorted - must check all files
226 self.files
227 .iter()
228 .filter(|f| {
229 f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key
230 })
231 .collect()
232 } else {
233 // L1+ is sorted - binary search for overlapping files
234 let idx = self.files.partition_point(|f| f.largest_key.as_slice() < key);
235 if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
236 vec![&self.files[idx]]
237 } else {
238 vec![]
239 }
240 }
241 }
242
243 /// Find files that overlap with a key range
244 pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
245 if self.level == 0 {
246 // L0 is unsorted - check all files
247 self.files
248 .iter()
249 .filter(|f| f.overlaps_range(start, end))
250 .collect()
251 } else {
252 // L1+ is sorted - find range using binary search
253 let start_idx = self.files.partition_point(|f| f.largest_key.as_slice() < start);
254 let end_idx = if end.is_empty() {
255 self.files.len()
256 } else {
257 self.files.partition_point(|f| f.smallest_key.as_slice() <= end)
258 };
259 self.files[start_idx..end_idx].iter().collect()
260 }
261 }
262
263 /// Recalculate compaction score
264 pub fn update_compaction_score(&mut self) {
265 if self.target_size > 0 {
266 self.compaction_score = self.total_size as f64 / self.target_size as f64;
267 } else {
268 self.compaction_score = 0.0;
269 }
270 }
271}
272
273// =============================================================================
274// Immutable MemTable Reference
275// =============================================================================
276
/// Handle to a sealed memtable that no longer accepts writes and is waiting
/// to be flushed.
#[derive(Debug, Clone)]
pub struct ImmutableMemTableRef {
    /// Identifier of this immutable memtable (flushes refer to memtables by this id)
    pub id: u64,
    /// Sequence number at the moment the memtable was sealed
    pub seal_seqno: u64,
    /// Approximate size in bytes
    pub size_bytes: u64,
    /// Type-erased memtable contents; the concrete type comes from the storage layer
    pub data: Arc<dyn ImmutableMemTable>,
}

/// Read-only operations a sealed memtable must support.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Looks up `key` as of sequence number `seqno`.
    ///
    /// NOTE(review): the nested `Option` presumably means `None` = key not
    /// present and `Some(None)` = key deleted. Confirm against implementations.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Iterates over every entry as `(key, value, u64)`.
    ///
    /// The trailing `u64` is presumably the entry's sequence number, and a
    /// `None` value presumably a deletion. Verify with implementors.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Approximate size of the memtable.
    fn size(&self) -> u64;
}
302
303// =============================================================================
304// SuperVersion - The Core Abstraction
305// =============================================================================
306
307/// SuperVersion: A consistent snapshot of all storage metadata
308///
309/// This is the key abstraction for lock-free reads. A SuperVersion contains
310/// Arc references to:
311/// - Current mutable memtable
312/// - List of immutable memtables (pending flush)
313/// - All SSTable levels and their file metadata
314///
315/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
316/// read operation, then release it (O(1) Arc drop). No locks required.
317///
318/// Writers create a new SuperVersion with modified metadata and atomically
319/// swap it in. Old versions remain valid until all readers release them.
320#[derive(Debug, Clone)]
321pub struct SuperVersion {
322 /// Version number (monotonically increasing)
323 pub version_number: u64,
324
325 /// Current mutable memtable reference
326 /// Note: The memtable itself may have internal synchronization,
327 /// but the *reference* is immutable within a SuperVersion
328 pub memtable_version: u64,
329
330 /// Immutable memtables (oldest first)
331 pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
332
333 /// SSTable levels (L0, L1, L2, ...)
334 pub levels: Arc<Vec<LevelMetadata>>,
335
336 /// Minimum sequence number safe to garbage collect
337 /// Versions with seqno < this can be pruned during compaction
338 pub min_snapshot_seqno: u64,
339
340 /// Current log (WAL) number for crash recovery
341 pub log_number: u64,
342
343 /// Prev log number (for two-log protocol during flush)
344 pub prev_log_number: u64,
345
346 /// Next file number to allocate
347 pub next_file_number: u64,
348
349 /// Manifest file number
350 pub manifest_file_number: u64,
351}
352
353impl SuperVersion {
354 /// Create a new empty SuperVersion
355 pub fn new() -> Self {
356 Self {
357 version_number: 1,
358 memtable_version: 1,
359 immutable_memtables: Arc::new(Vec::new()),
360 levels: Arc::new(Vec::new()),
361 min_snapshot_seqno: 0,
362 log_number: 1,
363 prev_log_number: 0,
364 next_file_number: 2,
365 manifest_file_number: 1,
366 }
367 }
368
369 /// Create a new SuperVersion with updated immutable memtables
370 pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
371 let mut new_imms = (*self.immutable_memtables).clone();
372 new_imms.push(imm);
373
374 Self {
375 version_number: self.version_number + 1,
376 memtable_version: self.memtable_version + 1,
377 immutable_memtables: Arc::new(new_imms),
378 levels: Arc::clone(&self.levels),
379 min_snapshot_seqno: self.min_snapshot_seqno,
380 log_number: self.log_number,
381 prev_log_number: self.prev_log_number,
382 next_file_number: self.next_file_number,
383 manifest_file_number: self.manifest_file_number,
384 }
385 }
386
387 /// Create a new SuperVersion after flushing immutable memtables
388 pub fn with_flushed_memtables(
389 &self,
390 flushed_ids: &[u64],
391 new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
392 ) -> Self {
393 // Remove flushed immutable memtables
394 let new_imms: Vec<_> = self.immutable_memtables
395 .iter()
396 .filter(|imm| !flushed_ids.contains(&imm.id))
397 .cloned()
398 .collect();
399
400 // Add new files to levels
401 let mut new_levels = (*self.levels).clone();
402 for (level, file) in new_files {
403 // Ensure level exists
404 while new_levels.len() <= level as usize {
405 let target_size = self.level_target_size(new_levels.len() as u32);
406 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
407 }
408
409 let lm = &mut new_levels[level as usize];
410 lm.total_size += file.file_size;
411 lm.files.push(file);
412 lm.update_compaction_score();
413 }
414
415 Self {
416 version_number: self.version_number + 1,
417 memtable_version: self.memtable_version,
418 immutable_memtables: Arc::new(new_imms),
419 levels: Arc::new(new_levels),
420 min_snapshot_seqno: self.min_snapshot_seqno,
421 log_number: self.log_number,
422 prev_log_number: self.prev_log_number,
423 next_file_number: self.next_file_number,
424 manifest_file_number: self.manifest_file_number,
425 }
426 }
427
428 /// Create a new SuperVersion after compaction
429 pub fn with_compaction_result(
430 &self,
431 input_files: &[(u32, u64)], // (level, file_number)
432 output_files: Vec<(u32, Arc<FileMetadata>)>,
433 ) -> Self {
434 let mut new_levels = (*self.levels).clone();
435
436 // Remove input files
437 for (level, file_num) in input_files {
438 if let Some(lm) = new_levels.get_mut(*level as usize) {
439 if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
440 let removed = lm.files.remove(pos);
441 lm.total_size -= removed.file_size;
442 }
443 }
444 }
445
446 // Add output files
447 for (level, file) in output_files {
448 while new_levels.len() <= level as usize {
449 let target_size = self.level_target_size(new_levels.len() as u32);
450 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
451 }
452
453 let lm = &mut new_levels[level as usize];
454 lm.total_size += file.file_size;
455
456 // Insert in sorted order for L1+
457 if level > 0 {
458 let pos = lm.files.partition_point(|f| f.smallest_key < file.smallest_key);
459 lm.files.insert(pos, file);
460 } else {
461 lm.files.push(file);
462 }
463 lm.update_compaction_score();
464 }
465
466 Self {
467 version_number: self.version_number + 1,
468 memtable_version: self.memtable_version,
469 immutable_memtables: Arc::clone(&self.immutable_memtables),
470 levels: Arc::new(new_levels),
471 min_snapshot_seqno: self.min_snapshot_seqno,
472 log_number: self.log_number,
473 prev_log_number: self.prev_log_number,
474 next_file_number: self.next_file_number,
475 manifest_file_number: self.manifest_file_number,
476 }
477 }
478
479 /// Calculate target size for a level
480 fn level_target_size(&self, level: u32) -> u64 {
481 // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
482 match level {
483 0 => 64 * 1024 * 1024,
484 1 => 256 * 1024 * 1024,
485 _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
486 }
487 }
488
489 /// Get total number of files across all levels
490 pub fn total_file_count(&self) -> usize {
491 self.levels.iter().map(|l| l.files.len()).sum()
492 }
493
494 /// Get level with highest compaction score
495 pub fn pick_compaction_level(&self) -> Option<u32> {
496 self.levels
497 .iter()
498 .filter(|l| l.compaction_score > 1.0)
499 .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
500 .map(|l| l.level)
501 }
502}
503
504impl Default for SuperVersion {
505 fn default() -> Self {
506 Self::new()
507 }
508}
509
510// =============================================================================
511// VersionSet - The Top-Level Manager
512// =============================================================================
513
514/// VersionSet manages the current SuperVersion and provides atomic updates
515///
516/// This is the entry point for all metadata access. Readers call `get()` to
517/// acquire the current SuperVersion, writers call `install()` to atomically
518/// swap in a new version.
519///
520/// ## Thread Safety
521///
522/// - `get()`: Lock-free (atomic load)
523/// - `install()`: Serialized through internal mutex (writers must coordinate)
524///
525/// Multiple readers can proceed in parallel with zero synchronization.
526/// Writers are serialized to ensure consistent version progression.
527pub struct VersionSet {
528 /// Current SuperVersion (atomically swappable)
529 current: ArcSwap<SuperVersion>,
530
531 /// Version number counter
532 version_counter: AtomicU64,
533
534 /// Write serialization lock
535 /// Only one writer can modify the version at a time
536 write_lock: Mutex<()>,
537
538 /// Snapshot registry - tracks active snapshots for GC
539 snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count
540
541 /// Database directory
542 db_path: PathBuf,
543}
544
545impl VersionSet {
546 /// Create a new VersionSet
547 pub fn new(db_path: PathBuf) -> Self {
548 Self {
549 current: ArcSwap::from_pointee(SuperVersion::new()),
550 version_counter: AtomicU64::new(1),
551 write_lock: Mutex::new(()),
552 snapshots: Mutex::new(BTreeMap::new()),
553 db_path,
554 }
555 }
556
557 /// Get current SuperVersion (lock-free)
558 ///
559 /// This is the hot path for reads. Returns a Guard that holds an Arc
560 /// to the current SuperVersion. The Guard can be dereferenced to access
561 /// the SuperVersion.
562 ///
563 /// ## Performance
564 ///
565 /// - Time: O(1) atomic load
566 /// - No locks acquired
567 /// - No memory allocation
568 #[inline]
569 pub fn get(&self) -> Guard<Arc<SuperVersion>> {
570 self.current.load()
571 }
572
573 /// Get current SuperVersion as owned Arc
574 ///
575 /// Use this when you need to hold the SuperVersion across await points
576 /// or store it for later use.
577 #[inline]
578 pub fn get_arc(&self) -> Arc<SuperVersion> {
579 self.current.load_full()
580 }
581
582 /// Install a new SuperVersion (serialized)
583 ///
584 /// This atomically swaps the new version into place. The old version
585 /// remains valid for any readers that acquired it before the swap.
586 ///
587 /// ## Safety
588 ///
589 /// Only one writer should call this at a time. Use `with_write_lock()`
590 /// to serialize concurrent updates.
591 pub fn install(&self, new_version: SuperVersion) {
592 let _guard = self.write_lock.lock();
593 self.current.store(Arc::new(new_version));
594 self.version_counter.fetch_add(1, Ordering::SeqCst);
595 }
596
597 /// Execute a function with exclusive write access
598 ///
599 /// Use this to ensure atomic read-modify-write operations on the
600 /// version set. The function receives the current SuperVersion and
601 /// should return the new SuperVersion to install.
602 pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
603 where
604 F: FnOnce(&SuperVersion) -> SuperVersion,
605 {
606 let _guard = self.write_lock.lock();
607 let current = self.current.load();
608 let new_version = f(¤t);
609 self.current.store(Arc::new(new_version.clone()));
610 self.version_counter.fetch_add(1, Ordering::SeqCst);
611 new_version
612 }
613
614 /// Register a snapshot at the given sequence number
615 ///
616 /// Returns the snapshot sequence number for later release.
617 pub fn register_snapshot(&self, seqno: u64) -> u64 {
618 let mut snapshots = self.snapshots.lock();
619 *snapshots.entry(seqno).or_insert(0) += 1;
620 seqno
621 }
622
623 /// Release a snapshot
624 pub fn release_snapshot(&self, seqno: u64) {
625 let mut snapshots = self.snapshots.lock();
626 if let Some(count) = snapshots.get_mut(&seqno) {
627 *count -= 1;
628 if *count == 0 {
629 snapshots.remove(&seqno);
630 }
631 }
632 }
633
634 /// Get minimum sequence number that must be preserved
635 ///
636 /// Returns the oldest snapshot sequence number, or the current version's
637 /// min_snapshot_seqno if no snapshots are active.
638 pub fn min_preserved_seqno(&self) -> u64 {
639 let snapshots = self.snapshots.lock();
640 snapshots.keys().next().copied().unwrap_or_else(|| {
641 self.current.load().min_snapshot_seqno
642 })
643 }
644
645 /// Update min_snapshot_seqno based on active snapshots
646 pub fn update_min_snapshot_seqno(&self) {
647 let min_seqno = self.min_preserved_seqno();
648 self.with_write_lock(|current| {
649 SuperVersion {
650 min_snapshot_seqno: min_seqno,
651 version_number: current.version_number + 1,
652 ..current.clone()
653 }
654 });
655 }
656
657 /// Get database path
658 pub fn db_path(&self) -> &PathBuf {
659 &self.db_path
660 }
661
662 /// Get current version number
663 pub fn version_number(&self) -> u64 {
664 self.version_counter.load(Ordering::SeqCst)
665 }
666}
667
668impl std::fmt::Debug for VersionSet {
669 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670 let current = self.current.load();
671 f.debug_struct("VersionSet")
672 .field("version_number", ¤t.version_number)
673 .field("num_immutables", ¤t.immutable_memtables.len())
674 .field("num_levels", ¤t.levels.len())
675 .field("total_files", ¤t.total_file_count())
676 .field("db_path", &self.db_path)
677 .finish()
678 }
679}
680
681// =============================================================================
682// SuperVersionHandle - RAII Guard for SuperVersion
683// =============================================================================
684
685/// RAII handle for SuperVersion access
686///
687/// This provides a convenient way to access a SuperVersion with automatic
688/// cleanup. The handle can be cloned to share access across threads.
689pub struct SuperVersionHandle {
690 version: Arc<SuperVersion>,
691 version_set: Arc<VersionSet>,
692 snapshot_seqno: Option<u64>,
693}
694
695impl SuperVersionHandle {
696 /// Create a new handle from a VersionSet
697 pub fn new(version_set: Arc<VersionSet>) -> Self {
698 let version = version_set.get_arc();
699 Self {
700 version,
701 version_set,
702 snapshot_seqno: None,
703 }
704 }
705
706 /// Create a handle with a registered snapshot
707 pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
708 let registered_seqno = version_set.register_snapshot(seqno);
709 let version = version_set.get_arc();
710 Self {
711 version,
712 version_set,
713 snapshot_seqno: Some(registered_seqno),
714 }
715 }
716
717 /// Get the SuperVersion
718 #[inline]
719 pub fn version(&self) -> &SuperVersion {
720 &self.version
721 }
722
723 /// Get the snapshot sequence number (if any)
724 pub fn snapshot_seqno(&self) -> Option<u64> {
725 self.snapshot_seqno
726 }
727}
728
729impl Drop for SuperVersionHandle {
730 fn drop(&mut self) {
731 if let Some(seqno) = self.snapshot_seqno {
732 self.version_set.release_snapshot(seqno);
733 }
734 }
735}
736
737impl Clone for SuperVersionHandle {
738 fn clone(&self) -> Self {
739 // If we have a snapshot, register another reference
740 if let Some(seqno) = self.snapshot_seqno {
741 self.version_set.register_snapshot(seqno);
742 }
743 Self {
744 version: Arc::clone(&self.version),
745 version_set: Arc::clone(&self.version_set),
746 snapshot_seqno: self.snapshot_seqno,
747 }
748 }
749}
750
751// =============================================================================
752// Tests
753// =============================================================================
754
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Dummy database directory shared by the `VersionSet` tests.
    fn scratch_path() -> PathBuf {
        PathBuf::from("/tmp/test")
    }

    /// Builds test file `n`, covering the keys `"{n*10:02}"..="{n*10+9:02}"`.
    fn numbered_file(n: u64) -> Arc<FileMetadata> {
        Arc::new(FileMetadata {
            file_number: n,
            file_size: 1024,
            smallest_key: format!("{:02}", n * 10).into_bytes(),
            largest_key: format!("{:02}", n * 10 + 9).into_bytes(),
            num_entries: 100,
            min_seqno: 1,
            max_seqno: 100,
            path: PathBuf::from(format!("/tmp/{}.sst", n)),
            bloom_filter: None,
            being_compacted: false,
        })
    }

    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    #[test]
    fn test_version_set_get() {
        let versions = VersionSet::new(scratch_path());
        assert_eq!(versions.get().version_number, 1);
    }

    #[test]
    fn test_version_set_install() {
        let versions = VersionSet::new(scratch_path());
        let mut replacement = SuperVersion::new();
        replacement.version_number = 2;
        versions.install(replacement);
        assert_eq!(versions.get().version_number, 2);
    }

    #[test]
    fn test_concurrent_reads() {
        let shared = Arc::new(VersionSet::new(scratch_path()));
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let versions = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        assert!(versions.get().version_number >= 1);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }

    #[test]
    fn test_snapshot_registry() {
        let versions = VersionSet::new(scratch_path());

        // Two references at 100 (one is a duplicate) and one at 200.
        for seqno in [100, 200, 100] {
            versions.register_snapshot(seqno);
        }
        assert_eq!(versions.min_preserved_seqno(), 100);

        // One reference to 100 remains, so it is still the minimum.
        versions.release_snapshot(100);
        assert_eq!(versions.min_preserved_seqno(), 100);

        // With 100 fully released, 200 becomes the minimum.
        versions.release_snapshot(100);
        assert_eq!(versions.min_preserved_seqno(), 200);

        versions.release_snapshot(200);
    }

    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);
        for n in 0..10 {
            level.files.push(numbered_file(n));
            level.total_size += 1024;
        }

        // Point lookup lands in file 2 ("20".."29").
        let hits = level.find_files_for_key(b"25");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].file_number, 2);

        // "15"..="35" touches files 1, 2 and 3.
        let covered = level.find_files_for_range(b"15", b"35");
        assert_eq!(covered.len(), 3);
    }
}