use either::Either;
use txn_core::sync::{Cm, Marker};
use super::*;
use core::{cmp, iter::Rev};
use crossbeam_skiplist::map::Iter as MapIter;
/// Reverse iterator over committed entries as seen at a fixed read version.
///
/// Each step pulls the next entry from the reversed map iterator and yields it
/// only if the entry's version set has a value-bearing version at or below
/// `version`; other keys are skipped.
pub struct RevIter<'a, K, V> {
/// Reversed iterator over the underlying map of keys to their version sets.
pub(crate) iter: Rev<MapIter<'a, K, Values<V>>>,
/// Read version, passed as an inclusive bound when looking up the visible
/// version of each key.
pub(crate) version: u64,
}
impl<'a, K, V> Iterator for RevIter<'a, K, V>
where
    K: Ord,
{
    type Item = Ref<'a, K, V>;

    /// Returns the next committed entry, in the reversed map order, that is
    /// visible at `self.version`.
    ///
    /// For every key, the version set is queried with an inclusive
    /// `upper_bound` at the read version. Keys with no match, or whose matched
    /// version carries no value, are skipped. Returns `None` once the
    /// underlying iterator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        let read_version = self.version;
        self.iter.find_map(|entry| {
            // Resolve the visible version number first so that the borrow of
            // `entry` ends before it is moved into the returned reference.
            let version = entry
                .value()
                .upper_bound(Bound::Included(&read_version))
                .filter(|visible| visible.value().is_some())
                .map(|visible| *visible.key())?;
            Some(CommittedRef { version, ent: entry }.into())
        })
    }
}
/// Reverse iterator for a write transaction that merges the transaction's
/// pending entries with committed entries.
///
/// When both sides hold the same key, the committed entry is skipped and the
/// pending one is used. A pending entry whose value is `None` is consumed
/// without being yielded.
pub struct WriteTransactionRevIter<'a, K, V, C> {
/// Reversed iterator over the pending entries.
pendings: Rev<BTreeMapIter<'a, K, EntryValue<V>>>,
/// Reverse iterator over the committed entries.
committed: RevIter<'a, K, V>,
/// Look-ahead slot: the next item already pulled from `pendings`, if any.
next_pending: Option<(&'a K, &'a EntryValue<V>)>,
/// Look-ahead slot: the next item already pulled from `committed`, if any.
next_committed: Option<Ref<'a, K, V>>,
/// Key of the most recently consumed pending item (`Left`) or most recently
/// yielded committed item (`Right`); used to avoid yielding a committed
/// entry whose key equals it. Updated for pending items even when their
/// value is `None`.
last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
/// Optional marker; when present, the key of every item pulled from
/// `committed` is passed to `Marker::mark`.
marker: Option<Marker<'a, C>>,
}
impl<'a, K, V, C> WriteTransactionRevIter<'a, K, V, C>
where
    C: Cm<Key = K>,
    K: Ord,
{
    /// Refills the pending look-ahead slot from the pending iterator.
    fn advance_pending(&mut self) {
        let upcoming = self.pendings.next();
        self.next_pending = upcoming;
    }

    /// Refills the committed look-ahead slot from the committed iterator and,
    /// when a marker is attached, marks the key of the freshly fetched item.
    fn advance_committed(&mut self) {
        self.next_committed = self.committed.next();
        if let Some(marker) = self.marker.as_mut() {
            if let Some(item) = self.next_committed.as_ref() {
                marker.mark(item.key());
            }
        }
    }

    /// Builds the merging iterator from its two sources and an optional marker.
    ///
    /// Both look-ahead slots are primed before returning, so the first
    /// committed key (if any) is already marked when a marker is supplied.
    pub fn new(
        pendings: Rev<BTreeMapIter<'a, K, EntryValue<V>>>,
        committed: RevIter<'a, K, V>,
        marker: Option<Marker<'a, C>>,
    ) -> Self {
        let mut this = Self {
            pendings,
            committed,
            marker,
            next_pending: None,
            next_committed: None,
            last_yielded_key: None,
        };
        this.advance_pending();
        this.advance_committed();
        this
    }
}
impl<'a, K, V, C> Iterator for WriteTransactionRevIter<'a, K, V, C>
where
    K: Ord + 'static,
    C: Cm<Key = K>,
{
    type Item = Ref<'a, K, V>;

    /// Yields the next entry of the merged pending/committed sequence.
    ///
    /// The side holding the greater key is consumed first. On equal keys the
    /// committed entry is discarded so the pending one wins. Pending entries
    /// without a value are consumed silently, and a committed entry is
    /// suppressed when its key equals the last recorded key.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Work out which look-ahead slot should be consumed. An exhausted
            // side is treated as if the other side held the greater key.
            let order = match (self.next_pending, self.next_committed.as_ref()) {
                (None, None) => return None,
                (Some(_), None) => cmp::Ordering::Greater,
                (None, Some(_)) => cmp::Ordering::Less,
                (Some((pending_key, _)), Some(committed)) => pending_key.cmp(committed.key()),
            };

            match order {
                // The pending entry comes first.
                cmp::Ordering::Greater => {
                    let (key, entry) = self.next_pending.take().unwrap();
                    self.advance_pending();
                    // Recorded even when the entry carries no value.
                    self.last_yielded_key = Some(Either::Left(key));
                    if let Some(live) = &entry.value {
                        return Some((entry.version, key, live).into());
                    }
                }
                // Same key on both sides: drop the committed entry and let a
                // later iteration handle the pending one.
                cmp::Ordering::Equal => self.advance_committed(),
                // The committed entry comes first.
                cmp::Ordering::Less => {
                    let committed = self.next_committed.take().unwrap();
                    self.advance_committed();
                    let is_new = match self.last_yielded_key.as_ref() {
                        None => true,
                        Some(Either::Left(key)) => *key != committed.key(),
                        Some(Either::Right(previous)) => previous.key() != committed.key(),
                    };
                    if is_new {
                        self.last_yielded_key = Some(Either::Right(committed.clone()));
                        return Some(committed);
                    }
                }
            }
        }
    }
}