// liquid_cache_storage/cache/cache_policies/three_queue.rs

use std::{collections::HashMap, ptr::NonNull};
2
3use crate::{
4 cache::{CachePolicy, EntryID, cached_batch::CachedBatchType},
5 sync::Mutex,
6};
7
8use super::doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node};
9
/// Identifies which of the four FIFO queues a cache entry currently lives in.
///
/// Eviction (`find_victim`) drains `Arrow` first, then `Liquid`, then
/// `Hybrid`; `Disk` entries are tracked but never selected as victims.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum QueueKind {
    Arrow,
    Liquid,
    Hybrid,
    Disk,
}
17
/// Payload stored in every linked-list node: the cached entry's id plus a tag
/// recording which queue the node is currently linked into.
#[derive(Debug)]
struct QueueNode {
    entry_id: EntryID,
    /// Kept in sync by `push_back`, which stamps the tag on every (re)link.
    queue: QueueKind,
}
23
/// Raw pointer to a heap-allocated queue node; the same pointer is held by the
/// lookup map and by exactly one of the four linked lists.
type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
25
/// Mutex-protected interior of `LiquidPolicy`: one FIFO list per queue kind
/// plus a map from entry id to its (unique) node for O(1) relocation.
#[derive(Default, Debug)]
struct LiquidQueueInternalState {
    /// Every tracked entry appears here exactly once; the stored pointer's
    /// node is linked into the list matching its `queue` tag.
    map: HashMap<EntryID, NodePtr>,
    arrow: DoublyLinkedList<QueueNode>,
    liquid: DoublyLinkedList<QueueNode>,
    hybrid: DoublyLinkedList<QueueNode>,
    disk: DoublyLinkedList<QueueNode>,
}
34
impl LiquidQueueInternalState {
    /// Returns the list corresponding to `queue`.
    ///
    /// NOTE(review): this performs no unsafe operation itself; it appears to
    /// be marked `unsafe` only to match its pointer-manipulating callers.
    unsafe fn list_mut(&mut self, queue: QueueKind) -> &mut DoublyLinkedList<QueueNode> {
        match queue {
            QueueKind::Arrow => &mut self.arrow,
            QueueKind::Liquid => &mut self.liquid,
            QueueKind::Hybrid => &mut self.hybrid,
            QueueKind::Disk => &mut self.disk,
        }
    }

    /// Appends `node_ptr` to the back of `queue`, stamping the node's queue
    /// tag so `detach`/`Drop` later know which list it belongs to.
    ///
    /// # Safety
    /// `node_ptr` must point to a live node that is not currently linked
    /// into any list.
    unsafe fn push_back(&mut self, queue: QueueKind, mut node_ptr: NodePtr) {
        unsafe {
            node_ptr.as_mut().data.queue = queue;
            self.list_mut(queue).push_back(node_ptr);
        }
    }

    /// Unlinks `node_ptr` from whichever list its queue tag names.
    ///
    /// # Safety
    /// `node_ptr` must point to a live node currently linked into the list
    /// named by its `queue` field.
    unsafe fn detach(&mut self, node_ptr: NodePtr) {
        unsafe {
            let queue = node_ptr.as_ref().data.queue;
            self.list_mut(queue).unlink(node_ptr);
        }
    }

    /// Moves an existing entry to the back of `target`, or allocates and
    /// links a fresh node for an entry seen for the first time.
    fn upsert_into_queue(&mut self, entry_id: EntryID, target: QueueKind) {
        if let Some(node_ptr) = self.map.get(&entry_id).copied() {
            // SAFETY: pointers stored in the map always reference live nodes
            // linked into the list named by their `queue` tag.
            unsafe {
                self.detach(node_ptr);
                self.push_back(target, node_ptr);
            }
            return;
        }

        // Leak the box: from here the node's lifetime is managed manually and
        // reclaimed via `drop_boxed_node` (on pop, or in `Drop`).
        let node = DoublyLinkedNode::new(QueueNode {
            entry_id,
            queue: target,
        });
        let node_ptr = NonNull::from(Box::leak(node));

        self.map.insert(entry_id, node_ptr);
        // SAFETY: `node_ptr` is freshly allocated and not yet in any list.
        unsafe {
            self.push_back(target, node_ptr);
        }
    }

    /// Removes and returns the oldest entry in `queue`, freeing its node.
    /// Returns `None` when the queue is empty.
    fn pop_front(&mut self, queue: QueueKind) -> Option<EntryID> {
        // Matched inline instead of calling `list_mut`: that call would
        // borrow all of `self` and conflict with `self.map.remove` below.
        let list = match queue {
            QueueKind::Arrow => &mut self.arrow,
            QueueKind::Liquid => &mut self.liquid,
            QueueKind::Hybrid => &mut self.hybrid,
            QueueKind::Disk => &mut self.disk,
        };

        let head_ptr = list.head()?;
        // SAFETY: `head_ptr` was returned by the list, so it points to a
        // live node linked into `list`.
        let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
        let node_ptr = self
            .map
            .remove(&entry_id)
            .expect("list head must exist in map");
        // SAFETY: `node_ptr` is linked in `list` and, once removed from the
        // map above, no other reference to it remains, so after unlinking it
        // can be reclaimed exactly once.
        unsafe {
            list.unlink(node_ptr);
            drop_boxed_node(node_ptr);
        }
        Some(entry_id)
    }
}
101
impl Drop for LiquidQueueInternalState {
    /// Frees every heap node still tracked by the map, then lets each list
    /// release anything that might remain.
    fn drop(&mut self) {
        // Collect first so the map borrow ends before the lists are touched.
        let nodes: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
        for node_ptr in nodes {
            // SAFETY: each pointer was produced by `Box::leak` in
            // `upsert_into_queue` and is still linked in the list named by its
            // `queue` tag; unlinking makes it safe to reclaim exactly once.
            unsafe {
                match node_ptr.as_ref().data.queue {
                    QueueKind::Arrow => self.arrow.unlink(node_ptr),
                    QueueKind::Liquid => self.liquid.unlink(node_ptr),
                    QueueKind::Hybrid => self.hybrid.unlink(node_ptr),
                    QueueKind::Disk => self.disk.unlink(node_ptr),
                }
                drop_boxed_node(node_ptr);
            }
        }

        // SAFETY: all map-tracked nodes were freed above, so the lists should
        // already be empty; this is a defensive sweep for anything left
        // linked (semantics of `drop_all` live in doubly_linked_list).
        unsafe {
            self.arrow.drop_all();
            self.liquid.drop_all();
            self.hybrid.drop_all();
            self.disk.drop_all();
        }
    }
}
125
/// Cache eviction policy backed by per-batch-type FIFO queues.
///
/// All queue state lives behind a single mutex; see the `CachePolicy` impl
/// for the eviction priority between queues.
#[derive(Debug, Default)]
pub struct LiquidPolicy {
    inner: Mutex<LiquidQueueInternalState>,
}
131
132impl LiquidPolicy {
133 pub fn new() -> Self {
135 Self {
136 inner: Mutex::new(LiquidQueueInternalState::default()),
137 }
138 }
139}
140
// SAFETY: the raw `NonNull` node pointers inside `LiquidQueueInternalState`
// are only ever created, read, relinked, and freed while holding `inner`'s
// mutex, so sharing or sending the policy across threads cannot produce
// unsynchronized access to them.
unsafe impl Send for LiquidPolicy {}
unsafe impl Sync for LiquidPolicy {}
144
145impl CachePolicy for LiquidPolicy {
146 fn notify_insert(&self, entry_id: &EntryID, batch_type: CachedBatchType) {
147 let mut inner = self.inner.lock().unwrap();
148 let target = match batch_type {
149 CachedBatchType::MemoryArrow => QueueKind::Arrow,
150 CachedBatchType::MemoryLiquid => QueueKind::Liquid,
151 CachedBatchType::MemoryHybridLiquid => QueueKind::Hybrid,
152 CachedBatchType::DiskLiquid | CachedBatchType::DiskArrow => QueueKind::Disk,
153 };
154
155 inner.upsert_into_queue(*entry_id, target);
156 }
157
158 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
159 if cnt == 0 {
160 return vec![];
161 }
162
163 let mut inner = self.inner.lock().unwrap();
164 let mut victims = Vec::with_capacity(cnt);
165
166 while victims.len() < cnt {
167 if let Some(entry) = inner.pop_front(QueueKind::Arrow) {
168 victims.push(entry);
169 continue;
170 }
171
172 if let Some(entry) = inner.pop_front(QueueKind::Liquid) {
173 victims.push(entry);
174 continue;
175 }
176
177 if let Some(entry) = inner.pop_front(QueueKind::Hybrid) {
178 victims.push(entry);
179 continue;
180 }
181
182 break;
183 }
184
185 victims
186 }
187
188 fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    /// Builds an `EntryID` from a raw index.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    #[test]
    fn test_fifo_within_each_queue() {
        let policy = LiquidPolicy::new();
        let (a1, a2, l1, l2) = (entry(1), entry(2), entry(3), entry(4));

        policy.notify_insert(&a1, CachedBatchType::MemoryArrow);
        policy.notify_insert(&a2, CachedBatchType::MemoryArrow);
        policy.notify_insert(&l1, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&l2, CachedBatchType::MemoryLiquid);

        // Arrow drains before liquid, each queue in insertion order.
        assert_eq!(policy.find_victim(1), vec![a1]);
        assert_eq!(policy.find_victim(2), vec![a2, l1]);
        assert_eq!(policy.find_victim(1), vec![l2]);
    }

    #[test]
    fn test_queue_priority_order() {
        let policy = LiquidPolicy::new();
        let (arrow, liquid, hybrid) = (entry(1), entry(2), entry(3));

        // Insertion order deliberately differs from eviction priority.
        policy.notify_insert(&liquid, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&hybrid, CachedBatchType::MemoryHybridLiquid);
        policy.notify_insert(&arrow, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(5), vec![arrow, liquid, hybrid]);
    }

    #[test]
    fn test_zero_victim_request_returns_empty() {
        let policy = LiquidPolicy::new();
        policy.notify_insert(&entry(1), CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(0), Vec::<EntryID>::new());
    }

    #[test]
    fn test_disk_entries_not_evicted() {
        let policy = LiquidPolicy::new();
        let (on_disk, arrow, liquid) = (entry(1), entry(2), entry(3));

        policy.notify_insert(&on_disk, CachedBatchType::DiskArrow);
        policy.notify_insert(&arrow, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid, CachedBatchType::MemoryLiquid);

        // Only the in-memory entries are eligible victims.
        assert_eq!(policy.find_victim(5), vec![arrow, liquid]);
        // The disk entry stays put even when more victims are requested.
        assert!(policy.find_victim(1).is_empty());
    }

    #[test]
    fn test_reinsert_moves_entry_to_back_of_queue() {
        let policy = LiquidPolicy::new();
        let (older, newer) = (entry(1), entry(2));

        policy.notify_insert(&older, CachedBatchType::MemoryArrow);
        policy.notify_insert(&newer, CachedBatchType::MemoryArrow);
        // Re-inserting `older` sends it to the back of the arrow queue.
        policy.notify_insert(&older, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![newer]);
        assert_eq!(policy.find_victim(1), vec![older]);
    }

    #[test]
    fn test_reinsert_handles_cross_queue_move() {
        let policy = LiquidPolicy::new();
        let id = entry(42);

        policy.notify_insert(&id, CachedBatchType::MemoryArrow);
        policy.notify_insert(&id, CachedBatchType::MemoryLiquid);

        // The entry exists exactly once, now in the liquid queue.
        assert_eq!(policy.find_victim(2), vec![id]);
    }
}