pingora_cache/eviction/
lru.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A shared LRU cache manager
16
17use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use log::{info, warn};
22use pingora_error::{BError, ErrorType::*, OrErr, Result};
23use pingora_lru::Lru;
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::fs::{rename, File};
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::time::SystemTime;
32
33/// A shared LRU cache manager designed to manage a large volume of assets.
34///
35/// - Space optimized in-memory LRU (see [pingora_lru]).
36/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs.
37///
38/// This allows [EvictionManager::save()] not to lock the entire cache manager while performing
39/// serialization.
40pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
41
42#[derive(Debug, Serialize, Deserialize)]
43struct SerdeHelperNode(CompactCacheKey, usize);
44
45impl<const N: usize> Manager<N> {
46    /// Create a [Manager] with the given size limit and estimated per shard capacity.
47    ///
48    /// The `capacity` is for preallocating to avoid reallocation cost when the LRU grows.
49    pub fn with_capacity(limit: usize, capacity: usize) -> Self {
50        Manager(Lru::with_capacity(limit, capacity))
51    }
52
53    /// Serialize the given shard
54    pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
55        use rmp_serde::encode::Serializer;
56        use serde::ser::SerializeSeq;
57        use serde::ser::Serializer as _;
58
59        assert!(shard < N);
60
61        // NOTE: This could use a lot of memory to buffer the serialized data in memory
62        // NOTE: This for loop could lock the LRU for too long
63        let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
64        self.0.iter_for_each(shard, |(node, size)| {
65            nodes.push(SerdeHelperNode(node.clone(), size));
66        });
67        let mut ser = Serializer::new(vec![]);
68        let mut seq = ser
69            .serialize_seq(Some(self.0.shard_len(shard)))
70            .or_err(InternalError, "fail to serialize node")?;
71        for node in nodes {
72            seq.serialize_element(&node).unwrap(); // write to vec, safe
73        }
74
75        seq.end().or_err(InternalError, "when serializing LRU")?;
76        Ok(ser.into_inner())
77    }
78
79    /// Deserialize a shard
80    ///
81    /// Shard number is not needed because the key itself will hash to the correct shard.
82    pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
83        use rmp_serde::decode::Deserializer;
84        use serde::de::Deserializer as _;
85
86        let mut de = Deserializer::new(buf);
87        let visitor = InsertToManager { lru: self };
88        de.deserialize_seq(visitor)
89            .or_err(InternalError, "when deserializing LRU")?;
90        Ok(())
91    }
92}
93
94struct InsertToManager<'a, const N: usize> {
95    lru: &'a Manager<N>,
96}
97
98impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
99    type Value = ();
100
101    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
102        formatter.write_str("array of lru nodes")
103    }
104
105    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
106    where
107        A: SeqAccess<'de>,
108    {
109        while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
110            let key = u64key(&node.0);
111            self.lru.0.insert_tail(key, node.0, node.1); // insert in the back
112        }
113        Ok(())
114    }
115}
116
117#[inline]
118fn u64key(key: &CompactCacheKey) -> u64 {
119    // note that std hash is not uniform, I'm not sure if ahash is also the case
120    let mut hasher = ahash::AHasher::default();
121    key.hash(&mut hasher);
122    hasher.finish()
123}
124
/// Base name of the persisted LRU files; each shard is stored as `<FILE_NAME>.<shard index>`.
const FILE_NAME: &str = "lru.data";
126
/// Build an error message of the form `"<s> <path>"`.
#[inline]
fn err_str_path(s: &str, path: &Path) -> String {
    let shown = path.display();
    format!("{s} {shown}")
}
131
132#[async_trait]
133impl<const N: usize> EvictionManager for Manager<N> {
134    fn total_size(&self) -> usize {
135        self.0.weight()
136    }
137    fn total_items(&self) -> usize {
138        self.0.len()
139    }
140    fn evicted_size(&self) -> usize {
141        self.0.evicted_weight()
142    }
143    fn evicted_items(&self) -> usize {
144        self.0.evicted_len()
145    }
146
147    fn admit(
148        &self,
149        item: CompactCacheKey,
150        size: usize,
151        _fresh_until: SystemTime,
152    ) -> Vec<CompactCacheKey> {
153        let key = u64key(&item);
154        self.0.admit(key, item, size);
155        self.0
156            .evict_to_limit()
157            .into_iter()
158            .map(|(key, _weight)| key)
159            .collect()
160    }
161
162    fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
163        let key = u64key(&item);
164        self.0.increment_weight(key, delta);
165        self.0
166            .evict_to_limit()
167            .into_iter()
168            .map(|(key, _weight)| key)
169            .collect()
170    }
171
172    fn remove(&self, item: &CompactCacheKey) {
173        let key = u64key(item);
174        self.0.remove(key);
175    }
176
177    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
178        let key = u64key(item);
179        if !self.0.promote(key) {
180            self.0.admit(key, item.clone(), size);
181            false
182        } else {
183            true
184        }
185    }
186
187    fn peek(&self, item: &CompactCacheKey) -> bool {
188        let key = u64key(item);
189        self.0.peek(key)
190    }
191
192    async fn save(&self, dir_path: &str) -> Result<()> {
193        let dir_path_str = dir_path.to_owned();
194
195        tokio::task::spawn_blocking(move || {
196            let dir_path = Path::new(&dir_path_str);
197            std::fs::create_dir_all(dir_path)
198                .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
199        })
200        .await
201        .or_err(InternalError, "async blocking IO failure")??;
202
203        for i in 0..N {
204            let data = self.serialize_shard(i)?;
205            let dir_path = dir_path.to_owned();
206            tokio::task::spawn_blocking(move || {
207                let dir_path = Path::new(&dir_path);
208                let final_path = dir_path.join(format!("{}.{i}", FILE_NAME));
209                // create a temporary filename using a randomized u32 hash to minimize the chance of multiple writers writing to the same tmp file
210                let random_suffix: u32 = rand::thread_rng().gen();
211                let temp_path =
212                    dir_path.join(format!("{}.{i}.{:08x}.tmp", FILE_NAME, random_suffix));
213                let mut file = File::create(&temp_path)
214                    .or_err_with(InternalError, || err_str_path("fail to create", &temp_path))?;
215                file.write_all(&data).or_err_with(InternalError, || {
216                    err_str_path("fail to write to", &temp_path)
217                })?;
218                file.flush().or_err_with(InternalError, || {
219                    err_str_path("fail to flush temp file", &temp_path)
220                })?;
221                rename(&temp_path, &final_path).or_err_with(InternalError, || {
222                    format!(
223                        "Failed to rename file from {} to {}",
224                        temp_path.display(),
225                        final_path.display(),
226                    )
227                })
228            })
229            .await
230            .or_err(InternalError, "async blocking IO failure")??;
231        }
232        Ok(())
233    }
234
235    async fn load(&self, dir_path: &str) -> Result<()> {
236        // TODO: check the saved shards so that we load all the save files
237        let mut loaded_shards = 0;
238        for i in 0..N {
239            let dir_path = dir_path.to_owned();
240
241            let data = tokio::task::spawn_blocking(move || {
242                let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
243                let mut file = File::open(&file_path)
244                    .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
245                let mut buffer = Vec::with_capacity(8192);
246                file.read_to_end(&mut buffer)
247                    .or_err_with(InternalError, || {
248                        err_str_path("fail to read from", &file_path)
249                    })?;
250                Ok::<Vec<u8>, BError>(buffer)
251            })
252            .await
253            .or_err(InternalError, "async blocking IO failure")??;
254
255            if let Err(e) = self.deserialize_shard(&data) {
256                warn!("Failed to deserialize shard {}: {}. Skipping shard.", i, e);
257                continue; // Skip shard and move onto the next one
258            }
259            loaded_shards += 1;
260        }
261
262        // Log how many shards were successfully loaded
263        if loaded_shards < N {
264            warn!(
265                "Only loaded {}/{} shards. Cache may be incomplete.",
266                loaded_shards, N
267            )
268        } else {
269            info!("Successfully loaded {}/{} shards.", loaded_shards, N)
270        }
271
272        Ok(())
273    }
274}
275
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // we use shard (N) = 1 for eviction consistency in all tests

    /// Build the compact key used throughout these tests for the given id.
    fn compact_key(id: &str) -> CompactCacheKey {
        CacheKey::new("", id, "1").to_compact()
    }

    /// Admit "a" (size 1), "b" (size 2) and "c" (size 1), checking that nothing is evicted.
    ///
    /// With a limit of 4 the manager is exactly full afterwards. Returns the keys of "a" and "b".
    fn fill_abc(lru: &Manager<1>, until: SystemTime) -> (CompactCacheKey, CompactCacheKey) {
        let key1 = compact_key("a");
        let key2 = compact_key("b");
        assert!(lru.admit(key1.clone(), 1, until).is_empty());
        assert!(lru.admit(key2.clone(), 2, until).is_empty());
        assert!(lru.admit(compact_key("c"), 1, until).is_empty());
        (key1, key2)
    }

    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill_abc(&lru, until);

        // lru is full (4) now

        let evicted = lru.admit(compact_key("d"), 2, until);
        // need to reduce used by at least 2, both key1 and key2 are evicted to make room for key4
        assert_eq!(evicted, vec![key1, key2]);
    }

    #[test]
    fn test_access() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill_abc(&lru, until);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so this is a hit
        assert!(lru.access(&key1, 1, until));

        let evicted = lru.admit(compact_key("d"), 2, until);
        assert_eq!(evicted, vec![key2]);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill_abc(&lru, until);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);

        // key2 is the least recently used one now
        let evicted = lru.admit(compact_key("d"), 2, until);
        assert_eq!(evicted, vec![key2]);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        // access() on unknown keys admits them without evicting anything
        let key1 = compact_key("a");
        lru.access(&key1, 1, until);
        let key2 = compact_key("b");
        lru.access(&key2, 2, until);
        let key3 = compact_key("c");
        lru.access(&key3, 2, until);

        let evicted = lru.admit(compact_key("d"), 2, until);
        // the two oldest entries, key1 and key2, are evicted to get back under the limit
        assert_eq!(evicted, vec![key1, key2]);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill_abc(&lru, until);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        assert!(lru.admit(key2, 1, until).is_empty());

        // lru is not full anymore
        let key4 = compact_key("d");
        assert!(lru.admit(key4.clone(), 1, until).is_empty());

        // make key4 larger
        let evicted = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(evicted, vec![key1]);
    }

    #[test]
    fn test_peek() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = compact_key("a");
        lru.access(&key1, 1, until);
        let key2 = compact_key("b");
        lru.access(&key2, 2, until);
        assert!(lru.peek(&key1));
        assert!(lru.peek(&key2));
    }

    #[test]
    fn test_serde() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder
        let (key1, key2) = fill_abc(&lru, until);

        // lru is full (4) now
        // make key1 most recently used; it is already tracked, so this is a hit
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let ser = lru.serialize_shard(0).unwrap();
        let lru2 = Manager::<1>::with_capacity(4, 10);
        lru2.deserialize_shard(&ser).unwrap();

        // the recency order must have survived the round trip: key2 is the oldest
        let evicted = lru2.admit(compact_key("d"), 2, until);
        assert_eq!(evicted, vec![key2]);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let until = SystemTime::now(); // unused value as a placeholder
        let lru = Manager::<2>::with_capacity(10, 10);

        for (id, size) in [("a", 1), ("b", 2), ("c", 1), ("d", 1), ("e", 2), ("f", 1)] {
            lru.admit(compact_key(id), size, until);
        }

        // Use a per-process directory under the platform temp dir instead of a fixed /tmp path,
        // so the test is portable and does not pick up files from another run.
        let dir = std::env::temp_dir().join(format!("test_lru_save_{}", std::process::id()));
        let dir_str = dir.to_str().expect("temp dir path should be valid UTF-8");

        // load lru2 with lru's data
        lru.save(dir_str).await.unwrap();
        let lru2 = Manager::<2>::with_capacity(4, 10);
        lru2.load(dir_str).await.unwrap();

        let ser0 = lru.serialize_shard(0).unwrap();
        let ser1 = lru.serialize_shard(1).unwrap();

        assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
        assert_eq!(ser1, lru2.serialize_shard(1).unwrap());

        // best-effort cleanup of the files written above
        let _ = std::fs::remove_dir_all(&dir);
    }
}