retainer/
cache.rs

//! Caching structures for use in an asynchronous context.
//!
//! The main point of this module is the `Cache` type, which offers a small
//! implementation of a cache with time-based expiration support. The underlying
//! structure is nothing more than a map wrapped inside some asynchronous locking
//! mechanisms to avoid blocking the entire async runtime when waiting for a handle.
//!
//! The eviction algorithm has been based on Redis, and essentially just samples
//! the entry set on an interval to prune the inner tree over time. More information
//! on how this works can be seen on the `monitor` method of the `Cache` type.
11use std::cmp;
12use std::collections::{BTreeMap, BTreeSet};
13use std::marker::PhantomData;
14use std::time::{Duration, Instant};
15
16use async_lock::{RwLock, RwLockUpgradableReadGuard};
17use async_timer::Interval;
18use log::{debug, log_enabled, trace, Level};
19use rand::prelude::*;
20
21use crate::entry::{CacheEntry, CacheExpiration, CacheReadGuard};
22
// Small private helper macro to unpack entry references: yields `Some(entry)`
// only while the entry has not yet passed its expiration, `None` otherwise.
macro_rules! unpack {
    ($entry: expr) => {
        if !$entry.expiration().is_expired() {
            Some($entry)
        } else {
            None
        }
    };
}
33
/// Basic caching structure with asynchronous locking support.
///
/// This structure provides asynchronous access wrapped around a standard
/// `BTreeMap` to avoid blocking event loops when a writer cannot gain a
/// handle - which is what would happen with standard locking implementations.
pub struct Cache<K, V> {
    // Inner storage; the `async_lock::RwLock` yields to the executor while
    // waiting for a handle rather than blocking the thread.
    store: RwLock<BTreeMap<K, CacheEntry<V>>>,
    // Prefix used in log output; empty by default, set via `with_label`.
    label: String,
}
43
44impl<K, V> Cache<K, V>
45where
46    K: Ord + Clone,
47{
48    /// Construct a new `Cache`.
49    pub fn new() -> Self {
50        Self {
51            store: RwLock::new(BTreeMap::new()),
52            label: "".to_owned(),
53        }
54    }
55
56    /// Sets the label inside this cache for logging purposes.
57    pub fn with_label(mut self, s: &str) -> Self {
58        self.label = format!("cache({}): ", s);
59        self
60    }
61
62    /// Remove all entries from the cache.
63    pub async fn clear(&self) {
64        self.store.write().await.clear()
65    }
66
67    /// Retrieve the number of expired entries inside the cache.
68    ///
69    /// Note that this is calculated by walking the set of entries and
70    /// should therefore not be used in performance sensitive situations.
71    pub async fn expired(&self) -> usize {
72        self.store
73            .read()
74            .await
75            .iter()
76            .filter(|(_, entry)| entry.expiration().is_expired())
77            .count()
78    }
79
80    /// Retrieve a reference to a value inside the cache.
81    ///
82    /// The returned reference is bound inside a `RwLockReadGuard`.
83    pub async fn get(&self, k: &K) -> Option<CacheReadGuard<'_, V>> {
84        let guard = self.store.read().await;
85        let found = guard.get(k)?;
86        let valid = unpack!(found)?;
87
88        Some(CacheReadGuard {
89            entry: valid,
90            marker: PhantomData,
91        })
92    }
93
94    /// Retrieve the number of entries inside the cache.
95    ///
96    /// This *does* include entries which may be expired but are not yet evicted. In
97    /// future there may be an API addition to find the unexpired count, but as it's
98    /// relatively expensive it has been omitted for the time being.
99    pub async fn len(&self) -> usize {
100        self.store.read().await.len()
101    }
102
103    /// Insert a key/value pair into the cache with an associated expiration.
104    ///
105    /// The third argument controls expiration, which can be provided using any type which
106    /// implements `Into<CacheExpiration>`. This allows for various different syntax based
107    /// on your use case. If you do not want expiration, use `CacheExpiration::none()`.
108    pub async fn insert<E>(&self, k: K, v: V, e: E) -> Option<V>
109    where
110        E: Into<CacheExpiration>,
111    {
112        let entry = CacheEntry::new(v, e.into());
113        self.store
114            .write()
115            .await
116            .insert(k, entry)
117            .and_then(|entry| unpack!(entry))
118            .map(CacheEntry::into_inner)
119    }
120
121    /// Check whether the cache is empty.
122    pub async fn is_empty(&self) -> bool {
123        self.store.read().await.is_empty()
124    }
125
126    /// Retrieve a `Future` used to monitor expired keys.
127    ///
128    /// This future must be spawned on whatever runtime you are using inside your
129    /// application; not doing this will result in keys never being expired.
130    ///
131    /// For expiration logic, please see `Cache::purge`, as this is used under the hood.
132    pub async fn monitor(&self, sample: usize, threshold: f64, frequency: Duration) {
133        let mut interval = Interval::platform_new(frequency);
134        loop {
135            interval.as_mut().await;
136            self.purge(sample, threshold).await;
137        }
138    }
139
    /// Cleanses the cache of expired entries.
    ///
    /// Keys are expired using the same logic as the popular caching system Redis:
    ///
    /// 1. Wait until the next tick of `frequency`.
    /// 2. Take a sample of `sample` keys from the cache.
    /// 3. Remove any expired keys from the sample.
    /// 4. Based on `threshold` percentage:
    ///     4a. If more than `threshold` were expired, goto #2.
    ///     4b. If less than `threshold` were expired, goto #1.
    ///
    /// This means that at any point you may have up to `threshold` percent of your
    /// cache storing expired entries (assuming the monitor just ran), so make sure
    /// to tune your frequency, sample size, and threshold accordingly.
    pub async fn purge(&self, sample: usize, threshold: f64) {
        // track total wall time for the debug log emitted at the end
        let start = Instant::now();

        // accumulated time spent holding the write lock, and total keys removed
        let mut locked = Duration::from_nanos(0);
        let mut removed = 0;

        loop {
            // lock the store and grab a generator
            //
            // An upgradable read is taken first so concurrent readers are not
            // blocked during the scan phase; it is upgraded to a write guard
            // only for the (short) removal phase further below.
            let store = self.store.upgradable_read().await;

            // once we're empty, no point carrying on
            if store.is_empty() {
                break;
            }

            // determine the sample size of the batch
            // (clamped so we never ask for more indices than entries exist)
            let total = store.len();
            let sample = cmp::min(sample, total);

            // counter to track removed keys
            let mut gone = 0;

            // create our temporary key store and index tree
            let mut keys = Vec::with_capacity(sample);
            let mut indices: BTreeSet<usize> = BTreeSet::new();

            {
                // fetch `sample` keys at random
                //
                // The BTreeSet guarantees the chosen indices are distinct and
                // come out in ascending order, which the single forward pass
                // below relies on.
                let mut rng = rand::thread_rng();
                while indices.len() < sample {
                    indices.insert(rng.gen_range(0..total));
                }
            }

            {
                // tracker for previous index
                let mut prev = 0;

                // boxed iterator to allow us to iterate a single time for all indices
                let mut iter: Box<dyn Iterator<Item = (&K, &CacheEntry<V>)>> =
                    Box::new(store.iter());

                // walk our index list
                for idx in indices {
                    // calculate how much we need to shift the iterator
                    //
                    // `checked_sub` saturates the very first step (prev == 0,
                    // nothing consumed yet) to 0 via `unwrap_or`.
                    //
                    // NOTE(review): when the smallest sampled index k is > 0,
                    // this offset lands on element k-1 rather than k (and all
                    // later picks shift by one with it). The picks stay
                    // distinct and in-bounds, and since indices are uniformly
                    // random the sampling is statistically unaffected — but
                    // confirm this is intentional before relying on exact
                    // index semantics.
                    let offset = idx
                        .checked_sub(prev)
                        .and_then(|idx| idx.checked_sub(1))
                        .unwrap_or(0);

                    // shift and mark the current index
                    iter = Box::new(iter.skip(offset));
                    prev = idx;

                    // fetch the next pair (at our index)
                    //
                    // `unwrap` is safe: every sampled index is < total, so the
                    // iterator cannot be exhausted here.
                    let (key, entry) = iter.next().unwrap();

                    // skip if not expired
                    if !entry.expiration().is_expired() {
                        continue;
                    }

                    // otherwise mark for removal
                    // (keys are cloned since `store` is still borrowed)
                    keys.push(key.to_owned());

                    // and increment remove count
                    gone += 1;
                }
            }

            {
                // upgrade to a write guard so that we can make our changes
                let acquired = Instant::now();
                let mut store = RwLockUpgradableReadGuard::upgrade(store).await;

                // remove all expired keys
                for key in &keys {
                    store.remove(key);
                }

                // increment the lock timer tracking directly
                locked = locked.checked_add(acquired.elapsed()).unwrap();
            }

            // log out now many of the sampled keys were removed
            if log_enabled!(Level::Trace) {
                trace!(
                    "{}removed {} / {} ({:.2}%) of the sampled keys",
                    self.label,
                    gone,
                    sample,
                    (gone as f64 / sample as f64) * 100f64,
                );
            }

            // bump total remove count
            removed += gone;

            // break the loop if we don't meet thresholds
            // (i.e. the sample was mostly unexpired, so stop until next tick)
            if (gone as f64) < (sample as f64 * threshold) {
                break;
            }
        }

        // log out the completion as well as the time taken in millis
        if log_enabled!(Level::Debug) {
            debug!(
                "{}purge loop removed {} entries in {:.0?} ({:.0?} locked)",
                self.label,
                removed,
                start.elapsed(),
                locked
            );
        }
    }
269
270    /// Remove an entry from the cache and return any stored value.
271    pub async fn remove(&self, k: &K) -> Option<V> {
272        self.store
273            .write()
274            .await
275            .remove(k)
276            .and_then(|entry| unpack!(entry))
277            .map(CacheEntry::into_inner)
278    }
279
280    /// Retrieve the number of unexpired entries inside the cache.
281    ///
282    /// Note that this is calculated by walking the set of entries and
283    /// should therefore not be used in performance sensitive situations.
284    pub async fn unexpired(&self) -> usize {
285        self.store
286            .read()
287            .await
288            .iter()
289            .filter(|(_, entry)| !entry.expiration().is_expired())
290            .count()
291    }
292
293    /// Updates an entry in the cache without changing the expiration.
294    pub async fn update<F>(&self, k: &K, f: F)
295    where
296        F: FnOnce(&mut V),
297    {
298        let mut guard = self.store.write().await;
299        if let Some(entry) = guard.get_mut(k).and_then(|entry| unpack!(entry)) {
300            f(entry.value_mut());
301        }
302    }
303}
304
305/// Default implementation.
306impl<K, V> Default for Cache<K, V>
307where
308    K: Ord + Clone,
309{
310    fn default() -> Self {
311        Cache::new()
312    }
313}