xfetch/
lib.rs

1#![deny(missing_docs)]
2//! This crate implements the XFetch probabilistic early expiration algorithm.
3//!
//! It can be used together with cache containers such as an LRU cache
//! to implement cache expiration and recomputation in parallel
//! environments such as multi-threaded or multi-process programs.
7//!
8//! It is very efficient because the algorithm does not need
9//! coordination (no locks) between processes.
10//!
11//! # Examples
12//!
//! Create a single cache entry and test its expiration:
14//!
15//! ```rust
16//! # struct SomeValue { value: u64, ttl: u64 };
17//! # fn expensive_computation() -> SomeValue { SomeValue { value: 42, ttl: 10000 } }
18//! use xfetch::CacheEntry;
19//! use std::time::Duration;
20//!
21//! let entry = CacheEntry::builder(|| {
22//!     expensive_computation()
23//! })
24//! .with_ttl(|value| {
25//!     Duration::from_millis(value.ttl)
26//! })
27//! .build();
28//!
29//! assert!(!entry.is_expired());
30//! ```
31//!
32//! The [CacheEntry](struct.CacheEntry.html) can be used with any cache library.
33//! For example the `lru` crate:
34//!
35//! ```rust
36//! use lru::LruCache;
37//! use xfetch::CacheEntry;
38//! use std::time::Duration;
39//!
40//! struct SomeValue {
41//!     value: u64,
42//!     ttl: u64
43//! };
44//!
45//! fn recompute_value(n: u64) -> SomeValue {
46//!     SomeValue { value: n, ttl: 10000 }
47//! }
48//!
49//! let mut cache = LruCache::new(2);
50//!
51//! cache.put("apple", CacheEntry::builder(|| recompute_value(3))
52//!     .with_ttl(|v| Duration::from_millis(v.ttl))
53//!     .build());
54//! cache.put("banana", CacheEntry::builder(|| recompute_value(2))
55//!     .with_ttl(|v| Duration::from_millis(v.ttl))
56//!     .build());
57//!
58//! if let Some(entry) = cache.get(&"apple") {
59//!     if !entry.is_expired() {
60//!         assert_eq!(entry.get().value, 3);
61//!     } else {
62//!         cache.put("apple", CacheEntry::builder(|| recompute_value(3))
63//!             .with_ttl(|v| Duration::from_millis(v.ttl))
64//!             .build());
65//!     }
66//! }
67//! ```
68//!
69//! # The Algorithm
70//!
71//! Cascading failure can occur when massively parallel computing
72//! systems with caching mechanisms come under very high load.
73//!
//! Under normal load, a cache miss triggers a recomputation to refresh the
//! cache. Other processes or threads can continue as before.
76//!
//! Under heavy load, cache misses may cause multiple processes / threads to
//! try to refresh the content at once, adding load to the very resource the
//! cache was meant to protect.
80//!
81//! Several approaches can be used to mitigate this issue. The algorithm
82//! used here is proposed by Vattani, A.; Chierichetti, F.; Lowenstein, K.
83//! (2015) in the paper [Optimal Probabilistic Cache Stampede Prevention][vldb].
84//!
//! The idea is that any worker can volunteer to recompute the value before it
//! expires. Each worker makes an independent decision to recompute, with a
//! probability that increases as the cache entry approaches expiration. The
//! effect of a cache stampede is mitigated because fewer workers will decide
//! to recompute at the same time.
90//!
91//! The following is the algorithm pseudo code:
92//!
93//! ```ignore
94//! function XFetch(key, ttl; beta = 1)
95//!     value, delta, expiry <- cache_read(key)
96//!     if !value or time() - delta * beta * ln(rand()) >= expiry then
97//!         start <- time()
98//!         value <- recompute_value()
99//!         delta <- time() - start
100//!         cache_write(key, (value, delta), ttl)
101//!     end
102//!     return value
103//! end
104//! ```
105//!
106//! # Parameters
107//!
//! The parameter **beta** can be set greater than `1.0` to favor earlier
//! recomputation, or less than `1.0` to favor later recomputation. The
//! default of `1.0` is optimal for most use cases.
111//!
112//! `rand()` is a random number in the range (0, 1].
113//!
//! **delta** is the time required for the recomputation. The longer the
//! recomputation takes, the more the algorithm favors earlier recomputation.
116//!
117//! # References
118//!
119//! - Wikipedia [Cache Stampede][wikipedia].
120//! - Vattani, A.; Chierichetti, F.; Lowenstein, K. (2015), [Optimal
121//!   Probabilistic Cache Stampede Prevention][vldb] (PDF), 8 (8), VLDB, pp. 886–897,
122//!   ISSN 2150-8097.
123//! - Jim Nelson, Internet Archive, [RedisConf17 - Preventing cache stampede with Redis & XFetch][archive].
124//!
125//! [vldb]: http://www.vldb.org/pvldb/vol8/p886-vattani.pdf
126//! [wikipedia]: https://en.wikipedia.org/wiki/Cache_stampede
127//! [archive]: https://www.slideshare.net/RedisLabs/redisconf17-internet-archive-preventing-cache-stampede-with-redis-and-xfetch
128
129use rand::{distributions::OpenClosed01, thread_rng, Rng, RngCore};
130use std::time::{Duration, Instant};
131
/// Beta applied when `with_beta` is never called on the builder.
///
/// A value of `1.0` is optimal for most use cases.
const DEFAULT_BETA: f32 = 1_f32;
133
/// The builder for building [CacheEntry](struct.CacheEntry.html) with
/// supplied parameters.
///
/// A builder does nothing on its own; call `build()` to obtain the
/// [CacheEntry](struct.CacheEntry.html).
#[derive(Debug)]
#[must_use = "a CacheEntryBuilder does nothing until `build()` is called"]
pub struct CacheEntryBuilder<T> {
    /// The value produced by the recomputation closure.
    value: T,
    /// Recomputation cost: measured around the closure, or overridden.
    delta: Duration,
    /// Early-expiration eagerness factor.
    beta: f32,
    /// Hard expiry instant; `None` means the entry never expires.
    expiry: Option<Instant>,
}
142
143impl<T> CacheEntryBuilder<T> {
144    /// Return a new [CacheEntryBuilder](struct.CacheEntryBuilder.html).
145    ///
146    /// This method takes a closure which should return the value to be cached.
147    pub fn new<F>(f: F) -> CacheEntryBuilder<T>
148    where
149        F: FnOnce() -> T,
150    {
151        let start = Instant::now();
152        let value = f();
153        let recompute_time = start.elapsed();
154        CacheEntryBuilder {
155            value,
156            delta: recompute_time,
157            beta: DEFAULT_BETA,
158            expiry: None,
159        }
160    }
161
162    /// Set the delta.
163    ///
164    /// Usually the delta value is mesured from the time took by the
165    /// recomputation function. However, if the recomputation function does not
166    /// reflect the actual time required (for example, a asynchronous
167    /// computation), then the delta value can be set via this method.
168    ///
169    /// The reference of the value returned by the recomputation function is
170    /// passed to the closure.
171    pub fn with_delta<F>(mut self, f: F) -> CacheEntryBuilder<T>
172    where
173        F: FnOnce(&T) -> Duration,
174    {
175        self.delta = f(&self.value);
176        self
177    }
178
179    /// Set the ttl.
180    ///
181    /// The reference of the value returned by the recomputation function is
182    /// passed to the closure.
183    ///
184    /// If the ttl is not set then the cache entry will become a eternal cache
185    /// entry that will never expire.
186    pub fn with_ttl<F>(mut self, f: F) -> CacheEntryBuilder<T>
187    where
188        F: FnOnce(&T) -> Duration,
189    {
190        self.expiry = Some(Instant::now() + f(&self.value));
191        self
192    }
193
194    /// Set the beta value.
195    ///
196    /// Beta value > `1.0` favors more eager early expiration, value < `1.0`
197    /// favors lazier early expiration.
198    ///
199    /// The default value `1.0` is usually the optimal value for most use cases.
200    pub fn with_beta(mut self, beta: f32) -> CacheEntryBuilder<T> {
201        self.beta = beta;
202        self
203    }
204
205    /// Return a new [CacheEntry](struct.CacheEntry.html) with the supplied
206    /// parameters.
207    pub fn build(self) -> CacheEntry<T> {
208        CacheEntry {
209            value: self.value,
210            delta: self.delta,
211            beta: self.beta,
212            expiry: self.expiry,
213        }
214    }
215}
216
/// A cache entry that employs probabilistic early expiration.
///
/// # Examples
///
/// In this example, you can see how to create a new cache entry. The value of
/// the entry is passed in as a closure so the time required for recomputation
/// can be measured. The time to expiration can be set by chaining the
/// [`with_ttl()`](struct.CacheEntryBuilder.html#method.with_ttl) method.
///
/// ```
/// use std::time::Duration;
/// use xfetch::CacheEntry;
///
/// let entry = CacheEntry::builder(|| 42)
///     .with_ttl(|_| Duration::from_secs(10))
///     .build();
/// ```
///
/// See the [module-level documentation](index.html) for more information.
#[derive(Copy, Clone, Debug)]
pub struct CacheEntry<T> {
    /// The cached value.
    value: T,
    /// Recomputation cost used to scale the early-expiration window.
    delta: Duration,
    /// Early-expiration eagerness factor.
    beta: f32,
    /// Hard expiry instant; `None` for an eternal entry.
    expiry: Option<Instant>,
}
243
244impl<T> CacheEntry<T> {
245    /// Return a new [CacheEntryBuilder](struct.CacheEntryBuilder.html).
246    ///
247    /// This method takes a closure which should return the value to be cached.
248    pub fn builder<F>(f: F) -> CacheEntryBuilder<T>
249    where
250        F: FnOnce() -> T,
251    {
252        CacheEntryBuilder::new(f)
253    }
254
255    fn is_expired_with_rng(&self, rng: &mut dyn RngCore) -> bool {
256        match self.expiry {
257            Some(expiry) => {
258                let now = Instant::now();
259                let delta = self.delta.as_millis() as f32;
260                let rand: f32 = rng.sample(OpenClosed01);
261                let xfetch = Duration::from_millis((delta * self.beta * -rand.ln()).round() as u64);
262                (now + xfetch) >= expiry
263            }
264            None => false,
265        }
266    }
267
268    /// Check whether the cache has expired or not.
269    ///
270    /// With probabilstic early expiration, this method may return `true` before
271    /// the entry is really expired.
272    pub fn is_expired(&self) -> bool {
273        self.is_expired_with_rng(&mut thread_rng())
274    }
275
276    /// Check if the cache entry will never expire.
277    ///
278    /// If the cache entry is created without setting time to expiration then it
279    /// is a eternal cache entry.
280    pub fn is_eternal(&self) -> bool {
281        self.expiry.is_none()
282    }
283
284    /// Returns a reference of the contained value.
285    pub fn get(&self) -> &T {
286        &self.value
287    }
288
289    /// Unwraps the value.
290    pub fn into_inner(self) -> T {
291        self.value
292    }
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use rand::rngs::mock::StepRng;

    /// Fixture: an entry declared to cost 10 s to recompute, with a 120 s ttl.
    /// The entry is far from hard expiry, so the RNG decides the outcome.
    fn ten_second_delta_entry() -> CacheEntry<()> {
        CacheEntry::builder(|| ())
            .with_delta(|_| Duration::from_secs(10))
            .with_ttl(|_| Duration::from_secs(120))
            .build()
    }

    #[test]
    fn test_new_entry() {
        let entry = CacheEntry::builder(|| ()).build();
        assert_eq!(*entry.get(), ());
        // `CacheEntry<()>` is `Copy`, so `entry` remains usable afterwards.
        assert_eq!(entry.into_inner(), ());
        assert!(entry.is_eternal());
        assert_eq!(entry.beta, DEFAULT_BETA);
    }

    #[test]
    fn test_new_entry_with_ttl() {
        let entry = CacheEntry::builder(|| ())
            .with_ttl(|_| Duration::from_secs(60))
            .build();
        assert_eq!(*entry.get(), ());
        assert!(entry.expiry.is_some());
    }

    #[test]
    fn test_new_entry_with_beta() {
        let entry = CacheEntry::builder(|| ()).with_beta(0.9).build();
        assert_eq!(*entry.get(), ());
        assert_eq!(entry.beta, 0.9);
    }

    #[test]
    fn test_early_expiry() {
        // An all-zero RNG is expected to produce a tiny sample in (0, 1].
        // That makes -ln(rand) large enough for the 10 s delta to reach past
        // the 120 s ttl.
        let mut rng = StepRng::new(0, 0);
        assert!(ten_second_delta_entry().is_expired_with_rng(&mut rng));
    }

    #[test]
    fn test_no_early_expiry() {
        // An all-ones RNG is expected to produce a sample at the top of (0, 1].
        // Then -ln(rand) is (close to) zero and no early expiry happens.
        let mut rng = StepRng::new(!0, 0);
        assert!(!ten_second_delta_entry().is_expired_with_rng(&mut rng));
    }
}