use crate::InternalValue;
use crate::comparator::SharedComparator;
use crate::heap::{HeapEntry, MergeHeap};
/// Item produced by every merge source and by the merger itself: an entry or a read error.
type IterItem = crate::Result<InternalValue>;

/// Heap-allocated, type-erased, `Send` double-ended source of [`IterItem`]s,
/// usable as the `I` parameter of [`Merger`].
pub type BoxedIterator<'a> =
    Box<dyn DoubleEndedIterator<Item = crate::Result<InternalValue>> + Send + 'a>;
/// Merges several sources into a single ordered stream.
///
/// Entries are buffered in a [`MergeHeap`] and emitted in the order defined by the
/// comparator passed to `Merger::new`. Because only the head (or tail) of each source
/// is considered, every source is expected to already be sorted by that same
/// comparator.
///
/// Sources are polled lazily: nothing is pulled until the first `next` /
/// `next_back` call. Forward and backward iteration may be interleaved.
pub struct Merger<I> {
    // The sources being merged; heap entries refer back to them by position.
    iterators: Vec<I>,
    // Buffered entries, each tagged with the index of the source it came from.
    heap: MergeHeap,
    // Set once the front element of every source has been pulled into `heap`.
    initialized_lo: bool,
    // Set once the back element of every source has been pulled into `heap`.
    initialized_hi: bool,
}
impl<I: Iterator<Item = IterItem>> Merger<I> {
    /// Creates a merger over `iterators`, ordering entries with `comparator`.
    ///
    /// No source is polled here; polling starts on the first call to `next`
    /// (front elements) or `next_back` (back elements).
    #[must_use]
    pub fn new(iterators: Vec<I>, comparator: SharedComparator) -> Self {
        // Room for two buffered entries per source: one pulled from the front by
        // `initialize_lo`/`next`, one pulled from the back by `initialize_hi`/`next_back`.
        let capacity = iterators.len() * 2;

        Self {
            heap: MergeHeap::with_capacity(capacity, comparator),
            iterators,
            initialized_lo: false,
            initialized_hi: false,
        }
    }

    /// Pulls the first element of every source into the heap, in source order.
    ///
    /// Sources that are already empty are skipped. The first error encountered is
    /// returned immediately; in that case `initialized_lo` stays `false`, and the
    /// sources visited before the failing one have already been advanced.
    fn initialize_lo(&mut self) -> crate::Result<()> {
        for (source_idx, source) in self.iterators.iter_mut().enumerate() {
            let Some(head) = source.next() else {
                continue;
            };
            self.heap.push(HeapEntry::new(source_idx, head?));
        }

        self.initialized_lo = true;
        Ok(())
    }
}
impl<I: DoubleEndedIterator<Item = IterItem>> Merger<I> {
    /// Pulls the last element of every source into the heap, in source order.
    ///
    /// Sources with nothing left at the back are skipped. The first error
    /// encountered is returned immediately; in that case `initialized_hi` stays
    /// `false`, and the sources visited before the failing one have already been
    /// advanced from the back.
    fn initialize_hi(&mut self) -> crate::Result<()> {
        for (source_idx, source) in self.iterators.iter_mut().enumerate() {
            let Some(tail) = source.next_back() else {
                continue;
            };
            self.heap.push(HeapEntry::new(source_idx, tail?));
        }

        self.initialized_hi = true;
        Ok(())
    }
}
impl<I: Iterator<Item = IterItem>> Iterator for Merger<I> {
    type Item = IterItem;

    /// Yields the smallest buffered entry according to the comparator.
    ///
    /// The source that supplied that entry is advanced from the front:
    /// - a successor value takes the entry's place in the heap, and the entry is returned;
    /// - an error drops the entry from the heap (it is *not* returned) and yields the error;
    /// - exhaustion simply removes the entry from the heap and returns it.
    ///
    /// Returns `None` once the heap is empty.
    fn next(&mut self) -> Option<Self::Item> {
        if !self.initialized_lo {
            fail_iter!(self.initialize_lo());
        }

        let source_idx = self.heap.peek_min()?.index();

        #[expect(clippy::indexing_slicing, reason = "we trust the HeapEntry index")]
        let refill = self.iterators[source_idx].next();

        match refill {
            Some(Ok(successor)) => {
                let smallest = self
                    .heap
                    .replace_min(HeapEntry::new(source_idx, successor));
                Some(Ok(smallest.into_value()))
            }
            Some(Err(e)) => {
                let _ = self.heap.pop_min();
                Some(Err(e))
            }
            None => self
                .heap
                .pop_min()
                .map(|smallest| Ok(smallest.into_value())),
        }
    }
}
impl<I: DoubleEndedIterator<Item = IterItem>> DoubleEndedIterator for Merger<I> {
    /// Yields the largest buffered entry according to the comparator.
    ///
    /// The source that supplied that entry is advanced from the back:
    /// - a predecessor value takes the entry's place in the heap, and the entry is returned;
    /// - an error drops the entry from the heap (it is *not* returned) and yields the error;
    /// - exhaustion simply removes the entry from the heap and returns it.
    ///
    /// Returns `None` once the heap is empty.
    fn next_back(&mut self) -> Option<Self::Item> {
        if !self.initialized_hi {
            fail_iter!(self.initialize_hi());
        }

        let source_idx = self.heap.peek_max()?.index();

        #[expect(clippy::indexing_slicing, reason = "we trust the HeapEntry index")]
        let refill = self.iterators[source_idx].next_back();

        match refill {
            Some(Ok(predecessor)) => {
                let largest = self
                    .heap
                    .replace_max(HeapEntry::new(source_idx, predecessor));
                Some(Ok(largest.into_value()))
            }
            Some(Err(e)) => {
                let _ = self.heap.pop_max();
                Some(Err(e))
            }
            None => self
                .heap
                .pop_max()
                .map(|largest| Ok(largest.into_value())),
        }
    }
}
#[cfg(test)]
// Every test returns `crate::Result<()>` so `?` can be used on yielded items; several
// never take an error path, which would otherwise trigger `unnecessary_wraps`.
#[allow(clippy::unnecessary_wraps)]
mod tests {
    use super::*;
    use crate::ValueType::Value;
    use crate::comparator;
    use test_log::test;

    /// Two single-entry sources are emitted in key order, then the merger ends.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_simple() -> crate::Result<()> {
        #[rustfmt::skip]
        let a = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
        ];
        #[rustfmt::skip]
        let b = vec![
            Ok(InternalValue::from_components("b", b"", 0, Value)),
        ];
        let mut iter = Merger::new(
            vec![a.into_iter(), b.into_iter()],
            comparator::default_comparator(),
        );
        assert_eq!(
            iter.next().unwrap()?,
            InternalValue::from_components("a", b"", 0, Value),
        );
        assert_eq!(
            iter.next().unwrap()?,
            InternalValue::from_components("b", b"", 0, Value),
        );
        assert!(iter.next().is_none(), "iter should be closed");
        Ok(())
    }

    /// Identical entries from two sources; deduplication is not the merger's job today.
    #[test]
    #[ignore = "maybe not needed"]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_dup() -> crate::Result<()> {
        #[rustfmt::skip]
        let a = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
        ];
        #[rustfmt::skip]
        let b = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
        ];
        let mut iter = Merger::new(
            vec![a.into_iter(), b.into_iter()],
            comparator::default_comparator(),
        );
        assert_eq!(
            iter.next().unwrap()?,
            InternalValue::from_components("a", b"", 0, Value),
        );
        assert!(iter.next().is_none(), "iter should be closed");
        Ok(())
    }

    /// Interleaved keys across sources of different lengths come out fully sorted.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_interleaved() -> crate::Result<()> {
        let a = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
            Ok(InternalValue::from_components("c", b"", 0, Value)),
            Ok(InternalValue::from_components("e", b"", 0, Value)),
        ];
        let b = vec![
            Ok(InternalValue::from_components("b", b"", 0, Value)),
            Ok(InternalValue::from_components("d", b"", 0, Value)),
        ];
        let iter = Merger::new(
            vec![a.into_iter(), b.into_iter()],
            comparator::default_comparator(),
        );
        let keys: Vec<String> = iter
            .map(|r| {
                let v = r.unwrap();
                String::from_utf8_lossy(&v.key.user_key).to_string()
            })
            .collect();
        assert_eq!(keys, ["a", "b", "c", "d", "e"]);
        Ok(())
    }

    /// Backward-only iteration yields the interleaved keys in descending order.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_reverse() -> crate::Result<()> {
        let a = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
            Ok(InternalValue::from_components("c", b"", 0, Value)),
            Ok(InternalValue::from_components("e", b"", 0, Value)),
        ];
        let b = vec![
            Ok(InternalValue::from_components("b", b"", 0, Value)),
            Ok(InternalValue::from_components("d", b"", 0, Value)),
        ];
        let iter = Merger::new(
            vec![a.into_iter(), b.into_iter()],
            comparator::default_comparator(),
        );
        let keys: Vec<String> = iter
            .rev()
            .map(|r| {
                let v = r.unwrap();
                String::from_utf8_lossy(&v.key.user_key).to_string()
            })
            .collect();
        assert_eq!(keys, ["e", "d", "c", "b", "a"]);
        Ok(())
    }

    /// More sources than a trivial two-way merge, one entry each.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_many_sources() -> crate::Result<()> {
        let iter = Merger::new(
            (0..8)
                .map(|i| {
                    vec![Ok(InternalValue::from_components(
                        format!("{}", (b'a' + i) as char),
                        b"",
                        0,
                        Value,
                    ))]
                    .into_iter()
                })
                .collect(),
            comparator::default_comparator(),
        );
        let keys: Vec<String> = iter
            .map(|r| {
                let v = r.unwrap();
                String::from_utf8_lossy(&v.key.user_key).to_string()
            })
            .collect();
        assert_eq!(keys, ["a", "b", "c", "d", "e", "f", "g", "h"]);
        Ok(())
    }

    /// For one user key, the default comparator orders versions by descending seqno.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_seqno_ordering() -> crate::Result<()> {
        let a = vec![Ok(InternalValue::from_components("k", b"v1", 3, Value))];
        let b = vec![Ok(InternalValue::from_components("k", b"v2", 7, Value))];
        let c = vec![Ok(InternalValue::from_components("k", b"v3", 1, Value))];
        let iter = Merger::new(
            vec![a.into_iter(), b.into_iter(), c.into_iter()],
            comparator::default_comparator(),
        );
        let seqnos: Vec<u64> = iter.map(|r| r.unwrap().key.seqno).collect();
        assert_eq!(seqnos, [7, 3, 1]);
        Ok(())
    }

    /// Iterating the same versions backwards yields the exact reverse seqno order.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_seqno_ordering_reverse() -> crate::Result<()> {
        let a = vec![Ok(InternalValue::from_components("k", b"v1", 3, Value))];
        let b = vec![Ok(InternalValue::from_components("k", b"v2", 7, Value))];
        let c = vec![Ok(InternalValue::from_components("k", b"v3", 1, Value))];
        let iter = Merger::new(
            vec![a.into_iter(), b.into_iter(), c.into_iter()],
            comparator::default_comparator(),
        );
        let seqnos: Vec<u64> = iter.rev().map(|r| r.unwrap().key.seqno).collect();
        assert_eq!(seqnos, [1, 3, 7]);
        Ok(())
    }

    /// Alternating `next` / `next_back` meets in the middle without loss or duplication.
    #[test]
    #[expect(clippy::unwrap_used, reason = "test assertions")]
    fn merge_mixed_direction() -> crate::Result<()> {
        let a = vec![
            Ok(InternalValue::from_components("a", b"", 0, Value)),
            Ok(InternalValue::from_components("c", b"", 0, Value)),
            Ok(InternalValue::from_components("e", b"", 0, Value)),
        ];
        let b = vec![
            Ok(InternalValue::from_components("b", b"", 0, Value)),
            Ok(InternalValue::from_components("d", b"", 0, Value)),
            Ok(InternalValue::from_components("f", b"", 0, Value)),
        ];
        let mut iter = Merger::new(
            vec![a.into_iter(), b.into_iter()],
            comparator::default_comparator(),
        );
        let k = |v: InternalValue| String::from_utf8_lossy(&v.key.user_key).to_string();
        assert_eq!(k(iter.next().unwrap()?), "a");
        assert_eq!(k(iter.next_back().unwrap()?), "f");
        assert_eq!(k(iter.next().unwrap()?), "b");
        assert_eq!(k(iter.next_back().unwrap()?), "e");
        assert_eq!(k(iter.next().unwrap()?), "c");
        assert_eq!(k(iter.next_back().unwrap()?), "d");
        assert!(iter.next().is_none(), "should be exhausted");
        assert!(
            iter.next_back().is_none(),
            "should be exhausted from the back too"
        );
        Ok(())
    }
}