sochdb_storage/version_set.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! SuperVersion Metadata + Copy-on-Write Version Set
19//!
20//! This module implements RocksDB-style SuperVersion metadata management for
21//! near lock-free reads. The key insight is that read paths only need a consistent
22//! snapshot of metadata (memtable, immutable memtables, SSTable levels), not
23//! exclusive access to the underlying data structures.
24//!
25//! ## Problem Analysis
26//!
27//! Previous implementation suffered from lock contention:
28//! - Foreground reads must traverse memtable (RwLock) → immutables → levels
29//! - Each level switch may acquire additional locks
30//! - Compaction/flush modifies metadata while readers hold references
31//! - Lock-order complexity leads to deadlock potential
32//!
33//! ## Solution: SuperVersion + ArcSwap
34//!
35//! ```text
//! ┌─────────────────────────────────────────────────────────────────┐
//! │                           VersionSet                            │
//! │  ┌─────────────────────────────────────────────────────────────┐│
//! │  │ current: ArcSwap<SuperVersion>  ◄─── Atomic swap (O(1))     ││
//! │  └─────────────────────────────────────────────────────────────┘│
//! │                              │                                  │
//! │                              ▼                                  │
//! │  ┌─────────────────────────────────────────────────────────────┐│
//! │  │                        SuperVersion                         ││
//! │  │  ┌─────────────┐  ┌──────────────┐  ┌───────────────────┐   ││
//! │  │  │ memtable    │  │ immutables   │  │ level_files       │   ││
//! │  │  │ Arc<...>    │  │ Arc<Vec<..>> │  │ Arc<Vec<Level>>   │   ││
//! │  │  └─────────────┘  └──────────────┘  └───────────────────┘   ││
//! │  └─────────────────────────────────────────────────────────────┘│
//! └─────────────────────────────────────────────────────────────────┘
51//! ```
52//!
53//! ## Read Path (Lock-Free)
54//!
55//! 1. Load current SuperVersion via atomic: `let sv = version_set.get();` // O(1)
56//! 2. Search memtable → immutables → levels using sv references
57//! 3. Release sv when done (Arc drop)
58//!
59//! No locks acquired. Readers are completely decoupled from writers.
60//!
61//! ## Write Path (Copy-on-Write)
62//!
63//! 1. Clone current SuperVersion's inner Arcs
64//! 2. Modify the new copy (e.g., add new immutable memtable)
65//! 3. Atomically swap new SuperVersion into place
66//! 4. Old SuperVersion kept alive by existing readers (Arc)
67//!
68//! ## Complexity Analysis
69//!
70//! | Operation | Old (with locks) | New (SuperVersion) |
71//! |--------------------|----------------------|---------------------------|
72//! | Read acquire | O(1) but contended | O(1) atomic load |
73//! | Read release | O(1) unlock | O(1) Arc decrement |
74//! | Metadata update | O(1) + lock wait | O(changed_metadata) clone |
75//! | Concurrent reads | Serialized on RwLock | Truly parallel |
76//!
77//! For 8 threads, expected speedup: ~6-8x on read-heavy workloads.
78
79use arc_swap::{ArcSwap, Guard};
80use parking_lot::Mutex;
81use std::collections::BTreeMap;
82use std::path::PathBuf;
83use std::sync::Arc;
84use std::sync::atomic::{AtomicU64, Ordering};
85
86// =============================================================================
87// File Metadata
88// =============================================================================
89
/// Metadata for an SSTable file
///
/// Records the file's identity, size, key range, sequence-number range and
/// path, plus an optional bloom filter handle consulted by `may_contain`.
/// The derived `Clone` copies the key buffers and path; the bloom filter is
/// shared through its `Arc`.
#[derive(Debug, Clone)]
pub struct FileMetadata {
    /// Unique file number
    pub file_number: u64,
    /// File size in bytes
    pub file_size: u64,
    /// Smallest key in file (for range queries)
    pub smallest_key: Vec<u8>,
    /// Largest key in file (for range queries)
    pub largest_key: Vec<u8>,
    /// Number of entries in file
    pub num_entries: u64,
    /// Minimum sequence number in file
    pub min_seqno: u64,
    /// Maximum sequence number in file
    pub max_seqno: u64,
    /// Path to the file
    pub path: PathBuf,
    /// Bloom filter (if loaded); with `None`, `may_contain` always answers `true`
    pub bloom_filter: Option<Arc<BloomFilterHandle>>,
    /// Whether file is being compacted
    pub being_compacted: bool,
}
114
115impl FileMetadata {
116 /// Check if key might be in this file using bloom filter
117 #[inline]
118 pub fn may_contain(&self, key: &[u8]) -> bool {
119 match &self.bloom_filter {
120 Some(bf) => bf.may_contain(key),
121 None => true, // No filter = must check file
122 }
123 }
124
125 /// Check if a key range overlaps with this file
126 #[inline]
127 pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
128 if end.is_empty() {
129 // Unbounded end - overlaps if smallest_key <= end conceptually
130 self.smallest_key.as_slice() >= start || self.largest_key.as_slice() >= start
131 } else {
132 // Standard range overlap check
133 self.smallest_key.as_slice() <= end && self.largest_key.as_slice() >= start
134 }
135 }
136}
137
/// Handle to a bloom filter (may be memory-mapped or in-memory)
///
/// As declared here the bit array is an owned `Vec<u64>`; bit `i` of the
/// filter lives in word `i / 64` at bit position `i % 64`.
#[derive(Debug)]
pub struct BloomFilterHandle {
    /// The bloom filter bits, packed 64 per word
    bits: Vec<u64>,
    /// Number of hash functions (probes per lookup)
    num_hashes: u32,
}
146
147impl BloomFilterHandle {
148 /// Create a new bloom filter handle
149 pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
150 Self { bits, num_hashes }
151 }
152
153 /// Check if key may be present
154 #[inline]
155 pub fn may_contain(&self, key: &[u8]) -> bool {
156 if self.bits.is_empty() {
157 return true;
158 }
159 let num_bits = self.bits.len() * 64;
160 let h1 = Self::hash1(key);
161 let h2 = Self::hash2(key);
162
163 for i in 0..self.num_hashes {
164 let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
165 let bit_idx = (h as usize) % num_bits;
166 let word_idx = bit_idx / 64;
167 let bit_pos = bit_idx % 64;
168 if self.bits[word_idx] & (1 << bit_pos) == 0 {
169 return false;
170 }
171 }
172 true
173 }
174
175 #[inline]
176 fn hash1(key: &[u8]) -> u64 {
177 use std::collections::hash_map::DefaultHasher;
178 use std::hash::{Hash, Hasher};
179 let mut hasher = DefaultHasher::new();
180 key.hash(&mut hasher);
181 hasher.finish()
182 }
183
184 #[inline]
185 fn hash2(key: &[u8]) -> u64 {
186 twox_hash::xxh3::hash64(key)
187 }
188}
189
190// =============================================================================
191// Level Metadata
192// =============================================================================
193
/// Metadata for a single level in the LSM tree
///
/// The lookup helpers treat level 0 as an unordered set of files (every file
/// is checked) and binary-search every other level, so `files` must stay
/// ordered by key for levels 1 and above.
#[derive(Debug, Clone)]
pub struct LevelMetadata {
    /// Level number (0 = unsorted, 1+ = sorted)
    pub level: u32,
    /// Files in this level (sorted by smallest_key for L1+)
    pub files: Vec<Arc<FileMetadata>>,
    /// Total size of files in this level
    /// (not recomputed from `files`; whoever edits `files` must adjust it)
    pub total_size: u64,
    /// Target size for this level (for compaction decisions)
    pub target_size: u64,
    /// Compaction score (size / target_size); refreshed only by
    /// `update_compaction_score`
    pub compaction_score: f64,
}
208
209impl LevelMetadata {
210 /// Create a new empty level
211 pub fn new(level: u32, target_size: u64) -> Self {
212 Self {
213 level,
214 files: Vec::new(),
215 total_size: 0,
216 target_size,
217 compaction_score: 0.0,
218 }
219 }
220
221 /// Find files that may contain a key using binary search (for L1+)
222 pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
223 if self.level == 0 {
224 // L0 is unsorted - must check all files
225 self.files
226 .iter()
227 .filter(|f| f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key)
228 .collect()
229 } else {
230 // L1+ is sorted - binary search for overlapping files
231 let idx = self
232 .files
233 .partition_point(|f| f.largest_key.as_slice() < key);
234 if idx < self.files.len() && self.files[idx].smallest_key.as_slice() <= key {
235 vec![&self.files[idx]]
236 } else {
237 vec![]
238 }
239 }
240 }
241
242 /// Find files that overlap with a key range
243 pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
244 if self.level == 0 {
245 // L0 is unsorted - check all files
246 self.files
247 .iter()
248 .filter(|f| f.overlaps_range(start, end))
249 .collect()
250 } else {
251 // L1+ is sorted - find range using binary search
252 let start_idx = self
253 .files
254 .partition_point(|f| f.largest_key.as_slice() < start);
255 let end_idx = if end.is_empty() {
256 self.files.len()
257 } else {
258 self.files
259 .partition_point(|f| f.smallest_key.as_slice() <= end)
260 };
261 self.files[start_idx..end_idx].iter().collect()
262 }
263 }
264
265 /// Recalculate compaction score
266 pub fn update_compaction_score(&mut self) {
267 if self.target_size > 0 {
268 self.compaction_score = self.total_size as f64 / self.target_size as f64;
269 } else {
270 self.compaction_score = 0.0;
271 }
272 }
273}
274
275// =============================================================================
276// Immutable MemTable Reference
277// =============================================================================
278
/// Reference to an immutable memtable (sealed, pending flush)
///
/// Cloning is cheap: the scalar fields are copied and `data` is shared through
/// its `Arc`.
#[derive(Debug, Clone)]
pub struct ImmutableMemTableRef {
    /// Unique ID for this immutable memtable
    /// (matched against `flushed_ids` when flushed memtables are removed)
    pub id: u64,
    /// Sequence number when sealed
    pub seal_seqno: u64,
    /// Approximate size in bytes
    pub size_bytes: u64,
    /// Reference to the actual memtable data
    /// This is opaque - the actual type is provided by the storage layer
    pub data: Arc<dyn ImmutableMemTable>,
}
292
/// Trait for immutable memtable operations
///
/// Implementors must be `Send + Sync + Debug` so they can sit behind the
/// `Arc<dyn ImmutableMemTable>` stored in `ImmutableMemTableRef`.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
    /// Get a value at a specific sequence number
    ///
    /// NOTE(review): the nested `Option` presumably separates "key unknown to
    /// this memtable" (outer `None`) from "key present but deleted"
    /// (`Some(None)`) — confirm against the implementors.
    fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;

    /// Iterate over all entries
    ///
    /// NOTE(review): items look like `(key, value or None, sequence number)`;
    /// iteration order is not specified here — confirm with implementors.
    fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;

    /// Get approximate size
    fn size(&self) -> u64;
}
304
305// =============================================================================
306// SuperVersion - The Core Abstraction
307// =============================================================================
308
/// SuperVersion: A consistent snapshot of all storage metadata
///
/// This is the key abstraction for lock-free reads. A SuperVersion contains:
/// - A version counter identifying the current mutable memtable
/// - An `Arc` to the list of immutable memtables (pending flush)
/// - An `Arc` to all SSTable levels and their file metadata
/// - Bookkeeping numbers (log, manifest and file-number counters)
///
/// Readers acquire a SuperVersion (O(1) atomic load), use it for their entire
/// read operation, then release it (O(1) Arc drop). No locks required.
///
/// Writers create a new SuperVersion with modified metadata and atomically
/// swap it in. Old versions remain valid until all readers release them.
///
/// The derived `Clone` only bumps the two `Arc` reference counts and copies
/// the integer fields.
#[derive(Debug, Clone)]
pub struct SuperVersion {
    /// Version number (monotonically increasing)
    pub version_number: u64,

    /// Version counter of the current mutable memtable
    /// Note: this struct stores only the counter, not the memtable itself;
    /// it is incremented each time a memtable is sealed into
    /// `immutable_memtables`
    pub memtable_version: u64,

    /// Immutable memtables (oldest first)
    pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,

    /// SSTable levels (L0, L1, L2, ...)
    pub levels: Arc<Vec<LevelMetadata>>,

    /// Minimum sequence number safe to garbage collect
    /// Versions with seqno < this can be pruned during compaction
    pub min_snapshot_seqno: u64,

    /// Current log (WAL) number for crash recovery
    pub log_number: u64,

    /// Prev log number (for two-log protocol during flush)
    pub prev_log_number: u64,

    /// Next file number to allocate
    pub next_file_number: u64,

    /// Manifest file number
    pub manifest_file_number: u64,
}
354
355impl SuperVersion {
356 /// Create a new empty SuperVersion
357 pub fn new() -> Self {
358 Self {
359 version_number: 1,
360 memtable_version: 1,
361 immutable_memtables: Arc::new(Vec::new()),
362 levels: Arc::new(Vec::new()),
363 min_snapshot_seqno: 0,
364 log_number: 1,
365 prev_log_number: 0,
366 next_file_number: 2,
367 manifest_file_number: 1,
368 }
369 }
370
371 /// Create a new SuperVersion with updated immutable memtables
372 pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
373 let mut new_imms = (*self.immutable_memtables).clone();
374 new_imms.push(imm);
375
376 Self {
377 version_number: self.version_number + 1,
378 memtable_version: self.memtable_version + 1,
379 immutable_memtables: Arc::new(new_imms),
380 levels: Arc::clone(&self.levels),
381 min_snapshot_seqno: self.min_snapshot_seqno,
382 log_number: self.log_number,
383 prev_log_number: self.prev_log_number,
384 next_file_number: self.next_file_number,
385 manifest_file_number: self.manifest_file_number,
386 }
387 }
388
389 /// Create a new SuperVersion after flushing immutable memtables
390 pub fn with_flushed_memtables(
391 &self,
392 flushed_ids: &[u64],
393 new_files: Vec<(u32, Arc<FileMetadata>)>, // (level, file)
394 ) -> Self {
395 // Remove flushed immutable memtables
396 let new_imms: Vec<_> = self
397 .immutable_memtables
398 .iter()
399 .filter(|imm| !flushed_ids.contains(&imm.id))
400 .cloned()
401 .collect();
402
403 // Add new files to levels
404 let mut new_levels = (*self.levels).clone();
405 for (level, file) in new_files {
406 // Ensure level exists
407 while new_levels.len() <= level as usize {
408 let target_size = self.level_target_size(new_levels.len() as u32);
409 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
410 }
411
412 let lm = &mut new_levels[level as usize];
413 lm.total_size += file.file_size;
414 lm.files.push(file);
415 lm.update_compaction_score();
416 }
417
418 Self {
419 version_number: self.version_number + 1,
420 memtable_version: self.memtable_version,
421 immutable_memtables: Arc::new(new_imms),
422 levels: Arc::new(new_levels),
423 min_snapshot_seqno: self.min_snapshot_seqno,
424 log_number: self.log_number,
425 prev_log_number: self.prev_log_number,
426 next_file_number: self.next_file_number,
427 manifest_file_number: self.manifest_file_number,
428 }
429 }
430
431 /// Create a new SuperVersion after compaction
432 pub fn with_compaction_result(
433 &self,
434 input_files: &[(u32, u64)], // (level, file_number)
435 output_files: Vec<(u32, Arc<FileMetadata>)>,
436 ) -> Self {
437 let mut new_levels = (*self.levels).clone();
438
439 // Remove input files
440 for (level, file_num) in input_files {
441 if let Some(lm) = new_levels.get_mut(*level as usize) {
442 if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
443 let removed = lm.files.remove(pos);
444 lm.total_size -= removed.file_size;
445 }
446 }
447 }
448
449 // Add output files
450 for (level, file) in output_files {
451 while new_levels.len() <= level as usize {
452 let target_size = self.level_target_size(new_levels.len() as u32);
453 new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
454 }
455
456 let lm = &mut new_levels[level as usize];
457 lm.total_size += file.file_size;
458
459 // Insert in sorted order for L1+
460 if level > 0 {
461 let pos = lm
462 .files
463 .partition_point(|f| f.smallest_key < file.smallest_key);
464 lm.files.insert(pos, file);
465 } else {
466 lm.files.push(file);
467 }
468 lm.update_compaction_score();
469 }
470
471 Self {
472 version_number: self.version_number + 1,
473 memtable_version: self.memtable_version,
474 immutable_memtables: Arc::clone(&self.immutable_memtables),
475 levels: Arc::new(new_levels),
476 min_snapshot_seqno: self.min_snapshot_seqno,
477 log_number: self.log_number,
478 prev_log_number: self.prev_log_number,
479 next_file_number: self.next_file_number,
480 manifest_file_number: self.manifest_file_number,
481 }
482 }
483
484 /// Calculate target size for a level
485 fn level_target_size(&self, level: u32) -> u64 {
486 // Level 0: 64MB, Level 1: 256MB, each subsequent level 10x
487 match level {
488 0 => 64 * 1024 * 1024,
489 1 => 256 * 1024 * 1024,
490 _ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
491 }
492 }
493
494 /// Get total number of files across all levels
495 pub fn total_file_count(&self) -> usize {
496 self.levels.iter().map(|l| l.files.len()).sum()
497 }
498
499 /// Get level with highest compaction score
500 pub fn pick_compaction_level(&self) -> Option<u32> {
501 self.levels
502 .iter()
503 .filter(|l| l.compaction_score > 1.0)
504 .max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
505 .map(|l| l.level)
506 }
507}
508
509impl Default for SuperVersion {
510 fn default() -> Self {
511 Self::new()
512 }
513}
514
515// =============================================================================
516// VersionSet - The Top-Level Manager
517// =============================================================================
518
/// VersionSet manages the current SuperVersion and provides atomic updates
///
/// This is the entry point for all metadata access. Readers call `get()` to
/// acquire the current SuperVersion, writers call `install()` to atomically
/// swap in a new version.
///
/// ## Thread Safety
///
/// - `get()`: Lock-free (atomic load)
/// - `install()`: Serialized through internal mutex (writers must coordinate)
///
/// Multiple readers can proceed in parallel with zero synchronization.
/// Writers are serialized to ensure consistent version progression.
pub struct VersionSet {
    /// Current SuperVersion (atomically swappable)
    current: ArcSwap<SuperVersion>,

    /// Version number counter
    /// Starts at 1 and is incremented once per installed version; it is kept
    /// separately from `SuperVersion::version_number`
    version_counter: AtomicU64,

    /// Write serialization lock
    /// Only one writer can modify the version at a time
    write_lock: Mutex<()>,

    /// Snapshot registry - tracks active snapshots for GC
    /// Ordered by sequence number, so the first key is the oldest snapshot
    snapshots: Mutex<BTreeMap<u64, u64>>, // seqno -> ref_count

    /// Database directory
    db_path: PathBuf,
}
549
550impl VersionSet {
551 /// Create a new VersionSet
552 pub fn new(db_path: PathBuf) -> Self {
553 Self {
554 current: ArcSwap::from_pointee(SuperVersion::new()),
555 version_counter: AtomicU64::new(1),
556 write_lock: Mutex::new(()),
557 snapshots: Mutex::new(BTreeMap::new()),
558 db_path,
559 }
560 }
561
562 /// Get current SuperVersion (lock-free)
563 ///
564 /// This is the hot path for reads. Returns a Guard that holds an Arc
565 /// to the current SuperVersion. The Guard can be dereferenced to access
566 /// the SuperVersion.
567 ///
568 /// ## Performance
569 ///
570 /// - Time: O(1) atomic load
571 /// - No locks acquired
572 /// - No memory allocation
573 #[inline]
574 pub fn get(&self) -> Guard<Arc<SuperVersion>> {
575 self.current.load()
576 }
577
578 /// Get current SuperVersion as owned Arc
579 ///
580 /// Use this when you need to hold the SuperVersion across await points
581 /// or store it for later use.
582 #[inline]
583 pub fn get_arc(&self) -> Arc<SuperVersion> {
584 self.current.load_full()
585 }
586
587 /// Install a new SuperVersion (serialized)
588 ///
589 /// This atomically swaps the new version into place. The old version
590 /// remains valid for any readers that acquired it before the swap.
591 ///
592 /// ## Safety
593 ///
594 /// Only one writer should call this at a time. Use `with_write_lock()`
595 /// to serialize concurrent updates.
596 pub fn install(&self, new_version: SuperVersion) {
597 let _guard = self.write_lock.lock();
598 self.current.store(Arc::new(new_version));
599 self.version_counter.fetch_add(1, Ordering::SeqCst);
600 }
601
602 /// Execute a function with exclusive write access
603 ///
604 /// Use this to ensure atomic read-modify-write operations on the
605 /// version set. The function receives the current SuperVersion and
606 /// should return the new SuperVersion to install.
607 pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
608 where
609 F: FnOnce(&SuperVersion) -> SuperVersion,
610 {
611 let _guard = self.write_lock.lock();
612 let current = self.current.load();
613 let new_version = f(¤t);
614 self.current.store(Arc::new(new_version.clone()));
615 self.version_counter.fetch_add(1, Ordering::SeqCst);
616 new_version
617 }
618
619 /// Register a snapshot at the given sequence number
620 ///
621 /// Returns the snapshot sequence number for later release.
622 pub fn register_snapshot(&self, seqno: u64) -> u64 {
623 let mut snapshots = self.snapshots.lock();
624 *snapshots.entry(seqno).or_insert(0) += 1;
625 seqno
626 }
627
628 /// Release a snapshot
629 pub fn release_snapshot(&self, seqno: u64) {
630 let mut snapshots = self.snapshots.lock();
631 if let Some(count) = snapshots.get_mut(&seqno) {
632 *count -= 1;
633 if *count == 0 {
634 snapshots.remove(&seqno);
635 }
636 }
637 }
638
639 /// Get minimum sequence number that must be preserved
640 ///
641 /// Returns the oldest snapshot sequence number, or the current version's
642 /// min_snapshot_seqno if no snapshots are active.
643 pub fn min_preserved_seqno(&self) -> u64 {
644 let snapshots = self.snapshots.lock();
645 snapshots
646 .keys()
647 .next()
648 .copied()
649 .unwrap_or_else(|| self.current.load().min_snapshot_seqno)
650 }
651
652 /// Update min_snapshot_seqno based on active snapshots
653 pub fn update_min_snapshot_seqno(&self) {
654 let min_seqno = self.min_preserved_seqno();
655 self.with_write_lock(|current| SuperVersion {
656 min_snapshot_seqno: min_seqno,
657 version_number: current.version_number + 1,
658 ..current.clone()
659 });
660 }
661
662 /// Get database path
663 pub fn db_path(&self) -> &PathBuf {
664 &self.db_path
665 }
666
667 /// Get current version number
668 pub fn version_number(&self) -> u64 {
669 self.version_counter.load(Ordering::SeqCst)
670 }
671}
672
673impl std::fmt::Debug for VersionSet {
674 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
675 let current = self.current.load();
676 f.debug_struct("VersionSet")
677 .field("version_number", ¤t.version_number)
678 .field("num_immutables", ¤t.immutable_memtables.len())
679 .field("num_levels", ¤t.levels.len())
680 .field("total_files", ¤t.total_file_count())
681 .field("db_path", &self.db_path)
682 .finish()
683 }
684}
685
686// =============================================================================
687// SuperVersionHandle - RAII Guard for SuperVersion
688// =============================================================================
689
/// RAII handle for SuperVersion access
///
/// This provides a convenient way to access a SuperVersion with automatic
/// cleanup. The handle can be cloned to share access across threads.
///
/// The handle pins the SuperVersion that was current when it was created; it
/// does not follow versions installed later.
pub struct SuperVersionHandle {
    /// The pinned SuperVersion
    version: Arc<SuperVersion>,
    /// Owning VersionSet, kept so a registered snapshot can be released
    version_set: Arc<VersionSet>,
    /// Sequence number registered with `version_set`, if any
    snapshot_seqno: Option<u64>,
}
699
700impl SuperVersionHandle {
701 /// Create a new handle from a VersionSet
702 pub fn new(version_set: Arc<VersionSet>) -> Self {
703 let version = version_set.get_arc();
704 Self {
705 version,
706 version_set,
707 snapshot_seqno: None,
708 }
709 }
710
711 /// Create a handle with a registered snapshot
712 pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
713 let registered_seqno = version_set.register_snapshot(seqno);
714 let version = version_set.get_arc();
715 Self {
716 version,
717 version_set,
718 snapshot_seqno: Some(registered_seqno),
719 }
720 }
721
722 /// Get the SuperVersion
723 #[inline]
724 pub fn version(&self) -> &SuperVersion {
725 &self.version
726 }
727
728 /// Get the snapshot sequence number (if any)
729 pub fn snapshot_seqno(&self) -> Option<u64> {
730 self.snapshot_seqno
731 }
732}
733
734impl Drop for SuperVersionHandle {
735 fn drop(&mut self) {
736 if let Some(seqno) = self.snapshot_seqno {
737 self.version_set.release_snapshot(seqno);
738 }
739 }
740}
741
742impl Clone for SuperVersionHandle {
743 fn clone(&self) -> Self {
744 // If we have a snapshot, register another reference
745 if let Some(seqno) = self.snapshot_seqno {
746 self.version_set.register_snapshot(seqno);
747 }
748 Self {
749 version: Arc::clone(&self.version),
750 version_set: Arc::clone(&self.version_set),
751 snapshot_seqno: self.snapshot_seqno,
752 }
753 }
754}
755
756// =============================================================================
757// Tests
758// =============================================================================
759
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Version set rooted at the throwaway directory shared by these tests.
    fn scratch_version_set() -> VersionSet {
        VersionSet::new(PathBuf::from("/tmp/test"))
    }

    /// A fresh SuperVersion starts at version 1 with nothing in it.
    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    /// A new version set exposes the initial SuperVersion.
    #[test]
    fn test_version_set_get() {
        let version_set = scratch_version_set();
        assert_eq!(version_set.get().version_number, 1);
    }

    /// An installed version becomes the one readers see.
    #[test]
    fn test_version_set_install() {
        let version_set = scratch_version_set();

        version_set.install(SuperVersion {
            version_number: 2,
            ..SuperVersion::new()
        });

        assert_eq!(version_set.get().version_number, 2);
    }

    /// Ten threads read the current version a thousand times each.
    #[test]
    fn test_concurrent_reads() {
        let shared = Arc::new(scratch_version_set());

        let readers: Vec<_> = (0..10)
            .map(|_| {
                let version_set = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        let sv = version_set.get();
                        assert!(sv.version_number >= 1);
                    }
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    /// Snapshot registrations are reference-counted per sequence number.
    #[test]
    fn test_snapshot_registry() {
        let version_set = scratch_version_set();

        // 100 is registered twice, 200 once.
        for seqno in [100, 200, 100] {
            version_set.register_snapshot(seqno);
        }
        assert_eq!(version_set.min_preserved_seqno(), 100);

        // One reference to 100 is still outstanding after the first release.
        version_set.release_snapshot(100);
        assert_eq!(version_set.min_preserved_seqno(), 100);

        // The second release retires 100, leaving 200 as the oldest.
        version_set.release_snapshot(100);
        assert_eq!(version_set.min_preserved_seqno(), 200);

        version_set.release_snapshot(200);
    }

    /// Point and range lookups on a sorted level use binary search.
    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);

        // Ten files with adjacent key ranges: "00".."09", "10".."19", ...
        level.files.extend((0..10).map(|i| {
            Arc::new(FileMetadata {
                file_number: i as u64,
                file_size: 1024,
                smallest_key: format!("{:02}", i * 10).into_bytes(),
                largest_key: format!("{:02}", i * 10 + 9).into_bytes(),
                num_entries: 100,
                min_seqno: 1,
                max_seqno: 100,
                path: PathBuf::from(format!("/tmp/{}.sst", i)),
                bloom_filter: None,
                being_compacted: false,
            })
        }));
        level.total_size += 1024 * level.files.len() as u64;

        // Point lookup lands in the file covering "20".."29".
        let point_hits = level.find_files_for_key(b"25");
        assert_eq!(point_hits.len(), 1);
        assert_eq!(point_hits[0].file_number, 2);

        // Range lookup spans files 1, 2 and 3.
        let range_hits = level.find_files_for_range(b"15", b"35");
        assert_eq!(range_hits.len(), 3);
    }
}
868}