xfetch/lib.rs
1#![deny(missing_docs)]
2//! This crate implements the XFetch probabilistic early expiration algorithm.
3//!
//! It can be used in conjunction with cache containers such as an LRU cache
//! to implement cache expiration and recomputation in parallel
//! environments such as multi-threaded or multi-process systems.
7//!
8//! It is very efficient because the algorithm does not need
9//! coordination (no locks) between processes.
10//!
11//! # Examples
12//!
//! Create a single cache entry and test its expiration:
14//!
15//! ```rust
16//! # struct SomeValue { value: u64, ttl: u64 };
17//! # fn expensive_computation() -> SomeValue { SomeValue { value: 42, ttl: 10000 } }
18//! use xfetch::CacheEntry;
19//! use std::time::Duration;
20//!
21//! let entry = CacheEntry::builder(|| {
22//! expensive_computation()
23//! })
24//! .with_ttl(|value| {
25//! Duration::from_millis(value.ttl)
26//! })
27//! .build();
28//!
29//! assert!(!entry.is_expired());
30//! ```
31//!
32//! The [CacheEntry](struct.CacheEntry.html) can be used with any cache library.
33//! For example the `lru` crate:
34//!
35//! ```rust
36//! use lru::LruCache;
37//! use xfetch::CacheEntry;
38//! use std::time::Duration;
39//!
40//! struct SomeValue {
41//! value: u64,
42//! ttl: u64
43//! };
44//!
45//! fn recompute_value(n: u64) -> SomeValue {
46//! SomeValue { value: n, ttl: 10000 }
47//! }
48//!
49//! let mut cache = LruCache::new(2);
50//!
51//! cache.put("apple", CacheEntry::builder(|| recompute_value(3))
52//! .with_ttl(|v| Duration::from_millis(v.ttl))
53//! .build());
54//! cache.put("banana", CacheEntry::builder(|| recompute_value(2))
55//! .with_ttl(|v| Duration::from_millis(v.ttl))
56//! .build());
57//!
58//! if let Some(entry) = cache.get(&"apple") {
59//! if !entry.is_expired() {
60//! assert_eq!(entry.get().value, 3);
61//! } else {
62//! cache.put("apple", CacheEntry::builder(|| recompute_value(3))
63//! .with_ttl(|v| Duration::from_millis(v.ttl))
64//! .build());
65//! }
66//! }
67//! ```
68//!
69//! # The Algorithm
70//!
71//! Cascading failure can occur when massively parallel computing
72//! systems with caching mechanisms come under very high load.
73//!
//! Under normal load, cache misses will trigger a recomputation to refresh the
//! cache. Other processes or threads can continue as before.
76//!
//! Under heavy load, cache misses may trigger multiple processes / threads
//! trying to refresh the content at the same time, adding even more load to
//! the very resource the cache was meant to protect.
80//!
81//! Several approaches can be used to mitigate this issue. The algorithm
82//! used here is proposed by Vattani, A.; Chierichetti, F.; Lowenstein, K.
83//! (2015) in the paper [Optimal Probabilistic Cache Stampede Prevention][vldb].
84//!
85//! The idea is any worker can volunteer to recompute the value before it
86//! expires. With a probability that increases when the cache entry approaches
87//! expiration, each worker may recompute the cache by making an independent
88//! decision. The effect of the cache stampede is mitigated as fewer workers
89//! will expire at the same time.
90//!
91//! The following is the algorithm pseudo code:
92//!
93//! ```ignore
94//! function XFetch(key, ttl; beta = 1)
95//! value, delta, expiry <- cache_read(key)
96//! if !value or time() - delta * beta * ln(rand()) >= expiry then
97//! start <- time()
98//! value <- recompute_value()
99//! delta <- time() - start
100//! cache_write(key, (value, delta), ttl)
101//! end
102//! return value
103//! end
104//! ```
105//!
106//! # Parameters
107//!
//! The parameter **beta** can be set to greater than `1.0` to favor earlier
//! recomputation, or to less than `1.0` to favor later recomputation. The
//! default `1.0` is optimal for most use cases.
111//!
112//! `rand()` is a random number in the range (0, 1].
113//!
114//! **delta** is the time required for the recomputation. If it takes longer to
115//! recompute then the algorithm will also favor earlier recomputation.
116//!
117//! # References
118//!
119//! - Wikipedia [Cache Stampede][wikipedia].
120//! - Vattani, A.; Chierichetti, F.; Lowenstein, K. (2015), [Optimal
121//! Probabilistic Cache Stampede Prevention][vldb] (PDF), 8 (8), VLDB, pp. 886–897,
122//! ISSN 2150-8097.
123//! - Jim Nelson, Internet Archive, [RedisConf17 - Preventing cache stampede with Redis & XFetch][archive].
124//!
125//! [vldb]: http://www.vldb.org/pvldb/vol8/p886-vattani.pdf
126//! [wikipedia]: https://en.wikipedia.org/wiki/Cache_stampede
127//! [archive]: https://www.slideshare.net/RedisLabs/redisconf17-internet-archive-preventing-cache-stampede-with-redis-and-xfetch
128
129use rand::{distributions::OpenClosed01, thread_rng, Rng, RngCore};
130use std::time::{Duration, Instant};
131
/// Beta value a freshly created builder starts with; `1.0` applies the
/// recomputation time to the early-expiration offset without scaling.
const DEFAULT_BETA: f32 = 1.0_f32;
133
/// The builder for building [CacheEntry](struct.CacheEntry.html) with
/// supplied parameters.
///
/// A builder always holds an already-computed value; the remaining fields are
/// the tuning parameters that are copied into the final entry.
#[derive(Debug)]
pub struct CacheEntryBuilder<T> {
    /// The value that will be stored in the cache entry.
    value: T,
    /// Time attributed to (re)computing `value`.
    delta: Duration,
    /// Scaling factor applied to `delta` when deciding on early expiration.
    beta: f32,
    /// Instant at which the entry expires, or `None` for an entry without a
    /// time to live.
    expiry: Option<Instant>,
}
142
143impl<T> CacheEntryBuilder<T> {
144 /// Return a new [CacheEntryBuilder](struct.CacheEntryBuilder.html).
145 ///
146 /// This method takes a closure which should return the value to be cached.
147 pub fn new<F>(f: F) -> CacheEntryBuilder<T>
148 where
149 F: FnOnce() -> T,
150 {
151 let start = Instant::now();
152 let value = f();
153 let recompute_time = start.elapsed();
154 CacheEntryBuilder {
155 value,
156 delta: recompute_time,
157 beta: DEFAULT_BETA,
158 expiry: None,
159 }
160 }
161
162 /// Set the delta.
163 ///
164 /// Usually the delta value is mesured from the time took by the
165 /// recomputation function. However, if the recomputation function does not
166 /// reflect the actual time required (for example, a asynchronous
167 /// computation), then the delta value can be set via this method.
168 ///
169 /// The reference of the value returned by the recomputation function is
170 /// passed to the closure.
171 pub fn with_delta<F>(mut self, f: F) -> CacheEntryBuilder<T>
172 where
173 F: FnOnce(&T) -> Duration,
174 {
175 self.delta = f(&self.value);
176 self
177 }
178
179 /// Set the ttl.
180 ///
181 /// The reference of the value returned by the recomputation function is
182 /// passed to the closure.
183 ///
184 /// If the ttl is not set then the cache entry will become a eternal cache
185 /// entry that will never expire.
186 pub fn with_ttl<F>(mut self, f: F) -> CacheEntryBuilder<T>
187 where
188 F: FnOnce(&T) -> Duration,
189 {
190 self.expiry = Some(Instant::now() + f(&self.value));
191 self
192 }
193
194 /// Set the beta value.
195 ///
196 /// Beta value > `1.0` favors more eager early expiration, value < `1.0`
197 /// favors lazier early expiration.
198 ///
199 /// The default value `1.0` is usually the optimal value for most use cases.
200 pub fn with_beta(mut self, beta: f32) -> CacheEntryBuilder<T> {
201 self.beta = beta;
202 self
203 }
204
205 /// Return a new [CacheEntry](struct.CacheEntry.html) with the supplied
206 /// parameters.
207 pub fn build(self) -> CacheEntry<T> {
208 CacheEntry {
209 value: self.value,
210 delta: self.delta,
211 beta: self.beta,
212 expiry: self.expiry,
213 }
214 }
215}
216
/// A cache entry that employs probabilistic early expiration
///
/// # Examples
///
/// In this example, you can see how to create a new cache entry. The value of
/// the entry is passed in as a closure so the time required for recomputation
/// can be measured. The time to expiration can be set by chaining the
/// [`with_ttl()`](struct.CacheEntryBuilder.html#method.with_ttl) method.
///
/// ```
/// use std::time::Duration;
/// use xfetch::CacheEntry;
///
/// let entry = CacheEntry::builder(|| 42)
///     .with_ttl(|_| Duration::from_secs(10))
///     .build();
/// ```
///
/// See the [module-level documentation](index.html) for more information.
#[derive(Debug, Copy, Clone)]
pub struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// Time attributed to (re)computing `value`.
    delta: Duration,
    /// Scaling factor applied to `delta` when deciding on early expiration.
    beta: f32,
    /// Instant at which the entry expires; `None` means it never expires.
    expiry: Option<Instant>,
}
243
244impl<T> CacheEntry<T> {
245 /// Return a new [CacheEntryBuilder](struct.CacheEntryBuilder.html).
246 ///
247 /// This method takes a closure which should return the value to be cached.
248 pub fn builder<F>(f: F) -> CacheEntryBuilder<T>
249 where
250 F: FnOnce() -> T,
251 {
252 CacheEntryBuilder::new(f)
253 }
254
255 fn is_expired_with_rng(&self, rng: &mut dyn RngCore) -> bool {
256 match self.expiry {
257 Some(expiry) => {
258 let now = Instant::now();
259 let delta = self.delta.as_millis() as f32;
260 let rand: f32 = rng.sample(OpenClosed01);
261 let xfetch = Duration::from_millis((delta * self.beta * -rand.ln()).round() as u64);
262 (now + xfetch) >= expiry
263 }
264 None => false,
265 }
266 }
267
268 /// Check whether the cache has expired or not.
269 ///
270 /// With probabilstic early expiration, this method may return `true` before
271 /// the entry is really expired.
272 pub fn is_expired(&self) -> bool {
273 self.is_expired_with_rng(&mut thread_rng())
274 }
275
276 /// Check if the cache entry will never expire.
277 ///
278 /// If the cache entry is created without setting time to expiration then it
279 /// is a eternal cache entry.
280 pub fn is_eternal(&self) -> bool {
281 self.expiry.is_none()
282 }
283
284 /// Returns a reference of the contained value.
285 pub fn get(&self) -> &T {
286 &self.value
287 }
288
289 /// Unwraps the value.
290 pub fn into_inner(self) -> T {
291 self.value
292 }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use rand::rngs::mock::StepRng;

    /// Unit-valued entry with a 10 second delta and a 120 second ttl, shared
    /// by the two early-expiry tests.
    fn slow_entry() -> CacheEntry<()> {
        let builder = CacheEntry::builder(|| ());
        let builder = builder.with_delta(|_| Duration::from_secs(10));
        let builder = builder.with_ttl(|_| Duration::from_secs(120));
        builder.build()
    }

    /// An entry built without any options holds the value, is eternal and
    /// carries the default beta.
    #[test]
    fn test_new_entry() {
        let builder = CacheEntry::builder(|| ());
        let entry = builder.build();
        assert_eq!(*entry.get(), ());
        // `CacheEntry<()>` is `Copy`, so the entry is still usable afterwards.
        assert_eq!(entry.into_inner(), ());
        assert!(entry.is_eternal());
        assert_eq!(entry.beta, DEFAULT_BETA);
    }

    /// Setting a ttl records an expiry on the entry.
    #[test]
    fn test_new_entry_with_ttl() {
        let one_minute = Duration::from_secs(60);
        let entry = CacheEntry::builder(|| ()).with_ttl(|_| one_minute).build();
        assert_eq!(*entry.get(), ());
        assert!(entry.expiry.is_some());
    }

    /// A custom beta is stored verbatim.
    #[test]
    fn test_new_entry_with_beta() {
        let builder = CacheEntry::builder(|| ()).with_beta(0.9);
        let entry = builder.build();
        assert_eq!(*entry.get(), ());
        assert_eq!(entry.beta, 0.9);
    }

    /// With a step generator fixed at zero the entry is reported as expired
    /// well before its ttl has passed.
    #[test]
    fn test_early_expiry() {
        let mut zeros = StepRng::new(0, 0);
        let entry = slow_entry();
        assert!(entry.is_expired_with_rng(&mut zeros));
    }

    /// With a step generator fixed at all-one bits the entry is not reported
    /// as expired.
    #[test]
    fn test_no_early_expiry() {
        let mut max = StepRng::new(!0, 0);
        let entry = slow_entry();
        assert!(!entry.is_expired_with_rng(&mut max));
    }
}
344}