use super::*;
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::{self, SharedComparator};
use crate::key::InternalKey;
use alloc::collections::VecDeque;
use test_log::test;
/// In-memory merge source backed by a deque, used to drive the merger in tests.
struct VecSource {
    /// Entries not yet handed out; consumed from either end.
    items: VecDeque<InternalValue>,
    /// Comparator used by `seek` to order entry keys against the target key.
    comparator: SharedComparator,
}

impl VecSource {
    /// Creates a source holding `items` in the order the iterator produces them.
    ///
    /// No sorting is performed here; callers supply the entries already ordered.
    fn new<I>(items: I, comparator: SharedComparator) -> Self
    where
        I: IntoIterator<Item = InternalValue>,
    {
        let queue: VecDeque<InternalValue> = items.into_iter().collect();
        Self {
            items: queue,
            comparator,
        }
    }
}
impl MergeSource for VecSource {
    /// Hands out the front-most remaining entry, or `None` once the deque is empty.
    fn next(&mut self) -> Option<IterItem> {
        let head = self.items.pop_front()?;
        Some(Ok(head))
    }

    /// Hands out the back-most remaining entry, or `None` once the deque is empty.
    fn next_back(&mut self) -> Option<IterItem> {
        let tail = self.items.pop_back()?;
        Some(Ok(tail))
    }

    /// Discards leading entries whose key orders strictly before `target`.
    ///
    /// Entries at the back are left untouched. Always returns `Ok(())`.
    fn seek(&mut self, target: &InternalKey) -> crate::Result<()> {
        while self.items.front().is_some_and(|head| {
            head.key.compare_with(target, self.comparator.as_ref()) == Ordering::Less
        }) {
            self.items.pop_front();
        }
        Ok(())
    }
}

impl CoherentMergeSource for VecSource {}
/// Builds a test entry of type `Value` from `key` and `seqno`.
///
/// The second component passed to `from_components` is an empty byte string
/// (presumably the value payload; confirm against `from_components`).
fn make_iv(key: &[u8], seqno: u64) -> InternalValue {
    let empty = b"";
    InternalValue::from_components(key, empty, seqno, Value)
}
/// Returns the entry's user key as an owned `String`, decoding it lossily as UTF-8.
fn k(v: &InternalValue) -> String {
    let decoded = String::from_utf8_lossy(&v.key.user_key);
    decoded.into_owned()
}
/// Forward iteration over two interleaved sources yields `a, b, c, d`.
#[test]
fn forward_only() {
    let cmp = comparator::default_comparator();
    let left = VecSource::new([make_iv(b"a", 0), make_iv(b"c", 0)], cmp.clone());
    let right = VecSource::new([make_iv(b"b", 0), make_iv(b"d", 0)], cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![left, right], cmp);

    let mut keys: Vec<String> = Vec::new();
    for entry in &mut merger {
        keys.push(k(&entry.unwrap()));
    }

    assert_eq!(keys, ["a", "b", "c", "d"]);
}
/// Draining purely via `next_back` over two interleaved sources yields `d, c, b, a`.
#[test]
fn backward_only() {
    let cmp = comparator::default_comparator();
    let left = VecSource::new([make_iv(b"a", 0), make_iv(b"c", 0)], cmp.clone());
    let right = VecSource::new([make_iv(b"b", 0), make_iv(b"d", 0)], cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![left, right], cmp);

    // Pull from the back until the merger reports exhaustion.
    let keys: Vec<String> = core::iter::from_fn(|| merger.next_back())
        .map(|entry| k(&entry.unwrap()))
        .collect();

    assert_eq!(keys, ["d", "c", "b", "a"]);
}
/// Alternating `next` / `next_back` over two sources meets in the middle
/// without skipping or repeating any key.
#[test]
fn mixed_direction() {
    let cmp = comparator::default_comparator();
    let odd = VecSource::new(
        [make_iv(b"a", 0), make_iv(b"c", 0), make_iv(b"e", 0)],
        cmp.clone(),
    );
    let even = VecSource::new(
        [make_iv(b"b", 0), make_iv(b"d", 0), make_iv(b"f", 0)],
        cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![odd, even], cmp);

    // Array elements are evaluated left to right, so this records the
    // front/back alternation in call order.
    let observed = [
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
    ];
    assert_eq!(observed, ["a", "f", "b", "e", "c", "d"]);

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// A merger built from zero sources yields nothing in either direction.
#[test]
fn empty_sources() {
    let cmp = comparator::default_comparator();
    let mut merger: SeekingMerger<VecSource, _> = SeekingMerger::new(Vec::new(), cmp);

    let from_front = Iterator::next(&mut merger);
    assert!(from_front.is_none());

    let from_back = merger.next_back();
    assert!(from_back.is_none());
}
/// After one forward step on a two-item source, `next_back` must still yield `z`;
/// afterwards both directions report exhaustion.
#[test]
fn next_back_after_forward_exhausted_migrates_buffered_value() {
    let cmp = comparator::default_comparator();
    let only = VecSource::new([make_iv(b"a", 0), make_iv(b"z", 0)], cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![only], cmp);

    let front = merger.next().unwrap().unwrap();
    assert_eq!(k(&front), "a");

    let back = merger.next_back().unwrap().unwrap();
    assert_eq!(
        k(&back),
        "z",
        "migration must rescue `z` buffered in forward_tree",
    );

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// After one backward step on a two-item source, `next` must still yield `a`;
/// afterwards both directions report exhaustion.
#[test]
fn next_after_backward_exhausted_migrates_buffered_value() {
    let cmp = comparator::default_comparator();
    let only = VecSource::new([make_iv(b"a", 0), make_iv(b"z", 0)], cmp.clone());
    let mut merger = SeekingMerger::new(alloc::vec![only], cmp);

    let back = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&back), "z");

    let front = merger.next().unwrap().unwrap();
    assert_eq!(
        k(&front),
        "a",
        "migration must rescue `a` buffered in backward_tree",
    );

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// A single three-item source drained front, back, front yields `a, c, b`
/// and is then exhausted from both ends.
#[test]
fn single_source_drain_both_directions() {
    let cmp = comparator::default_comparator();
    let only = VecSource::new(
        [make_iv(b"a", 0), make_iv(b"b", 0), make_iv(b"c", 0)],
        cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![only], cmp);

    // Evaluated left to right: front, back, front.
    let observed = [
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
        k(&merger.next().unwrap().unwrap()),
    ];
    assert_eq!(observed, ["a", "c", "b"]);

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// Source that never yields a value: each direction emits at most one
/// `Unrecoverable` error, controlled by its flag, and `None` otherwise.
struct ErrSource {
    /// When set, the next call to `next` returns an error and clears the flag.
    emit_forward_error: bool,
    /// When set, the next call to `next_back` returns an error and clears the flag.
    emit_backward_error: bool,
}

impl MergeSource for ErrSource {
    /// Returns the pending forward error once, then `None`.
    fn next(&mut self) -> Option<IterItem> {
        if !self.emit_forward_error {
            return None;
        }
        self.emit_forward_error = false;
        Some(Err(crate::Error::Unrecoverable))
    }

    /// Returns the pending backward error once, then `None`.
    fn next_back(&mut self) -> Option<IterItem> {
        if !self.emit_backward_error {
            return None;
        }
        self.emit_backward_error = false;
        Some(Err(crate::Error::Unrecoverable))
    }

    /// No-op: there are no items to position over.
    fn seek(&mut self, _target: &InternalKey) -> crate::Result<()> {
        Ok(())
    }
}

impl CoherentMergeSource for ErrSource {}
/// An error produced by the only source on the first forward pull is surfaced
/// once; the following `next` returns `None`.
#[test]
fn forward_init_propagates_error() {
    let cmp = comparator::default_comparator();
    let failing = ErrSource {
        emit_forward_error: true,
        emit_backward_error: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![failing], cmp);

    let first = merger.next();
    assert!(matches!(first, Some(Err(_))));
    assert!(merger.next().is_none());
}
/// An error produced by the only source on the first backward pull is surfaced
/// once; the following `next_back` returns `None`.
#[test]
fn backward_init_propagates_error() {
    let cmp = comparator::default_comparator();
    let failing = ErrSource {
        emit_forward_error: false,
        emit_backward_error: true,
    };
    let mut merger = SeekingMerger::new(alloc::vec![failing], cmp);

    let first = merger.next_back();
    assert!(matches!(first, Some(Err(_))));
    assert!(merger.next_back().is_none());
}
/// When a later source errors on the first forward pull, the error comes out
/// first and the earlier source's items (`good_a`, `good_b`) are still yielded.
#[test]
fn forward_init_keeps_earlier_prefetched_when_later_source_errs() {
    let cmp = comparator::default_comparator();
    let healthy = VecSource::new(
        [make_iv(b"good_a", 0), make_iv(b"good_b", 0)],
        cmp.clone(),
    );
    let failing = ErrSource {
        emit_forward_error: true,
        emit_backward_error: false,
    };
    let good: Box<dyn CoherentMergeSource> = Box::new(healthy);
    let bad: Box<dyn CoherentMergeSource> = Box::new(failing);
    let mut merger = SeekingMerger::new(alloc::vec![good, bad], cmp);

    assert!(matches!(merger.next(), Some(Err(_))));
    for expected in ["good_a", "good_b"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none());
}
/// After `x_bad` is yielded, the same source's follow-up error is surfaced
/// before any of the other source's `y_good_*` items.
#[test]
fn refill_err_surfaces_before_unrelated_source_yields() {
    let cmp = comparator::default_comparator();
    let late_failure = LateErrSource {
        first_value: Some(make_iv(b"x_bad", 0)),
        already_errored: false,
    };
    let healthy = VecSource::new(
        [
            make_iv(b"y_good_1", 0),
            make_iv(b"y_good_2", 0),
            make_iv(b"y_good_3", 0),
        ],
        cmp.clone(),
    );
    let bad: Box<dyn CoherentMergeSource> = Box::new(late_failure);
    let good: Box<dyn CoherentMergeSource> = Box::new(healthy);
    let mut merger = SeekingMerger::new(alloc::vec![bad, good], cmp);

    assert_eq!(k(&merger.next().unwrap().unwrap()), "x_bad");
    assert!(matches!(merger.next(), Some(Err(_))));
    for expected in ["y_good_1", "y_good_2", "y_good_3"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none());
}
/// After `next` yields the only value, the source's follow-up error must be
/// observable through `next_back`.
#[test]
fn cross_direction_surface_forward_pending_in_next_back() {
    let cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"only", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    let value = merger.next().unwrap().unwrap();
    assert_eq!(k(&value), "only");

    let opposite = merger.next_back();
    assert!(matches!(opposite, Some(Err(_))));
}
/// After `next_back` yields the only value, the source's follow-up error must
/// be observable through `next`.
#[test]
fn cross_direction_surface_backward_pending_in_next() {
    let cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"only", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    let value = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&value), "only");

    let opposite = merger.next();
    assert!(matches!(opposite, Some(Err(_))));
}
/// When a later source errors on the first backward pull, the error comes out
/// first and the earlier source's items are still yielded in reverse order.
#[test]
fn backward_init_keeps_earlier_prefetched_when_later_source_errs() {
    let cmp = comparator::default_comparator();
    let healthy = VecSource::new(
        [make_iv(b"good_a", 0), make_iv(b"good_b", 0)],
        cmp.clone(),
    );
    let failing = ErrSource {
        emit_forward_error: false,
        emit_backward_error: true,
    };
    let good: Box<dyn CoherentMergeSource> = Box::new(healthy);
    let bad: Box<dyn CoherentMergeSource> = Box::new(failing);
    let mut merger = SeekingMerger::new(alloc::vec![good, bad], cmp);

    assert!(matches!(merger.next_back(), Some(Err(_))));
    for expected in ["good_b", "good_a"] {
        assert_eq!(k(&merger.next_back().unwrap().unwrap()), expected);
    }
    assert!(merger.next_back().is_none());
}
struct LateErrSource {
first_value: Option<InternalValue>,
already_errored: bool,
}
impl MergeSource for LateErrSource {
fn next(&mut self) -> Option<IterItem> {
if let Some(v) = self.first_value.take() {
Some(Ok(v))
} else if !self.already_errored {
self.already_errored = true;
Some(Err(crate::Error::Unrecoverable))
} else {
None
}
}
fn next_back(&mut self) -> Option<IterItem> {
self.next()
}
fn seek(&mut self, _target: &InternalKey) -> crate::Result<()> {
Ok(())
}
}
impl CoherentMergeSource for LateErrSource {}
/// Vec-backed source that tracks its position with two indices instead of
/// removing items: `front_idx` for forward reads, `back_idx` for backward reads.
struct IndependentCursorSource {
    /// All entries, sorted ascending by key (checked in debug builds by `new`).
    items: Vec<crate::InternalValue>,
    /// Index of the next entry `next` will return.
    front_idx: usize,
    /// One past the index of the next entry `next_back` will return.
    back_idx: usize,
    /// Comparator used for the sortedness check and by `seek`.
    comparator: SharedComparator,
}

impl IndependentCursorSource {
    /// Collects `items` and positions the cursors at the two ends.
    ///
    /// In debug builds, panics if `items` is not sorted ascending by key
    /// under `comparator`.
    fn new<I>(items: I, comparator: SharedComparator) -> Self
    where
        I: IntoIterator<Item = crate::InternalValue>,
    {
        let items = items.into_iter().collect::<Vec<_>>();
        debug_assert!(
            // Every adjacent pair must be in non-descending order.
            items.iter().zip(items.iter().skip(1)).all(|(prev, next)| {
                prev.key.compare_with(&next.key, comparator.as_ref()) != Ordering::Greater
            }),
            "IndependentCursorSource items must be sorted ascending by key",
        );
        let back_idx = items.len();
        Self {
            items,
            front_idx: 0,
            back_idx,
            comparator,
        }
    }
}
impl MergeSource for IndependentCursorSource {
    /// Returns a clone of the entry at `front_idx` and advances the forward
    /// cursor, or `None` once the two cursors have met.
    fn next(&mut self) -> Option<IterItem> {
        if self.front_idx >= self.back_idx {
            return None;
        }
        #[expect(
            clippy::indexing_slicing,
            reason = "front_idx < back_idx <= items.len() by invariant"
        )]
        let v = self.items[self.front_idx].clone();
        self.front_idx += 1;
        Some(Ok(v))
    }

    /// Moves the backward cursor one step towards the front and returns a
    /// clone of the entry it now points at, or `None` once the cursors have met.
    fn next_back(&mut self) -> Option<IterItem> {
        if self.front_idx >= self.back_idx {
            return None;
        }
        self.back_idx -= 1;
        #[expect(
            clippy::indexing_slicing,
            reason = "back_idx < items.len() after decrement, by invariant"
        )]
        let v = self.items[self.back_idx].clone();
        Some(Ok(v))
    }

    /// Advances the forward cursor to the first remaining entry whose key is
    /// not less than `target`. Always returns `Ok(())`.
    ///
    /// Only the forward cursor moves, matching `VecSource::seek`, which drops
    /// leading entries and leaves the back untouched. The backward cursor was
    /// previously clamped to the same index, which left `front_idx >= back_idx`
    /// after every seek and so exhausted the source regardless of `target`.
    fn seek(&mut self, target: &InternalKey) -> crate::Result<()> {
        // First index whose key is >= target; valid because items are sorted.
        let idx = self.items.partition_point(|v| {
            v.key.compare_with(target, self.comparator.as_ref()) == Ordering::Less
        });
        // Never move backwards, and never step past the backward cursor so the
        // `front_idx <= back_idx` invariant relied on above keeps holding.
        self.front_idx = self.front_idx.max(idx).min(self.back_idx);
        Ok(())
    }
}

impl CoherentMergeSource for IndependentCursorSource {}
/// Once the source has been fully drained going forward, switching to
/// `next_back` must yield nothing.
#[test]
fn switch_to_backward_after_drain_emits_no_duplicates() {
    let cmp = comparator::default_comparator();
    let source = IndependentCursorSource::new(
        [
            make_iv(b"a", 0),
            make_iv(b"b", 0),
            make_iv(b"c", 0),
            make_iv(b"d", 0),
        ],
        cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    for expected in ["a", "b", "c", "d"] {
        assert_eq!(k(&merger.next().unwrap().unwrap()), expected);
    }
    assert!(merger.next().is_none(), "source exhausted forward");

    assert!(
        merger.next_back().is_none(),
        "backward must not re-emit forward-consumed items",
    );
    assert!(merger.next_back().is_none(), "stays exhausted");
}
/// Alternating `next` / `next_back` over a six-item index-cursor source yields
/// every key exactly once and then reports exhaustion from both ends.
#[test]
fn mid_stream_alternation_emits_no_duplicates_independent_cursor() {
    let cmp = comparator::default_comparator();
    let source = IndependentCursorSource::new(
        [
            make_iv(b"a", 0),
            make_iv(b"b", 0),
            make_iv(b"c", 0),
            make_iv(b"d", 0),
            make_iv(b"e", 0),
            make_iv(b"f", 0),
        ],
        cmp.clone(),
    );
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    // Array elements are evaluated left to right: front, back, front, ...
    let observed = [
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
        k(&merger.next().unwrap().unwrap()),
        k(&merger.next_back().unwrap().unwrap()),
    ];
    assert_eq!(observed, ["a", "f", "b", "e", "c", "d"]);

    assert!(merger.next().is_none());
    assert!(merger.next_back().is_none());
}
/// Going forward: the value comes out first, the source's follow-up error on
/// the next call, and `None` after that.
#[test]
fn forward_refill_error_yields_buffered_then_err_on_next_call() {
    let cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"first", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    let value = merger.next().unwrap().unwrap();
    assert_eq!(k(&value), "first");

    let failure = merger.next();
    assert!(matches!(failure, Some(Err(_))));

    assert!(merger.next().is_none());
}
/// Going backward: the value comes out first, the source's follow-up error on
/// the next call, and `None` after that.
#[test]
fn backward_refill_error_yields_buffered_then_err_on_next_call() {
    let cmp = comparator::default_comparator();
    let source = LateErrSource {
        first_value: Some(make_iv(b"first", 0)),
        already_errored: false,
    };
    let mut merger = SeekingMerger::new(alloc::vec![source], cmp);

    let value = merger.next_back().unwrap().unwrap();
    assert_eq!(k(&value), "first");

    let failure = merger.next_back();
    assert!(matches!(failure, Some(Err(_))));

    assert!(merger.next_back().is_none());
}