use crate::comparator::UserComparator;
use crate::loser_tree::LoserTree;
use crate::merge_source::{CoherentMergeSource, IterItem, MergeSource};
use alloc::vec::Vec;
use core::cmp::Ordering;
/// One buffered item held in a loser-tree slot, tagged with the source it came from.
struct MergerEntry {
/// Index into the merger's `sources` vector of the source that produced `value`.
source: usize,
/// The item pulled from that source and not yet emitted by the merger.
value: crate::InternalValue,
}
/// Ascending entry order for the forward tree.
///
/// Entries are ordered by key through the user comparator; entries whose keys
/// compare equal are ordered by ascending source index.
#[derive(Clone)]
struct MinCmp<C: UserComparator + Clone> {
    comparator: C,
}

impl<C: UserComparator + Clone> crate::loser_tree::EntryComparator<MergerEntry> for MinCmp<C> {
    #[expect(
        clippy::inline_always,
        reason = "called O(log cap) per replay step on the merger's hot path; \
matching the loser-tree's own #[inline(always)] on cmp_indices \
is what makes the dispatch flatten — verified in disassembly"
    )]
    #[inline(always)]
    fn compare(&self, a: &MergerEntry, b: &MergerEntry) -> Ordering {
        let by_key = a.value.key.compare_with(&b.value.key, &self.comparator);
        match by_key {
            // Equal keys: the lower source index sorts first.
            Ordering::Equal => a.source.cmp(&b.source),
            decided => decided,
        }
    }
}
/// Descending entry order for the backward tree.
///
/// Both the key comparison and the source-index tie-break are evaluated with
/// the operands swapped, so the entry this comparator ranks lowest is the one
/// with the greatest key (and, among equal keys, the highest source index).
#[derive(Clone)]
struct MaxCmp<C: UserComparator + Clone> {
    comparator: C,
}

impl<C: UserComparator + Clone> crate::loser_tree::EntryComparator<MergerEntry> for MaxCmp<C> {
    #[expect(
        clippy::inline_always,
        reason = "called O(log cap) per replay step on the merger's hot path"
    )]
    #[inline(always)]
    fn compare(&self, a: &MergerEntry, b: &MergerEntry) -> Ordering {
        let by_key = b.value.key.compare_with(&a.value.key, &self.comparator);
        match by_key {
            // Equal keys: the higher source index sorts first.
            Ordering::Equal => b.source.cmp(&a.source),
            decided => decided,
        }
    }
}
/// Wraps `comparator` in the ascending [`MinCmp`] entry comparator (used when
/// building the forward tree).
fn build_min_cmp<C: UserComparator + Clone>(comparator: C) -> MinCmp<C> {
MinCmp { comparator }
}
/// Wraps `comparator` in the descending [`MaxCmp`] entry comparator (used when
/// building the backward tree).
fn build_max_cmp<C: UserComparator + Clone>(comparator: C) -> MaxCmp<C> {
MaxCmp { comparator }
}
/// Merges several [`MergeSource`]s into a single key-ordered stream that can be
/// read from the front (`next`) and from the back (`next_back`).
///
/// Each direction owns a loser tree with one slot per source. A tree is built
/// lazily the first time its direction is used and refilled after a reseek.
pub struct SeekingMerger<S: MergeSource, C: UserComparator + Clone> {
/// Inputs being merged; a tree slot index doubles as an index into this vector.
sources: Vec<S>,
/// `sources.len()`, captured at construction.
n_sources: usize,
/// Key comparator; a clone is handed to each tree's entry comparator.
comparator: C,
/// Ascending tree driving `next`; `None` until the first forward initialization.
forward_tree: Option<LoserTree<MergerEntry, MinCmp<C>>>,
/// Descending tree driving `next_back`; `None` until the first backward initialization.
backward_tree: Option<LoserTree<MergerEntry, MaxCmp<C>>>,
/// Error hit while pulling forward, held until the next `next`/`next_back` call.
pending_forward_error: Option<crate::Error>,
/// Error hit while pulling backward, held until the next `next`/`next_back` call.
pending_backward_error: Option<crate::Error>,
/// `true` once the forward tree has been filled since construction or the last reseek.
forward_primed: bool,
/// `true` once the backward tree has been filled since construction or the last reseek.
backward_primed: bool,
}
impl<S: MergeSource, C: UserComparator + Clone> SeekingMerger<S, C> {
    /// Creates a merger over `sources` that orders keys with `comparator`.
    ///
    /// No source is read here: both trees start out absent and unprimed.
    #[must_use]
    pub fn new(sources: Vec<S>, comparator: C) -> Self {
        Self {
            // Evaluated before `sources` is moved into the struct.
            n_sources: sources.len(),
            sources,
            comparator,
            forward_tree: None,
            backward_tree: None,
            pending_forward_error: None,
            pending_backward_error: None,
            forward_primed: false,
            backward_primed: false,
        }
    }

    /// Fills the forward tree with one entry per source.
    ///
    /// Each slot is fed by pulling the matching source from the front. A
    /// source error leaves the slot empty and is parked in
    /// `pending_forward_error` (only the first one is kept). A source that is
    /// already exhausted falls back to whatever the backward tree still
    /// buffers for that slot, so the value is not stranded there.
    ///
    /// An existing tree is refilled in place; otherwise a new one is built.
    fn initialize_forward(&mut self) {
        let Self {
            sources,
            n_sources,
            comparator,
            forward_tree,
            backward_tree,
            pending_forward_error,
            ..
        } = self;

        let mut fetch = |slot: usize| -> Option<MergerEntry> {
            #[expect(
                clippy::indexing_slicing,
                reason = "i < n_sources by construction; sources len == n_sources"
            )]
            match MergeSource::next(&mut sources[slot]) {
                Some(Ok(value)) => Some(MergerEntry {
                    source: slot,
                    value,
                }),
                Some(Err(e)) => {
                    pending_forward_error.get_or_insert(e);
                    None
                }
                // Exhausted from the front: rescue the value buffered for backward use.
                None => backward_tree.as_mut().and_then(|bt| bt.take_slot(slot)),
            }
        };

        if let Some(tree) = forward_tree {
            tree.refill_with(fetch);
        } else {
            let seeded: Vec<Option<MergerEntry>> = (0..*n_sources).map(&mut fetch).collect();
            let order = build_min_cmp(comparator.clone());
            *forward_tree = Some(LoserTree::build(seeded, order));
        }
    }

    /// Fills the backward tree with one entry per source.
    ///
    /// Mirror image of `initialize_forward`: sources are pulled from the back,
    /// errors are parked in `pending_backward_error`, and an exhausted source
    /// falls back to the value the forward tree still buffers for that slot.
    fn initialize_backward(&mut self) {
        let Self {
            sources,
            n_sources,
            comparator,
            forward_tree,
            backward_tree,
            pending_backward_error,
            ..
        } = self;

        let mut fetch = |slot: usize| -> Option<MergerEntry> {
            #[expect(
                clippy::indexing_slicing,
                reason = "i < n_sources by construction; sources len == n_sources"
            )]
            match MergeSource::next_back(&mut sources[slot]) {
                Some(Ok(value)) => Some(MergerEntry {
                    source: slot,
                    value,
                }),
                Some(Err(e)) => {
                    pending_backward_error.get_or_insert(e);
                    None
                }
                // Exhausted from the back: rescue the value buffered for forward use.
                None => forward_tree.as_mut().and_then(|ft| ft.take_slot(slot)),
            }
        };

        if let Some(tree) = backward_tree {
            tree.refill_with(fetch);
        } else {
            let seeded: Vec<Option<MergerEntry>> = (0..*n_sources).map(&mut fetch).collect();
            let order = build_max_cmp(comparator.clone());
            *backward_tree = Some(LoserTree::build(seeded, order));
        }
    }
}
impl<S: MergeSource + crate::reseek::Reseekable, C: UserComparator + Clone>
    crate::reseek::Reseekable for SeekingMerger<S, C>
{
    /// Repositions every source with `ctx` and discards all merger-side state.
    ///
    /// Existing trees are cleared (not dropped), both directions are marked
    /// unprimed so the next `next`/`next_back` refills them, and any parked
    /// error is forgotten.
    fn reseek(&mut self, ctx: &crate::reseek::ReseekCtx) {
        let Self {
            sources,
            forward_tree,
            backward_tree,
            pending_forward_error,
            pending_backward_error,
            forward_primed,
            backward_primed,
            ..
        } = self;

        for source in sources {
            source.reseek(ctx);
        }

        if let Some(tree) = forward_tree.as_mut() {
            tree.clear();
        }
        if let Some(tree) = backward_tree.as_mut() {
            tree.clear();
        }

        *forward_primed = false;
        *backward_primed = false;
        *pending_forward_error = None;
        *pending_backward_error = None;
    }
}
impl<S: MergeSource, C: UserComparator + Clone> Iterator for SeekingMerger<S, C> {
    type Item = IterItem;

    /// Yields the next item from the front of the merged stream.
    ///
    /// Steps, in order:
    /// 1. A parked error (forward first, then backward) is returned before
    ///    anything else.
    /// 2. If the forward direction is not primed, the forward tree is filled;
    ///    an error hit while filling is returned immediately and the entries
    ///    that were fetched stay buffered for later calls.
    /// 3. The source behind the tree's current winner is pulled once more.
    ///    Its new value replaces the winner in the tree; if it is exhausted or
    ///    fails, the winner is simply removed. A failure is parked and
    ///    surfaces on the following call, after the already-buffered winner
    ///    has been returned.
    ///
    /// NOTE(review): once the forward direction is primed this method never
    /// consults the backward tree again, so it returns `None` when the forward
    /// tree is empty even if the backward tree still buffers entries — confirm
    /// callers do not rely on `next` draining those.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(e) = self.pending_forward_error.take() {
            return Some(Err(e));
        }
        if let Some(e) = self.pending_backward_error.take() {
            return Some(Err(e));
        }

        if !self.forward_primed {
            self.initialize_forward();
            self.forward_primed = true;
            if let Some(e) = self.pending_forward_error.take() {
                return Some(Err(e));
            }
        }

        let tree = self.forward_tree.as_mut()?;
        let source = tree.winner_slot()?;

        #[expect(
            clippy::indexing_slicing,
            reason = "source index < n_sources by construction"
        )]
        let next_pull = MergeSource::next(&mut self.sources[source]);

        match next_pull {
            Some(Ok(next_value)) => {
                let old = tree.replace_min(MergerEntry {
                    source,
                    value: next_value,
                });
                Some(Ok(old.value))
            }
            Some(Err(e)) => match tree.pop_min() {
                Some(old) => {
                    // Emit the buffered winner now; the error follows on the next call.
                    self.pending_forward_error.get_or_insert(e);
                    Some(Ok(old.value))
                }
                // Nothing buffered to emit first: surface the error right away
                // instead of dropping it.
                None => Some(Err(e)),
            },
            None => tree.pop_min().map(|old| Ok(old.value)),
        }
    }
}
impl<S: CoherentMergeSource, C: UserComparator + Clone> DoubleEndedIterator
    for SeekingMerger<S, C>
{
    /// Yields the next item from the back of the merged stream.
    ///
    /// Steps, in order:
    /// 1. A parked error (backward first, then forward) is returned before
    ///    anything else.
    /// 2. If the backward direction is not primed, the backward tree is
    ///    filled; an error hit while filling is returned immediately and the
    ///    entries that were fetched stay buffered for later calls.
    /// 3. The source behind the tree's current winner is pulled once more from
    ///    the back. Its new value replaces the winner in the tree; if it is
    ///    exhausted or fails, the winner is simply removed. A failure is
    ///    parked and surfaces on the following call, after the
    ///    already-buffered winner has been returned.
    ///
    /// NOTE(review): once the backward direction is primed this method never
    /// consults the forward tree again, so it returns `None` when the backward
    /// tree is empty even if the forward tree still buffers entries — confirm
    /// callers do not rely on `next_back` draining those.
    fn next_back(&mut self) -> Option<Self::Item> {
        if let Some(e) = self.pending_backward_error.take() {
            return Some(Err(e));
        }
        if let Some(e) = self.pending_forward_error.take() {
            return Some(Err(e));
        }

        if !self.backward_primed {
            self.initialize_backward();
            self.backward_primed = true;
            if let Some(e) = self.pending_backward_error.take() {
                return Some(Err(e));
            }
        }

        let tree = self.backward_tree.as_mut()?;
        let source = tree.winner_slot()?;

        #[expect(
            clippy::indexing_slicing,
            reason = "source index < n_sources by construction"
        )]
        let next_pull = MergeSource::next_back(&mut self.sources[source]);

        match next_pull {
            Some(Ok(next_value)) => {
                let old = tree.replace_min(MergerEntry {
                    source,
                    value: next_value,
                });
                Some(Ok(old.value))
            }
            Some(Err(e)) => match tree.pop_min() {
                Some(old) => {
                    // Emit the buffered winner now; the error follows on the next call.
                    self.pending_backward_error.get_or_insert(e);
                    Some(Ok(old.value))
                }
                // Nothing buffered to emit first: surface the error right away
                // instead of dropping it.
                None => Some(Err(e)),
            },
            None => tree.pop_min().map(|old| Ok(old.value)),
        }
    }
}
#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "test assertions")]
mod tests {
use super::*;
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::{self, SharedComparator};
use crate::key::InternalKey;
use alloc::collections::VecDeque;
use test_log::test;
/// In-memory test source backed by a deque: `next` pops from the front and
/// `next_back` pops from the back of the same storage.
struct VecSource {
    items: VecDeque<InternalValue>,
    comparator: SharedComparator,
}

impl VecSource {
    /// Builds a source that yields `items` in the given order.
    fn new<I: IntoIterator<Item = InternalValue>>(
        items: I,
        comparator: SharedComparator,
    ) -> Self {
        let queue: VecDeque<InternalValue> = items.into_iter().collect();
        Self {
            items: queue,
            comparator,
        }
    }
}

impl MergeSource for VecSource {
    fn next(&mut self) -> Option<IterItem> {
        let front = self.items.pop_front()?;
        Some(Ok(front))
    }

    fn next_back(&mut self) -> Option<IterItem> {
        let back = self.items.pop_back()?;
        Some(Ok(back))
    }

    /// Drops leading items whose key compares `Less` than `target`.
    fn seek(&mut self, target: &InternalKey) -> crate::Result<()> {
        while self.items.front().is_some_and(|front| {
            front.key.compare_with(target, self.comparator.as_ref()) == Ordering::Less
        }) {
            self.items.pop_front();
        }
        Ok(())
    }
}

impl CoherentMergeSource for VecSource {}
/// Builds a test item with the given user key and sequence number, an empty
/// value payload and the `Value` type tag.
fn make_iv(key: &[u8], seqno: u64) -> InternalValue {
InternalValue::from_components(key, b"", seqno, Value)
}
/// Renders an item's user key as a (lossily decoded) UTF-8 string for assertions.
fn k(v: &InternalValue) -> String {
    let decoded = String::from_utf8_lossy(&v.key.user_key);
    decoded.into_owned()
}
/// Draining two interleaved sources from the front yields ascending keys.
#[test]
fn forward_only() {
    let key_cmp = comparator::default_comparator();
    let left = VecSource::new([make_iv(b"a", 0), make_iv(b"c", 0)], key_cmp.clone());
    let right = VecSource::new([make_iv(b"b", 0), make_iv(b"d", 0)], key_cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![left, right], key_cmp);

    let seen: Vec<String> = merger.by_ref().map(|item| k(&item.unwrap())).collect();

    assert_eq!(seen, ["a", "b", "c", "d"]);
}
/// Draining two interleaved sources from the back yields descending keys.
#[test]
fn backward_only() {
    let key_cmp = comparator::default_comparator();
    let left = VecSource::new([make_iv(b"a", 0), make_iv(b"c", 0)], key_cmp.clone());
    let right = VecSource::new([make_iv(b"b", 0), make_iv(b"d", 0)], key_cmp.clone());
    let merger = SeekingMerger::new(alloc::vec![left, right], key_cmp);

    // `rev()` drives the merger exclusively through `next_back`.
    let seen: Vec<String> = merger.rev().map(|item| k(&item.unwrap())).collect();

    assert_eq!(seen, ["d", "c", "b", "a"]);
}
/// Strictly alternating front/back reads meet in the middle without gaps or repeats.
#[test]
fn mixed_direction() {
    let key_cmp = comparator::default_comparator();
    let left = VecSource::new(
        [make_iv(b"a", 0), make_iv(b"c", 0), make_iv(b"e", 0)],
        key_cmp.clone(),
    );
    let right = VecSource::new(
        [make_iv(b"b", 0), make_iv(b"d", 0), make_iv(b"f", 0)],
        key_cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![left, right], key_cmp);

    for (from_front, from_back) in [("a", "f"), ("b", "e"), ("c", "d")] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), from_front);
        assert_eq!(k(&merger.next_back().unwrap().unwrap()), from_back);
    }

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// A merger with no sources is empty in both directions.
#[test]
fn empty_sources() {
    let key_cmp = comparator::default_comparator();
    let mut merger = SeekingMerger::<VecSource, _>::new(alloc::vec![], key_cmp);

    assert!(Iterator::next(&mut merger).is_none());
    assert!(merger.next_back().is_none());
}
/// After one forward read the forward tree already buffers the source's last
/// item; the first backward read must pick that item up from there.
#[test]
fn next_back_after_forward_exhausted_migrates_buffered_value() {
    let key_cmp = comparator::default_comparator();
    let only = VecSource::new([make_iv(b"a", 0), make_iv(b"z", 0)], key_cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![only], key_cmp);

    let first = merger.next().unwrap().unwrap();
    assert_eq!(k(&first), "a");

    let last = merger.next_back().unwrap().unwrap();
    assert_eq!(
        k(&last),
        "z",
        "migration must rescue `z` buffered in forward_tree",
    );

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// After one backward read the backward tree already buffers the source's
/// first item; the first forward read must pick that item up from there.
#[test]
fn next_after_backward_exhausted_migrates_buffered_value() {
    let key_cmp = comparator::default_comparator();
    let only = VecSource::new([make_iv(b"a", 0), make_iv(b"z", 0)], key_cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![only], key_cmp);

    let last = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&last), "z");

    let first = merger.next().unwrap().unwrap();
    assert_eq!(
        k(&first),
        "a",
        "migration must rescue `a` buffered in backward_tree",
    );

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// A single three-item source read front, back, front is fully drained with
/// each item emitted exactly once.
#[test]
fn single_source_drain_both_directions() {
    let key_cmp = comparator::default_comparator();
    let only = VecSource::new(
        [make_iv(b"a", 0), make_iv(b"b", 0), make_iv(b"c", 0)],
        key_cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![only], key_cmp);

    let front = merger.next().unwrap().unwrap();
    assert_eq!(k(&front), "a");
    let back = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&back), "c");
    let middle = merger.next().unwrap().unwrap();
    assert_eq!(k(&middle), "b");

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
struct ErrSource {
emit_forward_error: bool,
emit_backward_error: bool,
}
impl MergeSource for ErrSource {
fn next(&mut self) -> Option<IterItem> {
if self.emit_forward_error {
self.emit_forward_error = false;
Some(Err(crate::Error::Unrecoverable))
} else {
None
}
}
fn next_back(&mut self) -> Option<IterItem> {
if self.emit_backward_error {
self.emit_backward_error = false;
Some(Err(crate::Error::Unrecoverable))
} else {
None
}
}
fn seek(&mut self, _target: &InternalKey) -> crate::Result<()> {
Ok(())
}
}
impl CoherentMergeSource for ErrSource {}
/// An error raised while filling the forward tree is returned by the first
/// `next`, after which the merger is empty.
#[test]
fn forward_init_propagates_error() {
    let key_cmp = comparator::default_comparator();
    let failing = ErrSource {
        emit_forward_error: true,
        emit_backward_error: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![failing], key_cmp);

    let first = merger.next().unwrap();
    assert!(first.is_err());
    assert!(merger.next().is_none());
}
/// An error raised while filling the backward tree is returned by the first
/// `next_back`, after which the merger is empty.
#[test]
fn backward_init_propagates_error() {
    let key_cmp = comparator::default_comparator();
    let failing = ErrSource {
        emit_forward_error: false,
        emit_backward_error: true,
    };
    let mut merger = SeekingMerger::new(alloc::vec![failing], key_cmp);

    let first = merger.next_back().unwrap();
    assert!(first.is_err());
    assert!(merger.next_back().is_none());
}
/// When a later source fails during forward initialization, the error is
/// reported first and the values already fetched from earlier sources are
/// still delivered afterwards.
#[test]
fn forward_init_keeps_earlier_prefetched_when_later_source_errs() {
    let key_cmp = comparator::default_comparator();
    let healthy: Box<dyn CoherentMergeSource> = Box::new(VecSource::new(
        [make_iv(b"good_a", 0), make_iv(b"good_b", 0)],
        key_cmp.clone(),
    ));
    let failing: Box<dyn CoherentMergeSource> = Box::new(ErrSource {
        emit_forward_error: true,
        emit_backward_error: false,
    });
    let mut merger = SeekingMerger::new(alloc::vec![healthy, failing], key_cmp);

    assert!(merger.next().unwrap().is_err());
    for expected in ["good_a", "good_b"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none());
}
/// An error hit while refilling after a winner is reported on the very next
/// call, before any value from the other (healthy) source.
#[test]
fn refill_err_surfaces_before_unrelated_source_yields() {
    let key_cmp = comparator::default_comparator();
    let failing: Box<dyn CoherentMergeSource> = Box::new(LateErrSource {
        first_value: Some(make_iv(b"x_bad", 0)),
        already_errored: false,
    });
    let healthy: Box<dyn CoherentMergeSource> = Box::new(VecSource::new(
        [
            make_iv(b"y_good_1", 0),
            make_iv(b"y_good_2", 0),
            make_iv(b"y_good_3", 0),
        ],
        key_cmp.clone(),
    ));
    let mut merger = SeekingMerger::new(alloc::vec![failing, healthy], key_cmp);

    assert_eq!(k(&merger.next().unwrap().unwrap()), "x_bad");
    assert!(merger.next().unwrap().is_err());
    for expected in ["y_good_1", "y_good_2", "y_good_3"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none());
}
/// An error parked by a forward read is reported by a following `next_back`.
#[test]
fn cross_direction_surface_forward_pending_in_next_back() {
    let key_cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"only", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    let value = merger.next().unwrap().unwrap();
    assert_eq!(k(&value), "only");

    let parked = merger.next_back().unwrap();
    assert!(parked.is_err());
}
/// An error parked by a backward read is reported by a following `next`.
#[test]
fn cross_direction_surface_backward_pending_in_next() {
    let key_cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"only", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    let value = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&value), "only");

    let parked = merger.next().unwrap();
    assert!(parked.is_err());
}
/// When a later source fails during backward initialization, the error is
/// reported first and the values already fetched from earlier sources are
/// still delivered afterwards, in descending order.
#[test]
fn backward_init_keeps_earlier_prefetched_when_later_source_errs() {
    let key_cmp = comparator::default_comparator();
    let healthy: Box<dyn CoherentMergeSource> = Box::new(VecSource::new(
        [make_iv(b"good_a", 0), make_iv(b"good_b", 0)],
        key_cmp.clone(),
    ));
    let failing: Box<dyn CoherentMergeSource> = Box::new(ErrSource {
        emit_forward_error: false,
        emit_backward_error: true,
    });
    let mut merger = SeekingMerger::new(alloc::vec![healthy, failing], key_cmp);

    assert!(merger.next_back().unwrap().is_err());
    for expected in ["good_b", "good_a"] {
        assert_eq!(k(&merger.next_back().unwrap().unwrap()), expected);
    }
    assert!(merger.next_back().is_none());
}
struct LateErrSource {
first_value: Option<InternalValue>,
already_errored: bool,
}
impl MergeSource for LateErrSource {
fn next(&mut self) -> Option<IterItem> {
if let Some(v) = self.first_value.take() {
Some(Ok(v))
} else if !self.already_errored {
self.already_errored = true;
Some(Err(crate::Error::Unrecoverable))
} else {
None
}
}
fn next_back(&mut self) -> Option<IterItem> {
self.next()
}
fn seek(&mut self, _target: &InternalKey) -> crate::Result<()> {
Ok(())
}
}
impl CoherentMergeSource for LateErrSource {}
/// Sorted in-memory test source that keeps its data in place and tracks the
/// forward and backward positions as two separate indices. Items are cloned
/// out rather than removed; the source is exhausted once the indices meet.
struct IndependentCursorSource {
    items: Vec<crate::InternalValue>,
    /// Index of the item the next `next` call yields.
    front_idx: usize,
    /// One past the index of the item the next `next_back` call yields.
    back_idx: usize,
    comparator: SharedComparator,
}

impl IndependentCursorSource {
    /// Builds a source over `items`, which must already be sorted ascending by
    /// key under `comparator` (checked with a `debug_assert!`).
    fn new<I: IntoIterator<Item = crate::InternalValue>>(
        items: I,
        comparator: SharedComparator,
    ) -> Self {
        let items: Vec<_> = items.into_iter().collect();
        debug_assert!(
            items.is_sorted_by(|a, b| {
                a.key.compare_with(&b.key, comparator.as_ref()) != Ordering::Greater
            }),
            "IndependentCursorSource items must be sorted ascending by key",
        );
        let n = items.len();
        Self {
            items,
            front_idx: 0,
            back_idx: n,
            comparator,
        }
    }
}

impl MergeSource for IndependentCursorSource {
    fn next(&mut self) -> Option<IterItem> {
        if self.front_idx >= self.back_idx {
            return None;
        }
        #[expect(
            clippy::indexing_slicing,
            reason = "front_idx < back_idx <= items.len() by invariant"
        )]
        let v = self.items[self.front_idx].clone();
        self.front_idx += 1;
        Some(Ok(v))
    }

    fn next_back(&mut self) -> Option<IterItem> {
        if self.front_idx >= self.back_idx {
            return None;
        }
        self.back_idx -= 1;
        #[expect(
            clippy::indexing_slicing,
            reason = "back_idx < items.len() after decrement, by invariant"
        )]
        let v = self.items[self.back_idx].clone();
        Some(Ok(v))
    }

    /// Advances the forward cursor to the first item whose key is not `Less`
    /// than `target`, never moving it backwards.
    ///
    /// Only the forward cursor moves, matching `VecSource::seek`, which only
    /// drops items from the front. Pulling `back_idx` down to the same index
    /// would leave `front_idx >= back_idx` after every seek, i.e. an empty
    /// source. The result is clamped to `back_idx` so the cursors cannot cross.
    fn seek(&mut self, target: &InternalKey) -> crate::Result<()> {
        let idx = self.items.partition_point(|v| {
            v.key.compare_with(target, self.comparator.as_ref()) == Ordering::Less
        });
        self.front_idx = self.front_idx.max(idx).min(self.back_idx);
        Ok(())
    }
}

impl CoherentMergeSource for IndependentCursorSource {}
/// After the source has been drained from the front, switching to backward
/// reads must not replay anything, even though the source never removes its
/// items physically.
#[test]
fn switch_to_backward_after_drain_emits_no_duplicates() {
    let key_cmp = comparator::default_comparator();
    let source = IndependentCursorSource::new(
        [
            make_iv(b"a", 0),
            make_iv(b"b", 0),
            make_iv(b"c", 0),
            make_iv(b"d", 0),
        ],
        key_cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    for expected in ["a", "b", "c", "d"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none(), "source exhausted forward");

    assert!(
        merger.next_back().is_none(),
        "backward must not re-emit forward-consumed items",
    );
    assert!(merger.next_back().is_none(), "stays exhausted");
}
/// Strict front/back alternation over an index-cursor source emits every item
/// exactly once and ends empty on both sides.
#[test]
fn mid_stream_alternation_emits_no_duplicates_independent_cursor() {
    let key_cmp = comparator::default_comparator();
    let source = IndependentCursorSource::new(
        [
            make_iv(b"a", 0),
            make_iv(b"b", 0),
            make_iv(b"c", 0),
            make_iv(b"d", 0),
            make_iv(b"e", 0),
            make_iv(b"f", 0),
        ],
        key_cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    for (from_front, from_back) in [("a", "f"), ("b", "e"), ("c", "d")] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), from_front);
        assert_eq!(k(&merger.next_back().unwrap().unwrap()), from_back);
    }

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// A forward refill failure does not swallow the buffered value: the value is
/// returned first, the error on the following call, then exhaustion.
#[test]
fn forward_refill_error_yields_buffered_then_err_on_next_call() {
    let key_cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"first", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    let buffered = merger.next().unwrap().unwrap();
    assert_eq!(k(&buffered), "first");

    let deferred = merger.next().unwrap();
    assert!(deferred.is_err());

    assert!(merger.next().is_none());
}
/// A backward refill failure does not swallow the buffered value: the value is
/// returned first, the error on the following call, then exhaustion.
#[test]
fn backward_refill_error_yields_buffered_then_err_on_next_call() {
    let key_cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"first", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], key_cmp);

    let buffered = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&buffered), "first");

    let deferred = merger.next_back().unwrap();
    assert!(deferred.is_err());

    assert!(merger.next_back().is_none());
}
}