#[cfg(test)]
mod test;
use crate::{
bbloom::Bloom,
error::CacheError,
metrics::{MetricType, Metrics},
sketch::CountMinSketch,
};
use parking_lot::Mutex;
use rand::seq::IteratorRandom;
use std::{
collections::{HashMap, hash_map::RandomState},
hash::BuildHasher,
sync::{
Arc,
atomic::{AtomicI64, Ordering},
},
};
/// Default number of tracked entries examined per eviction round.
///
/// Used as the sample size by the `SampledLFU` constructors that do not take
/// one explicitly, and to pre-size the sample buffer in the policy `add` path.
const DEFAULT_SAMPLES: usize = 5;
/// Generates the shared method set of an LFU policy wrapper type.
///
/// `$policy` must be a generic struct `$policy<S>` with an `inner` field whose
/// `lock()` yields a `PolicyInner<S>` (presumably the `Arc<Mutex<_>>` returned by
/// `PolicyInner::with_hasher` — confirm at the expansion site) and a `metrics:
/// Arc<Metrics>` field. The expansion site must have `Arc`, `BuildHasher`,
/// `Metrics` and `MetricType` in scope, since the body names them unqualified.
macro_rules! impl_policy {
    ($policy: ident) => {
        use crate::policy::{AddOutcome, DEFAULT_SAMPLES, PolicyPair};
        impl<S: BuildHasher + Clone + 'static> $policy<S> {
            /// Installs `metrics` on this wrapper and on the inner cost tracker.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn collect_metrics(&mut self, metrics: Arc<Metrics>) {
                self.metrics = metrics.clone();
                self.inner.lock().set_metrics(metrics);
            }

            /// Tries to admit `key` with the given `cost`.
            ///
            /// - `RejectedByCost` if `cost` is negative or above the max cost.
            /// - `UpdatedExisting` if the key is already tracked (its cost is updated).
            /// - `Admitted { victims }` once enough room exists. `victims` lists every
            ///   entry evicted to make room and may be empty.
            /// - `RejectedBySampling { victims }` if, during sampled eviction, the
            ///   incoming key's estimated frequency is lower than the coldest
            ///   candidate. Entries evicted in earlier rounds stay evicted and are
            ///   still reported.
            pub fn add(&self, key: u64, cost: i64) -> AddOutcome {
                let mut inner = self.inner.lock();
                let max_cost = inner.costs.get_max_cost();
                if cost < 0 || cost > max_cost {
                    return AddOutcome::RejectedByCost;
                }
                if inner.costs.update(&key, cost) {
                    return AddOutcome::UpdatedExisting;
                }
                let mut room = inner.costs.room_left(cost);
                if room >= 0 {
                    inner.costs.increment(key, cost);
                    // `cost >= 0` was checked above, so the cast is lossless.
                    self.metrics.add(MetricType::CostAdd, key, cost as u64);
                    return AddOutcome::Admitted {
                        victims: Vec::new(),
                    };
                }
                let inc_hits = inner.admit.estimate(key);
                // The sample buffer is reused across rounds: survivors stay and
                // `fill_sample` only tops it up.
                let mut sample: Vec<PolicyPair> = Vec::with_capacity(DEFAULT_SAMPLES);
                let mut victims = Vec::new();
                while room < 0 {
                    sample = inner.costs.fill_sample(sample);
                    // Coldest sampled entry. `min_by_key` keeps the first of equal
                    // minima, matching a strict `<` scan.
                    let coldest = sample
                        .iter()
                        .enumerate()
                        .map(|(idx, pair)| (idx, inner.admit.estimate(pair.key)))
                        .min_by_key(|&(_, hits)| hits);
                    let min_id = match coldest {
                        Some((idx, min_hits)) if inc_hits >= min_hits => idx,
                        // The incoming key is colder than every candidate, or there
                        // is nothing left to sample: reject it instead of indexing
                        // into an empty sample.
                        _ => {
                            self.metrics.add(MetricType::RejectSets, key, 1);
                            return AddOutcome::RejectedBySampling { victims };
                        }
                    };
                    let victim = sample.swap_remove(min_id);
                    // Only report keys that were actually still tracked; a stale
                    // sample entry must not show up as a victim.
                    if let Some(evicted) = inner.costs.remove(&victim.key) {
                        self.metrics
                            .add(MetricType::CostEvict, victim.key, evicted as u64);
                        self.metrics.add(MetricType::KeyEvict, victim.key, 1);
                        victims.push(victim);
                    }
                    room = inner.costs.room_left(cost);
                }
                inner.costs.increment(key, cost);
                self.metrics.add(MetricType::CostAdd, key, cost as u64);
                AddOutcome::Admitted { victims }
            }

            /// Returns whether `k` is currently tracked by the cost tracker.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn contains(&self, k: &u64) -> bool {
                let inner = self.inner.lock();
                inner.costs.contains(k)
            }

            /// Stops tracking `k`, recording eviction metrics if it was tracked.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn remove(&self, k: &u64) {
                let mut inner = self.inner.lock();
                if let Some(cost) = inner.costs.remove(k) {
                    self.metrics.add(MetricType::CostEvict, *k, cost as u64);
                    self.metrics.add(MetricType::KeyEvict, *k, 1);
                }
            }

            /// Remaining budget: max cost minus the cost currently in use.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn cap(&self) -> i64 {
                let inner = self.inner.lock();
                inner.costs.get_max_cost() - inner.costs.used
            }

            /// Updates the cost of an already-tracked key.
            ///
            /// Returns `false` if the key is not tracked or `cost` is outside
            /// `0..=max_cost`.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn update(&self, k: &u64, cost: i64) -> bool {
                let mut inner = self.inner.lock();
                inner.costs.update(k, cost)
            }

            /// Cost tracked for `k`, or `-1` if `k` is not tracked.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn cost(&self, k: &u64) -> i64 {
                let inner = self.inner.lock();
                inner.costs.key_costs.get(k).map_or(-1, |cost| *cost)
            }

            /// Clears the frequency estimator and all tracked costs.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn clear(&self) {
                let mut inner = self.inner.lock();
                inner.admit.clear();
                inner.costs.clear();
            }

            /// Current cost budget.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn max_cost(&self) -> i64 {
                let inner = self.inner.lock();
                inner.costs.get_max_cost()
            }

            /// Replaces the cost budget. This call does not evict anything.
            #[cfg_attr(not(tarpaulin), inline(always))]
            pub fn update_max_cost(&self, mc: i64) {
                let inner = self.inner.lock();
                inner.costs.update_max_cost(mc)
            }
        }
    };
}
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
mod sync;
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
pub(crate) use sync::LFUPolicy;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
mod r#async;
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
pub(crate) use r#async::AsyncLFUPolicy;
/// Mutable policy state: the frequency estimator and the cost tracker.
///
/// `with_hasher` hands it out wrapped in `Arc<Mutex<_>>`, so both parts are
/// guarded by a single lock.
pub(crate) struct PolicyInner<S = RandomState> {
    /// Frequency estimator consulted for admission and eviction decisions.
    admit: TinyLFU,
    /// Per-key cost accounting and eviction sampling.
    costs: SampledLFU<S>,
}
/// Result of the policy `add` method generated by `impl_policy!`.
pub(crate) enum AddOutcome {
    /// The key was admitted; `victims` are the entries evicted to make room
    /// (empty if there was already room).
    Admitted { victims: Vec<PolicyPair> },
    /// The key was already tracked; only its cost was updated.
    UpdatedExisting,
    /// The cost was negative or larger than the policy's max cost.
    RejectedByCost,
    /// The key lost the frequency comparison during sampled eviction.
    /// `victims` holds entries already evicted in earlier rounds of that call.
    RejectedBySampling { victims: Vec<PolicyPair> },
}
/// A `(key, cost)` pair as tracked by the cost policy.
///
/// Used both for eviction samples and for reporting evicted victims.
#[derive(Copy, Clone, Debug, Default)]
pub(crate) struct PolicyPair {
    /// Tracked key (a `u64`, presumably the key hash — confirm with callers).
    pub(crate) key: u64,
    /// Cost charged for this key.
    pub(crate) cost: i64,
}

impl PolicyPair {
    /// Builds a pair from its key and cost.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn new(k: u64, c: i64) -> Self {
        Self::from((k, c))
    }
}

impl From<(u64, i64)> for PolicyPair {
    /// Converts a `(key, cost)` tuple into a pair.
    fn from((key, cost): (u64, i64)) -> Self {
        Self { key, cost }
    }
}
impl<S: BuildHasher + Clone + 'static> PolicyInner<S> {
    /// Routes the cost tracker's metric reporting to `metrics`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn set_metrics(&mut self, metrics: Arc<Metrics>) {
        let costs = &mut self.costs;
        costs.metrics = metrics;
    }

    /// Builds fresh policy state and wraps it for shared, locked access.
    ///
    /// `ctrs` sizes the TinyLFU estimator; `max_cost` and `hasher` configure the
    /// cost tracker.
    ///
    /// # Errors
    /// Propagates the error returned by `TinyLFU::new`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn with_hasher(ctrs: usize, max_cost: i64, hasher: S) -> Result<Arc<Mutex<Self>>, CacheError> {
        let admit = TinyLFU::new(ctrs)?;
        let costs = SampledLFU::with_hasher(max_cost, hasher);
        let guarded = Mutex::new(Self { admit, costs });
        Ok(Arc::new(guarded))
    }
}
/// Cost accounting over tracked keys plus random sampling of them, used to
/// pick eviction candidates.
pub(crate) struct SampledLFU<S = RandomState> {
    /// Size that `fill_sample` tops a sample up to.
    samples: usize,
    /// Total cost budget; atomic so it can be replaced through `&self`.
    max_cost: AtomicI64,
    /// Running total of tracked costs, adjusted by increment/remove/update/clear.
    used: i64,
    /// Cost of every tracked key.
    key_costs: HashMap<u64, i64, S>,
    /// Metrics sink used by `update` to report cost changes.
    metrics: Arc<Metrics>,
}
impl SampledLFU {
    /// Creates an empty tracker with `DEFAULT_SAMPLES` samples per round, the
    /// default hasher, and a `Metrics::new()` sink.
    pub fn new(max_cost: i64) -> Self {
        Self::with_samples(max_cost, DEFAULT_SAMPLES)
    }

    /// Creates an empty tracker with an explicit sample size, the default
    /// hasher, and a `Metrics::new()` sink.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn with_samples(max_cost: i64, samples: usize) -> Self {
        let budget = AtomicI64::new(max_cost);
        Self {
            samples,
            max_cost: budget,
            used: 0,
            key_costs: HashMap::new(),
            metrics: Arc::new(Metrics::new()),
        }
    }
}
impl<S: BuildHasher + Clone + 'static> SampledLFU<S> {
    /// Creates an empty tracker that samples `DEFAULT_SAMPLES` entries per
    /// round, hashes keys with `hasher`, and reports to `Metrics::Noop`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn with_hasher(max_cost: i64, hasher: S) -> Self {
        Self::with_samples_and_hasher(max_cost, DEFAULT_SAMPLES, hasher)
    }

    /// Creates an empty tracker with an explicit sample size, hashing keys with
    /// `hasher` and reporting to `Metrics::Noop`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn with_samples_and_hasher(max_cost: i64, samples: usize, hasher: S) -> Self {
        Self {
            samples,
            max_cost: AtomicI64::new(max_cost),
            used: 0,
            key_costs: HashMap::with_hasher(hasher),
            metrics: Arc::new(Metrics::Noop),
        }
    }

    /// Replaces the cost budget. Already-tracked entries are not evicted here.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn update_max_cost(&self, mc: i64) {
        self.max_cost.store(mc, Ordering::SeqCst);
    }

    /// Current cost budget.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn get_max_cost(&self) -> i64 {
        self.max_cost.load(Ordering::SeqCst)
    }

    /// Budget remaining if `cost` were added now; negative means something must
    /// be evicted first.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn room_left(&self, cost: i64) -> i64 {
        self.get_max_cost() - (self.used + cost)
    }

    /// Tops `pairs` up to `self.samples` entries with randomly chosen tracked keys.
    ///
    /// Keys already present in `pairs` are never added again. A duplicate would
    /// survive the eviction of its twin and could later be reported as a victim
    /// that is no longer tracked. Returns fewer than `self.samples` entries when
    /// not enough distinct keys are tracked.
    pub fn fill_sample(&mut self, mut pairs: Vec<PolicyPair>) -> Vec<PolicyPair> {
        if pairs.len() >= self.samples {
            return pairs;
        }
        let need = self.samples - pairs.len();
        let mut rng = rand::rng();
        // `pairs` is at most `self.samples` long, so the linear `any` scan is cheap.
        let fresh = self
            .key_costs
            .iter()
            .filter(|(k, _)| !pairs.iter().any(|p| p.key == **k))
            .map(|(k, v)| PolicyPair::new(*k, *v))
            .sample(&mut rng, need);
        pairs.extend(fresh);
        pairs
    }

    /// Starts tracking `key` with `cost` and charges it to `used`.
    ///
    /// If `key` was already tracked, its previous cost is released first so
    /// `used` keeps matching the tracked costs.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn increment(&mut self, key: u64, cost: i64) {
        if let Some(prev) = self.key_costs.insert(key, cost) {
            self.used -= prev;
        }
        self.used += cost;
    }

    /// Stops tracking `kh`, returning its cost (and releasing it from `used`)
    /// if it was tracked.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn remove(&mut self, kh: &u64) -> Option<i64> {
        self.key_costs.remove(kh).inspect(|&cost| {
            self.used -= cost;
        })
    }

    /// Whether `k` is currently tracked.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn contains(&self, k: &u64) -> bool {
        self.key_costs.contains_key(k)
    }

    /// Forgets all tracked keys and resets `used`. The budget, sample size and
    /// metrics are unchanged.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn clear(&mut self) {
        self.used = 0;
        self.key_costs.clear();
    }

    /// Changes the cost of an already-tracked key.
    ///
    /// Returns `false` (changing nothing) if `cost` is outside `0..=max_cost` or
    /// `k` is not tracked. When `self.metrics.is_op()` is true (presumably:
    /// metrics enabled), records a `KeyUpdate` and the cost delta as
    /// `CostAdd`/`CostEvict`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn update(&mut self, k: &u64, cost: i64) -> bool {
        if cost < 0 || cost > self.get_max_cost() {
            return false;
        }
        match self.key_costs.get_mut(k) {
            None => false,
            Some(prev) => {
                let prev_val = *prev;
                let k = *k;
                if self.metrics.is_op() {
                    self.metrics.add(MetricType::KeyUpdate, k, 1);
                    match prev_val.cmp(&cost) {
                        std::cmp::Ordering::Less => {
                            let diff = (cost - prev_val) as u64;
                            self.metrics.add(MetricType::CostAdd, k, diff);
                        }
                        std::cmp::Ordering::Equal => {}
                        std::cmp::Ordering::Greater => {
                            let diff = (prev_val - cost) as u64;
                            self.metrics.add(MetricType::CostEvict, k, diff);
                        }
                    }
                }
                self.used += cost - prev_val;
                *prev = cost;
                true
            }
        }
    }
}
/// Frequency estimator: a count-min sketch combined with a bloom-filter
/// "doorkeeper". Both are reset after `samples` increments.
pub(crate) struct TinyLFU {
    /// Count-min sketch holding approximate key frequencies.
    ctr: CountMinSketch,
    /// Bloom filter consulted on every increment and estimate.
    doorkeeper: Bloom,
    /// Number of increments after which the estimator is reset.
    samples: usize,
    /// Increments seen since the last reset or clear.
    w: usize,
}
impl TinyLFU {
    /// Builds an estimator sized for `num_ctrs` counters.
    ///
    /// The doorkeeper is created with `Bloom::new(num_ctrs, 0.01)`, and the
    /// estimator resets itself after `num_ctrs` increments.
    ///
    /// # Errors
    /// Propagates the error returned by `CountMinSketch::new`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn new(num_ctrs: usize) -> Result<Self, CacheError> {
        let ctr = CountMinSketch::new(num_ctrs as u64)?;
        let doorkeeper = Bloom::new(num_ctrs, 0.01);
        Ok(Self {
            ctr,
            doorkeeper,
            samples: num_ctrs,
            w: 0,
        })
    }

    /// Estimated frequency of `kh`: the sketch estimate plus one if the
    /// doorkeeper contains `kh`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn estimate(&self, kh: u64) -> i64 {
        let sketched = self.ctr.estimate(kh);
        sketched + i64::from(self.doorkeeper.contains(kh))
    }

    /// Records one access for each key in `khs`, in order.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn increments(&mut self, khs: Vec<u64>) {
        for kh in khs {
            self.increment(kh);
        }
    }

    /// Records one access for `kh`.
    ///
    /// The sketch counter is bumped only when `contains_or_add` returns false.
    /// NOTE(review): whether that means a first or a repeat sighting depends on
    /// `Bloom::contains_or_add`'s return convention — confirm there.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn increment(&mut self, kh: u64) {
        let door_flag = self.doorkeeper.contains_or_add(kh);
        if !door_flag {
            self.ctr.increment(kh);
        }
        self.try_reset();
    }

    /// Counts one increment and resets the estimator once `samples` is reached.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn try_reset(&mut self) {
        self.w += 1;
        if self.w < self.samples {
            return;
        }
        self.reset();
    }

    /// Zeroes the increment counter and resets the doorkeeper and sketch.
    #[cfg_attr(not(tarpaulin), inline(always))]
    fn reset(&mut self) {
        self.w = 0;
        self.doorkeeper.reset();
        self.ctr.reset();
    }

    /// Zeroes the increment counter and clears the doorkeeper and sketch.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn clear(&mut self) {
        self.w = 0;
        self.doorkeeper.clear();
        self.ctr.clear();
    }

    /// Whether the doorkeeper currently contains `kh`.
    #[cfg_attr(not(tarpaulin), inline(always))]
    pub fn contains(&self, kh: u64) -> bool {
        self.doorkeeper.contains(kh)
    }
}