use crate::{BufferCache, CacheEntry};
use feldera_types::config::dev_tweaks::BufferCacheStrategy;
use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
use std::any::Any;
use std::hash::{BuildHasher, Hash, RandomState};
/// Per-shard item estimate used when sizing the backend.
///
/// `minimum_estimated_items` multiplies this by the shard count to produce the
/// `estimated_items_capacity` hint handed to the `quick_cache` options builder.
// NOTE(review): presumably chosen so the backend keeps the requested shard
// count instead of shrinking it for small item estimates — confirm against the
// quick_cache version in use.
const MIN_ESTIMATED_ITEMS_PER_SHARD: usize = 32;
/// Stateless weighter plugged into the `quick_cache` backend.
///
/// Its `Weighter` implementation reports each value's `cost()` as the entry
/// weight.
#[derive(Clone, Copy, Default)]
struct CacheEntryWeighter;
/// Weighs an entry by the cost reported by its value; the key is ignored.
impl<K, V: CacheEntry> Weighter<K, V> for CacheEntryWeighter {
    /// Returns `value.cost()` converted to `u64`.
    fn weight(&self, _key: &K, value: &V) -> u64 {
        let cost = value.cost();
        cost as u64
    }
}
/// Weight-bounded, sharded cache built on top of `quick_cache`.
///
/// Entries are weighed by [`CacheEntryWeighter`]; `S` is the hasher builder
/// passed through to the backend (defaults to [`RandomState`]).
pub struct S3FifoCache<K, V, S = RandomState> {
    /// The underlying `quick_cache` store that holds all entries.
    cache: QuickCache<K, V, CacheEntryWeighter, S>,
    /// Clone of the hasher builder given to `cache`, kept only in test builds
    /// so `shard_index` can hash keys itself.
    #[cfg(test)]
    hash_builder: S,
}
impl<K, V, S> S3FifoCache<K, V, S> {
    /// Shard count used by `new` when the caller does not pick one.
    ///
    /// A power of two, as `with_hasher` requires of every shard count.
    pub const DEFAULT_SHARDS: usize = 256;
}
/// Constructors for caches that hash keys with a freshly created [`RandomState`].
impl<K, V> S3FifoCache<K, V, RandomState>
where
    K: Eq + Hash + Clone,
    V: CacheEntry + Clone,
{
    /// Creates a cache with `total_capacity_bytes` of weight capacity and
    /// [`Self::DEFAULT_SHARDS`] shards.
    pub fn new(total_capacity_bytes: usize) -> Self {
        Self::with_shards(total_capacity_bytes, Self::DEFAULT_SHARDS)
    }

    /// Creates a cache with `total_capacity_bytes` of weight capacity split
    /// over `num_shards` shards.
    ///
    /// # Panics
    ///
    /// Panics if `num_shards` is zero or not a power of two (checked in
    /// `with_hasher`).
    pub fn with_shards(total_capacity_bytes: usize, num_shards: usize) -> Self {
        let hash_builder = RandomState::new();
        Self::with_hasher(total_capacity_bytes, num_shards, hash_builder)
    }
}
// This impl offers `len` but no `is_empty`, so the corresponding lint is silenced.
#[allow(clippy::len_without_is_empty)]
impl<K, V, S> S3FifoCache<K, V, S>
where
    K: Eq + Hash + Clone,
    V: CacheEntry + Clone,
    S: BuildHasher + Clone,
{
    /// Creates a cache using `hash_builder` to hash keys.
    ///
    /// `total_capacity_bytes` becomes the backend's weight capacity and
    /// `num_shards` the requested shard count. The estimated item capacity is
    /// derived from the shard count via `minimum_estimated_items`.
    ///
    /// # Panics
    ///
    /// Panics if `num_shards` is zero or not a power of two, or if the
    /// `quick_cache` options builder rejects the configuration.
    pub fn with_hasher(total_capacity_bytes: usize, num_shards: usize, hash_builder: S) -> Self {
        assert!(num_shards > 0, "num_shards must be > 0");
        assert!(
            num_shards.is_power_of_two(),
            "num_shards must be a power of two"
        );

        let estimated_items = minimum_estimated_items(num_shards);
        let weight_capacity = total_capacity_bytes as u64;
        let options = OptionsBuilder::new()
            .shards(num_shards)
            .estimated_items_capacity(estimated_items)
            .weight_capacity(weight_capacity)
            .build()
            .expect("valid quick_cache options");

        // Test builds keep their own copy of the hasher so `shard_index` can
        // hash keys; it has to be cloned before the backend takes ownership.
        #[cfg(test)]
        let retained_hasher = hash_builder.clone();

        let cache = QuickCache::with_options(
            options,
            CacheEntryWeighter,
            hash_builder,
            Default::default(),
        );

        Self {
            cache,
            #[cfg(test)]
            hash_builder: retained_hasher,
        }
    }

    /// Inserts `value` under `key` by forwarding to the backend.
    pub fn insert(&self, key: K, value: V) {
        self.cache.insert(key, value);
    }

    /// Looks up `key` in the backend, returning the value if it is present.
    pub fn get(&self, key: &K) -> Option<V> {
        self.cache.get(key)
    }

    /// Removes `key` and returns the value it mapped to, if any.
    pub fn remove(&self, key: &K) -> Option<V> {
        let (_removed_key, value) = self.cache.remove(key)?;
        Some(value)
    }

    /// Removes every entry whose key satisfies `predicate`.
    pub fn remove_if<F>(&self, predicate: F)
    where
        F: Fn(&K) -> bool,
    {
        // The backend's `retain` keeps entries for which the closure returns
        // true, hence the negation.
        self.cache.retain(|key, _| !predicate(key))
    }

    /// Reports whether the backend currently holds `key`.
    pub fn contains_key(&self, key: &K) -> bool {
        self.cache.contains_key(key)
    }

    /// Number of entries reported by the backend.
    pub fn len(&self) -> usize {
        self.cache.len()
    }

    /// Total weight reported by the backend, as `usize`.
    pub fn total_charge(&self) -> usize {
        let weight = self.cache.weight();
        weight as usize
    }

    /// Capacity reported by the backend, as `usize`.
    pub fn total_capacity(&self) -> usize {
        let capacity = self.cache.capacity();
        capacity as usize
    }

    /// Number of shards reported by the backend.
    pub fn shard_count(&self) -> usize {
        self.cache.num_shards()
    }

    /// Returns `(used, capacity)` for shard `idx`.
    ///
    /// `used` is the summed cost of all entries whose key `shard_index` maps
    /// to `idx`; `capacity` is the backend's per-shard capacity.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is not smaller than the shard count.
    #[cfg(test)]
    pub fn shard_usage(&self, idx: usize) -> (usize, usize) {
        assert!(idx < self.shard_count(), "shard index out of bounds");

        let mut used = 0usize;
        for (key, value) in self.cache.iter() {
            if self.shard_index(&key) == idx {
                used += value.cost();
            }
        }

        let capacity = self.cache.shard_capacity() as usize;
        (used, capacity)
    }

    /// Walks every entry and asserts that per-shard and global accounting
    /// agree with what the cache reports and stay within capacity.
    #[cfg(test)]
    pub(crate) fn validate_invariants(&self) {
        let shard_count = self.shard_count();
        let mut charge_by_shard = vec![0usize; shard_count];
        let mut resident = 0usize;
        let mut charged = 0usize;

        for (key, value) in self.cache.iter() {
            let shard = self.shard_index(&key);
            assert!(shard < shard_count, "invalid backend shard index");
            let cost = value.cost();
            charge_by_shard[shard] += cost;
            charged += cost;
            resident += 1;
        }

        for (idx, expected) in charge_by_shard.iter().copied().enumerate() {
            let (reported_used, reported_capacity) = self.shard_usage(idx);
            assert_eq!(reported_used, expected, "per-shard charge mismatch");
            assert!(
                expected <= reported_capacity,
                "used {} exceeds capacity {}",
                expected,
                reported_capacity
            );
        }

        assert_eq!(resident, self.len(), "global resident count mismatch");
        assert_eq!(charged, self.total_charge(), "global charge mismatch");
        assert!(
            charged <= self.total_capacity(),
            "total charge exceeds backend capacity"
        );
    }

    /// Computes the shard index for `key` from its hash.
    ///
    /// The hash is rotated right by half of `usize::BITS` and masked with
    /// `shard_count - 1`.
    // NOTE(review): presumably mirrors the backend's own shard selection —
    // confirm against the quick_cache version in use.
    #[cfg(test)]
    pub(crate) fn shard_index(&self, key: &K) -> usize {
        let mask = (self.shard_count() - 1) as u64;
        let hash = self.hash_builder.hash_one(key);
        let rotated = hash.rotate_right(usize::BITS / 2);
        (rotated & mask) as usize
    }
}
/// Returns the item-capacity estimate for `num_shards` shards:
/// `MIN_ESTIMATED_ITEMS_PER_SHARD` items per shard, clamped to `usize::MAX`
/// if the product would overflow.
fn minimum_estimated_items(num_shards: usize) -> usize {
    num_shards
        .checked_mul(MIN_ESTIMATED_ITEMS_PER_SHARD)
        .unwrap_or(usize::MAX)
}
/// Exposes [`S3FifoCache`] through the `BufferCache` trait.
///
/// Apart from `as_any` and `strategy`, every method forwards to the inherent
/// method of the same name.
impl<K, V, S> BufferCache<K, V> for S3FifoCache<K, V, S>
where
    K: Eq + Hash + Clone + Send + Sync + 'static,
    V: CacheEntry + Clone + Send + Sync + 'static,
    S: BuildHasher + Clone + Send + Sync + 'static,
{
    /// Returns `self` as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Identifies this backend as the `S3Fifo` strategy.
    fn strategy(&self) -> BufferCacheStrategy {
        BufferCacheStrategy::S3Fifo
    }

    /// Forwards to the inherent `insert`.
    fn insert(&self, key: K, value: V) {
        Self::insert(self, key, value);
    }

    /// Forwards to the inherent `get`, which takes the key by reference.
    fn get(&self, key: K) -> Option<V> {
        Self::get(self, &key)
    }

    /// Forwards to the inherent `remove`.
    fn remove(&self, key: &K) -> Option<V> {
        Self::remove(self, key)
    }

    /// Forwards to the inherent `remove_if`, wrapping the trait object in a closure.
    fn remove_if(&self, predicate: &dyn Fn(&K) -> bool) {
        Self::remove_if(self, |candidate| predicate(candidate))
    }

    /// Forwards to the inherent `contains_key`.
    fn contains_key(&self, key: &K) -> bool {
        Self::contains_key(self, key)
    }

    /// Forwards to the inherent `len`.
    fn len(&self) -> usize {
        Self::len(self)
    }

    /// Forwards to the inherent `total_charge`.
    fn total_charge(&self) -> usize {
        Self::total_charge(self)
    }

    /// Forwards to the inherent `total_capacity`.
    fn total_capacity(&self) -> usize {
        Self::total_capacity(self)
    }

    /// Forwards to the inherent `shard_count`.
    fn shard_count(&self) -> usize {
        Self::shard_count(self)
    }

    /// Forwards to the inherent, test-only `shard_usage`.
    #[cfg(test)]
    fn shard_usage(&self, idx: usize) -> (usize, usize) {
        Self::shard_usage(self, idx)
    }
}