use super::{
block::{BlockBuilder, BlockReader},
varint,
};
use crate::BlockCache;
use crate::bloom_filter::BloomFilter;
use crate::constants::{COMPRESSION_NONE, COMPRESSION_SNAPPY};
use memmap2::Mmap;
use std::{
fs::{File, OpenOptions},
io::Write,
ops::Not,
path::PathBuf,
};
/// Incrementally writes an SSTable file: compressed data blocks, then an index block, a bloom
/// filter and a fixed-size footer.
///
/// All output is staged in `tmp_path` and only moved to `path` by `finish`.
pub struct SSTableBuilder {
    /// Staging file that receives every write.
    file: File,
    /// Final location of the table once `finish` renames the staging file.
    path: PathBuf,
    /// `path` with its extension replaced by `tmp`; the file actually being written.
    tmp_path: PathBuf,
    /// Accumulates entries for the data block currently being built.
    data_block_builder: BlockBuilder,
    /// One entry per flushed data block: the block's last key mapped to its varint-encoded
    /// `(offset, size)` handle.
    index_block_builder: BlockBuilder,
    /// Bytes written to `file` so far, i.e. the offset at which the next section starts.
    offset: u64,
    /// Bloom filter over every key passed to `add`.
    bloom_filter: BloomFilter,
}
impl SSTableBuilder {
pub fn new(path: PathBuf) -> Self {
let tmp_path = path.with_extension("tmp");
let file = OpenOptions::new()
.create(true)
.append(true)
.read(true)
.open(&tmp_path)
.unwrap();
Self {
file,
path,
tmp_path,
data_block_builder: BlockBuilder::new(),
index_block_builder: BlockBuilder::new(),
offset: 0,
bloom_filter: BloomFilter::new(100_000, 0.01),
}
}
pub fn add(&mut self, key: &[u8], value: &[u8]) {
self.bloom_filter.set(key);
self.data_block_builder.add(key, value);
if self.data_block_builder.is_block_maxed().not() {
return;
}
let last_key = self.data_block_builder.last_key();
let raw_data = self.data_block_builder.finish().to_vec();
let block_len_on_disk = self.write_compressed_block(&raw_data);
let mut value_bytes = Vec::new();
varint::encode_u64(self.offset, &mut value_bytes);
varint::encode_u64(block_len_on_disk, &mut value_bytes);
self.index_block_builder.add(&last_key, &value_bytes);
self.offset += block_len_on_disk;
self.data_block_builder = BlockBuilder::new();
}
fn write_compressed_block(&mut self, raw_data: &[u8]) -> u64 {
let compressed = snap::raw::Encoder::new()
.compress_vec(raw_data)
.unwrap_or_else(|_| raw_data.to_vec());
self.file.write_all(&[COMPRESSION_SNAPPY]).unwrap();
self.file.write_all(&compressed).unwrap();
1 + compressed.len() as u64
}
pub fn finish(&mut self) -> std::io::Result<()> {
if self.data_block_builder.buffer_len() > 0 {
let last_key = self.data_block_builder.last_key();
let raw_data = self.data_block_builder.finish().to_vec();
let block_len_on_disk = self.write_compressed_block(&raw_data);
let mut value_bytes = Vec::new();
varint::encode_u64(self.offset, &mut value_bytes);
varint::encode_u64(block_len_on_disk, &mut value_bytes);
self.index_block_builder.add(&last_key, &value_bytes);
self.offset += block_len_on_disk;
self.data_block_builder = BlockBuilder::new();
}
let index_offset = self.offset;
let index_data = self.index_block_builder.finish();
let index_size = index_data.len() as u64;
self.file.write_all(index_data)?;
self.offset += index_size;
let filter_offset = self.offset;
let filter_data = self.bloom_filter.to_bytes();
let filter_size = filter_data.len() as u64;
self.file.write_all(&filter_data)?;
self.offset += filter_size;
let mut footer = vec![0u8; 48];
footer[0..8].copy_from_slice(&index_offset.to_le_bytes());
footer[8..16].copy_from_slice(&index_size.to_le_bytes());
footer[16..24].copy_from_slice(&filter_offset.to_le_bytes());
footer[24..32].copy_from_slice(&filter_size.to_le_bytes());
self.file.write_all(&footer)?;
self.offset += 48;
self.file.sync_all()?;
std::fs::rename(&self.tmp_path, &self.path)?;
Ok(())
}
}
/// Read-only handle to a finished SSTable file, memory-mapped so data blocks can be sliced on
/// demand.
pub struct SSTableReader {
    /// Numeric id parsed from the file stem (0 when the stem is not a `u64`); combined with a
    /// block offset to form block-cache keys.
    pub id: u64,
    /// Memory map of the whole file.
    pub mmap: Mmap,
    /// Owned copy of the index block, located through the footer.
    pub index_data: Vec<u8>,
    /// Bloom filter decoded from the file; checked before any index lookup.
    pub bloom_filter: BloomFilter,
}
impl SSTableReader {
    /// Size in bytes of the footer at the end of every SSTable file.
    const FOOTER_LEN: usize = 48;

    /// Opens and memory-maps the SSTable at `path`, loading its index block and bloom filter.
    ///
    /// The table id is parsed from the file stem; a stem that is missing, not UTF-8, or not a
    /// valid `u64` yields id 0.
    /// NOTE(review): all such tables share id 0, and therefore share block-cache keys in `get`.
    ///
    /// # Panics
    ///
    /// Panics if any of the following holds:
    /// - the file cannot be opened or mapped;
    /// - the file is shorter than the footer;
    /// - the footer points outside the file;
    /// - the bloom filter cannot be decoded.
    pub fn new(path: PathBuf) -> Self {
        let id = path
            .file_stem()
            .and_then(|stem| stem.to_str())
            .and_then(|stem| stem.parse::<u64>().ok())
            .unwrap_or(0);

        let file = File::open(&path)
            .unwrap_or_else(|e| panic!("failed to open SSTable {}: {e}", path.display()));
        // SAFETY: the mapping is read-only. It is sound only if no process truncates or
        // rewrites the file while it is mapped. The builder in this module never reopens a
        // finished table, but compaction/deletion paths should be verified.
        let mmap = unsafe { Mmap::map(&file) }
            .unwrap_or_else(|e| panic!("failed to mmap SSTable {}: {e}", path.display()));

        let file_len = mmap.len();
        assert!(
            file_len >= Self::FOOTER_LEN,
            "SSTable {} is {file_len} bytes, shorter than its {}-byte footer",
            path.display(),
            Self::FOOTER_LEN
        );
        let footer = &mmap[file_len - Self::FOOTER_LEN..];
        let field = |at: usize| {
            u64::from_le_bytes(footer[at..at + 8].try_into().expect("footer field is 8 bytes"))
        };

        let index_data = Self::section(&mmap, field(0), field(8), "index").to_vec();
        let filter_data = Self::section(&mmap, field(16), field(24), "filter");
        let bloom_filter =
            BloomFilter::from_bytes(filter_data).expect("Corrupt Bloom Filter metadata in SSTable");

        Self {
            id,
            mmap,
            index_data,
            bloom_filter,
        }
    }

    /// Returns `mmap[offset..offset + size]`.
    ///
    /// Panics with a descriptive message, naming the section as `what`, when the range does
    /// not fit inside the file (e.g. a corrupt footer). Without this check the failure would be
    /// an arithmetic-overflow or slice-index panic.
    fn section<'a>(mmap: &'a [u8], offset: u64, size: u64, what: &str) -> &'a [u8] {
        usize::try_from(offset)
            .ok()
            .zip(usize::try_from(size).ok())
            .and_then(|(start, len)| mmap.get(start..start.checked_add(len)?))
            .unwrap_or_else(|| {
                panic!(
                    "SSTable {what} section (offset {offset}, size {size}) lies outside the {}-byte file",
                    mmap.len()
                )
            })
    }

    /// Looks up `key`, returning a copy of its value if this table contains it.
    ///
    /// The bloom filter is consulted first. On a possible hit, the index block locates the data
    /// block, which is loaded via `load_block` (using `cache` when supplied). A malformed block
    /// handle or block yields `None` rather than a panic.
    pub fn get(&self, key: &[u8], cache: Option<&BlockCache>) -> Option<Vec<u8>> {
        if !self.bloom_filter.contains(key) {
            return None;
        }

        let index_block = BlockReader::new(&self.index_data);
        let block_handle = index_block.lookup(key)?;
        let (offset, consumed) = varint::decode_u64(&block_handle[..])?;
        let (size, _) = varint::decode_u64(block_handle.get(consumed..)?)?;

        let block_data = self.load_block(offset, size, cache)?;
        let block_reader = BlockReader::new(&block_data);
        block_reader.get(key).map(|val| val.to_vec())
    }

    /// Returns the decompressed data block stored at `offset..offset + size`.
    ///
    /// The block is served from `cache` when present there. Otherwise it is read from the
    /// mmap, decompressed, and inserted into `cache`. A poisoned cache lock is ignored
    /// (best-effort caching). Returns `None` in these cases:
    /// - the range is out of bounds;
    /// - the block is empty;
    /// - the compression tag is unknown;
    /// - the payload fails to decompress.
    fn load_block(
        &self,
        offset: u64,
        size: u64,
        cache: Option<&BlockCache>,
    ) -> Option<std::sync::Arc<Vec<u8>>> {
        if let Some(c) = cache
            && let Ok(mut lru) = c.write()
            && let Some(block) = lru.get(&(self.id, offset))
        {
            return Some(std::sync::Arc::clone(block));
        }

        let start = usize::try_from(offset).ok()?;
        let end = start.checked_add(usize::try_from(size).ok()?)?;
        // Each on-disk block is `[compression tag][payload]`.
        let (&compression_type, payload) = self.mmap.get(start..end)?.split_first()?;
        let block = std::sync::Arc::new(Self::decompress_block(compression_type, payload)?);

        if let Some(c) = cache
            && let Ok(mut lru) = c.write()
        {
            lru.put((self.id, offset), std::sync::Arc::clone(&block));
        }
        Some(block)
    }

    /// Decodes a block payload according to its one-byte compression tag.
    ///
    /// Returns `None` for an unknown tag or a Snappy payload that fails to decompress.
    fn decompress_block(compression_type: u8, payload: &[u8]) -> Option<Vec<u8>> {
        if compression_type == COMPRESSION_SNAPPY {
            snap::raw::Decoder::new().decompress_vec(payload).ok()
        } else if compression_type == COMPRESSION_NONE {
            Some(payload.to_vec())
        } else {
            None
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Drops an unfinished builder and deletes the staging file it created, so tests that
    /// never call `finish` do not leave stray `.tmp` files in the temp directory.
    fn discard(builder: SSTableBuilder) {
        let tmp_path = builder.tmp_path.clone();
        // Close the file handle before removing it (required on some platforms).
        drop(builder);
        std::fs::remove_file(tmp_path).unwrap();
    }

    #[test]
    fn test_sstable_builder_init() {
        let file = NamedTempFile::new().unwrap();
        let sstable = SSTableBuilder::new(file.path().to_path_buf());
        // `NamedTempFile` already created `file.path()`, so the meaningful check is that the
        // builder created its own staging file next to it.
        assert_eq!(sstable.tmp_path, file.path().with_extension("tmp"));
        assert!(sstable.tmp_path.exists());
        assert_eq!(sstable.offset, 0);
        discard(sstable);
    }

    #[test]
    fn test_sstable_reader_init() {
        let file = NamedTempFile::new().unwrap();
        let mut sstable = SSTableBuilder::new(file.path().to_path_buf());
        sstable.add(b"apple", b"val_apple");
        sstable.add(b"banana", b"val_banana");
        sstable.add(b"cat", b"val_cat");
        sstable.finish().unwrap();
        let reader = SSTableReader::new(file.path().to_path_buf());
        assert!(!reader.index_data.is_empty());
        let index_block = BlockReader::new(&reader.index_data);
        assert!(index_block.num_restarts > 0);
    }

    #[test]
    fn test_sstable_builder_flush() {
        let file = NamedTempFile::new().unwrap();
        let mut sstable = SSTableBuilder::new(file.path().to_path_buf());
        let long_bytes = vec![0; 5000];
        sstable.add(b"long_key", &long_bytes);
        // A single 5000-byte value is expected to max out the data block, so `add` flushes it
        // and `offset` advances before `finish`.
        assert!(sstable.offset > 0);
        discard(sstable);
    }

    #[test]
    fn test_sstable_builder_full_lifecycle() {
        let file = NamedTempFile::new().unwrap();
        let mut sstable = SSTableBuilder::new(file.path().to_path_buf());
        sstable.add(b"apple", b"val_apple");
        sstable.add(b"banana", b"val_banana");
        sstable.add(b"cat", b"val_cat");
        assert_eq!(sstable.offset, 0);
        sstable.finish().unwrap();
        // `finish` renames the staging file onto the final path.
        assert!(!sstable.tmp_path.exists());
        let metadata = std::fs::metadata(file.path()).unwrap();
        assert_eq!(metadata.len(), sstable.offset);
        assert!(sstable.offset > 48);
    }

    #[test]
    fn test_sstable_reader_get() {
        let file = NamedTempFile::new().unwrap();
        let mut sstable = SSTableBuilder::new(file.path().to_path_buf());
        for i in 0..1000 {
            let key = format!("key{:04}", i);
            let val = format!("value{:04}", i);
            sstable.add(key.as_bytes(), val.as_bytes());
        }
        sstable.finish().unwrap();
        let reader = SSTableReader::new(file.path().to_path_buf());
        assert_eq!(reader.get(b"key0000", None).unwrap(), b"value0000");
        assert_eq!(reader.get(b"key0500", None).unwrap(), b"value0500");
        assert_eq!(reader.get(b"key0999", None).unwrap(), b"value0999");
        assert_eq!(reader.get(b"key0000_not_exist", None), None);
        assert_eq!(reader.get(b"missing", None), None);
        assert_eq!(reader.get(b"zebra", None), None);
    }
}