pub mod maintenance_frequency;
use crate::error::BuildError;
use crate::handles::{AsyncCache, Cache};
use crate::loader::Loader;
use crate::metrics::Metrics;
use crate::policy::CachePolicy;
use crate::shared::CacheShared;
#[cfg(feature = "serde")]
use crate::snapshot::CacheSnapshot;
use crate::store::{hash_key, ShardedStore};
use crate::task::janitor::{Janitor, JanitorContext};
use crate::task::notifier::Notifier;
use crate::{time, EvictionListener, TaskSpawner};
use core::fmt;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{BuildHasher, Hash};
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
/// Named timer-wheel presets accepted by [`CacheBuilder::timer_mode`].
///
/// Each variant is translated by `timer_mode` into a wheel size (slot count)
/// and a tick duration.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimerWheelMode {
    /// Mapped by `timer_mode` to a 60-slot wheel with 1 second ticks.
    Default,
    /// Mapped by `timer_mode` to a 100-slot wheel with 10 millisecond ticks.
    HighPrecisionShortLived,
    /// Mapped by `timer_mode` to a 120-slot wheel with 30 second ticks.
    LowPrecisionLongLived,
}
/// Collects the configuration used to construct a cache.
///
/// `K` is the key type, `V` the value type and `H` the hasher builder, which
/// defaults to `ahash::RandomState`.
pub struct CacheBuilder<K, V, H = ahash::RandomState>
where
    K: Send,
    V: Send,
{
    /// Capacity limit; `u64::MAX` means no limit was configured.
    pub(crate) capacity: u64,
    /// Number of shards the store is split into.
    pub(crate) shards: usize,
    /// Optional time-to-live setting.
    pub(crate) time_to_live: Option<Duration>,
    /// Optional time-to-idle setting.
    pub(crate) time_to_idle: Option<Duration>,
    /// Hasher builder handed to the sharded store.
    pub(crate) hasher: H,
    /// Optional stale-while-revalidate window.
    pub(crate) stale_while_revalidate: Option<Duration>,
    /// Interval between janitor ticks; one second is used when unset.
    pub(crate) janitor_tick_interval: Option<Duration>,
    /// Timer-wheel tick duration; one second is used when unset.
    timer_wheel_tick_duration: Option<Duration>,
    /// Timer-wheel slot count; 60 is used when unset.
    timer_wheel_size: Option<usize>,
    /// Optional listener registered through `eviction_listener`.
    listener: Option<Arc<dyn EvictionListener<K, V>>>,
    /// Optional factory invoked once per shard to create that shard's policy.
    policy_factory: Option<Arc<dyn Fn() -> Box<dyn CachePolicy<K, V>> + Send + Sync>>,
    /// Optional synchronous or asynchronous loader.
    loader: Option<Loader<K, V>>,
    /// Optional task spawner supplied by the caller.
    spawner: Option<Arc<dyn TaskSpawner>>,
    /// Denominator set through `maintenance_chance`; forwarded to the janitor
    /// and to the shared cache state.
    maintenance_probability_denominator: u32,
    /// Zero-sized marker for the key type.
    _key_marker: PhantomData<K>,
    /// Zero-sized marker for the value type.
    _value_marker: PhantomData<V>,
}
/// Manual `Debug` implementation that prints a subset of the configuration.
///
/// The listener is reported only as a `has_listener` presence flag, and the
/// output ends with `..` to signal that further fields are omitted.
impl<K, V, H> fmt::Debug for CacheBuilder<K, V, H>
where
    K: Send,
    V: Send,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let has_listener = self.listener.is_some();

        let mut out = f.debug_struct("CacheBuilder");
        out.field("capacity", &self.capacity);
        out.field("shards", &self.shards);
        out.field("time_to_live", &self.time_to_live);
        out.field("time_to_idle", &self.time_to_idle);
        out.field("has_listener", &has_listener);
        out.finish_non_exhaustive()
    }
}
/// Configuration setters that are available for every hasher type.
///
/// Each setter consumes the builder and returns it, so calls can be chained.
impl<K: Send, V: Send, H> CacheBuilder<K, V, H> {
    /// Sets the capacity limit.
    ///
    /// Passing `u64::MAX` is equivalent to calling [`Self::unbounded`].
    pub fn capacity(mut self, capacity: u64) -> Self {
        self.capacity = capacity;
        self
    }

    /// Removes the capacity limit by storing `u64::MAX`.
    pub fn unbounded(mut self) -> Self {
        self.capacity = u64::MAX;
        self
    }

    /// Sets the number of shards.
    ///
    /// The value is raised to at least 1 and then rounded up to the next
    /// power of two, so `shards(0)` stores 1 and `shards(5)` stores 8.
    pub fn shards(mut self, shards: usize) -> Self {
        self.shards = shards.max(1).next_power_of_two();
        self
    }

    /// Sets the time-to-live duration.
    pub fn time_to_live(mut self, duration: Duration) -> Self {
        self.time_to_live = Some(duration);
        self
    }

    /// Sets the time-to-idle duration.
    pub fn time_to_idle(mut self, duration: Duration) -> Self {
        self.time_to_idle = Some(duration);
        self
    }

    /// Registers an eviction listener, replacing any previously set one.
    pub fn eviction_listener<Listener>(mut self, listener: Listener) -> Self
    where
        Listener: EvictionListener<K, V> + 'static,
    {
        self.listener = Some(Arc::new(listener));
        self
    }

    /// Sets the factory that creates cache policies.
    ///
    /// When the cache is built the factory is invoked once per shard.
    pub fn cache_policy_factory<F>(mut self, factory: F) -> Self
    where
        F: Fn() -> Box<dyn CachePolicy<K, V>> + Send + Sync + 'static,
    {
        self.policy_factory = Some(Arc::new(factory));
        self
    }

    /// Sets a synchronous loader, replacing any previously set loader.
    ///
    /// `f` receives a key and returns the value together with a `u64`
    /// (presumably the entry's cost; confirm against `Loader`).
    pub fn loader(mut self, f: impl Fn(K) -> (V, u64) + Send + Sync + 'static) -> Self {
        self.loader = Some(Loader::Sync(Arc::new(f)));
        self
    }

    /// Sets an asynchronous loader, replacing any previously set loader.
    ///
    /// The future returned by `f` is boxed and pinned so that loaders with
    /// different future types can be stored behind one type.
    pub fn async_loader<F, Fut>(mut self, f: F) -> Self
    where
        F: Fn(K) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = (V, u64)> + Send + 'static,
    {
        let loader_fn =
            move |key| Box::pin(f(key)) as Pin<Box<dyn Future<Output = (V, u64)> + Send>>;
        self.loader = Some(Loader::Async(Arc::new(loader_fn)));
        self
    }

    /// Sets the task spawner handed to the built cache.
    pub fn spawner(mut self, spawner: Arc<dyn TaskSpawner>) -> Self {
        self.spawner = Some(spawner);
        self
    }

    /// Overrides the interval between janitor ticks.
    #[doc(hidden)]
    pub fn janitor_tick_interval(mut self, duration: Duration) -> Self {
        self.janitor_tick_interval = Some(duration);
        self
    }

    /// Applies a timer-wheel preset.
    ///
    /// This overwrites both the wheel size and the tick duration. A later call
    /// to [`Self::timer_tick_duration`] or [`Self::timer_wheel_size`] overrides
    /// the corresponding half of the preset.
    pub fn timer_mode(mut self, mode: TimerWheelMode) -> Self {
        let (size, duration) = match mode {
            TimerWheelMode::Default => (60, Duration::from_secs(1)),
            TimerWheelMode::HighPrecisionShortLived => (100, Duration::from_millis(10)),
            TimerWheelMode::LowPrecisionLongLived => (120, Duration::from_secs(30)),
        };
        self.timer_wheel_size = Some(size);
        self.timer_wheel_tick_duration = Some(duration);
        self
    }

    /// Sets the timer-wheel tick duration.
    pub fn timer_tick_duration(mut self, duration: Duration) -> Self {
        self.timer_wheel_tick_duration = Some(duration);
        self
    }

    /// Sets the timer-wheel slot count.
    pub fn timer_wheel_size(mut self, size: usize) -> Self {
        self.timer_wheel_size = Some(size);
        self
    }

    /// Sets the maintenance chance denominator.
    ///
    /// The value is rounded up to the next power of two. Values above `2^31`
    /// have no larger power of two that fits in a `u32`, so they are stored
    /// as `2^31` instead.
    ///
    /// # Panics
    ///
    /// Panics if `denominator` is zero.
    pub fn maintenance_chance(mut self, denominator: u32) -> Self {
        assert!(
            denominator > 0,
            "maintenance chance denominator cannot be zero"
        );
        // `next_power_of_two` overflows above 2^31: it panics in debug builds
        // and yields 0 in release builds. Saturate at the largest
        // representable power of two so the denominator is never zero.
        self.maintenance_probability_denominator = denominator
            .checked_next_power_of_two()
            .unwrap_or(1_u32 << 31);
        self
    }
}
/// Constructor available for any hasher builder that implements `Default`.
impl<K, V, H> CacheBuilder<K, V, H>
where
    K: Send,
    V: Send,
    H: BuildHasher + Default,
{
    /// Creates a builder with the default configuration.
    ///
    /// The defaults are: no capacity limit (`u64::MAX`), a shard count of
    /// eight times the CPU count rounded up to a power of two, a
    /// default-constructed hasher, the `maintenance_frequency::RESPONSIVE`
    /// denominator, and every optional setting unset.
    pub fn new() -> Self {
        // Computed up front, in the same order the original struct literal
        // evaluated them.
        let default_shards = (num_cpus::get() * 8).max(1).next_power_of_two();
        let hasher = H::default();

        Self {
            capacity: u64::MAX,
            shards: default_shards,
            hasher,
            maintenance_probability_denominator: maintenance_frequency::RESPONSIVE,
            time_to_live: None,
            time_to_idle: None,
            stale_while_revalidate: None,
            janitor_tick_interval: None,
            timer_wheel_tick_duration: None,
            timer_wheel_size: None,
            listener: None,
            policy_factory: None,
            loader: None,
            spawner: None,
            _key_marker: PhantomData,
            _value_marker: PhantomData,
        }
    }
}
impl<K: Send, V: Send> Default for CacheBuilder<K, V, ahash::RandomState> {
fn default() -> Self {
Self::new()
}
}
/// Convenience constructor compiled only when the `rapidhash` feature is on.
#[cfg(feature = "rapidhash")]
impl<K, V> CacheBuilder<K, V, rapidhash::RapidRandomState>
where
    K: Send,
    V: Send,
{
    /// Creates a default builder whose hasher type is
    /// `rapidhash::RapidRandomState`.
    pub fn rapidhash() -> Self {
        CacheBuilder::new()
    }
}
/// Setters and build methods that need the full key, value and hasher bounds.
impl<K, V, H> CacheBuilder<K, V, H>
where
    K: Eq + Hash + Clone + Send + Sync + 'static,
    V: Send + Sync + 'static,
    H: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Replaces the hasher builder.
    pub fn hasher(mut self, hasher: H) -> Self {
        self.hasher = hasher;
        self
    }

    /// Sets the stale-while-revalidate window.
    pub fn stale_while_revalidate(mut self, duration: Duration) -> Self {
        self.stale_while_revalidate = Some(duration);
        self
    }

    /// Validates the configuration and builds a synchronous [`Cache`].
    ///
    /// # Errors
    ///
    /// Returns the error from [`Self::validate`] (zero capacity or zero
    /// shards), or any error from building the shared state, such as
    /// `BuildError::SpawnerRequired`.
    pub fn build(mut self) -> Result<Cache<K, V, H>, BuildError> {
        self.validate()?;
        let shared = self.build_shared_core(None)?;
        Ok(Cache { shared })
    }

    /// Validates the configuration and builds an [`AsyncCache`].
    ///
    /// # Errors
    ///
    /// Same as [`Self::build`].
    pub fn build_async(mut self) -> Result<AsyncCache<K, V, H>, BuildError> {
        self.validate()?;
        let shared = self.build_shared_core(None)?;
        Ok(AsyncCache { shared })
    }

    /// Assembles the shared cache state from the current configuration.
    ///
    /// The spawner, policy factory and loader are moved out of the builder,
    /// so the builder is partially emptied afterwards. With the `serde`
    /// feature enabled, an optional snapshot is restored into the store
    /// before the janitor is started.
    ///
    /// This method does not call [`Self::validate`]; callers do that first.
    ///
    /// # Errors
    ///
    /// Returns `BuildError::SpawnerRequired` when an async loader is
    /// configured without a spawner and the `tokio` feature is disabled.
    pub(crate) fn build_shared_core(
        &mut self,
        #[cfg(feature = "serde")]
        snapshot: Option<CacheSnapshot<K, V>>,
        #[cfg(not(feature = "serde"))]
        _snapshot: Option<()>,
    ) -> Result<Arc<CacheShared<K, V, H>>, BuildError> {
        // An async loader needs a spawner: fall back to the Tokio spawner
        // when that feature is available, otherwise report the error.
        let mut spawner = self.spawner.take();
        if matches!(self.loader, Some(Loader::Async(_))) && spawner.is_none() {
            #[cfg(feature = "tokio")]
            {
                spawner = Some(Arc::new(crate::runtime::TokioSpawner::new()));
            }
            #[cfg(not(feature = "tokio"))]
            {
                return Err(BuildError::SpawnerRequired);
            }
        }

        let has_timer_logic = self.time_to_live.is_some() || self.time_to_idle.is_some();
        let tick_duration = self
            .timer_wheel_tick_duration
            .unwrap_or(Duration::from_secs(1));
        let wheel_size = self.timer_wheel_size.unwrap_or(60);
        let store = Arc::new(ShardedStore::new(
            self.shards,
            self.hasher.clone(),
            wheel_size,
            tick_duration,
            has_timer_logic,
        ));
        let metrics = Arc::new(Metrics::new());

        // Default policy: no policy when unbounded, otherwise TinyLFU with
        // the capacity split evenly across shards.
        let factory = self.policy_factory.take().unwrap_or_else(|| {
            let capacity = self.capacity;
            // Exact integer ceiling division. The previous `f64` round trip
            // lost precision above 2^53 and could saturate the result.
            // `max(1)` guards the division when `shards` is zero.
            let shards = self.shards.max(1) as u64;
            let shard_capacity = capacity / shards + u64::from(capacity % shards != 0);
            Arc::new(move || {
                if capacity == u64::MAX {
                    Box::new(crate::policy::null::NullPolicy)
                } else {
                    Box::new(crate::policy::tinylfu::TinyLfuPolicy::new(shard_capacity))
                }
            })
        });
        // One policy instance per shard.
        let cache_policy: Box<[Arc<dyn CachePolicy<K, V>>]> = (0..self.shards)
            .map(|_| Arc::from(factory()))
            .collect::<Vec<_>>()
            .into_boxed_slice();

        // A notifier is only spawned when a listener was registered.
        let (notifier, notification_sender) = if let Some(listener) = &self.listener {
            let (notifier, sender) = Notifier::spawn(listener.clone());
            (Some(notifier), Some(sender))
        } else {
            (None, None)
        };

        #[cfg(feature = "serde")]
        if let Some(snap) = snapshot {
            let now_duration = time::now_duration();
            let mut total_cost = 0;
            let mut entries_by_shard: Vec<HashMap<K, Arc<crate::entry::CacheEntry<V>>, H>> =
                (0..self.shards)
                    .map(|_| HashMap::with_hasher(self.hasher.clone()))
                    .collect();
            for p_entry in snap.entries {
                // Remaining TTLs are re-anchored to the current time.
                let expires_at = p_entry.ttl_remaining.map(|ttl| now_duration + ttl);
                let entry = crate::entry::CacheEntry::new_with_expiry(
                    p_entry.value,
                    p_entry.cost,
                    expires_at,
                    self.time_to_idle,
                );
                total_cost += p_entry.cost;
                let hash = hash_key(&self.hasher, &p_entry.key);
                // NOTE(review): assumes `ShardedStore` picks shards the same
                // way (hash modulo shard count); confirm against the store.
                let index = hash as usize % self.shards;
                entries_by_shard[index].insert(p_entry.key, Arc::new(entry));
            }
            metrics.current_cost.store(total_cost, Ordering::Relaxed);
            for (i, entries) in entries_by_shard.into_iter().enumerate() {
                if !entries.is_empty() {
                    let mut guard = store.shards[i].map.write();
                    *guard = entries;
                }
            }
        }

        let janitor_context = JanitorContext {
            store: Arc::clone(&store),
            metrics: Arc::clone(&metrics),
            cache_policy: cache_policy.clone(),
            capacity: self.capacity,
            time_to_idle: self.time_to_idle,
            notification_sender: notification_sender.clone(),
        };
        let maintenance_probability_denominator = self.maintenance_probability_denominator;
        // The janitor is only spawned when expiry or a capacity limit is set.
        let janitor = if has_timer_logic || self.capacity != u64::MAX {
            let tick_interval = self.janitor_tick_interval.unwrap_or(Duration::from_secs(1));
            Some(Janitor::spawn(
                janitor_context,
                tick_interval,
                maintenance_probability_denominator,
            ))
        } else {
            None
        };

        // One pending-load table per shard.
        let pending_loads = (0..self.shards)
            .map(|_| crate::sync::HybridMutex::new(Default::default()))
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Ok(Arc::new(CacheShared {
            store,
            metrics,
            cache_policy,
            janitor,
            capacity: self.capacity,
            time_to_live: self.time_to_live,
            time_to_idle: self.time_to_idle,
            stale_while_revalidate: self.stale_while_revalidate,
            notification_sender,
            notifier,
            loader: self.loader.take(),
            pending_loads,
            spawner,
            maintenance_probability_denominator,
        }))
    }

    /// Checks the configuration before building.
    ///
    /// # Errors
    ///
    /// Returns `BuildError::ZeroCapacity` when the capacity is zero and
    /// `BuildError::ZeroShards` when the shard count is zero.
    pub(crate) fn validate(&self) -> Result<(), BuildError> {
        if self.capacity == 0 {
            return Err(BuildError::ZeroCapacity);
        }
        if self.shards == 0 {
            return Err(BuildError::ZeroShards);
        }
        Ok(())
    }
}