pingora_cache/eviction/lru.rs

// Copyright 2025 Cloudflare, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A shared LRU cache manager

use super::EvictionManager;
use crate::key::CompactCacheKey;

use async_trait::async_trait;
use pingora_error::{BError, ErrorType::*, OrErr, Result};
use pingora_lru::Lru;
use serde::de::SeqAccess;
use serde::{Deserialize, Serialize};
use std::fs::File;
use std::hash::{Hash, Hasher};
use std::io::prelude::*;
use std::path::Path;
use std::time::SystemTime;

/// A shared LRU cache manager designed to manage a large volume of assets.
///
/// - Space-optimized in-memory LRU (see [pingora_lru]).
/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs.
///
/// This allows [EvictionManager::save()] to avoid locking the entire cache manager while
/// performing serialization.
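///
/// # Example
///
/// A minimal usage sketch (marked `ignore` since it is illustrative only; the key contents and
/// sizes below are placeholders):
///
/// ```ignore
/// use pingora_cache::eviction::{lru::Manager, EvictionManager};
/// use pingora_cache::CacheKey;
/// use std::time::SystemTime;
///
/// // 16 shards, a total weight limit of 1MB, room preallocated for 1000 items per shard
/// let manager = Manager::<16>::with_capacity(1024 * 1024, 1000);
///
/// // admit an asset; any assets evicted to stay under the limit are returned
/// let key = CacheKey::new("", "asset", "1").to_compact();
/// let evicted = manager.admit(key, 4096, SystemTime::now());
/// assert!(evicted.is_empty());
/// ```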
pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);

#[derive(Debug, Serialize, Deserialize)]
struct SerdeHelperNode(CompactCacheKey, usize);

impl<const N: usize> Manager<N> {
    /// Create a [Manager] with the given size limit and estimated per-shard capacity.
    ///
    /// The `capacity` is used to preallocate storage in order to avoid reallocation costs as the
    /// LRU grows.
    pub fn with_capacity(limit: usize, capacity: usize) -> Self {
        Manager(Lru::with_capacity(limit, capacity))
    }

    /// Serialize the given shard
    pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
        use rmp_serde::encode::Serializer;
        use serde::ser::SerializeSeq;
        use serde::ser::Serializer as _;

        assert!(shard < N);

        // NOTE: Buffering the serialized data could use a lot of memory
        // NOTE: This for loop could lock the LRU for too long
        let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
        self.0.iter_for_each(shard, |(node, size)| {
            nodes.push(SerdeHelperNode(node.clone(), size));
        });
        let mut ser = Serializer::new(vec![]);
        let mut seq = ser
            .serialize_seq(Some(self.0.shard_len(shard)))
            .or_err(InternalError, "fail to serialize node")?;
        for node in nodes {
            seq.serialize_element(&node).unwrap(); // write to vec, safe
        }

        seq.end().or_err(InternalError, "when serializing LRU")?;
        Ok(ser.into_inner())
    }

    /// Deserialize a shard
    ///
    /// Shard number is not needed because the key itself will hash to the correct shard.
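    ///
    /// # Example
    ///
    /// A minimal round-trip sketch (marked `ignore`; `manager` is assumed to be an existing,
    /// populated [Manager] with the same `N`):
    ///
    /// ```ignore
    /// // copy shard 0 of `manager` into a freshly created manager
    /// let bytes = manager.serialize_shard(0).unwrap();
    /// let restored = Manager::<16>::with_capacity(1024 * 1024, 1000);
    /// restored.deserialize_shard(&bytes).unwrap();
    /// ```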
    pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
        use rmp_serde::decode::Deserializer;
        use serde::de::Deserializer as _;

        let mut de = Deserializer::new(buf);
        let visitor = InsertToManager { lru: self };
        de.deserialize_seq(visitor)
            .or_err(InternalError, "when deserializing LRU")?;
        Ok(())
    }
}

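// A serde visitor that feeds each deserialized node straight into the target manager,
// inserting at the back of the LRU (see `insert_tail` below).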
struct InsertToManager<'a, const N: usize> {
    lru: &'a Manager<N>,
}

impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
    type Value = ();

    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
        formatter.write_str("array of lru nodes")
    }

    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
    where
        A: SeqAccess<'de>,
    {
        while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
            let key = u64key(&node.0);
            self.lru.0.insert_tail(key, node.0, node.1); // insert in the back
        }
        Ok(())
    }
}

#[inline]
fn u64key(key: &CompactCacheKey) -> u64 {
    // note that the std hasher's output is not guaranteed to be uniform; it is unclear whether
    // ahash has the same issue
    let mut hasher = ahash::AHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}

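// Each shard is persisted to its own file named `{FILE_NAME}.{shard index}` (e.g. "lru.data.0")
// under the directory passed to save()/load().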
const FILE_NAME: &str = "lru.data";

#[inline]
fn err_str_path(s: &str, path: &Path) -> String {
    format!("{s} {}", path.display())
}

#[async_trait]
impl<const N: usize> EvictionManager for Manager<N> {
    fn total_size(&self) -> usize {
        self.0.weight()
    }
    fn total_items(&self) -> usize {
        self.0.len()
    }
    fn evicted_size(&self) -> usize {
        self.0.evicted_weight()
    }
    fn evicted_items(&self) -> usize {
        self.0.evicted_len()
    }

    fn admit(
        &self,
        item: CompactCacheKey,
        size: usize,
        _fresh_until: SystemTime,
    ) -> Vec<CompactCacheKey> {
        let key = u64key(&item);
        self.0.admit(key, item, size);
        self.0
            .evict_to_limit()
            .into_iter()
            .map(|(key, _weight)| key)
            .collect()
    }

    fn increment_weight(&self, item: CompactCacheKey, delta: usize) -> Vec<CompactCacheKey> {
        let key = u64key(&item);
        self.0.increment_weight(key, delta);
        self.0
            .evict_to_limit()
            .into_iter()
            .map(|(key, _weight)| key)
            .collect()
    }

    fn remove(&self, item: &CompactCacheKey) {
        let key = u64key(item);
        self.0.remove(key);
    }

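    // Promote the item if it is already tracked; otherwise admit it as a new entry.
    // Returns whether the item was already present in the LRU.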
    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
        let key = u64key(item);
        if !self.0.promote(key) {
            self.0.admit(key, item.clone(), size);
            false
        } else {
            true
        }
    }

    fn peek(&self, item: &CompactCacheKey) -> bool {
        let key = u64key(item);
        self.0.peek(key)
    }

    async fn save(&self, dir_path: &str) -> Result<()> {
        let dir_path_str = dir_path.to_owned();

        tokio::task::spawn_blocking(move || {
            let dir_path = Path::new(&dir_path_str);
            std::fs::create_dir_all(dir_path)
                .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
        })
        .await
        .or_err(InternalError, "async blocking IO failure")??;

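        // serialize each shard and write it to its own file in a blocking task so that
        // file IO does not stall the async runtime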
        for i in 0..N {
            let data = self.serialize_shard(i)?;
            let dir_path = dir_path.to_owned();
            tokio::task::spawn_blocking(move || {
                let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
                let mut file = File::create(&file_path)
                    .or_err_with(InternalError, || err_str_path("fail to create", &file_path))?;
                file.write_all(&data).or_err_with(InternalError, || {
                    err_str_path("fail to write to", &file_path)
                })
            })
            .await
            .or_err(InternalError, "async blocking IO failure")??;
        }
        Ok(())
    }

    async fn load(&self, dir_path: &str) -> Result<()> {
        // TODO: check the saved shards so that we load all the saved files
        for i in 0..N {
            let dir_path = dir_path.to_owned();

            let data = tokio::task::spawn_blocking(move || {
                let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
                let mut file = File::open(&file_path)
                    .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
                let mut buffer = Vec::with_capacity(8192);
                file.read_to_end(&mut buffer)
                    .or_err_with(InternalError, || {
                        err_str_path("fail to read from", &file_path)
                    })?;
                Ok::<Vec<u8>, BError>(buffer)
            })
            .await
            .or_err(InternalError, "async blocking IO failure")??;
            self.deserialize_shard(&data)?;
        }

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // we use a shard count (N) of 1 for eviction consistency in all tests

    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to free at least weight 2; both key1 and key2 are evicted, leaving a total weight of 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_access() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used
        lru.access(&key1, 1, until);
        assert_eq!(v.len(), 0);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);

        // key2 is the least recently used one now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        lru.access(&key3, 2, until);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // the limit is exceeded; key1 and key2, the least recently used, are evicted to get back under it
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // make key4 larger
        let v = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    #[test]
    fn test_peek() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        assert!(lru.peek(&key1));
        assert!(lru.peek(&key2));
    }

    #[test]
    fn test_serde() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used
        lru.access(&key1, 1, until);
        assert_eq!(v.len(), 0);

        // load lru2 with lru's data
        let ser = lru.serialize_shard(0).unwrap();
        let lru2 = Manager::<1>::with_capacity(4, 10);
        lru2.deserialize_shard(&ser).unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let until = SystemTime::now(); // unused value as a placeholder
        let lru = Manager::<2>::with_capacity(10, 10);

        lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);

        // load lru2 with lru's data
        lru.save("/tmp/test_lru_save").await.unwrap();
        let lru2 = Manager::<2>::with_capacity(4, 10);
        lru2.load("/tmp/test_lru_save").await.unwrap();

        let ser0 = lru.serialize_shard(0).unwrap();
        let ser1 = lru.serialize_shard(1).unwrap();

        assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
        assert_eq!(ser1, lru2.serialize_shard(1).unwrap());
    }
}