liquid_cache_storage/cache/cache_policies/
three_queue.rs

1use std::{collections::HashMap, ptr::NonNull};
2
3use crate::{
4    cache::{CachePolicy, EntryID, cached_batch::CachedBatchType},
5    sync::Mutex,
6};
7
8use super::doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node};
9
/// Identifies which of the four FIFO queues an entry currently lives in.
///
/// Eviction drains queues in declaration order — `Arrow`, then `Liquid`,
/// then `Hybrid` — while `Disk` entries are tracked but never evicted
/// (see `find_victim`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum QueueKind {
    Arrow,
    Liquid,
    Hybrid,
    Disk,
}
17
/// Payload carried by each intrusive list node: the cache entry's id plus a
/// back-reference to the queue currently holding the node, so it can be
/// unlinked without searching all four lists.
#[derive(Debug)]
struct QueueNode {
    /// Cache entry this node represents.
    entry_id: EntryID,
    /// Queue this node is currently linked into; kept in sync by `push_back`.
    queue: QueueKind,
}
23
/// Raw pointer to a heap node produced by `Box::leak`; ownership is tracked
/// through `LiquidQueueInternalState::map` and reclaimed via `drop_boxed_node`.
type NodePtr = NonNull<DoublyLinkedNode<QueueNode>>;
25
/// Mutex-protected queue state: an id → node map plus one intrusive FIFO list
/// per queue kind.
///
/// Invariant: every pointer stored in `map` refers to a live leaked node that
/// is linked into exactly one of the four lists, and every linked node has a
/// `map` entry. `upsert_into_queue`, `pop_front`, and `Drop` all rely on this.
#[derive(Default, Debug)]
struct LiquidQueueInternalState {
    map: HashMap<EntryID, NodePtr>,
    arrow: DoublyLinkedList<QueueNode>,
    liquid: DoublyLinkedList<QueueNode>,
    hybrid: DoublyLinkedList<QueueNode>,
    disk: DoublyLinkedList<QueueNode>,
}
34
35impl LiquidQueueInternalState {
36    unsafe fn list_mut(&mut self, queue: QueueKind) -> &mut DoublyLinkedList<QueueNode> {
37        match queue {
38            QueueKind::Arrow => &mut self.arrow,
39            QueueKind::Liquid => &mut self.liquid,
40            QueueKind::Hybrid => &mut self.hybrid,
41            QueueKind::Disk => &mut self.disk,
42        }
43    }
44
45    unsafe fn push_back(&mut self, queue: QueueKind, mut node_ptr: NodePtr) {
46        unsafe {
47            node_ptr.as_mut().data.queue = queue;
48            self.list_mut(queue).push_back(node_ptr);
49        }
50    }
51
52    unsafe fn detach(&mut self, node_ptr: NodePtr) {
53        unsafe {
54            let queue = node_ptr.as_ref().data.queue;
55            self.list_mut(queue).unlink(node_ptr);
56        }
57    }
58
59    fn upsert_into_queue(&mut self, entry_id: EntryID, target: QueueKind) {
60        if let Some(node_ptr) = self.map.get(&entry_id).copied() {
61            unsafe {
62                self.detach(node_ptr);
63                self.push_back(target, node_ptr);
64            }
65            return;
66        }
67
68        let node = DoublyLinkedNode::new(QueueNode {
69            entry_id,
70            queue: target,
71        });
72        let node_ptr = NonNull::from(Box::leak(node));
73
74        self.map.insert(entry_id, node_ptr);
75        unsafe {
76            self.push_back(target, node_ptr);
77        }
78    }
79
80    fn pop_front(&mut self, queue: QueueKind) -> Option<EntryID> {
81        let list = match queue {
82            QueueKind::Arrow => &mut self.arrow,
83            QueueKind::Liquid => &mut self.liquid,
84            QueueKind::Hybrid => &mut self.hybrid,
85            QueueKind::Disk => &mut self.disk,
86        };
87
88        let head_ptr = list.head()?;
89        let entry_id = unsafe { head_ptr.as_ref().data.entry_id };
90        let node_ptr = self
91            .map
92            .remove(&entry_id)
93            .expect("list head must exist in map");
94        unsafe {
95            list.unlink(node_ptr);
96            drop_boxed_node(node_ptr);
97        }
98        Some(entry_id)
99    }
100}
101
impl Drop for LiquidQueueInternalState {
    /// Reclaim every leaked node. Nodes are heap allocations created via
    /// `Box::leak` in `upsert_into_queue`, so they must be freed manually.
    fn drop(&mut self) {
        // Drain the map up front: it holds a pointer to every live node, and
        // collecting first avoids mutating `map` while iterating it.
        let nodes: Vec<_> = self.map.drain().map(|(_, ptr)| ptr).collect();
        for node_ptr in nodes {
            // SAFETY: every pointer stored in `map` refers to a live leaked
            // node linked into exactly one of the four lists, so unlinking
            // from the list named by its `queue` tag and freeing once is sound.
            unsafe {
                match node_ptr.as_ref().data.queue {
                    QueueKind::Arrow => self.arrow.unlink(node_ptr),
                    QueueKind::Liquid => self.liquid.unlink(node_ptr),
                    QueueKind::Hybrid => self.hybrid.unlink(node_ptr),
                    QueueKind::Disk => self.disk.unlink(node_ptr),
                }
                drop_boxed_node(node_ptr);
            }
        }

        // Defensive sweep: after the drain above the lists should already be
        // empty; `drop_all` frees anything left over regardless.
        unsafe {
            self.arrow.drop_all();
            self.liquid.drop_all();
            self.hybrid.drop_all();
            self.disk.drop_all();
        }
    }
}
125
/// Cache policy that keeps independent FIFO queues per batch type.
///
/// Eviction drains the in-memory queues in a fixed priority order (Arrow,
/// then Liquid, then Hybrid); disk-resident entries are tracked but never
/// returned as victims. Access notifications are ignored (pure FIFO).
#[derive(Debug, Default)]
pub struct LiquidPolicy {
    /// All queue state, including the raw node pointers, lives behind this
    /// mutex; that is what justifies the manual `Send`/`Sync` impls below.
    inner: Mutex<LiquidQueueInternalState>,
}
131
132impl LiquidPolicy {
133    /// Create a new [`LiquidPolicy`].
134    pub fn new() -> Self {
135        Self {
136            inner: Mutex::new(LiquidQueueInternalState::default()),
137        }
138    }
139}
140
// SAFETY: `LiquidPolicy` is not auto-`Send`/`Sync` because the internal state
// stores `NonNull` node pointers. Those pointers are only ever created,
// dereferenced, and freed while holding `inner`'s `Mutex`, and they never
// escape the lock, so sharing or sending the policy across threads is sound.
unsafe impl Send for LiquidPolicy {}
unsafe impl Sync for LiquidPolicy {}
144
145impl CachePolicy for LiquidPolicy {
146    fn notify_insert(&self, entry_id: &EntryID, batch_type: CachedBatchType) {
147        let mut inner = self.inner.lock().unwrap();
148        let target = match batch_type {
149            CachedBatchType::MemoryArrow => QueueKind::Arrow,
150            CachedBatchType::MemoryLiquid => QueueKind::Liquid,
151            CachedBatchType::MemoryHybridLiquid => QueueKind::Hybrid,
152            CachedBatchType::DiskLiquid | CachedBatchType::DiskArrow => QueueKind::Disk,
153        };
154
155        inner.upsert_into_queue(*entry_id, target);
156    }
157
158    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
159        if cnt == 0 {
160            return vec![];
161        }
162
163        let mut inner = self.inner.lock().unwrap();
164        let mut victims = Vec::with_capacity(cnt);
165
166        while victims.len() < cnt {
167            if let Some(entry) = inner.pop_front(QueueKind::Arrow) {
168                victims.push(entry);
169                continue;
170            }
171
172            if let Some(entry) = inner.pop_front(QueueKind::Liquid) {
173                victims.push(entry);
174                continue;
175            }
176
177            if let Some(entry) = inner.pop_front(QueueKind::Hybrid) {
178                victims.push(entry);
179                continue;
180            }
181
182            break;
183        }
184
185        victims
186    }
187
188    fn notify_access(&self, _entry_id: &EntryID, _batch_type: CachedBatchType) {}
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    /// Shorthand for building an [`EntryID`] from a raw index.
    fn entry(id: usize) -> EntryID {
        EntryID::from(id)
    }

    #[test]
    fn test_fifo_within_each_queue() {
        let policy = LiquidPolicy::new();
        let (a1, a2, l1, l2) = (entry(1), entry(2), entry(3), entry(4));

        for id in [&a1, &a2] {
            policy.notify_insert(id, CachedBatchType::MemoryArrow);
        }
        for id in [&l1, &l2] {
            policy.notify_insert(id, CachedBatchType::MemoryLiquid);
        }

        // Oldest entries leave first within each queue.
        assert_eq!(policy.find_victim(1), vec![a1]);
        assert_eq!(policy.find_victim(2), vec![a2, l1]);
        assert_eq!(policy.find_victim(1), vec![l2]);
    }

    #[test]
    fn test_queue_priority_order() {
        let policy = LiquidPolicy::new();
        let (arrow, liquid, hybrid) = (entry(1), entry(2), entry(3));

        // Insert in scrambled order; eviction must still be Arrow -> Liquid -> Hybrid.
        policy.notify_insert(&liquid, CachedBatchType::MemoryLiquid);
        policy.notify_insert(&hybrid, CachedBatchType::MemoryHybridLiquid);
        policy.notify_insert(&arrow, CachedBatchType::MemoryArrow);

        // Over-ask to confirm we only get what exists.
        assert_eq!(policy.find_victim(5), vec![arrow, liquid, hybrid]);
    }

    #[test]
    fn test_zero_victim_request_returns_empty() {
        let policy = LiquidPolicy::new();
        policy.notify_insert(&entry(1), CachedBatchType::MemoryArrow);

        assert!(policy.find_victim(0).is_empty());
    }

    #[test]
    fn test_disk_entries_not_evicted() {
        let policy = LiquidPolicy::new();
        let (on_disk, arrow, liquid) = (entry(1), entry(2), entry(3));

        policy.notify_insert(&on_disk, CachedBatchType::DiskArrow);
        policy.notify_insert(&arrow, CachedBatchType::MemoryArrow);
        policy.notify_insert(&liquid, CachedBatchType::MemoryLiquid);

        // The disk entry is tracked but never chosen as a victim.
        assert_eq!(policy.find_victim(5), vec![arrow, liquid]);
        assert!(policy.find_victim(1).is_empty());
    }

    #[test]
    fn test_reinsert_moves_entry_to_back_of_queue() {
        let policy = LiquidPolicy::new();
        let (older, newer) = (entry(1), entry(2));

        policy.notify_insert(&older, CachedBatchType::MemoryArrow);
        policy.notify_insert(&newer, CachedBatchType::MemoryArrow);
        // Repeating the insert refreshes `older` as the newest arrow batch.
        policy.notify_insert(&older, CachedBatchType::MemoryArrow);

        assert_eq!(policy.find_victim(1), vec![newer]);
        assert_eq!(policy.find_victim(1), vec![older]);
    }

    #[test]
    fn test_reinsert_handles_cross_queue_move() {
        let policy = LiquidPolicy::new();
        let id = entry(42);

        policy.notify_insert(&id, CachedBatchType::MemoryArrow);
        policy.notify_insert(&id, CachedBatchType::MemoryLiquid);

        // The moved entry must appear exactly once, in its new queue.
        assert_eq!(policy.find_victim(2), vec![id]);
    }
}