liquid_cache_storage/cache/cache_policies/sieve.rs

1//! SIEVE cache policy implementation.
2
3use std::{collections::HashMap, fmt, ptr::NonNull};
4
5use crate::{
6    cache::{cached_batch::CachedBatchType, utils::EntryID},
7    sync::Mutex,
8};
9
10use super::{
11    CachePolicy,
12    doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
/// Per-entry bookkeeping stored inside each linked-list node.
#[derive(Debug)]
struct SieveNode {
    /// The cache entry this node tracks.
    entry_id: EntryID,
    /// SIEVE "visited" bit: set on access or duplicate insert, cleared when
    /// the eviction hand sweeps past, granting the entry one reprieve.
    visited: bool,
}
20
/// Non-null raw pointer to a heap node. The pointee is created via
/// `Box::leak` in `notify_insert` and freed exactly once via
/// `drop_boxed_node`; all access is guarded by the policy's state mutex.
type NodePtr = NonNull<DoublyLinkedNode<SieveNode>>;
22
/// All mutable SIEVE state, kept together behind a single mutex.
#[derive(Debug, Default)]
struct SieveInternalState {
    /// Entry id -> its list node, for O(1) lookup on access/insert.
    map: HashMap<EntryID, NodePtr>,
    /// Insertion-ordered list: new entries are pushed to the front, so the
    /// tail is the oldest entry.
    list: DoublyLinkedList<SieveNode>,
    /// The SIEVE "hand": where the next eviction scan resumes.
    /// `None` means the next scan starts from the tail.
    hand: Option<NodePtr>,
    /// Sum of `entry_size` over all tracked entries (currently 1 per entry).
    total_size: usize,
}
30
/// The policy that implements object size aware SIEVE algorithm using a HashMap and a doubly linked list.
#[derive(Default)]
pub struct SievePolicy {
    // Single lock around all bookkeeping; raw node pointers are only ever
    // dereferenced while this mutex is held.
    state: Mutex<SieveInternalState>,
}
36
37impl fmt::Debug for SievePolicy {
38    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
39        f.debug_struct("SievePolicy")
40            .field("state", &self.state)
41            .finish()
42    }
43}
44
45impl SievePolicy {
46    /// Create a new [`SievePolicy`].
47    pub fn new() -> Self {
48        Self {
49            state: Mutex::new(SieveInternalState::default()),
50        }
51    }
52
53    fn entry_size(&self, _entry_id: &EntryID) -> usize {
54        1
55    }
56}
57
// SAFETY: the raw `NonNull` node pointers live only inside `state`, and every
// dereference of them in this file happens while `self.state.lock()` is held,
// so the policy may be shared and sent across threads even though the raw
// pointers themselves are not `Send`/`Sync`.
unsafe impl Send for SievePolicy {}
unsafe impl Sync for SievePolicy {}
60
61impl CachePolicy for SievePolicy {
62    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
63        let mut state = self.state.lock().unwrap();
64        let mut advices = Vec::with_capacity(cnt);
65        for _ in 0..cnt {
66            let hand_ptr = match state.hand {
67                Some(ptr) => Some(ptr),
68                None => state.list.tail(),
69            };
70            let mut hand_ptr = match hand_ptr {
71                Some(p) => p,
72                None => break,
73            };
74            loop {
75                if unsafe { hand_ptr.as_ref() }.data.visited {
76                    unsafe { hand_ptr.as_mut() }.data.visited = false;
77                    let prev = unsafe { hand_ptr.as_ref().prev };
78                    let next_hand = prev
79                        .or(state.list.tail())
80                        .expect("non-empty list must have a tail");
81                    hand_ptr = next_hand;
82                    state.hand = Some(next_hand);
83                } else {
84                    let victim_id = unsafe { hand_ptr.as_ref().data.entry_id };
85                    let prev = unsafe { hand_ptr.as_ref().prev };
86                    let node_ptr = state.map.remove(&victim_id).unwrap();
87                    unsafe {
88                        state.list.unlink(node_ptr);
89                        drop_boxed_node(node_ptr);
90                    }
91                    state.total_size -= self.entry_size(&victim_id);
92                    advices.push(victim_id);
93                    state.hand = prev.or(state.list.tail());
94                    break;
95                }
96            }
97        }
98        advices
99    }
100
101    fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
102        let mut state = self.state.lock().unwrap();
103        if state.map.contains_key(entry_id) {
104            if let Some(mut node_ptr) = state.map.get(entry_id).copied() {
105                unsafe {
106                    node_ptr.as_mut().data.visited = true;
107                }
108            }
109            return;
110        }
111
112        let was_empty = state.list.head().is_none();
113        let node = DoublyLinkedNode::new(SieveNode {
114            entry_id: *entry_id,
115            visited: false,
116        });
117        let node_ptr = NonNull::from(Box::leak(node));
118        state.map.insert(*entry_id, node_ptr);
119        unsafe {
120            state.list.push_front(node_ptr);
121        }
122        if was_empty {
123            state.hand = Some(node_ptr);
124        }
125        state.total_size += self.entry_size(entry_id);
126    }
127
128    fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
129        let state = self.state.lock().unwrap();
130        if let Some(mut node_ptr) = state.map.get(entry_id).copied() {
131            unsafe {
132                node_ptr.as_mut().data.visited = true;
133            }
134        }
135    }
136}
137
impl Drop for SievePolicy {
    /// Free every leaked node exactly once and reset the bookkeeping.
    fn drop(&mut self) {
        let mut state = self.state.lock().unwrap();
        // Move all node pointers out of the map first so the map is not
        // mutated while being iterated.
        let handles: Vec<_> = state.map.drain().map(|(_, ptr)| ptr).collect();
        for node_ptr in handles {
            // SAFETY: each pointer was produced by `Box::leak` in
            // `notify_insert`, is still linked in the list, and appears in the
            // map exactly once, so it is unlinked and freed exactly once here.
            unsafe {
                state.list.unlink(node_ptr);
                drop_boxed_node(node_ptr);
            }
        }
        // NOTE(review): every node has been unlinked above, so the list should
        // already be empty at this point — confirm `drop_all` is a no-op here
        // and cannot double-free.
        unsafe {
            state.list.drop_all();
        }
        state.hand = None;
        state.total_size = 0;
    }
}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159    use crate::cache::{
160        cached_batch::CachedBatchType,
161        utils::{EntryID, create_cache_store, create_test_arrow_array},
162    };
163
164    fn entry(id: usize) -> EntryID {
165        id.into()
166    }
167
168    fn assert_evict_advice(policy: &SievePolicy, expect_evict: EntryID) {
169        let advice = policy.find_victim(1);
170        assert_eq!(advice, vec![expect_evict]);
171    }
172
173    #[test]
174    fn test_sieve_insert_order() {
175        let policy = SievePolicy::new();
176        let e1 = entry(1);
177        let e2 = entry(2);
178        let e3 = entry(3);
179
180        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
181        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);
182        policy.notify_insert(&e3, CachedBatchType::MemoryArrow);
183
184        assert_evict_advice(&policy, e1);
185    }
186
187    #[test]
188    fn test_sieve_access_sets_visited() {
189        let policy = SievePolicy::new();
190        let e1 = entry(1);
191        let e2 = entry(2);
192        let e3 = entry(3);
193
194        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
195        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);
196        policy.notify_insert(&e3, CachedBatchType::MemoryArrow);
197
198        policy.notify_access(&e1, CachedBatchType::MemoryArrow);
199        assert_evict_advice(&policy, e2);
200    }
201
202    #[test]
203    fn test_sieve_reinsert_marks_visited() {
204        let policy = SievePolicy::new();
205        let e1 = entry(1);
206        let e2 = entry(2);
207
208        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
209        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);
210
211        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
212
213        assert_evict_advice(&policy, e2);
214    }
215
    #[test]
    fn test_sieve_reinsert_sets_visited_flag() {
        // White-box test: a duplicate insert must set the node's visited bit
        // and must not create a second map entry.
        let policy = SievePolicy::new();
        let e1 = entry(1);

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);

        {
            // Force the visited bit off so the effect of the re-insert below
            // is observable (a fresh insert starts with visited == false).
            let state = policy.state.lock().unwrap();
            let mut node_ptr = state.map.get(&e1).copied().unwrap();
            unsafe {
                node_ptr.as_mut().data.visited = false;
            }
        }

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);

        let state = policy.state.lock().unwrap();
        let node_ptr = state.map.get(&e1).copied().unwrap();
        unsafe {
            assert!(node_ptr.as_ref().data.visited);
        }
        // Still exactly one tracked entry — no duplicate node was created.
        assert_eq!(state.map.len(), 1);
    }
240
241    #[test]
242    fn test_sieve_advise_empty() {
243        let policy = SievePolicy::new();
244        assert_eq!(policy.find_victim(1), vec![]);
245    }
246
    #[test]
    fn test_sieve_with_sizeof_closure_defined() {
        // NOTE(review): this test is byte-for-byte identical to
        // `test_sieve_sizeof_without_closure`, and no size closure exists in
        // the current API — the pair looks like a leftover from an earlier
        // design; consider consolidating or differentiating them.
        let policy = SievePolicy::new();

        let e1 = entry(1);
        let e2 = entry(2);
        let e3 = entry(11);

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e3, CachedBatchType::MemoryArrow);

        // entry_size charges a flat 1 per entry, so three inserts total 3.
        let state = policy.state.lock().unwrap();
        assert_eq!(state.total_size, 3);
    }
262
    #[test]
    fn test_sieve_sizeof_without_closure() {
        // NOTE(review): duplicate of `test_sieve_with_sizeof_closure_defined`
        // (same body, same assertions); see the note there.
        let policy = SievePolicy::new();

        let e1 = entry(1);
        let e2 = entry(2);
        let e3 = entry(11);

        policy.notify_insert(&e1, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e2, CachedBatchType::MemoryArrow);
        policy.notify_insert(&e3, CachedBatchType::MemoryArrow);

        // Flat size of 1 per entry: three inserts give total_size == 3.
        let state = policy.state.lock().unwrap();
        assert_eq!(state.total_size, 3);
    }
278
    #[tokio::test]
    async fn test_sieve_integration() {
        // End-to-end smoke test: a cache store wired to SievePolicy accepts
        // inserts and retains all three entries — presumably the 3000 budget
        // accommodates three 100-element arrays without eviction (TODO confirm
        // the budget units against create_cache_store).
        let advisor = SievePolicy::new();
        let store = create_cache_store(3000, Box::new(advisor));

        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);
        let entry_id3 = EntryID::from(3);

        store.insert(entry_id1, create_test_arrow_array(100)).await;
        store.insert(entry_id2, create_test_arrow_array(100)).await;
        store.insert(entry_id3, create_test_arrow_array(100)).await;
        assert!(store.index().get(&entry_id1).is_some());
        assert!(store.index().get(&entry_id2).is_some());
        assert!(store.index().get(&entry_id3).is_some());
    }
295}