use crate::comparator::UserComparator;
use crate::loser_tree::LoserTree;
use crate::merge_source::{CoherentMergeSource, IterItem, MergeSource};
use alloc::vec::Vec;
use core::cmp::Ordering;
/// A single buffered value held by one of the merge trees, tagged with the
/// index of the source it was pulled from.
struct MergerEntry {
    /// Index into the merger's `sources` vector of the source that produced `value`.
    source: usize,
    /// The value pulled from that source.
    value: crate::InternalValue,
}
/// Entry comparator used by the forward tree: compares `a` against `b` by key,
/// then by source index.
#[derive(Clone)]
struct MinCmp<C>
where
    C: UserComparator + Clone,
{
    /// User comparator forwarded to the key's `compare_with`.
    comparator: C,
}
impl<C> crate::loser_tree::EntryComparator<MergerEntry> for MinCmp<C>
where
    C: UserComparator + Clone,
{
    /// Compares `a` to `b` by key via `compare_with`; when the keys compare
    /// equal, the entry with the smaller source index compares as `Less`.
    #[expect(
        clippy::inline_always,
        reason = "called O(log cap) per replay step on the merger's hot path; \
                  matching the loser-tree's own #[inline(always)] on cmp_indices \
                  is what makes the dispatch flatten — verified in disassembly"
    )]
    #[inline(always)]
    fn compare(&self, a: &MergerEntry, b: &MergerEntry) -> Ordering {
        match a.value.key.compare_with(&b.value.key, &self.comparator) {
            // Equal keys: fall back to the source index as the tie-breaker.
            Ordering::Equal => a.source.cmp(&b.source),
            by_key => by_key,
        }
    }
}
/// Entry comparator used by the backward tree: the mirror image of `MinCmp`,
/// comparing `b` against `a` by key and then by source index.
#[derive(Clone)]
struct MaxCmp<C>
where
    C: UserComparator + Clone,
{
    /// User comparator forwarded to the key's `compare_with`.
    comparator: C,
}
impl<C> crate::loser_tree::EntryComparator<MergerEntry> for MaxCmp<C>
where
    C: UserComparator + Clone,
{
    /// Compares with the operands swapped (`b` against `a`), both for the key
    /// and for the source-index tie-breaker, so the ordering is the reverse of
    /// the one `MinCmp` produces.
    #[expect(
        clippy::inline_always,
        reason = "called O(log cap) per replay step on the merger's hot path"
    )]
    #[inline(always)]
    fn compare(&self, a: &MergerEntry, b: &MergerEntry) -> Ordering {
        match b.value.key.compare_with(&a.value.key, &self.comparator) {
            // Equal keys: the larger source index compares as `Less` here.
            Ordering::Equal => b.source.cmp(&a.source),
            by_key => by_key,
        }
    }
}
/// Wraps `comparator` in the forward-direction entry comparator.
fn build_min_cmp<C>(comparator: C) -> MinCmp<C>
where
    C: UserComparator + Clone,
{
    MinCmp { comparator }
}
/// Wraps `comparator` in the backward-direction entry comparator.
fn build_max_cmp<C>(comparator: C) -> MaxCmp<C>
where
    C: UserComparator + Clone,
{
    MaxCmp { comparator }
}
/// Merges several `MergeSource`s into one stream using a loser tree per
/// direction.
///
/// Forward iteration is available through `Iterator`; backward iteration
/// through `DoubleEndedIterator` when the sources are `CoherentMergeSource`s.
/// Each tree is built lazily the first time its direction is used.
pub struct SeekingMerger<S, C>
where
    S: MergeSource,
    C: UserComparator + Clone,
{
    /// The inputs being merged; tree slots and `MergerEntry::source` index into this.
    sources: Vec<S>,
    /// `sources.len()` as captured by `new`.
    n_sources: usize,
    /// User comparator; cloned into a tree's entry comparator when that tree is first built.
    comparator: C,
    /// Tree for forward iteration; `None` until the first forward priming.
    forward_tree: Option<LoserTree<MergerEntry, MinCmp<C>>>,
    /// Tree for backward iteration; `None` until the first backward priming.
    backward_tree: Option<LoserTree<MergerEntry, MaxCmp<C>>>,
    /// Error captured while pulling forward, returned by a later iterator call.
    pending_forward_error: Option<crate::Error>,
    /// Error captured while pulling backward, returned by a later iterator call.
    pending_backward_error: Option<crate::Error>,
    /// Whether the forward tree has been primed since construction or the last reseek.
    forward_primed: bool,
    /// Whether the backward tree has been primed since construction or the last reseek.
    backward_primed: bool,
}
impl<S, C> SeekingMerger<S, C>
where
    S: MergeSource,
    C: UserComparator + Clone,
{
    /// Creates a merger over `sources`, comparing keys with `comparator`.
    ///
    /// Nothing is pulled from the sources here; both trees start out absent
    /// and both directions start out unprimed.
    #[must_use]
    pub fn new(sources: Vec<S>, comparator: C) -> Self {
        // Read the length before `sources` is moved into the struct.
        let n_sources = sources.len();
        Self {
            sources,
            n_sources,
            comparator,
            forward_tree: None,
            backward_tree: None,
            pending_forward_error: None,
            pending_backward_error: None,
            forward_primed: false,
            backward_primed: false,
        }
    }

    /// Fills the forward tree with one front entry per source.
    ///
    /// For each slot the source is pulled with `MergeSource::next`. A pull
    /// error leaves the slot empty and is kept in `pending_forward_error`
    /// (only the first one is retained). A source that yields nothing falls
    /// back to whatever the backward tree holds in the same slot. An existing
    /// tree is refilled in place; otherwise a new one is built.
    fn initialize_forward(&mut self) {
        // Split `self` into disjoint field borrows so the closure can hold
        // the sources and the opposite tree while this tree is mutated.
        let Self {
            sources,
            n_sources,
            comparator,
            forward_tree,
            backward_tree,
            pending_forward_error,
            ..
        } = self;
        let slot_count = *n_sources;

        let mut pull = |i: usize| -> Option<MergerEntry> {
            #[expect(
                clippy::indexing_slicing,
                reason = "i < n_sources by construction; sources len == n_sources"
            )]
            let pulled = MergeSource::next(&mut sources[i]);
            match pulled {
                Some(Ok(value)) => Some(MergerEntry { source: i, value }),
                Some(Err(e)) => {
                    pending_forward_error.get_or_insert(e);
                    None
                }
                // Source is dry from the front: take over the entry the
                // backward tree may still be holding for this slot.
                None => backward_tree.as_mut().and_then(|bt| bt.take_slot(i)),
            }
        };

        match forward_tree {
            Some(tree) => {
                tree.refill_with(pull);
            }
            None => {
                let initial: Vec<Option<MergerEntry>> =
                    (0..slot_count).map(&mut pull).collect();
                let cmp = build_min_cmp(comparator.clone());
                *forward_tree = Some(LoserTree::build(initial, cmp));
            }
        }
    }

    /// Fills the backward tree with one back entry per source.
    ///
    /// Mirror image of `initialize_forward`: pulls with
    /// `MergeSource::next_back`, records the first pull error in
    /// `pending_backward_error`, and falls back to the forward tree's slot
    /// when a source yields nothing.
    fn initialize_backward(&mut self) {
        let Self {
            sources,
            n_sources,
            comparator,
            forward_tree,
            backward_tree,
            pending_backward_error,
            ..
        } = self;
        let slot_count = *n_sources;

        let mut pull = |i: usize| -> Option<MergerEntry> {
            #[expect(
                clippy::indexing_slicing,
                reason = "i < n_sources by construction; sources len == n_sources"
            )]
            let pulled = MergeSource::next_back(&mut sources[i]);
            match pulled {
                Some(Ok(value)) => Some(MergerEntry { source: i, value }),
                Some(Err(e)) => {
                    pending_backward_error.get_or_insert(e);
                    None
                }
                // Source is dry from the back: take over the entry the
                // forward tree may still be holding for this slot.
                None => forward_tree.as_mut().and_then(|ft| ft.take_slot(i)),
            }
        };

        match backward_tree {
            Some(tree) => {
                tree.refill_with(pull);
            }
            None => {
                let initial: Vec<Option<MergerEntry>> =
                    (0..slot_count).map(&mut pull).collect();
                let cmp = build_max_cmp(comparator.clone());
                *backward_tree = Some(LoserTree::build(initial, cmp));
            }
        }
    }
}
impl<S, C> crate::reseek::Reseekable for SeekingMerger<S, C>
where
    S: MergeSource + crate::reseek::Reseekable,
    C: UserComparator + Clone,
{
    /// Reseeks every source with `ctx` and discards all buffered merge state.
    ///
    /// Existing trees are cleared rather than dropped, both directions are
    /// marked unprimed so the next iterator call refills them, and any
    /// pending errors are thrown away.
    fn reseek(&mut self, ctx: &crate::reseek::ReseekCtx) {
        self.sources
            .iter_mut()
            .for_each(|source| source.reseek(ctx));

        if let Some(tree) = self.forward_tree.as_mut() {
            tree.clear();
        }
        if let Some(tree) = self.backward_tree.as_mut() {
            tree.clear();
        }

        self.forward_primed = false;
        self.backward_primed = false;
        self.pending_forward_error = None;
        self.pending_backward_error = None;
    }
}
impl<S, C> Iterator for SeekingMerger<S, C>
where
    S: MergeSource,
    C: UserComparator + Clone,
{
    type Item = IterItem;

    /// Yields the next merged item from the front.
    ///
    /// A parked error (forward first, then backward) is returned before
    /// anything else. On first use the forward tree is primed. Otherwise the
    /// tree's current winner is yielded and its slot is refilled from the
    /// same source. An error hit while refilling is parked and returned by
    /// the following call, after the already-buffered value has been yielded.
    ///
    /// Returns `None` when the forward tree has no winner.
    ///
    /// NOTE(review): when a source runs dry here its slot is simply popped.
    /// Unlike `initialize_forward`, no attempt is made to take that slot's
    /// entry from the backward tree — confirm whether callers that interleave
    /// `next` and `next_back` rely on such a hand-over.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(e) = self.pending_forward_error.take() {
            return Some(Err(e));
        }
        if let Some(e) = self.pending_backward_error.take() {
            return Some(Err(e));
        }

        if !self.forward_primed {
            self.initialize_forward();
            self.forward_primed = true;
            // Priming may itself have hit a source error; report it first.
            if let Some(e) = self.pending_forward_error.take() {
                return Some(Err(e));
            }
        }

        let tree = self.forward_tree.as_mut()?;
        let source = tree.winner_slot()?;

        // Pull the winner's successor before touching the tree so the slot
        // can be replaced in a single step.
        #[expect(
            clippy::indexing_slicing,
            reason = "source index < n_sources by construction"
        )]
        let next_pull = MergeSource::next(&mut self.sources[source]);

        match next_pull {
            Some(Ok(next_value)) => {
                let old = tree.replace_min(MergerEntry {
                    source,
                    value: next_value,
                });
                Some(Ok(old.value))
            }
            Some(Err(e)) => match tree.pop_min() {
                Some(old) => {
                    // Yield the buffered value now; the error follows on the
                    // next call.
                    self.pending_forward_error.get_or_insert(e);
                    Some(Ok(old.value))
                }
                // Defensive: with nothing to yield, surface the error right
                // away instead of letting an early return drop it.
                None => Some(Err(e)),
            },
            None => {
                let old = tree.pop_min()?;
                Some(Ok(old.value))
            }
        }
    }
}
impl<S, C> DoubleEndedIterator for SeekingMerger<S, C>
where
    S: CoherentMergeSource,
    C: UserComparator + Clone,
{
    /// Yields the next merged item from the back.
    ///
    /// A parked error (backward first, then forward) is returned before
    /// anything else. On first use the backward tree is primed. Otherwise the
    /// tree's current winner is yielded and its slot is refilled from the
    /// same source via `MergeSource::next_back`. An error hit while refilling
    /// is parked and returned by the following call, after the
    /// already-buffered value has been yielded.
    ///
    /// Returns `None` when the backward tree has no winner.
    ///
    /// NOTE(review): when a source runs dry here its slot is simply popped.
    /// Unlike `initialize_backward`, no attempt is made to take that slot's
    /// entry from the forward tree — confirm whether callers that interleave
    /// `next` and `next_back` rely on such a hand-over.
    fn next_back(&mut self) -> Option<Self::Item> {
        if let Some(e) = self.pending_backward_error.take() {
            return Some(Err(e));
        }
        if let Some(e) = self.pending_forward_error.take() {
            return Some(Err(e));
        }

        if !self.backward_primed {
            self.initialize_backward();
            self.backward_primed = true;
            // Priming may itself have hit a source error; report it first.
            if let Some(e) = self.pending_backward_error.take() {
                return Some(Err(e));
            }
        }

        let tree = self.backward_tree.as_mut()?;
        let source = tree.winner_slot()?;

        // Pull the winner's predecessor before touching the tree so the slot
        // can be replaced in a single step.
        #[expect(
            clippy::indexing_slicing,
            reason = "source index < n_sources by construction"
        )]
        let next_pull = MergeSource::next_back(&mut self.sources[source]);

        match next_pull {
            Some(Ok(next_value)) => {
                // The backward tree uses the reversed comparator, so its
                // "min" is the entry to yield next from the back.
                let old = tree.replace_min(MergerEntry {
                    source,
                    value: next_value,
                });
                Some(Ok(old.value))
            }
            Some(Err(e)) => match tree.pop_min() {
                Some(old) => {
                    // Yield the buffered value now; the error follows on the
                    // next call.
                    self.pending_backward_error.get_or_insert(e);
                    Some(Ok(old.value))
                }
                // Defensive: with nothing to yield, surface the error right
                // away instead of letting an early return drop it.
                None => Some(Err(e)),
            },
            None => {
                let old = tree.pop_min()?;
                Some(Ok(old.value))
            }
        }
    }
}
// Unit tests live in the separate `tests` module, compiled only for test builds.
#[cfg(test)]
#[expect(clippy::unwrap_used, reason = "test assertions")]
mod tests;