use super::{block::BlockReader, sst::SSTableReader, varint};
use crate::constants::{COMPRESSION_NONE, COMPRESSION_SNAPPY};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::path::PathBuf;
/// Decodes one stored block into its uncompressed payload.
///
/// The first byte of `raw_block` is a compression tag; the remaining bytes are
/// the payload. `COMPRESSION_SNAPPY` payloads are run through the Snappy raw
/// decoder, `COMPRESSION_NONE` payloads are copied verbatim.
///
/// Returns `None` when the block is empty, the tag is not recognised, or
/// Snappy decoding fails.
fn decompress_block(raw_block: &[u8]) -> Option<Vec<u8>> {
    // `split_first` yields `None` for an empty slice, so a zero-length block is
    // reported as undecodable instead of panicking on an out-of-bounds index.
    let (&compression_type, payload) = raw_block.split_first()?;
    match compression_type {
        t if t == COMPRESSION_SNAPPY => snap::raw::Decoder::new().decompress_vec(payload).ok(),
        t if t == COMPRESSION_NONE => Some(payload.to_vec()),
        _ => None,
    }
}
/// Sequential iterator over the key/value entries of a single SSTable.
///
/// Walks the table's index block one entry at a time, decompressing the data
/// block each index entry points at, and yields the entries of that block in
/// stored order.
pub struct SSTableIterator {
/// Table reader; its `index_data` and `mmap` are read when loading blocks.
reader: SSTableReader,
/// Incremented each time a data block is successfully loaded.
current_block_idx: usize,
/// Decompressed bytes of the data block being iterated; `None` once the
/// index block has no more entries.
current_block_data: Option<Vec<u8>>,
/// Byte offset within `current_block_data` of the next entry to decode.
current_block_ptr: usize,
/// Key of the most recently decoded data entry; its prefix is reused when
/// rebuilding the next prefix-compressed key.
current_key: Vec<u8>,
/// Byte offset within the index block's data of the next index entry.
index_ptr: usize,
/// Key of the most recently decoded index entry (prefix-compressed like
/// data keys).
index_key: Vec<u8>,
}
impl SSTableIterator {
    /// Builds an iterator over `reader` and eagerly loads the first data block
    /// so the first call to `next` can start decoding entries immediately.
    pub fn new(reader: SSTableReader) -> Self {
        let mut this = Self {
            reader,
            index_ptr: 0,
            index_key: Vec::new(),
            current_block_idx: 0,
            current_block_ptr: 0,
            current_block_data: None,
            current_key: Vec::new(),
        };
        this.load_next_block();
        this
    }

    /// Advances to the data block referenced by the next index entry.
    ///
    /// An index entry is three varints (shared key length, unshared key
    /// length, value length) followed by the unshared key bytes and the value
    /// bytes. The value holds two `u64` varints: the block's offset and size
    /// within the reader's `mmap`.
    ///
    /// When the index is exhausted `current_block_data` becomes `None`. When
    /// the referenced block cannot be decompressed, the index cursor has
    /// already moved past it but the previously loaded block state is left
    /// untouched.
    ///
    /// # Panics
    ///
    /// Panics if a varint fails to decode or an entry points outside the
    /// index data or the mapped file.
    fn load_next_block(&mut self) {
        let index = BlockReader::new(&self.reader.index_data);
        let mut cursor = self.index_ptr;
        if cursor >= index.restarts_offset {
            self.current_block_data = None;
            return;
        }

        // Entry header: three consecutive varints.
        let (shared, used) = varint::decode_u32(&index.data[cursor..]).unwrap();
        cursor += used;
        let (unshared, used) = varint::decode_u32(&index.data[cursor..]).unwrap();
        cursor += used;
        let (handle_len, used) = varint::decode_u32(&index.data[cursor..]).unwrap();
        cursor += used;

        // Rebuild the full index key: keep the shared prefix, append the rest.
        self.index_key.truncate(shared as usize);
        let key_end = cursor + unshared as usize;
        self.index_key.extend_from_slice(&index.data[cursor..key_end]);

        // The entry's value is the block handle; remember where the next entry starts.
        let handle_end = key_end + handle_len as usize;
        let handle = &index.data[key_end..handle_end];
        self.index_ptr = handle_end;

        let (offset, used) = varint::decode_u64(handle).unwrap();
        let (size, _) = varint::decode_u64(&handle[used..]).unwrap();

        let start = offset as usize;
        let raw = &self.reader.mmap[start..start + size as usize];
        if let Some(block) = decompress_block(raw) {
            self.current_block_data = Some(block);
            self.current_block_ptr = 0;
            self.current_block_idx += 1;
            // Prefix compression restarts with every block.
            self.current_key.clear();
        }
    }
}
impl Iterator for SSTableIterator {
    type Item = (Vec<u8>, Vec<u8>);

    /// Yields the next `(key, value)` pair, moving on to the following data
    /// block whenever the current one has been fully consumed.
    ///
    /// Returns `None` once no data block is loaded.
    ///
    /// # Panics
    ///
    /// Panics if an entry header fails to decode or an entry runs past the
    /// end of the block data.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let block = self.current_block_data.as_ref()?;
            let entries_end = BlockReader::new(block).restarts_offset;
            if self.current_block_ptr >= entries_end {
                // Every entry of this block has been returned; try the next block.
                self.load_next_block();
                continue;
            }

            // Entry header: shared key length, unshared key length, value length.
            let mut cursor = self.current_block_ptr;
            let (shared, used) = varint::decode_u32(&block[cursor..]).unwrap();
            cursor += used;
            let (unshared, used) = varint::decode_u32(&block[cursor..]).unwrap();
            cursor += used;
            let (value_len, used) = varint::decode_u32(&block[cursor..]).unwrap();
            cursor += used;

            // Keep the prefix shared with the previous key and append the new suffix.
            self.current_key.truncate(shared as usize);
            let key_end = cursor + unshared as usize;
            self.current_key.extend_from_slice(&block[cursor..key_end]);

            let value_end = key_end + value_len as usize;
            let value = block[key_end..value_end].to_vec();
            self.current_block_ptr = value_end;

            return Some((self.current_key.clone(), value));
        }
    }
}
/// One pending entry in the compaction merge heap.
///
/// Ordering and equality consider only `(key, table_index)`; `value` is
/// carried along but never compared.
struct HeapItem {
    key: Vec<u8>,
    value: Vec<u8>,
    table_index: usize,
}

impl HeapItem {
    /// The pair that decides ordering: key bytes first, then source table.
    fn rank(&self) -> (&[u8], usize) {
        (self.key.as_slice(), self.table_index)
    }
}

impl PartialEq for HeapItem {
    fn eq(&self, other: &Self) -> bool {
        self.rank() == other.rank()
    }
}

impl Eq for HeapItem {}

impl PartialOrd for HeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HeapItem {
    /// Reversed comparison: `BinaryHeap` is a max-heap, so inverting the
    /// operands makes it pop the smallest key first and, among equal keys,
    /// the smallest `table_index` first.
    fn cmp(&self, other: &Self) -> Ordering {
        other.rank().cmp(&self.rank())
    }
}
pub fn compact(input_paths: Vec<PathBuf>, output_path: PathBuf) -> std::io::Result<()> {
let mut iterators: Vec<SSTableIterator> = input_paths
.iter()
.map(|path| SSTableIterator::new(SSTableReader::new(path.clone())))
.collect();
let mut heap = BinaryHeap::new();
for (idx, iter) in iterators.iter_mut().enumerate() {
if let Some((k, v)) = iter.next() {
heap.push(HeapItem {
key: k,
value: v,
table_index: idx,
});
}
}
let mut builder = super::sst::SSTableBuilder::new(output_path);
let mut last_key_written: Option<Vec<u8>> = None;
while let Some(item) = heap.pop() {
if last_key_written.as_ref() != Some(&item.key) {
builder.add(&item.key, &item.value);
last_key_written = Some(item.key.clone());
}
if let Some((k, v)) = iterators[item.table_index].next() {
heap.push(HeapItem {
key: k,
value: v,
table_index: item.table_index,
});
}
}
builder.finish()?;
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::sst::SSTableBuilder;
    use tempfile::NamedTempFile;

    /// Writes `entries`, in the given order, to a new SSTable at `file`'s path.
    fn write_table(file: &NamedTempFile, entries: &[(&str, &str)]) {
        let mut builder = SSTableBuilder::new(file.path().to_path_buf());
        for &(key, value) in entries {
            builder.add(key.as_bytes(), value.as_bytes());
        }
        builder.finish().unwrap();
    }

    /// Reads every entry of the SSTable stored at `file`'s path.
    fn read_table(file: &NamedTempFile) -> Vec<(Vec<u8>, Vec<u8>)> {
        SSTableIterator::new(SSTableReader::new(file.path().to_path_buf())).collect()
    }

    /// Converts string pairs into the owned byte pairs the iterator yields.
    fn owned(entries: &[(&str, &str)]) -> Vec<(Vec<u8>, Vec<u8>)> {
        entries
            .iter()
            .map(|&(key, value)| (key.as_bytes().to_vec(), value.as_bytes().to_vec()))
            .collect()
    }

    /// A table of 1000 entries is read back in order, then the iterator ends.
    #[test]
    fn test_sstable_iterator() {
        let file = NamedTempFile::new().unwrap();
        let expected: Vec<(String, String)> = (0..1000)
            .map(|i| (format!("key{:04}", i), format!("value{:04}", i)))
            .collect();

        let mut sstable = SSTableBuilder::new(file.path().to_path_buf());
        for (key, val) in &expected {
            sstable.add(key.as_bytes(), val.as_bytes());
        }
        sstable.finish().unwrap();

        let mut iter = SSTableIterator::new(SSTableReader::new(file.path().to_path_buf()));
        for (key, val) in &expected {
            let (k, v) = iter.next().unwrap();
            assert_eq!(k, key.as_bytes());
            assert_eq!(v, val.as_bytes());
        }
        assert_eq!(iter.next(), None);
    }

    /// Two tables with interleaved, disjoint keys merge into one sorted table.
    #[test]
    fn test_compaction_basic_merge() {
        let file1 = NamedTempFile::new().unwrap();
        let file2 = NamedTempFile::new().unwrap();
        let output = NamedTempFile::new().unwrap();

        write_table(&file1, &[("a", "1"), ("c", "3"), ("e", "5")]);
        write_table(&file2, &[("b", "2"), ("d", "4"), ("f", "6")]);

        compact(
            vec![file1.path().to_path_buf(), file2.path().to_path_buf()],
            output.path().to_path_buf(),
        )
        .unwrap();

        assert_eq!(
            read_table(&output),
            owned(&[
                ("a", "1"),
                ("b", "2"),
                ("c", "3"),
                ("d", "4"),
                ("e", "5"),
                ("f", "6"),
            ])
        );
    }

    /// When a key exists in both inputs, the table listed first wins.
    #[test]
    fn test_compaction_overwrite_resolution() {
        let file_old = NamedTempFile::new().unwrap();
        let file_new = NamedTempFile::new().unwrap();
        let output = NamedTempFile::new().unwrap();

        write_table(&file_old, &[("apple", "old_val"), ("banana", "old_val")]);
        write_table(&file_new, &[("apple", "new_val"), ("cat", "new_val")]);

        // The newer table goes first so that its entries shadow the older ones.
        compact(
            vec![file_new.path().to_path_buf(), file_old.path().to_path_buf()],
            output.path().to_path_buf(),
        )
        .unwrap();

        assert_eq!(
            read_table(&output),
            owned(&[
                ("apple", "new_val"),
                ("banana", "old_val"),
                ("cat", "new_val"),
            ])
        );
    }
}