feldera_buffer_cache/
builder.rs1use crate::ThreadType;
2use crate::{CacheEntry, LruCache, S3FifoCache, SharedBufferCache};
3use enum_map::{Enum, EnumMap};
4use feldera_types::config::dev_tweaks::{BufferCacheAllocationStrategy, BufferCacheStrategy};
5use std::fmt::Debug;
6use std::hash::{BuildHasher, Hash, RandomState};
7use std::marker::PhantomData;
8use tracing::warn;
9
10pub struct BufferCacheBuilder<K, V, S = RandomState> {
12 strategy: BufferCacheStrategy,
14 max_buckets: Option<usize>,
16 allocation_strategy: BufferCacheAllocationStrategy,
18 hash_builder: S,
20 marker: PhantomData<fn(K) -> V>,
22}
23
24impl<K, V> BufferCacheBuilder<K, V, RandomState> {
25 pub fn new() -> Self {
27 Self {
28 strategy: BufferCacheStrategy::default(),
29 max_buckets: None,
30 allocation_strategy: BufferCacheAllocationStrategy::default(),
31 hash_builder: RandomState::new(),
32 marker: PhantomData,
33 }
34 }
35}
36
37impl<K, V> Default for BufferCacheBuilder<K, V, RandomState> {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl<K, V, S> BufferCacheBuilder<K, V, S> {
44 pub fn with_buffer_cache_strategy(mut self, strategy: BufferCacheStrategy) -> Self {
46 self.strategy = strategy;
47 self
48 }
49
50 pub fn with_buffer_max_buckets(mut self, max_buckets: Option<usize>) -> Self {
52 self.max_buckets = max_buckets;
53 self
54 }
55
56 pub fn with_buffer_cache_allocation_strategy(
58 mut self,
59 allocation_strategy: BufferCacheAllocationStrategy,
60 ) -> Self {
61 self.allocation_strategy = allocation_strategy;
62 self
63 }
64
65 pub fn with_hash_builder<NewS>(self, hash_builder: NewS) -> BufferCacheBuilder<K, V, NewS> {
67 BufferCacheBuilder {
68 strategy: self.strategy,
69 max_buckets: self.max_buckets,
70 allocation_strategy: self.allocation_strategy,
71 hash_builder,
72 marker: PhantomData,
73 }
74 }
75}
76
77impl<K, V, S> BufferCacheBuilder<K, V, S>
78where
79 K: Eq + Hash + Ord + Clone + Debug + Send + Sync + 'static,
80 V: CacheEntry + Clone + Send + Sync + 'static,
81 S: BuildHasher + Clone + Send + Sync + 'static,
82{
83 pub fn build(
85 &self,
86 worker_pairs: usize,
87 total_capacity_bytes: usize,
88 ) -> Vec<EnumMap<ThreadType, SharedBufferCache<K, V>>> {
89 if worker_pairs == 0 {
90 return Vec::new();
91 }
92
93 let per_thread_capacity = total_capacity_bytes / worker_pairs / ThreadType::LENGTH;
94 let per_pair_capacity = total_capacity_bytes / worker_pairs;
95
96 match self.strategy {
97 BufferCacheStrategy::Lru => {
98 if self.allocation_strategy == BufferCacheAllocationStrategy::Global {
99 warn!(
100 "unsupported buffer cache allocation strategy {:?} set in dev_tweaks for LRU cache, falling back to `per_thread`",
101 self.allocation_strategy
102 );
103 }
104 (0..worker_pairs)
105 .map(|_| self.build_thread_slots(per_thread_capacity))
106 .collect()
107 }
108 BufferCacheStrategy::S3Fifo => match self.allocation_strategy {
109 BufferCacheAllocationStrategy::PerThread => (0..worker_pairs)
110 .map(|_| self.build_thread_slots(per_thread_capacity))
111 .collect(),
112 BufferCacheAllocationStrategy::SharedPerWorkerPair => (0..worker_pairs)
113 .map(|_| Self::shared_thread_slots(self.build_s3_fifo(per_pair_capacity)))
114 .collect(),
115 BufferCacheAllocationStrategy::Global => {
116 let cache = self.build_s3_fifo(total_capacity_bytes);
117 (0..worker_pairs)
118 .map(|_| Self::shared_thread_slots(cache.clone()))
119 .collect()
120 }
121 },
122 }
123 }
124
125 pub fn build_single(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
127 match self.strategy {
128 BufferCacheStrategy::Lru => self.build_lru(capacity_bytes),
129 BufferCacheStrategy::S3Fifo => self.build_s3_fifo(capacity_bytes),
130 }
131 }
132
133 fn build_lru(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
135 let cache: SharedBufferCache<K, V> = std::sync::Arc::new(LruCache::with_hasher(
136 capacity_bytes,
137 self.hash_builder.clone(),
138 ));
139 cache
140 }
141
142 fn build_s3_fifo(&self, capacity_bytes: usize) -> SharedBufferCache<K, V> {
144 let shards = normalize_sharded_cache_shards(self.max_buckets);
145 let cache: SharedBufferCache<K, V> = std::sync::Arc::new(S3FifoCache::with_hasher(
146 capacity_bytes,
147 shards,
148 self.hash_builder.clone(),
149 ));
150 cache
151 }
152
153 fn build_thread_slots(
155 &self,
156 capacity_bytes: usize,
157 ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
158 EnumMap::from_fn(|_| self.build_single(capacity_bytes))
159 }
160
161 fn shared_thread_slots(
163 cache: SharedBufferCache<K, V>,
164 ) -> EnumMap<ThreadType, SharedBufferCache<K, V>> {
165 EnumMap::from_fn(|_| cache.clone())
166 }
167}
168
169fn normalize_sharded_cache_shards(max_buckets: Option<usize>) -> usize {
172 match max_buckets {
173 None | Some(0) => S3FifoCache::<(), ()>::DEFAULT_SHARDS,
174 Some(buckets) => buckets
175 .checked_next_power_of_two()
176 .unwrap_or(S3FifoCache::<(), ()>::DEFAULT_SHARDS),
177 }
178}