sochdb_storage/version_set.rs
1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SuperVersion Metadata + Copy-on-Write Version Set
16//!
17//! This module implements RocksDB-style SuperVersion metadata management for
18//! near lock-free reads. The key insight is that read paths only need a consistent
19//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
20//! exclusive access to the underlying data structures.
21//!
22//! ## Problem Analysis
23//!
24//! Previous implementation suffered from lock contention:
25//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
26//! - Each level switch may acquire additional locks
27//! - Compaction/flush modifies metadata while readers hold references
28//! - Lock-order complexity leads to deadlock potential
29//!
30//! ## Solution: SuperVersion + ArcSwap
31//!
//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                           VersionSet                            │
//! │ ┌─────────────────────────────────────────────────────────────┐ │
//! │ │ current: ArcSwap<SuperVersion>  ◄─── Atomic swap (O(1))     │ │
//! │ └─────────────────────────────────────────────────────────────┘ │
//! │                                │                                │
//! │                                ▼                                │
//! │ ┌─────────────────────────────────────────────────────────────┐ │
//! │ │                        SuperVersion                         │ │
//! │ │ ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐    │ │
//! │ │ │  memtable   │  │  immutables  │  │    level_files    │    │ │
//! │ │ │  Arc<...>   │  │ Arc<Vec<..>> │  │  Arc<Vec<Level>>  │    │ │
//! │ │ └─────────────┘  └──────────────┘  └───────────────────┘    │ │
//! │ └─────────────────────────────────────────────────────────────┘ │
//! └─────────────────────────────────────────────────────────────────┘
//! ```
49//!
50//! ## Read Path (Lock-Free)
51//!
52//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
53//! 2. Search memtable → immutables → levels using sv references
54//! 3. Release sv when done (Arc drop)
55//!
56//! No locks acquired. Readers are completely decoupled from writers.
57//!
58//! ## Write Path (Copy-on-Write)
59//!
60//! 1. Clone current SuperVersion's inner Arcs
61//! 2. Modify the new copy (e.g., add new immutable memtable)
62//! 3. Atomically swap new SuperVersion into place
63//! 4. Old SuperVersion kept alive by existing readers (Arc)
64//!
65//! ## Complexity Analysis
66//!
67//! | Operation | Old (with locks) | New (SuperVersion) |
68//! |--------------------|----------------------|---------------------------|
69//! | Read acquire | O(1) but contended | O(1) atomic load |
70//! | Read release | O(1) unlock | O(1) Arc decrement |
71//! | Metadata update | O(1) + lock wait | O(changed_metadata) clone |
72//! | Concurrent reads | Serialized on RwLock | Truly parallel |
73//!
74//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
75
76use arc_swap::{ArcSwap, Guard};
77use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
78use parking_lot::Mutex;
79use std::collections::BTreeMap;
80use std::path::PathBuf;
81use std::sync::atomic::{AtomicU64, Ordering};
82use std::sync::Arc;
83
84// =============================================================================
85// File Metadata
86// =============================================================================
87
88/// Metadata for an SSTable file
89#[derive(Debug, Clone)]
90pub struct FileMetadata {
91 /// Unique file number
92 pub file_number: u64,
93 /// File size in bytes
94 pub file_size: u64,
95 /// Smallest key in file (for range queries)
96 pub smallest_key: Vec<u8>,
97 /// Largest key in file (for range queries)
98 pub largest_key: Vec<u8>,
99 /// Number of entries in file
100 pub num_entries: u64,
101 /// Minimum sequence number in file
102 pub min_seqno: u64,
103 /// Maximum sequence number in file
104 pub max_seqno: u64,
105 /// Path to the file
106 pub path: PathBuf,
107 /// Bloom filter (if loaded)
108 pub bloom_filter: Option<Arc<BloomFilterHandle>>,
109 /// Whether file is being compacted
110 pub being_compacted: bool,
111}
112
113impl FileMetadata {
114 /// Check if key might be in this file using bloom filter
115 #[inline]
116 pub fn may_contain(&self, key: &[u8]) -> bool {
117 match &self.bloom_filter {
118 Some(bf) => bf.may_contain(key),
119 None => true, // No filter = must check file
120 }
121 }
122
123 /// Check if a key range overlaps with this file
124 #[inline]
125 pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
126 if end.is_empty() {
127 // Unbounded end - overlaps if smallest_key <= end conceptually
128 self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
129 } else {
130 // Standard range overlap check
131 self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
132 }
133 }
134}
135
/// Handle to a bloom filter whose bit array is held as packed 64-bit words.
#[derive(Debug)]
pub struct BloomFilterHandle {
    /// Filter bits, 64 per word.
    bits: Vec<u64>,
    /// How many hash probes are evaluated per key.
    num_hashes: u32,
}
144
145impl BloomFilterHandle {
146 /// Create a new bloom filter handle
147 pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
148 Self { bits, num_hashes }
149 }
150
151 /// Check if key may be present
152 #[inline]
153 pub fn may_contain(&self, key: &[u8]) -> bool {
154 if self.bits.is_empty() {
155 return true;
156 }
157 let num_bits = self.bits.len() * 64;
158 let h1 = Self::hash1(key);
159 let h2 = Self::hash2(key);
160
161 for i in 0..self.num_hashes {
162 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
163 let bit_idx = (h as usize) % num_bits;
164 let word_idx = bit_idx / 64;
165 let bit_pos = bit_idx % 64;
166 if self.bits[word_idx] & (1 << bit_pos) == 0 {
167 return false;
168 }
169 }
170 true
171 }
172
173 #[inline]
174 fn hash1(key: &[u8]) -> u64 {
175 use std::collections::hash_map::DefaultHasher;
176 use std::hash::{Hash, Hasher};
177 let mut hasher = DefaultHasher::new();
178 key.hash(&mut hasher);
179 hasher.finish()
180 }
181
182 #[inline]
183 fn hash2(key: &[u8]) -> u64 {
184 twox_hash::xxh3::hash64(key)
185 }
186}
187
188// =============================================================================
189// Level Metadata
190// =============================================================================
191
192/// Metadata for a single level in the LSM tree
193#[derive(Debug, Clone)]
194pub struct LevelMetadata {
195 /// Level number (0 = unsorted, 1+ = sorted)
196 pub level: u32,
197 /// Files in this level (sorted by smallest_key for L1+)
198 pub files: Vec<Arc<FileMetadata>>,
199 /// Total size of files in this level
200 pub total_size: u64,
201 /// Target size for this level (for compaction decisions)
202 pub target_size: u64,
203 /// Compaction score (size / target_size)
204 pub compaction_score: f64,
205}
206
207impl LevelMetadata {
208 /// Create a new empty level
209 pub fn new(level: u32, target_size: u64) -> Self {
210 Self {
211 level,
212 files: Vec::new(),
213 total_size: 0,
214 target_size,
215 compaction_score: 0.0,
216 }
217 }
218
219 /// Find files that may contain a key using binary search (for L1+)
220 pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
221 if self.level == 0 {
222 // L0 is unsorted - must check all files
223 self.files
224 .iter()
225 .filter(|f| {
226 f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key
227 })
228 .collect()
229 } else {
230 // L1+ is sorted - binary search for overlapping files
231 let idx = self.files.partition_point(|f| f.largest_key.as_slice() < key);
232 if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
233 vec![&self.files[idx]]
234 } else {
235 vec![]
236 }
237 }
238 }
239
240 /// Find files that overlap with a key range
241 pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
242 if self.level == 0 {
243 // L0 is unsorted - check all files
244 self.files
245 .iter()
246 .filter(|f| f.overlaps_range(start, end))
247 .collect()
248 } else {
249 // L1+ is sorted - find range using binary search
250 let start_idx = self.files.partition_point(|f| f.largest_key.as_slice() < start);
251 let end_idx = if end.is_empty() {
252 self.files.len()
253 } else {
254 self.files.partition_point(|f| f.smallest_key.as_slice() <= end)
255 };
256 self.files[start_idx..end_idx].iter().collect()
257 }
258 }
259
260 /// Recalculate compaction score
261 pub fn update_compaction_score(&mut self) {
262 if self.target_size > 0 {
263 self.compaction_score = self.total_size as f64 / self.target_size as f64;
264 } else {
265 self.compaction_score = 0.0;
266 }
267 }
268}
269
270// =============================================================================
271// Immutable MemTable Reference
272// =============================================================================
273
274/// Reference to an immutable memtable (sealed, pending flush)
275#[derive(Debug, Clone)]
276pub struct ImmutableMemTableRef {
277 /// Unique ID for this immutable memtable
278 pub id: u64,
279 /// Sequence number when sealed
280 pub seal_seqno: u64,
281 /// Approximate size in bytes
282 pub size_bytes: u64,
283 /// Reference to the actual memtable data
284 /// This is opaque - the actual type is provided by the storage layer
285 pub data: Arc<dyn ImmutableMemTable>,
286}
287
/// Read-only operations an immutable memtable must provide.
///
/// Implementations have to be `Send + Sync` and `Debug` so they can be shared
/// behind an `Arc` and embedded in `Debug`-derived metadata.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Looks up `key` as of sequence number `seqno`.
    ///
    /// NOTE(review): the nested `Option` presumably separates "key unknown"
    /// (outer `None`) from "key deleted" (`Some(None)`) — confirm against the
    /// implementations.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Returns an iterator over every entry of the memtable.
    ///
    /// NOTE(review): items look like `(key, value, seqno)` triples — confirm
    /// against the implementations.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Approximate size of the memtable.
    fn size(&self) -> u64;
}
299
300// =============================================================================
301// SuperVersion - The Core Abstraction
302// =============================================================================
303
304/// SuperVersion: A consistent snapshot of all storage metadata
305///
306/// This is the key abstraction for lock-free reads. A SuperVersion contains
307/// Arc references to:
308/// - Current mutable memtable
309/// - List of immutable memtables (pending flush)
310/// - All SSTable levels and their file metadata
311///
312/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
313/// read operation, then release it (O(1) Arc drop). No locks required.
314///
315/// Writers create a new SuperVersion with modified metadata and atomically
316/// swap it in. Old versions remain valid until all readers release them.
317#[derive(Debug, Clone)]
318pub struct SuperVersion {
319 /// Version number (monotonically increasing)
320 pub version_number: u64,
321
322 /// Current mutable memtable reference
323 /// Note: The memtable itself may have internal synchronization,
324 /// but the *reference* is immutable within a SuperVersion
325 pub memtable_version: u64,
326
327 /// Immutable memtables (oldest first)
328 pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
329
330 /// SSTable levels (L0, L1, L2, ...)
331 pub levels: Arc<Vec<LevelMetadata>>,
332
333 /// Minimum sequence number safe to garbage collect
334 /// Versions with seqno < this can be pruned during compaction
335 pub min_snapshot_seqno: u64,
336
337 /// Current log (WAL) number for crash recovery
338 pub log_number: u64,
339
340 /// Prev log number (for two-log protocol during flush)
341 pub prev_log_number: u64,
342
343 /// Next file number to allocate
344 pub next_file_number: u64,
345
346 /// Manifest file number
347 pub manifest_file_number: u64,
348}
349
350impl SuperVersion {
351 /// Create a new empty SuperVersion
352 pub fn new() -> Self {
353 Self {
354 version_number: 1,
355 memtable_version: 1,
356 immutable_memtables: Arc::new(Vec::new()),
357 levels: Arc::new(Vec::new()),
358 min_snapshot_seqno: 0,
359 log_number: 1,
360 prev_log_number: 0,
361 next_file_number: 2,
362 manifest_file_number: 1,
363 }
364 }
365
366 /// Create a new SuperVersion with updated immutable memtables
367 pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
368 let mut new_imms = (*self.immutable_memtables).clone();
369 new_imms.push(imm);
370
371 Self {
372 version_number: self.version_number + 1,
373 memtable_version: self.memtable_version + 1,
374 immutable_memtables: Arc::new(new_imms),
375 levels: Arc::clone(&self.levels),
376 min_snapshot_seqno: self.min_snapshot_seqno,
377 log_number: self.log_number,
378 prev_log_number: self.prev_log_number,
379 next_file_number: self.next_file_number,
380 manifest_file_number: self.manifest_file_number,
381 }
382 }
383
384 /// Create a new SuperVersion after flushing immutable memtables
385 pub fn with_flushed_memtables(
386 &self,
387 flushed_ids: &[u64],
388 new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
389 ) -> Self {
390 // Remove flushed immutable memtables
391 let new_imms: Vec<_> = self.immutable_memtables
392 .iter()
393 .filter(|imm| !flushed_ids.contains(&imm.id))
394 .cloned()
395 .collect();
396
397 // Add new files to levels
398 let mut new_levels = (*self.levels).clone();
399 for (level, file) in new_files {
400 // Ensure level exists
401 while new_levels.len() <= level as usize {
402 let target_size = self.level_target_size(new_levels.len() as u32);
403 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
404 }
405
406 let lm = &mut new_levels[level as usize];
407 lm.total_size += file.file_size;
408 lm.files.push(file);
409 lm.update_compaction_score();
410 }
411
412 Self {
413 version_number: self.version_number + 1,
414 memtable_version: self.memtable_version,
415 immutable_memtables: Arc::new(new_imms),
416 levels: Arc::new(new_levels),
417 min_snapshot_seqno: self.min_snapshot_seqno,
418 log_number: self.log_number,
419 prev_log_number: self.prev_log_number,
420 next_file_number: self.next_file_number,
421 manifest_file_number: self.manifest_file_number,
422 }
423 }
424
425 /// Create a new SuperVersion after compaction
426 pub fn with_compaction_result(
427 &self,
428 input_files: &[(u32, u64)], // (level, file_number)
429 output_files: Vec<(u32, Arc<FileMetadata>)>,
430 ) -> Self {
431 let mut new_levels = (*self.levels).clone();
432
433 // Remove input files
434 for (level, file_num) in input_files {
435 if let Some(lm) = new_levels.get_mut(*level as usize) {
436 if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
437 let removed = lm.files.remove(pos);
438 lm.total_size -= removed.file_size;
439 }
440 }
441 }
442
443 // Add output files
444 for (level, file) in output_files {
445 while new_levels.len() <= level as usize {
446 let target_size = self.level_target_size(new_levels.len() as u32);
447 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
448 }
449
450 let lm = &mut new_levels[level as usize];
451 lm.total_size += file.file_size;
452
453 // Insert in sorted order for L1+
454 if level > 0 {
455 let pos = lm.files.partition_point(|f| f.smallest_key < file.smallest_key);
456 lm.files.insert(pos, file);
457 } else {
458 lm.files.push(file);
459 }
460 lm.update_compaction_score();
461 }
462
463 Self {
464 version_number: self.version_number + 1,
465 memtable_version: self.memtable_version,
466 immutable_memtables: Arc::clone(&self.immutable_memtables),
467 levels: Arc::new(new_levels),
468 min_snapshot_seqno: self.min_snapshot_seqno,
469 log_number: self.log_number,
470 prev_log_number: self.prev_log_number,
471 next_file_number: self.next_file_number,
472 manifest_file_number: self.manifest_file_number,
473 }
474 }
475
476 /// Calculate target size for a level
477 fn level_target_size(&self, level: u32) -> u64 {
478 // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
479 match level {
480 0 => 64 * 1024 * 1024,
481 1 => 256 * 1024 * 1024,
482 _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
483 }
484 }
485
486 /// Get total number of files across all levels
487 pub fn total_file_count(&self) -> usize {
488 self.levels.iter().map(|l| l.files.len()).sum()
489 }
490
491 /// Get level with highest compaction score
492 pub fn pick_compaction_level(&self) -> Option<u32> {
493 self.levels
494 .iter()
495 .filter(|l| l.compaction_score > 1.0)
496 .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
497 .map(|l| l.level)
498 }
499}
500
501impl Default for SuperVersion {
502 fn default() -> Self {
503 Self::new()
504 }
505}
506
507// =============================================================================
508// VersionSet - The Top-Level Manager
509// =============================================================================
510
511/// VersionSet manages the current SuperVersion and provides atomic updates
512///
513/// This is the entry point for all metadata access. Readers call `get()` to
514/// acquire the current SuperVersion, writers call `install()` to atomically
515/// swap in a new version.
516///
517/// ## Thread Safety
518///
519/// - `get()`: Lock-free (atomic load)
520/// - `install()`: Serialized through internal mutex (writers must coordinate)
521///
522/// Multiple readers can proceed in parallel with zero synchronization.
523/// Writers are serialized to ensure consistent version progression.
524pub struct VersionSet {
525 /// Current SuperVersion (atomically swappable)
526 current: ArcSwap<SuperVersion>,
527
528 /// Version number counter
529 version_counter: AtomicU64,
530
531 /// Write serialization lock
532 /// Only one writer can modify the version at a time
533 write_lock: Mutex<()>,
534
535 /// Snapshot registry - tracks active snapshots for GC
536 snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count
537
538 /// Database directory
539 db_path: PathBuf,
540}
541
542impl VersionSet {
543 /// Create a new VersionSet
544 pub fn new(db_path: PathBuf) -> Self {
545 Self {
546 current: ArcSwap::from_pointee(SuperVersion::new()),
547 version_counter: AtomicU64::new(1),
548 write_lock: Mutex::new(()),
549 snapshots: Mutex::new(BTreeMap::new()),
550 db_path,
551 }
552 }
553
554 /// Get current SuperVersion (lock-free)
555 ///
556 /// This is the hot path for reads. Returns a Guard that holds an Arc
557 /// to the current SuperVersion. The Guard can be dereferenced to access
558 /// the SuperVersion.
559 ///
560 /// ## Performance
561 ///
562 /// - Time: O(1) atomic load
563 /// - No locks acquired
564 /// - No memory allocation
565 #[inline]
566 pub fn get(&self) -> Guard<Arc<SuperVersion>> {
567 self.current.load()
568 }
569
570 /// Get current SuperVersion as owned Arc
571 ///
572 /// Use this when you need to hold the SuperVersion across await points
573 /// or store it for later use.
574 #[inline]
575 pub fn get_arc(&self) -> Arc<SuperVersion> {
576 self.current.load_full()
577 }
578
579 /// Install a new SuperVersion (serialized)
580 ///
581 /// This atomically swaps the new version into place. The old version
582 /// remains valid for any readers that acquired it before the swap.
583 ///
584 /// ## Safety
585 ///
586 /// Only one writer should call this at a time. Use `with_write_lock()`
587 /// to serialize concurrent updates.
588 pub fn install(&self, new_version: SuperVersion) {
589 let _guard = self.write_lock.lock();
590 self.current.store(Arc::new(new_version));
591 self.version_counter.fetch_add(1, Ordering::SeqCst);
592 }
593
594 /// Execute a function with exclusive write access
595 ///
596 /// Use this to ensure atomic read-modify-write operations on the
597 /// version set. The function receives the current SuperVersion and
598 /// should return the new SuperVersion to install.
599 pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
600 where
601 F: FnOnce(&SuperVersion) -> SuperVersion,
602 {
603 let _guard = self.write_lock.lock();
604 let current = self.current.load();
605 let new_version = f(¤t);
606 self.current.store(Arc::new(new_version.clone()));
607 self.version_counter.fetch_add(1, Ordering::SeqCst);
608 new_version
609 }
610
611 /// Register a snapshot at the given sequence number
612 ///
613 /// Returns the snapshot sequence number for later release.
614 pub fn register_snapshot(&self, seqno: u64) -> u64 {
615 let mut snapshots = self.snapshots.lock();
616 *snapshots.entry(seqno).or_insert(0) += 1;
617 seqno
618 }
619
620 /// Release a snapshot
621 pub fn release_snapshot(&self, seqno: u64) {
622 let mut snapshots = self.snapshots.lock();
623 if let Some(count) = snapshots.get_mut(&seqno) {
624 *count -= 1;
625 if *count == 0 {
626 snapshots.remove(&seqno);
627 }
628 }
629 }
630
631 /// Get minimum sequence number that must be preserved
632 ///
633 /// Returns the oldest snapshot sequence number, or the current version's
634 /// min_snapshot_seqno if no snapshots are active.
635 pub fn min_preserved_seqno(&self) -> u64 {
636 let snapshots = self.snapshots.lock();
637 snapshots.keys().next().copied().unwrap_or_else(|| {
638 self.current.load().min_snapshot_seqno
639 })
640 }
641
642 /// Update min_snapshot_seqno based on active snapshots
643 pub fn update_min_snapshot_seqno(&self) {
644 let min_seqno = self.min_preserved_seqno();
645 self.with_write_lock(|current| {
646 SuperVersion {
647 min_snapshot_seqno: min_seqno,
648 version_number: current.version_number + 1,
649 ..current.clone()
650 }
651 });
652 }
653
654 /// Get database path
655 pub fn db_path(&self) -> &PathBuf {
656 &self.db_path
657 }
658
659 /// Get current version number
660 pub fn version_number(&self) -> u64 {
661 self.version_counter.load(Ordering::SeqCst)
662 }
663}
664
665impl std::fmt::Debug for VersionSet {
666 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
667 let current = self.current.load();
668 f.debug_struct("VersionSet")
669 .field("version_number", ¤t.version_number)
670 .field("num_immutables", ¤t.immutable_memtables.len())
671 .field("num_levels", ¤t.levels.len())
672 .field("total_files", ¤t.total_file_count())
673 .field("db_path", &self.db_path)
674 .finish()
675 }
676}
677
678// =============================================================================
679// SuperVersionHandle - RAII Guard for SuperVersion
680// =============================================================================
681
682/// RAII handle for SuperVersion access
683///
684/// This provides a convenient way to access a SuperVersion with automatic
685/// cleanup. The handle can be cloned to share access across threads.
686pub struct SuperVersionHandle {
687 version: Arc<SuperVersion>,
688 version_set: Arc<VersionSet>,
689 snapshot_seqno: Option<u64>,
690}
691
692impl SuperVersionHandle {
693 /// Create a new handle from a VersionSet
694 pub fn new(version_set: Arc<VersionSet>) -> Self {
695 let version = version_set.get_arc();
696 Self {
697 version,
698 version_set,
699 snapshot_seqno: None,
700 }
701 }
702
703 /// Create a handle with a registered snapshot
704 pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
705 let registered_seqno = version_set.register_snapshot(seqno);
706 let version = version_set.get_arc();
707 Self {
708 version,
709 version_set,
710 snapshot_seqno: Some(registered_seqno),
711 }
712 }
713
714 /// Get the SuperVersion
715 #[inline]
716 pub fn version(&self) -> &SuperVersion {
717 &self.version
718 }
719
720 /// Get the snapshot sequence number (if any)
721 pub fn snapshot_seqno(&self) -> Option<u64> {
722 self.snapshot_seqno
723 }
724}
725
726impl Drop for SuperVersionHandle {
727 fn drop(&mut self) {
728 if let Some(seqno) = self.snapshot_seqno {
729 self.version_set.release_snapshot(seqno);
730 }
731 }
732}
733
734impl Clone for SuperVersionHandle {
735 fn clone(&self) -> Self {
736 // If we have a snapshot, register another reference
737 if let Some(seqno) = self.snapshot_seqno {
738 self.version_set.register_snapshot(seqno);
739 }
740 Self {
741 version: Arc::clone(&self.version),
742 version_set: Arc::clone(&self.version_set),
743 snapshot_seqno: self.snapshot_seqno,
744 }
745 }
746}
747
748// =============================================================================
749// Tests
750// =============================================================================
751
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds a `VersionSet` rooted at the path shared by all tests.
    fn scratch_version_set() -> VersionSet {
        VersionSet::new(PathBuf::from("/tmp/test"))
    }

    /// Builds file `index` of the binary-search fixture: it covers the
    /// two-digit keys `index * 10 ..= index * 10 + 9`.
    fn decade_file(index: u64) -> Arc<FileMetadata> {
        Arc::new(FileMetadata {
            file_number: index,
            file_size: 1024,
            smallest_key: format!("{:02}", index * 10).into_bytes(),
            largest_key: format!("{:02}", index * 10 + 9).into_bytes(),
            num_entries: 100,
            min_seqno: 1,
            max_seqno: 100,
            path: PathBuf::from(format!("/tmp/{}.sst", index)),
            bloom_filter: None,
            being_compacted: false,
        })
    }

    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    #[test]
    fn test_version_set_get() {
        let version_set = scratch_version_set();
        assert_eq!(version_set.get().version_number, 1);
    }

    #[test]
    fn test_version_set_install() {
        let version_set = scratch_version_set();

        version_set.install(SuperVersion {
            version_number: 2,
            ..SuperVersion::new()
        });

        assert_eq!(version_set.get().version_number, 2);
    }

    #[test]
    fn test_concurrent_reads() {
        let shared = Arc::new(scratch_version_set());

        // Ten readers, a thousand loads each, all against the same set.
        let readers: Vec<_> = (0..10)
            .map(|_| {
                let version_set = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        let current = version_set.get();
                        assert!(current.version_number >= 1);
                    }
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    #[test]
    fn test_snapshot_registry() {
        let version_set = scratch_version_set();

        // Two references to 100, one to 200.
        version_set.register_snapshot(100);
        version_set.register_snapshot(200);
        version_set.register_snapshot(100);
        assert_eq!(version_set.min_preserved_seqno(), 100);

        // Dropping one of the two references keeps 100 alive.
        version_set.release_snapshot(100);
        assert_eq!(version_set.min_preserved_seqno(), 100);

        // Dropping the last reference promotes 200 to the minimum.
        version_set.release_snapshot(100);
        assert_eq!(version_set.min_preserved_seqno(), 200);

        version_set.release_snapshot(200);
    }

    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);

        // Files are pushed in key order, as the sorted level requires.
        for index in 0..10u64 {
            level.files.push(decade_file(index));
            level.total_size += 1024;
        }

        // Point lookup lands in the file covering "20".."29".
        let by_key = level.find_files_for_key(b"25");
        assert_eq!(by_key.len(), 1);
        assert_eq!(by_key[0].file_number, 2);

        // Range lookup touches files 1, 2 and 3.
        let by_range = level.find_files_for_range(b"15", b"35");
        assert_eq!(by_range.len(), 3);
    }
}
860}