use crate::ThreadType;
use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
use enum_map::{Enum, EnumMap};
use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
use std::fmt::Debug;
use std::hash::{BuildHasher, Hash, RandomState};
use std::marker::PhantomData;
use tracing::warn;
/// Configures and constructs buffer caches (`SharedBufferCache`) for worker
/// threads, selecting the eviction strategy, capacity allocation policy,
/// shard count, and hasher before building.
pub struct BufferCacheBuilder<K, V, S = RandomState> {
    /// Which cache implementation to build (`Lru` or `S3Fifo`).
    strategy: BufferCacheStrategy,
    /// Optional shard-count hint for the S3-FIFO cache; `None` or `Some(0)`
    /// selects `S3FifoCache::DEFAULT_SHARDS` (see `normalize_sharded_cache_shards`).
    max_buckets: Option<usize>,
    /// How total capacity is split: per thread, shared per worker pair,
    /// or one global cache.
    allocation_strategy: BufferCacheAllocationStrategy,
    /// Hasher cloned into every cache instance built.
    hash_builder: S,
    /// Ties the `K`/`V` parameters to the builder without owning values of
    /// those types; `fn(K) -> V` keeps the builder `Send`/`Sync` regardless
    /// of `K`/`V`.
    marker: PhantomData<fn(K) -> V>,
}
impl<K, V> BufferCacheBuilder<K, V, RandomState> {
pub fn new() -> Self {
Self {
strategy: BufferCacheStrategy::default(),
max_buckets: None,
allocation_strategy: BufferCacheAllocationStrategy::default(),
hash_builder: RandomState::new(),
marker: PhantomData,
}
}
}
impl<K, V> Default for BufferCacheBuilder<K, V, RandomState> {
fn default() -> Self {
Self::new()
}
}
impl<K, V, S> BufferCacheBuilder<K, V, S> {
    /// Selects the cache eviction strategy to build.
    pub fn with_buffer_cache_strategy(self, strategy: BufferCacheStrategy) -> Self {
        Self { strategy, ..self }
    }

    /// Overrides the shard count used by the sharded (S3-FIFO) cache;
    /// `None` keeps the implementation default.
    pub fn with_buffer_max_buckets(self, max_buckets: Option<usize>) -> Self {
        Self { max_buckets, ..self }
    }

    /// Selects how total capacity is distributed across threads and
    /// worker pairs.
    pub fn with_buffer_cache_allocation_strategy(
        self,
        allocation_strategy: BufferCacheAllocationStrategy,
    ) -> Self {
        Self {
            allocation_strategy,
            ..self
        }
    }

    /// Replaces the hash builder, changing the builder's hasher type
    /// parameter from `S` to `NewS`; all other settings carry over.
    pub fn with_hash_builder<NewS>(self, hash_builder: NewS) -> BufferCacheBuilder<K, V, NewS> {
        let Self {
            strategy,
            max_buckets,
            allocation_strategy,
            ..
        } = self;
        BufferCacheBuilder {
            strategy,
            max_buckets,
            allocation_strategy,
            hash_builder,
            marker: PhantomData,
        }
    }
}
impl<K, V, S> BufferCacheBuilder<K, V, S>
where
K: Eq + Hash + Ord + Clone + Debug + Send + Sync + 'static,
V: CacheEntry + Clone + Send + Sync + 'static,
S: BuildHasher + Clone + Send + Sync + 'static,
{
pub fn build(
&self,
worker_pairs: usize,
total_capacity_bytes: usize,
) -> Vec<EnumMap<ThreadType, SharedBufferCache<K, V>>> {
if worker_pairs == 0 {
return Vec::new();
}
let per_thread_capacity = total_capacity_bytes / worker_pairs / ThreadType::LENGTH;
let per_pair_capacity = total_capacity_bytes / worker_pairs;
match self.strategy {
BufferCacheStrategy::Lru => {
if self.allocation_strategy == BufferCacheAllocationStrategy::Global {
warn!(
"unsupported buffer cache allocation strategy {:?} set in dev_tweaks for LRU cache, falling back to `per_thread`",
self.allocation_strategy
);
}
(0..worker_pairs)
.map(|_| self.build_thread_slots(per_thread_capacity))
.collect()
}
BufferCacheStrategy::S3Fifo => match self.allocation_strategy {
BufferCacheAllocationStrategy::PerThread => (0..worker_pairs)
.map(|_| self.build_thread_slots(per_thread_capacity))
.collect(),
BufferCacheAllocationStrategy::SharedPerWorkerPair => (0..worker_pairs)
.map(|_| Self::shared_thread_slots(self.build_s3_fifo(per_pair_capacity)))
.collect(),
BufferCacheAllocationStrategy::Global => {
let cache = self.build_s3_fifo(total_capacity_bytes);
(0..worker_pairs)
.map(|_| Self::shared_thread_slots(cache.clone()))
.collect()
}
},
}
}
pub fn build_single(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
match self.strategy {
BufferCacheStrategy::Lru => self.build_lru(capacity_bytes),
BufferCacheStrategy::S3Fifo => self.build_s3_fifo(capacity_bytes),
}
}
fn build_lru(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
let cache: SharedBufferCache<K, V> = std::sync::Arc::new(LruCache::with_hasher(
capacity_bytes,
self.hash_builder.clone(),
));
cache
}
fn build_s3_fifo(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
let shards = normalize_sharded_cache_shards(self.max_buckets);
let cache: SharedBufferCache<K, V> = std::sync::Arc::new(S3FifoCache::with_hasher(
capacity_bytes,
shards,
self.hash_builder.clone(),
));
cache
}
fn build_thread_slots(
&self,
capacity_bytes: usize,
) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
EnumMap::from_fn(|_| self.build_single(capacity_bytes))
}
fn shared_thread_slots(
cache: SharedBufferCache<K, V>,
) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
EnumMap::from_fn(|_| cache.clone())
}
}
fn normalize_sharded_cache_shards(max_buckets: Option<usize>) -> usize {
match max_buckets {
None | Some(0) => S3FifoCache::<(), ()>::DEFAULT_SHARDS,
Some(buckets) => buckets
.checked_next_power_of_two()
.unwrap_or(S3FifoCache::<(), ()>::DEFAULT_SHARDS),
}
}