use super::ebr::{Arc, AtomicArc, Barrier};
use super::hash_table::bucket::{Bucket, EntryPtr, Locker, OPTIMISTIC};
use super::hash_table::bucket_array::BucketArray;
use super::hash_table::{HashTable, LockedEntry};
use super::wait_queue::AsyncWait;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::fmt::{self, Debug};
use std::hash::{BuildHasher, Hash};
use std::iter::FusedIterator;
use std::ops::{Deref, RangeInclusive};
use std::panic::UnwindSafe;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, Relaxed};
/// Hash index whose entries live in an `OPTIMISTIC`-mode [`BucketArray`].
///
/// Both `K` and `V` must be `Clone`: the `modify` methods in this file clone the key when they
/// install a replacement entry, and `try_clone` clones whole entries.
pub struct HashIndex<K, V, H = RandomState>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
/// Current bucket array; null when the index was created without a capacity.
array: AtomicArc<BucketArray<K, V, OPTIMISTIC>>,
/// Minimum capacity of the index; decreased again when a [`Reserve`] is dropped.
minimum_capacity: AtomicUsize,
/// Hasher factory returned by `hasher()`.
build_hasher: H,
}
/// Guard returned by [`HashIndex::reserve`] recording how much capacity was reserved.
///
/// Dropping the guard subtracts `additional` from the index's minimum capacity and asks the
/// index to resize.
pub struct Reserve<'h, K, V, H = RandomState>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
/// The index the reservation was made on.
hashindex: &'h HashIndex<K, V, H>,
/// Amount of capacity reserved; given back on drop.
additional: usize,
}
/// Iterator over the entries of a [`HashIndex`], yielding `(&K, &V)` pairs whose lifetime is
/// tied to the supplied [`Barrier`].
pub struct Visitor<'h, 'b, K, V, H = RandomState>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
/// The index being iterated.
hashindex: &'h HashIndex<K, V, H>,
/// Bucket array currently being scanned; `None` until the first call to `next`.
current_array: Option<&'b BucketArray<K, V, OPTIMISTIC>>,
/// Index of the bucket currently being scanned within `current_array`.
current_index: usize,
/// Bucket currently being scanned; temporarily taken while `next` advances.
current_bucket: Option<&'b Bucket<K, V, OPTIMISTIC>>,
/// Cursor over the entries of `current_bucket`.
current_entry_ptr: EntryPtr<'b, K, V, OPTIMISTIC>,
/// Barrier passed to every array load and entry access made by the iterator.
barrier: &'b Barrier,
}
/// Action requested by the closure passed to [`HashIndex::modify`] or
/// [`HashIndex::modify_async`].
pub enum ModifyAction<V> {
/// Leave the entry untouched; `modify` returns `false`.
Keep,
/// Erase the entry; `modify` returns `true`.
Remove,
/// Insert a new entry holding a clone of the key and the given value, then erase the old
/// entry; `modify` returns `true`.
Update(V),
}
impl<K, V, H> HashIndex<K, V, H>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
/// Creates an empty [`HashIndex`] that uses `build_hasher` to hash keys.
///
/// No bucket array is allocated: the array pointer starts out null and the minimum capacity
/// is zero.
#[inline]
pub fn with_hasher(build_hasher: H) -> Self {
    let array = AtomicArc::null();
    let minimum_capacity = AtomicUsize::new(0);
    Self {
        array,
        minimum_capacity,
        build_hasher,
    }
}
/// Creates a [`HashIndex`] with room for at least `capacity` entries, hashing keys with
/// `build_hasher`.
///
/// A `capacity` of zero allocates nothing. Otherwise a bucket array is allocated up front and
/// the minimum capacity is set to the number of entries that array reports.
#[inline]
pub fn with_capacity_and_hasher(capacity: usize, build_hasher: H) -> Self {
    if capacity == 0 {
        return Self {
            array: AtomicArc::null(),
            minimum_capacity: AtomicUsize::new(0),
            build_hasher,
        };
    }
    // NOTE(review): `Arc::new_unchecked` is unsafe and its contract is not visible from this
    // file; presumably the `'static` bounds on `K` and `V` are what make this sound — confirm.
    let bucket_array = unsafe {
        Arc::new_unchecked(BucketArray::<K, V, OPTIMISTIC>::new(
            capacity,
            AtomicArc::null(),
        ))
    };
    // Read the real capacity before the array is moved into the atomic pointer.
    let allocated = bucket_array.num_entries();
    Self {
        array: AtomicArc::from(bucket_array),
        minimum_capacity: AtomicUsize::new(allocated),
        build_hasher,
    }
}
/// Tries to reserve `additional_capacity` and returns a [`Reserve`] guard on success.
///
/// Returns `None` when `reserve_capacity` reports that nothing was reserved (zero).
#[inline]
pub fn reserve(&self, additional_capacity: usize) -> Option<Reserve<K, V, H>> {
    match self.reserve_capacity(additional_capacity) {
        0 => None,
        additional => Some(Reserve {
            hashindex: self,
            additional,
        }),
    }
}
/// Inserts the key-value pair into the index.
///
/// # Errors
///
/// Returns the pair back as `Err((key, val))` when `insert_entry` hands it back — presumably
/// because the key is already present.
#[inline]
pub fn insert(&self, key: K, val: V) -> Result<(), (K, V)> {
    let barrier = Barrier::new();
    let hash = self.hash(&key);
    match self.insert_entry(key, val, hash, &mut (), &barrier) {
        Ok(Some(rejected)) => Err(rejected),
        _ => Ok(()),
    }
}
/// Asynchronous counterpart of `insert`.
///
/// Whenever `insert_entry` cannot complete and returns the pair via `Err`, the task awaits the
/// registered [`AsyncWait`] and then retries with the same pair.
///
/// # Errors
///
/// Returns the pair back as `Err((key, val))` when `insert_entry` reports `Ok(Some(..))`.
#[inline]
pub async fn insert_async(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
    let hash = self.hash(&key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        match self.insert_entry(key, val, hash, &mut async_wait_pinned, &Barrier::new()) {
            Ok(None) => return Ok(()),
            Ok(Some(rejected)) => return Err(rejected),
            Err((k, v)) => {
                // Take ownership back so the next attempt can reuse the pair.
                key = k;
                val = v;
            }
        }
        async_wait_pinned.await;
    }
}
/// Looks up `key` and applies the [`ModifyAction`] produced by `updater` to its entry.
///
/// Returns `true` when the entry was removed or replaced, and `false` when the key was not
/// found or `updater` asked to keep the entry.
#[inline]
pub fn modify<Q, F, R>(&self, key: &Q, updater: F) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    F: FnOnce(&K, &V) -> R,
    R: Into<ModifyAction<V>>,
{
    let barrier = Barrier::new();
    let hash = self.hash(key);
    let LockedEntry {
        mut locker,
        data_block_mut,
        mut entry_ptr,
        index: _,
    } = match self.get_entry(key, hash, &mut (), &barrier) {
        Ok(Some(locked_entry)) => locked_entry,
        _ => return false,
    };
    let (k, v) = entry_ptr.get(data_block_mut);
    let action: ModifyAction<V> = updater(k, v).into();
    match action {
        ModifyAction::Keep => return false,
        ModifyAction::Remove => {}
        ModifyAction::Update(new_v) => {
            // Install the replacement first; the old entry is erased below.
            let new_k = k.clone();
            locker.insert_with(
                data_block_mut,
                BucketArray::<K, V, OPTIMISTIC>::partial_hash(hash),
                || (new_k, new_v),
                &barrier,
            );
        }
    }
    locker.erase(data_block_mut, &mut entry_ptr);
    true
}
/// Asynchronous counterpart of `modify`.
///
/// Each attempt runs inside its own scope so the [`Barrier`] is dropped before the task awaits
/// the [`AsyncWait`]; the lookup is retried after every wake-up.
///
/// Returns `true` when the entry was removed or replaced, and `false` when the key was not
/// found or `updater` asked to keep the entry.
#[inline]
pub async fn modify_async<Q, F, R>(&self, key: &Q, updater: F) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    F: FnOnce(&K, &V) -> R,
    R: Into<ModifyAction<V>>,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        {
            let barrier = Barrier::new();
            match self.get_entry(key, hash, &mut async_wait_pinned, &barrier) {
                Ok(Some(LockedEntry {
                    mut locker,
                    data_block_mut,
                    mut entry_ptr,
                    index: _,
                })) => {
                    let (k, v) = entry_ptr.get(data_block_mut);
                    let action: ModifyAction<V> = updater(k, v).into();
                    match action {
                        ModifyAction::Keep => return false,
                        ModifyAction::Remove => {}
                        ModifyAction::Update(new_v) => {
                            // Install the replacement first; the old entry is erased below.
                            let new_k = k.clone();
                            locker.insert_with(
                                data_block_mut,
                                BucketArray::<K, V, OPTIMISTIC>::partial_hash(hash),
                                || (new_k, new_v),
                                &barrier,
                            );
                        }
                    }
                    locker.erase(data_block_mut, &mut entry_ptr);
                    return true;
                }
                Ok(None) => return false,
                // The attempt could not complete; fall through and wait.
                Err(_) => {}
            };
        }
        async_wait_pinned.await;
    }
}
/// Looks up `key` and runs `updater` with mutable access to its value.
///
/// Returns `None` when no entry could be obtained for `key`, otherwise `Some` of the
/// closure's result.
///
/// # Safety
///
/// NOTE(review): the exact contract is not stated in this file. The value is mutated in place
/// through `get_mut`, so callers presumably must ensure concurrent readers tolerate observing
/// the mutation — confirm against the crate documentation.
#[inline]
pub unsafe fn update<Q, F, R>(&self, key: &Q, updater: F) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    F: FnOnce(&K, &mut V) -> R,
{
    let barrier = Barrier::new();
    let hash = self.hash(key);
    let locked_entry = match self.get_entry(key, hash, &mut (), &barrier) {
        Ok(Some(locked_entry)) => locked_entry,
        _ => return None,
    };
    let LockedEntry {
        mut locker,
        data_block_mut,
        mut entry_ptr,
        index: _,
    } = locked_entry;
    let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
    Some(updater(k, v))
}
/// Asynchronous counterpart of `update`.
///
/// Retries the lookup after awaiting the [`AsyncWait`] whenever `get_entry` returns `Err`.
/// Returns `None` when the key is not found, otherwise `Some` of the closure's result.
///
/// # Safety
///
/// NOTE(review): the exact contract is not stated in this file. The value is mutated in place
/// through `get_mut`, so callers presumably must ensure concurrent readers tolerate observing
/// the mutation — confirm against the crate documentation.
#[inline]
pub async unsafe fn update_async<Q, F, R>(&self, key: &Q, updater: F) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
    F: FnOnce(&K, &mut V) -> R,
{
    let hash = self.hash(key);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        match self.get_entry(key, hash, &mut async_wait_pinned, &Barrier::new()) {
            Ok(Some(LockedEntry {
                mut locker,
                data_block_mut,
                mut entry_ptr,
                index: _,
            })) => {
                let (k, v) = entry_ptr.get_mut(data_block_mut, &mut locker);
                return Some(updater(k, v));
            }
            Ok(None) => return None,
            // The attempt could not complete; fall through and wait.
            Err(_) => {}
        }
        async_wait_pinned.await;
    }
}
#[inline]
pub fn remove<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if(key, |_| true)
}
#[inline]
pub async fn remove_async<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if_async(key, |_| true).await
}
/// Removes the entry for `key` if `condition` holds for its value.
///
/// Returns `true` only when `remove_entry` succeeds and reports that an entry was removed.
#[inline]
pub fn remove_if<Q, F: FnOnce(&V) -> bool>(&self, key: &Q, condition: F) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    let barrier = Barrier::new();
    let outcome = self.remove_entry(
        key,
        hash,
        // `remove_entry` hands out `&mut V`; the public condition only needs `&V`.
        |v: &mut V| condition(v),
        |removed| removed.is_some(),
        &mut (),
        &barrier,
    );
    matches!(outcome, Ok(true))
}
/// Asynchronous counterpart of `remove_if`.
///
/// When `remove_entry` cannot complete it returns the condition closure via `Err`; the task
/// then awaits the [`AsyncWait`] and retries with the same closure.
#[inline]
pub async fn remove_if_async<Q, F: FnOnce(&V) -> bool>(&self, key: &Q, condition: F) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    // `remove_entry` hands out `&mut V`; the public condition only needs `&V`.
    let mut pending_condition = |v: &mut V| condition(v);
    loop {
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        match self.remove_entry(
            key,
            hash,
            pending_condition,
            |removed| removed.is_some(),
            &mut async_wait_pinned,
            &Barrier::new(),
        ) {
            Err(unused_condition) => pending_condition = unused_condition,
            Ok(was_removed) => return was_removed,
        };
        async_wait_pinned.await;
    }
}
/// Looks up `key` and passes the entry to `reader`.
///
/// Returns `None` when `read_entry` yields no entry, otherwise `Some` of the closure's result.
/// A fresh [`Barrier`] guards the access for the duration of the call.
#[inline]
pub fn read<Q, R, F: FnOnce(&K, &V) -> R>(&self, key: &Q, reader: F) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let barrier = Barrier::new();
    let hash = self.hash(key);
    let (k, v) = self.read_entry(key, hash, &mut (), &barrier).ok().flatten()?;
    Some(reader(k, v))
}
/// Looks up `key` under the caller-supplied `barrier` and passes the entry to `reader`.
///
/// The references given to `reader` carry the barrier's lifetime `'b`, so the closure may
/// return them. Returns `None` when `read_entry` yields no entry.
#[inline]
pub fn read_with<'b, Q, R, F: FnOnce(&'b K, &'b V) -> R>(
    &self,
    key: &Q,
    reader: F,
    barrier: &'b Barrier,
) -> Option<R>
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let hash = self.hash(key);
    let (k, v) = self.read_entry(key, hash, &mut (), barrier).ok().flatten()?;
    Some(reader(k, v))
}
/// Returns `true` when `read` finds an entry for `key`.
#[inline]
pub fn contains<Q>(&self, key: &Q) -> bool
where
    K: Borrow<Q>,
    Q: Eq + Hash + ?Sized,
{
    let found: Option<()> = self.read(key, |_, _| ());
    found.is_some()
}
/// Keeps only the entries for which `pred` returns `true`.
///
/// Returns the pair produced by `retain_entries`; `clear` reads the second element as the
/// number of removed entries.
#[inline]
pub fn retain<F: FnMut(&K, &V) -> bool>(&self, mut pred: F) -> (usize, usize) {
    let (num_retained, num_removed) = self.retain_entries(|key, val| pred(key, val));
    (num_retained, num_removed)
}
/// Asynchronous counterpart of `retain`: keeps only the entries for which `pred` returns
/// `true`.
///
/// Returns `(num_retained, num_removed)`. If the bucket array is replaced while the scan is in
/// progress, the scan restarts on the new array and the retained count starts over; the
/// removed count keeps accumulating.
#[inline]
pub async fn retain_async<F: FnMut(&K, &V) -> bool>(&self, mut pred: F) -> (usize, usize) {
    let mut num_retained: usize = 0;
    let mut num_removed: usize = 0;
    let mut current_array_holder = self.array.get_arc(Acquire, &Barrier::new());
    while let Some(current_array) = current_array_holder.take() {
        // Finish dealing with any old array before scanning this one.
        self.cleanse_old_array_async(&current_array).await;
        for index in 0..current_array.num_buckets() {
            loop {
                let mut async_wait = AsyncWait::default();
                let mut async_wait_pinned = Pin::new(&mut async_wait);
                {
                    // The barrier is scoped so it is dropped before the task awaits.
                    let barrier = Barrier::new();
                    let bucket = current_array.bucket_mut(index);
                    if let Ok(locker) =
                        Locker::try_lock_or_wait(bucket, &mut async_wait_pinned, &barrier)
                    {
                        if let Some(mut locker) = locker {
                            let data_block_mut = current_array.data_block_mut(index);
                            let mut entry_ptr = EntryPtr::new(&barrier);
                            while entry_ptr.next(&locker, &barrier) {
                                let (k, v) = entry_ptr.get(data_block_mut);
                                if pred(k, v) {
                                    num_retained = num_retained.saturating_add(1);
                                } else {
                                    locker.erase(data_block_mut, &mut entry_ptr);
                                    num_removed = num_removed.saturating_add(1);
                                }
                            }
                        }
                        // The bucket was processed, or no locker was handed out; move on.
                        break;
                    };
                }
                // The lock attempt could not complete; wait to be woken and retry this bucket.
                async_wait_pinned.await;
            }
        }
        if let Some(new_current_array) = self.array.get_arc(Acquire, &Barrier::new()) {
            if new_current_array.as_ptr() == current_array.as_ptr() {
                break;
            }
            // The array was replaced during the scan; restart on the new one.
            num_retained = 0;
            current_array_holder.replace(new_current_array);
            continue;
        }
        break;
    }
    // At least as many entries were removed as retained: let the index consider resizing.
    if num_removed >= num_retained {
        self.try_resize(0, &Barrier::new());
    }
    (num_retained, num_removed)
}
/// Removes every entry and returns the second element of `retain`'s result, i.e. the number
/// of entries removed.
pub fn clear(&self) -> usize {
    let (_, num_removed) = self.retain(|_, _| false);
    num_removed
}
/// Asynchronous counterpart of `clear`: removes every entry via `retain_async` and returns the
/// number of entries removed.
pub async fn clear_async(&self) -> usize {
    let (_, num_removed) = self.retain_async(|_, _| false).await;
    num_removed
}
/// Returns the number of entries reported by `num_entries`.
#[inline]
pub fn len(&self) -> usize {
    let barrier = Barrier::new();
    self.num_entries(&barrier)
}
/// Returns `true` when `has_entry` finds no entry in the index.
#[inline]
pub fn is_empty(&self) -> bool {
    let barrier = Barrier::new();
    let occupied = self.has_entry(&barrier);
    !occupied
}
/// Returns the number of slots reported by `num_slots`.
#[inline]
pub fn capacity(&self) -> usize {
    let barrier = Barrier::new();
    self.num_slots(&barrier)
}
/// Returns the inclusive range from the current minimum capacity to the maximum capacity.
#[inline]
pub fn capacity_range(&self) -> RangeInclusive<usize> {
    let lower = self.minimum_capacity.load(Relaxed);
    let upper = self.maximum_capacity();
    lower..=upper
}
/// Returns a [`Visitor`] over the entries of the index.
///
/// The visitor starts with no array selected; it picks one on the first call to `next`.
/// Yielded references are bound to the lifetime of `barrier`.
#[inline]
pub fn iter<'h, 'b>(&'h self, barrier: &'b Barrier) -> Visitor<'h, 'b, K, V, H> {
    let current_entry_ptr = EntryPtr::new(barrier);
    Visitor {
        hashindex: self,
        current_array: None,
        current_index: 0,
        current_bucket: None,
        current_entry_ptr,
        barrier,
    }
}
/// Repeatedly runs `incremental_rehash` on `current_array` until it no longer has an old array
/// or the rehash call returns `Ok(true)`, awaiting the [`AsyncWait`] between attempts.
async fn cleanse_old_array_async(&self, current_array: &BucketArray<K, V, OPTIMISTIC>) {
    loop {
        if current_array.old_array(&Barrier::new()).is_null() {
            return;
        }
        let mut async_wait = AsyncWait::default();
        let mut async_wait_pinned = Pin::new(&mut async_wait);
        let rehashed = self.incremental_rehash::<_, _, false>(
            current_array,
            &mut async_wait_pinned,
            &Barrier::new(),
        );
        if rehashed == Ok(true) {
            return;
        }
        async_wait_pinned.await;
    }
}
}
impl<K, V, H> Clone for HashIndex<K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher + Clone,
{
    /// Builds a new index with the same capacity and a clone of the hasher, then inserts a
    /// clone of every entry the iterator visits. Insert failures are ignored.
    #[inline]
    fn clone(&self) -> Self {
        let cloned = Self::with_capacity_and_hasher(self.capacity(), self.hasher().clone());
        self.iter(&Barrier::new()).for_each(|(key, val)| {
            let _result = cloned.insert(key.clone(), val.clone());
        });
        cloned
    }
}
impl<K, V, H> Debug for HashIndex<K, V, H>
where
    K: 'static + Clone + Debug + Eq + Hash,
    V: 'static + Clone + Debug,
    H: BuildHasher,
{
    /// Formats the index as a map of every entry the iterator visits.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let barrier = Barrier::new();
        let mut map = f.debug_map();
        for (key, val) in self.iter(&barrier) {
            map.entry(&key, &val);
        }
        map.finish()
    }
}
impl<K, V> HashIndex<K, V, RandomState>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
{
    /// Creates an empty index with a default [`RandomState`] hasher.
    #[inline]
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }
    /// Creates an index with room for at least `capacity` entries and a fresh
    /// [`RandomState`] hasher.
    #[inline]
    #[must_use]
    pub fn with_capacity(capacity: usize) -> Self {
        let build_hasher = RandomState::new();
        Self::with_capacity_and_hasher(capacity, build_hasher)
    }
}
impl<K, V, H> Default for HashIndex<K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher + Default,
{
    /// Creates an empty index using the hasher's default value.
    #[inline]
    fn default() -> Self {
        let build_hasher = H::default();
        Self::with_hasher(build_hasher)
    }
}
impl<K, V, H> HashTable<K, V, H, OPTIMISTIC> for HashIndex<K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Returns the hasher factory stored in the index.
    #[inline]
    fn hasher(&self) -> &H {
        let Self { build_hasher, .. } = self;
        build_hasher
    }
    /// Clones the entry; always succeeds because `K` and `V` are `Clone`.
    #[inline]
    fn try_clone(entry: &(K, V)) -> Option<(K, V)> {
        let (key, val) = entry;
        Some((key.clone(), val.clone()))
    }
    /// Intentionally does nothing for this table type.
    #[inline]
    fn try_reset(_: &mut V) {}
    /// Returns the atomic pointer to the current bucket array.
    #[inline]
    fn bucket_array(&self) -> &AtomicArc<BucketArray<K, V, OPTIMISTIC>> {
        let Self { array, .. } = self;
        array
    }
    /// Returns the atomic counter holding the minimum capacity.
    #[inline]
    fn minimum_capacity(&self) -> &AtomicUsize {
        let Self {
            minimum_capacity, ..
        } = self;
        minimum_capacity
    }
    /// Returns the largest power of two representable in `usize`.
    #[inline]
    fn maximum_capacity(&self) -> usize {
        let top_bit = usize::BITS - 1;
        1_usize << top_bit
    }
}
impl<K, V, H> PartialEq for HashIndex<K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone + PartialEq,
    H: BuildHasher,
{
    /// Two indexes are equal when every entry visited in `self` has an equal value in `other`
    /// and vice versa. The second direction is only checked if the first one holds.
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        let barrier = Barrier::new();
        let self_in_other = self
            .iter(&barrier)
            .all(|(k, v)| other.read(k, |_, ov| v == ov) == Some(true));
        if !self_in_other {
            return false;
        }
        let other_in_self = other
            .iter(&barrier)
            .all(|(k, v)| self.read(k, |_, sv| v == sv) == Some(true));
        other_in_self
    }
}
impl<'h, K, V, H> Reserve<'h, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Returns the amount of capacity held by this reservation.
    #[inline]
    #[must_use]
    pub fn additional_capacity(&self) -> usize {
        let Self { additional, .. } = self;
        *additional
    }
}
impl<'h, K, V, H> AsRef<HashIndex<K, V, H>> for Reserve<'h, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Borrows the index this reservation belongs to.
    #[inline]
    fn as_ref(&self) -> &HashIndex<K, V, H> {
        let owner: &HashIndex<K, V, H> = self.hashindex;
        owner
    }
}
impl<'h, K, V, H> Debug for Reserve<'h, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Formats the guard as `Reserve(<additional>)`; the index itself is not printed.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut tuple = f.debug_tuple("Reserve");
        tuple.field(&self.additional);
        tuple.finish()
    }
}
impl<'h, K, V, H> Deref for Reserve<'h, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    type Target = HashIndex<K, V, H>;
    /// Dereferences to the index this reservation belongs to.
    #[inline]
    fn deref(&self) -> &Self::Target {
        let owner: &Self::Target = self.hashindex;
        owner
    }
}
impl<'h, K, V, H> Drop for Reserve<'h, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Gives the reserved capacity back: subtracts it from the index's minimum capacity and
    /// then asks the index to resize.
    #[inline]
    fn drop(&mut self) {
        let released = self.additional;
        let owner = self.hashindex;
        let previous = owner.minimum_capacity.fetch_sub(released, Relaxed);
        owner.try_resize(0, &Barrier::new());
        // The minimum capacity must have included this reservation.
        debug_assert!(previous >= released);
    }
}
impl<'h, 'b, K, V, H> Debug for Visitor<'h, 'b, K, V, H>
where
    K: 'static + Clone + Eq + Hash,
    V: 'static + Clone,
    H: BuildHasher,
{
    /// Formats only the cursor state: the current bucket index and the entry pointer.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Visitor");
        out.field("current_index", &self.current_index);
        out.field("current_entry_ptr", &self.current_entry_ptr);
        out.finish()
    }
}
/// Walks every bucket of the index's bucket array(s), yielding each entry found.
///
/// If the current array has an old array attached, the old array is scanned first and the
/// current array afterwards. If the array is replaced by an unrelated one mid-scan, the scan
/// starts over.
impl<'h, 'b, K, V, H> Iterator for Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
type Item = (&'b K, &'b V);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
let mut array = if let Some(array) = self.current_array.as_ref().copied() {
// A scan is already in progress.
array
} else {
// First call: choose the array to start from. Returns `None` if no array is allocated.
let current_array = self
.hashindex
.bucket_array()
.load(Acquire, self.barrier)
.as_ref()?;
let old_array_ptr = current_array.old_array(self.barrier);
// Prefer the old array when there is one; the current array is scanned after it.
let array = if let Some(old_array) = old_array_ptr.as_ref() {
old_array
} else {
current_array
};
self.current_array.replace(array);
self.current_bucket.replace(array.bucket(0));
self.current_entry_ptr = EntryPtr::new(self.barrier);
array
};
loop {
if let Some(bucket) = self.current_bucket.take() {
// Advance within the current bucket.
if self.current_entry_ptr.next(bucket, self.barrier) {
let (k, v) = self
.current_entry_ptr
.get(array.data_block(self.current_index));
// Put the bucket back so the next call resumes from this position.
self.current_bucket.replace(bucket);
return Some((k, v));
}
}
// The bucket is exhausted; move on to the next one.
self.current_index += 1;
if self.current_index == array.num_buckets() {
// Reached the end of this array; re-read the index's current array.
let current_array = self
.hashindex
.bucket_array()
.load(Acquire, self.barrier)
.as_ref()?;
if self
.current_array
.as_ref()
.copied()
.map_or(false, |a| ptr::eq(a, current_array))
{
// The array just scanned is still the current one: iteration is complete.
// NOTE(review): at this point `current_index == num_buckets` and `current_bucket` is
// `None`, so another call to `next` increments the index past `num_buckets` and
// reaches `array.bucket(..)` below — confirm that is safe, since the type is declared
// `FusedIterator`.
break;
}
let old_array_ptr = current_array.old_array(self.barrier);
if self
.current_array
.as_ref()
.copied()
.map_or(false, |a| ptr::eq(a, old_array_ptr.as_raw()))
{
// The array just scanned is the old array of the current one: continue with the
// current array from its first bucket.
array = current_array;
self.current_array.replace(array);
self.current_index = 0;
self.current_bucket.replace(array.bucket(0));
self.current_entry_ptr = EntryPtr::new(self.barrier);
continue;
}
// The array just scanned is neither the current array nor its old array: start over.
array = if let Some(old_array) = old_array_ptr.as_ref() {
old_array
} else {
current_array
};
self.current_array.replace(array);
self.current_index = 0;
self.current_bucket.replace(array.bucket(0));
self.current_entry_ptr = EntryPtr::new(self.barrier);
continue;
}
// Position the cursor at the start of the next bucket in the same array.
self.current_bucket
.replace(array.bucket(self.current_index));
self.current_entry_ptr = EntryPtr::new(self.barrier);
}
None
}
}
/// Marker impl declaring that [`Visitor`] keeps returning `None` once it has returned `None`.
impl<'h, 'b, K, V, H> FusedIterator for Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash,
V: 'static + Clone,
H: BuildHasher,
{
}
/// Marker impl declaring [`Visitor`] unwind-safe whenever `K`, `V` and `H` are.
impl<'h, 'b, K, V, H> UnwindSafe for Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash + UnwindSafe,
V: 'static + Clone + UnwindSafe,
H: BuildHasher + UnwindSafe,
{
}
impl<V> Debug for ModifyAction<V> {
    /// Prints the variant name only. The payload of `Update` is omitted because `V` is not
    /// required to implement `Debug`.
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Print the actual variant name; it was previously rendered as "Nothing".
            Self::Keep => f.write_str("Keep"),
            Self::Remove => f.write_str("Remove"),
            Self::Update(_) => f.debug_tuple("Update").finish(),
        }
    }
}
impl<V> From<Option<Option<V>>> for ModifyAction<V> {
    /// Maps `None` to `Keep`, `Some(None)` to `Remove`, and `Some(Some(v))` to `Update(v)`.
    #[inline]
    fn from(value: Option<Option<V>>) -> Self {
        let inner = match value {
            None => return Self::Keep,
            Some(inner) => inner,
        };
        match inner {
            None => Self::Remove,
            Some(new_value) => Self::Update(new_value),
        }
    }
}