1use std::{
16 collections::{HashSet, VecDeque},
17 mem::offset_of,
18 sync::{
19 atomic::{AtomicU8, Ordering},
20 Arc,
21 },
22};
23
24use foyer_common::{
25 code::{Key, Value},
26 strict_assert, strict_assert_eq,
27};
28use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
29use serde::{Deserialize, Serialize};
30
31use super::{Eviction, Op};
32use crate::{
33 error::{Error, Result},
34 record::{CacheHint, Record},
35};
36
/// Configuration for the S3-FIFO eviction policy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3FifoConfig {
    /// Fraction of the total capacity (weight) budgeted for the small queue, which receives newly
    /// pushed records whose hash is not in the ghost queue.
    ///
    /// Must lie in the open interval `(0, 1)`; `S3Fifo::new` panics otherwise.
    pub small_queue_capacity_ratio: f64,
    /// Capacity of the ghost queue as a multiple of the total capacity (weight).
    ///
    /// The ghost queue only remembers `(hash, weight)` of records evicted from the small queue.
    pub ghost_queue_capacity_ratio: f64,
    /// Minimum access frequency a small-queue record must have, when it reaches the front of the
    /// small queue, to be moved to the main queue instead of being evicted.
    ///
    /// `S3Fifo::new` clamps it to the maximum tracked frequency (3).
    pub small_to_main_freq_threshold: u8,
}
47
48impl Default for S3FifoConfig {
49 fn default() -> Self {
50 Self {
51 small_queue_capacity_ratio: 0.1,
52 ghost_queue_capacity_ratio: 1.0,
53 small_to_main_freq_threshold: 1,
54 }
55 }
56}
57
58#[derive(Debug, Clone, Default)]
60pub struct S3FifoHint;
61
62impl From<CacheHint> for S3FifoHint {
63 fn from(_: CacheHint) -> Self {
64 S3FifoHint
65 }
66}
67
68impl From<S3FifoHint> for CacheHint {
69 fn from(_: S3FifoHint) -> Self {
70 CacheHint::Normal
71 }
72}
73
/// Which S3-FIFO queue a record's intrusive link is currently in.
///
/// The `Default` (`None`) is derived via `#[default]` rather than hand-written.
#[derive(Debug, Default, PartialEq, Eq)]
enum Queue {
    /// Not linked into either queue.
    #[default]
    None,
    /// Linked into the main queue.
    Main,
    /// Linked into the small queue.
    Small,
}
86
87#[derive(Debug, Default)]
89pub struct S3FifoState {
90 link: LinkedListAtomicLink,
91 frequency: AtomicU8,
92 queue: Queue,
93}
94
95impl S3FifoState {
96 const MAX_FREQUENCY: u8 = 3;
97
98 fn frequency(&self) -> u8 {
99 self.frequency.load(Ordering::Acquire)
100 }
101
102 fn set_frequency(&self, val: u8) {
103 self.frequency.store(val, Ordering::Release)
104 }
105
106 fn inc_frequency(&self) -> u8 {
107 self.frequency
108 .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
109 Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
110 })
111 .unwrap()
112 }
113
114 fn dec_frequency(&self) -> u8 {
115 self.frequency
116 .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
117 .unwrap()
118 }
119}
120
// Intrusive linked-list adapter: lets `LinkedList` hold `Arc<Record<S3Fifo<K, V>>>` by threading
// it through the `link` field of the record's `S3FifoState`. The link's position is computed as
// `Record::STATE_OFFSET` (presumably the byte offset of the eviction state inside `Record` —
// confirm in `record.rs`) plus the offset of `link` within `S3FifoState`.
intrusive_adapter! { Adapter<K, V> = Arc<Record<S3Fifo<K, V>>>: Record<S3Fifo<K, V>> { ?offset = Record::<S3Fifo<K, V>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value }
122
/// S3-FIFO eviction container.
///
/// Newly pushed records enter a small FIFO queue unless their hash is in the ghost queue, in
/// which case they go straight to the main FIFO queue. Records leaving the small queue are either
/// promoted to main (frequency at or above the threshold) or evicted and remembered in the ghost
/// queue.
pub struct S3Fifo<K, V>
where
    K: Key,
    V: Value,
{
    /// `(hash, weight)` of records recently evicted from the small queue.
    ghost_queue: GhostQueue,
    /// FIFO of newly pushed records whose hash was not in the ghost queue.
    small_queue: LinkedList<Adapter<K, V>>,
    /// FIFO of records promoted from the small queue or admitted via a ghost hit.
    main_queue: LinkedList<Adapter<K, V>>,

    /// Weight budget of the small queue; `evict` drains the small queue first once it is exceeded.
    small_weight_capacity: usize,

    /// Total weight of records currently in the small queue.
    small_weight: usize,
    /// Total weight of records currently in the main queue.
    main_weight: usize,

    /// Promotion threshold copied from the config and clamped to `S3FifoState::MAX_FREQUENCY`.
    small_to_main_freq_threshold: u8,

    /// Configuration this instance was created with (or last updated with).
    config: S3FifoConfig,
}
141
142impl<K, V> S3Fifo<K, V>
143where
144 K: Key,
145 V: Value,
146{
147 fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
148 if self.small_weight > self.small_weight_capacity {
150 if let Some(record) = self.evict_small() {
151 return Some(record);
152 }
153 }
154 if let Some(record) = self.evict_main() {
155 return Some(record);
156 }
157 self.evict_small_force()
158 }
159
160 #[expect(clippy::never_loop)]
161 fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
162 while let Some(record) = self.small_queue.pop_front() {
163 let state = unsafe { &mut *record.state().get() };
164 state.queue = Queue::None;
165 state.set_frequency(0);
166 self.small_weight -= record.weight();
167 return Some(record);
168 }
169 None
170 }
171
172 fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
173 while let Some(record) = self.small_queue.pop_front() {
174 let state = unsafe { &mut *record.state().get() };
175 if state.frequency() >= self.small_to_main_freq_threshold {
176 state.queue = Queue::Main;
177 self.small_weight -= record.weight();
178 self.main_weight += record.weight();
179 self.main_queue.push_back(record);
180 } else {
181 state.queue = Queue::None;
182 state.set_frequency(0);
183 self.small_weight -= record.weight();
184
185 self.ghost_queue.push(record.hash(), record.weight());
186
187 return Some(record);
188 }
189 }
190 None
191 }
192
193 fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V>>>> {
194 while let Some(record) = self.main_queue.pop_front() {
195 let state = unsafe { &mut *record.state().get() };
196 if state.dec_frequency() > 0 {
197 self.main_queue.push_back(record);
198 } else {
199 state.queue = Queue::None;
200 self.main_weight -= record.weight();
201 return Some(record);
202 }
203 }
204 None
205 }
206}
207
208impl<K, V> Eviction for S3Fifo<K, V>
209where
210 K: Key,
211 V: Value,
212{
213 type Config = S3FifoConfig;
214 type Key = K;
215 type Value = V;
216 type Hint = S3FifoHint;
217 type State = S3FifoState;
218
219 fn new(capacity: usize, config: &Self::Config) -> Self
220 where
221 Self: Sized,
222 {
223 assert!(
224 config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
225 "small_queue_capacity_ratio must be in (0, 1), given: {}",
226 config.small_queue_capacity_ratio
227 );
228
229 let config = config.clone();
230
231 let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
232 let ghost_queue = GhostQueue::new(ghost_queue_capacity);
233 let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
234
235 Self {
236 ghost_queue,
237 small_queue: LinkedList::new(Adapter::new()),
238 main_queue: LinkedList::new(Adapter::new()),
239 small_weight_capacity,
240 small_weight: 0,
241 main_weight: 0,
242 small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
243 config,
244 }
245 }
246
247 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
248 if let Some(config) = config {
249 if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
250 return Err(Error::ConfigError(format!(
251 "small_queue_capacity_ratio must be in (0, 1), given: {}",
252 config.small_queue_capacity_ratio
253 )));
254 }
255 self.config = config.clone();
256 }
257
258 let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
259 let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
260 self.ghost_queue.update(ghost_queue_capacity);
261 self.small_weight_capacity = small_weight_capacity;
262
263 Ok(())
264 }
265
266 fn push(&mut self, record: Arc<Record<Self>>) {
267 let state = unsafe { &mut *record.state().get() };
268
269 strict_assert_eq!(state.frequency(), 0);
270 strict_assert_eq!(state.queue, Queue::None);
271
272 record.set_in_eviction(true);
273
274 if self.ghost_queue.contains(record.hash()) {
275 state.queue = Queue::Main;
276 self.main_weight += record.weight();
277 self.main_queue.push_back(record);
278 } else {
279 state.queue = Queue::Small;
280 self.small_weight += record.weight();
281 self.small_queue.push_back(record);
282 }
283 }
284
285 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
286 if let Some(record) = self.evict() {
287 record.set_in_eviction(false);
289 Some(record)
290 } else {
291 strict_assert!(self.small_queue.is_empty());
292 strict_assert!(self.main_queue.is_empty());
293 None
294 }
295 }
296
297 fn remove(&mut self, record: &Arc<Record<Self>>) {
298 let state = unsafe { &mut *record.state().get() };
299
300 match state.queue {
301 Queue::None => unreachable!(),
302 Queue::Main => {
303 unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
304
305 state.queue = Queue::None;
306 state.set_frequency(0);
307 record.set_in_eviction(false);
308
309 self.main_weight -= record.weight();
310 }
311 Queue::Small => {
312 unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
313
314 state.queue = Queue::None;
315 state.set_frequency(0);
316 record.set_in_eviction(false);
317
318 self.small_weight -= record.weight();
319 }
320 }
321 }
322
323 fn acquire() -> Op<Self> {
324 Op::immutable(|_: &Self, record| {
325 let state = unsafe { &mut *record.state().get() };
326 state.inc_frequency();
327 })
328 }
329
330 fn release() -> Op<Self> {
331 Op::noop()
332 }
333}
334
/// Bounded FIFO of `(hash, weight)` pairs for records recently evicted from the small queue.
///
/// Only hashes are stored, so records whose hashes collide are indistinguishable. A capacity of
/// `0` disables the queue entirely.
struct GhostQueue {
    /// Hashes currently present in `queue`, for O(1) membership tests.
    counts: HashSet<u64>,
    /// Entries in insertion order; the front is the oldest.
    queue: VecDeque<(u64, usize)>,
    /// Maximum total weight retained; `0` disables the queue.
    capacity: usize,
    /// Sum of the weights of all entries in `queue`.
    weight: usize,
}

impl GhostQueue {
    /// Creates an empty ghost queue with the given weight budget.
    fn new(capacity: usize) -> Self {
        Self {
            counts: HashSet::default(),
            queue: VecDeque::new(),
            capacity,
            weight: 0,
        }
    }

    /// Changes the weight budget, dropping the oldest entries until the queue fits.
    ///
    /// Setting the capacity to `0` disables the queue and forgets every entry. Otherwise stale
    /// hashes would keep reporting ghost hits, because `push` never runs again to age them out.
    fn update(&mut self, capacity: usize) {
        self.capacity = capacity;
        if self.capacity == 0 {
            self.queue.clear();
            self.counts.clear();
            self.weight = 0;
            return;
        }
        while self.weight > self.capacity && self.weight > 0 {
            self.pop();
        }
    }

    /// Remembers `hash` with `weight`, first dropping the oldest entries until it fits.
    ///
    /// This is a no-op while the queue is disabled (capacity `0`). An entry heavier than the
    /// whole capacity is still admitted after the queue is emptied; the next push drops it.
    fn push(&mut self, hash: u64, weight: usize) {
        if self.capacity == 0 {
            return;
        }
        while self.weight + weight > self.capacity && self.weight > 0 {
            self.pop();
        }
        // NOTE(review): if `hash` is already queued, popping the older entry later removes it
        // from `counts` while this newer entry is still queued, so `contains` turns false early.
        self.queue.push_back((hash, weight));
        self.counts.insert(hash);
        self.weight += weight;
    }

    /// Drops the oldest entry, if any.
    fn pop(&mut self) {
        if let Some((hash, weight)) = self.queue.pop_front() {
            self.weight -= weight;
            self.counts.remove(&hash);
        }
    }

    /// Returns whether `hash` is currently remembered.
    fn contains(&self, hash: u64) -> bool {
        self.counts.contains(&hash)
    }
}
386
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use itertools::Itertools;

    use super::*;
    use crate::{
        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt},
        record::Data,
    };

    impl<K, V> Dump for S3Fifo<K, V>
    where
        K: Key + Clone,
        V: Value + Clone,
    {
        type Output = Vec<Vec<Arc<Record<Self>>>>;

        /// Snapshots both queues front-to-back as `[small, main]`.
        fn dump(&self) -> Self::Output {
            let snapshot = |list: &LinkedList<Adapter<K, V>>| {
                let mut records = Vec::new();
                let mut cursor = list.cursor();
                cursor.move_next();
                while let Some(record) = cursor.clone_pointer() {
                    records.push(record);
                    cursor.move_next();
                }
                records
            };

            vec![snapshot(&self.small_queue), snapshot(&self.main_queue)]
        }
    }

    type TestS3Fifo = S3Fifo<u64, u64>;

    /// Asserts that every record in `rs[range]` has access frequency `count`.
    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
        for record in &rs[range] {
            let state = unsafe { &*record.state().get() };
            assert_eq!(state.frequency(), count);
        }
    }

    #[test]
    fn test_s3fifo() {
        let records = (0..100)
            .map(|i| {
                Arc::new(Record::new(Data {
                    key: i,
                    value: i,
                    hint: S3FifoHint,
                    hash: i,
                    weight: 1,
                }))
            })
            .collect_vec();
        let r = |i: usize| Arc::clone(&records[i]);

        let config = S3FifoConfig {
            small_queue_capacity_ratio: 0.25,
            ghost_queue_capacity_ratio: 10.0,
            small_to_main_freq_threshold: 2,
        };
        let mut fifo = TestS3Fifo::new(8, &config);

        // 8 * 0.25 = 2.
        assert_eq!(fifo.small_weight_capacity, 2);

        // Records without ghost hits enter the small queue in FIFO order.
        fifo.push(r(0));
        fifo.push(r(1));
        assert_ptr_vec_vec_eq(fifo.dump(), vec![vec![r(0), r(1)], vec![]]);

        fifo.push(r(2));
        fifo.push(r(3));
        assert_ptr_vec_vec_eq(fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);

        assert_frequencies(&records, 0..4, 0);

        // Touch every record once, then records 1 and 2 a second time.
        for record in &records[..4] {
            fifo.acquire_immutable(record);
        }
        fifo.acquire_immutable(&records[1]);
        fifo.acquire_immutable(&records[2]);
        assert_frequencies(&records, 0..1, 1);
        assert_frequencies(&records, 1..3, 2);
        assert_frequencies(&records, 3..4, 1);

        // The small queue is over budget: 0 and 3 are below the threshold and get evicted, while
        // 1 and 2 are promoted to main with their frequency intact.
        let first = fifo.pop().unwrap();
        let second = fifo.pop().unwrap();
        assert_ptr_eq(&records[0], &first);
        assert_ptr_eq(&records[3], &second);
        assert_ptr_vec_vec_eq(fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
        assert_frequencies(&records, 0..1, 0);
        assert_frequencies(&records, 1..3, 2);
        assert_frequencies(&records, 3..4, 0);

        // The second-chance scan drains both counters; record 1 reaches zero first.
        let third = fifo.pop().unwrap();
        assert_ptr_eq(&records[1], &third);
        assert_ptr_vec_vec_eq(fifo.dump(), vec![vec![], vec![r(2)]]);
        assert_frequencies(&records, 0..4, 0);

        fifo.clear();
        assert_ptr_vec_vec_eq(fifo.dump(), vec![vec![], vec![]]);
    }
}