liquid_cache_storage/cache/cache_policies/
s3_fifo.rs1use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt;
5use std::sync::Mutex;
6
7use crate::cache::{cached_batch::CachedBatchType, utils::EntryID};
8use crate::cache_policies::CachePolicy;
9
/// Per-entry access counter stored in the policy's `frequency` map.
///
/// Values written through `inc_frequency` are clamped by `cap_frequency`.
type EntryFreq = u8;
11
/// Queue and counter bookkeeping for [`S3FifoPolicy`], kept behind the policy's mutex.
#[derive(Debug, Default)]
struct S3FifoInternalState {
    /// Queue that receives entries inserted for the first time
    /// (pushed at the front, evicted from the back).
    small: VecDeque<EntryID>,
    /// Queue holding entries promoted out of `small` or re-inserted while
    /// they were still recorded in `ghost`.
    main: VecDeque<EntryID>,
    /// IDs of entries evicted from `small`, newest at the front.
    ghost: VecDeque<EntryID>,
    /// Set mirror of `ghost`, used for membership checks.
    ghost_set: HashSet<EntryID>,

    /// Access counter for every entry tracked in `small` or `main`.
    frequency: HashMap<EntryID, EntryFreq>,

    /// Sum of the sizes accounted to `small`.
    small_queue_size: usize,
    /// Sum of the sizes accounted to `main`.
    main_queue_size: usize,
    /// `small_queue_size + main_queue_size`, maintained by the `inc_*`/`dec_*` helpers.
    total_size: usize,
}
25
26impl S3FifoInternalState {
27 fn cap_frequency(freq: u8) -> u8 {
28 std::cmp::min(freq, 3)
29 }
30
31 fn inc_frequency(&mut self, entry_id: &EntryID) {
32 if let Some(freq) = self.frequency.get_mut(entry_id) {
33 *freq = Self::cap_frequency(*freq + 1);
34 }
35 }
36
37 fn dec_frequency(&mut self, entry_id: &EntryID) {
38 if let Some(freq) = self.frequency.get_mut(entry_id) {
39 *freq = freq.saturating_sub(1);
40 }
41 }
42
43 fn inc_small_queue_size(&mut self, size: usize) {
44 self.small_queue_size += size;
45 self.total_size += size;
46 }
47
48 fn dec_small_queue_size(&mut self, size: usize) {
49 self.small_queue_size -= size;
50 self.total_size -= size;
51 }
52
53 fn inc_main_queue_size(&mut self, size: usize) {
54 self.main_queue_size += size;
55 self.total_size += size;
56 }
57
58 fn dec_main_queue_size(&mut self, size: usize) {
59 self.main_queue_size -= size;
60 self.total_size -= size;
61 }
62
63 fn small_queue_fraction(&self) -> f32 {
64 if self.total_size == 0 {
65 0.0
66 } else {
67 self.small_queue_size as f32 / self.total_size as f32
68 }
69 }
70
71 fn check_if_entry_exists_in_small_or_main(&self, entry_id: &EntryID) -> bool {
72 self.frequency.contains_key(entry_id) && !self.ghost_set.contains(entry_id)
73 }
74}
75
/// Cache policy built on three FIFO queues (`small`, `main`, `ghost`) plus
/// per-entry frequency counters.
///
/// All bookkeeping lives in a single [`Mutex`], which is why the policy
/// methods only need `&self`.
#[derive(Default)]
pub struct S3FifoPolicy {
    /// Queue and counter bookkeeping; each policy call locks it for its duration.
    state: Mutex<S3FifoInternalState>,
}
81
82impl fmt::Debug for S3FifoPolicy {
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 f.debug_struct("S3FifoPolicy")
85 .field("state", &self.state)
86 .finish()
87 }
88}
89
// SAFETY: the only field of `S3FifoPolicy` is a `Mutex<S3FifoInternalState>`,
// and every access to that state in this file goes through the lock.
// NOTE(review): `Mutex<T>` is already `Send + Sync` whenever `T: Send`, so
// these manual impls look redundant — confirm `EntryID` is `Send` and
// consider removing them so the compiler checks thread-safety.
unsafe impl Send for S3FifoPolicy {}
unsafe impl Sync for S3FifoPolicy {}
92
93impl S3FifoPolicy {
94 pub fn new() -> Self {
96 Self {
97 state: Mutex::new(S3FifoInternalState::default()),
98 }
99 }
100
101 fn entry_size(&self, _entry_id: &EntryID) -> usize {
102 1
103 }
104
105 fn evict_from_small(&self, state: &mut S3FifoInternalState) -> Option<EntryID> {
106 let mut is_evicted = false;
107 let mut victim: Option<EntryID> = None;
108
109 while !is_evicted && !state.small.is_empty() {
110 let Some(element) = state.small.pop_back() else {
111 break;
112 };
113 let freq = state.frequency.get(&element).copied().unwrap_or(0);
114 let entry_size = self.entry_size(&element);
115 state.dec_small_queue_size(entry_size);
116
117 if freq > 1 {
118 state.main.push_front(element);
119 state.inc_main_queue_size(entry_size);
120 state.frequency.insert(element, 0);
121 } else {
122 state.ghost.push_front(element);
124 state.ghost_set.insert(element);
125 state.frequency.remove(&element);
126 is_evicted = true;
127 victim = Some(element);
128 }
129 }
130 victim
131 }
132
133 fn evict_from_main(&self, state: &mut S3FifoInternalState) -> Option<EntryID> {
134 let mut is_evicted = false;
135 let mut victim: Option<EntryID> = None;
136
137 while !is_evicted && !state.main.is_empty() {
138 let Some(element) = state.main.pop_back() else {
139 break;
140 };
141
142 let freq = state.frequency.get(&element).copied().unwrap_or(0);
143 let entry_size = self.entry_size(&element);
144 state.dec_main_queue_size(entry_size);
145
146 if freq > 0 {
147 state.main.push_front(element);
148 state.dec_frequency(&element);
149 state.inc_main_queue_size(entry_size);
150 } else {
151 state.frequency.remove(&element);
152 is_evicted = true;
153 victim = Some(element);
154 }
155 }
156 victim
157 }
158}
159
160impl CachePolicy for S3FifoPolicy {
161 fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
162 let mut state = self.state.lock().unwrap();
163 let mut advices = Vec::with_capacity(cnt);
164 let threshold_for_small_eviction = 0.1;
165 while advices.len() < cnt && state.total_size > 0 {
166 let victim = if !state.small.is_empty()
167 && state.small_queue_fraction() >= threshold_for_small_eviction
168 {
169 self.evict_from_small(&mut state)
170 } else {
171 self.evict_from_main(&mut state)
172 };
173
174 if let Some(v) = victim {
175 advices.push(v);
176 }
177 }
178 advices
179 }
180
181 fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
182 let mut state = self.state.lock().unwrap();
183 let entry_size = self.entry_size(entry_id);
184
185 if state.check_if_entry_exists_in_small_or_main(entry_id) {
186 state.inc_frequency(entry_id);
187 } else if state.ghost_set.contains(entry_id) {
188 state.ghost_set.remove(entry_id);
189 state.ghost.retain(|x| *x != *entry_id);
190 state.main.push_front(*entry_id);
191 state.inc_main_queue_size(entry_size);
192 } else {
193 state.small.push_front(*entry_id);
194 state.inc_small_queue_size(entry_size);
195 state.frequency.insert(*entry_id, 0);
196 }
197 }
198
199 fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
200 let mut state = self.state.lock().unwrap();
201 if state.check_if_entry_exists_in_small_or_main(entry_id) {
202 state.inc_frequency(entry_id);
203 }
204 }
205}
206
207impl Drop for S3FifoPolicy {
208 fn drop(&mut self) {
209 let mut state = self.state.lock().unwrap();
210 state.small.clear();
211 state.main.clear();
212 state.ghost.clear();
213 state.ghost_set.clear();
214 state.frequency.clear();
215 state.total_size = 0;
216 state.small_queue_size = 0;
217 state.main_queue_size = 0;
218 }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    /// Builds an `EntryID` from a plain integer.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Calls `notify_insert` with `CachedBatchType::MemoryArrow`.
    fn insert(policy: &S3FifoPolicy, id: &EntryID) {
        policy.notify_insert(id, CachedBatchType::MemoryArrow);
    }

    /// Calls `notify_access` with `CachedBatchType::MemoryArrow`.
    fn access(policy: &S3FifoPolicy, id: &EntryID) {
        policy.notify_access(id, CachedBatchType::MemoryArrow);
    }

    #[test]
    fn test_s3fifo_basic_insert_eviction() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);
        let e3 = entry(3);

        insert(&policy, &e1);
        insert(&policy, &e2);
        insert(&policy, &e3);

        let evicted = policy.find_victim(1);
        assert_eq!(evicted.len(), 1);
        // The oldest entry in the small queue goes first.
        assert_eq!(evicted[0], e1);
    }

    #[test]
    fn test_s3fifo_frequency_increase() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);
        insert(&policy, &e1);
        access(&policy, &e1);

        let state = policy.state.lock().unwrap();
        assert_eq!(*state.frequency.get(&e1).unwrap(), 1);
    }

    #[test]
    fn test_s3fifo_eviction_order() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);

        insert(&policy, &e1);
        insert(&policy, &e2);

        let evicted = policy.find_victim(1);
        assert_eq!(evicted[0], e1);
    }

    #[test]
    fn test_s3fifo_ghost_promote() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);

        insert(&policy, &e1);
        let evicted = policy.find_victim(1);
        assert_eq!(evicted[0], e1);

        // Re-inserting an entry remembered by the ghost queue lands in `main`.
        insert(&policy, &e1);
        let state = policy.state.lock().unwrap();
        assert!(state.main.contains(&e1));
        assert!(!state.ghost_set.contains(&e1));
    }

    #[test]
    fn test_s3fifo_size_aware_fraction() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(20);
        let e2 = entry(30);

        insert(&policy, &e1);
        insert(&policy, &e2);

        let state = policy.state.lock().unwrap();
        assert_eq!(state.small_queue_size, 2);
        assert_eq!(state.total_size, 2);
        assert_eq!(state.small_queue_fraction(), 1.0);
    }

    #[test]
    fn test_insert_and_access_updates_freq() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);

        insert(&policy, &e1);
        access(&policy, &e1);
        access(&policy, &e1);

        let state = policy.state.lock().unwrap();
        assert_eq!(*state.frequency.get(&e1).unwrap(), 2);
    }

    #[test]
    fn test_freq_cap_at_three() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);

        insert(&policy, &e1);
        for _ in 0..10 {
            access(&policy, &e1);
        }

        let state = policy.state.lock().unwrap();
        assert_eq!(*state.frequency.get(&e1).unwrap(), 3);
    }

    #[test]
    fn test_eviction_from_s_to_ghost() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);

        insert(&policy, &e1);
        let evicted = policy.find_victim(1);

        assert_eq!(evicted[0], e1);
        let state = policy.state.lock().unwrap();
        assert!(state.ghost_set.contains(&e1));
        assert!(state.ghost.contains(&e1));
        assert!(!state.frequency.contains_key(&e1));
    }

    #[test]
    fn test_eviction_from_main_and_reinsertion_logic() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);

        {
            let mut state = policy.state.lock().unwrap();
            state.main.push_front(e1);
            state.frequency.insert(e1, 1);
            state.main.push_front(e2);
            state.frequency.insert(e2, 2);
            state.main_queue_size += 2;
            state.total_size += 2;
        }

        // Entries with a non-zero counter are requeued with the counter
        // lowered, so `e1` (counter 1) reaches zero before `e2` (counter 2).
        let evicted = policy.find_victim(2);
        assert_eq!(evicted.len(), 2);
        assert_eq!(evicted, vec![e1, e2]);

        let state = policy.state.lock().unwrap();
        assert!(state.main.is_empty());
        assert!(state.frequency.is_empty());
        assert_eq!(state.main_queue_size, 0);
        assert_eq!(state.total_size, 0);
    }

    #[test]
    fn test_s3fifo_reinsert_does_not_duplicate_entry() {
        let policy = S3FifoPolicy::new();
        let e1 = entry(1);
        let e2 = entry(2);

        insert(&policy, &e1);
        insert(&policy, &e2);

        // A second insert of a resident entry only counts as an access.
        insert(&policy, &e1);

        let state = policy.state.lock().unwrap();
        let occurrences = state.small.iter().filter(|&&id| id == e1).count()
            + state.main.iter().filter(|&&id| id == e1).count();
        assert_eq!(occurrences, 1);
        assert_eq!(*state.frequency.get(&e1).unwrap(), 1);
    }
}