#[cfg(feature = "zstd")]
use super::KeyedBlockHandle;
use super::{BlockOffset, DataBlock, GlobalTableId, data_block::Iter as DataBlockIter};
use crate::{
Cache, CompressionType, InternalValue, SeqNo, UserKey,
comparator::SharedComparator,
encryption::EncryptionProvider,
file_accessor::FileAccessor,
table::{
BlockHandle,
block::ParsedItem,
block_index::{BlockIndexIter, BlockIndexIterImpl},
util::load_block,
},
};
use alloc::sync::Arc;
use crate::path::PathBuf;
use self_cell::self_cell;
#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
// Borrowing data-block iterator; used as the dependent half of `OwnedDataBlockIter`.
type InnerIter<'a> = DataBlockIter<'a>;
#[cfg(feature = "zstd")]
/// Reports whether partial block decoding has been switched on for this process.
///
/// The `LSM_PARTIAL_DECODE` environment variable is read on the first call only;
/// the answer is stored in a `OnceLock` and returned unchanged afterwards.
/// The feature counts as enabled only when the whitespace-trimmed value is exactly
/// `1`, `on`, `true` or `yes` (case-sensitive). An unset or unreadable variable
/// leaves it disabled.
fn partial_decode_enabled() -> bool {
    use std::sync::OnceLock;

    static ENABLED: OnceLock<bool> = OnceLock::new();

    // Evaluated at most once: interprets the environment switch.
    fn read_switch() -> bool {
        match std::env::var("LSM_PARTIAL_DECODE") {
            Ok(raw) => matches!(raw.trim(), "1" | "on" | "true" | "yes"),
            Err(_) => false,
        }
    }

    *ENABLED.get_or_init(read_switch)
}
/// Size threshold (256 KiB) below which `try_partial_block` declines partial
/// decoding. It is compared against the last value of the block's layout `ends`
/// list (presumably the block's total decoded size — confirm against
/// `BlockLayoutMap`).
#[cfg(feature = "zstd")]
const PARTIAL_MIN_BLOCK_BYTES: usize = 256 * 1024;
/// Hit count at which a cached partial entry is given up: once an entry's hit
/// counter (including the current request) reaches this value,
/// `try_partial_block` evicts the entry and declines, so the caller loads the
/// full block instead.
#[cfg(feature = "zstd")]
const PARTIAL_PROMOTE_HITS: u32 = 4;
/// Percentage of a block's inner blocks that, once decoded, makes
/// `promote_by_fraction` return `true`.
#[cfg(feature = "zstd")]
const PARTIAL_PROMOTE_FRACTION_PCT: u32 = 75;
#[cfg(feature = "zstd")]
/// Decides whether `covered` out of `total` units reaches the promotion threshold.
///
/// Returns `true` when `covered / total` is at least
/// `PARTIAL_PROMOTE_FRACTION_PCT` percent. A `total` of zero never promotes.
/// The comparison is carried out in `u64`, so it cannot overflow for any pair of
/// `u32` inputs.
fn promote_by_fraction(covered: u32, total: u32) -> bool {
    if total == 0 {
        return false;
    }

    // Cross-multiplied form of `covered / total >= pct / 100` (no division, no rounding).
    let scaled_covered = u64::from(covered) * 100;
    let required = u64::from(total) * u64::from(PARTIAL_PROMOTE_FRACTION_PCT);

    scaled_covered >= required
}
/// One end of a key range, owning the user key that delimits it.
pub enum Bound {
/// The range includes the key itself.
Included(UserKey),
/// The range stops short of the key itself.
Excluded(UserKey),
}
// `(lower, upper)` pair; `None` means that side is unbounded.
type Bounds = (Option<Bound>, Option<Bound>);
// Self-referential pairing of a `DataBlock` with an iterator borrowing from it,
// generated by `self_cell`, so the iterator can be stored and moved together
// with the block it reads from.
self_cell!(
pub struct OwnedDataBlockIter {
owner: DataBlock,
#[covariant]
dependent: InnerIter,
}
);
impl OwnedDataBlockIter {
    /// Forwards to the inner iterator's `seek` and returns its result.
    fn seek_lower_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
        self.with_dependent_mut(|_block, cursor| cursor.seek(needle, seqno))
    }

    /// Forwards to the inner iterator's `seek_upper` and returns its result.
    fn seek_upper_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
        self.with_dependent_mut(|_block, cursor| cursor.seek_upper(needle, seqno))
    }

    /// Forwards to the inner iterator's `seek_exclusive` and returns its result.
    fn seek_lower_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
        self.with_dependent_mut(|_block, cursor| cursor.seek_exclusive(needle, seqno))
    }

    /// Forwards to the inner iterator's `seek_upper_exclusive` and returns its result.
    fn seek_upper_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool {
        self.with_dependent_mut(|_block, cursor| cursor.seek_upper_exclusive(needle, seqno))
    }

    /// Applies `bound` as the lower limit of this block iterator.
    ///
    /// Dispatches to the inclusive or exclusive lower seek depending on the
    /// bound's variant and returns whatever that seek reports.
    pub fn seek_lower_bound(&mut self, bound: &Bound, seqno: SeqNo) -> bool {
        let (key, inclusive) = match bound {
            Bound::Included(key) => (key, true),
            Bound::Excluded(key) => (key, false),
        };

        if inclusive {
            self.seek_lower_inclusive(key, seqno)
        } else {
            self.seek_lower_exclusive(key, seqno)
        }
    }

    /// Applies `bound` as the upper limit of this block iterator.
    ///
    /// Dispatches to the inclusive or exclusive upper seek depending on the
    /// bound's variant and returns whatever that seek reports.
    pub fn seek_upper_bound(&mut self, bound: &Bound, seqno: SeqNo) -> bool {
        let (key, inclusive) = match bound {
            Bound::Included(key) => (key, true),
            Bound::Excluded(key) => (key, false),
        };

        if inclusive {
            self.seek_upper_inclusive(key, seqno)
        } else {
            self.seek_upper_exclusive(key, seqno)
        }
    }
}
/// Forward iteration: each parsed entry produced by the inner iterator is
/// materialized against the owning block's bytes into an `InternalValue`.
impl Iterator for OwnedDataBlockIter {
    type Item = InternalValue;

    fn next(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|owner, cursor| {
            let parsed = cursor.next()?;
            Some(parsed.materialize(&owner.inner.data))
        })
    }
}
/// Backward iteration: mirrors `next`, pulling entries from the back of the
/// inner iterator and materializing them against the owning block's bytes.
impl DoubleEndedIterator for OwnedDataBlockIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|owner, cursor| {
            let parsed = cursor.next_back()?;
            Some(parsed.materialize(&owner.inner.data))
        })
    }
}
/// Wraps `block` in an [`OwnedDataBlockIter`].
///
/// The block becomes the owner of the self-referential cell, and the borrowing
/// iterator is built from it via `DataBlock::try_iter` using `comparator`.
///
/// # Errors
///
/// Returns the error produced by `try_iter` if the iterator cannot be created.
fn create_data_block_reader(
    block: DataBlock,
    comparator: SharedComparator,
) -> crate::Result<OwnedDataBlockIter> {
    OwnedDataBlockIter::try_new(block, |owner| owner.try_iter(comparator))
}
/// Double-ended iterator over the entries of a single table.
///
/// Block handles come from `index_iter`; each referenced data block is loaded
/// (or partially decoded when the `zstd` feature allows it) and walked with an
/// [`OwnedDataBlockIter`]. Optional lower/upper bounds restrict the yielded keys.
pub struct Iter {
/// Table identifier; passed to cache lookups, file access and block loading.
table_id: GlobalTableId,
/// Path of the table file; passed to the file accessor and block loader.
path: Arc<PathBuf>,
/// Value added to the `seqno` of every yielded item's key.
global_seqno: SeqNo,
/// Source of data-block handles; also seeked to the range bounds on first use.
#[expect(clippy::struct_field_names)]
index_iter: BlockIndexIterImpl,
/// Used to obtain the table's file handle when reading blocks.
file_accessor: FileAccessor,
/// Shared cache consulted for full blocks and partial-block entries.
cache: Arc<Cache>,
/// Compression of the table's blocks; partial decoding is only attempted for `Zstd`.
compression: CompressionType,
/// Optional encryption provider forwarded to block loading; partial decoding is skipped when set.
encryption: Option<Arc<dyn EncryptionProvider>>,
/// Optional ECC parameters forwarded to block loading and applied to partial reads.
ecc: Option<crate::table::block::EccParams>,
/// Optional heal hints forwarded to block loading and to persistent-heal recording.
heal_hints: Option<Arc<crate::heal_hints::HealHints>>,
/// Forwarded to `DataBlock::from_loaded` when wrapping a loaded block.
has_kv_footer: bool,
/// Optional zstd dictionary forwarded to block loading and persistent-heal recording.
#[cfg(zstd_any)]
zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
/// Key comparator used for data-block iteration and for bound comparisons.
comparator: SharedComparator,
/// Per-block layout information (`ends_for`) consulted by partial decoding.
#[cfg(feature = "zstd")]
block_layout: crate::table::block_layout::BlockLayoutMap,
/// Restart interval handed to the partial/synthesized data-block builders.
#[cfg(feature = "zstd")]
data_block_restart_interval: u8,
/// Whether `index_iter` has already been positioned according to `range`.
index_initialized: bool,
/// Offset of the block most recently opened by `next`.
lo_offset: BlockOffset,
/// Reader over the block most recently opened by `next`, if any.
lo_data_block: Option<OwnedDataBlockIter>,
/// Offset of the block most recently opened by `next_back`.
hi_offset: BlockOffset,
/// Reader over the block most recently opened by `next_back`, if any.
hi_data_block: Option<OwnedDataBlockIter>,
/// Optional `(lower, upper)` key bounds.
range: Bounds,
/// Set once an error has been yielded; afterwards the iterator only returns `None`.
poisoned: bool,
/// Metrics sink used when recording ECC recoveries and passed to block loading.
#[cfg(feature = "metrics")]
metrics: Arc<Metrics>,
}
impl Iter {
#[expect(
clippy::too_many_arguments,
reason = "encryption, comparator and metrics add extra parameters to the constructor"
)]
pub fn new(
table_id: GlobalTableId,
global_seqno: SeqNo,
path: Arc<PathBuf>,
index_iter: BlockIndexIterImpl,
file_accessor: FileAccessor,
cache: Arc<Cache>,
compression: CompressionType,
encryption: Option<Arc<dyn EncryptionProvider>>,
ecc: Option<crate::table::block::EccParams>,
heal_hints: Option<Arc<crate::heal_hints::HealHints>>,
has_kv_footer: bool,
#[cfg(zstd_any)] zstd_dictionary: Option<Arc<crate::compression::ZstdDictionary>>,
comparator: SharedComparator,
#[cfg(feature = "zstd")] block_layout: crate::table::block_layout::BlockLayoutMap,
#[cfg(feature = "zstd")] data_block_restart_interval: u8,
#[cfg(feature = "metrics")] metrics: Arc<Metrics>,
) -> Self {
Self {
table_id,
path,
global_seqno,
index_iter,
file_accessor,
cache,
compression,
encryption,
ecc,
heal_hints,
has_kv_footer,
#[cfg(zstd_any)]
zstd_dictionary,
comparator,
#[cfg(feature = "zstd")]
block_layout,
#[cfg(feature = "zstd")]
data_block_restart_interval,
index_initialized: false,
lo_offset: BlockOffset(0),
lo_data_block: None,
hi_offset: BlockOffset(u64::MAX),
hi_data_block: None,
range: (None, None),
poisoned: false,
#[cfg(feature = "metrics")]
metrics,
}
}
/// Sets (or replaces) the lower key bound of the iteration range.
pub fn set_lower_bound(&mut self, bound: Bound) {
    let (lower, _upper) = &mut self.range;
    *lower = Some(bound);
}
/// Sets (or replaces) the upper key bound of the iteration range.
pub fn set_upper_bound(&mut self, bound: Bound) {
    let (_lower, upper) = &mut self.range;
    *upper = Some(bound);
}
/// Attempts to produce a data block for `handle` that only covers keys up to
/// the iterator's upper bound, instead of decoding the whole block.
///
/// Returns `Ok(None)` whenever the partial path does not apply or is judged not
/// worthwhile; the caller is then expected to load the full block. That is the
/// case when:
///
/// * partial decoding is disabled, the table is not zstd-compressed, or it is
///   encrypted;
/// * no upper bound is set, or the upper bound sorts after the block's end key;
/// * no layout is recorded for the block, the full block is already cached, or
///   the block is smaller than `PARTIAL_MIN_BLOCK_BYTES`;
/// * a cached partial entry has reached `PARTIAL_PROMOTE_HITS`, or the decoded
///   share of the block reaches the `promote_by_fraction` threshold (in both
///   cases the partial cache entry is evicted first).
///
/// Otherwise returns `Ok(Some(block))`, built either from a cached partial
/// entry that already covers the upper bound, or from a fresh partial decode of
/// the on-disk frame (optionally resuming from a cached entry).
///
/// # Errors
///
/// Propagates failures from opening the table file, building the block
/// transform, reading the data frame, and building the partial or synthesized
/// data block.
#[cfg(feature = "zstd")]
fn try_partial_block(&self, handle: &KeyedBlockHandle) -> crate::Result<Option<DataBlock>> {
    if !partial_decode_enabled()
        || !matches!(self.compression, CompressionType::Zstd(_))
        || self.encryption.is_some()
    {
        return Ok(None);
    }

    // Without an upper bound there is no key to stop decoding at.
    let Some(Bound::Included(upper) | Bound::Excluded(upper)) = &self.range.1 else {
        return Ok(None);
    };

    // The bound lies beyond this block's end key: the whole block is wanted.
    if self.comparator.compare(upper, handle.end_key()) == std::cmp::Ordering::Greater {
        return Ok(None);
    }

    let Some(ends) = self.block_layout.ends_for(*handle.offset()) else {
        return Ok(None);
    };

    // A fully decoded copy in the cache is preferred over a partial decode.
    let offset = BlockOffset(*handle.offset());
    if self.cache.has_block(self.table_id, offset) {
        return Ok(None);
    }

    let total_bytes = ends.last().copied().unwrap_or(0) as usize;
    if total_bytes < PARTIAL_MIN_BLOCK_BYTES {
        return Ok(None);
    }

    #[expect(
        clippy::cast_possible_truncation,
        reason = "inner-block count is bounded well within u32"
    )]
    let total_blocks = ends.len() as u32;

    let (carried_hits, carried_resume) =
        match self.cache.peek_partial_block(self.table_id, offset) {
            Some(entry) => {
                // The cached prefix is usable as-is if it reaches at least as far as `upper`.
                let covered = self.comparator.compare(&entry.covered_upper, upper)
                    != std::cmp::Ordering::Less;
                if covered {
                    let hits = entry.hits + 1;
                    if hits >= PARTIAL_PROMOTE_HITS {
                        // Requested often enough: drop the partial entry and let the
                        // caller load the full block.
                        self.cache.evict_partial_block(self.table_id, offset);
                        return Ok(None);
                    }
                    let block = crate::table::lazy_block::synthesize_data_block(
                        &entry.resume.window_prime,
                        self.data_block_restart_interval,
                    )?;
                    // Store the entry back with the updated hit counter.
                    self.cache.insert_partial_block(
                        self.table_id,
                        offset,
                        crate::cache::PartialBlockEntry { hits, ..entry },
                    );
                    return Ok(Some(block));
                }
                // Not covered, and most of the block has been decoded already.
                if promote_by_fraction(entry.resume.decoded_blocks, total_blocks) {
                    self.cache.evict_partial_block(self.table_id, offset);
                    return Ok(None);
                }
                (entry.hits, Some(entry.resume))
            }
            None => (0, None),
        };

    let (fd, _cache_event) = self
        .file_accessor
        .get_or_open_table(&self.table_id, &self.path)?;

    // Encryption is ruled out by the guard at the top, hence the `None` arguments.
    let transform = crate::table::block::BlockTransform::from_parts(
        self.compression,
        None,
        #[cfg(zstd_any)]
        None,
    )?;
    let transform = match self.ecc {
        Some(ecc) => transform.with_ecc(ecc),
        None => transform,
    };

    let block_handle = BlockHandle::new(handle.offset(), handle.size());
    let (_header, frame, recovery) =
        crate::table::block::Block::read_data_frame(fd.as_ref(), block_handle, &transform)?;

    if let Some(kind) = recovery {
        #[cfg(feature = "metrics")]
        self.metrics.record_ecc_recovery(kind);
        #[cfg(not(feature = "metrics"))]
        let _ = kind;
        crate::table::util::maybe_record_persistent_heal(
            self.table_id,
            &self.path,
            &self.file_accessor,
            &block_handle,
            crate::table::block::BlockType::Data,
            self.compression,
            None,
            self.ecc,
            #[cfg(zstd_any)]
            self.zstd_dictionary.as_deref(),
            self.heal_hints.as_ref().map(AsRef::as_ref),
            #[cfg(feature = "metrics")]
            &self.metrics,
        );
    }

    // The layout is copied only here, where an owned copy is consumed; every
    // early return above gets by with the borrowed layout.
    let (block, covered_upper, payload) = crate::table::lazy_block::partial_data_block(
        frame.to_vec(),
        ends.to_vec(),
        self.data_block_restart_interval,
        &self.comparator,
        upper,
        carried_resume,
    )?;

    // Decoding reached most of the block anyway: fall back to the full block.
    if promote_by_fraction(payload.decoded_blocks, total_blocks) {
        self.cache.evict_partial_block(self.table_id, offset);
        return Ok(None);
    }

    if let Some(covered) = covered_upper {
        self.cache.insert_partial_block(
            self.table_id,
            offset,
            crate::cache::PartialBlockEntry {
                resume: payload,
                covered_upper: covered,
                total_blocks,
                hits: carried_hits,
            },
        );
    }

    Ok(Some(block))
}
}
impl Iter {
#[expect(
clippy::unnecessary_wraps,
reason = "matches Iterator::next return type"
)]
fn poison<E: Into<crate::Error>>(&mut self, err: E) -> Option<crate::Result<InternalValue>> {
self.poisoned = true;
Some(Err(err.into()))
}
}
// Forward iteration over the table's entries within the configured bounds.
// Yields `Err` once on failure; after that the iterator is poisoned and only
// returns `None`.
impl Iterator for Iter {
type Item = crate::Result<InternalValue>;
fn next(&mut self) -> Option<Self::Item> {
// A previous call yielded an error: stay finished.
if self.poisoned {
return None;
}
// Keep draining the front block that is already open, if it has items left.
if let Some(block) = &mut self.lo_data_block
&& let Some(item) = block
.next()
.map(|mut v| {
// Shift the item's seqno by the table-wide offset.
v.key.seqno += self.global_seqno;
v
})
.map(Ok)
{
return Some(item);
}
// First use: position the index iterator according to the bounds. This is
// done here rather than at construction, so bounds set before the first
// call are taken into account.
if !self.index_initialized {
let mut ok = if let Some(bound) = &self.range.0 {
let key = match bound {
Bound::Included(k) | Bound::Excluded(k) => k,
};
self.index_iter.seek_lower(key, u64::MAX)
} else {
true
};
if ok && let Some(bound) = &self.range.1 {
let key = match bound {
Bound::Included(k) | Bound::Excluded(k) => k,
};
ok = self.index_iter.seek_upper(key, u64::MAX);
}
self.index_initialized = true;
// A failed index seek ends the iteration: drop both readers and finish.
if !ok {
self.lo_data_block = None;
self.hi_data_block = None;
return None;
}
}
// Open blocks one after another until one yields an item.
loop {
let Some(handle) = self.index_iter.next() else {
// The index has no more handles: serve what is left in the block
// opened from the back (by `next_back`), in forward order.
if let Some(block) = &mut self.hi_data_block
&& let Some(item) = block
.next()
.map(|mut v| {
v.key.seqno += self.global_seqno;
v
})
.map(Ok)
{
return Some(item);
}
// Nothing left on either end.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
};
let handle = match handle {
Ok(h) => h,
Err(e) => return self.poison(e),
};
// With the `zstd` feature, first try to get away with a partial decode.
#[cfg(feature = "zstd")]
let partial = match self.try_partial_block(&handle) {
Ok(p) => p,
Err(e) => return self.poison(e),
};
#[cfg(not(feature = "zstd"))]
let partial: Option<DataBlock> = None;
let block = if let Some(db) = partial {
db
} else {
// Regular path: load the full block and wrap it as a data block.
let raw = match load_block(
self.table_id,
&self.path,
&self.file_accessor,
&self.cache,
&BlockHandle::new(handle.offset(), handle.size()),
crate::table::block::BlockType::Data,
self.compression,
self.encryption.as_deref(),
self.ecc,
#[cfg(zstd_any)]
self.zstd_dictionary.as_deref(),
self.heal_hints.as_ref().map(AsRef::as_ref),
#[cfg(feature = "metrics")]
&self.metrics,
) {
Ok(b) => b,
Err(e) => return self.poison(e),
};
match DataBlock::from_loaded(raw, self.has_kv_footer) {
Ok(b) => b,
Err(e) => return self.poison(e),
}
};
let mut reader = match create_data_block_reader(block, self.comparator.clone()) {
Ok(r) => r,
Err(e) => return self.poison(e),
};
// Apply the bounds inside the block: lower first, then upper. The seek
// results are not inspected; an empty reader simply yields no item below.
if let Some(bound) = &self.range.0 {
reader.seek_lower_bound(bound, SeqNo::MAX);
}
if let Some(bound) = &self.range.1 {
reader.seek_upper_bound(bound, SeqNo::MAX);
}
let item = reader.next();
// Remember the reader as the current front block, even if it yielded
// nothing.
self.lo_offset = handle.offset();
self.lo_data_block = Some(reader);
if let Some(mut item) = item {
item.key.seqno += self.global_seqno;
return Some(Ok(item));
}
// No item in this block: continue with the next handle.
}
}
}
// Backward iteration; mirror image of `next`, working on the back block and
// pulling handles from the end of the index iterator.
impl DoubleEndedIterator for Iter {
fn next_back(&mut self) -> Option<Self::Item> {
// A previous call yielded an error: stay finished.
if self.poisoned {
return None;
}
// Keep draining the back block that is already open, if it has items left.
if let Some(block) = &mut self.hi_data_block
&& let Some(item) = block
.next_back()
.map(|mut v| {
// Shift the item's seqno by the table-wide offset.
v.key.seqno += self.global_seqno;
v
})
.map(Ok)
{
return Some(item);
}
// First use: position the index iterator according to the bounds (same
// procedure as in `next`).
if !self.index_initialized {
let mut ok = if let Some(bound) = &self.range.0 {
let key = match bound {
Bound::Included(k) | Bound::Excluded(k) => k,
};
self.index_iter.seek_lower(key, u64::MAX)
} else {
true
};
if ok && let Some(bound) = &self.range.1 {
let key = match bound {
Bound::Included(k) | Bound::Excluded(k) => k,
};
ok = self.index_iter.seek_upper(key, u64::MAX);
}
self.index_initialized = true;
// A failed index seek ends the iteration: drop both readers and finish.
if !ok {
self.lo_data_block = None;
self.hi_data_block = None;
return None;
}
}
// Open blocks from the back until one yields an item.
loop {
let Some(handle) = self.index_iter.next_back() else {
// The index has no more handles: serve what is left in the block
// opened from the front (by `next`), in backward order.
if let Some(block) = &mut self.lo_data_block
&& let Some(item) = block
.next_back()
.map(|mut v| {
v.key.seqno += self.global_seqno;
v
})
.map(Ok)
{
return Some(item);
}
// Nothing left on either end.
self.lo_data_block = None;
self.hi_data_block = None;
return None;
};
let handle = match handle {
Ok(h) => h,
Err(e) => return self.poison(e),
};
// With the `zstd` feature, first try to get away with a partial decode.
#[cfg(feature = "zstd")]
let partial = match self.try_partial_block(&handle) {
Ok(p) => p,
Err(e) => return self.poison(e),
};
#[cfg(not(feature = "zstd"))]
let partial: Option<DataBlock> = None;
let block = if let Some(db) = partial {
db
} else {
// Regular path: load the full block and wrap it as a data block.
let raw = match load_block(
self.table_id,
&self.path,
&self.file_accessor,
&self.cache,
&BlockHandle::new(handle.offset(), handle.size()),
crate::table::block::BlockType::Data,
self.compression,
self.encryption.as_deref(),
self.ecc,
#[cfg(zstd_any)]
self.zstd_dictionary.as_deref(),
self.heal_hints.as_ref().map(AsRef::as_ref),
#[cfg(feature = "metrics")]
&self.metrics,
) {
Ok(b) => b,
Err(e) => return self.poison(e),
};
match DataBlock::from_loaded(raw, self.has_kv_footer) {
Ok(b) => b,
Err(e) => return self.poison(e),
}
};
let mut reader = match create_data_block_reader(block, self.comparator.clone()) {
Ok(r) => r,
Err(e) => return self.poison(e),
};
// Apply the bounds inside the block: upper first, then lower (the reverse
// of the order used by `next`). The seek results are not inspected.
if let Some(bound) = &self.range.1 {
reader.seek_upper_bound(bound, SeqNo::MAX);
}
if let Some(bound) = &self.range.0 {
reader.seek_lower_bound(bound, SeqNo::MAX);
}
let item = reader.next_back();
// Remember the reader as the current back block, even if it yielded
// nothing.
self.hi_offset = handle.offset();
self.hi_data_block = Some(reader);
if let Some(mut item) = item {
item.key.seqno += self.global_seqno;
return Some(Ok(item));
}
// No item in this block: continue with the previous handle.
}
}
}
// Unit tests for the promotion threshold helper `promote_by_fraction`.
#[cfg(all(test, feature = "zstd"))]
mod promote_tests {
use super::promote_by_fraction;
// Ratios of exactly 75% and above must promote.
#[test]
fn promote_by_fraction_triggers_at_or_above_threshold() {
assert!(promote_by_fraction(3, 4));
assert!(promote_by_fraction(6, 8));
assert!(promote_by_fraction(10, 10));
}
// Ratios below 75% must not promote.
#[test]
fn promote_by_fraction_holds_below_threshold() {
assert!(!promote_by_fraction(2, 4));
assert!(!promote_by_fraction(1, 8));
assert!(!promote_by_fraction(5, 8));
}
// A total of zero never promotes, whatever the covered count is.
#[test]
fn promote_by_fraction_zero_total_never_promotes() {
assert!(!promote_by_fraction(0, 0));
assert!(!promote_by_fraction(5, 0));
}
}