use std::cmp::Reverse;
use std::fmt::Write;
use std::ops::Range;
use std::sync::Arc;
use crate::config::{L0_COMPACTION_TRIGGER, MAX_MEM_COMPACT_LEVEL, MAX_NUM_LEVELS};
use crate::errors::{RainDBError, RainDBResult};
use crate::key::{InternalKey, MAX_SEQUENCE_NUMBER};
use crate::table_cache::TableCache;
use crate::tables::Table;
use crate::{compaction, tables, DbOptions, RainDbIterator, ReadOptions};
use super::errors::{ReadError, ReadResult};
use super::file_iterators::FilesEntryIterator;
use super::file_metadata::FileMetadata;
/// Bookkeeping for charging a file with a "wasted" seek: a file that was
/// searched during a read without yielding the key. Files that absorb too many
/// such seeks become seek-compaction candidates (see [`Version::update_stats`]).
#[derive(Clone, Debug)]
pub(crate) struct SeekChargeMetadata {
    // The file to charge for the seek, if one was recorded.
    seek_file: Option<Arc<FileMetadata>>,
    // The level holding `seek_file`; set whenever `seek_file` is set.
    seek_file_level: Option<usize>,
}
impl SeekChargeMetadata {
    /// Create an empty charge record (no file to charge yet).
    pub fn new() -> Self {
        Self {
            seek_file: None,
            seek_file_level: None,
        }
    }

    /// The file to charge for a wasted seek, if one was recorded.
    pub(crate) fn seek_file(&self) -> Option<&Arc<FileMetadata>> {
        self.seek_file.as_ref()
    }

    /// The level containing the charged file.
    ///
    /// # Panics
    ///
    /// Panics if no seek file was recorded. Callers should check
    /// [`SeekChargeMetadata::seek_file`] first.
    pub(crate) fn seek_file_level(&self) -> usize {
        self.seek_file_level
            .expect("`seek_file_level` queried but no seek file was recorded")
    }
}
/// The result of a [`Version::get`] point lookup.
pub(crate) struct GetResponse {
    // The value found for the key, or `None` if the key was absent.
    pub(crate) value: Option<Vec<u8>>,
    // Seek-charging information accumulated during the search; pass this to
    // `Version::update_stats` so wasteful files can be scheduled for compaction.
    pub(crate) charge_metadata: SeekChargeMetadata,
}
/// The level picked for size-triggered compaction and its score, as computed
/// by [`Version::finalize`].
#[derive(Clone, Debug)]
pub(crate) struct SizeCompactionMetadata {
    // The level whose score was highest.
    pub(crate) compaction_level: usize,
    // A score >= 1.0 means compaction of this level is warranted.
    pub(crate) compaction_score: f64,
}
/// The file (if any) picked for seek-triggered compaction, as recorded by
/// [`Version::update_stats`].
#[derive(Clone, Debug, Default)]
pub(crate) struct SeekCompactionMetadata {
    // The file that ran out of allowed seeks, if any.
    pub(crate) file_to_compact: Option<Arc<FileMetadata>>,
    // The level of `file_to_compact`; only meaningful when it is `Some`.
    pub(crate) level_of_file_to_compact: usize,
}
/// An immutable snapshot of the database's table files, organized by level.
#[derive(Clone, Debug)]
pub(crate) struct Version {
    // Database-wide options; used for compaction sizing decisions.
    db_options: DbOptions,
    // The table files at each level. Level 0 files may have overlapping key
    // ranges; deeper levels are kept disjoint and sorted.
    pub(crate) files: [Vec<Arc<FileMetadata>>; MAX_NUM_LEVELS],
    // Shared cache used to open/read table files.
    table_cache: Arc<TableCache>,
    // Seek-triggered compaction state (reset on each new version).
    seek_compaction_metadata: SeekCompactionMetadata,
    // Size-triggered compaction state; `None` until `finalize` has run.
    size_compaction_metadata: Option<SizeCompactionMetadata>,
    // The most recent sequence number at the time this version was created.
    last_sequence_number: u64,
    // The WAL file number associated with this version.
    wal_file_number: u64,
}
impl Version {
/// Create an empty version with no files at any level.
pub(crate) fn new(
    db_options: DbOptions,
    table_cache: &Arc<TableCache>,
    last_sequence_number: u64,
    wal_file_number: u64,
) -> Self {
    Self {
        db_options,
        files: Default::default(),
        table_cache: Arc::clone(table_cache),
        seek_compaction_metadata: SeekCompactionMetadata::default(),
        size_compaction_metadata: None,
        last_sequence_number,
        wal_file_number,
    }
}
/// Clone this version into a successor, carrying the file lists forward but
/// resetting compaction metadata and updating the WAL/sequence markers.
pub(crate) fn new_from_current(
    &self,
    last_sequence_number: u64,
    wal_file_number: u64,
) -> Version {
    let mut successor = self.clone();
    successor.set_seek_compaction_metadata(SeekCompactionMetadata::default());
    successor.set_size_compaction_metadata(None);
    successor.last_sequence_number = last_sequence_number;
    successor.wal_file_number = wal_file_number;
    successor
}
/// Look up `key` in this version's table files, searching newest data first.
///
/// Returns the value (if found) plus seek-charge metadata: the first file that
/// was searched without producing a result is recorded so the caller can
/// charge it via [`Version::update_stats`].
///
/// # Errors
///
/// Returns a `TableRead` error (with any charge metadata accumulated so far)
/// if reading a table fails for a reason other than the key being absent.
pub(crate) fn get(
    &self,
    read_options: &ReadOptions,
    key: &InternalKey,
) -> ReadResult<GetResponse> {
    // Candidate files per level; level 0 may hold several, deeper levels at
    // most one (see `get_overlapping_files`).
    let files_per_level = self.get_overlapping_files(key);
    let mut last_file_read: Option<&Arc<FileMetadata>> = None;
    let mut last_level_read: usize = 0;
    let mut seek_charge_metadata = SeekChargeMetadata::new();
    for (level, files) in files_per_level.iter().enumerate() {
        for file in files {
            // If a previous file was searched without a result, it becomes the
            // single file charged for this seek (only the first miss is
            // charged).
            if seek_charge_metadata.seek_file.is_none() && last_file_read.is_some() {
                seek_charge_metadata.seek_file = last_file_read.take().cloned();
                seek_charge_metadata.seek_file_level = Some(last_level_read);
            }
            last_file_read = Some(file);
            last_level_read = level;
            match self.table_cache.get(read_options, file.file_number(), key) {
                Ok(maybe_val) => {
                    return Ok(GetResponse {
                        value: maybe_val,
                        charge_metadata: seek_charge_metadata,
                    })
                }
                // Not in this table; keep searching older files/levels.
                Err(tables::errors::ReadError::KeyNotFound) => continue,
                Err(base_err) => {
                    return Err(ReadError::TableRead((base_err, Some(seek_charge_metadata))))
                }
            };
        }
    }
    // Key not present anywhere; still return charge metadata so wasted seeks
    // can be accounted for.
    Ok(GetResponse {
        value: None,
        charge_metadata: seek_charge_metadata,
    })
}
/// Charge the file referenced by `charging_metadata` with one seek.
///
/// When a file exhausts its allowed seeks (and no other file is already
/// queued), it is recorded as the seek-compaction candidate and `true` is
/// returned to signal that a compaction should be scheduled.
pub(crate) fn update_stats(&mut self, charging_metadata: &SeekChargeMetadata) -> bool {
    let file_to_charge = match charging_metadata.seek_file.as_ref() {
        Some(file) => file,
        None => return false,
    };
    file_to_charge.decrement_allowed_seeks();
    let seeks_exhausted = file_to_charge.allowed_seeks() <= 0;
    if seeks_exhausted && self.seek_compaction_metadata.file_to_compact.is_none() {
        self.seek_compaction_metadata.file_to_compact = Some(Arc::clone(file_to_charge));
        self.seek_compaction_metadata.level_of_file_to_compact =
            charging_metadata.seek_file_level.unwrap();
        return true;
    }
    false
}
/// The number of table files at `level`.
///
/// # Panics
///
/// Panics (index out of bounds) if `level >= MAX_NUM_LEVELS`.
pub(crate) fn num_files_at_level(&self, level: usize) -> usize {
    self.files[level].len()
}

/// The seek-triggered compaction state recorded on this version.
pub(crate) fn get_seek_compaction_metadata(&self) -> &SeekCompactionMetadata {
    &self.seek_compaction_metadata
}

/// The size-triggered compaction state. `None` until [`Version::finalize`]
/// has been called on this version.
pub(crate) fn get_size_compaction_metadata(&self) -> Option<&SizeCompactionMetadata> {
    self.size_compaction_metadata.as_ref()
}

/// Replace the seek compaction metadata.
pub(crate) fn set_seek_compaction_metadata(
    &mut self,
    seek_compaction_metadata: SeekCompactionMetadata,
) {
    self.seek_compaction_metadata = seek_compaction_metadata;
}

/// Replace the size compaction metadata.
pub(crate) fn set_size_compaction_metadata(
    &mut self,
    size_compaction_metadata: Option<SizeCompactionMetadata>,
) {
    self.size_compaction_metadata = size_compaction_metadata;
}

/// The most recent sequence number at the time this version was created.
pub(crate) fn last_sequence_number(&self) -> u64 {
    self.last_sequence_number
}

/// The WAL file number associated with this version.
pub(crate) fn wal_file_number(&self) -> u64 {
    self.wal_file_number
}
/// Whether any file at `level` overlaps the user-key range
/// `[smallest_user_key, largest_user_key]`. A `None` bound is unbounded on
/// that side.
///
/// Level 0 files may overlap each other, so `level > 0` tells the helper
/// whether it may assume disjoint, sorted files (and binary search).
pub(crate) fn has_overlap_in_level(
    &self,
    level: usize,
    smallest_user_key: Option<&[u8]>,
    largest_user_key: Option<&[u8]>,
) -> bool {
    Version::some_file_overlaps_range(
        level > 0,
        &self.files[level],
        smallest_user_key,
        largest_user_key,
    )
}
/// Pick the level that a memtable flush covering
/// `[smallest_user_key, largest_user_key]` should be written to.
///
/// The output is pushed to a deeper level (capped at `MAX_MEM_COMPACT_LEVEL`)
/// for as long as it does not overlap the next level and does not overlap too
/// many bytes of the level after that (the "grandparent" level).
pub(crate) fn pick_level_for_memtable_output(
    &self,
    smallest_user_key: &[u8],
    largest_user_key: &[u8],
) -> usize {
    let mut level: usize = 0;
    // Anything overlapping level 0 must stay at level 0.
    if self.has_overlap_in_level(0, Some(smallest_user_key), Some(largest_user_key)) {
        return level;
    }
    // Internal-key bounds used for the grandparent overlap check.
    // NOTE(review): the end key is built with sequence number 0 and a
    // `Delete` operation — presumably the internal key that sorts last for
    // that user key. Confirm against `InternalKey` ordering.
    let start_key =
        InternalKey::new_for_seeking(smallest_user_key.to_vec(), MAX_SEQUENCE_NUMBER);
    let end_key = InternalKey::new(largest_user_key.to_vec(), 0, crate::key::Operation::Delete);
    while level < MAX_MEM_COMPACT_LEVEL {
        // Stop as soon as the next level holds overlapping data.
        if self.has_overlap_in_level(level + 1, Some(smallest_user_key), Some(largest_user_key))
        {
            break;
        }
        if level + 2 < MAX_NUM_LEVELS {
            // Stop if pushing deeper would set up an expensive future
            // compaction against the grandparent level.
            let files_overlapping_range = self
                .get_overlapping_compaction_inputs(level + 2, Some(&start_key)..Some(&end_key));
            let total_overlapping_file_size =
                super::utils::sum_file_sizes(&files_overlapping_range);
            if total_overlapping_file_size
                > compaction::utils::max_grandparent_overlap_bytes_from_options(
                    &self.db_options,
                )
            {
                break;
            }
        }
        level += 1;
    }
    level
}
/// Collect, per level, the files whose key range may contain `target_key`.
///
/// Level 0 files can overlap each other, so every matching level-0 file is
/// returned, sorted newest-first (descending file number) so fresher values
/// shadow older ones. Deeper levels are disjoint and sorted, so each yields
/// at most one candidate.
pub(crate) fn get_overlapping_files(
    &self,
    target_key: &InternalKey,
) -> [Vec<Arc<FileMetadata>>; MAX_NUM_LEVELS] {
    let target_user_key = target_key.get_user_key();
    let mut files: [Vec<Arc<FileMetadata>>; MAX_NUM_LEVELS] = Default::default();
    // Level 0: linear scan, since file ranges may overlap arbitrarily.
    for file in self.files[0].iter() {
        if file.smallest_key().get_user_key() <= target_user_key
            && file.largest_key().get_user_key() >= target_user_key
        {
            files[0].push(file.clone());
        }
    }
    // Newest (highest-numbered) files first.
    files[0].sort_by_key(|f| Reverse(f.file_number()));
    for level in 1..MAX_NUM_LEVELS {
        let level_files = &self.files[level];
        if level_files.is_empty() {
            continue;
        }
        // Binary search for the first file that may end at or after the
        // target key.
        let maybe_file_index =
            super::utils::find_file_with_upper_bound_range(level_files, target_key);
        if let Some(file_index) = maybe_file_index {
            let file = &level_files[file_index];
            // The candidate only matches if its range actually starts at or
            // before the target.
            if file.smallest_key().get_user_key() <= target_user_key {
                files[level].push(file.clone());
            }
        }
    }
    files
}
/// Gather references to all files at `level` that overlap the user-key range
/// described by `key_range` (a `None` bound is unbounded on that side).
///
/// For level 0 — where files may overlap each other — any partially
/// overlapping file widens the search bounds and the scan restarts from the
/// beginning, so the returned set is transitively closed over overlaps.
///
/// # Panics
///
/// Panics if `level >= MAX_NUM_LEVELS`.
pub(crate) fn get_overlapping_compaction_inputs(
    &self,
    level: usize,
    key_range: Range<Option<&InternalKey>>,
) -> Vec<&Arc<FileMetadata>> {
    assert!(level < MAX_NUM_LEVELS);
    let mut overlapping_files = vec![];
    // Overlap checks compare user keys only.
    let mut start_user_key = key_range
        .start
        .map(|internal_key| internal_key.get_user_key());
    let mut end_user_key = key_range
        .end
        .map(|internal_key| internal_key.get_user_key());
    let mut index: usize = 0;
    while index < self.files[level].len() {
        let current_file = &self.files[level][index];
        let file_range_start = current_file.smallest_key().get_user_key();
        let file_range_end = current_file.largest_key().get_user_key();
        // An unbounded side of the target range never excludes a file.
        let is_file_range_before_target =
            key_range.start.is_some() && file_range_end < start_user_key.unwrap();
        let is_file_range_after_target =
            key_range.end.is_some() && end_user_key.unwrap() < file_range_start;
        index += 1;
        if is_file_range_before_target || is_file_range_after_target {
            // No overlap; skip this file.
        } else {
            overlapping_files.push(current_file);
            if level != 0 {
                continue;
            }
            // Level 0: if this file extends past either search bound, widen
            // the bound, drop what was collected, and restart the scan so any
            // newly covered files are also picked up.
            if key_range.start.is_some() && file_range_start < start_user_key.unwrap() {
                start_user_key = Some(file_range_start);
                overlapping_files.clear();
                index = 0;
            } else if key_range.end.is_some() && file_range_end > end_user_key.unwrap() {
                end_user_key = Some(file_range_end);
                overlapping_files.clear();
                index = 0;
            }
        }
    }
    overlapping_files
}
/// Like [`Version::get_overlapping_compaction_inputs`] but returning owned
/// `Arc` handles instead of borrows, for callers that outlive this version.
pub(crate) fn get_overlapping_compaction_inputs_strong(
    &self,
    level: usize,
    key_range: Range<Option<&InternalKey>>,
) -> Vec<Arc<FileMetadata>> {
    let borrowed_files = self.get_overlapping_compaction_inputs(level, key_range);
    borrowed_files.into_iter().cloned().collect()
}
/// Compute the level most in need of size-triggered compaction and store the
/// result in the size compaction metadata.
///
/// Level 0 is scored by file count (seeks touch every level-0 file, so the
/// count matters more than the byte size); deeper levels are scored by total
/// bytes relative to the level's byte budget.
pub(crate) fn finalize(&mut self) {
    let mut best_level: usize = 0;
    let mut best_score: f64 = -1.;
    for level in 0..MAX_NUM_LEVELS {
        let new_score: f64 = if level == 0 {
            // Divide as floats. The previous integer division truncated the
            // score to 0 until the file count reached the trigger, hiding
            // partially-full level 0 from the best-level comparison.
            self.files[level].len() as f64 / L0_COMPACTION_TRIGGER as f64
        } else {
            let level_file_size = super::utils::sum_file_sizes(&self.files[level]) as f64;
            level_file_size / Version::max_bytes_for_level(level)
        };
        if new_score > best_score {
            best_score = new_score;
            best_level = level;
        }
    }
    self.set_size_compaction_metadata(Some(SizeCompactionMetadata {
        compaction_level: best_level,
        compaction_score: best_score,
    }));
}
/// Whether this version's size compaction score warrants a compaction.
///
/// # Panics
///
/// Panics if [`Version::finalize`] has not been called on this version, since
/// the size compaction metadata is only populated there.
pub(crate) fn requires_size_compaction(&self) -> bool {
    let compaction_score = self
        .get_size_compaction_metadata()
        .expect("size compaction metadata is only available after `finalize` has run")
        .compaction_score;
    compaction_score >= 1.
}
/// Whether a file has exhausted its allowed seeks and is queued for
/// seek-triggered compaction.
pub(crate) fn requires_seek_compaction(&self) -> bool {
    self.seek_compaction_metadata.file_to_compact.is_some()
}
/// Build the set of iterators that together cover every entry in this
/// version: one table iterator per level-0 file (their key ranges may
/// overlap) and one concatenating file-list iterator per non-empty deeper
/// level (whose files are disjoint and sorted).
///
/// # Errors
///
/// Returns an error if a level-0 table cannot be loaded from the table cache.
pub(crate) fn get_representative_iterators(
    &self,
    read_options: &ReadOptions,
) -> RainDBResult<Vec<Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>>> {
    // One iterator per level-0 file plus at most one per deeper level.
    let initial_capacity = self.files[0].len() + MAX_NUM_LEVELS - 1;
    let mut iterators: Vec<Box<dyn RainDbIterator<Key = InternalKey, Error = RainDBError>>> =
        Vec::with_capacity(initial_capacity);
    for file in self.files[0].iter() {
        let table = self.table_cache.find_table(file.file_number())?;
        let table_iter = Box::new(Table::iter_with(table, read_options.clone()));
        iterators.push(table_iter);
    }
    for level in 1..MAX_NUM_LEVELS {
        let level_files = &self.files[level];
        if level_files.is_empty() {
            continue;
        }
        let file_list_iter = Box::new(FilesEntryIterator::new(
            level_files.clone(),
            Arc::clone(&self.table_cache),
            read_options.clone(),
        ));
        iterators.push(file_list_iter);
    }
    Ok(iterators)
}
/// Record a sampled read of `key`. If at least two files had to be consulted
/// for this key, the first one is charged with a seek (mirroring the charging
/// done by [`Version::get`]); returns whether that charge scheduled a
/// seek-triggered compaction.
pub(crate) fn record_read_sample(&mut self, key: &InternalKey) -> bool {
    let files_per_level = self.get_overlapping_files(key);
    let mut seek_charge_metadata = SeekChargeMetadata::new();
    let mut num_files_with_key: usize = 0;
    // Only the first matching file and the fact that a second exists matter,
    // so stop the whole scan as soon as two files are found. (The previous
    // unlabeled `break` only exited the inner loop and kept scanning the
    // remaining levels needlessly; the result is unchanged.)
    'levels: for (level, files) in files_per_level.iter().enumerate() {
        for file in files {
            num_files_with_key += 1;
            if num_files_with_key == 1 {
                // The first file consulted is the one that gets charged.
                seek_charge_metadata.seek_file = Some(Arc::clone(file));
                seek_charge_metadata.seek_file_level = Some(level);
            }
            if num_files_with_key > 1 {
                break 'levels;
            }
        }
    }
    if num_files_with_key >= 2 {
        return self.update_stats(&seek_charge_metadata);
    }
    false
}
/// Render a human-readable, per-level listing of this version's files: each
/// file's number, size, and key range.
pub(crate) fn debug_summary(&self) -> String {
    let mut summary = String::new();
    for (level, level_files) in self.files.iter().enumerate() {
        writeln!(summary, "--- Level {level} ---").unwrap();
        for file in level_files.iter() {
            writeln!(
                summary,
                "{} (size: {})[{:?}..{:?}]",
                file.file_number(),
                file.get_file_size(),
                file.smallest_key(),
                file.largest_key()
            )
            .unwrap();
        }
    }
    summary
}
/// The total size in bytes of all files at `level`.
///
/// # Panics
///
/// Panics if `level >= MAX_NUM_LEVELS`.
pub(crate) fn get_level_size(&self, level: usize) -> u64 {
    assert!(level < MAX_NUM_LEVELS);
    super::utils::sum_file_sizes(&self.files[level])
}
}
impl Version {
/// Whether any file in `files` overlaps the user-key range
/// `[smallest_user_key, largest_user_key]` (a `None` bound is unbounded).
///
/// `disjoint_sorted_files` indicates that the files are non-overlapping and
/// sorted by key (true for levels > 0), enabling a binary search; otherwise
/// every file is checked linearly (level 0).
fn some_file_overlaps_range(
    disjoint_sorted_files: bool,
    files: &[Arc<FileMetadata>],
    smallest_user_key: Option<&[u8]>,
    largest_user_key: Option<&[u8]>,
) -> bool {
    if files.is_empty() {
        return false;
    }
    if !disjoint_sorted_files {
        // Possibly-overlapping files: check each one. A file overlaps unless
        // it lies entirely before or entirely after the target range.
        for file in files {
            let is_key_after_file = smallest_user_key
                .map_or(false, |smallest| smallest > file.largest_key().get_user_key());
            let is_key_before_file = largest_user_key
                .map_or(false, |largest| largest < file.smallest_key().get_user_key());
            if !(is_key_after_file || is_key_before_file) {
                return true;
            }
        }
        return false;
    }
    // Disjoint, sorted files: binary search for the first file that could
    // contain the smallest target key.
    let mut overlapping_file_index: usize = 0;
    if let Some(smallest_key) = smallest_user_key {
        let smallest_full_key =
            InternalKey::new_for_seeking(smallest_key.to_vec(), MAX_SEQUENCE_NUMBER);
        match super::utils::find_file_with_upper_bound_range(files, &smallest_full_key) {
            Some(file_index) => overlapping_file_index = file_index,
            // The target range starts after every file's range.
            None => return false,
        }
    }
    match largest_user_key {
        // An open-ended upper bound always reaches the candidate file.
        None => true,
        // Overlap iff the target range extends at least to the candidate
        // file's start. (The previous code re-checked `is_some()` after the
        // `None` case had already returned — redundant.)
        Some(largest) => largest >= files[overlapping_file_index].smallest_key().get_user_key(),
    }
}
/// The byte budget for `level`: 10 MiB for levels 0 and 1, growing by a
/// factor of 10 for each level beyond 1.
fn max_bytes_for_level(level: usize) -> f64 {
    const MEBIBYTE: f64 = 1024. * 1024.;
    // Levels 0 and 1 share the base budget; each deeper level gets 10x more.
    let growth_steps = level.saturating_sub(1) as i32;
    10. * MEBIBYTE * 10_f64.powi(growth_steps)
}
}
#[cfg(test)]
mod some_file_overlaps_range_tests {
    //! Unit tests for `Version::some_file_overlaps_range`, covering empty
    //! inputs, single files, disjoint sorted levels, open-ended bounds, and
    //! non-disjoint (level-0 style) file lists.

    use crate::Operation;
    use super::*;

    // No files can never overlap anything, regardless of bounds.
    #[test]
    fn given_an_empty_list_returns_false() {
        let files: Vec<Arc<FileMetadata>> = vec![];
        assert!(!Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"efg"),
            Some(b"tuv")
        ));
        assert!(!Version::some_file_overlaps_range(
            false,
            &files,
            None,
            Some(b"tuv")
        ));
        assert!(!Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"efg"),
            None
        ));
        assert!(!Version::some_file_overlaps_range(
            false, &files, None, None
        ));
    }

    // Boundary behavior around a single file's [200, 300] key range.
    #[test]
    fn given_a_list_of_one_element_succeeds() {
        let mut files: Vec<Arc<FileMetadata>> = vec![];
        insert_files_with_key_range(&mut files, vec![b"200".to_vec()..b"300".to_vec()]);
        assert!(
            !Version::some_file_overlaps_range(true, &files, Some(b"150"), Some(b"160")),
            "A target range completely before the key space in the list of files should not have \
            overlap."
        );
        assert!(
            !Version::some_file_overlaps_range(true, &files, Some(b"301"), Some(b"400")),
            "A target range completely after the key space in the list of files should not have \
            overlap."
        );
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"150"),
            Some(b"200")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"200"),
            Some(b"200")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"200"),
            Some(b"300")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"250"),
            Some(b"300")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"300"),
            Some(b"300")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"150"),
            Some(b"250")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"250"),
            Some(b"400")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"220"),
            Some(b"280")
        ));
    }

    // Disjoint sorted files, including a gap between 250 and 300.
    #[test]
    fn given_a_disjoint_list_multiple_elements_succeeds() {
        let mut files: Vec<Arc<FileMetadata>> = vec![];
        insert_files_with_key_range(
            &mut files,
            vec![
                b"150".to_vec()..b"200".to_vec(),
                b"200".to_vec()..b"250".to_vec(),
                b"300".to_vec()..b"350".to_vec(),
                b"400".to_vec()..b"450".to_vec(),
            ],
        );
        assert!(
            !Version::some_file_overlaps_range(true, &files, Some(b"120"), Some(b"140")),
            "A target range completely before the key space in the list of files should not have \
            overlap."
        );
        assert!(
            !Version::some_file_overlaps_range(true, &files, Some(b"460"), Some(b"470")),
            "A target range completely after the key space in the list of files should not have \
            overlap."
        );
        assert!(
            !Version::some_file_overlaps_range(true, &files, Some(b"251"), Some(b"299")),
            "Should not detect overlap for a target range completely between key spaces of all \
            files."
        );
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"120"),
            Some(b"150")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"200"),
            Some(b"200")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"250"),
            Some(b"299")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"450"),
            Some(b"460")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"150"),
            Some(b"250")
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"250"),
            Some(b"400")
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"220"),
            Some(b"280")
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"275"),
            Some(b"320")
        ));
    }

    // `None` bounds are treated as unbounded on that side.
    #[test]
    fn given_a_disjoint_list_multiple_elements_and_target_ranges_with_open_ranges_succeeds() {
        let mut files: Vec<Arc<FileMetadata>> = vec![];
        insert_files_with_key_range(
            &mut files,
            vec![
                b"150".to_vec()..b"200".to_vec(),
                b"200".to_vec()..b"250".to_vec(),
                b"300".to_vec()..b"350".to_vec(),
                b"400".to_vec()..b"450".to_vec(),
            ],
        );
        assert!(!Version::some_file_overlaps_range(
            true,
            &files,
            None,
            Some(b"149")
        ));
        assert!(!Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"451"),
            None
        ));
        assert!(Version::some_file_overlaps_range(true, &files, None, None));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"100"),
            None
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"230"),
            None
        ));
        assert!(Version::some_file_overlaps_range(
            true,
            &files,
            Some(b"450"),
            None
        ));
    }

    // Non-disjoint (level-0 style) files where one range contains the other.
    #[test]
    fn given_a_non_disjoint_list_multiple_elements_succeeds() {
        let mut files: Vec<Arc<FileMetadata>> = vec![];
        insert_files_with_key_range(
            &mut files,
            vec![
                b"150".to_vec()..b"600".to_vec(),
                b"400".to_vec()..b"500".to_vec(),
            ],
        );
        assert!(!Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"100"),
            Some(b"149")
        ));
        assert!(!Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"601"),
            Some(b"700")
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"100"),
            Some(b"150"),
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"100"),
            Some(b"700"),
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"400"),
            Some(b"600"),
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"450"),
            Some(b"500"),
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"450"),
            Some(b"700"),
        ));
        assert!(Version::some_file_overlaps_range(
            false,
            &files,
            Some(b"600"),
            Some(b"700"),
        ));
    }

    // Build one `FileMetadata` per user-key range and append it to `files`.
    fn insert_files_with_key_range(
        files: &mut Vec<Arc<FileMetadata>>,
        user_keys: Vec<Range<Vec<u8>>>,
    ) {
        for user_key_range in user_keys.into_iter() {
            let mut file = FileMetadata::new(30);
            file.set_smallest_key(Some(create_testing_key(user_key_range.start)));
            file.set_largest_key(Some(create_testing_key(user_key_range.end)));
            files.push(Arc::new(file));
        }
    }

    // Wrap a user key in an internal key with a fixed sequence number.
    fn create_testing_key(user_key: Vec<u8>) -> InternalKey {
        InternalKey::new(user_key, 30, Operation::Put)
    }
}
#[cfg(test)]
mod version_tests {
    //! Integration-style tests for `Version` built on an in-memory
    //! environment. `create_test_files_for_version` lays out tables at
    //! levels 0-3 with known key ranges and file numbers (see below).

    use pretty_assertions::assert_eq;
    use std::rc::Rc;

    use crate::tables::TableBuilder;
    use crate::Operation;

    use super::*;

    // Initialize test logging once; repeated calls are harmless.
    fn setup() {
        let _ = env_logger::builder()
            .filter_level(log::LevelFilter::max())
            .is_test(true)
            .try_init();
    }

    #[test]
    fn get_overlapping_files() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        // "d" is covered by two level-0 files; newest (61) must come first.
        let overlapping_files = version
            .get_overlapping_files(&InternalKey::new_for_seeking("d".as_bytes().to_vec(), 100));
        let expected_file_numbers = [61, 60];
        let actual_file_numbers = overlapping_files[0].iter().map(|file| file.file_number());
        assert!(expected_file_numbers
            .into_iter()
            .zip(actual_file_numbers)
            .all(|(expected, actual)| {
                assert_eq!(expected, actual);
                expected == actual
            }), "Multiple files can be returned for level 0 and should be returned in descending order.");

        // "x" appears at levels 1 and 2 (files 57 and 53).
        let overlapping_files = version
            .get_overlapping_files(&InternalKey::new_for_seeking("x".as_bytes().to_vec(), 100));
        assert_eq!(overlapping_files[1].len(), 1);
        assert_eq!(overlapping_files[1][0].file_number(), 57);
        assert_eq!(overlapping_files[2].len(), 1);
        assert_eq!(overlapping_files[2][0].file_number(), 53);

        // "k" only exists at level 2 (file 55).
        let overlapping_files = version
            .get_overlapping_files(&InternalKey::new_for_seeking("k".as_bytes().to_vec(), 100));
        assert_eq!(overlapping_files[2].len(), 1);
        assert_eq!(overlapping_files[2][0].file_number(), 55);

        // "z" is outside every file's range.
        let overlapping_files = version
            .get_overlapping_files(&InternalKey::new_for_seeking("z".as_bytes().to_vec(), 100));
        assert!(overlapping_files
            .iter()
            .all(|files_overlapping_in_level| files_overlapping_in_level.is_empty()));
    }

    #[test]
    fn get_value_from_version() {
        setup();

        let read_options = ReadOptions::default();
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        // Found in the first file searched: nothing to charge.
        let actual_response = version
            .get(
                &read_options,
                &InternalKey::new_for_seeking("a".as_bytes().to_vec(), 300),
            )
            .unwrap();
        assert_eq!(actual_response.value, Some("a".as_bytes().to_vec()));
        assert!(actual_response.charge_metadata.seek_file.is_none());

        let actual_response = version
            .get(
                &read_options,
                &InternalKey::new_for_seeking("m".as_bytes().to_vec(), 300),
            )
            .unwrap();
        assert_eq!(actual_response.value, Some("m".as_bytes().to_vec()));
        assert!(actual_response.charge_metadata.seek_file().is_none());

        // "p" misses file 58 at level 1 before hitting level 2, so file 58 is
        // charged for the seek.
        let actual_response = version
            .get(
                &read_options,
                &InternalKey::new_for_seeking("p".as_bytes().to_vec(), 300),
            )
            .unwrap();
        assert_eq!(actual_response.value, Some("p".as_bytes().to_vec()));
        assert_eq!(
            actual_response
                .charge_metadata
                .seek_file()
                .unwrap()
                .file_number(),
            58
        );
        assert_eq!(actual_response.charge_metadata.seek_file_level(), 1);

        // A lower sequence number retrieves the older value from level 2.
        let actual_response = version
            .get(
                &read_options,
                &InternalKey::new_for_seeking("r".as_bytes().to_vec(), 64),
            )
            .unwrap();
        assert_eq!(actual_response.value, Some("r-1".as_bytes().to_vec()));
        assert_eq!(
            actual_response
                .charge_metadata
                .seek_file()
                .unwrap()
                .file_number(),
            58
        );
        assert_eq!(actual_response.charge_metadata.seek_file_level(), 1);

        // "x" at seq 80 predates every stored "x" entry: not found.
        let actual_response = version
            .get(
                &read_options,
                &InternalKey::new_for_seeking("x".as_bytes().to_vec(), 80),
            )
            .unwrap();
        assert!(actual_response.value.is_none());
        assert!(actual_response.charge_metadata.seek_file().is_none());
    }

    #[test]
    fn update_stats_succeeds() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        let mut seek_charge = SeekChargeMetadata::new();
        seek_charge.seek_file = Some(Arc::clone(&version.files[1][1]));
        seek_charge.seek_file_level = Some(1);
        let needs_compaction = version.update_stats(&seek_charge);
        assert!(
            !needs_compaction,
            "Should not need a compaction when there are still allowed seeks for a file."
        );

        // Drain the file's remaining allowed seeks so the next charge trips
        // the compaction trigger.
        while seek_charge.seek_file().unwrap().allowed_seeks() > 0 {
            seek_charge.seek_file().unwrap().decrement_allowed_seeks();
        }
        let needs_compaction = version.update_stats(&seek_charge);
        assert!(
            needs_compaction,
            "Should need a compaction when there are no more allowed seeks for a file."
        );
        assert_eq!(
            version.get_seek_compaction_metadata().file_to_compact,
            Some(Arc::clone(&version.files[1][1]))
        );
        assert_eq!(
            version
                .get_seek_compaction_metadata()
                .level_of_file_to_compact,
            1
        );
    }

    #[test]
    fn get_overlapping_compaction_inputs_expands_level_0_search_range() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        // Level-0 files overlap transitively, so even narrow target ranges
        // pull in all three level-0 files (60, 61, 62).
        let end_key = InternalKey::new("a".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = None..Some(&end_key);
        let actual_inputs = version.get_overlapping_compaction_inputs(0, key_range);
        assert!(
            [60, 61, 62]
                .into_iter()
                .zip(actual_inputs)
                .all(|(expected, actual)| {
                    assert_eq!(expected, actual.file_number());
                    expected == actual.file_number()
                }),
            "Multiple files can be returned for level 0."
        );

        let start_key = InternalKey::new("e".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = Some(&start_key)..None;
        let actual_inputs = version.get_overlapping_compaction_inputs(0, key_range);
        assert!(
            [60, 61, 62]
                .into_iter()
                .zip(actual_inputs)
                .all(|(expected, actual)| {
                    assert_eq!(expected, actual.file_number());
                    expected == actual.file_number()
                }),
            "Multiple files can be returned for level 0."
        );

        let start_key = InternalKey::new("c".as_bytes().to_vec(), 105, Operation::Put);
        let end_key = InternalKey::new("e".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = Some(&start_key)..Some(&end_key);
        let actual_inputs = version.get_overlapping_compaction_inputs(0, key_range);
        assert!(
            [60, 61, 62]
                .into_iter()
                .zip(actual_inputs)
                .all(|(expected, actual)| {
                    assert_eq!(expected, actual.file_number());
                    expected == actual.file_number()
                }),
            "Multiple files can be returned for level 0."
        );
    }

    #[test]
    fn get_overlapping_compaction_inputs_gets_correct_files_for_levels_older_than_zero() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        let end_key = InternalKey::new("s".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = None..Some(&end_key);
        let actual_inputs = version.get_overlapping_compaction_inputs(1, key_range);
        assert!([59, 58]
            .into_iter()
            .zip(actual_inputs)
            .all(|(expected, actual)| {
                assert_eq!(expected, actual.file_number());
                expected == actual.file_number()
            }), "Multiple files can be returned for level 0 and should be returned in descending order.");

        let start_key = InternalKey::new("v".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = Some(&start_key)..None;
        let actual_inputs = version.get_overlapping_compaction_inputs(1, key_range);
        assert!([57]
            .into_iter()
            .zip(actual_inputs)
            .all(|(expected, actual)| {
                assert_eq!(expected, actual.file_number());
                expected == actual.file_number()
            }), "Multiple files can be returned for level 0 and should be returned in descending order.");

        let start_key = InternalKey::new("h".as_bytes().to_vec(), 105, Operation::Put);
        let end_key = InternalKey::new("r".as_bytes().to_vec(), 105, Operation::Put);
        let key_range = Some(&start_key)..Some(&end_key);
        let actual_inputs = version.get_overlapping_compaction_inputs(1, key_range);
        assert!([59, 58]
            .into_iter()
            .zip(actual_inputs)
            .all(|(expected, actual)| {
                assert_eq!(expected, actual.file_number());
                expected == actual.file_number()
            }));
    }

    #[test]
    fn pick_level_for_memtable_output_picks_the_correct_level() {
        let options = DbOptions::with_memory_env();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        // Overlaps level 0 -> stays at level 0.
        let actual_level = version.pick_level_for_memtable_output("b".as_bytes(), "e".as_bytes());
        assert_eq!(actual_level, 0);

        // No level-0/1 overlap but data at level 2 -> lands at level 1.
        let actual_level = version.pick_level_for_memtable_output("l".as_bytes(), "n".as_bytes());
        assert_eq!(actual_level, 1);
    }

    #[test]
    fn pick_level_for_memtable_output_when_there_are_overlapping_bytes_picks_the_correct_level() {
        let mut options = DbOptions::with_memory_env();
        // Shrink max file size so the grandparent-overlap limit is easy to
        // exceed with the test fixture.
        options.max_file_size = 20;
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        let actual_level = version.pick_level_for_memtable_output("z1".as_bytes(), "z7".as_bytes());
        assert_eq!(actual_level, 1);
    }

    #[test]
    fn get_representative_iterators_returns_the_correct_number_of_iterators() {
        let options = DbOptions::with_memory_env();
        let read_options = ReadOptions::default();
        let table_cache = Arc::new(TableCache::new(options.clone(), 1000));
        let mut version = Version::new(options.clone(), &table_cache, 99, 30);
        create_test_files_for_version(options, &mut version);

        // 3 level-0 files + one iterator each for non-empty levels 1, 2, 3.
        let iters = version.get_representative_iterators(&read_options).unwrap();
        assert_eq!(iters.len(), 6);
    }

    // Populate `version` with known tables:
    // level 0: files 60 (a-d), 61 (c-f), 62 (f-f3) — overlapping ranges;
    // level 1: files 59 (g-j), 58 (o-u), 57 (v-y);
    // level 2: files 55 (k-n), 54 (o-s), 53 (v-y);
    // level 3: file 52 (z1-z7).
    fn create_test_files_for_version(db_options: DbOptions, version: &mut Version) {
        let entries = vec![
            (
                ("a".as_bytes().to_vec(), Operation::Put),
                ("a".as_bytes().to_vec()),
            ),
            (
                ("b".as_bytes().to_vec(), Operation::Put),
                ("b".as_bytes().to_vec()),
            ),
            (
                ("c".as_bytes().to_vec(), Operation::Put),
                ("c".as_bytes().to_vec()),
            ),
            (
                ("d".as_bytes().to_vec(), Operation::Put),
                ("d".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 90, 60);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("c".as_bytes().to_vec(), Operation::Put),
                ("c1".as_bytes().to_vec()),
            ),
            (
                ("d".as_bytes().to_vec(), Operation::Put),
                ("d1".as_bytes().to_vec()),
            ),
            (
                ("e".as_bytes().to_vec(), Operation::Put),
                ("e".as_bytes().to_vec()),
            ),
            (
                ("f".as_bytes().to_vec(), Operation::Put),
                ("f".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 100, 61);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("f".as_bytes().to_vec(), Operation::Put),
                ("f-2".as_bytes().to_vec()),
            ),
            (
                ("f1".as_bytes().to_vec(), Operation::Put),
                ("f1".as_bytes().to_vec()),
            ),
            (
                ("f2".as_bytes().to_vec(), Operation::Put),
                ("f2".as_bytes().to_vec()),
            ),
            (
                ("f3".as_bytes().to_vec(), Operation::Put),
                ("f3".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 105, 62);
        version.files[0].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("g".as_bytes().to_vec(), Operation::Put),
                ("g".as_bytes().to_vec()),
            ),
            (
                ("h".as_bytes().to_vec(), Operation::Put),
                ("h".as_bytes().to_vec()),
            ),
            (
                ("i".as_bytes().to_vec(), Operation::Put),
                ("i".as_bytes().to_vec()),
            ),
            (
                ("j".as_bytes().to_vec(), Operation::Put),
                ("j".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 85, 59);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("o".as_bytes().to_vec(), Operation::Put),
                ("o".as_bytes().to_vec()),
            ),
            (
                ("r".as_bytes().to_vec(), Operation::Put),
                ("r".as_bytes().to_vec()),
            ),
            (
                ("s".as_bytes().to_vec(), Operation::Put),
                ("s".as_bytes().to_vec()),
            ),
            (
                ("t".as_bytes().to_vec(), Operation::Put),
                ("t".as_bytes().to_vec()),
            ),
            (
                ("u".as_bytes().to_vec(), Operation::Put),
                ("u".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 80, 58);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("v".as_bytes().to_vec(), Operation::Put),
                ("v".as_bytes().to_vec()),
            ),
            (
                ("w".as_bytes().to_vec(), Operation::Put),
                ("w".as_bytes().to_vec()),
            ),
            (("x".as_bytes().to_vec(), Operation::Delete), vec![]),
            (
                ("y".as_bytes().to_vec(), Operation::Put),
                ("y".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 76, 57);
        version.files[1].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("k".as_bytes().to_vec(), Operation::Put),
                ("k".as_bytes().to_vec()),
            ),
            (
                ("l".as_bytes().to_vec(), Operation::Put),
                ("l".as_bytes().to_vec()),
            ),
            (
                ("m".as_bytes().to_vec(), Operation::Put),
                ("m".as_bytes().to_vec()),
            ),
            (
                ("n".as_bytes().to_vec(), Operation::Put),
                ("n".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 65, 55);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("o".as_bytes().to_vec(), Operation::Put),
                ("o-1".as_bytes().to_vec()),
            ),
            (
                ("p".as_bytes().to_vec(), Operation::Put),
                ("p".as_bytes().to_vec()),
            ),
            (
                ("r".as_bytes().to_vec(), Operation::Put),
                ("r-1".as_bytes().to_vec()),
            ),
            (
                ("s".as_bytes().to_vec(), Operation::Put),
                ("s-1".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 60, 54);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("v".as_bytes().to_vec(), Operation::Put),
                ("v".as_bytes().to_vec()),
            ),
            (
                ("w".as_bytes().to_vec(), Operation::Put),
                ("w".as_bytes().to_vec()),
            ),
            (
                ("x".as_bytes().to_vec(), Operation::Put),
                ("x".as_bytes().to_vec()),
            ),
            (
                ("y".as_bytes().to_vec(), Operation::Put),
                ("y".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options.clone(), entries, 55, 53);
        version.files[2].push(Arc::new(table_file_meta));

        let entries = vec![
            (
                ("z1".as_bytes().to_vec(), Operation::Put),
                ("z1".as_bytes().to_vec()),
            ),
            (
                ("z2".as_bytes().to_vec(), Operation::Put),
                ("z2".as_bytes().to_vec()),
            ),
            (
                ("z3".as_bytes().to_vec(), Operation::Put),
                ("z3".as_bytes().to_vec()),
            ),
            (
                ("z4".as_bytes().to_vec(), Operation::Put),
                ("z4".as_bytes().to_vec()),
            ),
            (
                ("z5".as_bytes().to_vec(), Operation::Put),
                ("z5".as_bytes().to_vec()),
            ),
            (
                ("z6".as_bytes().to_vec(), Operation::Put),
                ("z6".as_bytes().to_vec()),
            ),
            (
                ("z7".as_bytes().to_vec(), Operation::Put),
                ("z7".as_bytes().to_vec()),
            ),
        ];
        let table_file_meta = create_table(db_options, entries, 45, 52);
        version.files[3].push(Arc::new(table_file_meta));
    }

    // Write `entries` to a real table file (sequence numbers increase per
    // entry from `starting_sequence_num`) and return its metadata.
    fn create_table(
        db_options: DbOptions,
        entries: Vec<((Vec<u8>, Operation), Vec<u8>)>,
        starting_sequence_num: u64,
        file_number: u64,
    ) -> FileMetadata {
        let smallest_key = InternalKey::new(
            entries.first().unwrap().0 .0.clone(),
            starting_sequence_num,
            entries.first().unwrap().0 .1,
        );
        let largest_key = InternalKey::new(
            entries.last().unwrap().0 .0.clone(),
            starting_sequence_num + (entries.len() as u64) - 1,
            entries.last().unwrap().0 .1,
        );
        let mut table_builder = TableBuilder::new(db_options, file_number).unwrap();
        let mut curr_sequence_num = starting_sequence_num;
        for ((user_key, operation), value) in entries {
            table_builder
                .add_entry(
                    Rc::new(InternalKey::new(user_key, curr_sequence_num, operation)),
                    &value,
                )
                .unwrap();
            curr_sequence_num += 1;
        }
        table_builder.finalize().unwrap();

        let mut file_meta = FileMetadata::new(file_number);
        file_meta.set_smallest_key(Some(smallest_key));
        file_meta.set_largest_key(Some(largest_key));
        file_meta.set_file_size(table_builder.file_size());
        file_meta
    }
}