1use crate::{BufferCache, CacheEntry};
2use feldera_types::config::dev_tweaks::BufferCacheStrategy;
3use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
4use std::any::Any;
5use std::hash::{BuildHasher, Hash, RandomState};
6
/// Per-shard item estimate used when sizing the backend cache.
///
/// `minimum_estimated_items` multiplies this value by the shard count to
/// produce the estimated item capacity handed to the backend.
// NOTE(review): presumably chosen so the backend keeps the requested number of
// shards instead of shrinking it for small caches — confirm against quick_cache.
const MIN_ESTIMATED_ITEMS_PER_SHARD: usize = 32;
13
/// Stateless weighter handed to the backend cache; its `Weighter` impl charges
/// each entry whatever the value's `cost()` reports.
#[derive(Default, Clone, Copy)]
struct CacheEntryWeighter;
17
18impl<K, V> Weighter<K, V> for CacheEntryWeighter
19where
20 V: CacheEntry,
21{
22 fn weight(&self, _key: &K, value: &V) -> u64 {
23 value.cost() as u64
24 }
25}
26
27pub struct S3FifoCache<K, V, S = RandomState> {
30 cache: QuickCache<K, V, CacheEntryWeighter, S>,
32 #[cfg(test)]
35 hash_builder: S,
36}
37
38impl<K, V, S> S3FifoCache<K, V, S> {
39 pub const DEFAULT_SHARDS: usize = 256;
41}
42
43impl<K, V> S3FifoCache<K, V, RandomState>
44where
45 K: Eq + Hash + Clone,
46 V: CacheEntry + Clone,
47{
48 pub fn new(total_capacity_bytes: usize) -> Self {
50 Self::with_hasher(
51 total_capacity_bytes,
52 S3FifoCache::<K, V>::DEFAULT_SHARDS,
53 RandomState::new(),
54 )
55 }
56
57 pub fn with_shards(total_capacity_bytes: usize, num_shards: usize) -> Self {
63 Self::with_hasher(total_capacity_bytes, num_shards, RandomState::new())
64 }
65}
66
67#[allow(clippy::len_without_is_empty)]
69impl<K, V, S> S3FifoCache<K, V, S>
70where
71 K: Eq + Hash + Clone,
72 V: CacheEntry + Clone,
73 S: BuildHasher + Clone,
74{
75 pub fn with_hasher(total_capacity_bytes: usize, num_shards: usize, hash_builder: S) -> Self {
85 assert!(num_shards > 0, "num_shards must be > 0");
86 assert!(
87 num_shards.is_power_of_two(),
88 "num_shards must be a power of two"
89 );
90
91 let options = OptionsBuilder::new()
92 .shards(num_shards)
93 .estimated_items_capacity(minimum_estimated_items(num_shards))
94 .weight_capacity(total_capacity_bytes as u64)
95 .build()
96 .expect("valid quick_cache options");
97
98 Self {
99 #[cfg(test)]
100 hash_builder: hash_builder.clone(),
101 cache: QuickCache::with_options(
102 options,
103 CacheEntryWeighter,
104 hash_builder,
105 Default::default(),
106 ),
107 }
108 }
109
110 pub fn insert(&self, key: K, value: V) {
112 self.cache.insert(key, value);
113 }
114
115 pub fn get(&self, key: &K) -> Option<V> {
117 self.cache.get(key)
118 }
119
120 pub fn remove(&self, key: &K) -> Option<V> {
122 self.cache.remove(key).map(|(_, value)| value)
123 }
124
125 pub fn remove_if<F>(&self, predicate: F)
127 where
128 F: Fn(&K) -> bool,
129 {
130 self.cache.retain(|key, _value| !predicate(key))
131 }
132
133 pub fn contains_key(&self, key: &K) -> bool {
135 self.cache.contains_key(key)
136 }
137
138 pub fn len(&self) -> usize {
140 self.cache.len()
141 }
142
143 pub fn total_charge(&self) -> usize {
145 self.cache.weight() as usize
146 }
147
148 pub fn total_capacity(&self) -> usize {
153 self.cache.capacity() as usize
154 }
155
156 pub fn shard_count(&self) -> usize {
158 self.cache.num_shards()
159 }
160
161 #[cfg(test)]
167 pub fn shard_usage(&self, idx: usize) -> (usize, usize) {
168 assert!(idx < self.shard_count(), "shard index out of bounds");
169 let used = self
170 .cache
171 .iter()
172 .filter(|(key, _value)| self.shard_index(key) == idx)
173 .map(|(_key, value)| value.cost())
174 .sum();
175 (used, self.cache.shard_capacity() as usize)
176 }
177
178 #[cfg(test)]
182 pub(crate) fn validate_invariants(&self) {
183 let shard_count = self.shard_count();
184 let mut shard_usage = vec![0usize; shard_count];
185 let mut total_len = 0usize;
186 let mut total_charge = 0usize;
187
188 for (key, value) in self.cache.iter() {
189 let shard_idx = self.shard_index(&key);
190 assert!(shard_idx < shard_count, "invalid backend shard index");
191 let weight = value.cost();
192 shard_usage[shard_idx] += weight;
193 total_len += 1;
194 total_charge += weight;
195 }
196
197 for (idx, used) in shard_usage.into_iter().enumerate() {
198 let (reported_used, reported_capacity) = self.shard_usage(idx);
199 assert_eq!(reported_used, used, "per-shard charge mismatch");
200 assert!(
201 used <= reported_capacity,
202 "used {} exceeds capacity {}",
203 used,
204 reported_capacity
205 );
206 }
207
208 assert_eq!(total_len, self.len(), "global resident count mismatch");
209 assert_eq!(total_charge, self.total_charge(), "global charge mismatch");
210 assert!(
211 total_charge <= self.total_capacity(),
212 "total charge exceeds backend capacity"
213 );
214 }
215
216 #[cfg(test)]
221 pub(crate) fn shard_index(&self, key: &K) -> usize {
222 let shard_mask = (self.shard_count() - 1) as u64;
223 (self
224 .hash_builder
225 .hash_one(key)
226 .rotate_right(usize::BITS / 2)
227 & shard_mask) as usize
228 }
229}
230
231fn minimum_estimated_items(num_shards: usize) -> usize {
234 num_shards.saturating_mul(MIN_ESTIMATED_ITEMS_PER_SHARD)
235}
236
237impl<K, V, S> BufferCache<K, V> for S3FifoCache<K, V, S>
238where
239 K: Eq + Hash + Clone + Send + Sync + 'static,
240 V: CacheEntry + Clone + Send + Sync + 'static,
241 S: BuildHasher + Clone + Send + Sync + 'static,
242{
243 fn as_any(&self) -> &dyn Any {
244 self
245 }
246
247 fn strategy(&self) -> BufferCacheStrategy {
248 BufferCacheStrategy::S3Fifo
249 }
250
251 fn insert(&self, key: K, value: V) {
252 self.insert(key, value);
253 }
254
255 fn get(&self, key: K) -> Option<V> {
256 self.get(&key)
257 }
258
259 fn remove(&self, key: &K) -> Option<V> {
260 self.remove(key)
261 }
262
263 fn remove_if(&self, predicate: &dyn Fn(&K) -> bool) {
264 self.remove_if(|key| predicate(key))
265 }
266
267 fn contains_key(&self, key: &K) -> bool {
268 self.contains_key(key)
269 }
270
271 fn len(&self) -> usize {
272 self.len()
273 }
274
275 fn total_charge(&self) -> usize {
276 self.total_charge()
277 }
278
279 fn total_capacity(&self) -> usize {
280 self.total_capacity()
281 }
282
283 fn shard_count(&self) -> usize {
284 self.shard_count()
285 }
286
287 #[cfg(test)]
288 fn shard_usage(&self, idx: usize) -> (usize, usize) {
289 self.shard_usage(idx)
290 }
291}