sochdb_storage/
version_set.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! SuperVersion Metadata + Copy-on-Write Version Set
16//!
17//! This module implements RocksDB-style SuperVersion metadata management for
18//! near lock-free reads. The key insight is that read paths only need a consistent
19//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
20//! exclusive access to the underlying data structures.
21//!
22//! ## Problem Analysis
23//!
24//! Previous implementation suffered from lock contention:
25//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
26//! - Each level switch may acquire additional locks
27//! - Compaction/flush modifies metadata while readers hold references
28//! - Lock-order complexity leads to deadlock potential
29//!
30//! ## Solution: SuperVersion + ArcSwap
31//!
32//! ```text
33//! ┌─────────────────────────────────────────────────────────────────┐
34//! │                       VersionSet                                 │
35//! │  ┌─────────────────────────────────────────────────────────────┐│
36//! │  │ current: ArcSwap<SuperVersion>  ◄─── Atomic swap (O(1))     ││
37//! │  └─────────────────────────────────────────────────────────────┘│
38//! │                            │                                     │
39//! │                            ▼                                     │
40//! │  ┌─────────────────────────────────────────────────────────────┐│
41//! │  │                    SuperVersion                              ││
42//! │  │  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐  ││
43//! │  │  │ memtable    │  │ immutables   │  │ level_files       │  ││
44//! │  │  │ Arc<...>    │  │ Arc<Vec<..>> │  │ Arc<Vec<Level>>   │  ││
45//! │  │  └─────────────┘  └──────────────┘  └───────────────────┘  ││
46//! │  └─────────────────────────────────────────────────────────────┘│
47//! └─────────────────────────────────────────────────────────────────┘
48//! ```
49//!
50//! ## Read Path (Lock-Free)
51//!
52//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
53//! 2. Search memtable → immutables → levels using sv references
54//! 3. Release sv when done (Arc drop)
55//!
56//! No locks acquired. Readers are completely decoupled from writers.
57//!
58//! ## Write Path (Copy-on-Write)
59//!
60//! 1. Clone current SuperVersion's inner Arcs
61//! 2. Modify the new copy (e.g., add new immutable memtable)
62//! 3. Atomically swap new SuperVersion into place
63//! 4. Old SuperVersion kept alive by existing readers (Arc)
64//!
65//! ## Complexity Analysis
66//!
67//! | Operation          | Old (with locks)     | New (SuperVersion)        |
68//! |--------------------|----------------------|---------------------------|
69//! | Read acquire       | O(1) but contended   | O(1) atomic load          |
70//! | Read release       | O(1) unlock          | O(1) Arc decrement        |
71//! | Metadata update    | O(1) + lock wait     | O(changed_metadata) clone |
72//! | Concurrent reads   | Serialized on RwLock | Truly parallel            |
73//!
74//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
75
76use arc_swap::{ArcSwap, Guard};
77use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
78use parking_lot::Mutex;
79use std::collections::BTreeMap;
80use std::path::PathBuf;
81use std::sync::atomic::{AtomicU64, Ordering};
82use std::sync::Arc;
83
84// =============================================================================
85// File Metadata
86// =============================================================================
87
88/// Metadata for an SSTable file
89#[derive(Debug, Clone)]
90pub struct FileMetadata {
91    /// Unique file number
92    pub file_number: u64,
93    /// File size in bytes
94    pub file_size: u64,
95    /// Smallest key in file (for range queries)
96    pub smallest_key: Vec<u8>,
97    /// Largest key in file (for range queries)
98    pub largest_key: Vec<u8>,
99    /// Number of entries in file
100    pub num_entries: u64,
101    /// Minimum sequence number in file
102    pub min_seqno: u64,
103    /// Maximum sequence number in file
104    pub max_seqno: u64,
105    /// Path to the file
106    pub path: PathBuf,
107    /// Bloom filter (if loaded)
108    pub bloom_filter: Option<Arc<BloomFilterHandle>>,
109    /// Whether file is being compacted
110    pub being_compacted: bool,
111}
112
113impl FileMetadata {
114    /// Check if key might be in this file using bloom filter
115    #[inline]
116    pub fn may_contain(&self, key: &[u8]) -> bool {
117        match &self.bloom_filter {
118            Some(bf) => bf.may_contain(key),
119            None => true, // No filter = must check file
120        }
121    }
122
123    /// Check if a key range overlaps with this file
124    #[inline]
125    pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
126        if end.is_empty() {
127            // Unbounded end - overlaps if smallest_key <= end conceptually
128            self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
129        } else {
130            // Standard range overlap check
131            self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
132        }
133    }
134}
135
/// Bit array plus hash count used to answer bloom-filter membership queries.
#[derive(Debug)]
pub struct BloomFilterHandle {
    /// Filter bits, packed 64 per word.
    bits: Vec<u64>,
    /// How many bit positions are probed per key.
    num_hashes: u32,
}
144
145impl BloomFilterHandle {
146    /// Create a new bloom filter handle
147    pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
148        Self { bits, num_hashes }
149    }
150
151    /// Check if key may be present
152    #[inline]
153    pub fn may_contain(&self, key: &[u8]) -> bool {
154        if self.bits.is_empty() {
155            return true;
156        }
157        let num_bits = self.bits.len() * 64;
158        let h1 = Self::hash1(key);
159        let h2 = Self::hash2(key);
160
161        for i in 0..self.num_hashes {
162            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
163            let bit_idx = (h as usize) % num_bits;
164            let word_idx = bit_idx / 64;
165            let bit_pos = bit_idx % 64;
166            if self.bits[word_idx] & (1 << bit_pos) == 0 {
167                return false;
168            }
169        }
170        true
171    }
172
173    #[inline]
174    fn hash1(key: &[u8]) -> u64 {
175        use std::collections::hash_map::DefaultHasher;
176        use std::hash::{Hash, Hasher};
177        let mut hasher = DefaultHasher::new();
178        key.hash(&mut hasher);
179        hasher.finish()
180    }
181
182    #[inline]
183    fn hash2(key: &[u8]) -> u64 {
184        twox_hash::xxh3::hash64(key)
185    }
186}
187
188// =============================================================================
189// Level Metadata
190// =============================================================================
191
192/// Metadata for a single level in the LSM tree
193#[derive(Debug, Clone)]
194pub struct LevelMetadata {
195    /// Level number (0 = unsorted, 1+ = sorted)
196    pub level: u32,
197    /// Files in this level (sorted by smallest_key for L1+)
198    pub files: Vec<Arc<FileMetadata>>,
199    /// Total size of files in this level
200    pub total_size: u64,
201    /// Target size for this level (for compaction decisions)
202    pub target_size: u64,
203    /// Compaction score (size / target_size)
204    pub compaction_score: f64,
205}
206
207impl LevelMetadata {
208    /// Create a new empty level
209    pub fn new(level: u32, target_size: u64) -> Self {
210        Self {
211            level,
212            files: Vec::new(),
213            total_size: 0,
214            target_size,
215            compaction_score: 0.0,
216        }
217    }
218
219    /// Find files that may contain a key using binary search (for L1+)
220    pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
221        if self.level == 0 {
222            // L0 is unsorted - must check all files
223            self.files
224                .iter()
225                .filter(|f| {
226                    f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key
227                })
228                .collect()
229        } else {
230            // L1+ is sorted - binary search for overlapping files
231            let idx = self.files.partition_point(|f| f.largest_key.as_slice() < key);
232            if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
233                vec![&self.files[idx]]
234            } else {
235                vec![]
236            }
237        }
238    }
239
240    /// Find files that overlap with a key range
241    pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
242        if self.level == 0 {
243            // L0 is unsorted - check all files
244            self.files
245                .iter()
246                .filter(|f| f.overlaps_range(start, end))
247                .collect()
248        } else {
249            // L1+ is sorted - find range using binary search
250            let start_idx = self.files.partition_point(|f| f.largest_key.as_slice() < start);
251            let end_idx = if end.is_empty() {
252                self.files.len()
253            } else {
254                self.files.partition_point(|f| f.smallest_key.as_slice() <= end)
255            };
256            self.files[start_idx..end_idx].iter().collect()
257        }
258    }
259
260    /// Recalculate compaction score
261    pub fn update_compaction_score(&mut self) {
262        if self.target_size > 0 {
263            self.compaction_score = self.total_size as f64 / self.target_size as f64;
264        } else {
265            self.compaction_score = 0.0;
266        }
267    }
268}
269
270// =============================================================================
271// Immutable MemTable Reference
272// =============================================================================
273
274/// Reference to an immutable memtable (sealed, pending flush)
275#[derive(Debug, Clone)]
276pub struct ImmutableMemTableRef {
277    /// Unique ID for this immutable memtable
278    pub id: u64,
279    /// Sequence number when sealed
280    pub seal_seqno: u64,
281    /// Approximate size in bytes
282    pub size_bytes: u64,
283    /// Reference to the actual memtable data
284    /// This is opaque - the actual type is provided by the storage layer
285    pub data: Arc<dyn ImmutableMemTable>,
286}
287
/// Read-only operations a sealed memtable has to provide.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Looks up `key` as of sequence number `seqno`.
    ///
    /// NOTE(review): the nested `Option` presumably separates "key not
    /// present" (outer `None`) from "key deleted" (`Some(None)`) — confirm
    /// against the implementations.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Returns an iterator over every entry.
    ///
    /// NOTE(review): items look like `(key, value, seqno)` with `None` as
    /// the value of a deletion — confirm against the implementations.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Approximate size of the memtable.
    fn size(&self) -> u64;
}
299
300// =============================================================================
301// SuperVersion - The Core Abstraction
302// =============================================================================
303
304/// SuperVersion: A consistent snapshot of all storage metadata
305///
306/// This is the key abstraction for lock-free reads. A SuperVersion contains
307/// Arc references to:
308/// - Current mutable memtable
309/// - List of immutable memtables (pending flush)
310/// - All SSTable levels and their file metadata
311///
312/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
313/// read operation, then release it (O(1) Arc drop). No locks required.
314///
315/// Writers create a new SuperVersion with modified metadata and atomically
316/// swap it in. Old versions remain valid until all readers release them.
317#[derive(Debug, Clone)]
318pub struct SuperVersion {
319    /// Version number (monotonically increasing)
320    pub version_number: u64,
321    
322    /// Current mutable memtable reference
323    /// Note: The memtable itself may have internal synchronization,
324    /// but the *reference* is immutable within a SuperVersion
325    pub memtable_version: u64,
326    
327    /// Immutable memtables (oldest first)
328    pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
329    
330    /// SSTable levels (L0, L1, L2, ...)
331    pub levels: Arc<Vec<LevelMetadata>>,
332    
333    /// Minimum sequence number safe to garbage collect
334    /// Versions with seqno < this can be pruned during compaction
335    pub min_snapshot_seqno: u64,
336    
337    /// Current log (WAL) number for crash recovery
338    pub log_number: u64,
339    
340    /// Prev log number (for two-log protocol during flush)
341    pub prev_log_number: u64,
342    
343    /// Next file number to allocate
344    pub next_file_number: u64,
345    
346    /// Manifest file number
347    pub manifest_file_number: u64,
348}
349
350impl SuperVersion {
351    /// Create a new empty SuperVersion
352    pub fn new() -> Self {
353        Self {
354            version_number: 1,
355            memtable_version: 1,
356            immutable_memtables: Arc::new(Vec::new()),
357            levels: Arc::new(Vec::new()),
358            min_snapshot_seqno: 0,
359            log_number: 1,
360            prev_log_number: 0,
361            next_file_number: 2,
362            manifest_file_number: 1,
363        }
364    }
365
366    /// Create a new SuperVersion with updated immutable memtables
367    pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
368        let mut new_imms = (*self.immutable_memtables).clone();
369        new_imms.push(imm);
370        
371        Self {
372            version_number: self.version_number + 1,
373            memtable_version: self.memtable_version + 1,
374            immutable_memtables: Arc::new(new_imms),
375            levels: Arc::clone(&self.levels),
376            min_snapshot_seqno: self.min_snapshot_seqno,
377            log_number: self.log_number,
378            prev_log_number: self.prev_log_number,
379            next_file_number: self.next_file_number,
380            manifest_file_number: self.manifest_file_number,
381        }
382    }
383
384    /// Create a new SuperVersion after flushing immutable memtables
385    pub fn with_flushed_memtables(
386        &self,
387        flushed_ids: &[u64],
388        new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
389    ) -> Self {
390        // Remove flushed immutable memtables
391        let new_imms: Vec<_> = self.immutable_memtables
392            .iter()
393            .filter(|imm| !flushed_ids.contains(&imm.id))
394            .cloned()
395            .collect();
396        
397        // Add new files to levels
398        let mut new_levels = (*self.levels).clone();
399        for (level, file) in new_files {
400            // Ensure level exists
401            while new_levels.len() <= level as usize {
402                let target_size = self.level_target_size(new_levels.len() as u32);
403                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
404            }
405            
406            let lm = &mut new_levels[level as usize];
407            lm.total_size += file.file_size;
408            lm.files.push(file);
409            lm.update_compaction_score();
410        }
411        
412        Self {
413            version_number: self.version_number + 1,
414            memtable_version: self.memtable_version,
415            immutable_memtables: Arc::new(new_imms),
416            levels: Arc::new(new_levels),
417            min_snapshot_seqno: self.min_snapshot_seqno,
418            log_number: self.log_number,
419            prev_log_number: self.prev_log_number,
420            next_file_number: self.next_file_number,
421            manifest_file_number: self.manifest_file_number,
422        }
423    }
424
425    /// Create a new SuperVersion after compaction
426    pub fn with_compaction_result(
427        &self,
428        input_files: &[(u32, u64)], // (level, file_number)
429        output_files: Vec<(u32, Arc<FileMetadata>)>,
430    ) -> Self {
431        let mut new_levels = (*self.levels).clone();
432        
433        // Remove input files
434        for (level, file_num) in input_files {
435            if let Some(lm) = new_levels.get_mut(*level as usize) {
436                if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
437                    let removed = lm.files.remove(pos);
438                    lm.total_size -= removed.file_size;
439                }
440            }
441        }
442        
443        // Add output files
444        for (level, file) in output_files {
445            while new_levels.len() <= level as usize {
446                let target_size = self.level_target_size(new_levels.len() as u32);
447                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
448            }
449            
450            let lm = &mut new_levels[level as usize];
451            lm.total_size += file.file_size;
452            
453            // Insert in sorted order for L1+
454            if level > 0 {
455                let pos = lm.files.partition_point(|f| f.smallest_key < file.smallest_key);
456                lm.files.insert(pos, file);
457            } else {
458                lm.files.push(file);
459            }
460            lm.update_compaction_score();
461        }
462        
463        Self {
464            version_number: self.version_number + 1,
465            memtable_version: self.memtable_version,
466            immutable_memtables: Arc::clone(&self.immutable_memtables),
467            levels: Arc::new(new_levels),
468            min_snapshot_seqno: self.min_snapshot_seqno,
469            log_number: self.log_number,
470            prev_log_number: self.prev_log_number,
471            next_file_number: self.next_file_number,
472            manifest_file_number: self.manifest_file_number,
473        }
474    }
475
476    /// Calculate target size for a level
477    fn level_target_size(&self, level: u32) -> u64 {
478        // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
479        match level {
480            0 => 64 * 1024 * 1024,
481            1 => 256 * 1024 * 1024,
482            _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
483        }
484    }
485
486    /// Get total number of files across all levels
487    pub fn total_file_count(&self) -> usize {
488        self.levels.iter().map(|l| l.files.len()).sum()
489    }
490
491    /// Get level with highest compaction score
492    pub fn pick_compaction_level(&self) -> Option<u32> {
493        self.levels
494            .iter()
495            .filter(|l| l.compaction_score > 1.0)
496            .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
497            .map(|l| l.level)
498    }
499}
500
501impl Default for SuperVersion {
502    fn default() -> Self {
503        Self::new()
504    }
505}
506
507// =============================================================================
508// VersionSet - The Top-Level Manager
509// =============================================================================
510
511/// VersionSet manages the current SuperVersion and provides atomic updates
512///
513/// This is the entry point for all metadata access. Readers call `get()` to
514/// acquire the current SuperVersion, writers call `install()` to atomically
515/// swap in a new version.
516///
517/// ## Thread Safety
518///
519/// - `get()`: Lock-free (atomic load)
520/// - `install()`: Serialized through internal mutex (writers must coordinate)
521///
522/// Multiple readers can proceed in parallel with zero synchronization.
523/// Writers are serialized to ensure consistent version progression.
524pub struct VersionSet {
525    /// Current SuperVersion (atomically swappable)
526    current: ArcSwap<SuperVersion>,
527    
528    /// Version number counter
529    version_counter: AtomicU64,
530    
531    /// Write serialization lock
532    /// Only one writer can modify the version at a time
533    write_lock: Mutex<()>,
534    
535    /// Snapshot registry - tracks active snapshots for GC
536    snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count
537    
538    /// Database directory
539    db_path: PathBuf,
540}
541
542impl VersionSet {
543    /// Create a new VersionSet
544    pub fn new(db_path: PathBuf) -> Self {
545        Self {
546            current: ArcSwap::from_pointee(SuperVersion::new()),
547            version_counter: AtomicU64::new(1),
548            write_lock: Mutex::new(()),
549            snapshots: Mutex::new(BTreeMap::new()),
550            db_path,
551        }
552    }
553
554    /// Get current SuperVersion (lock-free)
555    ///
556    /// This is the hot path for reads. Returns a Guard that holds an Arc
557    /// to the current SuperVersion. The Guard can be dereferenced to access
558    /// the SuperVersion.
559    ///
560    /// ## Performance
561    ///
562    /// - Time: O(1) atomic load
563    /// - No locks acquired
564    /// - No memory allocation
565    #[inline]
566    pub fn get(&self) -> Guard<Arc<SuperVersion>> {
567        self.current.load()
568    }
569
570    /// Get current SuperVersion as owned Arc
571    ///
572    /// Use this when you need to hold the SuperVersion across await points
573    /// or store it for later use.
574    #[inline]
575    pub fn get_arc(&self) -> Arc<SuperVersion> {
576        self.current.load_full()
577    }
578
579    /// Install a new SuperVersion (serialized)
580    ///
581    /// This atomically swaps the new version into place. The old version
582    /// remains valid for any readers that acquired it before the swap.
583    ///
584    /// ## Safety
585    ///
586    /// Only one writer should call this at a time. Use `with_write_lock()`
587    /// to serialize concurrent updates.
588    pub fn install(&self, new_version: SuperVersion) {
589        let _guard = self.write_lock.lock();
590        self.current.store(Arc::new(new_version));
591        self.version_counter.fetch_add(1, Ordering::SeqCst);
592    }
593
594    /// Execute a function with exclusive write access
595    ///
596    /// Use this to ensure atomic read-modify-write operations on the
597    /// version set. The function receives the current SuperVersion and
598    /// should return the new SuperVersion to install.
599    pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
600    where
601        F: FnOnce(&SuperVersion) -> SuperVersion,
602    {
603        let _guard = self.write_lock.lock();
604        let current = self.current.load();
605        let new_version = f(&current);
606        self.current.store(Arc::new(new_version.clone()));
607        self.version_counter.fetch_add(1, Ordering::SeqCst);
608        new_version
609    }
610
611    /// Register a snapshot at the given sequence number
612    ///
613    /// Returns the snapshot sequence number for later release.
614    pub fn register_snapshot(&self, seqno: u64) -> u64 {
615        let mut snapshots = self.snapshots.lock();
616        *snapshots.entry(seqno).or_insert(0) += 1;
617        seqno
618    }
619
620    /// Release a snapshot
621    pub fn release_snapshot(&self, seqno: u64) {
622        let mut snapshots = self.snapshots.lock();
623        if let Some(count) = snapshots.get_mut(&seqno) {
624            *count -= 1;
625            if *count == 0 {
626                snapshots.remove(&seqno);
627            }
628        }
629    }
630
631    /// Get minimum sequence number that must be preserved
632    ///
633    /// Returns the oldest snapshot sequence number, or the current version's
634    /// min_snapshot_seqno if no snapshots are active.
635    pub fn min_preserved_seqno(&self) -> u64 {
636        let snapshots = self.snapshots.lock();
637        snapshots.keys().next().copied().unwrap_or_else(|| {
638            self.current.load().min_snapshot_seqno
639        })
640    }
641
642    /// Update min_snapshot_seqno based on active snapshots
643    pub fn update_min_snapshot_seqno(&self) {
644        let min_seqno = self.min_preserved_seqno();
645        self.with_write_lock(|current| {
646            SuperVersion {
647                min_snapshot_seqno: min_seqno,
648                version_number: current.version_number + 1,
649                ..current.clone()
650            }
651        });
652    }
653
654    /// Get database path
655    pub fn db_path(&self) -> &PathBuf {
656        &self.db_path
657    }
658
659    /// Get current version number
660    pub fn version_number(&self) -> u64 {
661        self.version_counter.load(Ordering::SeqCst)
662    }
663}
664
665impl std::fmt::Debug for VersionSet {
666    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
667        let current = self.current.load();
668        f.debug_struct("VersionSet")
669            .field("version_number", &current.version_number)
670            .field("num_immutables", &current.immutable_memtables.len())
671            .field("num_levels", &current.levels.len())
672            .field("total_files", &current.total_file_count())
673            .field("db_path", &self.db_path)
674            .finish()
675    }
676}
677
678// =============================================================================
679// SuperVersionHandle - RAII Guard for SuperVersion
680// =============================================================================
681
682/// RAII handle for SuperVersion access
683///
684/// This provides a convenient way to access a SuperVersion with automatic
685/// cleanup. The handle can be cloned to share access across threads.
686pub struct SuperVersionHandle {
687    version: Arc<SuperVersion>,
688    version_set: Arc<VersionSet>,
689    snapshot_seqno: Option<u64>,
690}
691
692impl SuperVersionHandle {
693    /// Create a new handle from a VersionSet
694    pub fn new(version_set: Arc<VersionSet>) -> Self {
695        let version = version_set.get_arc();
696        Self {
697            version,
698            version_set,
699            snapshot_seqno: None,
700        }
701    }
702
703    /// Create a handle with a registered snapshot
704    pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
705        let registered_seqno = version_set.register_snapshot(seqno);
706        let version = version_set.get_arc();
707        Self {
708            version,
709            version_set,
710            snapshot_seqno: Some(registered_seqno),
711        }
712    }
713
714    /// Get the SuperVersion
715    #[inline]
716    pub fn version(&self) -> &SuperVersion {
717        &self.version
718    }
719
720    /// Get the snapshot sequence number (if any)
721    pub fn snapshot_seqno(&self) -> Option<u64> {
722        self.snapshot_seqno
723    }
724}
725
726impl Drop for SuperVersionHandle {
727    fn drop(&mut self) {
728        if let Some(seqno) = self.snapshot_seqno {
729            self.version_set.release_snapshot(seqno);
730        }
731    }
732}
733
734impl Clone for SuperVersionHandle {
735    fn clone(&self) -> Self {
736        // If we have a snapshot, register another reference
737        if let Some(seqno) = self.snapshot_seqno {
738            self.version_set.register_snapshot(seqno);
739        }
740        Self {
741            version: Arc::clone(&self.version),
742            version_set: Arc::clone(&self.version_set),
743            snapshot_seqno: self.snapshot_seqno,
744        }
745    }
746}
747
748// =============================================================================
749// Tests
750// =============================================================================
751
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A fresh `SuperVersion` is version 1 and holds no memtables or levels.
    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    /// A new `VersionSet` publishes version 1.
    #[test]
    fn test_version_set_get() {
        let vs = VersionSet::new(PathBuf::from("/tmp/test"));
        assert_eq!(vs.get().version_number, 1);
    }

    /// `install` makes the given version the one readers see.
    #[test]
    fn test_version_set_install() {
        let vs = VersionSet::new(PathBuf::from("/tmp/test"));

        vs.install(SuperVersion {
            version_number: 2,
            ..SuperVersion::new()
        });

        assert_eq!(vs.get().version_number, 2);
    }

    /// Ten threads read the current version a thousand times each.
    #[test]
    fn test_concurrent_reads() {
        let vs = Arc::new(VersionSet::new(PathBuf::from("/tmp/test")));

        let readers: Vec<_> = (0..10)
            .map(|_| {
                let vs = Arc::clone(&vs);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        assert!(vs.get().version_number >= 1);
                    }
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    /// Snapshots are reference counted and the minimum follows the oldest one.
    #[test]
    fn test_snapshot_registry() {
        let vs = VersionSet::new(PathBuf::from("/tmp/test"));

        // 100 is registered twice, 200 once.
        for seqno in [100, 200, 100] {
            vs.register_snapshot(seqno);
        }
        assert_eq!(vs.min_preserved_seqno(), 100);

        // One reference to 100 is still outstanding after the first release.
        vs.release_snapshot(100);
        assert_eq!(vs.min_preserved_seqno(), 100);

        // The second release removes 100, leaving 200 as the oldest.
        vs.release_snapshot(100);
        assert_eq!(vs.min_preserved_seqno(), 200);

        vs.release_snapshot(200);
    }

    /// Point and range lookups on a sorted level of ten adjacent files.
    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);

        // File `i` covers the keys "i0" ..= "i9".
        for i in 0..10u64 {
            level.files.push(Arc::new(FileMetadata {
                file_number: i,
                file_size: 1024,
                smallest_key: format!("{:02}", i * 10).into_bytes(),
                largest_key: format!("{:02}", i * 10 + 9).into_bytes(),
                num_entries: 100,
                min_seqno: 1,
                max_seqno: 100,
                path: PathBuf::from(format!("/tmp/{}.sst", i)),
                bloom_filter: None,
                being_compacted: false,
            }));
            level.total_size += 1024;
        }

        // "25" lives in file 2 only.
        let by_key = level.find_files_for_key(b"25");
        assert_eq!(by_key.len(), 1);
        assert_eq!(by_key[0].file_number, 2);

        // "15" ..= "35" touches files 1, 2 and 3.
        let by_range = level.find_files_for_range(b"15", b"35");
        assert_eq!(by_range.len(), 3);
    }
}