use crate::entry::CacheEntry;
use crate::policy::AccessEvent;
use crate::rng::FastRng;
use crate::sync::{HybridMutex, HybridRwLock};
use crate::task::access_batcher::AccessBatcher;
use crate::task::timer::TimerWheel;
use core::fmt;
use std::borrow::Borrow;
use equivalent::Equivalent;
use fibre::mpsc;
use rand::Rng;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;
use std::time::Duration;
use crossbeam_utils::CachePadded;
/// Hashes `key` using a fresh hasher state built from `hasher`.
///
/// A new state is created on every call via [`BuildHasher::build_hasher`],
/// the key is fed into it, and the finished 64-bit digest is returned.
#[inline]
pub(crate) fn hash_key<K: Hash + ?Sized, H: BuildHasher>(hasher: &H, key: &K) -> u64 {
    let mut digest = hasher.build_hasher();
    Hash::hash(key, &mut digest);
    Hasher::finish(&digest)
}
/// Capacity handed to `mpsc::bounded` when a shard's access-event channel is created.
const ACCESS_EVENT_CHANNEL_BUFFER: usize = 512;
/// Sender side of a bounded `fibre::mpsc` channel carrying `AccessEvent<K>` values.
pub type AccessEventSender<K> = fibre::mpsc::BoundedSender<AccessEvent<K>>;
/// Receiver side of a bounded `fibre::mpsc` channel carrying `AccessEvent<K>` values.
pub type AccessEventReceiver<K> = fibre::mpsc::BoundedReceiver<AccessEvent<K>>;
/// State belonging to one shard of a `ShardedStore`.
///
/// `ShardedStore::new` builds one `Shard` per requested shard, each with its
/// own map, event channel, batcher, lock and random number generator.
pub(crate) struct Shard<K: Send, V, H> {
/// This shard's key-to-entry map, wrapped in a `HybridRwLock`. Entries are
/// stored as `Arc<CacheEntry<V>>`.
pub(crate) map: HybridRwLock<HashMap<K, Arc<CacheEntry<V>>, H>>,
/// Timer wheel for this shard. `ShardedStore::new` fills it with `Some` only
/// when it is called with `has_timer_logic == true`; otherwise it is `None`.
pub(crate) timer_wheel: Option<TimerWheel>,
/// Sending end of the shard's bounded access-event channel.
pub(crate) event_buffer_tx: AccessEventSender<K>,
/// Receiving end of the same channel as `event_buffer_tx`; both ends are kept
/// in the shard.
pub(crate) event_buffer_rx: AccessEventReceiver<K>,
/// Batcher for read accesses.
/// NOTE(review): presumably accumulates read events before they are processed;
/// confirm against `AccessBatcher`.
pub(crate) read_access_batcher: AccessBatcher<K>,
/// Mutex over `()`, so it protects no data of its own.
/// NOTE(review): presumably serializes maintenance work on this shard; confirm
/// with the callers that take it.
pub(crate) maintenance_lock: HybridMutex<()>,
/// Per-shard `FastRng`, seeded in `ShardedStore::new` with a value drawn from
/// `rand::rng()`.
pub(crate) rng: FastRng,
}
/// A collection of `Shard`s plus the hasher used to route keys to them.
pub(crate) struct ShardedStore<K: Send, V, H> {
/// All shards, each wrapped in `CachePadded`, stored as a fixed-size boxed slice.
pub(crate) shards: Box<[CachePadded<Shard<K, V, H>>]>,
/// Hash builder used to pick the shard for a key; `ShardedStore::new` also
/// clones it into every shard's map.
pub(crate) hasher: H,
}
impl<K: Send, V, H> fmt::Debug for ShardedStore<K, V, H> {
    /// Prints the store as a `ShardedStore` struct with a single `num_shards`
    /// field; shard contents and the hasher are not included.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let shard_count = self.shards.len();
        let mut out = f.debug_struct("ShardedStore");
        out.field("num_shards", &shard_count);
        out.finish()
    }
}
impl<K, V, H> ShardedStore<K, V, H>
where
K: Eq + Hash + Send,
V: Send,
H: BuildHasher + Clone,
{
pub(crate) fn new(
num_shards: usize,
hasher: H,
timer_wheel_size: usize,
timer_tick_duration: Duration,
has_timer_logic: bool,
) -> Self {
let mut shards = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
let shard_map = HashMap::with_hasher(hasher.clone());
let lock = HybridRwLock::new(shard_map);
let (tx, rx) = mpsc::bounded(ACCESS_EVENT_CHANNEL_BUFFER);
let timer_wheel = if has_timer_logic {
Some(TimerWheel::new(timer_wheel_size, timer_tick_duration))
} else {
None
};
let initial_seed = rand::rng().random();
let shard = Shard {
map: lock,
timer_wheel,
event_buffer_tx: tx,
event_buffer_rx: rx,
read_access_batcher: AccessBatcher::new(),
maintenance_lock: HybridMutex::new(()),
rng: FastRng::new(initial_seed),
};
shards.push(CachePadded::new(shard));
}
Self {
shards: shards.into_boxed_slice(),
hasher,
}
}
}
impl<K, V, H> ShardedStore<K, V, H>
where
    K: Hash + Send,
    H: BuildHasher,
{
    /// Returns the index of the shard that `key` maps to.
    ///
    /// The key is hashed with the store's hasher, the hash is truncated to
    /// `usize`, and the result is masked with `shards.len() - 1`. The mask
    /// reaches every shard only when the shard count is a power of two, and
    /// the subtraction underflows when there are no shards.
    #[inline]
    pub(crate) fn get_shard_index<Q>(&self, key: &Q) -> usize
    where
        Q: Hash + Equivalent<K> + ?Sized,
    {
        let digest = hash_key(&self.hasher, key);
        let mask = self.shards.len() - 1;
        (digest as usize) & mask
    }

    /// Returns the shard that `key` maps to, using the same hash-and-mask
    /// rule as [`Self::get_shard_index`].
    #[inline]
    pub(crate) fn get_shard<Q>(&self, key: &Q) -> &Shard<K, V, H>
    where
        K: Borrow<Q>,
        Q: Eq + Hash + Equivalent<K> + ?Sized,
    {
        let slot = self.get_shard_index(key);
        &self.shards[slot]
    }

    /// Iterates over all shards in slice order, yielding plain `&Shard`
    /// references rather than the `CachePadded` wrappers.
    pub(crate) fn iter_shards(&self) -> impl Iterator<Item = &Shard<K, V, H>> {
        self.shards.iter().map(|padded| {
            // Deref coercion strips the `CachePadded` wrapper.
            let shard: &Shard<K, V, H> = padded;
            shard
        })
    }
}