use arc_swap::{ArcSwap, Guard};
use parking_lot::Mutex;
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Descriptor for a single table file tracked inside a [`LevelMetadata`].
///
/// The level lists in this file hold these behind `Arc`, so one descriptor can be
/// shared by several versions without copying the key bounds.
#[derive(Debug, Clone)]
pub struct FileMetadata {
/// Number identifying the file; `SuperVersion::with_compaction_result` matches its inputs on it.
pub file_number: u64,
/// Size of the file, added to the owning level's `total_size` (presumably bytes — TODO confirm).
pub file_size: u64,
/// Lower key bound; the lookup and overlap checks compare against it inclusively.
pub smallest_key: Vec<u8>,
/// Upper key bound; the lookup and overlap checks compare against it inclusively.
pub largest_key: Vec<u8>,
/// Presumably the number of entries stored in the file; not read by the code visible here.
pub num_entries: u64,
/// Presumably the lowest sequence number in the file; not read by the code visible here.
pub min_seqno: u64,
/// Presumably the highest sequence number in the file; not read by the code visible here.
pub max_seqno: u64,
/// Presumably the on-disk location of the file; not read by the code visible here.
pub path: PathBuf,
/// Optional Bloom filter consulted by `may_contain`; with `None` every key is a possible match.
pub bloom_filter: Option<Arc<BloomFilterHandle>>,
/// Presumably marks the file as input of an in-flight compaction; not read by the code visible here.
pub being_compacted: bool,
}
impl FileMetadata {
    /// Returns `false` only when the attached Bloom filter rules `key` out.
    ///
    /// A file without a filter cannot exclude anything, so it always answers `true`.
    #[inline]
    pub fn may_contain(&self, key: &[u8]) -> bool {
        self.bloom_filter
            .as_ref()
            .map_or(true, |filter| filter.may_contain(key))
    }

    /// Reports whether the file's key span intersects `[start, end]`.
    ///
    /// Both bounds are inclusive. An empty `end` is treated as "no upper bound",
    /// in which case only `start` is compared against the file's keys.
    #[inline]
    pub fn overlaps_range(&self, start: &[u8], end: &[u8]) -> bool {
        let lowest = self.smallest_key.as_slice();
        let highest = self.largest_key.as_slice();
        if !end.is_empty() {
            return lowest <= end && highest >= start;
        }
        // Open-ended range: either bound reaching `start` counts as an overlap.
        lowest >= start || highest >= start
    }
}
/// Bit array plus probe count backing the membership test in `may_contain`.
#[derive(Debug)]
pub struct BloomFilterHandle {
/// Filter bits packed 64 per word; an empty vector makes `may_contain` accept every key.
bits: Vec<u64>,
/// Number of bit positions probed per key.
num_hashes: u32,
}
impl BloomFilterHandle {
pub fn new(bits: Vec<u64>, num_hashes: u32) -> Self {
Self { bits, num_hashes }
}
#[inline]
pub fn may_contain(&self, key: &[u8]) -> bool {
if self.bits.is_empty() {
return true;
}
let num_bits = self.bits.len() * 64;
let h1 = Self::hash1(key);
let h2 = Self::hash2(key);
for i in 0..self.num_hashes {
let h = h1.wrapping_add((i as u64).wrapping_mul(h2));
let bit_idx = (h as usize) % num_bits;
let word_idx = bit_idx / 64;
let bit_pos = bit_idx % 64;
if self.bits[word_idx] & (1 << bit_pos) == 0 {
return false;
}
}
true
}
#[inline]
fn hash1(key: &[u8]) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
key.hash(&mut hasher);
hasher.finish()
}
#[inline]
fn hash2(key: &[u8]) -> u64 {
twox_hash::xxh3::hash64(key)
}
}
/// Files and size bookkeeping for one level.
///
/// The lookup methods scan level 0 linearly and binary search every other level,
/// so `files` of a level above 0 is expected to be sorted by key.
#[derive(Debug, Clone)]
pub struct LevelMetadata {
/// Level index; `0` selects the linear-scan lookup paths.
pub level: u32,
/// Files currently assigned to this level.
pub files: Vec<Arc<FileMetadata>>,
/// Running sum of `file_size`, maintained by the code that adds and removes files.
pub total_size: u64,
/// Denominator of the compaction score; `0` pins the score to `0.0`.
pub target_size: u64,
/// `total_size / target_size` as of the last `update_compaction_score` call.
pub compaction_score: f64,
}
impl LevelMetadata {
    /// Creates an empty level with the given index and target size.
    pub fn new(level: u32, target_size: u64) -> Self {
        Self {
            level,
            files: Vec::new(),
            total_size: 0,
            target_size,
            compaction_score: 0.0,
        }
    }

    /// Returns the files whose inclusive key span contains `key`.
    ///
    /// Level 0 is scanned linearly and may yield several files. Other levels are
    /// binary searched, which relies on `files` being sorted by key with
    /// non-overlapping spans, and yield at most one file.
    pub fn find_files_for_key(&self, key: &[u8]) -> Vec<&Arc<FileMetadata>> {
        if self.level == 0 {
            return self
                .files
                .iter()
                .filter(|f| f.smallest_key.as_slice() <= key && f.largest_key.as_slice() >= key)
                .collect();
        }
        // First file whose upper bound is not below the key is the only candidate.
        let idx = self
            .files
            .partition_point(|f| f.largest_key.as_slice() < key);
        match self.files.get(idx) {
            Some(file) if file.smallest_key.as_slice() <= key => vec![file],
            _ => Vec::new(),
        }
    }

    /// Returns the files overlapping the inclusive range `[start, end]`.
    ///
    /// An empty `end` means "no upper bound". A range whose non-empty `end` sorts
    /// before `start` contains no keys and yields no files at any level; previously
    /// such a range made the sorted-level path slice with `start_idx > end_idx`
    /// and panic.
    pub fn find_files_for_range(&self, start: &[u8], end: &[u8]) -> Vec<&Arc<FileMetadata>> {
        if !end.is_empty() && start > end {
            return Vec::new();
        }
        if self.level == 0 {
            return self
                .files
                .iter()
                .filter(|f| f.overlaps_range(start, end))
                .collect();
        }
        let start_idx = self
            .files
            .partition_point(|f| f.largest_key.as_slice() < start);
        let end_idx = if end.is_empty() {
            self.files.len()
        } else {
            self.files
                .partition_point(|f| f.smallest_key.as_slice() <= end)
        };
        // `get` instead of indexing: inconsistent file bounds then produce an
        // empty result rather than a panic.
        match self.files.get(start_idx..end_idx) {
            Some(window) => window.iter().collect(),
            None => Vec::new(),
        }
    }

    /// Recomputes `compaction_score` as `total_size / target_size`.
    ///
    /// A zero `target_size` yields a score of `0.0` instead of dividing by zero.
    pub fn update_compaction_score(&mut self) {
        self.compaction_score = if self.target_size > 0 {
            self.total_size as f64 / self.target_size as f64
        } else {
            0.0
        };
    }
}
/// Cheaply clonable reference to a sealed memtable, as listed in `SuperVersion::immutable_memtables`.
#[derive(Debug, Clone)]
pub struct ImmutableMemTableRef {
/// Identifier that `SuperVersion::with_flushed_memtables` matches against its `flushed_ids`.
pub id: u64,
/// Presumably the sequence number at which the memtable was sealed; not read by the code visible here.
pub seal_seqno: u64,
/// Presumably the memtable's size in bytes; not read by the code visible here.
pub size_bytes: u64,
/// Shared handle to the memtable contents.
pub data: Arc<dyn ImmutableMemTable>,
}
/// Read interface of a sealed memtable held through [`ImmutableMemTableRef`].
///
/// Nothing visible in this file implements or calls these methods, so the notes
/// below are readings of the signatures only.
pub trait ImmutableMemTable: Send + Sync + std::fmt::Debug {
/// Looks up `key` as of `seqno`.
///
/// NOTE(review): presumably the outer `None` means "no entry for this key" and
/// `Some(None)` a deletion marker — confirm against the implementors.
fn get(&self, key: &[u8], seqno: u64) -> Option<Option<Vec<u8>>>;
/// Iterates the stored entries as tuples.
///
/// NOTE(review): the tuple is presumably `(key, value-or-deletion, seqno)`;
/// iteration order is not established here — confirm against the implementors.
fn iter(&self) -> Box<dyn Iterator<Item = (Vec<u8>, Option<Vec<u8>>, u64)> + '_>;
/// Reports the memtable's size (presumably bytes — TODO confirm).
fn size(&self) -> u64;
}
/// Immutable view of the store's structure: sealed memtables plus per-level file lists.
///
/// The `with_*` methods never mutate a version; each returns a new value with
/// `version_number` increased by one.
#[derive(Debug, Clone)]
pub struct SuperVersion {
/// Incremented by every `with_*` derivation; starts at 1.
pub version_number: u64,
/// Incremented only by `with_new_immutable`; starts at 1.
pub memtable_version: u64,
/// Sealed memtables; `with_new_immutable` appends, `with_flushed_memtables` removes by id.
pub immutable_memtables: Arc<Vec<ImmutableMemTableRef>>,
/// Per-level metadata, indexed by level number.
pub levels: Arc<Vec<LevelMetadata>>,
/// Value written by `VersionSet::update_min_snapshot_seqno` and used as the fallback of
/// `VersionSet::min_preserved_seqno` when no snapshot is registered.
pub min_snapshot_seqno: u64,
/// Presumably the current write-ahead-log number; only copied between versions here.
pub log_number: u64,
/// Presumably the previous write-ahead-log number; only copied between versions here.
pub prev_log_number: u64,
/// Presumably the next unallocated file number; only copied between versions here.
pub next_file_number: u64,
/// Presumably the manifest's file number; only copied between versions here.
pub manifest_file_number: u64,
}
impl SuperVersion {
pub fn new() -> Self {
Self {
version_number: 1,
memtable_version: 1,
immutable_memtables: Arc::new(Vec::new()),
levels: Arc::new(Vec::new()),
min_snapshot_seqno: 0,
log_number: 1,
prev_log_number: 0,
next_file_number: 2,
manifest_file_number: 1,
}
}
pub fn with_new_immutable(&self, imm: ImmutableMemTableRef) -> Self {
let mut new_imms = (*self.immutable_memtables).clone();
new_imms.push(imm);
Self {
version_number: self.version_number + 1,
memtable_version: self.memtable_version + 1,
immutable_memtables: Arc::new(new_imms),
levels: Arc::clone(&self.levels),
min_snapshot_seqno: self.min_snapshot_seqno,
log_number: self.log_number,
prev_log_number: self.prev_log_number,
next_file_number: self.next_file_number,
manifest_file_number: self.manifest_file_number,
}
}
pub fn with_flushed_memtables(
&self,
flushed_ids: &[u64],
new_files: Vec<(u32, Arc<FileMetadata>)>, ) -> Self {
let new_imms: Vec<_> = self
.immutable_memtables
.iter()
.filter(|imm| !flushed_ids.contains(&imm.id))
.cloned()
.collect();
let mut new_levels = (*self.levels).clone();
for (level, file) in new_files {
while new_levels.len() <= level as usize {
let target_size = self.level_target_size(new_levels.len() as u32);
new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
}
let lm = &mut new_levels[level as usize];
lm.total_size += file.file_size;
lm.files.push(file);
lm.update_compaction_score();
}
Self {
version_number: self.version_number + 1,
memtable_version: self.memtable_version,
immutable_memtables: Arc::new(new_imms),
levels: Arc::new(new_levels),
min_snapshot_seqno: self.min_snapshot_seqno,
log_number: self.log_number,
prev_log_number: self.prev_log_number,
next_file_number: self.next_file_number,
manifest_file_number: self.manifest_file_number,
}
}
pub fn with_compaction_result(
&self,
input_files: &[(u32, u64)], output_files: Vec<(u32, Arc<FileMetadata>)>,
) -> Self {
let mut new_levels = (*self.levels).clone();
for (level, file_num) in input_files {
if let Some(lm) = new_levels.get_mut(*level as usize) {
if let Some(pos) = lm.files.iter().position(|f| f.file_number == *file_num) {
let removed = lm.files.remove(pos);
lm.total_size -= removed.file_size;
}
}
}
for (level, file) in output_files {
while new_levels.len() <= level as usize {
let target_size = self.level_target_size(new_levels.len() as u32);
new_levels.push(LevelMetadata::new(new_levels.len() as u32, target_size));
}
let lm = &mut new_levels[level as usize];
lm.total_size += file.file_size;
if level > 0 {
let pos = lm
.files
.partition_point(|f| f.smallest_key < file.smallest_key);
lm.files.insert(pos, file);
} else {
lm.files.push(file);
}
lm.update_compaction_score();
}
Self {
version_number: self.version_number + 1,
memtable_version: self.memtable_version,
immutable_memtables: Arc::clone(&self.immutable_memtables),
levels: Arc::new(new_levels),
min_snapshot_seqno: self.min_snapshot_seqno,
log_number: self.log_number,
prev_log_number: self.prev_log_number,
next_file_number: self.next_file_number,
manifest_file_number: self.manifest_file_number,
}
}
fn level_target_size(&self, level: u32) -> u64 {
match level {
0 => 64 * 1024 * 1024,
1 => 256 * 1024 * 1024,
_ => 256 * 1024 * 1024 * 10u64.pow(level - 1),
}
}
pub fn total_file_count(&self) -> usize {
self.levels.iter().map(|l| l.files.len()).sum()
}
pub fn pick_compaction_level(&self) -> Option<u32> {
self.levels
.iter()
.filter(|l| l.compaction_score > 1.0)
.max_by(|a, b| a.compaction_score.partial_cmp(&b.compaction_score).unwrap())
.map(|l| l.level)
}
}
/// `Default` delegates to [`SuperVersion::new`], so both produce the same initial version.
impl Default for SuperVersion {
fn default() -> Self {
Self::new()
}
}
/// Holder of the currently published [`SuperVersion`] plus the snapshot registry.
pub struct VersionSet {
/// Currently published version; read via `load`/`load_full`, replaced via `store`.
current: ArcSwap<SuperVersion>,
/// Counter incremented once per `install` / `with_write_lock` call; starts at 1.
version_counter: AtomicU64,
/// Held for the duration of `install` and `with_write_lock`.
write_lock: Mutex<()>,
/// Registered snapshot sequence numbers mapped to their registration counts.
snapshots: Mutex<BTreeMap<u64, u64>>,
/// Path handed to `new` and returned by `db_path`.
db_path: PathBuf,
}
impl VersionSet {
    /// Creates a version set for `db_path`, publishing `SuperVersion::new()` as the
    /// current version and starting the install counter at 1.
    pub fn new(db_path: PathBuf) -> Self {
        Self {
            current: ArcSwap::from_pointee(SuperVersion::new()),
            version_counter: AtomicU64::new(1),
            write_lock: Mutex::new(()),
            snapshots: Mutex::new(BTreeMap::new()),
            db_path,
        }
    }

    /// Returns a guard over the currently published version (`ArcSwap::load`).
    #[inline]
    pub fn get(&self) -> Guard<Arc<SuperVersion>> {
        self.current.load()
    }

    /// Returns an owned `Arc` to the currently published version (`ArcSwap::load_full`).
    #[inline]
    pub fn get_arc(&self) -> Arc<SuperVersion> {
        self.current.load_full()
    }

    /// Publishes `new_version` as the current version while holding the write lock,
    /// then increments the install counter.
    pub fn install(&self, new_version: SuperVersion) {
        let _guard = self.write_lock.lock();
        self.current.store(Arc::new(new_version));
        self.version_counter.fetch_add(1, Ordering::SeqCst);
    }

    /// Derives a new version from the current one under the write lock and publishes it.
    ///
    /// `f` receives the current version and returns its successor, which is stored
    /// and also returned to the caller.
    ///
    /// NOTE(review): `f` runs while `write_lock` is held; `parking_lot::Mutex` is not
    /// reentrant, so `f` presumably must not call `install` or `with_write_lock`.
    pub fn with_write_lock<F>(&self, f: F) -> SuperVersion
    where
        F: FnOnce(&SuperVersion) -> SuperVersion,
    {
        let _guard = self.write_lock.lock();
        let current = self.current.load();
        let new_version = f(&current);
        self.current.store(Arc::new(new_version.clone()));
        self.version_counter.fetch_add(1, Ordering::SeqCst);
        new_version
    }

    /// Registers one more holder of the snapshot at `seqno` and returns `seqno`.
    pub fn register_snapshot(&self, seqno: u64) -> u64 {
        let mut snapshots = self.snapshots.lock();
        *snapshots.entry(seqno).or_insert(0) += 1;
        seqno
    }

    /// Drops one holder of the snapshot at `seqno`; the entry disappears with its
    /// last holder. Releasing an unregistered `seqno` is a no-op.
    pub fn release_snapshot(&self, seqno: u64) {
        let mut snapshots = self.snapshots.lock();
        if let Some(count) = snapshots.get_mut(&seqno) {
            *count -= 1;
            if *count == 0 {
                snapshots.remove(&seqno);
            }
        }
    }

    /// Returns the smallest registered snapshot sequence number, or the current
    /// version's `min_snapshot_seqno` when no snapshot is registered.
    pub fn min_preserved_seqno(&self) -> u64 {
        let snapshots = self.snapshots.lock();
        snapshots
            .keys()
            .next()
            .copied()
            .unwrap_or_else(|| self.current.load().min_snapshot_seqno)
    }

    /// Publishes a successor version whose `min_snapshot_seqno` is the value of
    /// `min_preserved_seqno` sampled just before the write lock is taken.
    pub fn update_min_snapshot_seqno(&self) {
        let min_seqno = self.min_preserved_seqno();
        self.with_write_lock(|current| SuperVersion {
            min_snapshot_seqno: min_seqno,
            version_number: current.version_number + 1,
            ..current.clone()
        });
    }

    /// Path this version set was created with.
    pub fn db_path(&self) -> &PathBuf {
        &self.db_path
    }

    /// Current value of the install counter (not the published version's
    /// `version_number` field).
    pub fn version_number(&self) -> u64 {
        self.version_counter.load(Ordering::SeqCst)
    }
}
impl std::fmt::Debug for VersionSet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let current = self.current.load();
f.debug_struct("VersionSet")
.field("version_number", ¤t.version_number)
.field("num_immutables", ¤t.immutable_memtables.len())
.field("num_levels", ¤t.levels.len())
.field("total_files", ¤t.total_file_count())
.field("db_path", &self.db_path)
.finish()
}
}
/// Handle that pins one [`SuperVersion`] and, optionally, one registered snapshot.
pub struct SuperVersionHandle {
/// Version captured from the version set when the handle was created.
version: Arc<SuperVersion>,
/// Version set the snapshot registration (if any) belongs to.
version_set: Arc<VersionSet>,
/// Sequence number registered for this handle; `None` for handles made by `new`.
snapshot_seqno: Option<u64>,
}
impl SuperVersionHandle {
pub fn new(version_set: Arc<VersionSet>) -> Self {
let version = version_set.get_arc();
Self {
version,
version_set,
snapshot_seqno: None,
}
}
pub fn with_snapshot(version_set: Arc<VersionSet>, seqno: u64) -> Self {
let registered_seqno = version_set.register_snapshot(seqno);
let version = version_set.get_arc();
Self {
version,
version_set,
snapshot_seqno: Some(registered_seqno),
}
}
#[inline]
pub fn version(&self) -> &SuperVersion {
&self.version
}
pub fn snapshot_seqno(&self) -> Option<u64> {
self.snapshot_seqno
}
}
/// Dropping a handle gives back the snapshot registration it holds, if any.
impl Drop for SuperVersionHandle {
    fn drop(&mut self) {
        match self.snapshot_seqno {
            Some(seqno) => self.version_set.release_snapshot(seqno),
            // Handles created without a snapshot have nothing to release.
            None => {}
        }
    }
}
impl Clone for SuperVersionHandle {
fn clone(&self) -> Self {
if let Some(seqno) = self.snapshot_seqno {
self.version_set.register_snapshot(seqno);
}
Self {
version: Arc::clone(&self.version),
version_set: Arc::clone(&self.version_set),
snapshot_seqno: self.snapshot_seqno,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    /// Version set rooted at the throwaway path shared by these tests.
    fn scratch_version_set() -> VersionSet {
        VersionSet::new(PathBuf::from("/tmp/test"))
    }

    /// A fresh version starts at number 1 with no memtables and no levels.
    #[test]
    fn test_superversion_creation() {
        let fresh = SuperVersion::new();
        assert_eq!(fresh.version_number, 1);
        assert!(fresh.immutable_memtables.is_empty());
        assert!(fresh.levels.is_empty());
    }

    /// A new version set publishes the initial version.
    #[test]
    fn test_version_set_get() {
        let set = scratch_version_set();
        assert_eq!(set.get().version_number, 1);
    }

    /// `install` replaces the published version.
    #[test]
    fn test_version_set_install() {
        let set = scratch_version_set();
        let mut replacement = SuperVersion::new();
        replacement.version_number = 2;
        set.install(replacement);
        assert_eq!(set.get().version_number, 2);
    }

    /// Ten reader threads can load the published version concurrently.
    #[test]
    fn test_concurrent_reads() {
        let shared = Arc::new(scratch_version_set());
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let reader = Arc::clone(&shared);
                thread::spawn(move || {
                    for _ in 0..1000 {
                        let seen = reader.get();
                        assert!(seen.version_number >= 1);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
    }

    /// Snapshots are reference counted per sequence number.
    #[test]
    fn test_snapshot_registry() {
        let set = scratch_version_set();
        for seqno in [100, 200, 100] {
            set.register_snapshot(seqno);
        }
        assert_eq!(set.min_preserved_seqno(), 100);
        // First release leaves one holder of 100 behind.
        set.release_snapshot(100);
        assert_eq!(set.min_preserved_seqno(), 100);
        set.release_snapshot(100);
        assert_eq!(set.min_preserved_seqno(), 200);
        set.release_snapshot(200);
    }

    /// Point and range lookups on a sorted level pick the expected files.
    #[test]
    fn test_level_binary_search() {
        let mut level = LevelMetadata::new(1, 256 * 1024 * 1024);
        level.files.extend((0..10u64).map(|n| {
            Arc::new(FileMetadata {
                file_number: n,
                file_size: 1024,
                smallest_key: format!("{:02}", n * 10).into_bytes(),
                largest_key: format!("{:02}", n * 10 + 9).into_bytes(),
                num_entries: 100,
                min_seqno: 1,
                max_seqno: 100,
                path: PathBuf::from(format!("/tmp/{}.sst", n)),
                bloom_filter: None,
                being_compacted: false,
            })
        }));
        level.total_size = 10 * 1024;

        let point_hits = level.find_files_for_key(b"25");
        assert_eq!(point_hits.len(), 1);
        assert_eq!(point_hits[0].file_number, 2);

        let range_hits = level.find_files_for_range(b"15", b"35");
        assert_eq!(range_hits.len(), 3);
    }
}