1use std::{
16 collections::{HashSet, VecDeque},
17 mem::offset_of,
18 sync::{
19 atomic::{AtomicU8, Ordering},
20 Arc,
21 },
22};
23
24use foyer_common::{
25 code::{Key, Value},
26 error::{Error, ErrorKind, Result},
27 properties::Properties,
28 strict_assert, strict_assert_eq,
29};
30use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
31use serde::{Deserialize, Serialize};
32
33use super::{Eviction, Op};
34use crate::record::Record;
35
/// Configuration for the S3-FIFO eviction policy implemented by `S3Fifo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct S3FifoConfig {
    /// Fraction of the total capacity assigned to the small queue.
    ///
    /// The total capacity is multiplied by this ratio to obtain the small queue's weight
    /// capacity. `S3Fifo::new` asserts that the value lies in the open interval (0, 1).
    pub small_queue_capacity_ratio: f64,
    /// Ratio of the ghost queue capacity to the total capacity.
    ///
    /// The total capacity is multiplied by this ratio to obtain the weight capacity of the ghost
    /// queue, which remembers the hashes of records evicted from the small queue.
    pub ghost_queue_capacity_ratio: f64,
    /// Minimum access frequency at which a record leaving the small queue is moved to the main
    /// queue instead of being evicted.
    ///
    /// `S3Fifo::new` clamps this value to `S3FifoState::MAX_FREQUENCY`.
    pub small_to_main_freq_threshold: u8,
}
46
47impl Default for S3FifoConfig {
48 fn default() -> Self {
49 Self {
50 small_queue_capacity_ratio: 0.1,
51 ghost_queue_capacity_ratio: 1.0,
52 small_to_main_freq_threshold: 1,
53 }
54 }
55}
56
/// Which of the policy's queues a record is currently linked into.
#[derive(Debug, PartialEq, Eq, Default)]
enum Queue {
    /// The record is not linked into any queue (the default for a fresh state).
    #[default]
    None,
    /// The record is linked into the main queue.
    Main,
    /// The record is linked into the small queue.
    Small,
}
64
/// Per-record state kept by the S3-FIFO policy.
#[derive(Debug, Default)]
pub struct S3FifoState {
    /// Intrusive list link; the `Adapter` locates it through `offset_of!(S3FifoState, link)`.
    link: LinkedListAtomicLink,
    /// Access frequency counter, kept within `0..=MAX_FREQUENCY` by `inc_frequency`.
    frequency: AtomicU8,
    /// Queue the record is currently linked into, or `Queue::None`.
    queue: Queue,
}
72
73impl S3FifoState {
74 const MAX_FREQUENCY: u8 = 3;
75
76 fn frequency(&self) -> u8 {
77 self.frequency.load(Ordering::Acquire)
78 }
79
80 fn set_frequency(&self, val: u8) {
81 self.frequency.store(val, Ordering::Release)
82 }
83
84 fn inc_frequency(&self) -> u8 {
85 self.frequency
86 .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
87 Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
88 })
89 .unwrap()
90 }
91
92 fn dec_frequency(&self) -> u8 {
93 self.frequency
94 .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
95 .unwrap()
96 }
97}
98
// Intrusive linked-list adapter for `Arc<Record<S3Fifo<K, V, P>>>`. The list link is the `link`
// field of the record's `S3FifoState`, found at `Record::STATE_OFFSET` plus the field's offset
// inside `S3FifoState`.
intrusive_adapter! { Adapter<K, V, P> = Arc<Record<S3Fifo<K, V, P>>>: Record<S3Fifo<K, V, P>> { ?offset = Record::<S3Fifo<K, V, P>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
100
/// S3-FIFO eviction policy: a small queue, a main queue, and a ghost queue of recently evicted
/// hashes.
pub struct S3Fifo<K, V, P>
where
    K: Key,
    V: Value,
    P: Properties,
{
    /// Hashes (with weights) of records evicted from the small queue without being promoted.
    ghost_queue: GhostQueue,
    /// Queue that newly pushed records enter unless their hash is in the ghost queue.
    small_queue: LinkedList<Adapter<K, V, P>>,
    /// Queue holding records promoted from the small queue or re-admitted via the ghost queue.
    main_queue: LinkedList<Adapter<K, V, P>>,

    /// Weight budget of the small queue; `evict` drains the small queue first while
    /// `small_weight` exceeds this value.
    small_weight_capacity: usize,

    /// Total weight of the records currently in the small queue.
    small_weight: usize,
    /// Total weight of the records currently in the main queue.
    main_weight: usize,

    /// Promotion threshold, i.e. the configured value clamped to `S3FifoState::MAX_FREQUENCY`.
    small_to_main_freq_threshold: u8,

    /// Configuration currently in effect; `update` reads the capacity ratios from here.
    config: S3FifoConfig,
}
120
121impl<K, V, P> S3Fifo<K, V, P>
122where
123 K: Key,
124 V: Value,
125 P: Properties,
126{
127 fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
128 if self.small_weight > self.small_weight_capacity {
130 if let Some(record) = self.evict_small() {
131 return Some(record);
132 }
133 }
134 if let Some(record) = self.evict_main() {
135 return Some(record);
136 }
137 self.evict_small_force()
138 }
139
140 #[expect(clippy::never_loop)]
141 fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
142 while let Some(record) = self.small_queue.pop_front() {
143 let state = unsafe { &mut *record.state().get() };
144 state.queue = Queue::None;
145 state.set_frequency(0);
146 self.small_weight -= record.weight();
147 return Some(record);
148 }
149 None
150 }
151
152 fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
153 while let Some(record) = self.small_queue.pop_front() {
154 let state = unsafe { &mut *record.state().get() };
155 if state.frequency() >= self.small_to_main_freq_threshold {
156 state.queue = Queue::Main;
157 self.small_weight -= record.weight();
158 self.main_weight += record.weight();
159 self.main_queue.push_back(record);
160 } else {
161 state.queue = Queue::None;
162 state.set_frequency(0);
163 self.small_weight -= record.weight();
164
165 self.ghost_queue.push(record.hash(), record.weight());
166
167 return Some(record);
168 }
169 }
170 None
171 }
172
173 fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
174 while let Some(record) = self.main_queue.pop_front() {
175 let state = unsafe { &mut *record.state().get() };
176 if state.dec_frequency() > 0 {
177 self.main_queue.push_back(record);
178 } else {
179 state.queue = Queue::None;
180 self.main_weight -= record.weight();
181 return Some(record);
182 }
183 }
184 None
185 }
186}
187
188impl<K, V, P> Eviction for S3Fifo<K, V, P>
189where
190 K: Key,
191 V: Value,
192 P: Properties,
193{
194 type Config = S3FifoConfig;
195 type Key = K;
196 type Value = V;
197 type Properties = P;
198 type State = S3FifoState;
199
200 fn new(capacity: usize, config: &Self::Config) -> Self
201 where
202 Self: Sized,
203 {
204 assert!(
205 config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
206 "small_queue_capacity_ratio must be in (0, 1), given: {}",
207 config.small_queue_capacity_ratio
208 );
209
210 let config = config.clone();
211
212 let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
213 let ghost_queue = GhostQueue::new(ghost_queue_capacity);
214 let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
215
216 Self {
217 ghost_queue,
218 small_queue: LinkedList::new(Adapter::new()),
219 main_queue: LinkedList::new(Adapter::new()),
220 small_weight_capacity,
221 small_weight: 0,
222 main_weight: 0,
223 small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
224 config,
225 }
226 }
227
228 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
229 if let Some(config) = config {
230 if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
231 return Err(
232 Error::new(ErrorKind::Config, "update S3-FIFO config failed").with_context(
233 "reason",
234 format!(
235 "small_queue_capacity_ratio must be in (0, 1), given: {}",
236 config.small_queue_capacity_ratio
237 ),
238 ),
239 );
240 }
241 self.config = config.clone();
242 }
243
244 let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
245 let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
246 self.ghost_queue.update(ghost_queue_capacity);
247 self.small_weight_capacity = small_weight_capacity;
248
249 Ok(())
250 }
251
252 fn push(&mut self, record: Arc<Record<Self>>) {
253 let state = unsafe { &mut *record.state().get() };
254
255 strict_assert_eq!(state.frequency(), 0);
256 strict_assert_eq!(state.queue, Queue::None);
257
258 record.set_in_eviction(true);
259
260 if self.ghost_queue.contains(record.hash()) {
261 state.queue = Queue::Main;
262 self.main_weight += record.weight();
263 self.main_queue.push_back(record);
264 } else {
265 state.queue = Queue::Small;
266 self.small_weight += record.weight();
267 self.small_queue.push_back(record);
268 }
269 }
270
271 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
272 if let Some(record) = self.evict() {
273 record.set_in_eviction(false);
275 Some(record)
276 } else {
277 strict_assert!(self.small_queue.is_empty());
278 strict_assert!(self.main_queue.is_empty());
279 None
280 }
281 }
282
283 fn remove(&mut self, record: &Arc<Record<Self>>) {
284 let state = unsafe { &mut *record.state().get() };
285
286 match state.queue {
287 Queue::None => unreachable!(),
288 Queue::Main => {
289 unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
290
291 state.queue = Queue::None;
292 state.set_frequency(0);
293 record.set_in_eviction(false);
294
295 self.main_weight -= record.weight();
296 }
297 Queue::Small => {
298 unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
299
300 state.queue = Queue::None;
301 state.set_frequency(0);
302 record.set_in_eviction(false);
303
304 self.small_weight -= record.weight();
305 }
306 }
307 }
308
309 fn acquire() -> Op<Self> {
310 Op::immutable(|_: &Self, record| {
311 let state = unsafe { &mut *record.state().get() };
312 state.inc_frequency();
313 })
314 }
315
316 fn release() -> Op<Self> {
317 Op::noop()
318 }
319}
320
/// Weight-bounded FIFO that remembers the hashes of recently evicted records.
///
/// Membership is tracked in a set, so if the same hash is pushed twice, popping the older entry
/// also makes `contains` forget the newer one.
struct GhostQueue {
    /// Hashes currently remembered, used for membership tests.
    counts: HashSet<u64>,
    /// `(hash, weight)` entries in insertion order; the front is the oldest.
    queue: VecDeque<(u64, usize)>,
    /// Maximum total weight; zero disables the queue.
    capacity: usize,
    /// Sum of the weights of all entries in `queue`.
    weight: usize,
}

impl GhostQueue {
    /// Creates an empty ghost queue that may hold up to `capacity` total weight.
    fn new(capacity: usize) -> Self {
        Self {
            counts: HashSet::default(),
            queue: VecDeque::new(),
            capacity,
            weight: 0,
        }
    }

    /// Changes the capacity, dropping the oldest entries until the queue fits again.
    ///
    /// A capacity of zero disables the queue and forgets everything it held. Without this,
    /// stale hashes would stay visible to `contains` forever, because `push` is a no-op (and so
    /// never ages anything out) while the queue is disabled.
    fn update(&mut self, capacity: usize) {
        self.capacity = capacity;
        if self.capacity == 0 {
            self.queue.clear();
            self.counts.clear();
            self.weight = 0;
            return;
        }
        while self.weight > self.capacity && self.weight > 0 {
            self.pop();
        }
    }

    /// Remembers `hash` with the given `weight`, first dropping the oldest entries to make room.
    ///
    /// Does nothing while the queue is disabled (capacity zero). An entry heavier than the whole
    /// capacity empties the queue and is then stored on its own.
    fn push(&mut self, hash: u64, weight: usize) {
        if self.capacity == 0 {
            return;
        }
        // `saturating_add` keeps the capacity check from overflowing on extreme weights.
        while self.weight > 0 && self.weight.saturating_add(weight) > self.capacity {
            self.pop();
        }
        self.queue.push_back((hash, weight));
        self.counts.insert(hash);
        self.weight += weight;
    }

    /// Drops the oldest entry, if any.
    fn pop(&mut self) {
        if let Some((hash, weight)) = self.queue.pop_front() {
            self.weight -= weight;
            self.counts.remove(&hash);
        }
    }

    /// Returns whether `hash` is currently remembered.
    fn contains(&self, hash: u64) -> bool {
        self.counts.contains(&hash)
    }
}
372
// Unit tests for the S3-FIFO policy.
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use itertools::Itertools;

    use super::*;
    use crate::{
        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
        record::Data,
    };

    // Test-only snapshot of the policy: `[small queue contents, main queue contents]`, each in
    // queue order from front to back.
    impl<K, V> Dump for S3Fifo<K, V, TestProperties>
    where
        K: Key + Clone,
        V: Value + Clone,
    {
        type Output = Vec<Vec<Arc<Record<Self>>>>;

        fn dump(&self) -> Self::Output {
            let mut small = vec![];
            let mut main = vec![];

            // Advance the cursor before each read; stop once `clone_pointer` yields `None`.
            let mut cursor = self.small_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => small.push(record),
                    None => break,
                }
            }

            let mut cursor = self.main_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => main.push(record),
                    None => break,
                }
            }

            vec![small, main]
        }
    }

    type TestS3Fifo = S3Fifo<u64, u64, TestProperties>;

    // Asserts that every record in `rs[range]` has access frequency `count`.
    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
        rs[range]
            .iter()
            .for_each(|r| assert_eq!(unsafe { &*r.state().get() }.frequency(), count));
    }

    #[test]
    fn test_s3fifo() {
        // 100 records of weight 1 whose key, value and hash all equal their index.
        let rs = (0..100)
            .map(|i| {
                Arc::new(Record::new(Data {
                    key: i,
                    value: i,
                    properties: TestProperties::default(),
                    hash: i,
                    weight: 1,
                }))
            })
            .collect_vec();
        let r = |i: usize| rs[i].clone();

        // Capacity 8 with a 0.25 small-queue ratio gives a small queue budget of 2.
        let config = S3FifoConfig {
            small_queue_capacity_ratio: 0.25,
            ghost_queue_capacity_ratio: 10.0,
            small_to_main_freq_threshold: 2,
        };
        let mut s3fifo = TestS3Fifo::new(8, &config);

        assert_eq!(s3fifo.small_weight_capacity, 2);

        // The ghost queue is empty, so every pushed record lands in the small queue.
        s3fifo.push(r(0));
        s3fifo.push(r(1));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1)], vec![]]);

        s3fifo.push(r(2));
        s3fifo.push(r(3));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);

        assert_frequencies(&rs, 0..4, 0);

        // Access records 0..4 once, then records 1 and 2 a second time.
        (0..4).for_each(|i| s3fifo.acquire_immutable(&rs[i]));
        s3fifo.acquire_immutable(&rs[1]);
        s3fifo.acquire_immutable(&rs[2]);
        assert_frequencies(&rs, 0..1, 1);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 1);

        // The small queue is over budget: record 0 (frequency 1 < 2) is evicted first; the next
        // pop promotes records 1 and 2 (frequency 2) to the main queue and evicts record 3.
        let r0 = s3fifo.pop().unwrap();
        let r3 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[0], &r0);
        assert_ptr_eq(&rs[3], &r3);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
        assert_frequencies(&rs, 0..1, 0);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 0);

        // The main queue cycles records 1 and 2 until their frequencies reach zero, then evicts
        // record 1, leaving record 2 with frequency 0.
        let r1 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[1], &r1);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(2)]]);
        assert_frequencies(&rs, 0..4, 0);

        s3fifo.clear();
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![]]);
    }
}