use std::borrow::Borrow;
use std::cmp::{Ord, Ordering};
use std::convert::TryInto;
use std::fmt::Debug;
use std::ops::{Bound, Deref, DerefMut, RangeBounds};
use std::sync::Arc;
use std::{marker, mem};
use crate::core::{Diff, Entry, Footprint, Result, ScanEntry, Value};
use crate::core::{FullScan, Index, IndexIter, ScanIter};
use crate::core::{Reader, WalWriter, Writer};
use crate::error::Error;
use crate::llrb_node::{LlrbDepth, Node, Stats};
use crate::mvcc::Snapshot;
use crate::spinlock::{self, RWSpinlock};
include!("llrb_common.rs");
pub struct Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
name: String,
lsm: bool,
spin: bool,
root: Option<Box<Node<K, V>>>,
seqno: u64,
n_count: usize,
latch: RWSpinlock,
key_footprint: isize,
tree_footprint: isize,
readers: Arc<u32>,
writers: Arc<u32>,
}
impl<K, V> Drop for Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn drop(&mut self) {
self.root.take().map(drop_tree);
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("Llrb dropped before read/write handles {}", n);
}
}
}
fn drop_tree<K, V>(mut node: Box<Node<K, V>>)
where
K: Ord + Clone,
V: Clone + Diff,
{
node.left.take().map(|left| drop_tree(left));
node.right.take().map(|right| drop_tree(right));
}
pub(crate) struct SquashDebris<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
pub(crate) root: Option<Box<Node<K, V>>>,
pub(crate) seqno: u64,
pub(crate) n_count: usize,
pub(crate) key_footprint: isize,
pub(crate) tree_footprint: isize,
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
const CONCUR_REF_COUNT: usize = 2;
pub fn new<S: AsRef<str>>(name: S) -> Box<Llrb<K, V>> {
Box::new(Llrb {
name: name.as_ref().to_string(),
lsm: false,
spin: true,
root: None,
seqno: Default::default(),
n_count: Default::default(),
latch: RWSpinlock::new(),
key_footprint: Default::default(),
tree_footprint: Default::default(),
readers: Arc::new(0xC0FFEE),
writers: Arc::new(0xC0FFEE),
})
}
pub fn new_lsm<S>(name: S) -> Box<Llrb<K, V>>
where
S: AsRef<str>,
{
Box::new(Llrb {
name: name.as_ref().to_string(),
lsm: true,
spin: true,
root: None,
seqno: Default::default(),
n_count: Default::default(),
latch: RWSpinlock::new(),
key_footprint: Default::default(),
tree_footprint: Default::default(),
readers: Arc::new(0xC0FFEE),
writers: Arc::new(0xC0FFEE),
})
}
pub fn set_spinlatch(&mut self, spin: bool) {
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("cannot configure Llrb with active readers/writers {}", n)
}
self.spin = spin;
}
pub fn set_seqno(&mut self, seqno: u64) {
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("cannot configure Llrb with active readers/writers {}", n)
}
self.seqno = seqno;
}
pub(crate) fn squash(mut self) -> SquashDebris<K, V> {
let n = self.multi_rw();
if n > Self::CONCUR_REF_COUNT {
panic!("cannot squash Llrb with active readers/writer {}", n);
}
SquashDebris {
root: self.root.take(),
seqno: self.seqno,
n_count: self.n_count,
key_footprint: self.key_footprint,
tree_footprint: self.tree_footprint,
}
}
fn shallow_clone(&self) -> Box<Llrb<K, V>> {
Box::new(Llrb {
name: self.name.clone(),
lsm: self.lsm,
spin: self.spin,
root: None,
seqno: Default::default(),
n_count: Default::default(),
latch: RWSpinlock::new(),
key_footprint: Default::default(),
tree_footprint: Default::default(),
readers: Arc::new(0xC0FFEE),
writers: Arc::new(0xC0FFEE),
})
}
pub fn clone(&self) -> Box<Llrb<K, V>> {
Box::new(Llrb {
name: self.name.clone(),
lsm: self.lsm,
spin: self.spin,
root: self.root.clone(),
seqno: self.seqno,
n_count: self.n_count,
latch: RWSpinlock::new(),
key_footprint: self.key_footprint,
tree_footprint: self.tree_footprint,
readers: Arc::new(0xC0FFEE),
writers: Arc::new(0xC0FFEE),
})
}
fn multi_rw(&self) -> usize {
Arc::strong_count(&self.readers) + Arc::strong_count(&self.writers)
}
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
#[inline]
pub fn is_lsm(&self) -> bool {
self.lsm
}
#[inline]
pub fn len(&self) -> usize {
let _latch = self.latch.acquire_read(self.spin);
self.n_count
}
#[inline]
pub fn to_name(&self) -> String {
self.name.clone()
}
pub fn to_seqno(&self) -> u64 {
let _latch = self.latch.acquire_read(self.spin);
self.seqno
}
pub fn to_stats(&self) -> Stats {
Stats::new_partial(
self.len(),
mem::size_of::<Node<K, V>>(),
self.latch.to_conflicts(),
)
}
pub(crate) fn to_spin(&self) -> bool {
self.spin
}
}
impl<K, V> Index<K, V> for Llrb<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
type W = LlrbWriter<K, V>;
type R = LlrbReader<K, V>;
fn make_new(&self) -> Result<Box<Self>> {
Ok(self.shallow_clone())
}
fn to_reader(&mut self) -> Result<Self::R> {
let index = unsafe {
Box::from_raw(self as *mut Llrb<K, V> as *mut std::ffi::c_void)
};
let reader = Arc::clone(&self.readers);
Ok(LlrbReader::<K, V>::new(index, reader))
}
fn to_writer(&mut self) -> Result<Self::W> {
let index = unsafe {
Box::from_raw(self as *mut Llrb<K, V> as *mut std::ffi::c_void)
};
let writer = Arc::clone(&self.writers);
Ok(LlrbWriter::<K, V>::new(index, writer))
}
}
impl<K, V> Footprint for Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn footprint(&self) -> isize {
let _latch = self.latch.acquire_read(self.spin);
self.tree_footprint
}
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn set_index(
&mut self,
key: K,
value: V,
seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
let _latch = self.latch.acquire_write(self.spin);
let seqno = match seqno {
Some(seqno) => seqno,
None => self.seqno + 1,
};
let key_footprint = key.footprint();
let new_entry = {
let value = Value::new_upsert_value(value, seqno);
Entry::new(key, value)
};
match Llrb::upsert(self.root.take(), new_entry, self.lsm) {
UpsertResult {
node: Some(mut root),
old_entry,
size,
} => {
root.set_black();
self.root = Some(root);
if old_entry.is_none() {
self.n_count += 1;
self.key_footprint += key_footprint;
}
self.seqno = seqno;
self.tree_footprint += size;
(Some(seqno), Ok(old_entry))
}
_ => panic!("set: impossible case, call programmer"),
}
}
fn set_cas_index(
&mut self,
key: K,
value: V,
cas: u64,
seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
let _latch = self.latch.acquire_write(self.spin);
let seqno = match seqno {
Some(seqno) => seqno,
None => self.seqno + 1,
};
let key_footprint = key.footprint();
let new_entry = {
let value = Value::new_upsert_value(value, seqno);
Entry::new(key, value)
};
match Llrb::upsert_cas(self.root.take(), new_entry, cas, self.lsm) {
UpsertCasResult {
node: root,
err: Some(err),
..
} => {
self.root = root;
(None, Err(err))
}
UpsertCasResult {
node: Some(mut root),
old_entry,
size,
err: None,
} => {
root.set_black();
self.root = Some(root);
if old_entry.is_none() {
self.n_count += 1;
self.key_footprint += key_footprint;
}
self.seqno = seqno;
self.tree_footprint += size;
(Some(seqno), Ok(old_entry))
}
_ => panic!("set_cas: impossible case, call programmer"),
}
}
fn delete_index<Q>(
&mut self,
key: &Q,
seqno: Option<u64>,
) -> (Option<u64>, Result<Option<Entry<K, V>>>)
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
let _latch = self.latch.acquire_write(self.spin);
let seqno = match seqno {
Some(seqno) => seqno,
None => self.seqno + 1,
};
let key_footprint = key.to_owned().footprint();
if self.lsm {
let res = Llrb::delete_lsm(self.root.take(), key, seqno);
self.root = res.node;
self.root.as_mut().map(|r| r.set_black());
self.seqno = seqno;
self.tree_footprint += res.size;
return match res.old_entry {
None => {
self.key_footprint += key_footprint;
self.n_count += 1;
(Some(seqno), Ok(None))
}
Some(entry) => (Some(seqno), Ok(Some(entry))),
};
} else {
let res = match Llrb::do_delete(self.root.take(), key) {
res @ DeleteResult { node: None, .. } => res,
mut res => {
res.node.as_mut().map(|node| node.set_black());
res
}
};
self.root = res.node;
if res.old_entry.is_some() {
self.key_footprint -= key_footprint;
self.tree_footprint += res.size;
self.n_count -= 1;
self.seqno = seqno;
(Some(seqno), Ok(res.old_entry))
} else {
(None, Ok(res.old_entry))
}
}
}
}
impl<K, V> Writer<K, V> for Llrb<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn set(&mut self, key: K, value: V) -> Result<Option<Entry<K, V>>> {
let (_seqno, entry) = self.set_index(key, value, None);
entry
}
fn set_cas(&mut self, key: K, value: V, cas: u64) -> Result<Option<Entry<K, V>>> {
let (_seqno, entry) = self.set_cas_index(key, value, cas, None);
entry
}
fn delete<Q>(&mut self, key: &Q) -> Result<Option<Entry<K, V>>>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
let (_seqno, entry) = self.delete_index(key, None);
entry
}
}
struct UpsertResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
node: Option<Box<Node<K, V>>>,
old_entry: Option<Entry<K, V>>,
size: isize,
}
struct UpsertCasResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
node: Option<Box<Node<K, V>>>,
old_entry: Option<Entry<K, V>>,
size: isize,
err: Option<Error>,
}
struct DeleteResult<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
node: Option<Box<Node<K, V>>>,
old_entry: Option<Entry<K, V>>,
size: isize,
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn upsert(
node: Option<Box<Node<K, V>>>,
nentry: Entry<K, V>,
lsm: bool,
) -> UpsertResult<K, V> {
match node {
None => {
let mut node: Box<Node<K, V>> = Box::new(From::from(nentry));
node.dirty = false;
let size: isize = node.footprint().try_into().unwrap();
return UpsertResult {
node: Some(node),
old_entry: None,
size,
};
}
Some(mut node) => {
node = Llrb::walkdown_rot23(node);
match node.as_key().cmp(nentry.as_key()) {
Ordering::Greater => {
let mut r = Llrb::upsert(node.left.take(), nentry, lsm);
node.left = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Less => {
let mut r = Llrb::upsert(node.right.take(), nentry, lsm);
node.right = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Equal => {
let old_entry = Some(node.entry.clone());
let size = node.prepend_version(nentry, lsm);
UpsertResult {
node: Some(Llrb::walkuprot_23(node)),
old_entry,
size,
}
}
}
}
}
}
fn upsert_cas(
node: Option<Box<Node<K, V>>>,
nentry: Entry<K, V>,
cas: u64,
lsm: bool,
) -> UpsertCasResult<K, V> {
let mut node = match node {
None if cas > 0 => {
return UpsertCasResult {
node: None,
old_entry: None,
size: 0,
err: Some(Error::InvalidCAS),
};
}
None => {
let mut node: Box<Node<K, V>> = Box::new(From::from(nentry));
node.dirty = false;
let size: isize = node.footprint().try_into().unwrap();
return UpsertCasResult {
node: Some(node),
old_entry: None,
size,
err: None,
};
}
Some(node) => node,
};
node = Llrb::walkdown_rot23(node);
match node.as_key().cmp(nentry.as_key()) {
Ordering::Greater => {
let left = node.left.take();
let mut r = Llrb::upsert_cas(left, nentry, cas, lsm);
node.left = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Less => {
let right = node.right.take();
let mut r = Llrb::upsert_cas(right, nentry, cas, lsm);
node.right = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Equal => {
let p = node.is_deleted() && cas != 0 && cas != node.to_seqno();
let p = p || (!node.is_deleted() && cas != node.to_seqno());
if p {
UpsertCasResult {
node: Some(Llrb::walkuprot_23(node)),
old_entry: None,
size: 0,
err: Some(Error::InvalidCAS),
}
} else {
let old_entry = Some(node.entry.clone());
let size = node.prepend_version(nentry, lsm);
UpsertCasResult {
node: Some(Llrb::walkuprot_23(node)),
old_entry,
size,
err: None,
}
}
}
}
}
fn delete_lsm<Q>(
node: Option<Box<Node<K, V>>>,
key: &Q,
seqno: u64,
) -> DeleteResult<K, V>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
match node {
None => {
let mut node = Node::new_deleted(key.to_owned(), seqno);
node.dirty = false;
let size: isize = node.footprint().try_into().unwrap();
DeleteResult {
node: Some(node),
old_entry: None,
size,
}
}
Some(mut node) => {
node = Llrb::walkdown_rot23(node);
match node.as_key().borrow().cmp(&key) {
Ordering::Greater => {
let left = node.left.take();
let mut r = Llrb::delete_lsm(left, key, seqno);
node.left = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Less => {
let right = node.right.take();
let mut r = Llrb::delete_lsm(right, key, seqno);
node.right = r.node;
r.node = Some(Llrb::walkuprot_23(node));
r
}
Ordering::Equal => {
let entry = node.entry.clone();
let size = node.delete(seqno);
DeleteResult {
node: Some(Llrb::walkuprot_23(node)),
old_entry: Some(entry),
size,
}
}
}
}
}
}
fn do_delete<Q>(node: Option<Box<Node<K, V>>>, key: &Q) -> DeleteResult<K, V>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut node = match node {
None => {
return DeleteResult {
node: None,
old_entry: None,
size: 0,
}
}
Some(node) => node,
};
if node.as_key().borrow().gt(key) {
if node.left.is_none() {
DeleteResult {
node: Some(node),
old_entry: None,
size: 0,
}
} else {
let ok = !is_red(node.as_left_deref());
if ok && !is_red(node.left.as_ref().unwrap().as_left_deref()) {
node = Llrb::move_red_left(node);
}
let mut r = Llrb::do_delete(node.left.take(), key);
node.left = r.node;
r.node = Some(Llrb::fixup(node));
r
}
} else {
if is_red(node.as_left_deref()) {
node = Llrb::rotate_right(node);
}
if !node.as_key().borrow().lt(key) && node.right.is_none() {
return DeleteResult {
node: None,
old_entry: Some(node.entry.clone()),
size: node.footprint(),
};
}
let ok = node.right.is_some() && !is_red(node.as_right_deref());
if ok && !is_red(node.right.as_ref().unwrap().as_left_deref()) {
node = Llrb::move_red_right(node);
}
if !node.as_key().borrow().lt(key) {
let (right, mut res_node) = Llrb::delete_min(node.right.take());
node.right = right;
if res_node.is_none() {
panic!("do_delete(): fatal logic, call the programmer");
}
let subdel = res_node.take().unwrap();
let mut newnode = Box::new(subdel.clone_detach());
newnode.left = node.left.take();
newnode.right = node.right.take();
newnode.black = node.black;
newnode.dirty = false;
let size: isize = node.footprint().try_into().unwrap();
DeleteResult {
node: Some(Llrb::fixup(newnode)),
old_entry: Some(node.entry.clone()),
size,
}
} else {
let mut r = Llrb::do_delete(node.right.take(), key);
node.right = r.node;
r.node = Some(Llrb::fixup(node));
r
}
}
}
fn delete_min(
node: Option<Box<Node<K, V>>>,
) -> (Option<Box<Node<K, V>>>, Option<Node<K, V>>) {
match node {
None => (None, None),
Some(node) if node.left.is_none() => (None, Some(*node)),
Some(mut node) => {
let left = node.as_left_deref();
if !is_red(left) && !is_red(left.unwrap().as_left_deref()) {
node = Llrb::move_red_left(node);
}
let (left, old_node) = Llrb::delete_min(node.left.take());
node.left = left;
(Some(Llrb::fixup(node)), old_node)
}
}
}
}
impl<K, V> Reader<K, V> for Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let _latch = self.latch.acquire_read(self.spin);
get(self.root.as_ref().map(Deref::deref), key)
}
fn iter(&self) -> Result<IndexIter<K, V>> {
let _latch = Some(self.latch.acquire_read(self.spin));
let node = self.root.as_ref().map(Deref::deref);
Ok(Box::new(Iter {
_latch,
_arc: Default::default(),
paths: Some(build_iter(IFlag::Left, node, vec![])),
}))
}
fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _latch = Some(self.latch.acquire_read(self.spin));
let root = self.root.as_ref().map(Deref::deref);
let paths = match range.start_bound() {
Bound::Unbounded => Some(build_iter(IFlag::Left, root, vec![])),
Bound::Included(low) => Some(find_start(root, low, true, vec![])),
Bound::Excluded(low) => Some(find_start(root, low, false, vec![])),
};
Ok(Box::new(Range {
_latch,
_arc: Default::default(),
range,
paths,
high: marker::PhantomData,
}))
}
fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let _latch = Some(self.latch.acquire_read(self.spin));
let root = self.root.as_ref().map(Deref::deref);
let paths = match range.end_bound() {
Bound::Unbounded => Some(build_iter(IFlag::Right, root, vec![])),
Bound::Included(high) => Some(find_end(root, high, true, vec![])),
Bound::Excluded(high) => Some(find_end(root, high, false, vec![])),
};
let low = marker::PhantomData;
Ok(Box::new(Reverse {
_latch,
_arc: Default::default(),
range,
paths,
low,
}))
}
fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.get(key)
}
fn iter_with_versions(&self) -> Result<IndexIter<K, V>> {
self.iter()
}
fn range_with_versions<'a, R, Q>(&'a self, rng: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.range(rng)
}
fn reverse_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.reverse(r)
}
}
impl<K, V> FullScan<K, V> for Llrb<K, V>
where
K: Clone + Ord,
V: Clone + Diff + From<<V as Diff>::D>,
{
fn full_scan<G>(&self, from: Bound<K>, within: G) -> Result<ScanIter<K, V>>
where
G: Clone + RangeBounds<u64>,
{
let _latch = Some(self.latch.acquire_read(self.spin));
let root = self.root.as_ref().map(Deref::deref);
let paths = match from {
Bound::Unbounded => Some(build_iter(IFlag::Left, root, vec![])),
Bound::Included(low) => {
let paths = Some(find_start(root, low.borrow(), true, vec![]));
paths
}
Bound::Excluded(low) => {
let paths = Some(find_start(root, low.borrow(), false, vec![]));
paths
}
};
let start = match within.start_bound() {
Bound::Included(x) => Bound::Included(*x),
Bound::Excluded(x) => Bound::Excluded(*x),
Bound::Unbounded => Bound::Unbounded,
};
let end = match within.end_bound() {
Bound::Included(x) => Bound::Included(*x),
Bound::Excluded(x) => Bound::Excluded(*x),
Bound::Unbounded => Bound::Unbounded,
};
Ok(Box::new(IterFullScan {
_latch,
_arc: Default::default(),
start,
end,
paths,
}))
}
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord + Debug,
V: Clone + Diff,
{
pub fn validate(&self) -> Result<Stats> {
let _latch = self.latch.acquire_read(self.spin);
let root = self.root.as_ref().map(Deref::deref);
let (red, blacks, depth) = (is_red(root), 0, 0);
let mut depths: LlrbDepth = Default::default();
if red {
panic!("LLRB violation: Root node is alway black: {}", self.name);
}
let blacks = validate_tree(root, red, blacks, depth, &mut depths)?;
if depths.to_max() > 100 {
panic!("LLRB depth has exceeded limit: {}", depths.to_max());
}
Ok(Stats::new_full(
self.n_count,
std::mem::size_of::<Node<K, V>>(),
self.latch.to_conflicts(),
blacks,
depths,
))
}
}
impl<K, V> Llrb<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn walkdown_rot23(node: Box<Node<K, V>>) -> Box<Node<K, V>> {
node
}
fn walkuprot_23(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
if is_red(node.as_right_deref()) && !is_red(node.as_left_deref()) {
node = Llrb::rotate_left(node);
}
let left = node.as_left_deref();
if is_red(left) && is_red(left.unwrap().as_left_deref()) {
node = Llrb::rotate_right(node);
}
if is_red(node.as_left_deref()) && is_red(node.as_right_deref()) {
Llrb::flip(node.deref_mut())
}
node
}
fn rotate_left(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
if is_black(node.as_right_deref()) {
panic!("rotateleft(): rotating a black link ? call the programmer");
}
let mut x = node.right.take().unwrap();
node.right = x.left.take();
x.black = node.black;
node.set_red();
x.left = Some(node);
x
}
fn rotate_right(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
if is_black(node.as_left_deref()) {
panic!("rotateright(): rotating a black link ? call the programmer")
}
let mut x = node.left.take().unwrap();
node.left = x.right.take();
x.black = node.black;
node.set_red();
x.right = Some(node);
x
}
fn flip(node: &mut Node<K, V>) {
node.left.as_mut().unwrap().toggle_link();
node.right.as_mut().unwrap().toggle_link();
node.toggle_link();
}
fn fixup(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
node = if is_red(node.as_right_deref()) {
Llrb::rotate_left(node)
} else {
node
};
node = {
let left = node.as_left_deref();
if is_red(left) && is_red(left.unwrap().as_left_deref()) {
Llrb::rotate_right(node)
} else {
node
}
};
if is_red(node.as_left_deref()) && is_red(node.as_right_deref()) {
Llrb::flip(node.deref_mut());
}
node
}
fn move_red_left(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
Llrb::flip(node.deref_mut());
if is_red(node.right.as_ref().unwrap().as_left_deref()) {
node.right = Some(Llrb::rotate_right(node.right.take().unwrap()));
node = Llrb::rotate_left(node);
Llrb::flip(node.deref_mut());
}
node
}
fn move_red_right(mut node: Box<Node<K, V>>) -> Box<Node<K, V>> {
Llrb::flip(node.deref_mut());
if is_red(node.left.as_ref().unwrap().as_left_deref()) {
node = Llrb::rotate_right(node);
Llrb::flip(node.deref_mut());
}
node
}
}
pub struct LlrbReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
_refn: Arc<u32>,
index: Option<Box<std::ffi::c_void>>,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
impl<K, V> Drop for LlrbReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn drop(&mut self) {
Box::leak(self.index.take().unwrap());
}
}
impl<K, V> AsRef<Llrb<K, V>> for LlrbReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn as_ref(&self) -> &Llrb<K, V> {
unsafe {
let index_ptr = self.index.as_ref().unwrap().as_ref();
let index_ptr = index_ptr as *const std::ffi::c_void;
(index_ptr as *const Llrb<K, V>).as_ref().unwrap()
}
}
}
impl<K, V> LlrbReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn new(index: Box<std::ffi::c_void>, _refn: Arc<u32>) -> LlrbReader<K, V> {
LlrbReader {
_refn,
index: Some(index),
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
}
}
}
impl<K, V> Reader<K, V> for LlrbReader<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn get<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let index: &Llrb<K, V> = self.as_ref();
index.get(key)
}
fn iter(&self) -> Result<IndexIter<K, V>> {
let index: &Llrb<K, V> = self.as_ref();
index.iter()
}
fn range<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let index: &Llrb<K, V> = self.as_ref();
index.range(range)
}
fn reverse<'a, R, Q>(&'a self, range: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
let index: &Llrb<K, V> = self.as_ref();
index.reverse(range)
}
fn get_with_versions<Q>(&self, key: &Q) -> Result<Entry<K, V>>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.get(key)
}
fn iter_with_versions(&self) -> Result<IndexIter<K, V>> {
self.iter()
}
fn range_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.range(r)
}
fn reverse_with_versions<'a, R, Q>(&'a self, r: R) -> Result<IndexIter<K, V>>
where
K: Borrow<Q>,
R: 'a + RangeBounds<Q>,
Q: 'a + Ord + ?Sized,
{
self.reverse(r)
}
}
pub struct LlrbWriter<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
_refn: Arc<u32>,
index: Option<Box<std::ffi::c_void>>,
phantom_key: marker::PhantomData<K>,
phantom_val: marker::PhantomData<V>,
}
impl<K, V> Drop for LlrbWriter<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn drop(&mut self) {
Box::leak(self.index.take().unwrap());
}
}
impl<K, V> AsMut<Llrb<K, V>> for LlrbWriter<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn as_mut(&mut self) -> &mut Llrb<K, V> {
unsafe {
let index_ptr = self.index.as_mut().unwrap().as_mut();
let index_ptr = index_ptr as *mut std::ffi::c_void;
(index_ptr as *mut Llrb<K, V>).as_mut().unwrap()
}
}
}
impl<K, V> LlrbWriter<K, V>
where
K: Clone + Ord,
V: Clone + Diff,
{
fn new(index: Box<std::ffi::c_void>, _refn: Arc<u32>) -> LlrbWriter<K, V> {
LlrbWriter {
_refn,
index: Some(index),
phantom_key: marker::PhantomData,
phantom_val: marker::PhantomData,
}
}
}
impl<K, V> Writer<K, V> for LlrbWriter<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn set(&mut self, key: K, value: V) -> Result<Option<Entry<K, V>>> {
let index: &mut Llrb<K, V> = self.as_mut();
let (_seqno, entry) = index.set_index(key, value, None);
entry
}
fn set_cas(&mut self, key: K, value: V, cas: u64) -> Result<Option<Entry<K, V>>> {
let index: &mut Llrb<K, V> = self.as_mut();
let (_seqno, entry) = index.set_cas_index(key, value, cas, None);
entry
}
fn delete<Q>(&mut self, key: &Q) -> Result<Option<Entry<K, V>>>
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
let index: &mut Llrb<K, V> = self.as_mut();
let (_seqno, entry) = index.delete_index(key, None);
entry
}
}
impl<K, V> WalWriter<K, V> for LlrbWriter<K, V>
where
K: Clone + Ord + Footprint,
V: Clone + Diff + Footprint,
{
fn set_index(
&mut self,
key: K,
value: V,
seqno: u64,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
let index: &mut Llrb<K, V> = self.as_mut();
index.set_index(key, value, Some(seqno))
}
fn set_cas_index(
&mut self,
key: K,
value: V,
cas: u64,
seqno: u64,
) -> (Option<u64>, Result<Option<Entry<K, V>>>) {
let index: &mut Llrb<K, V> = self.as_mut();
index.set_cas_index(key, value, cas, Some(seqno))
}
fn delete_index<Q>(
&mut self,
key: &Q,
seqno: u64,
) -> (Option<u64>, Result<Option<Entry<K, V>>>)
where
K: Borrow<Q>,
Q: ToOwned<Owned = K> + Ord + ?Sized,
{
let index: &mut Llrb<K, V> = self.as_mut();
index.delete_index(key, Some(seqno))
}
}
#[cfg(test)]
#[path = "llrb_test.rs"]
mod llrb_test;