pingora_cache/eviction/simple_lru.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A simple LRU cache manager built on top of the `lru` crate
16
17use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use serde::de::SeqAccess;
25use serde::{Deserialize, Serialize};
26use std::collections::hash_map::DefaultHasher;
27use std::fs::File;
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::time::SystemTime;
33
/// One entry tracked by the LRU: the full cache key plus the size it is charged for.
///
/// This is also the record written by `Manager::serialize` and read back by
/// `Manager::deserialize` (MessagePack via `rmp_serde`).
/// NOTE(review): `rmp_serde` likely encodes structs positionally by default, so reordering or
/// renaming these fields may break loading previously saved data — confirm before changing.
#[derive(Debug, Deserialize, Serialize)]
struct Node {
    /// The cache key this entry stands for; handed back to callers when the entry is evicted.
    key: CompactCacheKey,
    /// The weight of this entry, counted against the manager's size `limit`.
    size: usize,
}
39
40/// A simple LRU eviction manager
41///
42/// The implementation is not optimized. All operations require global locks.
43pub struct Manager {
44    lru: RwLock<LruCache<u64, Node>>,
45    limit: usize,
46    used: AtomicUsize,
47    items: AtomicUsize,
48    evicted_size: AtomicUsize,
49    evicted_items: AtomicUsize,
50}
51
52impl Manager {
53    /// Create a new [Manager] with the given total size limit `limit`.
54    pub fn new(limit: usize) -> Self {
55        Manager {
56            lru: RwLock::new(LruCache::unbounded()),
57            limit,
58            used: AtomicUsize::new(0),
59            items: AtomicUsize::new(0),
60            evicted_size: AtomicUsize::new(0),
61            evicted_items: AtomicUsize::new(0),
62        }
63    }
64
65    fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
66        use std::cmp::Ordering::*;
67        let node = Node { key: node, size };
68        let old = {
69            let mut lru = self.lru.write();
70            let old = lru.push(hash_key, node);
71            if reverse && old.is_none() {
72                lru.demote(&hash_key);
73            }
74            old
75        };
76        if let Some(old) = old {
77            // replacing a node, just need to update used size
78            match size.cmp(&old.1.size) {
79                Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
80                Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
81                Equal => 0, // same size, update nothing, use 0 to match other arms' type
82            };
83        } else {
84            self.used.fetch_add(size, Ordering::Relaxed);
85            self.items.fetch_add(1, Ordering::Relaxed);
86        }
87    }
88
89    fn increase_weight(&self, key: u64, delta: usize) {
90        let mut lru = self.lru.write();
91        let Some(node) = lru.get_key_value_mut(&key) else {
92            return;
93        };
94        node.1.size += delta;
95        self.used.fetch_add(delta, Ordering::Relaxed);
96    }
97
98    // evict items until the used capacity is below the limit
99    fn evict(&self) -> Vec<CompactCacheKey> {
100        if self.used.load(Ordering::Relaxed) <= self.limit {
101            return vec![];
102        }
103        let mut to_evict = Vec::with_capacity(1); // we will at least pop 1 item
104        while self.used.load(Ordering::Relaxed) > self.limit {
105            if let Some((_, node)) = self.lru.write().pop_lru() {
106                self.used.fetch_sub(node.size, Ordering::Relaxed);
107                self.items.fetch_sub(1, Ordering::Relaxed);
108                self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
109                self.evicted_items.fetch_add(1, Ordering::Relaxed);
110                to_evict.push(node.key);
111            } else {
112                // lru empty
113                return to_evict;
114            }
115        }
116        to_evict
117    }
118
119    // This could use a lot of memory to buffer the serialized data in memory and could lock the LRU
120    // for too long
121    fn serialize(&self) -> Result<Vec<u8>> {
122        use rmp_serde::encode::Serializer;
123        use serde::ser::SerializeSeq;
124        use serde::ser::Serializer as _;
125        // NOTE: This could use a lot of memory to buffer the serialized data in memory
126        let mut ser = Serializer::new(vec![]);
127        // NOTE: This long for loop could lock the LRU for too long
128        let lru = self.lru.read();
129        let mut seq = ser
130            .serialize_seq(Some(lru.len()))
131            .or_err(InternalError, "fail to serialize node")?;
132        for item in lru.iter() {
133            seq.serialize_element(item.1).unwrap(); // write to vec, safe
134        }
135        seq.end().or_err(InternalError, "when serializing LRU")?;
136        Ok(ser.into_inner())
137    }
138
139    fn deserialize(&self, buf: &[u8]) -> Result<()> {
140        use rmp_serde::decode::Deserializer;
141        use serde::de::Deserializer as _;
142        let mut de = Deserializer::new(buf);
143        let visitor = InsertToManager { lru: self };
144        de.deserialize_seq(visitor)
145            .or_err(InternalError, "when deserializing LRU")?;
146        Ok(())
147    }
148}
149
/// serde visitor state used by `Manager::deserialize`: every decoded [Node] is inserted
/// directly into the borrowed [Manager] instead of being collected first.
struct InsertToManager<'a> {
    /// The manager being populated.
    lru: &'a Manager,
}
153
154impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
155    type Value = ();
156
157    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
158        formatter.write_str("array of lru nodes")
159    }
160
161    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
162    where
163        A: SeqAccess<'de>,
164    {
165        while let Some(node) = seq.next_element::<Node>()? {
166            let key = u64key(&node.key);
167            self.lru.insert(key, node.key, node.size, true); // insert in the back
168        }
169        Ok(())
170    }
171}
172
173#[inline]
174fn u64key(key: &CompactCacheKey) -> u64 {
175    let mut hasher = DefaultHasher::new();
176    key.hash(&mut hasher);
177    hasher.finish()
178}
179
/// Name of the file, inside the directory given to `save`/`load`, holding the serialized LRU.
const FILE_NAME: &str = "simple_lru.data";
181
182#[async_trait]
183impl EvictionManager for Manager {
184    fn total_size(&self) -> usize {
185        self.used.load(Ordering::Relaxed)
186    }
187    fn total_items(&self) -> usize {
188        self.items.load(Ordering::Relaxed)
189    }
190    fn evicted_size(&self) -> usize {
191        self.evicted_size.load(Ordering::Relaxed)
192    }
193    fn evicted_items(&self) -> usize {
194        self.evicted_items.load(Ordering::Relaxed)
195    }
196
197    fn admit(
198        &self,
199        item: CompactCacheKey,
200        size: usize,
201        _fresh_until: SystemTime,
202    ) -> Vec<CompactCacheKey> {
203        let key = u64key(&item);
204        self.insert(key, item, size, false);
205        self.evict()
206    }
207
208    fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
209        let key = u64key(&item);
210        self.increase_weight(key, delta);
211        self.evict()
212    }
213
214    fn remove(&self, item: &CompactCacheKey) {
215        let key = u64key(item);
216        let node = self.lru.write().pop(&key);
217        if let Some(n) = node {
218            self.used.fetch_sub(n.size, Ordering::Relaxed);
219            self.items.fetch_sub(1, Ordering::Relaxed);
220        }
221    }
222
223    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
224        let key = u64key(item);
225        if self.lru.write().get(&key).is_none() {
226            self.insert(key, item.clone(), size, false);
227            false
228        } else {
229            true
230        }
231    }
232
233    fn peek(&self, item: &CompactCacheKey) -> bool {
234        let key = u64key(item);
235        self.lru.read().peek(&key).is_some()
236    }
237
238    async fn save(&self, dir_path: &str) -> Result<()> {
239        let data = self.serialize()?;
240        let dir_str = dir_path.to_owned();
241        tokio::task::spawn_blocking(move || {
242            let dir_path = Path::new(&dir_str);
243            std::fs::create_dir_all(dir_path)
244                .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
245            let file_path = dir_path.join(FILE_NAME);
246            let mut file = File::create(&file_path).or_err_with(InternalError, || {
247                format!("fail to create {}", file_path.display())
248            })?;
249            file.write_all(&data).or_err_with(InternalError, || {
250                format!("fail to write to {}", file_path.display())
251            })
252        })
253        .await
254        .or_err(InternalError, "async blocking IO failure")?
255    }
256
257    async fn load(&self, dir_path: &str) -> Result<()> {
258        let dir_path = dir_path.to_owned();
259        let data = tokio::task::spawn_blocking(move || {
260            let file_path = Path::new(&dir_path).join(FILE_NAME);
261            let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
262                format!("fail to open {}", file_path.display())
263            })?;
264            let mut buffer = Vec::with_capacity(8192);
265            file.read_to_end(&mut buffer)
266                .or_err(InternalError, "fail to read from {file_path}")?;
267            Ok::<Vec<u8>, BError>(buffer)
268        })
269        .await
270        .or_err(InternalError, "async blocking IO failure")??;
271        self.deserialize(&data)
272    }
273}
274
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    #[test]
    fn test_admission() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        assert_eq!(lru.total_size(), 4);
        assert_eq!(lru.total_items(), 3);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used would be 6: evicting key1 (1) is not enough, so key2 (2) goes too, leaving 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
        assert_eq!(lru.total_size(), 3);
        assert_eq!(lru.total_items(), 2);
        assert_eq!(lru.evicted_size(), 3);
        assert_eq!(lru.evicted_items(), 2);
    }

    #[test]
    fn test_access() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so access reports a hit
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);
        assert!(!lru.peek(&key1));
        assert_eq!(lru.total_size(), 3);
        assert_eq!(lru.total_items(), 2);

        // key2 is the least recently used one now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
        // removal is not counted as an eviction
        assert_eq!(lru.evicted_items(), 1);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder

        // untracked keys are inserted and reported as misses; access never evicts
        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(!lru.access(&key1, 1, until));
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(!lru.access(&key2, 2, until));
        let key3 = CacheKey::new("", "c", "1").to_compact();
        assert!(!lru.access(&key3, 2, until));
        assert_eq!(lru.total_size(), 5);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used would be 7: key1 (1) and key2 (2) must both go to get back to the limit of 4
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
        assert_eq!(lru.total_size(), 4);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);
        assert_eq!(lru.total_size(), 3);
        assert_eq!(lru.total_items(), 3);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // make key4 larger
        let v = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    #[test]
    fn test_serde() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let ser = lru.serialize().unwrap();
        let lru2 = Manager::new(4);
        lru2.deserialize(&ser).unwrap();
        assert_eq!(lru2.total_size(), 4);
        assert_eq!(lru2.total_items(), 3);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used
        assert!(lru.access(&key1, 1, until));

        // round-trip lru's data through disk into lru2
        let dir = std::env::temp_dir().join("test_simple_lru_save");
        let dir = dir.to_str().expect("temp dir path is valid UTF-8");
        lru.save(dir).await.unwrap();
        let lru2 = Manager::new(4);
        lru2.load(dir).await.unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }
}