foyer_memory/eviction/
s3fifo.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashSet, VecDeque},
17    mem::offset_of,
18    sync::{
19        atomic::{AtomicU8, Ordering},
20        Arc,
21    },
22};
23
24use foyer_common::{
25    code::{Key, Value},
26    properties::Properties,
27    strict_assert, strict_assert_eq,
28};
29use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
30use serde::{Deserialize, Serialize};
31
32use super::{Eviction, Op};
33use crate::{
34    error::{Error, Result},
35    record::Record,
36};
37
38/// S3Fifo eviction algorithm config.
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct S3FifoConfig {
41    /// Capacity ratio of the small S3FIFO queue.
42    pub small_queue_capacity_ratio: f64,
43    /// Capacity ratio of the ghost S3FIFO queue.
44    pub ghost_queue_capacity_ratio: f64,
45    /// Minimum access times when population entry from small queue to main queue.
46    pub small_to_main_freq_threshold: u8,
47}
48
49impl Default for S3FifoConfig {
50    fn default() -> Self {
51        Self {
52            small_queue_capacity_ratio: 0.1,
53            ghost_queue_capacity_ratio: 1.0,
54            small_to_main_freq_threshold: 1,
55        }
56    }
57}
58
/// Identifies which S3Fifo queue, if any, currently holds a record.
#[derive(Debug, Default, PartialEq, Eq)]
enum Queue {
    /// The record is not linked into any queue.
    #[default]
    None,
    /// The record lives in the main queue.
    Main,
    /// The record lives in the small queue.
    Small,
}
71
72/// S3Fifo eviction algorithm hint.
73#[derive(Debug, Default)]
74pub struct S3FifoState {
75    link: LinkedListAtomicLink,
76    frequency: AtomicU8,
77    queue: Queue,
78}
79
80impl S3FifoState {
81    const MAX_FREQUENCY: u8 = 3;
82
83    fn frequency(&self) -> u8 {
84        self.frequency.load(Ordering::Acquire)
85    }
86
87    fn set_frequency(&self, val: u8) {
88        self.frequency.store(val, Ordering::Release)
89    }
90
91    fn inc_frequency(&self) -> u8 {
92        self.frequency
93            .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
94                Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
95            })
96            .unwrap()
97    }
98
99    fn dec_frequency(&self) -> u8 {
100        self.frequency
101            .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
102            .unwrap()
103    }
104}
105
// Intrusive list adapter for `Arc<Record<S3Fifo<K, V, P>>>` nodes. The link is the `link` field of the
// `S3FifoState` found at `Record::STATE_OFFSET + offset_of!(S3FifoState, link)` bytes into the record.
intrusive_adapter! { Adapter<K, V, P> = Arc<Record<S3Fifo<K, V, P>>>: Record<S3Fifo<K, V, P>> { ?offset = Record::<S3Fifo<K, V, P>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
107
/// S3Fifo eviction container: a small FIFO queue, a main FIFO queue and a ghost queue that
/// remembers the hashes of records recently evicted from the small queue.
pub struct S3Fifo<K, V, P>
where
    K: Key,
    V: Value,
    P: Properties,
{
    /// Hashes of records evicted from the small queue; consulted on `push` to decide whether a
    /// record goes straight to the main queue.
    ghost_queue: GhostQueue,
    /// Intrusive FIFO list that receives records not found in the ghost queue.
    small_queue: LinkedList<Adapter<K, V, P>>,
    /// Intrusive FIFO list holding promoted records and records found in the ghost queue.
    main_queue: LinkedList<Adapter<K, V, P>>,

    /// Weight budget of the small queue: total capacity times `small_queue_capacity_ratio`.
    small_weight_capacity: usize,

    /// Sum of the weights of the records currently in the small queue.
    small_weight: usize,
    /// Sum of the weights of the records currently in the main queue.
    main_weight: usize,

    /// Promotion threshold in effect: the configured value clamped to `S3FifoState::MAX_FREQUENCY`.
    small_to_main_freq_threshold: u8,

    /// Copy of the configuration, reused when the capacity is updated.
    config: S3FifoConfig,
}
127
128impl<K, V, P> S3Fifo<K, V, P>
129where
130    K: Key,
131    V: Value,
132    P: Properties,
133{
134    fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
135        // TODO(MrCroxx): Use `let_chains` here after it is stable.
136        if self.small_weight > self.small_weight_capacity {
137            if let Some(record) = self.evict_small() {
138                return Some(record);
139            }
140        }
141        if let Some(record) = self.evict_main() {
142            return Some(record);
143        }
144        self.evict_small_force()
145    }
146
147    #[expect(clippy::never_loop)]
148    fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
149        while let Some(record) = self.small_queue.pop_front() {
150            let state = unsafe { &mut *record.state().get() };
151            state.queue = Queue::None;
152            state.set_frequency(0);
153            self.small_weight -= record.weight();
154            return Some(record);
155        }
156        None
157    }
158
159    fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
160        while let Some(record) = self.small_queue.pop_front() {
161            let state = unsafe { &mut *record.state().get() };
162            if state.frequency() >= self.small_to_main_freq_threshold {
163                state.queue = Queue::Main;
164                self.small_weight -= record.weight();
165                self.main_weight += record.weight();
166                self.main_queue.push_back(record);
167            } else {
168                state.queue = Queue::None;
169                state.set_frequency(0);
170                self.small_weight -= record.weight();
171
172                self.ghost_queue.push(record.hash(), record.weight());
173
174                return Some(record);
175            }
176        }
177        None
178    }
179
180    fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
181        while let Some(record) = self.main_queue.pop_front() {
182            let state = unsafe { &mut *record.state().get() };
183            if state.dec_frequency() > 0 {
184                self.main_queue.push_back(record);
185            } else {
186                state.queue = Queue::None;
187                self.main_weight -= record.weight();
188                return Some(record);
189            }
190        }
191        None
192    }
193}
194
195impl<K, V, P> Eviction for S3Fifo<K, V, P>
196where
197    K: Key,
198    V: Value,
199    P: Properties,
200{
201    type Config = S3FifoConfig;
202    type Key = K;
203    type Value = V;
204    type Properties = P;
205    type State = S3FifoState;
206
207    fn new(capacity: usize, config: &Self::Config) -> Self
208    where
209        Self: Sized,
210    {
211        assert!(
212            config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
213            "small_queue_capacity_ratio must be in (0, 1), given: {}",
214            config.small_queue_capacity_ratio
215        );
216
217        let config = config.clone();
218
219        let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
220        let ghost_queue = GhostQueue::new(ghost_queue_capacity);
221        let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
222
223        Self {
224            ghost_queue,
225            small_queue: LinkedList::new(Adapter::new()),
226            main_queue: LinkedList::new(Adapter::new()),
227            small_weight_capacity,
228            small_weight: 0,
229            main_weight: 0,
230            small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
231            config,
232        }
233    }
234
235    fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
236        if let Some(config) = config {
237            if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
238                return Err(Error::ConfigError(format!(
239                    "small_queue_capacity_ratio must be in (0, 1), given: {}",
240                    config.small_queue_capacity_ratio
241                )));
242            }
243            self.config = config.clone();
244        }
245
246        let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
247        let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
248        self.ghost_queue.update(ghost_queue_capacity);
249        self.small_weight_capacity = small_weight_capacity;
250
251        Ok(())
252    }
253
254    fn push(&mut self, record: Arc<Record<Self>>) {
255        let state = unsafe { &mut *record.state().get() };
256
257        strict_assert_eq!(state.frequency(), 0);
258        strict_assert_eq!(state.queue, Queue::None);
259
260        record.set_in_eviction(true);
261
262        if self.ghost_queue.contains(record.hash()) {
263            state.queue = Queue::Main;
264            self.main_weight += record.weight();
265            self.main_queue.push_back(record);
266        } else {
267            state.queue = Queue::Small;
268            self.small_weight += record.weight();
269            self.small_queue.push_back(record);
270        }
271    }
272
273    fn pop(&mut self) -> Option<Arc<Record<Self>>> {
274        if let Some(record) = self.evict() {
275            // `handle.queue` has already been set with `evict()`
276            record.set_in_eviction(false);
277            Some(record)
278        } else {
279            strict_assert!(self.small_queue.is_empty());
280            strict_assert!(self.main_queue.is_empty());
281            None
282        }
283    }
284
285    fn remove(&mut self, record: &Arc<Record<Self>>) {
286        let state = unsafe { &mut *record.state().get() };
287
288        match state.queue {
289            Queue::None => unreachable!(),
290            Queue::Main => {
291                unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
292
293                state.queue = Queue::None;
294                state.set_frequency(0);
295                record.set_in_eviction(false);
296
297                self.main_weight -= record.weight();
298            }
299            Queue::Small => {
300                unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
301
302                state.queue = Queue::None;
303                state.set_frequency(0);
304                record.set_in_eviction(false);
305
306                self.small_weight -= record.weight();
307            }
308        }
309    }
310
311    fn acquire() -> Op<Self> {
312        Op::immutable(|_: &Self, record| {
313            let state = unsafe { &mut *record.state().get() };
314            state.inc_frequency();
315        })
316    }
317
    /// Operation run when a record is released: S3Fifo does nothing here and returns a no-op.
    fn release() -> Op<Self> {
        Op::noop()
    }
321}
322
// TODO(MrCroxx): use ordered hash map?
/// Weight-bounded FIFO of record hashes, used to remember recently evicted records.
struct GhostQueue {
    /// Set of hashes currently remembered, for O(1) membership tests.
    // NOTE(review): a hash pushed twice is forgotten as soon as its older queue entry is popped,
    // even though the newer entry is still queued — confirm whether that is intended.
    counts: HashSet<u64>,
    /// Remembered `(hash, weight)` pairs, oldest first.
    queue: VecDeque<(u64, usize)>,
    /// Maximum total weight to retain; `0` disables the ghost queue.
    capacity: usize,
    /// Total weight of the entries in `queue`.
    weight: usize,
}

impl GhostQueue {
    /// Creates an empty ghost queue bounded by `capacity` (in weight units).
    fn new(capacity: usize) -> Self {
        Self {
            counts: HashSet::default(),
            queue: VecDeque::new(),
            capacity,
            weight: 0,
        }
    }

    /// Changes the capacity and drops the oldest entries until the queue fits again.
    ///
    /// A capacity of `0` disables the queue and forgets every entry. Without this, entries
    /// recorded before the change would stay visible to `contains` indefinitely, because
    /// `push` is a no-op while the queue is disabled and would never age them out.
    fn update(&mut self, capacity: usize) {
        self.capacity = capacity;
        if self.capacity == 0 {
            self.queue.clear();
            self.counts.clear();
            self.weight = 0;
            return;
        }
        while self.weight > self.capacity && self.weight > 0 {
            self.pop();
        }
    }

    /// Remembers `hash` with the given `weight`, dropping the oldest entries first if the new
    /// entry would exceed the capacity. Does nothing while the queue is disabled.
    fn push(&mut self, hash: u64, weight: usize) {
        if self.capacity == 0 {
            return;
        }
        while self.weight + weight > self.capacity && self.weight > 0 {
            self.pop();
        }
        self.queue.push_back((hash, weight));
        self.counts.insert(hash);
        self.weight += weight;
    }

    /// Forgets the oldest entry, if there is one.
    fn pop(&mut self) {
        if let Some((hash, weight)) = self.queue.pop_front() {
            self.weight -= weight;
            self.counts.remove(&hash);
        }
    }

    /// Returns whether `hash` is currently remembered.
    fn contains(&self, hash: u64) -> bool {
        self.counts.contains(&hash)
    }
}
374
375#[cfg(test)]
376mod tests {
377    use std::ops::Range;
378
379    use itertools::Itertools;
380
381    use super::*;
382    use crate::{
383        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
384        record::Data,
385    };
386
387    impl<K, V> Dump for S3Fifo<K, V, TestProperties>
388    where
389        K: Key + Clone,
390        V: Value + Clone,
391    {
392        type Output = Vec<Vec<Arc<Record<Self>>>>;
393
394        fn dump(&self) -> Self::Output {
395            let mut small = vec![];
396            let mut main = vec![];
397
398            let mut cursor = self.small_queue.cursor();
399            loop {
400                cursor.move_next();
401                match cursor.clone_pointer() {
402                    Some(record) => small.push(record),
403                    None => break,
404                }
405            }
406
407            let mut cursor = self.main_queue.cursor();
408            loop {
409                cursor.move_next();
410                match cursor.clone_pointer() {
411                    Some(record) => main.push(record),
412                    None => break,
413                }
414            }
415
416            vec![small, main]
417        }
418    }
419
    // S3Fifo instantiation used by the tests in this module: `u64` keys and values with `TestProperties`.
    type TestS3Fifo = S3Fifo<u64, u64, TestProperties>;
421
422    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
423        rs[range]
424            .iter()
425            .for_each(|r| assert_eq!(unsafe { &*r.state().get() }.frequency(), count));
426    }
427
428    #[test]
429    fn test_s3fifo() {
430        let rs = (0..100)
431            .map(|i| {
432                Arc::new(Record::new(Data {
433                    key: i,
434                    value: i,
435                    properties: TestProperties::default(),
436                    hash: i,
437                    weight: 1,
438                }))
439            })
440            .collect_vec();
441        let r = |i: usize| rs[i].clone();
442
443        // capacity: 8, small: 2, ghost: 80
444        let config = S3FifoConfig {
445            small_queue_capacity_ratio: 0.25,
446            ghost_queue_capacity_ratio: 10.0,
447            small_to_main_freq_threshold: 2,
448        };
449        let mut s3fifo = TestS3Fifo::new(8, &config);
450
451        assert_eq!(s3fifo.small_weight_capacity, 2);
452
453        s3fifo.push(r(0));
454        s3fifo.push(r(1));
455        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1)], vec![]]);
456
457        s3fifo.push(r(2));
458        s3fifo.push(r(3));
459        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);
460
461        assert_frequencies(&rs, 0..4, 0);
462
463        (0..4).for_each(|i| s3fifo.acquire_immutable(&rs[i]));
464        s3fifo.acquire_immutable(&rs[1]);
465        s3fifo.acquire_immutable(&rs[2]);
466        assert_frequencies(&rs, 0..1, 1);
467        assert_frequencies(&rs, 1..3, 2);
468        assert_frequencies(&rs, 3..4, 1);
469
470        let r0 = s3fifo.pop().unwrap();
471        let r3 = s3fifo.pop().unwrap();
472        assert_ptr_eq(&rs[0], &r0);
473        assert_ptr_eq(&rs[3], &r3);
474        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
475        assert_frequencies(&rs, 0..1, 0);
476        assert_frequencies(&rs, 1..3, 2);
477        assert_frequencies(&rs, 3..4, 0);
478
479        let r1 = s3fifo.pop().unwrap();
480        assert_ptr_eq(&rs[1], &r1);
481        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(2)]]);
482        assert_frequencies(&rs, 0..4, 0);
483
484        s3fifo.clear();
485        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![]]);
486    }
487}