foyer_memory/eviction/
s3fifo.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::{HashSet, VecDeque},
17    mem::offset_of,
18    sync::{
19        atomic::{AtomicU8, Ordering},
20        Arc,
21    },
22};
23
24use foyer_common::{
25    code::{Key, Value},
26    error::{Error, ErrorKind, Result},
27    properties::Properties,
28    strict_assert, strict_assert_eq,
29};
30use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
31use serde::{Deserialize, Serialize};
32
33use super::{Eviction, Op};
34use crate::record::Record;
35
/// S3Fifo eviction algorithm config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3FifoConfig {
    /// Capacity ratio of the small S3FIFO queue.
    ///
    /// It is multiplied by the total capacity to obtain the weight budget of the small queue.
    /// `S3Fifo::new` asserts that the value lies in the open interval `(0, 1)`.
    pub small_queue_capacity_ratio: f64,
    /// Capacity ratio of the ghost S3FIFO queue.
    ///
    /// It is multiplied by the total capacity to obtain the weight budget of the ghost queue.
    pub ghost_queue_capacity_ratio: f64,
    /// Minimum access count an entry needs in order to be promoted from the small queue to the
    /// main queue instead of being evicted.
    ///
    /// `S3Fifo::new` clamps the value to the maximum frequency a record can track.
    pub small_to_main_freq_threshold: u8,
}
46
47impl Default for S3FifoConfig {
48    fn default() -> Self {
49        Self {
50            small_queue_capacity_ratio: 0.1,
51            ghost_queue_capacity_ratio: 1.0,
52            small_to_main_freq_threshold: 1,
53        }
54    }
55}
56
/// The queue a record is currently linked into.
#[derive(Debug, PartialEq, Eq, Default)]
enum Queue {
    /// The record is not linked into any queue. This is the default for a fresh state.
    #[default]
    None,
    /// The record is linked into the main queue.
    Main,
    /// The record is linked into the small queue.
    Small,
}
64
/// S3Fifo eviction algorithm state that is kept for every record.
#[derive(Debug, Default)]
pub struct S3FifoState {
    /// Intrusive list link that chains the record into the small or the main queue.
    link: LinkedListAtomicLink,
    /// Access frequency counter; `inc_frequency` saturates it at `S3FifoState::MAX_FREQUENCY`.
    frequency: AtomicU8,
    /// The queue the record currently belongs to.
    queue: Queue,
}
72
73impl S3FifoState {
74    const MAX_FREQUENCY: u8 = 3;
75
76    fn frequency(&self) -> u8 {
77        self.frequency.load(Ordering::Acquire)
78    }
79
80    fn set_frequency(&self, val: u8) {
81        self.frequency.store(val, Ordering::Release)
82    }
83
84    fn inc_frequency(&self) -> u8 {
85        self.frequency
86            .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
87                Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
88            })
89            .unwrap()
90    }
91
92    fn dec_frequency(&self) -> u8 {
93        self.frequency
94            .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
95            .unwrap()
96    }
97}
98
// Intrusive linked-list adapter for `Arc<Record<S3Fifo<K, V, P>>>`.
//
// The link lives inside the record's `S3FifoState`, so its offset within the record is the
// offset of the state (`Record::STATE_OFFSET`) plus the offset of `link` within `S3FifoState`.
intrusive_adapter! { Adapter<K, V, P> = Arc<Record<S3Fifo<K, V, P>>>: Record<S3Fifo<K, V, P>> { ?offset = Record::<S3Fifo<K, V, P>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
100
/// S3Fifo eviction algorithm.
///
/// Records live in either a small queue or a main queue. Hashes of records that were evicted
/// from the small queue are remembered in a ghost queue; a record whose hash is found there when
/// it is pushed goes straight into the main queue.
pub struct S3Fifo<K, V, P>
where
    K: Key,
    V: Value,
    P: Properties,
{
    /// Hashes (with weights) of records recently evicted from the small queue.
    ghost_queue: GhostQueue,
    /// Queue that receives records whose hash is not in the ghost queue.
    small_queue: LinkedList<Adapter<K, V, P>>,
    /// Queue that holds promoted records and records whose hash was found in the ghost queue.
    main_queue: LinkedList<Adapter<K, V, P>>,

    /// Weight above which eviction is attempted from the small queue first.
    small_weight_capacity: usize,

    /// Total weight of the records currently in the small queue.
    small_weight: usize,
    /// Total weight of the records currently in the main queue.
    main_weight: usize,

    /// Minimum frequency required to promote a record from the small queue to the main queue.
    small_to_main_freq_threshold: u8,

    /// The configuration in use; its ratios are re-applied when the capacity is updated.
    config: S3FifoConfig,
}
120
121impl<K, V, P> S3Fifo<K, V, P>
122where
123    K: Key,
124    V: Value,
125    P: Properties,
126{
127    fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
128        // TODO(MrCroxx): Use `let_chains` here after it is stable.
129        if self.small_weight > self.small_weight_capacity {
130            if let Some(record) = self.evict_small() {
131                return Some(record);
132            }
133        }
134        if let Some(record) = self.evict_main() {
135            return Some(record);
136        }
137        self.evict_small_force()
138    }
139
140    #[expect(clippy::never_loop)]
141    fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
142        while let Some(record) = self.small_queue.pop_front() {
143            let state = unsafe { &mut *record.state().get() };
144            state.queue = Queue::None;
145            state.set_frequency(0);
146            self.small_weight -= record.weight();
147            return Some(record);
148        }
149        None
150    }
151
152    fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
153        while let Some(record) = self.small_queue.pop_front() {
154            let state = unsafe { &mut *record.state().get() };
155            if state.frequency() >= self.small_to_main_freq_threshold {
156                state.queue = Queue::Main;
157                self.small_weight -= record.weight();
158                self.main_weight += record.weight();
159                self.main_queue.push_back(record);
160            } else {
161                state.queue = Queue::None;
162                state.set_frequency(0);
163                self.small_weight -= record.weight();
164
165                self.ghost_queue.push(record.hash(), record.weight());
166
167                return Some(record);
168            }
169        }
170        None
171    }
172
173    fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
174        while let Some(record) = self.main_queue.pop_front() {
175            let state = unsafe { &mut *record.state().get() };
176            if state.dec_frequency() > 0 {
177                self.main_queue.push_back(record);
178            } else {
179                state.queue = Queue::None;
180                self.main_weight -= record.weight();
181                return Some(record);
182            }
183        }
184        None
185    }
186}
187
188impl<K, V, P> Eviction for S3Fifo<K, V, P>
189where
190    K: Key,
191    V: Value,
192    P: Properties,
193{
194    type Config = S3FifoConfig;
195    type Key = K;
196    type Value = V;
197    type Properties = P;
198    type State = S3FifoState;
199
200    fn new(capacity: usize, config: &Self::Config) -> Self
201    where
202        Self: Sized,
203    {
204        assert!(
205            config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
206            "small_queue_capacity_ratio must be in (0, 1), given: {}",
207            config.small_queue_capacity_ratio
208        );
209
210        let config = config.clone();
211
212        let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
213        let ghost_queue = GhostQueue::new(ghost_queue_capacity);
214        let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
215
216        Self {
217            ghost_queue,
218            small_queue: LinkedList::new(Adapter::new()),
219            main_queue: LinkedList::new(Adapter::new()),
220            small_weight_capacity,
221            small_weight: 0,
222            main_weight: 0,
223            small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
224            config,
225        }
226    }
227
228    fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
229        if let Some(config) = config {
230            if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
231                return Err(
232                    Error::new(ErrorKind::Config, "update S3-FIFO config failed").with_context(
233                        "reason",
234                        format!(
235                            "small_queue_capacity_ratio must be in (0, 1), given: {}",
236                            config.small_queue_capacity_ratio
237                        ),
238                    ),
239                );
240            }
241            self.config = config.clone();
242        }
243
244        let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
245        let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
246        self.ghost_queue.update(ghost_queue_capacity);
247        self.small_weight_capacity = small_weight_capacity;
248
249        Ok(())
250    }
251
252    fn push(&mut self, record: Arc<Record<Self>>) {
253        let state = unsafe { &mut *record.state().get() };
254
255        strict_assert_eq!(state.frequency(), 0);
256        strict_assert_eq!(state.queue, Queue::None);
257
258        record.set_in_eviction(true);
259
260        if self.ghost_queue.contains(record.hash()) {
261            state.queue = Queue::Main;
262            self.main_weight += record.weight();
263            self.main_queue.push_back(record);
264        } else {
265            state.queue = Queue::Small;
266            self.small_weight += record.weight();
267            self.small_queue.push_back(record);
268        }
269    }
270
271    fn pop(&mut self) -> Option<Arc<Record<Self>>> {
272        if let Some(record) = self.evict() {
273            // `handle.queue` has already been set with `evict()`
274            record.set_in_eviction(false);
275            Some(record)
276        } else {
277            strict_assert!(self.small_queue.is_empty());
278            strict_assert!(self.main_queue.is_empty());
279            None
280        }
281    }
282
283    fn remove(&mut self, record: &Arc<Record<Self>>) {
284        let state = unsafe { &mut *record.state().get() };
285
286        match state.queue {
287            Queue::None => unreachable!(),
288            Queue::Main => {
289                unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
290
291                state.queue = Queue::None;
292                state.set_frequency(0);
293                record.set_in_eviction(false);
294
295                self.main_weight -= record.weight();
296            }
297            Queue::Small => {
298                unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
299
300                state.queue = Queue::None;
301                state.set_frequency(0);
302                record.set_in_eviction(false);
303
304                self.small_weight -= record.weight();
305            }
306        }
307    }
308
309    fn acquire() -> Op<Self> {
310        Op::immutable(|_: &Self, record| {
311            let state = unsafe { &mut *record.state().get() };
312            state.inc_frequency();
313        })
314    }
315
316    fn release() -> Op<Self> {
317        Op::noop()
318    }
319}
320
// TODO(MrCroxx): use ordered hash map?
/// Weight-bounded FIFO of hashes of recently evicted records.
///
/// Entries are dropped oldest-first once the total weight would exceed `capacity`. A capacity of
/// zero disables the queue: nothing is stored and nothing is reported as contained.
struct GhostQueue {
    /// Hashes currently tracked, for constant-time membership checks.
    counts: HashSet<u64>,
    /// Tracked `(hash, weight)` pairs in insertion order; the front is the oldest.
    queue: VecDeque<(u64, usize)>,
    /// Maximum total weight the queue is allowed to hold.
    capacity: usize,
    /// Total weight of the entries currently in `queue`.
    weight: usize,
}

impl GhostQueue {
    /// Creates an empty ghost queue with the given weight `capacity`.
    fn new(capacity: usize) -> Self {
        Self {
            counts: HashSet::default(),
            queue: VecDeque::new(),
            capacity,
            weight: 0,
        }
    }

    /// Changes the weight capacity and drops the oldest entries until the queue fits.
    ///
    /// Shrinking to zero empties the queue, so hashes stored before the queue was disabled are
    /// no longer reported by [`Self::contains`].
    fn update(&mut self, capacity: usize) {
        self.capacity = capacity;
        // The emptiness check guarantees termination even if the bookkeeping were ever off.
        while self.weight > self.capacity && !self.queue.is_empty() {
            self.pop();
        }
    }

    /// Remembers `hash` with the given `weight`, evicting the oldest entries to make room.
    ///
    /// Does nothing when the capacity is zero.
    fn push(&mut self, hash: u64, weight: usize) {
        if self.capacity == 0 {
            return;
        }
        while self.weight + weight > self.capacity && self.weight > 0 {
            self.pop();
        }
        self.queue.push_back((hash, weight));
        self.counts.insert(hash);
        self.weight += weight;
    }

    /// Drops the oldest entry, if any, and releases its weight.
    fn pop(&mut self) {
        if let Some((hash, weight)) = self.queue.pop_front() {
            self.weight -= weight;
            self.counts.remove(&hash);
        }
    }

    /// Returns whether `hash` is currently remembered.
    fn contains(&self, hash: u64) -> bool {
        self.counts.contains(&hash)
    }
}
372
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use itertools::Itertools;

    use super::*;
    use crate::{
        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
        record::Data,
    };

    /// Test-only snapshot of the policy: `[small queue, main queue]`, each front to back.
    impl<K, V> Dump for S3Fifo<K, V, TestProperties>
    where
        K: Key + Clone,
        V: Value + Clone,
    {
        type Output = Vec<Vec<Arc<Record<Self>>>>;

        fn dump(&self) -> Self::Output {
            let mut small = vec![];
            let mut main = vec![];

            // Walk the small queue with a cursor, collecting records until the cursor yields
            // no element.
            let mut cursor = self.small_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => small.push(record),
                    None => break,
                }
            }

            // Same walk over the main queue.
            let mut cursor = self.main_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => main.push(record),
                    None => break,
                }
            }

            vec![small, main]
        }
    }

    type TestS3Fifo = S3Fifo<u64, u64, TestProperties>;

    /// Asserts that every record in `rs[range]` has exactly `count` as its frequency.
    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
        rs[range]
            .iter()
            .for_each(|r| assert_eq!(unsafe { &*r.state().get() }.frequency(), count));
    }

    /// End-to-end walk through push, access, promotion and eviction.
    #[test]
    fn test_s3fifo() {
        // 100 records with key == value == hash == index and weight 1.
        let rs = (0..100)
            .map(|i| {
                Arc::new(Record::new(Data {
                    key: i,
                    value: i,
                    properties: TestProperties::default(),
                    hash: i,
                    weight: 1,
                }))
            })
            .collect_vec();
        let r = |i: usize| rs[i].clone();

        // capacity: 8, small: 2, ghost: 80
        let config = S3FifoConfig {
            small_queue_capacity_ratio: 0.25,
            ghost_queue_capacity_ratio: 10.0,
            small_to_main_freq_threshold: 2,
        };
        let mut s3fifo = TestS3Fifo::new(8, &config);

        assert_eq!(s3fifo.small_weight_capacity, 2);

        // The ghost queue is empty, so every pushed record lands in the small queue.
        s3fifo.push(r(0));
        s3fifo.push(r(1));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1)], vec![]]);

        s3fifo.push(r(2));
        s3fifo.push(r(3));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);

        assert_frequencies(&rs, 0..4, 0);

        // Access every record once, then records 1 and 2 a second time.
        (0..4).for_each(|i| s3fifo.acquire_immutable(&rs[i]));
        s3fifo.acquire_immutable(&rs[1]);
        s3fifo.acquire_immutable(&rs[2]);
        assert_frequencies(&rs, 0..1, 1);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 1);

        // The small queue is over budget. Record 0 (frequency 1 < threshold 2) is evicted first;
        // the second pop promotes records 1 and 2 (frequency 2) to the main queue and evicts
        // record 3.
        let r0 = s3fifo.pop().unwrap();
        let r3 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[0], &r0);
        assert_ptr_eq(&rs[3], &r3);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
        assert_frequencies(&rs, 0..1, 0);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 0);

        // The small queue is empty, so the main queue is scanned: records 1 and 2 are cycled
        // and decremented until record 1 is visited with frequency 0 and evicted.
        let r1 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[1], &r1);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(2)]]);
        assert_frequencies(&rs, 0..4, 0);

        // NOTE(review): `clear` is not defined in this file; presumably provided by the
        // `Eviction` trait — confirm.
        s3fifo.clear();
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![]]);
    }
}
485}