#![allow(dead_code)]
use std::{cmp::Ordering, collections::btree_map::Range as BTreeMapRange};
use reifydb_core::{
actors::pending::PendingWrite,
common::CommitVersion,
encoded::key::EncodedKey,
interface::store::{MultiVersionBatch, MultiVersionRow},
};
/// Iterator that merges a range of pending (uncommitted) writes with a stream
/// of committed rows, yielding a single stream of [`MultiVersionRow`]s.
///
/// Both inputs are consumed head-to-head by key comparison, so for the output
/// to be key-ordered the `committed` iterator must yield rows in ascending key
/// order (this is not checked here; the `BTreeMap` range is ordered by
/// construction).
pub struct FlowRangeIter<'a> {
    /// Source of already-committed rows.
    committed: Box<dyn Iterator<Item = MultiVersionRow> + Send + 'a>,
    /// Range over the pending-write map, borrowed for `'a`.
    pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
    /// One-entry look-ahead into `pending`; `None` once that side is exhausted.
    next_pending: Option<(&'a EncodedKey, &'a PendingWrite)>,
    /// One-row look-ahead into `committed`; `None` once that side is exhausted.
    next_committed: Option<MultiVersionRow>,
    /// Version stamped onto rows that are produced from pending `Set` writes.
    version: CommitVersion,
}
impl<'a> FlowRangeIter<'a> {
    /// Builds a merge iterator over `pending` and `committed`.
    ///
    /// The first element of each input is pulled eagerly (pending first, then
    /// committed) so that `next` can compare the two heads without having to
    /// peek into the underlying iterators.
    ///
    /// `version` is the commit version attached to every row that originates
    /// from a pending `Set` write.
    pub fn new(
        mut pending: BTreeMapRange<'a, EncodedKey, PendingWrite>,
        mut committed: Box<dyn Iterator<Item = MultiVersionRow> + Send + 'a>,
        version: CommitVersion,
    ) -> Self {
        // Prime both look-ahead slots before assembling the struct.
        let next_pending = pending.next();
        let next_committed = committed.next();

        Self {
            committed,
            pending,
            next_pending,
            next_committed,
            version,
        }
    }

    /// Refills the pending look-ahead slot with the next entry of the range.
    fn advance_pending(&mut self) {
        let upcoming = self.pending.next();
        self.next_pending = upcoming;
    }

    /// Refills the committed look-ahead slot with the next committed row.
    fn advance_committed(&mut self) {
        let upcoming = self.committed.next();
        self.next_committed = upcoming;
    }
}
/// Two-way merge of the pending and committed heads.
///
/// * pending key smaller, or committed exhausted: the pending entry is consumed.
/// * keys equal: the pending entry is consumed and the committed row with the
///   same key is discarded (pending shadows committed).
/// * committed key smaller, or pending exhausted: the committed row is yielded
///   unchanged.
///
/// A consumed pending `Set` is yielded as a row carrying `self.version`; a
/// consumed pending `Remove` yields nothing and the merge moves on.
impl<'a> Iterator for FlowRangeIter<'a> {
    type Item = MultiVersionRow;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Step 1: reduce the state of both heads to a single ordering.
            // An exhausted side is treated as "the other side comes first".
            let order = match (&self.next_pending, &self.next_committed) {
                (None, None) => return None,
                (Some(_), None) => Ordering::Less,
                (None, Some(_)) => Ordering::Greater,
                (Some((pending_key, _)), Some(committed)) => {
                    pending_key.as_ref().cmp(committed.key.as_ref())
                }
            };

            // Step 2a: the committed row comes first, so hand it out as-is.
            if order == Ordering::Greater {
                let row = self.next_committed.take();
                self.advance_committed();
                return row;
            }

            // Step 2b: the pending entry comes first (or ties). Consume it, and
            // on a tie also drop the committed row it shadows.
            let staged = self.next_pending.take();
            self.advance_pending();
            if order == Ordering::Equal {
                self.advance_committed();
            }

            if let Some((key, PendingWrite::Set(row))) = staged {
                return Some(MultiVersionRow {
                    key: key.clone(),
                    row: row.clone(),
                    version: self.version,
                });
            }
            // The pending entry was a `Remove`: nothing to emit, keep merging.
        }
    }
}
/// Merges `pending` with the rows of `committed_batch` and returns the result
/// as a fully materialised batch.
///
/// Rows produced from pending `Set` writes carry `version`; committed rows keep
/// their own version. The returned batch always has `has_more` set to `false`.
///
/// NOTE(review): `committed_batch.has_more` is not consulted, so a truncated
/// committed batch is reported as complete after merging. Confirm that callers
/// only pass complete batches or handle pagination themselves.
pub fn collect_batch(
    pending: BTreeMapRange<'_, EncodedKey, PendingWrite>,
    committed_batch: MultiVersionBatch,
    version: CommitVersion,
) -> MultiVersionBatch {
    let committed_rows = Box::new(committed_batch.items.into_iter());

    MultiVersionBatch {
        items: FlowRangeIter::new(pending, committed_rows, version).collect(),
        has_more: false,
    }
}
#[cfg(test)]
pub mod tests {
    use std::collections::BTreeMap;

    use reifydb_core::{
        common::CommitVersion,
        encoded::{key::EncodedKey, row::EncodedRow},
        interface::store::MultiVersionRow,
    };
    use reifydb_type::util::cowvec::CowVec;

    use super::*;

    /// Encodes the UTF-8 bytes of `s` as a key.
    fn make_key(s: &str) -> EncodedKey {
        EncodedKey::new(s.as_bytes().to_vec())
    }

    /// Encodes the UTF-8 bytes of `s` as a row payload.
    fn make_value(s: &str) -> EncodedRow {
        EncodedRow(CowVec::new(s.as_bytes().to_vec()))
    }

    /// Builds a committed row for `key` with payload `value` at `version`.
    fn make_committed(key: &str, value: &str, version: u64) -> MultiVersionRow {
        MultiVersionRow {
            key: make_key(key),
            row: make_value(value),
            version: CommitVersion(version),
        }
    }

    /// Builds a pending-write map; `Some(v)` becomes a `Set`, `None` a `Remove`.
    /// Entries are inserted in the order given.
    fn pending_map(entries: &[(&str, Option<&str>)]) -> BTreeMap<EncodedKey, PendingWrite> {
        let mut map = BTreeMap::new();
        for &(key, value) in entries {
            let write = match value {
                Some(payload) => PendingWrite::Set(make_value(payload)),
                None => PendingWrite::Remove,
            };
            map.insert(make_key(key), write);
        }
        map
    }

    /// Runs the merge at commit version 10 and collects every yielded row.
    fn merge_at_v10(
        range: BTreeMapRange<'_, EncodedKey, PendingWrite>,
        committed: Vec<MultiVersionRow>,
    ) -> Vec<MultiVersionRow> {
        FlowRangeIter::new(range, Box::new(committed.into_iter()), CommitVersion(10)).collect()
    }

    /// Keys of `rows`, in yield order.
    fn keys_of(rows: &[MultiVersionRow]) -> Vec<EncodedKey> {
        rows.iter().map(|row| row.key.clone()).collect()
    }

    /// Encodes each name as a key.
    fn keys(names: &[&str]) -> Vec<EncodedKey> {
        names.iter().map(|name| make_key(name)).collect()
    }

    #[test]
    fn test_empty_range_both_iterators() {
        let pending: BTreeMap<EncodedKey, PendingWrite> = BTreeMap::new();

        let mut iter = FlowRangeIter::new(
            pending.range(..),
            Box::new(Vec::<MultiVersionRow>::new().into_iter()),
            CommitVersion(1),
        );

        assert!(iter.next().is_none());
    }

    #[test]
    fn test_range_only_pending() {
        let pending = pending_map(&[
            ("a", Some("1")),
            ("b", Some("2")),
            ("c", Some("3")),
            ("d", Some("4")),
        ]);

        // Half-open range: "d" is excluded.
        let items = merge_at_v10(pending.range(make_key("b")..make_key("d")), vec![]);

        assert_eq!(keys_of(&items), keys(&["b", "c"]));
    }

    #[test]
    fn test_range_only_committed() {
        let pending = pending_map(&[]);
        let committed = vec![
            make_committed("a", "1", 5),
            make_committed("b", "2", 6),
            make_committed("c", "3", 7),
        ];

        let items = merge_at_v10(pending.range(..), committed);

        assert_eq!(keys_of(&items), keys(&["a", "b", "c"]));
    }

    #[test]
    fn test_range_filters_removes() {
        let pending = pending_map(&[("a", Some("1")), ("b", None), ("c", Some("3"))]);

        let items = merge_at_v10(pending.range(..), vec![]);

        assert_eq!(keys_of(&items), keys(&["a", "c"]));
    }

    #[test]
    fn test_range_pending_shadows_committed() {
        let pending = pending_map(&[("b", Some("new"))]);
        let committed = vec![
            make_committed("a", "1", 5),
            make_committed("b", "old", 6),
            make_committed("c", "3", 7),
        ];

        let items = merge_at_v10(pending.range(..), committed);

        assert_eq!(items.len(), 3);
        // The pending write wins and carries the iterator's version.
        let shadowed = &items[1];
        assert_eq!(shadowed.key, make_key("b"));
        assert_eq!(shadowed.row, make_value("new"));
        assert_eq!(shadowed.version, CommitVersion(10));
    }

    #[test]
    fn test_range_remove_hides_committed() {
        let pending = pending_map(&[("b", None)]);
        let committed = vec![
            make_committed("a", "1", 5),
            make_committed("b", "2", 6),
            make_committed("c", "3", 7),
        ];

        let items = merge_at_v10(pending.range(..), committed);

        assert_eq!(keys_of(&items), keys(&["a", "c"]));
    }

    #[test]
    fn test_range_bounded_query() {
        let pending = pending_map(&[
            ("a", Some("a")),
            ("b", Some("b")),
            ("c", Some("c")),
            ("d", Some("d")),
            ("e", Some("e")),
            ("f", Some("f")),
        ]);
        let committed = vec![make_committed("b", "old_b", 5), make_committed("f", "f_old", 6)];

        // Only the pending side is range-restricted; committed "f" still flows through.
        let items = merge_at_v10(pending.range(make_key("b")..make_key("e")), committed);

        assert_eq!(keys_of(&items), keys(&["b", "c", "d", "f"]));
        assert_eq!(items[0].row, make_value("b"));
        assert_eq!(items[3].row, make_value("f_old"));
    }

    #[test]
    fn test_range_interleaved_merge() {
        let pending = pending_map(&[("b", Some("pending_b")), ("d", Some("pending_d"))]);
        let committed = vec![
            make_committed("a", "committed_a", 5),
            make_committed("c", "committed_c", 6),
            make_committed("e", "committed_e", 7),
        ];

        let items = merge_at_v10(pending.range(..), committed);

        assert_eq!(keys_of(&items), keys(&["a", "b", "c", "d", "e"]));
    }

    #[test]
    fn test_range_sorted_order() {
        // Deliberately inserted out of order; the map sorts them.
        let pending = pending_map(&[("m", Some("m")), ("a", Some("a")), ("z", Some("z"))]);
        let committed = vec![
            make_committed("d", "d", 5),
            make_committed("k", "k", 6),
            make_committed("p", "p", 7),
        ];

        let items = merge_at_v10(pending.range(..), committed);

        assert_eq!(items.len(), 6);
        assert_eq!(keys_of(&items), keys(&["a", "d", "k", "m", "p", "z"]));
    }

    #[test]
    fn test_range_with_start_bound() {
        let pending = pending_map(&[
            ("a", Some("a")),
            ("b", Some("b")),
            ("c", Some("c")),
            ("d", Some("d")),
        ]);

        let items = merge_at_v10(pending.range(make_key("b")..), vec![]);

        assert_eq!(keys_of(&items), keys(&["b", "c", "d"]));
    }

    #[test]
    fn test_range_with_end_bound() {
        let pending = pending_map(&[
            ("a", Some("a")),
            ("b", Some("b")),
            ("c", Some("c")),
            ("d", Some("d")),
        ]);

        let items = merge_at_v10(pending.range(..make_key("c")), vec![]);

        assert_eq!(keys_of(&items), keys(&["a", "b"]));
    }

    #[test]
    fn test_range_inclusive_bounds() {
        let pending = pending_map(&[
            ("a", Some("a")),
            ("b", Some("b")),
            ("c", Some("c")),
            ("d", Some("d")),
        ]);

        let items = merge_at_v10(pending.range(make_key("b")..=make_key("c")), vec![]);

        assert_eq!(keys_of(&items), keys(&["b", "c"]));
    }

    #[test]
    fn test_range_complex_scenario() {
        let pending = pending_map(&[
            ("a", Some("new_a")),
            ("b", None),
            ("c", Some("new_c")),
            ("f", None),
            ("g", Some("new_g")),
        ]);
        let committed = vec![
            make_committed("a", "old_a", 5),
            make_committed("b", "old_b", 6),
            make_committed("d", "old_d", 7),
            make_committed("e", "old_e", 8),
            make_committed("f", "old_f", 9),
        ];

        let items = merge_at_v10(pending.range(make_key("a")..make_key("g")), committed);

        let expected = [("a", "new_a"), ("c", "new_c"), ("d", "old_d"), ("e", "old_e")];
        assert_eq!(items.len(), expected.len());
        for (item, &(key, value)) in items.iter().zip(expected.iter()) {
            assert_eq!(item.key, make_key(key));
            assert_eq!(item.row, make_value(value));
        }
    }

    #[test]
    fn test_range_empty_result() {
        let pending = pending_map(&[("a", Some("a")), ("z", Some("z"))]);

        let items = merge_at_v10(pending.range(make_key("m")..make_key("n")), vec![]);

        assert!(items.is_empty());
    }
}