extern crate alloc;
use alloc::boxed::Box;
use alloc::vec::Vec;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use foldhash::fast::FixedState;
use kovan::{Atomic, RetiredNode, Shared, pin, retire};
const NEIGHBORHOOD_SIZE: usize = 32;
const INITIAL_CAPACITY: usize = 64;
const GROW_THRESHOLD: f64 = 0.75;
const SHRINK_THRESHOLD: f64 = 0.25;
const MIN_CAPACITY: usize = 64;
const MAX_PROBE_DISTANCE: usize = 512;
struct Bucket<K, V> {
hop_info: AtomicU32,
slot: Atomic<Entry<K, V>>,
write_guard: AtomicBool,
}
struct WriteGuardRelease<'t, K: 'static, V: 'static> {
table: &'t Table<K, V>,
idx: usize,
}
impl<K, V> Drop for WriteGuardRelease<'_, K, V> {
fn drop(&mut self) {
self.table
.get_bucket(self.idx)
.write_guard
.store(false, Ordering::Release);
}
}
#[repr(C)]
struct Entry<K, V> {
retired: RetiredNode,
hash: u64,
key: K,
value: V,
}
impl<K: Clone, V: Clone> Clone for Entry<K, V> {
fn clone(&self) -> Self {
Self {
retired: RetiredNode::new(),
hash: self.hash,
key: self.key.clone(),
value: self.value.clone(),
}
}
}
unsafe impl<K: Send, V: Send> Send for Entry<K, V> {}
unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Entry<K, V> {}
#[repr(C)]
struct Table<K, V> {
retired: RetiredNode,
buckets: Box<[Bucket<K, V>]>,
capacity: usize,
mask: usize,
}
unsafe impl<K: Send, V: Send> Send for Table<K, V> {}
unsafe impl<K: Send + Sync, V: Send + Sync> Sync for Table<K, V> {}
impl<K, V> Table<K, V> {
fn new(capacity: usize) -> Self {
let capacity = capacity.next_power_of_two().max(MIN_CAPACITY);
let mut buckets = Vec::with_capacity(capacity + NEIGHBORHOOD_SIZE);
for _ in 0..(capacity + NEIGHBORHOOD_SIZE) {
buckets.push(Bucket {
hop_info: AtomicU32::new(0),
slot: Atomic::null(),
write_guard: AtomicBool::new(false),
});
}
Self {
retired: RetiredNode::new(),
buckets: buckets.into_boxed_slice(),
capacity,
mask: capacity - 1,
}
}
#[inline(always)]
fn bucket_index(&self, hash: u64) -> usize {
(hash as usize) & self.mask
}
#[inline(always)]
fn get_bucket(&self, idx: usize) -> &Bucket<K, V> {
unsafe { self.buckets.get_unchecked(idx) }
}
}
impl<K, V> Drop for Table<K, V> {
fn drop(&mut self) {
let guard = pin();
for i in 0..(self.capacity + NEIGHBORHOOD_SIZE) {
let entry_ptr = self.buckets[i]
.slot
.load(Ordering::Relaxed, &guard)
.as_raw();
if !entry_ptr.is_null() {
unsafe {
drop(Box::from_raw(entry_ptr));
}
}
}
}
}
pub struct HopscotchMap<K: 'static, V: 'static, S = FixedState> {
table: Atomic<Table<K, V>>,
count: AtomicUsize,
resizing: AtomicBool,
hasher: S,
}
#[cfg(feature = "std")]
impl<K, V> HopscotchMap<K, V, FixedState>
where
K: Hash + Eq + Clone + 'static,
V: Clone + 'static,
{
pub fn new() -> Self {
Self::with_hasher(FixedState::default())
}
pub fn with_capacity(capacity: usize) -> Self {
Self::with_capacity_and_hasher(capacity, FixedState::default())
}
}
impl<K, V, S> HopscotchMap<K, V, S>
where
K: Hash + Eq + Clone + 'static,
V: Clone + 'static,
S: BuildHasher,
{
pub fn with_hasher(hasher: S) -> Self {
Self::with_capacity_and_hasher(INITIAL_CAPACITY, hasher)
}
pub fn with_capacity_and_hasher(capacity: usize, hasher: S) -> Self {
let table = Table::new(capacity);
Self {
table: Atomic::new(Box::into_raw(Box::new(table))),
count: AtomicUsize::new(0),
resizing: AtomicBool::new(false),
hasher,
}
}
pub fn len(&self) -> usize {
self.count.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
unsafe { (*table_ptr.as_raw()).capacity }
}
#[inline]
fn wait_for_resize(&self) {
while self.resizing.load(Ordering::Acquire) {
core::hint::spin_loop();
}
}
pub fn get<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = self.hasher.hash_one(key);
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
let table = unsafe { &*table_ptr.as_raw() };
let bucket_idx = table.bucket_index(hash);
let bucket = table.get_bucket(bucket_idx);
let hop_info = bucket.hop_info.load(Ordering::Acquire);
if hop_info == 0 {
return None;
}
for offset in 0..NEIGHBORHOOD_SIZE {
if hop_info & (1 << offset) != 0 {
let slot_idx = bucket_idx + offset;
let slot_bucket = table.get_bucket(slot_idx);
let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
if entry.hash == hash && entry.key.borrow() == key {
return Some(entry.value.clone());
}
}
}
}
None
}
pub fn insert(&self, key: K, value: V) -> Option<V> {
self.insert_impl(key, value, false)
}
fn insert_impl(&self, key: K, value: V, only_if_absent: bool) -> Option<V> {
let hash = self.hasher.hash_one(&key);
let mut counted = false;
loop {
self.wait_for_resize();
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
let table = unsafe { &*table_ptr.as_raw() };
if self.resizing.load(Ordering::Acquire) {
continue;
}
if self.acquire_write_guard(table, hash).is_none() {
core::hint::spin_loop();
continue;
}
let insert_result = {
let _wg = WriteGuardRelease {
table,
idx: table.bucket_index(hash),
};
self.try_insert(
table,
hash,
key.clone(),
value.clone(),
only_if_absent,
&guard,
)
};
match insert_result {
InsertResult::Success(old_val) => {
if old_val.is_none() && !counted {
self.count.fetch_add(1, Ordering::Relaxed);
counted = true;
}
if self.resizing.load(Ordering::SeqCst)
|| self.table.load(Ordering::SeqCst, &guard) != table_ptr
{
continue;
}
if counted {
let new_count = self.count.load(Ordering::Relaxed);
let current_capacity = table.capacity;
let load_factor = new_count as f64 / current_capacity as f64;
if load_factor > GROW_THRESHOLD {
drop(guard);
self.try_resize(current_capacity * 2);
}
}
return old_val;
}
InsertResult::Exists(existing_val) => {
return Some(existing_val);
}
InsertResult::NeedResize => {
let current_capacity = table.capacity;
drop(guard);
self.try_resize(current_capacity * 2);
continue;
}
InsertResult::Retry => {
continue;
}
}
}
}
pub fn get_or_insert(&self, key: K, value: V) -> V {
if let Some(v) = self.get(&key) {
return v;
}
let key2 = key.clone();
match self.insert_impl(key, value.clone(), true) {
None => {
self.get(&key2).unwrap_or(value)
}
Some(existing) => existing, }
}
pub fn insert_if_absent(&self, key: K, value: V) -> Option<V> {
self.insert_impl(key, value, true)
}
pub fn force_remove<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let mut newest = None;
loop {
match self.remove(key) {
Some(v) => {
if newest.is_none() {
newest = Some(v);
}
}
None => return newest,
}
}
}
pub fn remove<Q>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
let hash = self.hasher.hash_one(key);
let mut result: Option<V> = None;
'outer: loop {
self.wait_for_resize();
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
let table_raw = table_ptr.as_raw();
let table = unsafe { &*table_raw };
if self.resizing.load(Ordering::Acquire) {
continue;
}
let bucket_idx = table.bucket_index(hash);
let bucket = table.get_bucket(bucket_idx);
let hop_info = bucket.hop_info.load(Ordering::Acquire);
if hop_info == 0 {
return result;
}
for offset in 0..NEIGHBORHOOD_SIZE {
if hop_info & (1 << offset) != 0 {
let slot_idx = bucket_idx + offset;
let slot_bucket = table.get_bucket(slot_idx);
let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, &guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
if entry.hash == hash && entry.key.borrow() == key {
let old_value = entry.value.clone();
match slot_bucket.slot.compare_exchange(
entry_ptr,
unsafe { Shared::from_raw(core::ptr::null_mut()) },
Ordering::Release,
Ordering::Relaxed,
&guard,
) {
Ok(_) => {
let mask = !(1u32 << offset);
bucket.hop_info.fetch_and(mask, Ordering::Release);
unsafe { retire(entry_ptr.as_raw()) };
if result.is_none() {
result = Some(old_value);
}
let shrink_to = if let Ok(prev) = self.count.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|c| c.checked_sub(1),
) {
let new_count = prev - 1;
let current_capacity = table.capacity;
let load_factor =
new_count as f64 / current_capacity as f64;
(load_factor < SHRINK_THRESHOLD
&& current_capacity > MIN_CAPACITY)
.then_some(current_capacity / 2)
} else {
None
};
if self.resizing.load(Ordering::SeqCst)
|| self.table.load(Ordering::SeqCst, &guard).as_raw()
!= table_raw
{
continue 'outer;
}
if let Some(cap) = shrink_to {
drop(guard);
self.try_resize(cap);
}
return result;
}
Err(_) => {
break;
}
}
}
}
}
}
return result;
}
}
pub fn clear(&self) {
while self
.resizing
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_err()
{
core::hint::spin_loop();
}
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
let table = unsafe { &*table_ptr.as_raw() };
for i in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
let bucket = table.get_bucket(i);
let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
if !entry_ptr.is_null()
&& bucket
.slot
.compare_exchange(
entry_ptr,
unsafe { Shared::from_raw(core::ptr::null_mut()) },
Ordering::Release,
Ordering::Relaxed,
&guard,
)
.is_ok()
{
unsafe { retire(entry_ptr.as_raw()) };
}
if i < table.capacity {
let b = table.get_bucket(i);
b.hop_info.store(0, Ordering::Release);
}
}
self.count.store(0, Ordering::Release);
self.resizing.store(false, Ordering::Release);
}
fn acquire_write_guard(&self, table: &Table<K, V>, hash: u64) -> Option<()> {
let bucket = table.get_bucket(table.bucket_index(hash));
if bucket.write_guard.swap(true, Ordering::Acquire) {
None
} else {
Some(())
}
}
fn try_insert(
&self,
table: &Table<K, V>,
hash: u64,
key: K,
value: V,
only_if_absent: bool,
guard: &kovan::Guard,
) -> InsertResult<V> {
let bucket_idx = table.bucket_index(hash);
let bucket = table.get_bucket(bucket_idx);
let hop_info = bucket.hop_info.load(Ordering::Acquire);
for offset in 0..NEIGHBORHOOD_SIZE {
if hop_info & (1 << offset) != 0 {
let slot_idx = bucket_idx + offset;
let slot_bucket = table.get_bucket(slot_idx);
let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
if entry.hash == hash && entry.key == key {
if only_if_absent {
return InsertResult::Exists(entry.value.clone());
}
let old_value = entry.value.clone();
let new_entry = Box::into_raw(Box::new(Entry {
retired: RetiredNode::new(),
hash,
key: key.clone(),
value: value.clone(),
}));
match slot_bucket.slot.compare_exchange(
entry_ptr,
unsafe { Shared::from_raw(new_entry) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
unsafe { retire(entry_ptr.as_raw()) };
return InsertResult::Success(Some(old_value));
}
Err(_) => {
drop(unsafe { Box::from_raw(new_entry) });
return InsertResult::Retry;
}
}
}
}
}
}
for offset in 0..NEIGHBORHOOD_SIZE {
let slot_idx = bucket_idx + offset;
if slot_idx >= table.capacity + NEIGHBORHOOD_SIZE {
return InsertResult::NeedResize;
}
let slot_bucket = table.get_bucket(slot_idx);
let entry_ptr = slot_bucket.slot.load(Ordering::Acquire, guard);
if entry_ptr.is_null() {
let new_entry = Box::into_raw(Box::new(Entry {
retired: RetiredNode::new(),
hash,
key: key.clone(),
value: value.clone(),
}));
match slot_bucket.slot.compare_exchange(
unsafe { Shared::from_raw(core::ptr::null_mut()) },
unsafe { Shared::from_raw(new_entry) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
bucket.hop_info.fetch_or(1u32 << offset, Ordering::Release);
return InsertResult::Success(None);
}
Err(_) => {
drop(unsafe { Box::from_raw(new_entry) });
continue;
}
}
}
}
match self.try_find_closer_slot(table, bucket_idx, guard) {
Some(final_offset) if final_offset < NEIGHBORHOOD_SIZE => {
let slot_idx = bucket_idx + final_offset;
let slot_bucket = table.get_bucket(slot_idx);
let curr = slot_bucket.slot.load(Ordering::Relaxed, guard);
if !curr.is_null() {
return InsertResult::Retry;
}
let new_entry = Box::into_raw(Box::new(Entry {
retired: RetiredNode::new(),
hash,
key,
value,
}));
match slot_bucket.slot.compare_exchange(
unsafe { Shared::from_raw(core::ptr::null_mut()) },
unsafe { Shared::from_raw(new_entry) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
bucket
.hop_info
.fetch_or(1u32 << final_offset, Ordering::Release);
InsertResult::Success(None)
}
Err(_) => {
drop(unsafe { Box::from_raw(new_entry) });
InsertResult::Retry
}
}
}
_ => InsertResult::NeedResize,
}
}
fn try_find_closer_slot(
&self,
table: &Table<K, V>,
bucket_idx: usize,
guard: &kovan::Guard,
) -> Option<usize> {
for probe_offset in NEIGHBORHOOD_SIZE..MAX_PROBE_DISTANCE {
let probe_idx = bucket_idx + probe_offset;
if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
return None;
}
let probe_bucket = table.get_bucket(probe_idx);
let entry_ptr = probe_bucket.slot.load(Ordering::Acquire, guard);
if entry_ptr.is_null() {
return self.try_move_closer(table, bucket_idx, probe_idx, guard);
}
}
None
}
fn try_move_closer(
&self,
table: &Table<K, V>,
target_idx: usize,
empty_idx: usize,
guard: &kovan::Guard,
) -> Option<usize> {
let mut current_empty = empty_idx;
while current_empty > target_idx + NEIGHBORHOOD_SIZE - 1 {
let mut moved = false;
for offset in 1..NEIGHBORHOOD_SIZE.min(current_empty - target_idx) {
let candidate_idx = current_empty - offset;
let candidate_bucket = table.get_bucket(candidate_idx);
let entry_ptr = candidate_bucket.slot.load(Ordering::Acquire, guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
let entry_home = table.bucket_index(entry.hash);
if entry_home <= candidate_idx && current_empty < entry_home + NEIGHBORHOOD_SIZE
{
let new_entry = Box::into_raw(Box::new(entry.clone()));
let empty_bucket = table.get_bucket(current_empty);
match empty_bucket.slot.compare_exchange(
unsafe { Shared::from_raw(core::ptr::null_mut()) },
unsafe { Shared::from_raw(new_entry) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
match candidate_bucket.slot.compare_exchange(
entry_ptr,
unsafe { Shared::from_raw(core::ptr::null_mut()) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
let old_offset = candidate_idx - entry_home;
let new_offset = current_empty - entry_home;
let home_bucket = table.get_bucket(entry_home);
home_bucket
.hop_info
.fetch_and(!(1u32 << old_offset), Ordering::Release);
home_bucket
.hop_info
.fetch_or(1u32 << new_offset, Ordering::Release);
unsafe { retire(entry_ptr.as_raw()) };
current_empty = candidate_idx;
moved = true;
break;
}
Err(_) => {
match empty_bucket.slot.compare_exchange(
unsafe { Shared::from_raw(new_entry) },
unsafe { Shared::from_raw(core::ptr::null_mut()) },
Ordering::Release,
Ordering::Relaxed,
guard,
) {
Ok(_) => {
unsafe { drop(Box::from_raw(new_entry)) };
}
Err(_) => {
unsafe { retire(new_entry) };
}
}
continue;
}
}
}
Err(_) => {
unsafe { drop(Box::from_raw(new_entry)) };
continue;
}
}
}
}
}
if !moved {
return None;
}
}
if current_empty >= target_idx && current_empty < target_idx + NEIGHBORHOOD_SIZE {
Some(current_empty - target_idx)
} else {
None
}
}
fn insert_into_new_table(
&self,
table: &Table<K, V>,
hash: u64,
key: K,
value: V,
guard: &kovan::Guard,
) -> bool {
let bucket_idx = table.bucket_index(hash);
for probe_offset in 0..(table.capacity + NEIGHBORHOOD_SIZE) {
let probe_idx = bucket_idx + probe_offset;
if probe_idx >= table.capacity + NEIGHBORHOOD_SIZE {
break;
}
let probe_bucket = table.get_bucket(probe_idx);
let slot_ptr = probe_bucket.slot.load(Ordering::Relaxed, guard);
if slot_ptr.is_null() {
let offset_from_home = probe_idx - bucket_idx;
if offset_from_home < NEIGHBORHOOD_SIZE {
let new_entry = Box::into_raw(Box::new(Entry {
retired: RetiredNode::new(),
hash,
key,
value,
}));
probe_bucket
.slot
.store(unsafe { Shared::from_raw(new_entry) }, Ordering::Release);
let bucket = table.get_bucket(bucket_idx);
bucket
.hop_info
.fetch_or(1u32 << offset_from_home, Ordering::Relaxed);
return true;
} else {
return false;
}
}
}
false
}
fn try_resize(&self, new_capacity: usize) {
if self
.resizing
.compare_exchange(false, true, Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
return;
}
let new_capacity = new_capacity.next_power_of_two().max(MIN_CAPACITY);
let guard = pin();
let old_table_ptr = self.table.load(Ordering::Acquire, &guard);
let old_table = unsafe { &*old_table_ptr.as_raw() };
if old_table.capacity == new_capacity {
self.resizing.store(false, Ordering::Release);
return;
}
let new_table = Box::into_raw(Box::new(Table::new(new_capacity)));
let new_table_ref = unsafe { &*new_table };
let mut success = true;
for i in 0..(old_table.capacity + NEIGHBORHOOD_SIZE) {
let bucket = old_table.get_bucket(i);
let entry_ptr = bucket.slot.load(Ordering::Acquire, &guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
if !self.insert_into_new_table(
new_table_ref,
entry.hash,
entry.key.clone(),
entry.value.clone(),
&guard,
) {
success = false;
break;
}
}
}
if success {
match self.table.compare_exchange(
old_table_ptr,
unsafe { Shared::from_raw(new_table) },
Ordering::Release,
Ordering::Relaxed,
&guard,
) {
Ok(_) => {
unsafe { retire(old_table_ptr.as_raw()) };
}
Err(_) => {
success = false;
}
}
}
if !success {
unsafe {
drop(Box::from_raw(new_table));
}
}
self.resizing.store(false, Ordering::Release);
}
pub fn iter(&self) -> HopscotchIter<'_, K, V, S> {
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
let _table = unsafe { &*table_ptr.as_raw() };
HopscotchIter {
map: self,
bucket_idx: 0,
guard,
}
}
pub fn keys(&self) -> HopscotchKeys<'_, K, V, S> {
HopscotchKeys { iter: self.iter() }
}
pub fn values(&self) -> HopscotchValues<'_, K, V, S> {
HopscotchValues { iter: self.iter() }
}
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + Eq + ?Sized,
{
self.get(key).is_some()
}
pub fn extend<I: IntoIterator<Item = (K, V)>>(&self, iter: I) {
for (k, v) in iter {
self.insert(k, v);
}
}
pub fn hasher(&self) -> &S {
&self.hasher
}
}
pub struct HopscotchIter<'a, K: 'static, V: 'static, S> {
map: &'a HopscotchMap<K, V, S>,
bucket_idx: usize,
guard: kovan::Guard,
}
impl<'a, K, V, S> Iterator for HopscotchIter<'a, K, V, S>
where
K: Clone,
V: Clone,
{
type Item = (K, V);
fn next(&mut self) -> Option<Self::Item> {
let table_ptr = self.map.table.load(Ordering::Acquire, &self.guard);
let table = unsafe { &*table_ptr.as_raw() };
while self.bucket_idx < table.buckets.len() {
let bucket = table.get_bucket(self.bucket_idx);
self.bucket_idx += 1;
let entry_ptr = bucket.slot.load(Ordering::Acquire, &self.guard);
if !entry_ptr.is_null() {
let entry = unsafe { &*entry_ptr.as_raw() };
return Some((entry.key.clone(), entry.value.clone()));
}
}
None
}
}
pub struct HopscotchKeys<'a, K: 'static, V: 'static, S> {
iter: HopscotchIter<'a, K, V, S>,
}
impl<'a, K, V, S> Iterator for HopscotchKeys<'a, K, V, S>
where
K: Clone,
V: Clone,
{
type Item = K;
fn next(&mut self) -> Option<Self::Item> {
self.iter.next().map(|(k, _)| k)
}
}
pub struct HopscotchValues<'a, K: 'static, V: 'static, S> {
iter: HopscotchIter<'a, K, V, S>,
}
impl<'a, K, V, S> Iterator for HopscotchValues<'a, K, V, S>
where
K: Clone,
V: Clone,
{
type Item = V;
#[inline]
fn next(&mut self) -> Option<V> {
self.iter.next().map(|(_, v)| v)
}
}
pub struct HopscotchIntoIter<K: 'static, V: 'static> {
table: *mut Table<K, V>,
bucket_idx: usize,
guard: kovan::Guard,
}
impl<K, V> Iterator for HopscotchIntoIter<K, V> {
type Item = (K, V);
fn next(&mut self) -> Option<(K, V)> {
let table = unsafe { &*self.table };
while self.bucket_idx < table.buckets.len() {
let bucket = table.get_bucket(self.bucket_idx);
self.bucket_idx += 1;
let entry = bucket.slot.load(Ordering::Acquire, &self.guard).as_raw();
if !entry.is_null() {
bucket.slot.store(
unsafe { Shared::from_raw(core::ptr::null_mut()) },
Ordering::Relaxed,
);
let k = unsafe { core::ptr::read(&(*entry).key) };
let v = unsafe { core::ptr::read(&(*entry).value) };
unsafe {
alloc::alloc::dealloc(
entry as *mut u8,
core::alloc::Layout::new::<Entry<K, V>>(),
);
}
return Some((k, v));
}
}
None
}
}
impl<K, V> Drop for HopscotchIntoIter<K, V> {
fn drop(&mut self) {
while self.next().is_some() {}
unsafe { drop(Box::from_raw(self.table)) };
}
}
impl<K, V, S> IntoIterator for HopscotchMap<K, V, S>
where
K: 'static,
V: 'static,
{
type Item = (K, V);
type IntoIter = HopscotchIntoIter<K, V>;
fn into_iter(self) -> HopscotchIntoIter<K, V> {
let mut me = core::mem::ManuallyDrop::new(self);
let guard = pin();
let table = me.table.load(Ordering::Relaxed, &guard).as_raw();
unsafe { core::ptr::drop_in_place(&mut me.hasher) };
HopscotchIntoIter {
table,
bucket_idx: 0,
guard,
}
}
}
impl<K, V, S> core::iter::FromIterator<(K, V)> for HopscotchMap<K, V, S>
where
K: Hash + Eq + Clone + Send + 'static,
V: Clone + Send + 'static,
S: BuildHasher + Default,
{
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
let map = Self::with_hasher(S::default());
for (k, v) in iter {
map.insert(k, v);
}
map
}
}
impl<'a, K, V, S> IntoIterator for &'a HopscotchMap<K, V, S>
where
K: Hash + Eq + Clone + 'static,
V: Clone + 'static,
S: BuildHasher,
{
type Item = (K, V);
type IntoIter = HopscotchIter<'a, K, V, S>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
enum InsertResult<V> {
Success(Option<V>),
Exists(V),
NeedResize,
Retry,
}
#[cfg(feature = "std")]
impl<K, V> Default for HopscotchMap<K, V, FixedState>
where
K: Hash + Eq + Clone + 'static,
V: Clone + 'static,
{
fn default() -> Self {
Self::new()
}
}
unsafe impl<K: Send, V: Send, S: Send> Send for HopscotchMap<K, V, S> {}
unsafe impl<K: Send + Sync, V: Send + Sync, S: Send + Sync> Sync for HopscotchMap<K, V, S> {}
impl<K, V, S> Drop for HopscotchMap<K, V, S> {
fn drop(&mut self) {
let guard = pin();
let table_ptr = self.table.load(Ordering::Acquire, &guard);
unsafe {
drop(Box::from_raw(table_ptr.as_raw()));
}
drop(guard);
kovan::flush();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_insert_and_get() {
let map = HopscotchMap::new();
assert_eq!(map.insert(1, 100), None);
assert_eq!(map.get(&1), Some(100));
assert_eq!(map.get(&2), None);
}
#[test]
fn test_growing() {
let map = HopscotchMap::with_capacity(32);
for i in 0..100 {
map.insert(i, i * 2);
}
for i in 0..100 {
assert_eq!(map.get(&i), Some(i * 2));
}
}
#[test]
fn test_concurrent() {
use alloc::sync::Arc;
extern crate std;
use std::thread;
let map = Arc::new(HopscotchMap::with_capacity(64));
let mut handles = alloc::vec::Vec::new();
for thread_id in 0..4 {
let map_clone = Arc::clone(&map);
let handle = thread::spawn(move || {
for i in 0..1000 {
let key = thread_id * 1000 + i;
map_clone.insert(key, key * 2);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
for thread_id in 0..4 {
for i in 0..1000 {
let key = thread_id * 1000 + i;
assert_eq!(map.get(&key), Some(key * 2));
}
}
}
#[test]
fn test_concurrent_insert_and_remove() {
use alloc::sync::Arc;
extern crate std;
use std::thread;
let map = Arc::new(HopscotchMap::with_capacity(64));
for thread_id in 0..4u64 {
for i in 0..500u64 {
let key = thread_id * 1000 + i;
map.insert(key, key * 3);
}
}
let mut insert_handles = alloc::vec::Vec::new();
let mut remove_handles = alloc::vec::Vec::new();
for thread_id in 0..4u64 {
let map_clone = Arc::clone(&map);
insert_handles.push(thread::spawn(move || {
for i in 0..500u64 {
let key = thread_id * 1000 + i;
map_clone.insert(key, key * 3);
}
}));
}
for thread_id in 0..4u64 {
let map_clone = Arc::clone(&map);
remove_handles.push(thread::spawn(move || {
for i in 0..500u64 {
let key = thread_id * 1000 + i;
if let Some(val) = map_clone.remove(&key) {
assert_eq!(val, key * 3);
}
}
}));
}
for handle in insert_handles {
handle.join().unwrap();
}
for handle in remove_handles {
handle.join().unwrap();
}
for thread_id in 0..4u64 {
for i in 0..500u64 {
let key = thread_id * 1000 + i;
if let Some(val) = map.get(&key) {
assert_eq!(val, key * 3);
}
}
}
}
#[test]
fn test_hopscotch_get_or_insert_concurrent_remove() {
use alloc::sync::Arc;
extern crate std;
use std::sync::Barrier;
use std::thread;
let map = Arc::new(HopscotchMap::<u64, u64>::with_capacity(64));
let barrier = Arc::new(Barrier::new(8));
let handles: Vec<_> = (0..8u64)
.map(|tid| {
let map = map.clone();
let barrier = barrier.clone();
thread::spawn(move || {
barrier.wait();
for i in 0..5000u64 {
let key = i % 32; if tid % 2 == 0 {
let _ = map.get_or_insert(key, tid * 1000 + i);
} else {
let _ = map.remove(&key);
}
}
})
})
.collect();
for h in handles {
h.join()
.expect("Thread panicked during get_or_insert/remove race");
}
}
}