1use crate::{BufferCache, BufferCacheStrategy, CacheEntry};
2use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
3use std::any::Any;
4use std::hash::{BuildHasher, Hash, RandomState};
5
6const MIN_ESTIMATED_ITEMS_PER_SHARD: usize = 32;
12
13#[derive(Clone, Copy, Default)]
15struct CacheEntryWeighter;
16
17impl<K, V> Weighter<K, V> for CacheEntryWeighter
18where
19 V: CacheEntry,
20{
21 fn weight(&self, _key: &K, value: &V) -> u64 {
22 value.cost() as u64
23 }
24}
25
26pub struct S3FifoCache<K, V, S = RandomState> {
29 cache: QuickCache<K, V, CacheEntryWeighter, S>,
31 #[cfg(test)]
34 hash_builder: S,
35}
36
37impl<K, V, S> S3FifoCache<K, V, S> {
38 pub const DEFAULT_SHARDS: usize = 256;
40}
41
42impl<K, V> S3FifoCache<K, V, RandomState>
43where
44 K: Eq + Hash + Clone,
45 V: CacheEntry + Clone,
46{
47 pub fn new(total_capacity_bytes: usize) -> Self {
49 Self::with_hasher(
50 total_capacity_bytes,
51 S3FifoCache::<K, V>::DEFAULT_SHARDS,
52 RandomState::new(),
53 )
54 }
55
56 pub fn with_shards(total_capacity_bytes: usize, num_shards: usize) -> Self {
62 Self::with_hasher(total_capacity_bytes, num_shards, RandomState::new())
63 }
64}
65
66#[allow(clippy::len_without_is_empty)]
68impl<K, V, S> S3FifoCache<K, V, S>
69where
70 K: Eq + Hash + Clone,
71 V: CacheEntry + Clone,
72 S: BuildHasher + Clone,
73{
74 pub fn with_hasher(total_capacity_bytes: usize, num_shards: usize, hash_builder: S) -> Self {
84 assert!(num_shards > 0, "num_shards must be > 0");
85 assert!(
86 num_shards.is_power_of_two(),
87 "num_shards must be a power of two"
88 );
89
90 let options = OptionsBuilder::new()
91 .shards(num_shards)
92 .estimated_items_capacity(minimum_estimated_items(num_shards))
93 .weight_capacity(total_capacity_bytes as u64)
94 .build()
95 .expect("valid quick_cache options");
96
97 Self {
98 #[cfg(test)]
99 hash_builder: hash_builder.clone(),
100 cache: QuickCache::with_options(
101 options,
102 CacheEntryWeighter,
103 hash_builder,
104 Default::default(),
105 ),
106 }
107 }
108
109 pub fn insert(&self, key: K, value: V) {
111 self.cache.insert(key, value);
112 }
113
114 pub fn get(&self, key: &K) -> Option<V> {
116 self.cache.get(key)
117 }
118
119 pub fn remove(&self, key: &K) -> Option<V> {
121 self.cache.remove(key).map(|(_, value)| value)
122 }
123
124 pub fn remove_if<F>(&self, predicate: F)
126 where
127 F: Fn(&K) -> bool,
128 {
129 self.cache.retain(|key, _value| !predicate(key))
130 }
131
132 pub fn contains_key(&self, key: &K) -> bool {
134 self.cache.contains_key(key)
135 }
136
137 pub fn len(&self) -> usize {
139 self.cache.len()
140 }
141
142 pub fn total_charge(&self) -> usize {
144 self.cache.weight() as usize
145 }
146
147 pub fn total_capacity(&self) -> usize {
152 self.cache.capacity() as usize
153 }
154
155 pub fn shard_count(&self) -> usize {
157 self.cache.num_shards()
158 }
159
160 #[cfg(test)]
166 pub fn shard_usage(&self, idx: usize) -> (usize, usize) {
167 assert!(idx < self.shard_count(), "shard index out of bounds");
168 let used = self
169 .cache
170 .iter()
171 .filter(|(key, _value)| self.shard_index(key) == idx)
172 .map(|(_key, value)| value.cost())
173 .sum();
174 (used, self.cache.shard_capacity() as usize)
175 }
176
177 #[cfg(test)]
181 pub(crate) fn validate_invariants(&self) {
182 let shard_count = self.shard_count();
183 let mut shard_usage = vec![0usize; shard_count];
184 let mut total_len = 0usize;
185 let mut total_charge = 0usize;
186
187 for (key, value) in self.cache.iter() {
188 let shard_idx = self.shard_index(&key);
189 assert!(shard_idx < shard_count, "invalid backend shard index");
190 let weight = value.cost();
191 shard_usage[shard_idx] += weight;
192 total_len += 1;
193 total_charge += weight;
194 }
195
196 for (idx, used) in shard_usage.into_iter().enumerate() {
197 let (reported_used, reported_capacity) = self.shard_usage(idx);
198 assert_eq!(reported_used, used, "per-shard charge mismatch");
199 assert!(
200 used <= reported_capacity,
201 "used {} exceeds capacity {}",
202 used,
203 reported_capacity
204 );
205 }
206
207 assert_eq!(total_len, self.len(), "global resident count mismatch");
208 assert_eq!(total_charge, self.total_charge(), "global charge mismatch");
209 assert!(
210 total_charge <= self.total_capacity(),
211 "total charge exceeds backend capacity"
212 );
213 }
214
215 #[cfg(test)]
220 pub(crate) fn shard_index(&self, key: &K) -> usize {
221 let shard_mask = (self.shard_count() - 1) as u64;
222 (self
223 .hash_builder
224 .hash_one(key)
225 .rotate_right(usize::BITS / 2)
226 & shard_mask) as usize
227 }
228}
229
230fn minimum_estimated_items(num_shards: usize) -> usize {
233 num_shards.saturating_mul(MIN_ESTIMATED_ITEMS_PER_SHARD)
234}
235
236impl<K, V, S> BufferCache<K, V> for S3FifoCache<K, V, S>
237where
238 K: Eq + Hash + Clone + Send + Sync + 'static,
239 V: CacheEntry + Clone + Send + Sync + 'static,
240 S: BuildHasher + Clone + Send + Sync + 'static,
241{
242 fn as_any(&self) -> &dyn Any {
243 self
244 }
245
246 fn strategy(&self) -> BufferCacheStrategy {
247 BufferCacheStrategy::S3Fifo
248 }
249
250 fn insert(&self, key: K, value: V) {
251 self.insert(key, value);
252 }
253
254 fn get(&self, key: K) -> Option<V> {
255 self.get(&key)
256 }
257
258 fn remove(&self, key: &K) -> Option<V> {
259 self.remove(key)
260 }
261
262 fn remove_if(&self, predicate: &dyn Fn(&K) -> bool) {
263 self.remove_if(|key| predicate(key))
264 }
265
266 fn contains_key(&self, key: &K) -> bool {
267 self.contains_key(key)
268 }
269
270 fn len(&self) -> usize {
271 self.len()
272 }
273
274 fn total_charge(&self) -> usize {
275 self.total_charge()
276 }
277
278 fn total_capacity(&self) -> usize {
279 self.total_capacity()
280 }
281
282 fn shard_count(&self) -> usize {
283 self.shard_count()
284 }
285
286 #[cfg(test)]
287 fn shard_usage(&self, idx: usize) -> (usize, usize) {
288 self.shard_usage(idx)
289 }
290}