use crate::cht::map::{
bucket::{self, BucketArray},
bucket_array_ref::BucketArrayRef,
DefaultHashBuilder,
};
use std::{
borrow::Borrow,
hash::{BuildHasher, Hash},
ptr,
sync::atomic::{self, AtomicUsize, Ordering},
};
use crossbeam_epoch::Atomic;
/// A lock-free hash map sharded into independent segments to reduce
/// contention between concurrent writers.
pub(crate) struct HashMap<K, V, S = DefaultHashBuilder> {
    // One bucket array per segment; length is always a power of two.
    segments: Box<[Segment<K, V>]>,
    // Hash builder shared by all segments.
    build_hasher: S,
    // Total entry count across all segments. Updated with relaxed ordering,
    // so it is only approximate while updates are in flight.
    len: AtomicUsize,
    // Number of bits to shift a 64-bit hash right to obtain a segment index.
    // A value of 64 is the single-segment sentinel (see
    // `segment_index_from_hash`, which must avoid the would-be-UB shift).
    segment_shift: u32,
}
#[cfg(test)]
impl<K, V> HashMap<K, V, DefaultHashBuilder> {
    /// Creates a map with the default number of segments and the default
    /// hash builder, pre-sized to hold `capacity` entries.
    pub fn with_capacity(capacity: usize) -> Self {
        let num_segments = default_num_segments();
        let build_hasher = DefaultHashBuilder::default();
        Self::with_num_segments_capacity_and_hasher(num_segments, capacity, build_hasher)
    }
}
impl<K, V, S> HashMap<K, V, S> {
    /// Creates an empty map with `num_segments` segments (rounded up to a
    /// power of two) and the given hash builder.
    pub(crate) fn with_num_segments_and_hasher(num_segments: usize, build_hasher: S) -> Self {
        Self::with_num_segments_capacity_and_hasher(num_segments, 0, build_hasher)
    }
    /// Creates a map with `num_segments` segments (rounded up to a power of
    /// two), pre-sized so that roughly `capacity` entries fit without
    /// resizing, using the given hash builder.
    ///
    /// # Panics
    ///
    /// Panics if `num_segments` is zero.
    pub(crate) fn with_num_segments_capacity_and_hasher(
        num_segments: usize,
        capacity: usize,
        build_hasher: S,
    ) -> Self {
        assert!(num_segments > 0);
        let actual_num_segments = num_segments.next_power_of_two();
        // Top bits of the hash select the segment; a shift of 64 (when
        // `actual_num_segments == 1`) is the single-segment sentinel handled
        // in `segment_index_from_hash`.
        let segment_shift = 64 - actual_num_segments.trailing_zeros();
        let mut segments = Vec::with_capacity(actual_num_segments);
        if capacity == 0 {
            unsafe {
                // SAFETY: the vector was allocated with room for
                // `actual_num_segments` elements, and an all-zero bit pattern
                // is a valid `Segment` (a null `Atomic` pointer and an
                // `AtomicUsize` of zero), so zero-filling the buffer and then
                // claiming its length is sound. The bucket arrays are
                // allocated lazily on first insert.
                ptr::write_bytes(segments.as_mut_ptr(), 0, actual_num_segments);
                segments.set_len(actual_num_segments);
            }
        } else {
            // `* 2` keeps the load factor at or below 50%; per-segment
            // bucket-array lengths must be powers of two.
            let actual_capacity = (capacity * 2 / actual_num_segments).next_power_of_two();
            for _ in 0..actual_num_segments {
                segments.push(Segment {
                    bucket_array: Atomic::new(BucketArray::with_length(0, actual_capacity)),
                    len: AtomicUsize::new(0),
                });
            }
        }
        let segments = segments.into_boxed_slice();
        Self {
            segments,
            build_hasher,
            len: AtomicUsize::new(0),
            segment_shift,
        }
    }
    /// Returns the actual (power-of-two) number of segments.
    pub(crate) fn actual_num_segments(&self) -> usize {
        self.segments.len()
    }
    /// Returns the approximate number of entries in the map.
    #[cfg(test)]
    pub(crate) fn len(&self) -> usize {
        self.len.load(Ordering::Relaxed)
    }
    /// Returns `true` if the map holds no entries (approximate under
    /// concurrent updates).
    #[cfg(test)]
    pub(crate) fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Returns the total capacity, summed over every segment's current
    /// bucket array (0 for segments whose array is still null).
    #[cfg(any(test, feature = "unstable-debug-counters"))]
    pub(crate) fn capacity(&self) -> usize {
        let guard = &crossbeam_epoch::pin();
        self.segments
            .iter()
            .map(|s| s.bucket_array.load_consume(guard))
            .map(|p| unsafe { p.as_ref() })
            .map(|a| a.map(BucketArray::capacity).unwrap_or(0))
            .sum::<usize>()
    }
    /// Returns the number of segments.
    #[cfg(test)]
    pub(crate) fn num_segments(&self) -> usize {
        self.segments.len()
    }
}
impl<K: Hash + Eq, V, S: BuildHasher> HashMap<K, V, S> {
    /// Returns a clone of the value whose key matches `eq` under `hash`,
    /// if present.
    #[inline]
    pub(crate) fn get(&self, hash: u64, eq: impl FnMut(&K) -> bool) -> Option<V>
    where
        V: Clone,
    {
        self.get_key_value_and(hash, eq, |_, v| v.clone())
    }
    /// Applies `with_entry` to the matching entry and returns its result,
    /// or `None` if no entry matched.
    #[inline]
    pub(crate) fn get_key_value_and<T>(
        &self,
        hash: u64,
        eq: impl FnMut(&K) -> bool,
        with_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        self.get_key_value_and_then(hash, eq, |k, v| Some(with_entry(k, v)))
    }
    /// Like `get_key_value_and`, but `with_entry` may itself decide to
    /// return `None`.
    #[inline]
    pub(crate) fn get_key_value_and_then<T>(
        &self,
        hash: u64,
        eq: impl FnMut(&K) -> bool,
        with_entry: impl FnOnce(&K, &V) -> Option<T>,
    ) -> Option<T> {
        self.bucket_array_ref(hash)
            .get_key_value_and_then(hash, eq, with_entry)
    }
    /// Inserts or replaces the entry for `key`, applying
    /// `with_previous_entry` to the replaced entry if there was one.
    #[cfg(test)]
    #[inline]
    pub fn insert_entry_and<T>(
        &self,
        key: K,
        hash: u64,
        value: V,
        with_previous_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T>
    where
        V: Clone,
    {
        let result = self
            .bucket_array_ref(hash)
            .insert_with_or_modify_entry_and(
                key,
                hash,
                || value,
                |_k, v| v.clone(),
                with_previous_entry,
            );
        // `None` means no previous entry existed, i.e. a net insertion.
        if result.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        result
    }
    /// Removes the matching entry and returns a clone of its value.
    #[inline]
    pub(crate) fn remove(&self, hash: u64, eq: impl FnMut(&K) -> bool) -> Option<V>
    where
        V: Clone,
    {
        self.remove_entry_if_and(hash, eq, |_, _| true, |_, v| v.clone())
    }
    /// Removes the matching entry and returns clones of both key and value.
    #[inline]
    pub(crate) fn remove_entry(&self, hash: u64, eq: impl FnMut(&K) -> bool) -> Option<(K, V)>
    where
        K: Clone,
        V: Clone,
    {
        self.remove_entry_if_and(hash, eq, |_, _| true, |k, v| (k.clone(), v.clone()))
    }
    /// Removes the matching entry only if `condition` approves it, returning
    /// a clone of the removed value.
    pub(crate) fn remove_if(
        &self,
        hash: u64,
        eq: impl FnMut(&K) -> bool,
        condition: impl FnMut(&K, &V) -> bool,
    ) -> Option<V>
    where
        V: Clone,
    {
        self.remove_entry_if_and(hash, eq, condition, move |_, v| v.clone())
    }
    /// Conditionally removes the matching entry, applying
    /// `with_previous_entry` to it on success.
    #[inline]
    pub(crate) fn remove_entry_if_and<T>(
        &self,
        hash: u64,
        eq: impl FnMut(&K) -> bool,
        condition: impl FnMut(&K, &V) -> bool,
        with_previous_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        self.bucket_array_ref(hash)
            // The decrement happens inside the callback so it runs exactly
            // when an entry was actually removed (the callback is only
            // invoked on a successful removal).
            .remove_entry_if_and(hash, eq, condition, move |k, v| {
                self.len.fetch_sub(1, Ordering::Relaxed);
                with_previous_entry(k, v)
            })
    }
    /// Inserts the value produced by `on_insert` if `key` is absent, or
    /// replaces the value with `on_modify(key, old)` if present; returns a
    /// clone of the previous value, if any.
    #[inline]
    pub(crate) fn insert_with_or_modify(
        &self,
        key: K,
        hash: u64,
        on_insert: impl FnOnce() -> V,
        on_modify: impl FnMut(&K, &V) -> V,
    ) -> Option<V>
    where
        V: Clone,
    {
        self.insert_with_or_modify_entry_and(key, hash, on_insert, on_modify, |_, v| v.clone())
    }
    /// Like `insert_with_or_modify`, but lets `with_old_entry` decide what
    /// to extract from a replaced entry.
    #[inline]
    pub(crate) fn insert_with_or_modify_entry_and<T>(
        &self,
        key: K,
        hash: u64,
        on_insert: impl FnOnce() -> V,
        on_modify: impl FnMut(&K, &V) -> V,
        with_old_entry: impl FnOnce(&K, &V) -> T,
    ) -> Option<T> {
        let result = self.bucket_array_ref(hash).insert_with_or_modify_entry_and(
            key,
            hash,
            on_insert,
            on_modify,
            with_old_entry,
        );
        // `None` means the key was absent, i.e. a net insertion.
        if result.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        result
    }
    /// Inserts `value` only if `key` is absent; returns a clone of the
    /// existing value when the key was already present.
    #[inline]
    pub(crate) fn insert_if_not_present(&self, key: K, hash: u64, value: V) -> Option<V>
    where
        V: Clone,
    {
        let result = self.bucket_array_ref(hash).insert_if_not_present_and(
            key,
            hash,
            || value,
            |_, v| v.clone(),
        );
        if result.is_none() {
            self.len.fetch_add(1, Ordering::Relaxed);
        }
        result
    }
    /// Applies `with_key` to every key in the given segment and collects the
    /// results. Returns `None` when `segment` is out of range.
    pub(crate) fn keys<T>(&self, segment: usize, with_key: impl FnMut(&K) -> T) -> Option<Vec<T>> {
        if segment >= self.segments.len() {
            return None;
        }
        let Segment {
            ref bucket_array,
            ref len,
        } = self.segments[segment];
        let bucket_array_ref = BucketArrayRef {
            bucket_array,
            build_hasher: &self.build_hasher,
            len,
        };
        Some(bucket_array_ref.keys(with_key))
    }
    /// Hashes `key` with this map's hash builder.
    #[inline]
    pub(crate) fn hash<Q>(&self, key: &Q) -> u64
    where
        Q: Hash + Eq + ?Sized,
        K: Borrow<Q>,
    {
        bucket::hash(&self.build_hasher, key)
    }
}
impl<K, V, S> Drop for HashMap<K, V, S> {
    /// Frees every bucket array in every segment's resize chain, along with
    /// the buckets they point to.
    fn drop(&mut self) {
        // SAFETY: `drop` has exclusive access to the map (`&mut self`), so no
        // other thread can be reading the bucket arrays; an unprotected guard
        // lets destruction run immediately instead of waiting for the epoch
        // GC.
        let guard = unsafe { &crossbeam_epoch::unprotected() };
        // Synchronize with prior releasing stores so the latest bucket and
        // bucket-array pointers are visible.
        atomic::fence(Ordering::Acquire);
        for Segment {
            bucket_array: this_bucket_array,
            ..
        } in self.segments.iter()
        {
            // Walk the chain of bucket arrays left behind by resizes.
            let mut current_ptr = this_bucket_array.load(Ordering::Relaxed, guard);
            while let Some(current_ref) = unsafe { current_ptr.as_ref() } {
                let next_ptr = current_ref.next.load(Ordering::Relaxed, guard);
                for this_bucket_ptr in current_ref
                    .buckets
                    .iter()
                    .map(|b| b.load(Ordering::Relaxed, guard))
                    .filter(|p| !p.is_null())
                {
                    if bucket::is_tombstone(this_bucket_ptr) {
                        // Tombstones may be shared with the newer array in the
                        // chain; only the last (newest) array frees them, so
                        // they are not destroyed twice.
                        if next_ptr.is_null() {
                            unsafe { bucket::defer_acquire_destroy(guard, this_bucket_ptr) };
                        }
                    } else {
                        unsafe { bucket::defer_destroy_bucket(guard, this_bucket_ptr) };
                    }
                }
                unsafe { bucket::defer_acquire_destroy(guard, current_ptr) };
                current_ptr = next_ptr;
            }
        }
    }
}
impl<K, V, S> HashMap<K, V, S> {
    /// Borrows the segment that `hash` maps to as a `BucketArrayRef`.
    #[inline]
    fn bucket_array_ref(&'_ self, hash: u64) -> BucketArrayRef<'_, K, V, S> {
        let segment = &self.segments[self.segment_index_from_hash(hash)];
        BucketArrayRef {
            bucket_array: &segment.bucket_array,
            build_hasher: &self.build_hasher,
            len: &segment.len,
        }
    }

    /// Maps a 64-bit hash to a segment index using its top bits.
    #[inline]
    fn segment_index_from_hash(&'_ self, hash: u64) -> usize {
        // Shifting a u64 by 64 would be undefined; `checked_shr` yields
        // `None` in that case, which corresponds to the single-segment
        // configuration where everything maps to segment 0.
        hash.checked_shr(self.segment_shift)
            .map(|index| index as usize)
            .unwrap_or(0)
    }
}
/// One shard of the map: a bucket array plus the shard's entry count.
struct Segment<K, V> {
    // Null (zero-initialized) until the first insert when the map was built
    // with capacity 0; see `with_num_segments_capacity_and_hasher`.
    bucket_array: Atomic<BucketArray<K, V>>,
    len: AtomicUsize,
}
#[cfg(test)]
fn default_num_segments() -> usize {
    // Two segments per CPU, but never fewer than two in total.
    let cpus = num_cpus::get();
    cpus.max(1) * 2
}
#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        sync::{Arc, Barrier},
        thread::{spawn, JoinHandle},
    };
    use super::*;
    use crate::cht::test_util::{run_deferred, DropNotifier, NoisyDropper};

    // Basic insert/get/remove round trip on a single-segment map.
    #[test]
    fn single_segment() {
        let map =
            HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        let key = "key1";
        let hash = map.hash(key);
        assert_eq!(map.insert_entry_and(key, hash, 5, |_, v| *v), None);
        assert_eq!(map.get(hash, |k| k == &key), Some(5));
        assert!(!map.is_empty());
        assert_eq!(map.len(), 1);
        assert_eq!(map.remove(hash, |k| k == &key), Some(5));
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        run_deferred();
    }

    // `insert_if_not_present` must not overwrite an existing value, and must
    // insert again after a removal.
    #[test]
    fn insert_if_not_present() {
        let map =
            HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());
        let key = "key1";
        let hash = map.hash(key);
        assert_eq!(map.insert_if_not_present(key, hash, 5), None);
        assert_eq!(map.get(hash, |k| k == &key), Some(5));
        assert_eq!(map.insert_if_not_present(key, hash, 6), Some(5));
        assert_eq!(map.get(hash, |k| k == &key), Some(5));
        assert_eq!(map.remove(hash, |k| k == &key), Some(5));
        assert_eq!(map.insert_if_not_present(key, hash, 7), None);
        assert_eq!(map.get(hash, |k| k == &key), Some(7));
        assert_eq!(map.remove(hash, |k| k == &key), Some(7));
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        run_deferred();
    }

    // Many threads race to insert the same keys; exactly one thread must win
    // each key, and the winners recorded must match what the map contains.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_insert_if_not_present() {
        const NUM_THREADS: usize = 64;
        const MAX_VALUE: usize = 512;
        let hashmap = Arc::new(HashMap::with_capacity(0));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|thread_id| {
                let hashmap = Arc::clone(&hashmap);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    let mut success_count = 0usize;
                    for key in 0..MAX_VALUE {
                        let hash = hashmap.hash(&key);
                        let result = hashmap.insert_if_not_present(key, hash, thread_id);
                        if result.is_none() {
                            success_count += 1;
                        }
                    }
                    (thread_id, success_count)
                })
            })
            .collect();
        let results1 = threads
            .into_iter()
            .map(JoinHandle::join)
            .collect::<Result<BTreeMap<_, _>, _>>()
            .expect("Got an error from a thread");
        assert_eq!(hashmap.len(), MAX_VALUE);
        // Every key must have been inserted exactly once in total.
        let sum_of_insertions: usize = results1.values().sum();
        assert_eq!(sum_of_insertions, MAX_VALUE);
        let mut results2 = (0..NUM_THREADS)
            .map(|thread_id| (thread_id, 0usize))
            .collect::<BTreeMap<_, _>>();
        for key in 0..MAX_VALUE {
            let hash = hashmap.hash(&key);
            if let Some(thread_id) = hashmap.get(hash, |&k| k == key) {
                let count = results2.get_mut(&thread_id).unwrap();
                *count += 1;
            }
        }
        assert_eq!(results1, results2);
        run_deferred();
    }

    // Sequential insertion into a pre-sized map; re-inserting returns the
    // previous value and not-yet-inserted keys stay absent.
    #[test]
    fn insertion() {
        const MAX_VALUE: i32 = 512;
        let map = HashMap::with_capacity(MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
            assert!(!map.is_empty());
            assert_eq!(map.len(), (i + 1) as usize);
            for j in 0..=i {
                let hash = map.hash(&j);
                assert_eq!(map.get(hash, |&k| k == j), Some(j));
                assert_eq!(map.insert_entry_and(j, hash, j, |_, v| *v), Some(j));
            }
            for l in i + 1..MAX_VALUE {
                assert_eq!(map.get(map.hash(&l), |&k| k == l), None);
            }
        }
        run_deferred();
    }

    // Same as `insertion`, but starting from capacity 0 so the bucket
    // arrays must grow.
    #[test]
    fn growth() {
        const MAX_VALUE: i32 = 512;
        let map = HashMap::with_capacity(0);
        for i in 0..MAX_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
            assert!(!map.is_empty());
            assert_eq!(map.len(), (i + 1) as usize);
            for j in 0..=i {
                let hash = map.hash(&j);
                assert_eq!(map.get(hash, |&k| k == j), Some(j));
                assert_eq!(map.insert_entry_and(j, hash, j, |_, v| *v), Some(j));
            }
            for l in i + 1..MAX_VALUE {
                assert_eq!(map.get(map.hash(&l), |&k| k == l), None);
            }
        }
        run_deferred();
    }

    // Each thread inserts a disjoint range of keys into a pre-sized map.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_insertion() {
        const MAX_VALUE: i32 = 512;
        const NUM_THREADS: usize = 64;
        const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE;
        let map = Arc::new(HashMap::with_capacity(MAX_INSERTED_VALUE as usize));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) {
                        assert_eq!(map.insert_entry_and(j, map.hash(&j), j, |_, v| *v), None);
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(JoinHandle::join) {
            assert!(result.is_ok());
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), MAX_INSERTED_VALUE as usize);
        for i in 0..MAX_INSERTED_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        run_deferred();
    }

    // Same as `concurrent_insertion`, but starting from capacity 0 so
    // resizes race with inserts.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_growth() {
        const MAX_VALUE: i32 = 512;
        const NUM_THREADS: usize = 64;
        const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE;
        let map = Arc::new(HashMap::with_capacity(0));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) {
                        assert_eq!(map.insert_entry_and(j, map.hash(&j), j, |_, v| *v), None);
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(|t| t.join()) {
            assert!(result.is_ok());
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), MAX_INSERTED_VALUE as usize);
        for i in 0..MAX_INSERTED_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        run_deferred();
    }

    // Sequential removal drains the map completely.
    #[test]
    fn removal() {
        const MAX_VALUE: i32 = 512;
        let map = HashMap::with_capacity(MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
        }
        for i in 0..MAX_VALUE {
            assert_eq!(map.remove(map.hash(&i), |&k| k == i), Some(i));
        }
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        for i in 0..MAX_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        run_deferred();
    }

    // Each thread removes a disjoint range of pre-inserted keys.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_removal() {
        const MAX_VALUE: i32 = 512;
        const NUM_THREADS: usize = 64;
        const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE;
        let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize);
        for i in 0..MAX_INSERTED_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
        }
        let map = Arc::new(map);
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) {
                        assert_eq!(map.remove(map.hash(&j), |&k| k == j), Some(j));
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(|t| t.join()) {
            assert!(result.is_ok());
        }
        assert_eq!(map.len(), 0);
        for i in 0..MAX_INSERTED_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        run_deferred();
    }

    // Inserters fill the lower half of the key space while removers drain
    // the pre-inserted upper half; the halves never overlap.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_insertion_and_removal() {
        const MAX_VALUE: i32 = 512;
        const NUM_THREADS: usize = 64;
        const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2;
        const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2;
        let map = HashMap::with_capacity(MAX_INSERTED_VALUE as usize);
        for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
        }
        let map = Arc::new(map);
        let barrier = Arc::new(Barrier::new(NUM_THREADS * 2));
        #[allow(clippy::needless_collect)]
        let insert_threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) {
                        assert_eq!(map.insert_entry_and(j, map.hash(&j), j, |_, v| *v), None);
                    }
                })
            })
            .collect();
        #[allow(clippy::needless_collect)]
        let remove_threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE))
                    {
                        assert_eq!(map.remove(map.hash(&j), |&k| k == j), Some(j));
                    }
                })
            })
            .collect();
        for result in insert_threads
            .into_iter()
            .chain(remove_threads.into_iter())
            .map(|t| t.join())
        {
            assert!(result.is_ok());
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), INSERTED_MIDPOINT as usize);
        for i in 0..INSERTED_MIDPOINT {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        run_deferred();
    }

    // Like `concurrent_insertion_and_removal`, but the initial capacity only
    // covers half the keys, forcing growth to race with removals.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_growth_and_removal() {
        const MAX_VALUE: i32 = 512;
        const NUM_THREADS: usize = 64;
        const MAX_INSERTED_VALUE: i32 = (NUM_THREADS as i32) * MAX_VALUE * 2;
        const INSERTED_MIDPOINT: i32 = MAX_INSERTED_VALUE / 2;
        let map = HashMap::with_capacity(INSERTED_MIDPOINT as usize);
        for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
        }
        let map = Arc::new(map);
        let barrier = Arc::new(Barrier::new(NUM_THREADS * 2));
        #[allow(clippy::needless_collect)]
        let insert_threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| j + (i as i32 * MAX_VALUE)) {
                        assert_eq!(map.insert_entry_and(j, map.hash(&j), j, |_, v| *v), None);
                    }
                })
            })
            .collect();
        #[allow(clippy::needless_collect)]
        let remove_threads: Vec<_> = (0..NUM_THREADS)
            .map(|i| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in (0..MAX_VALUE).map(|j| INSERTED_MIDPOINT + j + (i as i32 * MAX_VALUE))
                    {
                        assert_eq!(map.remove(map.hash(&j), |&k| k == j), Some(j));
                    }
                })
            })
            .collect();
        for result in insert_threads
            .into_iter()
            .chain(remove_threads.into_iter())
            .map(JoinHandle::join)
        {
            assert!(result.is_ok());
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), INSERTED_MIDPOINT as usize);
        for i in 0..INSERTED_MIDPOINT {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        for i in INSERTED_MIDPOINT..MAX_INSERTED_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        run_deferred();
    }

    // First call inserts via `on_insert`; second call modifies via
    // `on_modify` and returns the previous value.
    #[test]
    fn insert_with_or_modify() {
        let map = HashMap::with_capacity(0);
        let key = "key1";
        let hash = map.hash(&key);
        assert_eq!(
            map.insert_with_or_modify(key, hash, || 1, |_, x| x + 1),
            None
        );
        assert_eq!(map.get(hash, |&k| k == key), Some(1));
        assert_eq!(
            map.insert_with_or_modify(key, hash, || 1, |_, x| x + 1),
            Some(1)
        );
        assert_eq!(map.get(hash, |&k| k == key), Some(2));
        run_deferred();
    }

    // All threads increment the same counters; each key must end up exactly
    // at NUM_THREADS, proving insert-or-modify is atomic per key.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_insert_with_or_modify() {
        const NUM_THREADS: usize = 64;
        const MAX_VALUE: i32 = 512;
        let map = Arc::new(HashMap::with_capacity(0));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in 0..MAX_VALUE {
                        map.insert_with_or_modify(j, map.hash(&j), || 1, |_, x| x + 1);
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(JoinHandle::join) {
            assert!(result.is_ok());
        }
        assert_eq!(map.len(), MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(NUM_THREADS as i32));
        }
        run_deferred();
    }

    // All threads insert the SAME keys (overlapping ranges); the final map
    // holds each key exactly once.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_overlapped_insertion() {
        const NUM_THREADS: usize = 64;
        const MAX_VALUE: i32 = 512;
        let map = Arc::new(HashMap::with_capacity(MAX_VALUE as usize));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in 0..MAX_VALUE {
                        map.insert_entry_and(j, map.hash(&j), j, |_, v| *v);
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(JoinHandle::join) {
            assert!(result.is_ok());
        }
        assert_eq!(map.len(), MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        run_deferred();
    }

    // Overlapping inserts starting from a tiny map, so many resizes race.
    #[cfg_attr(any(armv5te, mips), ignore)]
    #[test]
    fn concurrent_overlapped_growth() {
        const NUM_THREADS: usize = 64;
        const MAX_VALUE: i32 = 512;
        let map = Arc::new(HashMap::with_capacity(1));
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in 0..MAX_VALUE {
                        map.insert_entry_and(j, map.hash(&j), j, |_, v| *v);
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(JoinHandle::join) {
            assert!(result.is_ok());
        }
        assert_eq!(map.len(), MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        run_deferred();
    }

    // All threads try to remove the SAME keys; at most one thread observes
    // each value, and the map ends up empty.
    #[cfg_attr(mips, ignore)]
    #[test]
    fn concurrent_overlapped_removal() {
        const NUM_THREADS: usize = 64;
        const MAX_VALUE: i32 = 512;
        let map = HashMap::with_capacity(MAX_VALUE as usize);
        for i in 0..MAX_VALUE {
            map.insert_entry_and(i, map.hash(&i), i, |_, v| *v);
        }
        let map = Arc::new(map);
        let barrier = Arc::new(Barrier::new(NUM_THREADS));
        #[allow(clippy::needless_collect)]
        let threads: Vec<_> = (0..NUM_THREADS)
            .map(|_| {
                let map = Arc::clone(&map);
                let barrier = Arc::clone(&barrier);
                spawn(move || {
                    barrier.wait();
                    for j in 0..MAX_VALUE {
                        // Another thread may have removed the key first, so
                        // only the winning remover sees the value.
                        let prev_value = map.remove(map.hash(&j), |&k| k == j);
                        if let Some(v) = prev_value {
                            assert_eq!(v, j);
                        }
                    }
                })
            })
            .collect();
        for result in threads.into_iter().map(JoinHandle::join) {
            assert!(result.is_ok());
        }
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        for i in 0..MAX_VALUE {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        run_deferred();
    }

    // Removal drops the value eagerly (after GC runs) but keeps the key as a
    // tombstone until the map itself is dropped.
    #[test]
    fn drop_value() {
        let key_parent = Arc::new(DropNotifier::new());
        let value_parent = Arc::new(DropNotifier::new());
        {
            let map = HashMap::with_capacity(0);
            let hash = map.hash(&0);
            assert_eq!(
                map.insert_entry_and(
                    NoisyDropper::new(Arc::clone(&key_parent), 0),
                    hash,
                    NoisyDropper::new(Arc::clone(&value_parent), 0),
                    |_, _| ()
                ),
                None
            );
            assert!(!map.is_empty());
            assert_eq!(map.len(), 1);
            map.get_key_value_and(hash, |k| k == &0, |_k, v| assert_eq!(v, &0));
            map.remove_entry_if_and(hash, |k| k == &0, |_, _| true, |_k, v| assert_eq!(v, &0));
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            assert_eq!(map.get_key_value_and(hash, |k| k == &0, |_, _| ()), None);
            run_deferred();
            // The key lives on in a tombstone; only the value is gone.
            assert!(!key_parent.was_dropped());
            assert!(value_parent.was_dropped());
        }
        run_deferred();
        assert!(key_parent.was_dropped());
        assert!(value_parent.was_dropped());
    }

    // Bulk version of `drop_value`: after removing everything, nearly all
    // keys should have been reclaimed by resizes collecting tombstones.
    #[test]
    fn drop_many_values() {
        const NUM_VALUES: usize = 1 << 16;
        let key_parents: Vec<_> = std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
            .take(NUM_VALUES)
            .collect();
        let value_parents: Vec<_> = std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
            .take(NUM_VALUES)
            .collect();
        {
            let map = HashMap::with_capacity(0);
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            for (i, (this_key_parent, this_value_parent)) in
                key_parents.iter().zip(value_parents.iter()).enumerate()
            {
                assert_eq!(
                    map.insert_entry_and(
                        NoisyDropper::new(Arc::clone(this_key_parent), i),
                        map.hash(&i),
                        NoisyDropper::new(Arc::clone(this_value_parent), i),
                        |_, _| ()
                    ),
                    None
                );
                assert!(!map.is_empty());
                assert_eq!(map.len(), i + 1);
            }
            for i in 0..NUM_VALUES {
                assert_eq!(
                    map.get_key_value_and(
                        map.hash(&i),
                        |k| k == &i,
                        |k, v| {
                            assert_eq!(**k, i);
                            assert_eq!(*v, i);
                        }
                    ),
                    Some(())
                );
            }
            for i in 0..NUM_VALUES {
                assert_eq!(
                    map.remove_entry_if_and(
                        map.hash(&i),
                        |k| k == &i,
                        |_, _| true,
                        |k, v| {
                            assert_eq!(**k, i);
                            assert_eq!(*v, i);
                        }
                    ),
                    Some(())
                );
            }
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            run_deferred();
            // Keys still pinned by tombstones in the live bucket arrays.
            let live_key_count =
                NUM_VALUES - key_parents.iter().filter(|k| k.was_dropped()).count();
            // Capacity is half the bucket-array length (50% max load factor).
            let bucket_array_len = map.capacity() * 2;
            assert_eq!(bucket_array_len, map.num_segments() * 128 * 2);
            if !cfg!(circleci) {
                assert!(live_key_count <= bucket_array_len / 10);
                for this_value_parent in value_parents.iter() {
                    assert!(this_value_parent.was_dropped());
                }
            }
            for i in 0..NUM_VALUES {
                assert_eq!(
                    map.get_key_value_and(map.hash(&i), |k| k == &i, |_, _| ()),
                    None
                );
            }
        }
        run_deferred();
        for this_key_parent in key_parents.into_iter() {
            assert!(this_key_parent.was_dropped());
        }
        for this_value_parent in value_parents.into_iter() {
            assert!(this_value_parent.was_dropped());
        }
    }

    // Concurrent variant: parallel inserts, then parallel removals, with
    // drop bookkeeping checked at each phase boundary.
    #[test]
    fn drop_many_values_concurrent() {
        const NUM_THREADS: usize = 64;
        const NUM_VALUES_PER_THREAD: usize = 512;
        const NUM_VALUES: usize = NUM_THREADS * NUM_VALUES_PER_THREAD;
        let key_parents: Arc<Vec<_>> = Arc::new(
            std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
                .take(NUM_VALUES)
                .collect(),
        );
        let value_parents: Arc<Vec<_>> = Arc::new(
            std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
                .take(NUM_VALUES)
                .collect(),
        );
        {
            let map = Arc::new(HashMap::with_capacity(0));
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            let barrier = Arc::new(Barrier::new(NUM_THREADS));
            #[allow(clippy::needless_collect)]
            let handles: Vec<_> = (0..NUM_THREADS)
                .map(|i| {
                    let map = Arc::clone(&map);
                    let barrier = Arc::clone(&barrier);
                    let key_parents = Arc::clone(&key_parents);
                    let value_parents = Arc::clone(&value_parents);
                    spawn(move || {
                        barrier.wait();
                        let these_key_parents = &key_parents
                            [i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD];
                        let these_value_parents = &value_parents
                            [i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD];
                        for (j, (this_key_parent, this_value_parent)) in these_key_parents
                            .iter()
                            .zip(these_value_parents.iter())
                            .enumerate()
                        {
                            let key_value = (i * NUM_VALUES_PER_THREAD + j) as i32;
                            let hash = map.hash(&key_value);
                            assert_eq!(
                                map.insert_entry_and(
                                    NoisyDropper::new(Arc::clone(this_key_parent), key_value),
                                    hash,
                                    NoisyDropper::new(Arc::clone(this_value_parent), key_value),
                                    |_, _| ()
                                ),
                                None
                            );
                        }
                    })
                })
                .collect();
            for result in handles.into_iter().map(JoinHandle::join) {
                assert!(result.is_ok());
            }
            assert!(!map.is_empty());
            assert_eq!(map.len(), NUM_VALUES);
            run_deferred();
            for this_key_parent in key_parents.iter() {
                assert!(!this_key_parent.was_dropped());
            }
            for this_value_parent in value_parents.iter() {
                assert!(!this_value_parent.was_dropped());
            }
            for i in (0..NUM_VALUES).map(|i| i as i32) {
                assert_eq!(
                    map.get_key_value_and(
                        map.hash(&i),
                        |k| k == &i,
                        |k, v| {
                            assert_eq!(**k, i);
                            assert_eq!(*v, i);
                        }
                    ),
                    Some(())
                );
            }
            #[allow(clippy::needless_collect)]
            let handles: Vec<_> = (0..NUM_THREADS)
                .map(|i| {
                    let map = Arc::clone(&map);
                    let barrier = Arc::clone(&barrier);
                    spawn(move || {
                        barrier.wait();
                        for j in 0..NUM_VALUES_PER_THREAD {
                            let key_value = (i * NUM_VALUES_PER_THREAD + j) as i32;
                            assert_eq!(
                                map.remove_entry_if_and(
                                    map.hash(&key_value),
                                    |k| k == &key_value,
                                    |_, _| true,
                                    |k, v| {
                                        assert_eq!(**k, key_value);
                                        assert_eq!(*v, key_value);
                                    }
                                ),
                                Some(())
                            );
                        }
                    })
                })
                .collect();
            for result in handles.into_iter().map(JoinHandle::join) {
                assert!(result.is_ok());
            }
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            run_deferred();
            let live_key_count =
                NUM_VALUES - key_parents.iter().filter(|k| k.was_dropped()).count();
            let bucket_array_len = map.capacity() * 2;
            assert_eq!(bucket_array_len, map.num_segments() * 128 * 2);
            assert!(live_key_count <= bucket_array_len / 10);
            for this_value_parent in value_parents.iter() {
                assert!(this_value_parent.was_dropped());
            }
            for i in (0..NUM_VALUES).map(|i| i as i32) {
                assert_eq!(
                    map.get_key_value_and(map.hash(&i), |k| k == &i, |_, _| ()),
                    None
                );
            }
        }
        run_deferred();
        for this_key_parent in key_parents.iter() {
            assert!(this_key_parent.was_dropped());
        }
        for this_value_parent in value_parents.iter() {
            assert!(this_value_parent.was_dropped());
        }
    }

    // Dropping the map itself (with 3/4 of the entries still present) must
    // free every remaining key and value via the `Drop` impl.
    #[test]
    fn drop_map_after_concurrent_updates() {
        const NUM_THREADS: usize = 64;
        const NUM_VALUES_PER_THREAD: usize = 512;
        const NUM_VALUES: usize = NUM_THREADS * NUM_VALUES_PER_THREAD;
        let key_parents: Arc<Vec<_>> = Arc::new(
            std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
                .take(NUM_VALUES)
                .collect(),
        );
        let value_parents: Arc<Vec<_>> = Arc::new(
            std::iter::repeat_with(|| Arc::new(DropNotifier::new()))
                .take(NUM_VALUES)
                .collect(),
        );
        {
            let map = Arc::new(HashMap::with_capacity(0));
            assert!(map.is_empty());
            assert_eq!(map.len(), 0);
            let barrier = Arc::new(Barrier::new(NUM_THREADS));
            #[allow(clippy::needless_collect)]
            let handles: Vec<_> = (0..NUM_THREADS)
                .map(|i| {
                    let map = Arc::clone(&map);
                    let barrier = Arc::clone(&barrier);
                    let key_parents = Arc::clone(&key_parents);
                    let value_parents = Arc::clone(&value_parents);
                    spawn(move || {
                        barrier.wait();
                        let these_key_parents = &key_parents
                            [i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD];
                        let these_value_parents = &value_parents
                            [i * NUM_VALUES_PER_THREAD..(i + 1) * NUM_VALUES_PER_THREAD];
                        for (j, (this_key_parent, this_value_parent)) in these_key_parents
                            .iter()
                            .zip(these_value_parents.iter())
                            .enumerate()
                        {
                            let key_value = (i * NUM_VALUES_PER_THREAD + j) as i32;
                            let hash = map.hash(&key_value);
                            assert_eq!(
                                map.insert_entry_and(
                                    NoisyDropper::new(Arc::clone(this_key_parent), key_value),
                                    hash,
                                    NoisyDropper::new(Arc::clone(this_value_parent), key_value),
                                    |_, _| ()
                                ),
                                None
                            );
                        }
                    })
                })
                .collect();
            for result in handles.into_iter().map(JoinHandle::join) {
                assert!(result.is_ok());
            }
            assert!(!map.is_empty());
            assert_eq!(map.len(), NUM_VALUES);
            run_deferred();
            for this_key_parent in key_parents.iter() {
                assert!(!this_key_parent.was_dropped());
            }
            for this_value_parent in value_parents.iter() {
                assert!(!this_value_parent.was_dropped());
            }
            for i in (0..NUM_VALUES).map(|i| i as i32) {
                assert_eq!(
                    map.get_key_value_and(
                        map.hash(&i),
                        |k| k == &i,
                        |k, v| {
                            assert_eq!(**k, i);
                            assert_eq!(*v, i);
                        }
                    ),
                    Some(())
                );
            }
            #[allow(clippy::needless_collect)]
            let handles: Vec<_> = (0..NUM_THREADS)
                .map(|i| {
                    let map = Arc::clone(&map);
                    let barrier = Arc::clone(&barrier);
                    spawn(move || {
                        barrier.wait();
                        for j in 0..NUM_VALUES_PER_THREAD {
                            let key_value = (i * NUM_VALUES_PER_THREAD + j) as i32;
                            // Remove only every fourth key; the rest are
                            // reclaimed when the map is dropped below.
                            if key_value % 4 == 0 {
                                assert_eq!(
                                    map.remove_entry_if_and(
                                        map.hash(&key_value),
                                        |k| k == &key_value,
                                        |_, _| true,
                                        |k, v| {
                                            assert_eq!(**k, key_value);
                                            assert_eq!(*v, key_value);
                                        }
                                    ),
                                    Some(())
                                );
                            }
                        }
                    })
                })
                .collect();
            for result in handles.into_iter().map(JoinHandle::join) {
                assert!(result.is_ok());
            }
            assert!(!map.is_empty());
            assert_eq!(map.len(), NUM_VALUES / 4 * 3);
        }
        run_deferred();
        for this_key_parent in key_parents.iter() {
            assert!(this_key_parent.was_dropped());
        }
        for this_value_parent in value_parents.iter() {
            assert!(this_value_parent.was_dropped());
        }
    }

    // `remove_if` removes only entries passing the predicate and leaves the
    // rest untouched.
    #[test]
    fn remove_if() {
        const NUM_VALUES: i32 = 512;
        let is_even = |_: &i32, v: &i32| *v % 2 == 0;
        let map = HashMap::with_capacity(0);
        for i in 0..NUM_VALUES {
            assert_eq!(map.insert_entry_and(i, map.hash(&i), i, |_, v| *v), None);
        }
        for i in 0..NUM_VALUES {
            if is_even(&i, &i) {
                assert_eq!(map.remove_if(map.hash(&i), |&k| k == i, is_even), Some(i));
            } else {
                assert_eq!(map.remove_if(map.hash(&i), |&k| k == i, is_even), None);
            }
        }
        for i in (0..NUM_VALUES).filter(|i| i % 2 == 0) {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), None);
        }
        for i in (0..NUM_VALUES).filter(|i| i % 2 != 0) {
            assert_eq!(map.get(map.hash(&i), |&k| k == i), Some(i));
        }
        run_deferred();
    }

    // `keys` returns exactly the live keys of a segment, skipping entries
    // removed earlier (tombstones).
    #[test]
    fn keys_in_single_segment() {
        let map =
            HashMap::with_num_segments_capacity_and_hasher(1, 0, DefaultHashBuilder::default());
        assert!(map.is_empty());
        assert_eq!(map.len(), 0);
        const NUM_KEYS: usize = 200;
        for i in 0..NUM_KEYS {
            let hash = map.hash(&i);
            assert_eq!(map.insert_entry_and(i, hash, i, |_, v| *v), None);
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), NUM_KEYS);
        let mut keys = map.keys(0, |k| *k).unwrap();
        assert_eq!(keys.len(), NUM_KEYS);
        keys.sort_unstable();
        for (i, key) in keys.into_iter().enumerate() {
            assert_eq!(i, key);
        }
        // Remove every even key, then verify only the odd keys remain.
        for i in (0..NUM_KEYS).step_by(2) {
            assert_eq!(map.remove(map.hash(&i), |&k| k == i), Some(i));
        }
        assert!(!map.is_empty());
        assert_eq!(map.len(), NUM_KEYS / 2);
        let mut keys = map.keys(0, |k| *k).unwrap();
        assert_eq!(keys.len(), NUM_KEYS / 2);
        keys.sort_unstable();
        for (i, key) in keys.into_iter().enumerate() {
            assert_eq!(i, key / 2);
        }
        run_deferred();
    }
}