#![allow(clippy::type_complexity)]
mod lock;
use ahash::RandomState;
use core::borrow::Borrow;
use core::hash::{BuildHasher, Hash, Hasher};
use crossbeam_utils::CachePadded;
use hashbrown::raw::RawTable;
use lock::RwLock;
#[cfg(feature = "slot-cache")]
use std::ptr;
#[cfg(feature = "slot-cache")]
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU32, AtomicUsize, Ordering};
#[cfg(not(feature = "slot-cache"))]
type ShardMap<K, V> = RawTable<(K, V)>;
#[cfg(feature = "slot-cache")]
type ShardMap<K, V> = RawTable<Box<Entry<K, V>>>;
pub struct QuotaMap<K, V, S = RandomState> {
shift: usize,
shards: Box<[CachePadded<RwLock<ShardMap<K, V>>>]>,
hasher: S,
#[cfg(feature = "slot-cache")]
cache: Box<[CacheLine<K, V>]>,
#[cfg(feature = "slot-cache")]
cache_mask: usize,
#[cfg(feature = "slot-cache")]
retired_head: AtomicPtr<RetiredEntry<K, V>>,
}
#[cfg(feature = "slot-cache")]
struct Entry<K, V> {
key: K,
value: V,
generation: AtomicU32,
deleted: AtomicBool,
}
#[cfg(feature = "slot-cache")]
struct CacheLine<K, V> {
hash: AtomicUsize,
generation: AtomicU32,
ptr: AtomicPtr<Entry<K, V>>,
}
#[cfg(feature = "slot-cache")]
struct RetiredEntry<K, V> {
entry: *mut Entry<K, V>,
next: AtomicPtr<RetiredEntry<K, V>>,
}
impl<K, V> QuotaMap<K, V, RandomState>
where
K: Eq + Hash,
{
#[inline]
pub fn with_capacity(capacity: usize) -> Self {
Self::with_capacity_and_hasher(capacity, RandomState::default())
}
}
impl<K, V, S> QuotaMap<K, V, S>
where
K: Eq + Hash,
S: BuildHasher + Clone,
{
pub fn with_capacity_and_hasher(mut capacity: usize, hasher: S) -> Self {
let shard_amount = default_shard_amount();
let shift = usize::BITS as usize - shard_amount.trailing_zeros() as usize;
if capacity != 0 {
capacity = (capacity + (shard_amount - 1)) & !(shard_amount - 1);
}
let per_shard = capacity / shard_amount;
let shards = (0..shard_amount)
.map(|_| CachePadded::new(RwLock::new(ShardMap::with_capacity(per_shard))))
.collect();
Self {
shift,
shards,
hasher,
#[cfg(feature = "slot-cache")]
cache: new_cache(capacity),
#[cfg(feature = "slot-cache")]
cache_mask: cache_size(capacity) - 1,
#[cfg(feature = "slot-cache")]
retired_head: AtomicPtr::new(ptr::null_mut()),
}
}
#[inline]
pub fn len(&self) -> usize {
self.shards.iter().map(|shard| shard.read().len()).sum()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn contains_key<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let hash = self.hash_key(key);
let shard = self.shard(hash).read();
self.find(&shard, hash, key).is_some()
}
#[inline]
pub fn with_or_insert<Q, R>(
&self,
key: &Q,
init: impl FnOnce(K) -> V,
f: impl FnOnce(&V) -> R,
) -> R
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
#[cfg(feature = "slot-cache")]
{
self.with_or_insert_cached(key, init, f)
}
#[cfg(not(feature = "slot-cache"))]
{
self.with_or_insert_uncached(key, init, f)
}
}
#[inline]
pub fn remove<Q>(&self, key: &Q) -> bool
where
K: Borrow<Q>,
Q: Eq + Hash + ?Sized,
{
let hash = self.hash_key(key);
let mut shard = self.shard(hash).write();
#[cfg(feature = "slot-cache")]
{
if let Some(entry) = shard.remove_entry(hash, |entry| entry.key.borrow() == key) {
entry.deleted.store(true, Ordering::Release);
entry.generation.store(
next_generation(entry.generation.load(Ordering::Relaxed)),
Ordering::Release,
);
self.invalidate_cache(hash, &entry);
self.retire(entry);
return true;
}
}
#[cfg(not(feature = "slot-cache"))]
{
if shard
.remove_entry(hash, |(entry_key, _)| entry_key.borrow() == key)
.is_some()
{
return true;
}
}
false
}
#[cfg(not(feature = "slot-cache"))]
#[inline]
fn with_or_insert_uncached<Q, R>(
&self,
key: &Q,
init: impl FnOnce(K) -> V,
f: impl FnOnce(&V) -> R,
) -> R
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
let hash = self.hash_key(key);
let shard_index = self.determine_shard(hash as usize);
{
let shard = self.shards[shard_index].read();
if let Some((_, value)) = self.find(&shard, hash, key) {
return f(value);
}
}
let owned = key.to_owned();
let mut shard = self.shards[shard_index].write();
if let Some((_, value)) = self.find(&shard, hash, key) {
return f(value);
}
let value = init(owned);
let entry = shard.insert_entry(hash, (key.to_owned(), value), |(entry_key, _)| {
self.hash_owned(entry_key)
});
f(&entry.1)
}
#[cfg(feature = "slot-cache")]
#[inline]
fn with_or_insert_cached<Q, R>(
&self,
key: &Q,
init: impl FnOnce(K) -> V,
f: impl FnOnce(&V) -> R,
) -> R
where
K: Borrow<Q>,
Q: Eq + Hash + ToOwned<Owned = K> + ?Sized,
{
let hash = self.hash_key(key);
if let Some(entry) = self.cached_entry(hash, key) {
return f(&entry.value);
}
let shard_index = self.determine_shard(hash as usize);
{
let shard = self.shards[shard_index].read();
if let Some(entry) = self.find(&shard, hash, key) {
self.cache_entry(hash, entry);
return f(&entry.value);
}
}
let owned = key.to_owned();
let mut shard = self.shards[shard_index].write();
if let Some(entry) = self.find(&shard, hash, key) {
self.cache_entry(hash, entry);
return f(&entry.value);
}
let entry = Box::new(Entry {
key: owned,
value: init(key.to_owned()),
generation: AtomicU32::new(1),
deleted: AtomicBool::new(false),
});
let entry = shard.insert_entry(hash, entry, |entry| self.hash_owned(&entry.key));
let entry = entry.as_ref();
self.cache_entry(hash, entry);
f(&entry.value)
}
#[cfg(not(feature = "slot-cache"))]
#[inline]
fn find<'a, Q>(&self, shard: &'a ShardMap<K, V>, hash: u64, key: &Q) -> Option<&'a (K, V)>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
shard
.find(hash, |(entry_key, _)| entry_key.borrow() == key)
.map(|bucket| unsafe { bucket.as_ref() })
}
#[cfg(feature = "slot-cache")]
#[inline]
fn find<'a, Q>(&self, shard: &'a ShardMap<K, V>, hash: u64, key: &Q) -> Option<&'a Entry<K, V>>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
shard
.find(hash, |entry| entry.key.borrow() == key)
.map(|bucket| unsafe { bucket.as_ref().as_ref() })
}
#[inline]
fn shard(&self, hash: u64) -> &RwLock<ShardMap<K, V>> {
&self.shards[self.determine_shard(hash as usize)]
}
#[inline]
fn determine_shard(&self, hash: usize) -> usize {
(hash << 7) >> self.shift
}
#[inline]
fn hash_key<Q>(&self, key: &Q) -> u64
where
Q: Hash + ?Sized,
{
let mut hasher = self.hasher.build_hasher();
key.hash(&mut hasher);
hasher.finish()
}
#[inline]
fn hash_owned(&self, key: &K) -> u64 {
self.hash_key(key)
}
#[cfg(feature = "slot-cache")]
#[inline]
fn cached_entry<Q>(&self, hash: u64, key: &Q) -> Option<&Entry<K, V>>
where
K: Borrow<Q>,
Q: Eq + ?Sized,
{
let line = &self.cache[(hash as usize) & self.cache_mask];
if line.hash.load(Ordering::Acquire) != hash as usize {
return None;
}
let ptr = line.ptr.load(Ordering::Acquire);
if ptr.is_null() {
return None;
}
let generation = line.generation.load(Ordering::Relaxed);
let entry = unsafe { &*ptr };
if entry.generation.load(Ordering::Acquire) != generation {
return None;
}
if entry.deleted.load(Ordering::Acquire) {
return None;
}
if entry.key.borrow() == key {
Some(entry)
} else {
None
}
}
#[cfg(feature = "slot-cache")]
#[inline]
fn cache_entry(&self, hash: u64, entry: &Entry<K, V>) {
let line = &self.cache[(hash as usize) & self.cache_mask];
line.ptr.store(
entry as *const Entry<K, V> as *mut Entry<K, V>,
Ordering::Relaxed,
);
line.generation
.store(entry.generation.load(Ordering::Relaxed), Ordering::Relaxed);
line.hash.store(hash as usize, Ordering::Release);
}
#[cfg(feature = "slot-cache")]
#[inline]
fn invalidate_cache(&self, hash: u64, entry: &Entry<K, V>) {
let line = &self.cache[(hash as usize) & self.cache_mask];
if line.hash.load(Ordering::Acquire) != hash as usize {
return;
}
let ptr = line.ptr.load(Ordering::Relaxed);
let entry_ptr = entry as *const Entry<K, V> as *mut Entry<K, V>;
if ptr == entry_ptr {
line.ptr.store(ptr::null_mut(), Ordering::Relaxed);
line.hash.store(0, Ordering::Release);
}
}
#[cfg(feature = "slot-cache")]
#[inline]
fn retire(&self, entry: Box<Entry<K, V>>) {
let node = Box::into_raw(Box::new(RetiredEntry {
entry: Box::into_raw(entry),
next: AtomicPtr::new(ptr::null_mut()),
}));
let mut head = self.retired_head.load(Ordering::Acquire);
loop {
unsafe {
(*node).next.store(head, Ordering::Relaxed);
}
match self.retired_head.compare_exchange_weak(
head,
node,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return,
Err(actual) => head = actual,
}
}
}
}
#[cfg(feature = "slot-cache")]
impl<K, V, S> Drop for QuotaMap<K, V, S> {
fn drop(&mut self) {
let mut retired = self.retired_head.load(Ordering::Relaxed);
while !retired.is_null() {
let node = unsafe { Box::from_raw(retired) };
retired = node.next.load(Ordering::Relaxed);
unsafe {
drop(Box::from_raw(node.entry));
}
}
}
}
#[cfg(feature = "slot-cache")]
impl<K, V> CacheLine<K, V> {
#[inline]
fn empty() -> Self {
Self {
hash: AtomicUsize::new(0),
generation: AtomicU32::new(0),
ptr: AtomicPtr::new(ptr::null_mut()),
}
}
}
fn default_shard_amount() -> usize {
(std::thread::available_parallelism().map_or(1, usize::from) * 4).next_power_of_two()
}
#[cfg(feature = "slot-cache")]
fn new_cache<K, V>(capacity: usize) -> Box<[CacheLine<K, V>]> {
(0..cache_size(capacity))
.map(|_| CacheLine::empty())
.collect::<Vec<_>>()
.into_boxed_slice()
}
#[cfg(feature = "slot-cache")]
fn cache_size(capacity: usize) -> usize {
let base = capacity.max(64).next_power_of_two();
base.checked_mul(64).unwrap_or(1 << 20).clamp(1024, 1 << 20)
}
#[cfg(feature = "slot-cache")]
#[inline]
fn next_generation(generation: u32) -> u32 {
generation.wrapping_add(1).max(1)
}