// pingora_cache/eviction/lru.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A shared LRU cache manager
16
17use super::EvictionManager;
18use crate::key::CompactCacheKey;
19
20use async_trait::async_trait;
21use log::{info, warn};
22use pingora_error::{BError, ErrorType::*, OrErr, Result};
23use pingora_lru::Lru;
24use rand::Rng;
25use serde::de::SeqAccess;
26use serde::{Deserialize, Serialize};
27use std::fs::{rename, File};
28use std::hash::{Hash, Hasher};
29use std::io::prelude::*;
30use std::path::Path;
31use std::time::SystemTime;
32
/// A shared LRU cache manager designed to manage a large volume of assets.
///
/// - Space optimized in-memory LRU (see [pingora_lru]).
/// - Instead of a single giant LRU, this struct shards the assets into `N` independent LRUs.
///
/// This allows [EvictionManager::save()] not to lock the entire cache manager while performing
/// serialization.
///
/// Keys are mapped to a `u64` via `u64key` (an ahash of the [CompactCacheKey]); that hash
/// also decides which of the `N` shards a key lives in.
pub struct Manager<const N: usize>(Lru<CompactCacheKey, N>);
41
/// On-disk representation of one LRU node: the cache key plus its weight (size),
/// as captured by [Manager::serialize_shard].
#[derive(Debug, Serialize, Deserialize)]
struct SerdeHelperNode(CompactCacheKey, usize);
44
45impl<const N: usize> Manager<N> {
46    /// Create a [Manager] with the given size limit and estimated per shard capacity.
47    ///
48    /// The `capacity` is for preallocating to avoid reallocation cost when the LRU grows.
49    pub fn with_capacity(limit: usize, capacity: usize) -> Self {
50        Manager(Lru::with_capacity(limit, capacity))
51    }
52
53    /// Create a [Manager] with an optional watermark in addition to weight limit.
54    ///
55    /// When `watermark` is set, the underlying LRU will also evict to keep total item count
56    /// under or equal to that watermark.
57    pub fn with_capacity_and_watermark(
58        limit: usize,
59        capacity: usize,
60        watermark: Option<usize>,
61    ) -> Self {
62        Manager(Lru::with_capacity_and_watermark(limit, capacity, watermark))
63    }
64
65    /// Get the number of shards
66    pub fn shards(&self) -> usize {
67        self.0.shards()
68    }
69
70    /// Get the weight (total size) of a specific shard
71    pub fn shard_weight(&self, shard: usize) -> usize {
72        self.0.shard_weight(shard)
73    }
74
75    /// Get the number of items in a specific shard
76    pub fn shard_len(&self, shard: usize) -> usize {
77        self.0.shard_len(shard)
78    }
79
80    /// Get the shard index for a given cache key
81    ///
82    /// This allows callers to know which shard was affected by an operation
83    /// without acquiring any locks.
84    pub fn get_shard_for_key(&self, key: &CompactCacheKey) -> usize {
85        (u64key(key) % N as u64) as usize
86    }
87
88    /// Serialize the given shard
89    pub fn serialize_shard(&self, shard: usize) -> Result<Vec<u8>> {
90        use rmp_serde::encode::Serializer;
91        use serde::ser::SerializeSeq;
92        use serde::ser::Serializer as _;
93
94        assert!(shard < N);
95
96        // NOTE: This could use a lot of memory to buffer the serialized data in memory
97        // NOTE: This for loop could lock the LRU for too long
98        let mut nodes = Vec::with_capacity(self.0.shard_len(shard));
99        self.0.iter_for_each(shard, |(node, size)| {
100            nodes.push(SerdeHelperNode(node.clone(), size));
101        });
102        let mut ser = Serializer::new(vec![]);
103        let mut seq = ser
104            .serialize_seq(Some(self.0.shard_len(shard)))
105            .or_err(InternalError, "fail to serialize node")?;
106        for node in nodes {
107            seq.serialize_element(&node).unwrap(); // write to vec, safe
108        }
109
110        seq.end().or_err(InternalError, "when serializing LRU")?;
111        Ok(ser.into_inner())
112    }
113
114    /// Deserialize a shard
115    ///
116    /// Shard number is not needed because the key itself will hash to the correct shard.
117    pub fn deserialize_shard(&self, buf: &[u8]) -> Result<()> {
118        use rmp_serde::decode::Deserializer;
119        use serde::de::Deserializer as _;
120
121        let mut de = Deserializer::new(buf);
122        let visitor = InsertToManager { lru: self };
123        de.deserialize_seq(visitor)
124            .or_err(InternalError, "when deserializing LRU")?;
125        Ok(())
126    }
127
128    /// Peek the weight associated with a cache key without changing its LRU order.
129    pub fn peek_weight(&self, item: &CompactCacheKey) -> Option<usize> {
130        let key = u64key(item);
131        self.0.peek_weight(key)
132    }
133}
134
/// Serde visitor that inserts deserialized nodes directly into the target [Manager],
/// avoiding an intermediate collection during [Manager::deserialize_shard].
struct InsertToManager<'a, const N: usize> {
    lru: &'a Manager<N>,
}
138
139impl<'de, const N: usize> serde::de::Visitor<'de> for InsertToManager<'_, N> {
140    type Value = ();
141
142    fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
143        formatter.write_str("array of lru nodes")
144    }
145
146    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
147    where
148        A: SeqAccess<'de>,
149    {
150        while let Some(node) = seq.next_element::<SerdeHelperNode>()? {
151            let key = u64key(&node.0);
152            self.lru.0.insert_tail(key, node.0, node.1); // insert in the back
153        }
154        Ok(())
155    }
156}
157
/// Map a [CompactCacheKey] to the `u64` key used by the underlying [Lru].
///
/// The same hash is recomputed on every admit/access/remove, so it only needs to be
/// a pure function of the key within one process run.
#[inline]
fn u64key(key: &CompactCacheKey) -> u64 {
    // note that std hash is not uniform, I'm not sure if ahash is also the case
    let mut hasher = ahash::AHasher::default();
    key.hash(&mut hasher);
    hasher.finish()
}
165
/// Base name for on-disk shard snapshots; shard `i` is saved as `lru.data.<i>`.
const FILE_NAME: &str = "lru.data";
167
/// Build an error message of the form `"<s> <path>"` using the path's `Display` form.
#[inline]
fn err_str_path(s: &str, path: &Path) -> String {
    let rendered = path.display().to_string();
    let mut msg = String::with_capacity(s.len() + 1 + rendered.len());
    msg.push_str(s);
    msg.push(' ');
    msg.push_str(&rendered);
    msg
}
172
#[async_trait]
impl<const N: usize> EvictionManager for Manager<N> {
    // Total weight currently tracked across all shards.
    fn total_size(&self) -> usize {
        self.0.weight()
    }
    // Total number of items currently tracked across all shards.
    fn total_items(&self) -> usize {
        self.0.len()
    }
    // Cumulative weight evicted so far, as reported by the underlying LRU.
    fn evicted_size(&self) -> usize {
        self.0.evicted_weight()
    }
    // Cumulative number of items evicted so far.
    fn evicted_items(&self) -> usize {
        self.0.evicted_len()
    }

    // Admit `item` with the given weight, then evict down to the configured limit.
    // Returns the keys evicted to make room (empty if nothing was evicted).
    fn admit(
        &self,
        item: CompactCacheKey,
        size: usize,
        _fresh_until: SystemTime,
    ) -> Vec<CompactCacheKey> {
        let key = u64key(&item);
        self.0.admit(key, item, size);
        self.0
            .evict_to_limit()
            .into_iter()
            .map(|(key, _weight)| key)
            .collect()
    }

    // Grow an existing item's weight by `delta` (capped by `max_weight` if given),
    // then evict down to the limit. Returns the evicted keys.
    fn increment_weight(
        &self,
        item: &CompactCacheKey,
        delta: usize,
        max_weight: Option<usize>,
    ) -> Vec<CompactCacheKey> {
        let key = u64key(item);
        self.0.increment_weight(key, delta, max_weight);
        self.0
            .evict_to_limit()
            .into_iter()
            .map(|(key, _weight)| key)
            .collect()
    }

    // Remove the item from tracking (no-op if not present).
    fn remove(&self, item: &CompactCacheKey) {
        let key = u64key(item);
        self.0.remove(key);
    }

    // Promote the item to most-recently-used. If it is not tracked yet, admit it
    // instead (without evicting) and return false; return true when the promote
    // succeeded.
    fn access(&self, item: &CompactCacheKey, size: usize, _fresh_until: SystemTime) -> bool {
        let key = u64key(item);
        if !self.0.promote(key) {
            self.0.admit(key, item.clone(), size);
            false
        } else {
            true
        }
    }

    // Check membership without changing the item's LRU order.
    fn peek(&self, item: &CompactCacheKey) -> bool {
        let key = u64key(item);
        self.0.peek(key)
    }

    // Persist all shards under `dir_path`, one file per shard (`lru.data.<i>`).
    // Each file is written to a randomized `.tmp` name first and then renamed into
    // place, so readers never observe a partially written snapshot. All filesystem
    // work runs on `spawn_blocking` to keep the async runtime unblocked; shards
    // are processed sequentially, so only one shard's serialized bytes are
    // buffered in memory at a time.
    async fn save(&self, dir_path: &str) -> Result<()> {
        let dir_path_str = dir_path.to_owned();

        tokio::task::spawn_blocking(move || {
            let dir_path = Path::new(&dir_path_str);
            std::fs::create_dir_all(dir_path)
                .or_err_with(InternalError, || err_str_path("fail to create", dir_path))
        })
        .await
        .or_err(InternalError, "async blocking IO failure")??;

        for i in 0..N {
            let data = self.serialize_shard(i)?;
            let dir_path = dir_path.to_owned();
            tokio::task::spawn_blocking(move || {
                let dir_path = Path::new(&dir_path);
                let final_path = dir_path.join(format!("{}.{i}", FILE_NAME));
                // create a temporary filename using a randomized u32 hash to minimize the chance of multiple writers writing to the same tmp file
                let random_suffix: u32 = rand::thread_rng().gen();
                let temp_path =
                    dir_path.join(format!("{}.{i}.{:08x}.tmp", FILE_NAME, random_suffix));
                let mut file = File::create(&temp_path)
                    .or_err_with(InternalError, || err_str_path("fail to create", &temp_path))?;
                file.write_all(&data).or_err_with(InternalError, || {
                    err_str_path("fail to write to", &temp_path)
                })?;
                file.flush().or_err_with(InternalError, || {
                    err_str_path("fail to flush temp file", &temp_path)
                })?;
                // atomically replace any previous snapshot for this shard
                rename(&temp_path, &final_path).or_err_with(InternalError, || {
                    format!(
                        "Failed to rename file from {} to {}",
                        temp_path.display(),
                        final_path.display(),
                    )
                })
            })
            .await
            .or_err(InternalError, "async blocking IO failure")??;
        }
        Ok(())
    }

    // Load previously saved shards from `dir_path`.
    //
    // NOTE(review): error handling is asymmetric — a shard that fails to
    // *deserialize* is logged and skipped, but a shard whose file fails to
    // *open/read* (e.g. missing on first run) aborts the whole load with an
    // error. Confirm whether callers rely on that Err before unifying.
    async fn load(&self, dir_path: &str) -> Result<()> {
        // TODO: check the saved shards so that we load all the save files
        let mut loaded_shards = 0;
        for i in 0..N {
            let dir_path = dir_path.to_owned();

            let data = tokio::task::spawn_blocking(move || {
                let file_path = Path::new(&dir_path).join(format!("{}.{i}", FILE_NAME));
                let mut file = File::open(&file_path)
                    .or_err_with(InternalError, || err_str_path("fail to open", &file_path))?;
                let mut buffer = Vec::with_capacity(8192);
                file.read_to_end(&mut buffer)
                    .or_err_with(InternalError, || {
                        err_str_path("fail to read from", &file_path)
                    })?;
                Ok::<Vec<u8>, BError>(buffer)
            })
            .await
            .or_err(InternalError, "async blocking IO failure")??;

            if let Err(e) = self.deserialize_shard(&data) {
                warn!("Failed to deserialize shard {}: {}. Skipping shard.", i, e);
                continue; // Skip shard and move onto the next one
            }
            loaded_shards += 1;
        }

        // Log how many shards were successfully loaded
        if loaded_shards < N {
            warn!(
                "Only loaded {}/{} shards. Cache may be incomplete.",
                loaded_shards, N
            )
        } else {
            info!("Successfully loaded {}/{} shards.", loaded_shards, N)
        }

        // fire-and-forget: the cleanup task is detached and may still be running
        // when load() returns
        cleanup_temp_files(dir_path);

        Ok(())
    }
}
323
324fn cleanup_temp_files(dir_path: &str) {
325    let dir_path = Path::new(dir_path).to_owned();
326
327    tokio::task::spawn_blocking({
328        move || {
329            if !dir_path.exists() {
330                return;
331            }
332
333            let entries = match std::fs::read_dir(&dir_path) {
334                Ok(entries) => entries,
335                Err(e) => {
336                    warn!("Failed to read directory {}: {e}", dir_path.display());
337                    return;
338                }
339            };
340
341            let mut cleaned_count = 0;
342            let mut error_count = 0;
343
344            for entry in entries {
345                let entry = match entry {
346                    Ok(entry) => entry,
347                    Err(e) => {
348                        warn!(
349                            "Failed to read directory entry in {}: {e}",
350                            dir_path.display()
351                        );
352                        error_count += 1;
353                        continue;
354                    }
355                };
356
357                let file_name = entry.file_name();
358                let file_name_str = file_name.to_string_lossy();
359
360                if file_name_str.starts_with(FILE_NAME) && file_name_str.ends_with(".tmp") {
361                    match std::fs::remove_file(entry.path()) {
362                        Ok(()) => {
363                            info!("Cleaned up orphaned temp file: {}", entry.path().display());
364                            cleaned_count += 1;
365                        }
366                        Err(e) => {
367                            warn!("Failed to remove temp file {}: {e}", entry.path().display());
368                            error_count += 1;
369                        }
370                    }
371                }
372            }
373
374            if cleaned_count > 0 || error_count > 0 {
375                info!(
376                    "Temp file cleanup completed. Removed: {cleaned_count}, Errors: {error_count}"
377                );
378            }
379        }
380    });
381}
382
#[cfg(test)]
mod test {
    use super::*;
    use crate::CacheKey;

    // we use shard (N) = 1 for eviction consistency in all tests

    #[test]
    fn test_admission() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_access() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; access() returns true when the key was
        // already tracked and got promoted
        assert!(lru.access(&key1, 1, until));

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_remove() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // remove key1
        lru.remove(&key1);

        // key2 is the least recently used one now
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[test]
    fn test_access_add() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        // access() on untracked keys admits them (and returns false)
        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        lru.access(&key3, 2, until);

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4, 2, until);
        // need to reduce used by at least 2, both key1 and key2 are evicted to make room for 3
        assert_eq!(v.len(), 2);
        assert_eq!(v[0], key1);
        assert_eq!(v[1], key2);
    }

    #[test]
    fn test_admit_update() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // update key2 to reduce its size by 1
        let v = lru.admit(key2, 1, until);
        assert_eq!(v.len(), 0);

        // lru is not full anymore
        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru.admit(key4.clone(), 1, until);
        assert_eq!(v.len(), 0);

        // make key4 larger
        let v = lru.admit(key4, 2, until);
        // need to evict now
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key1);
    }

    #[test]
    fn test_peek() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let until = SystemTime::now(); // unused value as a placeholder

        let key1 = CacheKey::new("", "a", "1").to_compact();
        lru.access(&key1, 1, until);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        lru.access(&key2, 2, until);
        assert!(lru.peek(&key1));
        assert!(lru.peek(&key2));
    }

    #[test]
    fn test_serde() {
        let lru = Manager::<1>::with_capacity(4, 10);
        let key1 = CacheKey::new("", "a", "1").to_compact();
        let until = SystemTime::now(); // unused value as a placeholder
        let v = lru.admit(key1.clone(), 1, until);
        assert_eq!(v.len(), 0);
        let key2 = CacheKey::new("", "b", "1").to_compact();
        let v = lru.admit(key2.clone(), 2, until);
        assert_eq!(v.len(), 0);
        let key3 = CacheKey::new("", "c", "1").to_compact();
        let v = lru.admit(key3, 1, until);
        assert_eq!(v.len(), 0);

        // lru is full (4) now
        // make key1 most recently used; assert the promotion actually happened
        assert!(lru.access(&key1, 1, until));

        // load lru2 with lru's data
        let ser = lru.serialize_shard(0).unwrap();
        let lru2 = Manager::<1>::with_capacity(4, 10);
        lru2.deserialize_shard(&ser).unwrap();

        let key4 = CacheKey::new("", "d", "1").to_compact();
        let v = lru2.admit(key4, 2, until);
        assert_eq!(v.len(), 1);
        assert_eq!(v[0], key2);
    }

    #[tokio::test]
    async fn test_save_to_disk() {
        let until = SystemTime::now(); // unused value as a placeholder
        let lru = Manager::<2>::with_capacity(10, 10);

        lru.admit(CacheKey::new("", "a", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "b", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "c", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "d", "1").to_compact(), 1, until);
        lru.admit(CacheKey::new("", "e", "1").to_compact(), 2, until);
        lru.admit(CacheKey::new("", "f", "1").to_compact(), 1, until);

        // load lru2 with lru's data
        // NOTE: fixed path; concurrent runs of this test binary could collide
        lru.save("/tmp/test_lru_save").await.unwrap();
        let lru2 = Manager::<2>::with_capacity(4, 10);
        lru2.load("/tmp/test_lru_save").await.unwrap();

        let ser0 = lru.serialize_shard(0).unwrap();
        let ser1 = lru.serialize_shard(1).unwrap();

        assert_eq!(ser0, lru2.serialize_shard(0).unwrap());
        assert_eq!(ser1, lru2.serialize_shard(1).unwrap());
    }

    #[tokio::test]
    async fn test_temp_file_cleanup() {
        let test_dir = "/tmp/test_lru_cleanup";
        let dir_path = Path::new(test_dir);

        // Create test directory
        std::fs::create_dir_all(dir_path).unwrap();

        // Create some fake temp files
        let temp_files = [
            "lru.data.0.12345678.tmp",
            "lru.data.1.abcdef00.tmp",
            "other_file.tmp", // Should not be removed
            "lru.data.2",     // Should not be removed
        ];

        for file in temp_files {
            let file_path = dir_path.join(file);
            std::fs::write(&file_path, b"test").unwrap();
        }

        // Run cleanup
        cleanup_temp_files(test_dir);

        // cleanup runs on a detached blocking task; give it time to finish
        tokio::time::sleep(core::time::Duration::from_secs(1)).await;

        // Check results
        assert!(!dir_path.join("lru.data.0.12345678.tmp").exists());
        assert!(!dir_path.join("lru.data.1.abcdef00.tmp").exists());
        assert!(dir_path.join("other_file.tmp").exists()); // Should remain
        assert!(dir_path.join("lru.data.2").exists()); // Should remain

        // Cleanup test directory
        std::fs::remove_dir_all(dir_path).unwrap();
    }
}