mod internal_node;
mod leaf;
mod leaf_node;
mod node;
use crate::ebr::{Arc, AtomicArc, Barrier, Ptr, Tag};
use crate::wait_queue::AsyncWait;
use leaf::{InsertResult, Leaf, RemoveResult, Scanner};
use node::Node;
use std::borrow::Borrow;
use std::cmp::Ordering;
use std::fmt::{self, Debug};
use std::iter::FusedIterator;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::pin::Pin;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed};
pub struct TreeIndex<K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
root: AtomicArc<Node<K, V>>,
}
impl<K, V> TreeIndex<K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
#[inline]
#[must_use]
pub fn new() -> TreeIndex<K, V> {
TreeIndex {
root: AtomicArc::null(),
}
}
#[inline]
pub fn insert(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
let mut new_root = None;
loop {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.insert(key, val, &mut (), &barrier) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
InsertResult::Frozen(k, v) | InsertResult::Retry(k, v) => {
key = k;
val = v;
root_ref.cleanup_link(key.borrow(), false, &barrier);
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root(k, v, &self.root, &barrier);
key = k;
val = v;
continue;
}
InsertResult::Retired(k, v) => {
key = k;
val = v;
let _result = Node::remove_root(&self.root, &mut (), &barrier);
}
},
Err((k, v)) => {
key = k;
val = v;
}
}
}
let node = if let Some(new_root) = new_root.take() {
new_root
} else {
Arc::new(Node::new_leaf_node())
};
if let Err((node, _)) = self.root.compare_exchange(
Ptr::null(),
(Some(node), Tag::None),
AcqRel,
Acquire,
&barrier,
) {
new_root = node;
}
}
}
#[inline]
pub async fn insert_async(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
let mut new_root = None;
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
let need_await = {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.insert(key, val, &mut async_wait_pinned, &barrier) {
Ok(r) => match r {
InsertResult::Success => return Ok(()),
InsertResult::Frozen(k, v) | InsertResult::Retry(k, v) => {
key = k;
val = v;
root_ref.cleanup_link(key.borrow(), false, &barrier);
true
}
InsertResult::Duplicate(k, v) => return Err((k, v)),
InsertResult::Full(k, v) => {
let (k, v) = Node::split_root(k, v, &self.root, &barrier);
key = k;
val = v;
continue;
}
InsertResult::Retired(k, v) => {
key = k;
val = v;
!matches!(
Node::remove_root(&self.root, &mut async_wait_pinned, &barrier),
Ok(true)
)
}
},
Err((k, v)) => {
key = k;
val = v;
true
}
}
} else {
false
}
};
if need_await {
async_wait_pinned.await;
}
let node = if let Some(new_root) = new_root.take() {
new_root
} else {
Arc::new(Node::new_leaf_node())
};
if let Err((node, _)) = self.root.compare_exchange(
Ptr::null(),
(Some(node), Tag::None),
AcqRel,
Acquire,
&Barrier::new(),
) {
new_root = node;
}
}
}
#[inline]
pub fn remove<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.remove_if(key, |_| true)
}
#[inline]
pub async fn remove_async<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
self.remove_if_async(key, |_| true).await
}
#[inline]
pub fn remove_if<Q, F: FnMut(&V) -> bool>(&self, key: &Q, mut condition: F) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut has_been_removed = false;
loop {
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.remove_if::<_, _, _>(key, &mut condition, &mut (), &barrier) {
Ok(r) => match r {
RemoveResult::Success => return true,
RemoveResult::Cleanup => {
root_ref.cleanup_link(key, false, &barrier);
return true;
}
RemoveResult::Retired => {
if matches!(Node::remove_root(&self.root, &mut (), &barrier), Ok(true))
{
return true;
}
has_been_removed = true;
}
RemoveResult::Fail => return has_been_removed,
RemoveResult::Frozen => (),
},
Err(removed) => {
if removed {
has_been_removed = true;
}
}
}
} else {
return has_been_removed;
}
}
}
#[inline]
pub async fn remove_if_async<Q, F: FnMut(&V) -> bool>(&self, key: &Q, mut condition: F) -> bool
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let mut has_been_removed = false;
loop {
let mut async_wait = AsyncWait::default();
let mut async_wait_pinned = Pin::new(&mut async_wait);
{
let barrier = Barrier::new();
if let Some(root_ref) = self.root.load(Acquire, &barrier).as_ref() {
match root_ref.remove_if::<_, _, _>(
key,
&mut condition,
&mut async_wait_pinned,
&barrier,
) {
Ok(r) => match r {
RemoveResult::Success => return true,
RemoveResult::Cleanup => {
root_ref.cleanup_link(key, false, &barrier);
return true;
}
RemoveResult::Retired => {
if matches!(
Node::remove_root(&self.root, &mut async_wait_pinned, &barrier),
Ok(true)
) {
return true;
}
has_been_removed = true;
}
RemoveResult::Fail => return has_been_removed,
RemoveResult::Frozen => (),
},
Err(removed) => {
if removed {
has_been_removed = true;
}
}
}
} else {
return has_been_removed;
}
}
async_wait_pinned.await;
}
}
#[inline]
pub fn read<Q, R, F: FnOnce(&Q, &V) -> R>(&self, key: &Q, reader: F) -> Option<R>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
let barrier = Barrier::new();
self.read_with(key, reader, &barrier)
}
#[inline]
pub fn read_with<'b, Q, R, F: FnOnce(&Q, &'b V) -> R>(
&self,
key: &Q,
reader: F,
barrier: &'b Barrier,
) -> Option<R>
where
K: Borrow<Q>,
Q: Ord + ?Sized,
{
if let Some(root_ref) = self.root.load(Acquire, barrier).as_ref() {
if let Some(val) = root_ref.search(key, barrier) {
return Some(reader(key, val));
}
}
None
}
#[inline]
pub fn clear(&self) {
self.root.swap((None, Tag::None), Relaxed);
}
#[inline]
pub fn len(&self) -> usize {
let barrier = Barrier::new();
self.iter(&barrier).count()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn depth(&self) -> usize {
let barrier = Barrier::new();
self.root
.load(Acquire, &barrier)
.as_ref()
.map_or(0, |root_ref| root_ref.depth(1, &barrier))
}
#[inline]
pub fn iter<'t, 'b>(&'t self, barrier: &'b Barrier) -> Visitor<'t, 'b, K, V> {
Visitor::new(&self.root, barrier)
}
#[inline]
pub fn range<'t, 'b, R: RangeBounds<K>>(
&'t self,
range: R,
barrier: &'b Barrier,
) -> Range<'t, 'b, K, V, R> {
Range::new(&self.root, range, barrier)
}
}
impl<K, V> Clone for TreeIndex<K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
#[inline]
fn clone(&self) -> Self {
let cloned = Self::default();
for (k, v) in self.iter(&Barrier::new()) {
let _reuslt = cloned.insert(k.clone(), v.clone());
}
cloned
}
}
impl<K, V> Debug for TreeIndex<K, V>
where
K: 'static + Clone + Debug + Ord + Sync,
V: 'static + Clone + Debug + Sync,
{
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let barrier = Barrier::new();
f.debug_map().entries(self.iter(&barrier)).finish()
}
}
impl<K, V> Default for TreeIndex<K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
#[inline]
fn default() -> Self {
TreeIndex::new()
}
}
impl<K, V> PartialEq for TreeIndex<K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + PartialEq + Sync,
{
#[inline]
fn eq(&self, other: &Self) -> bool {
let barrier = Barrier::new();
Iterator::eq(self.iter(&barrier), other.iter(&barrier))
}
}
pub struct Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
root: &'t AtomicArc<Node<K, V>>,
leaf_scanner: Option<Scanner<'b, K, V>>,
barrier: &'b Barrier,
}
impl<'t, 'b, K, V> Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
#[inline]
fn new(root: &'t AtomicArc<Node<K, V>>, barrier: &'b Barrier) -> Visitor<'t, 'b, K, V> {
Visitor::<'t, 'b, K, V> {
root,
leaf_scanner: None,
barrier,
}
}
}
impl<'t, 'b, K, V> Iterator for Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
type Item = (&'b K, &'b V);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.leaf_scanner.is_none() {
let root_ptr = self.root.load(Acquire, self.barrier);
if let Some(root_ref) = root_ptr.as_ref() {
if let Some(scanner) = root_ref.min(self.barrier) {
self.leaf_scanner.replace(scanner);
}
} else {
return None;
}
}
if let Some(mut scanner) = self.leaf_scanner.take() {
let min_allowed_key = scanner.get().map(|(key, _)| key);
if let Some(result) = scanner.next() {
self.leaf_scanner.replace(scanner);
return Some(result);
}
if let Some(new_scanner) = scanner.jump(min_allowed_key, self.barrier) {
if let Some(entry) = new_scanner.get() {
self.leaf_scanner.replace(new_scanner);
return Some(entry);
}
}
}
None
}
}
impl<'t, 'b, K, V> FusedIterator for Visitor<'t, 'b, K, V>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
{
}
pub struct Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
R: 'static + RangeBounds<K>,
{
root: &'t AtomicArc<Node<K, V>>,
leaf_scanner: Option<Scanner<'b, K, V>>,
range: R,
check_lower_bound: bool,
check_upper_bound: bool,
barrier: &'b Barrier,
}
impl<'t, 'b, K, V, R> Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
R: RangeBounds<K>,
{
#[inline]
fn new(
root: &'t AtomicArc<Node<K, V>>,
range: R,
barrier: &'b Barrier,
) -> Range<'t, 'b, K, V, R> {
Range::<'t, 'b, K, V, R> {
root,
leaf_scanner: None,
range,
check_lower_bound: true,
check_upper_bound: false,
barrier,
}
}
#[inline]
fn next_unbounded(&mut self) -> Option<(&'b K, &'b V)> {
if self.leaf_scanner.is_none() {
let root_ptr = self.root.load(Acquire, self.barrier);
if let Some(root_ref) = root_ptr.as_ref() {
let min_allowed_key = match self.range.start_bound() {
Excluded(key) | Included(key) => Some(key),
Unbounded => {
self.check_lower_bound = false;
None
}
};
if let Some(leaf_scanner) = min_allowed_key.map_or_else(
|| {
if let Some(mut min_scanner) = root_ref.min(self.barrier) {
min_scanner.next();
Some(min_scanner)
} else {
None
}
},
|min_allowed_key| {
root_ref.max_le_appr(min_allowed_key, self.barrier)
},
) {
self.check_upper_bound = match self.range.end_bound() {
Excluded(key) => leaf_scanner
.max_key()
.map_or(false, |max_key| max_key.cmp(key) != Ordering::Less),
Included(key) => leaf_scanner
.max_key()
.map_or(false, |max_key| max_key.cmp(key) == Ordering::Greater),
Unbounded => false,
};
if let Some(result) = leaf_scanner.get() {
self.leaf_scanner.replace(leaf_scanner);
return Some(result);
}
}
} else {
return None;
}
}
if let Some(mut scanner) = self.leaf_scanner.take() {
let min_allowed_key = scanner.get().map(|(key, _)| key);
if let Some(result) = scanner.next() {
self.leaf_scanner.replace(scanner);
return Some(result);
}
if let Some(new_scanner) = scanner.jump(min_allowed_key, self.barrier).take() {
if let Some(entry) = new_scanner.get() {
self.check_upper_bound = match self.range.end_bound() {
Excluded(key) => new_scanner
.max_key()
.map_or(false, |max_key| max_key.cmp(key) != Ordering::Less),
Included(key) => new_scanner
.max_key()
.map_or(false, |max_key| max_key.cmp(key) == Ordering::Greater),
Unbounded => false,
};
self.leaf_scanner.replace(new_scanner);
return Some(entry);
}
}
}
None
}
}
impl<'t, 'b, K, V, R> Iterator for Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
R: RangeBounds<K>,
{
type Item = (&'b K, &'b V);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
while let Some((k, v)) = self.next_unbounded() {
if self.check_lower_bound {
match self.range.start_bound() {
Excluded(key) => {
if k.cmp(key) != Ordering::Greater {
continue;
}
}
Included(key) => {
if k.cmp(key) == Ordering::Less {
continue;
}
}
Unbounded => (),
}
}
self.check_lower_bound = false;
if self.check_upper_bound {
match self.range.end_bound() {
Excluded(key) => {
if k.cmp(key) == Ordering::Less {
return Some((k, v));
}
}
Included(key) => {
if k.cmp(key) != Ordering::Greater {
return Some((k, v));
}
}
Unbounded => {
return Some((k, v));
}
}
break;
}
return Some((k, v));
}
None
}
}
impl<'t, 'b, K, V, R> FusedIterator for Range<'t, 'b, K, V, R>
where
K: 'static + Clone + Ord + Sync,
V: 'static + Clone + Sync,
R: RangeBounds<K>,
{
}