use std::cmp::Ordering;
use std::collections::HashSet;
use std::sync::Arc;
use crate::config::MAX_NUM_LEVELS;
use crate::key::InternalKey;
use crate::utils::comparator::Comparator;
use crate::utils::linked_list::SharedNode;
use super::file_metadata::{FileMetadata, FileMetadataBySmallestKey};
use super::version::Version;
use super::version_manifest::DeletedFile;
use super::VersionChangeManifest;
/// Accumulates the edits from one or more [`VersionChangeManifest`]s and applies them on top of a
/// base [`Version`] to produce a new one.
///
/// Every per-level array below is indexed by level number.
pub(crate) struct VersionBuilder {
    /// Per level, the compaction pointer to hand to the version set; a pointer accumulated from a
    /// later manifest replaces one from an earlier manifest.
    compaction_pointers: [Option<InternalKey>; MAX_NUM_LEVELS],

    /// Per level, the files that the new version should gain.
    added_files: [HashSet<Arc<FileMetadata>>; MAX_NUM_LEVELS],

    /// Per level, the numbers of files that must be left out of the new version.
    deleted_files: [HashSet<u64>; MAX_NUM_LEVELS],

    /// Set once `apply_changes` has run; a second call panics.
    already_invoked: bool,
}
impl VersionBuilder {
    /// Creates a builder with no pending file additions, deletions, or compaction pointers.
    pub(crate) fn new() -> Self {
        Self {
            deleted_files: Default::default(),
            added_files: Default::default(),
            compaction_pointers: Default::default(),
            already_invoked: false,
        }
    }

    /// Folds the edits recorded in `change_manifest` into this builder.
    ///
    /// This may be called repeatedly; later calls take precedence over earlier ones:
    /// - a compaction pointer replaces any pointer previously accumulated for the same level;
    /// - a deleted file number is recorded against its level;
    /// - an added file is recorded against its level and removed from that level's deletion set,
    ///   so a file deleted by an earlier manifest and re-added by a later one survives.
    ///
    /// A file added by an earlier manifest and deleted by a later one stays in the added set but
    /// is filtered out when the changes are applied, because its number is in the deletion set.
    pub(crate) fn accumulate_changes(&mut self, change_manifest: &VersionChangeManifest) {
        for (level, key) in &change_manifest.compaction_pointers {
            self.compaction_pointers[*level] = Some(key.clone());
        }

        for DeletedFile { level, file_number } in &change_manifest.deleted_files {
            self.deleted_files[*level].insert(*file_number);
        }

        for (level, file) in &change_manifest.new_files {
            let new_file = Arc::new(file.clone());
            // A re-added file must not be filtered out by an older deletion of the same number.
            self.deleted_files[*level].remove(&new_file.file_number());
            self.added_files[*level].insert(new_file);
        }
    }

    /// Produces a new [`Version`] by applying the accumulated changes on top of `base_version`.
    ///
    /// Accumulated compaction pointers are moved into `vset_compaction_pointers`. Levels with no
    /// accumulated pointer keep whatever entry they already had.
    ///
    /// For every level, the base version's files and the accumulated added files are each sorted
    /// with [`FileMetadataBySmallestKey`] and merged. Any file whose number was marked as deleted
    /// for that level is skipped.
    ///
    /// `wal_file_number` and `prev_sequence_number` are forwarded to `Version::new_from_current`
    /// when the new version is created.
    ///
    /// # Panics
    ///
    /// - If called more than once on the same builder.
    /// - If, for a level above 0, the merged files have overlapping key ranges.
    pub(crate) fn apply_changes(
        &mut self,
        base_version: &SharedNode<Version>,
        wal_file_number: u64,
        prev_sequence_number: u64,
        vset_compaction_pointers: &mut [Option<InternalKey>; MAX_NUM_LEVELS],
    ) -> Version {
        assert!(
            !self.already_invoked,
            "Cannot call `apply_changes` more than once on a `VersionBuilder`."
        );
        self.already_invoked = true;

        // Only overwrite the levels that received a pointer; `take` leaves the builder's slot empty.
        for (vset_ptr, accumulated_ptr) in vset_compaction_pointers
            .iter_mut()
            .zip(self.compaction_pointers.iter_mut())
        {
            if let Some(key) = accumulated_ptr.take() {
                *vset_ptr = Some(key);
            }
        }

        // `base` borrows through the read guard, so the read lock on `base_version` is held
        // while the new version is assembled.
        let base = &base_version.read().element;
        let mut new_version = base.new_from_current(wal_file_number, prev_sequence_number);

        for level in 0..MAX_NUM_LEVELS {
            let mut level_added_files: Vec<Arc<FileMetadata>> =
                self.added_files[level].iter().map(Arc::clone).collect();
            level_added_files.sort_by(|a, b| FileMetadataBySmallestKey::compare(a, b));
            let mut sorted_base_files = base.files[level].clone();
            sorted_base_files.sort_by(|a, b| FileMetadataBySmallestKey::compare(a, b));

            // Upper bound on this level's file count (deleted files are skipped below).
            new_version.files[level] =
                Vec::with_capacity(sorted_base_files.len() + level_added_files.len());

            // Two-way merge of the sorted lists. Before each added file, emit every base file
            // that sorts strictly before it. On a tie, the added file goes first.
            let mut base_iter = sorted_base_files.iter().peekable();
            for added_file in &level_added_files {
                while let Some(base_file) = base_iter.next_if(|base_file| {
                    FileMetadataBySmallestKey::compare(*base_file, added_file) == Ordering::Less
                }) {
                    self.maybe_add_file(&mut new_version, level, Arc::clone(base_file));
                }
                self.maybe_add_file(&mut new_version, level, Arc::clone(added_file));
            }
            for base_file in base_iter {
                self.maybe_add_file(&mut new_version, level, Arc::clone(base_file));
            }

            // Levels above 0 must hold disjoint, ordered key ranges.
            if cfg!(debug_assertions) && level > 0 {
                for adjacent_files in new_version.files[level].windows(2) {
                    let prev_key_range_end = adjacent_files[0].largest_key();
                    let curr_key_range_start = adjacent_files[1].smallest_key();
                    if prev_key_range_end >= curr_key_range_start {
                        panic!("There was an overlapping key-range in level {} while applying changes for a new version", level);
                    }
                }
            }
        }

        new_version
    }
}
impl VersionBuilder {
    /// Appends `file` to `level` of `version`, unless this builder marked its file number as
    /// deleted for that level.
    ///
    /// # Panics
    ///
    /// For levels above 0, panics if `file`'s smallest key is not strictly greater than the
    /// largest key of the file currently last at that level.
    fn maybe_add_file(&self, version: &mut Version, level: usize, file: Arc<FileMetadata>) {
        let marked_deleted = self.deleted_files[level].contains(&file.file_number());
        if !marked_deleted {
            let level_files = &mut version.files[level];
            // Level 0 may contain overlapping files; deeper levels must stay disjoint and ordered.
            if level > 0 {
                if let Some(previous) = level_files.last() {
                    assert!(
                        previous.largest_key() < file.smallest_key(),
                        "Attempting to add file number {} to level {} created an overlap with file number {}.",
                        file.file_number(),
                        level,
                        previous.file_number()
                    );
                }
            }
            level_files.push(file);
        }
    }
}
#[cfg(test)]
mod version_builder_tests {
    use parking_lot::RwLock;
    use pretty_assertions::assert_eq;

    use crate::table_cache::TableCache;
    use crate::utils::linked_list::Node;
    use crate::{DbOptions, Operation};

    use super::*;

    #[test]
    fn accumulates_changes_successfully() {
        let mut version_builder = VersionBuilder::new();
        let mut change_manifest = VersionChangeManifest::default();
        for idx in 0..4 {
            change_manifest.add_file(
                3,
                2900 + idx,
                4 * 1024 * 1024,
                InternalKey::new(b"abc".to_vec(), idx, Operation::Put)
                    ..InternalKey::new(b"xyz".to_vec(), 100 + idx, Operation::Delete),
            );
            change_manifest.remove_file(4, 2880 + idx);
            change_manifest.add_compaction_pointer(
                idx as usize,
                InternalKey::new(b"rst".to_vec(), 100 + idx, Operation::Put),
            );
        }
        version_builder.accumulate_changes(&change_manifest);
        for idx in 0..4 {
            assert!(version_builder.compaction_pointers[idx].is_some());
            assert!(version_builder.deleted_files[4].contains(&(2880 + idx as u64)));
        }
        assert_eq!(version_builder.added_files[3].len(), 4);
        assert_eq!(version_builder.added_files[4].len(), 0);

        // Re-adding a previously deleted file removes it from the deletion set.
        let mut change_manifest2 = VersionChangeManifest::default();
        change_manifest2.add_file(
            4,
            2882,
            4 * 1024 * 1024,
            InternalKey::new(b"abc".to_vec(), 2, Operation::Put)
                ..InternalKey::new(b"xyz".to_vec(), 102, Operation::Delete),
        );
        version_builder.accumulate_changes(&change_manifest2);
        assert!(!version_builder.deleted_files[4].contains(&2882));
        assert_eq!(version_builder.added_files[4].len(), 1);
    }

    #[test]
    fn files_marked_to_be_added_and_compaction_pointers_are_added_to_new_version() {
        let db_options = create_testing_options();
        let table_cache = Arc::new(TableCache::new(db_options.clone(), 10));
        let base_version = Arc::new(RwLock::new(Node::new(Version::new(
            db_options,
            &table_cache,
            1,
            1,
        ))));
        let mut version_builder = VersionBuilder::new();
        let mut change_manifest = VersionChangeManifest::default();
        for idx in 1..5 {
            change_manifest.add_file(
                3,
                2900 + idx,
                4 * 1024 * 1024,
                InternalKey::new(
                    (idx * 100).to_string().as_bytes().to_vec(),
                    idx + 200,
                    Operation::Put,
                )
                    ..InternalKey::new(
                        (idx * 100 + 99).to_string().as_bytes().to_vec(),
                        idx + 300,
                        Operation::Delete,
                    ),
            );
            change_manifest.remove_file(4, 2880 + idx);
            change_manifest.add_compaction_pointer(
                idx as usize,
                InternalKey::new(b"rst".to_vec(), 100 + idx, Operation::Put),
            );
        }
        version_builder.accumulate_changes(&change_manifest);
        let mut compaction_pointers: [Option<InternalKey>; MAX_NUM_LEVELS] = Default::default();
        let new_version =
            version_builder.apply_changes(&base_version, 2899, 5000, &mut compaction_pointers);
        for level in 1..5 {
            let ptr = compaction_pointers[level].as_ref();
            assert_eq!(
                *ptr.unwrap(),
                InternalKey::new(b"rst".to_vec(), (100 + level) as u64, Operation::Put)
            );
        }
        assert!(version_builder.already_invoked);
        assert_eq!(new_version.num_files_at_level(3), 4);
    }

    #[test]
    fn files_marked_to_be_deleted_are_removed_in_the_new_version() {
        let db_options = create_testing_options();
        let table_cache = Arc::new(TableCache::new(db_options.clone(), 10));
        let base_version = Arc::new(RwLock::new(Node::new(Version::new(
            db_options,
            &table_cache,
            1,
            1,
        ))));
        for idx in 1..5 {
            let mut file = FileMetadata::new(2900 + idx);
            file.set_smallest_key(Some(InternalKey::new(
                (idx * 100).to_string().as_bytes().to_vec(),
                idx + 200,
                Operation::Put,
            )));
            file.set_largest_key(Some(InternalKey::new(
                (idx * 100 + 99).to_string().as_bytes().to_vec(),
                idx + 300,
                Operation::Delete,
            )));
            base_version.write().element.files[3].push(Arc::new(file));
        }
        let mut version_builder = VersionBuilder::new();
        let mut change_manifest = VersionChangeManifest::default();
        change_manifest.remove_file(3, 2902);
        version_builder.accumulate_changes(&change_manifest);
        let mut compaction_pointers: [Option<InternalKey>; MAX_NUM_LEVELS] = Default::default();
        let new_version =
            version_builder.apply_changes(&base_version, 2899, 5000, &mut compaction_pointers);
        assert_eq!(new_version.num_files_at_level(3), 3);
    }

    /// Base files and added files interleave by smallest key. A file added by one manifest and
    /// deleted by a later one is left out of the new version.
    #[test]
    fn merges_base_and_added_files_in_key_order_and_skips_later_deletions() {
        let db_options = create_testing_options();
        let table_cache = Arc::new(TableCache::new(db_options.clone(), 10));
        let base_version = Arc::new(RwLock::new(Node::new(Version::new(
            db_options,
            &table_cache,
            1,
            1,
        ))));
        // The base version holds the key ranges 100-199 (file 2901) and 300-399 (file 2903).
        for idx in (1..5).step_by(2) {
            let mut file = FileMetadata::new(2900 + idx);
            file.set_smallest_key(Some(InternalKey::new(
                (idx * 100).to_string().as_bytes().to_vec(),
                idx + 200,
                Operation::Put,
            )));
            file.set_largest_key(Some(InternalKey::new(
                (idx * 100 + 99).to_string().as_bytes().to_vec(),
                idx + 300,
                Operation::Delete,
            )));
            base_version.write().element.files[3].push(Arc::new(file));
        }

        let mut version_builder = VersionBuilder::new();
        // Add the key ranges 200-299 (file 2902) and 400-499 (file 2904).
        let mut first_change = VersionChangeManifest::default();
        for idx in (2..6).step_by(2) {
            first_change.add_file(
                3,
                2900 + idx,
                4 * 1024 * 1024,
                InternalKey::new(
                    (idx * 100).to_string().as_bytes().to_vec(),
                    idx + 200,
                    Operation::Put,
                )
                    ..InternalKey::new(
                        (idx * 100 + 99).to_string().as_bytes().to_vec(),
                        idx + 300,
                        Operation::Delete,
                    ),
            );
        }
        version_builder.accumulate_changes(&first_change);
        // A later manifest deletes one of the files the earlier manifest added.
        let mut second_change = VersionChangeManifest::default();
        second_change.remove_file(3, 2904);
        version_builder.accumulate_changes(&second_change);

        let mut compaction_pointers: [Option<InternalKey>; MAX_NUM_LEVELS] = Default::default();
        let new_version =
            version_builder.apply_changes(&base_version, 2899, 5000, &mut compaction_pointers);
        let level_file_numbers: Vec<u64> = new_version.files[3]
            .iter()
            .map(|file| file.file_number())
            .collect();
        assert_eq!(level_file_numbers, vec![2901, 2902, 2903]);
    }

    fn create_testing_options() -> DbOptions {
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = 256;
        options
    }
}