retainer/cache.rs
1//! Caching structures for use in an asynchronous context.
2//!
3//! The main point of this module is the `Cache` type, which offers a small
4//! implementation of a cache with time based expiration support. The underlying
5//! structure is nothing more than a map wrapped inside some asynchronous locking
6//! mechanisms to avoid blocking the entire async runtime when waiting for a handle.
7//!
8//! The eviction algorithm has been based on Redis, and essentially just samples
9//! the entry set on an interval to prune the inner tree over time. More information
10//! on how this works can be seen on the `monitor` method of the `Cache` type.
11use std::cmp;
12use std::collections::{BTreeMap, BTreeSet};
13use std::marker::PhantomData;
14use std::time::{Duration, Instant};
15
16use async_lock::{RwLock, RwLockUpgradableReadGuard};
17use async_timer::Interval;
18use log::{debug, log_enabled, trace, Level};
19use rand::prelude::*;
20
21use crate::entry::{CacheEntry, CacheExpiration, CacheReadGuard};
22
// Small internal macro mapping an expired entry reference to `None`.
macro_rules! unpack {
    ($entry: expr) => {
        match $entry.expiration().is_expired() {
            true => None,
            false => Some($entry),
        }
    };
}
33
/// Basic caching structure with asynchronous locking support.
///
/// This structure provides asynchronous access wrapped around a standard
/// `BTreeMap` to avoid blocking event loops when a writer cannot gain a
/// handle - which is what would happen with standard locking implementations.
pub struct Cache<K, V> {
    // Inner storage; each value is wrapped in a `CacheEntry`, which pairs
    // it with its `CacheExpiration` metadata.
    store: RwLock<BTreeMap<K, CacheEntry<V>>>,
    // Prefix prepended to log messages, set via `with_label` (empty by default).
    label: String,
}
43
44impl<K, V> Cache<K, V>
45where
46 K: Ord + Clone,
47{
48 /// Construct a new `Cache`.
49 pub fn new() -> Self {
50 Self {
51 store: RwLock::new(BTreeMap::new()),
52 label: "".to_owned(),
53 }
54 }
55
56 /// Sets the label inside this cache for logging purposes.
57 pub fn with_label(mut self, s: &str) -> Self {
58 self.label = format!("cache({}): ", s);
59 self
60 }
61
62 /// Remove all entries from the cache.
63 pub async fn clear(&self) {
64 self.store.write().await.clear()
65 }
66
67 /// Retrieve the number of expired entries inside the cache.
68 ///
69 /// Note that this is calculated by walking the set of entries and
70 /// should therefore not be used in performance sensitive situations.
71 pub async fn expired(&self) -> usize {
72 self.store
73 .read()
74 .await
75 .iter()
76 .filter(|(_, entry)| entry.expiration().is_expired())
77 .count()
78 }
79
    /// Retrieve a reference to a value inside the cache.
    ///
    /// The returned reference is bound inside a `RwLockReadGuard`.
    pub async fn get(&self, k: &K) -> Option<CacheReadGuard<'_, V>> {
        // `?` bails out early when the key is missing; `unpack!` then maps an
        // expired (but not yet evicted) entry to `None` as well.
        let guard = self.store.read().await;
        let found = guard.get(k)?;
        let valid = unpack!(found)?;

        // NOTE(review): `guard` is dropped when this function returns, yet the
        // returned `CacheReadGuard` refers to an entry inside the map -
        // presumably `CacheReadGuard` (declared in the entry module) holds a
        // raw pointer with `PhantomData` carrying the `'_` borrow of `self`.
        // Confirm its definition before restructuring this function.
        Some(CacheReadGuard {
            entry: valid,
            marker: PhantomData,
        })
    }
93
94 /// Retrieve the number of entries inside the cache.
95 ///
96 /// This *does* include entries which may be expired but are not yet evicted. In
97 /// future there may be an API addition to find the unexpired count, but as it's
98 /// relatively expensive it has been omitted for the time being.
99 pub async fn len(&self) -> usize {
100 self.store.read().await.len()
101 }
102
103 /// Insert a key/value pair into the cache with an associated expiration.
104 ///
105 /// The third argument controls expiration, which can be provided using any type which
106 /// implements `Into<CacheExpiration>`. This allows for various different syntax based
107 /// on your use case. If you do not want expiration, use `CacheExpiration::none()`.
108 pub async fn insert<E>(&self, k: K, v: V, e: E) -> Option<V>
109 where
110 E: Into<CacheExpiration>,
111 {
112 let entry = CacheEntry::new(v, e.into());
113 self.store
114 .write()
115 .await
116 .insert(k, entry)
117 .and_then(|entry| unpack!(entry))
118 .map(CacheEntry::into_inner)
119 }
120
121 /// Check whether the cache is empty.
122 pub async fn is_empty(&self) -> bool {
123 self.store.read().await.is_empty()
124 }
125
    /// Retrieve a `Future` used to monitor expired keys.
    ///
    /// This future must be spawned on whatever runtime you are using inside your
    /// application; not doing this will result in keys never being expired.
    ///
    /// For expiration logic, please see `Cache::purge`, as this is used under the hood.
    pub async fn monitor(&self, sample: usize, threshold: f64, frequency: Duration) {
        // Runs forever: wait out one tick of `frequency`, then delegate a
        // single eviction pass to `purge` with the provided tuning options.
        let mut interval = Interval::platform_new(frequency);
        loop {
            interval.as_mut().await;
            self.purge(sample, threshold).await;
        }
    }
139
140 /// Cleanses the cache of expired entries.
141 ///
142 /// Keys are expired using the same logic as the popular caching system Redis:
143 ///
144 /// 1. Wait until the next tick of `frequency`.
145 /// 2. Take a sample of `sample` keys from the cache.
146 /// 3. Remove any expired keys from the sample.
147 /// 4. Based on `threshold` percentage:
148 /// 4a. If more than `threshold` were expired, goto #2.
149 /// 4b. If less than `threshold` were expired, goto #1.
150 ///
151 /// This means that at any point you may have up to `threshold` percent of your
152 /// cache storing expired entries (assuming the monitor just ran), so make sure
153 /// to tune your frequency, sample size, and threshold accordingly.
154 pub async fn purge(&self, sample: usize, threshold: f64) {
155 let start = Instant::now();
156
157 let mut locked = Duration::from_nanos(0);
158 let mut removed = 0;
159
160 loop {
161 // lock the store and grab a generator
162 let store = self.store.upgradable_read().await;
163
164 // once we're empty, no point carrying on
165 if store.is_empty() {
166 break;
167 }
168
169 // determine the sample size of the batch
170 let total = store.len();
171 let sample = cmp::min(sample, total);
172
173 // counter to track removed keys
174 let mut gone = 0;
175
176 // create our temporary key store and index tree
177 let mut keys = Vec::with_capacity(sample);
178 let mut indices: BTreeSet<usize> = BTreeSet::new();
179
180 {
181 // fetch `sample` keys at random
182 let mut rng = rand::thread_rng();
183 while indices.len() < sample {
184 indices.insert(rng.gen_range(0..total));
185 }
186 }
187
188 {
189 // tracker for previous index
190 let mut prev = 0;
191
192 // boxed iterator to allow us to iterate a single time for all indices
193 let mut iter: Box<dyn Iterator<Item = (&K, &CacheEntry<V>)>> =
194 Box::new(store.iter());
195
196 // walk our index list
197 for idx in indices {
198 // calculate how much we need to shift the iterator
199 let offset = idx
200 .checked_sub(prev)
201 .and_then(|idx| idx.checked_sub(1))
202 .unwrap_or(0);
203
204 // shift and mark the current index
205 iter = Box::new(iter.skip(offset));
206 prev = idx;
207
208 // fetch the next pair (at our index)
209 let (key, entry) = iter.next().unwrap();
210
211 // skip if not expired
212 if !entry.expiration().is_expired() {
213 continue;
214 }
215
216 // otherwise mark for removal
217 keys.push(key.to_owned());
218
219 // and increment remove count
220 gone += 1;
221 }
222 }
223
224 {
225 // upgrade to a write guard so that we can make our changes
226 let acquired = Instant::now();
227 let mut store = RwLockUpgradableReadGuard::upgrade(store).await;
228
229 // remove all expired keys
230 for key in &keys {
231 store.remove(key);
232 }
233
234 // increment the lock timer tracking directly
235 locked = locked.checked_add(acquired.elapsed()).unwrap();
236 }
237
238 // log out now many of the sampled keys were removed
239 if log_enabled!(Level::Trace) {
240 trace!(
241 "{}removed {} / {} ({:.2}%) of the sampled keys",
242 self.label,
243 gone,
244 sample,
245 (gone as f64 / sample as f64) * 100f64,
246 );
247 }
248
249 // bump total remove count
250 removed += gone;
251
252 // break the loop if we don't meet thresholds
253 if (gone as f64) < (sample as f64 * threshold) {
254 break;
255 }
256 }
257
258 // log out the completion as well as the time taken in millis
259 if log_enabled!(Level::Debug) {
260 debug!(
261 "{}purge loop removed {} entries in {:.0?} ({:.0?} locked)",
262 self.label,
263 removed,
264 start.elapsed(),
265 locked
266 );
267 }
268 }
269
270 /// Remove an entry from the cache and return any stored value.
271 pub async fn remove(&self, k: &K) -> Option<V> {
272 self.store
273 .write()
274 .await
275 .remove(k)
276 .and_then(|entry| unpack!(entry))
277 .map(CacheEntry::into_inner)
278 }
279
280 /// Retrieve the number of unexpired entries inside the cache.
281 ///
282 /// Note that this is calculated by walking the set of entries and
283 /// should therefore not be used in performance sensitive situations.
284 pub async fn unexpired(&self) -> usize {
285 self.store
286 .read()
287 .await
288 .iter()
289 .filter(|(_, entry)| !entry.expiration().is_expired())
290 .count()
291 }
292
293 /// Updates an entry in the cache without changing the expiration.
294 pub async fn update<F>(&self, k: &K, f: F)
295 where
296 F: FnOnce(&mut V),
297 {
298 let mut guard = self.store.write().await;
299 if let Some(entry) = guard.get_mut(k).and_then(|entry| unpack!(entry)) {
300 f(entry.value_mut());
301 }
302 }
303}
304
305/// Default implementation.
306impl<K, V> Default for Cache<K, V>
307where
308 K: Ord + Clone,
309{
310 fn default() -> Self {
311 Cache::new()
312 }
313}