use std::borrow::Cow;
use std::fs::File;
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use super::block::{BlockBuilder, BlockHandle, BlockType, DEFAULT_RESTART_INTERVAL};
use super::filter::{BloomFilterPolicy, FilterBuilder, FilterPolicy};
use super::format::{Footer, HEADER_SIZE, Header, Section, SectionType};
/// Default value for `SSTableBuilderOptions::block_size`, in bytes (4 KiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4096;
/// Default bits-per-key handed to `BloomFilterPolicy::with_bits_per_key` by the default options.
pub const DEFAULT_FILTER_BITS_PER_KEY: f64 = 10_f64;
/// Configuration for an `SSTableBuilder`.
///
/// Cloning relies on the `Clone` impl for `Box<dyn FilterPolicy>` defined in this file.
#[derive(Debug, Clone)]
pub struct SSTableBuilderOptions {
/// Flush threshold in bytes: the current data block is written out once its estimated
/// (uncompressed) size reaches this value.
pub block_size: usize,
/// Restart interval passed to the `BlockBuilder` used for data blocks.
pub restart_interval: usize,
/// Requested block compression. Blocks that fail to compress or do not shrink are stored
/// uncompressed; `Snappy` is not implemented by the builder and always falls back to
/// uncompressed.
pub compression: BlockType,
/// Policy used to create the filter builder; `None` disables the filter section.
/// NOTE: the builder only creates a filter when `set_estimated_keys` is called.
pub filter_policy: Option<Box<dyn FilterPolicy>>,
/// When true, data blocks are built with `BlockBuilder::with_hash_index`, otherwise with
/// `BlockBuilder::new`.
pub use_block_hash_index: bool,
/// NOTE(review): not read by `SSTableBuilder` in this file; the index section it writes is
/// always a single block.
pub use_two_level_index: bool,
}
impl Default for SSTableBuilderOptions {
fn default() -> Self {
Self {
block_size: DEFAULT_BLOCK_SIZE,
restart_interval: DEFAULT_RESTART_INTERVAL,
compression: BlockType::Uncompressed,
filter_policy: Some(Box::new(BloomFilterPolicy::with_bits_per_key(
DEFAULT_FILTER_BITS_PER_KEY,
))),
use_block_hash_index: true,
use_two_level_index: false,
}
}
}
/// Clones a boxed filter policy by rebuilding it from its `bits_per_key()`.
///
/// NOTE(review): the result is always a `BloomFilterPolicy`, so the concrete policy type is
/// not preserved — a non-Bloom policy silently turns into a Bloom filter when cloned. A
/// `clone_box`-style method on `FilterPolicy` would preserve the type.
impl Clone for Box<dyn FilterPolicy> {
    fn clone(&self) -> Self {
        let bits_per_key = self.bits_per_key();
        let rebuilt = BloomFilterPolicy::with_bits_per_key(bits_per_key);
        Box::new(rebuilt)
    }
}
/// Index record for one flushed data block.
#[derive(Debug, Clone)]
struct IndexEntry {
/// Largest (last-added) key stored in the block; used as the index key in `build_index`.
largest_key: Vec<u8>,
/// Handle built from the block's starting file offset and a size, encoded as the index value.
handle: BlockHandle,
}
/// Streaming writer that turns key/value pairs, added in strictly increasing key order, into
/// an SSTable file. The file is completed by `finish`, which consumes the builder.
pub struct SSTableBuilder {
/// Builder configuration.
options: SSTableBuilderOptions,
/// Buffered output file; writing starts after the space reserved for the header.
file: BufWriter<File>,
/// Data block currently being filled.
data_block: BlockBuilder,
/// One entry per flushed data block; turned into the index section by `finish`.
index_entries: Vec<IndexEntry>,
/// Filter builder; only ever created by `set_estimated_keys`, and only when a filter policy
/// is configured.
filter_builder: Option<Box<dyn FilterBuilder>>,
/// Absolute file offset at which the next byte will be written.
offset: u64,
/// Number of key/value pairs added so far.
num_entries: u64,
/// First key added, if any.
smallest_key: Option<Vec<u8>>,
/// Most recently added key, if any (the largest, given sorted input).
largest_key: Option<Vec<u8>>,
/// Most recently added key, used for the debug-build sort-order check in `add`.
last_key: Option<Vec<u8>>,
/// Set when a data block is flushed; cleared by the next `add` after calling
/// `add_index_entry` (which is currently a no-op).
pending_index_entry: bool,
/// Largest key of the most recently flushed data block, handed to `add_index_entry`.
pending_largest_key: Vec<u8>,
/// File offset where the data-block section begins (the end of the reserved header area).
data_section_start: u64,
/// Key count passed to `set_estimated_keys`; not otherwise read in this file.
estimated_keys: usize,
}
impl SSTableBuilder {
pub fn new<P: AsRef<Path>>(path: P, options: SSTableBuilderOptions) -> std::io::Result<Self> {
let file = File::create(path)?;
let mut writer = BufWriter::new(file);
writer.seek(SeekFrom::Start(HEADER_SIZE as u64))?;
let data_block = if options.use_block_hash_index {
BlockBuilder::with_hash_index(options.restart_interval)
} else {
BlockBuilder::new(options.restart_interval)
};
Ok(Self {
options,
file: writer,
data_block,
index_entries: Vec::new(),
filter_builder: None,
offset: HEADER_SIZE as u64,
num_entries: 0,
smallest_key: None,
largest_key: None,
last_key: None,
pending_index_entry: false,
pending_largest_key: Vec::new(),
data_section_start: HEADER_SIZE as u64,
estimated_keys: 0,
})
}
pub fn set_estimated_keys(&mut self, count: usize) {
self.estimated_keys = count;
if let Some(ref policy) = self.options.filter_policy {
self.filter_builder = Some(policy.create_builder(count));
}
}
pub fn add(&mut self, key: &[u8], value: &[u8]) -> std::io::Result<()> {
if let Some(ref last) = self.last_key {
debug_assert!(key > last.as_slice(), "Keys must be added in sorted order");
}
if self.pending_index_entry {
self.add_index_entry(&self.pending_largest_key.clone())?;
self.pending_index_entry = false;
}
if let Some(ref mut builder) = self.filter_builder {
builder.add_key(key);
}
self.data_block.add(key, value);
if self.smallest_key.is_none() {
self.smallest_key = Some(key.to_vec());
}
self.largest_key = Some(key.to_vec());
self.last_key = Some(key.to_vec());
self.num_entries += 1;
if self.data_block.estimated_size() >= self.options.block_size {
self.flush_data_block()?;
}
Ok(())
}
fn flush_data_block(&mut self) -> std::io::Result<()> {
if self.data_block.is_empty() {
return Ok(());
}
let block_data = self.data_block.finish();
let block_size = block_data.len();
let (compressed_data, block_type) = self.maybe_compress(&block_data);
let block_offset = self.offset;
self.file.write_all(&compressed_data)?;
self.file.write_all(&[block_type as u8])?;
let checksum = crc32fast::hash(&compressed_data);
self.file.write_all(&checksum.to_le_bytes())?;
let total_size = compressed_data.len() + 1 + 4; self.offset += total_size as u64;
if let Some(ref key) = self.largest_key {
self.pending_largest_key = key.clone();
}
self.pending_index_entry = true;
let handle = BlockHandle::new(block_offset, block_size as u64);
self.index_entries.push(IndexEntry {
largest_key: self.pending_largest_key.clone(),
handle,
});
self.data_block.reset();
Ok(())
}
fn maybe_compress(&self, data: &[u8]) -> (Vec<u8>, BlockType) {
match self.options.compression {
BlockType::Uncompressed => (data.to_vec(), BlockType::Uncompressed),
BlockType::Lz4 => {
match lz4_flex::compress_prepend_size(data) {
compressed if compressed.len() < data.len() => (compressed, BlockType::Lz4),
_ => (data.to_vec(), BlockType::Uncompressed),
}
}
BlockType::Zstd => {
match zstd::encode_all(data, 3) {
Ok(compressed) if compressed.len() < data.len() => {
(compressed, BlockType::Zstd)
}
_ => (data.to_vec(), BlockType::Uncompressed),
}
}
BlockType::Snappy => {
(data.to_vec(), BlockType::Uncompressed)
}
}
}
fn add_index_entry(&mut self, _largest_key: &[u8]) -> std::io::Result<()> {
Ok(())
}
pub fn finish(mut self) -> std::io::Result<SSTableBuilderResult> {
self.flush_data_block()?;
let data_section_end = self.offset;
let data_section_size = data_section_end - self.data_section_start;
let data_checksum = 0u32;
let mut sections = vec![Section::new(
SectionType::DataBlocks,
self.data_section_start,
data_section_size,
data_checksum,
)];
if let Some(mut builder) = self.filter_builder.take() {
let filter_data = builder.finish();
let filter_offset = self.offset;
let filter_size = filter_data.len() as u64;
let filter_checksum = crc32fast::hash(&filter_data);
self.file.write_all(&filter_data)?;
self.offset += filter_size;
sections.push(Section::new(
SectionType::Filter,
filter_offset,
filter_size,
filter_checksum,
));
}
let index_offset = self.offset;
let index_data = self.build_index()?;
let index_size = index_data.len() as u64;
let index_checksum = crc32fast::hash(&index_data);
self.file.write_all(&index_data)?;
self.offset += index_size;
sections.push(Section::new(
SectionType::Index,
index_offset,
index_size,
index_checksum,
));
let footer_offset = self.offset;
let footer = Footer::new(sections.clone());
let footer_data = footer.encode();
self.file.write_all(&footer_data)?;
let header = Header::new(sections.len() as u32, footer_offset);
self.file.seek(SeekFrom::Start(0))?;
self.file.write_all(&header.encode())?;
self.file.flush()?;
Ok(SSTableBuilderResult {
file_size: footer_offset + footer_data.len() as u64,
num_entries: self.num_entries,
num_data_blocks: self.index_entries.len(),
smallest_key: self.smallest_key,
largest_key: self.largest_key,
})
}
fn build_index(&self) -> std::io::Result<Vec<u8>> {
let mut builder = BlockBuilder::new(1);
for entry in &self.index_entries {
let handle_encoded = entry.handle.encode();
builder.add(&entry.largest_key, &handle_encoded);
}
Ok(builder.finish())
}
pub fn num_entries(&self) -> u64 {
self.num_entries
}
pub fn file_size(&self) -> u64 {
self.offset
}
}
/// Summary returned by `SSTableBuilder::finish`.
#[derive(Debug)]
pub struct SSTableBuilderResult {
/// Total file length in bytes: the footer offset plus the encoded footer length.
pub file_size: u64,
/// Number of key/value pairs added.
pub num_entries: u64,
/// Number of data blocks written (one index entry per block).
pub num_data_blocks: usize,
/// First key added, or `None` if no entries were added.
pub smallest_key: Option<Vec<u8>>,
/// Last key added, or `None` if no entries were added.
pub largest_key: Option<Vec<u8>>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Asserts that the size reported by `finish` matches the bytes actually on disk.
    fn assert_reported_size_matches_disk(path: &Path, result: &SSTableBuilderResult) {
        let on_disk = std::fs::metadata(path).unwrap().len();
        assert_eq!(on_disk, result.file_size);
    }

    #[test]
    fn test_builder_basic() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.sst");
        let options = SSTableBuilderOptions {
            block_size: 256,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(&path, options).unwrap();
        for i in 0..100 {
            let key = format!("key{:05}", i);
            let value = format!("value{:05}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
        let result = builder.finish().unwrap();
        assert_eq!(result.num_entries, 100);
        assert!(result.num_data_blocks > 0);
        assert!(result.file_size > 0);
        assert_eq!(result.smallest_key.as_deref(), Some(&b"key00000"[..]));
        assert_eq!(result.largest_key.as_deref(), Some(&b"key00099"[..]));
        assert_reported_size_matches_disk(&path, &result);
    }

    #[test]
    fn test_builder_with_filter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_filter.sst");
        let mut builder = SSTableBuilder::new(&path, SSTableBuilderOptions::default()).unwrap();
        builder.set_estimated_keys(1000);
        for i in 0..1000 {
            let key = format!("key{:06}", i);
            let value = format!("value{:06}", i);
            builder.add(key.as_bytes(), value.as_bytes()).unwrap();
        }
        let result = builder.finish().unwrap();
        assert_eq!(result.num_entries, 1000);
        assert!(result.file_size > 0);
        assert_reported_size_matches_disk(&path, &result);
    }

    #[test]
    fn test_builder_with_lz4_compression() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test_lz4.sst");
        let options = SSTableBuilderOptions {
            block_size: 256,
            compression: BlockType::Lz4,
            filter_policy: None,
            ..Default::default()
        };
        let mut builder = SSTableBuilder::new(&path, options).unwrap();
        // Highly repetitive values so that blocks actually shrink under LZ4.
        let value = vec![b'v'; 64];
        for i in 0..200 {
            let key = format!("key{:05}", i);
            builder.add(key.as_bytes(), &value).unwrap();
        }
        let result = builder.finish().unwrap();
        assert_eq!(result.num_entries, 200);
        assert!(result.num_data_blocks > 1);
        assert_reported_size_matches_disk(&path, &result);
    }
}