//! Concurrent hash maps.
//!
//! This crate implements concurrent hash maps, based on bucket-level multi-reader locks. It has
//! excellent performance characteristics¹ and supports resizing, in-place mutation and more.
//!
//! The API derives directly from `std::collections::HashMap`, giving it a familiar feel.
//!
//! ¹Note that it heavily depends on the behavior of your program, but in most cases, it's really
//! good. In some (rare) cases you might want atomic hash maps instead.
//!
//! # How it works
//!
//! `chashmap` is not lockless, but it distributes locks across the map such that lock contentions
//! (which is what could make accesses expensive) are very rare.
//!
//! Hash maps consist of so-called "buckets", each of which defines a potential entry in the table.
//! The bucket of some key-value pair is determined by the hash of the key. By holding a read-write
//! lock for each bucket, we ensure that you will generally be able to insert, read, modify, etc.
//! with only one or two locking subroutines.
//!
//! There is a special-case: reallocation. When the table is filled up such that very few buckets
//! are free (note that this is "very few" and not "no", since the load factor shouldn't get too
//! high as it hurts performance), a global lock is obtained while rehashing the table. This is
//! pretty inefficient, but it rarely happens, and due to the adaptive nature of the capacity, it
//! will only happen a few times when the map has just been initialized.
//!
//! ## Collision resolution
//!
//! When two hashes collide, they cannot share the same bucket, so there must be an algorithm which
//! can resolve collisions. In our case, we use linear probing, which means that we take the bucket
//! following it, and repeat until we find a free bucket.
//!
//! This method is far from ideal, but superior methods like Robin-Hood hashing work poorly (if at
//! all) in a concurrent structure.
//!
//! # The API
//!
//! The API should feel very familiar, if you are used to the libstd hash map implementation. They
//! share many of the methods, and I've carefully made sure that all the items which have similarly
//! named counterparts in libstd match them in semantics and behavior.
#[cfg(test)]
mod tests;
use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use owning_ref::{OwningHandle, OwningRef};
use stable_deref_trait::StableDeref;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{self, AtomicUsize};
use std::{cmp, fmt, iter, mem, ops};
/// The atomic ordering used throughout the code.
///
/// `Relaxed` suffices because `len` is a statistics counter; it never guards memory accesses.
const ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
/// The length-to-capacity factor: when resizing, allocate this many buckets per current entry.
const LENGTH_MULTIPLIER: usize = 4;
/// The maximal load factor's numerator (i.e. the table is grown beyond 85% occupancy).
const MAX_LOAD_FACTOR_NUM: usize = 100 - 15;
/// The maximal load factor's denominator.
const MAX_LOAD_FACTOR_DENOM: usize = 100;
/// The default initial capacity (in entries) of a freshly created map.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
/// The lowest capacity (in buckets) a table can have.
const MINIMUM_CAPACITY: usize = 8;
/// A bucket state.
///
/// Buckets are the bricks of hash tables. They represent a single entry into the table.
#[derive(Clone)]
enum Bucket<K, V> {
    /// The bucket contains a key-value pair.
    Contains(K, V),
    /// The bucket is empty and has never been used.
    ///
    /// Since hash collisions are resolved by jumping to the next bucket, some buckets can cluster
    /// together, meaning that they are potential candidates for lookups. Empty buckets can be seen
    /// as the delimiter of such clusters.
    Empty,
    /// The bucket was removed.
    ///
    /// The technique of distinguishing between "empty" and "removed" was first described by Knuth.
    /// The idea is that when you search for a key, you will probe over these buckets, since the
    /// key could have been pushed behind the removed element:
    ///```notest
    /// Contains(k1, v1) // hash = h
    /// Removed
    /// Contains(k2, v2) // hash = h
    ///```
    /// If we stopped at `Removed`, we won't be able to find the second KV pair. So `Removed` is
    /// semantically different from `Empty`, as the search won't stop.
    ///
    /// However, we are still able to insert new pairs at the removed buckets.
    Removed,
}
impl<K, V> Bucket<K, V> {
    /// Does this bucket hold the `Empty` state?
    fn is_empty(&self) -> bool {
        matches!(self, Bucket::Empty)
    }
    /// Does this bucket hold the `Removed` state?
    fn is_removed(&self) -> bool {
        matches!(self, Bucket::Removed)
    }
    /// Is this bucket free, i.e. safe to overwrite with a new KV pair?
    ///
    /// Both `Empty` and `Removed` buckets are free; only `Contains` holds data and is
    /// therefore occupied.
    fn is_free(&self) -> bool {
        !matches!(self, Bucket::Contains(..))
    }
    /// Consume the bucket and return its owned value, if any.
    ///
    /// Returns `None` for the `Empty` and `Removed` states.
    fn value(self) -> Option<V> {
        match self {
            Bucket::Contains(_, val) => Some(val),
            _ => None,
        }
    }
    /// Borrow the value of the bucket, if it is a KV pair.
    ///
    /// The `Result<_, ()>` return type (rather than `Option`) exists to fit the `try_new`
    /// and `try_map` APIs of the `owning_ref` crate.
    fn value_ref(&self) -> Result<&V, ()> {
        match *self {
            Bucket::Contains(_, ref val) => Ok(val),
            _ => Err(()),
        }
    }
    /// Is this bucket a KV pair whose key equals `key`?
    ///
    /// Non-KV buckets carry no key and therefore never match.
    fn key_matches(&self, key: &K) -> bool
    where
        K: PartialEq,
    {
        match *self {
            Bucket::Contains(ref candidate_key, _) => candidate_key == key,
            _ => false,
        }
    }
}
/// The low-level representation of the hash table.
///
/// This is different from `CHashMap` in two ways:
///
/// 1. It is not wrapped in a lock, meaning that resizing and reallocation is not possible.
/// 2. It does not track the number of occupied buckets, making it expensive to obtain the load
///    factor.
struct Table<K, V, S> {
    /// The hash function builder.
    ///
    /// When a `Table` uses the default hash builder, it randomly picks a hash function from
    /// some family of functions in libstd. This effectively eliminates the issue of hash flooding.
    hash_builder: S,
    /// The bucket array.
    ///
    /// This vector stores the buckets. The order in which they're stored is far from arbitrary: a
    /// KV pair `(key, val)`'s first-priority location is at `self.hash(&key) % len`. If not
    /// possible, the next bucket is used, and this process repeats until the bucket is free (or
    /// the end is reached, in which case we simply wrap around).
    buckets: Vec<RwLock<Bucket<K, V>>>,
}
impl<K, V> Table<K, V, RandomState> {
/// Create a table with a certain number of buckets.
fn new(buckets: usize) -> Self {
// TODO: For some obscure reason `RwLock` doesn't implement `Clone`.
// Fill a vector with `buckets` of `Empty` buckets.
let mut vec = Vec::with_capacity(buckets);
for _ in 0..buckets {
vec.push(RwLock::new(Bucket::Empty));
}
Table {
// Generate a hash function.
hash_builder: RandomState::new(),
buckets: vec,
}
}
/// Create a table with at least some capacity.
fn with_capacity(cap: usize) -> Self {
// The + 1 is needed to avoid losing fractional bucket to integer division.
Table::new(cmp::max(
MINIMUM_CAPACITY,
cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
))
}
}
impl<K, V, S: BuildHasher> Table<K, V, S> {
/// Create a `Table` with the `BuildHasher`
fn with_hasher(buckets: usize, hash_builder: S) -> Self {
// TODO: For some obscure reason `RwLock` doesn't implement `Clone`.
// Fill a vector with `buckets` of `Empty` buckets.
let mut vec = Vec::with_capacity(buckets);
for _ in 0..buckets {
vec.push(RwLock::new(Bucket::Empty));
}
Table {
hash_builder,
buckets: vec,
}
}
/// Create a `Table` with a specific capacity and the `BuildHasher`
fn with_capacity_and_hasher(cap: usize, hash_builder: S) -> Self {
// The + 1 is needed to avoid losing fractional bucket to integer division.
Table::with_hasher(
cmp::max(
MINIMUM_CAPACITY,
cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
),
hash_builder,
)
}
}
impl<K: PartialEq + Hash, V, S: BuildHasher> Table<K, V, S> {
    /// Hash some key through the internal hash function.
    fn hash<T: ?Sized>(&self, key: &T) -> usize
    where
        T: Hash,
    {
        // Build the initial hash function state.
        let mut hasher = self.hash_builder.build_hasher();
        // Hash the key.
        key.hash(&mut hasher);
        // Cast to `usize`. Since the hash function returns `u64`, this cast won't ever cause
        // entropy less than the output space.
        hasher.finish() as usize
    }
    /// Scan from the first priority of a key until a match is found.
    ///
    /// This scans from the first priority of `key` (as defined by its hash), until a match is
    /// found (will wrap on end), i.e. `matches` returns `true` with the bucket as argument.
    ///
    /// The read guard from the RW-lock of the bucket is returned.
    ///
    /// # Panics
    ///
    /// Panics if `matches` accepts no bucket in the entire table — callers must pass a
    /// predicate that is guaranteed to hit (e.g. one that accepts `Empty` buckets).
    async fn scan<F, Q: ?Sized>(&self, key: &Q, matches: F) -> RwLockReadGuard<'_, Bucket<K, V>>
    where
        F: Fn(&Bucket<K, V>) -> bool,
        K: Borrow<Q>,
        Q: Hash,
    {
        // Hash the key.
        let hash = self.hash(key);
        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].read().await;
            // Check if it is a match.
            if matches(&lock) {
                // Yup. Return.
                return lock;
            }
        }
        panic!("`CHashMap` scan failed! No entry found.");
    }
    /// Scan from the first priority of a key until a match is found (mutable guard).
    ///
    /// This is similar to `scan`, but instead of an immutable lock guard, a mutable lock guard is
    /// returned.
    ///
    /// # Panics
    ///
    /// Panics if `matches` accepts no bucket in the entire table.
    async fn scan_mut<F, Q: ?Sized>(
        &self,
        key: &Q,
        matches: F,
    ) -> RwLockWriteGuard<'_, Bucket<K, V>>
    where
        F: Fn(&Bucket<K, V>) -> bool,
        K: Borrow<Q>,
        Q: Hash,
    {
        // Hash the key.
        let hash = self.hash(key);
        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
            // Check if it is a match.
            if matches(&lock) {
                // Yup. Return.
                return lock;
            }
        }
        panic!("`CHashMap` scan_mut failed! No entry found.");
    }
    /// Scan from the first priority of a key until a match is found (bypass locks).
    ///
    /// This is similar to `scan_mut`, but it safely bypasses the locks by making use of the
    /// aliasing invariants of `&mut`: exclusive access to `self` proves no guard is live.
    fn scan_mut_no_lock<F>(&mut self, key: &K, matches: F) -> &mut Bucket<K, V>
    where
        F: Fn(&Bucket<K, V>) -> bool,
    {
        // Hash the key.
        let hash = self.hash(key);
        // TODO: To tame the borrowchecker, we fetch this in advance.
        let len = self.buckets.len();
        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // TODO: hacky hacky — `get_mut` is called twice because returning the first
            // borrow directly would extend it over the whole loop.
            let idx = (hash + i) % len;
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            // Check if it is a match.
            if matches(self.buckets[idx].get_mut()) {
                // Yup. Return.
                return self.buckets[idx].get_mut();
            }
        }
        panic!("`CHashMap` scan_mut_no_lock failed! No entry found.");
    }
    /// Find a bucket with some key, or a free bucket in same cluster.
    ///
    /// This scans for buckets with key `key`. If one is found, it will be returned. If none are
    /// found, it will return a free bucket in the same cluster.
    async fn lookup_or_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
        // Hash the key.
        let hash = self.hash(key);
        // The encountered free bucket.
        // NOTE(review): while `free` holds a write guard, the loop keeps acquiring further
        // write locks — so up to two bucket locks may be held at once here.
        let mut free = None;
        // Start at the first priority bucket, and then move upwards, searching for the matching
        // bucket.
        for i in 0..self.buckets.len() {
            // Get the lock of the `i`'th bucket after the first priority bucket (wrap on end).
            let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
            if lock.key_matches(key) {
                // We found a match.
                return lock;
            } else if lock.is_empty() {
                // The cluster is over. Use the encountered free bucket, if any.
                return free.unwrap_or(lock);
            } else if lock.is_removed() && free.is_none() {
                // We found a free bucket, so we can store it to later (if we don't already have
                // one).
                free = Some(lock)
            }
        }
        free.expect("No free buckets found")
    }
    /// Lookup some key.
    ///
    /// This searches some key `key`, and returns an immutable lock guard to its bucket. If the
    /// key couldn't be found, the returned bucket will be `Empty` (the end of the cluster).
    async fn lookup<Q: ?Sized>(&self, key: &Q) -> RwLockReadGuard<'_, Bucket<K, V>>
    where
        K: Borrow<Q>,
        Q: PartialEq + Hash,
    {
        self.scan(key, |x| match *x {
            // We'll check that the key does indeed match, as the chance of hash collisions
            // happening is inevitable
            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
            // We reached an empty bucket, meaning that there are no more buckets, not even removed
            // ones, to search.
            Bucket::Empty => true,
            _ => false,
        })
        .await
    }
    /// Lookup some key, mutably.
    ///
    /// This is similar to `lookup`, but it returns a mutable guard.
    ///
    /// Replacing at this bucket is safe as the bucket will be in the same cluster of buckets as
    /// the first priority cluster.
    async fn lookup_mut<Q: ?Sized>(&self, key: &Q) -> RwLockWriteGuard<'_, Bucket<K, V>>
    where
        K: Borrow<Q>,
        Q: PartialEq + Hash,
    {
        self.scan_mut(key, |x| match *x {
            // We'll check that the key does indeed match, as the chance of hash collisions
            // happening is inevitable
            Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
            // We reached an empty bucket, meaning that there are no more buckets, not even removed
            // ones, to search.
            Bucket::Empty => true,
            _ => false,
        })
        .await
    }
    /// Find a free bucket in the same cluster as some key.
    ///
    /// This means that the returned lock guard defines a valid, free bucket, where `key` can be
    /// inserted.
    async fn find_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
        self.scan_mut(key, |x| x.is_free()).await
    }
    /// Find a free bucket in the same cluster as some key (bypassing locks).
    ///
    /// This is similar to `find_free`, except that it safely bypasses locks through the aliasing
    /// guarantees of `&mut`.
    fn find_free_no_lock(&mut self, key: &K) -> &mut Bucket<K, V> {
        self.scan_mut_no_lock(key, |x| x.is_free())
    }
    /// Fill the table with data from another table.
    ///
    /// This is used to efficiently copy the data of `table` into `self`.
    ///
    /// # Important
    ///
    /// The table should be empty for this to work correctly/logically.
    fn fill(&mut self, table: Self) {
        // Run over all the buckets.
        for i in table.buckets {
            // We'll only transfer the bucket if it is a KV pair.
            if let Bucket::Contains(key, val) = i.into_inner() {
                // Find a bucket where the KV pair can be inserted.
                // shush clippy, the comments are important
                #[allow(clippy::match_like_matches_macro)]
                let bucket = self.scan_mut_no_lock(&key, |x| match *x {
                    // Halt on an empty bucket.
                    Bucket::Empty => true,
                    // We'll assume that the rest of the buckets either contains other KV pairs (in
                    // particular, no buckets have been removed in the newly constructed table).
                    _ => false,
                });
                // Set the bucket to the KV pair.
                *bucket = Bucket::Contains(key, val);
            }
        }
    }
}
/*impl<K: Clone, V: Clone, S: Clone> Clone for Table<K, V, S> {
fn clone(&self) -> Self {
Table {
// Since we copy plainly without rehashing etc., it is important that we keep the same
// hash function.
hash_builder: self.hash_builder.clone(),
// Lock and clone every bucket individually.
buckets: self
.buckets
.iter()
// TODO: NO NO VERY BAD
.map(|x| RwLock::new(futures::executor::block_on(x.read()).clone()))
.collect(),
}
}
}*/
impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for Table<K, V, S> {
    /// Render the table as a debug map, without blocking on any bucket lock.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut map = f.debug_map();
        // Walk every bucket, emitting one entry per bucket.
        for bucket in &self.buckets {
            // `try_read` never blocks: a bucket that cannot be read right now (or that
            // holds no KV pair) is rendered as the `<locked>` placeholder.
            let guard = bucket.try_read();
            match guard.as_deref() {
                Some(Bucket::Contains(key, val)) => {
                    map.entry(key, val);
                }
                _ => {
                    map.entry(&"<locked>", &"<locked>");
                }
            }
        }
        map.finish()
    }
}
/// An iterator over the entries of some table.
///
/// Yields owned `(K, V)` pairs; the order is unspecified (it follows the bucket array).
pub struct IntoIter<K, V, S> {
    /// The inner table, consumed bucket by bucket as the iterator advances.
    table: Table<K, V, S>,
}
impl<K, V, S> Iterator for IntoIter<K, V, S> {
    type Item = (K, V);
    fn next(&mut self) -> Option<(K, V)> {
        // The table is owned, so every lock can be consumed via `into_inner` instead of
        // acquired. Pop buckets off the end until one carrying data turns up; `?`
        // terminates the iterator once the bucket vector is exhausted.
        loop {
            match self.table.buckets.pop()?.into_inner() {
                // Found a KV pair — hand it out.
                Bucket::Contains(key, val) => return Some((key, val)),
                // Free buckets (empty/removed) carry no data; keep popping.
                _ => (),
            }
        }
    }
}
impl<K, V, S> IntoIterator for Table<K, V, S> {
    type Item = (K, V);
    type IntoIter = IntoIter<K, V, S>;
    /// Consume the table, transferring ownership of every KV pair to the iterator.
    fn into_iter(self) -> Self::IntoIter {
        IntoIter { table: self }
    }
}
/// A RAII guard for reading an entry of a hash map.
///
/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
/// on drop.
///
/// While the guard is held, writers of the underlying bucket (and resizes of the whole table,
/// which need the outer write lock) are blocked.
pub struct ReadGuard<'a, K: 'a, V: 'a, S> {
    /// The inner hecking long type.
    ///
    /// Layered guards: the outer handle keeps the table read-locked, the inner one keeps the
    /// bucket read-locked, and the `OwningRef` projects down to the bucket's value.
    #[allow(clippy::type_complexity)]
    inner: OwningRef<
        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableReadGuard<'a, Bucket<K, V>>>,
        V,
    >,
}
impl<'a, K, V, S> ops::Deref for ReadGuard<'a, K, V, S> {
    type Target = V;
    /// Dereference to the guarded value.
    fn deref(&self) -> &V {
        &self.inner
    }
}
impl<'a, K, V: PartialEq, S> cmp::PartialEq for ReadGuard<'a, K, V, S> {
    /// Compare the guarded values.
    fn eq(&self, other: &ReadGuard<'a, K, V, S>) -> bool {
        // Compare through `Deref`. Writing `self == other` here would call this very
        // impl again and recurse infinitely (stack overflow), so deref to `V` first.
        **self == **other
    }
}
impl<'a, K, V: Eq, S> cmp::Eq for ReadGuard<'a, K, V, S> {}
impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for ReadGuard<'a, K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ReadGuard({:?})", &**self)
    }
}
/// Wrapper type for [`RwLockReadGuard`] that implements [`StableDeref`].
#[repr(transparent)]
struct StableReadGuard<'a, T: ?Sized>(RwLockReadGuard<'a, T>);
impl<T: ?Sized> std::ops::Deref for StableReadGuard<'_, T> {
    type Target = T;
    // Delegate straight to the inner guard.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
// SAFETY: the guard dereferences to data stored inside the `RwLock` itself, so moving the
// guard does not move the pointee — which is exactly what `StableDeref` requires.
// NOTE(review): this relies on `async_lock`'s guard internals; re-confirm on upgrades.
unsafe impl<T: ?Sized> StableDeref for StableReadGuard<'_, T> {}
impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableReadGuard<'_, T> {
    // Forward formatting to the guarded value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableReadGuard<'_, T> {
    // Forward formatting to the guarded value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
/// A mutable RAII guard for reading and writing an entry of a hash map.
///
/// This is an access type dereferencing to the inner value of the entry. It will handle unlocking
/// on drop.
///
/// While the guard is held, all other readers and writers of the underlying bucket are blocked.
pub struct WriteGuard<'a, K: 'a, V: 'a, S> {
    /// The inner hecking long type.
    ///
    /// Layered guards: the outer handle keeps the table read-locked, the middle one keeps the
    /// bucket write-locked, and the innermost layer projects to a `&mut V` into that bucket.
    #[allow(clippy::type_complexity)]
    inner: OwningHandle<
        OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableWriteGuard<'a, Bucket<K, V>>>,
        &'a mut V,
    >,
}
impl<'a, K, V, S> ops::Deref for WriteGuard<'a, K, V, S> {
    type Target = V;
    /// Dereference to the guarded value.
    fn deref(&self) -> &V {
        &self.inner
    }
}
impl<'a, K, V, S> ops::DerefMut for WriteGuard<'a, K, V, S> {
    /// Mutably dereference to the guarded value.
    fn deref_mut(&mut self) -> &mut V {
        &mut self.inner
    }
}
impl<'a, K, V: PartialEq, S> cmp::PartialEq for WriteGuard<'a, K, V, S> {
    /// Compare the guarded values.
    fn eq(&self, other: &Self) -> bool {
        // Compare through `Deref`. Writing `self == other` here would call this very
        // impl again and recurse infinitely (stack overflow), so deref to `V` first.
        **self == **other
    }
}
impl<'a, K, V: Eq, S> cmp::Eq for WriteGuard<'a, K, V, S> {}
impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for WriteGuard<'a, K, V, S> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "WriteGuard({:?})", &**self)
    }
}
/// Wrapper type for [`RwLockWriteGuard`] that implements [`StableDeref`].
#[repr(transparent)]
struct StableWriteGuard<'a, T: ?Sized>(RwLockWriteGuard<'a, T>);
impl<T: ?Sized> std::ops::Deref for StableWriteGuard<'_, T> {
    type Target = T;
    // Delegate straight to the inner guard.
    fn deref(&self) -> &Self::Target {
        &*self.0
    }
}
impl<T: ?Sized> std::ops::DerefMut for StableWriteGuard<'_, T> {
    // Delegate straight to the inner guard.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.0
    }
}
// SAFETY: the guard dereferences to data stored inside the `RwLock` itself, so moving the
// guard does not move the pointee — which is exactly what `StableDeref` requires.
// NOTE(review): this relies on `async_lock`'s guard internals; re-confirm on upgrades.
unsafe impl<T: ?Sized> StableDeref for StableWriteGuard<'_, T> {}
impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableWriteGuard<'_, T> {
    // Forward formatting to the guarded value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableWriteGuard<'_, T> {
    // Forward formatting to the guarded value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        self.0.fmt(f)
    }
}
/// A concurrent hash map.
///
/// This type defines a concurrent associative array, based on hash tables with linear probing and
/// dynamic resizing.
///
/// The idea is to let each entry hold a multi-reader lock, effectively limiting lock contentions
/// to writing simultaneously on the same entry, and resizing the table.
///
/// It is not an atomic or lockless hash table, since such construction is only useful in very few
/// cases, due to limitations on in-place operations on values.
pub struct CHashMap<K, V, S = RandomState> {
    /// The inner table.
    ///
    /// The outer `RwLock` is read-locked for ordinary operations and write-locked only for
    /// whole-table operations such as resizing and `clear`.
    table: RwLock<Table<K, V, S>>,
    /// The total number of KV pairs in the table.
    ///
    /// This is used to calculate the load factor.
    len: AtomicUsize,
}
impl<K, V> CHashMap<K, V, RandomState> {
    /// Create a new hash map with a certain capacity.
    ///
    /// "Capacity" means the amount of entries the hash map can hold before reallocating. This
    /// function allocates a hash map with at least the capacity of `cap`.
    pub fn with_capacity(cap: usize) -> Self {
        CHashMap {
            // The fresh table is empty; `Table::with_capacity` enforces the minimum size.
            table: RwLock::new(Table::with_capacity(cap)),
            // No KV pairs yet.
            len: AtomicUsize::new(0),
        }
    }
    /// Create a new hash map.
    ///
    /// This creates a new hash map with some fixed initial capacity.
    pub fn new() -> Self {
        CHashMap::with_capacity(DEFAULT_INITIAL_CAPACITY)
    }
}
impl<K, V, S> CHashMap<K, V, S> {
    /// Get the number of entries in the hash table.
    ///
    /// This is entirely atomic, and will not acquire any locks.
    ///
    /// This reflects the number of entries _at the moment of the load_; concurrent inserts and
    /// removals may change it immediately after.
    pub fn len(&self) -> usize {
        self.len.load(ORDERING)
    }
    /// Get the capacity of the hash table.
    ///
    /// The capacity is equal to the number of entries the table can hold before reallocating.
    pub async fn capacity(&self) -> usize {
        cmp::max(MINIMUM_CAPACITY, self.buckets().await) * MAX_LOAD_FACTOR_NUM
            / MAX_LOAD_FACTOR_DENOM
    }
    /// Get the number of buckets of the hash table.
    ///
    /// "Buckets" refers to the amount of potential entries in the inner table. It is different
    /// from capacity, in the sense that the map cannot hold this number of entries, since it needs
    /// to keep the load factor low.
    pub async fn buckets(&self) -> usize {
        self.table.read().await.buckets.len()
    }
    /// Is the hash table empty?
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Filter the map based on some predicate.
    ///
    /// This tests every entry in the hash map by closure `predicate`. If it returns `true`, the
    /// map will retain the entry. If not, the entry will be removed.
    ///
    /// This won't lock the table. This can be a major performance trade-off, as it means that it
    /// must lock on every table entry. However, it won't block other operations of the table,
    /// while filtering.
    pub async fn retain<F>(&self, predicate: F)
    where
        F: Fn(&K, &V) -> bool,
    {
        // Acquire the read lock to the table.
        let table = self.table.read().await;
        // Run over every bucket and apply the filter.
        for bucket in &table.buckets {
            // Acquire an upgradable read lock: cheap like a read lock while the predicate runs,
            // but upgradable to a write lock without releasing (no race window) if we must remove.
            let lock = bucket.upgradable_read().await;
            // Skip the free buckets.
            match *lock {
                Bucket::Contains(ref key, ref val) if !predicate(key, val) => {
                    // Predicate didn't match. Set the bucket to removed.
                    let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await;
                    *lock = Bucket::Removed;
                    // Decrement the length to account for the removed bucket.
                    // TODO: Can we somehow bundle these up to reduce the overhead of atomic
                    //       operations? Storing in a local variable and then subtracting causes
                    //       issues with consistency.
                    self.len.fetch_sub(1, ORDERING);
                }
                _ => (),
            }
        }
    }
}
impl<K, V, S: Default + BuildHasher> CHashMap<K, V, S> {
    /// Creates an empty `CHashMap` with the specified capacity, using `hash_builder`
    /// to hash the keys.
    ///
    /// The hash map will be able to hold at least `capacity` elements without
    /// reallocating. If `capacity` is 0, the hash map will not allocate.
    ///
    /// Warning: `hash_builder` is normally randomly generated, and
    /// is designed to allow HashMaps to be resistant to attacks that
    /// cause many collisions and very poor performance. Setting it
    /// manually using this function can expose a DoS attack vector.
    pub fn with_hasher_and_capacity(cap: usize, hash_builder: S) -> Self {
        let table = Table::with_capacity_and_hasher(cap, hash_builder);
        CHashMap {
            // The fresh table is empty; the minimum size is enforced by the table ctor.
            table: RwLock::new(table),
            // No KV pairs yet.
            len: AtomicUsize::new(0),
        }
    }
    /// Creates an empty `CHashMap` which will use the given hash builder to hash
    /// keys.
    ///
    /// The created map has the default initial capacity.
    ///
    /// Warning: `hash_builder` is normally randomly generated, and
    /// is designed to allow HashMaps to be resistant to attacks that
    /// cause many collisions and very poor performance. Setting it
    /// manually using this function can expose a DoS attack vector.
    pub fn with_hasher(hash_builder: S) -> Self {
        CHashMap::with_hasher_and_capacity(DEFAULT_INITIAL_CAPACITY, hash_builder)
    }
}
impl<K, V, S> CHashMap<K, V, S>
where
    S: Default + BuildHasher,
{
    /// Clear the map.
    ///
    /// This clears the hash map and returns the previous version of the map.
    ///
    /// It is relatively efficient, although it needs to write lock a RW lock.
    ///
    /// Note that the fresh table is built with `S::default()`, so a hand-picked hash builder
    /// from construction is not carried over after a clear.
    pub async fn clear(&self) -> CHashMap<K, V, S> {
        // Acquire a writable lock, blocking all other table access while we swap.
        let mut lock = self.table.write().await;
        CHashMap {
            // Replace the old table with an empty initial table.
            table: RwLock::new(mem::replace(
                &mut *lock,
                Table::with_hasher(DEFAULT_INITIAL_CAPACITY, S::default()),
            )),
            // Replace the length with 0 and use the old length.
            len: AtomicUsize::new(self.len.swap(0, ORDERING)),
        }
    }
}
/// A wrapper future that unconditionally asserts [`Send`].
///
/// NOTE(review): `Send` is implemented for *any* `F`, including genuinely non-`Send` ones.
/// This is only sound for the futures in this crate whose non-`Send`-ness stems solely from
/// raw pointers held across `.await` (which the compiler conservatively flags) — confirm
/// before wrapping anything else in this type.
#[repr(transparent)]
struct UnsafeSendFuture<F>(F);
impl<F: std::future::Future> std::future::Future for UnsafeSendFuture<F> {
    type Output = F::Output;
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // SAFETY: structural pin projection — field 0 is never moved out of the pinned
        // wrapper, only re-pinned and polled in place.
        unsafe { self.map_unchecked_mut(|f| &mut f.0).poll(cx) }
    }
}
// See the type-level note: this blanket impl is the whole point of the wrapper.
unsafe impl<F> Send for UnsafeSendFuture<F> {}
impl<K: PartialEq + Hash, V, S: BuildHasher> CHashMap<K, V, S> {
    /// Get the value of some key.
    ///
    /// This will lookup the entry of some key `key`, and acquire the read-only lock. This means
    /// that all other parties are blocked from _writing_ (not reading) this value while the guard
    /// is held.
    pub async fn get<Q: ?Sized>(&self, key: &Q) -> Option<ReadGuard<'_, K, V, S>>
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        self.get_inner(key).await
    }
    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
    /// in their body !Send
    fn get_inner<'this, 'a, 'b, Q: ?Sized>(
        &'a self,
        key: &'b Q,
    ) -> impl Future<Output = Option<ReadGuard<'a, K, V, S>>> + 'this + Send
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
        Self: 'this,
        'a: 'this,
        'b: 'this,
    {
        UnsafeSendFuture(async move {
            // Acquire the read lock and lookup in the table. The nested `OwningHandle` keeps the
            // table guard alive for as long as the bucket guard (and hence the `ReadGuard`) lives.
            if let Ok(inner) = OwningRef::new(
                OwningHandle::new_with_async_fn(
                    StableReadGuard(self.table.read().await),
                    // SAFETY(review): the raw pointer is handed out by `OwningHandle`, which
                    // owns the table guard and keeps it alive while this reference is used.
                    |x| async move { StableReadGuard(unsafe { &*x }.lookup(key).await) },
                )
                .await,
            )
            .try_map(|x| x.value_ref())
            {
                // The bucket contains data.
                Some(ReadGuard { inner })
            } else {
                // The bucket is empty/removed.
                None
            }
        })
    }
    /// Get the (mutable) value of some key.
    ///
    /// This will lookup the entry of some key `key`, and acquire the writable lock. This means
    /// that all other parties are blocked from both reading and writing this value while the guard
    /// is held.
    pub async fn get_mut<Q: ?Sized>(&self, key: &Q) -> Option<WriteGuard<'_, K, V, S>>
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        self.get_mut_inner(key).await
    }
    /// Ugly workaround because the compiler is overzealous about making futures with raw pointers
    /// in their body !Send
    fn get_mut_inner<'this, 'a, 'b, Q: ?Sized>(
        &'a self,
        key: &'b Q,
    ) -> impl Future<Output = Option<WriteGuard<'a, K, V, S>>> + 'this + Send
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
        Self: 'this,
        'a: 'this,
        'b: 'this,
    {
        UnsafeSendFuture(async move {
            // Acquire the write lock and lookup in the table. As in `get_inner`, the nested
            // handles keep the outer guards alive for the lifetime of the returned `WriteGuard`.
            let handle = OwningHandle::try_new(
                OwningHandle::new_with_async_fn(
                    StableReadGuard(self.table.read().await),
                    // SAFETY(review): pointer provided by `OwningHandle`; the owned table guard
                    // outlives this borrow.
                    |x| async move {
                        StableWriteGuard(UnsafeSendFuture(unsafe { &*x }.lookup_mut(key)).await)
                    },
                )
                .await,
                // SAFETY(review): `x` points into the bucket guard owned by the handle; casting
                // to `*mut` is how `OwningHandle` hands out mutable access to its owned value.
                |x| match unsafe { &mut *(x as *mut Bucket<K, V>) } {
                    // The bucket contains data — project to its value.
                    Bucket::Contains(_, ref mut val) => Ok(val),
                    // Empty/removed bucket: no value to hand out.
                    _ => Err(()),
                },
            );
            match handle {
                Ok(inner) => Some(WriteGuard { inner }),
                Err(_) => None,
            }
        })
    }
    /// Does the hash map contain this key?
    pub async fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
    where
        K: Borrow<Q>,
        Q: Hash + PartialEq,
    {
        // Acquire the lock.
        let lock = self.table.read().await;
        // Look the key up in the table; `lookup` returns the end-of-cluster `Empty` bucket
        // when the key is absent.
        let bucket = lock.lookup(key).await;
        // Test if it is free or not.
        !bucket.is_free()
    }
}
impl<K, V, S> CHashMap<K, V, S>
where
K: PartialEq + Hash,
S: BuildHasher + Default,
{
    /// Insert a **new** entry.
    ///
    /// This inserts an entry, which the map does not already contain, into the table. If the entry
    /// exists, the old entry won't be replaced, nor will an error be returned. It will possibly
    /// introduce silent bugs.
    ///
    /// To be more specific, it assumes that the entry does not already exist, and will simply skip
    /// to the end of the cluster, even if it does exist.
    ///
    /// This is faster than e.g. `insert`, but should only be used, if you know that the entry
    /// doesn't already exist.
    ///
    /// # Warning
    ///
    /// Only use this, if you know what you're doing. This can easily introduce very complex logic
    /// errors.
    ///
    /// For most other purposes, use `insert`.
    ///
    /// # Panics
    ///
    /// This might perform checks in debug mode testing if the key exists already.
    pub async fn insert_new(&self, key: K, val: V) {
        debug_assert!(
            !self.contains_key(&key).await,
            "Hash table contains already key, contrary to \
             the assumptions about `insert_new`'s arguments."
        );
        // Lock the table for reading; the per-bucket write lock below provides the exclusivity
        // needed for the insertion itself.
        let lock = self.table.read().await;
        {
            // Find the free bucket.
            let mut bucket = lock.find_free(&key).await;
            // Set the bucket to the new KV pair.
            *bucket = Bucket::Contains(key, val);
        }
        // Expand the table (we know beforehand that the entry didn't already exist).
        self.expand(lock).await;
    }
    /// Replace an existing entry, or insert a new one.
    ///
    /// This will replace an existing entry and return the old entry, if any. If no entry exists,
    /// it will simply insert the new entry and return `None`.
    pub async fn insert(&self, key: K, val: V) -> Option<V> {
        // Lock the table for reading; the per-bucket write lock below guards the actual write.
        let lock = self.table.read().await;
        let ret = {
            // Lookup the key or a free bucket in the inner table.
            let mut bucket = lock.lookup_or_free(&key).await;
            // Replace the bucket, keeping the old value (if the bucket held a KV pair).
            mem::replace(&mut *bucket, Bucket::Contains(key, val)).value()
        };
        // Expand the table if no bucket was overwritten (i.e. the entry is fresh).
        if ret.is_none() {
            self.expand(lock).await;
        }
        ret
    }
    /// Insert or update.
    ///
    /// This looks up `key`. If it exists, the reference to its value is passed through closure
    /// `update`. If it doesn't exist, the result of closure `insert` is inserted.
    pub async fn upsert<F, G>(&self, key: K, insert: F, update: G)
    where
        F: FnOnce() -> V,
        G: FnOnce(&mut V),
    {
        // Lock the table for reading; the per-bucket write lock below guards the mutation.
        let lock = self.table.read().await;
        {
            // Lookup the key or a free bucket in the inner table.
            let mut bucket = lock.lookup_or_free(&key).await;
            match *bucket {
                // The bucket had KV pair!
                Bucket::Contains(_, ref mut val) => {
                    // Run it through the closure.
                    update(val);
                    // TODO: We return to stop the borrowck to yell at us. This prevents the
                    //       control flow from reaching the expansion after the match if it has
                    //       been right here. No expansion is needed: the length didn't grow.
                    return;
                }
                // The bucket was empty, simply insert.
                ref mut x => *x = Bucket::Contains(key, insert()),
            }
        }
        // Expand the table (this will only happen if the function hasn't returned yet).
        self.expand(lock).await;
    }
/// Map or insert an entry.
///
/// This sets the value associated with key `key` to `f(Some(old_val))` (if it returns `None`,
/// the entry is removed) if it exists. If it does not exist, it inserts it with value
/// `f(None)`, unless the closure returns `None`.
///
/// Note that if `f` returns `None`, the entry of key `key` is removed unconditionally.
pub async fn alter<F, Fut>(&self, key: K, f: F)
where
    F: FnOnce(Option<V>) -> Fut,
    Fut: std::future::Future<Output = Option<V>>,
{
    // Acquire the shared table lock; the bucket's own lock makes the mutation exclusive.
    let lock = self.table.read().await;
    {
        // Lookup the key or a free bucket in the inner table.
        let mut bucket = lock.lookup_or_free(&key).await;
        // Temporarily mark the bucket "removed" so the old value (if any) can be moved out
        // of it and handed to the closure by value.
        match mem::replace(&mut *bucket, Bucket::Removed) {
            Bucket::Contains(_, val) => {
                if let Some(new_val) = f(Some(val)).await {
                    // The closure produced a replacement: write the KV pair back.
                    *bucket = Bucket::Contains(key, new_val);
                    // No expansion required, as the bucket already had a KV pair previously.
                    return;
                } else {
                    // The closure dropped the entry: the "removed" marker written above
                    // stays, and the map shrinks by one.
                    self.len.fetch_sub(1, ORDERING);
                }
                // TODO: We return as a hack to avoid the borrowchecker from thinking we moved a
                // referenced object. Namely, under this match arm the expansion after the match
                // statement won't ever be reached.
                return;
            }
            _ => {
                // The bucket was free (`Empty` or `Removed`).
                if let Some(new_val) = f(None).await {
                    // Store the new KV pair; control falls through to the expansion below.
                    *bucket = Bucket::Contains(key, new_val);
                } else {
                    // Nothing inserted. NOTE(review): an `Empty` bucket has been left as
                    // `Removed` by the `mem::replace` above, which lengthens probe clusters —
                    // confirm this is intended.
                    return;
                }
            }
        }
    }
    // A new entry was inserted, so naturally, we expand the table.
    self.expand(lock).await;
}
/// Remove an entry.
///
/// Deletes the entry stored under `key` and returns its value; yields `None` when no such
/// entry exists.
pub async fn remove<Q: ?Sized>(&self, key: &Q) -> Option<V>
where
    K: Borrow<Q>,
    Q: PartialEq + Hash,
{
    // Shared table lock; the bucket's write lock makes the removal exclusive.
    let guard = self.table.read().await;
    // Locate the bucket belonging to `key`, mutably, since we may clear it.
    let mut bucket = guard.lookup_mut(key).await;
    if matches!(*bucket, Bucket::Contains(..)) {
        // One fewer live entry in the map.
        self.len.fetch_sub(1, ORDERING);
        // Leave a `Removed` marker behind and hand the old value back.
        mem::replace(&mut *bucket, Bucket::Removed).value()
    } else {
        // `Empty` or already `Removed`: nothing to delete.
        None
    }
}
/// Reserve additional space.
///
/// Grows the table so that it can hold `additional` more buckets. Extra slack (a constant
/// multiplier) is added so that reallocations stay rare.
pub async fn reserve(&self, additional: usize) {
    // Target bucket count, padded by the growth multiplier.
    let target = (self.len() + additional) * LENGTH_MULTIPLIER;
    // Swapping the table out requires exclusive access.
    let mut guard = self.table.write().await;
    // Another task may already have resized while we waited for the lock; if the table is
    // big enough, there is nothing left to do.
    if guard.buckets.len() >= target {
        return;
    }
    // Install an empty table of the desired size and rehash the old contents into it.
    let retired = mem::replace(
        &mut *guard,
        Table::with_capacity_and_hasher(target, S::default()),
    );
    guard.fill(retired);
}
/// Shrink the capacity of the map to reduce space usage.
///
/// This rebuilds the table at the size currently needed (plus some slack to avoid immediate
/// reallocations), reclaiming memory when the map previously held many more entries.
///
/// It is healthy to run this once in a while, if the size of your hash map changes a lot
/// (e.g. has a high maximum case).
pub async fn shrink_to_fit(&self) {
    // Replacing the table needs exclusive access.
    let mut guard = self.table.write().await;
    // Build a fresh table sized to the current entry count.
    let fresh = Table::with_capacity_and_hasher(self.len(), S::default());
    // Swap it in, then rehash every entry of the retired table into it.
    let retired = mem::replace(&mut *guard, fresh);
    guard.fill(retired);
}
/// Increment the size of the hash map and expand it so one more entry can fit in.
///
/// This consumes the caller's read guard on the table: when growth is needed, the guard is
/// dropped first so that `reserve` can acquire the write lock without deadlocking.
async fn expand(&self, lock: RwLockReadGuard<'_, Table<K, V, S>>) {
    // Increment the length to take the new element into account.
    let len = self.len.fetch_add(1, ORDERING) + 1;
    // Extend if necessary. We multiply by some constant to adjust our load factor.
    if len * MAX_LOAD_FACTOR_DENOM > lock.buckets.len() * MAX_LOAD_FACTOR_NUM {
        // Drop the read lock to avoid deadlocks when acquiring the write lock.
        drop(lock);
        // Reserve 1 entry in space (the function will handle the excessive space logic).
        self.reserve(1).await;
    }
}
}
impl<K, V, S: Default + BuildHasher> Default for CHashMap<K, V, S> {
fn default() -> Self {
// Forward the call to `new`.
CHashMap::with_hasher(S::default())
}
}
/*impl<K: Clone, V: Clone, S: Clone> Clone for CHashMap<K, V, S> {
fn clone(&self) -> Self {
CHashMap {
table: RwLock::new(self.table.read().clone()),
len: AtomicUsize::new(self.len.load(ORDERING)),
}
}
}*/
/*impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for CHashMap<K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
(*self.table.read()).fmt(f)
}
}*/
impl<K, V, S> IntoIterator for CHashMap<K, V, S> {
    type Item = (K, V);
    type IntoIter = IntoIter<K, V, S>;

    /// Consume the map and iterate over its key-value pairs.
    fn into_iter(self) -> IntoIter<K, V, S> {
        // We own the map, so the lock can be unwrapped and the table's own iterator used
        // directly.
        let table = self.table.into_inner();
        table.into_iter()
    }
}
impl<K: PartialEq + Hash, V, S: Default + BuildHasher> iter::FromIterator<(K, V)>
for CHashMap<K, V, S>
{
    /// Build a map from an iterator of key-value pairs.
    fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
        // TODO: This step is required to obtain the length of the iterator. Eliminate it.
        let vec: Vec<_> = iter.into_iter().collect();
        let len = vec.len();
        // Pre-size the table for `len` entries (the constructor applies its own slack).
        let mut table = Table::with_capacity_and_hasher(len, S::default());
        // Move every pair into a free bucket. Skipping the bucket locks is sound here
        // because the table is not shared yet.
        for (key, val) in vec {
            // NOTE(review): this assumes the iterator yields no duplicate keys — a duplicate
            // would occupy a second bucket and inflate `len`. Confirm callers uphold this.
            let bucket = table.find_free_no_lock(&key);
            *bucket = Bucket::Contains(key, val);
        }
        CHashMap {
            table: RwLock::new(table),
            len: AtomicUsize::new(len),
        }
    }
}