use async_trait::async_trait;
use crate::error::SlateDBError;
use crate::iter::KeyValueIterator;
use crate::types::{RowEntry, ValueDeletable};
use std::cmp::{Ordering, Reverse};
use std::collections::{BinaryHeap, VecDeque};
struct MergeIteratorHeapEntry<'a> {
next_kv: RowEntry,
index: usize,
iterator: Box<dyn KeyValueIterator + 'a>,
}
impl<'a> MergeIteratorHeapEntry<'a> {
async fn seek(
mut self,
next_key: &[u8],
) -> Result<Option<MergeIteratorHeapEntry<'a>>, SlateDBError> {
if self.next_kv.key >= next_key {
Ok(Some(self))
} else {
self.iterator.seek(next_key).await?;
if let Some(next_kv) = self.iterator.next_entry().await? {
Ok(Some(MergeIteratorHeapEntry {
next_kv,
index: self.index,
iterator: self.iterator,
}))
} else {
Ok(None)
}
}
}
}
impl Eq for MergeIteratorHeapEntry<'_> {}
impl PartialEq<Self> for MergeIteratorHeapEntry<'_> {
fn eq(&self, other: &Self) -> bool {
self.index == other.index && self.next_kv == other.next_kv
}
}
impl PartialOrd<Self> for MergeIteratorHeapEntry<'_> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for MergeIteratorHeapEntry<'_> {
fn cmp(&self, other: &Self) -> Ordering {
match self.next_kv.key.cmp(&other.next_kv.key) {
Ordering::Equal => other.next_kv.seq.cmp(&self.next_kv.seq), ord => ord, }
}
}
pub(crate) struct MergeIterator<'a> {
current: Option<MergeIteratorHeapEntry<'a>>,
iterators: BinaryHeap<Reverse<MergeIteratorHeapEntry<'a>>>,
pending_iterators: Vec<(usize, Box<dyn KeyValueIterator + 'a>)>,
dedup: bool,
initialized: bool,
}
impl<'a> MergeIterator<'a> {
pub(crate) fn new<T: KeyValueIterator + 'a>(
iterators: impl IntoIterator<Item = T>,
) -> Result<Self, SlateDBError> {
Ok(Self {
current: None,
iterators: BinaryHeap::new(),
pending_iterators: iterators
.into_iter()
.enumerate()
.map(|(index, iterator)| {
(index, Box::new(iterator) as Box<dyn KeyValueIterator + 'a>)
})
.collect(),
dedup: true,
initialized: false,
})
}
pub(crate) fn with_dedup(mut self, dedup: bool) -> Self {
self.dedup = dedup;
self
}
async fn initialize(&mut self) -> Result<(), SlateDBError> {
if self.initialized {
return Ok(());
}
for (index, mut iterator) in self.pending_iterators.drain(..) {
iterator.init().await?;
if let Some(next_kv) = iterator.next_entry().await? {
self.iterators.push(Reverse(MergeIteratorHeapEntry {
next_kv,
index,
iterator,
}));
}
}
self.current = self.iterators.pop().map(|r| r.0);
self.initialized = true;
Ok(())
}
async fn ensure_initialized(&mut self) -> Result<(), SlateDBError> {
if !self.initialized {
self.initialize().await?;
}
Ok(())
}
fn peek(&self) -> Option<&RowEntry> {
self.current.as_ref().map(|c| &c.next_kv)
}
async fn advance(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
self.ensure_initialized().await?;
if let Some(mut iterator_state) = self.current.take() {
let current_kv = iterator_state.next_kv;
if let Some(kv) = iterator_state.iterator.next_entry().await? {
iterator_state.next_kv = kv;
self.iterators.push(Reverse(iterator_state));
}
self.current = self.iterators.pop().map(|r| r.0);
return Ok(Some(current_kv));
}
Ok(None)
}
}
#[async_trait]
impl KeyValueIterator for MergeIterator<'_> {
async fn init(&mut self) -> Result<(), SlateDBError> {
self.initialize().await
}
async fn next_entry(&mut self) -> Result<Option<RowEntry>, SlateDBError> {
if !self.initialized {
return Err(SlateDBError::IteratorNotInitialized);
}
if !self.dedup {
return self.advance().await;
}
let current_kv = match self.advance().await? {
Some(kv) => kv,
None => return Ok(None),
};
if !matches!(current_kv.value, ValueDeletable::Merge(_)) {
while let Some(peeked_entry) = self.peek() {
if peeked_entry.key != current_kv.key {
break;
}
self.advance().await?;
}
}
Ok(Some(current_kv))
}
async fn seek(&mut self, next_key: &[u8]) -> Result<(), SlateDBError> {
if !self.initialized {
return Err(SlateDBError::IteratorNotInitialized);
}
self.ensure_initialized().await?;
let mut seek_futures = VecDeque::new();
if let Some(iterator) = self.current.take() {
seek_futures.push_back(iterator.seek(next_key))
}
for iterator in self.iterators.drain() {
seek_futures.push_back(iterator.0.seek(next_key));
}
for seek_result in futures::future::join_all(seek_futures).await {
if let Some(seeked_iterator) = seek_result? {
self.iterators.push(Reverse(seeked_iterator));
}
}
self.current = self.iterators.pop().map(|r| r.0);
Ok(())
}
}
#[cfg(test)]
mod tests {
use crate::iter::KeyValueIterator;
use crate::merge_iterator::MergeIterator;
use crate::test_utils::{assert_iterator, assert_next_entry, TestIterator};
use crate::types::RowEntry;
use std::collections::VecDeque;
use std::vec;
#[tokio::test]
async fn test_merge_iterator_should_include_entries_in_order() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_entry(b"aaaa", b"1111", 0)
.with_entry(b"cccc", b"3333", 0)
.with_entry(b"zzzz", b"26262626", 0),
);
iters.push_back(
TestIterator::new()
.with_entry(b"bbbb", b"2222", 0)
.with_entry(b"xxxx", b"24242424", 0)
.with_entry(b"yyyy", b"25252525", 0),
);
iters.push_back(
TestIterator::new()
.with_entry(b"dddd", b"4444", 0)
.with_entry(b"eeee", b"5555", 0)
.with_entry(b"gggg", b"7777", 0),
);
let mut merge_iter = MergeIterator::new(iters).unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"aaaa", b"1111", 0),
RowEntry::new_value(b"bbbb", b"2222", 0),
RowEntry::new_value(b"cccc", b"3333", 0),
RowEntry::new_value(b"dddd", b"4444", 0),
RowEntry::new_value(b"eeee", b"5555", 0),
RowEntry::new_value(b"gggg", b"7777", 0),
RowEntry::new_value(b"xxxx", b"24242424", 0),
RowEntry::new_value(b"yyyy", b"25252525", 0),
RowEntry::new_value(b"zzzz", b"26262626", 0),
],
)
.await;
}
#[tokio::test]
async fn test_merge_iterator_should_write_one_entry_with_given_key() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_entry(b"aaaa", b"0000", 6)
.with_entry(b"aaaa", b"1111", 5)
.with_entry(b"cccc", b"use this one c", 5),
);
iters.push_back(
TestIterator::new()
.with_entry(b"cccc", b"badc1", 1)
.with_entry(b"xxxx", b"use this one x", 4),
);
iters.push_back(
TestIterator::new()
.with_entry(b"bbbb", b"2222", 3)
.with_entry(b"cccc", b"badc2", 3)
.with_entry(b"xxxx", b"badx1", 3),
);
let mut merge_iter = MergeIterator::new(iters).unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"aaaa", b"0000", 6),
RowEntry::new_value(b"bbbb", b"2222", 3),
RowEntry::new_value(b"cccc", b"use this one c", 5),
RowEntry::new_value(b"xxxx", b"use this one x", 4),
],
)
.await;
}
#[tokio::test]
async fn test_two_iterator_should_include_entries_in_order() {
let iter1 = TestIterator::new()
.with_entry(b"aaaa", b"1111", 0)
.with_entry(b"cccc", b"3333", 0)
.with_entry(b"zzzz", b"26262626", 0);
let iter2 = TestIterator::new()
.with_entry(b"bbbb", b"2222", 0)
.with_entry(b"xxxx", b"24242424", 0)
.with_entry(b"yyyy", b"25252525", 0);
let mut merge_iter = MergeIterator::new([iter1, iter2]).unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"aaaa", b"1111", 0),
RowEntry::new_value(b"bbbb", b"2222", 0),
RowEntry::new_value(b"cccc", b"3333", 0),
RowEntry::new_value(b"xxxx", b"24242424", 0),
RowEntry::new_value(b"yyyy", b"25252525", 0),
RowEntry::new_value(b"zzzz", b"26262626", 0),
],
)
.await;
}
#[tokio::test]
async fn test_two_iterator_should_write_one_entry_with_given_key() {
let iter1 = TestIterator::new()
.with_entry(b"aaaa", b"1111", 0)
.with_entry(b"cccc", b"use this one c", 5);
let iter2 = TestIterator::new()
.with_entry(b"cccc", b"badc1", 2)
.with_entry(b"xxxx", b"24242424", 3);
let mut merge_iter = MergeIterator::new([iter1, iter2]).unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"aaaa", b"1111", 0),
RowEntry::new_value(b"cccc", b"use this one c", 5),
RowEntry::new_value(b"xxxx", b"24242424", 3),
],
)
.await;
}
#[tokio::test]
async fn test_seek_merge_iter() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_entry(b"aa", b"aa1", 0)
.with_entry(b"bb", b"bb1", 0),
);
iters.push_back(
TestIterator::new()
.with_entry(b"aa", b"aa2", 0)
.with_entry(b"bb", b"bb2", 0)
.with_entry(b"cc", b"cc2", 0),
);
let mut merge_iter = MergeIterator::new(iters).unwrap();
merge_iter.init().await.unwrap();
merge_iter.seek(b"bb".as_ref()).await.unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"bb", b"bb1", 0),
RowEntry::new_value(b"cc", b"cc2", 0),
],
)
.await;
}
#[tokio::test]
async fn test_seek_merge_iter_to_current_key() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_entry(b"aa", b"aa1", 0)
.with_entry(b"bb", b"bb1", 0),
);
iters.push_back(
TestIterator::new()
.with_entry(b"aa", b"aa2", 0)
.with_entry(b"bb", b"bb2", 0)
.with_entry(b"cc", b"cc2", 0),
);
let mut merge_iter = MergeIterator::new(iters).unwrap();
assert_next_entry(&mut merge_iter, &RowEntry::new_value(b"aa", b"aa1", 0)).await;
merge_iter.seek(b"bb".as_ref()).await.unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"bb", b"bb1", 0),
RowEntry::new_value(b"cc", b"cc2", 0),
],
)
.await;
}
#[tokio::test]
async fn test_two_merge_seek() {
let iter1 = TestIterator::new()
.with_entry(b"aa", b"aa1", 1)
.with_entry(b"bb", b"bb0", 2)
.with_entry(b"bb", b"bb1", 1)
.with_entry(b"dd", b"dd1", 3);
let iter2 = TestIterator::new()
.with_entry(b"aa", b"aa2", 4)
.with_entry(b"bb", b"bb2", 5)
.with_entry(b"cc", b"cc0", 6)
.with_entry(b"cc", b"cc2", 5)
.with_entry(b"ee", b"ee2", 7);
let mut merge_iter = MergeIterator::new([iter1, iter2]).unwrap();
merge_iter.init().await.unwrap();
merge_iter.seek(b"b".as_ref()).await.unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"bb", b"bb2", 5),
RowEntry::new_value(b"cc", b"cc0", 6),
RowEntry::new_value(b"dd", b"dd1", 3),
RowEntry::new_value(b"ee", b"ee2", 7),
],
)
.await;
}
#[tokio::test]
async fn test_dedup_disabled() {
let iter1 = TestIterator::new()
.with_entry(b"key1", b"value1", 1)
.with_entry(b"key2", b"value2", 2);
let iter2 = TestIterator::new()
.with_entry(b"key1", b"value1_updated", 3)
.with_entry(b"key3", b"value3", 4);
let mut merge_iter = MergeIterator::new([iter1, iter2])
.unwrap()
.with_dedup(false);
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_value(b"key1", b"value1_updated", 3), RowEntry::new_value(b"key1", b"value1", 1), RowEntry::new_value(b"key2", b"value2", 2),
RowEntry::new_value(b"key3", b"value3", 4),
],
)
.await;
}
#[tokio::test]
async fn should_not_dedup_valid_merge_entries() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_row_entry(RowEntry::new_merge(b"k1", b"b", 2))
.with_row_entry(RowEntry::new_merge(b"k1", b"a", 1)),
);
iters.push_back(TestIterator::new().with_row_entry(RowEntry::new_merge(b"k1", b"c", 3)));
let mut merge_iter = MergeIterator::new(iters).unwrap();
assert_iterator(
&mut merge_iter,
vec![
RowEntry::new_merge(b"k1", b"c", 3),
RowEntry::new_merge(b"k1", b"b", 2),
RowEntry::new_merge(b"k1", b"a", 1),
],
)
.await;
}
#[tokio::test]
async fn should_advance_past_old_merge_entries() {
let mut iters: VecDeque<TestIterator> = VecDeque::new();
iters.push_back(
TestIterator::new()
.with_row_entry(RowEntry::new_merge(b"k1", b"a", 2))
.with_row_entry(RowEntry::new_merge(b"k1", b"b", 1)),
);
iters.push_back(TestIterator::new().with_row_entry(RowEntry::new_value(
b"k1",
b"new_value",
3,
)));
let mut merge_iter = MergeIterator::new(iters).unwrap();
assert_iterator(
&mut merge_iter,
vec![RowEntry::new_value(b"k1", b"new_value", 3)],
)
.await;
}
}