use std::cmp::Ordering;
use std::io::Write;
use std::ops::Bound;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use crc32fast::Hasher as Crc32;
use integer_encoding::{FixedInt, FixedIntWriter};
use snap::raw::max_compress_len;
use crate::compression::CompressionSelector;
use crate::error::{Error, Result};
use crate::sstable::block::{Block, BlockData, BlockHandle, BlockIterator, BlockWriter};
use crate::sstable::error::SSTableError;
use crate::sstable::filter_block::{FilterBlockReader, FilterBlockWriter};
use crate::sstable::index_block::{Index, IndexIterator, IndexWriter};
use crate::sstable::meta::TableMetadata;
use crate::vfs::File;
use crate::vlog::{ValueLocation, ValuePointer};
use crate::{
Comparator,
CompressionType,
FilterPolicy,
InternalKey,
InternalKeyKind,
InternalKeyRange,
InternalKeyRef,
LSMIterator,
Options,
Value,
INTERNAL_KEY_SEQ_NUM_MAX,
INTERNAL_KEY_TIMESTAMP_MAX,
};
// Size in bytes of the footer body: one format byte, one checksum-type byte and the two
// encoded block handles (zero-padded), not counting the trailing magic number.
const TABLE_FOOTER_LENGTH: usize = 42;
// Footer body plus the 8-byte magic number; this many bytes are read from the end of the file.
const TABLE_FULL_FOOTER_LENGTH: usize = TABLE_FOOTER_LENGTH + 8;
// Magic bytes that terminate every table file; `Footer::decode` rejects buffers not ending in them.
const TABLE_MAGIC_FOOTER_ENCODED: [u8; 8] = [0x57, 0xfb, 0x80, 0x8b, 0x24, 0x75, 0x47, 0xdb];
/// Number of bytes occupied by the masked checksum written after each block.
pub const BLOCK_CKSUM_LEN: usize = 4;
/// Number of bytes occupied by the compression-type tag written after each block.
pub const BLOCK_COMPRESS_LEN: usize = 1;
/// Constant added (with wrap-around) to a rotated CRC when masking it for storage.
const MASK_DELTA: u32 = 0xa282_ead8;

/// Masks a raw CRC value: rotates it right by 15 bits, then adds [`MASK_DELTA`] modulo 2^32.
pub(crate) fn mask(crc: u32) -> u32 {
    let rotated = (crc >> 15) | (crc << 17);
    rotated.wrapping_add(MASK_DELTA)
}

/// Reverses [`mask`]: subtracts [`MASK_DELTA`] modulo 2^32, then rotates left by 15 bits.
pub(crate) fn unmask(masked: u32) -> u32 {
    masked.wrapping_sub(MASK_DELTA).rotate_left(15)
}
/// Identifier of the checksum algorithm recorded in the second byte of the table footer.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum ChecksumType {
// Encoded in the footer as the byte value 1; the only value `Footer::decode` accepts.
CRC32c = 1,
}
/// Identifier of the on-disk table layout recorded in the first byte of the table footer.
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum TableFormat {
// Encoded in the footer as the byte value 1.
LSMV1 = 1,
}
impl TableFormat {
pub(crate) fn from_u8(val: u8) -> Result<Self> {
match val {
1 => Ok(TableFormat::LSMV1),
_ => Err(Error::InvalidTableFormat),
}
}
}
/// Decoded table footer: format and checksum identifiers plus the handles of the
/// meta-index block and the index.
#[derive(Debug, Clone)]
pub(crate) struct Footer {
// Table layout identifier (first footer byte).
pub format: TableFormat,
// Checksum algorithm identifier (second footer byte).
pub checksum: ChecksumType,
// `meta_index` locates the meta-index block; `index` locates the index. Both are encoded
// back-to-back starting at footer byte 2.
pub meta_index: BlockHandle, pub index: BlockHandle, }
impl Footer {
/// Creates a footer for the given meta-index and index handles, using the `LSMV1` format
/// and the `CRC32c` checksum type.
pub(crate) fn new(metaix: BlockHandle, index: BlockHandle) -> Footer {
    let format = TableFormat::LSMV1;
    let checksum = ChecksumType::CRC32c;
    Footer {
        format,
        checksum,
        meta_index: metaix,
        index,
    }
}
/// Reads the last `TABLE_FULL_FOOTER_LENGTH` bytes of the file and returns them undecoded.
///
/// # Errors
/// Returns `SSTableError::FileTooSmall` when `file_size` is smaller than the full footer,
/// and propagates any error from `read_at`. The byte count reported by `read_at` is not
/// inspected.
pub(crate) fn read_from(reader: Arc<dyn File>, file_size: usize) -> Result<Vec<u8>> {
    let Some(footer_start) = file_size.checked_sub(TABLE_FULL_FOOTER_LENGTH) else {
        return Err(Error::from(SSTableError::FileTooSmall {
            file_size,
            min_size: TABLE_FULL_FOOTER_LENGTH,
        }));
    };
    let mut footer_bytes = vec![0; TABLE_FULL_FOOTER_LENGTH];
    reader.read_at(footer_start as u64, &mut footer_bytes)?;
    Ok(footer_bytes)
}
/// Decodes a footer from `buf`, whose trailing bytes must be the table magic number.
///
/// Layout read from `buf`: byte 0 is the table format, byte 1 the checksum type, followed by
/// the encoded meta-index handle and then the encoded index handle.
///
/// # Errors
/// * `SSTableError::FooterTooShort` when `buf` is shorter than the magic number or shorter
///   than `TABLE_FOOTER_LENGTH`.
/// * `SSTableError::BadMagicNumber` when the trailing bytes are not the magic number.
/// * `Error::InvalidTableFormat` / `SSTableError::InvalidChecksumType` for unknown header bytes.
/// * `SSTableError::BadMetaIndexBlockHandle` when the meta-index handle decodes to zero bytes.
/// * Any error produced while decoding either block handle.
pub(crate) fn decode(buf: &[u8]) -> Result<Footer> {
    // Guard the subtraction: a buffer shorter than the magic number used to underflow and
    // panic here instead of yielding a proper error.
    let Some(magic_start) = buf.len().checked_sub(TABLE_MAGIC_FOOTER_ENCODED.len()) else {
        return Err(Error::from(SSTableError::FooterTooShort {
            footer_len: buf.len(),
        }));
    };
    let magic = &buf[magic_start..];
    if magic != TABLE_MAGIC_FOOTER_ENCODED {
        return Err(Error::from(SSTableError::BadMagicNumber {
            magic: magic.to_vec(),
        }));
    }
    if buf.len() < TABLE_FOOTER_LENGTH {
        return Err(Error::from(SSTableError::FooterTooShort {
            footer_len: buf.len(),
        }));
    }
    let format = TableFormat::from_u8(buf[0])?;
    let checksum = match buf[1] {
        1 => ChecksumType::CRC32c,
        _ => {
            return Err(Error::from(SSTableError::InvalidChecksumType {
                checksum_type: buf[1],
            }))
        }
    };
    // The two handles are stored back-to-back; the first handle's encoded length tells us
    // where the second one starts.
    let (meta_index, metalen) = BlockHandle::decode(&buf[2..])?;
    if metalen == 0 {
        return Err(Error::from(SSTableError::BadMetaIndexBlockHandle));
    }
    let (index_handle, _) = BlockHandle::decode(&buf[2 + metalen..])?;
    Ok(Footer {
        format,
        checksum,
        meta_index,
        index: index_handle,
    })
}
/// Serializes the footer into `dst`.
///
/// The first `TABLE_FOOTER_LENGTH` bytes are zeroed, then filled with the format byte, the
/// checksum-type byte and the two encoded handles; the magic number is written directly
/// after them.
///
/// # Panics
/// Panics if `dst` is shorter than `TABLE_FULL_FOOTER_LENGTH`.
pub(crate) fn encode(&self, dst: &mut [u8]) {
    // Irrefutable only while `LSMV1` is the sole format; adding a variant breaks the build here.
    let TableFormat::LSMV1 = self.format;
    let (magic_start, magic_end) = (TABLE_FOOTER_LENGTH, TABLE_FULL_FOOTER_LENGTH);
    dst[..magic_start].fill(0);
    dst[0] = self.format as u8;
    dst[1] = self.checksum as u8;
    let meta_len = self.meta_index.encode_into(&mut dst[2..]);
    let index_region = &mut dst[2 + meta_len..];
    self.index.encode_into(index_region);
    dst[magic_start..magic_end].copy_from_slice(&TABLE_MAGIC_FOOTER_ENCODED);
}
}
/// Streams sorted key/value entries into a table file: data blocks, an optional filter block,
/// the index, a meta-index block and finally the footer.
pub(crate) struct TableWriter<W: Write> {
// Destination that all blocks and the footer are written to.
writer: W,
// Shared options (block size, restart interval, comparators, filter policy, ...).
opts: Arc<Options>,
// Chooses the compression type for a given target level.
compression_selector: CompressionSelector,
// Level passed to `compression_selector` whenever a block is written.
target_level: u8,
// Metadata accumulated while entries are added; encoded under the "meta" key in `finish`.
pub(crate) meta: TableMetadata,
// Running byte offset at which the next block will be written.
offset: usize,
// Last key of the most recently flushed data block (empty until the first flush).
prev_block_last_key: Vec<u8>,
// Data block currently being filled; taken and replaced when a block is flushed.
data_block: Option<BlockWriter>,
// Receives one (separator key, block handle) entry per flushed data block.
partitioned_index: IndexWriter,
// Filter builder, present only when a filter policy is configured.
filter_block: Option<FilterBlockWriter>,
// Comparator used to check key order and to build separator/successor keys.
internal_cmp: Arc<dyn Comparator>,
// Smallest vlog file id seen among values that decode as value pointers.
min_vlog_file_id: Option<u32>,
}
impl<W: Write> TableWriter<W> {
/// Creates a writer for table `id` that emits to `writer` and targets `target_level`.
///
/// A filter block builder is started when `opts.filter_policy` is set, and the metadata is
/// seeded with the table id and the compression type selected for `target_level`.
pub(crate) fn new(writer: W, id: u64, opts: Arc<Options>, target_level: u8) -> Self {
    let filter_block = opts.filter_policy.as_ref().map(|policy| {
        let mut builder = FilterBlockWriter::new(Arc::clone(policy));
        builder.start_block(0);
        builder
    });

    let compression_selector = CompressionSelector::new(opts.compression_per_level.clone());

    let mut meta = TableMetadata::new();
    meta.properties.id = id;
    meta.properties.compression = compression_selector.select_compression(target_level);

    let data_block = BlockWriter::new(
        opts.block_size,
        opts.block_restart_interval,
        Arc::clone(&opts.internal_comparator),
    );
    let partitioned_index = IndexWriter::new(Arc::clone(&opts), opts.index_partition_size);
    let internal_cmp = Arc::clone(&opts.internal_comparator) as Arc<dyn Comparator>;

    TableWriter {
        writer,
        opts,
        compression_selector,
        target_level,
        meta,
        offset: 0,
        prev_block_last_key: Vec::new(),
        data_block: Some(data_block),
        partitioned_index,
        filter_block,
        internal_cmp,
        min_vlog_file_id: None,
    }
}
/// Appends one entry to the table.
///
/// Updates the table metadata, flushes the current data block first if its size estimate
/// already exceeds `opts.block_size`, records the user key in the filter block (if any) and
/// finally adds the encoded key and value to the data block.
///
/// # Errors
/// Propagates errors from flushing the current data block and from adding to the block.
///
/// # Panics
/// Panics if there is no current data block, or if `key` does not compare greater than the
/// last key of the previously flushed block.
pub(crate) fn add(&mut self, key: InternalKey, val: &[u8]) -> Result<()> {
assert!(self.data_block.is_some());
let enc_key = key.encode();
// Order is only checked against the last key of the previously flushed block;
// `prev_block_last_key` is not updated per entry in this file.
if !self.prev_block_last_key.is_empty() {
let order = self.internal_cmp.compare(&self.prev_block_last_key, &enc_key);
assert_eq!(order, Ordering::Less, "Keys must be in ascending order");
}
// Lazily create the filter builder if none exists but a policy is configured.
if self.filter_block.is_none() {
if let Some(filter_policy) = self.opts.filter_policy.as_ref() {
let mut filter_block = FilterBlockWriter::new(Arc::clone(filter_policy));
filter_block.start_block(0);
self.filter_block = Some(filter_block);
}
}
// Best effort: values that do not decode as a value location/pointer are simply skipped.
if let Ok(location) = ValueLocation::decode(val) {
if location.is_value_pointer() {
if let Ok(pointer) = ValuePointer::decode(&location.value) {
let file_id = pointer.file_id;
// Track the smallest vlog file id referenced by any entry.
self.min_vlog_file_id =
Some(self.min_vlog_file_id.map_or(file_id, |min| min.min(file_id)));
}
}
}
// Note: metadata is updated before the entry is added to the block below.
self.update_meta_properties(&key, val);
// Flush the current block before adding; the new key serves as the upper bound
// for the separator key computed in `write_data_block`.
if self.data_block.as_ref().unwrap().size_estimate() > self.opts.block_size {
self.write_data_block(&enc_key)?;
}
let dblock = self.data_block.as_mut().expect("No data block available");
// The filter is keyed on the user key only.
if let Some(fblock) = self.filter_block.as_mut() {
fblock.add_key(key.user_key.as_slice());
}
dblock.add(&enc_key, val)?;
Ok(())
}
/// Flushes the current data block and installs a fresh, empty one.
///
/// `next_key` is passed with the block's last key to the comparator's `separator` to build
/// the index entry for this block.
///
/// # Errors
/// Propagates failures from finishing the block, writing it, or adding the index entry.
///
/// # Panics
/// Panics if there is no current data block.
fn write_data_block(&mut self, next_key: &[u8]) -> Result<()> {
    assert!(self.data_block.is_some(), "No data block available to write.");
    let finished = self.data_block.take().expect("Failed to take the existing data block");

    {
        let props = &mut self.meta.properties;
        props.num_data_blocks += 1;
        props.block_count += 1;
        // Only the first flushed block's size estimate is recorded.
        if props.block_size == 0 {
            props.block_size = finished.size_estimate() as u32;
        }
    }

    let separator_key = self.internal_cmp.separator(&finished.last_key, next_key);
    self.prev_block_last_key.clone_from(&finished.last_key);

    let contents = finished.finish()?;
    let compression = self.compression_selector.select_compression(self.target_level);
    let handle = self.write_compressed_block(contents, compression)?;
    self.partitioned_index.add(&separator_key, &handle.encode())?;

    let fresh_block = BlockWriter::new(
        self.opts.block_size,
        self.opts.block_restart_interval,
        Arc::clone(&self.opts.internal_comparator),
    );
    self.data_block = Some(fresh_block);
    Ok(())
}
/// Finalizes the table and returns the total number of bytes written.
///
/// Steps, in order: stamp the creation time and summary properties, flush the pending data
/// block (if it has entries), write the filter block (if non-empty), write the index, write
/// the meta-index block (filter handle + encoded metadata), write the footer, and flush the
/// writer.
///
/// # Errors
/// Returns `SSTableError::FailedToGetSystemTime` if the system clock is before the Unix
/// epoch, and propagates any block, index or I/O error.
pub(crate) fn finish(mut self) -> Result<usize> {
    self.meta.properties.created_at = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| {
            let err = Error::from(SSTableError::FailedToGetSystemTime {
                source: e.to_string(),
            });
            log::error!("[TABLE_WRITER] {}", err);
            err
        })?
        .as_nanos();
    self.meta.properties.seqnos =
        (self.meta.smallest_seq_num.unwrap_or(0), self.meta.largest_seq_num.unwrap_or(0));
    // Lossless widening; 0 is recorded when no value pointer was seen.
    self.meta.properties.oldest_vlog_file_id = u64::from(self.min_vlog_file_id.unwrap_or(0));

    // Flush the trailing, partially filled data block. There is no "next key", so the
    // comparator's successor of the last key is used to build the separator.
    if self.data_block.as_ref().is_some_and(|db| db.entries() > 0) {
        let key_past_last =
            self.internal_cmp.successor(&self.data_block.as_ref().unwrap().last_key);
        self.write_data_block(&key_past_last)?;
    }

    let mut meta_ix_block = BlockWriter::new(
        self.opts.block_size,
        self.opts.block_restart_interval,
        Arc::clone(&self.opts.internal_comparator),
    );

    // Filter block: written uncompressed and only when it contains data; its handle is
    // registered in the meta-index under "filter.<policy name>".
    if let Some(fblock) = self.filter_block.take() {
        let filter_key = format!("filter.{}", fblock.filter_name());
        let fblock_data = fblock.finish();
        if !fblock_data.is_empty() {
            let fblock_handle = self.write_compressed_block(fblock_data, CompressionType::None)?;
            self.meta.properties.filter_size = fblock_handle.size as u64;
            let mut handle_enc = vec![0u8; 16];
            let enc_len = fblock_handle.encode_into(&mut handle_enc);
            let filter_key =
                InternalKey::new(Vec::from(filter_key.as_bytes()), 0, InternalKeyKind::Set, 0);
            meta_ix_block.add(&filter_key.encode(), &handle_enc[0..enc_len])?;
        }
    }

    let compression_type = self.compression_selector.select_compression(self.target_level);
    let (ix_handle, new_offset) =
        self.partitioned_index.finish(&mut self.writer, compression_type, self.offset)?;
    self.offset = new_offset;
    self.meta.properties.index_size = self.partitioned_index.index_size();
    self.meta.properties.index_partitions = self.partitioned_index.num_partitions();
    self.meta.properties.top_level_index_size = self.partitioned_index.top_level_index_size();

    // The metadata is encoded only after the index statistics above have been filled in.
    let meta_key = InternalKey::new(Vec::from(b"meta"), 0, InternalKeyKind::Set, 0);
    let meta_value = self.meta.encode();
    meta_ix_block.add(&meta_key.encode(), &meta_value)?;
    let meta_block = meta_ix_block.finish()?;
    let meta_ix_handle = self.write_compressed_block(meta_block, CompressionType::None)?;

    let footer = Footer::new(meta_ix_handle, ix_handle);
    let mut buf = vec![0u8; TABLE_FULL_FOOTER_LENGTH];
    footer.encode(&mut buf);
    // `write` may accept only part of the buffer, which would leave a truncated footer;
    // `write_all` keeps writing until every byte is out or an error occurs.
    self.writer.write_all(&buf)?;
    self.offset += buf.len();
    self.writer.flush()?;
    Ok(self.offset)
}
/// Compresses `block_data` with `compression_type`, writes it at the current offset and
/// advances the offset past the block and its trailer.
///
/// Returns the handle of the written block.
fn write_compressed_block(
    &mut self,
    block_data: BlockData,
    compression_type: CompressionType,
) -> Result<BlockHandle> {
    let payload = compress_block(block_data, compression_type)?;
    let start = self.offset;
    let (handle, end) =
        write_block_at_offset(&mut self.writer, payload, compression_type, start)?;
    self.offset = end;
    Ok(handle)
}
/// Folds one entry into the table metadata: sequence-number range, smallest/largest point
/// keys, entry and size counters, key timestamps, and deletion counters.
fn update_meta_properties(&mut self, key: &InternalKey, value: &[u8]) {
    self.meta.update_seq_num(key.seq_num());

    // The first key seen becomes the smallest point key; every key replaces the largest.
    if self.meta.smallest_point.is_none() {
        self.meta.set_smallest_point_key(key.clone());
    }
    self.meta.set_largest_point_key(key.clone());

    let value_len = value.len();
    let ts = key.timestamp;
    let props = &mut self.meta.properties;

    props.num_entries += 1;
    props.item_count += 1;
    props.raw_key_size += key.size() as u64;
    props.raw_value_size += value_len as u64;

    props.oldest_key_time = match props.oldest_key_time {
        Some(oldest) => Some(oldest.min(ts)),
        None => Some(ts),
    };
    props.newest_key_time = match props.newest_key_time {
        Some(newest) => Some(newest.max(ts)),
        None => Some(ts),
    };

    if key.kind() == InternalKeyKind::RangeDelete {
        props.num_range_deletions += 1;
    }
    if key.is_tombstone() {
        props.num_deletions += 1;
        props.tombstone_count += 1;
        if key.kind() == InternalKeyKind::SoftDelete {
            props.num_soft_deletes += 1;
        }
    }

    props.key_count += 1;
    props.data_size += (key.encode().len() + value_len) as u64;
}
}
/// Writes `block` followed by its trailer (compression tag, then masked checksum) to `writer`.
///
/// The checksum covers the block bytes and the compression tag. Returns the block's handle
/// (`offset`, block length without trailer) and the offset just past the trailer.
pub(crate) fn write_block_at_offset<W: Write>(
    writer: &mut W,
    block: BlockData,
    compression_type: CompressionType,
    offset: usize,
) -> Result<(BlockHandle, usize)> {
    let block_len = block.len();
    let tag = [compression_type as u8; BLOCK_COMPRESS_LEN];
    let masked_crc = mask(calculate_checksum(&block, compression_type).finalize());

    writer.write_all(&block)?;
    writer.write_all(&tag)?;
    writer.write_fixedint(masked_crc)?;

    let handle = BlockHandle::new(offset, block_len);
    let next_offset = offset + block_len + BLOCK_CKSUM_LEN + BLOCK_COMPRESS_LEN;
    Ok((handle, next_offset))
}
pub(crate) fn compress_block(
raw_block: BlockData,
compression: CompressionType,
) -> Result<BlockData> {
match compression {
CompressionType::SnappyCompression => {
let mut enc = snap::raw::Encoder::new();
let mut buffer = vec![0; max_compress_len(raw_block.len())];
match enc.compress(&raw_block, buffer.as_mut_slice()) {
Ok(size) => buffer.truncate(size),
Err(e) => return Err(Error::Compression(e.to_string())),
}
Ok(buffer)
}
CompressionType::None => Ok(raw_block),
}
}
pub(crate) fn decompress_block(
compressed_block: &[u8],
compression: CompressionType,
) -> Result<Vec<u8>> {
match compression {
CompressionType::SnappyCompression => {
let mut dec = snap::raw::Decoder::new();
dec.decompress_vec(compressed_block).map_err(|e| Error::Decompression(e.to_string()))
}
CompressionType::None => Ok(Vec::from(compressed_block)),
}
}
/// Reads the raw footer bytes from the end of the file and decodes them.
fn read_footer(f: Arc<dyn File>, file_size: usize) -> Result<Footer> {
    Footer::read_from(f, file_size).and_then(|raw| Footer::decode(&raw))
}
/// Reads `location.size()` bytes starting at `location.offset()` into a new buffer.
///
/// The byte count reported by `read_at` is discarded.
fn read_bytes(f: Arc<dyn File>, location: &BlockHandle) -> Result<Vec<u8>> {
    let mut bytes = vec![0; location.size()];
    match f.read_at(location.offset() as u64, &mut bytes) {
        Ok(_) => Ok(bytes),
        Err(e) => Err(e),
    }
}
/// Returns a CRC hasher that has been fed the block bytes followed by the compression tag.
///
/// The caller is expected to call `finalize()` on the result.
pub(crate) fn calculate_checksum(block: &[u8], compression_type: CompressionType) -> Crc32 {
    let tag = [compression_type as u8; BLOCK_COMPRESS_LEN];
    let mut hasher = Crc32::new();
    hasher.update(block);
    hasher.update(&tag);
    hasher
}
/// Loads the raw filter block at `location` and wraps it in a `FilterBlockReader`.
///
/// # Errors
/// Returns `Error::FilterBlockEmpty` for a zero-sized handle and propagates read errors.
pub(crate) fn read_filter_block(
    src: Arc<dyn File>,
    location: &BlockHandle,
    policy: Arc<dyn FilterPolicy>,
) -> Result<FilterBlockReader> {
    if location.size() == 0 {
        return Err(Error::FilterBlockEmpty);
    }
    let raw = read_bytes(src, location)?;
    let reader = FilterBlockReader::new(raw, policy);
    Ok(reader)
}
/// Looks up the "meta" entry in the meta-index block and decodes it into `TableMetadata`.
///
/// Returns `Ok(None)` when the block has no entry whose user key is exactly `meta`.
///
/// # Errors
/// Propagates iterator errors and metadata decoding errors.
fn read_writer_meta_properties(metaix: &Block) -> Result<Option<TableMetadata>> {
    let meta_key = InternalKey::new(Vec::from(b"meta"), 0, InternalKeyKind::Set, 0).encode();
    let mut metaindexiter = metaix.iter()?;
    metaindexiter.seek_internal(&meta_key)?;
    if !metaindexiter.is_valid() {
        return Ok(None);
    }
    // The seek may land on a different entry when "meta" is absent. Report that as
    // "not found" rather than panicking on file contents, so the caller can surface
    // a proper error.
    let k = metaindexiter.key();
    if k.user_key() != b"meta" {
        return Ok(None);
    }
    let buf_bytes = metaindexiter.value_encoded()?;
    Ok(Some(TableMetadata::decode(buf_bytes)?))
}
/// Reads the block at `location`, verifies its checksum and decompresses it.
///
/// The trailer that follows the block holds the compression tag (`BLOCK_COMPRESS_LEN` bytes)
/// and then the masked checksum (`BLOCK_CKSUM_LEN` bytes); each part is read separately.
///
/// # Errors
/// Returns `SSTableError::ChecksumVerificationFailed` on a checksum mismatch, and propagates
/// read, compression-type and decompression errors.
pub(crate) fn read_table_block(
    comparator: Arc<dyn Comparator>,
    f: Arc<dyn File>,
    location: &BlockHandle,
) -> Result<Block> {
    let payload = read_bytes(Arc::clone(&f), location)?;

    let tag_handle = BlockHandle::new(location.offset() + location.size(), BLOCK_COMPRESS_LEN);
    let tag = read_bytes(Arc::clone(&f), &tag_handle)?;

    let crc_handle = BlockHandle::new(
        location.offset() + location.size() + BLOCK_COMPRESS_LEN,
        BLOCK_CKSUM_LEN,
    );
    let stored_crc = read_bytes(Arc::clone(&f), &crc_handle)?;

    let expected = unmask(u32::decode_fixed(&stored_crc).unwrap());
    if !verify_table_block(&payload, tag[0], expected) {
        return Err(Error::from(SSTableError::ChecksumVerificationFailed {
            block_offset: location.offset() as u64,
        }));
    }

    let compression = CompressionType::try_from(tag[0])?;
    let raw = decompress_block(&payload, compression)?;
    Ok(Block::new(raw, comparator))
}
/// Returns `true` when the CRC over `block` followed by the compression tag equals `want`.
fn verify_table_block(block: &[u8], compression_type: u8, want: u32) -> bool {
    let tag = [compression_type; BLOCK_COMPRESS_LEN];
    let mut hasher = Crc32::new();
    hasher.update(block);
    hasher.update(&tag);
    let actual = hasher.finalize();
    actual == want
}
/// Kind of index a table carries; currently only a partitioned index exists.
#[derive(Clone)]
pub enum IndexType {
// Partitioned index loaded from the handle stored in the footer.
Partitioned(Index),
}
/// An opened, read-only table: the file handle plus the index, metadata and optional filter
/// loaded when the table was opened.
#[derive(Clone)]
pub(crate) struct Table {
// Table identifier; also used as part of the block-cache key.
pub id: u64,
// Underlying file that blocks are read from.
pub file: Arc<dyn File>,
// Size of the file in bytes as supplied by the caller of `Table::new`.
#[allow(unused)]
pub file_size: u64,
// Shared options (comparators, filter policy, block cache, ...).
pub(crate) opts: Arc<Options>,
// Metadata decoded from the "meta" entry of the meta-index block.
pub(crate) meta: TableMetadata,
// Index used to locate data blocks.
pub(crate) index_block: IndexType,
// Filter reader; `None` when no filter policy is configured or no filter block was found.
pub(crate) filter_reader: Option<FilterBlockReader>,
}
impl Table {
pub(crate) fn new(
id: u64,
opts: Arc<Options>,
file: Arc<dyn File>,
file_size: u64,
) -> Result<Table> {
let footer = read_footer(Arc::clone(&file), file_size as usize)?;
let index_block = {
let partitioned_index =
Index::new(id, Arc::clone(&opts), Arc::clone(&file), &footer.index)?;
IndexType::Partitioned(partitioned_index)
};
let metaindexblock = read_table_block(
Arc::clone(&opts.internal_comparator),
Arc::clone(&file),
&footer.meta_index,
)?;
let writer_metadata =
read_writer_meta_properties(&metaindexblock)?.ok_or(Error::TableMetadataNotFound)?;
let filter_reader = if opts.filter_policy.is_some() {
Self::read_filter_block(&metaindexblock, Arc::clone(&file), &opts)?
} else {
None
};
Ok(Table {
id,
file,
file_size,
opts,
filter_reader,
index_block,
meta: writer_metadata,
})
}
fn read_filter_block(
metaix: &Block,
file: Arc<dyn File>,
options: &Options,
) -> Result<Option<FilterBlockReader>> {
let filter_name = format!("filter.{}", options.filter_policy.as_ref().unwrap().name());
let filter_key =
InternalKey::new(Vec::from(filter_name.as_bytes()), 0, InternalKeyKind::Set, 0);
let mut metaindexiter = metaix.iter()?;
metaindexiter.seek_internal(&filter_key.encode())?;
if metaindexiter.is_valid() {
let k = metaindexiter.key();
assert_eq!(k.user_key(), filter_name.as_bytes());
let val = metaindexiter.value_encoded()?;
let fbl = BlockHandle::decode(val);
let filter_block_location = match fbl {
Err(e) => {
return Err(Error::from(SSTableError::FailedToDecodeBlockHandle {
value_bytes: val.to_vec(),
context: format!("error: {:?}", e),
}));
}
Ok(res) => res.0,
};
if filter_block_location.size() > 0 {
return Ok(Some(read_filter_block(
file,
&filter_block_location,
Arc::clone(options.filter_policy.as_ref().unwrap()),
)?));
}
}
Ok(None)
}
/// Returns the data block at `location`, consulting the block cache first.
///
/// On a cache miss the block is read with the internal comparator from the options and
/// inserted into the cache under `(table id, block offset)`.
pub(crate) fn read_block(&self, location: &BlockHandle) -> Result<Arc<Block>> {
    let cache = &self.opts.block_cache;
    let cache_key = location.offset() as u64;
    if let Some(hit) = cache.get_data_block(self.id, cache_key) {
        return Ok(hit);
    }
    let loaded = Arc::new(read_table_block(
        Arc::clone(&self.opts.internal_comparator),
        Arc::clone(&self.file),
        location,
    )?);
    cache.insert_data_block(self.id, cache_key, Arc::clone(&loaded));
    Ok(loaded)
}
/// Like `read_block`, but builds the block with the supplied `comparator` and uses the
/// cache's separate "history" entries (`get_data_block_history` / `insert_data_block_history`).
pub(crate) fn read_block_with_comparator(
    &self,
    location: &BlockHandle,
    comparator: Arc<dyn Comparator>,
) -> Result<Arc<Block>> {
    let cache = &self.opts.block_cache;
    let cache_key = location.offset() as u64;
    if let Some(hit) = cache.get_data_block_history(self.id, cache_key) {
        return Ok(hit);
    }
    let loaded = Arc::new(read_table_block(comparator, Arc::clone(&self.file), location)?);
    cache.insert_data_block_history(self.id, cache_key, Arc::clone(&loaded));
    Ok(loaded)
}
pub(crate) fn get(&self, key: &InternalKey) -> Result<Option<(InternalKey, Value)>> {
let key_encoded = key.encode();
if let Some(ref filters) = self.filter_reader {
if !filters.may_contain(key.user_key.as_slice(), 0) {
return Ok(None);
}
}
let IndexType::Partitioned(partitioned_index) = &self.index_block;
let Some((_, partition_handle)) =
partitioned_index.find_block_handle_by_key(&key_encoded)?
else {
return Ok(None);
};
let partition_block = partitioned_index.load_block(partition_handle)?;
let mut partition_iter = partition_block.iter()?;
partition_iter.seek_internal(&key_encoded)?;
if !partition_iter.is_valid() {
return Ok(None);
}
let (data_handle, _) = BlockHandle::decode(partition_iter.value_bytes()).map_err(|e| {
Error::from(SSTableError::FailedToDecodeBlockHandle {
value_bytes: partition_iter.value_bytes().to_vec(),
context: format!("Failed to decode BlockHandle in get(): {e}"),
})
})?;
let data_block = self.read_block(&data_handle)?;
let mut iter = data_block.iter()?;
iter.seek_internal(&key_encoded)?;
if iter.is_valid() && iter.user_key() == key.user_key.as_slice() {
Ok(Some((iter.key().to_owned(), iter.value_encoded()?.to_vec())))
} else {
Ok(None)
}
}
/// Creates an iterator over the table restricted to `range`; `None` means unbounded on
/// both ends.
pub(crate) fn iter(&self, range: Option<InternalKeyRange>) -> Result<TableIterator<'_>> {
    let bounds = match range {
        Some(r) => r,
        None => (Bound::Unbounded, Bound::Unbounded),
    };
    TableIterator::new(self, bounds)
}
/// Same as `iter`, but data blocks are loaded with the supplied `comparator`.
pub(crate) fn iter_with_comparator(
    &self,
    range: Option<InternalKeyRange>,
    comparator: Arc<dyn Comparator>,
) -> Result<TableIterator<'_>> {
    let bounds = match range {
        Some(r) => r,
        None => (Bound::Unbounded, Bound::Unbounded),
    };
    TableIterator::new_with_comparator(self, bounds, comparator)
}
/// Returns `true` when `key`'s user key lies within the table's smallest..=largest point
/// user keys. When either bound is missing from the metadata the answer is `true`.
pub(crate) fn is_key_in_key_range(&self, key: &InternalKey) -> bool {
    let Some(smallest) = &self.meta.smallest_point else {
        return true;
    };
    let Some(largest) = &self.meta.largest_point else {
        return true;
    };
    let cmp = &self.opts.comparator;
    let user_key = key.user_key.as_slice();
    let at_or_after_smallest = cmp.compare(user_key, &smallest.user_key) != Ordering::Less;
    at_or_after_smallest && cmp.compare(user_key, &largest.user_key) != Ordering::Greater
}
/// Returns `true` when every key in the table sorts before the start of `range`, judged by
/// user key against the table's largest point key.
///
/// Returns `false` when the table has no largest point key or the range has no lower bound.
pub(crate) fn is_before_range(&self, range: &InternalKeyRange) -> bool {
    let Some(largest) = self.meta.largest_point.as_ref() else {
        return false;
    };
    let cmp = &self.opts.comparator;
    match &range.0 {
        // Inclusive start: the table is before the range only if largest < start.
        Bound::Included(start) => {
            cmp.compare(&largest.user_key, &start.user_key) == Ordering::Less
        }
        // Exclusive start: largest == start is also entirely before the range.
        Bound::Excluded(start) => {
            cmp.compare(&largest.user_key, &start.user_key) != Ordering::Greater
        }
        Bound::Unbounded => false,
    }
}
/// Returns `true` when every key in the table sorts after the end of `range`, judged by
/// user key against the table's smallest point key.
///
/// Returns `false` when the table has no smallest point key or the range has no upper bound.
pub(crate) fn is_after_range(&self, range: &InternalKeyRange) -> bool {
    let Some(smallest) = self.meta.smallest_point.as_ref() else {
        return false;
    };
    let cmp = &self.opts.comparator;
    match &range.1 {
        // Inclusive end: the table is after the range only if smallest > end.
        Bound::Included(end) => {
            cmp.compare(&smallest.user_key, &end.user_key) == Ordering::Greater
        }
        // Exclusive end: smallest == end is also entirely after the range.
        Bound::Excluded(end) => {
            cmp.compare(&smallest.user_key, &end.user_key) != Ordering::Less
        }
        Bound::Unbounded => false,
    }
}
/// Returns `true` unless the table lies entirely before or entirely after `range`.
pub(crate) fn overlaps_with_range(&self, range: &InternalKeyRange) -> bool {
    let disjoint = self.is_before_range(range) || self.is_after_range(range);
    !disjoint
}
}
/// Two-level iterator over a table: `first_level` walks the index and `second_level` walks
/// the data block the index currently points at, restricted to `range`.
pub(crate) struct TableIterator<'a> {
// Table being iterated; used to load data blocks and to reach the comparator.
table: &'a Table,
// Iterator over index entries, each yielding a data-block handle.
first_level: IndexIterator<'a>,
// Iterator over the current data block; `None` when no block is loaded.
second_level: Option<BlockIterator>,
// Lower and upper bounds applied to user keys.
range: InternalKeyRange,
// Set once iteration has run past a bound or off either end; cleared by the seek methods.
exhausted: bool,
// When set, data blocks are loaded via `Table::read_block_with_comparator`.
custom_comparator: Option<Arc<dyn Comparator>>,
}
impl<'a> TableIterator<'a> {
/// Creates an unpositioned iterator over `table` limited to `range`, loading data blocks
/// with the table's default comparator.
pub(crate) fn new(table: &'a Table, range: InternalKeyRange) -> Result<Self> {
    let IndexType::Partitioned(partitioned_index) = &table.index_block;
    let first_level = IndexIterator::new(partitioned_index);
    Ok(Self {
        table,
        first_level,
        second_level: None,
        range,
        exhausted: false,
        custom_comparator: None,
    })
}
/// Creates an unpositioned iterator over `table` limited to `range` whose data blocks are
/// loaded with `comparator`.
pub(crate) fn new_with_comparator(
    table: &'a Table,
    range: InternalKeyRange,
    comparator: Arc<dyn Comparator>,
) -> Result<Self> {
    let IndexType::Partitioned(partitioned_index) = &table.index_block;
    let first_level = IndexIterator::new(partitioned_index);
    Ok(Self {
        table,
        first_level,
        second_level: None,
        range,
        exhausted: false,
        custom_comparator: Some(comparator),
    })
}
/// Returns `true` when the iterator is not exhausted and the data-block iterator is
/// positioned on an entry.
fn is_valid(&self) -> bool {
    if self.exhausted {
        return false;
    }
    match &self.second_level {
        Some(iter) => iter.is_valid(),
        None => false,
    }
}
/// Drops the current data-block iterator and flags the iterator as exhausted.
fn mark_exhausted(&mut self) {
    self.second_level = None;
    self.exhausted = true;
}
/// Loads the data block referenced by the current index entry and installs a fresh iterator
/// over it (not yet positioned). Clears the data-block iterator when the index iterator is
/// not valid.
fn init_data_block(&mut self) -> Result<()> {
    if !self.first_level.valid() {
        self.second_level = None;
        return Ok(());
    }
    let handle = self.first_level.block_handle()?;
    let block = match &self.custom_comparator {
        Some(comparator) => {
            self.table.read_block_with_comparator(&handle, Arc::clone(comparator))?
        }
        None => self.table.read_block(&handle)?,
    };
    self.second_level = Some(block.iter()?);
    Ok(())
}
fn advance_to_valid_entry(&mut self) -> Result<()> {
loop {
if self.second_level.as_ref().is_some_and(|iter| iter.is_valid()) {
return Ok(());
}
if !self.first_level.valid() {
self.second_level = None;
return Ok(());
}
self.first_level.next()?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_to_first()?;
}
}
}
fn retreat_to_valid_entry(&mut self) -> Result<()> {
loop {
if self.second_level.as_ref().is_some_and(|iter| iter.is_valid()) {
return Ok(());
}
if !self.first_level.valid() {
self.second_level = None;
return Ok(());
}
self.first_level.prev()?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_to_last()?;
}
}
}
/// Returns `true` when `user_key` is not below the range's lower bound.
fn satisfies_lower_bound(&self, user_key: &[u8]) -> bool {
    let cmp = &self.table.opts.comparator;
    match &self.range.0 {
        Bound::Unbounded => true,
        Bound::Included(start) => matches!(
            cmp.compare(user_key, &start.user_key),
            Ordering::Equal | Ordering::Greater
        ),
        Bound::Excluded(start) => {
            matches!(cmp.compare(user_key, &start.user_key), Ordering::Greater)
        }
    }
}
/// Returns `true` when `user_key` is not above the range's upper bound.
fn satisfies_upper_bound(&self, user_key: &[u8]) -> bool {
    let cmp = &self.table.opts.comparator;
    match &self.range.1 {
        Bound::Unbounded => true,
        Bound::Included(end) => matches!(
            cmp.compare(user_key, &end.user_key),
            Ordering::Less | Ordering::Equal
        ),
        Bound::Excluded(end) => {
            matches!(cmp.compare(user_key, &end.user_key), Ordering::Less)
        }
    }
}
/// Returns the user key of the entry the data-block iterator is positioned on.
///
/// # Panics
/// Panics if no data block is currently loaded.
fn current_user_key(&self) -> &[u8] {
    let block_iter = self.second_level.as_ref().unwrap();
    block_iter.user_key()
}
/// Positions the iterator on the first entry inside the range.
///
/// Clears the exhausted flag, seeks according to the lower bound, and marks the iterator
/// exhausted if the entry found lies beyond the upper bound.
pub(crate) fn seek_to_first(&mut self) -> Result<()> {
self.exhausted = false;
let lower_bound = self.range.0.clone();
match lower_bound {
Bound::Unbounded => {
// No lower bound: start at the first entry of the first block, skipping empty blocks.
self.first_level.seek_to_first()?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_to_first()?;
}
self.advance_to_valid_entry()?;
}
Bound::Included(ref internal_key) => {
self.seek_internal(&internal_key.encode())?;
}
Bound::Excluded(ref internal_key) => {
// NOTE(review): the seek key uses sequence number 0 and timestamp 0, presumably so it
// sorts at the end of the versions of this user key — confirm against the internal
// comparator. Only a single entry with the excluded user key is skipped afterwards.
let seek_key = InternalKey::new(
internal_key.user_key.clone(),
0, InternalKeyKind::Set,
0,
);
self.seek_internal(&seek_key.encode())?;
if self.is_valid() && self.current_user_key() == internal_key.user_key.as_slice() {
self.advance_internal()?;
}
}
}
// The first candidate may already be past the upper bound, in which case the range is empty.
if self.is_valid() && !self.satisfies_upper_bound(self.current_user_key()) {
self.mark_exhausted();
}
Ok(())
}
/// Positions the iterator on the last entry inside the range.
///
/// Clears the exhausted flag, seeks according to the upper bound, and marks the iterator
/// exhausted if the entry found lies below the lower bound.
pub(crate) fn seek_to_last(&mut self) -> Result<()> {
self.exhausted = false;
let upper_bound = self.range.1.clone();
match upper_bound {
Bound::Unbounded => {
// No upper bound: start at the last entry of the last block, skipping empty blocks.
self.first_level.seek_to_last()?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_to_last()?;
}
self.retreat_to_valid_entry()?;
}
Bound::Included(ref internal_key) => {
let bound_user_key = internal_key.user_key.clone();
// NOTE(review): sequence number 0 / timestamp 0 presumably make this key sort at the
// end of the versions of the bound user key — confirm against the internal comparator.
let seek_key =
InternalKey::new(internal_key.user_key.clone(), 0, InternalKeyKind::Set, 0);
self.seek_internal(&seek_key.encode())?;
if !self.is_valid() {
// Nothing at or after the seek key: fall back to the last entry of the table.
self.position_to_absolute_last()?;
} else {
let cmp = self
.table
.opts
.comparator
.compare(self.current_user_key(), bound_user_key.as_slice());
// Landed on a larger user key: step back one entry.
if cmp == Ordering::Greater {
self.prev_internal()?;
}
}
}
Bound::Excluded(ref internal_key) => {
let bound_user_key = internal_key.user_key.clone();
// NOTE(review): the maximum sequence number / kind / timestamp presumably make this key
// sort at the start of the versions of the bound user key — confirm against the
// internal comparator.
let seek_key = InternalKey::new(
internal_key.user_key.clone(),
INTERNAL_KEY_SEQ_NUM_MAX,
InternalKeyKind::Max,
INTERNAL_KEY_TIMESTAMP_MAX,
);
self.seek_internal(&seek_key.encode())?;
if !self.is_valid() {
// Nothing at or after the seek key: fall back to the last entry of the table.
self.position_to_absolute_last()?;
}
if self.is_valid() {
let cmp = self
.table
.opts
.comparator
.compare(self.current_user_key(), bound_user_key.as_slice());
// Landed on the excluded user key or a larger one: step back one entry.
if cmp != Ordering::Less {
self.prev_internal()?;
}
}
}
}
// The last candidate may already be below the lower bound, in which case the range is empty.
if self.is_valid() && !self.satisfies_lower_bound(self.current_user_key()) {
self.mark_exhausted();
}
Ok(())
}
fn seek_internal(&mut self, target: &[u8]) -> Result<()> {
self.first_level.seek(target)?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_internal(target)?;
}
self.advance_to_valid_entry()?;
Ok(())
}
fn position_to_absolute_last(&mut self) -> Result<()> {
self.first_level.seek_to_last()?;
self.init_data_block()?;
if let Some(ref mut iter) = self.second_level {
iter.seek_to_last()?;
}
self.retreat_to_valid_entry()?;
Ok(())
}
fn advance_internal(&mut self) -> Result<bool> {
if let Some(ref mut iter) = self.second_level {
if iter.advance()? {
return Ok(true);
}
}
self.advance_to_valid_entry()?;
Ok(self.is_valid())
}
fn prev_internal(&mut self) -> Result<bool> {
if let Some(ref mut iter) = self.second_level {
if iter.prev_internal()? {
return Ok(true);
}
}
self.retreat_to_valid_entry()?;
Ok(self.is_valid())
}
}
impl LSMIterator for TableIterator<'_> {
    /// Seeks to the first entry at or after `target`; the iterator becomes exhausted if that
    /// entry lies beyond the upper bound.
    fn seek(&mut self, target: &[u8]) -> Result<bool> {
        self.exhausted = false;
        self.seek_internal(target)?;
        let past_upper =
            self.is_valid() && !self.satisfies_upper_bound(self.current_user_key());
        if past_upper {
            self.mark_exhausted();
        }
        Ok(self.is_valid())
    }

    /// Positions on the first entry in range and reports whether one exists.
    fn seek_first(&mut self) -> Result<bool> {
        self.seek_to_first()?;
        Ok(self.is_valid())
    }

    /// Positions on the last entry in range and reports whether one exists.
    fn seek_last(&mut self) -> Result<bool> {
        self.seek_to_last()?;
        Ok(self.is_valid())
    }

    /// Advances to the next entry. An iterator that is neither positioned nor exhausted is
    /// first positioned via `seek_first`.
    fn next(&mut self) -> Result<bool> {
        if !self.is_valid() {
            return if self.exhausted {
                Ok(false)
            } else {
                self.seek_first()
            };
        }
        self.advance_internal()?;
        let in_range = self.is_valid() && self.satisfies_upper_bound(self.current_user_key());
        if !in_range {
            self.mark_exhausted();
        }
        Ok(self.is_valid())
    }

    /// Steps back to the previous entry. An iterator that is neither positioned nor exhausted
    /// is first positioned via `seek_last`.
    fn prev(&mut self) -> Result<bool> {
        if !self.is_valid() {
            return if self.exhausted {
                Ok(false)
            } else {
                self.seek_last()
            };
        }
        self.prev_internal()?;
        let in_range = self.is_valid() && self.satisfies_lower_bound(self.current_user_key());
        if !in_range {
            self.mark_exhausted();
        }
        Ok(self.is_valid())
    }

    /// Reports whether the iterator is positioned on an entry.
    fn valid(&self) -> bool {
        self.is_valid()
    }

    /// Returns the current entry's key. Must only be called while the iterator is valid.
    fn key(&self) -> InternalKeyRef<'_> {
        debug_assert!(self.valid());
        let block_iter = self.second_level.as_ref().unwrap();
        InternalKeyRef::from_encoded(block_iter.key_bytes())
    }

    /// Returns the current entry's encoded value. Must only be called while the iterator is
    /// valid.
    fn value_encoded(&self) -> Result<&[u8]> {
        debug_assert!(self.valid());
        let block_iter = self.second_level.as_ref().unwrap();
        Ok(block_iter.value_bytes())
    }
}