pingora_cache/eviction/
simple_lru.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A simple LRU cache manager built on top of the `lru` crate
16
17use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use lru::LruCache;
22use parking_lot::RwLock;
23use pingora_error::{BError, ErrorType::*, OrErr, Result};
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::collections::hash_map::DefaultHasher;
28use std::fs::File;
29use std::hash::{Hash, Hasher};
30use std::io::prelude::*;
31use std::path::Path;
32use std::sync::atomic::{AtomicUsize, Ordering};
33use std::time::SystemTime;
34
/// One tracked asset: its cache key and the weight it contributes to the manager's
/// `used` total. This is also the element type written by `serialize` and read back by
/// `deserialize`.
// NOTE(review): rmp_serde's default config appears to encode structs positionally (as
// arrays), so field order is presumably part of the saved-file format — confirm before
// reordering or inserting fields.
#[derive(Debug, Deserialize, Serialize)]
struct Node {
    // full cache key of the asset; its u64 hash is the LRU map key
    key: CompactCacheKey,
    // weight counted against `Manager::limit`
    size: usize,
}
40
41/// A simple LRU eviction manager
42///
43/// The implementation is not optimized. All operations require global locks.
44pub struct Manager {
45    lru: RwLock<LruCache<u64, Node>>,
46    limit: usize,
47    used: AtomicUsize,
48    items: AtomicUsize,
49    evicted_size: AtomicUsize,
50    evicted_items: AtomicUsize,
51}
52
53impl Manager {
54    /// Create a new [Manager] with the given total size limit `limit`.
55    pub fn new(limit: usize) -> Self {
56        Manager {
57            lru: RwLock::new(LruCache::unbounded()),
58            limit,
59            used: AtomicUsize::new(0),
60            items: AtomicUsize::new(0),
61            evicted_size: AtomicUsize::new(0),
62            evicted_items: AtomicUsize::new(0),
63        }
64    }
65
66    fn insert(&self, hash_key: u64, node: CompactCacheKey, size: usize, reverse: bool) {
67        use std::cmp::Ordering::*;
68        let node = Node { key: node, size };
69        let old = {
70            let mut lru = self.lru.write();
71            let old = lru.push(hash_key, node);
72            if reverse && old.is_none() {
73                lru.demote(&hash_key);
74            }
75            old
76        };
77        if let Some(old) = old {
78            // replacing a node, just need to update used size
79            match size.cmp(&old.1.size) {
80                Greater => self.used.fetch_add(size - old.1.size, Ordering::Relaxed),
81                Less => self.used.fetch_sub(old.1.size - size, Ordering::Relaxed),
82                Equal => 0, // same size, update nothing, use 0 to match other arms' type
83            };
84        } else {
85            self.used.fetch_add(size, Ordering::Relaxed);
86            self.items.fetch_add(1, Ordering::Relaxed);
87        }
88    }
89
90    fn increase_weight(&self, key: u64, delta: usize) {
91        let mut lru = self.lru.write();
92        let Some(node) = lru.get_key_value_mut(&key) else {
93            return;
94        };
95        node.1.size += delta;
96        self.used.fetch_add(delta, Ordering::Relaxed);
97    }
98
99    // evict items until the used capacity is below the limit
100    fn evict(&self) -> Vec<CompactCacheKey> {
101        if self.used.load(Ordering::Relaxed) <= self.limit {
102            return vec![];
103        }
104        let mut to_evict = Vec::with_capacity(1); // we will at least pop 1 item
105        while self.used.load(Ordering::Relaxed) > self.limit {
106            if let Some((_, node)) = self.lru.write().pop_lru() {
107                self.used.fetch_sub(node.size, Ordering::Relaxed);
108                self.items.fetch_sub(1, Ordering::Relaxed);
109                self.evicted_size.fetch_add(node.size, Ordering::Relaxed);
110                self.evicted_items.fetch_add(1, Ordering::Relaxed);
111                to_evict.push(node.key);
112            } else {
113                // lru empty
114                return to_evict;
115            }
116        }
117        to_evict
118    }
119
120    // This could use a lot of memory to buffer the serialized data in memory and could lock the LRU
121    // for too long
122    fn serialize(&self) -> Result<Vec<u8>> {
123        use rmp_serde::encode::Serializer;
124        use serde::ser::SerializeSeq;
125        use serde::ser::Serializer as _;
126        // NOTE: This could use a lot of memory to buffer the serialized data in memory
127        let mut ser = Serializer::new(vec![]);
128        // NOTE: This long for loop could lock the LRU for too long
129        let lru = self.lru.read();
130        let mut seq = ser
131            .serialize_seq(Some(lru.len()))
132            .or_err(InternalError, "fail to serialize node")?;
133        for item in lru.iter() {
134            seq.serialize_element(item.1).unwrap(); // write to vec, safe
135        }
136        seq.end().or_err(InternalError, "when serializing LRU")?;
137        Ok(ser.into_inner())
138    }
139
140    fn deserialize(&self, buf: &[u8]) -> Result<()> {
141        use rmp_serde::decode::Deserializer;
142        use serde::de::Deserializer as _;
143        let mut de = Deserializer::new(buf);
144        let visitor = InsertToManager { lru: self };
145        de.deserialize_seq(visitor)
146            .or_err(InternalError, "when deserializing LRU")?;
147        Ok(())
148    }
149}
150
151struct InsertToManager<'a> {
152    lru: &'a Manager,
153}
154
155impl<'de> serde::de::Visitor<'de> for InsertToManager<'_> {
156    type Value = ();
157
158    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
159        formatter.write_str("array of lru nodes")
160    }
161
162    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
163    where
164        A: SeqAccess<'de>,
165    {
166        while let Some(node) = seq.next_element::<Node>()? {
167            let key = u64key(&node.key);
168            self.lru.insert(key, node.key, node.size, true); // insert in the back
169        }
170        Ok(())
171    }
172}
173
174#[inline]
175fn u64key(key: &CompactCacheKey) -> u64 {
176    let mut hasher = DefaultHasher::new();
177    key.hash(&mut hasher);
178    hasher.finish()
179}
180
181const FILE_NAME: &str = "simple_lru.data";
182
183#[async_trait]
184impl EvictionManager for Manager {
185    fn total_size(&self) -> usize {
186        self.used.load(Ordering::Relaxed)
187    }
188    fn total_items(&self) -> usize {
189        self.items.load(Ordering::Relaxed)
190    }
191    fn evicted_size(&self) -> usize {
192        self.evicted_size.load(Ordering::Relaxed)
193    }
194    fn evicted_items(&self) -> usize {
195        self.evicted_items.load(Ordering::Relaxed)
196    }
197
198    fn admit(
199        &self,
200        item: CompactCacheKey,
201        size: usize,
202        _fresh_until: SystemTime,
203    ) -> Vec<CompactCacheKey> {
204        let key = u64key(&item);
205        self.insert(key, item, size, false);
206        self.evict()
207    }
208
209    fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
210        let key = u64key(&item);
211        self.increase_weight(key, delta);
212        self.evict()
213    }
214
215    fn remove(&self, item: &CompactCacheKey) {
216        let key = u64key(item);
217        let node = self.lru.write().pop(&key);
218        if let Some(n) = node {
219            self.used.fetch_sub(n.size, Ordering::Relaxed);
220            self.items.fetch_sub(1, Ordering::Relaxed);
221        }
222    }
223
224    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
225        let key = u64key(item);
226        if self.lru.write().get(&key).is_none() {
227            self.insert(key, item.clone(), size, false);
228            false
229        } else {
230            true
231        }
232    }
233
234    fn peek(&self, item: &CompactCacheKey) -> bool {
235        let key = u64key(item);
236        self.lru.read().peek(&key).is_some()
237    }
238
239    async fn save(&self, dir_path: &str) -> Result<()> {
240        let data = self.serialize()?;
241        let dir_str = dir_path.to_owned();
242        tokio::task::spawn_blocking(move || {
243            let dir_path = Path::new(&dir_str);
244            std::fs::create_dir_all(dir_path)
245                .or_err_with(InternalError, || format!("fail to create {dir_str}"))?;
246
247            let final_file_path = dir_path.join(FILE_NAME);
248            // create a temporary filename using a randomized u32 hash to minimize the chance of multiple writers writing to the same tmp file
249            let random_suffix: u32 = rand::thread_rng().gen();
250            let temp_file_path = dir_path.join(format!("{}.{:08x}.tmp", FILE_NAME, random_suffix));
251            let mut file = File::create(&temp_file_path).or_err_with(InternalError, || {
252                format!("fail to create temporary file {}", temp_file_path.display())
253            })?;
254            file.write_all(&data).or_err_with(InternalError, || {
255                format!("fail to write to {}", temp_file_path.display())
256            })?;
257            file.flush().or_err_with(InternalError, || {
258                format!("fail to flush temp file {}", temp_file_path.display())
259            })?;
260            std::fs::rename(&temp_file_path, &final_file_path).or_err_with(InternalError, || {
261                format!(
262                    "fail to rename temporary file {} to {}",
263                    temp_file_path.display(),
264                    final_file_path.display()
265                )
266            })
267        })
268        .await
269        .or_err(InternalError, "async blocking IO failure")?
270    }
271
272    async fn load(&self, dir_path: &str) -> Result<()> {
273        let dir_path = dir_path.to_owned();
274        let data = tokio::task::spawn_blocking(move || {
275            let file_path = Path::new(&dir_path).join(FILE_NAME);
276            let mut file = File::open(file_path.clone()).or_err_with(InternalError, || {
277                format!("fail to open {}", file_path.display())
278            })?;
279            let mut buffer = Vec::with_capacity(8192);
280            file.read_to_end(&mut buffer)
281                .or_err(InternalError, "fail to read from {file_path}")?;
282            Ok::<Vec<u8>, BError>(buffer)
283        })
284        .await
285        .or_err(InternalError, "async blocking IO failure")??;
286        self.deserialize(&data)
287    }
288}
289
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    #[test]
    fn test_admission() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used would be 6: evicting key1 (1) leaves 5, so key2 (2) goes too, leaving 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_access() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so access reports a hit
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);

        // key2 is the least recently used one now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::new(4);
        let until = SystemTime::now(); // unused value as a placeholder

        // each access is a miss, so it starts tracking the key and returns false
        let key1 = CacheKey::new("", "a", "1").to_compact();
        assert!(!lru.access(&key1, 1, until));
        let key2 = CacheKey::new("", "b", "1").to_compact();
        assert!(!lru.access(&key2, 2, until));
        let key3 = CacheKey::new("", "c", "1").to_compact();
        assert!(!lru.access(&key3, 2, until));
        // access never evicts, so used is 5 here, already over the limit of 4

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // used would be 7: evicting key1 (1) and key2 (2) brings it back to 4
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // make key4 larger
        let v = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    #[test]
    fn test_serde() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so access reports a hit
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let ser = lru.serialize().unwrap();
        let lru2 = Manager::new(4);
        lru2.deserialize(&ser).unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let lru = Manager::new(4);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so access reports a hit
        assert!(lru.access(&key1, 1, until));

        // use a per-process directory under the platform temp dir so the test is portable
        // and concurrent runs don't collide
        let dir_path =
            std::env::temp_dir().join(format!("test_simple_lru_save_{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&dir_path);
        let dir = dir_path
            .to_str()
            .expect("temp dir path should be valid UTF-8");

        // load lru2 with lru's data
        lru.save(dir).await.unwrap();

        // a successful save leaves only the final data file behind, no temporary files
        let leftovers: Vec<_> = std::fs::read_dir(&dir_path)
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .filter(|name| name.to_str() != Some(FILE_NAME))
            .collect();
        assert!(leftovers.is_empty(), "unexpected files after save: {leftovers:?}");

        let lru2 = Manager::new(4);
        lru2.load(dir).await.unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);

        let _ = std::fs::remove_dir_all(&dir_path);
    }
}