use std::cmp::Ordering;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use std::sync::{Arc, OnceLock};
use parking_lot::{Mutex, MutexGuard};
use crate::cache::block_cache::BlockCache;
use crate::error::{Error, Result};
use crate::sst::block::{Block, decode_entry_reuse};
use crate::sst::filter::BloomFilter;
use crate::sst::format::{
BLOCK_TRAILER_SIZE, BlockHandle, CompressionType, FOOTER_SIZE, PREFIX_FILTER_LEN_NAME,
RANGE_DEL_BLOCK_NAME, decode_footer, decode_index_value_with_props,
};
use crate::stats::DbStats;
use crate::types::{
InternalKeyRef, LazyValue, SequenceNumber, ValueType, compare_internal_key, user_key,
};
use ruc::*;
/// (start_user_key, end_user_key, sequence) triple for one range tombstone.
type RangeTombstoneEntry = (Vec<u8>, Vec<u8>, SequenceNumber);
/// Hard cap (64 MiB) on a block's size after decompression; guards against
/// corrupt or hostile size headers triggering huge allocations.
pub(crate) const MAX_DECOMPRESSED_BLOCK_SIZE: usize = 64 * 1024 * 1024;
/// One decoded entry of the SST index block, pointing at a data block.
pub struct IndexEntry {
    /// Index separator key; compared against seek targets to pick a block.
    pub separator_key: Vec<u8>,
    /// Location (offset/size) of the data block within the file.
    pub handle: BlockHandle,
    /// First key of the data block, when the writer recorded it.
    pub first_key: Option<Vec<u8>>,
    /// Raw (name, payload) block properties consumed by block-property filters.
    pub properties: Vec<(Vec<u8>, Vec<u8>)>,
}
/// Shared, lazily-built list of decoded index entries.
type IndexEntries = Arc<Vec<IndexEntry>>;
/// Everything harvested from the metaindex block at open time.
struct MetaIndexData {
    bloom: Option<Vec<u8>>,                // whole-key bloom filter bytes
    prefix: Option<Vec<u8>>,               // prefix bloom filter bytes
    prefix_len: Option<usize>,             // prefix length the prefix filter was built with
    range_del_handle: Option<BlockHandle>, // dedicated range-deletion block, if present
}
/// Read-only handle to one SST file: parses footer/index/metaindex eagerly at
/// open and serves point lookups and scans by reading data blocks lazily.
pub struct TableReader {
    path: PathBuf,
    file_size: u64,
    /// Identifier used (with the block offset) as the block-cache key.
    file_number: u64,
    /// The fully-resident index block mapping separator keys to data blocks.
    index_block: Block,
    /// Whole-key bloom filter bytes, if the file has one.
    filter_data: Option<Vec<u8>>,
    /// Prefix bloom filter bytes, if the file has one.
    prefix_filter_data: Option<Vec<u8>>,
    /// Prefix length the prefix filter was built with.
    prefix_filter_len: Option<usize>,
    /// The open file handle; serialized behind a mutex for seek+read pairs.
    file: Mutex<File>,
    block_cache: Option<Arc<BlockCache>>,
    stats: Option<Arc<DbStats>>,
    /// Lazily-decoded index entries, built at most once.
    index_entry_cache: OnceLock<IndexEntries>,
    /// Lazily-fragmented range tombstones, built at most once.
    range_tombstone_cache: OnceLock<Arc<crate::iterator::range_del::FragmentedRangeTombstoneList>>,
    /// Handle of the dedicated range-deletion block, if the file has one.
    range_del_handle: Option<BlockHandle>,
}
impl TableReader {
/// Opens an SST file with defaults: file number 0, no cache, no stats.
pub fn open(path: &Path) -> Result<Self> {
    Self::open_with_all(path, 0, None, None)
}
/// Opens an SST file tagged with `file_number` (part of the block-cache key).
pub fn open_with_number(path: &Path, file_number: u64) -> Result<Self> {
    Self::open_with_all(path, file_number, None, None)
}
/// Opens an SST file with an optional shared block cache, no stats.
pub fn open_full(
    path: &Path,
    file_number: u64,
    block_cache: Option<Arc<BlockCache>>,
) -> Result<Self> {
    Self::open_with_all(path, file_number, block_cache, None)
}
/// Fully-parameterized constructor. Reads the footer, the whole index block,
/// and the metaindex (filters + range-del handle) up front; data blocks are
/// only read on demand.
///
/// Errors with `Error::Corruption` if the file is smaller than a footer or
/// any referenced block fails validation.
pub fn open_with_all(
    path: &Path,
    file_number: u64,
    block_cache: Option<Arc<BlockCache>>,
    stats: Option<Arc<DbStats>>,
) -> Result<Self> {
    let mut file = File::open(path).c(d!())?;
    let file_size = file.metadata().c(d!())?.len();
    // A valid SST must at minimum contain a full footer.
    if file_size < FOOTER_SIZE as u64 {
        return Err(eg!(Error::Corruption(format!(
            "SST file too small: {} bytes",
            file_size
        ))));
    }
    // The fixed-size footer at the end of the file locates the metaindex
    // and index blocks.
    file.seek(SeekFrom::End(-(FOOTER_SIZE as i64))).c(d!())?;
    let mut footer_buf = [0u8; FOOTER_SIZE];
    file.read_exact(&mut footer_buf).c(d!())?;
    let (metaindex_handle, index_handle) = decode_footer(&footer_buf).c(d!())?;
    let index_data =
        Self::read_block_data_with_size(&mut file, &index_handle, file_size).c(d!())?;
    let index_block = Block::from_vec(index_data).c(d!())?;
    let meta = Self::read_metaindex(&mut file, &metaindex_handle, file_size).c(d!())?;
    Ok(Self {
        path: path.to_path_buf(),
        file_size,
        file_number,
        index_block,
        filter_data: meta.bloom,
        prefix_filter_data: meta.prefix,
        prefix_filter_len: meta.prefix_len,
        file: Mutex::new(file),
        block_cache,
        stats,
        index_entry_cache: OnceLock::new(),
        range_tombstone_cache: OnceLock::new(),
        range_del_handle: meta.range_del_handle,
    })
}
/// Returns the decoded index entries, parsing them on first use and caching
/// the result in a `OnceLock`. If two threads race, both parse but only the
/// first `set` wins; each still returns an equivalent list.
pub fn cached_index_entries(&self) -> Result<IndexEntries> {
    match self.index_entry_cache.get() {
        Some(existing) => Ok(existing.clone()),
        None => {
            let parsed: IndexEntries = Arc::new(Self::parse_index_entries(&self.index_block)?);
            // Ignore the result: losing the race just means another thread
            // stored an identical list first.
            let _ = self.index_entry_cache.set(Arc::clone(&parsed));
            Ok(parsed)
        }
    }
}
/// Decodes every (separator key, packed value) pair of the index block into
/// an owned `IndexEntry`, failing fast on the first malformed value.
fn parse_index_entries(index_block: &Block) -> Result<Vec<IndexEntry>> {
    let mut entries = Vec::new();
    for (sep_key, raw_value) in index_block.iter() {
        let decoded = decode_index_value_with_props(&raw_value).c(d!())?;
        // Properties arrive as borrowed slices; copy them into owned pairs.
        let properties = decoded
            .properties
            .into_iter()
            .map(|(name, payload)| (name.to_vec(), payload.to_vec()))
            .collect();
        entries.push(IndexEntry {
            separator_key: sep_key,
            handle: decoded.handle,
            first_key: decoded.first_key.map(|fk| fk.to_vec()),
            properties,
        });
    }
    Ok(entries)
}
/// Point lookup for an exact (raw) key. Returns `Ok(None)` when absent.
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
    // Bloom filter can prove absence cheaply before any block I/O.
    if let Some(ref filter) = self.filter_data
        && !BloomFilter::key_may_match(key, filter)
    {
        return Ok(None);
    }
    let block_handle = match self.find_data_block(key).c(d!())? {
        Some(h) => h,
        None => return Ok(None),
    };
    let block_data = self.read_block_cached(&block_handle).c(d!())?;
    let block = Block::new(block_data).c(d!())?;
    // seek() positions at a candidate entry; only an exact key match counts.
    match block.seek(key) {
        Some((found_key, value)) if found_key.as_slice() == key => Ok(Some(value)),
        _ => Ok(None),
    }
}
/// MVCC lookup for `user_key` at read point `sequence`.
///
/// Returns `Ok(None)` when no entry for the key is found,
/// `Ok(Some(None))` when the newest visible entry is a deletion, and
/// `Ok(Some(Some(v)))` for a live value.
pub fn get_internal(
    &self,
    user_key: &[u8],
    sequence: SequenceNumber,
) -> Result<Option<Option<Vec<u8>>>> {
    use crate::types::InternalKey;
    if let Some(ref filter) = self.filter_data
        && !BloomFilter::key_may_match(user_key, filter)
    {
        return Ok(None);
    }
    // Seek with an internal key built from the read sequence; presumably
    // compare_internal_key orders (user_key asc, seq desc) so this lands on
    // the newest version visible at `sequence` -- confirm in types module.
    let seek_key = InternalKey::new(user_key, sequence, ValueType::Value);
    let handle = match self
        .index_block
        .seek_by(seek_key.as_bytes(), compare_internal_key)
    {
        Some((_idx_key, handle_bytes)) => BlockHandle::decode(&handle_bytes).c(d!())?,
        None => return Ok(None),
    };
    let block_data = self.read_block_cached(&handle).c(d!())?;
    let block = Block::new(block_data).c(d!())?;
    match block.seek_by(seek_key.as_bytes(), compare_internal_key) {
        // Internal keys carry an 8-byte trailer; shorter keys are ignored.
        Some((encoded_ikey, value)) if encoded_ikey.len() >= 8 => {
            let ik = InternalKeyRef::new(&encoded_ikey);
            if ik.user_key() == user_key {
                return Ok(Some(match ik.value_type() {
                    ValueType::Value => Some(value),
                    ValueType::Deletion | ValueType::RangeDeletion => None,
                }));
            }
            Ok(None)
        }
        _ => Ok(None),
    }
}
/// Like `get_internal`, but also returns the sequence number of the entry
/// that answered the lookup (needed by callers that must compare the point
/// entry's age against covering range tombstones).
pub fn get_internal_with_seq(
    &self,
    user_key: &[u8],
    sequence: SequenceNumber,
) -> Result<Option<(Option<Vec<u8>>, SequenceNumber)>> {
    use crate::types::InternalKey;
    if let Some(ref filter) = self.filter_data
        && !BloomFilter::key_may_match(user_key, filter)
    {
        return Ok(None);
    }
    let seek_key = InternalKey::new(user_key, sequence, ValueType::Value);
    let handle = match self
        .index_block
        .seek_by(seek_key.as_bytes(), compare_internal_key)
    {
        Some((_idx_key, handle_bytes)) => BlockHandle::decode(&handle_bytes).c(d!())?,
        None => return Ok(None),
    };
    let block_data = self.read_block_cached(&handle).c(d!())?;
    let block = Block::new(block_data).c(d!())?;
    match block.seek_by(seek_key.as_bytes(), compare_internal_key) {
        // Internal keys carry an 8-byte trailer; shorter keys are ignored.
        Some((encoded_ikey, value)) if encoded_ikey.len() >= 8 => {
            let ik = InternalKeyRef::new(&encoded_ikey);
            if ik.user_key() == user_key {
                let entry_seq = ik.sequence();
                return Ok(Some((
                    match ik.value_type() {
                        ValueType::Value => Some(value),
                        ValueType::Deletion | ValueType::RangeDeletion => None,
                    },
                    entry_seq,
                )));
            }
            Ok(None)
        }
        _ => Ok(None),
    }
}
/// Highest sequence number of any range tombstone in this file that covers
/// `user_key` and is visible at `read_seq` (delegates to the cached
/// fragmented tombstone list).
pub fn max_covering_tombstone_seq(
    &self,
    user_key: &[u8],
    read_seq: SequenceNumber,
) -> Result<SequenceNumber> {
    let tombstones = self.cached_range_tombstones().c(d!())?;
    Ok(tombstones.max_covering_tombstone_seq(user_key, read_seq))
}
/// All range tombstones in this file as (start, end, seq) triples.
pub fn get_range_tombstones(&self) -> Result<Vec<RangeTombstoneEntry>> {
    let cached = self.cached_range_tombstones().c(d!())?;
    Ok(cached.tombstones())
}
/// Builds (once) and returns the fragmented range-tombstone list.
///
/// Fast path reads the dedicated range-del block; when the file has none,
/// it falls back to scanning every data block for `RangeDeletion` entries
/// (presumably for older file formats -- confirm against the writer).
fn cached_range_tombstones(
    &self,
) -> Result<Arc<crate::iterator::range_del::FragmentedRangeTombstoneList>> {
    if let Some(cached) = self.range_tombstone_cache.get() {
        return Ok(cached.clone());
    }
    let mut triples = Vec::new();
    if let Some(ref handle) = self.range_del_handle {
        let block_data = self.read_block_cached(handle).c(d!())?;
        let block = Block::new(block_data).c(d!())?;
        for (k, v) in block.iter() {
            // Keys shorter than the 8-byte internal trailer are skipped.
            if k.len() < 8 {
                continue;
            }
            let ikr = InternalKeyRef::new(&k);
            if ikr.value_type() == ValueType::RangeDeletion {
                // (start_user_key, end_user_key=value, seq)
                triples.push((ikr.user_key().to_vec(), v, ikr.sequence()));
            }
        }
    } else {
        // Fallback: full scan of all data blocks.
        for (_, handle_bytes) in self.index_block.iter() {
            let handle = BlockHandle::decode(&handle_bytes).c(d!())?;
            let block_data = self.read_block_cached(&handle).c(d!())?;
            let block = Block::new(block_data).c(d!())?;
            for (k, v) in block.iter() {
                if k.len() < 8 {
                    continue;
                }
                let ikr = InternalKeyRef::new(&k);
                if ikr.value_type() != ValueType::RangeDeletion {
                    continue;
                }
                triples.push((ikr.user_key().to_vec(), v, ikr.sequence()));
            }
        }
    }
    let cached =
        Arc::new(crate::iterator::range_del::FragmentedRangeTombstoneList::new(triples));
    // Losing a set() race is fine: another thread stored an equivalent list.
    let _ = self.range_tombstone_cache.set(cached.clone());
    Ok(cached)
}
/// Materializes every (key, value) pair of the table, in index order.
/// Intended for small tables / tests; loads each data block in turn.
pub fn iter(&self) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
    let mut entries = Vec::new();
    for (_sep_key, raw_handle) in self.index_block.iter() {
        let handle = BlockHandle::decode(&raw_handle).c(d!())?;
        let data = self.read_block_cached(&handle).c(d!())?;
        let block = Block::new(data).c(d!())?;
        entries.extend(block.iter());
    }
    Ok(entries)
}
/// Locates the data block that may contain `key` via the index block;
/// `Ok(None)` means the key is past the end of the table.
fn find_data_block(&self, key: &[u8]) -> Result<Option<BlockHandle>> {
    self.index_block
        .seek(key)
        .map(|(_idx_key, raw_handle)| BlockHandle::decode(&raw_handle).c(d!()))
        .transpose()
}
/// Convenience wrapper: fetches the file size via metadata, then delegates
/// to `read_block_data_with_size`.
fn read_block_data(file: &mut File, handle: &BlockHandle) -> Result<Vec<u8>> {
    let file_size = file.metadata().c(d!())?.len();
    Self::read_block_data_with_size(file, handle, file_size)
}
/// Reads, CRC-verifies, and (if needed) decompresses one block.
///
/// Layout on disk: `[payload (handle.size bytes)][trailer]` where the
/// trailer's byte 0 is the compression type and bytes 1..5 a little-endian
/// CRC32 over payload + compression byte. All bounds are validated against
/// `file_size` before any read so a corrupt handle cannot over-read.
fn read_block_data_with_size(
    file: &mut File,
    handle: &BlockHandle,
    file_size: u64,
) -> Result<Vec<u8>> {
    // On-disk (compressed) size cap, matching MAX_DECOMPRESSED_BLOCK_SIZE.
    const MAX_COMPRESSED_BLOCK_SIZE: u64 = 64 * 1024 * 1024;
    // Checked arithmetic: offset + size + trailer must not overflow u64.
    let end = handle
        .offset
        .checked_add(handle.size)
        .and_then(|n| n.checked_add(BLOCK_TRAILER_SIZE as u64))
        .ok_or_else(|| Error::Corruption("block handle range overflow".to_string()))
        .c(d!())?;
    if end > file_size {
        return Err(eg!(Error::Corruption(format!(
            "block handle out of bounds: offset={}, size={}, file_size={}",
            handle.offset, handle.size, file_size
        ))));
    }
    if handle.size > MAX_COMPRESSED_BLOCK_SIZE {
        return Err(eg!(Error::Corruption(format!(
            "compressed block size {} exceeds limit {}",
            handle.size, MAX_COMPRESSED_BLOCK_SIZE
        ))));
    }
    file.seek(SeekFrom::Start(handle.offset)).c(d!())?;
    let mut data = vec![0u8; handle.size as usize];
    file.read_exact(&mut data).c(d!())?;
    let mut trailer = [0u8; BLOCK_TRAILER_SIZE];
    file.read_exact(&mut trailer).c(d!())?;
    let compression_type = CompressionType::from_u8(trailer[0])
        .ok_or_else(|| Error::Corruption("unknown compression type".to_string()))
        .c(d!())?;
    let stored_crc = u32::from_le_bytes(trailer[1..5].try_into().unwrap());
    // CRC covers the raw payload followed by the compression-type byte.
    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&data);
    hasher.update(&[trailer[0]]);
    let computed_crc = hasher.finalize();
    if stored_crc != computed_crc {
        return Err(eg!(Error::Corruption(format!(
            "block CRC mismatch: stored {:#x}, computed {:#x}",
            stored_crc, computed_crc
        ))));
    }
    let data = match compression_type {
        CompressionType::Lz4 => {
            // lz4_flex prepends the uncompressed size as a 4-byte LE u32;
            // validate it against our cap before decompressing.
            if data.len() < 4 {
                return Err(eg!(Error::Corruption(
                    "LZ4 block too small for size header".to_string()
                )));
            }
            let uncompressed_size =
                u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize;
            if uncompressed_size > MAX_DECOMPRESSED_BLOCK_SIZE {
                return Err(eg!(Error::Corruption(format!(
                    "LZ4 decompressed size {} exceeds limit {}",
                    uncompressed_size, MAX_DECOMPRESSED_BLOCK_SIZE
                ))));
            }
            lz4_flex::decompress_size_prepended(&data)
                .map_err(|e| Error::Corruption(format!("LZ4 decompression error: {}", e)))
                .c(d!())?
        }
        CompressionType::Zstd => {
            // zstd enforces the output cap itself via the capacity argument.
            zstd::bulk::decompress(data.as_slice(), MAX_DECOMPRESSED_BLOCK_SIZE)
                .map_err(|e| Error::Corruption(format!("Zstd decompression error: {}", e)))
                .c(d!())?
        }
        CompressionType::None => data,
    };
    Ok(data)
}
/// Parses the metaindex block and loads the auxiliary structures it points
/// at: bloom filter, prefix filter (+ its configured prefix length), and the
/// range-deletion block handle. Unknown metaindex keys are ignored.
fn read_metaindex(
    file: &mut File,
    metaindex_handle: &BlockHandle,
    file_size: u64,
) -> Result<MetaIndexData> {
    // A zero-sized handle means the file carries no metaindex at all.
    if metaindex_handle.size == 0 {
        return Ok(MetaIndexData {
            bloom: None,
            prefix: None,
            prefix_len: None,
            range_del_handle: None,
        });
    }
    let metaindex_data =
        Self::read_block_data_with_size(file, metaindex_handle, file_size).c(d!())?;
    let metaindex = Block::from_vec(metaindex_data).c(d!())?;
    let mut bloom = None;
    let mut prefix = None;
    let mut prefix_len = None;
    let mut range_del_handle = None;
    for (key, value) in metaindex.iter() {
        if key == b"filter.bloom" {
            // Value is a handle to the bloom filter payload.
            let handle = BlockHandle::decode(&value).c(d!())?;
            bloom = Some(Self::read_block_data_with_size(file, &handle, file_size).c(d!())?);
        } else if key == b"filter.prefix" {
            let handle = BlockHandle::decode(&value).c(d!())?;
            prefix = Some(Self::read_block_data_with_size(file, &handle, file_size).c(d!())?);
        } else if key == PREFIX_FILTER_LEN_NAME.as_bytes() {
            // Prefix length is stored inline as a little-endian u64.
            if value.len() != 8 {
                return Err(eg!(Error::Corruption(
                    "bad prefix filter length metadata".to_string()
                )));
            }
            let len = u64::from_le_bytes(value.as_slice().try_into().unwrap());
            prefix_len = Some(usize::try_from(len).map_err(|_| {
                eg!(Error::Corruption(
                    "prefix filter length overflows usize".to_string()
                ))
            })?);
        } else if key == RANGE_DEL_BLOCK_NAME.as_bytes() {
            range_del_handle = Some(BlockHandle::decode(&value).c(d!())?);
        }
    }
    Ok(MetaIndexData {
        bloom,
        prefix,
        prefix_len,
        range_del_handle,
    })
}
/// Prefix-bloom probe: `false` means no key with this prefix can exist in
/// the file; `true` is inconclusive (no filter, short prefix, or a hit).
pub fn prefix_may_match(&self, prefix: &[u8]) -> bool {
    // Without both the filter bytes and its configured length we cannot
    // prove absence, so answer "maybe".
    let (Some(filter), Some(filter_len)) =
        (self.prefix_filter_data.as_ref(), self.prefix_filter_len)
    else {
        return true;
    };
    if prefix.len() < filter_len {
        return true;
    }
    BloomFilter::key_may_match(&prefix[..filter_len], filter)
}
/// Total on-disk size of the SST file in bytes.
pub fn file_size(&self) -> u64 {
    self.file_size
}
/// Filesystem path this reader was opened from.
pub fn path(&self) -> &Path {
    &self.path
}
/// File number used to namespace block-cache entries.
pub fn file_number(&self) -> u64 {
    self.file_number
}
/// Reads one block, consulting the block cache first.
///
/// Cache hits/misses are recorded in stats only when a cache is configured;
/// on a miss the freshly read block is inserted and the cached Arc returned.
fn read_block_cached(&self, handle: &BlockHandle) -> Result<Arc<Vec<u8>>> {
    if let Some(ref cache) = self.block_cache
        && let Some(cached) = cache.get(self.file_number, handle.offset)
    {
        if let Some(ref s) = self.stats {
            s.record_cache_hit();
        }
        return Ok(cached);
    }
    // Only count a miss when a cache actually exists.
    if let Some(ref s) = self.stats
        && self.block_cache.is_some()
    {
        s.record_cache_miss();
    }
    let mut file = self.open_file().c(d!())?;
    let data = Self::read_block_data(&mut file, handle).c(d!())?;
    if let Some(ref cache) = self.block_cache {
        return Ok(cache.insert(self.file_number, handle.offset, data));
    }
    Ok(Arc::new(data))
}
/// Best-effort warm-up: pins the first data block into the cache so early
/// reads avoid disk I/O. All failures are logged-or-ignored, never fatal.
pub fn pin_metadata_in_cache(&self) {
    let entries = match self.cached_index_entries() {
        Ok(e) => e,
        Err(e) => {
            tracing::warn!("pin_metadata_in_cache: index decode error: {}", e);
            return;
        }
    };
    // Only proceed when a cache exists and the block is not already cached.
    if let Some(entry) = entries.first()
        && let Some(ref cache) = self.block_cache
        && cache.get(self.file_number, entry.handle.offset).is_none()
        && let Ok(mut file) = self.open_file()
        && let Ok(data) = Self::read_block_data(&mut file, &entry.handle)
    {
        cache.insert_pinned(self.file_number, entry.handle.offset, data);
    }
}
/// Hints the OS (Linux only) that the byte range will be read soon via
/// `posix_fadvise(POSIX_FADV_WILLNEED)`; a no-op on other platforms.
/// Note this briefly takes the file mutex to get the descriptor.
fn advise_willneed(&self, offset: u64, len: u64) {
    #[cfg(target_os = "linux")]
    {
        use std::os::unix::io::AsRawFd;
        if let Ok(file) = self.open_file() {
            // SAFETY: fd is valid for the lifetime of the guard; fadvise is
            // a pure advisory syscall with no memory-safety requirements.
            unsafe {
                libc::posix_fadvise(
                    file.as_raw_fd(),
                    offset as i64,
                    len as i64,
                    libc::POSIX_FADV_WILLNEED,
                );
            }
        }
    }
    #[cfg(not(target_os = "linux"))]
    {
        // Silence unused-parameter warnings on non-Linux targets.
        let _ = (offset, len);
    }
}
/// Despite the name, this does not reopen the file: it locks and returns the
/// handle opened at construction (the Result wrapper keeps call sites using
/// the same `?`-style as real I/O).
fn open_file(&self) -> Result<MutexGuard<'_, File>> {
    Ok(self.file.lock())
}
}
/// Stateful forward/backward iterator over one SST table.
///
/// Forward iteration streams entries via an in-block cursor
/// (`block_cursor_*`); backward iteration materializes restart segments into
/// `current_block_entries` and walks them in reverse.
pub struct TableIterator {
    reader: Arc<TableReader>,
    /// Decoded index entries, fetched lazily from the reader.
    index_entries: Option<IndexEntries>,
    /// Next index slot to load for forward iteration.
    index_pos: usize,
    /// Block currently streamed by the forward cursor.
    current_block: Option<Block>,
    block_cursor_offset: usize,
    block_data_end: usize,
    /// Rolling key buffer for prefix-compressed entry decoding.
    block_cursor_key: Vec<u8>,
    /// Consecutive sequential block loads; drives readahead.
    sequential_reads: u32,
    prev_block_index: usize,
    /// True when seek() stopped at an index-recorded first key without
    /// loading the block yet (deferred materialization).
    at_first_key_from_index: bool,
    deferred_index_pos: usize,
    /// Materialized entries for backward iteration / current().
    current_block_entries: Vec<(Vec<u8>, Vec<u8>)>,
    block_pos: usize,
    current_restart_index: u32,
    backward_block: Option<Block>,
    backward_block_index: usize,
    /// Sticky error message; iteration stops once set.
    err: Option<String>,
    /// Exclusive upper bound on user keys, when set.
    upper_bound: Option<Vec<u8>>,
    block_property_filters: Vec<Arc<dyn crate::options::BlockPropertyFilter>>,
}
impl TableIterator {
/// Creates an unpositioned iterator; call a seek method or just iterate
/// (forward iteration starts from the first block).
pub fn new(reader: Arc<TableReader>) -> Self {
    Self {
        reader,
        index_entries: None,
        index_pos: 0,
        current_block: None,
        block_cursor_offset: 0,
        block_data_end: 0,
        block_cursor_key: Vec::new(),
        sequential_reads: 0,
        // usize::MAX sentinels mean "no previous/backward block yet".
        prev_block_index: usize::MAX,
        at_first_key_from_index: false,
        deferred_index_pos: 0,
        current_block_entries: Vec::new(),
        block_pos: 0,
        current_restart_index: 0,
        backward_block: None,
        backward_block_index: usize::MAX,
        err: None,
        upper_bound: None,
        block_property_filters: Vec::new(),
    }
}
/// Builder-style setter: installs block-property filters that let iteration
/// skip whole data blocks.
pub fn with_block_filters(
    mut self,
    filters: Vec<Arc<dyn crate::options::BlockPropertyFilter>>,
) -> Self {
    self.block_property_filters = filters;
    self
}
/// Lazily fetches the reader's decoded index entries. On decode failure the
/// error is recorded in `self.err` and an empty index is installed so later
/// calls see a terminated (not panicking) iterator.
fn ensure_index(&mut self) -> &IndexEntries {
    if self.index_entries.is_none() {
        let entries = match self.reader.cached_index_entries() {
            Ok(list) => list,
            Err(e) => {
                self.err = Some(format!("index decode error: {}", e));
                Arc::new(Vec::new())
            }
        };
        self.index_entries = Some(entries);
    }
    self.index_entries.as_ref().unwrap()
}
/// Installs `block` as the forward-cursor source, rewinding the cursor to
/// the block start and clearing any materialized backward entries.
fn set_block_for_cursor(&mut self, block: Block) {
    self.block_data_end = block.data_end_offset();
    self.block_cursor_offset = 0;
    self.block_cursor_key.clear();
    self.current_block = Some(block);
    self.current_block_entries.clear();
    self.block_pos = 0;
}
/// Clears all positioning state (cursor, deferred-block flags, backward
/// buffers) ahead of a fresh seek; index cache and error state are kept.
fn reset_positioning_state(&mut self) {
    self.current_block = None;
    self.block_cursor_offset = 0;
    self.block_data_end = 0;
    self.block_cursor_key.clear();
    self.at_first_key_from_index = false;
    self.deferred_index_pos = 0;
    self.current_block_entries.clear();
    self.block_pos = 0;
    self.current_restart_index = 0;
    self.backward_block = None;
    self.backward_block_index = usize::MAX;
}
/// Returns true when any installed block-property filter matches one of the
/// block's recorded properties by name and votes to skip the block.
fn block_properties_should_skip(&self, entry: &IndexEntry) -> bool {
    if self.block_property_filters.is_empty() {
        return false;
    }
    for filter in &self.block_property_filters {
        let wanted = filter.name();
        let wanted = wanted.as_bytes();
        for (name, data) in &entry.properties {
            if name.as_slice() == wanted && filter.should_skip(data) {
                return true;
            }
        }
    }
    false
}
/// True when the block's recorded first key already reaches the iterator's
/// (exclusive) upper bound — i.e. the whole block can be skipped. Blocks
/// without a recorded first key are never skipped.
fn block_exceeds_upper_bound(&self, entry: &IndexEntry) -> bool {
    match (self.upper_bound.as_ref(), entry.first_key.as_ref()) {
        (Some(bound), Some(first)) => user_key(first) >= bound.as_slice(),
        _ => false,
    }
}
/// Loads the block that a previous seek() deferred (it had stopped at the
/// index-recorded first key without reading the block). Errors are recorded
/// in `self.err` rather than returned.
fn materialize_deferred_block(&mut self) {
    if let Some(ref index_entries) = self.index_entries {
        match self
            .reader
            .read_block_cached(&index_entries[self.deferred_index_pos].handle)
        {
            Ok(data) => match Block::new(data) {
                Ok(block) => {
                    self.set_block_for_cursor(block);
                }
                Err(e) => {
                    self.err = Some(format!("block decode error: {e}"));
                }
            },
            Err(e) => {
                self.err = Some(format!("block read error: {e}"));
            }
        }
    }
}
/// Advances the forward cursor by one entry, returning the key and a lazy
/// (zero-copy) reference into the block's shared buffer. Returns `None` at
/// block end, on decode failure, or once the upper bound is reached.
fn cursor_next_lazy(&mut self) -> Option<(Vec<u8>, LazyValue)> {
    let block = self.current_block.as_ref()?;
    if self.block_cursor_offset >= self.block_data_end {
        return None;
    }
    let data = block.data();
    // decode_entry_reuse reconstructs the full key into block_cursor_key
    // (entries are prefix-compressed against the previous key).
    let (value_start, value_len, next_offset) =
        decode_entry_reuse(data, self.block_cursor_offset, &mut self.block_cursor_key)?;
    // Exclusive upper bound check on the user-key portion.
    if let Some(ref ub) = self.upper_bound
        && user_key(&self.block_cursor_key) >= ub.as_slice()
    {
        return None;
    }
    self.block_cursor_offset = next_offset;
    let lazy_val = LazyValue::BlockRef {
        data: block.data_arc().clone(),
        offset: value_start as u32,
        len: value_len as u32,
    };
    Some((self.block_cursor_key.clone(), lazy_val))
}
/// Eager variant of `cursor_next_lazy`: materializes the value into an
/// owned `Vec<u8>`.
fn cursor_next(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
    self.cursor_next_lazy()
        .map(|(key, lazy)| (key, lazy.into_vec()))
}
/// Positions the iterator at the first entry >= `target` (internal-key
/// order). If the target block records its first key and the target sorts
/// at-or-before it, the block load is deferred until the first next() call.
pub fn seek(&mut self, target: &[u8]) {
    self.ensure_index();
    self.reset_positioning_state();
    let index_entries = self.index_entries.as_ref().unwrap();
    // Past the last separator: nothing to position on.
    if let Some(last) = index_entries.last()
        && compare_internal_key(target, &last.separator_key) == Ordering::Greater
    {
        self.index_pos = index_entries.len();
        return;
    }
    // First block whose separator is >= target.
    let idx = index_entries.partition_point(|entry| {
        compare_internal_key(&entry.separator_key, target) == Ordering::Less
    });
    self.index_pos = idx;
    while self.index_pos < index_entries.len() {
        let entry = &index_entries[self.index_pos];
        if self.block_exceeds_upper_bound(entry) {
            self.index_pos = index_entries.len();
            return;
        }
        // Advance index_pos now; deferred_index_pos references this entry.
        self.index_pos += 1;
        if self.block_properties_should_skip(entry) {
            continue;
        }
        // Fast path: target <= block's first key, so the block's very first
        // entry is the answer — record it and defer the actual block read.
        if let Some(ref first_key) = entry.first_key
            && compare_internal_key(target, first_key) != Ordering::Greater
        {
            self.at_first_key_from_index = true;
            self.deferred_index_pos = self.index_pos - 1;
            self.block_cursor_key.clear();
            self.block_cursor_key.extend_from_slice(first_key);
            self.current_block = None;
            return;
        }
        match self.reader.read_block_cached(&entry.handle) {
            Ok(data) => match Block::new(data) {
                Ok(block) => {
                    self.seek_within_block(block, target, compare_internal_key);
                }
                Err(e) => {
                    self.err = Some(format!("block decode error in seek: {e}"));
                }
            },
            Err(e) => {
                self.err = Some(format!("block read error in seek: {e}"));
            }
        }
        return;
    }
}
/// Positions the forward cursor at the first entry >= `target` inside
/// `block`.
///
/// Strategy: binary-search the restart array (4-byte LE offsets stored
/// after `data_end`) for the last restart whose key is < target, then scan
/// linearly. On a hit, `block_cursor_key` is rewound to the *previous*
/// entry's key and the offset to the matching entry's start, so the next
/// cursor decode (which prefix-shares against the previous key)
/// reconstructs the matching entry correctly.
fn seek_within_block<F: Fn(&[u8], &[u8]) -> Ordering>(
    &mut self,
    block: Block,
    target: &[u8],
    compare: F,
) {
    let data_end = block.data_end_offset();
    let block_data = block.data();
    let num_restarts = block.num_restarts();
    let mut left = 0u32;
    let mut right = num_restarts;
    let mut tmp_key = Vec::new();
    while left < right {
        let mid = left + (right - left) / 2;
        let rp_offset = data_end + (mid as usize) * 4;
        let rp = u32::from_le_bytes(block_data[rp_offset..rp_offset + 4].try_into().unwrap())
            as usize;
        tmp_key.clear();
        // Keys at restart points are stored unshared, so decoding from an
        // empty tmp_key yields the full key.
        match decode_entry_reuse(block_data, rp, &mut tmp_key) {
            Some(_) => {
                if compare(&tmp_key, target) == Ordering::Less {
                    left = mid + 1;
                } else {
                    right = mid;
                }
            }
            None => right = mid,
        }
    }
    // Start the linear scan at the restart point just before `left`.
    let start = if left > 0 {
        let rp_offset = data_end + ((left - 1) as usize) * 4;
        u32::from_le_bytes(block_data[rp_offset..rp_offset + 4].try_into().unwrap()) as usize
    } else {
        0
    };
    self.block_cursor_key.clear();
    let mut prev_key_snapshot: Vec<u8> = Vec::new();
    let mut offset = start;
    while offset < data_end {
        let entry_offset = offset;
        // Remember the key before this entry so we can rewind on a match.
        prev_key_snapshot.clear();
        prev_key_snapshot.extend_from_slice(&self.block_cursor_key);
        match decode_entry_reuse(block_data, offset, &mut self.block_cursor_key) {
            Some((_, _, next_off)) => {
                if compare(&self.block_cursor_key, target) != Ordering::Less {
                    // Found first entry >= target: leave the cursor BEFORE
                    // it (prev key + entry offset) so the next decode
                    // re-produces this entry.
                    self.block_cursor_key.clear();
                    self.block_cursor_key.extend_from_slice(&prev_key_snapshot);
                    self.block_cursor_offset = entry_offset;
                    self.block_data_end = data_end;
                    self.current_block = Some(block);
                    return;
                }
                offset = next_off;
            }
            None => break,
        }
    }
    // All entries < target: cursor is exhausted for this block.
    self.block_cursor_offset = data_end;
    self.block_data_end = data_end;
    self.block_cursor_key.clear();
    self.current_block = Some(block);
}
/// Positions the iterator at the last entry <= `target` for backward
/// iteration, walking blocks from the candidate index slot toward the file
/// start until one yields a match. Sets up `current_block_entries`
/// (materialized restart segment) and `backward_block` state for `prev()`.
pub fn seek_for_prev(&mut self, target: &[u8]) {
    self.ensure_index();
    self.reset_positioning_state();
    let index_entries = self.index_entries.as_ref().unwrap();
    let idx = index_entries.partition_point(|entry| {
        compare_internal_key(&entry.separator_key, target) == Ordering::Less
    });
    let mut found = false;
    let mut try_idx = idx;
    loop {
        // idx may equal len (target past last separator): clamp backwards.
        if try_idx >= index_entries.len() {
            if try_idx == 0 {
                break;
            }
            try_idx -= 1;
            continue;
        }
        let entry = &index_entries[try_idx];
        if self.block_properties_should_skip(entry) {
            if try_idx == 0 {
                break;
            }
            try_idx -= 1;
            continue;
        }
        let handle = entry.handle;
        let block_result = self.reader.read_block_cached(&handle).and_then(Block::new);
        match block_result {
            Err(e) => {
                self.err = Some(format!("block read error in seek_for_prev: {e}"));
                break;
            }
            Ok(block) => match block.seek_for_prev_by(target, compare_internal_key) {
                Some((found_key, _found_val)) => {
                    // Materialize the entries from the found key's restart
                    // segment onward, then pick the last entry <= target.
                    let restart_idx =
                        self.find_restart_for_key(&block, &found_key, compare_internal_key);
                    let entries_from_restart = block.iter_from_restart(restart_idx);
                    let pos_in_entries = entries_from_restart
                        .iter()
                        .rposition(|(k, _)| {
                            compare_internal_key(k, target) != Ordering::Greater
                        })
                        .unwrap_or(0);
                    self.index_pos = try_idx + 1;
                    self.current_block_entries = entries_from_restart;
                    self.block_pos = pos_in_entries;
                    self.current_restart_index = restart_idx;
                    self.backward_block = Some(block);
                    self.backward_block_index = try_idx;
                    found = true;
                    break;
                }
                None => {
                    // Block has no entry <= target; try the previous block.
                }
            },
        }
        if try_idx == 0 {
            break;
        }
        try_idx -= 1;
    }
    if !found {
        // Exhausted: leave the iterator positioned past the end.
        self.index_pos = index_entries.len();
        self.current_block_entries.clear();
        self.block_pos = 0;
        self.backward_block = None;
    }
}
/// Binary-searches the block's restart points for the segment containing
/// `key`: returns the index of the last restart whose first key is <= key
/// (saturating to 0).
fn find_restart_for_key<F: Fn(&[u8], &[u8]) -> Ordering>(
    &self,
    block: &Block,
    key: &[u8],
    compare: F,
) -> u32 {
    let num = block.num_restarts();
    if num <= 1 {
        return 0;
    }
    let mut left = 0u32;
    let mut right = num;
    while left < right {
        let mid = left + (right - left) / 2;
        if let Some(first_key) = block.first_key_at_restart(mid) {
            if compare(&first_key, key) != Ordering::Greater {
                left = mid + 1;
            } else {
                right = mid;
            }
        } else {
            // Undecodable restart: treat it as past the key.
            right = mid;
        }
    }
    // `left` is the first restart whose key is > key; step back one.
    left.saturating_sub(1)
}
/// Steps backward one entry. Order of fallbacks:
/// 1. previous entry in the materialized segment,
/// 2. previous restart segment of the same block,
/// 3. last segment of the previous (non-skipped, non-empty) block.
/// Returns `None` at the start of the table or on error.
pub fn prev(&mut self) -> Option<(Vec<u8>, Vec<u8>)> {
    if self.block_pos > 0 {
        self.block_pos -= 1;
        return Some(self.current_block_entries[self.block_pos].clone());
    }
    // Exhausted the segment: back up one restart segment within the block.
    if self.current_restart_index > 0
        && let Some(ref block) = self.backward_block
    {
        self.current_restart_index -= 1;
        self.current_block_entries = block.iter_restart_segment(self.current_restart_index);
        if !self.current_block_entries.is_empty() {
            self.block_pos = self.current_block_entries.len() - 1;
            return Some(self.current_block_entries[self.block_pos].clone());
        }
    }
    // Need the previous block. Prefer the tracked backward block index;
    // usize::MAX means "not set", fall back to index_pos.
    let current_block_index = if self.backward_block_index < usize::MAX {
        self.backward_block_index
    } else if self.index_pos > 0 {
        self.index_pos - 1
    } else {
        return None;
    };
    if current_block_index == 0 {
        return None;
    }
    self.ensure_index();
    let index_entries = self.index_entries.as_ref().unwrap();
    let mut prev_block_index = current_block_index - 1;
    loop {
        let entry = &index_entries[prev_block_index];
        if self.block_properties_should_skip(entry) {
            if prev_block_index == 0 {
                return None;
            }
            prev_block_index -= 1;
            continue;
        }
        match self.reader.read_block_cached(&entry.handle) {
            Ok(data) => match Block::new(data) {
                Ok(block) => {
                    // Land on the block's last restart segment, last entry.
                    let last_restart = block.num_restarts().saturating_sub(1);
                    self.current_block_entries = block.iter_restart_segment(last_restart);
                    if self.current_block_entries.is_empty() {
                        if prev_block_index == 0 {
                            return None;
                        }
                        prev_block_index -= 1;
                        continue;
                    }
                    self.block_pos = self.current_block_entries.len() - 1;
                    self.index_pos = prev_block_index + 1;
                    self.current_restart_index = last_restart;
                    self.backward_block = Some(block);
                    self.backward_block_index = prev_block_index;
                    return Some(self.current_block_entries[self.block_pos].clone());
                }
                Err(e) => {
                    self.err = Some(format!("block decode error in prev: {e}"));
                }
            },
            Err(e) => {
                self.err = Some(format!("block read error in prev: {e}"));
            }
        }
        return None;
    }
}
/// Returns the entry at the current backward-iteration position, if any,
/// without moving the iterator.
pub fn current(&self) -> Option<(Vec<u8>, Vec<u8>)> {
    self.current_block_entries.get(self.block_pos).cloned()
}
/// Loads the next non-skipped, non-empty data block into the forward
/// cursor. Returns false at end of table, at the upper bound, or on error
/// (error text recorded in `self.err`). Tracks sequential access to trigger
/// OS readahead hints.
fn load_next_block(&mut self) -> bool {
    self.ensure_index();
    let index_entries = self.index_entries.as_ref().unwrap();
    while self.index_pos < index_entries.len() {
        let block_idx = self.index_pos;
        let entry = &index_entries[block_idx];
        if self.block_exceeds_upper_bound(entry) {
            self.index_pos = index_entries.len();
            return false;
        }
        if self.block_properties_should_skip(entry) {
            self.index_pos += 1;
            continue;
        }
        let handle = index_entries[self.index_pos].handle;
        self.index_pos += 1;
        // wrapping_add makes the usize::MAX "no previous block" sentinel
        // compare equal to block 0, so the very first load counts as
        // sequential.
        if block_idx == self.prev_block_index.wrapping_add(1) {
            self.sequential_reads += 1;
            if self.sequential_reads >= 2 {
                self.maybe_readahead(index_entries, block_idx);
            }
        } else {
            self.sequential_reads = 0;
        }
        self.prev_block_index = block_idx;
        match self.reader.read_block_cached(&handle) {
            Ok(data) => match Block::new(data) {
                Ok(block) => {
                    // Skip degenerate empty blocks and keep scanning.
                    if block.data_end_offset() > 0 {
                        self.set_block_for_cursor(block);
                        return true;
                    }
                }
                Err(e) => {
                    self.err = Some(format!("block decode error at index {block_idx}: {e}"));
                    return false;
                }
            },
            Err(e) => {
                self.err = Some(format!("block read error at index {block_idx}: {e}"));
                return false;
            }
        }
    }
    false
}
/// Issues a WILLNEED hint for up to the next 8 blocks (scaled by how many
/// sequential reads we have observed) starting after `current_idx`.
fn maybe_readahead(&self, index_entries: &[IndexEntry], current_idx: usize) {
    let prefetch_count = (self.sequential_reads as usize).min(8);
    let start = current_idx + 1;
    let end = (start + prefetch_count).min(index_entries.len());
    if start >= end {
        return;
    }
    // Hint one contiguous byte range covering all prefetched blocks,
    // including the last block's trailer.
    let first_handle = index_entries[start].handle;
    let last_handle = index_entries[end - 1].handle;
    let offset = first_handle.offset;
    let len = (last_handle.offset + last_handle.size + BLOCK_TRAILER_SIZE as u64) - offset;
    self.reader.advise_willneed(offset, len);
}
}
impl Iterator for TableIterator {
    type Item = (Vec<u8>, Vec<u8>);
    /// Forward iteration with owned key/value pairs. Sources are tried in
    /// order: deferred first-key block, the streaming cursor, any
    /// materialized entries (left by a backward seek), then the next block.
    fn next(&mut self) -> Option<Self::Item> {
        // Errors are sticky: once set, iteration ends.
        if self.err.is_some() {
            return None;
        }
        loop {
            if self.at_first_key_from_index {
                self.at_first_key_from_index = false;
                self.materialize_deferred_block();
                if let Some(entry) = self.cursor_next() {
                    return Some(entry);
                }
            }
            if let Some(entry) = self.cursor_next() {
                return Some(entry);
            }
            // Drain entries materialized by a previous backward positioning.
            if self.block_pos < self.current_block_entries.len() {
                let entry = self.current_block_entries[self.block_pos].clone();
                self.block_pos += 1;
                return Some(entry);
            }
            if !self.load_next_block() {
                return None;
            }
        }
    }
}
impl crate::iterator::merge::SeekableIterator for TableIterator {
/// Trait adapter: delegate to the inherent seek().
fn seek_to(&mut self, target: &[u8]) {
    self.seek(target);
}
/// Trait adapter: wrap the inherent current() value as an inline LazyValue.
fn current(&self) -> Option<(Vec<u8>, LazyValue)> {
    TableIterator::current(self).map(|(k, v)| (k, LazyValue::Inline(v)))
}
/// Trait adapter: wrap the inherent prev() value as an inline LazyValue.
fn prev(&mut self) -> Option<(Vec<u8>, LazyValue)> {
    TableIterator::prev(self).map(|(k, v)| (k, LazyValue::Inline(v)))
}
/// Trait adapter: delegate to the inherent seek_for_prev().
fn seek_for_prev(&mut self, target: &[u8]) {
    TableIterator::seek_for_prev(self, target);
}
/// Rewind to the table start; the first block loads lazily on next().
fn seek_to_first(&mut self) {
    self.ensure_index();
    self.reset_positioning_state();
    self.index_pos = 0;
}
/// Positions at the very last entry of the table (skipping blocks rejected
/// by block-property filters), setting up backward-iteration state so
/// prev() can walk toward the start. Errors are recorded in `self.err`.
fn seek_to_last(&mut self) {
    self.ensure_index();
    self.reset_positioning_state();
    let index_entries = self.index_entries.as_ref().unwrap();
    if index_entries.is_empty() {
        return;
    }
    // Walk backward past filtered-out blocks.
    let mut last_idx = index_entries.len() - 1;
    while self.block_properties_should_skip(&index_entries[last_idx]) {
        if last_idx == 0 {
            return;
        }
        last_idx -= 1;
    }
    let handle = index_entries[last_idx].handle;
    match self.reader.read_block_cached(&handle) {
        Ok(data) => match Block::new(data) {
            Ok(block) => {
                // Materialize the final restart segment; position on its
                // last entry.
                let last_restart = block.num_restarts().saturating_sub(1);
                self.current_block_entries = block.iter_restart_segment(last_restart);
                if !self.current_block_entries.is_empty() {
                    self.block_pos = self.current_block_entries.len() - 1;
                    self.index_pos = last_idx + 1;
                    self.current_restart_index = last_restart;
                    self.backward_block = Some(block);
                    self.backward_block_index = last_idx;
                }
            }
            Err(e) => {
                self.err = Some(format!("block decode error in seek_to_last: {e}"));
            }
        },
        Err(e) => {
            self.err = Some(format!("block read error in seek_to_last: {e}"));
        }
    }
}
/// Allocation-light forward step: writes the next key/value into the
/// caller-provided buffers, returning false at end, bound, or error.
/// Mirrors Iterator::next but avoids per-entry Vec allocations.
fn next_into(&mut self, key_buf: &mut Vec<u8>, value_buf: &mut Vec<u8>) -> bool {
    if self.err.is_some() {
        return false;
    }
    if self.at_first_key_from_index {
        self.at_first_key_from_index = false;
        self.materialize_deferred_block();
    }
    loop {
        // Primary source: the streaming cursor over the current block.
        if let Some(ref block) = self.current_block
            && self.block_cursor_offset < self.block_data_end
        {
            let data = block.data();
            if let Some((vs, vl, next)) =
                decode_entry_reuse(data, self.block_cursor_offset, &mut self.block_cursor_key)
            {
                // Exclusive upper bound on the user-key portion.
                if let Some(ref ub) = self.upper_bound
                    && user_key(&self.block_cursor_key) >= ub.as_slice()
                {
                    return false;
                }
                self.block_cursor_offset = next;
                key_buf.clear();
                key_buf.extend_from_slice(&self.block_cursor_key);
                value_buf.clear();
                value_buf.extend_from_slice(&data[vs..vs + vl]);
                return true;
            }
        }
        // Secondary source: entries materialized by a backward seek.
        if self.block_pos < self.current_block_entries.len() {
            let (ref k, ref v) = self.current_block_entries[self.block_pos];
            if let Some(ref ub) = self.upper_bound
                && user_key(k) >= ub.as_slice()
            {
                return false;
            }
            key_buf.clear();
            key_buf.extend_from_slice(k);
            value_buf.clear();
            value_buf.extend_from_slice(v);
            self.block_pos += 1;
            return true;
        }
        if !self.load_next_block() {
            return false;
        }
    }
}
/// Hints the OS to pre-read the first data block (including its trailer)
/// ahead of iteration.
fn prefetch_first_block(&mut self) {
    self.ensure_index();
    if let Some(index) = self.index_entries.as_ref()
        && let Some(entry) = index.first()
    {
        self.reader.advise_willneed(
            entry.handle.offset,
            entry.handle.size + BLOCK_TRAILER_SIZE as u64,
        );
    }
}
/// Installs the iteration bounds. Only the (exclusive) upper bound is
/// honored here; the lower bound is intentionally ignored.
fn set_bounds(&mut self, _lower: Option<&[u8]>, upper: Option<&[u8]>) {
    self.upper_bound = upper.map(|b| b.to_vec());
}
/// Returns the sticky error recorded during iteration, if any.
fn iter_error(&self) -> Option<String> {
    self.err.clone()
}
/// Zero-copy forward step: writes the key into `key_buf` and returns a lazy
/// value referencing the block's shared buffer where possible.
/// Recurses once per block transition; depth is bounded because
/// `load_next_block` only succeeds on a non-empty block.
fn next_lazy(&mut self, key_buf: &mut Vec<u8>) -> Option<LazyValue> {
    if self.err.is_some() {
        return None;
    }
    if self.at_first_key_from_index {
        self.at_first_key_from_index = false;
        self.materialize_deferred_block();
        if self.err.is_some() {
            return None;
        }
    }
    // Primary source: streaming cursor over the current block.
    if let Some(ref block) = self.current_block
        && self.block_cursor_offset < self.block_data_end
    {
        let data = block.data();
        let result =
            decode_entry_reuse(data, self.block_cursor_offset, &mut self.block_cursor_key);
        if let Some((value_start, value_len, next_offset)) = result {
            if let Some(ref ub) = self.upper_bound
                && user_key(&self.block_cursor_key) >= ub.as_slice()
            {
                return None;
            }
            self.block_cursor_offset = next_offset;
            key_buf.clear();
            key_buf.extend_from_slice(&self.block_cursor_key);
            // Zero-copy: value stays inside the shared block buffer.
            return Some(LazyValue::BlockRef {
                data: block.data_arc().clone(),
                offset: value_start as u32,
                len: value_len as u32,
            });
        }
    }
    // Secondary source: entries materialized by a backward seek (owned, so
    // the value is inlined).
    if self.block_pos < self.current_block_entries.len() {
        let (ref k, ref v) = self.current_block_entries[self.block_pos];
        if let Some(ref ub) = self.upper_bound
            && user_key(k) >= ub.as_slice()
        {
            return None;
        }
        key_buf.clear();
        key_buf.extend_from_slice(k);
        let lv = LazyValue::Inline(v.clone());
        self.block_pos += 1;
        return Some(lv);
    }
    if !self.load_next_block() {
        return None;
    }
    self.next_lazy(key_buf)
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::options::{BlockPropertyCollector, BlockPropertyFilter};
    use crate::sst::table_builder::{TableBuildOptions, TableBuilder};

    /// Build an SST at `dir/test.sst` with `count` entries of the form
    /// `key_%06d -> value_%d`, using default build options.
    fn build_test_table(dir: &Path, count: usize) -> PathBuf {
        let path = dir.join("test.sst");
        let mut builder = TableBuilder::new(&path, TableBuildOptions::default()).unwrap();
        for i in 0..count {
            let key = format!("key_{:06}", i);
            let val = format!("value_{}", i);
            builder.add(key.as_bytes(), val.as_bytes()).unwrap();
        }
        builder.finish().unwrap();
        path
    }

    #[test]
    fn test_table_read_all() {
        let dir = tempfile::tempdir().unwrap();
        let path = build_test_table(dir.path(), 100);
        let reader = TableReader::open(&path).unwrap();
        let entries = reader.iter().unwrap();
        assert_eq!(entries.len(), 100);
        // Entries must come back in insertion (sorted) order.
        for (i, (k, v)) in entries.iter().enumerate() {
            assert_eq!(k, format!("key_{:06}", i).as_bytes());
            assert_eq!(v, format!("value_{}", i).as_bytes());
        }
    }

    #[test]
    fn test_table_point_lookup() {
        let dir = tempfile::tempdir().unwrap();
        let path = build_test_table(dir.path(), 100);
        let reader = TableReader::open(&path).unwrap();
        // Middle, first, and last keys are all found.
        let val = reader.get(b"key_000050").unwrap();
        assert_eq!(val, Some(b"value_50".to_vec()));
        let val = reader.get(b"key_000000").unwrap();
        assert_eq!(val, Some(b"value_0".to_vec()));
        let val = reader.get(b"key_000099").unwrap();
        assert_eq!(val, Some(b"value_99".to_vec()));
        // Keys past the end and before the start are absent.
        let val = reader.get(b"key_999999").unwrap();
        assert_eq!(val, None);
        let val = reader.get(b"aaa").unwrap();
        assert_eq!(val, None);
    }

    #[test]
    fn test_table_large() {
        let dir = tempfile::tempdir().unwrap();
        let path = build_test_table(dir.path(), 10000);
        let reader = TableReader::open(&path).unwrap();
        // Sample every 100th key across a multi-block table.
        for i in (0..10000).step_by(100) {
            let key = format!("key_{:06}", i);
            let val = format!("value_{}", i);
            assert_eq!(
                reader.get(key.as_bytes()).unwrap(),
                Some(val.into_bytes()),
                "failed at key {}",
                i
            );
        }
    }

    #[test]
    fn test_bloom_filter_used() {
        let dir = tempfile::tempdir().unwrap();
        let path = build_test_table(dir.path(), 100);
        let reader = TableReader::open(&path).unwrap();
        // Default build options should emit a bloom filter block.
        assert!(reader.filter_data.is_some());
        let val = reader.get(b"nonexistent_key_12345").unwrap();
        assert_eq!(val, None);
    }

    #[test]
    fn test_internal_key_lookup() {
        use crate::types::InternalKey;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("internal.sst");
        let mut builder = TableBuilder::new(
            &path,
            TableBuildOptions {
                bloom_bits_per_key: 0,
                ..Default::default()
            },
        )
        .unwrap();
        // Two versions of "aaa" (a deletion at seq 3 and a value at seq 5)
        // plus a single version of "bbb".
        let ik1 = InternalKey::new(b"aaa", 3, ValueType::Deletion);
        let ik2 = InternalKey::new(b"aaa", 5, ValueType::Value);
        let ik3 = InternalKey::new(b"bbb", 4, ValueType::Value);
        let mut entries = vec![
            (ik1.as_bytes().to_vec(), b"".to_vec()),
            (ik2.as_bytes().to_vec(), b"val_aaa".to_vec()),
            (ik3.as_bytes().to_vec(), b"val_bbb".to_vec()),
        ];
        entries.sort_by(|(a, _), (b, _)| a.cmp(b));
        for (k, v) in &entries {
            builder.add(k, v).unwrap();
        }
        builder.finish().unwrap();
        let reader = TableReader::open(&path).unwrap();
        let result = reader.get_internal(b"aaa", 10).unwrap();
        assert!(result.is_some());
        let result = reader.get_internal(b"bbb", 10).unwrap();
        assert_eq!(result, Some(Some(b"val_bbb".to_vec())));
        let result = reader.get_internal(b"ccc", 10).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_table_iterator_streaming() {
        let dir = tempfile::tempdir().unwrap();
        let path = build_test_table(dir.path(), 500);
        let reader = Arc::new(TableReader::open(&path).unwrap());
        let mut iter = TableIterator::new(reader);
        let mut count = 0;
        let mut prev_key: Option<Vec<u8>> = None;
        for (k, v) in &mut iter {
            let expected_key = format!("key_{:06}", count);
            let expected_val = format!("value_{}", count);
            assert_eq!(
                k,
                expected_key.as_bytes(),
                "key mismatch at index {}",
                count
            );
            assert_eq!(
                v,
                expected_val.as_bytes(),
                "value mismatch at index {}",
                count
            );
            // Keys must be yielded in strictly increasing order.
            if let Some(ref pk) = prev_key {
                assert!(
                    k.as_slice() > pk.as_slice(),
                    "keys not in order at {}",
                    count
                );
            }
            prev_key = Some(k);
            count += 1;
        }
        assert_eq!(count, 500);
    }

    /// Build an SST keyed by internal keys `key_%06d` with descending
    /// sequence numbers, sorted in internal-key order.
    fn build_internal_key_table(dir: &Path, count: usize) -> PathBuf {
        use crate::types::InternalKey;
        let path = dir.join("internal_iter.sst");
        let mut builder = TableBuilder::new(
            &path,
            TableBuildOptions {
                bloom_bits_per_key: 0,
                ..Default::default()
            },
        )
        .unwrap();
        let mut entries: Vec<(Vec<u8>, Vec<u8>)> = (0..count)
            .map(|i| {
                let uk = format!("key_{:06}", i);
                let ik = InternalKey::new(uk.as_bytes(), (count - i) as u64, ValueType::Value);
                let val = format!("value_{}", i);
                (ik.into_bytes(), val.into_bytes())
            })
            .collect();
        entries.sort_by(|(a, _), (b, _)| compare_internal_key(a, b));
        for (k, v) in &entries {
            builder.add(k, v).unwrap();
        }
        builder.finish().unwrap();
        path
    }

    #[test]
    fn test_table_iterator_seek_for_prev() {
        use crate::types::InternalKey;
        let dir = tempfile::tempdir().unwrap();
        let path = build_internal_key_table(dir.path(), 100);
        let reader = Arc::new(TableReader::open(&path).unwrap());
        // Seq 0 + Deletion sorts after all real versions of a user key,
        // so seek_for_prev lands on the newest entry <= the user key.
        let seek_key =
            |uk: &[u8]| -> Vec<u8> { InternalKey::new(uk, 0, ValueType::Deletion).into_bytes() };
        let extract_uk = |ikey: &[u8]| -> Vec<u8> {
            crate::types::InternalKeyRef::new(ikey).user_key().to_vec()
        };
        // Exact-match seek.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000050"));
        let entry = iter.current().unwrap();
        assert_eq!(extract_uk(&entry.0), b"key_000050");
        assert_eq!(entry.1, b"value_50");
        // Seek between keys lands on the predecessor.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000050x"));
        let entry = iter.current().unwrap();
        assert_eq!(extract_uk(&entry.0), b"key_000050");
        // Seek past the end lands on the last key.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"zzz"));
        let entry = iter.current().unwrap();
        assert_eq!(extract_uk(&entry.0), b"key_000099");
        // Seek before the start positions nowhere.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"aaa"));
        assert!(iter.current().is_none());
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000000"));
        let entry = iter.current().unwrap();
        assert_eq!(extract_uk(&entry.0), b"key_000000");
        // Forward iteration continues correctly after a seek_for_prev.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000050"));
        iter.block_pos += 1;
        let next = iter.next();
        assert!(next.is_some());
        assert_eq!(extract_uk(&next.unwrap().0), b"key_000051");
    }

    #[test]
    fn test_table_iterator_prev() {
        use crate::types::InternalKey;
        let dir = tempfile::tempdir().unwrap();
        let path = build_internal_key_table(dir.path(), 100);
        let reader = Arc::new(TableReader::open(&path).unwrap());
        let seek_key =
            |uk: &[u8]| -> Vec<u8> { InternalKey::new(uk, 0, ValueType::Deletion).into_bytes() };
        let extract_uk = |ikey: &[u8]| -> Vec<u8> {
            crate::types::InternalKeyRef::new(ikey).user_key().to_vec()
        };
        // Step backwards from the middle of the table.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000050"));
        assert_eq!(extract_uk(&iter.current().unwrap().0), b"key_000050");
        let prev = iter.prev().unwrap();
        assert_eq!(extract_uk(&prev.0), b"key_000049");
        let prev = iter.prev().unwrap();
        assert_eq!(extract_uk(&prev.0), b"key_000048");
        // Step backwards from past the end.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"zzz"));
        assert_eq!(extract_uk(&iter.current().unwrap().0), b"key_000099");
        let prev = iter.prev().unwrap();
        assert_eq!(extract_uk(&prev.0), b"key_000098");
        // prev() at the first key returns None.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000000"));
        assert_eq!(extract_uk(&iter.current().unwrap().0), b"key_000000");
        assert!(iter.prev().is_none());
        // Full reverse scan visits every entry exactly once.
        let mut iter = TableIterator::new(reader.clone());
        iter.seek_for_prev(&seek_key(b"key_000099"));
        let mut count = 1;
        while iter.prev().is_some() {
            count += 1;
        }
        assert_eq!(count, 100, "should be able to prev through all 100 entries");
    }

    /// Collector that tags each block "skip" when its first user key starts
    /// with `a_skip_`, and "keep" otherwise.
    #[derive(Default)]
    struct FirstPrefixCollector {
        skip_block: bool,
        seen: bool,
    }
    impl BlockPropertyCollector for FirstPrefixCollector {
        fn add(&mut self, key: &[u8], _value: &[u8]) {
            // Only the first key of each block determines the property.
            if self.seen {
                return;
            }
            self.seen = true;
            self.skip_block = crate::types::InternalKeyRef::new(key)
                .user_key()
                .starts_with(b"a_skip_");
        }
        fn finish_block(&mut self) -> Vec<u8> {
            let result = if self.skip_block {
                b"skip".to_vec()
            } else {
                b"keep".to_vec()
            };
            // Reset per-block state for the next block.
            self.skip_block = false;
            self.seen = false;
            result
        }
        fn name(&self) -> &str {
            "first-prefix"
        }
    }

    /// Filter that skips any block whose property is exactly "skip".
    struct SkipBlocksFilter;
    impl BlockPropertyFilter for SkipBlocksFilter {
        fn should_skip(&self, properties: &[u8]) -> bool {
            properties == b"skip"
        }
        fn name(&self) -> &str {
            "first-prefix"
        }
    }

    #[test]
    fn test_table_iterator_prev_skips_filtered_blocks() {
        use crate::types::InternalKey;
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("filtered_prev.sst");
        // block_size: 1 forces one entry per block so the property filter
        // applies at single-key granularity.
        let mut builder = TableBuilder::new(
            &path,
            TableBuildOptions {
                block_size: 1,
                bloom_bits_per_key: 0,
                block_property_collectors: vec![Box::<FirstPrefixCollector>::default()],
                ..Default::default()
            },
        )
        .unwrap();
        for prefix in ["a_skip", "z_keep"] {
            for i in 0..20 {
                let user_key = format!("{}_{:03}", prefix, i);
                let ikey = InternalKey::new(user_key.as_bytes(), 100 - i, ValueType::Value);
                builder
                    .add(ikey.as_bytes(), format!("value_{prefix}_{i}").as_bytes())
                    .unwrap();
            }
        }
        builder.finish().unwrap();
        let reader = Arc::new(TableReader::open(&path).unwrap());
        let mut iter =
            TableIterator::new(reader).with_block_filters(vec![Arc::new(SkipBlocksFilter)]);
        let seek_key = InternalKey::new(b"zzzz", 0, ValueType::Deletion);
        iter.seek_for_prev(seek_key.as_bytes());
        // Reverse scan must yield only "z_keep_" keys — the "a_skip_"
        // blocks are filtered out entirely.
        let mut seen = 0;
        while let Some((key, _)) = iter.current() {
            let user_key = crate::types::InternalKeyRef::new(&key).user_key().to_vec();
            assert!(
                user_key.starts_with(b"z_keep_"),
                "filtered reverse scan yielded skipped key {:?}",
                String::from_utf8_lossy(&user_key)
            );
            seen += 1;
            if iter.prev().is_none() {
                break;
            }
        }
        assert_eq!(seen, 20);
    }
}