liquid_cache_storage/cache/cache_policies/
filo.rs1use std::{collections::HashMap, ptr::NonNull};
4
5use crate::{
6 cache::{cached_batch::CachedBatchType, utils::EntryID},
7 sync::Mutex,
8};
9
10use super::{
11 CachePolicy,
12 doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
/// Payload carried by each linked-list node: the ID of one tracked cache entry.
#[derive(Debug)]
struct QueueNode {
    /// ID of the cache entry this node stands for.
    entry_id: EntryID,
}
19
/// Non-null raw pointer to a list node holding a [`QueueNode`].
///
/// Nodes are created with `Box::leak` when an entry is first inserted and are released
/// through `drop_boxed_node`.
type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
21
/// Ordered queue of entry IDs shared by the FILO and FIFO policies.
///
/// `map` and `list` are kept in step: every pointer stored in `map` refers to a node that
/// is linked into `list`, and a node is removed from both at the same time.
#[derive(Debug, Default)]
struct QueueState {
    /// Looks up the list node of a tracked entry by its ID.
    map: HashMap<EntryID, NodePtr>,
    /// Holds the nodes in queue order; victims are taken from the head.
    list: DoublyLinkedList<QueueNode>,
}
27
28impl QueueState {
29 fn is_empty(&self) -> bool {
30 self.list.head().is_none()
31 }
32
33 fn insert_front(&mut self, entry_id: EntryID) {
34 if let Some(ptr) = self.map.get(&entry_id).copied() {
35 unsafe {
36 self.list.unlink(ptr);
37 self.list.push_front(ptr);
38 }
39 return;
40 }
41
42 let node = DoublyLinkedNode::new(QueueNode { entry_id });
43 let ptr = NonNull::from(Box::leak(node));
44
45 self.map.insert(entry_id, ptr);
46 unsafe {
47 self.list.push_front(ptr);
48 }
49 }
50
51 fn insert_back(&mut self, entry_id: EntryID) {
52 if let Some(ptr) = self.map.get(&entry_id).copied() {
53 unsafe {
54 self.list.unlink(ptr);
55 self.list.push_back(ptr);
56 }
57 return;
58 }
59
60 let node = DoublyLinkedNode::new(QueueNode { entry_id });
61 let ptr = NonNull::from(Box::leak(node));
62
63 self.map.insert(entry_id, ptr);
64 unsafe {
65 self.list.push_back(ptr);
66 }
67 }
68
69 fn pop_front(&mut self) -> Option<EntryID> {
70 let head_ptr = self.list.head()?;
71 let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
72 let node_ptr = self
73 .map
74 .remove(&entry_id)
75 .expect("head pointer must have map entry");
76 unsafe {
77 self.list.unlink(node_ptr);
78 drop_boxed_node(node_ptr);
79 }
80 Some(entry_id)
81 }
82}
83
84impl Drop for QueueState {
85 fn drop(&mut self) {
86 let handles: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
87 for ptr in handles {
88 unsafe {
89 self.list.unlink(ptr);
90 drop_boxed_node(ptr);
91 }
92 }
93 unsafe {
94 self.list.drop_all();
95 }
96 }
97}
98
/// First-in, last-out eviction policy.
///
/// New entries are placed at the head of the queue and victims are taken from the head,
/// so the most recently inserted entry is chosen first.
#[derive(Debug, Default)]
pub struct FiloPolicy {
    /// Queue of tracked entries, guarded by a mutex so the policy can be used via `&self`.
    state: Mutex<QueueState>,
}
105
106impl FiloPolicy {
107 pub fn new() -> Self {
109 Self {
110 state: Mutex::new(QueueState::default()),
111 }
112 }
113}
114
// SAFETY: `QueueState` stores raw `NodePtr`s, which is presumably why `Send` is not derived
// automatically. The nodes behind those pointers are allocated and freed only by
// `QueueState`, so moving the policy to another thread moves sole ownership of them.
// NOTE(review): confirm `DoublyLinkedList` holds no thread-affine state.
unsafe impl Send for FiloPolicy {}
// SAFETY: in this file the queue is only reached through `state.lock()`, so shared
// references never touch the raw pointers without holding the mutex.
unsafe impl Sync for FiloPolicy {}
118
119impl CachePolicy for FiloPolicy {
120 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
121 if cnt == 0 {
122 return vec![];
123 }
124
125 let mut state = self.state.lock().unwrap();
126 if state.is_empty() {
127 return vec![];
128 }
129
130 let mut victims = Vec::with_capacity(cnt);
131 for _ in 0..cnt {
132 let Some(entry) = state.pop_front() else {
133 break;
134 };
135 victims.push(entry);
136 }
137 victims
138 }
139
140 fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
141 let mut state = self.state.lock().unwrap();
142 state.insert_front(*entry_id);
143 }
144}
145
/// First-in, first-out eviction policy.
///
/// New entries are placed at the tail of the queue and victims are taken from the head,
/// so the entry that was inserted longest ago is chosen first.
#[derive(Debug, Default)]
pub struct FifoPolicy {
    /// Queue of tracked entries, guarded by a mutex so the policy can be used via `&self`.
    state: Mutex<QueueState>,
}
152
153impl FifoPolicy {
154 pub fn new() -> Self {
156 Self {
157 state: Mutex::new(QueueState::default()),
158 }
159 }
160}
161
// SAFETY: `QueueState` stores raw `NodePtr`s, which is presumably why `Send` is not derived
// automatically. The nodes behind those pointers are allocated and freed only by
// `QueueState`, so moving the policy to another thread moves sole ownership of them.
// NOTE(review): confirm `DoublyLinkedList` holds no thread-affine state.
unsafe impl Send for FifoPolicy {}
// SAFETY: in this file the queue is only reached through `state.lock()`, so shared
// references never touch the raw pointers without holding the mutex.
unsafe impl Sync for FifoPolicy {}
165
166impl CachePolicy for FifoPolicy {
167 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
168 if cnt == 0 {
169 return vec![];
170 }
171
172 let mut state = self.state.lock().unwrap();
173 if state.is_empty() {
174 return vec![];
175 }
176
177 let mut victims = Vec::with_capacity(cnt);
178 for _ in 0..cnt {
179 let Some(entry) = state.pop_front() else {
180 break;
181 };
182 victims.push(entry);
183 }
184 victims
185 }
186
187 fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
188 let mut state = self.state.lock().unwrap();
189 state.insert_back(*entry_id);
190 }
191}
192
/// Unit tests for the FILO and FIFO policies, plus one store-backed test for FILO.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::cached_batch::{CachedBatch, CachedBatchType};
    use crate::cache::utils::{EntryID, create_cache_store, create_test_arrow_array};

    /// Shorthand for building an `EntryID` from a plain integer.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Drives `FiloPolicy` through a cache store: after four inserts, entries 1, 2 and 4
    /// must still be indexed, and entry 3, if still indexed, must be in `DiskLiquid` form.
    #[tokio::test]
    async fn test_filo_advisor() {
        let advisor = FiloPolicy::new();
        // NOTE(review): 3000 is presumably the store's size budget — confirm against
        // `create_cache_store`.
        let store = create_cache_store(3000, Box::new(advisor));

        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);
        let entry_id3 = EntryID::from(3);

        store.insert(entry_id1, create_test_arrow_array(100)).await;

        // The first insert is expected to be held as an in-memory Arrow batch.
        let data = store.index().get(&entry_id1).unwrap();
        assert!(matches!(data, CachedBatch::MemoryArrow(_)));
        store.insert(entry_id2, create_test_arrow_array(100)).await;
        store.insert(entry_id3, create_test_arrow_array(100)).await;

        let entry_id4: EntryID = EntryID::from(4);
        store.insert(entry_id4, create_test_arrow_array(100)).await;

        assert!(store.index().get(&entry_id1).is_some());
        assert!(store.index().get(&entry_id2).is_some());
        assert!(store.index().get(&entry_id4).is_some());

        // Entry 3 was the newest entry when entry 4 arrived, so it is the FILO victim.
        if let Some(data) = store.index().get(&entry_id3) {
            assert!(matches!(data, CachedBatch::DiskLiquid(_)));
        }
    }

    /// A policy with no tracked entries yields no victims.
    #[test]
    fn test_filo_advise_empty() {
        let policy = FiloPolicy::new();
        assert!(policy.find_victim(1).is_empty());
    }

    /// FILO hands back the newest insertion first.
    #[test]
    fn test_filo_advise_order() {
        let policy = FiloPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![e2]);
        assert_eq!(policy.find_victim(1), vec![e1]);
    }

    /// Re-notifying an already tracked entry moves it to the head, making it the next
    /// FILO victim.
    #[test]
    fn test_filo_reinsert_moves_to_front() {
        let policy = FiloPolicy::new();
        let first = entry(1);
        let second = entry(2);

        policy.notify_insert(&first, CachedBatchType::MemoryArrow);
        policy.notify_insert(&second, CachedBatchType::MemoryArrow);
        policy.notify_insert(&first, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![first]);
        assert_eq!(policy.find_victim(1), vec![second]);
    }

    /// A policy with no tracked entries yields no victims.
    #[test]
    fn test_fifo_advise_empty() {
        let policy = FifoPolicy::new();
        assert!(policy.find_victim(1).is_empty());
    }

    /// FIFO hands back the oldest insertion first.
    #[test]
    fn test_fifo_advise_order() {
        let policy = FifoPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![e1]);
        assert_eq!(policy.find_victim(1), vec![e2]);
    }

    /// Re-notifying an already tracked entry moves it to the tail, so it is evicted
    /// after the entries that were behind it.
    #[test]
    fn test_fifo_reinsert_moves_to_back() {
        let policy = FifoPolicy::new();
        let first = entry(1);
        let second = entry(2);

        policy.notify_insert(&first, CachedBatchType::MemoryArrow);
        policy.notify_insert(&second, CachedBatchType::MemoryArrow);
        policy.notify_insert(&first, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![second]);
        assert_eq!(policy.find_victim(1), vec![first]);
    }
}