use super::async_yield::{self, AwaitableBarrier};
use super::ebr::{Arc, AtomicArc, Barrier, Ptr};
use super::hash_table::cell::{EntryIterator, Locker};
use super::hash_table::cell_array::CellArray;
use super::hash_table::HashTable;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};
use std::iter::FusedIterator;
use std::sync::atomic::AtomicU8;
use std::sync::atomic::Ordering::Acquire;
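/// `HashIndex` is a scalable concurrent hash index.
///
/// Read methods such as `read`, `read_with`, and `iter` hand out references
/// that are guarded by an `ebr` `Barrier`, while write methods coordinate
/// through per-cell locks. Entries are cloned when the internal array is
/// resized, hence the `Clone` bounds on `K` and `V`.
///
/// # Examples
///
/// A minimal usage sketch; the `scc::HashIndex` import path is an assumption
/// about how the enclosing crate exposes this type.
///
/// ```
/// use scc::HashIndex; // assumed crate-root re-export
///
/// let hashindex: HashIndex<u64, u32> = HashIndex::default();
/// assert!(hashindex.insert(1, 10).is_ok());
/// assert_eq!(hashindex.read(&1, |_, v| *v), Some(10));
/// assert!(hashindex.remove(&1));
/// ```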
pub struct HashIndex<K, V, H = RandomState>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: BuildHasher,
{
array: AtomicArc<CellArray<K, V, true>>,
minimum_capacity: usize,
resize_mutex: AtomicU8,
build_hasher: H,
}
impl<K, V, H> HashIndex<K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: BuildHasher,
{
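    /// Creates an empty `HashIndex` with the given minimum capacity and
    /// `BuildHasher`; the effective initial capacity is never smaller than the
    /// default capacity.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    /// use std::collections::hash_map::RandomState;
    ///
    /// let hashindex: HashIndex<u64, u32, RandomState> =
    ///     HashIndex::new(1000, RandomState::new());
    /// assert!(hashindex.capacity() >= 1000);
    /// ```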
#[inline]
pub fn new(capacity: usize, build_hasher: H) -> HashIndex<K, V, H> {
let initial_capacity = capacity.max(Self::default_capacity());
HashIndex {
array: AtomicArc::from(Arc::new(CellArray::<K, V, true>::new(
initial_capacity,
AtomicArc::null(),
))),
minimum_capacity: initial_capacity,
resize_mutex: AtomicU8::new(0),
build_hasher,
}
}
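    /// Inserts a key-value pair into the `HashIndex`.
    ///
    /// Returns an error along with the supplied key-value pair if the key
    /// already exists.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert_eq!(hashindex.insert(1, 1).unwrap_err(), (1, 1));
    /// ```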
#[inline]
pub fn insert(&self, key: K, val: V) -> Result<(), (K, V)> {
let (hash, partial_hash) = self.hash(&key);
        // The synchronous path relies on `insert_entry::<false>` not returning
        // `Err` (only the asynchronous path fails when a cell lock cannot be
        // acquired without blocking), hence the `ok().unwrap()` below.
        if let Some((k, v)) = self
            .insert_entry::<false>(key, val, hash, partial_hash, &Barrier::new())
            .ok()
            .unwrap()
{
Err((k, v))
} else {
Ok(())
}
}
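    /// Inserts a key-value pair into the `HashIndex`, yielding to the executor
    /// and retrying whenever the target cell cannot be locked immediately.
    ///
    /// Returns an error along with the supplied key-value pair if the key
    /// already exists.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed); the
    /// returned future has to be awaited by an executor.
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// let future_insert = hashindex.insert_async(11, 17);
    /// ```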
#[inline]
pub async fn insert_async(&self, mut key: K, mut val: V) -> Result<(), (K, V)> {
let (hash, partial_hash) = self.hash(&key);
loop {
match self.insert_entry::<true>(key, val, hash, partial_hash, &Barrier::new()) {
Ok(Some(returned)) => return Err(returned),
Ok(None) => return Ok(()),
Err(returned) => {
key = returned.0;
val = returned.1;
}
}
async_yield::async_yield().await;
}
}
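    /// Removes the key-value pair associated with the given key, returning
    /// `true` if the key existed and the pair was removed.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(!hashindex.remove(&1));
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert!(hashindex.remove(&1));
    /// ```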
#[inline]
pub fn remove<Q>(&self, key_ref: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if(key_ref, |_| true)
}
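    /// Removes the key-value pair associated with the given key, yielding to
    /// the executor instead of blocking, and returns the removed pair if the
    /// key existed.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed); the
    /// returned future has to be awaited by an executor.
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// let future_remove = hashindex.remove_async(&1);
    /// ```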
#[inline]
pub async fn remove_async<Q>(&self, key_ref: &Q) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.remove_if_async(key_ref, |_| true).await
}
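    /// Removes the key-value pair associated with the given key if the
    /// supplied condition on its value returns `true`; returns `true` if the
    /// pair was removed.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert!(!hashindex.remove_if(&1, |v| *v == 1));
    /// assert!(hashindex.remove_if(&1, |v| *v == 0));
    /// ```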
#[inline]
pub fn remove_if<Q, F: FnMut(&V) -> bool>(&self, key_ref: &Q, mut condition: F) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
self.remove_entry::<Q, _, false>(
key_ref,
hash,
partial_hash,
&mut condition,
&Barrier::new(),
)
.ok()
.map_or(false, |(_, r)| r)
}
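    /// Removes the key-value pair associated with the given key if the
    /// supplied condition on its value returns `true`, yielding to the
    /// executor instead of blocking; returns the removed pair if it was
    /// removed.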
#[inline]
pub async fn remove_if_async<Q, F: FnMut(&V) -> bool>(
&self,
key_ref: &Q,
mut condition: F,
) -> Option<(K, V)>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
loop {
if let Ok(result) = self.remove_entry::<Q, F, true>(
key_ref,
hash,
partial_hash,
&mut condition,
&Barrier::new(),
) {
return result.0;
}
async_yield::async_yield().await;
}
}
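    /// Reads the key-value pair associated with the given key by passing
    /// references to it to the supplied reader, returning the reader's result,
    /// or `None` if the key does not exist.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 10).is_ok());
    /// assert_eq!(hashindex.read(&1, |_, v| *v), Some(10));
    /// assert_eq!(hashindex.read(&2, |_, v| *v), None);
    /// ```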
#[inline]
pub fn read<Q, R, F: Fn(&K, &V) -> R>(&self, key_ref: &Q, reader: F) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let barrier = Barrier::new();
self.read_with(key_ref, reader, &barrier)
}
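    /// Reads the key-value pair associated with the given key using the
    /// supplied `Barrier`; the reader receives references that live as long as
    /// the barrier, so its result may borrow from the entry.
    ///
    /// # Examples
    ///
    /// A usage sketch; both the `scc::HashIndex` and `scc::ebr::Barrier`
    /// import paths are assumptions.
    ///
    /// ```
    /// use scc::ebr::Barrier; // assumed path
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 10).is_ok());
    ///
    /// let barrier = Barrier::new();
    /// let value_ref = hashindex.read_with(&1, |_, v| v, &barrier).unwrap();
    /// assert_eq!(*value_ref, 10);
    /// ```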
#[inline]
pub fn read_with<'b, Q, R, F: FnMut(&'b K, &'b V) -> R>(
&self,
key_ref: &Q,
mut reader: F,
barrier: &'b Barrier,
) -> Option<R>
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let (hash, partial_hash) = self.hash(key_ref);
self.read_entry::<Q, R, F, false>(key_ref, hash, partial_hash, &mut reader, barrier)
.ok()
.and_then(|r| r)
}
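    /// Returns `true` if the `HashIndex` contains the given key.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(!hashindex.contains(&1));
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert!(hashindex.contains(&1));
    /// ```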
#[inline]
pub fn contains<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
self.read(key, |_, _| ()).is_some()
}
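    /// Clears all the key-value pairs, returning the number of removed
    /// entries.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert_eq!(hashindex.clear(), 1);
    /// assert!(hashindex.is_empty());
    /// ```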
    pub fn clear(&self) -> usize {
        let mut num_removed: usize = 0;
        let barrier = Barrier::new();
        let mut current_array_ptr = self.array.load(Acquire, &barrier);
        while let Some(current_array_ref) = current_array_ptr.as_ref() {
            // Complete any in-progress resize first so that entries still in
            // the old array are not missed.
            if !current_array_ref.old_array(&barrier).is_null() {
                while !current_array_ref.partial_rehash::<_, _, _, false>(
                    |k| self.hash(k),
                    |k, v| Some((k.clone(), v.clone())),
                    &barrier,
                ) {
                    current_array_ptr = self.array.load(Acquire, &barrier);
                }
            }
            // Lock each cell in turn and erase every entry it holds.
            for index in 0..current_array_ref.num_cells() {
                if let Some(locker) = Locker::lock(current_array_ref.cell(index), &barrier) {
                    let mut iterator = locker.cell().iter(&barrier);
                    while iterator.next().is_some() {
                        locker.erase(&mut iterator);
                        num_removed = num_removed.saturating_add(1);
                    }
                }
            }
            // If the array has not been replaced in the meantime, request a
            // resize and stop; otherwise clear the new array as well.
            let new_current_array_ptr = self.array.load(Acquire, &barrier);
            if current_array_ptr == new_current_array_ptr {
                self.resize(&barrier);
                break;
            }
            current_array_ptr = new_current_array_ptr;
        }
        num_removed
    }
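    /// Clears all the key-value pairs, yielding to the executor whenever a
    /// cell cannot be locked immediately, and returns the number of removed
    /// entries.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed); the
    /// returned future has to be awaited by an executor.
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// let future_clear = hashindex.clear_async();
    /// ```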
    pub async fn clear_async(&self) -> usize {
        let mut num_removed: usize = 0;
        let mut awaitable_barrier = AwaitableBarrier::default();
        let mut current_array_holder = self.array.get_arc(Acquire, awaitable_barrier.barrier());
        while let Some(current_array) = current_array_holder.take() {
            // Wait until the old array is gone, helping the rehash along and
            // yielding to the executor whenever no progress can be made.
            while !current_array
                .old_array(awaitable_barrier.barrier())
                .is_null()
            {
                if current_array.partial_rehash::<_, _, _, true>(
                    |key| self.hash(key),
                    |_, _| None,
                    awaitable_barrier.barrier(),
                ) {
                    continue;
                }
                awaitable_barrier.drop_barrier_and_yield().await;
            }
            // Try to lock each cell without blocking; yield and retry if the
            // cell is contended, then erase every entry it holds.
            for cell_index in 0..current_array.num_cells() {
                loop {
                    {
                        let barrier = awaitable_barrier.barrier();
                        if let Ok(result) =
                            Locker::try_lock(current_array.cell(cell_index), barrier)
                        {
                            if let Some(locker) = result {
                                let mut iterator = locker.cell().iter(barrier);
                                while iterator.next().is_some() {
                                    locker.erase(&mut iterator);
                                    num_removed = num_removed.saturating_add(1);
                                }
                            }
                            break;
                        }
                    }
                    awaitable_barrier.drop_barrier_and_yield().await;
                }
            }
            // Stop if the array has not been replaced concurrently; otherwise
            // clear the new array as well.
            if let Some(new_current_array) =
                self.array.get_arc(Acquire, awaitable_barrier.barrier())
            {
                if new_current_array.as_ptr() == current_array.as_ptr() {
                    break;
                }
                current_array_holder.replace(new_current_array);
                continue;
            }
            break;
        }
        if num_removed != 0 {
            self.resize(&Barrier::new());
        }
        num_removed
    }
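    /// Returns the number of entries in the `HashIndex`.
    ///
    /// # Examples
    ///
    /// A usage sketch (the `scc::HashIndex` import path is assumed).
    ///
    /// ```
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 0).is_ok());
    /// assert_eq!(hashindex.len(), 1);
    /// ```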
#[inline]
pub fn len(&self) -> usize {
self.num_entries(&Barrier::new())
}
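    /// Returns `true` if the `HashIndex` contains no entries.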
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
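    /// Returns the current capacity: the number of entry slots in the current
    /// array.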
#[inline]
pub fn capacity(&self) -> usize {
self.num_slots(&Barrier::new())
}
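    /// Returns a `Visitor` that iterates over all the entries.
    ///
    /// The yielded key-value references remain valid as long as the supplied
    /// `Barrier` is alive.
    ///
    /// # Examples
    ///
    /// A usage sketch; both the `scc::HashIndex` and `scc::ebr::Barrier`
    /// import paths are assumptions.
    ///
    /// ```
    /// use scc::ebr::Barrier; // assumed path
    /// use scc::HashIndex; // assumed path
    ///
    /// let hashindex: HashIndex<u64, u32> = HashIndex::default();
    /// assert!(hashindex.insert(1, 10).is_ok());
    ///
    /// let barrier = Barrier::new();
    /// let mut num_visited = 0;
    /// for (k, v) in hashindex.iter(&barrier) {
    ///     assert_eq!((*k, *v), (1, 10));
    ///     num_visited += 1;
    /// }
    /// assert_eq!(num_visited, 1);
    /// ```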
#[inline]
pub fn iter<'h, 'b>(&'h self, barrier: &'b Barrier) -> Visitor<'h, 'b, K, V, H> {
Visitor {
hash_index: self,
current_array_ptr: Ptr::null(),
current_index: 0,
current_entry_iterator: None,
barrier_ref: barrier,
}
}
}
impl<K, V> Default for HashIndex<K, V, RandomState>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
{
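    /// Creates an empty default `HashIndex` with `RandomState` as the build
    /// hasher.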
#[inline]
fn default() -> Self {
HashIndex {
array: AtomicArc::from(Arc::new(CellArray::<K, V, true>::new(
Self::default_capacity(),
AtomicArc::null(),
))),
minimum_capacity: Self::default_capacity(),
resize_mutex: AtomicU8::new(0),
build_hasher: RandomState::new(),
}
}
}
impl<K, V, H> HashTable<K, V, H, true> for HashIndex<K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: BuildHasher,
{
fn hasher(&self) -> &H {
&self.build_hasher
}
fn copier(key: &K, val: &V) -> Option<(K, V)> {
Some((key.clone(), val.clone()))
}
fn cell_array(&self) -> &AtomicArc<CellArray<K, V, true>> {
&self.array
}
fn minimum_capacity(&self) -> usize {
self.minimum_capacity
}
fn resize_mutex(&self) -> &AtomicU8 {
&self.resize_mutex
}
}
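/// `Visitor` iterates over all the entries in a `HashIndex`.
///
/// It is created by `HashIndex::iter`, and the references it yields are valid
/// for the lifetime of the associated `Barrier`. Since the `HashIndex` may be
/// updated concurrently, the iteration does not correspond to a single
/// point-in-time snapshot.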
pub struct Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: BuildHasher,
{
hash_index: &'h HashIndex<K, V, H>,
current_array_ptr: Ptr<'b, CellArray<K, V, true>>,
current_index: usize,
current_entry_iterator: Option<EntryIterator<'b, K, V, true>>,
barrier_ref: &'b Barrier,
}
impl<'h, 'b, K, V, H> Iterator for Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: 'static + BuildHasher,
{
type Item = (&'b K, &'b V);
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_array_ptr.is_null() {
            // The first call: start from the old array if a resize is in
            // progress, otherwise from the current array.
            let current_array_ptr = self.hash_index.array.load(Acquire, self.barrier_ref);
            let current_array_ref = current_array_ptr.as_ref().unwrap();
            let old_array_ptr = current_array_ref.old_array(self.barrier_ref);
            self.current_array_ptr = if old_array_ptr.is_null() {
                current_array_ptr
            } else {
                old_array_ptr
            };
            let cell_ref = self.current_array_ptr.as_ref().unwrap().cell(0);
            self.current_entry_iterator
                .replace(EntryIterator::new(cell_ref, self.barrier_ref));
        }
        loop {
            // Yield the next entry in the cell currently being scanned.
            if let Some(iterator) = self.current_entry_iterator.as_mut() {
                if let Some(entry) = iterator.next() {
                    return Some((&entry.0 .0, &entry.0 .1));
                }
            }
            // The cell is exhausted: move on to the next cell, or to another
            // array once the last cell has been scanned.
            let array_ref = self.current_array_ptr.as_ref().unwrap();
            self.current_index += 1;
            if self.current_index == array_ref.num_cells() {
                let current_array_ptr = self.hash_index.array.load(Acquire, self.barrier_ref);
                if self.current_array_ptr == current_array_ptr {
                    // The scanned array is still the current one: done.
                    break;
                }
                let current_array_ref = current_array_ptr.as_ref().unwrap();
                let old_array_ptr = current_array_ref.old_array(self.barrier_ref);
                if self.current_array_ptr == old_array_ptr {
                    // The old array has been fully scanned: proceed to the
                    // current array.
                    self.current_array_ptr = current_array_ptr;
                    self.current_index = 0;
                    self.current_entry_iterator.replace(EntryIterator::new(
                        current_array_ref.cell(0),
                        self.barrier_ref,
                    ));
                    continue;
                }
                // The array has been replaced: restart from its old array if
                // any, otherwise from the new current array.
                self.current_array_ptr = if old_array_ptr.is_null() {
                    current_array_ptr
                } else {
                    old_array_ptr
                };
                self.current_index = 0;
                self.current_entry_iterator.replace(EntryIterator::new(
                    self.current_array_ptr.as_ref().unwrap().cell(0),
                    self.barrier_ref,
                ));
                continue;
            }
            self.current_entry_iterator.replace(EntryIterator::new(
                array_ref.cell(self.current_index),
                self.barrier_ref,
            ));
        }
        None
    }
}
impl<'h, 'b, K, V, H> FusedIterator for Visitor<'h, 'b, K, V, H>
where
K: 'static + Clone + Eq + Hash + Sync,
V: 'static + Clone + Sync,
H: 'static + BuildHasher,
{
}