#[cfg(test)]
mod tests;
use async_lock::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
use owning_ref::{OwningHandle, OwningRef};
use stable_deref_trait::StableDeref;
use std::borrow::Borrow;
use std::collections::hash_map::RandomState;
use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::atomic::{self, AtomicUsize};
use std::{cmp, fmt, iter, mem, ops};
/// Memory ordering for the length counter. Relaxed suffices because the count is only an
/// approximate figure used for resizing heuristics.
const ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;
/// Growth factor applied when `reserve` expands the table.
const LENGTH_MULTIPLIER: usize = 4;
/// Numerator of the maximum load factor (85%).
const MAX_LOAD_FACTOR_NUM: usize = 100 - 15;
/// Denominator of the maximum load factor.
const MAX_LOAD_FACTOR_DENOM: usize = 100;
/// Number of buckets in a freshly created map.
const DEFAULT_INITIAL_CAPACITY: usize = 64;
/// The smallest number of buckets a table will ever hold.
const MINIMUM_CAPACITY: usize = 8;
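/// The state of a single slot in the open-addressed table.
///
/// A bucket either holds a key/value pair, has never held one (`Empty`), or held one that
/// was since removed (`Removed`). The distinction matters for probing: an `Empty` bucket
/// terminates a lookup, whereas a `Removed` bucket must be skipped, since the sought key
/// may still live further along the probe sequence.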
#[derive(Clone)]
enum Bucket<K, V> {
Contains(K, V),
Empty,
Removed,
}
impl<K, V> Bucket<K, V> {
fn is_empty(&self) -> bool {
matches!(*self, Bucket::Empty)
}
fn is_removed(&self) -> bool {
matches!(*self, Bucket::Removed)
}
fn is_free(&self) -> bool {
match *self {
Bucket::Removed | Bucket::Empty => true,
Bucket::Contains(..) => false,
}
}
fn value(self) -> Option<V> {
if let Bucket::Contains(_, val) = self {
Some(val)
} else {
None
}
}
fn value_ref(&self) -> Result<&V, ()> {
if let Bucket::Contains(_, ref val) = *self {
Ok(val)
} else {
Err(())
}
}
fn key_matches(&self, key: &K) -> bool
where
K: PartialEq,
{
if let Bucket::Contains(ref candidate_key, _) = *self {
candidate_key == key
} else {
false
}
}
}
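/// The inner table: a vector of individually locked buckets, addressed by linear probing.
///
/// Entries never move while the table is shared; growing or shrinking is done by swapping
/// in a whole new `Table` under the outer lock of `CHashMap` and rehashing into it.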
struct Table<K, V, S> {
hash_builder: S,
buckets: Vec<RwLock<Bucket<K, V>>>,
}
impl<K, V> Table<K, V, RandomState> {
fn new(buckets: usize) -> Self {
let mut vec = Vec::with_capacity(buckets);
for _ in 0..buckets {
vec.push(RwLock::new(Bucket::Empty));
}
Table {
hash_builder: RandomState::new(),
buckets: vec,
}
}
fn with_capacity(cap: usize) -> Self {
Table::new(cmp::max(
MINIMUM_CAPACITY,
cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
))
}
}
impl<K, V, S: BuildHasher> Table<K, V, S> {
fn with_hasher(buckets: usize, hash_builder: S) -> Self {
let mut vec = Vec::with_capacity(buckets);
for _ in 0..buckets {
vec.push(RwLock::new(Bucket::Empty));
}
Table {
hash_builder,
buckets: vec,
}
}
fn with_capacity_and_hasher(cap: usize, hash_builder: S) -> Self {
Table::with_hasher(
cmp::max(
MINIMUM_CAPACITY,
cap * MAX_LOAD_FACTOR_DENOM / MAX_LOAD_FACTOR_NUM + 1,
),
hash_builder,
)
}
}
impl<K: PartialEq + Hash, V, S: BuildHasher> Table<K, V, S> {
fn hash<T: ?Sized>(&self, key: &T) -> usize
where
T: Hash,
{
let mut hasher = self.hash_builder.build_hasher();
key.hash(&mut hasher);
hasher.finish() as usize
}
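    /// Probe linearly from `key`'s home bucket, read-locking one bucket at a time, and
    /// return the guard of the first bucket accepted by `matches`.
    ///
    /// Panics if a full pass over the table finds no matching bucket.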
async fn scan<F, Q: ?Sized>(&self, key: &Q, matches: F) -> RwLockReadGuard<'_, Bucket<K, V>>
where
F: Fn(&Bucket<K, V>) -> bool,
K: Borrow<Q>,
Q: Hash,
{
let hash = self.hash(key);
for i in 0..self.buckets.len() {
let lock = self.buckets[(hash + i) % self.buckets.len()].read().await;
if matches(&lock) {
return lock;
}
}
panic!("`CHashMap` scan failed! No entry found.");
}
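    /// Like `scan`, but write-locks the probed buckets and returns a write guard.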
async fn scan_mut<F, Q: ?Sized>(
&self,
key: &Q,
matches: F,
) -> RwLockWriteGuard<'_, Bucket<K, V>>
where
F: Fn(&Bucket<K, V>) -> bool,
K: Borrow<Q>,
Q: Hash,
{
let hash = self.hash(key);
for i in 0..self.buckets.len() {
let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
if matches(&lock) {
return lock;
}
}
panic!("`CHashMap` scan_mut failed! No entry found.");
}
fn scan_mut_no_lock<F>(&mut self, key: &K, matches: F) -> &mut Bucket<K, V>
where
F: Fn(&Bucket<K, V>) -> bool,
{
let hash = self.hash(key);
let len = self.buckets.len();
for i in 0..self.buckets.len() {
let idx = (hash + i) % len;
if matches(self.buckets[idx].get_mut()) {
return self.buckets[idx].get_mut();
}
}
panic!("`CHashMap` scan_mut_no_lock failed! No entry found.");
}
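    /// Find the bucket holding `key`, or a free bucket where it could be inserted.
    ///
    /// A matching bucket wins; otherwise the first `Removed` bucket on the probe path is
    /// reused, and hitting an `Empty` bucket ends the search.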
async fn lookup_or_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
let hash = self.hash(key);
let mut free = None;
for i in 0..self.buckets.len() {
let lock = self.buckets[(hash + i) % self.buckets.len()].write().await;
if lock.key_matches(key) {
return lock;
} else if lock.is_empty() {
return free.unwrap_or(lock);
} else if lock.is_removed() && free.is_none() {
free = Some(lock)
}
}
free.expect("No free buckets found")
}
async fn lookup<Q: ?Sized>(&self, key: &Q) -> RwLockReadGuard<'_, Bucket<K, V>>
where
K: Borrow<Q>,
Q: PartialEq + Hash,
{
self.scan(key, |x| match *x {
Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
Bucket::Empty => true,
_ => false,
})
.await
}
async fn lookup_mut<Q: ?Sized>(&self, key: &Q) -> RwLockWriteGuard<'_, Bucket<K, V>>
where
K: Borrow<Q>,
Q: PartialEq + Hash,
{
self.scan_mut(key, |x| match *x {
Bucket::Contains(ref candidate_key, _) if key.eq(candidate_key.borrow()) => true,
Bucket::Empty => true,
_ => false,
})
.await
}
async fn find_free(&self, key: &K) -> RwLockWriteGuard<'_, Bucket<K, V>> {
self.scan_mut(key, |x| x.is_free()).await
}
fn find_free_no_lock(&mut self, key: &K) -> &mut Bucket<K, V> {
self.scan_mut_no_lock(key, |x| x.is_free())
}
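    /// Move every entry of `table` into `self`, rehashing it into its new position.
    ///
    /// Used to repopulate a freshly allocated table after a resize; `&mut self` guarantees
    /// no locking is needed.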
fn fill(&mut self, table: Self) {
for i in table.buckets {
if let Bucket::Contains(key, val) = i.into_inner() {
#[allow(clippy::match_like_matches_macro)]
let bucket = self.scan_mut_no_lock(&key, |x| match *x {
Bucket::Empty => true,
_ => false,
});
*bucket = Bucket::Contains(key, val);
}
}
}
}
impl<K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for Table<K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut map = f.debug_map();
for i in &self.buckets {
            match i.try_read().as_deref() {
                Some(Bucket::Contains(key, val)) => {
                    map.entry(key, val);
                }
                // Free (empty or removed) buckets hold no entry worth printing.
                Some(_) => (),
                // The bucket is currently locked for writing by another task.
                None => {
                    map.entry(&"<locked>", &"<locked>");
                }
            }
}
map.finish()
}
}
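/// An owning iterator over the key/value pairs of a consumed table or map.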
pub struct IntoIter<K, V, S> {
table: Table<K, V, S>,
}
impl<K, V, S> Iterator for IntoIter<K, V, S> {
type Item = (K, V);
fn next(&mut self) -> Option<(K, V)> {
while let Some(bucket) = self.table.buckets.pop() {
if let Bucket::Contains(key, val) = bucket.into_inner() {
return Some((key, val));
}
}
None
}
}
impl<K, V, S> IntoIterator for Table<K, V, S> {
type Item = (K, V);
type IntoIter = IntoIter<K, V, S>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { table: self }
}
}
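/// A RAII guard giving shared access to one value in the map.
///
/// The guard holds a read lock on the table and on the bucket, so the entry cannot be
/// removed and the table cannot be resized while the guard is alive.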
pub struct ReadGuard<'a, K: 'a, V: 'a, S> {
#[allow(clippy::type_complexity)]
inner: OwningRef<
OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableReadGuard<'a, Bucket<K, V>>>,
V,
>,
}
impl<'a, K, V, S> ops::Deref for ReadGuard<'a, K, V, S> {
type Target = V;
fn deref(&self) -> &V {
&self.inner
}
}
impl<'a, K, V: PartialEq, S> cmp::PartialEq for ReadGuard<'a, K, V, S> {
fn eq(&self, other: &ReadGuard<'a, K, V, S>) -> bool {
        // Compare the referenced values; comparing the guards themselves would recurse.
        **self == **other
}
}
impl<'a, K, V: Eq, S> cmp::Eq for ReadGuard<'a, K, V, S> {}
impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for ReadGuard<'a, K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "ReadGuard({:?})", &**self)
}
}
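/// A thin wrapper around `RwLockReadGuard` asserting `StableDeref`: the guard dereferences
/// into data owned by the `RwLock` itself, so the referent's address is unaffected by
/// moving the guard. This is what allows the guard to be stored inside `OwningRef` and
/// `OwningHandle`.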
#[repr(transparent)]
struct StableReadGuard<'a, T: ?Sized>(RwLockReadGuard<'a, T>);
impl<T: ?Sized> std::ops::Deref for StableReadGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
unsafe impl<T: ?Sized> StableDeref for StableReadGuard<'_, T> {}
impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableReadGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableReadGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
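/// A RAII guard giving exclusive access to one value in the map.
///
/// The guard holds a read lock on the table and a write lock on the bucket, so other
/// entries stay accessible while this one is being mutated.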
pub struct WriteGuard<'a, K: 'a, V: 'a, S> {
#[allow(clippy::type_complexity)]
inner: OwningHandle<
OwningHandle<StableReadGuard<'a, Table<K, V, S>>, StableWriteGuard<'a, Bucket<K, V>>>,
&'a mut V,
>,
}
impl<'a, K, V, S> ops::Deref for WriteGuard<'a, K, V, S> {
type Target = V;
fn deref(&self) -> &V {
&self.inner
}
}
impl<'a, K, V, S> ops::DerefMut for WriteGuard<'a, K, V, S> {
fn deref_mut(&mut self) -> &mut V {
&mut self.inner
}
}
impl<'a, K, V: PartialEq, S> cmp::PartialEq for WriteGuard<'a, K, V, S> {
fn eq(&self, other: &Self) -> bool {
        // Compare the referenced values; comparing the guards themselves would recurse.
        **self == **other
}
}
impl<'a, K, V: Eq, S> cmp::Eq for WriteGuard<'a, K, V, S> {}
impl<'a, K: fmt::Debug, V: fmt::Debug, S> fmt::Debug for WriteGuard<'a, K, V, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "WriteGuard({:?})", &**self)
}
}
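/// Write-locking counterpart of `StableReadGuard`; see that wrapper for the `StableDeref`
/// soundness argument.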
#[repr(transparent)]
struct StableWriteGuard<'a, T: ?Sized>(RwLockWriteGuard<'a, T>);
impl<T: ?Sized> std::ops::Deref for StableWriteGuard<'_, T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&*self.0
}
}
impl<T: ?Sized> std::ops::DerefMut for StableWriteGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut *self.0
}
}
unsafe impl<T: ?Sized> StableDeref for StableWriteGuard<'_, T> {}
impl<T: std::fmt::Debug + ?Sized> std::fmt::Debug for StableWriteGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl<T: std::fmt::Display + ?Sized> std::fmt::Display for StableWriteGuard<'_, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
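/// A concurrent, asynchronous hash map with per-bucket read-write locks.
///
/// Reads and writes to distinct buckets proceed in parallel; the table as a whole is only
/// write-locked when it is resized or cleared.
///
/// # Examples
///
/// A minimal usage sketch. The crate path (`chashmap_async`) is an assumption for
/// illustration; substitute the actual crate name and drive the `async fn` with whatever
/// executor the surrounding code uses.
///
/// ```ignore
/// use chashmap_async::CHashMap;
///
/// async fn demo() {
///     let map: CHashMap<&str, u32> = CHashMap::new();
///     map.insert("answer", 42).await;
///     assert_eq!(*map.get("answer").await.unwrap(), 42);
///     map.remove("answer").await;
/// }
/// ```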
pub struct CHashMap<K, V, S = RandomState> {
table: RwLock<Table<K, V, S>>,
len: AtomicUsize,
}
impl<K, V> CHashMap<K, V, RandomState> {
pub fn with_capacity(cap: usize) -> Self {
CHashMap {
len: AtomicUsize::new(0),
table: RwLock::new(Table::with_capacity(cap)),
}
}
pub fn new() -> Self {
CHashMap::with_capacity(DEFAULT_INITIAL_CAPACITY)
}
}
impl<K, V, S> CHashMap<K, V, S> {
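    /// The number of entries in the map.
    ///
    /// The counter is maintained with relaxed atomics, so the value is only approximate
    /// while other tasks are inserting or removing entries concurrently.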
pub fn len(&self) -> usize {
self.len.load(ORDERING)
}
pub async fn capacity(&self) -> usize {
cmp::max(MINIMUM_CAPACITY, self.buckets().await) * MAX_LOAD_FACTOR_NUM
/ MAX_LOAD_FACTOR_DENOM
}
pub async fn buckets(&self) -> usize {
self.table.read().await.buckets.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
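    /// Remove every entry for which `predicate` returns `false`.
    ///
    /// Buckets are visited one at a time under an upgradable read lock, so the rest of the
    /// map remains accessible while the filter runs.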
pub async fn retain<F>(&self, predicate: F)
where
F: Fn(&K, &V) -> bool,
{
let table = self.table.read().await;
for bucket in &table.buckets {
let lock = bucket.upgradable_read().await;
match *lock {
Bucket::Contains(ref key, ref val) if !predicate(key, val) => {
let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await;
*lock = Bucket::Removed;
self.len.fetch_sub(1, ORDERING);
}
_ => (),
}
}
}
}
impl<K, V, S: Default + BuildHasher> CHashMap<K, V, S> {
pub fn with_hasher_and_capacity(cap: usize, hash_builder: S) -> Self {
CHashMap {
len: AtomicUsize::new(0),
table: RwLock::new(Table::with_capacity_and_hasher(cap, hash_builder)),
}
}
pub fn with_hasher(hash_builder: S) -> Self {
CHashMap::with_hasher_and_capacity(DEFAULT_INITIAL_CAPACITY, hash_builder)
}
}
impl<K, V, S> CHashMap<K, V, S>
where
S: Default + BuildHasher,
{
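    /// Remove every entry, returning the previous contents as a new map.
    ///
    /// The whole table is swapped out under the write lock, so this does not visit
    /// individual buckets.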
pub async fn clear(&self) -> CHashMap<K, V, S> {
let mut lock = self.table.write().await;
CHashMap {
table: RwLock::new(mem::replace(
&mut *lock,
Table::with_hasher(DEFAULT_INITIAL_CAPACITY, S::default()),
)),
len: AtomicUsize::new(self.len.swap(0, ORDERING)),
}
}
}
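/// A wrapper that unconditionally marks the inner future as `Send`.
///
/// The lookup futures below hold raw pointers derived from the table guard across `.await`
/// points, which the compiler cannot prove `Send` on its own. Wrapping them here asserts
/// that moving them between threads is fine; the claim is made unconditionally, so the
/// wrapper must only ever be applied to futures whose captured state is known to be safe
/// to send.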
#[repr(transparent)]
struct UnsafeSendFuture<F>(F);
impl<F: std::future::Future> std::future::Future for UnsafeSendFuture<F> {
type Output = F::Output;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
unsafe { self.map_unchecked_mut(|f| &mut f.0).poll(cx) }
}
}
unsafe impl<F> Send for UnsafeSendFuture<F> {}
impl<K: PartialEq + Hash, V, S: BuildHasher> CHashMap<K, V, S> {
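    /// Look up `key` and return a read guard to its value, or `None` if the key is absent.
    ///
    /// The guard keeps the bucket read-locked; holding it while awaiting an operation that
    /// write-locks the same bucket (or the whole table) deadlocks.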
pub async fn get<Q: ?Sized>(&self, key: &Q) -> Option<ReadGuard<'_, K, V, S>>
where
K: Borrow<Q>,
Q: Hash + PartialEq,
{
self.get_inner(key).await
}
fn get_inner<'this, 'a, 'b, Q: ?Sized>(
&'a self,
key: &'b Q,
) -> impl Future<Output = Option<ReadGuard<'a, K, V, S>>> + 'this + Send
where
K: Borrow<Q>,
Q: Hash + PartialEq,
Self: 'this,
'a: 'this,
'b: 'this,
{
UnsafeSendFuture(async move {
if let Ok(inner) = OwningRef::new(
OwningHandle::new_with_async_fn(
StableReadGuard(self.table.read().await),
|x| async move { StableReadGuard(unsafe { &*x }.lookup(key).await) },
)
.await,
)
.try_map(|x| x.value_ref())
{
Some(ReadGuard { inner })
} else {
None
}
})
}
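    /// Look up `key` and return a write guard to its value, or `None` if the key is absent.
    ///
    /// The same deadlock caveat as for [`get`](Self::get) applies, with the bucket held
    /// under a write lock instead.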
pub async fn get_mut<Q: ?Sized>(&self, key: &Q) -> Option<WriteGuard<'_, K, V, S>>
where
K: Borrow<Q>,
Q: Hash + PartialEq,
{
self.get_mut_inner(key).await
}
fn get_mut_inner<'this, 'a, 'b, Q: ?Sized>(
&'a self,
key: &'b Q,
) -> impl Future<Output = Option<WriteGuard<'a, K, V, S>>> + 'this + Send
where
K: Borrow<Q>,
Q: Hash + PartialEq,
Self: 'this,
'a: 'this,
'b: 'this,
{
UnsafeSendFuture(async move {
let handle = OwningHandle::try_new(
OwningHandle::new_with_async_fn(
StableReadGuard(self.table.read().await),
|x| async move {
StableWriteGuard(UnsafeSendFuture(unsafe { &*x }.lookup_mut(key)).await)
},
)
.await,
|x| match unsafe { &mut *(x as *mut Bucket<K, V>) } {
Bucket::Contains(_, ref mut val) => Ok(val),
_ => Err(()),
},
);
match handle {
Ok(inner) => Some(WriteGuard { inner }),
Err(_) => None,
}
})
}
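    /// Check whether the map currently contains `key`.
    ///
    /// The answer may already be stale by the time it is observed if other tasks are
    /// mutating the map concurrently.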
pub async fn contains_key<Q: ?Sized>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Hash + PartialEq,
{
let lock = self.table.read().await;
let bucket = lock.lookup(key).await;
!bucket.is_free()
}
}
impl<K, V, S> CHashMap<K, V, S>
where
K: PartialEq + Hash,
S: BuildHasher + Default,
{
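    /// Insert a key/value pair under the assumption that `key` is not already present.
    ///
    /// This skips the replace-existing-entry handling that `insert` performs; the
    /// assumption is verified with a `debug_assert!` in debug builds only.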
pub async fn insert_new(&self, key: K, val: V) {
debug_assert!(
!self.contains_key(&key).await,
"Hash table contains already key, contrary to \
the assumptions about `insert_new`'s arguments."
);
let lock = self.table.read().await;
{
let mut bucket = lock.find_free(&key).await;
*bucket = Bucket::Contains(key, val);
}
self.expand(lock).await;
}
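    /// Insert a key/value pair, returning the value previously stored under `key`, if any.
    ///
    /// If the key was vacant, the length counter is bumped and the table is grown once the
    /// maximum load factor is exceeded.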
pub async fn insert(&self, key: K, val: V) -> Option<V> {
let lock = self.table.read().await;
let ret = {
let mut bucket = lock.lookup_or_free(&key).await;
mem::replace(&mut *bucket, Bucket::Contains(key, val)).value()
};
if ret.is_none() {
self.expand(lock).await;
}
ret
}
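    /// Insert the value produced by `insert` under `key` if the key is vacant, otherwise
    /// apply `update` to the value already stored there.
    ///
    /// A sketch of a concurrent counter; the crate path is assumed, as in the type-level
    /// example:
    ///
    /// ```ignore
    /// # use chashmap_async::CHashMap;
    /// # async fn demo(map: &CHashMap<String, u64>) {
    /// map.upsert("hits".to_string(), || 1, |n| *n += 1).await;
    /// # }
    /// ```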
pub async fn upsert<F, G>(&self, key: K, insert: F, update: G)
where
F: FnOnce() -> V,
G: FnOnce(&mut V),
{
let lock = self.table.read().await;
{
let mut bucket = lock.lookup_or_free(&key).await;
match *bucket {
Bucket::Contains(_, ref mut val) => {
update(val);
return;
}
ref mut x => *x = Bucket::Contains(key, insert()),
}
}
self.expand(lock).await;
}
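    /// Look up `key` and insert, replace, or remove its value based on what the async
    /// closure `f` returns: `f` receives the current value (if any), and its output
    /// becomes the new state, with `None` meaning "no entry".
    ///
    /// A sketch, with the crate path assumed as in the type-level example:
    ///
    /// ```ignore
    /// # use chashmap_async::CHashMap;
    /// # async fn demo(map: &CHashMap<String, u64>) {
    /// // Increment the counter if present, otherwise start it at one.
    /// map.alter("hits".to_string(), |old| async move { Some(old.unwrap_or(0) + 1) })
    ///     .await;
    /// # }
    /// ```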
pub async fn alter<F, Fut>(&self, key: K, f: F)
where
F: FnOnce(Option<V>) -> Fut,
Fut: std::future::Future<Output = Option<V>>,
{
let lock = self.table.read().await;
{
let mut bucket = lock.lookup_or_free(&key).await;
match mem::replace(&mut *bucket, Bucket::Removed) {
                Bucket::Contains(_, val) => {
                    if let Some(new_val) = f(Some(val)).await {
                        *bucket = Bucket::Contains(key, new_val);
                    } else {
                        self.len.fetch_sub(1, ORDERING);
                    }
                    return;
                }
_ => {
if let Some(new_val) = f(None).await {
*bucket = Bucket::Contains(key, new_val);
} else {
return;
}
}
}
}
self.expand(lock).await;
}
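    /// Remove `key` from the map, returning its value if it was present.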
pub async fn remove<Q: ?Sized>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: PartialEq + Hash,
{
let lock = self.table.read().await;
let mut bucket = lock.lookup_mut(key).await;
match &mut *bucket {
&mut Bucket::Removed | &mut Bucket::Empty => None,
bucket => {
self.len.fetch_sub(1, ORDERING);
mem::replace(bucket, Bucket::Removed).value()
}
}
}
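    /// Reserve room for at least `additional` more entries.
    ///
    /// If the current table is too small, it is replaced under the table write lock and
    /// every existing entry is rehashed, which blocks all other operations for the
    /// duration.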
pub async fn reserve(&self, additional: usize) {
let len = (self.len() + additional) * LENGTH_MULTIPLIER;
let mut lock = self.table.write().await;
if lock.buckets.len() < len {
let table = mem::replace(
&mut *lock,
Table::with_capacity_and_hasher(len, S::default()),
);
lock.fill(table);
}
}
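    /// Shrink the table to roughly the current number of entries, rehashing every entry
    /// under the table write lock.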
pub async fn shrink_to_fit(&self) {
let mut lock = self.table.write().await;
let table = mem::replace(
&mut *lock,
Table::with_capacity_and_hasher(self.len(), S::default()),
);
lock.fill(table);
}
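    /// Record one newly inserted entry in the length counter and, if the maximum load
    /// factor is now exceeded, release the given read guard and grow the table.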
async fn expand(&self, lock: RwLockReadGuard<'_, Table<K, V, S>>) {
let len = self.len.fetch_add(1, ORDERING) + 1;
if len * MAX_LOAD_FACTOR_DENOM > lock.buckets.len() * MAX_LOAD_FACTOR_NUM {
drop(lock);
self.reserve(1).await;
}
}
}
impl<K, V, S: Default + BuildHasher> Default for CHashMap<K, V, S> {
fn default() -> Self {
CHashMap::with_hasher(S::default())
}
}
impl<K, V, S> IntoIterator for CHashMap<K, V, S> {
type Item = (K, V);
type IntoIter = IntoIter<K, V, S>;
fn into_iter(self) -> IntoIter<K, V, S> {
self.table.into_inner().into_iter()
}
}
impl<K: PartialEq + Hash, V, S: Default + BuildHasher> iter::FromIterator<(K, V)>
for CHashMap<K, V, S>
{
fn from_iter<I: IntoIterator<Item = (K, V)>>(iter: I) -> Self {
let vec: Vec<_> = iter.into_iter().collect();
let len = vec.len();
let mut table = Table::with_capacity_and_hasher(len, S::default());
for (key, val) in vec {
let bucket = table.find_free_no_lock(&key);
*bucket = Bucket::Contains(key, val);
}
CHashMap {
table: RwLock::new(table),
len: AtomicUsize::new(len),
}
}
}