use std::borrow::Borrow;
use std::convert::TryInto;
use std::ops::{Bound, RangeBounds};
use std::{
fs,
mem::{self, ManuallyDrop},
sync::atomic::AtomicBool,
sync::atomic::Ordering::SeqCst,
};
use crate::{error::Error, vlog};
/// Crate-wide result type carrying this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Boxed iterator over index entries, as returned by the [`Reader`] iteration methods.
pub type IndexIter<'a, K, V> = Box<dyn Iterator<Item = Result<Entry<K, V>>> + 'a>;
/// Boxed iterator over [`ScanEntry`] items, as returned by [`FullScan::full_scan`].
pub(crate) type ScanIter<'a, K, V> = Box<dyn Iterator<Item = Result<ScanEntry<K, V>>> + 'a>;
/// Values that can be versioned by storing differences between versions.
///
/// Within this module a delta is computed as `newer.diff(&older)` and an older
/// version is reconstructed as `newer.merge(&delta)`.
pub trait Diff: Sized {
    /// Difference type. It must convert from and into `Self`; this module uses
    /// `D::from(value)` to store an entire value as a delta (e.g. before a delete).
    type D: Clone + From<Self> + Into<Self> + Footprint;
    /// Compute the difference between `self` (newer) and `old` (older).
    fn diff(&self, old: &Self) -> Self::D;
    /// Apply `delta` to `self`, producing the version `delta` was computed against.
    fn merge(&self, delta: &Self::D) -> Self;
}
/// Memory accounting for keys, values and diffs.
pub trait Footprint {
    /// Approximate memory used by `self`. This module adds the result to
    /// `mem::size_of` figures, so it is expected to be in bytes.
    fn footprint(&self) -> isize;
}
/// Write operations that carry an explicit, caller-supplied `index`.
///
/// NOTE(review): each method returns a pair; the meaning of the `Option<u64>`
/// component (presumably a sequence number) is defined by implementors and is
/// not visible here — confirm before relying on it.
pub trait WalWriter<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Set `key` to `value` at `index`.
    fn set_index(
        &mut self,
        key: K,
        value: V,
        index: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>);
    /// Compare-and-set `key` to `value` at `index`, conditioned on `cas`.
    fn set_cas_index(
        &mut self,
        key: K,
        value: V,
        cas: u64,
        index: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>);
    /// Delete `key` at `index`.
    fn delete_index<Q>(
        &mut self,
        key: &Q,
        index: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>)
    where
        K: Borrow<Q>,
        Q: ToOwned<Owned = K> + Ord + ?Sized;
}
/// Operations that apply a mutation at an explicit `index` and return the
/// resulting entry; presumably used to replay logged operations (TODO confirm).
pub trait Replay<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Set `key` to `value` at `index`.
    fn set_index(
        &mut self,
        key: K,
        value: V,
        index: u64,
    ) -> Result<Entry<K, V>>;
    /// Compare-and-set `key` to `value` at `index`, conditioned on `cas`.
    fn set_cas_index(
        &mut self,
        key: K,
        value: V,
        cas: u64,
        index: u64,
    ) -> Result<Entry<K, V>>;
    /// Delete `key` at `index`.
    fn delete_index(&mut self, key: K, index: u64) -> Result<Entry<K, V>>;
}
/// An index that can hand out reader and writer handles.
pub trait Index<K, V>: Sized + Footprint
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Writer handle type.
    type W: Writer<K, V>;
    /// Reader handle type.
    type R: Reader<K, V>;
    /// Create a new index instance from `self`; semantics are implementor-defined.
    fn make_new(&self) -> Result<Box<Self>>;
    /// Obtain a reader handle.
    fn to_reader(&mut self) -> Result<Self::R>;
    /// Obtain a writer handle.
    fn to_writer(&mut self) -> Result<Self::W>;
}
/// Read access to an index.
///
/// Each lookup/iteration method has a `*_with_versions` twin with the same
/// signature; presumably the twin also returns older versions (deltas) of each
/// entry — the exact difference is implementor-defined (TODO confirm).
pub trait Reader<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Look up a single key.
    fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized;
    /// Iterate over all entries.
    fn iter(&self) -> Result<IndexIter<K, V>>;
    /// Iterate over entries whose keys fall in `range`.
    fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized;
    /// Iterate over entries whose keys fall in `range`, in reverse order.
    fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized;
    /// Versioned counterpart of [`Reader::get`].
    fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized;
    /// Versioned counterpart of [`Reader::iter`].
    fn iter_with_versions(&self) -> Result<IndexIter<K, V>>;
    /// Versioned counterpart of [`Reader::range`].
    fn range_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized;
    /// Versioned counterpart of [`Reader::reverse`].
    fn reverse_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized;
}
/// Write access to an index.
///
/// NOTE(review): the `Option<Entry>` returned by each method is
/// implementor-defined — presumably the entry as it was before the write.
pub trait Writer<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Set key `k` to value `v`.
    fn set(&mut self, k: K, v: V) -> Result<Option<Entry<K, V>>>;
    /// Compare-and-set key `k` to value `v`, conditioned on `cas`.
    fn set_cas(&mut self, k: K, v: V, cas: u64) -> Result<Option<Entry<K, V>>>;
    /// Delete `key`.
    fn delete<Q>(&mut self, key: &Q) -> Result<Option<Entry<K, V>>>
    where
        K: Borrow<Q>,
        Q: ToOwned<Owned = K> + Ord + ?Sized;
}
/// Persistence hooks; exact semantics are implementor-defined.
pub trait Durable {
    /// Commit pending state.
    fn commit(&mut self);
    /// Compact stored state.
    fn compact(&mut self);
}
/// Binary encoding into, and decoding from, byte buffers.
pub trait Serialize: Sized {
    /// Encode `self` into `buf`; presumably returns the number of bytes
    /// written (TODO confirm with implementors).
    fn encode(&self, buf: &mut Vec<u8>) -> usize;
    /// Decode into `self` from `buf`; presumably returns the number of bytes
    /// consumed (TODO confirm with implementors).
    fn decode(&mut self, buf: &[u8]) -> Result<usize>;
}
/// Scan of an index starting at key bound `from`, restricted to seqnos within
/// `within` (presumably the seqno range of versions to yield — TODO confirm).
pub(crate) trait FullScan<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff + From<<V as Diff>::D>,
{
    /// Start a full scan; items are [`ScanEntry`] values.
    fn full_scan<G>(&self, from: Bound<K>, within: G) -> Result<ScanIter<K, V>>
    where
        G: Clone + RangeBounds<u64>;
}
/// Payload of a `Delta`: one older version of an entry.
#[derive(Clone)]
pub(crate) enum InnerDelta<V>
where
    V: Clone + Diff,
{
    /// Upserted version: the stored difference and that version's seqno.
    U { delta: vlog::Delta<V>, seqno: u64 },
    /// Deleted version: only the seqno at which the entry was deleted.
    D { seqno: u64 },
}
/// One older version of an entry, stored in `Entry::deltas` (newest first).
#[derive(Clone)]
pub(crate) struct Delta<V>
where
    V: Clone + Diff,
{
    // Private payload; read through `AsRef<InnerDelta<V>>` or the accessors.
    data: InnerDelta<V>,
}
impl<V> Delta<V>
where
    V: Clone + Diff,
{
    /// Build a delta for an upserted version `seqno` whose content is `delta`.
    pub(crate) fn new_upsert(delta: vlog::Delta<V>, seqno: u64) -> Delta<V> {
        let data = InnerDelta::U { delta, seqno };
        Self { data }
    }

    /// Build a delta for a version that was deleted at `seqno`.
    pub(crate) fn new_delete(seqno: u64) -> Delta<V> {
        let data = InnerDelta::D { seqno };
        Self { data }
    }
}
impl<V> AsRef<InnerDelta<V>> for Delta<V>
where
V: Clone + Diff,
{
fn as_ref(&self) -> &InnerDelta<V> {
&self.data
}
}
impl<V> Delta<V>
where
    V: Clone + Diff,
{
    /// Native difference of an upsert delta (whatever `to_native_delta`
    /// yields); always `None` for a delete delta.
    #[allow(dead_code)] // not called from every build configuration
    pub(crate) fn to_diff(&self) -> Option<<V as Diff>::D> {
        if let InnerDelta::U { delta, .. } = &self.data {
            delta.to_native_delta()
        } else {
            None
        }
    }

    /// Consuming variant of [`Delta::to_diff`].
    #[allow(dead_code)] // not called from every build configuration
    pub(crate) fn into_diff(self) -> Option<<V as Diff>::D> {
        if let InnerDelta::U { delta, .. } = self.data {
            delta.into_native_delta()
        } else {
            None
        }
    }

    /// Seqno of the version this delta describes.
    pub(crate) fn to_seqno(&self) -> u64 {
        match &self.data {
            InnerDelta::U { seqno, .. } | InnerDelta::D { seqno } => *seqno,
        }
    }

    /// `(is_upsert, seqno)` for this delta.
    #[allow(dead_code)] // not called from every build configuration
    pub(crate) fn to_seqno_state(&self) -> (bool, u64) {
        let is_upsert = matches!(self.data, InnerDelta::U { .. });
        (is_upsert, self.to_seqno())
    }

    /// Size of the `Delta` struct plus, for upserts, the diff payload footprint.
    pub(crate) fn footprint(&self) -> isize {
        let base: isize = mem::size_of::<Delta<V>>().try_into().unwrap();
        let payload = if let InnerDelta::U { delta, .. } = &self.data {
            delta.diff_footprint()
        } else {
            0
        };
        base + payload
    }

    /// Split an upsert delta into its difference and seqno; `None` for deletes.
    #[allow(dead_code)] // not called from every build configuration
    pub(crate) fn into_upserted(self) -> Option<(vlog::Delta<V>, u64)> {
        if let InnerDelta::U { delta, seqno } = self.data {
            Some((delta, seqno))
        } else {
            None
        }
    }

    /// Seqno of a delete delta; `None` for upserts.
    #[allow(dead_code)] // not called from every build configuration
    pub(crate) fn into_deleted(self) -> Option<u64> {
        if let InnerDelta::D { seqno } = self.data {
            Some(seqno)
        } else {
            None
        }
    }

    /// True when this is an upsert whose difference is still a
    /// `vlog::Delta::Reference` (not yet loaded).
    pub(crate) fn is_reference(&self) -> bool {
        matches!(
            self.data,
            InnerDelta::U {
                delta: vlog::Delta::Reference { .. },
                ..
            }
        )
    }

    /// True when this delta records a delete.
    #[cfg(test)]
    pub(crate) fn is_deleted(&self) -> bool {
        matches!(self.data, InnerDelta::D { .. })
    }
}
/// The latest version of an entry.
pub(crate) enum Value<V>
where
    V: Clone + Diff,
{
    /// Upserted value.
    U {
        // Boxed value; freed manually by `Drop` only when `is_reclaim` is true.
        value: ManuallyDrop<Box<vlog::Value<V>>>,
        // Whether this `Value` frees `value` on drop. `mvcc_clone(false)`
        // sets it to false on the source so the box can be shared with the clone.
        is_reclaim: AtomicBool,
        seqno: u64,
    },
    /// Deleted at `seqno`.
    D {
        seqno: u64,
    },
}
impl<V> Clone for Value<V>
where
    V: Clone + Diff,
{
    /// Clone this version.
    ///
    /// For an upsert the boxed value is deep-copied: cloning a
    /// `ManuallyDrop<Box<_>>` clones the box's contents into a new allocation.
    /// The clone is therefore the sole owner of that allocation and must free it
    /// on drop, whatever `self`'s own `is_reclaim` flag says.
    fn clone(&self) -> Value<V> {
        match self {
            Value::U { value, seqno, .. } => Value::U {
                value: value.clone(),
                // Copying `self`'s flag here would leak the fresh box whenever
                // `self` is a non-owning alias (is_reclaim == false), as left
                // behind by `Value::mvcc_clone(false)`.
                is_reclaim: AtomicBool::new(true),
                seqno: *seqno,
            },
            Value::D { seqno } => Value::D { seqno: *seqno },
        }
    }
}
impl<V> Drop for Value<V>
where
    V: Clone + Diff,
{
    /// Free the boxed value of an upsert, but only if this `Value` owns it.
    fn drop(&mut self) {
        if let Value::U {
            value, is_reclaim, ..
        } = self
        {
            if is_reclaim.load(SeqCst) {
                // SAFETY: `value` is dropped at most once, here, and only by the
                // `Value` whose reclaim flag is set; non-owning aliases skip it.
                unsafe { ManuallyDrop::drop(value) };
            }
        }
    }
}
impl<V> Value<V>
where
    V: Clone + Diff,
{
    /// Create an upsert version at `seqno` that owns the boxed value `v`.
    pub(crate) fn new_upsert(v: Box<vlog::Value<V>>, seqno: u64) -> Value<V> {
        Value::U {
            value: ManuallyDrop::new(v),
            is_reclaim: AtomicBool::new(true),
            seqno,
        }
    }

    /// Create an owning upsert version at `seqno` from a native value.
    pub(crate) fn new_upsert_value(value: V, seqno: u64) -> Value<V> {
        let value = Box::new(vlog::Value::new_native(value));
        Value::new_upsert(value, seqno)
    }

    /// Create a delete version at `seqno`.
    pub(crate) fn new_delete(seqno: u64) -> Value<V> {
        Value::D { seqno }
    }

    /// Clone for MVCC.
    ///
    /// With `copyval == true`, or for deletes, this is a regular `clone`.
    /// With `copyval == false`, an upsert's box is *shared*: the returned
    /// `Value` aliases the same allocation and takes over `self`'s
    /// responsibility for freeing it, while `self` becomes non-owning.
    pub(crate) fn mvcc_clone(&self, copyval: bool) -> Value<V> {
        match self {
            Value::U {
                value,
                seqno,
                is_reclaim,
            } if !copyval => {
                // Hand over ownership only if we had it. If `self` is already
                // a non-owning alias, marking the new alias as owner would give
                // the allocation two owners and a double free.
                let owned = is_reclaim.swap(false, SeqCst);
                let ptr = value.as_ref() as *const vlog::Value<V> as *mut vlog::Value<V>;
                // SAFETY: `ptr` comes from a live `Box` held by `self`. The swap
                // above guarantees at most one of the aliasing `Value`s frees it.
                // Callers must keep the owning `Value` alive while the
                // non-owning one is still read; this function cannot enforce that.
                let value = unsafe { Box::from_raw(ptr) };
                Value::U {
                    value: ManuallyDrop::new(value),
                    is_reclaim: AtomicBool::new(owned),
                    seqno: *seqno,
                }
            }
            val => val.clone(),
        }
    }

    /// Native value of an upsert (via `vlog::Value::to_native_value`); `None` for deletes.
    pub(crate) fn to_native_value(&self) -> Option<V> {
        match self {
            Value::U { value, .. } => value.to_native_value(),
            Value::D { .. } => None,
        }
    }

    /// Seqno of this version.
    pub(crate) fn to_seqno(&self) -> u64 {
        match self {
            Value::U { seqno, .. } => *seqno,
            Value::D { seqno } => *seqno,
        }
    }

    /// True for a delete version.
    pub(crate) fn is_deleted(&self) -> bool {
        match self {
            Value::U { .. } => false,
            Value::D { .. } => true,
        }
    }

    /// True for an upsert whose value is still a vlog reference (not loaded).
    pub(crate) fn is_reference(&self) -> bool {
        match self {
            Value::U { value, .. } => value.is_reference(),
            Value::D { .. } => false,
        }
    }
}
impl<V> Value<V>
where
    V: Clone + Diff + Footprint,
{
    /// Size of the `Value` enum plus, for upserts, the boxed value's footprint.
    pub(crate) fn footprint(&self) -> isize {
        let base: isize = mem::size_of::<Value<V>>().try_into().unwrap();
        let payload = if let Value::U { value, .. } = self {
            value.value_footprint()
        } else {
            0
        };
        base + payload
    }
}
/// A key together with its latest version and its older versions.
#[derive(Clone)]
pub struct Entry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    key: K,
    // Latest version.
    value: Value<V>,
    // Older versions, newest first (new deltas are inserted at index 0).
    deltas: Vec<Delta<V>>,
}
impl<K, V> Entry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Key size limit: 1024^3.
    pub const KEY_SIZE_LIMIT: usize = 1024 * 1024 * 1024;
    /// Diff size limit: 1024^4.
    /// NOTE(review): exceeds `usize::MAX` on 32-bit targets, so using it there
    /// likely fails const evaluation.
    pub const DIFF_SIZE_LIMIT: usize = 1024 * 1024 * 1024 * 1024;
    /// Value size limit: 1024^4 (same 32-bit caveat as `DIFF_SIZE_LIMIT`).
    pub const VALUE_SIZE_LIMIT: usize = 1024 * 1024 * 1024 * 1024;

    /// Create an entry with a single version and no history.
    pub(crate) fn new(key: K, value: Value<V>) -> Entry<K, V> {
        Self {
            key,
            value,
            deltas: Vec::new(),
        }
    }

    /// Clone the entry, cloning its latest version via `Value::mvcc_clone`.
    pub(crate) fn mvcc_clone(&self, copyval: bool) -> Entry<K, V> {
        let key = self.key.clone();
        let value = self.value.mvcc_clone(copyval);
        let deltas = self.deltas.clone();
        Self { key, value, deltas }
    }

    /// Replace the entry's version history.
    pub(crate) fn set_deltas(&mut self, deltas: Vec<Delta<V>>) {
        self.deltas = deltas;
    }
}
impl<K, V> Entry<K, V>
where
K: Clone + Ord,
V: Clone + Diff + Footprint,
{
pub(crate) fn prepend_version(&mut self, nentry: Self, lsm: bool) -> isize {
if lsm {
self.prepend_version_lsm(nentry)
} else {
self.prepend_version_nolsm(nentry)
}
}
fn prepend_version_nolsm(&mut self, nentry: Self) -> isize {
let size = self.value.footprint();
self.value = nentry.value.clone();
(self.value.footprint() - size).try_into().unwrap()
}
fn prepend_version_lsm(&mut self, nentry: Self) -> isize {
let delta = match &self.value {
Value::D { seqno } => Delta::new_delete(*seqno),
Value::U { value, seqno, .. } if !value.is_reference() => {
match &nentry.value {
Value::D { .. } => {
let value = value.to_native_value().unwrap();
let diff: <V as Diff>::D = From::from(value);
let dlt = vlog::Delta::new_native(diff);
Delta::new_upsert(dlt, *seqno)
}
Value::U { value: nvalue, .. } => {
let value = value.to_native_value().unwrap();
let dff = nvalue.to_native_value().unwrap().diff(&value);
let dlt = vlog::Delta::new_native(dff);
Delta::new_upsert(dlt, *seqno)
}
}
}
Value::U { .. } => unreachable!(),
};
let size = {
let size = nentry.value.footprint() + delta.footprint();
size - self.value.footprint()
};
self.deltas.insert(0, delta);
self.prepend_version_nolsm(nentry);
size.try_into().unwrap()
}
pub(crate) fn delete(&mut self, seqno: u64) -> isize {
let delta_size = match &self.value {
Value::D { seqno } => {
self.deltas.insert(0, Delta::new_delete(*seqno));
0
}
Value::U { value, seqno, .. } if !value.is_reference() => {
let delta = {
let value = value.to_native_value().unwrap();
let d: <V as Diff>::D = From::from(value);
vlog::Delta::new_native(d)
};
let size = delta.diff_footprint();
self.deltas.insert(0, Delta::new_upsert(delta, *seqno));
size
}
Value::U { .. } => unreachable!(),
};
let size = self.value.footprint();
self.value = Value::new_delete(seqno);
(size + delta_size - self.value.footprint())
.try_into()
.unwrap()
}
}
impl<K, V> Entry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Drop every version at or before `cutoff`.
    ///
    /// Returns `None` when even the latest version falls at or before `cutoff`
    /// (always the case for `Bound::Unbounded`). Otherwise the deltas are
    /// trimmed to the leading run newer than `cutoff`.
    pub(crate) fn purge(mut self, cutoff: Bound<u64>) -> Option<Entry<K, V>> {
        let survives = |seqno: u64| match cutoff {
            Bound::Included(limit) => seqno > limit,
            Bound::Excluded(limit) => seqno >= limit,
            Bound::Unbounded => false,
        };
        if !survives(self.to_seqno()) {
            return None;
        }
        let keep = self
            .deltas
            .iter()
            .take_while(|d| survives(d.to_seqno()))
            .count();
        self.deltas.truncate(keep);
        Some(self)
    }
}
impl<K, V> Entry<K, V>
where
K: Clone + Ord,
V: Clone + Diff + From<<V as Diff>::D>,
{
pub(crate) fn filter_within(
&self,
start: Bound<u64>,
end: Bound<u64>,
) -> Option<Entry<K, V>> {
let entry = self.skip_till(start.clone(), end)?;
match start {
Bound::Included(x) => entry.purge(Bound::Excluded(x)),
Bound::Excluded(x) => entry.purge(Bound::Included(x)),
Bound::Unbounded => Some(entry),
}
}
fn skip_till(&self, ob: Bound<u64>, nb: Bound<u64>) -> Option<Entry<K, V>> {
let n = self.to_seqno();
match ob {
Bound::Included(o_seqno) if n < o_seqno => return None,
Bound::Excluded(o_seqno) if n <= o_seqno => return None,
_ => (),
}
let o = self.deltas.last().map_or(n, |d| d.to_seqno());
match nb {
Bound::Included(nb) if o > nb => return None,
Bound::Excluded(nb) if o >= nb => return None,
Bound::Included(nb) if n <= nb => return Some(self.clone()),
Bound::Excluded(nb) if n < nb => return Some(self.clone()),
Bound::Unbounded => return Some(self.clone()),
_ => (),
};
let mut entry = self.clone();
let mut iter = entry.deltas.drain(..);
while let Some(delta) = iter.next() {
let value = entry.value.to_native_value();
let (value, _) = next_value(value, delta.data);
entry.value = value;
let seqno = entry.value.to_seqno();
let done = match nb {
Bound::Included(n_seqno) if seqno <= n_seqno => true,
Bound::Excluded(n_seqno) if seqno < n_seqno => true,
_ => false,
};
if done {
entry.deltas = iter.collect();
return Some(entry);
}
}
unreachable!()
}
pub fn versions(&self) -> VersionIter<K, V> {
VersionIter {
key: self.key.clone(),
entry: Some(Entry {
key: self.key.clone(),
value: self.value.clone(),
deltas: Default::default(),
}),
curval: None,
deltas: Some(self.to_deltas().into_iter()),
}
}
}
impl<K, V> Entry<K, V>
where
K: Clone + Ord,
V: Clone + Diff + From<<V as Diff>::D> + Footprint,
{
pub(crate) fn flush_merge(self, entry: Entry<K, V>) -> Entry<K, V> {
let (a, mut b) = if self.to_seqno() > entry.to_seqno() {
(self, entry)
} else if entry.to_seqno() > self.to_seqno() {
(entry, self)
} else {
unreachable!()
};
a.validate_flush_merge(&b);
for ne in a.versions().collect::<Vec<Entry<K, V>>>().into_iter().rev() {
b.prepend_version(ne, true );
}
b
}
fn validate_flush_merge(&self, entr: &Entry<K, V>) {
let mut seqnos = vec![self.to_seqno()];
self.deltas.iter().for_each(|d| seqnos.push(d.to_seqno()));
seqnos.push(entr.to_seqno());
entr.deltas.iter().for_each(|d| seqnos.push(d.to_seqno()));
let mut fail = seqnos[0..seqnos.len() - 1]
.into_iter()
.zip(seqnos[1..].into_iter())
.any(|(a, b)| a <= b);
fail = fail || self.value.is_reference();
fail = fail || self.deltas.iter().any(|d| d.is_reference());
if fail {
unreachable!()
}
}
}
impl<K, V> Entry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff + Serialize,
    <V as Diff>::D: Serialize,
{
    /// If the latest value is a vlog reference, load it from `fd` and replace
    /// it with an owned value that keeps the same seqno.
    pub(crate) fn fetch_value(&mut self, fd: &mut fs::File) -> Result<()> {
        let (fpos, len, seqno) = match &self.value {
            Value::U { value, seqno, .. } => match value.to_reference() {
                Some((fpos, len, _)) => (fpos, len, *seqno),
                None => return Ok(()),
            },
            Value::D { .. } => return Ok(()),
        };
        let loaded = Box::new(vlog::fetch_value(fpos, len, fd)?);
        self.value = Value::new_upsert(loaded, seqno);
        Ok(())
    }

    /// Load every delta that is still a vlog reference from `fd`, in place.
    /// Stops at the first I/O error.
    pub(crate) fn fetch_deltas(&mut self, fd: &mut fs::File) -> Result<()> {
        for slot in self.deltas.iter_mut() {
            if let InnerDelta::U {
                delta: vlog::Delta::Reference { fpos, length, .. },
                seqno,
            } = slot.data
            {
                let fetched = vlog::fetch_delta(fpos, length, fd)?;
                *slot = Delta::new_upsert(fetched, seqno);
            }
        }
        Ok(())
    }
}
impl<K, V> Entry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Borrow the key.
    #[inline]
    pub fn as_key(&self) -> &K {
        &self.key
    }

    /// Owned copy of the key.
    #[inline]
    pub fn to_key(&self) -> K {
        self.as_key().clone()
    }

    /// Borrow the older versions, newest first.
    #[inline]
    pub(crate) fn as_deltas(&self) -> &Vec<Delta<V>> {
        &self.deltas
    }

    /// Number of older versions.
    pub(crate) fn to_delta_count(&self) -> usize {
        self.as_deltas().len()
    }

    /// Borrow the latest version.
    pub(crate) fn as_value(&self) -> &Value<V> {
        &self.value
    }

    /// Owned copy of the older versions.
    #[inline]
    pub(crate) fn to_deltas(&self) -> Vec<Delta<V>> {
        self.deltas.to_vec()
    }

    /// Native latest value; `None` if the entry is deleted.
    pub fn to_native_value(&self) -> Option<V> {
        self.as_value().to_native_value()
    }

    /// Seqno of the latest version.
    #[inline]
    pub fn to_seqno(&self) -> u64 {
        self.value.to_seqno()
    }

    /// `(is_upsert, seqno)` of the latest version.
    #[inline]
    pub fn to_seqno_state(&self) -> (bool, u64) {
        (!self.value.is_deleted(), self.value.to_seqno())
    }

    /// True if the latest version is a delete.
    pub fn is_deleted(&self) -> bool {
        self.as_value().is_deleted()
    }
}
impl<K, V> Entry<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Size of the `Entry` struct plus its latest value and all deltas.
    pub fn footprint(&self) -> isize {
        let base: isize = mem::size_of::<Entry<K, V>>().try_into().unwrap();
        let history: isize = self.deltas.iter().map(|d| d.footprint()).sum();
        base + self.value.footprint() + history
    }
}
/// Iterator over all versions of an entry, newest first; built by `Entry::versions`.
///
/// Iteration stops early at the first version that is an unloaded reference.
pub struct VersionIter<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff + From<<V as Diff>::D>,
{
    key: K,
    // Latest version; yielded first, then taken.
    entry: Option<Entry<K, V>>,
    // Native value of the most recently yielded version (None if deleted),
    // used to reconstruct the next older version from its delta.
    curval: Option<V>,
    // Remaining deltas, newest first; set to None once a reference is seen.
    deltas: Option<std::vec::IntoIter<Delta<V>>>,
}
impl<K, V> Iterator for VersionIter<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff + From<<V as Diff>::D>,
{
    type Item = Entry<K, V>;

    /// Yield the latest version first, then rebuild each older version by
    /// applying deltas in order. Ends permanently at the first reference.
    fn next(&mut self) -> Option<Self::Item> {
        if let Some(latest) = self.entry.take() {
            if latest.value.is_reference() {
                self.deltas = None;
                return None;
            }
            self.curval = latest.to_native_value();
            return Some(latest);
        }

        let delta = self.deltas.as_mut()?.next()?;
        if delta.is_reference() {
            self.deltas = None;
            return None;
        }
        let (value, curval) = next_value(self.curval.take(), delta.data);
        self.curval = curval;
        Some(Entry::new(self.key.clone(), value))
    }
}
/// Reconstruct the next older version from the current native value (`None`
/// if the current version is a delete) and that version's delta.
///
/// Returns the rebuilt `Value` and its native value (`None` for deletes).
/// Panics if an upsert delta has no native form.
fn next_value<V>(value: Option<V>, delta: InnerDelta<V>) -> (Value<V>, Option<V>)
where
    V: Clone + Diff + From<<V as Diff>::D>,
{
    let (delta, seqno) = match delta {
        InnerDelta::D { seqno } => return (Value::new_delete(seqno), None),
        InnerDelta::U { delta, seqno } => (delta, seqno),
    };
    let diff = delta.into_native_delta().unwrap();
    // After a delete there is nothing to merge against: the delta holds the full value.
    let older: V = match value {
        Some(curval) => curval.merge(&diff),
        None => From::from(diff),
    };
    let boxed = Box::new(vlog::Value::new_native(older.clone()));
    (Value::new_upsert(boxed, seqno), Some(older))
}
/// Item yielded by a full scan (`ScanIter`).
pub enum ScanEntry<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// An entry produced by the scan.
    Found(Entry<K, V>),
    /// Presumably signals that the scan should be retried from this key
    /// (TODO confirm with `FullScan` implementors).
    Retry(K),
}
// Unit tests for this module live in core_test.rs.
#[cfg(test)]
#[path = "core_test.rs"]
mod core_test;