use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use parking_lot::RwLock;
use rustc_hash::FxHashMap;
use std::io::{self, Read, Write};
use std::sync::Arc;
#[cfg(feature = "fst-index")]
use super::sstable_index::FstBlockIndex;
use super::sstable_index::{BlockAddr, BlockIndex, MmapBlockIndex};
use crate::compression::{CompressionDict, CompressionLevel};
use crate::directories::{FileHandle, OwnedBytes};
/// File-format magic written at the very end of the footer ("STB4" in ASCII).
pub const SSTABLE_MAGIC: u32 = 0x53544234;
/// Target uncompressed size of one data block; a block is flushed once the
/// encode buffer reaches this size.
pub const BLOCK_SIZE: usize = 16 * 1024;
/// Default size budget for a trained compression dictionary.
pub const DEFAULT_DICT_SIZE: usize = 64 * 1024;
/// Default bloom-filter sizing: bits allocated per inserted key.
pub const BLOOM_BITS_PER_KEY: usize = 10;
/// Number of probe positions per key in the bloom filter.
pub const BLOOM_HASH_COUNT: usize = 7;
/// Bloom filter used to short-circuit point lookups for absent keys.
#[derive(Debug, Clone)]
pub struct BloomFilter {
    // Bit storage: owned `Vec<u64>` while building, read-only bytes when loaded.
    bits: BloomBits,
    // Total addressable bits; modulus for computed bit positions.
    num_bits: usize,
    // Probe positions tested/set per key.
    num_hashes: usize,
}
/// Backing storage for the bloom filter's bit array.
#[derive(Debug, Clone)]
enum BloomBits {
    // Mutable, owned words — used while building a filter.
    Vec(Vec<u64>),
    // Read-only little-endian words backed by file bytes — used after loading.
    Bytes(OwnedBytes),
}
impl BloomBits {
#[inline]
fn len(&self) -> usize {
match self {
BloomBits::Vec(v) => v.len(),
BloomBits::Bytes(b) => b.len() / 8,
}
}
#[inline]
fn get(&self, word_idx: usize) -> u64 {
match self {
BloomBits::Vec(v) => v[word_idx],
BloomBits::Bytes(b) => {
let off = word_idx * 8;
u64::from_le_bytes([
b[off],
b[off + 1],
b[off + 2],
b[off + 3],
b[off + 4],
b[off + 5],
b[off + 6],
b[off + 7],
])
}
}
}
#[inline]
fn set_bit(&mut self, word_idx: usize, bit_idx: usize) {
match self {
BloomBits::Vec(v) => v[word_idx] |= 1u64 << bit_idx,
BloomBits::Bytes(_) => panic!("cannot mutate read-only bloom filter"),
}
}
fn size_bytes(&self) -> usize {
match self {
BloomBits::Vec(v) => v.len() * 8,
BloomBits::Bytes(b) => b.len(),
}
}
}
impl BloomFilter {
pub fn new(expected_keys: usize, bits_per_key: usize) -> Self {
let num_bits = (expected_keys * bits_per_key).max(64);
let num_words = num_bits.div_ceil(64);
Self {
bits: BloomBits::Vec(vec![0u64; num_words]),
num_bits,
num_hashes: BLOOM_HASH_COUNT,
}
}
pub fn from_bytes_mutable(data: &[u8]) -> io::Result<Self> {
if data.len() < 12 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Bloom filter data too short",
));
}
let num_bits = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
let num_hashes = u32::from_le_bytes([data[4], data[5], data[6], data[7]]) as usize;
let num_words = u32::from_le_bytes([data[8], data[9], data[10], data[11]]) as usize;
if data.len() < 12 + num_words * 8 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Bloom filter data truncated",
));
}
let mut vec = vec![0u64; num_words];
for (i, v) in vec.iter_mut().enumerate() {
let off = 12 + i * 8;
*v = u64::from_le_bytes(data[off..off + 8].try_into().unwrap());
}
Ok(Self {
bits: BloomBits::Vec(vec),
num_bits,
num_hashes,
})
}
pub fn from_owned_bytes(data: OwnedBytes) -> io::Result<Self> {
if data.len() < 12 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Bloom filter data too short",
));
}
let d = data.as_slice();
let num_bits = u32::from_le_bytes([d[0], d[1], d[2], d[3]]) as usize;
let num_hashes = u32::from_le_bytes([d[4], d[5], d[6], d[7]]) as usize;
let num_words = u32::from_le_bytes([d[8], d[9], d[10], d[11]]) as usize;
if d.len() < 12 + num_words * 8 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Bloom filter data truncated",
));
}
let bits_bytes = data.slice(12..12 + num_words * 8);
Ok(Self {
bits: BloomBits::Bytes(bits_bytes),
num_bits,
num_hashes,
})
}
pub fn to_bytes(&self) -> Vec<u8> {
let num_words = self.bits.len();
let mut data = Vec::with_capacity(12 + num_words * 8);
data.write_u32::<LittleEndian>(self.num_bits as u32)
.unwrap();
data.write_u32::<LittleEndian>(self.num_hashes as u32)
.unwrap();
data.write_u32::<LittleEndian>(num_words as u32).unwrap();
for i in 0..num_words {
data.write_u64::<LittleEndian>(self.bits.get(i)).unwrap();
}
data
}
pub fn insert(&mut self, key: &[u8]) {
let (h1, h2) = self.hash_pair(key);
for i in 0..self.num_hashes {
let bit_pos = self.get_bit_pos(h1, h2, i);
let word_idx = bit_pos / 64;
let bit_idx = bit_pos % 64;
if word_idx < self.bits.len() {
self.bits.set_bit(word_idx, bit_idx);
}
}
}
pub fn may_contain(&self, key: &[u8]) -> bool {
let (h1, h2) = self.hash_pair(key);
for i in 0..self.num_hashes {
let bit_pos = self.get_bit_pos(h1, h2, i);
let word_idx = bit_pos / 64;
let bit_idx = bit_pos % 64;
if word_idx >= self.bits.len() || (self.bits.get(word_idx) & (1u64 << bit_idx)) == 0 {
return false;
}
}
true
}
pub fn size_bytes(&self) -> usize {
12 + self.bits.size_bytes()
}
pub fn insert_hashed(&mut self, h1: u64, h2: u64) {
for i in 0..self.num_hashes {
let bit_pos = self.get_bit_pos(h1, h2, i);
let word_idx = bit_pos / 64;
let bit_idx = bit_pos % 64;
if word_idx < self.bits.len() {
self.bits.set_bit(word_idx, bit_idx);
}
}
}
#[inline]
fn hash_pair(&self, key: &[u8]) -> (u64, u64) {
let mut h1: u64 = 0xcbf29ce484222325;
let mut h2: u64 = 0x84222325cbf29ce4;
for &byte in key {
h1 ^= byte as u64;
h1 = h1.wrapping_mul(0x100000001b3);
h2 = h2.wrapping_mul(0x100000001b3);
h2 ^= byte as u64;
}
(h1, h2)
}
#[inline]
fn get_bit_pos(&self, h1: u64, h2: u64, i: usize) -> usize {
(h1.wrapping_add((i as u64).wrapping_mul(h2)) % (self.num_bits as u64)) as usize
}
}
/// Computes the two 64-bit hashes used for bloom-filter double hashing.
///
/// Both streams walk the same bytes with the FNV-64 prime: the first is
/// classic FNV-1a (xor, then multiply), the second multiplies before xoring
/// so the pair decorrelates.
#[inline]
fn bloom_hash_pair(key: &[u8]) -> (u64, u64) {
    const PRIME: u64 = 0x100000001b3;
    let seeds: (u64, u64) = (0xcbf29ce484222325, 0x84222325cbf29ce4);
    key.iter().fold(seeds, |(h1, h2), &byte| {
        (
            (h1 ^ u64::from(byte)).wrapping_mul(PRIME),
            h2.wrapping_mul(PRIME) ^ u64::from(byte),
        )
    })
}
/// A value type that can be stored in an SSTable entry.
pub trait SSTableValue: Clone + Send + Sync {
    /// Writes the value's binary encoding to `writer`.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
    /// Reads one value from `reader`, consuming exactly the bytes written
    /// by `serialize`.
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
}
impl SSTableValue for u64 {
    /// Encoded as a single varint.
    fn serialize<Wr: Write>(&self, writer: &mut Wr) -> io::Result<()> {
        write_vint(writer, *self)
    }

    /// Decoded as a single varint.
    fn deserialize<Rd: Read>(reader: &mut Rd) -> io::Result<Self> {
        read_vint(reader)
    }
}
impl SSTableValue for Vec<u8> {
    /// Length-prefixed bytes: varint length followed by the raw payload.
    fn serialize<Wr: Write>(&self, writer: &mut Wr) -> io::Result<()> {
        write_vint(writer, self.len() as u64)?;
        writer.write_all(self.as_slice())
    }

    fn deserialize<Rd: Read>(reader: &mut Rd) -> io::Result<Self> {
        let len = read_vint(reader)? as usize;
        let mut payload = vec![0u8; len];
        reader.read_exact(payload.as_mut_slice())?;
        Ok(payload)
    }
}
/// Location of one sparse dimension's data: a byte range in an external file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SparseDimInfo {
    // Start of the range, in bytes.
    pub offset: u64,
    // Length of the range, in bytes.
    pub length: u32,
}
impl SparseDimInfo {
pub fn new(offset: u64, length: u32) -> Self {
Self { offset, length }
}
}
impl SSTableValue for SparseDimInfo {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
write_vint(writer, self.offset)?;
write_vint(writer, self.length as u64)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let offset = read_vint(reader)?;
let length = read_vint(reader)? as u32;
Ok(Self { offset, length })
}
}
/// Maximum number of postings that may be embedded directly in a `TermInfo`
/// (bounded by the 16-byte inline buffer).
pub const MAX_INLINE_POSTINGS: usize = 3;
/// Per-term metadata stored as the value in the term dictionary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TermInfo {
    /// A tiny postings list embedded in the dictionary entry itself:
    /// `doc_freq` (delta-doc-id, term-freq) varint pairs packed into
    /// `data[..data_len]`.
    Inline {
        doc_freq: u8,
        data: [u8; 16],
        data_len: u8,
    },
    /// Postings stored out of line at `posting_offset..posting_offset+posting_len`.
    /// `position_len == 0` means no position data was recorded.
    External {
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        position_offset: u64,
        position_len: u64,
    },
}
impl TermInfo {
    /// External postings without position data.
    pub fn external(posting_offset: u64, posting_len: u64, doc_freq: u32) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset: 0,
            position_len: 0,
        }
    }

    /// External postings with an accompanying position range.
    pub fn external_with_positions(
        posting_offset: u64,
        posting_len: u64,
        doc_freq: u32,
        position_offset: u64,
        position_len: u64,
    ) -> Self {
        TermInfo::External {
            posting_offset,
            posting_len,
            doc_freq,
            position_offset,
            position_len,
        }
    }

    /// Attempts to pack up to `MAX_INLINE_POSTINGS` postings into an inline
    /// entry. Doc ids are delta-encoded, so they must be non-decreasing.
    ///
    /// Returns `None` when the list is empty or too long, when `term_freqs`
    /// is shorter than `doc_ids`, when doc ids are not sorted (a wrapping
    /// delta would previously panic in debug builds or silently encode
    /// garbage in release builds), or when the encoding overflows 16 bytes.
    pub fn try_inline(doc_ids: &[u32], term_freqs: &[u32]) -> Option<Self> {
        if doc_ids.len() > MAX_INLINE_POSTINGS
            || doc_ids.is_empty()
            || term_freqs.len() < doc_ids.len()
        {
            return None;
        }
        let mut data = [0u8; 16];
        // Cursor caps writes at 16 bytes; write_vint fails once the buffer is
        // full, so position() can never exceed 16.
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;
        for (&doc_id, &tf) in doc_ids.iter().zip(term_freqs.iter()) {
            let delta = doc_id.checked_sub(prev_doc_id)?;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }
        let data_len = cursor.position() as u8;
        Some(TermInfo::Inline {
            doc_freq: doc_ids.len() as u8,
            data,
            data_len,
        })
    }

    /// Iterator-based variant of [`TermInfo::try_inline`].
    ///
    /// `count` must equal the number of `(doc_id, term_freq)` items the
    /// iterator yields; a mismatch returns `None` instead of producing an
    /// entry whose `doc_freq` disagrees with its encoded data.
    pub fn try_inline_iter(count: usize, iter: impl Iterator<Item = (u32, u32)>) -> Option<Self> {
        if count > MAX_INLINE_POSTINGS || count == 0 {
            return None;
        }
        let mut data = [0u8; 16];
        let mut cursor = std::io::Cursor::new(&mut data[..]);
        let mut prev_doc_id = 0u32;
        let mut seen = 0usize;
        for (doc_id, tf) in iter {
            seen += 1;
            if seen > count {
                return None;
            }
            // Non-decreasing doc ids only; a wrapping delta means bad input.
            let delta = doc_id.checked_sub(prev_doc_id)?;
            if write_vint(&mut cursor, delta as u64).is_err() {
                return None;
            }
            if write_vint(&mut cursor, tf as u64).is_err() {
                return None;
            }
            prev_doc_id = doc_id;
        }
        if seen != count {
            return None;
        }
        let data_len = cursor.position() as u8;
        Some(TermInfo::Inline {
            doc_freq: count as u8,
            data,
            data_len,
        })
    }

    /// Number of documents containing the term.
    pub fn doc_freq(&self) -> u32 {
        match self {
            TermInfo::Inline { doc_freq, .. } => *doc_freq as u32,
            TermInfo::External { doc_freq, .. } => *doc_freq,
        }
    }

    /// True when the postings are embedded in the dictionary entry.
    pub fn is_inline(&self) -> bool {
        matches!(self, TermInfo::Inline { .. })
    }

    /// `(posting_offset, posting_len)` for external entries, else `None`.
    pub fn external_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                posting_offset,
                posting_len,
                ..
            } => Some((*posting_offset, *posting_len)),
            TermInfo::Inline { .. } => None,
        }
    }

    /// `(position_offset, position_len)` when position data exists, else `None`.
    pub fn position_info(&self) -> Option<(u64, u64)> {
        match self {
            TermInfo::External {
                position_offset,
                position_len,
                ..
            } if *position_len > 0 => Some((*position_offset, *position_len)),
            _ => None,
        }
    }

    /// Decodes an inline entry back into parallel `(doc_ids, term_freqs)`
    /// vectors; returns `None` for external entries or undecodable data.
    pub fn decode_inline(&self) -> Option<(Vec<u32>, Vec<u32>)> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                let mut doc_ids = Vec::with_capacity(*doc_freq as usize);
                let mut term_freqs = Vec::with_capacity(*doc_freq as usize);
                let mut reader = &data[..*data_len as usize];
                let mut prev_doc_id = 0u32;
                for _ in 0..*doc_freq {
                    let delta = read_vint(&mut reader).ok()? as u32;
                    let tf = read_vint(&mut reader).ok()? as u32;
                    let doc_id = prev_doc_id + delta;
                    doc_ids.push(doc_id);
                    term_freqs.push(tf);
                    prev_doc_id = doc_id;
                }
                Some((doc_ids, term_freqs))
            }
            TermInfo::External { .. } => None,
        }
    }
}
impl SSTableValue for TermInfo {
    /// Tagged encoding: 0xFF = inline, 0x01 = external with positions,
    /// 0x00 = external without positions.
    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        match self {
            TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            } => {
                writer.write_u8(0xFF)?;
                writer.write_u8(*doc_freq)?;
                writer.write_u8(*data_len)?;
                writer.write_all(&data[..*data_len as usize])?;
            }
            TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            } => {
                if *position_len > 0 {
                    writer.write_u8(0x01)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                    write_vint(writer, *position_offset)?;
                    write_vint(writer, *position_len)?;
                } else {
                    writer.write_u8(0x00)?;
                    write_vint(writer, *doc_freq as u64)?;
                    write_vint(writer, *posting_offset)?;
                    write_vint(writer, *posting_len)?;
                }
            }
        }
        Ok(())
    }

    /// Inverse of `serialize`.
    ///
    /// # Errors
    /// `InvalidData` on an unknown tag or on an inline `data_len` larger than
    /// the 16-byte buffer (previously this would panic when slicing `data`
    /// on corrupt input instead of returning an error).
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let tag = reader.read_u8()?;
        if tag == 0xFF {
            let doc_freq = reader.read_u8()?;
            let data_len = reader.read_u8()?;
            if data_len as usize > 16 {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("Invalid inline TermInfo data_len: {}", data_len),
                ));
            }
            let mut data = [0u8; 16];
            reader.read_exact(&mut data[..data_len as usize])?;
            Ok(TermInfo::Inline {
                doc_freq,
                data,
                data_len,
            })
        } else if tag == 0x00 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset: 0,
                position_len: 0,
            })
        } else if tag == 0x01 {
            let doc_freq = read_vint(reader)? as u32;
            let posting_offset = read_vint(reader)?;
            let posting_len = read_vint(reader)?;
            let position_offset = read_vint(reader)?;
            let position_len = read_vint(reader)?;
            Ok(TermInfo::External {
                posting_offset,
                posting_len,
                doc_freq,
                position_offset,
                position_len,
            })
        } else {
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid TermInfo tag: {}", tag),
            ))
        }
    }
}
/// Writes `value` as a LEB128-style varint: 7 payload bits per byte,
/// least-significant group first, high bit set on every byte but the last.
pub fn write_vint<W: Write + ?Sized>(writer: &mut W, mut value: u64) -> io::Result<()> {
    while value >= 0x80 {
        writer.write_all(&[(value as u8 & 0x7F) | 0x80])?;
        value >>= 7;
    }
    writer.write_all(&[value as u8])
}
/// Reads a varint written by [`write_vint`].
///
/// Fails with `InvalidData` once the continuation bytes would shift past 64
/// bits, and propagates `UnexpectedEof` on a truncated stream.
pub fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0u32;
    let mut buf = [0u8; 1];
    loop {
        reader.read_exact(&mut buf)?;
        let byte = buf[0];
        result |= u64::from(byte & 0x7F) << shift;
        if byte & 0x80 == 0 {
            return Ok(result);
        }
        shift += 7;
        if shift >= 64 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "varint too long",
            ));
        }
    }
}
/// Length of the longest common prefix of `a` and `b`, in bytes.
pub fn common_prefix_len(a: &[u8], b: &[u8]) -> usize {
    let max = a.len().min(b.len());
    let mut n = 0;
    while n < max && a[n] == b[n] {
        n += 1;
    }
    n
}
/// Summary statistics reported by `AsyncSSTableReader::stats`.
#[derive(Debug, Clone)]
pub struct SSTableStats {
    pub num_blocks: usize,
    // Always 0 in the current reader implementation.
    pub num_sparse_entries: usize,
    pub num_entries: u64,
    pub has_bloom_filter: bool,
    pub has_dictionary: bool,
    // Serialized bloom-filter size in bytes (0 when absent).
    pub bloom_filter_size: usize,
    // Compression-dictionary size in bytes (0 when absent).
    pub dictionary_size: usize,
}
/// Tuning knobs for `SSTableWriter`.
#[derive(Debug, Clone)]
pub struct SSTableWriterConfig {
    pub compression_level: CompressionLevel,
    // When true, blocks are compressed with a trained dictionary.
    pub use_dictionary: bool,
    pub dict_size: usize,
    pub use_bloom_filter: bool,
    pub bloom_bits_per_key: usize,
}
impl Default for SSTableWriterConfig {
    /// Mirrors the project's default `IndexOptimization` profile.
    fn default() -> Self {
        let optimization = crate::structures::IndexOptimization::default();
        Self::from_optimization(optimization)
    }
}
impl SSTableWriterConfig {
    /// Maps an optimization profile onto concrete writer settings.
    ///
    /// Only compression level and dictionary usage vary by profile; the
    /// dictionary size budget and bloom-filter settings are shared.
    pub fn from_optimization(optimization: crate::structures::IndexOptimization) -> Self {
        use crate::structures::IndexOptimization;
        let (compression_level, use_dictionary) = match optimization {
            IndexOptimization::Adaptive => (CompressionLevel::BETTER, false),
            IndexOptimization::SizeOptimized => (CompressionLevel::MAX, true),
            IndexOptimization::PerformanceOptimized => (CompressionLevel::FAST, false),
        };
        Self {
            compression_level,
            use_dictionary,
            dict_size: DEFAULT_DICT_SIZE,
            use_bloom_filter: true,
            bloom_bits_per_key: BLOOM_BITS_PER_KEY,
        }
    }

    /// Shorthand for the performance-optimized profile.
    pub fn fast() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::PerformanceOptimized)
    }

    /// Shorthand for the size-optimized profile.
    pub fn max_compression() -> Self {
        Self::from_optimization(crate::structures::IndexOptimization::SizeOptimized)
    }
}
/// Streaming writer producing a block-compressed, prefix-compressed SSTable.
///
/// Keys are expected in ascending order (lookups rely on sorted blocks —
/// see `search_block`'s early exit). Call `finish()` to append the block
/// index, optional bloom filter / dictionary sections, and the footer.
pub struct SSTableWriter<W: Write, V: SSTableValue> {
    writer: W,
    // Uncompressed encoding of the block under construction; flushed once it
    // reaches BLOCK_SIZE.
    block_buffer: Vec<u8>,
    // Previous key, for prefix compression within the current block.
    prev_key: Vec<u8>,
    // One entry per flushed block; consumed when building the index in finish().
    index: Vec<BlockIndexEntry>,
    // Bytes of compressed output written so far.
    current_offset: u64,
    num_entries: u64,
    // First key of the block under construction, if it has any entries.
    block_first_key: Option<Vec<u8>>,
    config: SSTableWriterConfig,
    dictionary: Option<CompressionDict>,
    // Hash pairs of all inserted keys; folded into the bloom filter at finish().
    bloom_hashes: Vec<(u64, u64)>,
    _phantom: std::marker::PhantomData<V>,
}
impl<W: Write, V: SSTableValue> SSTableWriter<W, V> {
    /// Creates a writer with the default configuration.
    pub fn new(writer: W) -> Self {
        Self::with_config(writer, SSTableWriterConfig::default())
    }

    /// Creates a writer with an explicit configuration and no dictionary.
    pub fn with_config(writer: W, config: SSTableWriterConfig) -> Self {
        Self {
            writer,
            block_buffer: Vec::with_capacity(BLOCK_SIZE),
            prev_key: Vec::new(),
            index: Vec::new(),
            current_offset: 0,
            num_entries: 0,
            block_first_key: None,
            config,
            dictionary: None,
            bloom_hashes: Vec::new(),
            _phantom: std::marker::PhantomData,
        }
    }

    /// Creates a writer that compresses blocks with a pre-trained dictionary.
    /// The dictionary is embedded in the file at `finish()` so readers need
    /// no out-of-band state.
    pub fn with_dictionary(
        writer: W,
        config: SSTableWriterConfig,
        dictionary: CompressionDict,
    ) -> Self {
        let mut this = Self::with_config(writer, config);
        this.dictionary = Some(dictionary);
        this
    }

    /// Appends one key/value pair; keys are expected in ascending order.
    /// Each key is stored as (shared-prefix length, suffix) relative to the
    /// previous key in the same block.
    pub fn insert(&mut self, key: &[u8], value: &V) -> io::Result<()> {
        if self.block_first_key.is_none() {
            self.block_first_key = Some(key.to_vec());
        }
        if self.config.use_bloom_filter {
            // Hash now; the filter itself is sized and built in finish()
            // once the final key count is known.
            self.bloom_hashes.push(bloom_hash_pair(key));
        }
        let prefix_len = common_prefix_len(&self.prev_key, key);
        let suffix = &key[prefix_len..];
        write_vint(&mut self.block_buffer, prefix_len as u64)?;
        write_vint(&mut self.block_buffer, suffix.len() as u64)?;
        self.block_buffer.extend_from_slice(suffix);
        value.serialize(&mut self.block_buffer)?;
        self.prev_key.clear();
        self.prev_key.extend_from_slice(key);
        self.num_entries += 1;
        if self.block_buffer.len() >= BLOCK_SIZE {
            self.flush_block()?;
        }
        Ok(())
    }

    /// Compresses and writes the current block, recording its index entry.
    /// No-op when the block is empty.
    fn flush_block(&mut self) -> io::Result<()> {
        if self.block_buffer.is_empty() {
            return Ok(());
        }
        let compressed = if let Some(ref dict) = self.dictionary {
            crate::compression::compress_with_dict(
                &self.block_buffer,
                self.config.compression_level,
                dict,
            )?
        } else {
            crate::compression::compress(&self.block_buffer, self.config.compression_level)?
        };
        if let Some(first_key) = self.block_first_key.take() {
            self.index.push(BlockIndexEntry {
                first_key,
                offset: self.current_offset,
                length: compressed.len() as u32,
            });
        }
        self.writer.write_all(&compressed)?;
        self.current_offset += compressed.len() as u64;
        self.block_buffer.clear();
        // Prefix compression restarts at the next block boundary.
        self.prev_key.clear();
        Ok(())
    }

    /// Flushes the final block, then appends the trailing sections:
    /// block index (u32 length prefix), optional bloom filter, optional
    /// dictionary (u32 length prefix), and the fixed 37-byte footer
    /// (data_end u64, num_entries u64, bloom_offset u64, dict_offset u64,
    /// compression level u8, magic u32 — all little-endian). A zero
    /// bloom/dict offset marks the section as absent.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_block()?;
        let bloom_filter = if self.config.use_bloom_filter && !self.bloom_hashes.is_empty() {
            let mut bloom =
                BloomFilter::new(self.bloom_hashes.len(), self.config.bloom_bits_per_key);
            for (h1, h2) in &self.bloom_hashes {
                bloom.insert_hashed(*h1, *h2);
            }
            Some(bloom)
        } else {
            None
        };
        let data_end_offset = self.current_offset;
        let entries: Vec<(Vec<u8>, BlockAddr)> = self
            .index
            .iter()
            .map(|e| {
                (
                    e.first_key.clone(),
                    BlockAddr {
                        offset: e.offset,
                        length: e.length,
                    },
                )
            })
            .collect();
        // BUGFIX: gate on "fst-index", matching the FstBlockIndex import at the
        // top of this file and the reader's load fallback. The previous
        // `feature = "native"` gate referenced FstBlockIndex in configurations
        // where its import was not compiled in.
        #[cfg(feature = "fst-index")]
        let index_bytes = FstBlockIndex::build(&entries)?;
        #[cfg(not(feature = "fst-index"))]
        let index_bytes = MmapBlockIndex::build(&entries)?;
        self.writer
            .write_u32::<LittleEndian>(index_bytes.len() as u32)?;
        self.writer.write_all(&index_bytes)?;
        self.current_offset += 4 + index_bytes.len() as u64;
        let bloom_offset = if let Some(ref bloom) = bloom_filter {
            let bloom_data = bloom.to_bytes();
            let offset = self.current_offset;
            self.writer.write_all(&bloom_data)?;
            self.current_offset += bloom_data.len() as u64;
            offset
        } else {
            0
        };
        let dict_offset = if let Some(ref dict) = self.dictionary {
            let dict_bytes = dict.as_bytes();
            let offset = self.current_offset;
            self.writer
                .write_u32::<LittleEndian>(dict_bytes.len() as u32)?;
            self.writer.write_all(dict_bytes)?;
            self.current_offset += 4 + dict_bytes.len() as u64;
            offset
        } else {
            0
        };
        self.writer.write_u64::<LittleEndian>(data_end_offset)?;
        self.writer.write_u64::<LittleEndian>(self.num_entries)?;
        self.writer.write_u64::<LittleEndian>(bloom_offset)?;
        self.writer.write_u64::<LittleEndian>(dict_offset)?;
        self.writer
            .write_u8(self.config.compression_level.0 as u8)?;
        self.writer.write_u32::<LittleEndian>(SSTABLE_MAGIC)?;
        Ok(self.writer)
    }
}
/// Writer-side record of one flushed block: its first key and the byte range
/// of its compressed form in the data section.
#[derive(Debug, Clone)]
struct BlockIndexEntry {
    first_key: Vec<u8>,
    offset: u64,
    length: u32,
}
/// Async reader for SSTables produced by `SSTableWriter`.
///
/// Blocks are read through `data_slice`, decompressed on demand, and kept in
/// a bounded LRU cache. The optional bloom filter short-circuits lookups for
/// keys that are definitely absent.
pub struct AsyncSSTableReader<V: SSTableValue> {
    // File handle restricted to the data-block region [0, data_end_offset).
    data_slice: FileHandle,
    block_index: BlockIndex,
    num_entries: u64,
    // Decompressed-block cache keyed by block file offset.
    cache: RwLock<BlockCache>,
    bloom_filter: Option<BloomFilter>,
    dictionary: Option<CompressionDict>,
    #[allow(dead_code)]
    compression_level: CompressionLevel,
    _phantom: std::marker::PhantomData<V>,
}
/// Bounded LRU cache of decompressed blocks, keyed by block file offset.
struct BlockCache {
    blocks: FxHashMap<u64, Arc<[u8]>>,
    // Offsets ordered least- to most-recently used.
    lru_order: std::collections::VecDeque<u64>,
    max_blocks: usize,
}
impl BlockCache {
    /// Creates an empty cache bounded to `max_blocks` decompressed blocks.
    fn new(max_blocks: usize) -> Self {
        Self {
            blocks: FxHashMap::default(),
            lru_order: std::collections::VecDeque::with_capacity(max_blocks),
            max_blocks,
        }
    }

    /// Returns the cached block at `offset`, marking it most recently used.
    fn get(&mut self, offset: u64) -> Option<Arc<[u8]>> {
        let hit = self.blocks.get(&offset).map(Arc::clone);
        if hit.is_some() {
            self.promote(offset);
        }
        hit
    }

    /// Read-only lookup that leaves the LRU ordering untouched.
    fn peek(&self, offset: u64) -> Option<Arc<[u8]>> {
        self.blocks.get(&offset).map(Arc::clone)
    }

    /// Inserts `block` under `offset`, evicting from the LRU front until the
    /// cache fits. Re-inserting an existing offset only promotes it.
    fn insert(&mut self, offset: u64, block: Arc<[u8]>) {
        if self.blocks.contains_key(&offset) {
            self.promote(offset);
            return;
        }
        while self.blocks.len() >= self.max_blocks {
            match self.lru_order.pop_front() {
                Some(evicted) => {
                    self.blocks.remove(&evicted);
                }
                None => break,
            }
        }
        self.blocks.insert(offset, block);
        self.lru_order.push_back(offset);
    }

    /// Moves `offset` to the most-recently-used end of the queue.
    /// NOTE(review): linear scan of the queue — O(cache size) per promotion.
    fn promote(&mut self, offset: u64) {
        if let Some(pos) = self.lru_order.iter().position(|&k| k == offset) {
            self.lru_order.remove(pos);
            self.lru_order.push_back(offset);
        }
    }
}
impl<V: SSTableValue> AsyncSSTableReader<V> {
    /// Opens an SSTable from `file_handle`.
    ///
    /// Parses the fixed 37-byte footer (data_end u64, num_entries u64,
    /// bloom_offset u64, dict_offset u64, compression level u8, magic u32 —
    /// all little-endian), then loads the block index and the optional bloom
    /// filter / compression dictionary. `cache_blocks` bounds the LRU cache
    /// of decompressed blocks.
    pub async fn open(file_handle: FileHandle, cache_blocks: usize) -> io::Result<Self> {
        let file_len = file_handle.len();
        if file_len < 37 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "SSTable too small",
            ));
        }
        let footer_bytes = file_handle
            .read_bytes_range(file_len - 37..file_len)
            .await?;
        let mut reader = footer_bytes.as_slice();
        let data_end_offset = reader.read_u64::<LittleEndian>()?;
        let num_entries = reader.read_u64::<LittleEndian>()?;
        let bloom_offset = reader.read_u64::<LittleEndian>()?;
        let dict_offset = reader.read_u64::<LittleEndian>()?;
        let compression_level = CompressionLevel(reader.read_u8()? as i32);
        let magic = reader.read_u32::<LittleEndian>()?;
        if magic != SSTABLE_MAGIC {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                format!("Invalid SSTable magic: 0x{:08X}", magic),
            ));
        }
        // The block index lives between the data blocks and the footer,
        // prefixed with its u32 length.
        let index_start = data_end_offset;
        let index_end = file_len - 37;
        let index_bytes = file_handle.read_bytes_range(index_start..index_end).await?;
        let mut idx_reader = index_bytes.as_slice();
        let index_len = idx_reader.read_u32::<LittleEndian>()? as usize;
        if index_len > idx_reader.len() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "Index data truncated",
            ));
        }
        let index_data = index_bytes.slice(4..4 + index_len);
        // Prefer the FST index when the feature is enabled; fall back to the
        // mmap format so both on-disk layouts remain readable.
        #[cfg(feature = "fst-index")]
        let block_index = match FstBlockIndex::load(index_data.clone()) {
            Ok(fst_idx) => BlockIndex::Fst(fst_idx),
            Err(_) => BlockIndex::Mmap(MmapBlockIndex::load(index_data)?),
        };
        #[cfg(not(feature = "fst-index"))]
        let block_index = BlockIndex::Mmap(MmapBlockIndex::load(index_data)?);
        // A zero offset means the section is absent (the writer stores 0).
        let bloom_filter = if bloom_offset > 0 {
            let bloom_start = bloom_offset;
            // Read the 12-byte header first to size the full read.
            let bloom_header = file_handle
                .read_bytes_range(bloom_start..bloom_start + 12)
                .await?;
            let num_words = u32::from_le_bytes([
                bloom_header[8],
                bloom_header[9],
                bloom_header[10],
                bloom_header[11],
            ]) as u64;
            let bloom_size = 12 + num_words * 8;
            let bloom_data = file_handle
                .read_bytes_range(bloom_start..bloom_start + bloom_size)
                .await?;
            Some(BloomFilter::from_owned_bytes(bloom_data)?)
        } else {
            None
        };
        let dictionary = if dict_offset > 0 {
            let dict_start = dict_offset;
            let dict_len_bytes = file_handle
                .read_bytes_range(dict_start..dict_start + 4)
                .await?;
            let dict_len = u32::from_le_bytes([
                dict_len_bytes[0],
                dict_len_bytes[1],
                dict_len_bytes[2],
                dict_len_bytes[3],
            ]) as u64;
            let dict_data = file_handle
                .read_bytes_range(dict_start + 4..dict_start + 4 + dict_len)
                .await?;
            Some(CompressionDict::from_owned_bytes(dict_data))
        } else {
            None
        };
        let data_slice = file_handle.slice(0..data_end_offset);
        Ok(Self {
            data_slice,
            block_index,
            num_entries,
            cache: RwLock::new(BlockCache::new(cache_blocks)),
            bloom_filter,
            dictionary,
            compression_level,
            _phantom: std::marker::PhantomData,
        })
    }

    /// Total number of key/value entries in the table.
    pub fn num_entries(&self) -> u64 {
        self.num_entries
    }

    /// Summary statistics about this table.
    pub fn stats(&self) -> SSTableStats {
        SSTableStats {
            num_blocks: self.block_index.len(),
            num_sparse_entries: 0,
            num_entries: self.num_entries,
            has_bloom_filter: self.bloom_filter.is_some(),
            has_dictionary: self.dictionary.is_some(),
            bloom_filter_size: self
                .bloom_filter
                .as_ref()
                .map(|b| b.size_bytes())
                .unwrap_or(0),
            dictionary_size: self.dictionary.as_ref().map(|d| d.len()).unwrap_or(0),
        }
    }

    /// Number of decompressed blocks currently cached.
    pub fn cached_blocks(&self) -> usize {
        self.cache.read().blocks.len()
    }

    /// Point lookup. Returns `Ok(None)` when the key is absent.
    pub async fn get(&self, key: &[u8]) -> io::Result<Option<V>> {
        log::debug!(
            "SSTable::get called, key_len={}, total_blocks={}",
            key.len(),
            self.block_index.len()
        );
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            log::debug!("SSTable::get bloom filter negative");
            return Ok(None);
        }
        // locate() returns None when the key sorts before the first block's
        // first key — for an exact lookup that means "absent".
        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                log::debug!("SSTable::get key not found (before first block)");
                return Ok(None);
            }
        };
        log::debug!("SSTable::get loading block_idx={}", block_idx);
        let block_data = self.load_block(block_idx).await?;
        self.search_block(&block_data, key)
    }

    /// Batched point lookup: resolves each key's block, loads the distinct
    /// blocks once, then searches them. Result order matches `keys`.
    pub async fn get_batch(&self, keys: &[&[u8]]) -> io::Result<Vec<Option<V>>> {
        if keys.is_empty() {
            return Ok(Vec::new());
        }
        // usize::MAX is a sentinel for "definitely absent; skip".
        let mut key_to_block: Vec<(usize, usize)> = Vec::with_capacity(keys.len());
        for (key_idx, key) in keys.iter().enumerate() {
            if let Some(ref bloom) = self.bloom_filter
                && !bloom.may_contain(key)
            {
                key_to_block.push((key_idx, usize::MAX));
                continue;
            }
            match self.block_index.locate(key) {
                Some(block_idx) => key_to_block.push((key_idx, block_idx)),
                None => key_to_block.push((key_idx, usize::MAX)),
            }
        }
        // Warm the cache once per distinct block.
        let mut blocks_to_load: Vec<usize> = key_to_block
            .iter()
            .filter(|(_, b)| *b != usize::MAX)
            .map(|(_, b)| *b)
            .collect();
        blocks_to_load.sort_unstable();
        blocks_to_load.dedup();
        for &block_idx in &blocks_to_load {
            let _ = self.load_block(block_idx).await?;
        }
        let mut results = vec![None; keys.len()];
        for (key_idx, block_idx) in key_to_block {
            if block_idx == usize::MAX {
                continue;
            }
            // Cache hit after the warm-up pass above.
            let block_data = self.load_block(block_idx).await?;
            results[key_idx] = self.search_block(&block_data, keys[key_idx])?;
        }
        Ok(results)
    }

    /// Loads every block into the cache, one read per block.
    pub async fn preload_all_blocks(&self) -> io::Result<()> {
        for block_idx in 0..self.block_index.len() {
            self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Reads the entire data region in one I/O, then decompresses every block
    /// into the cache. The cache limit is raised to hold all blocks.
    pub async fn prefetch_all_data_bulk(&self) -> io::Result<()> {
        let num_blocks = self.block_index.len();
        if num_blocks == 0 {
            return Ok(());
        }
        let mut max_end: u64 = 0;
        for i in 0..num_blocks {
            if let Some(addr) = self.block_index.get_addr(i) {
                max_end = max_end.max(addr.offset + addr.length as u64);
            }
        }
        let all_data = self.data_slice.read_bytes_range(0..max_end).await?;
        let buf = all_data.as_slice();
        let mut cache = self.cache.write();
        cache.max_blocks = cache.max_blocks.max(num_blocks);
        for i in 0..num_blocks {
            let addr = self.block_index.get_addr(i).unwrap();
            if cache.get(addr.offset).is_some() {
                continue;
            }
            let compressed =
                &buf[addr.offset as usize..(addr.offset + addr.length as u64) as usize];
            let decompressed = if let Some(ref dict) = self.dictionary {
                crate::compression::decompress_with_dict(compressed, dict)?
            } else {
                crate::compression::decompress(compressed)?
            };
            cache.insert(addr.offset, Arc::from(decompressed));
        }
        Ok(())
    }

    /// Returns the decompressed block at `block_idx`, reading and caching it
    /// on a miss.
    async fn load_block(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;
        {
            // peek() avoids taking the write lock on the hot (hit) path.
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }
        log::debug!(
            "SSTable::load_block idx={} CACHE MISS, reading bytes [{}-{}]",
            block_idx,
            addr.offset,
            addr.offset + addr.length as u64
        );
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range(range).await?;
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };
        let block: Arc<[u8]> = Arc::from(decompressed);
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }
        Ok(block)
    }

    /// Synchronous counterpart of `load_block`.
    #[cfg(feature = "sync")]
    fn load_block_sync(&self, block_idx: usize) -> io::Result<Arc<[u8]>> {
        let addr = self.block_index.get_addr(block_idx).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidInput, "Block index out of range")
        })?;
        {
            if let Some(block) = self.cache.read().peek(addr.offset) {
                return Ok(block);
            }
        }
        let range = addr.byte_range();
        let compressed = self.data_slice.read_bytes_range_sync(range)?;
        let decompressed = if let Some(ref dict) = self.dictionary {
            crate::compression::decompress_with_dict(compressed.as_slice(), dict)?
        } else {
            crate::compression::decompress(compressed.as_slice())?
        };
        let block: Arc<[u8]> = Arc::from(decompressed);
        {
            let mut cache = self.cache.write();
            cache.insert(addr.offset, Arc::clone(&block));
        }
        Ok(block)
    }

    /// Synchronous counterpart of `get`.
    #[cfg(feature = "sync")]
    pub fn get_sync(&self, key: &[u8]) -> io::Result<Option<V>> {
        if let Some(ref bloom) = self.bloom_filter
            && !bloom.may_contain(key)
        {
            return Ok(None);
        }
        let block_idx = match self.block_index.locate(key) {
            Some(idx) => idx,
            None => {
                return Ok(None);
            }
        };
        let block_data = self.load_block_sync(block_idx)?;
        self.search_block(&block_data, key)
    }

    /// Linear scan of a decompressed block for `target_key`, rebuilding each
    /// prefix-compressed key. Stops early once a greater key is seen (block
    /// entries are sorted).
    fn search_block(&self, block_data: &[u8], target_key: &[u8]) -> io::Result<Option<V>> {
        let mut reader = block_data;
        let mut current_key = Vec::new();
        while !reader.is_empty() {
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;
            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            current_key.truncate(common_prefix_len);
            current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];
            let value = V::deserialize(&mut reader)?;
            match current_key.as_slice().cmp(target_key) {
                std::cmp::Ordering::Equal => return Ok(Some(value)),
                std::cmp::Ordering::Greater => return Ok(None),
                std::cmp::Ordering::Less => continue,
            }
        }
        Ok(None)
    }

    /// Warms the cache with every block that may contain keys in
    /// `[start_key, end_key]`.
    pub async fn prefetch_range(&self, start_key: &[u8], end_key: &[u8]) -> io::Result<()> {
        let start_block = self.block_index.locate(start_key).unwrap_or(0);
        let end_block = self
            .block_index
            .locate(end_key)
            .unwrap_or(self.block_index.len().saturating_sub(1));
        for block_idx in start_block..=end_block.min(self.block_index.len().saturating_sub(1)) {
            let _ = self.load_block(block_idx).await?;
        }
        Ok(())
    }

    /// Returns a forward iterator over all entries.
    pub fn iter(&self) -> AsyncSSTableIterator<'_, V> {
        AsyncSSTableIterator::new(self)
    }

    /// Decodes and returns every entry in key order.
    pub async fn all_entries(&self) -> io::Result<Vec<(Vec<u8>, V)>> {
        let mut results = Vec::new();
        for block_idx in 0..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();
            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;
                if suffix_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "SSTable block suffix truncated",
                    ));
                }
                current_key.truncate(common_prefix_len);
                current_key.extend_from_slice(&reader[..suffix_len]);
                reader = &reader[suffix_len..];
                let value = V::deserialize(&mut reader)?;
                results.push((current_key.clone(), value));
            }
        }
        Ok(results)
    }

    /// Returns all entries whose key starts with `prefix`, in key order.
    pub async fn prefix_scan(&self, prefix: &[u8]) -> io::Result<Vec<(Vec<u8>, V)>> {
        if self.block_index.is_empty() || prefix.is_empty() {
            return Ok(Vec::new());
        }
        // BUGFIX: locate() returns None when `prefix` sorts before the first
        // block's first key (see get()'s "before first block" path), but
        // matching keys can still live in block 0 — start there instead of
        // returning empty. Mirrors prefetch_range's unwrap_or(0).
        let start_block = self.block_index.locate(prefix).unwrap_or(0);
        let mut results = Vec::new();
        for block_idx in start_block..self.block_index.len() {
            let block_data = self.load_block(block_idx).await?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();
            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;
                if suffix_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "SSTable block suffix truncated",
                    ));
                }
                current_key.truncate(common_prefix_len);
                current_key.extend_from_slice(&reader[..suffix_len]);
                reader = &reader[suffix_len..];
                let value = V::deserialize(&mut reader)?;
                if current_key.starts_with(prefix) {
                    results.push((current_key.clone(), value));
                } else if current_key.as_slice() > prefix {
                    // Keys are sorted: once past the prefix range, we're done.
                    return Ok(results);
                }
            }
        }
        Ok(results)
    }

    /// Synchronous counterpart of `prefix_scan`.
    #[cfg(feature = "sync")]
    pub fn prefix_scan_sync(&self, prefix: &[u8]) -> io::Result<Vec<(Vec<u8>, V)>> {
        if self.block_index.is_empty() || prefix.is_empty() {
            return Ok(Vec::new());
        }
        // BUGFIX: same "before first block" handling as prefix_scan.
        let start_block = self.block_index.locate(prefix).unwrap_or(0);
        let mut results = Vec::new();
        for block_idx in start_block..self.block_index.len() {
            let block_data = self.load_block_sync(block_idx)?;
            let mut reader = &block_data[..];
            let mut current_key = Vec::new();
            while !reader.is_empty() {
                let common_prefix_len = read_vint(&mut reader)? as usize;
                let suffix_len = read_vint(&mut reader)? as usize;
                if suffix_len > reader.len() {
                    return Err(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "SSTable block suffix truncated",
                    ));
                }
                current_key.truncate(common_prefix_len);
                current_key.extend_from_slice(&reader[..suffix_len]);
                reader = &reader[suffix_len..];
                let value = V::deserialize(&mut reader)?;
                if current_key.starts_with(prefix) {
                    results.push((current_key.clone(), value));
                } else if current_key.as_slice() > prefix {
                    return Ok(results);
                }
            }
        }
        Ok(results)
    }
}
/// Forward iterator over all entries of an `AsyncSSTableReader`, in key order.
pub struct AsyncSSTableIterator<'a, V: SSTableValue> {
    reader: &'a AsyncSSTableReader<V>,
    // Index of the NEXT block to load (already incremented past the current one).
    current_block: usize,
    // Decompressed bytes of the block currently being decoded.
    block_data: Option<Arc<[u8]>>,
    // Byte position of the next entry within `block_data`.
    block_offset: usize,
    // Last decoded key; the base for prefix decompression of the next key.
    current_key: Vec<u8>,
    finished: bool,
}
impl<'a, V: SSTableValue> AsyncSSTableIterator<'a, V> {
    /// Creates an iterator positioned before the first entry.
    fn new(reader: &'a AsyncSSTableReader<V>) -> Self {
        Self {
            reader,
            current_block: 0,
            block_data: None,
            block_offset: 0,
            current_key: Vec::new(),
            // Nothing to yield when the table has no blocks.
            finished: reader.block_index.is_empty(),
        }
    }

    /// Loads the next block and resets the per-block decode state.
    /// Returns `Ok(false)` once all blocks are exhausted.
    async fn load_next_block(&mut self) -> io::Result<bool> {
        if self.current_block >= self.reader.block_index.len() {
            self.finished = true;
            return Ok(false);
        }
        self.block_data = Some(self.reader.load_block(self.current_block).await?);
        self.block_offset = 0;
        // Prefix compression restarts at every block boundary.
        self.current_key.clear();
        self.current_block += 1;
        Ok(true)
    }

    /// Returns the next `(key, value)` pair, or `None` after the last entry.
    pub async fn next(&mut self) -> io::Result<Option<(Vec<u8>, V)>> {
        if self.finished {
            return Ok(None);
        }
        if self.block_data.is_none() && !self.load_next_block().await? {
            return Ok(None);
        }
        loop {
            let block = self.block_data.as_ref().unwrap();
            // Current block fully consumed: advance to the next one.
            if self.block_offset >= block.len() {
                if !self.load_next_block().await? {
                    return Ok(None);
                }
                continue;
            }
            let mut reader = &block[self.block_offset..];
            // Remember how many bytes were available so we can advance
            // block_offset by exactly what this entry consumed.
            let start_len = reader.len();
            let common_prefix_len = read_vint(&mut reader)? as usize;
            let suffix_len = read_vint(&mut reader)? as usize;
            if suffix_len > reader.len() {
                return Err(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "SSTable block suffix truncated",
                ));
            }
            // Rebuild the key: shared prefix of the previous key + new suffix.
            self.current_key.truncate(common_prefix_len);
            self.current_key.extend_from_slice(&reader[..suffix_len]);
            reader = &reader[suffix_len..];
            let value = V::deserialize(&mut reader)?;
            self.block_offset += start_len - reader.len();
            return Ok(Some((self.current_key.clone(), value)));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserted keys must always be reported present. The negative checks
    /// are probabilistic in principle, but at this filter size (100 keys,
    /// 10 bits/key) a false positive is overwhelmingly unlikely.
    #[test]
    fn test_bloom_filter_basic() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"hello");
        bloom.insert(b"world");
        bloom.insert(b"test");
        assert!(bloom.may_contain(b"hello"));
        assert!(bloom.may_contain(b"world"));
        assert!(bloom.may_contain(b"test"));
        assert!(!bloom.may_contain(b"notfound"));
        assert!(!bloom.may_contain(b"missing"));
    }

    /// A filter round-tripped through its byte serialization must answer
    /// membership queries identically to the original.
    #[test]
    fn test_bloom_filter_serialization() {
        let mut bloom = BloomFilter::new(100, 10);
        bloom.insert(b"key1");
        bloom.insert(b"key2");
        let bytes = bloom.to_bytes();
        let restored = BloomFilter::from_owned_bytes(OwnedBytes::new(bytes)).unwrap();
        assert!(restored.may_contain(b"key1"));
        assert!(restored.may_contain(b"key2"));
        assert!(!restored.may_contain(b"key3"));
    }

    /// With BLOOM_BITS_PER_KEY = 10 the theoretical false-positive rate is
    /// about 1%, so 3% is a comfortable upper bound for 10k probes.
    #[test]
    fn test_bloom_filter_false_positive_rate() {
        let num_keys = 10000;
        let mut bloom = BloomFilter::new(num_keys, BLOOM_BITS_PER_KEY);
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            bloom.insert(key.as_bytes());
        }
        // No false negatives are ever allowed.
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            assert!(bloom.may_contain(key.as_bytes()));
        }
        let mut false_positives = 0;
        let test_count = 10000;
        for i in 0..test_count {
            let key = format!("nonexistent_{}", i);
            if bloom.may_contain(key.as_bytes()) {
                false_positives += 1;
            }
        }
        let fp_rate = false_positives as f64 / test_count as f64;
        assert!(
            fp_rate < 0.03,
            "False positive rate {} is too high",
            fp_rate
        );
    }

    /// Each optimization profile must map to its expected compression
    /// level, bloom-filter setting, and dictionary setting.
    #[test]
    fn test_sstable_writer_config() {
        use crate::structures::IndexOptimization;
        let config = SSTableWriterConfig::default();
        assert_eq!(config.compression_level.0, 9);
        assert!(config.use_bloom_filter);
        assert!(!config.use_dictionary);
        let adaptive = SSTableWriterConfig::from_optimization(IndexOptimization::Adaptive);
        assert_eq!(adaptive.compression_level.0, 9);
        assert!(adaptive.use_bloom_filter);
        assert!(!adaptive.use_dictionary);
        let size = SSTableWriterConfig::from_optimization(IndexOptimization::SizeOptimized);
        assert_eq!(size.compression_level.0, 22);
        assert!(size.use_bloom_filter);
        assert!(size.use_dictionary);
        let perf = SSTableWriterConfig::from_optimization(IndexOptimization::PerformanceOptimized);
        assert_eq!(perf.compression_level.0, 1);
        assert!(perf.use_bloom_filter);
        assert!(!perf.use_dictionary);
        let fast = SSTableWriterConfig::fast();
        assert_eq!(fast.compression_level.0, 1);
        let max = SSTableWriterConfig::max_compression();
        assert_eq!(max.compression_level.0, 22);
    }

    /// Varint encode/decode must round-trip across byte-length boundaries
    /// (127/128, 16383/16384) and the extremes 0 and u64::MAX.
    #[test]
    fn test_vint_roundtrip() {
        let test_values = [0u64, 1, 127, 128, 255, 256, 16383, 16384, u64::MAX];
        for &val in &test_values {
            let mut buf = Vec::new();
            write_vint(&mut buf, val).unwrap();
            let mut reader = buf.as_slice();
            let decoded = read_vint(&mut reader).unwrap();
            assert_eq!(val, decoded, "Failed for value {}", val);
        }
    }

    /// Shared-prefix length used by the block delta encoder, including the
    /// empty-slice edge cases.
    #[test]
    fn test_common_prefix_len() {
        assert_eq!(common_prefix_len(b"hello", b"hello"), 5);
        assert_eq!(common_prefix_len(b"hello", b"help"), 3);
        assert_eq!(common_prefix_len(b"hello", b"world"), 0);
        assert_eq!(common_prefix_len(b"", b"hello"), 0);
        assert_eq!(common_prefix_len(b"hello", b""), 0);
    }
}