Skip to main content

feldera_buffer_cache/
s3_fifo.rs

1use crate::{BufferCache, BufferCacheStrategy, CacheEntry};
2use quick_cache::{OptionsBuilder, Weighter, sync::Cache as QuickCache};
3use std::any::Any;
4use std::hash::{BuildHasher, Hash, RandomState};
5
/// Per-shard floor for the item-count estimate handed to `quick_cache`.
///
/// `quick_cache` requires an item-count estimate and uses it to decide whether
/// the requested shard count can be honored.
///
/// Setting the estimate to at least 32 items per shard keeps the requested
/// shard count intact without introducing a capacity-based heuristic.
/// `minimum_estimated_items` multiplies this value by the shard count to
/// produce the estimate passed to the backend.
const MIN_ESTIMATED_ITEMS_PER_SHARD: usize = 32;
12
13/// Converts [`CacheEntry::cost`] into the weight expected by `quick_cache`.
14#[derive(Clone, Copy, Default)]
15struct CacheEntryWeighter;
16
17impl<K, V> Weighter<K, V> for CacheEntryWeighter
18where
19    V: CacheEntry,
20{
21    fn weight(&self, _key: &K, value: &V) -> u64 {
22        value.cost() as u64
23    }
24}
25
/// A sharded, weighted, thread-safe S3-FIFO cache backed directly by
/// `quick_cache`.
///
/// Entries are weighted through `CacheEntryWeighter`, i.e. by
/// [`CacheEntry::cost`]. `S` is the hash builder handed to the backend and
/// defaults to [`RandomState`].
pub struct S3FifoCache<K, V, S = RandomState> {
    /// Shared `quick_cache` backend.
    cache: QuickCache<K, V, CacheEntryWeighter, S>,
    /// Hash builder retained only so tests can mirror `quick_cache`'s shard
    /// selection.
    // Compiled only for tests: a clone of the builder given to the backend,
    // read by `shard_index`.
    #[cfg(test)]
    hash_builder: S,
}
36
37impl<K, V, S> S3FifoCache<K, V, S> {
38    /// Default power-of-two shard count used by [`S3FifoCache::new`].
39    pub const DEFAULT_SHARDS: usize = 256;
40}
41
42impl<K, V> S3FifoCache<K, V, RandomState>
43where
44    K: Eq + Hash + Clone,
45    V: CacheEntry + Clone,
46{
47    /// Creates a cache with [`Self::DEFAULT_SHARDS`] shards.
48    pub fn new(total_capacity_bytes: usize) -> Self {
49        Self::with_hasher(
50            total_capacity_bytes,
51            S3FifoCache::<K, V>::DEFAULT_SHARDS,
52            RandomState::new(),
53        )
54    }
55
56    /// Creates a cache with an explicit shard count.
57    ///
58    /// # Panics
59    ///
60    /// Panics if `num_shards == 0` or if `num_shards` is not a power of two.
61    pub fn with_shards(total_capacity_bytes: usize, num_shards: usize) -> Self {
62        Self::with_hasher(total_capacity_bytes, num_shards, RandomState::new())
63    }
64}
65
66// explicit allow, we do have `is_empty` in the trait so this is a false positive
67#[allow(clippy::len_without_is_empty)]
68impl<K, V, S> S3FifoCache<K, V, S>
69where
70    K: Eq + Hash + Clone,
71    V: CacheEntry + Clone,
72    S: BuildHasher + Clone,
73{
74    /// Creates a cache with an explicit shard count and hash builder.
75    ///
76    /// Because `quick_cache` uses equal-capacity shards internally, the actual
77    /// backend capacity may round up when `total_capacity_bytes` is not evenly
78    /// divisible by `num_shards`.
79    ///
80    /// # Panics
81    ///
82    /// Panics if `num_shards == 0` or if `num_shards` is not a power of two.
83    pub fn with_hasher(total_capacity_bytes: usize, num_shards: usize, hash_builder: S) -> Self {
84        assert!(num_shards > 0, "num_shards must be > 0");
85        assert!(
86            num_shards.is_power_of_two(),
87            "num_shards must be a power of two"
88        );
89
90        let options = OptionsBuilder::new()
91            .shards(num_shards)
92            .estimated_items_capacity(minimum_estimated_items(num_shards))
93            .weight_capacity(total_capacity_bytes as u64)
94            .build()
95            .expect("valid quick_cache options");
96
97        Self {
98            #[cfg(test)]
99            hash_builder: hash_builder.clone(),
100            cache: QuickCache::with_options(
101                options,
102                CacheEntryWeighter,
103                hash_builder,
104                Default::default(),
105            ),
106        }
107    }
108
109    /// Inserts or replaces `key` with `value`.
110    pub fn insert(&self, key: K, value: V) {
111        self.cache.insert(key, value);
112    }
113
114    /// Looks up `key` and returns a clone of the stored value.
115    pub fn get(&self, key: &K) -> Option<V> {
116        self.cache.get(key)
117    }
118
119    /// Removes `key` if present and returns the removed value.
120    pub fn remove(&self, key: &K) -> Option<V> {
121        self.cache.remove(key).map(|(_, value)| value)
122    }
123
124    /// Removes all entries matching `predicate` and returns the number removed.
125    pub fn remove_if<F>(&self, predicate: F)
126    where
127        F: Fn(&K) -> bool,
128    {
129        self.cache.retain(|key, _value| !predicate(key))
130    }
131
132    /// Returns `true` if `key` is currently present.
133    pub fn contains_key(&self, key: &K) -> bool {
134        self.cache.contains_key(key)
135    }
136
137    /// Returns the current number of live entries.
138    pub fn len(&self) -> usize {
139        self.cache.len()
140    }
141
142    /// Returns the current total weighted charge.
143    pub fn total_charge(&self) -> usize {
144        self.cache.weight() as usize
145    }
146
147    /// Returns the backend's configured total weighted capacity.
148    ///
149    /// This may be larger than the requested constructor argument when the
150    /// requested total capacity is not evenly divisible across shards.
151    pub fn total_capacity(&self) -> usize {
152        self.cache.capacity() as usize
153    }
154
155    /// Returns the number of backend shards.
156    pub fn shard_count(&self) -> usize {
157        self.cache.num_shards()
158    }
159
160    /// Returns `(used_charge, capacity)` for backend shard `idx`.
161    ///
162    /// # Panics
163    ///
164    /// Panics if `idx >= self.shard_count()`.
165    #[cfg(test)]
166    pub fn shard_usage(&self, idx: usize) -> (usize, usize) {
167        assert!(idx < self.shard_count(), "shard index out of bounds");
168        let used = self
169            .cache
170            .iter()
171            .filter(|(key, _value)| self.shard_index(key) == idx)
172            .map(|(_key, value)| value.cost())
173            .sum();
174        (used, self.cache.shard_capacity() as usize)
175    }
176
177    /// Validates the wrapper invariants we rely on in Feldera tests:
178    /// iteration agrees with the public accounting APIs and each resident key
179    /// maps to the backend shard reported by `shard_usage()`.
180    #[cfg(test)]
181    pub(crate) fn validate_invariants(&self) {
182        let shard_count = self.shard_count();
183        let mut shard_usage = vec![0usize; shard_count];
184        let mut total_len = 0usize;
185        let mut total_charge = 0usize;
186
187        for (key, value) in self.cache.iter() {
188            let shard_idx = self.shard_index(&key);
189            assert!(shard_idx < shard_count, "invalid backend shard index");
190            let weight = value.cost();
191            shard_usage[shard_idx] += weight;
192            total_len += 1;
193            total_charge += weight;
194        }
195
196        for (idx, used) in shard_usage.into_iter().enumerate() {
197            let (reported_used, reported_capacity) = self.shard_usage(idx);
198            assert_eq!(reported_used, used, "per-shard charge mismatch");
199            assert!(
200                used <= reported_capacity,
201                "used {} exceeds capacity {}",
202                used,
203                reported_capacity
204            );
205        }
206
207        assert_eq!(total_len, self.len(), "global resident count mismatch");
208        assert_eq!(total_charge, self.total_charge(), "global charge mismatch");
209        assert!(
210            total_charge <= self.total_capacity(),
211            "total charge exceeds backend capacity"
212        );
213    }
214
215    /// Mirrors `quick_cache`'s shard selection logic for test-only shard
216    /// accounting and validation.
217    ///
218    /// The hash function is not exposed in the public API of `quick_cache`.
219    #[cfg(test)]
220    pub(crate) fn shard_index(&self, key: &K) -> usize {
221        let shard_mask = (self.shard_count() - 1) as u64;
222        (self
223            .hash_builder
224            .hash_one(key)
225            .rotate_right(usize::BITS / 2)
226            & shard_mask) as usize
227    }
228}
229
230/// Returns the minimum item-count estimate needed to preserve a requested
231/// shard count in `quick_cache`.
232fn minimum_estimated_items(num_shards: usize) -> usize {
233    num_shards.saturating_mul(MIN_ESTIMATED_ITEMS_PER_SHARD)
234}
235
236impl<K, V, S> BufferCache<K, V> for S3FifoCache<K, V, S>
237where
238    K: Eq + Hash + Clone + Send + Sync + 'static,
239    V: CacheEntry + Clone + Send + Sync + 'static,
240    S: BuildHasher + Clone + Send + Sync + 'static,
241{
242    fn as_any(&self) -> &dyn Any {
243        self
244    }
245
246    fn strategy(&self) -> BufferCacheStrategy {
247        BufferCacheStrategy::S3Fifo
248    }
249
250    fn insert(&self, key: K, value: V) {
251        self.insert(key, value);
252    }
253
254    fn get(&self, key: K) -> Option<V> {
255        self.get(&key)
256    }
257
258    fn remove(&self, key: &K) -> Option<V> {
259        self.remove(key)
260    }
261
262    fn remove_if(&self, predicate: &dyn Fn(&K) -> bool) {
263        self.remove_if(|key| predicate(key))
264    }
265
266    fn contains_key(&self, key: &K) -> bool {
267        self.contains_key(key)
268    }
269
270    fn len(&self) -> usize {
271        self.len()
272    }
273
274    fn total_charge(&self) -> usize {
275        self.total_charge()
276    }
277
278    fn total_capacity(&self) -> usize {
279        self.total_capacity()
280    }
281
282    fn shard_count(&self) -> usize {
283        self.shard_count()
284    }
285
286    #[cfg(test)]
287    fn shard_usage(&self, idx: usize) -> (usize, usize) {
288        self.shard_usage(idx)
289    }
290}