pub mod array;
pub mod cell;
use array::Array;
use cell::{CellIterator, CellLocker, ARRAY_SIZE, MAX_RESIZING_FACTOR};
use crossbeam_epoch::{Atomic, Guard, Owned, Shared};
use std::collections::hash_map::RandomState;
use std::convert::TryInto;
use std::hash::{BuildHasher, Hash, Hasher};
use std::iter::FusedIterator;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
const DEFAULT_CAPACITY: usize = 64;
/// A concurrent hash index.
///
/// Entries are stored in an epoch-managed cell `Array`; a whole new array is
/// installed atomically when the index is resized, and entries are migrated
/// lazily (`partial_rehash`).
pub struct HashIndex<K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
    /// The epoch-protected cell array holding all entries.
    array: Atomic<Array<K, V>>,
    /// Lower bound on capacity; `resize` never shrinks below this.
    minimum_capacity: usize,
    /// Non-blocking flag serializing resize operations (see `resize`).
    resize_mutex: AtomicBool,
    /// Builder used to derive a fresh `Hasher` per `hash` call.
    build_hasher: H,
}
impl<K, V> Default for HashIndex<K, V, RandomState>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
{
    /// Creates an empty `HashIndex` with the default capacity and a
    /// `RandomState` hasher.
    fn default() -> Self {
        // Delegate to `new` instead of duplicating the struct construction:
        // `DEFAULT_CAPACITY.max(DEFAULT_CAPACITY)` is a no-op, so the result
        // is identical to the previous hand-rolled initializer.
        Self::new(DEFAULT_CAPACITY, RandomState::new())
    }
}
impl<K, V, H> HashIndex<K, V, H>
where
K: Clone + Eq + Hash + Sync,
V: Clone + Sync,
H: BuildHasher,
{
/// Creates an empty `HashIndex` with at least `capacity` slots, hashing keys
/// with `build_hasher`.
///
/// The effective capacity is clamped below by `DEFAULT_CAPACITY`, and that
/// clamped value is also recorded as the minimum the index may shrink to.
pub fn new(capacity: usize, build_hasher: H) -> HashIndex<K, V, H> {
    let effective_capacity = std::cmp::max(capacity, DEFAULT_CAPACITY);
    HashIndex {
        array: Atomic::new(Array::<K, V>::new(effective_capacity, Atomic::null())),
        minimum_capacity: effective_capacity,
        resize_mutex: AtomicBool::new(false),
        build_hasher,
    }
}
/// Inserts a key-value pair.
///
/// On failure the pair is handed back as `Err((key, value))` (presumably a
/// duplicate key — see `cell::CellLocker::insert` for the exact condition).
pub fn insert(&self, key: K, value: V) -> Result<(), (K, V)> {
    let guard = crossbeam_epoch::pin();
    // `reserve` locks the target cell, growing the table first if the cell
    // is already full.
    let (cell_locker, key, partial_hash) = self.reserve(key, &guard);
    // The locker's result already has exactly the shape we return; the
    // previous identity `match` that re-wrapped Ok/Err was redundant.
    cell_locker.insert(key, value, partial_hash, &guard)
}
/// Removes the entry associated with `key`.
///
/// Returns `true` if an entry was removed. Emptying a low-index cell may
/// trigger a shrink when sampling suggests the table is sparse.
pub fn remove(&self, key: &K) -> bool {
    let (hash, partial_hash) = self.hash(key);
    let guard = crossbeam_epoch::pin();
    let (cell_locker, cell_index) = self.lock(hash, &guard);
    if cell_locker.remove(key, partial_hash, &guard) {
        // NOTE(review): the shrink check only runs for cell indices below
        // ARRAY_SIZE — presumably because those are the sampled cells.
        if cell_locker.cell_ref().num_entries() == 0 && cell_index < ARRAY_SIZE {
            drop(cell_locker);
            let current_array = self.array.load(Acquire, &guard);
            let current_array_ref = self.array_ref(current_array);
            // Skip shrinking while a rehash is in flight or when already at
            // the configured minimum capacity.
            if current_array_ref.old_array(&guard).is_null()
                && current_array_ref.capacity() > self.minimum_capacity
            {
                let sample_size = current_array_ref.num_sample_size();
                let mut num_entries = 0;
                // Sample the leading cells; keep the current capacity (early
                // return) once the sampled count reaches 1/16 of the sampled
                // slots.
                for i in 0..sample_size {
                    num_entries += current_array_ref.cell_ref(i).num_entries();
                    if num_entries >= sample_size * ARRAY_SIZE / 16 {
                        return true;
                    }
                }
                self.resize(&guard);
            }
        }
        return true;
    }
    false
}
/// Reads the entry for `key`, applying `f` to the borrowed key and value.
///
/// Returns `None` when no entry exists. While a resize is in flight, the
/// old array is searched before the current one.
pub fn read<R, F: FnOnce(&K, &V) -> R>(&self, key: &K, f: F) -> Option<R> {
    let (hash, partial_hash) = self.hash(key);
    let guard = crossbeam_epoch::pin();
    let mut current_array_shared = self.array.load(Acquire, &guard);
    loop {
        let current_array_ref = self.array_ref(current_array_shared);
        let old_array_shared = current_array_ref.old_array(&guard);
        if !old_array_shared.is_null() {
            // Help the in-flight rehash; restart when it completes so the
            // search below sees a consistent pair of arrays.
            if current_array_ref.partial_rehash(|key| self.hash(key), &guard) {
                continue;
            }
            let old_array_ref = self.array_ref(old_array_shared);
            let cell_index = old_array_ref.calculate_cell_index(hash);
            let cell_ref = old_array_ref.cell_ref(cell_index);
            if let Some(entry) = cell_ref.search(key, partial_hash, &guard) {
                return Some(f(&entry.0, &entry.1));
            }
        }
        let cell_index = current_array_ref.calculate_cell_index(hash);
        let cell_ref = current_array_ref.cell_ref(cell_index);
        if let Some(entry) = cell_ref.search(key, partial_hash, &guard) {
            return Some(f(&entry.0, &entry.1));
        }
        // A concurrent resize may have installed a new array while we were
        // searching; retry against it, otherwise report a miss.
        let new_current_array_shared = self.array.load(Acquire, &guard);
        if new_current_array_shared == current_array_shared {
            break;
        }
        current_array_shared = new_current_array_shared;
    }
    None
}
/// Removes all entries.
///
/// Any in-flight rehash is driven to completion first, then every cell of
/// the current array is locked and purged. If a concurrent resize installs
/// a new array mid-way, the new array is cleared as well.
pub fn clear(&self) {
    let guard = crossbeam_epoch::pin();
    let mut current_array_shared = self.array.load(Acquire, &guard);
    loop {
        let current_array_ref = self.array_ref(current_array_shared);
        let old_array_shared = current_array_ref.old_array(&guard);
        if !old_array_shared.is_null() {
            // Drive the rehash to completion. (The former loop body was a
            // bare `continue`, which is a no-op; an empty body is clearer.)
            while !current_array_ref.partial_rehash(|key| self.hash(key), &guard) {}
        }
        for index in 0..current_array_ref.num_cells() {
            if let Some(mut cell_locker) =
                CellLocker::lock(current_array_ref.cell_ref(index), &guard)
            {
                cell_locker.purge(&guard);
            }
        }
        // Retry if the array was swapped out while we were purging.
        let new_current_array_shared = self.array.load(Acquire, &guard);
        if current_array_shared == new_current_array_shared {
            break;
        }
        current_array_shared = new_current_array_shared;
    }
}
/// Returns an estimated number of entries.
///
/// `f` maps the current capacity to a desired sample count; the result is
/// extrapolated from `min(f(capacity), capacity)` samples (rounded up to a
/// power of two), not an exact scan of the whole table.
pub fn len<F: FnOnce(usize) -> usize>(&self, f: F) -> usize {
    let guard = crossbeam_epoch::pin();
    let current_array = self.array.load(Acquire, &guard);
    let current_array_ref = self.array_ref(current_array);
    let capacity = current_array_ref.capacity();
    let num_samples = std::cmp::min(f(capacity), capacity).next_power_of_two();
    // Each cell holds up to ARRAY_SIZE entries; sample at least one cell.
    let num_cells_to_sample = (num_samples / ARRAY_SIZE).max(1);
    if !current_array_ref.old_array(&guard).is_null() {
        // Help any in-flight rehash make some progress before sampling.
        for _ in 0..num_cells_to_sample {
            if current_array_ref.partial_rehash(|key| self.hash(key), &guard) {
                break;
            }
        }
    }
    self.estimate(current_array_ref, num_cells_to_sample)
}
/// Returns the current capacity of the index.
///
/// Also helps an in-flight rehash make progress before reporting.
pub fn capacity(&self) -> usize {
    let guard = crossbeam_epoch::pin();
    let current_array = self.array.load(Acquire, &guard);
    let current_array_ref = self.array_ref(current_array);
    if !current_array_ref.old_array(&guard).is_null() {
        current_array_ref.partial_rehash(|key| self.hash(key), &guard);
    }
    current_array_ref.capacity()
}
/// Returns a reference to the `BuildHasher` supplied at construction.
pub fn hasher(&self) -> &H {
    &self.build_hasher
}
/// Returns a `Visitor` that iterates over all entries.
///
/// The epoch guard is pinned lazily on the first `next` call and then held
/// for the visitor's lifetime.
pub fn iter(&self) -> Visitor<K, V, H> {
    Visitor {
        hash_index: self,
        current_array: Shared::null(),
        current_index: 0,
        current_cell_iterator: None,
        guard: None,
    }
}
/// Hashes `key`, returning the full 64-bit hash and an 8-bit partial hash
/// (the low byte) used for in-cell filtering.
///
/// The raw hasher output is post-processed with a xor-rotate / multiply
/// finalizer so that entropy is spread across all bits even for weak
/// `Hasher` implementations.
fn hash(&self, key: &K) -> (u64, u8) {
    let mut h = self.build_hasher.build_hasher();
    key.hash(&mut h);
    let mut hash = h.finish();
    hash ^= hash.rotate_right(25) ^ hash.rotate_right(50);
    // `wrapping_mul` replaces `overflowing_mul(..).0`: identical result
    // without computing the unused overflow flag.
    hash = hash.wrapping_mul(0xA24B_AED4_963E_E407_u64);
    hash ^= hash.rotate_right(24) ^ hash.rotate_right(49);
    hash = hash.wrapping_mul(0x9FB2_1C65_1E98_DF25_u64);
    hash ^= hash >> 28;
    // The mask guarantees the value fits in a u8, so the conversion
    // cannot fail.
    (hash, (hash & ((1 << 8) - 1)).try_into().unwrap())
}
/// Dereferences a `Shared` array pointer.
///
/// The caller must pass a non-null pointer that was loaded while holding
/// the corresponding `Guard`, so the referent cannot be reclaimed while
/// the returned reference is alive.
fn array_ref<'g>(&self, array_shared: Shared<'g, Array<K, V>>) -> &'g Array<K, V> {
    // SAFETY: relies on the contract above; call sites in this file load
    // the pointer with a live guard in scope.
    unsafe { array_shared.deref() }
}
/// Locks the cell for `key`, growing the table first when the cell is
/// already full.
///
/// Returns the locked cell, the key (moved back to the caller), and the
/// 8-bit partial hash.
fn reserve<'g>(&self, key: K, guard: &'g Guard) -> (CellLocker<'g, K, V>, K, u8) {
    let (hash, partial_hash) = self.hash(&key);
    let mut resize_triggered = false;
    loop {
        let (cell_locker, cell_index) = self.lock(hash, guard);
        // Consider a resize at most once per reservation, and only when a
        // low-index (sampled) cell holds at least ARRAY_SIZE entries.
        if !resize_triggered
            && cell_index < ARRAY_SIZE
            && cell_locker.cell_ref().num_entries() >= ARRAY_SIZE
        {
            drop(cell_locker);
            resize_triggered = true;
            let current_array = self.array.load(Acquire, &guard);
            let current_array_ref = self.array_ref(current_array);
            // Only resize when no rehash is already in flight.
            if current_array_ref.old_array(&guard).is_null() {
                let sample_size = current_array_ref.num_sample_size();
                // Grow once the sampled load exceeds 7/8 of the sampled
                // slots.
                let threshold = sample_size * (ARRAY_SIZE / 8) * 7;
                let mut num_entries = 0;
                for i in 0..sample_size {
                    num_entries += current_array_ref.cell_ref(i).num_entries();
                    if num_entries > threshold {
                        self.resize(guard);
                        break;
                    }
                }
            }
            // Re-lock against the (possibly new) array.
            continue;
        }
        return (cell_locker, key, partial_hash);
    }
}
/// Locks the cell that `hash` maps to in the current array, returning the
/// locker and the cell index.
///
/// While an old array exists, this helps with rehashing and migrates the
/// corresponding old cell (`kill_cell`) before locking the new one.
fn lock<'g>(&self, hash: u64, guard: &'g Guard) -> (CellLocker<'g, K, V>, usize) {
    loop {
        let current_array_shared = self.array.load(Acquire, &guard);
        let current_array_ref = self.array_ref(current_array_shared);
        let old_array_shared = current_array_ref.old_array(&guard);
        if !old_array_shared.is_null() {
            // Help the in-flight rehash; restart if it just completed.
            if current_array_ref.partial_rehash(|key| self.hash(key), &guard) {
                continue;
            }
            let old_array_ref = self.array_ref(old_array_shared);
            let cell_index = old_array_ref.calculate_cell_index(hash);
            // Migrate the matching old cell into the current array so the
            // lock acquired below observes its entries.
            if let Some(mut cell_locker) =
                CellLocker::lock(old_array_ref.cell_ref(cell_index), guard)
            {
                current_array_ref.kill_cell(
                    &mut cell_locker,
                    old_array_ref,
                    cell_index,
                    &|key| self.hash(key),
                    &guard,
                );
            }
        }
        let cell_index = current_array_ref.calculate_cell_index(hash);
        // A `None` here presumably means the cell was killed by a
        // concurrent resize; loop to retry against the new array.
        if let Some(cell_locker) =
            CellLocker::lock(current_array_ref.cell_ref(cell_index), guard)
        {
            return (cell_locker, cell_index);
        }
    }
}
/// Extrapolates the total entry count from the first `num_cells_to_sample`
/// cells of `current_array_ref`.
fn estimate(&self, current_array_ref: &Array<K, V>, num_cells_to_sample: usize) -> usize {
    // Sum the sampled cells, then scale by the sampling ratio.
    let sampled: usize = (0..num_cells_to_sample)
        .map(|index| current_array_ref.cell_ref(index).num_entries())
        .sum();
    sampled * (current_array_ref.num_cells() / num_cells_to_sample)
}
/// Resizes the table based on a sampled estimate of the entry count.
///
/// Growth roughly doubles the capacity (bounded by `MAX_RESIZING_FACTOR`
/// per step); shrinking targets the estimated size but never goes below
/// `minimum_capacity`. Only one thread resizes at a time (`resize_mutex`);
/// losers simply return.
fn resize(&self, guard: &Guard) {
    let current_array = self.array.load(Acquire, &guard);
    let current_array_ref = self.array_ref(current_array);
    let old_array = current_array_ref.old_array(&guard);
    if !old_array.is_null() {
        // A previous resize is still rehashing; only proceed once the old
        // array has been fully drained.
        let old_array_removed = current_array_ref.partial_rehash(|key| self.hash(key), &guard);
        if !old_array_removed {
            return;
        }
    }
    // Non-blocking acquisition of the resize mutex.
    if !self.resize_mutex.swap(true, Acquire) {
        let memory_ordering = Relaxed;
        // Release the mutex on every exit path; the ordering stored here is
        // upgraded to `Release` below once a new array has been published.
        let mut mutex_guard = scopeguard::guard(memory_ordering, |memory_ordering| {
            self.resize_mutex.store(false, memory_ordering);
        });
        // Bail out if another thread already swapped the array.
        if current_array != self.array.load(Acquire, &guard) {
            return;
        }
        let capacity = current_array_ref.capacity();
        let num_cells = current_array_ref.num_cells();
        // Sample 1/8 of the cells, clamped to
        // [DEFAULT_CAPACITY / ARRAY_SIZE, 4096].
        let num_cells_to_sample = (num_cells / 8).max(DEFAULT_CAPACITY / ARRAY_SIZE).min(4096);
        let estimated_num_entries = self.estimate(current_array_ref, num_cells_to_sample);
        let new_capacity = if estimated_num_entries >= (capacity / 8) * 7 {
            // Estimated load factor >= 7/8: grow.
            let max_capacity = 1usize << (std::mem::size_of::<usize>() * 8 - 1);
            if capacity == max_capacity {
                // Cannot grow any further.
                capacity
            } else if estimated_num_entries <= (capacity / 8) * 9 {
                // Moderately overloaded: double.
                capacity * 2
            } else {
                // Heavily overloaded: jump above the estimate's next power
                // of two, capped at MAX_RESIZING_FACTOR doublings per step.
                let new_capacity_candidate = estimated_num_entries
                    .next_power_of_two()
                    .min(max_capacity / 2)
                    * 2;
                if new_capacity_candidate / capacity > (1 << MAX_RESIZING_FACTOR) {
                    capacity * (1 << MAX_RESIZING_FACTOR)
                } else {
                    new_capacity_candidate
                }
            }
        } else if estimated_num_entries <= capacity / 8 {
            // Estimated load factor <= 1/8: shrink, but never below the
            // configured minimum.
            estimated_num_entries
                .next_power_of_two()
                .max(self.minimum_capacity)
        } else {
            capacity
        };
        if new_capacity != capacity {
            // Publish the new array; the old one is drained lazily by
            // subsequent operations via `partial_rehash`.
            self.array.store(
                Owned::new(Array::<K, V>::new(
                    new_capacity,
                    Atomic::from(current_array),
                )),
                Release,
            );
            // Upgrade the scopeguard's store to `Release` so the published
            // array is visible to the next resizer.
            *mutex_guard = Release;
        }
    }
}
}
impl<K, V, H> Drop for HashIndex<K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
    /// Drops both arrays and kills every cell.
    fn drop(&mut self) {
        // SAFETY: `&mut self` guarantees exclusive access, so no other
        // thread can rely on epoch protection here.
        let guard = unsafe { crossbeam_epoch::unprotected() };
        let current_array = self.array.load(Acquire, guard);
        let current_array_ref = self.array_ref(current_array);
        current_array_ref.drop_old_array(true, guard);
        let array = self.array.swap(Shared::null(), Relaxed, guard);
        if !array.is_null() {
            // Take ownership of the detached array and kill each cell
            // (see `cell::CellLocker::kill`).
            let array = unsafe { array.into_owned() };
            for index in 0..array.num_cells() {
                if let Some(mut cell_locker) = CellLocker::lock(array.cell_ref(index), guard) {
                    cell_locker.kill();
                }
            }
        }
    }
}
/// An iterator over all entries of a `HashIndex`.
///
/// Holds an epoch `Guard` (pinned lazily on the first `next` call) so the
/// borrowed entries stay valid for the iterator's lifetime.
pub struct Visitor<'h, K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
    /// The index being traversed.
    hash_index: &'h HashIndex<K, V, H>,
    /// The array currently being scanned (the old array first, if a resize
    /// is in flight).
    current_array: Shared<'h, Array<K, V>>,
    /// Index of the cell currently being scanned.
    current_index: usize,
    /// Iterator over the current cell's entries.
    current_cell_iterator: Option<CellIterator<'h, K, V>>,
    /// Epoch guard; `None` until iteration starts.
    guard: Option<Guard>,
}
impl<'h, K, V, H> Visitor<'h, K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
    /// Borrows the pinned guard with the iterator's lifetime `'h`.
    ///
    /// # Panics
    /// Panics if called before the guard is pinned (`self.guard` is `None`).
    fn guard_ref(&self) -> &'h Guard {
        // NOTE(review): the transmute stretches the borrow from `&self` to
        // `'h`; this is sound only while the guard is never dropped or
        // replaced for as long as references derived from it are alive.
        unsafe { std::mem::transmute::<_, &'h Guard>(self.guard.as_ref().unwrap()) }
    }
}
impl<'h, K, V, H> Iterator for Visitor<'h, K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
    type Item = (&'h K, &'h V);
    /// Yields the next entry, scanning the old array (if any) before the
    /// current one and restarting if a resize swaps the arrays mid-scan.
    fn next(&mut self) -> Option<Self::Item> {
        if self.guard.is_none() {
            // First call: pin the epoch and start at cell 0 of the oldest
            // live array.
            self.guard.replace(crossbeam_epoch::pin());
            let current_array = self.hash_index.array.load(Acquire, self.guard_ref());
            let current_array_ref = self.hash_index.array_ref(current_array);
            let old_array = current_array_ref.old_array(self.guard_ref());
            self.current_array = if !old_array.is_null() {
                old_array
            } else {
                current_array
            };
            self.current_cell_iterator.replace(CellIterator::new(
                self.hash_index.array_ref(self.current_array).cell_ref(0),
                self.guard_ref(),
            ));
        }
        loop {
            // Drain the current cell first.
            if let Some(iterator) = self.current_cell_iterator.as_mut() {
                if let Some(entry) = iterator.next() {
                    return Some((&entry.0, &entry.1));
                }
            }
            let array_ref = self.hash_index.array_ref(self.current_array);
            self.current_index += 1;
            // NOTE(review): after the `break` below, `current_index` equals
            // `num_cells()`; a further `next` call would increment past it
            // and take the `else` branch — confirm `cell_ref` bounds
            // handling for that post-exhaustion case.
            if self.current_index == array_ref.num_cells() {
                // Finished this array; decide what to scan next.
                let current_array = self.hash_index.array.load(Acquire, self.guard_ref());
                if self.current_array == current_array {
                    // Still the newest array: iteration is complete.
                    break;
                }
                let current_array_ref = self.hash_index.array_ref(current_array);
                let old_array = current_array_ref.old_array(self.guard_ref());
                if self.current_array == old_array {
                    // Just finished the old array: move on to the current
                    // one.
                    self.current_array = current_array;
                    self.current_index = 0;
                    self.current_cell_iterator.replace(CellIterator::new(
                        self.hash_index.array_ref(self.current_array).cell_ref(0),
                        self.guard_ref(),
                    ));
                    continue;
                }
                // The array we were scanning was replaced: restart from the
                // oldest live array.
                self.current_array = if !old_array.is_null() {
                    old_array
                } else {
                    current_array
                };
                self.current_index = 0;
                self.current_cell_iterator.replace(CellIterator::new(
                    self.hash_index.array_ref(self.current_array).cell_ref(0),
                    self.guard_ref(),
                ));
                continue;
            } else {
                // Advance to the next cell in the same array.
                self.current_cell_iterator.replace(CellIterator::new(
                    array_ref.cell_ref(self.current_index),
                    self.guard_ref(),
                ));
            }
        }
        None
    }
}
/// Marker impl declaring that `Visitor` stays exhausted once it returns
/// `None`.
// NOTE(review): see the bounds question flagged in `Iterator::next` before
// relying on repeated post-exhaustion calls.
impl<'h, K, V, H> FusedIterator for Visitor<'h, K, V, H>
where
    K: Clone + Eq + Hash + Sync,
    V: Clone + Sync,
    H: BuildHasher,
{
}