Skip to main content

pingora_cache/eviction/
simple_lru.rs

1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A simple LRU cache manager built on top of the `lru` crate
16
17use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::collections::hash_map::DefaultHasher;
28use std::fs::File;
29use std::hash::{Hash, Hasher};
30use std::io::prelude::*;
31use std::path::Path;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::time::SystemTime;
34
/// One tracked cache asset: its compact key plus the size it counts against the manager's limit.
///
/// This is the unit written by `serialize` and read back by `deserialize`. NOTE(review): it is
/// encoded with `rmp_serde`, so changing the field list or their order likely changes the
/// persisted format — confirm compatibility with existing `simple_lru.data` files before editing.
#[derive(Debug, Deserialize, Serialize)]
struct Node {
    // the cache key of the tracked asset
    key: CompactCacheKey,
    // the weight of the asset as reported by the caller of `admit`/`access`
    size: usize,
}
40
41/// A simple LRU eviction manager
42///
43/// The implementation is not optimized. All operations require global locks.
44pub struct Manager {
45    lru: RwLock<LruCache<u64, Node>>,
46    limit: usize,
47    items_watermark: Option<usize>,
48    used: AtomicUsize,
49    items: AtomicUsize,
50    evicted_size: AtomicUsize,
51    evicted_items: AtomicUsize,
52}
53
54impl Manager {
55    /// Create a new [Manager] with the given total size limit `limit`.
56    pub fn new(limit: usize) -> Self {
57        Manager {
58            lru: RwLock::new(LruCache::unbounded()),
59            limit,
60            items_watermark: None,
61            used: AtomicUsize::new(0),
62            items: AtomicUsize::new(0),
63            evicted_size: AtomicUsize::new(0),
64            evicted_items: AtomicUsize::new(0),
65        }
66    }
67
68    /// Create a new [Manager] with optional watermark in addition to size limit `limit`.
69    pub fn new_with_watermark(limit: usize, items_watermark: Option<usize>) -> Self {
70        Manager {
71            lru: RwLock::new(LruCache::unbounded()),
72            limit,
73            items_watermark,
74            used: AtomicUsize::new(0),
75            items: AtomicUsize::new(0),
76            evicted_size: AtomicUsize::new(0),
77            evicted_items: AtomicUsize::new(0),
78        }
79    }
80
81    fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
82        use std::cmp::Ordering::*;
83        let node = Node { key: node, size };
84        let old = {
85            let mut lru = self.lru.write();
86            let old = lru.push(hash_key, node);
87            if reverse && old.is_none() {
88                lru.demote(&hash_key);
89            }
90            old
91        };
92        if let Some(old) = old {
93            // replacing a node, just need to update used size
94            match size.cmp(&old.1.size) {
95                Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
96                Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
97                Equal => 0, // same size, update nothing, use 0 to match other arms' type
98            };
99        } else {
100            self.used.fetch_add(size, Ordering::Relaxed);
101            self.items.fetch_add(1, Ordering::Relaxed);
102        }
103    }
104
105    fn increase_weight(&self, key: u64, delta: usize) {
106        let mut lru = self.lru.write();
107        let Some(node) = lru.get_key_value_mut(&key) else {
108            return;
109        };
110        node.1.size += delta;
111        self.used.fetch_add(delta, Ordering::Relaxed);
112    }
113
114    #[inline]
115    fn over_limits(&self) -> bool {
116        self.used.load(Ordering::Relaxed) > self.limit
117            || self
118                .items_watermark
119                .is_some_and(|w| self.items.load(Ordering::Relaxed) > w)
120    }
121
122    // evict items until the used capacity is below the size limit and watermark count
123    fn evict(&self) -> Vec<CompactCacheKey> {
124        if self.used.load(Ordering::Relaxed) <= self.limit
125            && self
126                .items_watermark
127                .is_none_or(|w| self.items.load(Ordering::Relaxed) <= w)
128        {
129            return vec![];
130        }
131
132        let mut to_evict = Vec::with_capacity(1); // we will at least pop 1 item
133
134        while self.over_limits() {
135            if let Some((_, node)) = self.lru.write().pop_lru() {
136                self.used.fetch_sub(node.size, Ordering::Relaxed);
137                self.items.fetch_sub(1, Ordering::Relaxed);
138                self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
139                self.evicted_items.fetch_add(1, Ordering::Relaxed);
140                to_evict.push(node.key);
141            } else {
142                // lru empty
143                return to_evict;
144            }
145        }
146        to_evict
147    }
148
149    // This could use a lot of memory to buffer the serialized data in memory and could lock the LRU
150    // for too long
151    fn serialize(&self) -> Result<Vec<u8>> {
152        use rmp_serde::encode::Serializer;
153        use serde::ser::SerializeSeq;
154        use serde::ser::Serializer as _;
155        // NOTE: This could use a lot of memory to buffer the serialized data in memory
156        let mut ser = Serializer::new(vec![]);
157        // NOTE: This long for loop could lock the LRU for too long
158        let lru = self.lru.read();
159        let mut seq = ser
160            .serialize_seq(Some(lru.len()))
161            .or_err(InternalError, "fail to serialize node")?;
162        for item in lru.iter() {
163            seq.serialize_element(item.1).unwrap(); // write to vec, safe
164        }
165        seq.end().or_err(InternalError, "when serializing LRU")?;
166        Ok(ser.into_inner())
167    }
168
169    fn deserialize(&self, buf: &[u8]) -> Result<()> {
170        use rmp_serde::decode::Deserializer;
171        use serde::de::Deserializer as _;
172        let mut de = Deserializer::new(buf);
173        let visitor = InsertToManager { lru: self };
174        de.deserialize_seq(visitor)
175            .or_err(InternalError, "when deserializing LRU")?;
176        Ok(())
177    }
178}
179
/// Serde visitor that inserts each deserialized [Node] directly into a [Manager].
struct InsertToManager<'a> {
    // the receiving manager (despite the field name, this is the whole Manager, not the LruCache)
    lru: &'a Manager,
}
183
184impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
185    type Value = ();
186
187    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
188        formatter.write_str("array of lru nodes")
189    }
190
191    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
192    where
193        A: SeqAccess<'de>,
194    {
195        while let Some(node) = seq.next_element::<Node>()? {
196            let key = u64key(&node.key);
197            self.lru.insert(key, node.key, node.size, true); // insert in the back
198        }
199        Ok(())
200    }
201}
202
203#[inline]
204fn u64key(key: &CompactCacheKey) -> u64 {
205    let mut hasher = DefaultHasher::new();
206    key.hash(&mut hasher);
207    hasher.finish()
208}
209
/// Name of the file, inside the directory given to `save`/`load`, that holds the serialized LRU.
const FILE_NAME: &str = "simple_lru.data";
211
212#[async_trait]
213impl EvictionManager for Manager {
214    fn total_size(&self) -> usize {
215        self.used.load(Ordering::Relaxed)
216    }
217    fn total_items(&self) -> usize {
218        self.items.load(Ordering::Relaxed)
219    }
220    fn evicted_size(&self) -> usize {
221        self.evicted_size.load(Ordering::Relaxed)
222    }
223    fn evicted_items(&self) -> usize {
224        self.evicted_items.load(Ordering::Relaxed)
225    }
226
227    fn admit(
228        &self,
229        item: CompactCacheKey,
230        size: usize,
231        _fresh_until: SystemTime,
232    ) -> Vec<CompactCacheKey> {
233        let key = u64key(&item);
234        self.insert(key, item, size, false);
235        self.evict()
236    }
237
238    fn increment_weight(
239        &self,
240        item: &CompactCacheKey,
241        delta: usize,
242        _max_weight: Option<usize>,
243    ) -> Vec<CompactCacheKey> {
244        let key = u64key(item);
245        self.increase_weight(key, delta);
246        self.evict()
247    }
248
249    fn remove(&self, item: &CompactCacheKey) {
250        let key = u64key(item);
251        let node = self.lru.write().pop(&key);
252        if let Some(n) = node {
253            self.used.fetch_sub(n.size, Ordering::Relaxed);
254            self.items.fetch_sub(1, Ordering::Relaxed);
255        }
256    }
257
258    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
259        let key = u64key(item);
260        if self.lru.write().get(&key).is_none() {
261            self.insert(key, item.clone(), size, false);
262            false
263        } else {
264            true
265        }
266    }
267
268    fn peek(&self, item: &CompactCacheKey) -> bool {
269        let key = u64key(item);
270        self.lru.read().peek(&key).is_some()
271    }
272
273    async fn save(&self, dir_path: &str) -> Result<()> {
274        let data = self.serialize()?;
275        let dir_str = dir_path.to_owned();
276        tokio::task::spawn_blocking(move || {
277            let dir_path = Path::new(&dir_str);
278            std::fs::create_dir_all(dir_path)
279                .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
280
281            let final_file_path = dir_path.join(FILE_NAME);
282            // create a temporary filename using a randomized u32 hash to minimize the chance of multiple writers writing to the same tmp file
283            let random_suffix: u32 = rand::thread_rng().gen();
284            let temp_file_path = dir_path.join(format!("{}.{:08x}.tmp", FILE_NAME, random_suffix));
285            let mut file = File::create(&temp_file_path).or_err_with(InternalError, || {
286                format!("fail to create temporary file {}", temp_file_path.display())
287            })?;
288            file.write_all(&data).or_err_with(InternalError, || {
289                format!("fail to write to {}", temp_file_path.display())
290            })?;
291            file.flush().or_err_with(InternalError, || {
292                format!("fail to flush temp file {}", temp_file_path.display())
293            })?;
294            std::fs::rename(&temp_file_path, &final_file_path).or_err_with(InternalError, || {
295                format!(
296                    "fail to rename temporary file {} to {}",
297                    temp_file_path.display(),
298                    final_file_path.display()
299                )
300            })
301        })
302        .await
303        .or_err(InternalError, "async blocking IO failure")?
304    }
305
306    async fn load(&self, dir_path: &str) -> Result<()> {
307        let dir_path = dir_path.to_owned();
308        let data = tokio::task::spawn_blocking(move || {
309            let file_path = Path::new(&dir_path).join(FILE_NAME);
310            let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
311                format!("fail to open {}", file_path.display())
312            })?;
313            let mut buffer = Vec::with_capacity(8192);
314            file.read_to_end(&mut buffer)
315                .or_err(InternalError, "fail to read from {file_path}")?;
316            Ok::<Vec<u8>, BError>(buffer)
317        })
318        .await
319        .or_err(InternalError, "async blocking IO failure")??;
320        self.deserialize(&data)
321    }
322}
323
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    /// Admit "a" (size 1), "b" (size 2) and "c" (size 1), asserting nothing is evicted, and
    /// return the keys of "a" and "b". With a limit of 4 the manager is then exactly full.
    fn fill(lru: &Manager, until: SystemTime) -> (CompactCacheKey, CompactCacheKey) {
        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(lru.admit(key1.clone(), 1, until).is_empty());
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(lru.admit(key2.clone(), 2, until).is_empty());
        let key3 = CacheKey::new("", "c", "1").to_compact();
        assert!(lru.admit(key3, 1, until).is_empty());
        (key1, key2)
    }

    /// A per-process scratch directory path under the system temp dir.
    fn scratch_dir(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("{name}_{}", std::process::id()))
    }

    #[test]
    fn test_admission() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used is 6: evicting key1 (-> 5) is not enough, so key2 is evicted too (-> 3)
        assert_eq!(v, vec![key1, key2]);
    }

    #[test]
    fn test_access() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so access reports true
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used is 6; key2 is now the least recently used and evicting it alone is enough
        assert_eq!(v, vec![key2]);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        lru.remove(&key1);
        assert_eq!(lru.total_size(), 3);
        assert_eq!(lru.total_items(), 2);
        // removal is not an eviction
        assert_eq!(lru.evicted_items(), 0);

        // key2 is the least recently used one now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v, vec![key2]);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder

        // untracked keys are added by access, which reports false for them
        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(!lru.access(&key1, 1, until));
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(!lru.access(&key2, 2, until));
        let key3 = CacheKey::new("", "c", "1").to_compact();
        assert!(!lru.access(&key3, 2, until));

        // access never evicts, so used is 5 here; admitting key4 brings it to 7 and
        // key1 (-> 6) then key2 (-> 4) are evicted
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v, vec![key1, key2]);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        assert!(lru.admit(key2, 1, until).is_empty());
        assert_eq!(lru.total_size(), 3);
        assert_eq!(lru.total_items(), 3);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        assert!(lru.admit(key4.clone(), 1, until).is_empty());

        // make key4 larger: used becomes 5, so the least recently used key1 must go
        let v = lru.admit(key4, 2, until);
        assert_eq!(v, vec![key1]);
    }

    #[test]
    fn test_increment_weight() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(lru.admit(key1.clone(), 1, until).is_empty());
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(lru.admit(key2.clone(), 2, until).is_empty());

        // growing key2 by 2 brings used to 5 > 4; key1 is the least recently used
        let v = lru.increment_weight(&key2, 2, None);
        assert_eq!(v, vec![key1]);
        assert_eq!(lru.total_size(), 4);

        // untracked keys are ignored
        let missing = CacheKey::new("", "z", "1").to_compact();
        assert!(lru.increment_weight(&missing, 10, None).is_empty());
        assert_eq!(lru.total_size(), 4);
    }

    #[test]
    fn test_serde() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        // make key1 most recently used
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let ser = lru.serialize().unwrap();
        let lru2 = Manager::new(4);
        lru2.deserialize(&ser).unwrap();
        assert_eq!(lru2.total_items(), 3);
        assert_eq!(lru2.total_size(), 4);

        // the recency order survived the round trip: key2 is still the least recently used
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v, vec![key2]);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill(&lru, until);

        // lru is full (4) now
        // make key1 most recently used
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let dir = scratch_dir("test_simple_lru_save");
        let dir_str = dir.to_str().unwrap();
        lru.save(dir_str).await.unwrap();
        let lru2 = Manager::new(4);
        lru2.load(dir_str).await.unwrap();
        let _ = std::fs::remove_dir_all(&dir);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v, vec![key2]);
    }

    #[tokio::test]
    async fn test_load_missing_dir() {
        let dir = scratch_dir("test_simple_lru_missing");
        let lru = Manager::new(4);
        assert!(lru.load(dir.to_str().unwrap()).await.is_err());
        assert_eq!(lru.total_items(), 0);
    }

    #[test]
    fn test_watermark_eviction() {
        const SIZE_LIMIT: usize = usize::MAX / 2;
        let lru = Manager::new_with_watermark(SIZE_LIMIT, Some(4));
        let until = SystemTime::now();

        // admit 6 items of size 1
        for name in ["a", "b", "c", "d", "e", "f"] {
            let key = CacheKey::new("", name, "1").to_compact();
            let _ = lru.admit(key, 1, until);
        }

        // test items were evicted due to watermark
        assert_eq!(lru.total_items(), 4);
        assert_eq!(lru.evicted_items(), 2);
        assert_eq!(lru.evicted_size(), 2);
        assert!(lru.total_size() <= SIZE_LIMIT);
    }
}