use std::cmp::Ordering;
use std::collections::BinaryHeap;
use crate::lsm::sstable::SSTableEntry;
#[derive(Debug, Clone, PartialEq, Eq)]
struct HeapItem {
entry: SSTableEntry,
source_id: usize,
}
/// Heap priority for merge candidates.
///
/// `BinaryHeap` is a max-heap, so the item that compares greatest is popped
/// first. The comparison is arranged so that pop order is:
///
/// 1. ascending key (the key comparison is reversed),
/// 2. then descending timestamp,
/// 3. then descending sequence,
/// 4. then ascending `source_id` (reversed again, so the lower index wins).
impl Ord for HeapItem {
    fn cmp(&self, other: &Self) -> Ordering {
        // Swapping the operands reverses the natural key order.
        other
            .entry
            .key
            .cmp(&self.entry.key)
            .then_with(|| self.entry.timestamp.cmp(&other.entry.timestamp))
            .then_with(|| self.entry.sequence.cmp(&other.entry.sequence))
            // Swapped operands: the smaller source index ranks higher.
            .then_with(|| other.source_id.cmp(&self.source_id))
    }
}
/// The partial ordering always exists and is exactly the total ordering
/// supplied by the `Ord` implementation.
impl PartialOrd for HeapItem {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}
/// Merges several `SSTableEntry` iterators into one stream.
///
/// Entries are returned in the pop order defined by `HeapItem`'s `Ord` impl:
/// ascending key, then descending timestamp, then descending sequence, with
/// the lower source index winning remaining ties. An entry whose
/// `(key, timestamp, sequence)` matches the entry returned just before it is
/// skipped.
///
/// NOTE(review): the merged output is only globally ordered if every source
/// already yields its entries in that same order; nothing in this type
/// checks that — confirm against the callers.
pub struct MergingIterator {
/// Input iterators; a `HeapItem::source_id` is an index into this vector.
sources: Vec<Box<dyn Iterator<Item = SSTableEntry>>>,
/// Buffered candidates, at most one per source that still has entries.
heap: BinaryHeap<HeapItem>,
/// `(key, timestamp, sequence)` of the most recently returned entry, or
/// `None` before the first one; used to drop immediate repeats.
last_yielded: Option<(Vec<u8>, u64, u64)>,
}
impl MergingIterator {
pub fn new(sources: Vec<Box<dyn Iterator<Item = SSTableEntry>>>) -> Self {
let mut iter = Self {
sources,
heap: BinaryHeap::new(),
last_yielded: None,
};
iter.prime();
iter
}
fn prime(&mut self) {
for source_id in 0..self.sources.len() {
if let Some(entry) = self.sources[source_id].next() {
self.heap.push(HeapItem { entry, source_id });
}
}
}
}
impl Iterator for MergingIterator {
    type Item = SSTableEntry;

    /// Returns the next entry of the merged stream, or `None` once the heap
    /// is empty (every source has been drained).
    ///
    /// An entry whose `(key, timestamp, sequence)` equals that of the entry
    /// returned immediately before it is skipped, so only the first copy
    /// popped from the heap is handed out.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // `?` ends iteration when no candidate is left.
            let HeapItem { entry, source_id } = self.heap.pop()?;

            // Refill from the source we just consumed, whether or not the
            // popped entry ends up being returned, so the source stays
            // represented in the heap while it has entries.
            if let Some(refill) = self.sources[source_id].next() {
                self.heap.push(HeapItem {
                    entry: refill,
                    source_id,
                });
            }

            let repeats_previous = match &self.last_yielded {
                Some((key, ts, seq)) => {
                    *key == entry.key && *ts == entry.timestamp && *seq == entry.sequence
                }
                None => false,
            };
            if repeats_previous {
                continue;
            }

            self.last_yielded = Some((entry.key.clone(), entry.timestamp, entry.sequence));
            return Some(entry);
        }
    }
}
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use super::*;

    /// Shared constructor behind `put` and `del`; `None` stands for a delete.
    fn entry(key: &[u8], value: Option<&[u8]>, ts: u64, seq: u64) -> SSTableEntry {
        SSTableEntry {
            key: key.to_vec(),
            value: value.map(|bytes| bytes.to_vec()),
            timestamp: ts,
            sequence: seq,
        }
    }

    /// Entry that carries a value.
    fn put(key: &[u8], value: &[u8], ts: u64, seq: u64) -> SSTableEntry {
        entry(key, Some(value), ts, seq)
    }

    /// Entry without a value.
    fn del(key: &[u8], ts: u64, seq: u64) -> SSTableEntry {
        entry(key, None, ts, seq)
    }

    /// Wraps a list of entries as a boxed source iterator.
    fn boxed(entries: Vec<SSTableEntry>) -> Box<dyn Iterator<Item = SSTableEntry>> {
        Box::new(entries.into_iter())
    }

    #[test]
    fn merges_in_key_order_and_prioritizes_newer_versions() {
        let first = boxed(vec![
            put(b"a", b"v2", 20, 1),
            put(b"a", b"v1", 10, 1),
            put(b"b", b"x", 5, 1),
        ]);
        let second = boxed(vec![del(b"a", 15, 1), put(b"c", b"z", 7, 1)]);

        let merged: Vec<SSTableEntry> = MergingIterator::new(vec![first, second]).collect();

        // Keys come out ascending across both sources.
        let expected_keys: [&[u8]; 5] = [b"a", b"a", b"a", b"b", b"c"];
        for (idx, key) in expected_keys.iter().enumerate() {
            assert_eq!(merged[idx].key, key.to_vec());
        }

        // The three versions of "a" are ordered newest first.
        let expected_timestamps: [u64; 3] = [20, 15, 10];
        for (idx, ts) in expected_timestamps.iter().enumerate() {
            assert_eq!(merged[idx].timestamp, *ts);
        }
    }

    #[test]
    fn dedupes_identical_versions_across_sources() {
        let version = put(b"a", b"v", 10, 1);
        let sources = vec![boxed(vec![version.clone()]), boxed(vec![version])];

        let merged: Vec<SSTableEntry> = MergingIterator::new(sources).collect();

        assert_eq!(merged.len(), 1);
    }
}