Skip to main content

liquid_cache/cache/policies/cache/
mod.rs

1//! Cache policies for liquid cache.
2
3use crate::cache::cached_batch::CachedBatchType;
4use crate::cache::utils::EntryID;
5
6mod doubly_linked_list;
7mod three_queue;
8
9pub use three_queue::LiquidPolicy;
10
11/// The cache policy that guides the replacement of LiquidCache
12pub trait CachePolicy: std::fmt::Debug + Send + Sync {
13    /// Give cnt amount of entries to evict when cache is full.
14    fn find_memory_victim(&self, cnt: usize) -> Vec<EntryID>;
15
16    /// Give cnt amount of disk entries to remove when disk is full.
17    fn find_disk_victim(&self, _cnt: usize) -> Vec<EntryID> {
18        vec![]
19    }
20
21    /// Notify the cache policy that an entry was inserted.
22    fn notify_insert(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
23
24    /// Notify the cache policy that an entry was accessed.
25    fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
26
27    /// Notify the cache policy that an entry was removed.
28    fn notify_remove(&self, _entry_id: &EntryID) {}
29}
30
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;
    use crate::sync::{Arc, Mutex, thread};

    /// Converts a plain index into an `EntryID`.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Checks that concurrent callers never receive the same eviction victim.
    ///
    /// Registers 100 `MemoryArrow` entries with `policy`, then has four
    /// threads each request a single memory victim. Every victim handed out
    /// is recorded, and the test fails if any entry shows up more than once.
    fn concurrent_invariant_advice_once(policy: Arc<dyn CachePolicy>) {
        let worker_count = 4;

        // Seed the policy so there is something to evict.
        (0..100).for_each(|i| policy.notify_insert(&entry(i), CachedBatchType::MemoryArrow));

        let victims = Arc::new(Mutex::new(Vec::new()));

        // Spawn every worker before joining any of them; `collect` drives
        // the lazy `map` to completion so all threads are started here.
        let workers: Vec<_> = (0..worker_count)
            .map(|_| {
                let worker_policy = Arc::clone(&policy);
                let worker_victims = Arc::clone(&victims);

                thread::spawn(move || {
                    let picked = worker_policy.find_memory_victim(1);
                    if let Some(&victim) = picked.first() {
                        worker_victims.lock().unwrap().push(victim);
                    }
                })
            })
            .collect();

        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());

        // NOTE: the guard must stay named `entries`, because the assertion
        // message below captures it by name.
        let entries = victims.lock().unwrap();
        let mut deduped = entries.to_vec();
        deduped.sort();
        deduped.dedup();

        // Any duplicate makes the deduplicated copy shorter than the original.
        assert_eq!(
            entries.len(),
            deduped.len(),
            "Some entries were advised for eviction multiple times: {entries:?}"
        );
    }

    /// Runs the concurrency invariant check against a fresh `LiquidPolicy`.
    fn run_concurrent_invariant_tests() {
        let policy = Arc::new(LiquidPolicy::new());
        concurrent_invariant_advice_once(policy);
    }

    /// Runs the invariant check once as an ordinary test.
    #[test]
    fn test_concurrent_invariant_advice_once() {
        run_concurrent_invariant_tests();
    }

    /// Runs the same invariant check through the crate's `shuttle_test`
    /// helper; only compiled when the `shuttle` feature is enabled.
    #[cfg(feature = "shuttle")]
    #[test]
    fn shuttle_concurrent_invariant_advice_once() {
        crate::utils::shuttle_test(run_concurrent_invariant_tests);
    }
}