use super::{Key, Value};
use crate::Result;
use std::cmp::{Ordering, Reverse};
use std::collections::BinaryHeap;
/// Boxed, sendable iterator over fallible `(Key, Value)` entries; the type of
/// every input handed to `MergingIterator::new`.
type KVIterator = Box<dyn Iterator<Item = Result<(Key, Value)>> + Send>;
/// One buffered entry waiting in the merge heap, tagged with the source that
/// produced it.
#[derive(Debug, Clone)]
struct HeapItem {
    /// Key of the buffered entry.
    key: Key,
    /// Value of the buffered entry; its `timestamp` takes part in the ordering.
    value: Value,
    /// Index of the producing iterator inside `MergingIterator::sources`.
    source_id: usize,
}

impl PartialEq for HeapItem {
    /// Two items are equal exactly when [`Ord::cmp`] says so.
    ///
    /// Delegating to `cmp` keeps `Eq` and `Ord` consistent: the ordering also
    /// looks at `source_id`, so equality has to look at it as well.
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}

impl Eq for HeapItem {}

impl PartialOrd for HeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for HeapItem {
    /// Orders by key ascending, then timestamp descending (the newest version
    /// of a key sorts first), then `source_id` ascending as a final tie-break.
    fn cmp(&self, other: &Self) -> Ordering {
        self.key
            .cmp(&other.key)
            // Operands are swapped on purpose: a larger timestamp sorts earlier.
            .then_with(|| other.value.timestamp.cmp(&self.value.timestamp))
            .then_with(|| self.source_id.cmp(&other.source_id))
    }
}
/// K-way merge over several key/value sources that yields at most one live
/// entry per key.
///
/// Entries are taken in `HeapItem` order (key ascending, newest timestamp
/// first). The first entry seen for a key wins; later entries with the same
/// key are skipped, and a winning entry flagged `deleted` is suppressed
/// entirely.
///
/// NOTE(review): this presumably relies on every source yielding keys in
/// ascending order — confirm against the callers.
pub struct MergingIterator {
    /// Min-heap (through `Reverse`) of the entries currently buffered from the sources.
    heap: BinaryHeap<Reverse<HeapItem>>,
    /// The input iterators; `HeapItem::source_id` indexes into this vector.
    sources: Vec<KVIterator>,
    /// Key of the most recently accepted entry, used to skip older versions of it.
    last_key: Option<Key>,
    /// First `Err` a source produced while buffering, handed out by the next
    /// call to `next`. Only ever holds an `Err`.
    pending_error: Option<Result<(Key, Value)>>,
    /// Set once the iterator has ended (heap drained or an error was returned).
    finished: bool,
}

impl MergingIterator {
    /// Builds a merging iterator and buffers the first entry of every source.
    ///
    /// An error produced by a source during this initial read is not lost: it
    /// is returned by the first call to `next`.
    pub fn new(sources: Vec<KVIterator>) -> Self {
        let mut iter = Self {
            heap: BinaryHeap::with_capacity(sources.len()),
            sources,
            last_key: None,
            pending_error: None,
            finished: false,
        };
        iter.fill_heap();
        iter
    }

    /// Buffers one entry from each source, stopping early if a source fails.
    fn fill_heap(&mut self) {
        for source_id in 0..self.sources.len() {
            self.refill_from_source(source_id);
            if self.pending_error.is_some() {
                // The merge is going to fail anyway; no point reading further.
                break;
            }
        }
    }

    /// Pulls the next entry from source `source_id` into the heap.
    ///
    /// An exhausted source or an unknown `source_id` is a no-op. An `Err` from
    /// the source is recorded in `pending_error` (the first one wins) instead
    /// of being silently dropped.
    fn refill_from_source(&mut self, source_id: usize) {
        let pulled = match self.sources.get_mut(source_id) {
            Some(source) => source.next(),
            None => return,
        };
        match pulled {
            Some(Ok((key, value))) => self.heap.push(Reverse(HeapItem {
                key,
                value,
                source_id,
            })),
            Some(Err(err)) => {
                if self.pending_error.is_none() {
                    self.pending_error = Some(Err(err));
                }
            }
            None => {}
        }
    }
}

impl Iterator for MergingIterator {
    type Item = Result<(Key, Value)>;

    /// Returns the next live entry, a source error, or `None` when done.
    ///
    /// After an `Err` has been returned the iterator is finished and every
    /// further call returns `None`: once a source has failed, the remaining
    /// merge could miss newer versions or tombstones from that source.
    fn next(&mut self) -> Option<Self::Item> {
        if self.finished {
            return None;
        }
        loop {
            // Checked on every pass because the refill below may record an error.
            if let Some(failure) = self.pending_error.take() {
                self.finished = true;
                return Some(failure);
            }

            let item = match self.heap.pop() {
                Some(Reverse(item)) => item,
                None => {
                    self.finished = true;
                    return None;
                }
            };
            // Keep the popped entry's source represented in the heap before
            // deciding what to do with the entry itself.
            self.refill_from_source(item.source_id);

            // An older version of a key that was already handled.
            if self.last_key == Some(item.key) {
                continue;
            }
            self.last_key = Some(item.key);

            // The newest version is a tombstone: the key is hidden entirely.
            if item.value.deleted {
                continue;
            }
            return Some(Ok((item.key, item.value)));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::lsm::ValueData;

    /// In-memory stand-in for one source.
    type Entries = Vec<Result<(Key, Value)>>;

    /// Boxes each entry list as a `KVIterator` and merges them in the given order.
    fn merge(inputs: Vec<Entries>) -> MergingIterator {
        let boxed: Vec<KVIterator> = inputs
            .into_iter()
            .map(|entries| Box::new(entries.into_iter()) as KVIterator)
            .collect();
        MergingIterator::new(boxed)
    }

    /// Two sources with interleaved, disjoint keys come out as one ascending run.
    #[test]
    fn test_merging_iterator_basic() {
        let odd: Entries = vec![
            Ok((1, Value::new(vec![1], 100))),
            Ok((3, Value::new(vec![3], 100))),
            Ok((5, Value::new(vec![5], 100))),
        ];
        let even: Entries = vec![
            Ok((2, Value::new(vec![2], 100))),
            Ok((4, Value::new(vec![4], 100))),
            Ok((6, Value::new(vec![6], 100))),
        ];

        let keys: Vec<Key> = merge(vec![odd, even])
            .map(|entry| entry.unwrap().0)
            .collect();

        assert_eq!(keys, vec![1, 2, 3, 4, 5, 6]);
    }

    /// Three versions of the same key collapse to the one with the highest timestamp.
    #[test]
    fn test_merging_iterator_mvcc() {
        let newest: Entries = vec![Ok((
            1,
            Value {
                data: ValueData::Inline(vec![1, 0, 0]),
                timestamp: 300,
                deleted: false,
            },
        ))];
        let middle: Entries = vec![Ok((
            1,
            Value {
                data: ValueData::Inline(vec![1, 0]),
                timestamp: 200,
                deleted: false,
            },
        ))];
        let oldest: Entries = vec![Ok((
            1,
            Value {
                data: ValueData::Inline(vec![1]),
                timestamp: 100,
                deleted: false,
            },
        ))];

        let results: Vec<(Key, Vec<u8>)> = merge(vec![newest, middle, oldest])
            .map(|entry| match entry.unwrap() {
                (
                    key,
                    Value {
                        data: ValueData::Inline(bytes),
                        ..
                    },
                ) => (key, bytes),
                _ => panic!("Expected inline data"),
            })
            .collect();

        assert_eq!(results, vec![(1, vec![1, 0, 0])]);
    }

    /// An entry flagged `deleted` is dropped from the merged output.
    #[test]
    fn test_merging_iterator_tombstone() {
        let entries: Entries = vec![
            Ok((1, Value::new(vec![1], 100))),
            Ok((
                2,
                Value {
                    data: ValueData::Inline(vec![]),
                    timestamp: 200,
                    deleted: true,
                },
            )),
            Ok((3, Value::new(vec![3], 100))),
        ];

        let keys: Vec<Key> = merge(vec![entries])
            .map(|entry| entry.unwrap().0)
            .collect();

        assert_eq!(keys, vec![1, 3]);
    }
}