use std::fmt::Debug;
use std::io::Write;
use std::rc::Rc;
use integer_encoding::FixedInt;
use snap::write::FrameEncoder;
use crate::config::{TableFileCompressionType, PREFIX_COMPRESSION_RESTART_INTERVAL};
use crate::file_names::FileNameHandler;
use crate::filter_policy;
use crate::fs::RandomAccessFile;
use crate::key::InternalKey;
use crate::utils::bytes::BinarySeparable;
use crate::utils::crc;
use crate::DbOptions;
use super::block::MetaIndexKey;
use super::block_handle::BlockHandle;
use super::constants::{BLOCK_DESCRIPTOR_SIZE_BYTES, CRC_CALCULATOR};
use super::errors::BuilderError;
use super::footer::Footer;
use super::BlockBuilder;
use super::FilterBlockBuilder;
/// Result alias for table-building operations that can fail with a [`BuilderError`].
type TableBuildResult<T> = Result<T, BuilderError>;
/// Builds an on-disk table file from key-value entries added in sorted order.
pub(crate) struct TableBuilder {
    // Database-wide configuration (block size, filter policy, filesystem provider, etc.).
    options: DbOptions,
    // Set once `finalize` or `abandon` runs; every further operation panics.
    file_closed: bool,
    // The table file being written.
    file: Box<dyn RandomAccessFile>,
    // Number used to derive the table file's path.
    file_number: u64,
    // Number of bytes written to `file` so far.
    current_offset: u64,
    // Accumulates entries for the data block currently being built.
    data_block_builder: BlockBuilder<InternalKey>,
    // Maps separator keys to the handles of flushed data blocks.
    index_block_builder: BlockBuilder<InternalKey>,
    // Accumulates user keys for the table's filter block.
    filter_block_builder: FilterBlockBuilder,
    // Total number of entries added via `add_entry`.
    num_entries: usize,
    // The most recently added key; used to enforce sorted insertion order and to
    // derive index separator keys when a block is flushed.
    maybe_last_key_added: Option<Rc<InternalKey>>,
}
impl TableBuilder {
    /// Creates a builder that writes to a fresh table file whose path is derived
    /// from `file_number` under the database path in `options`.
    ///
    /// # Errors
    ///
    /// Returns an error if the table file cannot be created.
    pub fn new(options: DbOptions, file_number: u64) -> TableBuildResult<Self> {
        let file_name_handler = FileNameHandler::new(options.db_path().to_string());
        let table_file_name = file_name_handler.get_table_file_path(file_number);
        let file = options
            .filesystem_provider()
            .create_file(&table_file_name, false)?;

        let filter_block_builder = FilterBlockBuilder::new(options.filter_policy());

        Ok(Self {
            options,
            file_number,
            file,
            file_closed: false,
            num_entries: 0,
            maybe_last_key_added: None,
            data_block_builder: BlockBuilder::new(PREFIX_COMPRESSION_RESTART_INTERVAL),
            // Restart interval of 1 for the index block — presumably so that every
            // index entry starts a fresh (uncompressed) key; confirm against
            // `BlockBuilder`'s restart semantics.
            index_block_builder: BlockBuilder::new(1),
            filter_block_builder,
            current_offset: 0,
        })
    }

    /// Adds a key-value entry to the table being built.
    ///
    /// Keys must be added in strictly increasing order. When the in-progress data
    /// block has reached the configured maximum size, it is flushed to disk first
    /// and an index entry pointing at it is recorded.
    ///
    /// # Panics
    ///
    /// Panics if the builder was already closed, or if `key` is not strictly
    /// greater than the previously added key.
    pub fn add_entry(&mut self, key: Rc<InternalKey>, value: &[u8]) -> TableBuildResult<()> {
        assert!(!self.file_closed, "{}", BuilderError::AlreadyClosed);
        assert!(
            self.maybe_last_key_added
                .as_ref()
                .map_or(true, |last_key| last_key < &key),
            "{}",
            BuilderError::OutOfOrder
        );

        if self.data_block_builder.approximate_size() >= self.options.max_block_size() {
            let maybe_block_handle = self.flush_data_block()?;
            if let Some(block_handle) = maybe_block_handle {
                // Index the flushed block under the shortest key that separates its
                // last key from the incoming one; this keeps index keys short while
                // still routing lookups to the right block.
                let last_key_added = self.maybe_last_key_added.as_ref().unwrap();
                let key_separator =
                    BinarySeparable::find_shortest_separator(last_key_added.as_ref(), key.as_ref());
                let deserialized_separator = Rc::new(InternalKey::try_from(key_separator).unwrap());
                self.index_block_builder.add_entry(
                    Rc::clone(&deserialized_separator),
                    &Vec::from(&block_handle),
                );
            }
        }

        self.maybe_last_key_added = Some(key.clone());
        self.num_entries += 1;
        self.data_block_builder.add_entry(key.clone(), value);
        // The filter block is keyed on user keys only, not full internal keys.
        self.filter_block_builder
            .add_key(key.get_user_key().to_vec());

        Ok(())
    }

    /// Finishes the table: flushes the final data block, then writes the filter
    /// block, metaindex block, index block, and footer, and flushes the file.
    ///
    /// The builder is closed afterward; any further operation panics.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the trailing blocks or the footer cannot be
    /// serialized or written.
    ///
    /// # Panics
    ///
    /// Panics if the builder was already closed.
    pub fn finalize(&mut self) -> TableBuildResult<()> {
        assert!(!self.file_closed, "{}", BuilderError::AlreadyClosed);

        // Flush any buffered entries. There is no following key to separate from,
        // so the final index entry uses the shortest successor of the last key.
        let maybe_block_handle = self.flush_data_block()?;
        if let Some(block_handle) = maybe_block_handle {
            let last_key_added = self.maybe_last_key_added.as_ref().unwrap();
            let key_separator = BinarySeparable::find_shortest_successor(last_key_added.as_ref());
            let deserialized_separator = Rc::new(InternalKey::try_from(key_separator).unwrap());
            self.index_block_builder
                .add_entry(Rc::clone(&deserialized_separator), &Vec::from(&block_handle));
        }

        self.file_closed = true;

        // Write the filter block. It is emitted raw (uncompressed), bypassing
        // `write_block`'s compression heuristic.
        let filter_block_offset = self.current_offset;
        let filter_block_contents = self.filter_block_builder.finalize();
        self.emit_block_to_disk(&filter_block_contents, TableFileCompressionType::None)?;
        let filter_block_handle =
            BlockHandle::new(filter_block_offset, filter_block_contents.len() as u64);

        // Write the metaindex block, which maps the filter block's name to its handle.
        let mut metaindex_builder: BlockBuilder<MetaIndexKey> =
            BlockBuilder::new(PREFIX_COMPRESSION_RESTART_INTERVAL);
        let filter_block_key = MetaIndexKey::new(filter_policy::get_filter_block_name(
            self.options.filter_policy(),
        ));
        metaindex_builder.add_entry(Rc::new(filter_block_key), &Vec::from(&filter_block_handle));
        let metaindex_contents = metaindex_builder.finalize();
        let metaindex_handle = self.write_block(&metaindex_contents)?;

        // Write the index block.
        let index_contents = self.index_block_builder.finalize();
        let index_block_handle = self.write_block(&index_contents)?;
        self.index_block_builder.reset();

        // Write the footer, which records where the metaindex and index blocks live.
        let footer = Footer::new(metaindex_handle, index_block_handle);
        let serialized_footer = Vec::<u8>::try_from(&footer)?;
        self.file.write_all(&serialized_footer)?;
        self.current_offset += serialized_footer.len() as u64;

        // Only data-block writes flushed the file so far (in `flush_data_block`).
        // Flush once more so the filter/metaindex/index blocks and the footer are
        // not left sitting in a write buffer.
        self.file.flush()?;

        Ok(())
    }

    /// Marks the builder closed without writing the table's trailing blocks.
    ///
    /// # Panics
    ///
    /// Panics if the builder was already closed.
    pub fn abandon(&mut self) {
        assert!(!self.file_closed, "{}", BuilderError::AlreadyClosed);

        self.file_closed = true;
    }

    /// Returns the number of bytes written to the table file so far.
    pub fn file_size(&self) -> u64 {
        self.current_offset
    }

    /// Returns the total number of entries added via [`TableBuilder::add_entry`].
    pub fn get_num_entries(&self) -> usize {
        self.num_entries
    }
}
impl TableBuilder {
    /// Flushes the in-progress data block to disk, returning the handle of the
    /// written block, or `Ok(None)` when the block builder has no entries.
    fn flush_data_block(&mut self) -> TableBuildResult<Option<BlockHandle>> {
        if self.data_block_builder.is_empty() {
            return Ok(None);
        }

        let block_contents = self.data_block_builder.finalize();
        let block_handle = self.write_block(&block_contents)?;
        self.data_block_builder.reset();
        self.file.flush()?;

        // `current_offset` now points just past the block that was written, i.e.
        // at the start of the next block; the filter builder is notified with that
        // offset.
        self.filter_block_builder
            .notify_new_data_block(self.current_offset as usize);

        Ok(Some(block_handle))
    }

    /// Writes a block to disk, Snappy-compressing it when compression saves at
    /// least 1/8 (12.5%) of the raw size; otherwise the block is stored raw.
    ///
    /// Returns a handle recording the block's offset and its on-disk length
    /// (compressed or raw), excluding the trailing block descriptor.
    fn write_block(&mut self, block_contents: &[u8]) -> TableBuildResult<BlockHandle> {
        let start_offset = self.current_offset;

        let mut snappy_encoder = FrameEncoder::new(Vec::new());
        snappy_encoder.write_all(block_contents)?;
        snappy_encoder.flush()?;
        // After a successful `flush`, `into_inner` cannot fail for an in-memory
        // `Vec` sink; an error here would indicate a bug, not an I/O failure.
        let compressed_block: Vec<u8> = snappy_encoder
            .into_inner()
            .expect("recovering the in-memory buffer from a flushed encoder cannot fail");

        let actual_block_size =
            if compressed_block.len() < (block_contents.len() - (block_contents.len() / 8)) {
                self.emit_block_to_disk(
                    compressed_block.as_slice(),
                    TableFileCompressionType::Snappy,
                )?;
                compressed_block.len()
            } else {
                // Compression did not save enough space to be worth the read-time
                // decompression cost; store the block uncompressed.
                self.emit_block_to_disk(block_contents, TableFileCompressionType::None)?;
                block_contents.len()
            };

        Ok(BlockHandle::new(start_offset, actual_block_size as u64))
    }

    /// Writes the block bytes followed by their descriptor — the compression type
    /// byte and a masked 32-bit checksum covering both the contents and the
    /// compression type — then advances `current_offset` past everything written.
    fn emit_block_to_disk(
        &mut self,
        block_contents: &[u8],
        compression_type: TableFileCompressionType,
    ) -> TableBuildResult<()> {
        self.file.write_all(block_contents)?;

        // The checksum covers the block contents plus the compression type byte,
        // so a corrupted type byte is also detectable.
        let mut digest = CRC_CALCULATOR.digest();
        digest.update(block_contents);
        digest.update(&[compression_type as u8]);
        let checksum = digest.finalize();

        self.file.write_all(&[compression_type as u8])?;
        let masked_checksum = crc::mask_checksum(checksum);
        self.file
            .write_all(&u32::encode_fixed_vec(masked_checksum))?;

        self.current_offset += (block_contents.len() + BLOCK_DESCRIPTOR_SIZE_BYTES) as u64;

        Ok(())
    }
}
impl Debug for TableBuilder {
    /// Formats the builder for diagnostics, omitting the file handle and the
    /// internal block/filter builders.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TableBuilder");
        repr.field("options", &self.options);
        repr.field("file_closed", &self.file_closed);
        repr.field("file_number", &self.file_number);
        repr.field("current_offset", &self.current_offset);
        repr.field("num_entries", &self.num_entries);
        repr.field("maybe_last_key_added", &self.maybe_last_key_added);
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use pretty_assertions::assert_eq;
    use crate::Operation;
    use super::*;

    #[test]
    fn table_builder_can_write_data_blocks_to_disk() {
        // A small block size guarantees that 400 entries span multiple data blocks.
        const MAX_BLOCK_SIZE_BYTES: usize = 256;
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = MAX_BLOCK_SIZE_BYTES;
        let mut table_builder = TableBuilder::new(options, 55).unwrap();
        // Keys derive from ascending integers with a fixed-width prefix, so both
        // the user keys and sequence numbers arrive in sorted order.
        for idx in 0..400_usize {
            let num = idx + 100_000;
            let key = InternalKey::new(
                num.to_string().as_bytes().to_vec(),
                idx as u64,
                Operation::Put,
            );
            table_builder
                .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
                .unwrap();
        }
        assert_eq!(table_builder.get_num_entries(), 400);
        assert!(
            table_builder.file_size() >= (2 * MAX_BLOCK_SIZE_BYTES as u64),
            "The current file size should be at least as large as two blocks given our test input."
        );
        // Finalizing appends the filter, metaindex, and index blocks plus the
        // footer, so the file must grow beyond the data blocks alone.
        let pre_finalize_file_size = table_builder.file_size();
        table_builder.finalize().unwrap();
        assert!(table_builder.file_size() > pre_finalize_file_size);
    }

    #[test]
    #[should_panic(
        expected = "Attempted to perform an operation when the file had already been closed."
    )]
    fn table_builder_finalizing_an_abandoned_builder_should_panic() {
        const MAX_BLOCK_SIZE_BYTES: usize = 256;
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = MAX_BLOCK_SIZE_BYTES;
        let mut table_builder = TableBuilder::new(options, 55).unwrap();
        for idx in 0..400_usize {
            let num = idx + 100_000;
            let key = InternalKey::new(
                num.to_string().as_bytes().to_vec(),
                idx as u64,
                Operation::Put,
            );
            table_builder
                .add_entry(Rc::new(key), &u64::encode_fixed_vec(num as u64))
                .unwrap();
        }
        // `abandon` closes the builder, so the subsequent `finalize` must hit the
        // already-closed assertion.
        table_builder.abandon();
        table_builder.finalize().unwrap();
    }

    #[test]
    fn table_builder_succeeds_when_keys_are_added_in_sorted_order() {
        const MAX_BLOCK_SIZE_BYTES: usize = 256;
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = MAX_BLOCK_SIZE_BYTES;
        // Note the `robin` and `tumtum` pairs: for an identical user key, the entry
        // with the HIGHER sequence number must come first, i.e. internal keys sort
        // by user key ascending, then sequence number descending.
        let keys = [
            InternalKey::new(b"batmann".to_vec(), 1, Operation::Put),
            InternalKey::new(b"robin".to_vec(), 3, Operation::Put),
            InternalKey::new(b"robin".to_vec(), 2, Operation::Delete),
            InternalKey::new(b"tumtum".to_vec(), 5, Operation::Put),
            InternalKey::new(b"tumtum".to_vec(), 1, Operation::Put),
        ];
        let mut table_builder = TableBuilder::new(options, 55).unwrap();
        for key in keys {
            table_builder.add_entry(Rc::new(key), b"random").unwrap();
        }
    }

    #[test]
    #[should_panic(expected = "Attempted to add a key but it was out of order.")]
    fn table_builder_panics_if_user_keys_are_not_added_in_sorted_order() {
        const MAX_BLOCK_SIZE_BYTES: usize = 256;
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = MAX_BLOCK_SIZE_BYTES;
        let mut table_builder = TableBuilder::new(options, 55).unwrap();
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"def".to_vec(), 399, Operation::Put)),
                b"123",
            )
            .unwrap();
        // "abc" sorts before "def", so this entry is out of order regardless of
        // its higher sequence number.
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"abc".to_vec(), 400, Operation::Put)),
                b"456",
            )
            .unwrap();
    }

    #[test]
    #[should_panic(expected = "Attempted to add a key but it was out of order.")]
    fn table_builder_panics_when_user_keys_are_sorted_but_sequence_numbers_are_not_added_in_sorted_order(
    ) {
        const MAX_BLOCK_SIZE_BYTES: usize = 256;
        let mut options = DbOptions::with_memory_env();
        options.max_block_size = MAX_BLOCK_SIZE_BYTES;
        let mut table_builder = TableBuilder::new(options, 55).unwrap();
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"abc".to_vec(), 399, Operation::Put)),
                b"123",
            )
            .unwrap();
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"def".to_vec(), 400, Operation::Put)),
                b"456",
            )
            .unwrap();
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"ghi".to_vec(), 401, Operation::Put)),
                b"original",
            )
            .unwrap();
        // Same user key "ghi" with an ASCENDING sequence number (402 after 401) is
        // out of order: equal user keys must be added newest (highest sequence)
        // first, as exercised in the sorted-order success test above.
        table_builder
            .add_entry(
                Rc::new(InternalKey::new(b"ghi".to_vec(), 402, Operation::Put)),
                b"most up to date",
            )
            .unwrap();
    }
}