use std::cmp::Ordering;
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;
use crate::error::Result;
use crate::sst::block_builder::BlockBuilder;
use crate::sst::filter::BloomFilter;
use crate::sst::format::*;
use crate::types::{InternalKeyRef, ValueType, compare_internal_key};
use ruc::*;
/// Configuration for building a single SSTable file.
pub struct TableBuildOptions {
    /// Target uncompressed size of a data block, in bytes; a block is flushed
    /// once its estimated size reaches this threshold.
    pub block_size: usize,
    /// Restart interval handed to each `BlockBuilder`.
    pub block_restart_interval: usize,
    /// Bloom-filter density in bits per key; 0 disables both bloom filters.
    pub bloom_bits_per_key: u32,
    /// When true, keys carry an 8-byte internal trailer (sequence + value
    /// type) and are ordered by `compare_internal_key`.
    pub internal_keys: bool,
    /// Compression applied to every written block.
    pub compression: CompressionType,
    /// Fixed prefix length for the prefix bloom filter; 0 disables it.
    pub prefix_len: usize,
    /// Per-block property collectors; moved into the builder by
    /// `TableBuilder::new`. NOTE: not preserved by `Clone`.
    pub block_property_collectors: Vec<Box<dyn crate::options::BlockPropertyCollector>>,
}
// Hand-written because `Box<dyn BlockPropertyCollector>` is not `Clone`.
impl Clone for TableBuildOptions {
    fn clone(&self) -> Self {
        Self {
            block_size: self.block_size,
            block_restart_interval: self.block_restart_interval,
            bloom_bits_per_key: self.bloom_bits_per_key,
            internal_keys: self.internal_keys,
            compression: self.compression,
            prefix_len: self.prefix_len,
            // Deliberately dropped: trait objects cannot be duplicated, so a
            // cloned options value starts with no collectors.
            block_property_collectors: Vec::new(),
        }
    }
}
impl Default for TableBuildOptions {
    /// Defaults: 4 KiB blocks, restart every 16 entries, 10 bloom bits per
    /// key, plain user keys, no compression, no prefix filter, no collectors.
    fn default() -> Self {
        Self {
            block_size: 4096,
            block_restart_interval: 16,
            bloom_bits_per_key: 10,
            internal_keys: false,
            compression: CompressionType::None,
            prefix_len: 0,
            block_property_collectors: Vec::new(),
        }
    }
}
/// Index-block entry buffered until all data blocks have been written.
struct PendingIndexEntry {
    // Separator key for the block (the builder's `last_key` at flush time);
    // becomes the index entry's key.
    last_key: Vec<u8>,
    // Offset/size of the block within the file.
    handle: BlockHandle,
    // First key stored in the block; encoded into the index value.
    first_key: Vec<u8>,
    // (collector name, serialized property) pairs gathered for this block.
    properties: Vec<(String, Vec<u8>)>,
}
/// Streams sorted key/value pairs into an SSTable file.
///
/// Feed [`TableBuilder::add`] strictly ascending keys, then call
/// [`TableBuilder::finish`] to write the meta/index blocks and footer.
pub struct TableBuilder {
    // Buffered file writer; flushed and fsynced in `finish`.
    writer: BufWriter<File>,
    options: TableBuildOptions,
    // Data block currently being filled.
    data_block: BlockBuilder,
    // One pending entry per completed data block; written in `finish`.
    index_entries: Vec<PendingIndexEntry>,
    // First key of the in-progress data block, if it has any entries.
    pending_first_key: Option<Vec<u8>>,
    // User keys destined for the whole-key bloom filter block.
    filter_keys: Vec<Vec<u8>>,
    // Current write offset into the file, in bytes.
    offset: u64,
    // Total entries accepted by `add` (point entries + range tombstones).
    num_entries: u64,
    // Most recently added key; enforces ordering and serves as the index
    // separator when a block is flushed.
    last_key: Vec<u8>,
    // First and last keys ever added; `None` until the first `add`.
    smallest_key: Option<Vec<u8>>,
    largest_key: Option<Vec<u8>>,
    // Distinct key prefixes of length `options.prefix_len` (prefix bloom).
    prefix_set: HashSet<Vec<u8>>,
    has_range_deletions: bool,
    // Range-deletion tombstones, kept out of the data blocks and written to
    // their own meta block.
    range_del_entries: Vec<(Vec<u8>, Vec<u8>)>,
    block_property_collectors: Vec<Box<dyn crate::options::BlockPropertyCollector>>,
    finished: bool,
}
impl TableBuilder {
    /// Creates a builder writing to `path`, truncating any existing file.
    ///
    /// The property collectors are moved out of `options` into the builder.
    pub fn new(path: &Path, mut options: TableBuildOptions) -> Result<Self> {
        let file = File::create(path).c(d!())?;
        // Take ownership of the collectors; `options` keeps an empty Vec.
        let collectors = std::mem::take(&mut options.block_property_collectors);
        Ok(Self {
            writer: BufWriter::new(file),
            data_block: BlockBuilder::new(options.block_restart_interval),
            options,
            index_entries: Vec::new(),
            pending_first_key: None,
            filter_keys: Vec::new(),
            offset: 0,
            num_entries: 0,
            last_key: Vec::new(),
            smallest_key: None,
            largest_key: None,
            prefix_set: HashSet::new(),
            has_range_deletions: false,
            range_del_entries: Vec::new(),
            block_property_collectors: collectors,
            finished: false,
        })
    }

    /// Adds one key/value pair.
    ///
    /// # Panics
    /// Panics if keys are not strictly ascending — internal-key order when
    /// `internal_keys` is set, plain byte order otherwise.
    pub fn add(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        assert!(!self.finished);
        if self.options.internal_keys {
            assert!(
                self.last_key.is_empty()
                    || compare_internal_key(key, &self.last_key) == Ordering::Greater,
                "keys must be added in order"
            );
        } else {
            assert!(
                self.last_key.is_empty() || key > self.last_key.as_slice(),
                "keys must be added in order"
            );
        }
        if self.smallest_key.is_none() {
            self.smallest_key = Some(key.to_vec());
        }
        self.largest_key = Some(key.to_vec());
        // Range-deletion tombstones are diverted to their own meta block:
        // they never reach the data blocks, bloom filters, or collectors.
        if self.options.internal_keys && key.len() >= 8 {
            let ikr = InternalKeyRef::new(key);
            if ikr.value_type() == ValueType::RangeDeletion {
                self.has_range_deletions = true;
                self.range_del_entries.push((key.to_vec(), value.to_vec()));
                self.last_key = key.to_vec();
                self.num_entries += 1;
                return Ok(());
            }
        }
        // Bloom filters index the user key only: strip the 8-byte internal
        // trailer when keys are internal keys.
        let user_key_for_bloom = if self.options.internal_keys && key.len() >= 8 {
            &key[..key.len() - 8]
        } else {
            key
        };
        self.filter_keys.push(user_key_for_bloom.to_vec());
        if self.options.prefix_len > 0 && user_key_for_bloom.len() >= self.options.prefix_len {
            self.prefix_set
                .insert(user_key_for_bloom[..self.options.prefix_len].to_vec());
        }
        // Flush BEFORE inserting so this key starts the next block once the
        // current one has reached its target size.
        if self.data_block.estimated_size() >= self.options.block_size
            && !self.data_block.is_empty()
        {
            self.flush_data_block().c(d!())?;
        }
        if self.data_block.is_empty() {
            // First key of a fresh block; recorded in its index entry.
            self.pending_first_key = Some(key.to_vec());
        }
        self.data_block.add(key, value);
        self.last_key = key.to_vec();
        self.num_entries += 1;
        for collector in &mut self.block_property_collectors {
            collector.add(key, value);
        }
        Ok(())
    }

    /// Writes all remaining blocks — filters, range deletions, metaindex,
    /// index — plus the footer, then flushes and fsyncs the file.
    ///
    /// Consumes the builder and returns summary metadata for the table.
    pub fn finish(mut self) -> Result<TableBuildResult> {
        if !self.data_block.is_empty() {
            self.flush_data_block().c(d!())?;
        }
        let filter_handle = self.write_filter_block().c(d!())?;
        let prefix_filter_handle = self.write_prefix_filter_block().c(d!())?;
        let range_del_handle = self.write_range_del_block().c(d!())?;
        let metaindex_handle = self
            .write_metaindex_block(&filter_handle, &prefix_filter_handle, &range_del_handle)
            .c(d!())?;
        let index_handle = self.write_index_block().c(d!())?;
        // Fixed-size footer pointing at the metaindex and index blocks.
        let footer = encode_footer(&metaindex_handle, &index_handle);
        self.writer.write_all(&footer).c(d!())?;
        self.offset += FOOTER_SIZE as u64;
        self.writer.flush().c(d!())?;
        // Make the table durable before reporting success.
        self.writer.get_ref().sync_all().c(d!())?;
        self.finished = true;
        Ok(TableBuildResult {
            file_size: self.offset,
            num_entries: self.num_entries,
            smallest_key: self.smallest_key,
            largest_key: self.largest_key,
            has_range_deletions: self.has_range_deletions,
        })
    }

    /// Finalizes the in-progress data block, writes it out, and queues its
    /// index entry (with any block properties) for `write_index_block`.
    fn flush_data_block(&mut self) -> Result<()> {
        // NOTE(review): `last_key` can be a range-deletion key added after the
        // block's final point entry; it is still a valid (if loose) upper
        // bound for the index separator — confirm the reader tolerates this.
        let last_key = self.last_key.clone();
        let first_key = self.pending_first_key.take().unwrap_or_default();
        // Swap in a fresh builder so the next block can start immediately.
        let builder = std::mem::replace(
            &mut self.data_block,
            BlockBuilder::new(self.options.block_restart_interval),
        );
        let block_data = builder.finish();
        // Snapshot the per-block properties from each collector.
        let props: Vec<(String, Vec<u8>)> = self
            .block_property_collectors
            .iter_mut()
            .map(|c| (c.name().to_string(), c.finish_block()))
            .collect();
        let handle = self.write_raw_block(&block_data).c(d!())?;
        self.index_entries.push(PendingIndexEntry {
            last_key,
            handle,
            first_key,
            properties: props,
        });
        Ok(())
    }

    /// Compresses (when beneficial) and writes one block followed by its
    /// trailer: a 1-byte compression tag and a 4-byte little-endian CRC32
    /// computed over the stored bytes plus the tag.
    ///
    /// Returns a handle recording the on-disk (post-compression) size; the
    /// trailer is not included in the handle's size.
    fn write_raw_block(&mut self, data: &[u8]) -> Result<BlockHandle> {
        let (block_data, compression_type) = match self.options.compression {
            CompressionType::Lz4 => {
                let compressed = lz4_flex::compress_prepend_size(data);
                // Store uncompressed when compression does not shrink the block.
                if compressed.len() < data.len() {
                    (compressed, CompressionType::Lz4)
                } else {
                    (data.to_vec(), CompressionType::None)
                }
            }
            CompressionType::Zstd => {
                // On a zstd error, fall back to the raw bytes (best effort).
                let compressed = zstd::bulk::compress(data, 3).unwrap_or_else(|_| data.to_vec());
                if compressed.len() < data.len() {
                    (compressed, CompressionType::Zstd)
                } else {
                    (data.to_vec(), CompressionType::None)
                }
            }
            CompressionType::None => (data.to_vec(), CompressionType::None),
        };
        let handle = BlockHandle::new(self.offset, block_data.len() as u64);
        self.writer.write_all(&block_data).c(d!())?;
        // CRC covers the stored bytes AND the compression tag, so a corrupted
        // tag is also detectable.
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&block_data);
        hasher.update(&[compression_type as u8]);
        let crc = hasher.finalize();
        self.writer.write_all(&[compression_type as u8]).c(d!())?;
        self.writer.write_all(&crc.to_le_bytes()).c(d!())?;
        self.offset += block_data.len() as u64 + BLOCK_TRAILER_SIZE as u64;
        Ok(handle)
    }

    /// Builds and writes the whole-key bloom filter block. Returns a default
    /// (zero-size) handle when the filter is disabled or there are no keys.
    fn write_filter_block(&mut self) -> Result<BlockHandle> {
        if self.options.bloom_bits_per_key == 0 || self.filter_keys.is_empty() {
            return Ok(BlockHandle::default());
        }
        let bf = BloomFilter::new(self.options.bloom_bits_per_key);
        let key_refs: Vec<&[u8]> = self.filter_keys.iter().map(|k| k.as_slice()).collect();
        let filter_data = bf.create_filter(&key_refs);
        self.write_raw_block(&filter_data).c(d!())
    }

    /// Builds and writes the prefix bloom filter block. Returns a default
    /// handle when prefix filtering or bloom filtering is disabled, or no
    /// prefixes were collected.
    fn write_prefix_filter_block(&mut self) -> Result<BlockHandle> {
        if self.options.prefix_len == 0
            || self.options.bloom_bits_per_key == 0
            || self.prefix_set.is_empty()
        {
            return Ok(BlockHandle::default());
        }
        // Sort for deterministic filter bytes (HashSet iteration order is
        // unspecified).
        let mut prefixes: Vec<&[u8]> = self.prefix_set.iter().map(|p| p.as_slice()).collect();
        prefixes.sort();
        let bf = BloomFilter::new(self.options.bloom_bits_per_key);
        let filter_data = bf.create_filter(&prefixes);
        self.write_raw_block(&filter_data).c(d!())
    }

    /// Writes the collected range-deletion tombstones as their own block
    /// (already in insertion = key order). Default handle when there are none.
    fn write_range_del_block(&mut self) -> Result<BlockHandle> {
        if self.range_del_entries.is_empty() {
            return Ok(BlockHandle::default());
        }
        let mut builder = BlockBuilder::new(self.options.block_restart_interval);
        for (key, value) in &self.range_del_entries {
            builder.add(key, value);
        }
        let data = builder.finish();
        self.write_raw_block(&data).c(d!())
    }

    /// Writes the metaindex block mapping meta-block names to their encoded
    /// handles. Blocks with a zero-size handle are omitted entirely.
    fn write_metaindex_block(
        &mut self,
        filter_handle: &BlockHandle,
        prefix_filter_handle: &BlockHandle,
        range_del_handle: &BlockHandle,
    ) -> Result<BlockHandle> {
        // Restart interval 1: every entry is a restart point (few entries,
        // no prefix compression needed).
        let mut builder = BlockBuilder::new(1);
        if filter_handle.size > 0 {
            let handle_bytes = filter_handle.encode();
            builder.add(b"filter.bloom", &handle_bytes);
        }
        if prefix_filter_handle.size > 0 {
            let handle_bytes = prefix_filter_handle.encode();
            builder.add(b"filter.prefix", &handle_bytes);
            // Also record the prefix length so readers can slice lookup keys.
            builder.add(
                PREFIX_FILTER_LEN_NAME.as_bytes(),
                &(self.options.prefix_len as u64).to_le_bytes(),
            );
        }
        if range_del_handle.size > 0 {
            let handle_bytes = range_del_handle.encode();
            builder.add(RANGE_DEL_BLOCK_NAME.as_bytes(), &handle_bytes);
        }
        let data = builder.finish();
        self.write_raw_block(&data).c(d!())
    }

    /// Writes the index block: one entry per data block, keyed by the block's
    /// separator (last) key, with the handle, first key, and any block
    /// properties encoded into the value.
    fn write_index_block(&mut self) -> Result<BlockHandle> {
        let mut builder = BlockBuilder::new(1);
        for entry in &self.index_entries {
            // The compact encoding is used when no properties were collected.
            let value = if entry.properties.is_empty() {
                encode_index_value(&entry.handle, &entry.first_key)
            } else {
                let prop_refs: Vec<(&str, &[u8])> = entry
                    .properties
                    .iter()
                    .map(|(n, d)| (n.as_str(), d.as_slice()))
                    .collect();
                encode_index_value_with_props(&entry.handle, &entry.first_key, &prop_refs)
                    .c(d!())?
            };
            builder.add(&entry.last_key, &value);
        }
        let data = builder.finish();
        self.write_raw_block(&data).c(d!())
    }
}
/// Summary returned by [`TableBuilder::finish`].
#[derive(Debug)]
pub struct TableBuildResult {
    /// Total bytes written, footer included.
    pub file_size: u64,
    /// Entries accepted by `add` (point entries plus range tombstones).
    pub num_entries: u64,
    /// First key added, or `None` for an empty table.
    pub smallest_key: Option<Vec<u8>>,
    /// Last key added, or `None` for an empty table.
    pub largest_key: Option<Vec<u8>>,
    /// True when at least one range-deletion tombstone was written.
    pub has_range_deletions: bool,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Many sorted point entries produce a non-empty table with the expected
    /// entry count and key bounds.
    #[test]
    fn test_build_table() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("test.sst");
        let mut tb = TableBuilder::new(&path, TableBuildOptions::default()).unwrap();
        for i in 0..100 {
            let k = format!("key_{:06}", i);
            let v = format!("value_{}", i);
            tb.add(k.as_bytes(), v.as_bytes()).unwrap();
        }
        let summary = tb.finish().unwrap();
        assert_eq!(summary.num_entries, 100);
        assert!(summary.file_size > 0);
        assert_eq!(
            summary.smallest_key.as_deref(),
            Some(b"key_000000".as_slice())
        );
        assert_eq!(
            summary.largest_key.as_deref(),
            Some(b"key_000099".as_slice())
        );
    }

    /// A table with no entries is still a valid, readable file.
    #[test]
    fn test_empty_table() {
        use crate::sst::table_reader::TableReader;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.sst");
        let summary = TableBuilder::new(&path, TableBuildOptions::default())
            .unwrap()
            .finish()
            .unwrap();
        assert_eq!(summary.num_entries, 0);
        assert!(summary.file_size > 0);
        assert!(summary.smallest_key.is_none());
        assert!(summary.largest_key.is_none());
        let reader = TableReader::open(&path).unwrap();
        assert!(reader.iter().unwrap().is_empty());
        assert_eq!(reader.get(b"anything").unwrap(), None);
    }

    /// A single entry round-trips through build and read.
    #[test]
    fn test_single_entry_table() {
        use crate::sst::table_reader::TableReader;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("single.sst");
        let mut tb = TableBuilder::new(&path, TableBuildOptions::default()).unwrap();
        tb.add(b"only_key", b"only_value").unwrap();
        let summary = tb.finish().unwrap();
        assert_eq!(summary.num_entries, 1);
        assert_eq!(summary.smallest_key.as_deref(), Some(b"only_key".as_slice()));
        assert_eq!(summary.largest_key.as_deref(), Some(b"only_key".as_slice()));
        let reader = TableReader::open(&path).unwrap();
        let items = reader.iter().unwrap();
        assert_eq!(items.len(), 1);
        assert_eq!(items[0].0, b"only_key");
        assert_eq!(items[0].1, b"only_value");
        assert_eq!(
            reader.get(b"only_key").unwrap(),
            Some(b"only_value".to_vec())
        );
        assert_eq!(reader.get(b"other").unwrap(), None);
    }

    /// Zstd-compressed blocks decompress to exactly what was written.
    #[test]
    fn test_zstd_compression_roundtrip() {
        use crate::sst::table_reader::TableReader;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("zstd.sst");
        let opts = TableBuildOptions {
            compression: CompressionType::Zstd,
            ..Default::default()
        };
        // One place builds the expected (key, value) pair for index `i`.
        let make_pair = |i: usize| {
            (
                format!("key_{:06}", i),
                format!(
                    "value_{}_padding_data_to_make_it_compressible_{}",
                    i,
                    "x".repeat(100)
                ),
            )
        };
        let mut tb = TableBuilder::new(&path, opts).unwrap();
        for i in 0..500 {
            let (k, v) = make_pair(i);
            tb.add(k.as_bytes(), v.as_bytes()).unwrap();
        }
        let summary = tb.finish().unwrap();
        assert_eq!(summary.num_entries, 500);
        let reader = TableReader::open(&path).unwrap();
        let items = reader.iter().unwrap();
        assert_eq!(items.len(), 500);
        for (i, item) in items.iter().enumerate() {
            let (k, v) = make_pair(i);
            assert_eq!(item.0, k.as_bytes());
            assert_eq!(item.1, v.as_bytes());
        }
        assert_eq!(
            reader.get(b"key_000000").unwrap(),
            Some(make_pair(0).1.into_bytes())
        );
        assert_eq!(reader.get(b"nonexistent").unwrap(), None);
    }

    /// Range-deletion tombstones land in their own block and never appear in
    /// the data blocks.
    #[test]
    fn test_range_del_block_separate_storage() {
        use crate::sst::table_reader::TableReader;
        use crate::types::{InternalKey, ValueType};
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("range_del.sst");
        let opts = TableBuildOptions {
            internal_keys: true,
            ..Default::default()
        };
        let mut tb = TableBuilder::new(&path, opts).unwrap();
        // Interleave point entries with two range tombstones.
        let ik_a = InternalKey::new(b"aaa", 10, ValueType::Value);
        tb.add(ik_a.as_bytes(), b"val_a").unwrap();
        let ik_b = InternalKey::new(b"bbb", 9, ValueType::RangeDeletion);
        tb.add(ik_b.as_bytes(), b"ddd").unwrap();
        let ik_c = InternalKey::new(b"ccc", 8, ValueType::Value);
        tb.add(ik_c.as_bytes(), b"val_c").unwrap();
        let ik_e = InternalKey::new(b"eee", 7, ValueType::RangeDeletion);
        tb.add(ik_e.as_bytes(), b"ggg").unwrap();
        let ik_f = InternalKey::new(b"fff", 6, ValueType::Value);
        tb.add(ik_f.as_bytes(), b"val_f").unwrap();
        let summary = tb.finish().unwrap();
        assert_eq!(summary.num_entries, 5);
        assert!(summary.has_range_deletions);
        let reader = TableReader::open(&path).unwrap();
        let items = reader.iter().unwrap();
        for (k, _) in &items {
            if k.len() >= 8 {
                let ikr = InternalKeyRef::new(k);
                assert_ne!(
                    ikr.value_type(),
                    ValueType::RangeDeletion,
                    "data blocks should not contain range deletions"
                );
            }
        }
        assert_eq!(items.len(), 3, "only point entries in data blocks");
        let tombstones = reader.get_range_tombstones().unwrap();
        assert_eq!(tombstones.len(), 2);
        assert_eq!(tombstones[0].0, b"bbb");
        assert_eq!(tombstones[0].1, b"ddd");
        assert_eq!(tombstones[0].2, 9);
        assert_eq!(tombstones[1].0, b"eee");
        assert_eq!(tombstones[1].1, b"ggg");
        assert_eq!(tombstones[1].2, 7);
    }
}