use crate::direction::Direction;
use crate::queue::Merge;
use crate::versions::Versions;
use bytes::Bytes;
use ferntree::iter::{Range as FernRange, RangeRev};
use ferntree::Tree;
use std::collections::btree_map::Range as TreeRange;
use std::ops::Bound;
use std::ops::ControlFlow;
use std::sync::Arc;
const IC: usize = 64;
const LC: usize = 64;
pub(crate) enum TreeIterState<'a> {
Forward(FernRange<'a, Bytes, Versions, IC, LC>),
Reverse(RangeRev<'a, Bytes, Versions, IC, LC>),
}
impl<'a> TreeIterState<'a> {
#[inline]
pub(crate) fn build(
tree: &'a Tree<Bytes, Versions>,
beg: &Bytes,
end: &Bytes,
direction: Direction,
) -> Self {
match direction {
Direction::Forward => {
TreeIterState::Forward(tree.range(Bound::Included(beg), Bound::Excluded(end)))
}
Direction::Reverse => {
TreeIterState::Reverse(tree.range_rev(Bound::Included(beg), Bound::Excluded(end)))
}
}
}
}
#[inline]
pub(crate) fn for_each_in_range<F>(tree: &Tree<Bytes, Versions>, beg: &Bytes, end: &Bytes, mut f: F)
where
F: FnMut(&Bytes, &Versions) -> ControlFlow<()>,
{
let mut iter = tree.raw_iter();
iter.seek(beg);
let mut stop = false;
loop {
let has_more_leaves = iter.for_each_in_leaf(|k, v| {
if stop {
return;
}
if k >= end {
stop = true;
return;
}
if matches!(f(k, v), ControlFlow::Break(())) {
stop = true;
}
});
if stop || !has_more_leaves {
break;
}
let Some((k, v)) = iter.next() else {
break;
};
if k >= end {
break;
}
if matches!(f(k, v), ControlFlow::Break(())) {
break;
}
}
}
type CachedTreeEntry = Option<(Bytes, bool)>;
pub(crate) struct MergeQueueIter {
sources: Vec<Arc<Merge>>,
heads: Vec<Option<(Bytes, Option<Bytes>)>>,
beg: Bytes,
end: Bytes,
direction: Direction,
}
impl MergeQueueIter {
pub(crate) fn new(
sources: Vec<Arc<Merge>>,
beg: Bytes,
end: Bytes,
direction: Direction,
) -> Self {
let mut heads = Vec::with_capacity(sources.len());
for src in &sources {
heads.push(seek_in_writeset(src, direction, &beg, &end, None));
}
Self {
sources,
heads,
beg,
end,
direction,
}
}
}
fn seek_in_writeset(
src: &Arc<Merge>,
direction: Direction,
beg: &Bytes,
end: &Bytes,
after: Option<&Bytes>,
) -> Option<(Bytes, Option<Bytes>)> {
let ws = &src.writeset;
let entry = match (direction, after) {
(Direction::Forward, None) => {
ws.range::<Bytes, _>((Bound::Included(beg), Bound::Excluded(end))).next()
}
(Direction::Forward, Some(k)) => {
ws.range::<Bytes, _>((Bound::Excluded(k), Bound::Excluded(end))).next()
}
(Direction::Reverse, None) => {
ws.range::<Bytes, _>((Bound::Included(beg), Bound::Excluded(end))).next_back()
}
(Direction::Reverse, Some(k)) => {
ws.range::<Bytes, _>((Bound::Included(beg), Bound::Excluded(k))).next_back()
}
};
entry.map(|(k, v)| (k.clone(), v.clone()))
}
impl Iterator for MergeQueueIter {
type Item = (Bytes, Option<Bytes>);
fn next(&mut self) -> Option<Self::Item> {
let mut winner: Option<usize> = None;
for (i, head) in self.heads.iter().enumerate() {
let Some((k, _)) = head else {
continue;
};
match winner {
None => winner = Some(i),
Some(wi) => {
let (wk, _) = self.heads[wi].as_ref().unwrap();
let take = match self.direction {
Direction::Forward => k < wk,
Direction::Reverse => k > wk,
};
if take {
winner = Some(i);
}
}
}
}
let winner = winner?;
let (out_key, out_val) = self.heads[winner].take().unwrap();
for i in (winner + 1)..self.heads.len() {
let same = self.heads[i].as_ref().map(|(k, _)| k == &out_key).unwrap_or(false);
if same {
let new = seek_in_writeset(
&self.sources[i],
self.direction,
&self.beg,
&self.end,
Some(&out_key),
);
self.heads[i] = new;
}
}
let new = seek_in_writeset(
&self.sources[winner],
self.direction,
&self.beg,
&self.end,
Some(&out_key),
);
self.heads[winner] = new;
Some((out_key, out_val))
}
}
pub struct MergeIterator<'a> {
pub(crate) tree_iter: TreeIterState<'a>,
pub(crate) self_iter: TreeRange<'a, Bytes, Option<Bytes>>,
pub(crate) join_iter: Box<dyn Iterator<Item = (Bytes, Option<Bytes>)> + 'a>,
pub(crate) tree_next: CachedTreeEntry,
pub(crate) join_next: Option<(Bytes, Option<Bytes>)>,
pub(crate) self_next: Option<(&'a Bytes, &'a Option<Bytes>)>,
pub(crate) direction: Direction,
pub(crate) version: u64,
pub(crate) skip_remaining: usize,
}
#[derive(Clone, Copy, PartialEq, Eq)]
enum KeySource {
None,
Datastore,
Committed,
Transaction,
}
impl<'a> MergeIterator<'a> {
pub fn new(
tree_iter: TreeIterState<'a>,
mut join_iter: Box<dyn Iterator<Item = (Bytes, Option<Bytes>)> + 'a>,
mut self_iter: TreeRange<'a, Bytes, Option<Bytes>>,
direction: Direction,
version: u64,
skip: usize,
) -> Self {
let join_next = join_iter.next();
let self_next = match direction {
Direction::Forward => self_iter.next(),
Direction::Reverse => self_iter.next_back(),
};
let mut me = MergeIterator {
tree_iter,
self_iter,
join_iter,
tree_next: None,
join_next,
self_next,
direction,
version,
skip_remaining: skip,
};
me.tree_next = Self::fetch_tree_entry(&mut me.tree_iter, version);
me
}
#[inline]
fn fetch_tree_entry(tree_iter: &mut TreeIterState<'_>, version: u64) -> CachedTreeEntry {
match tree_iter {
TreeIterState::Forward(range) => {
range.peek().map(|(k, v)| (k.clone(), v.exists_version(version)))
}
TreeIterState::Reverse(range) => {
range.peek().map(|(k, v)| (k.clone(), v.exists_version(version)))
}
}
}
#[inline]
fn peek_tree_value(&mut self) -> Option<Bytes> {
let version = self.version;
match &mut self.tree_iter {
TreeIterState::Forward(range) => {
range.peek().and_then(|(_, v)| v.fetch_version(version))
}
TreeIterState::Reverse(range) => {
range.peek().and_then(|(_, v)| v.fetch_version(version))
}
}
}
#[inline]
fn advance_join(&mut self) {
self.join_next = self.join_iter.next();
}
#[inline]
fn advance_self(&mut self) {
self.self_next = match self.direction {
Direction::Forward => self.self_iter.next(),
Direction::Reverse => self.self_iter.next_back(),
};
}
#[inline]
fn advance_tree(&mut self) {
match &mut self.tree_iter {
TreeIterState::Forward(range) => {
range.next();
}
TreeIterState::Reverse(range) => {
range.next();
}
}
self.tree_next = Self::fetch_tree_entry(&mut self.tree_iter, self.version);
}
#[inline]
fn tree_key(&self) -> Option<&Bytes> {
self.tree_next.as_ref().map(|(k, _)| k)
}
#[inline]
fn tree_exists(&self) -> bool {
self.tree_next.as_ref().map(|(_, e)| *e).unwrap_or(false)
}
#[inline]
fn next_source(&self) -> KeySource {
let mut next_key: Option<&Bytes> = None;
let mut next_source = KeySource::None;
if let Some((sk, _)) = self.self_next {
next_key = Some(sk);
next_source = KeySource::Transaction;
}
if let Some((jk, _)) = &self.join_next {
let should_use = match (next_key, &self.direction) {
(None, _) => true,
(Some(k), Direction::Forward) => jk < k,
(Some(k), Direction::Reverse) => jk > k,
};
if should_use {
next_key = Some(jk);
next_source = KeySource::Committed;
} else if next_key == Some(jk) {
next_source = KeySource::Transaction;
}
}
if let Some(tk) = self.tree_key() {
let should_use = match (next_key, &self.direction) {
(None, _) => true,
(Some(k), Direction::Forward) => tk < k,
(Some(k), Direction::Reverse) => tk > k,
};
if should_use {
next_source = KeySource::Datastore;
}
}
next_source
}
pub fn next_count(&mut self) -> Option<bool> {
loop {
let exists = match self.next_source() {
KeySource::Transaction => {
let (sk, sv) = self.self_next.unwrap();
let exists = sv.is_some();
let skip_join = self.join_next.as_ref().map(|(jk, _)| jk) == Some(sk);
let skip_tree = self.tree_key() == Some(sk);
self.advance_self();
if skip_join {
self.advance_join();
}
if skip_tree {
self.advance_tree();
}
exists
}
KeySource::Committed => {
let exists = self.join_next.as_ref().unwrap().1.is_some();
let skip_tree = self.tree_key() == self.join_next.as_ref().map(|(jk, _)| jk);
self.advance_join();
if skip_tree {
self.advance_tree();
}
exists
}
KeySource::Datastore => {
let exists = self.tree_exists();
self.advance_tree();
exists
}
KeySource::None => return None,
};
if exists && self.skip_remaining > 0 {
self.skip_remaining -= 1;
continue;
}
return Some(exists);
}
}
pub fn next_key(&mut self) -> Option<(Bytes, bool)> {
loop {
match self.next_source() {
KeySource::Transaction => {
let (sk, sv) = self.self_next.unwrap();
let exists = sv.is_some();
let key_ref = sk;
let skip_join = self.join_next.as_ref().map(|(jk, _)| jk) == Some(sk);
let skip_tree = self.tree_key() == Some(sk);
self.advance_self();
if skip_join {
self.advance_join();
}
if skip_tree {
self.advance_tree();
}
if exists && self.skip_remaining > 0 {
self.skip_remaining -= 1;
continue;
}
return Some((key_ref.clone(), exists));
}
KeySource::Committed => {
let (jk, jv) = self.join_next.as_ref().unwrap();
if jv.is_some() && self.skip_remaining > 0 {
let skip_tree = self.tree_key() == Some(jk);
self.advance_join();
if skip_tree {
self.advance_tree();
}
self.skip_remaining -= 1;
continue;
}
let exists = jv.is_some();
let key = jk.clone();
let skip_tree = self.tree_key() == Some(&key);
self.advance_join();
if skip_tree {
self.advance_tree();
}
return Some((key, exists));
}
KeySource::Datastore => {
let exists = self.tree_exists();
if exists && self.skip_remaining > 0 {
self.advance_tree();
self.skip_remaining -= 1;
continue;
}
let key = self.tree_next.as_ref().map(|(k, _)| k.clone()).unwrap();
self.advance_tree();
return Some((key, exists));
}
KeySource::None => return None,
}
}
}
}
impl<'a> Iterator for MergeIterator<'a> {
type Item = (Bytes, Option<Bytes>);
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.next_source() {
KeySource::Transaction => {
let (sk, sv) = self.self_next.unwrap();
let key_ref = sk;
let exists = sv.is_some();
let skip_join = self.join_next.as_ref().map(|(jk, _)| jk) == Some(sk);
let skip_tree = self.tree_key() == Some(sk);
self.advance_self();
if skip_join {
self.advance_join();
}
if skip_tree {
self.advance_tree();
}
if exists && self.skip_remaining > 0 {
self.skip_remaining -= 1;
continue;
}
return Some((key_ref.clone(), sv.clone()));
}
KeySource::Committed => {
let (jk, jv) = self.join_next.as_ref().unwrap();
if jv.is_some() && self.skip_remaining > 0 {
let skip_tree = self.tree_key() == Some(jk);
self.advance_join();
if skip_tree {
self.advance_tree();
}
self.skip_remaining -= 1;
continue;
}
let key = jk.clone();
let value = jv.clone();
let skip_tree = self.tree_key() == Some(&key);
self.advance_join();
if skip_tree {
self.advance_tree();
}
return Some((key, value));
}
KeySource::Datastore => {
let exists = self.tree_exists();
if exists && self.skip_remaining > 0 {
self.advance_tree();
self.skip_remaining -= 1;
continue;
}
let value = if exists {
self.peek_tree_value()
} else {
None
};
let key = self.tree_next.as_ref().map(|(k, _)| k.clone()).unwrap();
self.advance_tree();
return Some((key, value));
}
KeySource::None => return None,
}
}
}
}