use crate::{
BoxedIterator, InternalValue,
comparator::SharedComparator,
key::InternalKey,
memtable::Memtable,
merge_operator::MergeOperator,
merge_source::{CoherentIterSource, CoherentMergeSource, IterItem, MergeSource},
mvcc_stream::MvccStream,
range_tombstone::RangeTombstone,
range_tombstone_filter::RangeTombstoneFilter,
reseek::{ReseekCtx, Reseekable},
run_reader::RunReader,
seeking_merger::SeekingMerger,
value::{SeqNo, UserKey},
version::{Run, SuperVersion},
};
use alloc::sync::Arc;
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, vec::Vec};
use core::ops::{Bound, RangeBounds};
use self_cell::self_cell;
/// Snapshot visibility predicate.
///
/// Returns `true` when an item stamped with `item_seqno` is strictly older
/// than the reader's `seqno`; an item whose sequence number equals `seqno`
/// is not visible.
#[must_use]
pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool {
    seqno > item_seqno
}
/// Wraps every boxed iterator in a [`CoherentIterSource`] and hands the
/// resulting sources, together with `comparator`, to a [`SeekingMerger`].
fn build_seeking<'a>(
    iters: Vec<BoxedIterator<'a>>,
    comparator: SharedComparator,
) -> SeekingMerger<CoherentIterSource<BoxedIterator<'a>>, SharedComparator> {
    let mut sources: Vec<CoherentIterSource<BoxedIterator<'a>>> = Vec::with_capacity(iters.len());
    for iter in iters {
        sources.push(CoherentIterSource::new(iter));
    }
    SeekingMerger::new(sources, comparator)
}
/// Computes the exclusive upper bound of the key range that shares `prefix`.
///
/// The bound is the prefix cut off after its last byte that is not `0xFF`,
/// with that byte incremented by one. If every byte is `0xFF` no finite bound
/// exists and `Unbounded` is returned.
///
/// # Panics
///
/// Panics if `prefix` is empty.
pub(crate) fn prefix_upper_range(prefix: &[u8]) -> Bound<UserKey> {
    assert!(!prefix.is_empty(), "prefix may not be empty");

    // Index of the right-most byte that can still be incremented.
    let Some(pivot) = prefix.iter().rposition(|&byte| byte < u8::MAX) else {
        return Bound::Unbounded;
    };

    let mut end = prefix[..pivot].to_vec();
    end.push(prefix[pivot] + 1);
    Bound::Excluded(end.into())
}
/// Converts a key prefix into a pair of range bounds.
///
/// An empty prefix maps to the fully unbounded range. Otherwise the lower
/// bound is the prefix itself (inclusive) and the upper bound is whatever
/// [`prefix_upper_range`] computes for it.
#[must_use]
#[expect(clippy::module_name_repetitions)]
pub fn prefix_to_range(prefix: &[u8]) -> (Bound<UserKey>, Bound<UserKey>) {
    match prefix {
        [] => (Bound::Unbounded, Bound::Unbounded),
        _ => (Bound::Included(prefix.into()), prefix_upper_range(prefix)),
    }
}
/// Everything an iterator built in this module needs to keep alive and consult
/// while it runs; it is the owner half of the self-referential [`TreeIter`].
#[derive(Clone)]
pub struct IterState {
/// Version whose levels, sealed memtables and active memtable are read.
pub(crate) version: SuperVersion,
/// Optional extra memtable paired with its own sequence-number cutoff; when present
/// it is added as a further source by `create_range` and the seekable pipeline.
pub(crate) ephemeral: Option<(Arc<Memtable>, SeqNo)>,
/// Optional merge operator, forwarded to `MvccStream::new_with_comparator`.
pub(crate) merge_operator: Option<Arc<dyn MergeOperator>>,
/// Comparator used for every user-key comparison made while building the iterator.
pub(crate) comparator: crate::comparator::SharedComparator,
/// When set, `bloom_passes` asks each table's prefix filter about this hash.
pub(crate) prefix_hash: Option<u64>,
/// When set, `bloom_passes` asks each table's key filter about this hash.
pub(crate) key_hash: Option<u64>,
/// Key handed to the table's key filter together with `key_hash`;
/// `bloom_passes` debug-asserts that `key_hash` is set whenever this is.
pub(crate) bloom_key: Option<UserKey>,
/// Optional metrics sink; `bloom_passes` bumps `prefix_bloom_skips` on it.
#[cfg(feature = "metrics")]
pub(crate) metrics: Option<Arc<crate::Metrics>>,
}
// Type-erased, sendable, double-ended iterator over internal values; the lifetime
// ties it to the `IterState` it borrows from.
type BoxedMerge<'a> = Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
// Self-referential cell: the `IterState` owner is stored alongside the boxed
// iterator (`dependent`) that borrows from it, so both move around as one value.
self_cell!(
pub struct TreeIter {
owner: IterState,
#[covariant]
dependent: BoxedMerge,
}
);
/// Forward iteration simply drives the boxed iterator stored in the cell.
impl Iterator for TreeIter {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_state, merged| merged.next())
    }
}
/// Reverse iteration simply drives the boxed iterator stored in the cell.
impl DoubleEndedIterator for TreeIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_state, merged| merged.next_back())
    }
}
/// Reports whether the range tombstone `rt` can intersect the user-key `bounds`
/// under `comparator`.
///
/// Two independent conditions must both hold:
/// * the tombstone's `end` sorts strictly after the lower bound key (inclusive and
///   exclusive lower bounds are treated alike), and
/// * the tombstone's `start` sorts at or before an inclusive upper bound, or strictly
///   before an exclusive one.
///
/// An unbounded side always satisfies its condition.
fn range_tombstone_overlaps_bounds(
    rt: &RangeTombstone,
    bounds: &(Bound<UserKey>, Bound<UserKey>),
    comparator: &dyn crate::comparator::UserComparator,
) -> bool {
    let (lower, upper) = bounds;

    let ends_after_lower = match lower {
        Bound::Unbounded => true,
        Bound::Included(lo) | Bound::Excluded(lo) => comparator.compare(&rt.end, lo).is_gt(),
    };

    let starts_within_upper = match upper {
        Bound::Unbounded => true,
        Bound::Included(hi) => comparator.compare(&rt.start, hi).is_le(),
        Bound::Excluded(hi) => comparator.compare(&rt.start, hi).is_lt(),
    };

    ends_after_lower && starts_within_upper
}
/// Decides whether `table` still has to be read after consulting its filters.
///
/// The prefix filter is checked first (only when `state.prefix_hash` is set), then
/// the key filter (only when `state.key_hash` is set). A definite "not present"
/// answer from either filter yields `false`. A filter error is logged at debug
/// level and treated as "may be present", so the table is kept.
fn bloom_passes(state: &IterState, table: &crate::table::Table) -> bool {
    if let Some(hash) = state.prefix_hash {
        match table.maybe_contains_prefix(hash) {
            Ok(true) => {}
            Ok(false) => {
                // Only the prefix filter reports skips to the metrics sink.
                #[cfg(feature = "metrics")]
                if let Some(m) = &state.metrics {
                    m.prefix_bloom_skips
                        .fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                }
                return false;
            }
            Err(e) => {
                log::debug!("prefix bloom check failed for table {:?}: {e}", table.id(),);
            }
        }
    }

    debug_assert!(
        state.bloom_key.is_none() || state.key_hash.is_some(),
        "bloom_key requires key_hash to be set"
    );

    // Without a key hash there is nothing further to check.
    let Some(key_hash) = state.key_hash else {
        return true;
    };

    let verdict = match &state.bloom_key {
        Some(bloom_key) => table.bloom_may_contain_key(bloom_key, key_hash),
        None => table.bloom_may_contain_key_hash(key_hash),
    };

    match verdict {
        Ok(may_contain) => may_contain,
        Err(e) => {
            log::debug!("key bloom check failed for table {:?}: {e}", table.id(),);
            true
        }
    }
}
impl TreeIter {
/// Builds an iterator restricted to the single user key `key`, reading only
/// entries whose sequence number passes `seqno_filter` for `seqno`.
///
/// Sources are the on-disk tables of every run (after key-range and filter
/// pruning), the sealed memtables and the active memtable. Range tombstones
/// overlapping the key are gathered alongside and used to suppress covered
/// entries in the final filter.
///
/// NOTE(review): unlike `create_range`, this path never consults
/// `IterState::ephemeral` — confirm callers only use it when no ephemeral
/// memtable is attached.
#[expect(
clippy::too_many_lines,
reason = "mirrors create_range structure for the point-read fast path; splitting would reduce clarity"
)]
#[must_use]
pub fn create_range_point(guard: IterState, key: &[u8], seqno: SeqNo) -> Self {
let key_slice = UserKey::from(key);
Self::new(guard, |lock| {
// Degenerate user-key range [key, key].
let user_range = (
Bound::Included(key_slice.clone()),
Bound::Included(key_slice.clone()),
);
// Matching internal-key range used for the memtable lookups: from
// (key, SeqNo::MAX, Tombstone) through (key, 0, Value), both inclusive.
let range = (
Bound::Included(InternalKey::new(
key_slice.as_ref(),
SeqNo::MAX,
crate::ValueType::Tombstone,
)),
Bound::Included(InternalKey::new(
key_slice.as_ref(),
0,
crate::ValueType::Value,
)),
);
let mut iters: Vec<BoxedIterator<'_>> = Vec::new();
let mut range_tombstones: Vec<(RangeTombstone, SeqNo)> = Vec::new();
// Borrowed byte-slice view of `user_range` for the table key-range checks.
let bounds = (
user_range.0.as_ref().map(core::convert::AsRef::as_ref),
user_range.1.as_ref().map(core::convert::AsRef::as_ref),
);
for run in lock
.version
.version
.iter_levels()
.flat_map(|lvl| lvl.iter())
{
// Range tombstones are only taken from tables whose key range overlaps the key.
// NOTE(review): `create_range` collects tombstones from every table without this
// pre-check — confirm a table's key range always covers its range tombstones.
for table in run.iter() {
if !table.check_key_range_overlap_cmp(&bounds, lock.comparator.as_ref()) {
continue;
}
range_tombstones.extend(
table
.range_tombstones()
.iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt.clone(), seqno)),
);
}
match run.len() {
0 => {}
1 => {
#[expect(clippy::expect_used, reason = "we checked for length")]
let table = run.first().expect("should exist");
// Read the table only if it overlaps the key and its filters do not rule it out.
if table.check_key_range_overlap_cmp(&bounds, lock.comparator.as_ref())
&& bloom_passes(lock, table)
{
let reader =
table
.range(user_range.clone())
.filter(move |item| match item {
Ok(item) => seqno_filter(item.key.seqno, seqno),
// Errors are passed through so the consumer sees them.
Err(_) => true,
});
iters.push(Box::new(reader));
}
}
_ => {
// Multi-table run: keep only the tables that survive pruning.
let surviving: Vec<_> = run
.iter()
.filter(|table| {
table.check_key_range_overlap_cmp(&bounds, lock.comparator.as_ref())
&& bloom_passes(lock, table)
})
.cloned()
.collect();
match surviving.len() {
0 => {}
1 => {
// A single survivor is read directly, without a run reader.
if let Some(table) = surviving.into_iter().next() {
let reader =
table.range(user_range.clone()).filter(move |item| {
match item {
Ok(item) => seqno_filter(item.key.seqno, seqno),
Err(_) => true,
}
});
iters.push(Box::new(reader));
}
}
_ => {
// Several survivors are wrapped in a fresh run and read through `RunReader`.
#[expect(
clippy::expect_used,
reason = "Run::new returns None only for empty vecs"
)]
let new_run =
Run::new(surviving).expect("non-empty surviving tables");
if let Some(reader) = RunReader::new_cmp(
Arc::new(new_run),
user_range.clone(),
lock.comparator.as_ref(),
) {
iters.push(Box::new(reader.filter(move |item| match item {
Ok(item) => seqno_filter(item.key.seqno, seqno),
Err(_) => true,
})));
}
}
}
}
}
}
// Sealed memtables: collect their overlapping range tombstones and add a point reader each.
for memtable in lock.version.sealed_memtables.iter() {
range_tombstones.extend(
memtable
.range_tombstones_sorted()
.into_iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt, seqno)),
);
let iter = memtable.range_internal(range.clone());
iters.push(Box::new(
iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
.map(Ok),
));
}
// Active memtable, handled the same way as the sealed ones.
{
range_tombstones.extend(
lock.version
.active_memtable
.range_tombstones_sorted()
.into_iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt, seqno)),
);
let iter = lock.version.active_memtable.range_internal(range);
iters.push(Box::new(
iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
.map(Ok),
));
}
// Merge all sources and run them through the MVCC stream.
let merged = build_seeking(iters, lock.comparator.clone());
let iter = MvccStream::new_with_comparator(
merged,
lock.merge_operator.clone(),
lock.comparator.clone(),
)
.with_range_tombstones(range_tombstones.clone());
// Final filter: drop tombstones and any value suppressed by a collected range tombstone.
Box::new(iter.filter(move |x| match x {
Ok(value) => {
if value.key.is_tombstone() {
return false;
}
!range_tombstones.iter().any(|(rt, cutoff)| {
rt.should_suppress_with(
&value.key.user_key,
value.key.seqno,
*cutoff,
lock.comparator.as_ref(),
)
})
}
Err(_) => true,
}))
})
}
/// Builds an iterator over the user-key `range`, reading only entries whose
/// sequence number passes `seqno_filter` for `seqno`.
///
/// Sources are the on-disk tables and runs of every level (after key-range and
/// filter pruning), the sealed memtables, the active memtable and, when present,
/// the ephemeral memtable with its own cutoff. Tombstones are filtered out of the
/// merged stream; if any collected range tombstone is visible at its cutoff the
/// stream is additionally wrapped in a `RangeTombstoneFilter`.
#[expect(
clippy::too_many_lines,
reason = "create_range wires up multiple iterator sources, filters, and tombstone handling; splitting further would reduce clarity"
)]
pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>(
guard: IterState,
range: R,
seqno: SeqNo,
) -> Self {
Self::new(guard, |lock| {
// Owned copy of the caller's bounds as `UserKey`s.
let user_range = (
match range.start_bound() {
Bound::Included(key) => Bound::Included(UserKey::from(key.as_ref())),
Bound::Excluded(key) => Bound::Excluded(UserKey::from(key.as_ref())),
Bound::Unbounded => Bound::Unbounded,
},
match range.end_bound() {
Bound::Included(key) => Bound::Included(UserKey::from(key.as_ref())),
Bound::Excluded(key) => Bound::Excluded(UserKey::from(key.as_ref())),
Bound::Unbounded => Bound::Unbounded,
},
);
// Internal-key bounds for the memtable lookups. The sequence number paired with
// each user key depends on the bound kind: SeqNo::MAX for an included lower or
// excluded upper bound, 0 for an excluded lower or included upper bound.
let range = (
match &user_range.0 {
Bound::Included(key) => Bound::Included(InternalKey::new(
key.as_ref(),
SeqNo::MAX,
crate::ValueType::Tombstone,
)),
Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
key.as_ref(),
0,
crate::ValueType::Tombstone,
)),
Bound::Unbounded => Bound::Unbounded,
},
match &user_range.1 {
Bound::Included(key) => {
Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value))
}
Bound::Excluded(key) => Bound::Excluded(InternalKey::new(
key.as_ref(),
SeqNo::MAX,
crate::ValueType::Value,
)),
Bound::Unbounded => Bound::Unbounded,
},
);
let mut iters: Vec<BoxedIterator<'_>> = Vec::with_capacity(5);
let mut all_range_tombstones: Vec<(RangeTombstone, SeqNo)> = Vec::new();
// Tables read on their own vs. runs read through a `RunReader`.
let mut single_tables = Vec::new();
let mut multi_runs = Vec::new();
for run in lock
.version
.version
.iter_levels()
.flat_map(|lvl| lvl.iter())
{
match run.len() {
0 => {
}
1 => {
#[expect(clippy::expect_used, reason = "we checked for length")]
let table = run.first().expect("should exist");
// Tombstones are collected before (and independently of) the table pruning below.
all_range_tombstones.extend(
table
.range_tombstones()
.iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt.clone(), seqno)),
);
if table.check_key_range_overlap_cmp(
&(
user_range.0.as_ref().map(core::convert::AsRef::as_ref),
user_range.1.as_ref().map(core::convert::AsRef::as_ref),
),
lock.comparator.as_ref(),
) && bloom_passes(lock, table)
{
single_tables.push(table.clone());
}
}
_ => {
for table in run.iter() {
all_range_tombstones.extend(
table
.range_tombstones()
.iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt.clone(), seqno)),
);
}
// Per-table pruning is only done when a filter hash is available;
// otherwise the run is handed to `RunReader` unchanged.
if lock.prefix_hash.is_some() || lock.key_hash.is_some() {
let bounds = (
user_range.0.as_ref().map(core::convert::AsRef::as_ref),
user_range.1.as_ref().map(core::convert::AsRef::as_ref),
);
let surviving: Vec<_> = run
.iter()
.filter(|table| {
if !table.check_key_range_overlap_cmp(
&bounds,
lock.comparator.as_ref(),
) {
return false;
}
bloom_passes(lock, table)
})
.cloned()
.collect();
match surviving.len() {
0 => {
}
1 => {
if let Some(table) = surviving.into_iter().next() {
single_tables.push(table);
}
}
_ => {
#[expect(
clippy::expect_used,
reason = "Run::new returns None only for empty vecs"
)]
let new_run =
Run::new(surviving).expect("non-empty surviving tables");
multi_runs.push(Arc::new(new_run));
}
}
} else {
multi_runs.push(run.clone());
}
}
}
}
// Sort by start key so `partition_point` below can bound the candidate tombstones.
all_range_tombstones
.sort_unstable_by(|(a, _), (b, _)| lock.comparator.compare(&a.start, &b.start));
for table in single_tables {
let table_min: &[u8] = table.metadata.key_range.min().as_ref();
let table_max: &[u8] = table.metadata.key_range.max().as_ref();
let table_kv_seqno = table.get_highest_kv_seqno();
// Only tombstones starting at or before the table's smallest key are candidates.
let candidate_end = all_range_tombstones.partition_point(|(rt, _)| {
lock.comparator.compare(&rt.start, table_min) != core::cmp::Ordering::Greater
});
// The table is skipped when a visible tombstone covers its whole key range and
// has a higher seqno than the table's highest KV seqno.
let is_covered =
all_range_tombstones
.iter()
.take(candidate_end)
.any(|(rt, cutoff)| {
rt.visible_at(*cutoff)
&& rt.fully_covers_with(
table_min,
table_max,
lock.comparator.as_ref(),
)
&& rt.seqno > table_kv_seqno
});
if !is_covered {
let reader = table
.range(user_range.clone())
.filter(move |item| match item {
Ok(item) => seqno_filter(item.key.seqno, seqno),
// Errors are passed through so the consumer sees them.
Err(_) => true,
});
iters.push(Box::new(reader));
}
}
for run in multi_runs {
if let Some(reader) =
RunReader::new_cmp(run, user_range.clone(), lock.comparator.as_ref())
{
iters.push(Box::new(reader.filter(move |item| match item {
Ok(item) => seqno_filter(item.key.seqno, seqno),
Err(_) => true,
})));
}
}
// Sealed memtables: collect overlapping range tombstones and add a range reader each.
for memtable in lock.version.sealed_memtables.iter() {
all_range_tombstones.extend(
memtable
.range_tombstones_sorted()
.into_iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt, seqno)),
);
let iter = memtable.range_internal(range.clone());
iters.push(Box::new(
iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
.map(Ok),
));
}
// Active memtable, handled the same way as the sealed ones.
{
all_range_tombstones.extend(
lock.version
.active_memtable
.range_tombstones_sorted()
.into_iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt, seqno)),
);
let iter = lock.version.active_memtable.range_internal(range.clone());
iters.push(Box::new(
iter.filter(move |item| seqno_filter(item.key.seqno, seqno))
.map(Ok),
));
}
// Ephemeral memtable: its tombstones and entries use `eph_seqno` as cutoff instead of `seqno`.
if let Some((mt, eph_seqno)) = &lock.ephemeral {
all_range_tombstones.extend(
mt.range_tombstones_sorted()
.into_iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&user_range,
lock.comparator.as_ref(),
)
})
.map(|rt| (rt, *eph_seqno)),
);
let iter = Box::new(
mt.range_internal(range)
.filter(move |item| seqno_filter(item.key.seqno, *eph_seqno))
.map(Ok),
);
iters.push(iter);
}
// Merge all sources and run them through the MVCC stream.
let merged = build_seeking(iters, lock.comparator.clone());
let iter = MvccStream::new_with_comparator(
merged,
lock.merge_operator.clone(),
lock.comparator.clone(),
)
.with_range_tombstones(all_range_tombstones.clone());
// Tombstones never reach the caller; errors do.
let iter = iter.filter(|x| match x {
Ok(value) => !value.key.is_tombstone(),
Err(_) => true,
});
// Order the tombstones and collapse equal ones, keeping the larger cutoff of each group.
all_range_tombstones
.sort_by(|a, b| a.0.cmp_with_comparator(&b.0, lock.comparator.as_ref()));
all_range_tombstones.dedup_by(|a, b| {
if a.0 == b.0 {
b.1 = b.1.max(a.1);
true
} else {
false
}
});
// If no tombstone is visible at its cutoff the extra filter layer is skipped.
if all_range_tombstones
.iter()
.all(|(rt, cutoff)| !rt.visible_at(*cutoff))
{
Box::new(iter)
} else {
Box::new(RangeTombstoneFilter::new_with_comparator(
iter,
all_range_tombstones,
lock.comparator.clone(),
))
}
})
}
}
/// A pair of (lower, upper) bounds expressed as user keys.
type UserBounds = (Bound<UserKey>, Bound<UserKey>);

/// Translates user-key bounds into internal-key bounds.
///
/// The bound kind is preserved. The sequence number and value type paired with
/// each key depend on the side and kind of the bound:
/// * lower, included: `SeqNo::MAX` / `Tombstone`
/// * lower, excluded: `0` / `Tombstone`
/// * upper, included: `0` / `Value`
/// * upper, excluded: `SeqNo::MAX` / `Value`
fn user_to_internal_bounds(user: &UserBounds) -> (Bound<InternalKey>, Bound<InternalKey>) {
    let (lower, upper) = user;

    let internal_lower = match lower {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(key) => {
            let start = InternalKey::new(key.clone(), SeqNo::MAX, crate::ValueType::Tombstone);
            Bound::Included(start)
        }
        Bound::Excluded(key) => {
            let start = InternalKey::new(key.clone(), 0, crate::ValueType::Tombstone);
            Bound::Excluded(start)
        }
    };

    let internal_upper = match upper {
        Bound::Unbounded => Bound::Unbounded,
        Bound::Included(key) => {
            let end = InternalKey::new(key.clone(), 0, crate::ValueType::Value);
            Bound::Included(end)
        }
        Bound::Excluded(key) => {
            let end = InternalKey::new(key.clone(), SeqNo::MAX, crate::ValueType::Value);
            Bound::Excluded(end)
        }
    };

    (internal_lower, internal_upper)
}
/// On-disk sources and range tombstones gathered once by `collect_sources`
/// for a given pair of union bounds.
#[derive(Clone)]
struct CollectedSources {
// Tables from single-table runs that overlap the union bounds and passed `bloom_passes`.
single_tables: Vec<crate::table::Table>,
// Runs holding more than one table, kept as-is.
multi_runs: Vec<Arc<Run<crate::table::Table>>>,
// Range tombstones overlapping the union bounds, each paired with its visibility
// cutoff; sorted and de-duplicated by `collect_sources`.
range_tombstones: Vec<(RangeTombstone, SeqNo)>,
// The union bounds the sources were collected for.
union: UserBounds,
}
fn collect_sources(state: &IterState, union: UserBounds, seqno: SeqNo) -> CollectedSources {
let mut single_tables: Vec<crate::table::Table> = Vec::new();
let mut multi_runs: Vec<Arc<Run<crate::table::Table>>> = Vec::new();
let mut rts: Vec<(RangeTombstone, SeqNo)> = Vec::new();
let bounds_ref = (
union.0.as_ref().map(core::convert::AsRef::as_ref),
union.1.as_ref().map(core::convert::AsRef::as_ref),
);
for run in state
.version
.version
.iter_levels()
.flat_map(|lvl| lvl.iter())
{
match run.len() {
0 => {}
1 => {
#[expect(clippy::expect_used, reason = "len checked")]
let table = run.first().expect("len == 1");
rts.extend(
table
.range_tombstones()
.iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(rt, &union, state.comparator.as_ref())
})
.map(|rt| (rt.clone(), seqno)),
);
if table.check_key_range_overlap_cmp(&bounds_ref, state.comparator.as_ref())
&& bloom_passes(state, table)
{
single_tables.push(table.clone());
}
}
_ => {
for table in run.iter() {
rts.extend(
table
.range_tombstones()
.iter()
.filter(|rt| {
range_tombstone_overlaps_bounds(
rt,
&union,
state.comparator.as_ref(),
)
})
.map(|rt| (rt.clone(), seqno)),
);
}
multi_runs.push(run.clone());
}
}
}
let mut collect_mt_rts = |iter: alloc::vec::Vec<RangeTombstone>, cutoff: SeqNo| {
rts.extend(
iter.into_iter()
.filter(|rt| range_tombstone_overlaps_bounds(rt, &union, state.comparator.as_ref()))
.map(|rt| (rt, cutoff)),
);
};
for memtable in state.version.sealed_memtables.iter() {
collect_mt_rts(memtable.range_tombstones_sorted(), seqno);
}
collect_mt_rts(
state.version.active_memtable.range_tombstones_sorted(),
seqno,
);
if let Some((mt, eph_seqno)) = &state.ephemeral {
collect_mt_rts(mt.range_tombstones_sorted(), *eph_seqno);
}
rts.sort_by(|a, b| a.0.cmp_with_comparator(&b.0, state.comparator.as_ref()));
rts.dedup_by(|a, b| {
if a.0 == b.0 {
b.1 = b.1.max(a.1);
true
} else {
false
}
});
CollectedSources {
single_tables,
multi_runs,
range_tombstones: rts,
union,
}
}
/// Iterator adapter that drops every successfully read tombstone entry and
/// forwards everything else, errors included.
struct TombstoneSkip<I> {
    inner: I,
}

impl<I: Iterator<Item = crate::Result<InternalValue>>> Iterator for TombstoneSkip<I> {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        // First item from the front that is not an `Ok` tombstone.
        self.inner
            .find(|item| !matches!(item, Ok(value) if value.key.is_tombstone()))
    }
}

impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
    for TombstoneSkip<I>
{
    fn next_back(&mut self) -> Option<Self::Item> {
        // First item from the back that is not an `Ok` tombstone.
        self.inner
            .rfind(|item| !matches!(item, Ok(value) if value.key.is_tombstone()))
    }
}

impl<I: Reseekable> Reseekable for TombstoneSkip<I> {
    /// The adapter holds no position of its own, so a reseek is forwarded untouched.
    fn reseek(&mut self, ctx: &ReseekCtx) {
        self.inner.reseek(ctx);
    }
}
/// Merge leaf reading a single table, keeping the table handle so the
/// iterator can be repositioned later.
struct TableLeaf {
    table: crate::table::Table,
    iter: crate::table::iter::Iter,
    seqno: SeqNo,
}

impl TableLeaf {
    /// Opens a range iterator over `user_range` on `table`.
    fn new(table: crate::table::Table, user_range: UserBounds, seqno: SeqNo) -> Self {
        Self {
            iter: table.range_iter(user_range),
            table,
            seqno,
        }
    }

    /// Next item from the front that is either an error or passes `seqno_filter`.
    fn next_filtered(&mut self) -> Option<IterItem> {
        loop {
            let item = self.iter.next()?;
            let hidden =
                matches!(&item, Ok(value) if !seqno_filter(value.key.seqno, self.seqno));
            if !hidden {
                return Some(item);
            }
        }
    }

    /// Next item from the back that is either an error or passes `seqno_filter`.
    fn next_back_filtered(&mut self) -> Option<IterItem> {
        loop {
            let item = self.iter.next_back()?;
            let hidden =
                matches!(&item, Ok(value) if !seqno_filter(value.key.seqno, self.seqno));
            if !hidden {
                return Some(item);
            }
        }
    }
}
/// Merge leaf reading a multi-table run through a [`RunReader`]; the comparator
/// is kept because repositioning the reader needs it.
struct RunLeaf {
    reader: RunReader,
    comparator: SharedComparator,
    seqno: SeqNo,
}

impl RunLeaf {
    /// Creates the leaf, or returns `None` when `RunReader::new_cmp` yields no reader.
    fn new(
        run: Arc<Run<crate::table::Table>>,
        user_range: UserBounds,
        seqno: SeqNo,
        comparator: SharedComparator,
    ) -> Option<Self> {
        RunReader::new_cmp(run, user_range, comparator.as_ref()).map(|reader| Self {
            reader,
            comparator,
            seqno,
        })
    }

    /// Next item from the front that is either an error or passes `seqno_filter`.
    fn next_filtered(&mut self) -> Option<IterItem> {
        loop {
            let item = self.reader.next()?;
            let hidden =
                matches!(&item, Ok(value) if !seqno_filter(value.key.seqno, self.seqno));
            if !hidden {
                return Some(item);
            }
        }
    }

    /// Next item from the back that is either an error or passes `seqno_filter`.
    fn next_back_filtered(&mut self) -> Option<IterItem> {
        loop {
            let item = self.reader.next_back()?;
            let hidden =
                matches!(&item, Ok(value) if !seqno_filter(value.key.seqno, self.seqno));
            if !hidden {
                return Some(item);
            }
        }
    }
}
/// Merge leaf reading a memtable's skiplist range; the memtable reference is
/// kept so a fresh range can be opened on reseek.
struct MemtableLeaf<'a> {
    mt: &'a Memtable,
    range: crate::memtable::skiplist::Range<'a>,
    seqno: SeqNo,
}

impl<'a> MemtableLeaf<'a> {
    /// Opens the skiplist range `internal` on `mt`.
    fn new(
        mt: &'a Memtable,
        internal: (Bound<InternalKey>, Bound<InternalKey>),
        seqno: SeqNo,
    ) -> Self {
        Self {
            mt,
            range: mt.items.range(internal),
            seqno,
        }
    }

    /// Next entry from the front whose sequence number passes `seqno_filter`.
    fn next_filtered(&mut self) -> Option<IterItem> {
        loop {
            let entry = self.range.next()?;
            let candidate = InternalValue {
                key: entry.key(),
                value: entry.value(),
            };
            if !seqno_filter(candidate.key.seqno, self.seqno) {
                continue;
            }
            return Some(Ok(candidate));
        }
    }

    /// Next entry from the back whose sequence number passes `seqno_filter`.
    fn next_back_filtered(&mut self) -> Option<IterItem> {
        loop {
            let entry = self.range.next_back()?;
            let candidate = InternalValue {
                key: entry.key(),
                value: entry.value(),
            };
            if !seqno_filter(candidate.key.seqno, self.seqno) {
                continue;
            }
            return Some(Ok(candidate));
        }
    }
}
/// One input of the seekable merge: a table, a run, or a memtable.
/// The table and run variants are boxed.
enum SeekableLeaf<'a> {
    Table(Box<TableLeaf>),
    Run(Box<RunLeaf>),
    Memtable(MemtableLeaf<'a>),
}

impl MergeSource for SeekableLeaf<'_> {
    fn next(&mut self) -> Option<IterItem> {
        match self {
            Self::Table(leaf) => leaf.next_filtered(),
            Self::Run(leaf) => leaf.next_filtered(),
            Self::Memtable(leaf) => leaf.next_filtered(),
        }
    }

    fn next_back(&mut self) -> Option<IterItem> {
        match self {
            Self::Table(leaf) => leaf.next_back_filtered(),
            Self::Run(leaf) => leaf.next_back_filtered(),
            Self::Memtable(leaf) => leaf.next_back_filtered(),
        }
    }

    // Intentionally does nothing and reports success.
    // NOTE(review): repositioning appears to go through `Reseekable::reseek` instead — confirm.
    fn seek(&mut self, _target: &InternalKey) -> crate::Result<()> {
        Ok(())
    }
}

impl CoherentMergeSource for SeekableLeaf<'_> {}

impl Reseekable for SeekableLeaf<'_> {
    /// Repositions the leaf onto the bounds in `ctx`: tables and runs are reseeked
    /// with the user-key bounds, memtables reopen their skiplist range with the
    /// internal-key bounds.
    fn reseek(&mut self, ctx: &ReseekCtx) {
        match self {
            Self::Table(leaf) => leaf.table.reseek_range(&mut leaf.iter, ctx.user.clone()),
            Self::Run(leaf) => leaf
                .reader
                .reseek(ctx.user.clone(), leaf.comparator.as_ref()),
            Self::Memtable(leaf) => {
                leaf.range = leaf.mt.items.range(ctx.internal.clone());
            }
        }
    }
}
/// Concrete type of the seekable read pipeline: merged leaves, then the MVCC
/// stream, then tombstone skipping, then range-tombstone filtering.
type SeekPipeline<'a> = RangeTombstoneFilter<
    TombstoneSkip<MvccStream<SeekingMerger<SeekableLeaf<'a>, SharedComparator>>>,
>;

/// Assembles the seekable pipeline over the bounds `lower..upper`.
///
/// Leaves are added in this order: collected single tables, collected runs (those
/// for which a reader could be created), sealed memtables, the active memtable and
/// finally the ephemeral memtable, which is read with its own sequence-number
/// cutoff instead of `seqno`.
fn build_seek_pipeline<'a>(
    state: &'a IterState,
    collected: &CollectedSources,
    lower: Bound<UserKey>,
    upper: Bound<UserKey>,
    seqno: SeqNo,
) -> SeekPipeline<'a> {
    let user_range: UserBounds = (lower, upper);
    let internal = user_to_internal_bounds(&user_range);

    // +3 leaves headroom for memtable leaves beyond the tables and runs.
    let capacity = collected.single_tables.len() + collected.multi_runs.len() + 3;
    let mut sources: Vec<SeekableLeaf<'a>> = Vec::with_capacity(capacity);

    sources.extend(collected.single_tables.iter().map(|table| {
        let leaf = TableLeaf::new(table.clone(), user_range.clone(), seqno);
        SeekableLeaf::Table(Box::new(leaf))
    }));

    sources.extend(collected.multi_runs.iter().filter_map(|run| {
        RunLeaf::new(
            run.clone(),
            user_range.clone(),
            seqno,
            state.comparator.clone(),
        )
        .map(|leaf| SeekableLeaf::Run(Box::new(leaf)))
    }));

    for memtable in state.version.sealed_memtables.iter() {
        let leaf = MemtableLeaf::new(memtable, internal.clone(), seqno);
        sources.push(SeekableLeaf::Memtable(leaf));
    }

    let active = MemtableLeaf::new(&state.version.active_memtable, internal.clone(), seqno);
    sources.push(SeekableLeaf::Memtable(active));

    if let Some((mt, eph_seqno)) = &state.ephemeral {
        let leaf = MemtableLeaf::new(mt, internal, *eph_seqno);
        sources.push(SeekableLeaf::Memtable(leaf));
    }

    let tombstones = &collected.range_tombstones;
    let merger = SeekingMerger::new(sources, state.comparator.clone());
    let stream = MvccStream::new_with_comparator(
        merger,
        state.merge_operator.clone(),
        state.comparator.clone(),
    )
    .with_range_tombstones(tombstones.clone());

    RangeTombstoneFilter::new_with_comparator(
        TombstoneSkip { inner: stream },
        tombstones.clone(),
        state.comparator.clone(),
    )
}
/// Owner half of the seekable iterator: everything the pipeline borrows from
/// or is built out of.
pub struct SeekableOwner {
// Version, comparator, merge operator and filter hashes the pipeline reads through.
state: IterState,
// Sources and range tombstones collected once for the union bounds.
collected: Arc<CollectedSources>,
// Sequence-number cutoff handed to the pipeline builder.
seqno: SeqNo,
// Bounds the pipeline is initially built over.
lower: Bound<UserKey>,
upper: Bound<UserKey>,
}
// Self-referential cell pairing a `SeekableOwner` with the `SeekPipeline`
// (`dependent`) that borrows from it.
self_cell!(
struct SeekableCell {
owner: SeekableOwner,
#[covariant]
dependent: SeekPipeline,
}
);
impl SeekableCell {
fn build(owner: SeekableOwner) -> Self {
Self::new(owner, |o| {
build_seek_pipeline(
&o.state,
&o.collected,
o.lower.clone(),
o.upper.clone(),
o.seqno,
)
})
}
fn reseek(&mut self, ctx: &ReseekCtx) {
self.with_dependent_mut(|_, pipeline| pipeline.reseek(ctx));
}
}
/// Forward iteration drives the pipeline stored in the cell.
impl Iterator for SeekableCell {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_owner, pipeline| pipeline.next())
    }
}
/// Reverse iteration drives the pipeline stored in the cell.
impl DoubleEndedIterator for SeekableCell {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.with_dependent_mut(|_owner, pipeline| pipeline.next_back())
    }
}
/// Repositionable iterator over a tree snapshot with a one-item look-ahead buffer.
pub struct SeekableTreeIter {
// Owner + pipeline pair doing the actual reading.
cell: SeekableCell,
// Look-ahead slot filled by `peek_key`, drained by `next`/`next_back`
// and cleared by `reposition`.
#[expect(
clippy::option_option,
reason = "std `Peekable` buffer shape: outer None = not yet peeked, \
inner None = peeked past the end, Some(Some) = buffered item"
)]
peeked: Option<Option<crate::Result<InternalValue>>>,
}
impl SeekableTreeIter {
    /// Creates a seekable iterator over the union bounds
    /// `union_lower..union_upper`, reading at sequence number `seqno`.
    ///
    /// Sources and range tombstones are collected once for the union bounds; the
    /// pipeline is initially positioned on the whole union.
    #[must_use]
    pub fn create(
        state: IterState,
        union_lower: Bound<UserKey>,
        union_upper: Bound<UserKey>,
        seqno: SeqNo,
    ) -> Self {
        let collected = Arc::new(collect_sources(
            &state,
            (union_lower.clone(), union_upper.clone()),
            seqno,
        ));
        Self {
            cell: SeekableCell::build(SeekableOwner {
                state,
                collected,
                seqno,
                lower: union_lower,
                upper: union_upper,
            }),
            peeked: None,
        }
    }

    /// Repositions the pipeline onto `lower..upper` and discards any buffered
    /// look-ahead item.
    pub(crate) fn reposition(&mut self, lower: Bound<UserKey>, upper: Bound<UserKey>) {
        let user_range: UserBounds = (lower, upper);
        let internal = user_to_internal_bounds(&user_range);
        let ctx = ReseekCtx {
            user: user_range,
            internal,
        };
        self.cell.reseek(&ctx);
        self.peeked = None;
    }

    /// Repositions the iterator so forward iteration starts at `key` (inclusive),
    /// clamped to the union's lower bound; the union's upper bound is kept.
    ///
    /// Keys are ordered with the iterator's configured comparator, not by raw
    /// byte order, so the clamp agrees with the ordering used by the merge.
    pub fn seek_to(&mut self, key: &[u8]) {
        let (lower, upper) = {
            let owner = self.cell.borrow_owner();
            let comparator = &owner.state.comparator;
            let union = &owner.collected.union;
            let lower = match &union.0 {
                // The union already starts after `key`: keep its lower bound.
                Bound::Included(floor)
                    if comparator.compare(floor, key) == core::cmp::Ordering::Greater =>
                {
                    union.0.clone()
                }
                // An exclusive floor at or after `key` is tighter than `Included(key)`.
                Bound::Excluded(floor)
                    if comparator.compare(floor, key) != core::cmp::Ordering::Less =>
                {
                    union.0.clone()
                }
                _ => Bound::Included(UserKey::from(key)),
            };
            (lower, union.1.clone())
        };
        self.reposition(lower, upper);
    }

    /// Repositions the iterator so reverse iteration starts at `key` (inclusive),
    /// clamped to the union's upper bound; the union's lower bound is kept.
    ///
    /// Keys are ordered with the iterator's configured comparator, not by raw
    /// byte order, so the clamp agrees with the ordering used by the merge.
    pub fn seek_to_for_prev(&mut self, key: &[u8]) {
        let (lower, upper) = {
            let owner = self.cell.borrow_owner();
            let comparator = &owner.state.comparator;
            let union = &owner.collected.union;
            let upper = match &union.1 {
                // The union already ends before `key`: keep its upper bound.
                Bound::Included(ceil)
                    if comparator.compare(ceil, key) == core::cmp::Ordering::Less =>
                {
                    union.1.clone()
                }
                // An exclusive ceiling at or before `key` is tighter than `Included(key)`.
                Bound::Excluded(ceil)
                    if comparator.compare(ceil, key) != core::cmp::Ordering::Greater =>
                {
                    union.1.clone()
                }
                _ => Bound::Included(UserKey::from(key)),
            };
            (union.0.clone(), upper)
        };
        self.reposition(lower, upper);
    }

    /// Returns a clone of the version this iterator reads from.
    #[must_use]
    pub fn version(&self) -> crate::version::Version {
        self.cell.borrow_owner().state.version.version.clone()
    }

    /// Returns the user key of the next forward item without consuming the item.
    ///
    /// Returns `None` when the iterator is exhausted. If the next item is an
    /// error, the error is returned and removed from the buffer, so it is
    /// reported only once.
    pub fn peek_key(&mut self) -> Option<crate::Result<UserKey>> {
        if self.peeked.is_none() {
            self.peeked = Some(self.cell.next());
        }
        // Errors are handed out by value, which requires taking them out of the buffer.
        if matches!(self.peeked, Some(Some(Err(_)))) {
            return match self.peeked.take() {
                Some(Some(Err(e))) => Some(Err(e)),
                _ => None,
            };
        }
        match &self.peeked {
            Some(Some(Ok(item))) => Some(Ok(item.key.user_key.clone())),
            _ => None,
        }
    }
}
/// Forward iteration drains the look-ahead buffer first (including a buffered
/// end-of-stream), then falls back to the underlying cell.
impl Iterator for SeekableTreeIter {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        if let Some(buffered) = self.peeked.take() {
            return buffered;
        }
        self.cell.next()
    }
}
/// Reverse iteration reads from the underlying cell; only once that is exhausted
/// is a buffered look-ahead item handed out.
impl DoubleEndedIterator for SeekableTreeIter {
    fn next_back(&mut self) -> Option<Self::Item> {
        self.cell
            .next_back()
            .or_else(|| self.peeked.take().flatten())
    }
}
/// Scans a sequence of key intervals with a single [`SeekableTreeIter`],
/// repositioning it onto each interval in turn.
pub struct BatchRangeScan<I> {
// Iterator that is repositioned for every interval.
iter: SeekableTreeIter,
// Remaining (lower, upper) intervals to scan.
intervals: I,
// `true` while `iter` is positioned on an interval that may still yield items.
primed: bool,
}
impl<I: Iterator<Item = (Bound<UserKey>, Bound<UserKey>)>> BatchRangeScan<I> {
    /// Creates a scan that will visit `intervals` in order using `iter`.
    /// Nothing is repositioned until the first call to `next`.
    pub fn new(iter: SeekableTreeIter, intervals: I) -> Self {
        let primed = false;
        Self {
            iter,
            intervals,
            primed,
        }
    }
}
/// Yields the items of each interval in turn; ends when the intervals run out.
impl<I: Iterator<Item = (Bound<UserKey>, Bound<UserKey>)>> Iterator for BatchRangeScan<I> {
    type Item = crate::Result<InternalValue>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if !self.primed {
                // Move on to the next interval, or finish if there is none left.
                let (lower, upper) = self.intervals.next()?;
                self.iter.reposition(lower, upper);
                self.primed = true;
            }
            match self.iter.next() {
                Some(item) => return Some(item),
                // Current interval is drained; pick up the next one.
                None => self.primed = false,
            }
        }
    }
}
#[cfg(test)]
mod tests;