use integer_encoding::FixedInt;
use snap::read::FrameDecoder;
use std::convert::{TryFrom, TryInto};
use std::fmt;
use std::io::{self, Read};
use std::sync::Arc;
use crate::config::{TableFileCompressionType, SIZE_OF_U32_BYTES};
use crate::errors::{DBIOError, RainDBError, RainDBResult};
use crate::filter_policy;
use crate::fs::ReadonlyRandomAccessFile;
use crate::iterator::RainDbIterator;
use crate::key::{InternalKey, Operation, RainDbKeyType};
use crate::utils::cache::CacheEntry;
use crate::utils::crc;
use crate::{DbOptions, ReadOptions};
use super::block::{BlockIter, BlockReader, DataBlockReader, MetaIndexBlockReader, MetaIndexKey};
use super::block_handle::BlockHandle;
use super::constants::{BLOCK_DESCRIPTOR_SIZE_BYTES, CRC_CALCULATOR};
use super::errors::{ReadError, TableReadResult};
use super::filter_block::FilterBlockReader;
use super::footer::{Footer, SIZE_OF_FOOTER_BYTES};
/// A handle to a block-cache entry holding a shared data block reader.
type DataBlockCacheEntry = Box<dyn CacheEntry<Arc<DataBlockReader>>>;
/// An immutable reader over a single on-disk table file.
pub(crate) struct Table {
    /// Database options; provides the shared block cache and the filter policy.
    options: DbOptions,
    /// Handle to the underlying table file.
    file: Box<dyn ReadonlyRandomAccessFile>,
    /// Identifier scoping this table's entries within the shared block cache,
    /// so block offsets from different tables cannot collide.
    cache_partition_id: u64,
    /// Parsed footer holding handles to the index and metaindex blocks.
    footer: Footer,
    /// Reader for the index block, which maps keys to data block handles.
    index_block: DataBlockReader,
    /// Reader for the filter block, if one was present and readable.
    maybe_filter_block: Option<FilterBlockReader>,
}
impl Table {
    /// Open a table file, parsing its footer, index block, and (optionally) its
    /// filter block.
    ///
    /// Fails if the file is shorter than a footer or if the footer, index, or
    /// metaindex blocks cannot be parsed. A failure to read the filter block is
    /// not fatal: it is logged and the table operates without a filter.
    pub fn open(
        options: DbOptions,
        file: Box<dyn ReadonlyRandomAccessFile>,
    ) -> TableReadResult<Table> {
        let file_length = file.len()?;
        if file_length < (SIZE_OF_FOOTER_BYTES as u64) {
            return Err(ReadError::FailedToParse(format!(
                "Failed to open the table file. The file length ({:?}) is invalid.",
                file_length
            )));
        }

        // Reserve a cache partition so this table's cached blocks are keyed
        // separately from other tables sharing the same block cache.
        let cache_partition_id = (*options.block_cache()).new_id();

        log::debug!("Reading and parsing the table file footer");
        let mut footer_buf: Vec<u8> = vec![0; SIZE_OF_FOOTER_BYTES];
        file.read_from(
            &mut footer_buf,
            (file_length as usize) - SIZE_OF_FOOTER_BYTES,
        )?;
        let footer = Footer::try_from(&footer_buf)?;

        log::debug!("Reading and parsing the index block");
        let index_block: DataBlockReader =
            Table::get_data_block_reader_from_disk(&*file, footer.get_index_handle())?;

        log::debug!("Reading and parsing the metaindex block");
        let metaindex_block: MetaIndexBlockReader =
            Table::get_data_block_reader_from_disk(&*file, footer.get_metaindex_handle())?;

        log::debug!("Reading and parsing the filter block");
        // The filter block is an optimization; degrade gracefully if it cannot
        // be read.
        let maybe_filter_block: Option<FilterBlockReader> =
            match Table::read_filter_meta_block(&options, &*file, &metaindex_block) {
                Ok(filter_block) => filter_block,
                Err(error) => {
                    log::warn!("{}", error);
                    None
                }
            };

        Ok(Table {
            options,
            cache_partition_id,
            file,
            footer,
            index_block,
            maybe_filter_block,
        })
    }

    /// Get the value for `key` if it exists in this table.
    ///
    /// Returns `Ok(None)` when the most recent entry for the key is a deletion
    /// tombstone (or the index yields no candidate block), and
    /// `Err(ReadError::KeyNotFound)` when the key is absent.
    pub fn get(
        &self,
        read_options: &ReadOptions,
        key: &InternalKey,
    ) -> TableReadResult<Option<Vec<u8>>> {
        // The index block maps keys to data block handles, so seeking it
        // positions us at the only data block that can contain `key`.
        let mut index_block_iter = self.index_block.iter();
        index_block_iter.seek(key)?;
        let (_key, raw_handle) = match index_block_iter.current() {
            Some(entry) => entry,
            None => return Ok(None),
        };
        let block_handle = BlockHandle::try_from(raw_handle)?;

        // Consult the filter (when present) before touching the data block; a
        // negative answer means the key is definitely not in the block.
        if let Some(filter_block) = self.maybe_filter_block.as_ref() {
            if !filter_block.key_may_match(block_handle.get_offset(), key.get_user_key()) {
                return Err(ReadError::KeyNotFound);
            }
        }

        let block_reader = self.get_block_reader(read_options, &block_handle)?;
        let mut block_reader_iter = block_reader.iter();
        block_reader_iter.seek(key)?;
        match block_reader_iter.current() {
            Some((found_key, found_value)) => {
                // Seeking lands on the first entry at or after `key`, which may
                // belong to a different user key entirely.
                if found_key.get_user_key() != key.get_user_key() {
                    return Err(ReadError::KeyNotFound);
                }

                // A deletion tombstone shadows any older values for the key.
                if found_key.get_operation() == Operation::Delete {
                    return Ok(None);
                }

                Ok(Some(found_value.clone()))
            }
            None => Err(ReadError::KeyNotFound),
        }
    }

    /// Create a two-level iterator over the entries of this table.
    pub fn iter_with(table: Arc<Table>, read_options: ReadOptions) -> TwoLevelIterator {
        TwoLevelIterator::new(table, read_options)
    }
}
/// Private helpers.
impl Table {
    /// Read the block referenced by `block_handle` from disk and wrap it in a
    /// block reader keyed by `K`.
    fn get_data_block_reader_from_disk<K: RainDbKeyType>(
        file: &dyn ReadonlyRandomAccessFile,
        block_handle: &BlockHandle,
    ) -> TableReadResult<BlockReader<K>> {
        let block_data = Table::read_block_from_disk(file, block_handle)?;
        BlockReader::new(block_data)
    }

    /// Read a raw block from disk, verify its checksum, and decompress its
    /// contents.
    ///
    /// The on-disk layout read here is `[block data][1-byte compression
    /// type][4-byte CRC]`, where the CRC covers the data plus the compression
    /// byte.
    fn read_block_from_disk(
        file: &dyn ReadonlyRandomAccessFile,
        block_handle: &BlockHandle,
    ) -> TableReadResult<Vec<u8>> {
        let block_data_size: usize = block_handle.get_size() as usize;
        // The handle's size excludes the trailing block descriptor
        // (compression byte + checksum), so add it back for the raw read.
        let total_block_size: usize = block_data_size + BLOCK_DESCRIPTOR_SIZE_BYTES;
        let mut raw_block_data: Vec<u8> = vec![0; total_block_size];
        let bytes_read = file.read_from(&mut raw_block_data, block_handle.get_offset() as usize)?;
        if bytes_read != total_block_size {
            return Err(ReadError::IO(DBIOError::new(
                io::ErrorKind::UnexpectedEof,
                "Could not read the entire block into the buffer.".to_string(),
            )));
        }

        // The stored checksum is masked on disk; unmask it before comparing
        // with the CRC computed over everything preceding the checksum.
        let offset_to_checksum = total_block_size - SIZE_OF_U32_BYTES;
        let checksum_on_disk = u32::decode_fixed(&raw_block_data[offset_to_checksum..]);
        let unmasked_stored_checksum = crc::unmask_checksum(checksum_on_disk);
        let calculated_block_checksum =
            CRC_CALCULATOR.checksum(&raw_block_data[..offset_to_checksum]);
        if unmasked_stored_checksum != calculated_block_checksum {
            return Err(ReadError::FailedToParse(
                "Failed to parse the block. There was a mismatch in the checksum".to_string(),
            ));
        }

        // A single byte immediately after the block data encodes the
        // compression scheme.
        let compression_type_offset = total_block_size - BLOCK_DESCRIPTOR_SIZE_BYTES;
        let maybe_compression_type: RainDBResult<TableFileCompressionType> =
            raw_block_data[compression_type_offset].try_into();
        let compression_type: TableFileCompressionType = match maybe_compression_type {
            Err(error) => {
                // An unknown compression byte is surfaced as a decompression
                // failure with the parse error attached.
                return Err(ReadError::BlockDecompression(DBIOError::new(
                    io::ErrorKind::InvalidData,
                    error.to_string(),
                )));
            }
            Ok(encoded_compression_type) => encoded_compression_type,
        };

        let raw_block_contents = &raw_block_data[..compression_type_offset];
        match compression_type {
            TableFileCompressionType::None => Ok(raw_block_contents.to_vec()),
            TableFileCompressionType::Snappy => {
                // Snappy frame format; decompress the whole block into memory.
                // The capacity is only a lower-bound guess (decompressed data
                // is usually larger than the compressed input).
                let mut snappy_reader = FrameDecoder::new(raw_block_contents);
                let mut decompressed_data: Vec<u8> = Vec::with_capacity(raw_block_contents.len());
                match snappy_reader.read_to_end(&mut decompressed_data) {
                    Err(error) => Err(ReadError::BlockDecompression(error.into())),
                    Ok(_) => Ok(decompressed_data),
                }
            }
        }
    }

    /// Locate and parse the filter meta block via the metaindex block.
    ///
    /// Returns `Ok(None)` when the metaindex has no entry for the configured
    /// filter policy's block name.
    fn read_filter_meta_block(
        options: &DbOptions,
        file: &dyn ReadonlyRandomAccessFile,
        metaindex_block: &MetaIndexBlockReader,
    ) -> TableReadResult<Option<FilterBlockReader>> {
        let filter_block_name = filter_policy::get_filter_block_name(options.filter_policy());
        let mut metaindex_block_iter = metaindex_block.iter();
        if let Err(error) = metaindex_block_iter.seek(&MetaIndexKey::new(filter_block_name)) {
            return Err(ReadError::FilterBlock(format!("{}", error)));
        }

        match metaindex_block_iter.current() {
            Some((_key, raw_contents)) => {
                // The metaindex value is a handle to the filter block itself.
                let filter_block_handle = BlockHandle::try_from(raw_contents)?;
                let raw_filter_block = Table::read_block_from_disk(file, &filter_block_handle)?;
                match FilterBlockReader::new(options.filter_policy(), raw_filter_block) {
                    Err(error) => return Err(ReadError::FilterBlock(format!("{}", error))),
                    Ok(reader) => Ok(Some(reader)),
                }
            }
            None => Ok(None),
        }
    }

    /// Get a reader for the block at `block_handle`, preferring the block
    /// cache and populating it on a miss when `read_options.fill_cache` is set.
    fn get_block_reader(
        &self,
        read_options: &ReadOptions,
        block_handle: &BlockHandle,
    ) -> TableReadResult<Arc<DataBlockReader>> {
        match self.get_block_reader_from_cache(block_handle) {
            Some(cache_entry) => Ok(Arc::clone(&cache_entry.get_value())),
            None => {
                let reader: DataBlockReader =
                    Table::get_data_block_reader_from_disk(&*self.file, block_handle)?;
                if read_options.fill_cache {
                    let cache_entry = self.cache_block_reader(reader, block_handle);
                    return Ok(Arc::clone(&cache_entry.get_value()));
                }

                Ok(Arc::new(reader))
            }
        }
    }

    /// Look up a cached reader for the block at this handle's offset.
    fn get_block_reader_from_cache(
        &self,
        block_handle: &BlockHandle,
    ) -> Option<DataBlockCacheEntry> {
        let block_cache = self.options.block_cache();
        // Keys are scoped by this table's partition id so offsets from
        // different tables cannot collide in the shared cache.
        let block_cache_key =
            BlockCacheKey::new(self.cache_partition_id, block_handle.get_offset());
        block_cache.get(&block_cache_key)
    }

    /// Insert a block reader into the cache and return the new cache entry.
    fn cache_block_reader(
        &self,
        block_reader: DataBlockReader,
        block_handle: &BlockHandle,
    ) -> DataBlockCacheEntry {
        let block_cache = self.options.block_cache();
        let block_cache_key =
            BlockCacheKey::new(self.cache_partition_id, block_handle.get_offset());
        block_cache.insert(block_cache_key, Arc::new(block_reader))
    }
}
impl fmt::Debug for Table {
    // Only a subset of fields is rendered; the rest are elided via the
    // non-exhaustive marker (`..`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Table");
        builder.field("cache_partition_id", &self.cache_partition_id);
        builder.field("footer", &self.footer);
        builder.finish_non_exhaustive()
    }
}
/// Key used to store and retrieve data block readers in the block cache.
///
/// Combines a table's cache partition id with a block's file offset so that
/// entries from different tables never collide.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct BlockCacheKey {
    /// The partition id of the table that owns the cached block.
    cache_partition_id: u64,
    /// The offset of the block within its table file.
    block_offset: u64,
}

impl BlockCacheKey {
    /// Create a cache key for the block at `block_offset` within the table
    /// identified by `cache_partition_id`.
    pub fn new(cache_partition_id: u64, block_offset: u64) -> Self {
        BlockCacheKey {
            cache_partition_id,
            block_offset,
        }
    }
}
impl From<&BlockCacheKey> for Vec<u8> {
    /// Serialize the key as two fixed-width 8-byte integers: the partition id
    /// followed by the block offset.
    fn from(value: &BlockCacheKey) -> Self {
        let mut buffer = Vec::with_capacity(8 + 8);
        buffer.extend_from_slice(&u64::encode_fixed_vec(value.cache_partition_id));
        buffer.extend_from_slice(&u64::encode_fixed_vec(value.block_offset));
        buffer
    }
}
/// An iterator over a table's entries that walks the index block and, for each
/// index entry, iterates the referenced data block.
pub struct TwoLevelIterator {
    /// The table being iterated.
    table: Arc<Table>,
    /// Read options applied when loading data blocks (e.g. cache fill).
    read_options: ReadOptions,
    /// Iterator over the table's index block.
    index_block_iter: BlockIter<InternalKey>,
    /// Iterator over the currently loaded data block, if any.
    maybe_data_block_iter: Option<BlockIter<InternalKey>>,
    /// Handle of the currently loaded data block; used to avoid reloading the
    /// same block.
    data_block_handle: Option<BlockHandle>,
}
impl TwoLevelIterator {
    /// Create an iterator over `table`. No data block is loaded until the
    /// caller performs a seek.
    fn new(table: Arc<Table>, read_options: ReadOptions) -> Self {
        let index_block_iter = table.index_block.iter();
        Self {
            table,
            read_options,
            index_block_iter,
            maybe_data_block_iter: None,
            data_block_handle: None,
        }
    }

    /// Load the data block referenced by the index iterator's current entry.
    ///
    /// Clears the data block state when the index iterator is invalid, and
    /// skips the load entirely when the referenced block is already the one
    /// loaded.
    fn init_data_block(&mut self) -> TableReadResult<()> {
        if !self.index_block_iter.is_valid() {
            self.maybe_data_block_iter = None;
            self.data_block_handle = None;
            return Ok(());
        }

        let data_block_handle_bytes = self.index_block_iter.current().unwrap().1;
        let block_handle = BlockHandle::try_from(data_block_handle_bytes)?;
        // Only hit the cache/disk when the index entry points at a different
        // block than the currently loaded one.
        if self.data_block_handle.as_ref() != Some(&block_handle) {
            let data_block = self.table.get_block_reader(&self.read_options, &block_handle)?;
            self.maybe_data_block_iter = Some(data_block.iter());
            self.data_block_handle = Some(block_handle);
        }

        Ok(())
    }

    /// Advance through index entries until a data block with a valid iterator
    /// is loaded, positioning that iterator at its first entry. Stops (clearing
    /// the data block state) when the index is exhausted.
    fn skip_empty_data_blocks_forward(&mut self) -> TableReadResult<()> {
        loop {
            let data_block_is_valid = self
                .maybe_data_block_iter
                .as_ref()
                .map_or(false, |iter| iter.is_valid());
            if data_block_is_valid {
                return Ok(());
            }

            if !self.index_block_iter.is_valid() {
                // Nothing left in the table; clear the data block state.
                self.maybe_data_block_iter = None;
                self.data_block_handle = None;
                return Ok(());
            }

            self.index_block_iter.next();
            self.init_data_block()?;
            if let Some(iter) = self.maybe_data_block_iter.as_mut() {
                iter.seek_to_first()?;
            }
        }
    }

    /// Mirror of [`TwoLevelIterator::skip_empty_data_blocks_forward`] that
    /// moves backward through index entries, positioning each newly loaded data
    /// block iterator at its last entry.
    fn skip_empty_data_blocks_backward(&mut self) -> TableReadResult<()> {
        loop {
            let data_block_is_valid = self
                .maybe_data_block_iter
                .as_ref()
                .map_or(false, |iter| iter.is_valid());
            if data_block_is_valid {
                return Ok(());
            }

            if !self.index_block_iter.is_valid() {
                self.maybe_data_block_iter = None;
                self.data_block_handle = None;
                return Ok(());
            }

            self.index_block_iter.prev();
            self.init_data_block()?;
            if let Some(iter) = self.maybe_data_block_iter.as_mut() {
                iter.seek_to_last()?;
            }
        }
    }
}
impl RainDbIterator for TwoLevelIterator {
    type Key = InternalKey;

    type Error = RainDBError;

    /// The iterator is valid only while a data block is loaded and its inner
    /// iterator is positioned on an entry.
    fn is_valid(&self) -> bool {
        self.maybe_data_block_iter.is_some()
            && self.maybe_data_block_iter.as_ref().unwrap().is_valid()
    }

    /// Position the iterator at the first entry at or after `target`.
    fn seek(&mut self, target: &Self::Key) -> Result<(), Self::Error> {
        self.index_block_iter.seek(target)?;
        self.init_data_block()?;
        if self.maybe_data_block_iter.is_some() {
            self.maybe_data_block_iter.as_mut().unwrap().seek(target)?;
        }

        // The target may fall past the end of the current block; move forward
        // to the next non-empty block in that case.
        self.skip_empty_data_blocks_forward()?;

        Ok(())
    }

    /// Position the iterator at the first entry of the table.
    fn seek_to_first(&mut self) -> Result<(), Self::Error> {
        self.index_block_iter.seek_to_first()?;
        self.init_data_block()?;
        if self.maybe_data_block_iter.is_some() {
            self.maybe_data_block_iter
                .as_mut()
                .unwrap()
                .seek_to_first()?;
        }

        self.skip_empty_data_blocks_forward()?;

        Ok(())
    }

    /// Position the iterator at the last entry of the table.
    fn seek_to_last(&mut self) -> Result<(), Self::Error> {
        self.index_block_iter.seek_to_last()?;
        self.init_data_block()?;
        if self.maybe_data_block_iter.is_some() {
            self.maybe_data_block_iter
                .as_mut()
                .unwrap()
                .seek_to_last()?;
        }

        self.skip_empty_data_blocks_backward()?;

        Ok(())
    }

    /// Advance to the next entry, transparently crossing data block boundaries.
    ///
    /// Errors raised while loading the next block are logged and reported as
    /// `None` (end of iteration), since this method cannot return an error.
    fn next(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        if self
            .maybe_data_block_iter
            .as_mut()
            .unwrap()
            .next()
            .is_none()
        {
            // The current block is exhausted; move to the next non-empty one.
            if let Err(error) = self.skip_empty_data_blocks_forward() {
                log::error!(
                    "There was an error skipping forward in a two-level iterator. Original \
                    error: {}",
                    error
                );
                return None;
            }
        }

        // Re-borrow immutably to return the (possibly new) current entry.
        if let Some(data_block) = self.maybe_data_block_iter.as_ref() {
            return data_block.current();
        }

        None
    }

    /// Step back to the previous entry, transparently crossing data block
    /// boundaries. Errors during the block switch are logged and reported as
    /// `None`.
    fn prev(&mut self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        if self
            .maybe_data_block_iter
            .as_mut()
            .unwrap()
            .prev()
            .is_none()
        {
            if let Err(error) = self.skip_empty_data_blocks_backward() {
                log::error!(
                    "There was an error skipping backward in a two-level iterator. Original \
                    error: {}",
                    error
                );
                return None;
            }
        }

        if let Some(data_block) = self.maybe_data_block_iter.as_ref() {
            return data_block.current();
        }

        None
    }

    /// Return the entry the iterator is currently positioned on, if any.
    fn current(&self) -> Option<(&Self::Key, &Vec<u8>)> {
        if !self.is_valid() {
            return None;
        }

        self.maybe_data_block_iter.as_ref().unwrap().current()
    }
}
#[cfg(test)]
mod table_tests {
use std::rc::Rc;
use pretty_assertions::assert_eq;
use crate::file_names::FileNameHandler;
use crate::tables::TableBuilder;
use super::*;
/// Initialize test logging. Safe to call from every test; repeat
/// initialization errors are ignored.
fn setup() {
    let _ = env_logger::builder()
        .filter_level(log::LevelFilter::max())
        .is_test(true)
        .try_init();
}
/// A table file containing a single entry can be written and re-opened.
#[test]
fn table_with_one_entry_can_be_opened() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    // Build a table file (file number 55) with one entry.
    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    let num: u64 = 100_000;
    let key = InternalKey::new(num.to_string().as_bytes().to_vec(), 30, Operation::Put);
    table_builder
        .add_entry(Rc::new(key), &u64::encode_fixed_vec(num))
        .unwrap();
    table_builder.finalize().unwrap();
    drop(table_builder);

    // Re-open the file through the same (in-memory) filesystem.
    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let _ = Table::open(options.clone(), file).unwrap();
}
/// A table file spanning multiple data blocks (400 entries, 256-byte blocks)
/// can be written and re-opened.
#[test]
fn table_with_multiple_entries_can_be_opened() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    for idx in 0..400_usize {
        let num = idx + 100_000;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let _ = Table::open(options.clone(), file).unwrap();
}
/// `Table::get` retrieves the value stored for a key in a single-entry table.
#[test]
fn given_a_table_with_a_single_entry_the_reader_can_get_a_value() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    let num: u64 = 100_000;
    let key = InternalKey::new(num.to_string().as_bytes().to_vec(), 30, Operation::Put);
    table_builder
        .add_entry(Rc::new(key), &u64::encode_fixed_vec(num))
        .unwrap();
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    // Look up the stored key at the same sequence number it was written with.
    let actual = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(num.to_string().as_bytes().to_vec(), 30),
        )
        .unwrap()
        .unwrap();
    assert_eq!(actual, u64::encode_fixed_vec(100_000));
}
/// Forward iteration over a 2000-entry table yields every entry exactly once,
/// in order.
#[test]
fn given_a_table_with_multiple_entries_the_table_iterator_yields_expected_values_when_iterated()
{
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    for idx in 0..2000_usize {
        let num = idx + 100_000;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    let mut table_iter = Table::iter_with(Arc::new(table), ReadOptions::default());
    table_iter.seek_to_first().unwrap();
    let mut idx: usize = 0;
    // Check each yielded entry against the value written at that index.
    while table_iter.is_valid() && idx < 2000 {
        let num = idx + 100_000;
        let expected_key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        let expected_val = u64::encode_fixed_vec(num as u64);
        let (curr_key, curr_val) = table_iter.current().unwrap();
        assert_eq!(&expected_key, curr_key);
        assert_eq!(&expected_val, curr_val);

        idx += 1;
        table_iter.next();
    }

    // The iterator must be exhausted exactly at entry 2000, not before.
    assert!(
        table_iter.next().is_none() && idx == 2000,
        "Arrived at the last element early (index {idx}). Expected last element at iteration \
        2000."
    );
}
/// Seeking behavior: seek-to-last, exact matches, newer/older sequence numbers
/// for the same user key, and seeking backwards to an earlier key.
#[test]
fn given_a_table_with_a_multiple_entries_the_table_iterator_can_seek_to_targets() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    for idx in 0..2000_usize {
        let num = idx + 100_000;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    let mut table_iter = Table::iter_with(Arc::new(table), ReadOptions::default());

    // Jump straight to the very last entry.
    table_iter.seek_to_last().unwrap();
    let (last_key, last_val) = table_iter.current().unwrap();
    assert_eq!(
        last_key,
        &InternalKey::new(
            101_999_usize.to_string().as_bytes().to_vec(),
            1_999,
            Operation::Put,
        ),
        "Found an incorrect last key"
    );
    assert_eq!(
        last_val,
        &u64::encode_fixed_vec(101_999),
        "Found an incorrect last value"
    );

    // Seek to an exact (user key, sequence number) match.
    table_iter
        .seek(&InternalKey::new(
            101_117_usize.to_string().as_bytes().to_vec(),
            1117,
            Operation::Put,
        ))
        .unwrap();
    let (actual_key, actual_val) = table_iter.current().unwrap();
    assert_eq!(
        actual_key,
        &InternalKey::new(
            101_117_usize.to_string().as_bytes().to_vec(),
            1117,
            Operation::Put,
        ),
        "Found an incorrect key"
    );
    assert_eq!(
        actual_val,
        &u64::encode_fixed_vec(101_117),
        "Found an incorrect value"
    );

    // Seeking with a newer sequence number (1118) for the same user key still
    // lands on the stored entry (sequence number 1117).
    table_iter
        .seek(&InternalKey::new(
            101_117_usize.to_string().as_bytes().to_vec(),
            1118,
            Operation::Put,
        ))
        .unwrap();
    let (actual_key, actual_val) = table_iter.current().unwrap();
    assert_eq!(
        actual_key,
        &InternalKey::new(
            101_117_usize.to_string().as_bytes().to_vec(),
            1117,
            Operation::Put,
        ),
        "Found an incorrect key. Should have found a key greater than the target."
    );
    assert_eq!(
        actual_val,
        &u64::encode_fixed_vec(101_117),
        "Found an incorrect value. Should have found the value of a key greater than the \
        target key."
    );

    // Seeking with an older sequence number (1116) skips past to the next
    // user key entirely.
    table_iter
        .seek(&InternalKey::new(
            101_117_usize.to_string().as_bytes().to_vec(),
            1116,
            Operation::Put,
        ))
        .unwrap();
    let (actual_key, actual_val) = table_iter.current().unwrap();
    assert_eq!(
        actual_key,
        &InternalKey::new(
            101_118_usize.to_string().as_bytes().to_vec(),
            1118,
            Operation::Put,
        ),
        "Found an incorrect key. Should have found a key greater than the target."
    );
    assert_eq!(
        actual_val,
        &u64::encode_fixed_vec(101_118),
        "Found an incorrect value. Should have found the value of a key greater than the \
        target key."
    );

    // Seeking backwards to a much earlier key also works.
    table_iter
        .seek(&InternalKey::new(
            100_002_usize.to_string().as_bytes().to_vec(),
            2,
            Operation::Put,
        ))
        .unwrap();
    let (actual_key, actual_val) = table_iter.current().unwrap();
    assert_eq!(
        actual_key,
        &InternalKey::new(
            100_002_usize.to_string().as_bytes().to_vec(),
            2,
            Operation::Put,
        ),
    );
    assert_eq!(actual_val, &u64::encode_fixed_vec(100_002));
}
/// Reverse iteration from the last entry visits all 2000 entries exactly once.
#[test]
fn given_a_table_with_multiple_entries_the_table_iterator_can_be_completely_iterated_in_reverse(
) {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    for idx in 0..2000_usize {
        let num = idx + 100_000;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    let mut table_iter = Table::iter_with(Arc::new(table), ReadOptions::default());
    // Walk backward from the end, counting visited entries.
    table_iter.seek_to_last().unwrap();
    let mut idx: usize = 0;
    while table_iter.is_valid() && idx < 2000 {
        idx += 1;
        table_iter.prev();
    }

    // The iterator must be exhausted exactly at entry 2000, not before.
    assert!(
        table_iter.prev().is_none() && idx == 2000,
        "Arrived at the last element early (index {idx}). Expected last element at iteration \
        2000."
    );
}
/// `Table::get` returns the newest visible value for a user key and treats
/// deletion tombstones correctly, within a single data block.
#[test]
fn given_a_table_with_one_block_get_can_filter_out_deleted_values() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    // Per user key, entries are listed newest (highest sequence number) first.
    let keys = [
        InternalKey::new(b"batmann".to_vec(), 1, Operation::Put),
        InternalKey::new(b"robin".to_vec(), 3, Operation::Put),
        InternalKey::new(b"robin".to_vec(), 2, Operation::Delete),
        InternalKey::new(b"tumtum".to_vec(), 5, Operation::Delete),
        InternalKey::new(b"tumtum".to_vec(), 4, Operation::Put),
        InternalKey::new(b"tumtum".to_vec(), 1, Operation::Put),
    ];
    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    for key in keys {
        // Use the sequence number (as text) as the stored value.
        let val = key.get_sequence_number();
        table_builder
            .add_entry(Rc::new(key), val.to_string().as_bytes())
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    // Exact lookup of the newest "robin" entry.
    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(b"robin".to_vec(), 3),
        )
        .unwrap()
        .unwrap();
    assert_eq!(b"3".to_vec(), found_value);

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(b"robin".to_vec(), 20),
        )
        .unwrap()
        .unwrap();
    assert_eq!(
        b"3".to_vec(),
        found_value,
        "Using a much more recent sequence number in the key should find the key and return \
        the most recent value"
    );

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new(b"robin".to_vec(), 2, Operation::Delete),
        )
        .unwrap();
    assert_eq!(
        None, found_value,
        "Looking exactly for a deleted key should find the key"
    );

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(b"robin".to_vec(), 2),
        )
        .err();
    assert_eq!(
        None, found_value,
        "Using a seeking key should still find the key and return nothing for the value"
    );

    // No "robin" entry is visible at sequence number 1.
    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(b"robin".to_vec(), 1),
        )
        .err();
    assert_eq!(Some(ReadError::KeyNotFound), found_value);
}
/// `Table::get` honors deletion tombstones for a user key whose history sits
/// in the middle of a table spanning many data blocks.
#[test]
fn given_a_table_with_multiple_blocks_get_can_filter_out_deleted_values() {
    setup();

    const MAX_BLOCK_SIZE_BYTES: usize = 256;
    let mut options = DbOptions::with_memory_env();
    options.max_block_size = MAX_BLOCK_SIZE_BYTES;

    let mut table_builder = TableBuilder::new(options.clone(), 55).unwrap();
    // 400 padding entries before the key of interest.
    for idx in 0..400_usize {
        let num = idx + 100_000;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }

    // One user key with a delete-then-puts history (newest first): delete@403,
    // "v3"@402, "v2"@401, "original"@400.
    let user_key_to_find = 100_404_usize.to_string().as_bytes().to_vec();
    table_builder
        .add_entry(
            Rc::new(InternalKey::new(
                user_key_to_find.clone(),
                403,
                Operation::Delete,
            )),
            &Vec::new(),
        )
        .unwrap();
    table_builder
        .add_entry(
            Rc::new(InternalKey::new(
                user_key_to_find.clone(),
                402,
                Operation::Put,
            )),
            b"v3",
        )
        .unwrap();
    table_builder
        .add_entry(
            Rc::new(InternalKey::new(
                user_key_to_find.clone(),
                401,
                Operation::Put,
            )),
            b"v2",
        )
        .unwrap();
    table_builder
        .add_entry(
            Rc::new(InternalKey::new(
                user_key_to_find.clone(),
                400,
                Operation::Put,
            )),
            b"original",
        )
        .unwrap();

    // 400 more padding entries after the key of interest.
    for idx in 0..400_usize {
        let num = idx + 100_000 + 405;
        let key = InternalKey::new(
            num.to_string().as_bytes().to_vec(),
            idx as u64 + 405,
            Operation::Put,
        );
        table_builder
            .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
            .unwrap();
    }
    table_builder.finalize().unwrap();
    drop(table_builder);

    let file_path = FileNameHandler::new(options.db_path().to_string()).get_table_file_path(55);
    let file = options.filesystem_provider().open_file(&file_path).unwrap();
    let table = Table::open(options.clone(), file).unwrap();

    // Snapshots below the tombstone see the puts.
    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(user_key_to_find.clone(), 401),
        )
        .unwrap()
        .unwrap();
    assert_eq!(b"v2".to_vec(), found_value);

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(user_key_to_find.clone(), 402),
        )
        .unwrap()
        .unwrap();
    assert_eq!(b"v3".to_vec(), found_value);

    // At and above the tombstone's sequence number the key reads as deleted.
    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new(user_key_to_find.clone(), 403, Operation::Delete),
        )
        .unwrap();
    assert_eq!(
        None, found_value,
        "Looking exactly for a deleted key should find the key and return nothing for the value"
    );

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(user_key_to_find.clone(), 403),
        )
        .err();
    assert_eq!(
        None, found_value,
        "Using a seeking key should find the key and return nothing for the value"
    );

    let found_value = table
        .get(
            &ReadOptions::default(),
            &InternalKey::new_for_seeking(user_key_to_find, 1000),
        )
        .err();
    assert_eq!(
        None, found_value,
        "Using a much more recent sequence number in the key should find the key and return \
        nothing for the value"
    );
}
}