liquid_cache_storage/cache/cache_policies/
sieve.rs1use std::{collections::HashMap, fmt, ptr::NonNull};
4
5use crate::{
6 cache::{cached_batch::CachedBatchType, utils::EntryID},
7 sync::Mutex,
8};
9
10use super::{
11 CachePolicy,
12 doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
/// Payload carried by every linked-list node that the SIEVE policy tracks.
#[derive(Debug)]
struct SieveNode {
    /// Identifier of the cache entry this node represents.
    entry_id: EntryID,
    /// Set when the entry is accessed or re-inserted; cleared again when the
    /// eviction hand passes over the node in `find_victim`.
    visited: bool,
}
20
/// Non-null raw pointer to a list node holding a [`SieveNode`].
///
/// Nodes are allocated with `Box` and leaked in `notify_insert`; they are
/// released again through `drop_boxed_node`.
type NodePtr = NonNull<DoublyLinkedNode<SieveNode>>;
22
/// Mutable bookkeeping of the policy, kept behind the mutex in [`SievePolicy`].
#[derive(Debug, Default)]
struct SieveInternalState {
    /// Lookup from an entry id to its node in `list`.
    map: HashMap<EntryID, NodePtr>,
    /// Tracked entries; new entries are pushed at the front, so the tail is
    /// the oldest one.
    list: DoublyLinkedList<SieveNode>,
    /// Node where the next eviction scan starts. When `None`, `find_victim`
    /// starts from the tail of `list`.
    hand: Option<NodePtr>,
    /// Sum of `entry_size` over all tracked entries (every entry currently
    /// counts as 1).
    total_size: usize,
}
30
/// Cache eviction policy that keeps one `visited` bit per entry and evicts the
/// first unvisited entry found by a hand that sweeps from the oldest entry
/// towards the newest (the SIEVE scheme).
#[derive(Default)]
pub struct SievePolicy {
    /// All policy state, guarded by a mutex so the `&self` methods of
    /// `CachePolicy` can mutate it.
    state: Mutex<SieveInternalState>,
}
36
37impl fmt::Debug for SievePolicy {
38 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39 f.debug_struct("SievePolicy")
40 .field("state", &self.state)
41 .finish()
42 }
43}
44
45impl SievePolicy {
46 pub fn new() -> Self {
48 Self {
49 state: Mutex::new(SieveInternalState::default()),
50 }
51 }
52
53 fn entry_size(&self, _entry_id: &EntryID) -> usize {
54 1
55 }
56}
57
// SAFETY: the raw node pointers stored in `SieveInternalState` (map values,
// `hand`, and the list itself) are what prevent the auto traits. Within this
// file they are only dereferenced while the `state` mutex is held, so no two
// threads touch a node at the same time.
// NOTE(review): this also assumes `DoublyLinkedList` never hands node pointers
// to anything outside the policy — confirm against its implementation.
unsafe impl Send for SievePolicy {}
unsafe impl Sync for SievePolicy {}
60
61impl CachePolicy for SievePolicy {
62 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
63 let mut state = self.state.lock().unwrap();
64 let mut advices = Vec::with_capacity(cnt);
65 for _ in 0..cnt {
66 let hand_ptr = match state.hand {
67 Some(ptr) => Some(ptr),
68 None => state.list.tail(),
69 };
70 let mut hand_ptr = match hand_ptr {
71 Some(p) => p,
72 None => break,
73 };
74 loop {
75 if unsafe { hand_ptr.as_ref() }.data.visited {
76 unsafe { hand_ptr.as_mut() }.data.visited = false;
77 let prev = unsafe { hand_ptr.as_ref().prev };
78 let next_hand = prev
79 .or(state.list.tail())
80 .expect("non-empty list must have a tail");
81 hand_ptr = next_hand;
82 state.hand = Some(next_hand);
83 } else {
84 let victim_id = unsafe { hand_ptr.as_ref().data.entry_id };
85 let prev = unsafe { hand_ptr.as_ref().prev };
86 let node_ptr = state.map.remove(&victim_id).unwrap();
87 unsafe {
88 state.list.unlink(node_ptr);
89 drop_boxed_node(node_ptr);
90 }
91 state.total_size -= self.entry_size(&victim_id);
92 advices.push(victim_id);
93 state.hand = prev.or(state.list.tail());
94 break;
95 }
96 }
97 }
98 advices
99 }
100
101 fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
102 let mut state = self.state.lock().unwrap();
103 if state.map.contains_key(entry_id) {
104 if let Some(mut node_ptr) = state.map.get(entry_id).copied() {
105 unsafe {
106 node_ptr.as_mut().data.visited = true;
107 }
108 }
109 return;
110 }
111
112 let was_empty = state.list.head().is_none();
113 let node = DoublyLinkedNode::new(SieveNode {
114 entry_id: *entry_id,
115 visited: false,
116 });
117 let node_ptr = NonNull::from(Box::leak(node));
118 state.map.insert(*entry_id, node_ptr);
119 unsafe {
120 state.list.push_front(node_ptr);
121 }
122 if was_empty {
123 state.hand = Some(node_ptr);
124 }
125 state.total_size += self.entry_size(entry_id);
126 }
127
128 fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
129 let state = self.state.lock().unwrap();
130 if let Some(mut node_ptr) = state.map.get(entry_id).copied() {
131 unsafe {
132 node_ptr.as_mut().data.visited = true;
133 }
134 }
135 }
136}
137
138impl Drop for SievePolicy {
139 fn drop(&mut self) {
140 let mut state = self.state.lock().unwrap();
141 let handles: Vec<_> = state.map.drain().map(|(_, ptr)| ptr).collect();
142 for node_ptr in handles {
143 unsafe {
144 state.list.unlink(node_ptr);
145 drop_boxed_node(node_ptr);
146 }
147 }
148 unsafe {
149 state.list.drop_all();
150 }
151 state.hand = None;
152 state.total_size = 0;
153 }
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        cached_batch::CachedBatchType,
        utils::{EntryID, create_cache_store, create_test_arrow_array},
    };

    /// Shorthand for turning a plain integer into an [`EntryID`].
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Reports an insert to `policy` for each id, in slice order.
    fn insert_all(policy: &SievePolicy, ids: &[EntryID]) {
        for id in ids {
            policy.notify_insert(id, CachedBatchType::MemoryArrow);
        }
    }

    /// Requests a single victim and checks that it is exactly `expect_evict`.
    fn assert_evict_advice(policy: &SievePolicy, expect_evict: EntryID) {
        let victims = policy.find_victim(1);
        assert_eq!(victims, vec![expect_evict]);
    }

    #[test]
    fn test_sieve_insert_order() {
        let policy = SievePolicy::new();
        let ids = [entry(1), entry(2), entry(3)];
        insert_all(&policy, &ids);

        // Nothing was accessed, so the oldest entry goes first.
        assert_evict_advice(&policy, ids[0]);
    }

    #[test]
    fn test_sieve_access_sets_visited() {
        let policy = SievePolicy::new();
        let ids = [entry(1), entry(2), entry(3)];
        insert_all(&policy, &ids);

        // The accessed entry is spared; the next oldest one is evicted.
        policy.notify_access(&ids[0], CachedBatchType::MemoryArrow);
        assert_evict_advice(&policy, ids[1]);
    }

    #[test]
    fn test_sieve_reinsert_marks_visited() {
        let policy = SievePolicy::new();
        let ids = [entry(1), entry(2)];
        insert_all(&policy, &ids);

        // Inserting a known entry again counts as a hit.
        policy.notify_insert(&ids[0], CachedBatchType::MemoryArrow);

        assert_evict_advice(&policy, ids[1]);
    }

    #[test]
    fn test_sieve_reinsert_sets_visited_flag() {
        let policy = SievePolicy::new();
        let e1 = entry(1);
        insert_all(&policy, &[e1]);

        // Clear the flag by hand so only the re-insert below can set it.
        {
            let guard = policy.state.lock().unwrap();
            let mut ptr = guard.map.get(&e1).copied().unwrap();
            unsafe {
                ptr.as_mut().data.visited = false;
            }
        }

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);

        let guard = policy.state.lock().unwrap();
        let ptr = guard.map.get(&e1).copied().unwrap();
        let visited = unsafe { ptr.as_ref().data.visited };
        assert!(visited);
        assert_eq!(guard.map.len(), 1);
    }

    #[test]
    fn test_sieve_advise_empty() {
        let policy = SievePolicy::new();
        let victims = policy.find_victim(1);
        assert_eq!(victims, vec![]);
    }

    #[test]
    fn test_sieve_with_sizeof_closure_defined() {
        let policy = SievePolicy::new();
        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        let guard = policy.state.lock().unwrap();
        assert_eq!(guard.total_size, 3);
    }

    #[test]
    fn test_sieve_sizeof_without_closure() {
        let policy = SievePolicy::new();
        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        let guard = policy.state.lock().unwrap();
        assert_eq!(guard.total_size, 3);
    }

    #[tokio::test]
    async fn test_sieve_integration() {
        let store = create_cache_store(3000, Box::new(SievePolicy::new()));
        let ids = [EntryID::from(1), EntryID::from(2), EntryID::from(3)];

        for id in ids.iter().copied() {
            store.insert(id, create_test_arrow_array(100)).await;
        }
        for id in &ids {
            assert!(store.index().get(id).is_some());
        }
    }
}