use std::collections::hash_map::DefaultHasher;
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::{Cursor, Read, Seek, SeekFrom};
use std::path::Path;
use std::sync::Arc;
use byteorder::{LittleEndian, ReadBytesExt};
use bytes::Bytes;
use memmap2::{Mmap, MmapOptions};
use super::super::cache::{BlockCache, CacheKey};
use super::types::SSTABLE_VERSION_V1;
use super::{
decompress_block, BloomFilter, CompressionType, SSTableIterator, FOOTER_SIZE, SSTABLE_MAGIC,
SSTABLE_VERSION,
};
use crate::core::error::{Error, Result};
/// Read-only view of one SSTable file, backed by a memory map of the whole file.
///
/// The block index and optional bloom filter are parsed once in `open`; data blocks
/// are read from the mapping on demand and, when a `BlockCache` is attached, cached
/// in decompressed form.
pub struct SSTableReader {
/// Cache identity for this file: a `DefaultHasher` hash of the path passed to `open`.
file_id: u64,
/// Mapping of the entire SSTable file.
mmap: Mmap,
/// Footer format version; `read_value` uses it to pick the value encoding.
format_version: u32,
/// Parsed block index (last key and location of every data block).
index: SSTableIndex,
/// Bloom filter, present only when the footer records a non-empty bloom region.
bloom_filter: Option<BloomFilter>,
/// Optional shared cache of decompressed blocks, keyed by (`file_id`, block offset).
cache: Option<Arc<BlockCache>>,
}
/// Parsed block index of an SSTable: one `IndexEntry` per data block.
pub(crate) struct SSTableIndex {
/// Entries in on-disk order. `find_block` binary-searches them with `partition_point`
/// on `last_key`, so they must be sorted by `last_key` in ascending order.
entries: Vec<IndexEntry>,
}
/// One index entry: where a data block lives and the last key it holds.
#[derive(Debug, Clone)]
pub(crate) struct IndexEntry {
/// Last key stored in the block (presumably its largest key; `find_block` treats it as
/// the block's upper bound when choosing which block to read).
pub(crate) last_key: Bytes,
/// Byte offset of the block within the SSTable file.
pub(crate) block_offset: u64,
/// On-disk size of the block in bytes, including the 5-byte compression/CRC trailer
/// that `SSTableReader::read_block` splits off.
pub(crate) block_size: u32,
}
/// Location of a single on-disk data block, as returned by `SSTableIndex::find_block`.
///
/// This is plain data (two integers), so it is `Copy` and can be compared directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct BlockInfo {
    /// Byte offset of the block within the SSTable file.
    pub offset: u64,
    /// Size of the block in bytes, including its trailing compression byte and CRC32.
    pub size: u32,
}
/// Outcome of finding a key's entry in a single SSTable.
#[derive(Debug, Clone)]
pub(crate) enum SSTableValue {
/// The key is stored with this value.
Value(Bytes),
/// The key is stored as a deletion marker. In V1 files an empty value is decoded as a
/// tombstone (see `SSTableReader::read_value`).
Tombstone,
}
impl SSTableReader {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().to_path_buf();
let file = File::open(&path)?;
let file_size = file.metadata()?.len();
let mmap = unsafe {
MmapOptions::new().map(&file).map_err(|e| Error::Io {
message: "Failed to mmap SSTable".to_string(),
source: e,
})?
};
if file_size < FOOTER_SIZE as u64 {
return Err(Error::SSTable {
message: "SSTable file too small".to_string(),
source: None,
});
}
let footer_offset = file_size - FOOTER_SIZE as u64;
let mut cursor = Cursor::new(&mmap[footer_offset as usize..]);
let index_offset = cursor.read_u64::<LittleEndian>()?;
let index_size = cursor.read_u32::<LittleEndian>()?;
let bloom_offset = cursor.read_u64::<LittleEndian>()?;
let bloom_size = cursor.read_u32::<LittleEndian>()?;
let mut magic = [0u8; 8];
cursor.read_exact(&mut magic)?;
if &magic != SSTABLE_MAGIC {
return Err(Error::SSTable {
message: "Invalid SSTable magic number".to_string(),
source: None,
});
}
let version = cursor.read_u32::<LittleEndian>()?;
if version != SSTABLE_VERSION && version != SSTABLE_VERSION_V1 {
return Err(Error::SSTable {
message: format!("Unsupported SSTable version: {}", version),
source: None,
});
}
let stored_checksum = cursor.read_u32::<LittleEndian>()?;
let mut footer_hasher = crc32fast::Hasher::new();
footer_hasher.update(&index_offset.to_le_bytes());
footer_hasher.update(&index_size.to_le_bytes());
footer_hasher.update(&bloom_offset.to_le_bytes());
footer_hasher.update(&bloom_size.to_le_bytes());
footer_hasher.update(&magic);
footer_hasher.update(&version.to_le_bytes());
let computed_checksum = footer_hasher.finalize();
if stored_checksum != computed_checksum {
return Err(Error::SSTable {
message: format!(
"Footer checksum mismatch: stored={:#010x}, computed={:#010x}",
stored_checksum, computed_checksum
),
source: None,
});
}
let index_end = (index_offset + index_size as u64) as usize;
if index_end > mmap.len() {
return Err(Error::SSTable {
message: format!(
"Index offset/size exceeds file: end={}, file_len={}",
index_end,
mmap.len()
),
source: None,
});
}
let index_data = &mmap[index_offset as usize..index_end];
let index = SSTableIndex::load(index_data)?;
let bloom_filter = if bloom_size > 0 {
let bloom_end = (bloom_offset + bloom_size as u64) as usize;
if bloom_end > mmap.len() {
return Err(Error::SSTable {
message: format!(
"Bloom filter offset/size exceeds file: end={}, file_len={}",
bloom_end,
mmap.len()
),
source: None,
});
}
let bloom_data = &mmap[bloom_offset as usize..bloom_end];
Some(Self::deserialize_bloom_filter(bloom_data)?)
} else {
None
};
let mut hasher = DefaultHasher::new();
path.hash(&mut hasher);
let file_id = hasher.finish();
Ok(Self {
file_id,
mmap,
format_version: version,
index,
bloom_filter,
cache: None,
})
}
pub fn open_with_cache(path: impl AsRef<Path>, cache: Arc<BlockCache>) -> Result<Self> {
let mut reader = Self::open(path)?;
reader.cache = Some(cache);
Ok(reader)
}
pub fn set_cache(&mut self, cache: Arc<BlockCache>) {
self.cache = Some(cache);
}
pub fn get(&self, key: &[u8]) -> Result<Option<Bytes>> {
Ok(match self.get_entry(key)? {
Some(SSTableValue::Value(value)) => Some(value),
Some(SSTableValue::Tombstone) | None => None,
})
}
pub(crate) fn get_entry(&self, key: &[u8]) -> Result<Option<SSTableValue>> {
if let Some(ref bloom) = self.bloom_filter {
if !bloom.contains(key) {
return Ok(None);
}
}
let block_info = match self.index.find_block(key) {
Some(info) => info,
None => return Ok(None),
};
let block_data = self.read_block(block_info.offset, block_info.size)?;
self.search_block_entry(&block_data, key)
}
pub(crate) fn read_block(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
let cache_key = CacheKey::new(self.file_id, offset);
if let Some(ref cache) = self.cache {
if let Some(cached) = cache.get(&cache_key) {
return Ok(cached.to_vec());
}
}
let block_end = offset + size as u64 - 5; let footer_end = (block_end + 5) as usize;
if footer_end > self.mmap.len() {
return Err(Error::SSTable {
message: format!(
"Block offset/size exceeds file: end={}, file_len={}",
footer_end,
self.mmap.len()
),
source: None,
});
}
let block_data = &self.mmap[offset as usize..block_end as usize];
let compression = CompressionType::try_from(self.mmap[block_end as usize])?;
let crc = (&self.mmap[(block_end + 1) as usize..(block_end + 5) as usize])
.read_u32::<LittleEndian>()?;
if crc32fast::hash(block_data) != crc {
return Err(Error::SSTable {
message: "Block CRC mismatch".to_string(),
source: None,
});
}
let decompressed = decompress_block(block_data, compression)?;
if let Some(ref cache) = self.cache {
cache.insert(cache_key, Bytes::copy_from_slice(&decompressed));
}
Ok(decompressed)
}
fn search_block_entry(
&self,
block_data: &[u8],
target_key: &[u8],
) -> Result<Option<SSTableValue>> {
let mut cursor = Cursor::new(block_data);
let data_len = block_data.len();
if data_len < 4 {
return Ok(None);
}
cursor.seek(SeekFrom::End(-4))?;
let entry_count = cursor.read_u32::<LittleEndian>()? as usize;
let offsets_start = data_len - 4 - (entry_count * 4);
cursor.seek(SeekFrom::Start(offsets_start as u64))?;
let mut offsets = Vec::with_capacity(entry_count);
for _ in 0..entry_count {
offsets.push(cursor.read_u32::<LittleEndian>()?);
}
let mut left = 0;
let mut right = entry_count;
while left < right {
let mid = left + (right - left) / 2;
cursor.seek(SeekFrom::Start(offsets[mid] as u64))?;
let key_len = cursor.read_u32::<LittleEndian>()? as usize;
let mut key = vec![0u8; key_len];
cursor.read_exact(&mut key)?;
match key.as_slice().cmp(target_key) {
std::cmp::Ordering::Equal => {
return self.read_value(&mut cursor).map(Some);
}
std::cmp::Ordering::Less => left = mid + 1,
std::cmp::Ordering::Greater => right = mid,
}
}
Ok(None)
}
pub(crate) fn read_value(&self, cursor: &mut Cursor<&[u8]>) -> Result<SSTableValue> {
if self.format_version == SSTABLE_VERSION_V1 {
let value_len = cursor.read_u32::<LittleEndian>()? as usize;
let mut value = vec![0u8; value_len];
cursor.read_exact(&mut value)?;
return Ok(if value.is_empty() {
SSTableValue::Tombstone
} else {
SSTableValue::Value(Bytes::from(value))
});
}
let marker = cursor.read_u8()?;
let value_len = cursor.read_u32::<LittleEndian>()? as usize;
let mut value = vec![0u8; value_len];
cursor.read_exact(&mut value)?;
match marker {
0 => Ok(SSTableValue::Tombstone),
1 => Ok(SSTableValue::Value(Bytes::from(value))),
other => Err(Error::SSTable {
message: format!("Invalid SSTable value marker: {}", other),
source: None,
}),
}
}
pub fn iter(&self) -> SSTableIterator<'_> {
SSTableIterator::new(self)
}
pub(crate) fn index(&self) -> &SSTableIndex {
&self.index
}
fn deserialize_bloom_filter(data: &[u8]) -> Result<BloomFilter> {
if data.len() < 12 {
return Err(Error::SSTable {
message: "Invalid bloom filter data".to_string(),
source: None,
});
}
let mut cursor = Cursor::new(&data[data.len() - 12..]);
let _num_hash_functions = cursor.read_u32::<LittleEndian>()? as usize;
let _num_bits = cursor.read_u32::<LittleEndian>()? as usize;
let bits_per_key = cursor.read_u32::<LittleEndian>()? as usize;
let bits_data = data[..data.len() - 12].to_vec();
Ok(BloomFilter::from_bytes(bits_data, bits_per_key))
}
}
impl SSTableIndex {
pub(crate) fn load(data: &[u8]) -> Result<Self> {
let mut cursor = Cursor::new(data);
let mut entries = Vec::new();
cursor.seek(SeekFrom::End(-4))?;
let entry_count = cursor.read_u32::<LittleEndian>()? as usize;
cursor.seek(SeekFrom::Start(0))?;
for _ in 0..entry_count {
let key_len = cursor.read_u32::<LittleEndian>()? as usize;
let mut key = vec![0u8; key_len];
cursor.read_exact(&mut key)?;
let block_offset = cursor.read_u64::<LittleEndian>()?;
let block_size = cursor.read_u32::<LittleEndian>()?;
entries.push(IndexEntry {
last_key: Bytes::from(key),
block_offset,
block_size,
});
}
Ok(Self { entries })
}
pub(crate) fn find_block(&self, key: &[u8]) -> Option<BlockInfo> {
if self.entries.is_empty() {
return None;
}
let idx = self
.entries
.partition_point(|entry| entry.last_key.as_ref() < key);
if idx < self.entries.len() {
let entry = &self.entries[idx];
Some(BlockInfo {
offset: entry.block_offset,
size: entry.block_size,
})
} else {
None
}
}
pub(crate) fn entries(&self) -> &[IndexEntry] {
&self.entries
}
}