Skip to main content

liquid_cache/cache/policies/cache/
three_queue.rs

1use std::{collections::HashMap, ptr::NonNull};
2
3use crate::{
4    cache::{CachePolicy, EntryID, cached_batch::CachedBatchType},
5    sync::Mutex,
6};
7
8use super::doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node};
9
/// Identifies which FIFO list a tracked entry is currently linked into.
///
/// Memory eviction drains `Arrow`, then `Liquid`, then `Squeezed`;
/// `Disk` is only drained by disk eviction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QueueKind {
    /// Entries inserted as `CachedBatchType::MemoryArrow`.
    Arrow,
    /// Entries inserted as `CachedBatchType::MemoryLiquid`.
    Liquid,
    /// Entries inserted as `CachedBatchType::MemorySqueezedLiquid`.
    Squeezed,
    /// Entries inserted as `CachedBatchType::DiskArrow` or `CachedBatchType::DiskLiquid`.
    Disk,
}
17
/// Payload carried by every linked-list node owned by the policy state.
#[derive(Debug)]
struct QueueNode {
    /// Cache entry this node tracks; used to find the node's `map` slot when it is popped.
    entry_id: EntryID,
    /// List that currently links this node; `detach` reads it to choose which list to unlink from.
    queue: QueueKind,
}

/// Raw pointer to a heap-allocated node. The allocation is created with `Box::leak`
/// in `upsert_into_queue` and released with `drop_boxed_node`.
type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
25
26#[derive(Default, Debug)]
27struct LiquidQueueInternalState {
28    map: HashMap<EntryID, NodePtr>,
29    arrow: DoublyLinkedList<QueueNode>,
30    liquid: DoublyLinkedList<QueueNode>,
31    squeezed: DoublyLinkedList<QueueNode>,
32    disk: DoublyLinkedList<QueueNode>,
33}
34
35impl LiquidQueueInternalState {
36    unsafe fn list_mut(&mut self, queue: QueueKind) -> &mut DoublyLinkedList<QueueNode> {
37        match queue {
38            QueueKind::Arrow => &mut self.arrow,
39            QueueKind::Liquid => &mut self.liquid,
40            QueueKind::Squeezed => &mut self.squeezed,
41            QueueKind::Disk => &mut self.disk,
42        }
43    }
44
45    unsafe fn push_back(&mut self, queue: QueueKind, mut node_ptr: NodePtr) {
46        unsafe {
47            node_ptr.as_mut().data.queue = queue;
48            self.list_mut(queue).push_back(node_ptr);
49        }
50    }
51
52    unsafe fn detach(&mut self, node_ptr: NodePtr) {
53        unsafe {
54            let queue = node_ptr.as_ref().data.queue;
55            self.list_mut(queue).unlink(node_ptr);
56        }
57    }
58
59    fn upsert_into_queue(&mut self, entry_id: EntryID, target: QueueKind) {
60        if let Some(node_ptr) = self.map.get(&entry_id).copied() {
61            unsafe {
62                self.detach(node_ptr);
63                self.push_back(target, node_ptr);
64            }
65            return;
66        }
67
68        let node = DoublyLinkedNode::new(QueueNode {
69            entry_id,
70            queue: target,
71        });
72        let node_ptr = NonNull::from(Box::leak(node));
73
74        self.map.insert(entry_id, node_ptr);
75        unsafe {
76            self.push_back(target, node_ptr);
77        }
78    }
79
80    fn pop_front(&mut self, queue: QueueKind) -> Option<EntryID> {
81        let list = match queue {
82            QueueKind::Arrow => &mut self.arrow,
83            QueueKind::Liquid => &mut self.liquid,
84            QueueKind::Squeezed => &mut self.squeezed,
85            QueueKind::Disk => &mut self.disk,
86        };
87
88        let head_ptr = list.head()?;
89        let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
90        let node_ptr = self
91            .map
92            .remove(&entry_id)
93            .expect("list head must exist in map");
94        unsafe {
95            list.unlink(node_ptr);
96            drop_boxed_node(node_ptr);
97        }
98        Some(entry_id)
99    }
100
101    fn remove(&mut self, entry_id: &EntryID) -> Option<EntryID> {
102        let node_ptr = self.map.remove(entry_id)?;
103        let removed = unsafe { node_ptr.as_ref().data.entry_id };
104        unsafe {
105            self.detach(node_ptr);
106            drop_boxed_node(node_ptr);
107        }
108        Some(removed)
109    }
110}
111
112impl Drop for LiquidQueueInternalState {
113    fn drop(&mut self) {
114        let nodes: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
115        for node_ptr in nodes {
116            unsafe {
117                match node_ptr.as_ref().data.queue {
118                    QueueKind::Arrow => self.arrow.unlink(node_ptr),
119                    QueueKind::Liquid => self.liquid.unlink(node_ptr),
120                    QueueKind::Squeezed => self.squeezed.unlink(node_ptr),
121                    QueueKind::Disk => self.disk.unlink(node_ptr),
122                }
123                drop_boxed_node(node_ptr);
124            }
125        }
126
127        unsafe {
128            self.arrow.drop_all();
129            self.liquid.drop_all();
130            self.squeezed.drop_all();
131            self.disk.drop_all();
132        }
133    }
134}
135
136/// Cache policy that keeps independent FIFO queues per batch type.
137#[derive(Debug, Default)]
138pub struct LiquidPolicy {
139    inner: Mutex<LiquidQueueInternalState>,
140}
141
142impl LiquidPolicy {
143    /// Create a new [`LiquidPolicy`].
144    pub fn new() -> Self {
145        Self {
146            inner: Mutex::new(LiquidQueueInternalState::default()),
147        }
148    }
149}
150
// SAFETY: Access to raw pointers is protected by the internal `Mutex`.
// `LiquidQueueInternalState` stores `NonNull` node pointers, which are neither `Send`
// nor `Sync`, so these impls are not derived automatically. The nodes are heap
// allocations owned exclusively by that state (created via `Box::leak`, released via
// `drop_boxed_node`), and every `CachePolicy` method reaches them only through
// `self.inner.lock()`, so at most one thread touches them at a time.
unsafe impl Send for LiquidPolicy {}
// SAFETY: same reasoning as for `Send`; a shared `&LiquidPolicy` exposes no pointer
// access outside the lock.
unsafe impl Sync for LiquidPolicy {}
154
155impl CachePolicy for LiquidPolicy {
156    fn notify_insert(&self, entry_id: &EntryID, batch_type: CachedBatchType) {
157        let mut inner = self.inner.lock().unwrap();
158        let target = match batch_type {
159            CachedBatchType::MemoryArrow => QueueKind::Arrow,
160            CachedBatchType::MemoryLiquid => QueueKind::Liquid,
161            CachedBatchType::MemorySqueezedLiquid => QueueKind::Squeezed,
162            CachedBatchType::DiskLiquid | CachedBatchType::DiskArrow => QueueKind::Disk,
163        };
164
165        inner.upsert_into_queue(*entry_id, target);
166    }
167
168    fn find_memory_victim(&self, cnt: usize) -> Vec<EntryID> {
169        if cnt == 0 {
170            return vec![];
171        }
172
173        let mut inner = self.inner.lock().unwrap();
174        let mut victims = Vec::with_capacity(cnt);
175
176        while victims.len() < cnt {
177            if let Some(entry) = inner.pop_front(QueueKind::Arrow) {
178                victims.push(entry);
179                continue;
180            }
181
182            if let Some(entry) = inner.pop_front(QueueKind::Liquid) {
183                victims.push(entry);
184                continue;
185            }
186
187            if let Some(entry) = inner.pop_front(QueueKind::Squeezed) {
188                victims.push(entry);
189                continue;
190            }
191
192            break;
193        }
194
195        victims
196    }
197
198    fn find_disk_victim(&self, cnt: usize) -> Vec<EntryID> {
199        if cnt == 0 {
200            return vec![];
201        }
202
203        let mut inner = self.inner.lock().unwrap();
204        let mut victims = Vec::with_capacity(cnt);
205
206        while victims.len() < cnt {
207            let Some(entry) = inner.pop_front(QueueKind::Disk) else {
208                break;
209            };
210            victims.push(entry);
211        }
212
213        victims
214    }
215
216    fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
217
218    fn notify_remove(&self, entry_id: &EntryID) {
219        let mut inner = self.inner.lock().unwrap();
220        inner.remove(entry_id);
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    fn entry(id: usize) -> EntryID {
        id.into()
    }

    #[test]
    fn test_fifo_within_each_queue() {
        let policy = LiquidPolicy::new();

        let arrow_a = entry(1);
        let arrow_b = entry(2);
        let liquid_a = entry(3);
        let liquid_b = entry(4);

        policy.notify_insert(&arrow_a, CachedBatchType::MemoryArrow);
        policy.notify_insert(&arrow_b, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid_a, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&liquid_b, CachedBatchType::MemoryLiquid);

        assert_eq!(policy.find_memory_victim(1), vec![arrow_a]);
        assert_eq!(policy.find_memory_victim(2), vec![arrow_b, liquid_a]);
        assert_eq!(policy.find_memory_victim(1), vec![liquid_b]);
    }

    #[test]
    fn test_queue_priority_order() {
        let policy = LiquidPolicy::new();

        let arrow_entry = entry(1);
        let liquid_entry = entry(2);
        let squeezed_entry = entry(3);

        policy.notify_insert(&liquid_entry, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&squeezed_entry, CachedBatchType::MemorySqueezedLiquid);
        policy.notify_insert(&arrow_entry, CachedBatchType::MemoryArrow);

        // Request more victims than available to ensure we only get what exists.
        let victims = policy.find_memory_victim(5);
        assert_eq!(victims, vec![arrow_entry, liquid_entry, squeezed_entry]);
    }

    #[test]
    fn test_zero_victim_request_returns_empty() {
        let policy = LiquidPolicy::new();

        policy.notify_insert(&entry(1), CachedBatchType::MemoryArrow);
        assert!(policy.find_memory_victim(0).is_empty());
        assert!(policy.find_disk_victim(0).is_empty());
    }

    #[test]
    fn test_disk_entries_not_evicted() {
        let policy = LiquidPolicy::new();

        let disk_entry = entry(1);
        let arrow_entry = entry(2);
        let liquid_entry = entry(3);

        policy.notify_insert(&disk_entry, CachedBatchType::DiskArrow);
        policy.notify_insert(&arrow_entry, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid_entry, CachedBatchType::MemoryLiquid);

        let victims = policy.find_memory_victim(5);
        assert_eq!(victims, vec![arrow_entry, liquid_entry]);

        // Only the disk entry remains and should still not be evicted.
        assert!(policy.find_memory_victim(1).is_empty());
    }

    #[test]
    fn test_disk_victims_and_remove() {
        let policy = LiquidPolicy::new();
        let disk_old = entry(1);
        let disk_new = entry(2);

        policy.notify_insert(&disk_old, CachedBatchType::DiskArrow);
        policy.notify_insert(&disk_new, CachedBatchType::DiskLiquid);

        assert_eq!(policy.find_disk_victim(1), vec![disk_old]);
        policy.notify_remove(&disk_new);
        assert!(policy.find_disk_victim(1).is_empty());
    }

    #[test]
    fn test_disk_victims_ignore_memory_queues() {
        let policy = LiquidPolicy::new();
        let arrow_entry = entry(1);
        let disk_entry = entry(2);

        policy.notify_insert(&arrow_entry, CachedBatchType::MemoryArrow);
        policy.notify_insert(&disk_entry, CachedBatchType::DiskLiquid);

        assert_eq!(policy.find_disk_victim(5), vec![disk_entry]);
        assert_eq!(policy.find_memory_victim(5), vec![arrow_entry]);
    }

    #[test]
    fn test_remove_memory_entry_skips_it_during_eviction() {
        let policy = LiquidPolicy::new();
        let first = entry(1);
        let second = entry(2);

        policy.notify_insert(&first, CachedBatchType::MemoryArrow);
        policy.notify_insert(&second, CachedBatchType::MemoryArrow);
        policy.notify_remove(&first);

        assert_eq!(policy.find_memory_victim(2), vec![second]);
    }

    #[test]
    fn test_remove_untracked_entry_is_noop() {
        let policy = LiquidPolicy::new();
        let tracked = entry(1);

        policy.notify_insert(&tracked, CachedBatchType::MemoryLiquid);
        policy.notify_remove(&entry(99));

        assert_eq!(policy.find_memory_victim(1), vec![tracked]);
    }

    #[test]
    fn test_reinsert_moves_entry_to_back_of_queue() {
        let policy = LiquidPolicy::new();

        let first = entry(1);
        let second = entry(2);

        policy.notify_insert(&first, CachedBatchType::MemoryArrow);
        policy.notify_insert(&second, CachedBatchType::MemoryArrow);

        // Reinserting should refresh the entry as the newest arrow batch.
        policy.notify_insert(&first, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_memory_victim(1), vec![second]);
        assert_eq!(policy.find_memory_victim(1), vec![first]);
    }

    #[test]
    fn test_reinsert_handles_cross_queue_move() {
        let policy = LiquidPolicy::new();

        let entry_id = entry(42);

        policy.notify_insert(&entry_id, CachedBatchType::MemoryArrow);
        policy.notify_insert(&entry_id, CachedBatchType::MemoryLiquid);

        let victims = policy.find_memory_victim(2);
        assert_eq!(victims, vec![entry_id]);
    }

    #[test]
    fn test_move_from_memory_to_disk_queue() {
        let policy = LiquidPolicy::new();
        let entry_id = entry(7);

        policy.notify_insert(&entry_id, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&entry_id, CachedBatchType::DiskLiquid);

        assert!(policy.find_memory_victim(1).is_empty());
        assert_eq!(policy.find_disk_victim(1), vec![entry_id]);
    }

    #[test]
    fn test_drop_with_entries_in_every_queue() {
        let policy = LiquidPolicy::new();

        policy.notify_insert(&entry(1), CachedBatchType::MemoryArrow);
        policy.notify_insert(&entry(2), CachedBatchType::MemoryLiquid);
        policy.notify_insert(&entry(3), CachedBatchType::MemorySqueezedLiquid);
        policy.notify_insert(&entry(4), CachedBatchType::DiskArrow);

        // Dropping with live nodes must free each exactly once; run under Miri to
        // detect leaks or double frees.
        drop(policy);
    }
}