pub mod builder;
pub mod iterator;
#[cfg(test)]
mod tests;
#[allow(unused_imports)] pub use crate::engine::{PointEntry, RangeTombstone, Record};
pub use builder::SstWriter;
#[allow(unused_imports)] pub use iterator::{BlockEntry, BlockIterator, ScanIterator};
use std::sync::Arc;
use std::{fs::File, io, path::Path};
use crate::encoding::{self, EncodingError};
use bloomfilter::Bloom;
use crc32fast::Hasher as Crc32;
use memmap2::Mmap;
use thiserror::Error;
use tracing::{debug, info, warn};
/// Magic bytes expected at the very start of every SSTable file.
const SST_HDR_MAGIC: [u8; 4] = [b'S', b'S', b'T', b'0'];
/// Format version stored in the header; `SSTable::open` rejects any other value.
const SST_HDR_VERSION: u32 = 1;
/// False-positive rate used when constructing SSTable bloom filters.
const SST_BLOOM_FILTER_FALSE_POSITIVE_RATE: f64 = 0.01;
/// Data-block size limit in bytes (4 KiB).
/// NOTE(review): not read in this module; presumably consumed by the builder — confirm.
const SST_DATA_BLOCK_MAX_SIZE: usize = 4 * 1024;
/// Encoded footer size: two block handles (u64 offset + u64 size each), the
/// u64 `total_file_size`, and the u32 `footer_crc32`.
/// Assumes the encoding writes these fields fixed-width.
const SST_FOOTER_SIZE: usize = 2 * (8 + 8) + 8 + 4;
/// Encoded header size: 4 magic bytes, u32 version, u32 CRC (assumes fixed-width encoding).
const SST_HDR_SIZE: usize = 4 + 4 + 4;
/// Width of the little-endian `u32` length prefix in front of each block.
const SST_DATA_BLOCK_LEN_SIZE: usize = std::mem::size_of::<u32>();
/// Width of the little-endian CRC32 trailer after each block's content.
const SST_DATA_BLOCK_CHECKSUM_SIZE: usize = std::mem::size_of::<u32>();
/// Computes the CRC32 checksum of `data` with `crc32fast`.
///
/// This is the checksum stored in the SSTable header, footer and block trailers.
pub(crate) fn crc32(data: &[u8]) -> u32 {
    let mut digest = Crc32::new();
    digest.update(data);
    digest.finalize()
}
/// Errors returned while opening or reading an SSTable.
#[derive(Debug, Error)]
#[non_exhaustive]
pub enum SSTableError {
    /// Failure from the filesystem, e.g. `File::open` or `Mmap::map`.
    #[error("I/O error: {0}")]
    Io(#[from] io::Error),
    /// Failure decoding or encoding an on-disk structure via `crate::encoding`.
    #[error("Encoding error: {0}")]
    Encoding(#[from] EncodingError),
    /// Structural problem with the file (too small, bad magic/version, block
    /// out of range, missing metadata, ...), described by the message.
    #[error("Internal error: {0}")]
    Internal(String),
    /// A stored CRC32 did not match the value computed over the data.
    #[error("Checksum mismatch")]
    ChecksumMismatch,
}
/// Fixed-size header at the start of an SSTable file.
///
/// `header_crc` is the CRC32 of the header encoded with `header_crc` set to zero.
#[derive(Debug, Default)]
pub(crate) struct SSTableHeader {
    /// Must equal `SST_HDR_MAGIC`.
    magic: [u8; 4],
    /// Must equal `SST_HDR_VERSION`.
    version: u32,
    /// Checksum over the encoded header (with this field zeroed).
    header_crc: u32,
}
/// A decoded data block; `data` holds the raw entry bytes walked by `BlockIterator`.
#[derive(Debug)]
pub(crate) struct SSTableDataBlock {
    pub(crate) data: Vec<u8>,
}
/// Serialized bloom filter, in the byte form accepted by `Bloom::from_slice`.
///
/// Lookups treat an empty `data` as "any key may be present" (no filtering).
#[derive(Debug)]
pub(crate) struct SSTableBloomBlock {
    pub(crate) data: Vec<u8>,
}
/// All range tombstones stored in the table, decoded from the
/// `meta.range_deletes` meta block (empty when that block is absent).
#[derive(Debug)]
pub(crate) struct SSTableRangeTombstoneDataBlock {
    pub(crate) data: Vec<SSTableRangeTombstoneCell>,
}
/// Table-level statistics decoded from the `meta.properties` meta block,
/// which `SSTable::open` requires to be present.
///
/// NOTE(review): the units of the timestamp fields are not visible in this
/// module — confirm against the writer.
#[derive(Debug)]
pub struct SSTablePropertiesBlock {
    /// When the table was created (units unconfirmed).
    pub creation_timestamp: u64,
    /// Number of records, as recorded by the writer.
    pub record_count: u64,
    /// Number of tombstones, as recorded by the writer.
    pub tombstone_count: u64,
    /// Number of range tombstones, as recorded by the writer.
    pub range_tombstones_count: u64,
    /// Smallest LSN recorded for the table.
    pub min_lsn: u64,
    /// Largest LSN recorded for the table.
    pub max_lsn: u64,
    /// Smallest entry timestamp recorded for the table.
    pub min_timestamp: u64,
    /// Largest entry timestamp recorded for the table.
    pub max_timestamp: u64,
    /// Smallest key recorded for the table.
    pub min_key: Vec<u8>,
    /// Largest key recorded for the table.
    pub max_key: Vec<u8>,
}
/// One block-index entry: a separator key and the handle of its data block.
///
/// `SSTable::find_block_for_key` binary-searches these entries by `separator_key`.
#[derive(Debug)]
pub(crate) struct SSTableIndexEntry {
    pub(crate) separator_key: Vec<u8>,
    pub(crate) handle: BlockHandle,
}
/// Fixed-size trailer occupying the last `SST_FOOTER_SIZE` bytes of the file.
///
/// `footer_crc32` is the CRC32 of the footer encoded with `footer_crc32` set to zero.
#[derive(Debug)]
pub(crate) struct SSTableFooter {
    /// Handle of the metaindex block listing the named meta blocks.
    pub(crate) metaindex: BlockHandle,
    /// Handle of the block index.
    pub(crate) index: BlockHandle,
    /// File size as recorded by the writer; reported by `SSTable::file_size`.
    pub(crate) total_file_size: u64,
    /// Checksum over the encoded footer (with this field zeroed).
    pub(crate) footer_crc32: u32,
}
/// Per-entry metadata: key/value lengths, timestamp, delete flag and LSN.
///
/// NOTE(review): not referenced in this module; presumably used by the
/// data-block encoding and `BlockIterator` — confirm.
#[derive(Debug)]
pub(crate) struct SSTableCell {
    pub(crate) key_len: u32,
    pub(crate) value_len: u32,
    pub(crate) timestamp: u64,
    pub(crate) is_delete: bool,
    pub(crate) lsn: u64,
}
/// A stored range tombstone covering keys `k` with `start_key <= k < end_key`.
#[derive(Debug)]
pub(crate) struct SSTableRangeTombstoneCell {
    /// Inclusive lower bound.
    pub(crate) start_key: Vec<u8>,
    /// Exclusive upper bound.
    pub(crate) end_key: Vec<u8>,
    pub(crate) timestamp: u64,
    pub(crate) lsn: u64,
}
/// Location of a block inside the file: byte offset from the start and size.
///
/// NOTE(review): whether `size` includes the 4-byte length prefix and 4-byte
/// checksum framing read by `SSTable::read_block_bytes` is not visible here.
#[derive(Debug)]
pub(crate) struct BlockHandle {
    pub(crate) offset: u64,
    pub(crate) size: u64,
}
/// A named pointer to a meta block, stored in the metaindex.
///
/// `SSTable::open` recognizes `filter.bloom`, `meta.properties` and
/// `meta.range_deletes` and rejects any other name.
#[derive(Debug)]
pub(crate) struct MetaIndexEntry {
    pub(crate) name: String,
    pub(crate) handle: BlockHandle,
}
mod encoding_impls;
/// Outcome of a point lookup in a single SSTable.
///
/// Every variant except `NotFound` carries the `(lsn, timestamp)` of the
/// version that produced it. Versions are ordered by LSN first and timestamp second.
#[derive(Debug, PartialEq, Clone)]
pub enum GetResult {
    /// The winning version is a value.
    Put {
        value: Vec<u8>,
        lsn: u64,
        timestamp: u64,
    },
    /// The winning version is a point tombstone.
    Delete { lsn: u64, timestamp: u64 },
    /// A covering range tombstone is the winning version.
    RangeDelete { lsn: u64, timestamp: u64 },
    /// No version of the key exists in the table.
    NotFound,
}

impl GetResult {
    /// LSN of the producing version, or `0` for [`GetResult::NotFound`].
    pub fn lsn(&self) -> u64 {
        self.lsn_and_timestamp().0
    }

    /// Timestamp of the producing version, or `0` for [`GetResult::NotFound`].
    pub fn timestamp(&self) -> u64 {
        self.lsn_and_timestamp().1
    }

    /// The `(lsn, timestamp)` pair; `(0, 0)` for `NotFound`.
    fn lsn_and_timestamp(&self) -> (u64, u64) {
        match *self {
            Self::Put { lsn, timestamp, .. }
            | Self::Delete { lsn, timestamp }
            | Self::RangeDelete { lsn, timestamp } => (lsn, timestamp),
            Self::NotFound => (0, 0),
        }
    }

    /// True when this result is strictly newer than `(other_lsn, other_ts)`:
    /// a higher LSN, or an equal LSN with a higher timestamp.
    fn is_newer_than(&self, other_lsn: u64, other_ts: u64) -> bool {
        // Tuple comparison is lexicographic, which is exactly "LSN, then timestamp".
        self.lsn_and_timestamp() > (other_lsn, other_ts)
    }
}
/// A memory-mapped SSTable file plus the metadata decoded by [`SSTable::open`].
///
/// The decoded metadata is the header, bloom filter, properties, range
/// tombstones, block index and footer. Data blocks are read from `mmap` on demand.
pub struct SSTable {
    /// Caller-assigned identifier; `open` initializes it to `0`, `set_id` updates it.
    id: u64,
    /// Mapping of the whole file; block handles are byte offsets into it.
    pub(crate) mmap: Mmap,
    // The header is only validated during `open`; the allow silences the
    // dead_code lint because nothing reads it afterwards.
    #[allow(dead_code)]
    pub(crate) header: SSTableHeader,
    /// Bloom filter consulted before reading data blocks; empty means "no filtering".
    pub(crate) bloom: SSTableBloomBlock,
    /// Table statistics from the `meta.properties` block.
    pub(crate) properties: SSTablePropertiesBlock,
    /// All range tombstones, kept in memory and scanned on every lookup.
    pub(crate) range_deletes: SSTableRangeTombstoneDataBlock,
    /// Block index, binary-searched by separator key.
    pub(crate) index: Vec<SSTableIndexEntry>,
    /// Decoded footer.
    pub(crate) footer: SSTableFooter,
}
impl SSTable {
pub fn id(&self) -> u64 {
self.id
}
pub(crate) fn set_id(&mut self, id: u64) {
self.id = id;
}
pub fn file_size(&self) -> u64 {
self.footer.total_file_size
}
pub fn max_lsn(&self) -> u64 {
self.properties.max_lsn
}
pub fn min_lsn(&self) -> u64 {
self.properties.min_lsn
}
pub fn record_count(&self) -> u64 {
self.properties.record_count
}
pub fn tombstone_count(&self) -> u64 {
self.properties.tombstone_count
}
pub fn range_tombstone_count(&self) -> u64 {
self.properties.range_tombstones_count
}
pub fn min_key(&self) -> &[u8] {
&self.properties.min_key
}
pub fn max_key(&self) -> &[u8] {
&self.properties.max_key
}
pub fn creation_timestamp(&self) -> u64 {
self.properties.creation_timestamp
}
pub fn min_timestamp(&self) -> u64 {
self.properties.min_timestamp
}
pub fn max_timestamp(&self) -> u64 {
self.properties.max_timestamp
}
pub fn bloom_may_contain(&self, key: &[u8]) -> bool {
if self.bloom.data.is_empty() {
return true; }
match Bloom::from_slice(&self.bloom.data) {
Ok(bloom) => bloom.check(key),
Err(_) => true, }
}
pub fn range_tombstone_iter(&self) -> impl Iterator<Item = crate::engine::RangeTombstone> + '_ {
self.range_deletes
.data
.iter()
.map(|rd| crate::engine::RangeTombstone {
start: rd.start_key.clone(),
end: rd.end_key.clone(),
lsn: rd.lsn,
timestamp: rd.timestamp,
})
}
pub fn open(path: impl AsRef<Path>) -> Result<Self, SSTableError> {
let path = path.as_ref();
debug!(?path, "opening SSTable");
let file = File::open(path)?;
let mmap = unsafe { Mmap::map(&file)? };
let file_len = mmap.len();
if file_len < SST_FOOTER_SIZE {
return Err(SSTableError::Internal("File too small".into()));
}
let (mut header, _) = encoding::decode_from_slice::<SSTableHeader>(&mmap[..SST_HDR_SIZE])?;
let header_checksum = header.header_crc;
header.header_crc = 0;
let header_bytes = encoding::encode_to_vec(&header)?;
let header_comp_checksum = crc32(&header_bytes);
if header_checksum != header_comp_checksum {
warn!(
?path,
expected = header_checksum,
actual = header_comp_checksum,
"header checksum mismatch"
);
return Err(SSTableError::ChecksumMismatch);
}
if header.magic != SST_HDR_MAGIC {
return Err(SSTableError::Internal(
"SSTable header magic mismatch".into(),
));
}
if header.version != SST_HDR_VERSION {
return Err(SSTableError::Internal(
"SSTable header version mismatch".into(),
));
}
let footer_start = file_len - SST_FOOTER_SIZE;
let (mut footer, _) = encoding::decode_from_slice::<SSTableFooter>(&mmap[footer_start..])?;
let footer_checksum = footer.footer_crc32;
footer.footer_crc32 = 0;
let footer_bytes = encoding::encode_to_vec(&footer)?;
let footer_comp_checksum = crc32(&footer_bytes);
if footer_checksum != footer_comp_checksum {
warn!(
?path,
expected = footer_checksum,
actual = footer_comp_checksum,
"footer checksum mismatch"
);
return Err(SSTableError::ChecksumMismatch);
}
let metaindex_data = Self::read_block_bytes(&mmap, &footer.metaindex)?;
let (meta_entries, _) = encoding::decode_vec::<MetaIndexEntry>(&metaindex_data)?;
let mut bloom_block: Option<BlockHandle> = None;
let mut properties_block: Option<BlockHandle> = None;
let mut range_deletes_block: Option<BlockHandle> = None;
for entry in meta_entries {
match entry.name.as_str() {
"filter.bloom" => bloom_block = Some(entry.handle),
"meta.properties" => properties_block = Some(entry.handle),
"meta.range_deletes" => range_deletes_block = Some(entry.handle),
_ => return Err(SSTableError::Internal("Unexpected match".into())),
}
}
let bloom = if let Some(bh) = bloom_block {
let bloom_bytes = Self::read_block_bytes(&mmap, &bh)?;
let (bloom, _) = encoding::decode_from_slice::<SSTableBloomBlock>(&bloom_bytes)
.map_err(|e| SSTableError::Internal(e.to_string()))?;
bloom
} else {
let bloom: Bloom<Vec<u8>> =
Bloom::new_for_fp_rate(1, SST_BLOOM_FILTER_FALSE_POSITIVE_RATE)
.map_err(|e| SSTableError::Internal(e.to_string()))?;
SSTableBloomBlock {
data: bloom.as_slice().to_vec(),
}
};
let properties = if let Some(pb) = properties_block {
let pbytes = Self::read_block_bytes(&mmap, &pb)?;
let (properties, _) = encoding::decode_from_slice::<SSTablePropertiesBlock>(&pbytes)?;
properties
} else {
return Err(SSTableError::Internal("SSTable missing properties".into()));
};
let range_deletes = if let Some(rh) = range_deletes_block {
let rbytes = Self::read_block_bytes(&mmap, &rh)?;
let (ranges, _) = encoding::decode_vec::<SSTableRangeTombstoneCell>(&rbytes)?;
SSTableRangeTombstoneDataBlock { data: ranges }
} else {
SSTableRangeTombstoneDataBlock { data: Vec::new() }
};
let index_bytes = Self::read_block_bytes(&mmap, &footer.index)?;
let (index_entries, _) = encoding::decode_vec::<SSTableIndexEntry>(&index_bytes)?;
info!(
?path,
file_size = footer.total_file_size,
record_count = properties.record_count,
"SSTable opened"
);
Ok(Self {
id: 0,
mmap,
header,
bloom,
properties,
range_deletes,
index: index_entries,
footer,
})
}
pub fn get(&self, key: &[u8]) -> Result<GetResult, SSTableError> {
let range_info = self.covering_range_for_key(key);
let bloom_maybe_present = if !self.bloom.data.is_empty() {
match Bloom::from_slice(&self.bloom.data) {
Ok(bloom) => bloom.check(key),
Err(_) => true, }
} else {
true };
if !bloom_maybe_present {
return Ok(match range_info {
Some((lsn, timestamp)) => GetResult::RangeDelete { lsn, timestamp },
None => GetResult::NotFound,
});
}
if self.index.is_empty() {
return Ok(match range_info {
Some((lsn, timestamp)) => GetResult::RangeDelete { lsn, timestamp },
None => GetResult::NotFound,
});
}
let block_idx = self.find_block_for_key(key);
let entry = &self.index[block_idx];
let raw = Self::read_block_bytes(&self.mmap, &entry.handle)?;
let (block, _) = encoding::decode_from_slice::<SSTableDataBlock>(&raw)?;
let mut iter = BlockIterator::new(block.data);
iter.seek_to(key);
let mut latest: Option<GetResult> = None;
for item in iter {
if item.key != key {
break;
}
let candidate = if item.is_delete {
GetResult::Delete {
lsn: item.lsn,
timestamp: item.timestamp,
}
} else {
GetResult::Put {
value: item.value.to_vec(),
lsn: item.lsn,
timestamp: item.timestamp,
}
};
latest = Some(match &latest {
Some(existing) if candidate.is_newer_than(existing.lsn(), existing.timestamp()) => {
candidate
}
Some(existing) => existing.clone(),
None => candidate,
});
}
let range_delete =
range_info.map(|(lsn, timestamp)| GetResult::RangeDelete { lsn, timestamp });
match (latest, range_delete) {
(None, None) => Ok(GetResult::NotFound),
(Some(r), None) => Ok(r),
(None, Some(rd)) => Ok(rd),
(Some(point), Some(rd)) => {
if rd.is_newer_than(point.lsn(), point.timestamp()) {
Ok(rd)
} else {
Ok(point)
}
}
}
}
pub fn scan(
&self,
start_key: &[u8],
end_key: &[u8],
) -> Result<impl Iterator<Item = Record> + use<'_>, SSTableError> {
ScanIterator::new(self, start_key.to_vec(), end_key.to_vec())
}
pub fn scan_owned(
this: &Arc<Self>,
start_key: &[u8],
end_key: &[u8],
) -> Result<ScanIterator<Arc<SSTable>>, SSTableError> {
ScanIterator::new(Arc::clone(this), start_key.to_vec(), end_key.to_vec())
}
pub(crate) fn read_block_bytes(
mmap: &Mmap,
handle: &BlockHandle,
) -> Result<Vec<u8>, SSTableError> {
let start = usize::try_from(handle.offset)
.map_err(|_| SSTableError::Internal("block offset exceeds addressable range".into()))?;
let size = usize::try_from(handle.size)
.map_err(|_| SSTableError::Internal("block size exceeds addressable range".into()))?;
if start + size > mmap.len() {
return Err(SSTableError::Internal("Block out of range".into()));
}
let mut cursor = start;
let len_bytes: [u8; SST_DATA_BLOCK_LEN_SIZE] = mmap
[cursor..cursor + SST_DATA_BLOCK_LEN_SIZE]
.try_into()
.map_err(|_| SSTableError::Internal("Short block length".into()))?;
let content_len = u32::from_le_bytes(len_bytes) as usize;
cursor += SST_DATA_BLOCK_LEN_SIZE;
if start + content_len > mmap.len() {
return Err(SSTableError::Internal("Block out of range".into()));
}
let content = &mmap[cursor..cursor + content_len];
cursor += content_len;
let checksum_bytes: [u8; SST_DATA_BLOCK_CHECKSUM_SIZE] = mmap
[cursor..cursor + SST_DATA_BLOCK_CHECKSUM_SIZE]
.try_into()
.map_err(|_| SSTableError::Internal("Short checksum".into()))?;
let stored_checksum = u32::from_le_bytes(checksum_bytes);
let mut hasher = Crc32::new();
hasher.update(content);
let computed_checksum = hasher.finalize();
if computed_checksum != stored_checksum {
return Err(SSTableError::ChecksumMismatch);
}
Ok(content.to_vec())
}
pub(crate) fn find_block_for_key(&self, key: &[u8]) -> usize {
if self.index.is_empty() {
return 0;
}
match self
.index
.binary_search_by(|entry| entry.separator_key.as_slice().cmp(key))
{
Ok(i) => i,
Err(0) => 0,
Err(i) => i - 1,
}
}
fn covering_range_for_key(&self, key: &[u8]) -> Option<(u64, u64)> {
let mut res: Option<(u64, u64)> = None;
for rd in &self.range_deletes.data {
if key >= rd.start_key.as_slice() && key < rd.end_key.as_slice() {
res = Some(match res {
Some((prev_lsn, prev_ts)) => {
if rd.lsn > prev_lsn || (rd.lsn == prev_lsn && rd.timestamp > prev_ts) {
(rd.lsn, rd.timestamp)
} else {
(prev_lsn, prev_ts)
}
}
None => (rd.lsn, rd.timestamp),
});
}
}
res
}
}