use std::{
borrow::Borrow,
cmp::{Ord, Ordering},
convert::TryInto,
fmt::Debug,
marker, mem,
ops::{Bound, Deref, DerefMut, RangeBounds},
sync::{
atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering::SeqCst},
mpsc, Arc,
},
thread,
};
use crate::core::{Diff, Entry, Footprint, Result, ScanEntry, Value};
use crate::core::{FullScan, Index, IndexIter, ScanIter};
use crate::core::{Reader, WalWriter, Writer};
use crate::error::Error;
use crate::llrb::Llrb;
use crate::llrb_node::{LlrbDepth, Node, Stats};
use crate::spinlock::{self, RWSpinlock};
const RECLAIM_CAP: usize = 128;
include!("llrb_common.rs");
/// Multi-version LLRB index.
///
/// Write paths in this file build a new tree root and publish it through
/// `snapshot`; read paths clone the currently published snapshot and walk it.
pub struct Mvcc<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
// Name supplied at construction; returned by `to_name()`.
name: String,
// Selects the delete strategy in `delete_index` and is forwarded to
// `prepend_version` on upserts.
lsm: bool,
// Forwarded to `RWSpinlock::acquire_write` whenever `latch` is taken.
spin: bool,
// Holder of the currently published snapshot plus the shared counters.
snapshot: OuterSnapshot<K, V>,
// Write latch taken by `set_index`, `set_cas_index` and `delete_index`.
latch: RWSpinlock,
// Running total of key footprints, adjusted when keys are added/removed.
key_footprint: AtomicIsize,
// Running total of node footprints; this is what `Footprint::footprint` returns.
tree_footprint: AtomicIsize,
// `multi_rw()` sums the strong counts of these two Arcs and the result is
// compared against `CONCUR_REF_COUNT`.
readers: Arc<u32>,
writers: Arc<u32>,
}
/// Tear-down for the index: frees the tree held by the current snapshot and
/// then verifies the bookkeeping counters.
///
/// # Panics
///
/// * when `multi_rw()` exceeds `CONCUR_REF_COUNT`,
/// * when the current snapshot is still shared (`Arc::get_mut` fails),
/// * when `n_active` or `n_nodes` is non-zero after the tree is released.
impl<K, V> Drop for Mvcc<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn drop(&mut self) {
// Refuse to go away while extra reader/writer references are counted.
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("Mvcc dropped before read/write handles {}", n);
}
{
// Re-box the raw pointer kept in `snapshot.inner`, so the current
// snapshot is dropped when this scope ends.
let mut curr_s: Box<Arc<Snapshot<K,V>>> =
unsafe { Box::from_raw(self.snapshot.inner.load(SeqCst)) };
// Only succeeds when this is the sole Arc to the snapshot.
let snapshot = Arc::get_mut(&mut *curr_s).unwrap();
// The root must be taken out here: `Snapshot::drop` leaks whatever root
// it still holds, so the tree would never be freed otherwise.
// `drop_tree` is defined outside this view; its return value is used as
// the number of nodes released.
let n = match snapshot.root.take() {
Some(root) => drop_tree(root),
None => 0,
};
self.snapshot
.n_nodes
.fetch_sub(n.try_into().unwrap(), SeqCst);
}
// Every snapshot decrements `n_active` in its own drop; anything left
// means a snapshot is still alive somewhere.
let n = self.snapshot.n_active.load(SeqCst);
if n != 0 {
panic!("active snapshots: {}", n);
}
// Allocation and release counts must cancel out exactly.
let n = self.snapshot.n_nodes.load(SeqCst);
if n != 0 {
panic!("leak or double free n_nodes:{}", n);
}
}
}
impl<K, V> Mvcc<K, V>
where
K: 'static + Send + Clone + Ord + Footprint,
V: 'static + Send + Clone + Diff + Footprint,
<V as Diff>::D: Send,
{
/// Create an empty index named `name` with LSM mode disabled and the spin
/// latch enabled.
pub fn new<S>(name: S) -> Box<Mvcc<K, V>>
where
    S: AsRef<str>,
{
    let index = Mvcc {
        name: String::from(name.as_ref()),
        lsm: false,
        spin: true,
        snapshot: OuterSnapshot::new(),
        latch: RWSpinlock::new(),
        key_footprint: AtomicIsize::new(0),
        tree_footprint: AtomicIsize::new(0),
        readers: Arc::new(0xC0FFEE),
        writers: Arc::new(0xC0FFEE),
    };
    Box::new(index)
}
/// Create an empty index named `name` with LSM mode enabled and the spin
/// latch enabled.
pub fn new_lsm<S>(name: S) -> Box<Mvcc<K, V>>
where
    S: AsRef<str>,
{
    let index = Mvcc {
        name: String::from(name.as_ref()),
        lsm: true,
        spin: true,
        snapshot: OuterSnapshot::new(),
        latch: RWSpinlock::new(),
        key_footprint: AtomicIsize::new(0),
        tree_footprint: AtomicIsize::new(0),
        readers: Arc::new(0xC0FFEE),
        writers: Arc::new(0xC0FFEE),
    };
    Box::new(index)
}
/// Convert an `Llrb` index into an `Mvcc` index.
///
/// The new index inherits the name, LSM mode and spin setting of
/// `llrb_index`, takes over the pieces returned by `Llrb::squash` and
/// publishes them as its first snapshot.
pub fn from_llrb(llrb_index: Llrb<K, V>) -> Box<Mvcc<K, V>> {
    let lsm = llrb_index.is_lsm();
    let name = llrb_index.to_name();
    let mut index = match lsm {
        true => Mvcc::new_lsm(name),
        false => Mvcc::new(name),
    };
    index.set_spinlatch(llrb_index.to_spin());

    // Seed the node counter before `squash` consumes the source index.
    let n_nodes = llrb_index.len() as isize;
    index.snapshot.n_nodes.store(n_nodes, SeqCst);

    let debris = llrb_index.squash();
    index.key_footprint.store(debris.key_footprint, SeqCst);
    index.tree_footprint.store(debris.tree_footprint, SeqCst);
    index
        .snapshot
        .shift_snapshot(debris.root, debris.seqno, debris.n_count, Vec::new());
    index
}
/// Choose whether the write latch spins (`true`) or not.
///
/// # Panics
///
/// Panics when `multi_rw()` exceeds `CONCUR_REF_COUNT`.
pub fn set_spinlatch(&mut self, spin: bool) {
    match self.multi_rw() {
        n if n > Self::CONCUR_REF_COUNT => {
            panic!("cannot configure Mvcc with active readers/writer {}", n)
        }
        _ => self.spin = spin,
    }
}
/// Overwrite the sequence number of the index.
///
/// Publishes a new snapshot that re-uses the current root and entry count
/// but carries `seqno`.
///
/// # Panics
///
/// Panics when `multi_rw()` exceeds `CONCUR_REF_COUNT`.
pub fn set_seqno(&mut self, seqno: u64) {
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("cannot configure Mvcc with active readers/writer {}", n);
}
let snapshot = OuterSnapshot::clone(&self.snapshot);
// `root_duplicate` yields a second Box aliasing the same root node. That is
// only sound because `Snapshot::drop` leaks its root instead of freeing it.
let root = snapshot.root_duplicate();
// No nodes were replaced, hence the empty reclaim list.
self.snapshot
.shift_snapshot(root, seqno, snapshot.n_count, vec![]);
}
pub fn clone(&self) -> Box<Mvcc<K, V>> {
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("cannot clone Mvcc with active readers/writer {}", n);
}
let cloned = Box::new(Mvcc {
name: self.name.clone(),
lsm: self.lsm,
spin: self.spin,
snapshot: OuterSnapshot::new(),
latch: RWSpinlock::new(),
key_footprint: AtomicIsize::new(self.key_footprint.load(SeqCst)),
tree_footprint: AtomicIsize::new(self.tree_footprint.load(SeqCst)),
readers: Arc::new(0xC0FFEE),
writers: Arc::new(0xC0FFEE),
});
let s: Arc<Snapshot<K, V>> = OuterSnapshot::clone(&self.snapshot);
let root_node = match s.as_root() {
None => None,
Some(n) => Some(Box::new(n.clone())),
};
cloned
.snapshot
.shift_snapshot(root_node, s.seqno, s.n_count, vec![]);
cloned
}
/// Build a new index carrying this index's name, LSM mode, spin setting and
/// footprint counters, but with a fresh (empty) snapshot.
///
/// # Panics
///
/// Panics when `multi_rw()` exceeds `CONCUR_REF_COUNT`.
fn shallow_clone(&self) -> Box<Mvcc<K, V>> {
    let n = self.multi_rw();
    if n > Self::CONCUR_REF_COUNT {
        panic!("cannot shallow-clone with active readers/writer {}", n);
    }

    let key_footprint = self.key_footprint.load(SeqCst);
    let tree_footprint = self.tree_footprint.load(SeqCst);
    let index = Mvcc {
        name: self.name.clone(),
        lsm: self.lsm,
        spin: self.spin,
        snapshot: OuterSnapshot::new(),
        latch: RWSpinlock::new(),
        key_footprint: AtomicIsize::new(key_footprint),
        tree_footprint: AtomicIsize::new(tree_footprint),
        readers: Arc::new(0xC0FFEE),
        writers: Arc::new(0xC0FFEE),
    };
    Box::new(index)
}
}
/// Read-only accessors and the reference-count guard shared by the
/// configuration methods.
impl<K, V> Mvcc<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Value of `multi_rw()` when only the index itself holds the `readers`
    /// and `writers` Arcs (one strong reference each).
    const CONCUR_REF_COUNT: usize = 2;

    /// Whether this index was created in LSM mode.
    #[inline]
    pub fn is_lsm(&self) -> bool {
        self.lsm
    }

    /// Entry count recorded in the current snapshot.
    #[inline]
    pub fn len(&self) -> usize {
        let snapshot = OuterSnapshot::clone(&self.snapshot);
        snapshot.n_count
    }

    /// Copy of the index name.
    #[inline]
    pub fn to_name(&self) -> String {
        self.name.to_string()
    }

    /// Sequence number recorded in the current snapshot.
    #[inline]
    pub fn to_seqno(&self) -> u64 {
        let snapshot = OuterSnapshot::clone(&self.snapshot);
        snapshot.seqno
    }

    /// Partial statistics: entry count, per-node size and the combined
    /// conflict count of both latches.
    pub fn to_stats(&self) -> Stats {
        let conflicts = self.latch.to_conflicts() + self.snapshot.ulatch.to_conflicts();
        let n_count = self.len();
        Stats::new_partial(n_count, mem::size_of::<Node<K, V>>(), conflicts)
    }

    /// Sum of the strong counts of the `readers` and `writers` Arcs.
    fn multi_rw(&self) -> usize {
        let readers = Arc::strong_count(&self.readers);
        let writers = Arc::strong_count(&self.writers);
        readers + writers
    }
}
impl<K, V> Mvcc<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn node_new_deleted(&self, key: K, seqno: u64) -> Box<Node<K, V>> {
self.snapshot.n_nodes.fetch_add(1, SeqCst);
Node::new_deleted(key, seqno)
}
fn node_mvcc_clone(
&self,
node: &Node<K, V>,
reclaim: &mut Vec<Box<Node<K, V>>>,
copyval: bool,
) -> Box<Node<K, V>> {
self.snapshot.n_nodes.fetch_add(1, SeqCst);
node.mvcc_clone(reclaim, copyval)
}
fn node_from_entry(&self, new_entry: Entry<K, V>) -> Box<Node<K, V>> {
self.snapshot.n_nodes.fetch_add(1, SeqCst);
Box::new(From::from(new_entry))
}
fn node_mvcc_detach(&self, node: &mut Box<Node<K, V>>) {
node.mvcc_detach();
self.snapshot.n_nodes.as_ref().fetch_sub(1, SeqCst);
}
}
/// `Index` implementation: hands out reader/writer handles that alias this
/// instance through a type-erased pointer.
impl<K, V> Index<K, V> for Mvcc<K, V>
where
K: 'static + Send + Clone + Ord + Footprint,
V: 'static + Send + Clone + Diff + Footprint,
<V as Diff>::D: Send,
{
type W = MvccWriter<K, V>;
type R = MvccReader<K, V>;
/// Build a new index with this index's configuration but without its tree
/// (delegates to `shallow_clone`).
fn make_new(&self) -> Result<Box<Self>> {
Ok(self.shallow_clone())
}
/// Create a reader handle.
///
/// The handle stores `self` as a `Box<c_void>`. It does not own the index:
/// `MvccReader::drop` leaks that box instead of freeing it.
// NOTE(review): `self.readers` is not cloned into the handle, so `multi_rw()`
// does not appear to observe outstanding readers — confirm this is intended.
fn to_reader(&mut self) -> Result<Self::R> {
let index: Box<std::ffi::c_void> = unsafe {
Box::from_raw(self as *mut Mvcc<K, V> as *mut std::ffi::c_void)
};
Ok(MvccReader::<K, V>::new(index))
}
/// Create a writer handle.
///
/// Same aliasing scheme as `to_reader`: the handle holds a non-owning
/// `Box<c_void>` that `MvccWriter::drop` leaks.
// NOTE(review): `self.writers` is not cloned into the handle either — confirm.
fn to_writer(&mut self) -> Result<Self::W> {
let index: Box<std::ffi::c_void> = unsafe {
Box::from_raw(self as *mut Mvcc<K, V> as *mut std::ffi::c_void)
};
Ok(MvccWriter::<K, V>::new(index))
}
}
impl<K, V> Footprint for Mvcc<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn footprint(&self) -> isize {
self.tree_footprint.load(SeqCst)
}
}
impl<K, V> Mvcc<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
/// Insert or update `key` with `value` and publish the resulting tree.
///
/// `seqno` is used as given; when `None`, the current snapshot's seqno plus
/// one is used. Returns the seqno that was applied together with the entry
/// previously stored under `key`, if any.
fn set_index(
&mut self,
key: K,
value: V,
seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
// Held until return.
let _w = self.latch.acquire_write(self.spin);
let snapshot: &Arc<Snapshot<K, V>> = self.snapshot.as_ref();
let seqno = match seqno {
Some(seqno) => seqno,
None => snapshot.seqno + 1,
};
let key_footprint = key.footprint();
let new_entry = Entry::new(key, Value::new_upsert_value(value, seqno));
let mut n_count = snapshot.n_count;
// Aliasing Box of the published root; `upsert` leaks the old nodes it walks.
let root = snapshot.root_duplicate();
let mut reclm: Vec<Box<Node<K, V>>> = Vec::with_capacity(RECLAIM_CAP);
match self.upsert(root, new_entry, self.lsm, &mut reclm) {
UpsertResult {
node: Some(mut root),
new_node: Some(mut n),
old_entry,
size,
} => {
root.set_black();
// No previous entry means a brand-new key.
if old_entry.is_none() {
n_count += 1;
self.key_footprint.fetch_add(key_footprint, SeqCst);
}
self.tree_footprint.fetch_add(size, SeqCst);
// `n` comes from `duplicate()` on the freshly written node; the dirty
// flag is cleared through it and the box is leaked rather than dropped.
// NOTE(review): presumably `n` aliases a node owned by `root` — confirm
// against `Node::duplicate`.
n.dirty = false;
Box::leak(n);
self.snapshot
.shift_snapshot(Some(root), seqno, n_count, reclm);
(Some(seqno), Ok(old_entry))
}
// Every return path of `upsert` sets both `node` and `new_node`.
_ => unreachable!(),
}
}
/// Insert or update `key` with `value`, provided `cas` matches the stored
/// entry, and publish the resulting tree.
///
/// `seqno` is used as given; when `None`, the current snapshot's seqno plus
/// one is used. On success returns the applied seqno and the previous entry,
/// if any. On a CAS mismatch returns the unchanged snapshot seqno and the
/// error reported by `upsert_cas`.
///
/// # Panics
///
/// Panics if `upsert_cas` returns neither a root nor an error, which none of
/// its paths do.
fn set_cas_index(
    &mut self,
    key: K,
    value: V,
    cas: u64,
    seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
    // Held until return.
    let _w = self.latch.acquire_write(self.spin);
    let snapshot: &Arc<Snapshot<K, V>> = self.snapshot.as_ref();
    let seqno = match seqno {
        Some(seqno) => seqno,
        None => snapshot.seqno + 1,
    };
    let lsm = self.lsm;
    let key_footprint = key.footprint();
    let new_entry = Entry::new(key, Value::new_upsert_value(value, seqno));
    let mut n_count = snapshot.n_count;
    let root = snapshot.root_duplicate();
    let mut rclm: Vec<Box<Node<K, V>>> = Vec::with_capacity(RECLAIM_CAP);
    let s = match self.upsert_cas(root, new_entry, cas, lsm, &mut rclm) {
        UpsertCasResult {
            node: Some(mut root),
            new_node,
            old_entry,
            err: None,
            size,
        } => {
            root.set_black();
            // No previous entry means a brand-new key.
            if old_entry.is_none() {
                self.key_footprint.fetch_add(key_footprint, SeqCst);
                n_count += 1
            }
            self.tree_footprint.fetch_add(size, SeqCst);
            (seqno, root, new_node, Ok(old_entry))
        }
        UpsertCasResult {
            node: Some(mut root),
            new_node,
            err: Some(err),
            ..
        } => {
            // CAS mismatch below the root: the path was still re-built, so
            // publish it under the unchanged seqno.
            root.set_black();
            (snapshot.seqno, root, new_node, Err(err))
        }
        UpsertCasResult {
            node: None,
            err: Some(err),
            ..
        } => {
            // Empty tree with a non-zero `cas`: `upsert_cas` bails out before
            // touching any node, so there is nothing to publish. Report the
            // mismatch instead of falling into the panic arm below.
            return (Some(snapshot.seqno), Err(err));
        }
        _ => panic!("set_cas: impossible case, call programmer"),
    };
    let (seqno, root, optn, entry) = s;
    if let Some(mut n) = optn {
        // `n` comes from `duplicate()` on the freshly written node; clear its
        // dirty flag and leak the box rather than dropping it.
        n.dirty = false;
        Box::leak(n);
    }
    self.snapshot
        .shift_snapshot(Some(root), seqno, n_count, rclm);
    (Some(seqno), entry)
}
/// Delete `key` and publish the resulting tree.
///
/// * LSM mode: the entry is marked deleted through `delete_lsm`; a missing
///   key gets a new deleted node, so the entry count grows by one.
/// * Non-LSM mode: the node is removed through `do_delete`. When the key is
///   absent the snapshot seqno is left unchanged.
///
/// `seqno` is used as given; when `None`, the current snapshot's seqno plus
/// one is used. Returns the seqno recorded in the published snapshot and the
/// entry that was stored under `key`, if any.
fn delete_index<Q>(
    &mut self,
    key: &Q,
    seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>)
where
    K: Borrow<Q>,
    Q: ToOwned<Owned = K> + Ord + ?Sized,
{
    // Held until return.
    let _w = self.latch.acquire_write(self.spin);
    let snapshot: &Arc<Snapshot<K, V>> = self.snapshot.as_ref();
    let seqno = match seqno {
        Some(seqno) => seqno,
        None => snapshot.seqno + 1,
    };
    let key_footprint = key.to_owned().footprint();
    let mut n_count = snapshot.n_count;
    let root = snapshot.root_duplicate();
    let mut reclm: Vec<Box<Node<K, V>>> = Vec::with_capacity(RECLAIM_CAP);
    let (seqno, root, old_entry) = if self.lsm {
        let DeleteResult {
            node,
            new_node,
            old_entry,
            size,
        } = self.delete_lsm(root, key, seqno, &mut reclm);
        let root = node.map(|mut root| {
            root.set_black();
            root
        });
        self.tree_footprint.fetch_add(size, SeqCst);
        if old_entry.is_none() {
            // The key was absent: a new deleted node now occupies the tree.
            self.key_footprint.fetch_add(key_footprint, SeqCst);
            n_count += 1;
        }
        if let Some(mut n) = new_node {
            // `n` comes from `duplicate()` on the freshly written node; clear
            // its dirty flag and leak the box rather than dropping it.
            n.dirty = false;
            Box::leak(n);
        }
        (seqno, root, old_entry)
    } else {
        let mut res = self.do_delete(root, key, &mut reclm);
        if let Some(node) = res.node.as_mut() {
            node.set_black();
        }
        let seqno = if res.old_entry.is_some() {
            // The key and its node are gone from the tree. `res.size` is the
            // footprint of the removed node, so both counters must shrink by
            // what the insert path added.
            self.key_footprint.fetch_sub(key_footprint, SeqCst);
            self.tree_footprint.fetch_sub(res.size, SeqCst);
            n_count -= 1;
            seqno
        } else {
            // Nothing was deleted; keep the published seqno.
            snapshot.seqno
        };
        (seqno, res.node, res.old_entry)
    };
    self.snapshot.shift_snapshot(root, seqno, n_count, reclm);
    (Some(seqno), Ok(old_entry))
}
}
/// `Writer` implementation: each operation lets the index pick the next
/// seqno and hands back only the previous entry.
impl<K, V> Writer<K, V> for Mvcc<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Insert or update `key`; returns the previous entry, if any.
    fn set(&mut self, key: K, value: V) -> Result<Option<Entry<K, V>>> {
        self.set_index(key, value, None).1
    }

    /// Insert or update `key` when `cas` matches; returns the previous entry,
    /// if any, or the CAS error.
    fn set_cas(&mut self, key: K, value: V, cas: u64) -> Result<Option<Entry<K, V>>> {
        self.set_cas_index(key, value, cas, None).1
    }

    /// Delete `key`; returns the entry that was stored, if any.
    fn delete<Q>(&mut self, key: &Q) -> Result<Option<Entry<K, V>>>
    where
        K: Borrow<Q>,
        Q: ToOwned<Owned = K> + Ord + ?Sized,
    {
        self.delete_index(key, None).1
    }
}
/// Outcome of `Mvcc::upsert` for one subtree.
struct UpsertResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
// Root of the re-built subtree.
node: Option<Box<Node<K, V>>>,
// Result of `duplicate()` on the node that received the new entry.
new_node: Option<Box<Node<K, V>>>,
// Entry stored under the key before the upsert, if the key existed.
old_entry: Option<Entry<K, V>>,
// Footprint change; the caller adds it to `tree_footprint`.
size: isize,
}
/// Outcome of `Mvcc::upsert_cas` for one subtree.
struct UpsertCasResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
// Root of the re-built subtree; `None` when an empty subtree rejected the CAS.
node: Option<Box<Node<K, V>>>,
// Result of `duplicate()` on the node that received the new entry;
// `None` on a CAS failure.
new_node: Option<Box<Node<K, V>>>,
// Entry stored under the key before the upsert, if the key existed.
old_entry: Option<Entry<K, V>>,
// Footprint change; zero on a CAS failure.
size: isize,
// `Some(Error::InvalidCAS)` when the CAS check failed.
err: Option<Error>,
}
/// Outcome of `Mvcc::delete_lsm` / `Mvcc::do_delete` for one subtree.
struct DeleteResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
// Root of the re-built subtree; `None` when the subtree became empty.
node: Option<Box<Node<K, V>>>,
// Set by `delete_lsm` to `duplicate()` of the node it wrote; always `None`
// from `do_delete`.
new_node: Option<Box<Node<K, V>>>,
// Entry that was stored under the key, if the key existed.
old_entry: Option<Entry<K, V>>,
// `delete_lsm`: value returned by `Node::delete` or the footprint of the new
// deleted node. `do_delete`: footprint of the removed node, or zero.
size: isize,
}
/// Recursive tree mutations.
///
/// Each routine consumes the (aliasing) box of an existing node, builds a
/// replacement through `node_mvcc_clone` and leaks the original box with
/// `Box::leak`. `reclaim` is forwarded to every clone call.
impl<K, V> Mvcc<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
/// Insert `new_entry` into the subtree rooted at `node`, or add it as a new
/// version when its key already exists.
///
/// Always returns `node: Some(..)` and `new_node: Some(..)`.
fn upsert(
&self,
node: Option<Box<Node<K, V>>>,
new_entry: Entry<K, V>,
lsm: bool,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> UpsertResult<K, V> {
// Empty subtree: the entry becomes a new leaf.
if node.is_none() {
let node: Box<Node<K, V>> = self.node_from_entry(new_entry);
let n = node.duplicate();
let size: isize = node.footprint().try_into().unwrap();
return UpsertResult {
node: Some(node),
new_node: Some(n),
old_entry: None,
size,
};
}
let node = node.unwrap();
let cmp = node.as_key().cmp(new_entry.as_key());
let r = if cmp == Ordering::Greater {
// Target key is smaller than this node's key: descend left.
let mut new_node = self.node_mvcc_clone(&node, reclaim, false);
let left = new_node.left.take();
let mut r = self.upsert(left, new_entry, lsm, reclaim);
new_node.left = r.node;
r.node = Some(self.walkuprot_23(new_node, reclaim));
r
} else if cmp == Ordering::Less {
// Target key is larger than this node's key: descend right.
let mut new_node = self.node_mvcc_clone(&node, reclaim, false);
let right = new_node.right.take();
let mut r = self.upsert(right, new_entry, lsm, reclaim);
new_node.right = r.node;
r.node = Some(self.walkuprot_23(new_node, reclaim));
r
} else {
// Same key: clone with `copyval = true` and add the new version.
let mut new_node = self.node_mvcc_clone(&node, reclaim, true);
let entry = node.entry.clone();
let size = new_node.prepend_version(new_entry, lsm);
// Dirty nodes are re-used as-is by the rotation helpers.
new_node.dirty = true;
let n = new_node.duplicate();
UpsertResult {
node: Some(self.walkuprot_23(new_node, reclaim)),
new_node: Some(n),
old_entry: Some(entry),
size,
}
};
// The original node is not freed here.
Box::leak(node);
r
}
/// Like `upsert`, but the write only happens when `cas` matches.
///
/// Rules applied in this function:
/// * key absent and `cas > 0`: `InvalidCAS`;
/// * key present and marked deleted: `cas` must be 0 or the node's seqno;
/// * key present and not deleted: `cas` must equal the node's seqno.
///
/// On failure `err` is set, `new_node` is `None` and `size` is 0. `node` is
/// `None` only when the subtree passed in was empty.
fn upsert_cas(
&self,
node: Option<Box<Node<K, V>>>,
nentry: Entry<K, V>,
cas: u64,
lsm: bool,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> UpsertCasResult<K, V> {
if node.is_none() && cas > 0 {
// Key does not exist but the caller expected an existing version.
return UpsertCasResult {
node: None,
new_node: None,
old_entry: None,
size: 0,
err: Some(Error::InvalidCAS),
};
} else if node.is_none() {
// Key does not exist and `cas == 0`: create it.
let node: Box<Node<K, V>> = self.node_from_entry(nentry);
let n = node.duplicate();
let size: isize = node.footprint().try_into().unwrap();
return UpsertCasResult {
node: Some(node),
new_node: Some(n),
old_entry: None,
size,
err: None,
};
}
let node = node.unwrap();
let cmp = node.as_key().cmp(nentry.as_key());
let r = if cmp == Ordering::Greater {
// Descend left.
let mut newnd = self.node_mvcc_clone(&node, reclaim, false);
let left = newnd.left.take();
let mut r = self.upsert_cas(left, nentry, cas, lsm, reclaim);
newnd.left = r.node;
r.node = Some(self.walkuprot_23(newnd, reclaim));
r
} else if cmp == Ordering::Less {
// Descend right.
let mut newnd = self.node_mvcc_clone(&node, reclaim, false);
let right = newnd.right.take();
let mut r = self.upsert_cas(right, nentry, cas, lsm, reclaim);
newnd.right = r.node;
r.node = Some(self.walkuprot_23(newnd, reclaim));
r
} else if node.is_deleted() && cas != 0 && cas != node.to_seqno() {
// Deleted entry: only `cas == 0` or the exact seqno is accepted.
let newnd = self.node_mvcc_clone(&node, reclaim, true);
UpsertCasResult {
node: Some(newnd),
new_node: None,
old_entry: None,
size: 0,
err: Some(Error::InvalidCAS),
}
} else if !node.is_deleted() && cas != node.to_seqno() {
// Live entry: the exact seqno is required.
let newnd = self.node_mvcc_clone(&node, reclaim, true);
UpsertCasResult {
node: Some(newnd),
new_node: None,
old_entry: None,
size: 0,
err: Some(Error::InvalidCAS),
}
} else {
// CAS accepted: same handling as the equal-key branch of `upsert`.
let mut newnd = self.node_mvcc_clone(&node, reclaim, true);
let entry = Some(node.entry.clone());
let size = newnd.prepend_version(nentry, lsm);
newnd.dirty = true;
let n = newnd.duplicate();
UpsertCasResult {
node: Some(self.walkuprot_23(newnd, reclaim)),
new_node: Some(n),
old_entry: entry,
size,
err: None,
}
};
// The original node is not freed here.
Box::leak(node);
r
}
/// LSM-mode delete: mark `key` as deleted at `seqno` instead of removing
/// its node.
///
/// When the key is absent, a new deleted node is created for it. Always
/// returns `node: Some(..)`.
fn delete_lsm<Q>(
&self,
node: Option<Box<Node<K, V>>>,
key: &Q,
seqno: u64,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> DeleteResult<K, V>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
if node.is_none() {
// Key not present: insert a node that records the deletion.
let mut node = self.node_new_deleted(key.to_owned(), seqno);
node.dirty = false;
let n = node.duplicate();
let size: isize = node.footprint().try_into().unwrap();
return DeleteResult {
node: Some(node),
new_node: Some(n),
old_entry: None,
size,
};
}
let node = node.unwrap();
let (new_node, n, entry, size) = match node.as_key().borrow().cmp(&key) {
Ordering::Greater => {
// Descend left.
let mut new_node = self.node_mvcc_clone(&node, reclaim, false);
let left = new_node.left.take();
let r = self.delete_lsm(left, key, seqno, reclaim);
new_node.left = r.node;
(new_node, r.new_node, r.old_entry, r.size)
}
Ordering::Less => {
// Descend right.
let mut new_node = self.node_mvcc_clone(&node, reclaim, false);
let right = new_node.right.take();
let r = self.delete_lsm(right, key, seqno, reclaim);
new_node.right = r.node;
(new_node, r.new_node, r.old_entry, r.size)
}
Ordering::Equal => {
// Found: mark the clone deleted; `Node::delete` supplies `size`.
let mut new_node = self.node_mvcc_clone(&node, reclaim, true);
let old_entry = node.entry.clone();
let size = new_node.delete(seqno);
new_node.dirty = true;
let n = new_node.duplicate();
(new_node, Some(n), Some(old_entry), size)
}
};
// The original node is not freed here.
Box::leak(node);
DeleteResult {
node: Some(self.walkuprot_23(new_node, reclaim)),
new_node: n,
old_entry: entry,
size,
}
}
/// Non-LSM delete: remove the node holding `key` from the subtree rooted at
/// `node`, re-balancing on the way back up.
///
/// `old_entry` is `None` when the key was not found. `new_node` is always
/// `None`. `size` is the footprint of the removed node, or 0.
fn do_delete<Q>(
&self,
node: Option<Box<Node<K, V>>>,
key: &Q,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> DeleteResult<K, V>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
if node.is_none() {
return DeleteResult {
node: None,
new_node: None,
old_entry: None,
size: 0,
};
}
let node = node.unwrap();
// Every visited node is cloned up front; the original is not freed here.
let mut newnd = self.node_mvcc_clone(&node, reclaim, true);
Box::leak(node);
if newnd.as_key().borrow().gt(key) {
// Target lies in the left subtree.
if newnd.left.is_none() {
// Nothing on the left: key is not present.
DeleteResult {
node: Some(newnd),
new_node: None,
old_entry: None,
size: 0,
}
} else {
// Left child and its left child both black: move a red link left
// before descending.
let ok = !is_red(newnd.as_left_deref());
if ok && !is_red(newnd.left.as_ref().unwrap().as_left_deref()) {
newnd = self.move_red_left(newnd, reclaim);
}
let mut r = self.do_delete(newnd.left.take(), key, reclaim);
newnd.left = r.node;
r.node = Some(self.fixup(newnd, reclaim));
r
}
} else {
// Target is this node or lies in the right subtree.
if is_red(newnd.as_left_deref()) {
newnd = self.rotate_right(newnd, reclaim);
}
// Key matches and there is no right child: drop this node.
if !newnd.as_key().borrow().lt(key) && newnd.right.is_none() {
self.node_mvcc_detach(&mut newnd);
let size: isize = newnd.footprint().try_into().unwrap();
return DeleteResult {
node: None,
new_node: None,
old_entry: Some(newnd.entry.clone()),
size,
};
}
// Right child and its left child both black: move a red link right.
let ok = newnd.right.is_some() && !is_red(newnd.as_right_deref());
if ok && !is_red(newnd.right.as_ref().unwrap().as_left_deref()) {
newnd = self.move_red_right(newnd, reclaim);
}
if !newnd.as_key().borrow().lt(key) {
// Key matches and a right subtree exists: replace this node with the
// minimum of the right subtree.
let right = newnd.right.take();
let (right, mut res_node) = self.delete_min(right, reclaim);
newnd.right = right;
if res_node.is_none() {
panic!("do_delete(): fatal logic, call the programmer");
}
let mut newnode = res_node.take().unwrap();
// The replacement inherits the children and colour of the removed node.
newnode.left = newnd.left.take();
newnode.right = newnd.right.take();
newnode.black = newnd.black;
let entry = newnd.entry.clone();
let size: isize = newnd.footprint().try_into().unwrap();
DeleteResult {
node: Some(self.fixup(newnode, reclaim)),
new_node: None,
old_entry: Some(entry),
size,
}
} else {
// Keep searching on the right.
let mut r = self.do_delete(newnd.right.take(), key, reclaim);
newnd.right = r.node;
r.node = Some(self.fixup(newnd, reclaim));
r
}
}
}
/// Remove the left-most node of the subtree rooted at `node`.
///
/// Returns `(remaining subtree, removed node)`; both are `None` for an empty
/// subtree.
fn delete_min(
&self,
node: Option<Box<Node<K, V>>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> (Option<Box<Node<K, V>>>, Option<Box<Node<K, V>>>) {
if node.is_none() {
return (None, None);
}
let node = node.unwrap();
let mut new_node = self.node_mvcc_clone(&node, reclaim, true);
Box::leak(node);
if new_node.left.is_none() {
// This is the minimum: detach it and hand it back to the caller.
self.node_mvcc_detach(&mut new_node);
(None, Some(new_node))
} else {
let left = new_node.as_left_deref();
if !is_red(left) && !is_red(left.unwrap().as_left_deref()) {
new_node = self.move_red_left(new_node, reclaim);
}
let left = new_node.left.take();
let (left, old_node) = self.delete_min(left, reclaim);
new_node.left = left;
(Some(self.fixup(new_node, reclaim)), old_node)
}
}
}
/// `Reader` implementation: every call works on a clone of the currently
/// published snapshot Arc.
// NOTE(review): `get`, `build_iter`, `find_start`, `find_end`, `Iter`,
// `Range`, `Reverse` and `IFlag` are not defined in this view; presumably
// they come from the included `llrb_common.rs` — confirm.
impl<K, V> Reader<K, V> for Mvcc<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
/// Look up `key` in the current snapshot.
fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let snapshot: Arc<Snapshot<K, V>> = OuterSnapshot::clone(&self.snapshot);
let res = get(snapshot.as_root(), key);
res
}
/// Iterate over the current snapshot, starting from the `IFlag::Left` side.
fn iter(&self) -> Result<IndexIter<K, V>> {
// The iterator keeps the snapshot Arc in `_arc` next to the node
// references it walks.
let mut iter = Box::new(Iter {
_latch: Default::default(),
_arc: OuterSnapshot::clone(&self.snapshot),
paths: Default::default(),
});
// Obtain a plain reference to the root: an aliasing box is created and
// immediately leaked, so nothing is freed.
let root = iter
._arc
.as_ref()
.root_duplicate()
.map(|n| Box::leak(n) as &Node<K, V>);
iter.paths = Some(build_iter(IFlag::Left, root, vec![]));
Ok(iter)
}
/// Iterate over the entries within `range`, positioned at its start bound.
fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let mut r = Box::new(Range {
_latch: Default::default(),
_arc: OuterSnapshot::clone(&self.snapshot),
range,
paths: Default::default(),
high: marker::PhantomData,
});
// Same leak-for-reference trick as in `iter`.
let root = r
._arc
.as_ref()
.root_duplicate()
.map(|n| Box::leak(n) as &Node<K, V>);
// The boolean passed to `find_start` is true for an inclusive bound.
r.paths = match r.range.start_bound() {
Bound::Unbounded => Some(build_iter(IFlag::Left, root, vec![])),
Bound::Included(low) => Some(find_start(root, low, true, vec![])),
Bound::Excluded(low) => Some(find_start(root, low, false, vec![])),
};
Ok(r)
}
/// Iterate over the entries within `range`, positioned at its end bound.
fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let mut r = Box::new(Reverse {
_latch: Default::default(),
_arc: OuterSnapshot::clone(&self.snapshot),
range,
paths: Default::default(),
low: marker::PhantomData,
});
// Same leak-for-reference trick as in `iter`.
let root = r
._arc
.as_ref()
.root_duplicate()
.map(|n| Box::leak(n) as &Node<K, V>);
// The boolean passed to `find_end` is true for an inclusive bound.
r.paths = match r.range.end_bound() {
Bound::Unbounded => Some(build_iter(IFlag::Right, root, vec![])),
Bound::Included(high) => Some(find_end(root, high, true, vec![])),
Bound::Excluded(high) => Some(find_end(root, high, false, vec![])),
};
Ok(r)
}
/// Same as `get`.
fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.get(key)
}
/// Same as `iter`.
fn iter_with_versions(&self) -> Result<IndexIter<K, V>> {
self.iter()
}
/// Same as `range`.
fn range_with_versions<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.range(range)
}
/// Same as `reverse`.
fn reverse_with_versions<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.reverse(range)
}
}
/// `FullScan` implementation over the currently published snapshot.
impl<K, V> FullScan<K, V> for Mvcc<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff + From<<V as Diff>::D>,
{
    /// Build a scan iterator positioned at the key bound `from`.
    ///
    /// The bounds of `within` are copied into the iterator's `start` and
    /// `end` fields.
    fn full_scan<G>(&self, from: Bound<K>, within: G) -> Result<ScanIter<K, V>>
    where
        G: Clone + RangeBounds<u64>,
    {
        // Copy a borrowed u64 bound into an owned one.
        fn to_owned_bound(bound: Bound<&u64>) -> Bound<u64> {
            match bound {
                Bound::Included(seqno) => Bound::Included(*seqno),
                Bound::Excluded(seqno) => Bound::Excluded(*seqno),
                Bound::Unbounded => Bound::Unbounded,
            }
        }

        let start = to_owned_bound(within.start_bound());
        let end = to_owned_bound(within.end_bound());

        // The iterator keeps the snapshot Arc in `_arc` next to the node
        // references it walks.
        let mut scan = Box::new(IterFullScan {
            _latch: Default::default(),
            _arc: OuterSnapshot::clone(&self.snapshot),
            start,
            end,
            paths: Default::default(),
        });

        // An aliasing box of the root is created and leaked right away to
        // obtain a plain reference; nothing is freed.
        let root = scan
            ._arc
            .as_ref()
            .root_duplicate()
            .map(|n| Box::leak(n) as &Node<K, V>);

        let paths = match from {
            Bound::Unbounded => build_iter(IFlag::Left, root, vec![]),
            Bound::Included(low) => find_start(root, low.borrow(), true, vec![]),
            Bound::Excluded(low) => find_start(root, low.borrow(), false, vec![]),
        };
        scan.paths = Some(paths);
        Ok(scan)
    }
}
/// Structural self-check of the published tree.
impl<K, V> Mvcc<K, V>
where
    K: Clone + Ord + Debug,
    V: Clone + Diff,
{
    /// Validate the current snapshot and return full statistics.
    ///
    /// Errors from `validate_tree` are propagated.
    ///
    /// # Panics
    ///
    /// Panics when the root is red or when the maximum depth reported by
    /// `validate_tree` exceeds 100.
    pub fn validate(&self) -> Result<Stats> {
        let snapshot = OuterSnapshot::clone(&self.snapshot);
        let root = snapshot.as_root();

        let red = is_red(root);
        if red {
            panic!("LLRB violation: Root node is alway black: {}", self.name);
        }

        let mut depths: LlrbDepth = Default::default();
        let blacks = validate_tree(root, red, 0, 0, &mut depths)?;
        let max_depth = depths.to_max();
        if max_depth > 100 {
            panic!("LLRB depth has exceeded limit: {}", depths.to_max());
        }

        let conflicts = self.latch.to_conflicts() + self.snapshot.ulatch.to_conflicts();
        let node_size = std::mem::size_of::<Node<K, V>>();
        Ok(Stats::new_full(
            snapshot.n_count,
            node_size,
            conflicts,
            blacks,
            depths,
        ))
    }
}
/// Re-balancing helpers used by the mutation routines.
///
/// A child that is about to be modified is re-used when its `dirty` flag is
/// set. Otherwise it is replaced by a clone from `node_mvcc_clone` and the
/// original box is leaked.
impl<K, V> Mvcc<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
/// Restore the tree shape around `node` after an insert below it.
fn walkuprot_23(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
// Red link on the right only: rotate it to the left.
let (left, right) = (node.as_left_deref(), node.as_right_deref());
if is_red(right) && !is_red(left) {
node = self.rotate_left(node, reclaim);
}
// Two consecutive red links on the left: rotate right.
let left = node.as_left_deref();
if is_red(left) && is_red(left.unwrap().as_left_deref()) {
node = self.rotate_right(node, reclaim);
}
// Both children red: flip colours.
let (left, right) = (node.as_left_deref(), node.as_right_deref());
if is_red(left) && is_red(right) {
self.flip(node.deref_mut(), reclaim)
}
node
}
/// Rotate left: the right child becomes the subtree root and `node`
/// becomes its red left child.
///
/// Panics when `node` has no right child or `is_black` reports that child
/// as black.
fn rotate_left(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
let old_right = node.right.take().unwrap();
if is_black(Some(old_right.as_ref())) {
panic!("rotateleft(): rotating a black link ? call the programmer");
}
// Re-use a dirty child, otherwise work on a clone.
let mut right = if old_right.dirty {
old_right
} else {
self.node_mvcc_clone(Box::leak(old_right), reclaim, true)
};
node.right = right.left.take();
right.black = node.black;
node.set_red();
right.left = Some(node);
right
}
/// Rotate right: the left child becomes the subtree root and `node`
/// becomes its red right child.
///
/// Panics when `node` has no left child or `is_black` reports that child
/// as black.
fn rotate_right(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
let old_left = node.left.take().unwrap();
if is_black(Some(old_left.as_ref())) {
panic!("rotateright(): rotating a black link ? call the programmer")
}
// Re-use a dirty child, otherwise work on a clone.
let mut left = if old_left.dirty {
old_left
} else {
self.node_mvcc_clone(Box::leak(old_left), reclaim, true)
};
node.left = left.right.take();
left.black = node.black;
node.set_red();
left.right = Some(node);
left
}
/// Toggle the link colour of `node` and of both its children.
///
/// Panics when either child is missing.
fn flip(&self, node: &mut Node<K, V>, reclaim: &mut Vec<Box<Node<K, V>>>) {
let old_left = node.left.take().unwrap();
let old_right = node.right.take().unwrap();
// Children are modified, so each one is either dirty or cloned first.
let mut left = if old_left.dirty {
old_left
} else {
self.node_mvcc_clone(Box::leak(old_left), reclaim, true)
};
let mut right = if old_right.dirty {
old_right
} else {
self.node_mvcc_clone(Box::leak(old_right), reclaim, true)
};
left.toggle_link();
right.toggle_link();
node.toggle_link();
node.left = Some(left);
node.right = Some(right);
}
/// Restore the tree shape around `node` on the way up from a delete.
fn fixup(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
// Any red link on the right is rotated to the left.
if is_red(node.as_right_deref()) {
node = self.rotate_left(node, reclaim)
}
// Two consecutive red links on the left: rotate right.
let left = node.as_left_deref();
if is_red(left) && is_red(left.unwrap().as_left_deref()) {
node = self.rotate_right(node, reclaim)
}
// Both children red: flip colours.
if is_red(node.as_left_deref()) && is_red(node.as_right_deref()) {
self.flip(node.deref_mut(), reclaim);
}
node
}
/// Flip `node`, then, when the right child has a red left child, rotate
/// that child right, rotate `node` left and flip again.
///
/// Called by the delete routines before descending into the left subtree.
fn move_red_left(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
self.flip(node.deref_mut(), reclaim);
if is_red(node.right.as_ref().unwrap().as_left_deref()) {
let right = node.right.take().unwrap();
node.right = Some(self.rotate_right(right, reclaim));
node = self.rotate_left(node, reclaim);
self.flip(node.deref_mut(), reclaim);
}
node
}
/// Flip `node`, then, when the left child has a red left child, rotate
/// `node` right and flip again.
///
/// Called by `do_delete` before descending into the right subtree.
fn move_red_right(
&self,
mut node: Box<Node<K, V>>,
reclaim: &mut Vec<Box<Node<K, V>>>,
) -> Box<Node<K, V>> {
self.flip(node.deref_mut(), reclaim);
if is_red(node.left.as_ref().unwrap().as_left_deref()) {
node = self.rotate_right(node, reclaim);
self.flip(node.deref_mut(), reclaim);
}
node
}
}
/// Holder of the currently published snapshot and of the counters shared
/// with every `Snapshot`.
struct OuterSnapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
// Read-locked by `clone`, write-locked by `shift_snapshot`.
ulatch: RWSpinlock,
// Raw pointer to a leaked `Box<Arc<Snapshot>>` holding the current snapshot.
inner: AtomicPtr<Arc<Snapshot<K, V>>>,
// Node counter: incremented by the `node_*` helpers, decremented when nodes
// are detached, reclaimed or the tree is dropped.
n_nodes: Arc<AtomicIsize>,
// Snapshot counter: incremented in `shift_snapshot`, decremented by
// `Snapshot::drop`.
n_active: Arc<AtomicUsize>,
}
/// Borrow the currently published snapshot Arc without taking `ulatch`.
// NOTE(review): the returned reference points into the box that
// `shift_snapshot` later frees; callers in this file use it while holding
// the index write latch — confirm no other caller exists.
impl<K, V> AsRef<Arc<Snapshot<K, V>>> for OuterSnapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn as_ref(&self) -> &Arc<Snapshot<K, V>> {
// Panics if the stored pointer is null.
unsafe { self.inner.load(SeqCst).as_ref().unwrap() }
}
}
/// Construction, sharing and replacement of the published snapshot.
impl<K, V> OuterSnapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
/// Create the holder with an empty current snapshot that already has an
/// empty `next` snapshot chained to it.
fn new() -> OuterSnapshot<K, V> {
// Two snapshots are created below, hence the initial count of 2.
let n_active = Arc::new(AtomicUsize::new(2));
let m = Arc::clone(&n_active);
let n = Arc::clone(&n_active);
let n_nodes = Arc::new(AtomicIsize::new(0));
let next_snapshot: Option<Arc<Snapshot<K,V>>> =
Some(Arc::new(*Snapshot::new(None, Arc::clone(&n_nodes), m)));
let curr_snapshot: Box<Snapshot<K, V>> =
Snapshot::new(next_snapshot, Arc::clone(&n_nodes), n);
let arc: Box<Arc<Snapshot<K, V>>> = Box::new(Arc::new(*curr_snapshot));
OuterSnapshot {
ulatch: RWSpinlock::new(),
// The box is leaked here; `shift_snapshot` and `Mvcc::drop` re-box it.
inner: AtomicPtr::new(Box::leak(arc)),
n_nodes,
n_active,
}
}
/// Return a new Arc to the current snapshot, taken under the read latch.
fn clone(this: &OuterSnapshot<K, V>) -> Arc<Snapshot<K, V>> {
let _r = this.ulatch.acquire_read(true );
let inner_snap: &Arc<Snapshot<K,V>> =
unsafe { this.inner.load(SeqCst).as_ref().unwrap() };
Arc::clone(inner_snap)
}
/// Fill the pre-allocated `next` snapshot with `root`, `seqno`, `n_count`
/// and `reclaim`, then publish it as the current snapshot.
///
/// Panics when the current snapshot has no `next` or when that `next` is
/// shared (`Arc::get_mut` fails).
fn shift_snapshot(
&self,
root: Option<Box<Node<K, V>>>,
seqno: u64,
n_count: usize,
reclaim: Vec<Box<Node<K, V>>>,
) {
// Wait, yielding the thread, until fewer than 1000 snapshots are counted
// as active.
loop {
if self.n_active.load(SeqCst) < 1000 {
break;
} else {
thread::yield_now();
}
}
let _w = self.ulatch.acquire_write(true );
let m = Arc::clone(&self.n_active);
// Re-box the published pointer; this box (and with it this holder's Arc
// to the old snapshot) is dropped when the function returns.
let curr_s: Box<Arc<Snapshot<K,V>>> =
unsafe { Box::from_raw(self.inner.load(SeqCst)) };
let curr_r: &Snapshot<K, V> = curr_s.as_ref().as_ref();
// NOTE(review): a mutable reference is manufactured from a shared one to
// reach `next`; readers may hold Arcs to this snapshot at the same time —
// confirm they never touch `next`.
let curr_m: &mut Snapshot<K, V> = unsafe {
(curr_r as *const Snapshot<K,V>
as *mut Snapshot<K,V>)
.as_mut()
.unwrap()
};
let next_m = Arc::get_mut(curr_m.next.as_mut().unwrap()).unwrap();
next_m.root = root;
next_m.reclaim = Some(reclaim);
next_m.seqno = seqno;
next_m.n_count = n_count;
let n_nodes = Arc::clone(&self.n_nodes);
// Pre-allocate the snapshot that the following shift will fill.
next_m.next = Some(Arc::new(*Snapshot::new(None, n_nodes, m)));
let next_s: Box<Arc<Snapshot<K, V>>> = Box::new(Arc::clone(
curr_r.next.as_ref().unwrap(),
));
// Accounts for the snapshot allocated just above.
self.n_active.fetch_add(1, SeqCst);
self.inner.store(Box::leak(next_s), SeqCst);
}
}
/// One published version of the tree.
pub(crate) struct Snapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
// Root of this version's tree; leaked, not freed, when the snapshot drops.
root: Option<Box<Node<K, V>>>,
// Reclaim list handed to `shift_snapshot` together with `root`; its length
// is subtracted from `n_nodes` when the snapshot drops.
reclaim: Option<Vec<Box<Node<K, V>>>>,
// Sequence number recorded for this version.
seqno: u64,
// Entry count recorded for this version.
n_count: usize,
// Counters shared with `OuterSnapshot`.
n_nodes: Arc<AtomicIsize>,
n_active: Arc<AtomicUsize>,
// Pre-allocated successor, filled in by the next `shift_snapshot`.
next: Option<Arc<Snapshot<K, V>>>,
}
/// Construction and root access for a snapshot.
impl<K, V> Snapshot<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Allocate an empty snapshot (no root, no reclaim list, seqno 0, zero
    /// entries) wired to `next` and to the shared counters.
    fn new(
        next: Option<Arc<Snapshot<K, V>>>,
        n_nodes: Arc<AtomicIsize>,
        n_active: Arc<AtomicUsize>,
    ) -> Box<Snapshot<K, V>> {
        let snapshot = Snapshot {
            root: None,
            reclaim: None,
            seqno: 0,
            n_count: 0,
            next,
            n_nodes,
            n_active,
        };
        Box::new(snapshot)
    }

    /// Return a second `Box` that aliases this snapshot's root node.
    ///
    /// The caller must make sure the node is not freed through both boxes.
    fn root_duplicate(&self) -> Option<Box<Node<K, V>>> {
        self.root.as_ref().map(|root| {
            let ptr = root.deref() as *const Node<K, V> as *mut Node<K, V>;
            unsafe { Box::from_raw(ptr) }
        })
    }

    /// Borrow the root node, if there is one.
    fn as_root(&self) -> Option<&Node<K, V>> {
        match &self.root {
            Some(root) => Some(root.deref()),
            None => None,
        }
    }
}
/// Release a snapshot: the tree stays alive, the reclaim list is accounted
/// for, and the `next` chain is unwound iteratively.
impl<K, V> Drop for Snapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn drop(&mut self) {
// Leak the root so that dropping a snapshot never frees the tree.
self.root.take().map(Box::leak);
// Remove the reclaim list from the node count; the Vec itself is dropped
// at the end of the match arm.
match self.reclaim.take() {
Some(reclaim) => {
self.n_nodes
.as_ref()
.fetch_sub(reclaim.len().try_into().unwrap(), SeqCst);
}
_ => (),
};
let _n = self.n_active.fetch_sub(1, SeqCst);
// Walk the `next` chain in a loop: each uniquely-owned successor has its
// own `next` taken out before it is dropped, so its drop does not recurse.
let mut child = self.next.take();
while let Some(snap) = child.take() {
match Arc::try_unwrap(snap) {
Ok(mut snap) => {
child = snap.next.take();
mem::drop(snap)
}
// Someone else still holds this snapshot; stop here.
Err(_snap) => break,
}
}
}
}
impl<K, V> Default for Snapshot<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn default() -> Snapshot<K, V> {
Snapshot {
root: Default::default(),
reclaim: Default::default(),
seqno: Default::default(),
n_count: Default::default(),
next: Default::default(),
n_nodes: Default::default(),
n_active: Default::default(),
}
}
}
/// Read handle created by `Mvcc::to_reader`.
pub struct MvccReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
// Type-erased pointer to the `Mvcc` instance. The handle does not own it:
// the box is leaked on drop.
index: Option<Box<std::ffi::c_void>>,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
/// Dropping a reader must not free the index it points at.
impl<K, V> Drop for MvccReader<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    fn drop(&mut self) {
        // The box only aliases the `Mvcc` instance, so it is leaked.
        let index = self.index.take().unwrap();
        Box::leak(index);
    }
}
/// Recover a typed reference to the index from the type-erased pointer.
impl<K, V> AsRef<Mvcc<K, V>> for MvccReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn as_ref(&self) -> &Mvcc<K, V> {
unsafe {
// `index` is always `Some` until `drop` takes it.
let index_ptr = self.index.as_ref().unwrap().as_ref();
let index_ptr = index_ptr as *const std::ffi::c_void;
// Reverses the cast made in `Mvcc::to_reader`.
(index_ptr as *const Mvcc<K, V>).as_ref().unwrap()
}
}
}
/// Construction of reader handles.
impl<K, V> MvccReader<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Wrap the type-erased index pointer in a reader handle.
    fn new(index: Box<std::ffi::c_void>) -> MvccReader<K, V> {
        let index = Some(index);
        MvccReader {
            index,
            phantom_key: marker::PhantomData,
            phantom_val: marker::PhantomData,
        }
    }
}
/// Read operations on the handle; each one forwards to the same-named
/// operation on the underlying `Mvcc` index.
impl<K, V> Reader<K, V> for MvccReader<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Look up `key` in the underlying index.
    fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        <Self as AsRef<Mvcc<K, V>>>::as_ref(self).get(key)
    }

    /// Iterate over the underlying index.
    fn iter(&self) -> Result<IndexIter<K, V>> {
        <Self as AsRef<Mvcc<K, V>>>::as_ref(self).iter()
    }

    /// Iterate over the entries of the underlying index within `range`.
    fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized,
    {
        <Self as AsRef<Mvcc<K, V>>>::as_ref(self).range(range)
    }

    /// Forward to the index's `reverse` for the entries within `range`.
    fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized,
    {
        <Self as AsRef<Mvcc<K, V>>>::as_ref(self).reverse(range)
    }

    /// Same as [`get`](Reader::get): this handle forwards to the plain
    /// lookup.
    fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
    where
        K: Borrow<Q>,
        Q: Ord + ?Sized,
    {
        self.get(key)
    }

    /// Same as [`iter`](Reader::iter).
    fn iter_with_versions(&self) -> Result<IndexIter<K, V>> {
        self.iter()
    }

    /// Same as [`range`](Reader::range).
    fn range_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized,
    {
        self.range(r)
    }

    /// Same as [`reverse`](Reader::reverse).
    fn reverse_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
    where
        K: Borrow<Q>,
        R: 'a + RangeBounds<Q>,
        Q: 'a + Ord + ?Sized,
    {
        self.reverse(r)
    }
}
/// Write handle onto an [`Mvcc`] index.
///
/// Like the read handle, it keeps a type-erased pointer that its `AsMut`
/// implementation reinterprets as `Mvcc<K, V>`. Dropping the handle leaks
/// the box, so the pointee is never freed through a writer.
pub struct MvccWriter<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
// Type-erased pointer to the index. It is `Some` from construction until
// `drop` takes it out.
index: Option<Box<std::ffi::c_void>>,
// Zero-sized markers tying the handle to the index's key and value types.
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
impl<K, V> Drop for MvccWriter<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Release the handle without freeing what `index` points at.
    ///
    /// # Panics
    ///
    /// Panics if `index` has already been taken.
    fn drop(&mut self) {
        let handle = self.index.take().unwrap();
        // Leak rather than drop: the box only stands in for a pointer to
        // the index, and letting it run its destructor would free memory
        // that `as_mut` treats as an `Mvcc<K, V>`.
        let _leaked: &mut std::ffi::c_void = Box::leak(handle);
    }
}
impl<K, V> AsMut<Mvcc<K, V>> for MvccWriter<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Reinterpret the type-erased `index` pointer as an exclusive reference
    /// to the underlying `Mvcc<K, V>`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is `None`.
    fn as_mut(&mut self) -> &mut Mvcc<K, V> {
        let opaque: &mut std::ffi::c_void = self.index.as_mut().unwrap().as_mut();
        let index = opaque as *mut std::ffi::c_void as *mut Mvcc<K, V>;
        // SAFETY: `index` is derived from a reference, so it is never null.
        // NOTE(review): validity and exclusivity additionally assume the box
        // handed to `new` addresses a live `Mvcc<K, V>` that nothing else
        // mutates at the same time -- confirm at the call site.
        unsafe { &mut *index }
    }
}
impl<K, V> MvccWriter<K, V>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    /// Wrap a type-erased index pointer in a write handle.
    ///
    /// NOTE(review): `index` is later reinterpreted as `Mvcc<K, V>`; the
    /// caller is assumed to pass a pointer to such an index.
    fn new(index: Box<std::ffi::c_void>) -> MvccWriter<K, V> {
        let index = Some(index);
        Self {
            index,
            phantom_key: marker::PhantomData,
            phantom_val: marker::PhantomData,
        }
    }
}
/// Write operations on the handle. Each forwards to the matching `*_index`
/// operation of the underlying `Mvcc` with no explicit sequence number, and
/// hands back only the operation's result; the sequence number returned by
/// the index is discarded.
impl<K, V> Writer<K, V> for MvccWriter<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Set `key` to `value` in the underlying index.
    fn set(&mut self, key: K, value: V) -> Result<Option<Entry<K, V>>> {
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self)
            .set_index(key, value, None)
            .1
    }

    /// Set `key` to `value`, passing `cas` through to the index's
    /// `set_cas_index`.
    fn set_cas(&mut self, key: K, value: V, cas: u64) -> Result<Option<Entry<K, V>>> {
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self)
            .set_cas_index(key, value, cas, None)
            .1
    }

    /// Delete `key` from the underlying index.
    fn delete<Q>(&mut self, key: &Q) -> Result<Option<Entry<K, V>>>
    where
        K: Borrow<Q>,
        Q: ToOwned<Owned = K> + Ord + ?Sized,
    {
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self)
            .delete_index(key, None)
            .1
    }
}
/// Write operations that carry a caller-supplied sequence number. Each
/// forwards to the same-named operation of the underlying `Mvcc`, wrapping
/// `seqno` in `Some`, and returns the index's `(seqno, result)` pair as is.
impl<K, V> WalWriter<K, V> for MvccWriter<K, V>
where
    K: Clone + Ord + Footprint,
    V: Clone + Diff + Footprint,
{
    /// Set `key` to `value` under sequence number `seqno`.
    fn set_index(
        &mut self,
        key: K,
        value: V,
        seqno: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
        let seqno = Some(seqno);
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self).set_index(key, value, seqno)
    }

    /// Set `key` to `value` under sequence number `seqno`, passing `cas`
    /// through to the index.
    fn set_cas_index(
        &mut self,
        key: K,
        value: V,
        cas: u64,
        seqno: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
        let seqno = Some(seqno);
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self).set_cas_index(key, value, cas, seqno)
    }

    /// Delete `key` under sequence number `seqno`.
    fn delete_index<Q>(
        &mut self,
        key: &Q,
        seqno: u64,
    ) -> (Option<u64>, Result<Option<Entry<K, V>>>)
    where
        K: Borrow<Q>,
        Q: ToOwned<Owned = K> + Ord + ?Sized,
    {
        let seqno = Some(seqno);
        <Self as AsMut<Mvcc<K, V>>>::as_mut(self).delete_index(key, seqno)
    }
}
/// Drain `rx`, subtracting every received count from the shared `n_nodes`
/// counter.
///
/// Blocks until all senders of the channel are gone, then returns `Ok(())`.
///
/// # Panics
///
/// Panics if a received count does not fit in an `isize`.
#[allow(dead_code)]
fn gc<K, V>(rx: mpsc::Receiver<usize>, n_nodes: Arc<AtomicIsize>) -> Result<()>
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    // `recv` fails only once the channel has hung up, which ends the loop.
    while let Ok(count) = rx.recv() {
        let freed: isize = count.try_into().unwrap();
        n_nodes.fetch_sub(freed, SeqCst);
    }
    Ok(())
}
/// Recursively detach and drop the subtree rooted at `node`, returning the
/// number of nodes visited (the root included).
///
/// The left subtree is torn down before the right one, and `node` itself
/// is dropped last, when this call returns.
fn drop_tree<K, V>(mut node: Box<Node<K, V>>) -> usize
where
    K: Ord + Clone,
    V: Clone + Diff,
{
    let left_count = node.left.take().map_or(0, drop_tree::<K, V>);
    let right_count = node.right.take().map_or(0, drop_tree::<K, V>);
    left_count + right_count + 1
}
/// Debug helper: print `prefix`, the word `reclaim`, and then the address of
/// every boxed node in `reclaim` on a single line of standard output.
#[allow(dead_code)]
fn print_reclaim<K, V>(prefix: &str, reclaim: &Vec<Box<Node<K, V>>>)
where
    K: Clone + Ord,
    V: Clone + Diff,
{
    print!("{}reclaim ", prefix);
    for node in reclaim.iter() {
        // `{:p}` on the box prints the address of the node it owns.
        print!("{:p} ", *node);
    }
    println!("");
}
#[cfg(test)]
#[path = "mvcc_test.rs"]
mod mvcc_test;