use either::Either;
use txn_core::sync::{Cm, Marker};
use super::*;
use core::cmp;
use crossbeam_skiplist::map::Range as MapRange;
pub struct Range<'a, Q, R, K, V>
where
K: Ord + Borrow<Q>,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
pub(crate) range: MapRange<'a, Q, R, K, Values<V>>,
pub(crate) version: u64,
}
impl<'a, Q, R, K, V> Iterator for Range<'a, Q, R, K, V>
where
K: Ord + Borrow<Q>,
R: RangeBounds<Q>,
Q: Ord + ?Sized,
{
type Item = Ref<'a, K, V>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let ent = self.range.next()?;
if let Some(version) = ent
.value()
.upper_bound(Bound::Included(&self.version))
.and_then(|ent| {
if ent.value().is_some() {
Some(*ent.key())
} else {
None
}
})
{
return Some(CommittedRef { version, ent }.into());
}
}
}
}
pub struct WriteTransactionRange<'a, Q, R, K, V, C>
where
K: Ord + Borrow<Q>,
R: RangeBounds<Q> + 'a,
Q: Ord + ?Sized,
{
pub(crate) committed: Range<'a, Q, R, K, V>,
pub(crate) pendings: BTreeMapRange<'a, K, EntryValue<V>>,
next_pending: Option<(&'a K, &'a EntryValue<V>)>,
next_committed: Option<Ref<'a, K, V>>,
last_yielded_key: Option<Either<&'a K, Ref<'a, K, V>>>,
marker: Option<Marker<'a, C>>,
}
impl<'a, Q, R, K, V, C> WriteTransactionRange<'a, Q, R, K, V, C>
where
K: Ord + Borrow<Q>,
Q: Ord + ?Sized,
R: RangeBounds<Q> + 'a,
C: Cm<Key = K>,
{
fn advance_pending(&mut self) {
self.next_pending = self.pendings.next();
}
fn advance_committed(&mut self) {
self.next_committed = self.committed.next();
if let (Some(item), Some(marker)) = (&self.next_committed, &mut self.marker) {
marker.mark(item.key());
}
}
pub fn new(
pendings: BTreeMapRange<'a, K, EntryValue<V>>,
committed: Range<'a, Q, R, K, V>,
marker: Option<Marker<'a, C>>,
) -> Self {
let mut iterator = WriteTransactionRange {
pendings,
committed,
next_pending: None,
next_committed: None,
last_yielded_key: None,
marker,
};
iterator.advance_pending();
iterator.advance_committed();
iterator
}
}
impl<'a, Q, R, K, V, C> Iterator for WriteTransactionRange<'a, Q, R, K, V, C>
where
K: Ord + Borrow<Q>,
Q: Ord + ?Sized,
R: RangeBounds<Q> + 'a,
C: Cm<Key = K>,
{
type Item = Ref<'a, K, V>;
fn next(&mut self) -> Option<Self::Item> {
loop {
match (self.next_pending, &self.next_committed) {
(Some((pending_key, _)), Some(committed)) => {
match pending_key.cmp(committed.key()) {
cmp::Ordering::Less => {
let (key, value) = self.next_pending.take().unwrap();
self.advance_pending();
self.last_yielded_key = Some(Either::Left(key));
let version = value.version;
match &value.value {
Some(value) => return Some((version, key, value).into()),
None => continue,
}
}
cmp::Ordering::Equal => {
self.advance_committed();
continue;
}
cmp::Ordering::Greater => {
let committed = self.next_committed.take().unwrap();
self.advance_committed(); if self.last_yielded_key.as_ref().map_or(true, |k| match k {
Either::Left(k) => *k != committed.key(),
Either::Right(item) => item.key() != committed.key(),
}) {
self.last_yielded_key = Some(Either::Right(committed.clone()));
return Some(committed);
}
}
}
}
(Some((_, _)), None) => {
let (key, value) = self.next_pending.take().unwrap();
self.advance_pending(); self.last_yielded_key = Some(Either::Left(key)); let version = value.version;
match &value.value {
Some(value) => return Some((version, key, value).into()),
None => continue,
}
}
(None, Some(committed)) => {
if self.last_yielded_key.as_ref().map_or(true, |k| match k {
Either::Left(k) => *k != committed.key(),
Either::Right(item) => item.key() != committed.key(),
}) {
let committed = self.next_committed.take().unwrap();
self.advance_committed(); self.last_yielded_key = Some(Either::Right(committed.clone()));
return Some(committed);
} else {
self.advance_committed();
continue;
}
}
(None, None) => return None,
}
}
}
}