use core::{
cmp::Ordering,
marker::PhantomData,
ops::{Bound, ControlFlow, RangeBounds},
};
use crossbeam_skiplist_mvcc::{
nested::{Entry as MapEntry, SkipMap},
Comparable, Equivalent,
};
use iter::*;
use ref_cast::RefCast;
use std::sync::Arc;
pub use entry::*;
mod entry;
pub mod iter;
/// A single mutation that can be handed to [`Memtable::apply`].
///
/// `apply` forwards each variant to the memtable method of the matching name
/// (`insert`, `remove`, `remove_range`, `update_range`).
#[derive(Debug, Clone)]
pub enum Operation<K, V> {
/// Write `value` for `key`; forwarded to [`Memtable::insert`].
Insert {
/// Key of the point entry.
key: K,
/// Value written for `key`.
value: V,
},
/// Remove the point entry for this key; forwarded to [`Memtable::remove`].
Remove(K),
/// Record a range deletion over the given bounds; forwarded to
/// [`Memtable::remove_range`].
RemoveRange {
/// Lower bound of the deleted range.
start_bound: Bound<K>,
/// Upper bound of the deleted range.
end_bound: Bound<K>,
},
/// Record a range update over the given bounds; forwarded to
/// [`Memtable::update_range`].
UpdateRange {
/// Lower bound of the updated range.
start_bound: Bound<K>,
/// Upper bound of the updated range.
end_bound: Bound<K>,
/// Value recorded for the range.
value: V,
},
}
/// A lookup key of type `Q`, tagged at the type level with the stored key type `K`.
///
/// The type is `#[repr(transparent)]` and derives `RefCast`, which is what lets
/// callers in this file turn a `&Q` into a `&Query<K, Q>` with `ref_cast`.
#[derive(PartialEq, Eq, PartialOrd, Ord, ref_cast::RefCast)]
#[repr(transparent)]
struct Query<K: ?Sized, Q: ?Sized> {
// Zero-sized marker: records `K` in the type without storing a value.
_m: PhantomData<K>,
// The wrapped lookup key.
key: Q,
}
struct QueryRange<K: ?Sized, Q: ?Sized, R>
where
R: RangeBounds<Q>,
{
r: R,
_q: PhantomData<(fn() -> K, fn() -> Q)>,
}
impl<K: ?Sized, Q: ?Sized, R> QueryRange<K, Q, R>
where
R: RangeBounds<Q>,
{
#[inline]
pub(super) const fn new(r: R) -> Self {
Self { r, _q: PhantomData }
}
}
impl<K: ?Sized, Q: ?Sized, R> RangeBounds<Query<K, Q>> for QueryRange<K, Q, R>
where
R: RangeBounds<Q>,
{
#[inline]
fn start_bound(&self) -> Bound<&Query<K, Q>> {
self.r.start_bound().map(RefCast::ref_cast)
}
fn end_bound(&self) -> Bound<&Query<K, Q>> {
self.r.end_bound().map(RefCast::ref_cast)
}
}
/// What a stored range span does to the keys it covers.
#[non_exhaustive]
enum RangeKind<V> {
// The span carries a value (constructed by `update_range`).
Set(V),
// The span is a deletion (constructed by `remove_range`).
Deletion,
}
/// Sort key for a range span: the key the span starts at, or `Minimum` when
/// the span has no lower bound.
///
/// `Minimum` orders before every `Key(_)`; two `Key`s order by their inner key.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
#[non_exhaustive]
enum StartKey<K> {
    /// The span is unbounded below.
    Minimum,
    /// The span starts at this key. Whether the start is inclusive or
    /// exclusive is not recorded here.
    Key(K),
}

impl<K> StartKey<K> {
    /// Builds the sort key for a span's lower bound.
    ///
    /// Both `Included(k)` and `Excluded(k)` become `Key(k)`; `Unbounded`
    /// becomes `Minimum`.
    #[inline]
    fn new(key: Bound<K>) -> Self {
        match key {
            Bound::Unbounded => Self::Minimum,
            Bound::Included(k) | Bound::Excluded(k) => Self::Key(k),
        }
    }
}

impl<K: Ord> PartialOrd for StartKey<K> {
    /// Always `Some`: delegates to the total order defined by [`Ord`].
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<K: Ord> Ord for StartKey<K> {
    /// `Minimum` equals itself and is less than any `Key`; keys compare by value.
    fn cmp(&self, other: &Self) -> Ordering {
        match self {
            Self::Minimum => match other {
                Self::Minimum => Ordering::Equal,
                Self::Key(_) => Ordering::Less,
            },
            Self::Key(lhs) => match other {
                Self::Minimum => Ordering::Greater,
                Self::Key(rhs) => lhs.cmp(rhs),
            },
        }
    }
}
impl<K, Q> Equivalent<StartKey<K>> for Query<K, Q>
where
    Q: ?Sized + Equivalent<K>,
{
    /// A query is equivalent to a start key only when that start key is a
    /// concrete `Key` the wrapped lookup key is equivalent to; it is never
    /// equivalent to `Minimum`.
    #[inline]
    fn equivalent(&self, key: &StartKey<K>) -> bool {
        matches!(key, StartKey::Key(k) if self.key.equivalent(k))
    }
}
impl<K, Q> Comparable<StartKey<K>> for Query<K, Q>
where
    Q: ?Sized + Comparable<K>,
{
    /// Orders the query relative to a start key: against a concrete `Key` the
    /// wrapped lookup key decides; against `Minimum` the query is always greater.
    #[inline]
    fn compare(&self, key: &StartKey<K>) -> Ordering {
        if let StartKey::Key(k) = key {
            self.key.compare(k)
        } else {
            // `Minimum` sorts before every key, so the query sorts after it.
            Ordering::Greater
        }
    }
}
/// Payload stored for a range span.
///
/// The span's start key lives in the map key ([`StartKey`]); this struct keeps
/// only whether that start is inclusive/exclusive/unbounded, the end bound,
/// and what the span does ([`RangeKind`]).
struct KeySpan<K, V> {
    // Kind of the lower bound; the key itself is held by the `StartKey`.
    start_bound: Bound<()>,
    // Upper bound of the span.
    end_bound: Bound<K>,
    // Deletion marker or the value carried by the span.
    value: RangeKind<V>,
}

impl<K, V> KeySpan<K, V> {
    /// Assembles a span from its parts without validating them.
    #[inline]
    const fn new(start_bound: Bound<()>, end_bound: Bound<K>, value: RangeKind<V>) -> Self {
        KeySpan {
            value,
            end_bound,
            start_bound,
        }
    }

    /// Rebuilds the full key range of this span from its `start_key`.
    ///
    /// The lower bound is unbounded when either `start_key` is `Minimum` or
    /// the stored bound kind is `Unbounded`; otherwise it is the start key
    /// with the stored inclusive/exclusive kind.
    #[inline]
    fn range<'a>(&'a self, start_key: &'a StartKey<K>) -> impl RangeBounds<K> + 'a {
        let lower = match (start_key, self.start_bound) {
            (StartKey::Key(k), Bound::Included(_)) => Bound::Included(k),
            (StartKey::Key(k), Bound::Excluded(_)) => Bound::Excluded(k),
            (StartKey::Key(_), Bound::Unbounded) | (StartKey::Minimum, _) => Bound::Unbounded,
        };
        let upper = self.end_bound.as_ref();
        (lower, upper)
    }

    /// Returns the value carried by a `Set` span.
    ///
    /// # Panics
    ///
    /// Panics when the span is a `Deletion`.
    #[inline]
    const fn unwrap_value(&self) -> &V {
        if let RangeKind::Set(v) = &self.value {
            v
        } else {
            panic!("invoke unwrap value on deletion span")
        }
    }
}
/// Storage shared by every handle to one [`Memtable`]: one skip map per kind of record.
struct Inner<K, V> {
// Point entries, written by `insert` and `remove`.
skl: SkipMap<K, V>,
// Range deletions, written by `remove_range` and keyed by the span's start key.
range_del_skl: SkipMap<StartKey<K>, KeySpan<K, V>>,
// Range updates, written by `update_range` and keyed by the span's start key.
range_key_skl: SkipMap<StartKey<K>, KeySpan<K, V>>,
}
/// Versioned in-memory table holding point entries, range deletions and range updates.
///
/// The handle stores only an `Arc` to the underlying tables, so cloning it
/// produces another handle to the same data.
pub struct Memtable<K, V> {
// Reference-counted storage shared by all clones of this handle.
inner: Arc<Inner<K, V>>,
}
impl<K, V> Clone for Memtable<K, V> {
#[inline]
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<K, V> Default for Memtable<K, V> {
fn default() -> Self {
Self::new()
}
}
impl<K, V> Memtable<K, V> {
    /// Creates a memtable whose three underlying skip maps are all freshly constructed.
    #[inline]
    pub fn new() -> Self {
        let tables = Inner {
            skl: SkipMap::new(),
            range_del_skl: SkipMap::new(),
            range_key_skl: SkipMap::new(),
        };
        Self {
            inner: Arc::new(tables),
        }
    }

    /// Returns the largest `maximum_version` reported by the point table, the
    /// range-deletion table and the range-update table.
    #[inline]
    pub fn maximum_version(&self) -> u64 {
        let tables = &*self.inner;
        let points = tables.skl.maximum_version();
        let deletions = tables.range_del_skl.maximum_version();
        let updates = tables.range_key_skl.maximum_version();
        points.max(deletions).max(updates)
    }

    /// Returns the smallest `minimum_version` reported by the point table, the
    /// range-deletion table and the range-update table.
    #[inline]
    pub fn minimum_version(&self) -> u64 {
        let tables = &*self.inner;
        let points = tables.skl.minimum_version();
        let deletions = tables.range_del_skl.minimum_version();
        let updates = tables.range_key_skl.minimum_version();
        points.min(deletions).min(updates)
    }
}
impl<K, V> Memtable<K, V>
where
K: Ord,
{
#[inline]
pub fn contains_key<Q>(&self, version: u64, key: &Q) -> bool
where
Q: ?Sized + Comparable<K>,
{
self.get(version, key).is_some()
}
#[inline]
pub fn get<Q>(&self, version: u64, key: &Q) -> Option<Entry<'_, K, V>>
where
Q: ?Sized + Comparable<K>,
{
let ent = self.inner.skl.get(version, key)?;
match self.validate(version, ent) {
ControlFlow::Break(entry) => entry,
ControlFlow::Continue(_) => None,
}
}
#[inline]
pub fn first(&self, version: u64) -> Option<Entry<'_, K, V>> {
self.iter(version).next()
}
#[inline]
pub fn last(&self, version: u64) -> Option<Entry<'_, K, V>> {
self.iter(version).next_back()
}
#[inline]
pub fn upper_bound<Q>(&self, version: u64, key: Bound<&Q>) -> Option<Entry<'_, K, V>>
where
Q: ?Sized + Comparable<K>,
{
self
.range::<Q, _>(version, (Bound::Unbounded, key))
.next_back()
}
#[inline]
pub fn lower_bound<Q>(&self, version: u64, key: Bound<&Q>) -> Option<Entry<'_, K, V>>
where
Q: ?Sized + Comparable<K>,
{
self.range::<Q, _>(version, (key, Bound::Unbounded)).next()
}
#[inline]
pub fn iter(&self, version: u64) -> Iter<'_, K, V> {
Iter::new(version, self)
}
#[inline]
pub fn iter_points(&self, version: u64) -> PointIter<'_, K, V> {
PointIter::new(version, self)
}
#[inline]
pub fn iter_points_all_versions(&self, version: u64) -> IterAllPoints<'_, K, V> {
self.inner.skl.iter_all_versions(version)
}
#[inline]
pub fn iter_bulk_deletions(&self, version: u64) -> BulkDeletionIter<'_, K, V> {
BulkDeletionIter::new(version, self)
}
#[inline]
pub fn iter_bulk_deletions_all_versions(&self, version: u64) -> BulkDeletionIterAll<'_, K, V> {
BulkDeletionIterAll::new(version, self)
}
#[inline]
pub fn iter_bulk_updates(&self, version: u64) -> BulkUpdateIter<'_, K, V> {
BulkUpdateIter::new(version, self)
}
#[inline]
pub fn iter_bulk_updates_all_versions(&self, version: u64) -> BulkUpdateIterAll<'_, K, V> {
BulkUpdateIterAll::new(version, self)
}
#[inline]
pub fn range<Q, R>(&self, version: u64, r: R) -> Range<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
Range::new(version, self, r)
}
#[inline]
pub fn range_points<Q, R>(&self, version: u64, r: R) -> PointRange<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
PointRange::new(version, self, r)
}
#[inline]
pub fn range_all_points<Q, R>(&self, version: u64, r: R) -> RangeAllPoints<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
self.inner.skl.range_all_versions(version, r)
}
#[inline]
pub fn range_bulk_deletions<Q, R>(&self, version: u64, r: R) -> BulkDeletionRange<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
BulkDeletionRange::new(version, self, r)
}
#[inline]
pub fn range_bulk_deletions_all_versions<Q, R>(
&self,
version: u64,
r: R,
) -> BulkDeletionRangeAll<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
BulkDeletionRangeAll::new(version, self, r)
}
#[inline]
pub fn range_bulk_updates<Q, R>(&self, version: u64, r: R) -> BulkUpdateRange<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
BulkUpdateRange::new(version, self, r)
}
#[inline]
pub fn range_bulk_updates_all_versions<Q, R>(
&self,
version: u64,
r: R,
) -> BulkUpdateRangeAll<'_, K, V, Q, R>
where
R: RangeBounds<Q>,
Q: ?Sized + Comparable<K>,
{
BulkUpdateRangeAll::new(version, self, r)
}
fn validate<'a>(
&'a self,
query_version: u64,
ent: MapEntry<'a, K, V>,
) -> ControlFlow<Option<Entry<'a, K, V>>, MapEntry<'a, K, V>> {
let key = ent.key();
let version = ent.version();
let bound = Query::ref_cast(key);
let shadow = self
.inner
.range_del_skl
.range::<Query<K, K>, _>(query_version, ..=bound)
.any(|ent| {
let del_ent_version = ent.version();
(version <= del_ent_version && del_ent_version <= query_version)
&& ent.value().range(ent.key()).contains(key)
});
if shadow {
return ControlFlow::Continue(ent);
}
let range_ent = self
.inner
.range_key_skl
.range::<Query<K, K>, _>(query_version, ..=bound)
.filter(|ent| {
let range_ent_version = ent.version();
(version <= range_ent_version && range_ent_version <= query_version)
&& ent.value().range(ent.key()).contains(key)
})
.max_by_key(|e| e.version());
if let Some(range_ent) = range_ent {
let value = EntryValue::<K, V>::Range(range_ent);
ControlFlow::Break(Some(Entry::new(self, query_version, ent, value)))
} else {
let value = EntryValue::<K, V>::Point(ent.value());
ControlFlow::Break(Some(Entry::new(self, query_version, ent, value)))
}
}
}
impl<K, V> Memtable<K, V>
where
    K: Ord + Send + 'static,
    V: Send + 'static,
{
    /// Applies every operation in `batch`, in iteration order, at the same `version`.
    pub fn apply<B>(&self, version: u64, batch: B)
    where
        B: Iterator<Item = Operation<K, V>>,
    {
        batch.for_each(|op| match op {
            Operation::Insert { key, value } => self.insert(version, key, value),
            Operation::Remove(key) => self.remove(version, key),
            Operation::RemoveRange {
                start_bound,
                end_bound,
            } => self.remove_range(version, start_bound, end_bound),
            Operation::UpdateRange {
                start_bound,
                end_bound,
                value,
            } => self.update_range(version, start_bound, end_bound, value),
        });
    }

    /// Writes `value` for `key` at `version` into the point table
    /// (`insert_unchecked`; its result is discarded).
    pub fn insert(&self, version: u64, key: K, value: V) {
        let points = &self.inner.skl;
        points.insert_unchecked(version, key, value);
    }

    /// Records a removal of `key` at `version` in the point table
    /// (`remove_unchecked`; its result is discarded).
    pub fn remove(&self, version: u64, key: K) {
        let points = &self.inner.skl;
        points.remove_unchecked(version, key);
    }

    /// Records a range deletion from `start` to `end` at `version`.
    ///
    /// The span is stored in the range-deletion table under the start key;
    /// whether the start is inclusive, exclusive or unbounded is kept inside
    /// the span itself.
    pub fn remove_range(&self, version: u64, start: Bound<K>, end: Bound<K>) {
        // Remember only the *kind* of the lower bound; the key moves into `StartKey`.
        let start_kind: Bound<()> = match &start {
            Bound::Included(_) => Bound::Included(()),
            Bound::Excluded(_) => Bound::Excluded(()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let start_key = StartKey::new(start);
        let span = KeySpan::new(start_kind, end, RangeKind::Deletion);
        let deletions = &self.inner.range_del_skl;
        deletions.insert_unchecked(version, start_key, span);
    }

    /// Records a range update from `start` to `end` carrying `value` at `version`.
    ///
    /// The span is stored in the range-update table under the start key;
    /// whether the start is inclusive, exclusive or unbounded is kept inside
    /// the span itself.
    pub fn update_range(&self, version: u64, start: Bound<K>, end: Bound<K>, value: V) {
        // Remember only the *kind* of the lower bound; the key moves into `StartKey`.
        let start_kind: Bound<()> = match &start {
            Bound::Included(_) => Bound::Included(()),
            Bound::Excluded(_) => Bound::Excluded(()),
            Bound::Unbounded => Bound::Unbounded,
        };
        let start_key = StartKey::new(start);
        let span = KeySpan::new(start_kind, end, RangeKind::Set(value));
        let updates = &self.inner.range_key_skl;
        updates.insert_unchecked(version, start_key, span);
    }
}