foyer_storage/picker/
utils.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    collections::VecDeque,
17    fmt::Debug,
18    num::NonZeroUsize,
19    ops::Range,
20    sync::{
21        atomic::{AtomicUsize, Ordering},
22        Arc,
23    },
24    time::Duration,
25};
26
27use foyer_common::strict_assert;
28use itertools::Itertools;
29
30use super::{AdmissionPicker, EvictionInfo, EvictionPicker, Pick, ReinsertionPicker};
31use crate::{device::RegionId, io::throttle::IoThrottler, statistics::Statistics};
32
33/// Only admit on all chained admission pickers pick.
34#[derive(Debug, Default, Clone)]
35pub struct ChainedAdmissionPicker {
36    pickers: Arc<Vec<Arc<dyn AdmissionPicker>>>,
37}
38
39impl AdmissionPicker for ChainedAdmissionPicker {
40    fn pick(&self, stats: &Arc<Statistics>, hash: u64) -> Pick {
41        let mut duration = Duration::ZERO;
42        for picker in self.pickers.iter() {
43            match picker.pick(stats, hash) {
44                Pick::Admit => {}
45                Pick::Reject => return Pick::Reject,
46                Pick::Throttled(dur) => duration += dur,
47            }
48        }
49        if duration.is_zero() {
50            Pick::Admit
51        } else {
52            Pick::Throttled(duration)
53        }
54    }
55}
56
57/// A builder for [`ChainedAdmissionPicker`].
58#[derive(Debug, Default)]
59pub struct ChainedAdmissionPickerBuilder {
60    pickers: Vec<Arc<dyn AdmissionPicker>>,
61}
62
63impl ChainedAdmissionPickerBuilder {
64    /// Chain a new admission picker.
65    pub fn chain(mut self, picker: Arc<dyn AdmissionPicker>) -> Self {
66        self.pickers.push(picker);
67        self
68    }
69
70    /// Build the chained admission picker.
71    pub fn build(self) -> ChainedAdmissionPicker {
72        ChainedAdmissionPicker {
73            pickers: Arc::new(self.pickers),
74        }
75    }
76}
77
78/// A picker that always returns `true`.
79#[derive(Debug, Default)]
80pub struct AdmitAllPicker;
81
82impl AdmissionPicker for AdmitAllPicker {
83    fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
84        Pick::Admit
85    }
86}
87
88impl ReinsertionPicker for AdmitAllPicker {
89    fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
90        Pick::Admit
91    }
92}
93
94/// A picker that always returns `false`.
95#[derive(Debug, Default)]
96pub struct RejectAllPicker;
97
98impl AdmissionPicker for RejectAllPicker {
99    fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
100        Pick::Reject
101    }
102}
103
104impl ReinsertionPicker for RejectAllPicker {
105    fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
106        Pick::Reject
107    }
108}
109
110#[derive(Debug)]
111struct IoThrottlerPickerInner {
112    throttler: IoThrottler,
113    bytes_last: AtomicUsize,
114    ios_last: AtomicUsize,
115    target: IoThrottlerTarget,
116}
117
/// The kind of disk io that an io throttler takes into account.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum IoThrottlerTarget {
    /// Only read io counts towards the throttle.
    Read,
    /// Only write io counts towards the throttle.
    Write,
    /// Both read io and write io count towards the throttle.
    ReadWrite,
}
129
130/// A picker that picks based on the disk statistics and the given throttle args.
131///
132/// NOTE: This picker is automatically applied if the device throttle is set.
133///
134/// Please use set the `throttle` in the device options instead use this picker directly.
135/// Unless you know what you are doing. :D
136#[derive(Debug, Clone)]
137pub struct IoThrottlerPicker {
138    inner: Arc<IoThrottlerPickerInner>,
139}
140
141impl IoThrottlerPicker {
142    /// Create a rate limit picker with the given rate limit.
143    ///
144    /// Note: `None` stands for unlimited.
145    pub fn new(target: IoThrottlerTarget, throughput: Option<NonZeroUsize>, iops: Option<NonZeroUsize>) -> Self {
146        let inner = IoThrottlerPickerInner {
147            throttler: IoThrottler::new(throughput, iops),
148            bytes_last: AtomicUsize::default(),
149            ios_last: AtomicUsize::default(),
150            target,
151        };
152        Self { inner: Arc::new(inner) }
153    }
154
155    fn pick_inner(&self, stats: &Arc<Statistics>) -> Pick {
156        let duration = self.inner.throttler.probe();
157
158        let bytes_current = match self.inner.target {
159            IoThrottlerTarget::Read => stats.disk_read_bytes(),
160            IoThrottlerTarget::Write => stats.disk_write_bytes(),
161            IoThrottlerTarget::ReadWrite => stats.disk_read_bytes() + stats.disk_write_bytes(),
162        };
163        let ios_current = match self.inner.target {
164            IoThrottlerTarget::Read => stats.disk_read_ios(),
165            IoThrottlerTarget::Write => stats.disk_write_ios(),
166            IoThrottlerTarget::ReadWrite => stats.disk_read_ios() + stats.disk_write_ios(),
167        };
168
169        let bytes_last = self.inner.bytes_last.load(Ordering::Relaxed);
170        let ios_last = self.inner.ios_last.load(Ordering::Relaxed);
171
172        let bytes_delta = bytes_current.saturating_sub(bytes_last);
173        let ios_delta = ios_current.saturating_sub(ios_last);
174
175        self.inner.bytes_last.store(bytes_current, Ordering::Relaxed);
176        self.inner.ios_last.store(ios_current, Ordering::Relaxed);
177
178        self.inner.throttler.reduce(bytes_delta as f64, ios_delta as f64);
179
180        if duration.is_zero() {
181            Pick::Admit
182        } else {
183            Pick::Throttled(duration)
184        }
185    }
186}
187
188impl AdmissionPicker for IoThrottlerPicker {
189    fn pick(&self, stats: &Arc<Statistics>, _: u64) -> Pick {
190        self.pick_inner(stats)
191    }
192}
193
194/// A picker that pick region to eviction with a FIFO behavior.
195#[derive(Debug)]
196pub struct FifoPicker {
197    queue: VecDeque<RegionId>,
198    regions: usize,
199    probations: usize,
200    probation_ratio: f64,
201}
202
203impl Default for FifoPicker {
204    fn default() -> Self {
205        Self::new(0.1)
206    }
207}
208
209impl FifoPicker {
210    /// Create a new [`FifoPicker`] with the given `probation_ratio` (0.0 ~ 1.0).
211    ///
212    /// The `probation_ratio` is the ratio of regions that will be marked as probation.
213    /// The regions that are marked as probation will be evicted first.
214    ///
215    /// The default value is 0.1.
216    pub fn new(probation_ratio: f64) -> Self {
217        assert!(
218            (0.0..=1.0).contains(&probation_ratio),
219            "probation ratio {} must be in [0.0, 1.0]",
220            probation_ratio
221        );
222        Self {
223            queue: VecDeque::new(),
224            regions: 0,
225            probations: 0,
226            probation_ratio,
227        }
228    }
229}
230
231impl FifoPicker {
232    fn mark_probation(&self, info: EvictionInfo<'_>) {
233        let marks = self.probations.saturating_sub(info.clean);
234        self.queue.iter().take(marks).for_each(|rid| {
235            if info.evictable.contains(rid) {
236                info.regions[*rid as usize]
237                    .statistics()
238                    .probation
239                    .store(true, Ordering::Relaxed);
240                tracing::trace!(rid, "[fifo picker]: mark  probation");
241            }
242        });
243    }
244}
245
246impl EvictionPicker for FifoPicker {
247    fn init(&mut self, regions: Range<RegionId>, _: usize) {
248        self.regions = regions.len();
249        let probations = (self.regions as f64 * self.probation_ratio).floor() as usize;
250        self.probations = probations.clamp(0, self.regions);
251    }
252
253    fn pick(&mut self, info: EvictionInfo<'_>) -> Option<RegionId> {
254        tracing::trace!(evictable = ?info.evictable, clean = info.clean, "[fifo picker]: pick");
255        self.mark_probation(info);
256        let res = self.queue.front().copied();
257        tracing::trace!("[fifo picker]: pick {res:?}");
258        res
259    }
260
261    fn on_region_evictable(&mut self, _: EvictionInfo<'_>, region: RegionId) {
262        tracing::trace!("[fifo picker]: {region} is evictable");
263        self.queue.push_back(region);
264    }
265
266    fn on_region_evict(&mut self, _: EvictionInfo<'_>, region: RegionId) {
267        tracing::trace!("[fifo picker]: {region} is evicted");
268        let index = self.queue.iter().position(|r| r == &region).unwrap();
269        self.queue.remove(index);
270    }
271}
272
/// An eviction picker that picks the region with the largest invalid data ratio.
///
/// No region is picked if the largest invalid data ratio is below the threshold.
#[derive(Debug)]
pub struct InvalidRatioPicker {
    /// Minimum invalid data ratio a region must reach to be picked.
    threshold: f64,
    /// Size of a region, set on init; zero until then.
    region_size: usize,
}

impl InvalidRatioPicker {
    /// Create [`InvalidRatioPicker`] with the given `threshold` (0.0 ~ 1.0).
    ///
    /// A `threshold` below 0.0 or above 1.0 is clamped into that range.
    pub fn new(threshold: f64) -> Self {
        Self {
            threshold: threshold.clamp(0.0, 1.0),
            region_size: 0,
        }
    }
}
292
293impl EvictionPicker for InvalidRatioPicker {
294    fn init(&mut self, _: Range<RegionId>, region_size: usize) {
295        self.region_size = region_size;
296    }
297
298    fn pick(&mut self, info: EvictionInfo<'_>) -> Option<RegionId> {
299        strict_assert!(self.region_size > 0);
300
301        let mut data = info
302            .evictable
303            .iter()
304            .map(|rid| {
305                (
306                    *rid,
307                    info.regions[*rid as usize].statistics().invalid.load(Ordering::Relaxed),
308                )
309            })
310            .collect_vec();
311        data.sort_by_key(|(_, invalid)| *invalid);
312
313        let (rid, invalid) = data.last().copied()?;
314        if (invalid as f64 / self.region_size as f64) < self.threshold {
315            return None;
316        }
317        tracing::trace!("[invalid ratio picker]: pick {rid:?}");
318        Some(rid)
319    }
320
321    fn on_region_evictable(&mut self, _: EvictionInfo<'_>, region: RegionId) {
322        tracing::trace!("[invalid ratio picker]: {region} is evictable");
323    }
324
325    fn on_region_evict(&mut self, _: EvictionInfo<'_>, region: RegionId) {
326        tracing::trace!("[invalid ratio picker]: {region} is evicted");
327    }
328}
329
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::{device::test_utils::NoopDevice, Region};

    #[test_log::test]
    fn test_fifo_picker() {
        /// Build the eviction info; every region that is neither evictable nor dirty counts as clean.
        fn info<'a>(regions: &'a [Region], evictable: &'a HashSet<RegionId>, dirty: usize) -> EvictionInfo<'a> {
            let clean = regions.len() - evictable.len() - dirty;
            EvictionInfo {
                regions,
                evictable,
                clean,
            }
        }

        /// Assert that exactly the given regions are marked as probation.
        fn check(regions: &[Region], probations: impl IntoIterator<Item = RegionId>) {
            let probations = probations.into_iter().collect::<HashSet<_>>();
            for (index, region) in regions.iter().enumerate() {
                let rid = index as RegionId;
                let marked = region.statistics().probation.load(Ordering::Relaxed);
                if probations.contains(&rid) {
                    assert!(
                        marked,
                        "probations {probations:?}, assert {rid} is probation failed"
                    );
                } else {
                    assert!(
                        !marked,
                        "probations {probations:?}, assert {rid} is not probation failed"
                    );
                }
            }
        }

        let regions = (0..10)
            .map(|rid| Region::new_for_test(rid, NoopDevice::monitored()))
            .collect_vec();
        let mut evictable = HashSet::new();

        let mut picker = FifoPicker::new(0.1);
        picker.init(0..10, 0);

        // mark 0..10 evictable in order
        for rid in 0..10 {
            evictable.insert(rid as _);
            picker.on_region_evictable(info(&regions, &evictable, 0), rid);
        }

        // Each step is:
        // (dirty regions on pick, expected pick, expected probations, dirty regions on evict).
        let steps: [(usize, RegionId, &[RegionId], usize); 3] = [
            // evict 0, mark 0 probation.
            (0, 0, &[0], 1),
            // evict 1, with 1 dirty region, mark 1 probation.
            (1, 1, &[0, 1], 2),
            // evict 2 with 1 dirty region, mark no region probation.
            (1, 2, &[0, 1], 2),
        ];
        for (dirty_on_pick, rid, probations, dirty_on_evict) in steps {
            assert_eq!(picker.pick(info(&regions, &evictable, dirty_on_pick)), Some(rid));
            check(&regions, probations.iter().copied());
            evictable.remove(&rid);
            picker.on_region_evict(info(&regions, &evictable, dirty_on_evict), rid);
        }

        // evict the remaining regions out of queue order: (dirty regions, evicted region).
        for (dirty, rid) in [(3, 3), (4, 5), (5, 7), (6, 9), (7, 4), (8, 6), (9, 8)] {
            picker.on_region_evict(info(&regions, &evictable, dirty), rid);
            evictable.remove(&rid);
        }
    }

    #[test]
    fn test_invalid_ratio_picker() {
        /// Build the eviction info; every region that is not evictable counts as clean.
        fn info<'a>(regions: &'a [Region], evictable: &'a HashSet<RegionId>) -> EvictionInfo<'a> {
            let clean = regions.len() - evictable.len();
            EvictionInfo {
                regions,
                evictable,
                clean,
            }
        }

        let mut picker = InvalidRatioPicker::new(0.5);
        picker.init(0..10, 10);

        let regions = (0..10)
            .map(|rid| Region::new_for_test(rid, NoopDevice::monitored()))
            .collect_vec();
        let mut evictable = HashSet::new();

        // region `i` gets an invalid statistic of `i`, and all regions are evictable
        for (i, region) in regions.iter().enumerate() {
            region.statistics().invalid.fetch_add(i as _, Ordering::Relaxed);
            evictable.insert(i as RegionId);
        }

        // picking does not change the state, the same region is picked until it is removed
        assert_eq!(picker.pick(info(&regions, &evictable)), Some(9));

        // regions 9 down to 5 reach the 0.5 threshold, in descending order
        for rid in (5..=9).rev() {
            assert_eq!(picker.pick(info(&regions, &evictable)), Some(rid));
            evictable.remove(&rid);
        }

        // the remaining regions are all below the threshold
        assert_eq!(picker.pick(info(&regions, &evictable)), None);
        assert_eq!(picker.pick(info(&regions, &evictable)), None);
    }
}