1use std::{
16 collections::VecDeque,
17 fmt::Debug,
18 num::NonZeroUsize,
19 ops::Range,
20 sync::{
21 atomic::{AtomicUsize, Ordering},
22 Arc,
23 },
24 time::Duration,
25};
26
27use foyer_common::strict_assert;
28use itertools::Itertools;
29
30use super::{AdmissionPicker, EvictionInfo, EvictionPicker, Pick, ReinsertionPicker};
31use crate::{device::RegionId, io::throttle::IoThrottler, statistics::Statistics};
32
33#[derive(Debug, Default, Clone)]
35pub struct ChainedAdmissionPicker {
36 pickers: Arc<Vec<Arc<dyn AdmissionPicker>>>,
37}
38
39impl AdmissionPicker for ChainedAdmissionPicker {
40 fn pick(&self, stats: &Arc<Statistics>, hash: u64) -> Pick {
41 let mut duration = Duration::ZERO;
42 for picker in self.pickers.iter() {
43 match picker.pick(stats, hash) {
44 Pick::Admit => {}
45 Pick::Reject => return Pick::Reject,
46 Pick::Throttled(dur) => duration += dur,
47 }
48 }
49 if duration.is_zero() {
50 Pick::Admit
51 } else {
52 Pick::Throttled(duration)
53 }
54 }
55}
56
57#[derive(Debug, Default)]
59pub struct ChainedAdmissionPickerBuilder {
60 pickers: Vec<Arc<dyn AdmissionPicker>>,
61}
62
63impl ChainedAdmissionPickerBuilder {
64 pub fn chain(mut self, picker: Arc<dyn AdmissionPicker>) -> Self {
66 self.pickers.push(picker);
67 self
68 }
69
70 pub fn build(self) -> ChainedAdmissionPicker {
72 ChainedAdmissionPicker {
73 pickers: Arc::new(self.pickers),
74 }
75 }
76}
77
78#[derive(Debug, Default)]
80pub struct AdmitAllPicker;
81
82impl AdmissionPicker for AdmitAllPicker {
83 fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
84 Pick::Admit
85 }
86}
87
88impl ReinsertionPicker for AdmitAllPicker {
89 fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
90 Pick::Admit
91 }
92}
93
94#[derive(Debug, Default)]
96pub struct RejectAllPicker;
97
98impl AdmissionPicker for RejectAllPicker {
99 fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
100 Pick::Reject
101 }
102}
103
104impl ReinsertionPicker for RejectAllPicker {
105 fn pick(&self, _: &Arc<Statistics>, _: u64) -> Pick {
106 Pick::Reject
107 }
108}
109
110#[derive(Debug)]
111struct IoThrottlerPickerInner {
112 throttler: IoThrottler,
113 bytes_last: AtomicUsize,
114 ios_last: AtomicUsize,
115 target: IoThrottlerTarget,
116}
117
118#[derive(Debug, Clone, PartialEq, Eq)]
120#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
121pub enum IoThrottlerTarget {
122 Read,
124 Write,
126 ReadWrite,
128}
129
130#[derive(Debug, Clone)]
137pub struct IoThrottlerPicker {
138 inner: Arc<IoThrottlerPickerInner>,
139}
140
141impl IoThrottlerPicker {
142 pub fn new(target: IoThrottlerTarget, throughput: Option<NonZeroUsize>, iops: Option<NonZeroUsize>) -> Self {
146 let inner = IoThrottlerPickerInner {
147 throttler: IoThrottler::new(throughput, iops),
148 bytes_last: AtomicUsize::default(),
149 ios_last: AtomicUsize::default(),
150 target,
151 };
152 Self { inner: Arc::new(inner) }
153 }
154
155 fn pick_inner(&self, stats: &Arc<Statistics>) -> Pick {
156 let duration = self.inner.throttler.probe();
157
158 let bytes_current = match self.inner.target {
159 IoThrottlerTarget::Read => stats.disk_read_bytes(),
160 IoThrottlerTarget::Write => stats.disk_write_bytes(),
161 IoThrottlerTarget::ReadWrite => stats.disk_read_bytes() + stats.disk_write_bytes(),
162 };
163 let ios_current = match self.inner.target {
164 IoThrottlerTarget::Read => stats.disk_read_ios(),
165 IoThrottlerTarget::Write => stats.disk_write_ios(),
166 IoThrottlerTarget::ReadWrite => stats.disk_read_ios() + stats.disk_write_ios(),
167 };
168
169 let bytes_last = self.inner.bytes_last.load(Ordering::Relaxed);
170 let ios_last = self.inner.ios_last.load(Ordering::Relaxed);
171
172 let bytes_delta = bytes_current.saturating_sub(bytes_last);
173 let ios_delta = ios_current.saturating_sub(ios_last);
174
175 self.inner.bytes_last.store(bytes_current, Ordering::Relaxed);
176 self.inner.ios_last.store(ios_current, Ordering::Relaxed);
177
178 self.inner.throttler.reduce(bytes_delta as f64, ios_delta as f64);
179
180 if duration.is_zero() {
181 Pick::Admit
182 } else {
183 Pick::Throttled(duration)
184 }
185 }
186}
187
188impl AdmissionPicker for IoThrottlerPicker {
189 fn pick(&self, stats: &Arc<Statistics>, _: u64) -> Pick {
190 self.pick_inner(stats)
191 }
192}
193
194#[derive(Debug)]
196pub struct FifoPicker {
197 queue: VecDeque<RegionId>,
198 regions: usize,
199 probations: usize,
200 probation_ratio: f64,
201}
202
203impl Default for FifoPicker {
204 fn default() -> Self {
205 Self::new(0.1)
206 }
207}
208
209impl FifoPicker {
210 pub fn new(probation_ratio: f64) -> Self {
217 assert!(
218 (0.0..=1.0).contains(&probation_ratio),
219 "probation ratio {} must be in [0.0, 1.0]",
220 probation_ratio
221 );
222 Self {
223 queue: VecDeque::new(),
224 regions: 0,
225 probations: 0,
226 probation_ratio,
227 }
228 }
229}
230
231impl FifoPicker {
232 fn mark_probation(&self, info: EvictionInfo<'_>) {
233 let marks = self.probations.saturating_sub(info.clean);
234 self.queue.iter().take(marks).for_each(|rid| {
235 if info.evictable.contains(rid) {
236 info.regions[*rid as usize]
237 .statistics()
238 .probation
239 .store(true, Ordering::Relaxed);
240 tracing::trace!(rid, "[fifo picker]: mark probation");
241 }
242 });
243 }
244}
245
246impl EvictionPicker for FifoPicker {
247 fn init(&mut self, regions: Range<RegionId>, _: usize) {
248 self.regions = regions.len();
249 let probations = (self.regions as f64 * self.probation_ratio).floor() as usize;
250 self.probations = probations.clamp(0, self.regions);
251 }
252
253 fn pick(&mut self, info: EvictionInfo<'_>) -> Option<RegionId> {
254 tracing::trace!(evictable = ?info.evictable, clean = info.clean, "[fifo picker]: pick");
255 self.mark_probation(info);
256 let res = self.queue.front().copied();
257 tracing::trace!("[fifo picker]: pick {res:?}");
258 res
259 }
260
261 fn on_region_evictable(&mut self, _: EvictionInfo<'_>, region: RegionId) {
262 tracing::trace!("[fifo picker]: {region} is evictable");
263 self.queue.push_back(region);
264 }
265
266 fn on_region_evict(&mut self, _: EvictionInfo<'_>, region: RegionId) {
267 tracing::trace!("[fifo picker]: {region} is evicted");
268 let index = self.queue.iter().position(|r| r == ®ion).unwrap();
269 self.queue.remove(index);
270 }
271}
272
273#[derive(Debug)]
277pub struct InvalidRatioPicker {
278 threshold: f64,
279 region_size: usize,
280}
281
282impl InvalidRatioPicker {
283 pub fn new(threshold: f64) -> Self {
285 let ratio = threshold.clamp(0.0, 1.0);
286 Self {
287 threshold: ratio,
288 region_size: 0,
289 }
290 }
291}
292
293impl EvictionPicker for InvalidRatioPicker {
294 fn init(&mut self, _: Range<RegionId>, region_size: usize) {
295 self.region_size = region_size;
296 }
297
298 fn pick(&mut self, info: EvictionInfo<'_>) -> Option<RegionId> {
299 strict_assert!(self.region_size > 0);
300
301 let mut data = info
302 .evictable
303 .iter()
304 .map(|rid| {
305 (
306 *rid,
307 info.regions[*rid as usize].statistics().invalid.load(Ordering::Relaxed),
308 )
309 })
310 .collect_vec();
311 data.sort_by_key(|(_, invalid)| *invalid);
312
313 let (rid, invalid) = data.last().copied()?;
314 if (invalid as f64 / self.region_size as f64) < self.threshold {
315 return None;
316 }
317 tracing::trace!("[invalid ratio picker]: pick {rid:?}");
318 Some(rid)
319 }
320
321 fn on_region_evictable(&mut self, _: EvictionInfo<'_>, region: RegionId) {
322 tracing::trace!("[invalid ratio picker]: {region} is evictable");
323 }
324
325 fn on_region_evict(&mut self, _: EvictionInfo<'_>, region: RegionId) {
326 tracing::trace!("[invalid ratio picker]: {region} is evicted");
327 }
328}
329
330#[cfg(test)]
331mod tests {
332 use std::collections::HashSet;
333
334 use super::*;
335 use crate::{device::test_utils::NoopDevice, Region};
336
337 #[test_log::test]
338 fn test_fifo_picker() {
339 let mut picker = FifoPicker::new(0.1);
340
341 let regions = (0..10)
342 .map(|rid| Region::new_for_test(rid, NoopDevice::monitored()))
343 .collect_vec();
344 let mut evictable = HashSet::new();
345
346 fn info<'a>(regions: &'a [Region], evictable: &'a HashSet<RegionId>, dirty: usize) -> EvictionInfo<'a> {
347 EvictionInfo {
348 regions,
349 evictable,
350 clean: regions.len() - evictable.len() - dirty,
351 }
352 }
353
354 fn check(regions: &[Region], probations: impl IntoIterator<Item = RegionId>) {
355 let probations = probations.into_iter().collect::<HashSet<_>>();
356 for rid in 0..regions.len() as RegionId {
357 if probations.contains(&rid) {
358 assert!(
359 regions[rid as usize].statistics().probation.load(Ordering::Relaxed),
360 "probations {probations:?}, assert {rid} is probation failed"
361 );
362 } else {
363 assert!(
364 !regions[rid as usize].statistics().probation.load(Ordering::Relaxed),
365 "probations {probations:?}, assert {rid} is not probation failed"
366 );
367 }
368 }
369 }
370
371 picker.init(0..10, 0);
372
373 (0..10).for_each(|i| {
375 evictable.insert(i as _);
376 picker.on_region_evictable(info(®ions, &evictable, 0), i);
377 });
378
379 assert_eq!(picker.pick(info(®ions, &evictable, 0)), Some(0));
381 check(®ions, [0]);
382 evictable.remove(&0);
383 picker.on_region_evict(info(®ions, &evictable, 1), 0);
384
385 assert_eq!(picker.pick(info(®ions, &evictable, 1)), Some(1));
387 check(®ions, [0, 1]);
388 evictable.remove(&1);
389 picker.on_region_evict(info(®ions, &evictable, 2), 1);
390
391 assert_eq!(picker.pick(info(®ions, &evictable, 1)), Some(2));
393 check(®ions, [0, 1]);
394 evictable.remove(&2);
395 picker.on_region_evict(info(®ions, &evictable, 2), 2);
396
397 picker.on_region_evict(info(®ions, &evictable, 3), 3);
398 evictable.remove(&3);
399 picker.on_region_evict(info(®ions, &evictable, 4), 5);
400 evictable.remove(&5);
401 picker.on_region_evict(info(®ions, &evictable, 5), 7);
402 evictable.remove(&7);
403 picker.on_region_evict(info(®ions, &evictable, 6), 9);
404 evictable.remove(&9);
405
406 picker.on_region_evict(info(®ions, &evictable, 7), 4);
407 evictable.remove(&4);
408 picker.on_region_evict(info(®ions, &evictable, 8), 6);
409 evictable.remove(&6);
410 picker.on_region_evict(info(®ions, &evictable, 9), 8);
411 evictable.remove(&8);
412 }
413
414 #[test]
415 fn test_invalid_ratio_picker() {
416 let mut picker = InvalidRatioPicker::new(0.5);
417 picker.init(0..10, 10);
418
419 let regions = (0..10)
420 .map(|rid| Region::new_for_test(rid, NoopDevice::monitored()))
421 .collect_vec();
422 let mut evictable = HashSet::new();
423
424 (0..10).for_each(|i| {
425 regions[i].statistics().invalid.fetch_add(i as _, Ordering::Relaxed);
426 evictable.insert(i as RegionId);
427 });
428
429 fn info<'a>(regions: &'a [Region], evictable: &'a HashSet<RegionId>) -> EvictionInfo<'a> {
430 EvictionInfo {
431 regions,
432 evictable,
433 clean: regions.len() - evictable.len(),
434 }
435 }
436
437 assert_eq!(picker.pick(info(®ions, &evictable)), Some(9));
438
439 assert_eq!(picker.pick(info(®ions, &evictable)), Some(9));
440 evictable.remove(&9);
441 assert_eq!(picker.pick(info(®ions, &evictable)), Some(8));
442 evictable.remove(&8);
443 assert_eq!(picker.pick(info(®ions, &evictable)), Some(7));
444 evictable.remove(&7);
445 assert_eq!(picker.pick(info(®ions, &evictable)), Some(6));
446 evictable.remove(&6);
447 assert_eq!(picker.pick(info(®ions, &evictable)), Some(5));
448 evictable.remove(&5);
449
450 assert_eq!(picker.pick(info(®ions, &evictable)), None);
451 assert_eq!(picker.pick(info(®ions, &evictable)), None);
452 }
453}