breve/
lib.rs

1// Copyright 2025 Chojan Shang.
2// Copyright 2024 Cloudflare, Inc.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Breve is an in-memory cache implementation with Uno as the admission policy and [S3-FIFO](https://s3fifo.com/) as the eviction policy.
17
18use ahash::RandomState;
19use crossbeam_queue::SegQueue;
20use std::marker::PhantomData;
21use std::sync::atomic::AtomicUsize;
22use std::sync::atomic::{
23    AtomicBool, AtomicU8,
24    Ordering::{Acquire, Relaxed, SeqCst},
25};
26
27mod atomic;
28mod buckets;
29mod estimator;
30mod uno;
31
32use buckets::Buckets;
33use std::hash::Hash;
34use uno::{UnoLearner, UnoSketch};
35
/// [`Location`] flag value for the small queue, where newly admitted items normally start.
const SMALL: bool = false;
/// [`Location`] flag value for the main queue; by construction the opposite of [`SMALL`].
const MAIN: bool = !SMALL;
38
39// Indicate which queue an item is located
40#[derive(Debug, Default)]
41struct Location(AtomicBool);
42
43impl Location {
44    fn new_small() -> Self {
45        Self(AtomicBool::new(SMALL))
46    }
47
48    fn value(&self) -> bool {
49        self.0.load(Relaxed)
50    }
51
52    fn is_main(&self) -> bool {
53        self.value()
54    }
55
56    fn move_to_main(&self) {
57        self.0.store(true, Relaxed);
58    }
59}
60
// The counter has 8 bits of room, but it is deliberately capped at 3. Each visit of the main
// queue's eviction scan decrements it by one, so a small cap bounds how long the main queue
// can take, in the worst case, to find something to evict.
const USES_CAP: u8 = 3;

/// Saturating access counter of a cached item, kept within `0..=USES_CAP`.
#[derive(Debug, Default)]
struct Uses(AtomicU8);

impl Uses {
    /// Increment the counter unless it already reached `USES_CAP`.
    ///
    /// Returns the value after the increment, or the current value if it was already capped.
    pub fn inc_uses(&self) -> u8 {
        let bumped = self.0.fetch_update(Acquire, Relaxed, |current| {
            if current < USES_CAP {
                Some(current + 1)
            } else {
                None
            }
        });
        match bumped {
            Ok(previous) => previous + 1,
            Err(capped) => capped,
        }
    }

    /// Decrement the counter unless it is already 0.
    ///
    /// Returns the value before the decrement (0 if it was already 0).
    pub fn decr_uses(&self) -> u8 {
        self.0
            .fetch_update(Acquire, Relaxed, |current| current.checked_sub(1))
            .unwrap_or(0)
    }

    /// Current counter value.
    pub fn uses(&self) -> u8 {
        self.0.load(Relaxed)
    }
}
109
/// Internal key: the 64-bit hash of the user-facing key.
type Key = u64;
/// Weight of a cached item, counted against the cache's total weight limit.
type Weight = u16;

/// A key-value pair handed back by the cache when an item is evicted (or not admitted).
#[derive(Clone, Debug)]
pub struct KV<T> {
    /// Hash of the original key.
    ///
    /// NOTE: the cache does not store the actual key, so this value is only its hash.
    pub key: Key,
    /// The cached value.
    pub data: T,
    /// The weight the item was stored with.
    pub weight: Weight,
}
122
123// the data and its metadata
124struct Bucket<T> {
125    uses: Uses,
126    queue: Location,
127    weight: Weight,
128    data: T,
129}
130
/// Fraction (10%) of the total weight limit budgeted for the small queue; also used to scale
/// the Uno components built by the compact constructor.
const SMALL_QUEUE_PERCENTAGE: f32 = 10.0 / 100.0;
132
133struct FiFoQueues<T> {
134    total_weight_limit: usize,
135
136    small: SegQueue<Key>,
137    small_weight: AtomicUsize,
138
139    main: SegQueue<Key>,
140    main_weight: AtomicUsize,
141
142    // this replaces the ghost queue of S3-FIFO with similar goal: track the evicted assets
143    estimator: UnoSketch,
144    // try to machine learning
145    learner: UnoLearner,
146
147    _t: PhantomData<T>,
148}
149
150impl<T: Clone + Send + Sync + 'static> FiFoQueues<T> {
151    fn admit(
152        &self,
153        key: Key,
154        data: T,
155        weight: u16,
156        ignore_learner: bool,
157        buckets: &Buckets<T>,
158    ) -> Vec<KV<T>> {
159        // Note that we only use Uno during cache admission but not cache read.
160        // So effectively we mostly sketch the popularity of less popular assets.
161        // In this way the sketch is a bit more accurate on these assets.
162        // Also we don't need another separated window cache to address the sparse burst issue as
163        // this sketch doesn't favor very popular assets much.
164        assert!(weight > 0);
165        let new_bucket = {
166            let Some((uses, queue, weight)) = buckets.get_map(&key, |bucket| {
167                // the item exists, in case weight changes
168                let old_weight = bucket.weight;
169                let uses = bucket.uses.uses();
170                fn update_atomic(weight: &AtomicUsize, old: u16, new: u16) {
171                    if old == new {
172                        return;
173                    }
174                    if old > new {
175                        weight.fetch_sub((old - new) as usize, SeqCst);
176                    } else {
177                        weight.fetch_add((new - old) as usize, SeqCst);
178                    }
179                }
180                let queue = bucket.queue.is_main();
181                if queue == MAIN {
182                    update_atomic(&self.main_weight, old_weight, weight);
183                } else {
184                    update_atomic(&self.small_weight, old_weight, weight);
185                }
186                (uses, queue, weight)
187            }) else {
188                let mut evicted = self.evict_to_limit(weight, buckets);
189                let (key, data, weight) = if !ignore_learner && evicted.len() == 1 {
190                    // Apply the admission algorithm of Uno: compare the incoming new item
191                    // and the evicted one. The more popular one is admitted to cachet
192                    self.estimator.tick();
193                    let evicted_first = &evicted[0];
194                    let evicted_metrics = self.estimator.get(evicted_first.key);
195                    let evicted_pred = self.learner.predict(evicted_metrics.0, evicted_metrics.1);
196                    let new_metrics = self.estimator.get(key);
197                    let new_pred = self.learner.predict(new_metrics.0, new_metrics.1);
198                    if evicted_pred > new_pred {
199                        // put it back
200                        let first = evicted.pop().expect("just check non-empty");
201                        // return the put value
202                        evicted.push(KV { key, data, weight });
203                        if evicted_pred < 0.0 {
204                            self.learner
205                                .weight_update(evicted_metrics.0, evicted_metrics.1, true);
206                        }
207                        self.estimator.incr_freq_only(key);
208                        (first.key, first.data, first.weight)
209                    } else {
210                        if evicted_pred > 0.0 {
211                            self.learner
212                                .weight_update(evicted_metrics.0, evicted_metrics.1, false);
213                        }
214                        self.estimator.incr_freq_only(key);
215                        self.estimator.incr_reuse_only(key);
216                        (key, data, weight)
217                    }
218                } else {
219                    self.estimator.incr(key);
220                    (key, data, weight)
221                };
222
223                // Almost the same criterion as ghost.
224                let metrics = self.estimator.get(key);
225                let check_main =
226                    metrics.0 > 2 && (metrics.0 > 3 || metrics.1 < self.learner.scale_factor * 8);
227
228                let location = Location::new_small();
229                let count = Uses::default();
230                if check_main {
231                    location.move_to_main();
232                }
233
234                let bucket = Bucket {
235                    queue: location,
236                    weight,
237                    uses: count, // 0 for small, 1 for main
238                    data,
239                };
240                let old = buckets.insert(key, bucket);
241                if old.is_none() {
242                    // Always push key first before updating weight
243                    // If doing the other order, another concurrent thread might not
244                    // find things to evict
245                    if check_main {
246                        self.main.push(key);
247                        self.main_weight.fetch_add(weight as usize, SeqCst);
248                    } else {
249                        self.small.push(key);
250                        self.small_weight.fetch_add(weight as usize, SeqCst);
251                    }
252                } // else: two threads are racing adding the item
253                return evicted;
254            };
255            Bucket {
256                queue: Location(queue.into()),
257                weight,
258                uses: Uses(uses.into()),
259                data,
260            }
261        };
262
263        // replace the existing one
264        buckets.insert(key, new_bucket);
265
266        // NOTE: there is a chance that the item itself is evicted if it happens to be the one selected
267        // by the algorithm. We could avoid this by checking if the item is in the returned evicted items,
268        // and then add it back. But to keep the code simple we just allow it to happen.
269        self.evict_to_limit(0, buckets)
270    }
271
272    // the `extra_weight` is to essentially tell the cache to reserve that amount of weight for
273    // admission. It is used when calling `evict_to_limit` before admitting the asset itself.
274    fn evict_to_limit(&self, extra_weight: Weight, buckets: &Buckets<T>) -> Vec<KV<T>> {
275        let mut evicted = if self.total_weight_limit
276            < self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
277        {
278            Vec::with_capacity(1)
279        } else {
280            vec![]
281        };
282        if self.total_weight_limit
283            < self.small_weight.load(SeqCst) + self.main_weight.load(SeqCst) + extra_weight as usize
284        {
285            if let Some(evicted_item) = self.evict_one(buckets) {
286                evicted.push(evicted_item);
287            }
288        }
289
290        evicted
291    }
292
293    fn evict_one(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
294        let evict_small = self.small_weight_limit() <= self.small_weight.load(SeqCst);
295
296        if evict_small {
297            let evicted = self.evict_one_from_small(buckets);
298            // evict_one_from_small could just promote everything to main without evicting any
299            // so need to evict_one_from_main if nothing evicted
300            if evicted.is_some() {
301                return evicted;
302            }
303        }
304        self.evict_one_from_main(buckets)
305    }
306
307    fn small_weight_limit(&self) -> usize {
308        (self.total_weight_limit as f32 * SMALL_QUEUE_PERCENTAGE).floor() as usize + 1
309    }
310
311    fn evict_one_from_small(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
312        loop {
313            let Some(to_evict) = self.small.pop() else {
314                // empty queue, this is caught between another pop() and fetch_sub()
315                return None;
316            };
317
318            let v = buckets
319                .get_map(&to_evict, |bucket| {
320                    let weight = bucket.weight;
321                    self.small_weight.fetch_sub(weight as usize, SeqCst);
322
323                    if bucket.uses.uses() > 1 {
324                        // move to main
325                        bucket.queue.move_to_main();
326                        self.main.push(to_evict);
327                        self.main_weight.fetch_add(weight as usize, SeqCst);
328                        // continue until find one to evict
329                        None
330                    } else {
331                        let data = bucket.data.clone();
332                        let weight = bucket.weight;
333                        buckets.remove(&to_evict);
334                        Some(KV {
335                            key: to_evict,
336                            data,
337                            weight,
338                        })
339                    }
340                })
341                .flatten();
342            if v.is_some() {
343                // found the one to evict, break
344                return v;
345            }
346        }
347    }
348
349    fn evict_one_from_main(&self, buckets: &Buckets<T>) -> Option<KV<T>> {
350        loop {
351            let to_evict = self.main.pop()?;
352
353            if let Some(v) = buckets
354                .get_map(&to_evict, |bucket| {
355                    if bucket.uses.decr_uses() > 0 {
356                        // put it back
357                        self.main.push(to_evict);
358                        // continue the loop
359                        None
360                    } else {
361                        // evict
362                        let weight = bucket.weight;
363                        self.main_weight.fetch_sub(weight as usize, SeqCst);
364                        let data = bucket.data.clone();
365                        buckets.remove(&to_evict);
366                        Some(KV {
367                            key: to_evict,
368                            data,
369                            weight,
370                        })
371                    }
372                })
373                .flatten()
374            {
375                // found the one to evict, break
376                return Some(v);
377            }
378        }
379    }
380}
381
382/// [Breve] cache
383pub struct Breve<K, T> {
384    queues: FiFoQueues<T>,
385    buckets: Buckets<T>,
386    random_status: RandomState,
387    _k: PhantomData<K>,
388}
389impl<K: Hash, T: Clone + Send + Sync + 'static> Breve<K, T> {
390    /// Create a new breve cache with the given weight limit and the given
391    /// size limit of the ghost queue.
392    pub fn new(total_weight_limit: usize, estimated_size: usize, learning_rate: f32) -> Self {
393        let queues = FiFoQueues {
394            small: SegQueue::new(),
395            small_weight: 0.into(),
396            main: SegQueue::new(),
397            main_weight: 0.into(),
398            total_weight_limit,
399            estimator: UnoSketch::new(estimated_size),
400            learner: UnoLearner::new(learning_rate, estimated_size),
401            _t: PhantomData,
402        };
403        Breve {
404            queues,
405            buckets: Buckets::new_fast(estimated_size),
406            random_status: RandomState::new(),
407            _k: PhantomData,
408        }
409    }
410
411    /// Create a new breve cache but with more memory efficient data structures.
412    /// The trade-off is that the the get() is slower by a constant factor.
413    /// The cache hit ratio could be higher as this type of breve allows to store
414    /// more assets with the same memory.
415    pub fn new_compact(
416        total_weight_limit: usize,
417        estimated_size: usize,
418        learning_rate: f32,
419    ) -> Self {
420        let queues = FiFoQueues {
421            small: SegQueue::new(),
422            small_weight: 0.into(),
423            main: SegQueue::new(),
424            main_weight: 0.into(),
425            total_weight_limit,
426            estimator: UnoSketch::new_compact(
427                (estimated_size as f32 * SMALL_QUEUE_PERCENTAGE) as usize,
428            ),
429            learner: UnoLearner::new(
430                learning_rate,
431                (estimated_size as f32 * SMALL_QUEUE_PERCENTAGE) as usize,
432            ),
433            _t: PhantomData,
434        };
435        Breve {
436            queues,
437            buckets: Buckets::new_compact(estimated_size, 32),
438            random_status: RandomState::new(),
439            _k: PhantomData,
440        }
441    }
442
443    // TODO: with_capacity()
444
445    /// Read the given key
446    ///
447    /// Return Some(T) if the key exists
448    pub fn get(&self, key: &K) -> Option<T> {
449        let key = self.random_status.hash_one(key);
450        self.buckets.get_map(&key, |p| {
451            p.uses.inc_uses();
452            p.data.clone()
453        })
454    }
455
456    /// Put the key value to the [breve]
457    ///
458    /// Return a list of [KV] of key and `T` that are evicted
459    pub fn put(&self, key: K, data: T, weight: Weight) -> Vec<KV<T>> {
460        let key = self.random_status.hash_one(&key);
461        self.queues.admit(key, data, weight, false, &self.buckets)
462    }
463
464    pub fn remove(&self, key: K) {
465        let key = self.random_status.hash_one(key);
466        self.buckets.get_map(&key, |p| {
467            if p.queue.is_main() {
468                self.queues.main_weight.fetch_sub(p.weight as usize, SeqCst);
469            } else {
470                self.queues
471                    .small_weight
472                    .fetch_sub(p.weight as usize, SeqCst);
473            }
474        });
475
476        //estimator should not be updated due to collision possibility
477        self.buckets.remove(&key);
478    }
479
480    /// Always put the key value to the [breve]
481    ///
482    /// Return a list of [KV] of key and `T` that are evicted
483    ///
484    /// Similar to [Self::put] but guarantee the assertion of the asset.
485    /// In [Self::put], the TinyLFU check may reject putting the current asset if it is less
486    /// popular than the once being evicted.
487    ///
488    /// In some real world use cases, a few reads to the same asset may be pending for the put action
489    /// to be finished so that they can read the asset from cache. Neither the above behaviors are ideal
490    /// for this use case.
491    ///
492    /// Compared to [Self::put], the hit ratio when using this function is reduced by about 0.5pp or less in
493    /// under zipf workloads.
494    pub fn force_put(&self, key: K, data: T, weight: Weight) -> Vec<KV<T>> {
495        let key = self.random_status.hash_one(&key);
496        self.queues.admit(key, data, weight, true, &self.buckets)
497    }
498}