Skip to main content

feldera_buffer_cache/
s3_fifo.rs

1use crate::{BufferCache, CacheEntry};
2use feldera_types::config::dev_tweaks::BufferCacheStrategy;
3use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
4use std::any::Any;
5use std::hash::{BuildHasher, Hash, RandomState};
6
/// Per-shard floor for the item-count estimate handed to `quick_cache`.
///
/// `quick_cache` insists on an estimated number of items and uses that
/// estimate to decide whether the shard count we ask for can be honored.
/// Passing at least this many items per shard keeps the requested shard
/// count exactly as requested, without deriving the estimate from the byte
/// capacity.
const MIN_ESTIMATED_ITEMS_PER_SHARD: usize = 32;
13
14/// Converts [`CacheEntry::cost`] into the weight expected by `quick_cache`.
15#[derive(Clone, Copy, Default)]
16struct CacheEntryWeighter;
17
18impl<K, V> Weighter<K, V> for CacheEntryWeighter
19where
20    V: CacheEntry,
21{
22    fn weight(&self, _key: &K, value: &V) -> u64 {
23        value.cost() as u64
24    }
25}
26
27/// A sharded, weighted, thread-safe S3-FIFO cache backed directly by
28/// `quick_cache`.
29pub struct S3FifoCache<K, V, S = RandomState> {
30    /// Shared `quick_cache` backend.
31    cache: QuickCache<K, V, CacheEntryWeighter, S>,
32    /// Hash builder retained only so tests can mirror `quick_cache`'s shard
33    /// selection.
34    #[cfg(test)]
35    hash_builder: S,
36}
37
38impl<K, V, S> S3FifoCache<K, V, S> {
39    /// Default power-of-two shard count used by [`S3FifoCache::new`].
40    pub const DEFAULT_SHARDS: usize = 256;
41}
42
43impl<K, V> S3FifoCache<K, V, RandomState>
44where
45    K: Eq + Hash + Clone,
46    V: CacheEntry + Clone,
47{
48    /// Creates a cache with [`Self::DEFAULT_SHARDS`] shards.
49    pub fn new(total_capacity_bytes: usize) -> Self {
50        Self::with_hasher(
51            total_capacity_bytes,
52            S3FifoCache::<K, V>::DEFAULT_SHARDS,
53            RandomState::new(),
54        )
55    }
56
57    /// Creates a cache with an explicit shard count.
58    ///
59    /// # Panics
60    ///
61    /// Panics if `num_shards == 0` or if `num_shards` is not a power of two.
62    pub fn with_shards(total_capacity_bytes: usize, num_shards: usize) -> Self {
63        Self::with_hasher(total_capacity_bytes, num_shards, RandomState::new())
64    }
65}
66
67// explicit allow, we do have `is_empty` in the trait so this is a false positive
68#[allow(clippy::len_without_is_empty)]
69impl<K, V, S> S3FifoCache<K, V, S>
70where
71    K: Eq + Hash + Clone,
72    V: CacheEntry + Clone,
73    S: BuildHasher + Clone,
74{
75    /// Creates a cache with an explicit shard count and hash builder.
76    ///
77    /// Because `quick_cache` uses equal-capacity shards internally, the actual
78    /// backend capacity may round up when `total_capacity_bytes` is not evenly
79    /// divisible by `num_shards`.
80    ///
81    /// # Panics
82    ///
83    /// Panics if `num_shards == 0` or if `num_shards` is not a power of two.
84    pub fn with_hasher(total_capacity_bytes: usize, num_shards: usize, hash_builder: S) -> Self {
85        assert!(num_shards > 0, "num_shards must be > 0");
86        assert!(
87            num_shards.is_power_of_two(),
88            "num_shards must be a power of two"
89        );
90
91        let options = OptionsBuilder::new()
92            .shards(num_shards)
93            .estimated_items_capacity(minimum_estimated_items(num_shards))
94            .weight_capacity(total_capacity_bytes as u64)
95            .build()
96            .expect("valid quick_cache options");
97
98        Self {
99            #[cfg(test)]
100            hash_builder: hash_builder.clone(),
101            cache: QuickCache::with_options(
102                options,
103                CacheEntryWeighter,
104                hash_builder,
105                Default::default(),
106            ),
107        }
108    }
109
110    /// Inserts or replaces `key` with `value`.
111    pub fn insert(&self, key: K, value: V) {
112        self.cache.insert(key, value);
113    }
114
115    /// Looks up `key` and returns a clone of the stored value.
116    pub fn get(&self, key: &K) -> Option<V> {
117        self.cache.get(key)
118    }
119
120    /// Removes `key` if present and returns the removed value.
121    pub fn remove(&self, key: &K) -> Option<V> {
122        self.cache.remove(key).map(|(_, value)| value)
123    }
124
125    /// Removes all entries matching `predicate` and returns the number removed.
126    pub fn remove_if<F>(&self, predicate: F)
127    where
128        F: Fn(&K) -> bool,
129    {
130        self.cache.retain(|key, _value| !predicate(key))
131    }
132
133    /// Returns `true` if `key` is currently present.
134    pub fn contains_key(&self, key: &K) -> bool {
135        self.cache.contains_key(key)
136    }
137
138    /// Returns the current number of live entries.
139    pub fn len(&self) -> usize {
140        self.cache.len()
141    }
142
143    /// Returns the current total weighted charge.
144    pub fn total_charge(&self) -> usize {
145        self.cache.weight() as usize
146    }
147
148    /// Returns the backend's configured total weighted capacity.
149    ///
150    /// This may be larger than the requested constructor argument when the
151    /// requested total capacity is not evenly divisible across shards.
152    pub fn total_capacity(&self) -> usize {
153        self.cache.capacity() as usize
154    }
155
156    /// Returns the number of backend shards.
157    pub fn shard_count(&self) -> usize {
158        self.cache.num_shards()
159    }
160
161    /// Returns `(used_charge, capacity)` for backend shard `idx`.
162    ///
163    /// # Panics
164    ///
165    /// Panics if `idx >= self.shard_count()`.
166    #[cfg(test)]
167    pub fn shard_usage(&self, idx: usize) -> (usize, usize) {
168        assert!(idx < self.shard_count(), "shard index out of bounds");
169        let used = self
170            .cache
171            .iter()
172            .filter(|(key, _value)| self.shard_index(key) == idx)
173            .map(|(_key, value)| value.cost())
174            .sum();
175        (used, self.cache.shard_capacity() as usize)
176    }
177
178    /// Validates the wrapper invariants we rely on in Feldera tests:
179    /// iteration agrees with the public accounting APIs and each resident key
180    /// maps to the backend shard reported by `shard_usage()`.
181    #[cfg(test)]
182    pub(crate) fn validate_invariants(&self) {
183        let shard_count = self.shard_count();
184        let mut shard_usage = vec![0usize; shard_count];
185        let mut total_len = 0usize;
186        let mut total_charge = 0usize;
187
188        for (key, value) in self.cache.iter() {
189            let shard_idx = self.shard_index(&key);
190            assert!(shard_idx < shard_count, "invalid backend shard index");
191            let weight = value.cost();
192            shard_usage[shard_idx] += weight;
193            total_len += 1;
194            total_charge += weight;
195        }
196
197        for (idx, used) in shard_usage.into_iter().enumerate() {
198            let (reported_used, reported_capacity) = self.shard_usage(idx);
199            assert_eq!(reported_used, used, "per-shard charge mismatch");
200            assert!(
201                used <= reported_capacity,
202                "used {} exceeds capacity {}",
203                used,
204                reported_capacity
205            );
206        }
207
208        assert_eq!(total_len, self.len(), "global resident count mismatch");
209        assert_eq!(total_charge, self.total_charge(), "global charge mismatch");
210        assert!(
211            total_charge <= self.total_capacity(),
212            "total charge exceeds backend capacity"
213        );
214    }
215
216    /// Mirrors `quick_cache`'s shard selection logic for test-only shard
217    /// accounting and validation.
218    ///
219    /// The hash function is not exposed in the public API of `quick_cache`.
220    #[cfg(test)]
221    pub(crate) fn shard_index(&self, key: &K) -> usize {
222        let shard_mask = (self.shard_count() - 1) as u64;
223        (self
224            .hash_builder
225            .hash_one(key)
226            .rotate_right(usize::BITS / 2)
227            & shard_mask) as usize
228    }
229}
230
231/// Returns the minimum item-count estimate needed to preserve a requested
232/// shard count in `quick_cache`.
233fn minimum_estimated_items(num_shards: usize) -> usize {
234    num_shards.saturating_mul(MIN_ESTIMATED_ITEMS_PER_SHARD)
235}
236
237impl<K, V, S> BufferCache<K, V> for S3FifoCache<K, V, S>
238where
239    K: Eq + Hash + Clone + Send + Sync + 'static,
240    V: CacheEntry + Clone + Send + Sync + 'static,
241    S: BuildHasher + Clone + Send + Sync + 'static,
242{
243    fn as_any(&self) -> &dyn Any {
244        self
245    }
246
247    fn strategy(&self) -> BufferCacheStrategy {
248        BufferCacheStrategy::S3Fifo
249    }
250
251    fn insert(&self, key: K, value: V) {
252        self.insert(key, value);
253    }
254
255    fn get(&self, key: K) -> Option<V> {
256        self.get(&key)
257    }
258
259    fn remove(&self, key: &K) -> Option<V> {
260        self.remove(key)
261    }
262
263    fn remove_if(&self, predicate: &dyn Fn(&K) -> bool) {
264        self.remove_if(|key| predicate(key))
265    }
266
267    fn contains_key(&self, key: &K) -> bool {
268        self.contains_key(key)
269    }
270
271    fn len(&self) -> usize {
272        self.len()
273    }
274
275    fn total_charge(&self) -> usize {
276        self.total_charge()
277    }
278
279    fn total_capacity(&self) -> usize {
280        self.total_capacity()
281    }
282
283    fn shard_count(&self) -> usize {
284        self.shard_count()
285    }
286
287    #[cfg(test)]
288    fn shard_usage(&self, idx: usize) -> (usize, usize) {
289        self.shard_usage(idx)
290    }
291}