foyer_memory/eviction/
s3fifo.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashSet, VecDeque},
17    mem::offset_of,
18    sync::{
19        atomic::{AtomicU8, Ordering},
20        Arc,
21    },
22};
23
24use foyer_common::{
25    code::{Key, Value},
26    strict_assert, strict_assert_eq,
27};
28use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
29use serde::{Deserialize, Serialize};
30
31use super::{Eviction, Op};
32use crate::{
33    error::{Error, Result},
34    record::{CacheHint, Record},
35};
36
37/// S3Fifo eviction algorithm config.
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct S3FifoConfig {
40    /// Capacity ratio of the small S3FIFO queue.
41    pub small_queue_capacity_ratio: f64,
42    /// Capacity ratio of the ghost S3FIFO queue.
43    pub ghost_queue_capacity_ratio: f64,
44    /// Minimum access times when population entry from small queue to main queue.
45    pub small_to_main_freq_threshold: u8,
46}
47
48impl Default for S3FifoConfig {
49    fn default() -> Self {
50        Self {
51            small_queue_capacity_ratio: 0.1,
52            ghost_queue_capacity_ratio: 1.0,
53            small_to_main_freq_threshold: 1,
54        }
55    }
56}
57
58/// S3Fifo eviction algorithm hint.
59#[derive(Debug, Clone, Default)]
60pub struct S3FifoHint;
61
62impl From<CacheHint> for S3FifoHint {
63    fn from(_: CacheHint) -> Self {
64        S3FifoHint
65    }
66}
67
68impl From<S3FifoHint> for CacheHint {
69    fn from(_: S3FifoHint) -> Self {
70        CacheHint::Normal
71    }
72}
73
74#[derive(Debug, PartialEq, Eq)]
75enum Queue {
76    None,
77    Main,
78    Small,
79}
80
81impl Default for Queue {
82    fn default() -> Self {
83        Self::None
84    }
85}
86
87/// S3Fifo eviction algorithm hint.
88#[derive(Debug, Default)]
89pub struct S3FifoState {
90    link: LinkedListAtomicLink,
91    frequency: AtomicU8,
92    queue: Queue,
93}
94
95impl S3FifoState {
96    const MAX_FREQUENCY: u8 = 3;
97
98    fn frequency(&self) -> u8 {
99        self.frequency.load(Ordering::Acquire)
100    }
101
102    fn set_frequency(&self, val: u8) {
103        self.frequency.store(val, Ordering::Release)
104    }
105
106    fn inc_frequency(&self) -> u8 {
107        self.frequency
108            .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
109                Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
110            })
111            .unwrap()
112    }
113
114    fn dec_frequency(&self) -> u8 {
115        self.frequency
116            .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
117            .unwrap()
118    }
119}
120
121intrusive_adapter! { Adapter<K, V> = Arc<Record<S3Fifo<K, V>>>: Record<S3Fifo<K, V>> { ?offset = Record::<S3Fifo<K, V>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value }
122
123pub struct S3Fifo<K, V>
124where
125    K: Key,
126    V: Value,
127{
128    ghost_queue: GhostQueue,
129    small_queue: LinkedList<Adapter<K, V>>,
130    main_queue: LinkedList<Adapter<K, V>>,
131
132    small_weight_capacity: usize,
133
134    small_weight: usize,
135    main_weight: usize,
136
137    small_to_main_freq_threshold: u8,
138
139    config: S3FifoConfig,
140}
141
142impl<K, V> S3Fifo<K, V>
143where
144    K: Key,
145    V: Value,
146{
147    fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
148        // TODO(MrCroxx): Use `let_chains` here after it is stable.
149        if self.small_weight > self.small_weight_capacity {
150            if let Some(record) = self.evict_small() {
151                return Some(record);
152            }
153        }
154        if let Some(record) = self.evict_main() {
155            return Some(record);
156        }
157        self.evict_small_force()
158    }
159
160    #[expect(clippy::never_loop)]
161    fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
162        while let Some(record) = self.small_queue.pop_front() {
163            let state = unsafe { &mut *record.state().get() };
164            state.queue = Queue::None;
165            state.set_frequency(0);
166            self.small_weight -= record.weight();
167            return Some(record);
168        }
169        None
170    }
171
172    fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
173        while let Some(record) = self.small_queue.pop_front() {
174            let state = unsafe { &mut *record.state().get() };
175            if state.frequency() >= self.small_to_main_freq_threshold {
176                state.queue = Queue::Main;
177                self.small_weight -= record.weight();
178                self.main_weight += record.weight();
179                self.main_queue.push_back(record);
180            } else {
181                state.queue = Queue::None;
182                state.set_frequency(0);
183                self.small_weight -= record.weight();
184
185                self.ghost_queue.push(record.hash(), record.weight());
186
187                return Some(record);
188            }
189        }
190        None
191    }
192
193    fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
194        while let Some(record) = self.main_queue.pop_front() {
195            let state = unsafe { &mut *record.state().get() };
196            if state.dec_frequency() > 0 {
197                self.main_queue.push_back(record);
198            } else {
199                state.queue = Queue::None;
200                self.main_weight -= record.weight();
201                return Some(record);
202            }
203        }
204        None
205    }
206}
207
208impl<K, V> Eviction for S3Fifo<K, V>
209where
210    K: Key,
211    V: Value,
212{
213    type Config = S3FifoConfig;
214    type Key = K;
215    type Value = V;
216    type Hint = S3FifoHint;
217    type State = S3FifoState;
218
219    fn new(capacity: usize, config: &Self::Config) -> Self
220    where
221        Self: Sized,
222    {
223        assert!(
224            config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
225            "small_queue_capacity_ratio must be in (0, 1), given: {}",
226            config.small_queue_capacity_ratio
227        );
228
229        let config = config.clone();
230
231        let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
232        let ghost_queue = GhostQueue::new(ghost_queue_capacity);
233        let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
234
235        Self {
236            ghost_queue,
237            small_queue: LinkedList::new(Adapter::new()),
238            main_queue: LinkedList::new(Adapter::new()),
239            small_weight_capacity,
240            small_weight: 0,
241            main_weight: 0,
242            small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
243            config,
244        }
245    }
246
247    fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
248        if let Some(config) = config {
249            if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
250                return Err(Error::ConfigError(format!(
251                    "small_queue_capacity_ratio must be in (0, 1), given: {}",
252                    config.small_queue_capacity_ratio
253                )));
254            }
255            self.config = config.clone();
256        }
257
258        let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
259        let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
260        self.ghost_queue.update(ghost_queue_capacity);
261        self.small_weight_capacity = small_weight_capacity;
262
263        Ok(())
264    }
265
266    fn push(&mut self, record: Arc<Record<Self>>) {
267        let state = unsafe { &mut *record.state().get() };
268
269        strict_assert_eq!(state.frequency(), 0);
270        strict_assert_eq!(state.queue, Queue::None);
271
272        record.set_in_eviction(true);
273
274        if self.ghost_queue.contains(record.hash()) {
275            state.queue = Queue::Main;
276            self.main_weight += record.weight();
277            self.main_queue.push_back(record);
278        } else {
279            state.queue = Queue::Small;
280            self.small_weight += record.weight();
281            self.small_queue.push_back(record);
282        }
283    }
284
285    fn pop(&mut self) -> Option<Arc<Record<Self>>> {
286        if let Some(record) = self.evict() {
287            // `handle.queue` has already been set with `evict()`
288            record.set_in_eviction(false);
289            Some(record)
290        } else {
291            strict_assert!(self.small_queue.is_empty());
292            strict_assert!(self.main_queue.is_empty());
293            None
294        }
295    }
296
297    fn remove(&mut self, record: &Arc<Record<Self>>) {
298        let state = unsafe { &mut *record.state().get() };
299
300        match state.queue {
301            Queue::None => unreachable!(),
302            Queue::Main => {
303                unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
304
305                state.queue = Queue::None;
306                state.set_frequency(0);
307                record.set_in_eviction(false);
308
309                self.main_weight -= record.weight();
310            }
311            Queue::Small => {
312                unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
313
314                state.queue = Queue::None;
315                state.set_frequency(0);
316                record.set_in_eviction(false);
317
318                self.small_weight -= record.weight();
319            }
320        }
321    }
322
323    fn acquire() -> Op<Self> {
324        Op::immutable(|_: &Self, record| {
325            let state = unsafe { &mut *record.state().get() };
326            state.inc_frequency();
327        })
328    }
329
330    fn release() -> Op<Self> {
331        Op::noop()
332    }
333}
334
335// TODO(MrCroxx): use ordered hash map?
336struct GhostQueue {
337    counts: HashSet<u64>,
338    queue: VecDeque<(u64, usize)>,
339    capacity: usize,
340    weight: usize,
341}
342
343impl GhostQueue {
344    fn new(capacity: usize) -> Self {
345        Self {
346            counts: HashSet::default(),
347            queue: VecDeque::new(),
348            capacity,
349            weight: 0,
350        }
351    }
352
353    fn update(&mut self, capacity: usize) {
354        self.capacity = capacity;
355        if self.capacity == 0 {
356            return;
357        }
358        while self.weight > self.capacity && self.weight > 0 {
359            self.pop();
360        }
361    }
362
363    fn push(&mut self, hash: u64, weight: usize) {
364        if self.capacity == 0 {
365            return;
366        }
367        while self.weight + weight > self.capacity && self.weight > 0 {
368            self.pop();
369        }
370        self.queue.push_back((hash, weight));
371        self.counts.insert(hash);
372        self.weight += weight;
373    }
374
375    fn pop(&mut self) {
376        if let Some((hash, weight)) = self.queue.pop_front() {
377            self.weight -= weight;
378            self.counts.remove(&hash);
379        }
380    }
381
382    fn contains(&self, hash: u64) -> bool {
383        self.counts.contains(&hash)
384    }
385}
386
387#[cfg(test)]
388mod tests {
389    use std::ops::Range;
390
391    use itertools::Itertools;
392
393    use super::*;
394    use crate::{
395        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt},
396        record::Data,
397    };
398
399    impl<K, V> Dump for S3Fifo<K, V>
400    where
401        K: Key + Clone,
402        V: Value + Clone,
403    {
404        type Output = Vec<Vec<Arc<Record<Self>>>>;
405
406        fn dump(&self) -> Self::Output {
407            let mut small = vec![];
408            let mut main = vec![];
409
410            let mut cursor = self.small_queue.cursor();
411            loop {
412                cursor.move_next();
413                match cursor.clone_pointer() {
414                    Some(record) => small.push(record),
415                    None => break,
416                }
417            }
418
419            let mut cursor = self.main_queue.cursor();
420            loop {
421                cursor.move_next();
422                match cursor.clone_pointer() {
423                    Some(record) => main.push(record),
424                    None => break,
425                }
426            }
427
428            vec![small, main]
429        }
430    }
431
432    type TestS3Fifo = S3Fifo<u64, u64>;
433
434    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
435        rs[range]
436            .iter()
437            .for_each(|r| assert_eq!(unsafe { &*r.state().get() }.frequency(), count));
438    }
439
440    #[test]
441    fn test_s3fifo() {
442        let rs = (0..100)
443            .map(|i| {
444                Arc::new(Record::new(Data {
445                    key: i,
446                    value: i,
447                    hint: S3FifoHint,
448                    hash: i,
449                    weight: 1,
450                }))
451            })
452            .collect_vec();
453        let r = |i: usize| rs[i].clone();
454
455        // capacity: 8, small: 2, ghost: 80
456        let config = S3FifoConfig {
457            small_queue_capacity_ratio: 0.25,
458            ghost_queue_capacity_ratio: 10.0,
459            small_to_main_freq_threshold: 2,
460        };
461        let mut s3fifo = TestS3Fifo::new(8, &config);
462
463        assert_eq!(s3fifo.small_weight_capacity, 2);
464
465        s3fifo.push(r(0));
466        s3fifo.push(r(1));
467        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1)], vec![]]);
468
469        s3fifo.push(r(2));
470        s3fifo.push(r(3));
471        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);
472
473        assert_frequencies(&rs, 0..4, 0);
474
475        (0..4).for_each(|i| s3fifo.acquire_immutable(&rs[i]));
476        s3fifo.acquire_immutable(&rs[1]);
477        s3fifo.acquire_immutable(&rs[2]);
478        assert_frequencies(&rs, 0..1, 1);
479        assert_frequencies(&rs, 1..3, 2);
480        assert_frequencies(&rs, 3..4, 1);
481
482        let r0 = s3fifo.pop().unwrap();
483        let r3 = s3fifo.pop().unwrap();
484        assert_ptr_eq(&rs[0], &r0);
485        assert_ptr_eq(&rs[3], &r3);
486        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
487        assert_frequencies(&rs, 0..1, 0);
488        assert_frequencies(&rs, 1..3, 2);
489        assert_frequencies(&rs, 3..4, 0);
490
491        let r1 = s3fifo.pop().unwrap();
492        assert_ptr_eq(&rs[1], &r1);
493        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(2)]]);
494        assert_frequencies(&rs, 0..4, 0);
495
496        s3fifo.clear();
497        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![]]);
498    }
499}