1use std::{
16 collections::{HashSet, VecDeque},
17 mem::offset_of,
18 sync::{
19 atomic::{AtomicU8, Ordering},
20 Arc,
21 },
22};
23
24use foyer_common::{
25 code::{Key, Value},
26 properties::Properties,
27 strict_assert, strict_assert_eq,
28};
29use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
30use serde::{Deserialize, Serialize};
31
32use super::{Eviction, Op};
33use crate::{
34 error::{Error, Result},
35 record::Record,
36};
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct S3FifoConfig {
41 pub small_queue_capacity_ratio: f64,
43 pub ghost_queue_capacity_ratio: f64,
45 pub small_to_main_freq_threshold: u8,
47}
48
49impl Default for S3FifoConfig {
50 fn default() -> Self {
51 Self {
52 small_queue_capacity_ratio: 0.1,
53 ghost_queue_capacity_ratio: 1.0,
54 small_to_main_freq_threshold: 1,
55 }
56 }
57}
58
/// The queue a record is currently linked into.
///
/// The default is `None`, i.e. a fresh record is not in any queue.
#[derive(Debug, Default, PartialEq, Eq)]
enum Queue {
    /// Not linked into any queue.
    #[default]
    None,
    /// Linked into the main queue.
    Main,
    /// Linked into the small queue.
    Small,
}
71
72#[derive(Debug, Default)]
74pub struct S3FifoState {
75 link: LinkedListAtomicLink,
76 frequency: AtomicU8,
77 queue: Queue,
78}
79
80impl S3FifoState {
81 const MAX_FREQUENCY: u8 = 3;
82
83 fn frequency(&self) -> u8 {
84 self.frequency.load(Ordering::Acquire)
85 }
86
87 fn set_frequency(&self, val: u8) {
88 self.frequency.store(val, Ordering::Release)
89 }
90
91 fn inc_frequency(&self) -> u8 {
92 self.frequency
93 .fetch_update(Ordering::Release, Ordering::Acquire, |v| {
94 Some(std::cmp::min(Self::MAX_FREQUENCY, v + 1))
95 })
96 .unwrap()
97 }
98
99 fn dec_frequency(&self) -> u8 {
100 self.frequency
101 .fetch_update(Ordering::Release, Ordering::Acquire, |v| Some(v.saturating_sub(1)))
102 .unwrap()
103 }
104}
105
106intrusive_adapter! { Adapter<K, V, P> = Arc<Record<S3Fifo<K, V, P>>>: Record<S3Fifo<K, V, P>> { ?offset = Record::<S3Fifo<K, V, P>>::STATE_OFFSET + offset_of!(S3FifoState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
107
108pub struct S3Fifo<K, V, P>
109where
110 K: Key,
111 V: Value,
112 P: Properties,
113{
114 ghost_queue: GhostQueue,
115 small_queue: LinkedList<Adapter<K, V, P>>,
116 main_queue: LinkedList<Adapter<K, V, P>>,
117
118 small_weight_capacity: usize,
119
120 small_weight: usize,
121 main_weight: usize,
122
123 small_to_main_freq_threshold: u8,
124
125 config: S3FifoConfig,
126}
127
128impl<K, V, P> S3Fifo<K, V, P>
129where
130 K: Key,
131 V: Value,
132 P: Properties,
133{
134 fn evict(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
135 if self.small_weight > self.small_weight_capacity {
137 if let Some(record) = self.evict_small() {
138 return Some(record);
139 }
140 }
141 if let Some(record) = self.evict_main() {
142 return Some(record);
143 }
144 self.evict_small_force()
145 }
146
147 #[expect(clippy::never_loop)]
148 fn evict_small_force(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
149 while let Some(record) = self.small_queue.pop_front() {
150 let state = unsafe { &mut *record.state().get() };
151 state.queue = Queue::None;
152 state.set_frequency(0);
153 self.small_weight -= record.weight();
154 return Some(record);
155 }
156 None
157 }
158
159 fn evict_small(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
160 while let Some(record) = self.small_queue.pop_front() {
161 let state = unsafe { &mut *record.state().get() };
162 if state.frequency() >= self.small_to_main_freq_threshold {
163 state.queue = Queue::Main;
164 self.small_weight -= record.weight();
165 self.main_weight += record.weight();
166 self.main_queue.push_back(record);
167 } else {
168 state.queue = Queue::None;
169 state.set_frequency(0);
170 self.small_weight -= record.weight();
171
172 self.ghost_queue.push(record.hash(), record.weight());
173
174 return Some(record);
175 }
176 }
177 None
178 }
179
180 fn evict_main(&mut self) -> Option<Arc<Record<S3Fifo<K, V, P>>>> {
181 while let Some(record) = self.main_queue.pop_front() {
182 let state = unsafe { &mut *record.state().get() };
183 if state.dec_frequency() > 0 {
184 self.main_queue.push_back(record);
185 } else {
186 state.queue = Queue::None;
187 self.main_weight -= record.weight();
188 return Some(record);
189 }
190 }
191 None
192 }
193}
194
195impl<K, V, P> Eviction for S3Fifo<K, V, P>
196where
197 K: Key,
198 V: Value,
199 P: Properties,
200{
201 type Config = S3FifoConfig;
202 type Key = K;
203 type Value = V;
204 type Properties = P;
205 type State = S3FifoState;
206
207 fn new(capacity: usize, config: &Self::Config) -> Self
208 where
209 Self: Sized,
210 {
211 assert!(
212 config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0,
213 "small_queue_capacity_ratio must be in (0, 1), given: {}",
214 config.small_queue_capacity_ratio
215 );
216
217 let config = config.clone();
218
219 let ghost_queue_capacity = (capacity as f64 * config.ghost_queue_capacity_ratio) as usize;
220 let ghost_queue = GhostQueue::new(ghost_queue_capacity);
221 let small_weight_capacity = (capacity as f64 * config.small_queue_capacity_ratio) as usize;
222
223 Self {
224 ghost_queue,
225 small_queue: LinkedList::new(Adapter::new()),
226 main_queue: LinkedList::new(Adapter::new()),
227 small_weight_capacity,
228 small_weight: 0,
229 main_weight: 0,
230 small_to_main_freq_threshold: config.small_to_main_freq_threshold.min(S3FifoState::MAX_FREQUENCY),
231 config,
232 }
233 }
234
235 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
236 if let Some(config) = config {
237 if config.small_queue_capacity_ratio > 0.0 && config.small_queue_capacity_ratio < 1.0 {
238 return Err(Error::ConfigError(format!(
239 "small_queue_capacity_ratio must be in (0, 1), given: {}",
240 config.small_queue_capacity_ratio
241 )));
242 }
243 self.config = config.clone();
244 }
245
246 let ghost_queue_capacity = (capacity as f64 * self.config.ghost_queue_capacity_ratio) as usize;
247 let small_weight_capacity = (capacity as f64 * self.config.small_queue_capacity_ratio) as usize;
248 self.ghost_queue.update(ghost_queue_capacity);
249 self.small_weight_capacity = small_weight_capacity;
250
251 Ok(())
252 }
253
254 fn push(&mut self, record: Arc<Record<Self>>) {
255 let state = unsafe { &mut *record.state().get() };
256
257 strict_assert_eq!(state.frequency(), 0);
258 strict_assert_eq!(state.queue, Queue::None);
259
260 record.set_in_eviction(true);
261
262 if self.ghost_queue.contains(record.hash()) {
263 state.queue = Queue::Main;
264 self.main_weight += record.weight();
265 self.main_queue.push_back(record);
266 } else {
267 state.queue = Queue::Small;
268 self.small_weight += record.weight();
269 self.small_queue.push_back(record);
270 }
271 }
272
273 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
274 if let Some(record) = self.evict() {
275 record.set_in_eviction(false);
277 Some(record)
278 } else {
279 strict_assert!(self.small_queue.is_empty());
280 strict_assert!(self.main_queue.is_empty());
281 None
282 }
283 }
284
285 fn remove(&mut self, record: &Arc<Record<Self>>) {
286 let state = unsafe { &mut *record.state().get() };
287
288 match state.queue {
289 Queue::None => unreachable!(),
290 Queue::Main => {
291 unsafe { self.main_queue.remove_from_ptr(Arc::as_ptr(record)) };
292
293 state.queue = Queue::None;
294 state.set_frequency(0);
295 record.set_in_eviction(false);
296
297 self.main_weight -= record.weight();
298 }
299 Queue::Small => {
300 unsafe { self.small_queue.remove_from_ptr(Arc::as_ptr(record)) };
301
302 state.queue = Queue::None;
303 state.set_frequency(0);
304 record.set_in_eviction(false);
305
306 self.small_weight -= record.weight();
307 }
308 }
309 }
310
311 fn acquire() -> Op<Self> {
312 Op::immutable(|_: &Self, record| {
313 let state = unsafe { &mut *record.state().get() };
314 state.inc_frequency();
315 })
316 }
317
318 fn release() -> Op<Self> {
319 Op::noop()
320 }
321}
322
/// Bounded FIFO of `(hash, weight)` pairs for records recently evicted from the
/// small queue, with O(1) membership lookup by hash.
///
/// A capacity of 0 disables the queue: nothing is stored and `contains` is always false.
struct GhostQueue {
    /// Hashes present in `queue`, used for membership checks.
    ///
    /// NOTE(review): if the same hash is pushed twice, popping the older entry
    /// removes it from this set while the newer entry is still queued. This is
    /// presumably rare (hash collisions) — confirm it is acceptable.
    counts: HashSet<u64>,
    /// Entries in insertion order; the front is dropped first.
    queue: VecDeque<(u64, usize)>,
    /// Maximum total weight of the entries.
    capacity: usize,
    /// Sum of the weights in `queue`.
    weight: usize,
}

impl GhostQueue {
    /// Creates an empty ghost queue with the given weight capacity.
    fn new(capacity: usize) -> Self {
        Self {
            counts: HashSet::default(),
            queue: VecDeque::new(),
            capacity,
            weight: 0,
        }
    }

    /// Changes the weight capacity, dropping the oldest entries until within it.
    ///
    /// A capacity of 0 discards every entry. Otherwise stale hashes would keep
    /// matching `contains` forever, since `push` stores nothing while disabled.
    fn update(&mut self, capacity: usize) {
        self.capacity = capacity;
        if self.capacity == 0 {
            self.queue.clear();
            self.counts.clear();
            self.weight = 0;
            return;
        }
        while self.weight > self.capacity && self.weight > 0 {
            self.pop();
        }
    }

    /// Appends an entry, first dropping the oldest entries to make room.
    ///
    /// Does nothing when the capacity is 0. An entry heavier than the whole capacity
    /// is still stored (after the queue has been emptied).
    fn push(&mut self, hash: u64, weight: usize) {
        if self.capacity == 0 {
            return;
        }
        while self.weight + weight > self.capacity && self.weight > 0 {
            self.pop();
        }
        self.queue.push_back((hash, weight));
        self.counts.insert(hash);
        self.weight += weight;
    }

    /// Drops the oldest entry, if any.
    fn pop(&mut self) {
        if let Some((hash, weight)) = self.queue.pop_front() {
            self.weight -= weight;
            self.counts.remove(&hash);
        }
    }

    /// Whether `hash` is currently remembered.
    fn contains(&self, hash: u64) -> bool {
        self.counts.contains(&hash)
    }
}
374
#[cfg(test)]
mod tests {
    use std::ops::Range;

    use itertools::Itertools;

    use super::*;
    use crate::{
        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
        record::Data,
    };

    // Snapshot of the container as `[small, main]`, each listed from front to back.
    impl<K, V> Dump for S3Fifo<K, V, TestProperties>
    where
        K: Key + Clone,
        V: Value + Clone,
    {
        type Output = Vec<Vec<Arc<Record<Self>>>>;

        fn dump(&self) -> Self::Output {
            let mut small = vec![];
            let mut main = vec![];

            // The cursor starts before the front, so advance before reading;
            // `clone_pointer` yields `None` once the walk passes the back.
            let mut cursor = self.small_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => small.push(record),
                    None => break,
                }
            }

            let mut cursor = self.main_queue.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => main.push(record),
                    None => break,
                }
            }

            vec![small, main]
        }
    }

    type TestS3Fifo = S3Fifo<u64, u64, TestProperties>;

    // Asserts that every record in `rs[range]` currently has access frequency `count`.
    fn assert_frequencies(rs: &[Arc<Record<TestS3Fifo>>], range: Range<usize>, count: u8) {
        rs[range]
            .iter()
            .for_each(|r| assert_eq!(unsafe { &*r.state().get() }.frequency(), count));
    }

    #[test]
    fn test_s3fifo() {
        // 100 records with key = value = hash = i and weight 1.
        let rs = (0..100)
            .map(|i| {
                Arc::new(Record::new(Data {
                    key: i,
                    value: i,
                    properties: TestProperties::default(),
                    hash: i,
                    weight: 1,
                }))
            })
            .collect_vec();
        let r = |i: usize| rs[i].clone();

        // Capacity 8 gives a small-queue budget of 8 * 0.25 = 2; promotion to main
        // requires frequency >= 2.
        let config = S3FifoConfig {
            small_queue_capacity_ratio: 0.25,
            ghost_queue_capacity_ratio: 10.0,
            small_to_main_freq_threshold: 2,
        };
        let mut s3fifo = TestS3Fifo::new(8, &config);

        assert_eq!(s3fifo.small_weight_capacity, 2);

        // Records not in the ghost queue all enter the small queue in FIFO order.
        s3fifo.push(r(0));
        s3fifo.push(r(1));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1)], vec![]]);

        s3fifo.push(r(2));
        s3fifo.push(r(3));
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![r(0), r(1), r(2), r(3)], vec![]]);

        assert_frequencies(&rs, 0..4, 0);

        // Access pattern: r0 and r3 once, r1 and r2 twice.
        (0..4).for_each(|i| s3fifo.acquire_immutable(&rs[i]));
        s3fifo.acquire_immutable(&rs[1]);
        s3fifo.acquire_immutable(&rs[2]);
        assert_frequencies(&rs, 0..1, 1);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 1);

        // The small queue is over budget on both pops. r0 (frequency 1 < 2) is
        // evicted first. On the second pop, r1 and r2 are moved to main with their
        // frequencies kept, and r3 is evicted.
        let r0 = s3fifo.pop().unwrap();
        let r3 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[0], &r0);
        assert_ptr_eq(&rs[3], &r3);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(1), r(2)]]);
        assert_frequencies(&rs, 0..1, 0);
        assert_frequencies(&rs, 1..3, 2);
        assert_frequencies(&rs, 3..4, 0);

        // The small queue is empty, so main is scanned. r1 and r2 are decremented
        // and re-appended until r1 is found at frequency 0 and evicted, leaving r2
        // at frequency 0.
        let r1 = s3fifo.pop().unwrap();
        assert_ptr_eq(&rs[1], &r1);
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![r(2)]]);
        assert_frequencies(&rs, 0..4, 0);

        s3fifo.clear();
        assert_ptr_vec_vec_eq(s3fifo.dump(), vec![vec![], vec![]]);
    }
}