liquid_cache_storage/cache/cache_policies/
clock.rs

//! CLOCK (second-chance) cache policy implementation with optional size awareness.
2
3use std::{collections::HashMap, fmt, ptr::NonNull, sync::Arc};
4
5use crate::{
6    cache::{cached_batch::CachedBatchType, utils::EntryID},
7    sync::Mutex,
8};
9
10use super::{
11    CachePolicy,
12    doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
15type ClockEntrySizeFn = Option<Arc<dyn Fn(&EntryID) -> usize + Send + Sync>>;
16
17/// The CLOCK (second-chance) eviction policy with optional size awareness.
18#[derive(Default)]
19pub struct ClockPolicy {
20    state: Mutex<ClockInternalState>,
21    size_of: ClockEntrySizeFn,
22}
23
24#[derive(Debug)]
25struct ClockNode {
26    entry_id: EntryID,
27    referenced: bool,
28}
29
30type NodePtr = NonNull<DoublyLinkedNode<ClockNode>>;
31
32#[derive(Debug, Default)]
33struct ClockInternalState {
34    map: HashMap<EntryID, NodePtr>,
35    list: DoublyLinkedList<ClockNode>,
36    hand: Option<NodePtr>,
37    total_size: usize,
38}
39
40impl fmt::Debug for ClockPolicy {
41    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42        f.debug_struct("ClockPolicy")
43            .field("state", &self.state)
44            .finish()
45    }
46}
47
48impl ClockPolicy {
49    /// Create a new CLOCK policy.
50    pub fn new() -> Self {
51        Self::new_with_size_fn(None)
52    }
53
54    /// Create a new CLOCK policy with size awareness.
55    pub fn new_with_size_fn(size_of: ClockEntrySizeFn) -> Self {
56        ClockPolicy {
57            state: Mutex::new(ClockInternalState::default()),
58            size_of,
59        }
60    }
61
62    fn entry_size(&self, entry_id: &EntryID) -> usize {
63        self.size_of.as_ref().map(|f| f(entry_id)).unwrap_or(1)
64    }
65}
66
67unsafe impl Send for ClockPolicy {}
68unsafe impl Sync for ClockPolicy {}
69
70impl CachePolicy for ClockPolicy {
71    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
72        let mut state = self.state.lock().unwrap();
73        if cnt == 0 {
74            return Vec::new();
75        }
76
77        let mut evicted = Vec::with_capacity(cnt);
78        let mut cursor = match state.hand {
79            Some(ptr) => Some(ptr),
80            None => state.list.head(),
81        };
82
83        for _ in 0..cnt {
84            loop {
85                let Some(handle) = cursor else {
86                    state.hand = None;
87                    break;
88                };
89
90                let mut handle_ptr = handle;
91                if unsafe { handle_ptr.as_ref() }.data.referenced {
92                    unsafe { handle_ptr.as_mut() }.data.referenced = false;
93                    let next = unsafe { handle_ptr.as_ref().next }.or(state.list.head());
94                    cursor = next;
95                    state.hand = next;
96                } else {
97                    let victim_id = unsafe { handle_ptr.as_ref().data.entry_id };
98                    let succ = unsafe { handle_ptr.as_ref().next };
99                    state
100                        .map
101                        .remove(&victim_id)
102                        .expect("pointer must exist in map");
103                    unsafe {
104                        state.list.unlink(handle_ptr);
105                        drop_boxed_node(handle_ptr);
106                    }
107                    state.total_size -= self.entry_size(&victim_id);
108                    state.hand = succ.or(state.list.head());
109                    evicted.push(victim_id);
110                    cursor = state.hand;
111                    break;
112                }
113            }
114
115            if state.hand.is_none() {
116                break;
117            }
118        }
119
120        evicted
121    }
122
123    fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
124        let mut state = self.state.lock().unwrap();
125
126        if let Some(mut existing) = state.map.get(entry_id).copied() {
127            unsafe {
128                existing.as_mut().data.referenced = true;
129            }
130            return;
131        }
132
133        let node = DoublyLinkedNode::new(ClockNode {
134            entry_id: *entry_id,
135            referenced: true,
136        });
137        let new_ptr = NonNull::from(Box::leak(node));
138
139        unsafe { state.list.push_back(new_ptr) };
140        if state.hand.is_none() {
141            state.hand = Some(new_ptr);
142        }
143
144        state.map.insert(*entry_id, new_ptr);
145        state.total_size += self.entry_size(entry_id);
146    }
147
148    fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
149        let state = self.state.lock().unwrap();
150        if let Some(mut handle) = state.map.get(entry_id).copied() {
151            unsafe {
152                handle.as_mut().data.referenced = true;
153            }
154        }
155    }
156}
157
158impl Drop for ClockPolicy {
159    fn drop(&mut self) {
160        if let Ok(mut state) = self.state.lock() {
161            let handles: Vec<_> = state.map.drain().map(|(_, ptr)| ptr).collect();
162            for ptr in handles {
163                unsafe {
164                    state.list.unlink(ptr);
165                    drop_boxed_node(ptr);
166                }
167            }
168            unsafe {
169                state.list.drop_all();
170            }
171            state.hand = None;
172            state.total_size = 0;
173        }
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        cached_batch::CachedBatch,
        utils::{EntryID, create_cache_store, create_test_arrow_array},
    };

    /// Shorthand for building an `EntryID` from an integer.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Notifies `policy` about the insertion of each ID, in order.
    fn insert_all(policy: &ClockPolicy, ids: &[EntryID]) {
        for id in ids {
            policy.notify_insert(id, CachedBatchType::MemoryArrow);
        }
    }

    /// Policy whose size callback reports 100 for IDs above 10 and 1 otherwise.
    fn size_aware_policy() -> ClockPolicy {
        ClockPolicy::new_with_size_fn(Some(Arc::new(|id: &EntryID| {
            if *id > entry(10) { 100 } else { 1 }
        })))
    }

    /// Reads the policy's running size total.
    fn tracked_size(policy: &ClockPolicy) -> usize {
        policy.state.lock().unwrap().total_size
    }

    // With all entries freshly inserted, the oldest one is evicted first.
    #[test]
    fn test_clock_policy_insertion_order() {
        let advisor = ClockPolicy::new();
        let [first, second, third] = [1, 2, 3].map(EntryID::from);

        insert_all(&advisor, &[first, second, third]);

        assert_eq!(advisor.find_victim(1), vec![first]);
    }

    // Repeated single evictions follow insertion order.
    #[test]
    fn test_clock_policy_sequential_evictions() {
        let advisor = ClockPolicy::new();
        let ids = [1, 2, 3].map(EntryID::from);

        insert_all(&advisor, &ids);

        for expected in ids {
            assert_eq!(advisor.find_victim(1), vec![expected]);
        }
    }

    // A lone entry is evicted even though it starts out referenced.
    #[test]
    fn test_clock_policy_single_item() {
        let advisor = ClockPolicy::new();
        let only = EntryID::from(1);

        insert_all(&advisor, &[only]);

        assert_eq!(advisor.find_victim(1), vec![only]);
    }

    // An empty policy yields no victims.
    #[test]
    fn test_clock_policy_advise_empty() {
        let advisor = ClockPolicy::new();

        assert_eq!(advisor.find_victim(1), vec![]);
    }

    // End-to-end check through a cache store built with this policy.
    #[tokio::test]
    async fn test_clock_policy_integration_with_store() {
        let store = create_cache_store(3000, Box::new(ClockPolicy::new()));

        let entry_id1 = EntryID::from(1);
        let entry_id2 = EntryID::from(2);
        let entry_id3 = EntryID::from(3);
        let entry_id4 = EntryID::from(4);

        for id in [entry_id1, entry_id2, entry_id3] {
            store.insert(id, create_test_arrow_array(100)).await;
        }
        store.insert(entry_id4, create_test_arrow_array(100)).await;

        // If the first entry is still indexed, it must be the disk-liquid variant.
        if let Some(data) = store.index().get(&entry_id1) {
            assert!(matches!(data, CachedBatch::DiskLiquid(_)));
        }
        for id in [entry_id2, entry_id3, entry_id4] {
            assert!(store.index().get(&id).is_some());
        }
    }

    // The size callback drives `total_size` on insertion.
    #[test]
    fn test_clock_policy_size_awareness_with_closure() {
        let policy = size_aware_policy();

        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        let state = policy.state.lock().unwrap();
        assert_eq!(state.total_size, 102);
    }

    // Without a callback every entry counts as size 1.
    #[test]
    fn test_clock_policy_size_awareness_without_closure() {
        let policy = ClockPolicy::new();

        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        let state = policy.state.lock().unwrap();
        assert_eq!(state.total_size, 3);
    }

    // Evictions subtract the victim's size from `total_size`.
    #[test]
    fn test_clock_policy_size_tracking_on_eviction() {
        let policy = size_aware_policy();
        let small_a = entry(1);
        let small_b = entry(2);
        let large = entry(11);

        insert_all(&policy, &[small_a, small_b, large]);
        assert_eq!(tracked_size(&policy), 102);

        let evicted = policy.find_victim(1);
        assert_eq!(evicted, vec![small_a]);
        assert_eq!(tracked_size(&policy), 101);

        let evicted = policy.find_victim(1);
        assert_eq!(evicted, vec![small_b]);
        assert_eq!(tracked_size(&policy), 100);
    }

    // Re-inserting a tracked entry sets its reference bit without duplicating it.
    #[test]
    fn test_clock_policy_reinsert_sets_reference_bit() {
        let policy = ClockPolicy::new();
        let entry_id = entry(42);

        insert_all(&policy, &[entry_id]);

        // Clear the bit by hand so the re-insert has something to restore.
        {
            let state = policy.state.lock().unwrap();
            let mut node_ptr = state.map.get(&entry_id).copied().unwrap();
            unsafe {
                node_ptr.as_mut().data.referenced = false;
            }
        }

        insert_all(&policy, &[entry_id]);

        let state = policy.state.lock().unwrap();
        let node_ptr = state.map.get(&entry_id).copied().unwrap();
        unsafe {
            assert!(node_ptr.as_ref().data.referenced);
        }
        assert_eq!(state.map.len(), 1);
    }
}
362}