Skip to main content

sochdb_storage/
version_set.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SuperVersion Metadata + Copy-on-Write Version Set
19//!
20//! This module implements RocksDB-style SuperVersion metadata management for
21//! near lock-free reads. The key insight is that read paths only need a consistent
22//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
23//! exclusive access to the underlying data structures.
24//!
25//! ## Problem Analysis
26//!
27//! Previous implementation suffered from lock contention:
28//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
29//! - Each level switch may acquire additional locks
30//! - Compaction/flush modifies metadata while readers hold references
31//! - Lock-order complexity leads to deadlock potential
32//!
33//! ## Solution: SuperVersion + ArcSwap
34//!
35//! ```text
36//! ┌─────────────────────────────────────────────────────────────────┐
37//! │                       VersionSet                                 │
38//! │  ┌─────────────────────────────────────────────────────────────┐│
39//! │  │ current: ArcSwap<SuperVersion>  ◄─── Atomic swap (O(1))     ││
40//! │  └─────────────────────────────────────────────────────────────┘│
41//! │                            │                                     │
42//! │                            ▼                                     │
43//! │  ┌─────────────────────────────────────────────────────────────┐│
44//! │  │                    SuperVersion                              ││
45//! │  │  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐  ││
46//! │  │  │ memtable    │  │ immutables   │  │ level_files       │  ││
47//! │  │  │ Arc<...>    │  │ Arc<Vec<..>> │  │ Arc<Vec<Level>>   │  ││
48//! │  │  └─────────────┘  └──────────────┘  └───────────────────┘  ││
49//! │  └─────────────────────────────────────────────────────────────┘│
50//! └─────────────────────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## Read Path (Lock-Free)
54//!
55//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
56//! 2. Search memtable → immutables → levels using sv references
57//! 3. Release sv when done (Arc drop)
58//!
59//! No locks acquired. Readers are completely decoupled from writers.
60//!
61//! ## Write Path (Copy-on-Write)
62//!
63//! 1. Clone current SuperVersion's inner Arcs
64//! 2. Modify the new copy (e.g., add new immutable memtable)
65//! 3. Atomically swap new SuperVersion into place
66//! 4. Old SuperVersion kept alive by existing readers (Arc)
67//!
68//! ## Complexity Analysis
69//!
70//! | Operation          | Old (with locks)     | New (SuperVersion)        |
71//! |--------------------|----------------------|---------------------------|
72//! | Read acquire       | O(1) but contended   | O(1) atomic load          |
73//! | Read release       | O(1) unlock          | O(1) Arc decrement        |
74//! | Metadata update    | O(1) + lock wait     | O(changed_metadata) clone |
75//! | Concurrent reads   | Serialized on RwLock | Truly parallel            |
76//!
77//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
78
79use arc_swap::{ArcSwap, Guard};
80use crossbeam_epoch::{self as epoch, Atomic, Owned, Shared};
81use parking_lot::Mutex;
82use std::collections::BTreeMap;
83use std::path::PathBuf;
84use std::sync::atomic::{AtomicU64, Ordering};
85use std::sync::Arc;
86
87// =============================================================================
88// File Metadata
89// =============================================================================
90
91/// Metadata for an SSTable file
92#[derive(Debug, Clone)]
93pub struct FileMetadata {
94    /// Unique file number
95    pub file_number: u64,
96    /// File size in bytes
97    pub file_size: u64,
98    /// Smallest key in file (for range queries)
99    pub smallest_key: Vec<u8>,
100    /// Largest key in file (for range queries)
101    pub largest_key: Vec<u8>,
102    /// Number of entries in file
103    pub num_entries: u64,
104    /// Minimum sequence number in file
105    pub min_seqno: u64,
106    /// Maximum sequence number in file
107    pub max_seqno: u64,
108    /// Path to the file
109    pub path: PathBuf,
110    /// Bloom filter (if loaded)
111    pub bloom_filter: Option<Arc<BloomFilterHandle>>,
112    /// Whether file is being compacted
113    pub being_compacted: bool,
114}
115
116impl FileMetadata {
117    /// Check if key might be in this file using bloom filter
118    #[inline]
119    pub fn may_contain(&self, key: &[u8]) -> bool {
120        match &self.bloom_filter {
121            Some(bf) => bf.may_contain(key),
122            None => true, // No filter = must check file
123        }
124    }
125
126    /// Check if a key range overlaps with this file
127    #[inline]
128    pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
129        if end.is_empty() {
130            // Unbounded end - overlaps if smallest_key <= end conceptually
131            self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
132        } else {
133            // Standard range overlap check
134            self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
135        }
136    }
137}
138
139/// Handle to a bloom filter (may be memory-mapped or in-memory)
140#[derive(Debug)]
141pub struct BloomFilterHandle {
142    /// The bloom filter bits
143    bits: Vec<u64>,
144    /// Number of hash functions
145    num_hashes: u32,
146}
147
148impl BloomFilterHandle {
149    /// Create a new bloom filter handle
150    pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
151        Self { bits, num_hashes }
152    }
153
154    /// Check if key may be present
155    #[inline]
156    pub fn may_contain(&self, key: &[u8]) -> bool {
157        if self.bits.is_empty() {
158            return true;
159        }
160        let num_bits = self.bits.len() * 64;
161        let h1 = Self::hash1(key);
162        let h2 = Self::hash2(key);
163
164        for i in 0..self.num_hashes {
165            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
166            let bit_idx = (h as usize) % num_bits;
167            let word_idx = bit_idx / 64;
168            let bit_pos = bit_idx % 64;
169            if self.bits[word_idx] & (1 << bit_pos) == 0 {
170                return false;
171            }
172        }
173        true
174    }
175
176    #[inline]
177    fn hash1(key: &[u8]) -> u64 {
178        use std::collections::hash_map::DefaultHasher;
179        use std::hash::{Hash, Hasher};
180        let mut hasher = DefaultHasher::new();
181        key.hash(&mut hasher);
182        hasher.finish()
183    }
184
185    #[inline]
186    fn hash2(key: &[u8]) -> u64 {
187        twox_hash::xxh3::hash64(key)
188    }
189}
190
191// =============================================================================
192// Level Metadata
193// =============================================================================
194
195/// Metadata for a single level in the LSM tree
196#[derive(Debug, Clone)]
197pub struct LevelMetadata {
198    /// Level number (0 = unsorted, 1+ = sorted)
199    pub level: u32,
200    /// Files in this level (sorted by smallest_key for L1+)
201    pub files: Vec<Arc<FileMetadata>>,
202    /// Total size of files in this level
203    pub total_size: u64,
204    /// Target size for this level (for compaction decisions)
205    pub target_size: u64,
206    /// Compaction score (size / target_size)
207    pub compaction_score: f64,
208}
209
210impl LevelMetadata {
211    /// Create a new empty level
212    pub fn new(level: u32, target_size: u64) -> Self {
213        Self {
214            level,
215            files: Vec::new(),
216            total_size: 0,
217            target_size,
218            compaction_score: 0.0,
219        }
220    }
221
222    /// Find files that may contain a key using binary search (for L1+)
223    pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
224        if self.level == 0 {
225            // L0 is unsorted - must check all files
226            self.files
227                .iter()
228                .filter(|f| {
229                    f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key
230                })
231                .collect()
232        } else {
233            // L1+ is sorted - binary search for overlapping files
234            let idx = self.files.partition_point(|f| f.largest_key.as_slice() < key);
235            if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
236                vec![&self.files[idx]]
237            } else {
238                vec![]
239            }
240        }
241    }
242
243    /// Find files that overlap with a key range
244    pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
245        if self.level == 0 {
246            // L0 is unsorted - check all files
247            self.files
248                .iter()
249                .filter(|f| f.overlaps_range(start, end))
250                .collect()
251        } else {
252            // L1+ is sorted - find range using binary search
253            let start_idx = self.files.partition_point(|f| f.largest_key.as_slice() < start);
254            let end_idx = if end.is_empty() {
255                self.files.len()
256            } else {
257                self.files.partition_point(|f| f.smallest_key.as_slice() <= end)
258            };
259            self.files[start_idx..end_idx].iter().collect()
260        }
261    }
262
263    /// Recalculate compaction score
264    pub fn update_compaction_score(&mut self) {
265        if self.target_size > 0 {
266            self.compaction_score = self.total_size as f64 / self.target_size as f64;
267        } else {
268            self.compaction_score = 0.0;
269        }
270    }
271}
272
273// =============================================================================
274// Immutable MemTable Reference
275// =============================================================================
276
/// Reference to an immutable memtable (sealed, pending flush)
///
/// Cloning this reference does not copy the memtable contents: `data` is an
/// `Arc`, so clones share the same underlying object.
#[derive(Debug, Clone)]
pub struct ImmutableMemTableRef {
    /// Unique ID for this immutable memtable
    pub id: u64,
    /// Sequence number when sealed
    pub seal_seqno: u64,
    /// Approximate size in bytes
    pub size_bytes: u64,
    /// Reference to the actual memtable data
    /// This is opaque - the actual type is provided by the storage layer
    pub data: Arc<dyn ImmutableMemTable>,
}
290
/// Trait for immutable memtable operations
///
/// Implementors must be `Send + Sync + Debug` so they can be shared behind an
/// `Arc<dyn ImmutableMemTable>`.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Get a value at a specific sequence number
    ///
    // NOTE(review): presumably the outer `None` means "key not present in this
    // memtable" and `Some(None)` means "deleted" - confirm against implementors.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Iterate over all entries
    ///
    // NOTE(review): the tuple looks like (key, value-or-delete-marker, seqno) -
    // confirm against implementors.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Get approximate size
    fn size(&self) -> u64;
}
302
303// =============================================================================
304// SuperVersion - The Core Abstraction
305// =============================================================================
306
307/// SuperVersion: A consistent snapshot of all storage metadata
308///
309/// This is the key abstraction for lock-free reads. A SuperVersion contains
310/// Arc references to:
311/// - Current mutable memtable
312/// - List of immutable memtables (pending flush)
313/// - All SSTable levels and their file metadata
314///
315/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
316/// read operation, then release it (O(1) Arc drop). No locks required.
317///
318/// Writers create a new SuperVersion with modified metadata and atomically
319/// swap it in. Old versions remain valid until all readers release them.
320#[derive(Debug, Clone)]
321pub struct SuperVersion {
322    /// Version number (monotonically increasing)
323    pub version_number: u64,
324    
325    /// Current mutable memtable reference
326    /// Note: The memtable itself may have internal synchronization,
327    /// but the *reference* is immutable within a SuperVersion
328    pub memtable_version: u64,
329    
330    /// Immutable memtables (oldest first)
331    pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
332    
333    /// SSTable levels (L0, L1, L2, ...)
334    pub levels: Arc<Vec<LevelMetadata>>,
335    
336    /// Minimum sequence number safe to garbage collect
337    /// Versions with seqno < this can be pruned during compaction
338    pub min_snapshot_seqno: u64,
339    
340    /// Current log (WAL) number for crash recovery
341    pub log_number: u64,
342    
343    /// Prev log number (for two-log protocol during flush)
344    pub prev_log_number: u64,
345    
346    /// Next file number to allocate
347    pub next_file_number: u64,
348    
349    /// Manifest file number
350    pub manifest_file_number: u64,
351}
352
353impl SuperVersion {
354    /// Create a new empty SuperVersion
355    pub fn new() -> Self {
356        Self {
357            version_number: 1,
358            memtable_version: 1,
359            immutable_memtables: Arc::new(Vec::new()),
360            levels: Arc::new(Vec::new()),
361            min_snapshot_seqno: 0,
362            log_number: 1,
363            prev_log_number: 0,
364            next_file_number: 2,
365            manifest_file_number: 1,
366        }
367    }
368
369    /// Create a new SuperVersion with updated immutable memtables
370    pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
371        let mut new_imms = (*self.immutable_memtables).clone();
372        new_imms.push(imm);
373        
374        Self {
375            version_number: self.version_number + 1,
376            memtable_version: self.memtable_version + 1,
377            immutable_memtables: Arc::new(new_imms),
378            levels: Arc::clone(&self.levels),
379            min_snapshot_seqno: self.min_snapshot_seqno,
380            log_number: self.log_number,
381            prev_log_number: self.prev_log_number,
382            next_file_number: self.next_file_number,
383            manifest_file_number: self.manifest_file_number,
384        }
385    }
386
387    /// Create a new SuperVersion after flushing immutable memtables
388    pub fn with_flushed_memtables(
389        &self,
390        flushed_ids: &[u64],
391        new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
392    ) -> Self {
393        // Remove flushed immutable memtables
394        let new_imms: Vec<_> = self.immutable_memtables
395            .iter()
396            .filter(|imm| !flushed_ids.contains(&imm.id))
397            .cloned()
398            .collect();
399        
400        // Add new files to levels
401        let mut new_levels = (*self.levels).clone();
402        for (level, file) in new_files {
403            // Ensure level exists
404            while new_levels.len() <= level as usize {
405                let target_size = self.level_target_size(new_levels.len() as u32);
406                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
407            }
408            
409            let lm = &mut new_levels[level as usize];
410            lm.total_size += file.file_size;
411            lm.files.push(file);
412            lm.update_compaction_score();
413        }
414        
415        Self {
416            version_number: self.version_number + 1,
417            memtable_version: self.memtable_version,
418            immutable_memtables: Arc::new(new_imms),
419            levels: Arc::new(new_levels),
420            min_snapshot_seqno: self.min_snapshot_seqno,
421            log_number: self.log_number,
422            prev_log_number: self.prev_log_number,
423            next_file_number: self.next_file_number,
424            manifest_file_number: self.manifest_file_number,
425        }
426    }
427
428    /// Create a new SuperVersion after compaction
429    pub fn with_compaction_result(
430        &self,
431        input_files: &[(u32, u64)], // (level, file_number)
432        output_files: Vec<(u32, Arc<FileMetadata>)>,
433    ) -> Self {
434        let mut new_levels = (*self.levels).clone();
435        
436        // Remove input files
437        for (level, file_num) in input_files {
438            if let Some(lm) = new_levels.get_mut(*level as usize) {
439                if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
440                    let removed = lm.files.remove(pos);
441                    lm.total_size -= removed.file_size;
442                }
443            }
444        }
445        
446        // Add output files
447        for (level, file) in output_files {
448            while new_levels.len() <= level as usize {
449                let target_size = self.level_target_size(new_levels.len() as u32);
450                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
451            }
452            
453            let lm = &mut new_levels[level as usize];
454            lm.total_size += file.file_size;
455            
456            // Insert in sorted order for L1+
457            if level > 0 {
458                let pos = lm.files.partition_point(|f| f.smallest_key < file.smallest_key);
459                lm.files.insert(pos, file);
460            } else {
461                lm.files.push(file);
462            }
463            lm.update_compaction_score();
464        }
465        
466        Self {
467            version_number: self.version_number + 1,
468            memtable_version: self.memtable_version,
469            immutable_memtables: Arc::clone(&self.immutable_memtables),
470            levels: Arc::new(new_levels),
471            min_snapshot_seqno: self.min_snapshot_seqno,
472            log_number: self.log_number,
473            prev_log_number: self.prev_log_number,
474            next_file_number: self.next_file_number,
475            manifest_file_number: self.manifest_file_number,
476        }
477    }
478
479    /// Calculate target size for a level
480    fn level_target_size(&self, level: u32) -> u64 {
481        // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
482        match level {
483            0 => 64 * 1024 * 1024,
484            1 => 256 * 1024 * 1024,
485            _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
486        }
487    }
488
489    /// Get total number of files across all levels
490    pub fn total_file_count(&self) -> usize {
491        self.levels.iter().map(|l| l.files.len()).sum()
492    }
493
494    /// Get level with highest compaction score
495    pub fn pick_compaction_level(&self) -> Option<u32> {
496        self.levels
497            .iter()
498            .filter(|l| l.compaction_score > 1.0)
499            .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
500            .map(|l| l.level)
501    }
502}
503
504impl Default for SuperVersion {
505    fn default() -> Self {
506        Self::new()
507    }
508}
509
510// =============================================================================
511// VersionSet - The Top-Level Manager
512// =============================================================================
513
514/// VersionSet manages the current SuperVersion and provides atomic updates
515///
516/// This is the entry point for all metadata access. Readers call `get()` to
517/// acquire the current SuperVersion, writers call `install()` to atomically
518/// swap in a new version.
519///
520/// ## Thread Safety
521///
522/// - `get()`: Lock-free (atomic load)
523/// - `install()`: Serialized through internal mutex (writers must coordinate)
524///
525/// Multiple readers can proceed in parallel with zero synchronization.
526/// Writers are serialized to ensure consistent version progression.
527pub struct VersionSet {
528    /// Current SuperVersion (atomically swappable)
529    current: ArcSwap<SuperVersion>,
530    
531    /// Version number counter
532    version_counter: AtomicU64,
533    
534    /// Write serialization lock
535    /// Only one writer can modify the version at a time
536    write_lock: Mutex<()>,
537    
538    /// Snapshot registry - tracks active snapshots for GC
539    snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count
540    
541    /// Database directory
542    db_path: PathBuf,
543}
544
545impl VersionSet {
546    /// Create a new VersionSet
547    pub fn new(db_path: PathBuf) -> Self {
548        Self {
549            current: ArcSwap::from_pointee(SuperVersion::new()),
550            version_counter: AtomicU64::new(1),
551            write_lock: Mutex::new(()),
552            snapshots: Mutex::new(BTreeMap::new()),
553            db_path,
554        }
555    }
556
557    /// Get current SuperVersion (lock-free)
558    ///
559    /// This is the hot path for reads. Returns a Guard that holds an Arc
560    /// to the current SuperVersion. The Guard can be dereferenced to access
561    /// the SuperVersion.
562    ///
563    /// ## Performance
564    ///
565    /// - Time: O(1) atomic load
566    /// - No locks acquired
567    /// - No memory allocation
568    #[inline]
569    pub fn get(&self) -> Guard<Arc<SuperVersion>> {
570        self.current.load()
571    }
572
573    /// Get current SuperVersion as owned Arc
574    ///
575    /// Use this when you need to hold the SuperVersion across await points
576    /// or store it for later use.
577    #[inline]
578    pub fn get_arc(&self) -> Arc<SuperVersion> {
579        self.current.load_full()
580    }
581
582    /// Install a new SuperVersion (serialized)
583    ///
584    /// This atomically swaps the new version into place. The old version
585    /// remains valid for any readers that acquired it before the swap.
586    ///
587    /// ## Safety
588    ///
589    /// Only one writer should call this at a time. Use `with_write_lock()`
590    /// to serialize concurrent updates.
591    pub fn install(&self, new_version: SuperVersion) {
592        let _guard = self.write_lock.lock();
593        self.current.store(Arc::new(new_version));
594        self.version_counter.fetch_add(1, Ordering::SeqCst);
595    }
596
597    /// Execute a function with exclusive write access
598    ///
599    /// Use this to ensure atomic read-modify-write operations on the
600    /// version set. The function receives the current SuperVersion and
601    /// should return the new SuperVersion to install.
602    pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
603    where
604        F: FnOnce(&SuperVersion) -> SuperVersion,
605    {
606        let _guard = self.write_lock.lock();
607        let current = self.current.load();
608        let new_version = f(&current);
609        self.current.store(Arc::new(new_version.clone()));
610        self.version_counter.fetch_add(1, Ordering::SeqCst);
611        new_version
612    }
613
614    /// Register a snapshot at the given sequence number
615    ///
616    /// Returns the snapshot sequence number for later release.
617    pub fn register_snapshot(&self, seqno: u64) -> u64 {
618        let mut snapshots = self.snapshots.lock();
619        *snapshots.entry(seqno).or_insert(0) += 1;
620        seqno
621    }
622
623    /// Release a snapshot
624    pub fn release_snapshot(&self, seqno: u64) {
625        let mut snapshots = self.snapshots.lock();
626        if let Some(count) = snapshots.get_mut(&seqno) {
627            *count -= 1;
628            if *count == 0 {
629                snapshots.remove(&seqno);
630            }
631        }
632    }
633
634    /// Get minimum sequence number that must be preserved
635    ///
636    /// Returns the oldest snapshot sequence number, or the current version's
637    /// min_snapshot_seqno if no snapshots are active.
638    pub fn min_preserved_seqno(&self) -> u64 {
639        let snapshots = self.snapshots.lock();
640        snapshots.keys().next().copied().unwrap_or_else(|| {
641            self.current.load().min_snapshot_seqno
642        })
643    }
644
645    /// Update min_snapshot_seqno based on active snapshots
646    pub fn update_min_snapshot_seqno(&self) {
647        let min_seqno = self.min_preserved_seqno();
648        self.with_write_lock(|current| {
649            SuperVersion {
650                min_snapshot_seqno: min_seqno,
651                version_number: current.version_number + 1,
652                ..current.clone()
653            }
654        });
655    }
656
657    /// Get database path
658    pub fn db_path(&self) -> &PathBuf {
659        &self.db_path
660    }
661
662    /// Get current version number
663    pub fn version_number(&self) -> u64 {
664        self.version_counter.load(Ordering::SeqCst)
665    }
666}
667
668impl std::fmt::Debug for VersionSet {
669    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
670        let current = self.current.load();
671        f.debug_struct("VersionSet")
672            .field("version_number", &current.version_number)
673            .field("num_immutables", &current.immutable_memtables.len())
674            .field("num_levels", &current.levels.len())
675            .field("total_files", &current.total_file_count())
676            .field("db_path", &self.db_path)
677            .finish()
678    }
679}
680
681// =============================================================================
682// SuperVersionHandle - RAII Guard for SuperVersion
683// =============================================================================
684
685/// RAII handle for SuperVersion access
686///
687/// This provides a convenient way to access a SuperVersion with automatic
688/// cleanup. The handle can be cloned to share access across threads.
689pub struct SuperVersionHandle {
690    version: Arc<SuperVersion>,
691    version_set: Arc<VersionSet>,
692    snapshot_seqno: Option<u64>,
693}
694
695impl SuperVersionHandle {
696    /// Create a new handle from a VersionSet
697    pub fn new(version_set: Arc<VersionSet>) -> Self {
698        let version = version_set.get_arc();
699        Self {
700            version,
701            version_set,
702            snapshot_seqno: None,
703        }
704    }
705
706    /// Create a handle with a registered snapshot
707    pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
708        let registered_seqno = version_set.register_snapshot(seqno);
709        let version = version_set.get_arc();
710        Self {
711            version,
712            version_set,
713            snapshot_seqno: Some(registered_seqno),
714        }
715    }
716
717    /// Get the SuperVersion
718    #[inline]
719    pub fn version(&self) -> &SuperVersion {
720        &self.version
721    }
722
723    /// Get the snapshot sequence number (if any)
724    pub fn snapshot_seqno(&self) -> Option<u64> {
725        self.snapshot_seqno
726    }
727}
728
729impl Drop for SuperVersionHandle {
730    fn drop(&mut self) {
731        if let Some(seqno) = self.snapshot_seqno {
732            self.version_set.release_snapshot(seqno);
733        }
734    }
735}
736
737impl Clone for SuperVersionHandle {
738    fn clone(&self) -> Self {
739        // If we have a snapshot, register another reference
740        if let Some(seqno) = self.snapshot_seqno {
741            self.version_set.register_snapshot(seqno);
742        }
743        Self {
744            version: Arc::clone(&self.version),
745            version_set: Arc::clone(&self.version_set),
746            snapshot_seqno: self.snapshot_seqno,
747        }
748    }
749}
750
751// =============================================================================
752// Tests
753// =============================================================================
754
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Builds a version set for tests; the path is only stored by `VersionSet::new`.
    fn scratch_version_set() -> VersionSet {
        VersionSet::new(PathBuf::from("/tmp/test"))
    }

    /// Builds file `index` covering the two-digit keys `index*10 ..= index*10 + 9`.
    fn decade_file(index: u64) -> Arc<FileMetadata> {
        Arc::new(FileMetadata {
            file_number: index,
            file_size: 1024,
            smallest_key: format!("{:02}", index * 10).into_bytes(),
            largest_key: format!("{:02}", index * 10 + 9).into_bytes(),
            num_entries: 100,
            min_seqno: 1,
            max_seqno: 100,
            path: PathBuf::from(format!("/tmp/{}.sst", index)),
            bloom_filter: None,
            being_compacted: false,
        })
    }

    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    #[test]
    fn test_version_set_get() {
        let vs = scratch_version_set();
        assert_eq!(vs.get().version_number, 1);
    }

    #[test]
    fn test_version_set_install() {
        let vs = scratch_version_set();

        vs.install(SuperVersion {
            version_number: 2,
            ..SuperVersion::new()
        });

        assert_eq!(vs.get().version_number, 2);
    }

    #[test]
    fn test_concurrent_reads() {
        let vs = Arc::new(scratch_version_set());

        // Ten reader threads, each loading the current version 1000 times.
        let readers: Vec<_> = (0..10)
            .map(|_| {
                let shared = Arc::clone(&vs);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        let sv = shared.get();
                        assert!(sv.version_number >= 1);
                    }
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    #[test]
    fn test_snapshot_registry() {
        let vs = scratch_version_set();

        // Two references to 100 and one to 200.
        for seqno in [100, 200, 100] {
            vs.register_snapshot(seqno);
        }
        assert_eq!(vs.min_preserved_seqno(), 100);

        // Dropping one of the two references keeps 100 as the minimum.
        vs.release_snapshot(100);
        assert_eq!(vs.min_preserved_seqno(), 100);

        // Dropping the last reference to 100 exposes 200.
        vs.release_snapshot(100);
        assert_eq!(vs.min_preserved_seqno(), 200);

        // Release the remaining snapshot.
        vs.release_snapshot(200);
    }

    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);

        // Ten files added in sorted order: 00-09, 10-19, ..., 90-99.
        for index in 0..10u64 {
            level.files.push(decade_file(index));
            level.total_size += 1024;
        }

        // Point lookup: key "25" lives in file 2 only.
        let hits = level.find_files_for_key(b"25");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].file_number, 2);

        // Range lookup: "15".."35" touches files 1, 2 and 3.
        let hits = level.find_files_for_range(b"15", b"35");
        assert_eq!(hits.len(), 3);
    }
}