liquid_cache/cache/policies/cache/
three_queue.rs1use std::{collections::HashMap, ptr::NonNull};
2
3use crate::{
4 cache::{CachePolicy, EntryID, cached_batch::CachedBatchType},
5 sync::Mutex,
6};
7
8use super::doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node};
9
/// Names the internal queue a tracked entry is currently linked into.
///
/// `LiquidPolicy::notify_insert` maps each `CachedBatchType` onto one of
/// these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueueKind {
    /// Queue for entries inserted as `CachedBatchType::MemoryArrow`.
    Arrow,
    /// Queue for entries inserted as `CachedBatchType::MemoryLiquid`.
    Liquid,
    /// Queue for entries inserted as `CachedBatchType::MemorySqueezedLiquid`.
    Squeezed,
    /// Queue for entries inserted as `CachedBatchType::DiskLiquid` or
    /// `CachedBatchType::DiskArrow`.
    Disk,
}
17
18#[derive(Debug)]
19struct QueueNode {
20 entry_id: EntryID,
21 queue: QueueKind,
22}
23
24type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
25
26#[derive(Default, Debug)]
27struct LiquidQueueInternalState {
28 map: HashMap<EntryID, NodePtr>,
29 arrow: DoublyLinkedList<QueueNode>,
30 liquid: DoublyLinkedList<QueueNode>,
31 squeezed: DoublyLinkedList<QueueNode>,
32 disk: DoublyLinkedList<QueueNode>,
33}
34
35impl LiquidQueueInternalState {
36 unsafe fn list_mut(&mut self, queue: QueueKind) -> &mut DoublyLinkedList<QueueNode> {
37 match queue {
38 QueueKind::Arrow => &mut self.arrow,
39 QueueKind::Liquid => &mut self.liquid,
40 QueueKind::Squeezed => &mut self.squeezed,
41 QueueKind::Disk => &mut self.disk,
42 }
43 }
44
45 unsafe fn push_back(&mut self, queue: QueueKind, mut node_ptr: NodePtr) {
46 unsafe {
47 node_ptr.as_mut().data.queue = queue;
48 self.list_mut(queue).push_back(node_ptr);
49 }
50 }
51
52 unsafe fn detach(&mut self, node_ptr: NodePtr) {
53 unsafe {
54 let queue = node_ptr.as_ref().data.queue;
55 self.list_mut(queue).unlink(node_ptr);
56 }
57 }
58
59 fn upsert_into_queue(&mut self, entry_id: EntryID, target: QueueKind) {
60 if let Some(node_ptr) = self.map.get(&entry_id).copied() {
61 unsafe {
62 self.detach(node_ptr);
63 self.push_back(target, node_ptr);
64 }
65 return;
66 }
67
68 let node = DoublyLinkedNode::new(QueueNode {
69 entry_id,
70 queue: target,
71 });
72 let node_ptr = NonNull::from(Box::leak(node));
73
74 self.map.insert(entry_id, node_ptr);
75 unsafe {
76 self.push_back(target, node_ptr);
77 }
78 }
79
80 fn pop_front(&mut self, queue: QueueKind) -> Option<EntryID> {
81 let list = match queue {
82 QueueKind::Arrow => &mut self.arrow,
83 QueueKind::Liquid => &mut self.liquid,
84 QueueKind::Squeezed => &mut self.squeezed,
85 QueueKind::Disk => &mut self.disk,
86 };
87
88 let head_ptr = list.head()?;
89 let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
90 let node_ptr = self
91 .map
92 .remove(&entry_id)
93 .expect("list head must exist in map");
94 unsafe {
95 list.unlink(node_ptr);
96 drop_boxed_node(node_ptr);
97 }
98 Some(entry_id)
99 }
100
101 fn remove(&mut self, entry_id: &EntryID) -> Option<EntryID> {
102 let node_ptr = self.map.remove(entry_id)?;
103 let removed = unsafe { node_ptr.as_ref().data.entry_id };
104 unsafe {
105 self.detach(node_ptr);
106 drop_boxed_node(node_ptr);
107 }
108 Some(removed)
109 }
110}
111
112impl Drop for LiquidQueueInternalState {
113 fn drop(&mut self) {
114 let nodes: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
115 for node_ptr in nodes {
116 unsafe {
117 match node_ptr.as_ref().data.queue {
118 QueueKind::Arrow => self.arrow.unlink(node_ptr),
119 QueueKind::Liquid => self.liquid.unlink(node_ptr),
120 QueueKind::Squeezed => self.squeezed.unlink(node_ptr),
121 QueueKind::Disk => self.disk.unlink(node_ptr),
122 }
123 drop_boxed_node(node_ptr);
124 }
125 }
126
127 unsafe {
128 self.arrow.drop_all();
129 self.liquid.drop_all();
130 self.squeezed.drop_all();
131 self.disk.drop_all();
132 }
133 }
134}
135
136#[derive(Debug, Default)]
138pub struct LiquidPolicy {
139 inner: Mutex<LiquidQueueInternalState>,
140}
141
142impl LiquidPolicy {
143 pub fn new() -> Self {
145 Self {
146 inner: Mutex::new(LiquidQueueInternalState::default()),
147 }
148 }
149}
150
151unsafe impl Send for LiquidPolicy {}
153unsafe impl Sync for LiquidPolicy {}
154
155impl CachePolicy for LiquidPolicy {
156 fn notify_insert(&self, entry_id: &EntryID, batch_type: CachedBatchType) {
157 let mut inner = self.inner.lock().unwrap();
158 let target = match batch_type {
159 CachedBatchType::MemoryArrow => QueueKind::Arrow,
160 CachedBatchType::MemoryLiquid => QueueKind::Liquid,
161 CachedBatchType::MemorySqueezedLiquid => QueueKind::Squeezed,
162 CachedBatchType::DiskLiquid | CachedBatchType::DiskArrow => QueueKind::Disk,
163 };
164
165 inner.upsert_into_queue(*entry_id, target);
166 }
167
168 fn find_memory_victim(&self, cnt: usize) -> Vec<EntryID> {
169 if cnt == 0 {
170 return vec![];
171 }
172
173 let mut inner = self.inner.lock().unwrap();
174 let mut victims = Vec::with_capacity(cnt);
175
176 while victims.len() < cnt {
177 if let Some(entry) = inner.pop_front(QueueKind::Arrow) {
178 victims.push(entry);
179 continue;
180 }
181
182 if let Some(entry) = inner.pop_front(QueueKind::Liquid) {
183 victims.push(entry);
184 continue;
185 }
186
187 if let Some(entry) = inner.pop_front(QueueKind::Squeezed) {
188 victims.push(entry);
189 continue;
190 }
191
192 break;
193 }
194
195 victims
196 }
197
198 fn find_disk_victim(&self, cnt: usize) -> Vec<EntryID> {
199 if cnt == 0 {
200 return vec![];
201 }
202
203 let mut inner = self.inner.lock().unwrap();
204 let mut victims = Vec::with_capacity(cnt);
205
206 while victims.len() < cnt {
207 let Some(entry) = inner.pop_front(QueueKind::Disk) else {
208 break;
209 };
210 victims.push(entry);
211 }
212
213 victims
214 }
215
216 fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
217
218 fn notify_remove(&self, entry_id: &EntryID) {
219 let mut inner = self.inner.lock().unwrap();
220 inner.remove(entry_id);
221 }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    /// Builds an `EntryID` from a plain integer.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    #[test]
    fn test_fifo_within_each_queue() {
        let policy = LiquidPolicy::new();

        let arrow_a = entry(1);
        let arrow_b = entry(2);
        let liquid_a = entry(3);
        let liquid_b = entry(4);

        policy.notify_insert(&arrow_a, CachedBatchType::MemoryArrow);
        policy.notify_insert(&arrow_b, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid_a, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&liquid_b, CachedBatchType::MemoryLiquid);

        assert_eq!(policy.find_memory_victim(1), vec![arrow_a]);
        assert_eq!(policy.find_memory_victim(2), vec![arrow_b, liquid_a]);
        assert_eq!(policy.find_memory_victim(1), vec![liquid_b]);
    }

    #[test]
    fn test_queue_priority_order() {
        let policy = LiquidPolicy::new();

        let arrow_entry = entry(1);
        let liquid_entry = entry(2);
        let squeezed_entry = entry(3);

        // Insert in the "wrong" order on purpose: priority, not insertion
        // order, decides across queues.
        policy.notify_insert(&liquid_entry, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&squeezed_entry, CachedBatchType::MemorySqueezedLiquid);
        policy.notify_insert(&arrow_entry, CachedBatchType::MemoryArrow);

        let victims = policy.find_memory_victim(5);
        assert_eq!(victims, vec![arrow_entry, liquid_entry, squeezed_entry]);
    }

    #[test]
    fn test_zero_victim_request_returns_empty() {
        let policy = LiquidPolicy::new();

        policy.notify_insert(&entry(1), CachedBatchType::MemoryArrow);
        assert!(policy.find_memory_victim(0).is_empty());
        assert!(policy.find_disk_victim(0).is_empty());
    }

    #[test]
    fn test_disk_entries_not_evicted() {
        let policy = LiquidPolicy::new();

        let disk_entry = entry(1);
        let arrow_entry = entry(2);
        let liquid_entry = entry(3);

        policy.notify_insert(&disk_entry, CachedBatchType::DiskArrow);
        policy.notify_insert(&arrow_entry, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid_entry, CachedBatchType::MemoryLiquid);

        let victims = policy.find_memory_victim(5);
        assert_eq!(victims, vec![arrow_entry, liquid_entry]);

        assert!(policy.find_memory_victim(1).is_empty());

        // The disk entry must still be tracked after memory eviction.
        assert_eq!(policy.find_disk_victim(1), vec![disk_entry]);
    }

    #[test]
    fn test_disk_victims_and_remove() {
        let policy = LiquidPolicy::new();
        let disk_old = entry(1);
        let disk_new = entry(2);

        policy.notify_insert(&disk_old, CachedBatchType::DiskArrow);
        policy.notify_insert(&disk_new, CachedBatchType::DiskLiquid);

        assert_eq!(policy.find_disk_victim(1), vec![disk_old]);
        policy.notify_remove(&disk_new);
        assert!(policy.find_disk_victim(1).is_empty());
    }

    #[test]
    fn test_remove_memory_entry_and_unknown_entry() {
        let policy = LiquidPolicy::new();

        let removed = entry(1);
        let kept = entry(2);

        policy.notify_insert(&removed, CachedBatchType::MemoryArrow);
        policy.notify_insert(&kept, CachedBatchType::MemoryArrow);

        policy.notify_remove(&removed);
        // Removing an id that was never inserted must be a no-op.
        policy.notify_remove(&entry(99));

        assert_eq!(policy.find_memory_victim(5), vec![kept]);
    }

    #[test]
    fn test_reinsert_moves_entry_to_back_of_queue() {
        let policy = LiquidPolicy::new();

        let first = entry(1);
        let second = entry(2);

        policy.notify_insert(&first, CachedBatchType::MemoryArrow);
        policy.notify_insert(&second, CachedBatchType::MemoryArrow);

        // Re-inserting `first` moves it behind `second`.
        policy.notify_insert(&first, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_memory_victim(1), vec![second]);
        assert_eq!(policy.find_memory_victim(1), vec![first]);
    }

    #[test]
    fn test_reinsert_handles_cross_queue_move() {
        let policy = LiquidPolicy::new();

        let entry_id = entry(42);

        policy.notify_insert(&entry_id, CachedBatchType::MemoryArrow);
        policy.notify_insert(&entry_id, CachedBatchType::MemoryLiquid);

        // The entry must appear once, not once per queue it has visited.
        let victims = policy.find_memory_victim(2);
        assert_eq!(victims, vec![entry_id]);
    }
}