use async_trait::async_trait;
use bytes::Bytes;
use futures::future::try_join;
use std::collections::VecDeque;
use std::sync::Arc;
use crate::bytes_range::BytesRange;
use crate::db_iter::GetIterator;
use crate::db_state::{SortedRun, SsTableView};
use crate::db_stats::DbStats;
use crate::error::SlateDBError;
use crate::iter::{EmptyIterator, IterationOrder, RowEntryIterator};
use crate::manifest::{LsmTreeState, Segment};
use crate::merge_iterator::MergeIterator;
use crate::sorted_run_iterator::SortedRunIterator;
use crate::sst_iter::{SstIterator, SstIteratorOptions};
use crate::tablestore::TableStore;
use crate::types::RowEntry;
use crate::utils::build_concurrent;
/// Shared inputs used to build the per-segment iterators of one scan or point lookup.
pub(crate) struct SegmentScanContext {
/// Store handed to every `SstIterator` / `SortedRunIterator` constructor.
pub(crate) table_store: Arc<TableStore>,
/// Key range being read. `build_segment_iter` treats a range for which
/// `as_point()` is `Some` as a point lookup.
pub(crate) range: BytesRange,
/// Options forwarded to every SST / sorted-run iterator; its `order` also
/// decides the order in which segments are visited.
pub(crate) sst_iter_options: SstIteratorOptions,
/// Passed to `build_concurrent` when opening range iterators
/// (presumably its concurrency limit — confirm against `build_concurrent`).
pub(crate) max_parallel: usize,
/// Stats handle given to the SST iterators built for point lookups.
pub(crate) point_lookup_stats: Option<DbStats>,
/// Stats handle given to the sorted-run iterators built for range scans.
pub(crate) db_stats: DbStats,
}
/// Range iterators opened for a single LSM tree, kept split by level.
struct RangeTreeIterators {
/// Initialized iterators over the tree's L0 SSTs.
l0: VecDeque<Box<dyn RowEntryIterator>>,
/// Initialized iterators over the tree's sorted runs that overlap the scan range.
sr: VecDeque<Box<dyn RowEntryIterator>>,
}
impl RangeTreeIterators {
    /// Opens the range iterators for `tree`'s L0 SSTs and its overlapping sorted runs.
    ///
    /// Both groups are built concurrently through `try_join`. If either side
    /// fails, that error is returned and no iterators are kept.
    async fn build(tree: LsmTreeState, ctx: &SegmentScanContext) -> Result<Self, SlateDBError> {
        let l0_future = build_l0_range_iters(tree.l0, ctx);
        let sr_future = build_sr_range_iters(tree.compacted, ctx);
        try_join(l0_future, sr_future)
            .await
            .map(|(l0, sr)| Self { l0, sr })
    }
}
/// Lifecycle of one segment's child iterator inside `SegmentMergeIterator`.
enum SegmentIterState {
/// Not opened yet: holds the segment's LSM tree until the child reaches the front.
Pending(LsmTreeState),
/// Opened and initialized. When built from a `Pending` tree, this is a
/// `MergeIterator` over the tree's L0 and sorted-run iterators.
Built(Box<dyn RowEntryIterator>),
}
/// Iterator that chains per-segment child iterators, reading one segment at a
/// time from the front of `children`. A segment's tree is only opened when that
/// segment reaches the front.
struct SegmentMergeIterator {
/// (segment prefix, child state) pairs in visiting order. `count_before`
/// binary-searches the prefixes, so they are assumed sorted in iteration order.
children: VecDeque<(Bytes, SegmentIterState)>,
/// Required to build `Pending` children. `None` only works when every child
/// is already `Built`.
context: Option<SegmentScanContext>,
/// Iteration direction; selects how `count_before` locates a seek key.
order: IterationOrder,
/// Key from a `seek`, applied to a child when `ensure_front_built` builds it.
pending_seek: Option<Bytes>,
/// Set by `init`; `next` and `seek` return `IteratorNotInitialized` until then.
initialized: bool,
}
impl SegmentMergeIterator {
    /// Creates a chaining iterator with one lazily-built (`Pending`) child per segment.
    ///
    /// Children keep the input order for ascending scans and are reversed for
    /// descending scans. `segments` is assumed to arrive sorted by ascending
    /// prefix (confirm with callers), because `count_before` binary-searches it.
    /// No I/O happens here.
    fn new(segments: Vec<Segment>, context: SegmentScanContext) -> Self {
        let order = context.sst_iter_options.order;
        let pending = segments
            .into_iter()
            .map(|segment| (segment.prefix, SegmentIterState::Pending(segment.tree)));
        let children: VecDeque<(Bytes, SegmentIterState)> = match order {
            IterationOrder::Ascending => pending.collect(),
            IterationOrder::Descending => pending.rev().collect(),
        };
        Self {
            children,
            context: Some(context),
            order,
            pending_seek: None,
            initialized: false,
        }
    }

    /// Counts the leading children whose keys all come before `key` in iteration order.
    ///
    /// Those children can be discarded when seeking to `key`.
    fn count_before(&self, key: &[u8]) -> usize {
        match self.order {
            // Any segment whose prefix sorts above `key` holds only keys above
            // `key`, so a descending scan has already moved past it.
            IterationOrder::Descending => {
                self.children.partition_point(|(prefix, _)| prefix.as_ref() > key)
            }
            IterationOrder::Ascending => {
                let past = self.children.partition_point(|(prefix, _)| prefix.as_ref() <= key);
                // The last prefix <= key may be the segment that owns `key`; keep it if so.
                match past.checked_sub(1) {
                    Some(owner) if key.starts_with(self.children[owner].0.as_ref()) => owner,
                    _ => past,
                }
            }
        }
    }

    /// Returns the front child, first building it if it is still `Pending`.
    ///
    /// Building opens the segment's L0 and sorted-run iterators and wraps them in a
    /// `MergeIterator` with dedup disabled. The merge is then initialized and, if
    /// `pending_seek` is set, seeked to that key. Returns `Ok(None)` when no
    /// children remain.
    async fn ensure_front_built(
        &mut self,
    ) -> Result<Option<&mut Box<dyn RowEntryIterator>>, SlateDBError> {
        let front_is_pending = matches!(
            self.children.front(),
            Some((_, SegmentIterState::Pending(_)))
        );
        if front_is_pending {
            // Take the entry out so its tree can be consumed by value.
            // NOTE(review): if any step below fails, the popped segment is never
            // restored, so a caller that keeps iterating after an error would skip
            // it — confirm callers treat iterator errors as terminal.
            let (prefix, state) = self
                .children
                .pop_front()
                .expect("front exists per match above");
            let tree = match state {
                SegmentIterState::Pending(tree) => tree,
                SegmentIterState::Built(_) => unreachable!("matched Pending above"),
            };
            let context = self
                .context
                .as_ref()
                .expect("Pending children require a SegmentScanContext");
            let RangeTreeIterators { l0, sr } = RangeTreeIterators::build(tree, context).await?;
            // L0 iterators first, then sorted runs — the order handed to the merge.
            let mut sources: VecDeque<Box<dyn RowEntryIterator>> = l0;
            sources.extend(sr);
            let merged = MergeIterator::new_with_order(sources, context.sst_iter_options.order)?;
            let mut child: Box<dyn RowEntryIterator> = Box::new(merged.with_dedup(false));
            child.init().await?;
            if let Some(target) = &self.pending_seek {
                child.seek(target).await?;
            }
            self.children
                .push_front((prefix, SegmentIterState::Built(child)));
        }
        match self.children.front_mut() {
            None => Ok(None),
            Some((_, SegmentIterState::Built(child))) => Ok(Some(child)),
            Some((_, SegmentIterState::Pending(_))) => unreachable!("just transitioned to Built"),
        }
    }
}
#[async_trait]
impl RowEntryIterator for SegmentMergeIterator {
    /// Marks the iterator ready. Children are built lazily, so no I/O happens here.
    async fn init(&mut self) -> Result<(), SlateDBError> {
        self.initialized = true;
        Ok(())
    }

    /// Returns the next entry from the front segment.
    ///
    /// When the front segment is exhausted, it is dropped and the next one is
    /// built (if still `Pending`) and read.
    ///
    /// # Errors
    /// `IteratorNotInitialized` if `init` has not been called, plus any error from
    /// building or reading a child.
    async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
        if !self.initialized {
            return Err(SlateDBError::IteratorNotInitialized);
        }
        loop {
            let Some(child) = self.ensure_front_built().await? else {
                return Ok(None);
            };
            if let Some(entry) = child.next().await? {
                return Ok(Some(entry));
            }
            // Front segment exhausted; continue with the following one.
            self.children.pop_front();
        }
    }

    /// Positions the iterator at the first entry at or after `next_key` in
    /// iteration order.
    ///
    /// Segments lying entirely before `next_key` are discarded. The new front
    /// segment is built if necessary and then seeked.
    ///
    /// # Errors
    /// `IteratorNotInitialized` if `init` has not been called, plus any error from
    /// building or seeking the front child.
    async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
        if !self.initialized {
            return Err(SlateDBError::IteratorNotInitialized);
        }
        let drop_count = self.count_before(next_key);
        self.children.drain(..drop_count);

        // A Pending front is seeked by `ensure_front_built` while it is built (via
        // `pending_seek`). A front that was already Built must be seeked here.
        let was_pending = matches!(
            self.children.front(),
            Some((_, SegmentIterState::Pending(_)))
        );
        self.pending_seek = Some(Bytes::copy_from_slice(next_key));
        let result = match self.ensure_front_built().await {
            Ok(Some(child)) if !was_pending => child.seek(next_key).await,
            Ok(_) => Ok(()),
            Err(err) => Err(err),
        };
        // Only the front segment can hold keys before `next_key`, and it has now
        // been built and seeked. Clear the key so segments that `next` builds
        // later are not re-seeked to a stale position.
        self.pending_seek = None;
        result
    }
}
/// Builds the iterator that reads `segments` for `context.range`.
///
/// - Point lookups (`context.range.as_point()` is `Some`) use a `GetIterator`
///   over the tree of the last segment in `segments`, with `max_seq` passed
///   through. They yield an `EmptyIterator` when `segments` is empty.
/// - Every other range gets a `SegmentMergeIterator` that chains the segments
///   and builds each one lazily.
///
/// NOTE(review): the point path ignores all but the last segment — presumably the
/// caller has already narrowed `segments` to the one owning the key; confirm.
pub(crate) fn build_segment_iter(
    mut segments: Vec<Segment>,
    context: SegmentScanContext,
    max_seq: Option<u64>,
) -> Result<Box<dyn RowEntryIterator>, SlateDBError> {
    let Some(point_key) = context.range.as_point() else {
        return Ok(Box::new(SegmentMergeIterator::new(segments, context)));
    };
    let Some(segment) = segments.pop() else {
        return Ok(Box::new(EmptyIterator::new()));
    };
    let lookup = GetIterator::from_lsm_tree(point_key.clone(), segment.tree, &context, max_seq)?;
    Ok(Box::new(lookup))
}
/// Creates one (not yet initialized) SST iterator per L0 table for a point lookup.
///
/// Tables for which `SstIterator::new_owned_with_stats` returns `None` are
/// skipped. The result keeps the order of `l0`. Each iterator receives
/// `ctx.point_lookup_stats`.
///
/// # Errors
/// Returns the first construction error; tables after it are not opened.
pub(crate) fn build_l0_point_iters(
    l0: VecDeque<SsTableView>,
    ctx: &SegmentScanContext,
) -> Result<VecDeque<Box<dyn RowEntryIterator>>, SlateDBError> {
    l0.into_iter().try_fold(
        VecDeque::<Box<dyn RowEntryIterator>>::new(),
        |mut acc, view| -> Result<_, SlateDBError> {
            let opened = SstIterator::new_owned_with_stats(
                ctx.range.clone(),
                view,
                ctx.table_store.clone(),
                ctx.sst_iter_options.clone(),
                ctx.point_lookup_stats.clone(),
            )?;
            if let Some(iter) = opened {
                acc.push_back(Box::new(iter) as Box<dyn RowEntryIterator>);
            }
            Ok(acc)
        },
    )
}
/// Creates one (not yet initialized) SST iterator for every sorted-run table
/// whose key span covers `key`.
///
/// Runs are visited in `compacted` order, and within a run in the order returned
/// by `tables_covering_point_key`. Tables for which
/// `SstIterator::new_owned_with_stats` returns `None` are skipped. Each iterator
/// receives `ctx.point_lookup_stats`.
///
/// # Errors
/// Returns the first construction error; later tables are not opened.
pub(crate) fn build_sr_point_iters(
    key: &Bytes,
    compacted: &[SortedRun],
    ctx: &SegmentScanContext,
) -> Result<VecDeque<Box<dyn RowEntryIterator>>, SlateDBError> {
    let point: &[u8] = key.as_ref();
    let mut opened_iters: VecDeque<Box<dyn RowEntryIterator>> = VecDeque::new();
    for run in compacted {
        for table in run.tables_covering_point_key(point) {
            let maybe_iter = SstIterator::new_owned_with_stats(
                ctx.range.clone(),
                table.clone(),
                ctx.table_store.clone(),
                ctx.sst_iter_options.clone(),
                ctx.point_lookup_stats.clone(),
            )?;
            opened_iters.extend(maybe_iter.map(|iter| Box::new(iter) as Box<dyn RowEntryIterator>));
        }
    }
    Ok(opened_iters)
}
async fn build_l0_range_iters(
l0: VecDeque<SsTableView>,
ctx: &SegmentScanContext,
) -> Result<VecDeque<Box<dyn RowEntryIterator>>, SlateDBError> {
let table_store = ctx.table_store.clone();
let range = ctx.range.clone();
let opts = ctx.sst_iter_options.clone();
build_concurrent(l0.into_iter(), ctx.max_parallel, move |sst| {
let table_store = table_store.clone();
let range = range.clone();
let opts = opts.clone();
async move {
SstIterator::new_owned_initialized(range, sst, table_store, opts)
.await
.map(|maybe| maybe.map(|i| Box::new(i) as Box<dyn RowEntryIterator>))
}
})
.await
}
async fn build_sr_range_iters(
compacted: Vec<SortedRun>,
ctx: &SegmentScanContext,
) -> Result<VecDeque<Box<dyn RowEntryIterator>>, SlateDBError> {
let range = ctx.range.clone();
let overlapping: Vec<_> = compacted
.into_iter()
.filter(|sr| sr.overlaps_range(&range))
.collect();
let table_store = ctx.table_store.clone();
let opts = ctx.sst_iter_options.clone();
let stats = ctx.db_stats.clone();
build_concurrent(overlapping.into_iter(), ctx.max_parallel, move |sr| {
let table_store = table_store.clone();
let range = range.clone();
let opts = opts.clone();
let stats = stats.clone();
async move {
SortedRunIterator::new_owned_initialized_with_stats(
range,
sr,
table_store,
opts,
Some(stats),
)
.await
.map(|iter| Some(Box::new(iter) as Box<dyn RowEntryIterator>))
}
})
.await
}
// Unit tests for `SegmentMergeIterator`'s chaining and seek logic. Every child is
// an already-`Built` in-memory `VecIter` and `context` is `None`, so no table store
// or LSM tree is involved.
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{RowEntry, ValueDeletable};
use bytes::Bytes;
// Minimal in-memory `RowEntryIterator` that yields `entries` front to back.
// It never sorts, so callers must supply entries already in `order`.
struct VecIter {
entries: VecDeque<RowEntry>,
order: IterationOrder,
initialized: bool,
}
impl VecIter {
// Builds an uninitialized iterator over `entries` for the given direction.
fn with_order(entries: Vec<RowEntry>, order: IterationOrder) -> Self {
Self {
entries: entries.into(),
order,
initialized: false,
}
}
}
#[async_trait]
impl RowEntryIterator for VecIter {
async fn init(&mut self) -> Result<(), SlateDBError> {
self.initialized = true;
Ok(())
}
async fn next(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
if !self.initialized {
return Err(SlateDBError::IteratorNotInitialized);
}
Ok(self.entries.pop_front())
}
// Drops entries until the front is >= `next_key` (ascending) or
// <= `next_key` (descending).
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
if !self.initialized {
return Err(SlateDBError::IteratorNotInitialized);
}
while let Some(front) = self.entries.front() {
let satisfied = match self.order {
IterationOrder::Ascending => front.key.as_ref() >= next_key,
IterationOrder::Descending => front.key.as_ref() <= next_key,
};
if satisfied {
break;
}
self.entries.pop_front();
}
Ok(())
}
}
// Value row for `key` with sequence number `seq` and no timestamps.
fn entry(key: &[u8], val: &[u8], seq: u64) -> RowEntry {
RowEntry {
key: Bytes::copy_from_slice(key),
value: ValueDeletable::Value(Bytes::copy_from_slice(val)),
seq,
create_ts: None,
expire_ts: None,
}
}
// Tombstone row for `key` with sequence number `seq` and no timestamps.
fn tombstone(key: &[u8], seq: u64) -> RowEntry {
RowEntry {
key: Bytes::copy_from_slice(key),
value: ValueDeletable::Tombstone,
seq,
create_ts: None,
expire_ts: None,
}
}
// Wraps `entries` in an initialized `VecIter` and pairs it with `prefix`
// as an already-`Built` child.
async fn built_child(
prefix: &[u8],
entries: Vec<RowEntry>,
order: IterationOrder,
) -> (Bytes, SegmentIterState) {
let mut iter = VecIter::with_order(entries, order);
iter.init().await.unwrap();
(
Bytes::copy_from_slice(prefix),
SegmentIterState::Built(Box::new(iter)),
)
}
// Ascending shorthand for `make_iter_with_order`.
async fn make_iter(specs: Vec<(&[u8], Vec<RowEntry>)>) -> SegmentMergeIterator {
make_iter_with_order(specs, IterationOrder::Ascending).await
}
// Builds an uninitialized `SegmentMergeIterator` directly from `specs`, which
// are kept in the given order. Unlike `SegmentMergeIterator::new`, no reversal
// is applied for descending order, so descending tests list largest prefixes first.
async fn make_iter_with_order(
specs: Vec<(&[u8], Vec<RowEntry>)>,
order: IterationOrder,
) -> SegmentMergeIterator {
let mut children = VecDeque::new();
for (prefix, entries) in specs {
children.push_back(built_child(prefix, entries, order).await);
}
SegmentMergeIterator {
children,
context: None,
order,
pending_seek: None,
initialized: false,
}
}
// Collects the keys of all remaining entries, panicking on any error.
async fn drain(iter: &mut SegmentMergeIterator) -> Vec<Bytes> {
let mut out = Vec::new();
while let Some(e) = iter.next().await.unwrap() {
out.push(e.key);
}
out
}
// No children: the first `next` is end-of-iteration.
#[tokio::test]
async fn empty_chain_returns_none() {
let mut iter = make_iter(vec![]).await;
iter.init().await.unwrap();
assert!(iter.next().await.unwrap().is_none());
}
// `next` before `init` is rejected.
#[tokio::test]
async fn next_before_init_errors() {
let mut iter = make_iter(vec![]).await;
let err = iter.next().await.unwrap_err();
assert!(matches!(err, SlateDBError::IteratorNotInitialized));
}
// A single child's entries come through unchanged.
#[tokio::test]
async fn single_child_passes_through() {
let mut iter = make_iter(vec![(
b"a/",
vec![entry(b"a/1", b"x", 1), entry(b"a/2", b"y", 2)],
)])
.await;
iter.init().await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![Bytes::from_static(b"a/1"), Bytes::from_static(b"a/2")],
);
}
// The empty prefix owns every key, so seeking keeps the child and seeks inside it.
#[tokio::test]
async fn unsegmented_child_with_empty_prefix() {
let mut iter = make_iter(vec![(
b"",
vec![entry(b"alpha", b"x", 1), entry(b"omega", b"y", 2)],
)])
.await;
iter.init().await.unwrap();
iter.seek(b"foo").await.unwrap();
assert_eq!(drain(&mut iter).await, vec![Bytes::from_static(b"omega")]);
}
// Children are read back-to-back in queue order.
#[tokio::test]
async fn two_children_chain_in_order() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1), entry(b"a/2", b"y", 2)]),
(b"b/", vec![entry(b"b/1", b"z", 3)]),
])
.await;
iter.init().await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![
Bytes::from_static(b"a/1"),
Bytes::from_static(b"a/2"),
Bytes::from_static(b"b/1"),
],
);
}
// An exhausted-on-arrival child is popped and iteration continues.
#[tokio::test]
async fn empty_intermediate_child_is_skipped() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1)]),
(b"b/", vec![]),
(b"c/", vec![entry(b"c/1", b"y", 2)]),
])
.await;
iter.init().await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![Bytes::from_static(b"a/1"), Bytes::from_static(b"c/1")],
);
}
// Seeking to a key owned by the front child seeks that child in place.
#[tokio::test]
async fn seek_within_current_child() {
let mut iter = make_iter(vec![
(
b"a/",
vec![
entry(b"a/1", b"x", 1),
entry(b"a/2", b"y", 2),
entry(b"a/3", b"z", 3),
],
),
(b"b/", vec![entry(b"b/1", b"w", 4)]),
])
.await;
iter.init().await.unwrap();
iter.seek(b"a/2").await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![
Bytes::from_static(b"a/2"),
Bytes::from_static(b"a/3"),
Bytes::from_static(b"b/1"),
],
);
}
// Seeking into a later segment discards every segment before it.
#[tokio::test]
async fn seek_into_later_child_skips_intervening() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1), entry(b"a/2", b"y", 2)]),
(b"b/", vec![entry(b"b/1", b"z", 3)]),
(b"c/", vec![entry(b"c/1", b"u", 4), entry(b"c/2", b"v", 5)]),
])
.await;
iter.init().await.unwrap();
iter.seek(b"c/2").await.unwrap();
assert_eq!(drain(&mut iter).await, vec![Bytes::from_static(b"c/2")]);
}
// A key owned by no segment lands on the next larger segment.
#[tokio::test]
async fn seek_into_gap_advances_to_next_segment() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1)]),
(b"c/", vec![entry(b"c/1", b"y", 2)]),
])
.await;
iter.init().await.unwrap();
iter.seek(b"b/key").await.unwrap();
assert_eq!(drain(&mut iter).await, vec![Bytes::from_static(b"c/1")]);
}
// Seeking to a segment's exact prefix keeps that segment and all later ones.
#[tokio::test]
async fn seek_lazily_initializes_promoted_children() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1)]),
(b"b/", vec![entry(b"b/1", b"y", 2)]),
(b"c/", vec![entry(b"c/1", b"z", 3), entry(b"c/2", b"w", 4)]),
])
.await;
iter.init().await.unwrap();
iter.seek(b"b/").await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![
Bytes::from_static(b"b/1"),
Bytes::from_static(b"c/1"),
Bytes::from_static(b"c/2"),
],
);
}
// Tombstones are forwarded, not filtered.
#[tokio::test]
async fn tombstones_propagate() {
let mut iter = make_iter(vec![(
b"a/",
vec![
entry(b"a/1", b"x", 1),
tombstone(b"a/2", 2),
entry(b"a/3", b"z", 3),
],
)])
.await;
iter.init().await.unwrap();
let first = iter.next().await.unwrap().unwrap();
assert!(matches!(first.value, ValueDeletable::Value(_)));
let second = iter.next().await.unwrap().unwrap();
assert!(matches!(second.value, ValueDeletable::Tombstone));
let third = iter.next().await.unwrap().unwrap();
assert!(matches!(third.value, ValueDeletable::Value(_)));
}
// Descending: the largest-prefix child is read first.
#[tokio::test]
async fn descending_two_children_chain_largest_prefix_first() {
let mut iter = make_iter_with_order(
vec![
(b"b/", vec![entry(b"b/2", b"z", 3), entry(b"b/1", b"y", 2)]),
(b"a/", vec![entry(b"a/2", b"w", 4), entry(b"a/1", b"x", 1)]),
],
IterationOrder::Descending,
)
.await;
iter.init().await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![
Bytes::from_static(b"b/2"),
Bytes::from_static(b"b/1"),
Bytes::from_static(b"a/2"),
Bytes::from_static(b"a/1"),
],
);
}
// Descending: seeking into a smaller-prefix segment drops all larger ones.
#[tokio::test]
async fn descending_seek_into_earlier_child_skips_intervening() {
let mut iter = make_iter_with_order(
vec![
(b"c/", vec![entry(b"c/2", b"z", 5), entry(b"c/1", b"y", 4)]),
(b"b/", vec![entry(b"b/1", b"x", 3)]),
(b"a/", vec![entry(b"a/2", b"w", 2), entry(b"a/1", b"v", 1)]),
],
IterationOrder::Descending,
)
.await;
iter.init().await.unwrap();
iter.seek(b"a/2").await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![Bytes::from_static(b"a/2"), Bytes::from_static(b"a/1")],
);
}
// Descending: a key owned by no segment lands on the next smaller segment.
#[tokio::test]
async fn descending_seek_into_gap_advances_to_smaller_segment() {
let mut iter = make_iter_with_order(
vec![
(b"c/", vec![entry(b"c/1", b"z", 3)]),
(b"a/", vec![entry(b"a/1", b"y", 1)]),
],
IterationOrder::Descending,
)
.await;
iter.init().await.unwrap();
iter.seek(b"b/key").await.unwrap();
assert_eq!(drain(&mut iter).await, vec![Bytes::from_static(b"a/1")]);
}
// Descending: a key extending a segment's prefix seeks within that segment.
#[tokio::test]
async fn descending_seek_within_segment_via_prefix_extension() {
let mut iter = make_iter_with_order(
vec![
(b"c/", vec![entry(b"c/1", b"z", 3)]),
(
b"b/",
vec![
entry(b"b/3", b"w", 6),
entry(b"b/2", b"v", 5),
entry(b"b/1", b"u", 4),
],
),
(b"a/", vec![entry(b"a/1", b"t", 1)]),
],
IterationOrder::Descending,
)
.await;
iter.init().await.unwrap();
iter.seek(b"b/2").await.unwrap();
assert_eq!(
drain(&mut iter).await,
vec![
Bytes::from_static(b"b/2"),
Bytes::from_static(b"b/1"),
Bytes::from_static(b"a/1"),
],
);
}
// Descending: a key below every prefix discards all children.
#[tokio::test]
async fn descending_seek_drops_all_children() {
let mut iter = make_iter_with_order(
vec![
(b"c/", vec![entry(b"c/1", b"z", 2)]),
(b"b/", vec![entry(b"b/1", b"y", 1)]),
],
IterationOrder::Descending,
)
.await;
iter.init().await.unwrap();
iter.seek(b"a/zzz").await.unwrap();
assert!(iter.next().await.unwrap().is_none());
}
// Ascending: a key above every segment discards all children.
#[tokio::test]
async fn ascending_seek_drops_all_children() {
let mut iter = make_iter(vec![
(b"a/", vec![entry(b"a/1", b"x", 1)]),
(b"b/", vec![entry(b"b/1", b"y", 2)]),
])
.await;
iter.init().await.unwrap();
iter.seek(b"z/").await.unwrap();
assert!(iter.next().await.unwrap().is_none());
}
}