liquid_cache_storage/cache/cache_policies/
clock.rs1use std::{collections::HashMap, fmt, ptr::NonNull, sync::Arc};
4
5use crate::{
6 cache::{cached_batch::CachedBatchType, utils::EntryID},
7 sync::Mutex,
8};
9
10use super::{
11 CachePolicy,
12 doubly_linked_list::{DoublyLinkedList, DoublyLinkedNode, drop_boxed_node},
13};
14
15type ClockEntrySizeFn = Option<Arc<dyn Fn(&EntryID) -> usize + Send + Sync>>;
16
17#[derive(Default)]
19pub struct ClockPolicy {
20 state: Mutex<ClockInternalState>,
21 size_of: ClockEntrySizeFn,
22}
23
24#[derive(Debug)]
25struct ClockNode {
26 entry_id: EntryID,
27 referenced: bool,
28}
29
30type NodePtr = NonNull<DoublyLinkedNode<ClockNode>>;
31
32#[derive(Debug, Default)]
33struct ClockInternalState {
34 map: HashMap<EntryID, NodePtr>,
35 list: DoublyLinkedList<ClockNode>,
36 hand: Option<NodePtr>,
37 total_size: usize,
38}
39
40impl fmt::Debug for ClockPolicy {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 f.debug_struct("ClockPolicy")
43 .field("state", &self.state)
44 .finish()
45 }
46}
47
48impl ClockPolicy {
49 pub fn new() -> Self {
51 Self::new_with_size_fn(None)
52 }
53
54 pub fn new_with_size_fn(size_of: ClockEntrySizeFn) -> Self {
56 ClockPolicy {
57 state: Mutex::new(ClockInternalState::default()),
58 size_of,
59 }
60 }
61
62 fn entry_size(&self, entry_id: &EntryID) -> usize {
63 self.size_of.as_ref().map(|f| f(entry_id)).unwrap_or(1)
64 }
65}
66
67unsafe impl Send for ClockPolicy {}
68unsafe impl Sync for ClockPolicy {}
69
70impl CachePolicy for ClockPolicy {
71 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
72 let mut state = self.state.lock().unwrap();
73 if cnt == 0 {
74 return Vec::new();
75 }
76
77 let mut evicted = Vec::with_capacity(cnt);
78 let mut cursor = match state.hand {
79 Some(ptr) => Some(ptr),
80 None => state.list.head(),
81 };
82
83 for _ in 0..cnt {
84 loop {
85 let Some(handle) = cursor else {
86 state.hand = None;
87 break;
88 };
89
90 let mut handle_ptr = handle;
91 if unsafe { handle_ptr.as_ref() }.data.referenced {
92 unsafe { handle_ptr.as_mut() }.data.referenced = false;
93 let next = unsafe { handle_ptr.as_ref().next }.or(state.list.head());
94 cursor = next;
95 state.hand = next;
96 } else {
97 let victim_id = unsafe { handle_ptr.as_ref().data.entry_id };
98 let succ = unsafe { handle_ptr.as_ref().next };
99 state
100 .map
101 .remove(&victim_id)
102 .expect("pointer must exist in map");
103 unsafe {
104 state.list.unlink(handle_ptr);
105 drop_boxed_node(handle_ptr);
106 }
107 state.total_size -= self.entry_size(&victim_id);
108 state.hand = succ.or(state.list.head());
109 evicted.push(victim_id);
110 cursor = state.hand;
111 break;
112 }
113 }
114
115 if state.hand.is_none() {
116 break;
117 }
118 }
119
120 evicted
121 }
122
123 fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
124 let mut state = self.state.lock().unwrap();
125
126 if let Some(mut existing) = state.map.get(entry_id).copied() {
127 unsafe {
128 existing.as_mut().data.referenced = true;
129 }
130 return;
131 }
132
133 let node = DoublyLinkedNode::new(ClockNode {
134 entry_id: *entry_id,
135 referenced: true,
136 });
137 let new_ptr = NonNull::from(Box::leak(node));
138
139 unsafe { state.list.push_back(new_ptr) };
140 if state.hand.is_none() {
141 state.hand = Some(new_ptr);
142 }
143
144 state.map.insert(*entry_id, new_ptr);
145 state.total_size += self.entry_size(entry_id);
146 }
147
148 fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
149 let state = self.state.lock().unwrap();
150 if let Some(mut handle) = state.map.get(entry_id).copied() {
151 unsafe {
152 handle.as_mut().data.referenced = true;
153 }
154 }
155 }
156}
157
158impl Drop for ClockPolicy {
159 fn drop(&mut self) {
160 if let Ok(mut state) = self.state.lock() {
161 let handles: Vec<_> = state.map.drain().map(|(_, ptr)| ptr).collect();
162 for ptr in handles {
163 unsafe {
164 state.list.unlink(ptr);
165 drop_boxed_node(ptr);
166 }
167 }
168 unsafe {
169 state.list.drop_all();
170 }
171 state.hand = None;
172 state.total_size = 0;
173 }
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::{
        cached_batch::CachedBatch,
        utils::{EntryID, create_cache_store, create_test_arrow_array},
    };

    /// Builds an `EntryID` from a plain index.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Notifies `policy` about the insertion of every id, in order.
    fn insert_all(policy: &ClockPolicy, ids: &[EntryID]) {
        for id in ids {
            policy.notify_insert(id, CachedBatchType::MemoryArrow);
        }
    }

    /// Reads the policy's current `total_size` counter.
    fn tracked_size(policy: &ClockPolicy) -> usize {
        policy.state.lock().unwrap().total_size
    }

    /// Policy whose size callback charges 100 for ids above 10 and 1 otherwise.
    fn tiered_policy() -> ClockPolicy {
        ClockPolicy::new_with_size_fn(Some(Arc::new(
            |id: &EntryID| {
                if id.gt(&entry(10)) { 100 } else { 1 }
            },
        )))
    }

    #[test]
    fn test_clock_policy_insertion_order() {
        let advisor = ClockPolicy::new();
        let (first, second, third) = (entry(1), entry(2), entry(3));

        insert_all(&advisor, &[first, second, third]);

        // The oldest entry is the first one to lose its second chance.
        assert_eq!(advisor.find_victim(1), vec![first]);
    }

    #[test]
    fn test_clock_policy_sequential_evictions() {
        let advisor = ClockPolicy::new();
        let ids = [entry(1), entry(2), entry(3)];

        insert_all(&advisor, &ids);

        // One victim per call, in insertion order.
        for expected in ids {
            assert_eq!(advisor.find_victim(1), vec![expected]);
        }
    }

    #[test]
    fn test_clock_policy_single_item() {
        let advisor = ClockPolicy::new();
        let only = entry(1);

        insert_all(&advisor, &[only]);

        assert_eq!(advisor.find_victim(1), vec![only]);
    }

    #[test]
    fn test_clock_policy_advise_empty() {
        let advisor = ClockPolicy::new();

        assert!(advisor.find_victim(1).is_empty());
    }

    #[tokio::test]
    async fn test_clock_policy_integration_with_store() {
        let store = create_cache_store(3000, Box::new(ClockPolicy::new()));

        let (id1, id2, id3, id4) = (entry(1), entry(2), entry(3), entry(4));

        for id in [id1, id2, id3] {
            store.insert(id, create_test_arrow_array(100)).await;
        }
        store.insert(id4, create_test_arrow_array(100)).await;

        // If the first entry is still indexed it must have been moved to disk.
        if let Some(data) = store.index().get(&id1) {
            assert!(matches!(data, CachedBatch::DiskLiquid(_)));
        }
        for id in [id2, id3, id4] {
            assert!(store.index().get(&id).is_some());
        }
    }

    #[test]
    fn test_clock_policy_size_awareness_with_closure() {
        let policy = tiered_policy();

        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        // 1 + 1 + 100
        assert_eq!(tracked_size(&policy), 102);
    }

    #[test]
    fn test_clock_policy_size_awareness_without_closure() {
        let policy = ClockPolicy::new();

        insert_all(&policy, &[entry(1), entry(2), entry(11)]);

        // Without a callback every entry counts as 1.
        assert_eq!(tracked_size(&policy), 3);
    }

    #[test]
    fn test_clock_policy_size_tracking_on_eviction() {
        let policy = tiered_policy();
        let (small_a, small_b, large) = (entry(1), entry(2), entry(11));

        insert_all(&policy, &[small_a, small_b, large]);
        assert_eq!(tracked_size(&policy), 102);

        assert_eq!(policy.find_victim(1), vec![small_a]);
        assert_eq!(tracked_size(&policy), 101);

        assert_eq!(policy.find_victim(1), vec![small_b]);
        assert_eq!(tracked_size(&policy), 100);
    }

    #[test]
    fn test_clock_policy_reinsert_sets_reference_bit() {
        let policy = ClockPolicy::new();
        let entry_id = entry(42);

        insert_all(&policy, &[entry_id]);

        // Manually clear the reference bit so the re-insert has something to restore.
        {
            let state = policy.state.lock().unwrap();
            let mut node_ptr = state.map.get(&entry_id).copied().unwrap();
            unsafe {
                node_ptr.as_mut().data.referenced = false;
            }
        }

        insert_all(&policy, &[entry_id]);

        let state = policy.state.lock().unwrap();
        let node_ptr = state.map.get(&entry_id).copied().unwrap();
        let referenced = unsafe { node_ptr.as_ref().data.referenced };
        assert!(referenced);
        assert_eq!(state.map.len(), 1);
    }
}