use crc32c::crc32c;
use std::io::{Cursor, Read, Seek, SeekFrom, Write};
use std::path::Path;
/// Eight-byte tag written at the very start of a table file and verified when reading it.
const MAGIC: &[u8; 8] = b"IRSTBL02";
/// Number of consecutive sorted entries summarized by one fence pointer.
const FENCE_GROUP_SIZE: usize = 64;
/// Alignment, in bytes, applied to the bloom section, the fence section, and the end of the file.
const PAGE_SIZE_BYTES: u64 = 8 * 1024;
use crate::core::reactor::Reactor;
/// Kind tag carried by every [`Entry`].
///
/// The numeric value is the single byte stored on disk: `write_entry` emits `kind as u8`
/// and `read_entry` maps the bytes `0..=3` back to variants, rejecting any other value.
// NOTE(review): the meaning of each variant is only implied by its name here — confirm
// against the code that produces and consumes entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum EntryKind {
/// On-disk tag `0`.
FullNode = 0,
/// On-disk tag `1`.
EdgeDelta = 1,
/// On-disk tag `2`.
VectorDelta = 2,
/// On-disk tag `3`.
Tombstone = 3,
}
/// One record of a table file.
///
/// The writer orders records by `(key, version)` before encoding them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Entry {
/// Lookup key; also the value inserted into the bloom filter and tracked by fence pointers.
pub key: u64,
/// Secondary sort component used to order records that share a key.
pub version: u64,
/// Kind tag, stored on disk as one byte.
pub kind: EntryKind,
/// Payload bytes; on disk they are preceded by a `u32` length.
pub value: Vec<u8>,
}
/// Summary of one group of up to `FENCE_GROUP_SIZE` consecutive sorted entries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FencePointer {
/// Key of the first entry in the group.
pub min_key: u64,
/// Key of the last entry in the group.
pub max_key: u64,
/// Byte offset, from the start of the file, of the group's first encoded entry.
pub offset: u64,
}
/// Bloom filter over entry keys, backed by a packed bit array.
#[derive(Debug, Clone)]
pub struct BloomFilter {
// Bit storage, eight bits per byte; bit `i` lives in byte `i / 8` at position `i % 8`.
bits: Vec<u8>,
// Number of addressable bits; hash values are reduced modulo this count.
num_bits: u32,
// Number of hash probes performed per key.
num_hashes: u32,
}
impl BloomFilter {
pub fn new(num_bits: u32, num_hashes: u32) -> Self {
let byte_len = num_bits.div_ceil(8) as usize;
Self {
bits: vec![0u8; byte_len],
num_bits,
num_hashes,
}
}
pub fn insert(&mut self, key: u64) {
for i in 0..self.num_hashes {
let idx = hash_key(key, i, self.num_bits);
set_bit(&mut self.bits, idx);
}
}
pub fn may_contain(&self, key: u64) -> bool {
for i in 0..self.num_hashes {
let idx = hash_key(key, i, self.num_bits);
if !get_bit(&self.bits, idx) {
return false;
}
}
true
}
pub fn from_bits(bits: Vec<u8>, num_bits: u32, num_hashes: u32) -> Self {
Self {
bits,
num_bits,
num_hashes,
}
}
pub fn bits(&self) -> &[u8] {
&self.bits
}
pub fn num_bits(&self) -> u32 {
self.num_bits
}
pub fn num_hashes(&self) -> u32 {
self.num_hashes
}
}
/// Failure modes of the table reader and writer.
#[derive(Debug)]
pub enum SstableError {
/// An underlying I/O operation failed (including running out of bytes while decoding).
Io(std::io::Error),
/// The file content failed validation; the string describes what was wrong.
Corrupt(String),
/// The caller supplied arguments the writer cannot encode; the string describes why.
InvalidInput(String),
}
/// Result alias used throughout this module, with [`SstableError`] as the error type.
pub type Result<T> = std::result::Result<T, SstableError>;
impl From<std::io::Error> for SstableError {
fn from(err: std::io::Error) -> Self {
Self::Io(err)
}
}
/// In-memory contents of a table file, as returned by both the writer and the reader.
#[derive(Debug, Clone)]
pub struct Sstable {
/// Entries in file order; the writer stores them sorted by `(key, version)`.
pub entries: Vec<Entry>,
/// Bloom filter over the keys of `entries`.
pub bloom: BloomFilter,
/// One fence pointer per group of entries, in file order.
pub fences: Vec<FencePointer>,
}
/// Writes `entries` to `path` through `crate::core::reactor::SystemReactor`.
///
/// Thin convenience wrapper around [`write_sstable_with_reactor`]; see it for the layout
/// and error conditions.
pub fn write_sstable(path: &Path, entries: &[Entry]) -> Result<Sstable> {
    let reactor = crate::core::reactor::SystemReactor;
    write_sstable_with_reactor(path, entries, &reactor)
}
pub fn write_sstable_with_reactor(
path: &Path,
entries: &[Entry],
reactor: &dyn Reactor,
) -> Result<Sstable> {
if entries.is_empty() {
return Err(SstableError::InvalidInput(
"entries must not be empty".to_string(),
));
}
let mut sorted = entries.to_vec();
sorted.sort_by(|a, b| (a.key, a.version).cmp(&(b.key, b.version)));
let num_bits = (sorted.len() as u32).saturating_mul(12).max(128);
let num_hashes = 7u32;
let mut bloom = BloomFilter::new(num_bits, num_hashes);
for entry in &sorted {
bloom.insert(entry.key);
}
let mut fences = Vec::new();
let mut offset = header_len() as u64;
for chunk in sorted.chunks(FENCE_GROUP_SIZE) {
let min_key = chunk.first().unwrap().key;
let max_key = chunk.last().unwrap().key;
fences.push(FencePointer {
min_key,
max_key,
offset,
});
for entry in chunk {
offset += entry_len(entry) as u64;
}
}
let bloom_offset = align_offset(offset, PAGE_SIZE_BYTES);
let fence_offset = align_offset(bloom_offset + bloom.bits().len() as u64, PAGE_SIZE_BYTES);
let mut buffer = Vec::new();
{
let mut cursor = Cursor::new(&mut buffer);
write_header(
&mut cursor,
sorted.len() as u32,
bloom.num_bits(),
bloom.num_hashes(),
fences.len() as u32,
bloom_offset,
fence_offset,
)?;
for entry in &sorted {
write_entry(&mut cursor, entry)?;
}
let entries_end = cursor.stream_position()?;
if entries_end < bloom_offset {
write_padding(&mut cursor, (bloom_offset - entries_end) as usize)?;
}
cursor.write_all(bloom.bits())?;
let bloom_end = cursor.stream_position()?;
if bloom_end < fence_offset {
write_padding(&mut cursor, (fence_offset - bloom_end) as usize)?;
}
for fence in &fences {
cursor.write_all(&fence.min_key.to_le_bytes())?;
cursor.write_all(&fence.max_key.to_le_bytes())?;
cursor.write_all(&fence.offset.to_le_bytes())?;
}
let file_end = cursor.stream_position()?;
let padded_end = align_offset(file_end, PAGE_SIZE_BYTES);
if padded_end > file_end {
write_padding(&mut cursor, (padded_end - file_end) as usize)?;
}
}
reactor.write_file(path, &buffer)?;
Ok(Sstable {
entries: sorted,
bloom,
fences,
})
}
/// Reads the table at `path` through `crate::core::reactor::SystemReactor`.
///
/// Thin convenience wrapper around [`read_sstable_with_reactor`].
pub fn read_sstable(path: &Path) -> Result<Sstable> {
    let reactor = crate::core::reactor::SystemReactor;
    read_sstable_with_reactor(path, &reactor)
}
pub fn read_sstable_with_reactor(path: &Path, reactor: &dyn Reactor) -> Result<Sstable> {
let buffer = reactor.read_file(path)?;
let mut cursor = Cursor::new(buffer);
let header = read_header(&mut cursor)?;
let total_len = cursor.get_ref().len() as u64;
if header.bloom_offset % PAGE_SIZE_BYTES != 0 || header.fence_offset % PAGE_SIZE_BYTES != 0 {
return Err(SstableError::Corrupt(
"unaligned section offsets".to_string(),
));
}
if header.bloom_offset > total_len || header.fence_offset > total_len {
return Err(SstableError::Corrupt(
"section offset out of bounds".to_string(),
));
}
let bloom_end = header.bloom_offset.saturating_add(header.bloom_len as u64);
if bloom_end > total_len {
return Err(SstableError::Corrupt(
"bloom block out of bounds".to_string(),
));
}
if header.fence_offset < bloom_end {
return Err(SstableError::Corrupt(
"fence offset before bloom end".to_string(),
));
}
let mut entries = Vec::with_capacity(header.entry_count as usize);
for _ in 0..header.entry_count {
entries.push(read_entry(&mut cursor)?);
}
cursor.seek(SeekFrom::Start(header.bloom_offset))?;
let mut bloom_bits = vec![0u8; header.bloom_len as usize];
cursor.read_exact(&mut bloom_bits)?;
let bloom = BloomFilter::from_bits(bloom_bits, header.bloom_bits, header.bloom_hashes);
let mut fences = Vec::with_capacity(header.fence_count as usize);
cursor.seek(SeekFrom::Start(header.fence_offset))?;
for _ in 0..header.fence_count {
fences.push(read_fence(&mut cursor)?);
}
Ok(Sstable {
entries,
bloom,
fences,
})
}
/// Reads the whole table at `path` and returns its entries in file order.
///
/// Despite the name, the result is a fully materialized `Vec`, not a lazy iterator.
pub fn sstable_iterator(path: &Path) -> Result<Vec<Entry>> {
    read_sstable(path).map(|table| table.entries)
}
/// Returns every entry stored under `key` in the table at `path`, using
/// `crate::core::reactor::SystemReactor` for file access.
pub fn read_entries_for_key(path: &Path, key: u64) -> Result<Vec<Entry>> {
    let reactor = crate::core::reactor::SystemReactor;
    read_entries_for_key_with_reactor(path, key, &reactor)
}
/// Loads the table at `path` via `reactor` and returns every entry stored under `key`.
///
/// The whole table is read and decoded first; the lookup then runs in memory.
pub fn read_entries_for_key_with_reactor(
    path: &Path,
    key: u64,
    reactor: &dyn Reactor,
) -> Result<Vec<Entry>> {
    read_sstable_with_reactor(path, reactor)
        .map(|table| read_entries_for_key_in_table(&table, key))
}
/// Returns clones of all entries in `table` whose key equals `key`, in table order.
///
/// The lookup bails out with an empty vector as soon as the bloom filter or the fence
/// ranges rule the key out. Otherwise it binary-searches `table.entries`, which therefore
/// must be ordered by key.
pub fn read_entries_for_key_in_table(table: &Sstable, key: u64) -> Vec<Entry> {
    if !table.bloom.may_contain(key) {
        return Vec::new();
    }
    // The key must fall inside at least one fence's inclusive [min_key, max_key] range.
    let covered = table
        .fences
        .iter()
        .any(|fence| (fence.min_key..=fence.max_key).contains(&key));
    if !covered {
        return Vec::new();
    }
    let all = table.entries.as_slice();
    // Index of the first entry whose key is not below the target (may equal `all.len()`).
    let first = all.partition_point(|entry| entry.key < key);
    let tail = &all[first..];
    match tail.first() {
        Some(head) if head.key == key => {
            // Length of the run of entries sharing the target key.
            let run_len = tail.partition_point(|entry| entry.key == key);
            tail[..run_len].to_vec()
        }
        _ => Vec::new(),
    }
}
/// Decoded fixed-size file header (the fields that follow the magic tag), in on-disk order.
struct Header {
// Number of encoded entries that follow the header.
entry_count: u32,
// Number of addressable bits in the bloom filter.
bloom_bits: u32,
// Number of hash probes per key used by the bloom filter.
bloom_hashes: u32,
// Length of the stored bloom bit array, in bytes.
bloom_len: u32,
// Number of fence pointers stored in the fence section.
fence_count: u32,
// Byte offset of the bloom section from the start of the file.
bloom_offset: u64,
// Byte offset of the fence section from the start of the file.
fence_offset: u64,
}
/// Size in bytes of the encoded header: the magic tag, five `u32` fields, two `u64` fields.
fn header_len() -> usize {
    const MAGIC_LEN: usize = 8;
    const U32_FIELDS: usize = 5;
    const U64_FIELDS: usize = 2;
    MAGIC_LEN
        + U32_FIELDS * std::mem::size_of::<u32>()
        + U64_FIELDS * std::mem::size_of::<u64>()
}
/// Writes the file header: the magic tag, then five little-endian `u32` fields
/// (`entry_count`, `bloom_bits`, `bloom_hashes`, bloom byte length, `fence_count`), then
/// the two little-endian `u64` section offsets.
///
/// The bloom byte length is derived here as `bloom_bits` rounded up to whole bytes.
fn write_header(
    file: &mut impl Write,
    entry_count: u32,
    bloom_bits: u32,
    bloom_hashes: u32,
    fence_count: u32,
    bloom_offset: u64,
    fence_offset: u64,
) -> Result<()> {
    let bloom_len = bloom_bits.div_ceil(8);
    file.write_all(MAGIC)?;
    // Field order is part of the file format; keep it in sync with `read_header`.
    for word in [entry_count, bloom_bits, bloom_hashes, bloom_len, fence_count] {
        file.write_all(&word.to_le_bytes())?;
    }
    for section_offset in [bloom_offset, fence_offset] {
        file.write_all(&section_offset.to_le_bytes())?;
    }
    Ok(())
}
/// Reads and validates the magic tag, then decodes the header fields in on-disk order.
///
/// # Errors
/// [`SstableError::Corrupt`] with `"invalid magic"` when the tag does not match; I/O errors
/// when the input ends early.
fn read_header(file: &mut impl Read) -> Result<Header> {
    let mut tag = [0u8; 8];
    file.read_exact(&mut tag)?;
    if tag != *MAGIC {
        return Err(SstableError::Corrupt("invalid magic".to_string()));
    }
    // Struct fields are evaluated top to bottom, which matches the on-disk field order.
    Ok(Header {
        entry_count: read_u32(file)?,
        bloom_bits: read_u32(file)?,
        bloom_hashes: read_u32(file)?,
        bloom_len: read_u32(file)?,
        fence_count: read_u32(file)?,
        bloom_offset: read_u64(file)?,
        fence_offset: read_u64(file)?,
    })
}
/// Decodes one fence pointer: three consecutive little-endian `u64` values
/// (`min_key`, `max_key`, `offset`).
fn read_fence(file: &mut impl Read) -> Result<FencePointer> {
    let min_key = read_u64(file)?;
    let max_key = read_u64(file)?;
    let offset = read_u64(file)?;
    Ok(FencePointer {
        min_key,
        max_key,
        offset,
    })
}
fn write_entry(file: &mut impl Write, entry: &Entry) -> Result<()> {
let key_bytes = entry.key.to_le_bytes();
let version_bytes = entry.version.to_le_bytes();
let len = entry.value.len() as u32;
let len_bytes = len.to_le_bytes();
file.write_all(&key_bytes)?;
file.write_all(&version_bytes)?;
file.write_all(&[entry.kind as u8])?;
file.write_all(&len_bytes)?;
file.write_all(&entry.value)?;
let checksum = entry_checksum(
&key_bytes,
&version_bytes,
entry.kind as u8,
&len_bytes,
&entry.value,
);
file.write_all(&checksum.to_le_bytes())?;
Ok(())
}
fn entry_checksum(
key_bytes: &[u8],
version_bytes: &[u8],
kind: u8,
len_bytes: &[u8],
value: &[u8],
) -> u32 {
let mut buf = Vec::with_capacity(8 + 8 + 1 + 4 + value.len());
buf.extend_from_slice(key_bytes);
buf.extend_from_slice(version_bytes);
buf.push(kind);
buf.extend_from_slice(len_bytes);
buf.extend_from_slice(value);
crc32c(&buf)
}
fn read_entry(file: &mut impl Read) -> Result<Entry> {
let key = read_u64(file)?;
let version = read_u64(file)?;
let mut kind_byte = [0u8; 1];
file.read_exact(&mut kind_byte)?;
let len = read_u32(file)?;
let mut value = vec![0u8; len as usize];
file.read_exact(&mut value)?;
let stored = read_u32(file)?;
let expected = entry_checksum(
&key.to_le_bytes(),
&version.to_le_bytes(),
kind_byte[0],
&len.to_le_bytes(),
&value,
);
if stored != expected {
return Err(SstableError::Corrupt(format!(
"entry checksum mismatch: stored={stored}, computed={expected}"
)));
}
let kind = match kind_byte[0] {
0 => EntryKind::FullNode,
1 => EntryKind::EdgeDelta,
2 => EntryKind::VectorDelta,
3 => EntryKind::Tombstone,
other => {
return Err(SstableError::Corrupt(format!(
"invalid entry kind {}",
other
)))
}
};
Ok(Entry {
key,
version,
kind,
value,
})
}
fn read_u32(file: &mut impl Read) -> Result<u32> {
let mut buf = [0u8; 4];
file.read_exact(&mut buf)?;
Ok(u32::from_le_bytes(buf))
}
fn read_u64(file: &mut impl Read) -> Result<u64> {
let mut buf = [0u8; 8];
file.read_exact(&mut buf)?;
Ok(u64::from_le_bytes(buf))
}
/// Encoded size of `entry` in bytes: the fixed-width fields plus the value payload.
fn entry_len(entry: &Entry) -> usize {
    // key (8) + version (8) + kind (1) + value length prefix (4) + checksum (4)
    const FIXED_OVERHEAD: usize = 8 + 8 + 1 + 4 + 4;
    FIXED_OVERHEAD + entry.value.len()
}
/// Rounds `offset` up to the next multiple of `alignment`.
///
/// Offsets that are already aligned are returned unchanged, and an `alignment` of zero
/// disables rounding altogether.
fn align_offset(offset: u64, alignment: u64) -> u64 {
    // `checked_rem` yields `None` only for a zero alignment.
    match offset.checked_rem(alignment) {
        None | Some(0) => offset,
        Some(rem) => offset + (alignment - rem),
    }
}
/// Writes `len` zero bytes to `file`; a length of zero performs no write at all.
fn write_padding(file: &mut impl Write, len: usize) -> Result<()> {
    if len > 0 {
        let zeros = vec![0u8; len];
        file.write_all(&zeros)?;
    }
    Ok(())
}
/// Maps `key` to a bit index in `0..num_bits` for the hash probe identified by `seed`.
///
/// The index is the CRC-32C of the 12 bytes `key (LE) ‖ seed (LE)`, reduced modulo
/// `num_bits`. A filter with zero bits always yields index `0`.
fn hash_key(key: u64, seed: u32, num_bits: u32) -> u32 {
    let mut material = [0u8; 12];
    let (key_part, seed_part) = material.split_at_mut(8);
    key_part.copy_from_slice(&key.to_le_bytes());
    seed_part.copy_from_slice(&seed.to_le_bytes());
    // `checked_rem` is `None` only when `num_bits` is zero.
    crc32c(&material).checked_rem(num_bits).unwrap_or(0)
}
/// Sets bit `idx` in the packed bit array `bits` (bit `i` is bit `i % 8` of byte `i / 8`).
///
/// Indices past the end of `bits` are ignored.
fn set_bit(bits: &mut [u8], idx: u32) {
    let mask = 1u8 << (idx % 8);
    match bits.get_mut((idx / 8) as usize) {
        Some(slot) => *slot |= mask,
        None => {}
    }
}
/// Reports whether bit `idx` is set in the packed bit array `bits`.
///
/// Indices past the end of `bits` read as unset.
fn get_bit(bits: &[u8], idx: u32) -> bool {
    let mask = 1u8 << (idx % 8);
    match bits.get((idx / 8) as usize) {
        Some(byte) => byte & mask != 0,
        None => false,
    }
}
#[cfg(test)]
mod tests;