Skip to main content

liquid_cache/cache/policies/cache/
mod.rs

1//! Cache policies for liquid cache.
2
3use crate::cache::cached_batch::CachedBatchType;
4use crate::cache::utils::EntryID;
5
6mod clock;
7mod doubly_linked_list;
8mod filo;
9mod lru;
10mod s3_fifo;
11mod sieve;
12mod three_queue;
13
14pub use clock::ClockPolicy;
15pub use filo::FifoPolicy;
16pub use filo::FiloPolicy;
17pub use lru::LruPolicy;
18pub use s3_fifo::S3FifoPolicy;
19pub use sieve::SievePolicy;
20pub use three_queue::LiquidPolicy;
21
22/// The cache policy that guides the replacement of LiquidCache
23pub trait CachePolicy: std::fmt::Debug + Send + Sync {
24    /// Give cnt amount of entries to evict when cache is full.
25    fn find_victim(&self, cnt: usize) -> Vec<EntryID>;
26
27    /// Notify the cache policy that an entry was inserted.
28    fn notify_insert(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
29
30    /// Notify the cache policy that an entry was accessed.
31    fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
32}
33
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;
    use crate::sync::{Arc, Mutex, thread};

    /// Converts a plain index into an [`EntryID`] for use in the tests below.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Inserts 100 entries into `policy`, then has 4 threads each request one
    /// victim, and asserts that no entry was handed out more than once.
    fn concurrent_invariant_advice_once(policy: Arc<dyn CachePolicy>) {
        const WORKER_COUNT: usize = 4;
        const ENTRY_COUNT: usize = 100;

        // Populate the policy before any worker starts asking for victims.
        (0..ENTRY_COUNT)
            .for_each(|i| policy.notify_insert(&entry(i), CachedBatchType::MemoryArrow));

        // Shared sink collecting whatever each worker was advised to evict.
        let advised_entries = Arc::new(Mutex::new(Vec::new()));

        let workers: Vec<_> = (0..WORKER_COUNT)
            .map(|_| {
                let worker_policy = Arc::clone(&policy);
                let sink = Arc::clone(&advised_entries);

                thread::spawn(move || {
                    let victims = worker_policy.find_victim(1);
                    // Only take the lock when the policy actually returned something.
                    if let Some(&victim) = victims.first() {
                        sink.lock().unwrap().push(victim);
                    }
                })
            })
            .collect();

        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());

        let entries = advised_entries.lock().unwrap();
        // Sorting then deduplicating drops repeats; a shorter result means
        // some entry was advised more than once.
        let unique_entries = {
            let mut deduped = entries.clone();
            deduped.sort();
            deduped.dedup();
            deduped
        };

        assert_eq!(
            entries.len(),
            unique_entries.len(),
            "Some entries were advised for eviction multiple times: {entries:?}"
        );
    }

    /// Runs the advise-once check against the LRU policy, then the FILO policy.
    fn run_concurrent_invariant_tests() {
        let lru: Arc<dyn CachePolicy> = Arc::new(LruPolicy::new());
        concurrent_invariant_advice_once(lru);

        let filo: Arc<dyn CachePolicy> = Arc::new(FiloPolicy::new());
        concurrent_invariant_advice_once(filo);
    }

    /// Plain test entry point for the invariant checks.
    #[test]
    fn test_concurrent_invariant_advice_once() {
        run_concurrent_invariant_tests();
    }

    /// Same checks, driven through the crate's `shuttle_test` helper when the
    /// `shuttle` feature is enabled.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_concurrent_invariant_advice_once() {
        crate::utils::shuttle_test(run_concurrent_invariant_tests);
    }
}