use std::cmp::Ordering;
use std::marker::PhantomData;
/// One internal node of the loser tree.
///
/// Each internal node remembers which source *lost* the match played at that
/// node; the overall winner is kept separately in `TournamentTree::winner`.
#[derive(Debug, Clone)]
struct LoserNode {
    /// Index (into `TournamentTree::iters`) of the source that lost here.
    loser: usize,
    /// `false` for slots that hold no match result: slot 0 is never used as
    /// an internal node, and every slot is invalid until `build` has run.
    valid: bool,
}

impl Default for LoserNode {
    /// An empty slot: no loser recorded yet.
    fn default() -> Self {
        Self {
            loser: usize::MAX,
            valid: false,
        }
    }
}

/// K-way merge over `k` input iterators, implemented as a loser tree.
///
/// The tree is laid out implicitly: internal nodes occupy indices `1..k`, the
/// leaf of source `s` sits at the virtual index `k + s`, and the parent of node
/// `n` is `n / 2`. Every [`pop`](Self::pop) yields the smallest current head
/// among all sources and repairs the tree with one leaf-to-root walk.
///
/// Sources whose heads compare equal are yielded in ascending source-index
/// order, so the merge is stable with respect to the order of `iters`.
///
/// The inputs are expected to be individually sorted; if they are not, each pop
/// still returns the minimum of the *current* heads.
pub struct TournamentTree<I, T>
where
    I: Iterator<Item = T>,
    T: Ord + Clone,
{
    /// Loser stored at each internal node (`tree[0]` is unused).
    tree: Vec<LoserNode>,
    /// The input sources, peekable so heads can be compared without consuming.
    iters: Vec<std::iter::Peekable<I>>,
    /// Source holding the smallest head, or `usize::MAX` when nothing is left.
    winner: usize,
    /// Number of sources.
    k: usize,
    _phantom: PhantomData<T>,
}

impl<I, T> TournamentTree<I, T>
where
    I: Iterator<Item = T>,
    T: Ord + Clone,
{
    /// Creates a tree over `iters` and plays the initial tournament.
    ///
    /// An empty `iters` produces a tree that is immediately empty.
    pub fn new(iters: Vec<I>) -> Self {
        let k = iters.len();
        let mut this = Self {
            tree: vec![LoserNode::default(); k],
            iters: iters.into_iter().map(|it| it.peekable()).collect(),
            winner: usize::MAX,
            k,
            _phantom: PhantomData,
        };
        this.build();
        this
    }

    /// Plays every match bottom-up, filling in the loser of each internal node
    /// and the overall winner.
    fn build(&mut self) {
        let k = self.k;
        if k == 0 {
            self.winner = usize::MAX;
            return;
        }
        // winners[n] is the source that wins the subtree rooted at node `n`.
        // Leaves live at k..2k; index 0 is unused.
        let mut winners = vec![0usize; 2 * k];
        for source in 0..k {
            winners[k + source] = source;
        }
        // Children (2n, 2n + 1) always have larger indices than their parent,
        // so a descending sweep sees both children before the parent.
        for node in (1..k).rev() {
            let left = winners[2 * node];
            let right = winners[2 * node + 1];
            let (win, lose) = if self.beats(left, right) {
                (left, right)
            } else {
                (right, left)
            };
            winners[node] = win;
            self.tree[node] = LoserNode {
                loser: lose,
                valid: true,
            };
        }
        // For k == 1 the only leaf is node 1, so this is source 0.
        self.set_winner(winners[1]);
    }

    /// Compares the current heads of two sources without consuming or cloning
    /// them. An exhausted source orders after every non-exhausted one; two
    /// exhausted sources (or a source and itself) compare equal.
    fn compare_sources(&mut self, source_a: usize, source_b: usize) -> Ordering {
        if source_a == source_b {
            return Ordering::Equal;
        }
        let (lo, hi) = if source_a < source_b {
            (source_a, source_b)
        } else {
            (source_b, source_a)
        };
        // `peek` needs `&mut`, so split the vector to borrow both sources at once.
        let (front, back) = self.iters.split_at_mut(hi);
        let lo_vs_hi = match (front[lo].peek(), back[0].peek()) {
            (None, None) => Ordering::Equal,
            (None, Some(_)) => Ordering::Greater,
            (Some(_), None) => Ordering::Less,
            (Some(x), Some(y)) => x.cmp(y),
        };
        if source_a < source_b {
            lo_vs_hi
        } else {
            lo_vs_hi.reverse()
        }
    }

    /// Returns `true` when source `a` must be yielded before source `b`.
    ///
    /// Equal heads are resolved in favour of the lower source index, which
    /// makes this a strict total order and keeps the merge stable.
    fn beats(&mut self, a: usize, b: usize) -> bool {
        match self.compare_sources(a, b) {
            Ordering::Less => true,
            Ordering::Greater => false,
            Ordering::Equal => a < b,
        }
    }

    /// Records `candidate` as the winner, or marks the tree empty if even the
    /// best source has nothing left (exhausted sources lose every match, so an
    /// exhausted winner means all sources are exhausted).
    fn set_winner(&mut self, candidate: usize) {
        self.winner = if self.iters[candidate].peek().is_some() {
            candidate
        } else {
            usize::MAX
        };
    }

    /// Removes and returns the smallest head together with the index of the
    /// source it came from. Returns `None` once every source is exhausted or
    /// when the tree has no sources.
    pub fn pop(&mut self) -> Option<(usize, T)> {
        let source = self.winner;
        // `usize::MAX` (the empty marker) is never a valid index, so `get_mut`
        // also covers the "nothing left" case.
        let value = self.iters.get_mut(source)?.next()?;
        self.replay(source);
        Some((source, value))
    }

    /// Re-plays the matches on the path from the leaf of `changed_idx` to the
    /// root after that source's head changed.
    ///
    /// `changed_idx` must be the previous winner: it took part in every match
    /// on its path, so the stored loser at each node is exactly its opponent.
    fn replay(&mut self, changed_idx: usize) {
        let mut contender = changed_idx;
        let mut node = (self.k + changed_idx) / 2;
        while node >= 1 {
            debug_assert!(self.tree[node].valid);
            let stored = self.tree[node].loser;
            if self.beats(stored, contender) {
                // The old loser now wins this match and moves up.
                self.tree[node].loser = contender;
                contender = stored;
            }
            node /= 2;
        }
        self.set_winner(contender);
    }

    /// Returns the smallest head and its source index without consuming it,
    /// or `None` when nothing is left.
    pub fn peek(&mut self) -> Option<(usize, &T)> {
        let source = self.winner;
        let head = self.iters.get_mut(source)?.peek()?;
        Some((source, head))
    }

    /// `true` when no source has any element left (including `k == 0`).
    pub fn is_empty(&self) -> bool {
        self.winner == usize::MAX
    }

    /// Number of sources the tree was built with.
    pub fn source_count(&self) -> usize {
        self.k
    }
}
/// Iterator facade over a [`TournamentTree`] that yields `(source, value)`
/// pairs and can optionally suppress a value equal to the one yielded just
/// before it.
pub struct MergeIterator<I, T>
where
    I: Iterator<Item = T>,
    T: Ord + Clone,
{
    /// The merge engine the items are popped from.
    tree: TournamentTree<I, T>,
    /// Most recently yielded value; only maintained when `deduplicate` is set.
    last_key: Option<T>,
    /// Whether a value equal to `last_key` is skipped instead of yielded.
    deduplicate: bool,
}

impl<I, T> MergeIterator<I, T>
where
    I: Iterator<Item = T>,
    T: Ord + Clone,
{
    /// Builds a merge iterator over `iters`.
    ///
    /// With `deduplicate` set, a popped value that equals the previously
    /// yielded value is dropped.
    pub fn new(iters: Vec<I>, deduplicate: bool) -> Self {
        let tree = TournamentTree::new(iters);
        Self {
            tree,
            last_key: None,
            deduplicate,
        }
    }
}

impl<I, T> Iterator for MergeIterator<I, T>
where
    I: Iterator<Item = T>,
    T: Ord + Clone,
{
    type Item = (usize, T);

    /// Pops the next item from the tree, skipping repeats of the last yielded
    /// value when deduplication is enabled.
    fn next(&mut self) -> Option<Self::Item> {
        // Without deduplication every popped item is passed straight through
        // and `last_key` is never touched.
        if !self.deduplicate {
            return self.tree.pop();
        }
        while let Some((source, value)) = self.tree.pop() {
            if self.last_key.as_ref() == Some(&value) {
                continue;
            }
            self.last_key = Some(value.clone());
            return Some((source, value));
        }
        None
    }
}
use crate::tiered_memtable::HotEntry;
/// Wrapper around a [`HotEntry`] whose equality and ordering look only at the
/// entry's key, so entries can be merged by key.
#[derive(Clone)]
pub struct KeyedEntry {
    /// The wrapped entry; only `entry.key` takes part in comparisons.
    pub entry: HotEntry,
}

impl PartialEq for KeyedEntry {
    /// Two wrappers are equal when their keys are equal as slices.
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.entry.key.as_slice();
        let rhs = other.entry.key.as_slice();
        lhs == rhs
    }
}

impl Eq for KeyedEntry {}

impl PartialOrd for KeyedEntry {
    /// Always `Some`: delegates to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Ord for KeyedEntry {
    /// Orders wrappers by comparing their keys as slices.
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs = self.entry.key.as_slice();
        let rhs = other.entry.key.as_slice();
        lhs.cmp(rhs)
    }
}
/// Merges several vectors of [`HotEntry`] by key and yields at most one entry
/// per run of equal keys.
pub struct HotEntryMerger {
    /// Merge engine over the key-ordered wrappers of every source.
    tree: TournamentTree<std::vec::IntoIter<KeyedEntry>, KeyedEntry>,
    /// Key bytes of the entry yielded most recently, if any.
    last_key: Option<Vec<u8>>,
}

impl HotEntryMerger {
    /// Wraps every entry of every source in a [`KeyedEntry`] and builds the
    /// merge tree over the resulting per-source iterators.
    pub fn new(sources: Vec<Vec<HotEntry>>) -> Self {
        let mut iters = Vec::with_capacity(sources.len());
        for source in sources {
            let keyed: Vec<KeyedEntry> = source
                .into_iter()
                .map(|entry| KeyedEntry { entry })
                .collect();
            iters.push(keyed.into_iter());
        }
        Self {
            tree: TournamentTree::new(iters),
            last_key: None,
        }
    }

    /// Returns the next entry whose key differs from the previously returned
    /// one, along with the index of the source it came from.
    ///
    /// Entries popped from the tree whose key equals the last returned key are
    /// discarded. Returns `None` when the tree is drained.
    pub fn next_unique(&mut self) -> Option<(usize, HotEntry)> {
        while let Some((source, keyed)) = self.tree.pop() {
            let repeats_last = match self.last_key {
                Some(ref last) => keyed.entry.key.as_slice() == last.as_slice(),
                None => false,
            };
            if repeats_last {
                continue;
            }
            self.last_key = Some(keyed.entry.key.to_vec());
            return Some((source, keyed.entry));
        }
        None
    }
}

impl Iterator for HotEntryMerger {
    type Item = (usize, HotEntry);

    /// Same as [`HotEntryMerger::next_unique`].
    fn next(&mut self) -> Option<Self::Item> {
        self.next_unique()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Turns each vector into the owning iterator `TournamentTree::new` takes.
    fn into_iters(sources: Vec<Vec<i32>>) -> Vec<std::vec::IntoIter<i32>> {
        sources.into_iter().map(|v| v.into_iter()).collect()
    }

    /// Builds a tree over `sources` and pops it dry, keeping `(source, value)`.
    fn pop_all(sources: Vec<Vec<i32>>) -> Vec<(usize, i32)> {
        let mut tree = TournamentTree::new(into_iters(sources));
        std::iter::from_fn(|| tree.pop()).collect()
    }

    /// Like `pop_all`, but keeps only the values.
    fn pop_values(sources: Vec<Vec<i32>>) -> Vec<i32> {
        pop_all(sources).into_iter().map(|(_, value)| value).collect()
    }

    #[test]
    fn test_tournament_tree_basic() {
        let merged = pop_values(vec![
            vec![1, 4, 7, 10],
            vec![2, 5, 8, 11],
            vec![3, 6, 9, 12],
        ]);
        assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]);
    }

    #[test]
    fn test_tournament_tree_uneven() {
        let merged = pop_values(vec![vec![1, 10], vec![2, 3, 4, 5], vec![6]]);
        assert_eq!(merged, vec![1, 2, 3, 4, 5, 6, 10]);
    }

    #[test]
    fn test_tournament_tree_single() {
        let merged = pop_values(vec![vec![1, 2, 3]]);
        assert_eq!(merged, vec![1, 2, 3]);
    }

    #[test]
    fn test_tournament_tree_empty() {
        let mut tree = TournamentTree::new(into_iters(Vec::new()));
        assert!(tree.pop().is_none());
        assert!(tree.is_empty());
    }

    #[test]
    fn test_tournament_tree_with_duplicates() {
        // Deduplication off: every element of every source must come through.
        let iters = into_iters(vec![vec![1, 3, 5], vec![1, 2, 4], vec![2, 3, 6]]);
        let merged: Vec<i32> = MergeIterator::new(iters, false)
            .map(|(_, value)| value)
            .collect();
        assert_eq!(merged, vec![1, 1, 2, 2, 3, 3, 4, 5, 6]);
    }

    #[test]
    fn test_merge_iterator_deduplicate() {
        let iters = into_iters(vec![vec![1, 3, 5], vec![1, 2, 4], vec![2, 3, 6]]);
        let merged: Vec<i32> = MergeIterator::new(iters, true)
            .map(|(_, value)| value)
            .collect();
        assert_eq!(merged, vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_source_tracking() {
        let popped = pop_all(vec![vec![2, 5], vec![1, 4], vec![3, 6]]);
        // Only the first three pops are pinned to their originating source.
        assert_eq!(popped[0], (1, 1));
        assert_eq!(popped[1], (0, 2));
        assert_eq!(popped[2], (2, 3));
    }

    #[test]
    fn test_peek() {
        let mut tree = TournamentTree::new(into_iters(vec![vec![2, 4], vec![1, 3]]));
        // Peeking twice must return the same head without consuming it.
        assert_eq!(tree.peek(), Some((1, &1)));
        assert_eq!(tree.peek(), Some((1, &1)));
        assert_eq!(tree.pop(), Some((1, 1)));
        assert_eq!(tree.peek(), Some((0, &2)));
    }
}