#![allow(clippy::type_complexity)]
use std::collections::{hash_map::DefaultHasher, HashSet};
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, AtomicU8, AtomicUsize, Ordering};
use std::fmt::{self, Debug, Formatter};
use std::mem::ManuallyDrop;
use intrusive_collections::LinkedList;
#[allow(unused_imports)]
pub use log::{debug, error, info, trace, warn};
use parking_method::{Mutex, MutexGuard};
use crate::entry::EntryAdapter;
use crate::Entry;
use crate::KeyTraits;
use crate::UnsafeRef;
pub(crate) struct Bucket<K, V>
where
K: KeyTraits,
{
map: ManuallyDrop<Mutex<HashSet<Pin<Box<Entry<K, V>>>>>>,
lru_list: ManuallyDrop<Mutex<LinkedList<EntryAdapter<K, V>>>>,
pub(crate) cached: AtomicUsize,
pub(crate) cache_target: AtomicU8,
pub(crate) target_countdown: AtomicU32,
pub(crate) target_cooldown: AtomicU32,
pub(crate) highwater: AtomicUsize,
pub(crate) max_capacity_limit: AtomicUsize,
pub(crate) min_capacity_limit: AtomicUsize,
pub(crate) max_cache_percent: AtomicU8,
pub(crate) min_cache_percent: AtomicU8,
pub(crate) evict_batch: AtomicU8,
}
impl<K, V> Drop for Bucket<K, V>
where
K: KeyTraits,
{
fn drop(&mut self) {
unsafe {
ManuallyDrop::drop(&mut self.lru_list);
ManuallyDrop::drop(&mut self.map);
}
}
}
impl<K, V> Bucket<K, V>
where
K: KeyTraits,
{
pub(crate) fn new() -> Self {
Self {
map: ManuallyDrop::new(Mutex::new(HashSet::new())),
lru_list: ManuallyDrop::new(Mutex::new(LinkedList::new(EntryAdapter::new()))),
cached: AtomicUsize::new(0),
cache_target: AtomicU8::new(50),
highwater: AtomicUsize::new(usize::MAX),
target_countdown: AtomicU32::new(0),
target_cooldown: AtomicU32::new(100),
max_capacity_limit: AtomicUsize::new(10000000),
min_capacity_limit: AtomicUsize::new(1000),
max_cache_percent: AtomicU8::new(60),
min_cache_percent: AtomicU8::new(5),
evict_batch: AtomicU8::new(16),
}
}
pub(crate) fn lock_map(&self) -> MutexGuard<HashSet<Pin<Box<Entry<K, V>>>>> {
self.map.lock()
}
pub(crate) fn lock_lru(&self) -> MutexGuard<LinkedList<EntryAdapter<K, V>>> {
self.lru_list.lock()
}
pub(crate) fn use_entry(
&self,
mut lru_lock: MutexGuard<LinkedList<EntryAdapter<K, V>>>,
entry: &Entry<K, V>,
) {
if entry.lru_link.is_linked() {
unsafe { lru_lock.cursor_mut_from_ptr(entry).remove() };
self.cached.fetch_sub(1, Ordering::Relaxed);
}
entry.expire.store(false, Ordering::Relaxed);
}
pub(crate) fn unuse_entry(
&self,
mut lru_lock: MutexGuard<LinkedList<EntryAdapter<K, V>>>,
entry: &Entry<K, V>,
) {
if !entry.value.is_locked() && !entry.lru_link.is_linked() {
self.cached.fetch_add(1, Ordering::Relaxed);
if !entry.expire.load(Ordering::Relaxed) {
lru_lock.push_back(unsafe { UnsafeRef::from_raw(entry) });
} else {
lru_lock.push_front(unsafe { UnsafeRef::from_raw(entry) });
}
}
}
pub(crate) fn remove(&self, key: &K) {
let mut map_lock = self.lock_map();
if let Some(entry) = map_lock.get(key) {
let mut lru_lock = self.lock_lru();
if entry.lru_link.is_linked() {
unsafe { lru_lock.cursor_mut_from_ptr(&**entry).remove() };
map_lock.remove(key);
self.cached.fetch_sub(1, Ordering::Relaxed);
} else {
entry.expire.store(true, Ordering::Relaxed);
}
}
}
pub(crate) fn enlist_entry(
&self,
mut lru_lock: MutexGuard<LinkedList<EntryAdapter<K, V>>>,
entry: &Entry<K, V>,
) {
if !entry.lru_link.is_linked() {
self.cached.fetch_add(1, Ordering::Relaxed);
lru_lock.push_back(unsafe { UnsafeRef::from_raw(entry) });
}
}
pub(crate) fn maybe_evict(&self, map_lock: &mut MutexGuard<HashSet<Pin<Box<Entry<K, V>>>>>) {
let min_capacity_limit = self.min_capacity_limit.load(Ordering::Relaxed);
let min_cache_percent = self.min_cache_percent.load(Ordering::Relaxed);
let capacity = map_lock.capacity();
let countdown = self.target_countdown.load(Ordering::Relaxed);
if countdown > 0 {
self.target_countdown
.store(countdown - 1, Ordering::Relaxed)
} else {
let max_capacity_limit = self.max_capacity_limit.load(Ordering::Relaxed);
let max_cache_percent = self.max_cache_percent.load(Ordering::Relaxed);
self.target_countdown.store(
self.target_cooldown.load(Ordering::Relaxed),
Ordering::Relaxed,
);
let cache_target = if capacity > max_capacity_limit {
min_cache_percent
} else if capacity < min_capacity_limit {
max_cache_percent
} else {
((max_cache_percent as usize * (max_capacity_limit - capacity)
+ min_cache_percent as usize * (capacity - min_capacity_limit))
/ (max_capacity_limit - min_capacity_limit)) as u8
};
self.cache_target.store(cache_target, Ordering::Relaxed);
}
let cached = self.cached.load(Ordering::Relaxed);
let percent_cached = if capacity > 0 {
(cached * 100 / capacity) as u8
} else {
0
};
if map_lock.len() > self.highwater.load(Ordering::Relaxed)
|| (capacity > min_capacity_limit
&& percent_cached > self.cache_target.load(Ordering::Relaxed))
{
self.evict(self.evict_batch.load(Ordering::Relaxed) as usize, map_lock);
}
}
pub(crate) fn evict(
&self,
n: usize,
map_lock: &mut MutexGuard<HashSet<Pin<Box<Entry<K, V>>>>>,
) -> usize {
#[cfg(feature = "logging")]
debug!("evicting {} elements", n);
for i in 0..n {
if let Some(entry) = self.lru_list.lock().pop_front() {
map_lock.remove(&entry.key);
self.cached.fetch_sub(1, Ordering::Relaxed);
} else {
return i;
}
}
n
}
}
impl<K, V> Debug for Bucket<K, V>
where
K: KeyTraits,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let map_lock = self.lock_map();
f.debug_struct("Bucket")
.field("map.len()", &map_lock.len())
.field("map.capacity()", &map_lock.capacity())
.field("cached", &self.cached.load(Ordering::Relaxed))
.field("cache_target", &self.cache_target.load(Ordering::Relaxed))
.field(
"max_capacity_limit",
&self.max_capacity_limit.load(Ordering::Relaxed),
)
.field(
"min_capacity_limit",
&self.min_capacity_limit.load(Ordering::Relaxed),
)
.field(
"max_cache_percent",
&self.max_cache_percent.load(Ordering::Relaxed),
)
.field(
"min_cache_percent",
&self.min_cache_percent.load(Ordering::Relaxed),
)
.field("evict_batch", &self.evict_batch.load(Ordering::Relaxed))
.finish()
}
}
pub trait Bucketize: Hash {
fn bucket(&self) -> usize {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
hasher.finish() as usize
}
fn bucket_range<const N: usize>(&self) -> usize {
self.bucket() % N
}
}