use super::{KeySpan, Memtable, StartKey};
use core::{
cmp::Ordering,
ops::{Bound, ControlFlow},
};
use crossbeam_skiplist_mvcc::{
nested::{Entry as MapEntry, VersionedEntry as MapVersionedEntry},
Comparable,
};
use either::Either;
pub(super) enum EntryValue<'a, K, V> {
Range(MapEntry<'a, StartKey<K>, KeySpan<K, V>>),
Point(&'a V),
}
impl<K, V> Clone for EntryValue<'_, K, V> {
#[inline]
fn clone(&self) -> Self {
match self {
Self::Range(entry) => Self::Range(entry.clone()),
Self::Point(entry) => Self::Point(entry),
}
}
}
impl<'a, K, V> EntryValue<'a, K, V> {
#[inline]
fn value(&self) -> &'a V {
match self {
Self::Range(entry) => entry.value().unwrap_value(),
Self::Point(entry) => entry,
}
}
}
pub struct Entry<'a, K, V> {
table: &'a Memtable<K, V>,
key: MapEntry<'a, K, V>,
value: EntryValue<'a, K, V>,
version: u64,
query_version: u64,
}
impl<K, V> core::fmt::Debug for Entry<'_, K, V>
where
K: core::fmt::Debug,
V: core::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("Entry")
.field("key", self.key())
.field("value", self.value())
.field("version", &self.version)
.finish()
}
}
impl<K, V> Clone for Entry<'_, K, V> {
#[inline]
fn clone(&self) -> Self {
Self {
table: self.table,
key: self.key.clone(),
value: self.value.clone(),
version: self.version,
query_version: self.query_version,
}
}
}
impl<'a, K, V> Entry<'a, K, V> {
#[inline]
pub(super) fn new(
table: &'a Memtable<K, V>,
query_version: u64,
key: MapEntry<'a, K, V>,
value: EntryValue<'a, K, V>,
) -> Self {
Self {
table,
version: key.version(),
key,
value,
query_version,
}
}
#[inline]
pub fn key(&self) -> &'a K {
self.key.key()
}
#[inline]
pub fn value(&self) -> &'a V {
self.value.value()
}
#[inline]
pub const fn version(&self) -> u64 {
self.version
}
}
impl<K, V> Entry<'_, K, V>
where
K: Ord,
{
#[inline]
pub fn next(&self) -> Option<Self> {
let mut next = self.key.next();
while let Some(ent) = next {
match self.table.validate(self.query_version, ent) {
ControlFlow::Break(entry) => return entry,
ControlFlow::Continue(ent) => next = ent.next(),
}
}
None
}
#[inline]
pub fn prev(&self) -> Option<Self> {
let mut prev = self.key.prev();
while let Some(ent) = prev {
match self.table.validate(self.query_version, ent) {
ControlFlow::Break(entry) => return entry,
ControlFlow::Continue(ent) => prev = ent.prev(),
}
}
None
}
}
macro_rules! bulk_entry {
($(
$(#[$meta:meta])*
$name:ident($ent:ident, $versioned_ent:ident) $( => $value:ident)?
),+$(,)?) => {
$(
$(#[$meta])*
pub struct $name<'a, K, V> {
table: &'a Memtable<K, V>,
ent: Either<$ent<'a, StartKey<K>, KeySpan<K, V>>, $versioned_ent<'a, StartKey<K>, KeySpan<K, V>>>,
version: u64,
query_version: u64,
}
impl<K, V> core::fmt::Debug for $name<'_, K, V>
where
K: core::fmt::Debug,
V: core::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct(stringify!($name))
.field("version", &self.version())
.field("start_bound", &self.start_bound())
.field("end_bound", &self.end_bound())
$(.field("value", self.$value()))?
.finish()
}
}
impl<K, V> Clone for $name<'_, K, V> {
#[inline]
fn clone(&self) -> Self {
Self {
table: self.table,
ent: self.ent.clone(),
version: self.version,
query_version: self.query_version,
}
}
}
impl<'a, K, V> $name<'a, K, V> {
#[inline]
pub(super) fn new(
table: &'a Memtable<K, V>,
query_version: u64,
ent: $ent<'a, StartKey<K>, KeySpan<K, V>>,
) -> Self {
Self {
version: ent.version(),
table,
ent: Either::Left(ent),
query_version,
}
}
#[inline]
pub(super) fn versioned(
table: &'a Memtable<K, V>,
query_version: u64,
ent: $versioned_ent<'a, StartKey<K>, KeySpan<K, V>>,
) -> Self {
Self {
version: ent.version(),
table,
ent: Either::Right(ent),
query_version,
}
}
#[inline]
pub fn contains<Q>(&self, key: &Q) -> bool
where
Q: ?Sized + Comparable<K>,
{
(match self.start_bound() {
Bound::Included(start) => key.compare(start) != Ordering::Less,
Bound::Excluded(start) => key.compare(start) == Ordering::Greater,
Bound::Unbounded => true,
}) && (match self.end_bound() {
Bound::Included(end) => key.compare(end) != Ordering::Greater,
Bound::Excluded(end) => key.compare(end) == Ordering::Less,
Bound::Unbounded => true,
})
}
#[inline]
pub fn bounds(&self) -> (Bound<&'a K>, Bound<&'a K>) {
(self.start_bound(), self.end_bound())
}
#[inline]
pub fn start_bound(&self) -> Bound<&'a K> {
let (k, v) = match &self.ent {
Either::Left(ent) => (ent.key(), ent.value()),
Either::Right(ent) => (ent.key(), ent.value().unwrap()),
};
match k {
StartKey::Key(k) => v.start_bound.as_ref().map(|_| k),
StartKey::Minimum => Bound::Unbounded,
}
}
#[inline]
pub fn end_bound(&self) -> Bound<&'a K> {
match &self.ent {
Either::Left(ent) => ent.value().end_bound.as_ref(),
Either::Right(ent) => ent.value().unwrap().end_bound.as_ref(),
}
}
#[inline]
pub const fn version(&self) -> u64 {
self.version
}
#[inline]
pub fn next(&self) -> Option<Self>
where
K: Ord,
{
match &self.ent {
Either::Left(ent) => {
ent.next().map(|ent| $name::new(self.table, self.query_version, ent))
},
Either::Right(ent) => {
ent.next().map(|ent| $name::versioned(self.table, self.query_version, ent))
},
}
}
#[inline]
pub fn prev(&self) -> Option<Self>
where
K: Ord,
{
match &self.ent {
Either::Left(ent) => {
ent.prev().map(|ent| $name::new(self.table, self.query_version, ent))
},
Either::Right(ent) => {
ent.prev().map(|ent| $name::versioned(self.table, self.query_version, ent))
},
}
}
$(
#[inline]
pub fn $value(&self) -> &'a V {
match &self.ent {
Either::Left(ent) => ent.value().unwrap_value(),
Either::Right(ent) => ent.value().unwrap().unwrap_value(),
}
}
)?
}
impl<K, V> core::ops::RangeBounds<K> for $name<'_, K, V> {
#[inline]
fn start_bound(&self) -> Bound<&K> {
self.start_bound()
}
#[inline]
fn end_bound(&self) -> Bound<&K> {
self.end_bound()
}
}
)*
};
}
bulk_entry!(
BulkDeletionEntry(MapEntry, MapVersionedEntry),
);
bulk_entry!(
BulkUpdateEntry(MapEntry, MapVersionedEntry) => value,
);