use core::ops::{Bound, ControlFlow, RangeBounds};
use super::{sealed::Constructable, Error};
use bulk::*;
use either::Either;
use iter::*;
use ref_cast::RefCast;
use skl::{
among::Among,
generic::{
multiple_version::{
sync::{Entry as MapEntry, SkipMap},
Map,
},
Builder, Comparable, KeyRef, Type,
},
Allocator as _, Arena, KeyBuilder, VacantBuffer, ValueBuilder,
};
use std::sync::Arc;
pub use dbutils::types::MaybeStructured;
pub use entry::*;
mod bulk;
mod entry;
pub mod iter;
// One-byte tags named after the three `core::ops::Bound` variants.
// NOTE(review): nothing in the visible part of this file reads them; presumably the
// `bulk` / `entry` submodules use them when encoding range endpoints — confirm.
/// Tag value named after `Bound::Unbounded`.
const UNBOUNDED: u8 = 0;
/// Tag value named after `Bound::Included`.
const INCLUDED: u8 = 1;
/// Tag value named after `Bound::Excluded`.
const EXCLUDED: u8 = 2;
/// State shared by every clone of a [`Memtable`]: three multi-version skip lists.
///
/// Both constructors in this file create the two range lists from clones of the
/// point list's allocator.
struct Inner<K: ?Sized, V: ?Sized> {
/// Point entries; written by the `insert*` / `remove*` methods and read by `get`.
skl: SkipMap<K, V>,
/// Range deletions; written by `remove_range` and consulted by `validate`.
range_del_skl: SkipMap<PhantomRangeKey<K>, PhantomRangeDeletionSpan<K>>,
/// Range updates; written by `update_range` and consulted by `validate`.
range_key_skl: SkipMap<PhantomRangeKey<K>, PhantomRangeUpdateSpan<K, V>>,
}
/// A multi-version memtable holding point entries, range deletions and range updates.
///
/// The struct is a thin handle around an `Arc`, so cloning it yields another handle
/// to the same underlying skip lists.
pub struct Memtable<K: ?Sized, V: ?Sized> {
/// Reference-counted shared state.
inner: Arc<Inner<K, V>>,
}
impl<K: ?Sized, V: ?Sized> Clone for Memtable<K, V> {
#[inline]
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<K, V> Constructable for Memtable<K, V>
where
  K: ?Sized + 'static,
  V: ?Sized + 'static,
{
  /// Builds a memtable from `opts` through `Builder::alloc`.
  ///
  /// The point skip list is allocated first; the range-deletion and range-update
  /// skip lists are then created from clones of the point list's allocator.
  ///
  /// # Errors
  ///
  /// Returns the error from allocating the point list or from creating either
  /// range list.
  #[inline]
  fn construct(opts: super::Options) -> Result<Self, super::Error> {
    let points = Builder::new()
      .with_options(opts.to_skl_options())
      .alloc::<SkipMap<K, V>>()?;
    let arena = points.allocator().clone();

    let deletions =
      SkipMap::<PhantomRangeKey<K>, PhantomRangeDeletionSpan<K>>::create_from_allocator(
        arena.clone(),
      )?;
    let updates =
      SkipMap::<PhantomRangeKey<K>, PhantomRangeUpdateSpan<K, V>>::create_from_allocator(arena)?;

    let inner = Inner {
      skl: points,
      range_del_skl: deletions,
      range_key_skl: updates,
    };
    Ok(Self {
      inner: Arc::new(inner),
    })
  }

  /// Builds a memtable from `opts` through `Builder::map_anon`.
  ///
  /// Mirrors `construct`, except that the point list comes from `map_anon` and
  /// failures are reported as `std::io::Error`.
  ///
  /// # Errors
  ///
  /// Propagates the I/O error from `map_anon`. A failure while creating either
  /// range list is wrapped in an `std::io::Error` of kind `InvalidInput`.
  #[cfg(all(feature = "memmap", not(target_family = "wasm")))]
  fn map_anon(opts: super::Options) -> std::io::Result<Self> {
    // Non-capturing closure, hence `Copy`: it can be handed to both `map_err` calls.
    let invalid_input =
      |e: skl::error::Error| std::io::Error::new(std::io::ErrorKind::InvalidInput, e);

    let points = Builder::new()
      .with_options(opts.to_skl_options())
      .map_anon::<SkipMap<K, V>>()?;
    let arena = points.allocator().clone();

    let deletions =
      SkipMap::<PhantomRangeKey<K>, PhantomRangeDeletionSpan<K>>::create_from_allocator(
        arena.clone(),
      )
      .map_err(invalid_input)?;
    let updates =
      SkipMap::<PhantomRangeKey<K>, PhantomRangeUpdateSpan<K, V>>::create_from_allocator(arena)
        .map_err(invalid_input)?;

    let inner = Inner {
      skl: points,
      range_del_skl: deletions,
      range_key_skl: updates,
    };
    Ok(Self {
      inner: Arc::new(inner),
    })
  }
}
impl<K, V> Memtable<K, V>
where
  K: ?Sized + 'static,
  V: ?Sized + 'static,
{
  /// Returns the largest of the `maximum_version()` values reported by the point,
  /// range-deletion and range-update skip lists.
  #[inline]
  pub fn maximum_version(&self) -> u64 {
    let inner = &self.inner;
    let points = inner.skl.maximum_version();
    let deletions = inner.range_del_skl.maximum_version();
    let updates = inner.range_key_skl.maximum_version();
    points.max(deletions).max(updates)
  }

  /// Returns the smallest of the `minimum_version()` values reported by the point,
  /// range-deletion and range-update skip lists.
  #[inline]
  pub fn minimum_version(&self) -> u64 {
    let inner = &self.inner;
    let points = inner.skl.minimum_version();
    let deletions = inner.range_del_skl.minimum_version();
    let updates = inner.range_key_skl.minimum_version();
    points.min(deletions).min(updates)
  }

  /// Returns the reserved slice of the point skip list's allocator.
  #[inline]
  pub fn reserved_slice(&self) -> &[u8] {
    let arena = self.inner.skl.allocator();
    arena.reserved_slice()
  }

  /// Returns the reserved slice of the point skip list's allocator, mutably.
  ///
  /// # Safety
  ///
  /// This forwards to the allocator's own `reserved_slice_mut`; the caller must
  /// uphold whatever contract that method documents.
  /// NOTE(review): because a `&mut [u8]` is produced from `&self`, callers presumably
  /// must ensure no other reference to the reserved slice is live — confirm against
  /// the allocator's documentation.
  #[allow(clippy::mut_from_ref)]
  #[inline]
  pub unsafe fn reserved_slice_mut(&self) -> &mut [u8] {
    let arena = self.inner.skl.allocator();
    arena.reserved_slice_mut()
  }
}
impl<K, V> Memtable<K, V>
where
  K: ?Sized + Type + 'static,
  for<'a> K::Ref<'a>: KeyRef<'a, K>,
  V: ?Sized + Type + 'static,
{
  /// Returns `true` when [`get`](Self::get) finds an entry for `key` at `version`.
  #[inline]
  pub fn contains_key<'a, Q>(&'a self, version: u64, key: &Q) -> bool
  where
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    let found = self.get(version, key);
    found.is_some()
  }

  /// Looks up `key` at `version`.
  ///
  /// The key is first fetched from the point skip list, then checked against the
  /// range deletions and range updates by `validate`.
  ///
  /// Returns `None` when the point list has no entry for the key or when the entry
  /// is shadowed by a range deletion. If a range update covers the key, the
  /// returned entry carries that range update's value instead of the point value.
  #[inline]
  pub fn get<'a, Q>(&'a self, version: u64, key: &Q) -> Option<Entry<'a, K, V>>
  where
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    let point = self.inner.skl.get(version, key)?;
    if let ControlFlow::Break(resolved) = self.validate(version, point) {
      resolved
    } else {
      // `Continue` means the point entry is hidden by a range deletion.
      None
    }
  }

  /// Returns the first entry yielded by [`iter`](Self::iter) at `version`.
  #[inline]
  pub fn first(&self, version: u64) -> Option<Entry<'_, K, V>> {
    let mut entries = self.iter(version);
    entries.next()
  }

  /// Returns the last entry yielded by [`iter`](Self::iter) at `version`.
  #[inline]
  pub fn last(&self, version: u64) -> Option<Entry<'_, K, V>> {
    let mut entries = self.iter(version);
    entries.next_back()
  }

  /// Returns the last entry of the range `(Unbounded, key)` at `version`, i.e. the
  /// greatest entry that still satisfies the upper bound `key`.
  #[inline]
  pub fn upper_bound<'a, Q>(&'a self, version: u64, key: Bound<&Q>) -> Option<Entry<'a, K, V>>
  where
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    let mut entries = self.range::<Q, _>(version, (Bound::Unbounded, key));
    entries.next_back()
  }

  /// Returns the first entry of the range `(key, Unbounded)` at `version`, i.e. the
  /// smallest entry that still satisfies the lower bound `key`.
  #[inline]
  pub fn lower_bound<'a, Q>(&'a self, version: u64, key: Bound<&Q>) -> Option<Entry<'a, K, V>>
  where
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    let mut entries = self.range::<Q, _>(version, (key, Bound::Unbounded));
    entries.next()
  }

  /// Creates an [`Iter`] over this memtable at `version`.
  #[inline]
  pub fn iter(&self, version: u64) -> Iter<'_, K, V> {
    Iter::new(version, self)
  }

  /// Creates a [`PointIter`] over this memtable at `version`.
  #[inline]
  pub fn iter_points(&self, version: u64) -> PointIter<'_, K, V> {
    PointIter::new(version, self)
  }

  /// Returns the point skip list's `iter_all_versions(version)` iterator.
  #[inline]
  pub fn iter_points_all_versions(&self, version: u64) -> IterAllPoints<'_, K, V> {
    let points = &self.inner.skl;
    points.iter_all_versions(version)
  }

  /// Creates a [`BulkDeletionIter`] over this memtable at `version`.
  #[inline]
  pub fn iter_bulk_deletions(&self, version: u64) -> BulkDeletionIter<'_, K, V> {
    BulkDeletionIter::new(version, self)
  }

  /// Creates a [`BulkDeletionIterAll`] over this memtable at `version`.
  #[inline]
  pub fn iter_bulk_deletions_all_versions(&self, version: u64) -> BulkDeletionIterAll<'_, K, V> {
    BulkDeletionIterAll::new(version, self)
  }

  /// Creates a [`BulkUpdateIter`] over this memtable at `version`.
  #[inline]
  pub fn iter_bulk_updates(&self, version: u64) -> BulkUpdateIter<'_, K, V> {
    BulkUpdateIter::new(version, self)
  }

  /// Creates a [`BulkUpdateIterAll`] over this memtable at `version`.
  #[inline]
  pub fn iter_bulk_updates_all_versions(&self, version: u64) -> BulkUpdateIterAll<'_, K, V> {
    BulkUpdateIterAll::new(version, self)
  }

  /// Creates a [`Range`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range<'a, Q, R>(&'a self, version: u64, r: R) -> Range<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    Range::new(version, self, r)
  }

  /// Creates a [`PointRange`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range_points<'a, Q, R>(&'a self, version: u64, r: R) -> PointRange<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    PointRange::new(version, self, r)
  }

  /// Returns the point skip list's `range_all_versions(version, r)` iterator.
  #[inline]
  pub fn range_all_points<'a, Q, R>(&'a self, version: u64, r: R) -> RangeAllPoints<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    let points = &self.inner.skl;
    points.range_all_versions(version, r)
  }

  /// Creates a [`BulkDeletionRange`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range_bulk_deletions<'a, Q, R>(
    &'a self,
    version: u64,
    r: R,
  ) -> BulkDeletionRange<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    BulkDeletionRange::new(version, self, r)
  }

  /// Creates a [`BulkDeletionRangeAll`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range_bulk_deletions_all_versions<'a, Q, R>(
    &'a self,
    version: u64,
    r: R,
  ) -> BulkDeletionRangeAll<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    BulkDeletionRangeAll::new(version, self, r)
  }

  /// Creates a [`BulkUpdateRange`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range_bulk_updates<'a, Q, R>(
    &'a self,
    version: u64,
    r: R,
  ) -> BulkUpdateRange<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    BulkUpdateRange::new(version, self, r)
  }

  /// Creates a [`BulkUpdateRangeAll`] over this memtable at `version`, restricted to `r`.
  #[inline]
  pub fn range_bulk_updates_all_versions<'a, Q, R>(
    &'a self,
    version: u64,
    r: R,
  ) -> BulkUpdateRangeAll<'a, K, V, Q, R>
  where
    R: RangeBounds<Q>,
    Q: ?Sized + Comparable<K::Ref<'a>>,
  {
    BulkUpdateRangeAll::new(version, self, r)
  }
}
impl<K, V> Memtable<K, V>
where
  K: ?Sized + Type + 'static,
  for<'a> K::Ref<'a>: KeyRef<'a, K>,
  V: ?Sized + Type + 'static,
{
  /// Inserts `key` -> `value` at `version` into the point skip list.
  ///
  /// # Errors
  ///
  /// Returns whatever the underlying skip list's `insert` reports: a key encoding
  /// error, a value encoding error, or a skip-list error.
  pub fn insert<'a, 'b: 'a>(
    &'a self,
    version: u64,
    key: impl Into<MaybeStructured<'b, K>>,
    value: impl Into<MaybeStructured<'b, V>>,
  ) -> Result<(), Among<K::Error, V::Error, skl::error::Error>> {
    self.inner.skl.insert(version, key, value).map(|_| ())
  }

  /// Inserts `key` at `version` into the point skip list, letting `value_builder`
  /// write the value into the buffer handed to it.
  ///
  /// # Errors
  ///
  /// Returns a key encoding error, the builder's own error `E`, or a skip-list error.
  pub fn insert_with_value_builder<'a, E>(
    &'a self,
    version: u64,
    key: impl Into<MaybeStructured<'a, K>>,
    value_builder: ValueBuilder<impl FnOnce(&mut VacantBuffer<'a>) -> Result<usize, E>>,
  ) -> Result<(), Among<K::Error, E, skl::error::Error>> {
    self
      .inner
      .skl
      .insert_with_value_builder(version, key, value_builder)
      .map(|_| ())
  }

  /// Inserts an entry at `version` into the point skip list, letting `key_builder`
  /// and `value_builder` write the key and the value respectively.
  ///
  /// # Errors
  ///
  /// Returns the key builder's error, the value builder's error, or a skip-list error.
  #[allow(single_use_lifetimes)]
  pub fn insert_with_builders<'a, KE, VE>(
    &'a self,
    version: u64,
    key_builder: KeyBuilder<impl FnOnce(&mut VacantBuffer<'a>) -> Result<usize, KE>>,
    value_builder: ValueBuilder<impl FnOnce(&mut VacantBuffer<'a>) -> Result<usize, VE>>,
  ) -> Result<(), Among<KE, VE, skl::error::Error>> {
    self
      .inner
      .skl
      .insert_with_builders(version, key_builder, value_builder)
      .map(|_| ())
  }

  /// Removes `key` at `version` by calling the point skip list's `get_or_remove`.
  ///
  /// Whatever that call returns on success is discarded.
  ///
  /// # Errors
  ///
  /// Returns a key encoding error or a skip-list error.
  pub fn remove<'a>(
    &'a self,
    version: u64,
    key: impl Into<MaybeStructured<'a, K>>,
  ) -> Result<(), Either<K::Error, Error>> {
    self.inner.skl.get_or_remove(version, key).map(|_| ())
  }

  /// Like [`remove`](Self::remove), but `key_builder` writes the key.
  ///
  /// # Errors
  ///
  /// Returns the key builder's error or a skip-list error.
  #[inline]
  pub fn remove_with_builder<'a, E>(
    &'a self,
    version: u64,
    key_builder: KeyBuilder<impl FnOnce(&mut VacantBuffer<'a>) -> Result<usize, E>>,
  ) -> Result<(), Either<E, Error>> {
    self
      .inner
      .skl
      .get_or_remove_with_builder(version, key_builder)
      .map(|_| ())
  }

  /// Records a range deletion covering `range` at `version`.
  ///
  /// The start bound is encoded as the key and the end bound as the value of a new
  /// entry in the range-deletion skip list. Point entries are not touched here;
  /// the deletion is applied when entries are read (see `validate`).
  ///
  /// # Errors
  ///
  /// Returns `Either::Left` when encoding either bound fails and `Either::Right`
  /// for a skip-list error.
  pub fn remove_range<'a, Q, R>(
    &'a self,
    version: u64,
    range: R,
  ) -> Result<(), Either<K::Error, Error>>
  where
    R: RangeBounds<Q> + 'a,
    Q: ?Sized + IntoMaybeStructured<K> + 'a,
  {
    // Convert each bound once and use it directly.
    let start_bound = range.start_bound().map(IntoMaybeStructured::to_maybe);
    let end_bound = range.end_bound().map(IntoMaybeStructured::to_maybe);

    let start = RangeKeyEncoder::new(start_bound);
    let span = RangeDeletionSpan::new(end_bound);
    let kb = |buf: &mut VacantBuffer<'_>| start.encode(buf);
    let vb = |buf: &mut VacantBuffer<'_>| span.encode(buf);
    self
      .inner
      .range_del_skl
      .insert_with_builders(
        version,
        KeyBuilder::new(start.encoded_len, kb),
        ValueBuilder::new(span.encoded_len, vb),
      )
      .map(|_| ())
      .map_err(|e| match e {
        // Both the key (start bound) and the value (end bound) are keys of `K`,
        // so both encoding failures surface as the key error.
        Among::Left(e) => Either::Left(e),
        Among::Middle(e) => Either::Left(e),
        Among::Right(e) => Either::Right(e),
      })
  }

  /// Records a range update assigning `value` to every key in `range` at `version`.
  ///
  /// The start bound is encoded as the key, and the end bound together with
  /// `value` as the value, of a new entry in the range-update skip list. Point
  /// entries are not touched here; the update is applied when entries are read
  /// (see `validate`).
  ///
  /// # Errors
  ///
  /// Returns `Among::Left` when encoding a bound fails, `Among::Middle` when
  /// encoding `value` fails, and `Among::Right` for a skip-list error.
  pub fn update_range<'a, Q, R>(
    &self,
    version: u64,
    range: R,
    value: impl Into<MaybeStructured<'a, V>>,
  ) -> Result<(), Among<K::Error, V::Error, Error>>
  where
    R: RangeBounds<Q> + 'a,
    Q: ?Sized + IntoMaybeStructured<K> + 'a,
  {
    // Convert each bound once and use it directly.
    let start_bound = range.start_bound().map(IntoMaybeStructured::to_maybe);
    let end_bound = range.end_bound().map(IntoMaybeStructured::to_maybe);

    let start = RangeKeyEncoder::new(start_bound);
    let span = RangeUpdateSpan::new(end_bound, value.into());
    let kb = |buf: &mut VacantBuffer<'_>| start.encode(buf);
    let vb = |buf: &mut VacantBuffer<'_>| span.encode(buf);
    self
      .inner
      .range_key_skl
      .insert_with_builders(
        version,
        KeyBuilder::new(start.encoded_len, kb),
        ValueBuilder::new(span.encoded_len, vb),
      )
      .map(|_| ())
      .map_err(|e| match e {
        // The span encoder can fail on the end bound or on the value; split that
        // `Either` back into the left/middle slots of the returned `Among`.
        Among::Middle(e) => Among::from_either_to_left_middle(e),
        Among::Left(e) => Among::Left(e),
        Among::Right(e) => Among::Right(e),
      })
  }
}
impl<K, V> Memtable<K, V>
where
  K: ?Sized + Type + 'static,
  for<'a> K::Ref<'a>: KeyRef<'a, K>,
  V: ?Sized + Type + 'static,
{
  /// Resolves a point entry against the range deletions and range updates visible
  /// at `query_version`.
  ///
  /// A range entry is considered only if its version `v` satisfies
  /// `ent.version() <= v <= query_version` and its span contains the point's key.
  ///
  /// Returns:
  /// - `ControlFlow::Continue(ent)` when some range deletion matches, i.e. the point
  ///   entry is shadowed and the caller should move on;
  /// - `ControlFlow::Break(Some(entry))` otherwise, where the entry's value is the
  ///   matching range update with the highest version if there is one, or the point
  ///   entry's own value if there is none.
  fn validate<'a>(
    &'a self,
    query_version: u64,
    ent: MapEntry<'a, K, V>,
  ) -> ControlFlow<Option<Entry<'a, K, V>>, MapEntry<'a, K, V>> {
    let key = ent.key();
    let point_version = ent.version();
    let upper = Query::ref_cast(key);

    // A range entry can affect the point only if it is at least as new as the
    // point and not newer than the query.
    let in_window = |candidate: u64| point_version <= candidate && candidate <= query_version;

    let deleted = self
      .inner
      .range_del_skl
      .range::<Query<K::Ref<'_>>, _>(query_version, ..=upper)
      .any(|del| {
        let del_version = del.version();
        let span = del.value();
        in_window(del_version) && span.contains(del.key(), key)
      });
    if deleted {
      return ControlFlow::Continue(ent);
    }

    let newest_update = self
      .inner
      .range_key_skl
      .range::<Query<K::Ref<'_>>, _>(query_version, ..=upper)
      .filter(|upd| {
        let upd_version = upd.version();
        let span = upd.value();
        in_window(upd_version) && span.contains(upd.key(), key)
      })
      .max_by_key(|upd| upd.version());

    let value = match newest_update {
      Some(update) => EntryValue::<K, V>::Range(update),
      None => EntryValue::<K, V>::Point(*ent.value()),
    };
    ControlFlow::Break(Some(Entry::new(self, query_version, ent, value)))
  }
}
use sealed::IntoMaybeStructured;
mod sealed {
  use super::MaybeStructured;

  /// Conversion of a borrowed range bound into a [`MaybeStructured`] view of `T`.
  ///
  /// The trait lives in a private module, so code outside this crate cannot
  /// implement it.
  pub trait IntoMaybeStructured<T: ?Sized> {
    /// Returns a [`MaybeStructured`] borrowing from `self`.
    fn to_maybe(&self) -> MaybeStructured<'_, T>;
  }

  /// A value of type `T` converts through `From<&T>`.
  impl<T: ?Sized> IntoMaybeStructured<T> for T {
    #[inline]
    fn to_maybe(&self) -> MaybeStructured<'_, T> {
      self.into()
    }
  }

  /// A `&T` converts through the same `From<&T>` after copying the reference out.
  impl<T: ?Sized> IntoMaybeStructured<T> for &T {
    #[inline]
    fn to_maybe(&self) -> MaybeStructured<'_, T> {
      let inner: &T = *self;
      inner.into()
    }
  }

  /// A `MaybeStructured` is already in the target form and is simply copied.
  impl<T: ?Sized> IntoMaybeStructured<T> for MaybeStructured<'_, T> {
    #[inline]
    fn to_maybe(&self) -> MaybeStructured<'_, T> {
      *self
    }
  }
}