Skip to main content

sochdb_storage/
version_set.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SuperVersion Metadata + Copy-on-Write Version Set
19//!
20//! This module implements RocksDB-style SuperVersion metadata management for
21//! near lock-free reads. The key insight is that read paths only need a consistent
22//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
23//! exclusive access to the underlying data structures.
24//!
25//! ## Problem Analysis
26//!
27//! Previous implementation suffered from lock contention:
28//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
29//! - Each level switch may acquire additional locks
30//! - Compaction/flush modifies metadata while readers hold references
31//! - Lock-order complexity leads to deadlock potential
32//!
33//! ## Solution: SuperVersion + ArcSwap
34//!
35//! ```text
36//! ┌─────────────────────────────────────────────────────────────────┐
37//! │                       VersionSet                                 │
38//! │  ┌─────────────────────────────────────────────────────────────┐│
39//! │  │ current: ArcSwap<SuperVersion>  ◄─── Atomic swap (O(1))     ││
40//! │  └─────────────────────────────────────────────────────────────┘│
41//! │                            │                                     │
42//! │                            ▼                                     │
43//! │  ┌─────────────────────────────────────────────────────────────┐│
44//! │  │                    SuperVersion                              ││
45//! │  │  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐  ││
46//! │  │  │ memtable    │  │ immutables   │  │ level_files       │  ││
47//! │  │  │ Arc<...>    │  │ Arc<Vec<..>> │  │ Arc<Vec<Level>>   │  ││
48//! │  │  └─────────────┘  └──────────────┘  └───────────────────┘  ││
49//! │  └─────────────────────────────────────────────────────────────┘│
50//! └─────────────────────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## Read Path (Lock-Free)
54//!
55//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
56//! 2. Search memtable → immutables → levels using sv references
57//! 3. Release sv when done (Arc drop)
58//!
59//! No locks acquired. Readers are completely decoupled from writers.
60//!
61//! ## Write Path (Copy-on-Write)
62//!
63//! 1. Clone current SuperVersion's inner Arcs
64//! 2. Modify the new copy (e.g., add new immutable memtable)
65//! 3. Atomically swap new SuperVersion into place
66//! 4. Old SuperVersion kept alive by existing readers (Arc)
67//!
68//! ## Complexity Analysis
69//!
70//! | Operation          | Old (with locks)     | New (SuperVersion)        |
71//! |--------------------|----------------------|---------------------------|
72//! | Read acquire       | O(1) but contended   | O(1) atomic load          |
73//! | Read release       | O(1) unlock          | O(1) Arc decrement        |
74//! | Metadata update    | O(1) + lock wait     | O(changed_metadata) clone |
75//! | Concurrent reads   | Serialized on RwLock | Truly parallel            |
76//!
77//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
78
79use arc_swap::{ArcSwap, Guard};
80use parking_lot::Mutex;
81use std::collections::BTreeMap;
82use std::path::PathBuf;
83use std::sync::Arc;
84use std::sync::atomic::{AtomicU64, Ordering};
85
86// =============================================================================
87// File Metadata
88// =============================================================================
89
90/// Metadata for an SSTable file
91#[derive(Debug, Clone)]
92pub struct FileMetadata {
93    /// Unique file number
94    pub file_number: u64,
95    /// File size in bytes
96    pub file_size: u64,
97    /// Smallest key in file (for range queries)
98    pub smallest_key: Vec<u8>,
99    /// Largest key in file (for range queries)
100    pub largest_key: Vec<u8>,
101    /// Number of entries in file
102    pub num_entries: u64,
103    /// Minimum sequence number in file
104    pub min_seqno: u64,
105    /// Maximum sequence number in file
106    pub max_seqno: u64,
107    /// Path to the file
108    pub path: PathBuf,
109    /// Bloom filter (if loaded)
110    pub bloom_filter: Option<Arc<BloomFilterHandle>>,
111    /// Whether file is being compacted
112    pub being_compacted: bool,
113}
114
115impl FileMetadata {
116    /// Check if key might be in this file using bloom filter
117    #[inline]
118    pub fn may_contain(&self, key: &[u8]) -> bool {
119        match &self.bloom_filter {
120            Some(bf) => bf.may_contain(key),
121            None => true, // No filter = must check file
122        }
123    }
124
125    /// Check if a key range overlaps with this file
126    #[inline]
127    pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
128        if end.is_empty() {
129            // Unbounded end - overlaps if smallest_key <= end conceptually
130            self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
131        } else {
132            // Standard range overlap check
133            self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
134        }
135    }
136}
137
/// Bloom filter attached to an SSTable's metadata.
///
/// The filter is a bit array packed into 64-bit words plus the number of
/// probes performed per key.
#[derive(Debug)]
pub struct BloomFilterHandle {
    /// Filter bit array, 64 bits per element.
    bits: Vec<u64>,
    /// How many hash probes are made for each key.
    num_hashes: u32,
}
146
147impl BloomFilterHandle {
148    /// Create a new bloom filter handle
149    pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
150        Self { bits, num_hashes }
151    }
152
153    /// Check if key may be present
154    #[inline]
155    pub fn may_contain(&self, key: &[u8]) -> bool {
156        if self.bits.is_empty() {
157            return true;
158        }
159        let num_bits = self.bits.len() * 64;
160        let h1 = Self::hash1(key);
161        let h2 = Self::hash2(key);
162
163        for i in 0..self.num_hashes {
164            let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
165            let bit_idx = (h as usize) % num_bits;
166            let word_idx = bit_idx / 64;
167            let bit_pos = bit_idx % 64;
168            if self.bits[word_idx] & (1 << bit_pos) == 0 {
169                return false;
170            }
171        }
172        true
173    }
174
175    #[inline]
176    fn hash1(key: &[u8]) -> u64 {
177        use std::collections::hash_map::DefaultHasher;
178        use std::hash::{Hash, Hasher};
179        let mut hasher = DefaultHasher::new();
180        key.hash(&mut hasher);
181        hasher.finish()
182    }
183
184    #[inline]
185    fn hash2(key: &[u8]) -> u64 {
186        twox_hash::xxh3::hash64(key)
187    }
188}
189
190// =============================================================================
191// Level Metadata
192// =============================================================================
193
194/// Metadata for a single level in the LSM tree
195#[derive(Debug, Clone)]
196pub struct LevelMetadata {
197    /// Level number (0 = unsorted, 1+ = sorted)
198    pub level: u32,
199    /// Files in this level (sorted by smallest_key for L1+)
200    pub files: Vec<Arc<FileMetadata>>,
201    /// Total size of files in this level
202    pub total_size: u64,
203    /// Target size for this level (for compaction decisions)
204    pub target_size: u64,
205    /// Compaction score (size / target_size)
206    pub compaction_score: f64,
207}
208
209impl LevelMetadata {
210    /// Create a new empty level
211    pub fn new(level: u32, target_size: u64) -> Self {
212        Self {
213            level,
214            files: Vec::new(),
215            total_size: 0,
216            target_size,
217            compaction_score: 0.0,
218        }
219    }
220
221    /// Find files that may contain a key using binary search (for L1+)
222    pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
223        if self.level == 0 {
224            // L0 is unsorted - must check all files
225            self.files
226                .iter()
227                .filter(|f| f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key)
228                .collect()
229        } else {
230            // L1+ is sorted - binary search for overlapping files
231            let idx = self
232                .files
233                .partition_point(|f| f.largest_key.as_slice() < key);
234            if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
235                vec![&self.files[idx]]
236            } else {
237                vec![]
238            }
239        }
240    }
241
242    /// Find files that overlap with a key range
243    pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
244        if self.level == 0 {
245            // L0 is unsorted - check all files
246            self.files
247                .iter()
248                .filter(|f| f.overlaps_range(start, end))
249                .collect()
250        } else {
251            // L1+ is sorted - find range using binary search
252            let start_idx = self
253                .files
254                .partition_point(|f| f.largest_key.as_slice() < start);
255            let end_idx = if end.is_empty() {
256                self.files.len()
257            } else {
258                self.files
259                    .partition_point(|f| f.smallest_key.as_slice() <= end)
260            };
261            self.files[start_idx..end_idx].iter().collect()
262        }
263    }
264
265    /// Recalculate compaction score
266    pub fn update_compaction_score(&mut self) {
267        if self.target_size > 0 {
268            self.compaction_score = self.total_size as f64 / self.target_size as f64;
269        } else {
270            self.compaction_score = 0.0;
271        }
272    }
273}
274
275// =============================================================================
276// Immutable MemTable Reference
277// =============================================================================
278
279/// Reference to an immutable memtable (sealed, pending flush)
280#[derive(Debug, Clone)]
281pub struct ImmutableMemTableRef {
282    /// Unique ID for this immutable memtable
283    pub id: u64,
284    /// Sequence number when sealed
285    pub seal_seqno: u64,
286    /// Approximate size in bytes
287    pub size_bytes: u64,
288    /// Reference to the actual memtable data
289    /// This is opaque - the actual type is provided by the storage layer
290    pub data: Arc<dyn ImmutableMemTable>,
291}
292
/// Read-only interface that a sealed memtable exposes to this module.
///
/// Implementors must be `Send + Sync` so they can be shared behind an `Arc`,
/// and `Debug` so containing types can derive `Debug`.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Looks up `key` as of sequence number `seqno`.
    ///
    /// NOTE(review): the nested `Option` presumably separates "key unknown to
    /// this memtable" (outer `None`) from "key recorded without a value"
    /// (`Some(None)`) — confirm against the implementors.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Walks every entry, yielding `(key, optional value, sequence number)`.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Approximate size of the memtable.
    fn size(&self) -> u64;
}
304
305// =============================================================================
306// SuperVersion - The Core Abstraction
307// =============================================================================
308
309/// SuperVersion: A consistent snapshot of all storage metadata
310///
311/// This is the key abstraction for lock-free reads. A SuperVersion contains
312/// Arc references to:
313/// - Current mutable memtable
314/// - List of immutable memtables (pending flush)
315/// - All SSTable levels and their file metadata
316///
317/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
318/// read operation, then release it (O(1) Arc drop). No locks required.
319///
320/// Writers create a new SuperVersion with modified metadata and atomically
321/// swap it in. Old versions remain valid until all readers release them.
322#[derive(Debug, Clone)]
323pub struct SuperVersion {
324    /// Version number (monotonically increasing)
325    pub version_number: u64,
326
327    /// Current mutable memtable reference
328    /// Note: The memtable itself may have internal synchronization,
329    /// but the *reference* is immutable within a SuperVersion
330    pub memtable_version: u64,
331
332    /// Immutable memtables (oldest first)
333    pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
334
335    /// SSTable levels (L0, L1, L2, ...)
336    pub levels: Arc<Vec<LevelMetadata>>,
337
338    /// Minimum sequence number safe to garbage collect
339    /// Versions with seqno < this can be pruned during compaction
340    pub min_snapshot_seqno: u64,
341
342    /// Current log (WAL) number for crash recovery
343    pub log_number: u64,
344
345    /// Prev log number (for two-log protocol during flush)
346    pub prev_log_number: u64,
347
348    /// Next file number to allocate
349    pub next_file_number: u64,
350
351    /// Manifest file number
352    pub manifest_file_number: u64,
353}
354
355impl SuperVersion {
356    /// Create a new empty SuperVersion
357    pub fn new() -> Self {
358        Self {
359            version_number: 1,
360            memtable_version: 1,
361            immutable_memtables: Arc::new(Vec::new()),
362            levels: Arc::new(Vec::new()),
363            min_snapshot_seqno: 0,
364            log_number: 1,
365            prev_log_number: 0,
366            next_file_number: 2,
367            manifest_file_number: 1,
368        }
369    }
370
371    /// Create a new SuperVersion with updated immutable memtables
372    pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
373        let mut new_imms = (*self.immutable_memtables).clone();
374        new_imms.push(imm);
375
376        Self {
377            version_number: self.version_number + 1,
378            memtable_version: self.memtable_version + 1,
379            immutable_memtables: Arc::new(new_imms),
380            levels: Arc::clone(&self.levels),
381            min_snapshot_seqno: self.min_snapshot_seqno,
382            log_number: self.log_number,
383            prev_log_number: self.prev_log_number,
384            next_file_number: self.next_file_number,
385            manifest_file_number: self.manifest_file_number,
386        }
387    }
388
389    /// Create a new SuperVersion after flushing immutable memtables
390    pub fn with_flushed_memtables(
391        &self,
392        flushed_ids: &[u64],
393        new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
394    ) -> Self {
395        // Remove flushed immutable memtables
396        let new_imms: Vec<_> = self
397            .immutable_memtables
398            .iter()
399            .filter(|imm| !flushed_ids.contains(&imm.id))
400            .cloned()
401            .collect();
402
403        // Add new files to levels
404        let mut new_levels = (*self.levels).clone();
405        for (level, file) in new_files {
406            // Ensure level exists
407            while new_levels.len() <= level as usize {
408                let target_size = self.level_target_size(new_levels.len() as u32);
409                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
410            }
411
412            let lm = &mut new_levels[level as usize];
413            lm.total_size += file.file_size;
414            lm.files.push(file);
415            lm.update_compaction_score();
416        }
417
418        Self {
419            version_number: self.version_number + 1,
420            memtable_version: self.memtable_version,
421            immutable_memtables: Arc::new(new_imms),
422            levels: Arc::new(new_levels),
423            min_snapshot_seqno: self.min_snapshot_seqno,
424            log_number: self.log_number,
425            prev_log_number: self.prev_log_number,
426            next_file_number: self.next_file_number,
427            manifest_file_number: self.manifest_file_number,
428        }
429    }
430
431    /// Create a new SuperVersion after compaction
432    pub fn with_compaction_result(
433        &self,
434        input_files: &[(u32, u64)], // (level, file_number)
435        output_files: Vec<(u32, Arc<FileMetadata>)>,
436    ) -> Self {
437        let mut new_levels = (*self.levels).clone();
438
439        // Remove input files
440        for (level, file_num) in input_files {
441            if let Some(lm) = new_levels.get_mut(*level as usize) {
442                if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
443                    let removed = lm.files.remove(pos);
444                    lm.total_size -= removed.file_size;
445                }
446            }
447        }
448
449        // Add output files
450        for (level, file) in output_files {
451            while new_levels.len() <= level as usize {
452                let target_size = self.level_target_size(new_levels.len() as u32);
453                new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
454            }
455
456            let lm = &mut new_levels[level as usize];
457            lm.total_size += file.file_size;
458
459            // Insert in sorted order for L1+
460            if level > 0 {
461                let pos = lm
462                    .files
463                    .partition_point(|f| f.smallest_key < file.smallest_key);
464                lm.files.insert(pos, file);
465            } else {
466                lm.files.push(file);
467            }
468            lm.update_compaction_score();
469        }
470
471        Self {
472            version_number: self.version_number + 1,
473            memtable_version: self.memtable_version,
474            immutable_memtables: Arc::clone(&self.immutable_memtables),
475            levels: Arc::new(new_levels),
476            min_snapshot_seqno: self.min_snapshot_seqno,
477            log_number: self.log_number,
478            prev_log_number: self.prev_log_number,
479            next_file_number: self.next_file_number,
480            manifest_file_number: self.manifest_file_number,
481        }
482    }
483
484    /// Calculate target size for a level
485    fn level_target_size(&self, level: u32) -> u64 {
486        // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
487        match level {
488            0 => 64 * 1024 * 1024,
489            1 => 256 * 1024 * 1024,
490            _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
491        }
492    }
493
494    /// Get total number of files across all levels
495    pub fn total_file_count(&self) -> usize {
496        self.levels.iter().map(|l| l.files.len()).sum()
497    }
498
499    /// Get level with highest compaction score
500    pub fn pick_compaction_level(&self) -> Option<u32> {
501        self.levels
502            .iter()
503            .filter(|l| l.compaction_score > 1.0)
504            .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
505            .map(|l| l.level)
506    }
507}
508
509impl Default for SuperVersion {
510    fn default() -> Self {
511        Self::new()
512    }
513}
514
515// =============================================================================
516// VersionSet - The Top-Level Manager
517// =============================================================================
518
519/// VersionSet manages the current SuperVersion and provides atomic updates
520///
521/// This is the entry point for all metadata access. Readers call `get()` to
522/// acquire the current SuperVersion, writers call `install()` to atomically
523/// swap in a new version.
524///
525/// ## Thread Safety
526///
527/// - `get()`: Lock-free (atomic load)
528/// - `install()`: Serialized through internal mutex (writers must coordinate)
529///
530/// Multiple readers can proceed in parallel with zero synchronization.
531/// Writers are serialized to ensure consistent version progression.
532pub struct VersionSet {
533    /// Current SuperVersion (atomically swappable)
534    current: ArcSwap<SuperVersion>,
535
536    /// Version number counter
537    version_counter: AtomicU64,
538
539    /// Write serialization lock
540    /// Only one writer can modify the version at a time
541    write_lock: Mutex<()>,
542
543    /// Snapshot registry - tracks active snapshots for GC
544    snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count
545
546    /// Database directory
547    db_path: PathBuf,
548}
549
550impl VersionSet {
551    /// Create a new VersionSet
552    pub fn new(db_path: PathBuf) -> Self {
553        Self {
554            current: ArcSwap::from_pointee(SuperVersion::new()),
555            version_counter: AtomicU64::new(1),
556            write_lock: Mutex::new(()),
557            snapshots: Mutex::new(BTreeMap::new()),
558            db_path,
559        }
560    }
561
562    /// Get current SuperVersion (lock-free)
563    ///
564    /// This is the hot path for reads. Returns a Guard that holds an Arc
565    /// to the current SuperVersion. The Guard can be dereferenced to access
566    /// the SuperVersion.
567    ///
568    /// ## Performance
569    ///
570    /// - Time: O(1) atomic load
571    /// - No locks acquired
572    /// - No memory allocation
573    #[inline]
574    pub fn get(&self) -> Guard<Arc<SuperVersion>> {
575        self.current.load()
576    }
577
578    /// Get current SuperVersion as owned Arc
579    ///
580    /// Use this when you need to hold the SuperVersion across await points
581    /// or store it for later use.
582    #[inline]
583    pub fn get_arc(&self) -> Arc<SuperVersion> {
584        self.current.load_full()
585    }
586
587    /// Install a new SuperVersion (serialized)
588    ///
589    /// This atomically swaps the new version into place. The old version
590    /// remains valid for any readers that acquired it before the swap.
591    ///
592    /// ## Safety
593    ///
594    /// Only one writer should call this at a time. Use `with_write_lock()`
595    /// to serialize concurrent updates.
596    pub fn install(&self, new_version: SuperVersion) {
597        let _guard = self.write_lock.lock();
598        self.current.store(Arc::new(new_version));
599        self.version_counter.fetch_add(1, Ordering::SeqCst);
600    }
601
602    /// Execute a function with exclusive write access
603    ///
604    /// Use this to ensure atomic read-modify-write operations on the
605    /// version set. The function receives the current SuperVersion and
606    /// should return the new SuperVersion to install.
607    pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
608    where
609        F: FnOnce(&SuperVersion) -> SuperVersion,
610    {
611        let _guard = self.write_lock.lock();
612        let current = self.current.load();
613        let new_version = f(&current);
614        self.current.store(Arc::new(new_version.clone()));
615        self.version_counter.fetch_add(1, Ordering::SeqCst);
616        new_version
617    }
618
619    /// Register a snapshot at the given sequence number
620    ///
621    /// Returns the snapshot sequence number for later release.
622    pub fn register_snapshot(&self, seqno: u64) -> u64 {
623        let mut snapshots = self.snapshots.lock();
624        *snapshots.entry(seqno).or_insert(0) += 1;
625        seqno
626    }
627
628    /// Release a snapshot
629    pub fn release_snapshot(&self, seqno: u64) {
630        let mut snapshots = self.snapshots.lock();
631        if let Some(count) = snapshots.get_mut(&seqno) {
632            *count -= 1;
633            if *count == 0 {
634                snapshots.remove(&seqno);
635            }
636        }
637    }
638
639    /// Get minimum sequence number that must be preserved
640    ///
641    /// Returns the oldest snapshot sequence number, or the current version's
642    /// min_snapshot_seqno if no snapshots are active.
643    pub fn min_preserved_seqno(&self) -> u64 {
644        let snapshots = self.snapshots.lock();
645        snapshots
646            .keys()
647            .next()
648            .copied()
649            .unwrap_or_else(|| self.current.load().min_snapshot_seqno)
650    }
651
652    /// Update min_snapshot_seqno based on active snapshots
653    pub fn update_min_snapshot_seqno(&self) {
654        let min_seqno = self.min_preserved_seqno();
655        self.with_write_lock(|current| SuperVersion {
656            min_snapshot_seqno: min_seqno,
657            version_number: current.version_number + 1,
658            ..current.clone()
659        });
660    }
661
662    /// Get database path
663    pub fn db_path(&self) -> &PathBuf {
664        &self.db_path
665    }
666
667    /// Get current version number
668    pub fn version_number(&self) -> u64 {
669        self.version_counter.load(Ordering::SeqCst)
670    }
671}
672
673impl std::fmt::Debug for VersionSet {
674    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675        let current = self.current.load();
676        f.debug_struct("VersionSet")
677            .field("version_number", &current.version_number)
678            .field("num_immutables", &current.immutable_memtables.len())
679            .field("num_levels", &current.levels.len())
680            .field("total_files", &current.total_file_count())
681            .field("db_path", &self.db_path)
682            .finish()
683    }
684}
685
686// =============================================================================
687// SuperVersionHandle - RAII Guard for SuperVersion
688// =============================================================================
689
690/// RAII handle for SuperVersion access
691///
692/// This provides a convenient way to access a SuperVersion with automatic
693/// cleanup. The handle can be cloned to share access across threads.
694pub struct SuperVersionHandle {
695    version: Arc<SuperVersion>,
696    version_set: Arc<VersionSet>,
697    snapshot_seqno: Option<u64>,
698}
699
700impl SuperVersionHandle {
701    /// Create a new handle from a VersionSet
702    pub fn new(version_set: Arc<VersionSet>) -> Self {
703        let version = version_set.get_arc();
704        Self {
705            version,
706            version_set,
707            snapshot_seqno: None,
708        }
709    }
710
711    /// Create a handle with a registered snapshot
712    pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
713        let registered_seqno = version_set.register_snapshot(seqno);
714        let version = version_set.get_arc();
715        Self {
716            version,
717            version_set,
718            snapshot_seqno: Some(registered_seqno),
719        }
720    }
721
722    /// Get the SuperVersion
723    #[inline]
724    pub fn version(&self) -> &SuperVersion {
725        &self.version
726    }
727
728    /// Get the snapshot sequence number (if any)
729    pub fn snapshot_seqno(&self) -> Option<u64> {
730        self.snapshot_seqno
731    }
732}
733
734impl Drop for SuperVersionHandle {
735    fn drop(&mut self) {
736        if let Some(seqno) = self.snapshot_seqno {
737            self.version_set.release_snapshot(seqno);
738        }
739    }
740}
741
742impl Clone for SuperVersionHandle {
743    fn clone(&self) -> Self {
744        // If we have a snapshot, register another reference
745        if let Some(seqno) = self.snapshot_seqno {
746            self.version_set.register_snapshot(seqno);
747        }
748        Self {
749            version: Arc::clone(&self.version),
750            version_set: Arc::clone(&self.version_set),
751            snapshot_seqno: self.snapshot_seqno,
752        }
753    }
754}
755
756// =============================================================================
757// Tests
758// =============================================================================
759
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// A fresh `SuperVersion` is version 1 with no memtables or levels.
    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    /// A new `VersionSet` serves the initial version.
    #[test]
    fn test_version_set_get() {
        let set = VersionSet::new(PathBuf::from("/tmp/test"));
        let current = set.get();
        assert_eq!(current.version_number, 1);
    }

    /// `install` makes the supplied version the one readers see.
    #[test]
    fn test_version_set_install() {
        let set = VersionSet::new(PathBuf::from("/tmp/test"));

        let mut replacement = SuperVersion::new();
        replacement.version_number = 2;
        set.install(replacement);

        let current = set.get();
        assert_eq!(current.version_number, 2);
    }

    /// Ten threads each load the current version a thousand times.
    #[test]
    fn test_concurrent_reads() {
        let shared = Arc::new(VersionSet::new(PathBuf::from("/tmp/test")));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let set = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        let current = set.get();
                        assert!(current.version_number >= 1);
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// Snapshot references are counted per sequence number and the minimum
    /// follows the oldest snapshot still registered.
    #[test]
    fn test_snapshot_registry() {
        let set = VersionSet::new(PathBuf::from("/tmp/test"));

        // Two references to 100, one to 200.
        set.register_snapshot(100);
        set.register_snapshot(200);
        set.register_snapshot(100);

        assert_eq!(set.min_preserved_seqno(), 100);

        // One reference to 100 remains after the first release.
        set.release_snapshot(100);
        assert_eq!(set.min_preserved_seqno(), 100);

        // The second release removes 100 entirely.
        set.release_snapshot(100);
        assert_eq!(set.min_preserved_seqno(), 200);

        set.release_snapshot(200);
    }

    /// Point and range lookups on a sorted level land on the right files.
    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);

        // Ten files covering "00".."09", "10".."19", and so on, in key order.
        for i in 0..10 {
            let lower = format!("{:02}", i * 10).into_bytes();
            let upper = format!("{:02}", i * 10 + 9).into_bytes();
            level.files.push(Arc::new(FileMetadata {
                file_number: i as u64,
                file_size: 1024,
                smallest_key: lower,
                largest_key: upper,
                num_entries: 100,
                min_seqno: 1,
                max_seqno: 100,
                path: PathBuf::from(format!("/tmp/{}.sst", i)),
                bloom_filter: None,
                being_compacted: false,
            }));
            level.total_size += 1024;
        }

        // Point lookup: "25" lives in file 2.
        let hits = level.find_files_for_key(b"25");
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].file_number, 2);

        // Range lookup: "15".."35" touches files 1, 2 and 3.
        let hits = level.find_files_for_range(b"15", b"35");
        assert_eq!(hits.len(), 3);
    }
}