liquid_cache_storage/cache/cache_policies/
filo.rs

1//! FILO (First In, Last Out) and FIFO cache policy implementations.
2
3use std::{collections::HashMap, ptr::NonNull};
4
5use crate::{
6    cache::{cached_batch::CachedBatchType, utils::EntryID},
7    sync::Mutex,
8};
9
10use super::{
11    CachePolicy,
12    doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
15#[derive(Debug)]
16struct QueueNode {
17    entry_id: EntryID,
18}
19
20type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
21
22#[derive(Debug, Default)]
23struct QueueState {
24    map: HashMap<EntryID, NodePtr>,
25    list: DoublyLinkedList<QueueNode>,
26}
27
28impl QueueState {
29    fn is_empty(&self) -> bool {
30        self.list.head().is_none()
31    }
32
33    fn insert_front(&mut self, entry_id: EntryID) {
34        if let Some(ptr) = self.map.get(&entry_id).copied() {
35            unsafe {
36                self.list.unlink(ptr);
37                self.list.push_front(ptr);
38            }
39            return;
40        }
41
42        let node = DoublyLinkedNode::new(QueueNode { entry_id });
43        let ptr = NonNull::from(Box::leak(node));
44
45        self.map.insert(entry_id, ptr);
46        unsafe {
47            self.list.push_front(ptr);
48        }
49    }
50
51    fn insert_back(&mut self, entry_id: EntryID) {
52        if let Some(ptr) = self.map.get(&entry_id).copied() {
53            unsafe {
54                self.list.unlink(ptr);
55                self.list.push_back(ptr);
56            }
57            return;
58        }
59
60        let node = DoublyLinkedNode::new(QueueNode { entry_id });
61        let ptr = NonNull::from(Box::leak(node));
62
63        self.map.insert(entry_id, ptr);
64        unsafe {
65            self.list.push_back(ptr);
66        }
67    }
68
69    fn pop_front(&mut self) -> Option<EntryID> {
70        let head_ptr = self.list.head()?;
71        let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
72        let node_ptr = self
73            .map
74            .remove(&entry_id)
75            .expect("head pointer must have map entry");
76        unsafe {
77            self.list.unlink(node_ptr);
78            drop_boxed_node(node_ptr);
79        }
80        Some(entry_id)
81    }
82}
83
84impl Drop for QueueState {
85    fn drop(&mut self) {
86        let handles: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
87        for ptr in handles {
88            unsafe {
89                self.list.unlink(ptr);
90                drop_boxed_node(ptr);
91            }
92        }
93        unsafe {
94            self.list.drop_all();
95        }
96    }
97}
98
99/// The policy that implements the FILO (First In, Last Out) algorithm.
100/// Newest entries are evicted first.
101#[derive(Debug, Default)]
102pub struct FiloPolicy {
103    state: Mutex<QueueState>,
104}
105
106impl FiloPolicy {
107    /// Create a new [`FiloPolicy`].
108    pub fn new() -> Self {
109        Self {
110            state: Mutex::new(QueueState::default()),
111        }
112    }
113}
114
115// SAFETY: Access to raw pointers is protected by the internal `Mutex`.
116unsafe impl Send for FiloPolicy {}
117unsafe impl Sync for FiloPolicy {}
118
119impl CachePolicy for FiloPolicy {
120    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
121        if cnt == 0 {
122            return vec![];
123        }
124
125        let mut state = self.state.lock().unwrap();
126        if state.is_empty() {
127            return vec![];
128        }
129
130        let mut victims = Vec::with_capacity(cnt);
131        for _ in 0..cnt {
132            let Some(entry) = state.pop_front() else {
133                break;
134            };
135            victims.push(entry);
136        }
137        victims
138    }
139
140    fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
141        let mut state = self.state.lock().unwrap();
142        state.insert_front(*entry_id);
143    }
144}
145
146/// The policy that implements the FIFO (First In, First Out) algorithm.
147/// Oldest entries are evicted first.
148#[derive(Debug, Default)]
149pub struct FifoPolicy {
150    state: Mutex<QueueState>,
151}
152
153impl FifoPolicy {
154    /// Create a new [`FifoPolicy`].
155    pub fn new() -> Self {
156        Self {
157            state: Mutex::new(QueueState::default()),
158        }
159    }
160}
161
162// SAFETY: Access to raw pointers is protected by the internal `Mutex`.
163unsafe impl Send for FifoPolicy {}
164unsafe impl Sync for FifoPolicy {}
165
166impl CachePolicy for FifoPolicy {
167    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
168        if cnt == 0 {
169            return vec![];
170        }
171
172        let mut state = self.state.lock().unwrap();
173        if state.is_empty() {
174            return vec![];
175        }
176
177        let mut victims = Vec::with_capacity(cnt);
178        for _ in 0..cnt {
179            let Some(entry) = state.pop_front() else {
180                break;
181            };
182            victims.push(entry);
183        }
184        victims
185    }
186
187    fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
188        let mut state = self.state.lock().unwrap();
189        state.insert_back(*entry_id);
190    }
191}
192
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::cached_batch::{CachedBatch, CachedBatchType};
    use crate::cache::utils::{EntryID, create_cache_store, create_test_arrow_array};

    /// Builds an [`EntryID`] from a plain integer.
    fn entry(id: usize) -> EntryID {
        EntryID::from(id)
    }

    /// Notifies `policy` of every id in `ids`, in order, as an in-memory
    /// Arrow batch.
    fn insert_all(policy: &impl CachePolicy, ids: &[EntryID]) {
        for id in ids {
            policy.notify_insert(id, CachedBatchType::MemoryArrow);
        }
    }

    #[tokio::test]
    async fn test_filo_advisor() {
        let store = create_cache_store(3000, Box::new(FiloPolicy::new()));
        let (first, second, third, fourth) = (entry(1), entry(2), entry(3), entry(4));

        store.insert(first, create_test_arrow_array(100)).await;

        let first_batch = store.index().get(&first).unwrap();
        assert!(matches!(first_batch, CachedBatch::MemoryArrow(_)));

        for id in [second, third, fourth] {
            store.insert(id, create_test_arrow_array(100)).await;
        }

        for id in [first, second, fourth] {
            assert!(store.index().get(&id).is_some());
        }

        if let Some(third_batch) = store.index().get(&third) {
            assert!(matches!(third_batch, CachedBatch::DiskLiquid(_)));
        }
    }

    #[test]
    fn test_filo_advise_empty() {
        assert!(FiloPolicy::new().find_victim(1).is_empty());
    }

    #[test]
    fn test_filo_advise_order() {
        let policy = FiloPolicy::new();
        let (e1, e2) = (entry(1), entry(2));

        insert_all(&policy, &[e1, e2]);

        assert_eq!(policy.find_victim(1), vec![e2]);
        assert_eq!(policy.find_victim(1), vec![e1]);
    }

    #[test]
    fn test_filo_reinsert_moves_to_front() {
        let policy = FiloPolicy::new();
        let (first, second) = (entry(1), entry(2));

        insert_all(&policy, &[first, second, first]);

        assert_eq!(policy.find_victim(1), vec![first]);
        assert_eq!(policy.find_victim(1), vec![second]);
    }

    #[test]
    fn test_fifo_advise_empty() {
        assert!(FifoPolicy::new().find_victim(1).is_empty());
    }

    #[test]
    fn test_fifo_advise_order() {
        let policy = FifoPolicy::new();
        let (e1, e2) = (entry(1), entry(2));

        insert_all(&policy, &[e1, e2]);

        assert_eq!(policy.find_victim(1), vec![e1]);
        assert_eq!(policy.find_victim(1), vec![e2]);
    }

    #[test]
    fn test_fifo_reinsert_moves_to_back() {
        let policy = FifoPolicy::new();
        let (first, second) = (entry(1), entry(2));

        insert_all(&policy, &[first, second, first]);

        assert_eq!(policy.find_victim(1), vec![second]);
        assert_eq!(policy.find_victim(1), vec![first]);
    }
}