liquid_cache_storage/cache/cache_policies/s3_fifo.rs

1//! S3 FIFO cache policy implementation
2
3use std::collections::{HashMap, HashSet, VecDeque};
4use std::fmt;
5use std::sync::Mutex;
6
7use crate::cache::{cached_batch::CachedBatchType, utils::EntryID};
8use crate::cache_policies::CachePolicy;
9
10type EntryFreq = u8;
11
12#[derive(Debug, Default)]
13struct S3FifoInternalState {
14    small: VecDeque<EntryID>,
15    main: VecDeque<EntryID>,
16    ghost: VecDeque<EntryID>,
17    ghost_set: HashSet<EntryID>,
18
19    frequency: HashMap<EntryID, EntryFreq>,
20
21    small_queue_size: usize,
22    main_queue_size: usize,
23    total_size: usize,
24}
25
26impl S3FifoInternalState {
27    fn cap_frequency(freq: u8) -> u8 {
28        std::cmp::min(freq, 3)
29    }
30
31    fn inc_frequency(&mut self, entry_id: &EntryID) {
32        if let Some(freq) = self.frequency.get_mut(entry_id) {
33            *freq = Self::cap_frequency(*freq + 1);
34        }
35    }
36
37    fn dec_frequency(&mut self, entry_id: &EntryID) {
38        if let Some(freq) = self.frequency.get_mut(entry_id) {
39            *freq = freq.saturating_sub(1);
40        }
41    }
42
43    fn inc_small_queue_size(&mut self, size: usize) {
44        self.small_queue_size += size;
45        self.total_size += size;
46    }
47
48    fn dec_small_queue_size(&mut self, size: usize) {
49        self.small_queue_size -= size;
50        self.total_size -= size;
51    }
52
53    fn inc_main_queue_size(&mut self, size: usize) {
54        self.main_queue_size += size;
55        self.total_size += size;
56    }
57
58    fn dec_main_queue_size(&mut self, size: usize) {
59        self.main_queue_size -= size;
60        self.total_size -= size;
61    }
62
63    fn small_queue_fraction(&self) -> f32 {
64        if self.total_size == 0 {
65            0.0
66        } else {
67            self.small_queue_size as f32 / self.total_size as f32
68        }
69    }
70
71    fn check_if_entry_exists_in_small_or_main(&self, entry_id: &EntryID) -> bool {
72        self.frequency.contains_key(entry_id) && !self.ghost_set.contains(entry_id)
73    }
74}
75
76/// The policy that implements object size aware S3Fifo algorithm using Deque.
77#[derive(Default)]
78pub struct S3FifoPolicy {
79    state: Mutex<S3FifoInternalState>,
80}
81
82impl fmt::Debug for S3FifoPolicy {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("S3FifoPolicy")
85            .field("state", &self.state)
86            .finish()
87    }
88}
89
90unsafe impl Send for S3FifoPolicy {}
91unsafe impl Sync for S3FifoPolicy {}
92
93impl S3FifoPolicy {
94    /// Create a new [`S3FifoPolicy`].
95    pub fn new() -> Self {
96        Self {
97            state: Mutex::new(S3FifoInternalState::default()),
98        }
99    }
100
101    fn entry_size(&self, _entry_id: &EntryID) -> usize {
102        1
103    }
104
105    fn evict_from_small(&self, state: &mut S3FifoInternalState) -> Option<EntryID> {
106        let mut is_evicted = false;
107        let mut victim: Option<EntryID> = None;
108
109        while !is_evicted && !state.small.is_empty() {
110            let Some(element) = state.small.pop_back() else {
111                break;
112            };
113            let freq = state.frequency.get(&element).copied().unwrap_or(0);
114            let entry_size = self.entry_size(&element);
115            state.dec_small_queue_size(entry_size);
116
117            if freq > 1 {
118                state.main.push_front(element);
119                state.inc_main_queue_size(entry_size);
120                state.frequency.insert(element, 0);
121            } else {
122                // Move to ghost queue
123                state.ghost.push_front(element);
124                state.ghost_set.insert(element);
125                state.frequency.remove(&element);
126                is_evicted = true;
127                victim = Some(element);
128            }
129        }
130        victim
131    }
132
133    fn evict_from_main(&self, state: &mut S3FifoInternalState) -> Option<EntryID> {
134        let mut is_evicted = false;
135        let mut victim: Option<EntryID> = None;
136
137        while !is_evicted && !state.main.is_empty() {
138            let Some(element) = state.main.pop_back() else {
139                break;
140            };
141
142            let freq = state.frequency.get(&element).copied().unwrap_or(0);
143            let entry_size = self.entry_size(&element);
144            state.dec_main_queue_size(entry_size);
145
146            if freq > 0 {
147                state.main.push_front(element);
148                state.dec_frequency(&element);
149                state.inc_main_queue_size(entry_size);
150            } else {
151                state.frequency.remove(&element);
152                is_evicted = true;
153                victim = Some(element);
154            }
155        }
156        victim
157    }
158}
159
160impl CachePolicy for S3FifoPolicy {
161    fn find_victim(&self, cnt: usize) -> Vec<EntryID> {
162        let mut state = self.state.lock().unwrap();
163        let mut advices = Vec::with_capacity(cnt);
164        let threshold_for_small_eviction = 0.1;
165        while advices.len() < cnt && state.total_size > 0 {
166            let victim = if !state.small.is_empty()
167                && state.small_queue_fraction() >= threshold_for_small_eviction
168            {
169                self.evict_from_small(&mut state)
170            } else {
171                self.evict_from_main(&mut state)
172            };
173
174            if let Some(v) = victim {
175                advices.push(v);
176            }
177        }
178        advices
179    }
180
181    fn notify_insert(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
182        let mut state = self.state.lock().unwrap();
183        let entry_size = self.entry_size(entry_id);
184
185        if state.check_if_entry_exists_in_small_or_main(entry_id) {
186            state.inc_frequency(entry_id);
187        } else if state.ghost_set.contains(entry_id) {
188            state.ghost_set.remove(entry_id);
189            state.ghost.retain(|x| *x != *entry_id);
190            state.main.push_front(*entry_id);
191            state.inc_main_queue_size(entry_size);
192        } else {
193            state.small.push_front(*entry_id);
194            state.inc_small_queue_size(entry_size);
195            state.frequency.insert(*entry_id, 0);
196        }
197    }
198
199    fn notify_access(&self, entry_id: &EntryID, _batch_type: CachedBatchType) {
200        let mut state = self.state.lock().unwrap();
201        if state.check_if_entry_exists_in_small_or_main(entry_id) {
202            state.inc_frequency(entry_id);
203        }
204    }
205}
206
207impl Drop for S3FifoPolicy {
208    fn drop(&mut self) {
209        let mut state = self.state.lock().unwrap();
210        state.small.clear();
211        state.main.clear();
212        state.ghost.clear();
213        state.ghost_set.clear();
214        state.frequency.clear();
215        state.total_size = 0;
216        state.small_queue_size = 0;
217        state.main_queue_size = 0;
218    }
219}
220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cache::utils::EntryID;

    /// Builds an [`EntryID`] from a plain integer.
    fn entry(id: usize) -> EntryID {
        id.into()
    }

    /// Reports an insertion of `id` to `policy`.
    fn insert(policy: &S3FifoPolicy, id: &EntryID) {
        policy.notify_insert(id, CachedBatchType::MemoryArrow);
    }

    /// Reports an access of `id` to `policy`.
    fn access(policy: &S3FifoPolicy, id: &EntryID) {
        policy.notify_access(id, CachedBatchType::MemoryArrow);
    }

    #[test]
    fn test_s3fifo_basic_insert_eviction() {
        let policy = S3FifoPolicy::new();
        for id in [entry(1), entry(2), entry(3)] {
            insert(&policy, &id);
        }

        assert_eq!(policy.find_victim(1).len(), 1);
    }

    #[test]
    fn test_s3fifo_frequency_increase() {
        let policy = S3FifoPolicy::new();
        let id = entry(1);
        insert(&policy, &id);
        access(&policy, &id);

        let state = policy.state.lock().unwrap();
        assert_eq!(state.frequency.get(&id).copied(), Some(1));
    }

    #[test]
    fn test_s3fifo_eviction_order() {
        let policy = S3FifoPolicy::new();
        let (first, second) = (entry(1), entry(2));
        insert(&policy, &first);
        insert(&policy, &second);

        // The oldest insertion is the first to go.
        assert_eq!(policy.find_victim(1)[0], first);
    }

    #[test]
    fn test_s3fifo_ghost_promote() {
        let policy = S3FifoPolicy::new();
        let id = entry(1);

        insert(&policy, &id);
        assert_eq!(policy.find_victim(1)[0], id);

        // Re-inserting a ghost entry admits it straight into the main queue.
        insert(&policy, &id);
        let state = policy.state.lock().unwrap();
        assert!(state.main.contains(&id));
        assert!(!state.ghost_set.contains(&id));
    }

    #[test]
    fn test_s3fifo_size_aware_fraction() {
        let policy = S3FifoPolicy::new();
        insert(&policy, &entry(20));
        insert(&policy, &entry(30));

        let state = policy.state.lock().unwrap();
        assert_eq!(state.small_queue_size, 2);
        assert_eq!(state.small_queue_fraction(), 1.0);
    }

    #[test]
    fn test_insert_and_access_updates_freq() {
        let policy = S3FifoPolicy::new();
        let id = entry(1);
        insert(&policy, &id);
        (0..2).for_each(|_| access(&policy, &id));

        let state = policy.state.lock().unwrap();
        // Two accesses, still below the cap of 3.
        assert_eq!(state.frequency.get(&id).copied(), Some(2));
    }

    #[test]
    fn test_freq_cap_at_three() {
        let policy = S3FifoPolicy::new();
        let id = entry(1);
        insert(&policy, &id);
        (0..10).for_each(|_| access(&policy, &id));

        let state = policy.state.lock().unwrap();
        assert_eq!(state.frequency.get(&id).copied(), Some(3));
    }

    #[test]
    fn test_eviction_from_s_to_ghost() {
        let policy = S3FifoPolicy::new();
        let id = entry(1);
        insert(&policy, &id);

        let victims = policy.find_victim(1);
        assert_eq!(victims[0], id);

        let state = policy.state.lock().unwrap();
        assert!(state.ghost_set.contains(&id));
        assert!(state.ghost.contains(&id));
    }

    #[test]
    fn test_eviction_from_main_and_reinsertion_logic() {
        let policy = S3FifoPolicy::new();
        let (e1, e2) = (entry(1), entry(2));

        {
            // Seed `main` directly: e1 with one spare access, e2 with two.
            let mut state = policy.state.lock().unwrap();
            for (id, freq) in [(e1, 1), (e2, 2)] {
                state.main.push_front(id);
                state.frequency.insert(id, freq);
                state.main_queue_size += 1;
            }
            state.total_size += 2;
        }

        assert_eq!(policy.find_victim(2).len(), 2);
    }

    #[test]
    fn test_s3fifo_reinsert_does_not_duplicate_entry() {
        let policy = S3FifoPolicy::new();
        let (e1, e2) = (entry(1), entry(2));
        insert(&policy, &e1);
        insert(&policy, &e2);
        insert(&policy, &e1);

        let state = policy.state.lock().unwrap();
        let occurrences = state
            .small
            .iter()
            .chain(state.main.iter())
            .filter(|&&id| id == e1)
            .count();
        assert_eq!(occurrences, 1);
        assert_eq!(state.frequency.get(&e1).copied(), Some(1));
    }
}