1use std::{mem::offset_of, sync::Arc};
16
17use cmsketch::CMSketchU16;
18use foyer_common::{
19 code::{Key, Value},
20 strict_assert, strict_assert_eq, strict_assert_ne,
21};
22use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
23use serde::{Deserialize, Serialize};
24
25use super::{Eviction, Op};
26use crate::{
27 error::{Error, Result},
28 record::{CacheHint, Record},
29};
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
33pub struct LfuConfig {
34 pub window_capacity_ratio: f64,
40 pub protected_capacity_ratio: f64,
46
47 pub cmsketch_eps: f64,
51
52 pub cmsketch_confidence: f64,
56}
57
58impl Default for LfuConfig {
59 fn default() -> Self {
60 Self {
61 window_capacity_ratio: 0.1,
62 protected_capacity_ratio: 0.8,
63 cmsketch_eps: 0.001,
64 cmsketch_confidence: 0.9,
65 }
66 }
67}
68
69#[derive(Debug, Clone, Default)]
71pub struct LfuHint;
72
73impl From<CacheHint> for LfuHint {
74 fn from(_: CacheHint) -> Self {
75 LfuHint
76 }
77}
78
79impl From<LfuHint> for CacheHint {
80 fn from(_: LfuHint) -> Self {
81 CacheHint::Normal
82 }
83}
84
85#[derive(Debug, PartialEq, Eq, Clone, Copy)]
86enum Queue {
87 None,
88 Window,
89 Probation,
90 Protected,
91}
92
93impl Default for Queue {
94 fn default() -> Self {
95 Self::None
96 }
97}
98
99#[derive(Debug, Default)]
101pub struct LfuState {
102 link: LinkedListAtomicLink,
103 queue: Queue,
104}
105
106intrusive_adapter! { Adapter<K, V> = Arc<Record<Lfu<K, V>>>: Record<Lfu<K, V>> { ?offset = Record::<Lfu<K, V>>::STATE_OFFSET + offset_of!(LfuState, link) => LinkedListAtomicLink } where K: Key, V: Value }
107
108pub struct Lfu<K, V>
121where
122 K: Key,
123 V: Value,
124{
125 window: LinkedList<Adapter<K, V>>,
126 probation: LinkedList<Adapter<K, V>>,
127 protected: LinkedList<Adapter<K, V>>,
128
129 window_weight: usize,
130 probation_weight: usize,
131 protected_weight: usize,
132
133 window_weight_capacity: usize,
134 protected_weight_capacity: usize,
135
136 frequencies: CMSketchU16,
138
139 step: usize,
140 decay: usize,
141
142 config: LfuConfig,
143}
144
145impl<K, V> Lfu<K, V>
146where
147 K: Key,
148 V: Value,
149{
150 fn increase_queue_weight(&mut self, queue: Queue, weight: usize) {
151 match queue {
152 Queue::None => unreachable!(),
153 Queue::Window => self.window_weight += weight,
154 Queue::Probation => self.probation_weight += weight,
155 Queue::Protected => self.protected_weight += weight,
156 }
157 }
158
159 fn decrease_queue_weight(&mut self, queue: Queue, weight: usize) {
160 match queue {
161 Queue::None => unreachable!(),
162 Queue::Window => self.window_weight -= weight,
163 Queue::Probation => self.probation_weight -= weight,
164 Queue::Protected => self.protected_weight -= weight,
165 }
166 }
167
168 fn update_frequencies(&mut self, hash: u64) {
169 self.frequencies.inc(hash);
170 self.step += 1;
171 if self.step >= self.decay {
172 self.step >>= 1;
173 self.frequencies.halve();
174 }
175 }
176}
177
178impl<K, V> Eviction for Lfu<K, V>
179where
180 K: Key,
181 V: Value,
182{
183 type Config = LfuConfig;
184 type Key = K;
185 type Value = V;
186 type Hint = LfuHint;
187 type State = LfuState;
188
189 fn new(capacity: usize, config: &Self::Config) -> Self
190 where
191 Self: Sized,
192 {
193 assert!(
194 config.window_capacity_ratio > 0.0 && config.window_capacity_ratio < 1.0,
195 "window_capacity_ratio must be in (0, 1), given: {}",
196 config.window_capacity_ratio
197 );
198
199 assert!(
200 config.protected_capacity_ratio > 0.0 && config.protected_capacity_ratio < 1.0,
201 "protected_capacity_ratio must be in (0, 1), given: {}",
202 config.protected_capacity_ratio
203 );
204
205 assert!(
206 config.window_capacity_ratio + config.protected_capacity_ratio < 1.0,
207 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}",
208 config.window_capacity_ratio + config.protected_capacity_ratio
209 );
210
211 let config = config.clone();
212
213 let window_weight_capacity = (capacity as f64 * config.window_capacity_ratio) as usize;
214 let protected_weight_capacity = (capacity as f64 * config.protected_capacity_ratio) as usize;
215 let frequencies = CMSketchU16::new(config.cmsketch_eps, config.cmsketch_confidence);
216 let decay = frequencies.width();
217
218 Self {
219 window: LinkedList::new(Adapter::new()),
220 probation: LinkedList::new(Adapter::new()),
221 protected: LinkedList::new(Adapter::new()),
222 window_weight: 0,
223 probation_weight: 0,
224 protected_weight: 0,
225 window_weight_capacity,
226 protected_weight_capacity,
227 frequencies,
228 step: 0,
229 decay,
230 config,
231 }
232 }
233
234 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
235 if let Some(config) = config {
236 let mut msgs = vec![];
237 if config.window_capacity_ratio <= 0.0 || config.window_capacity_ratio >= 1.0 {
238 msgs.push(format!(
239 "window_capacity_ratio must be in (0, 1), given: {}, new config ignored",
240 config.window_capacity_ratio
241 ));
242 }
243 if config.protected_capacity_ratio <= 0.0 || config.protected_capacity_ratio >= 1.0 {
244 msgs.push(format!(
245 "protected_capacity_ratio must be in (0, 1), given: {}, new config ignored",
246 config.protected_capacity_ratio
247 ));
248 }
249 if config.window_capacity_ratio + config.protected_capacity_ratio >= 1.0 {
250 msgs.push(format!(
251 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}, new config ignored",
252 config.window_capacity_ratio + config.protected_capacity_ratio
253 ));
254 }
255
256 if !msgs.is_empty() {
257 return Err(Error::ConfigError(msgs.join(" | ")));
258 }
259
260 self.config = config.clone();
261 }
262
263 let window_weight_capacity = (capacity as f64 * self.config.window_capacity_ratio) as usize;
266 let protected_weight_capacity = (capacity as f64 * self.config.protected_capacity_ratio) as usize;
267
268 self.window_weight_capacity = window_weight_capacity;
269 self.protected_weight_capacity = protected_weight_capacity;
270
271 Ok(())
272 }
273
274 fn push(&mut self, record: Arc<Record<Self>>) {
278 let state = unsafe { &mut *record.state().get() };
279
280 strict_assert!(!state.link.is_linked());
281 strict_assert!(!record.is_in_eviction());
282 strict_assert_eq!(state.queue, Queue::None);
283
284 record.set_in_eviction(true);
285 state.queue = Queue::Window;
286 self.increase_queue_weight(Queue::Window, record.weight());
287 self.update_frequencies(record.hash());
288 self.window.push_back(record);
289
290 while self.window_weight > self.window_weight_capacity {
292 strict_assert!(!self.window.is_empty());
293 let r = self.window.pop_front().unwrap();
294 let s = unsafe { &mut *r.state().get() };
295 self.decrease_queue_weight(Queue::Window, r.weight());
296 s.queue = Queue::Probation;
297 self.increase_queue_weight(Queue::Probation, r.weight());
298 self.probation.push_back(r);
299 }
300 }
301
302 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
303 let mut cw = self.window.front_mut();
306 let mut cp = self.probation.front_mut();
307 let record = match (cw.get(), cp.get()) {
308 (None, None) => None,
309 (None, Some(_)) => cp.remove(),
310 (Some(_), None) => cw.remove(),
311 (Some(w), Some(p)) => {
312 if self.frequencies.estimate(w.hash()) < self.frequencies.estimate(p.hash()) {
313 cw.remove()
314
315 } else {
318 cp.remove()
319 }
320 }
321 }
322 .or_else(|| self.protected.pop_front())?;
323
324 let state = unsafe { &mut *record.state().get() };
325
326 strict_assert!(!state.link.is_linked());
327 strict_assert!(record.is_in_eviction());
328 strict_assert_ne!(state.queue, Queue::None);
329
330 self.decrease_queue_weight(state.queue, record.weight());
331 state.queue = Queue::None;
332 record.set_in_eviction(false);
333
334 Some(record)
335 }
336
337 fn remove(&mut self, record: &Arc<Record<Self>>) {
338 let state = unsafe { &mut *record.state().get() };
339
340 strict_assert!(state.link.is_linked());
341 strict_assert!(record.is_in_eviction());
342 strict_assert_ne!(state.queue, Queue::None);
343
344 match state.queue {
345 Queue::None => unreachable!(),
346 Queue::Window => unsafe { self.window.remove_from_ptr(Arc::as_ptr(record)) },
347 Queue::Probation => unsafe { self.probation.remove_from_ptr(Arc::as_ptr(record)) },
348 Queue::Protected => unsafe { self.protected.remove_from_ptr(Arc::as_ptr(record)) },
349 };
350
351 strict_assert!(!state.link.is_linked());
352
353 self.decrease_queue_weight(state.queue, record.weight());
354 state.queue = Queue::None;
355 record.set_in_eviction(false);
356 }
357
358 fn clear(&mut self) {
359 while let Some(record) = self.pop() {
360 let state = unsafe { &*record.state().get() };
361 strict_assert!(!record.is_in_eviction());
362 strict_assert!(!state.link.is_linked());
363 strict_assert_eq!(state.queue, Queue::None);
364 }
365 }
366
367 fn acquire() -> Op<Self> {
368 Op::mutable(|this: &mut Self, record| {
369 this.update_frequencies(record.hash());
371
372 if !record.is_in_eviction() {
373 return;
374 }
375
376 let state = unsafe { &mut *record.state().get() };
377
378 strict_assert!(state.link.is_linked());
379
380 match state.queue {
381 Queue::None => unreachable!(),
382 Queue::Window => {
383 let r = unsafe { this.window.remove_from_ptr(Arc::as_ptr(record)) };
385 this.window.push_back(r);
386 }
387 Queue::Probation => {
388 let r = unsafe { this.probation.remove_from_ptr(Arc::as_ptr(record)) };
390 this.decrease_queue_weight(Queue::Probation, record.weight());
391 state.queue = Queue::Protected;
392 this.increase_queue_weight(Queue::Protected, record.weight());
393 this.protected.push_back(r);
394
395 while this.protected_weight > this.protected_weight_capacity {
397 strict_assert!(!this.protected.is_empty());
398 let r = this.protected.pop_front().unwrap();
399 let s = unsafe { &mut *r.state().get() };
400 this.decrease_queue_weight(Queue::Protected, r.weight());
401 s.queue = Queue::Probation;
402 this.increase_queue_weight(Queue::Probation, r.weight());
403 this.probation.push_back(r);
404 }
405 }
406 Queue::Protected => {
407 let r = unsafe { this.protected.remove_from_ptr(Arc::as_ptr(record)) };
409 this.protected.push_back(r);
410 }
411 }
412 })
413 }
414
415 fn release() -> Op<Self> {
416 Op::noop()
417 }
418}
419
420#[cfg(test)]
421mod tests {
422
423 use itertools::Itertools;
424
425 use super::*;
426 use crate::{
427 eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt},
428 record::Data,
429 };
430
431 impl<K, V> Dump for Lfu<K, V>
432 where
433 K: Key + Clone,
434 V: Value + Clone,
435 {
436 type Output = Vec<Vec<Arc<Record<Self>>>>;
437 fn dump(&self) -> Self::Output {
438 let mut window = vec![];
439 let mut probation = vec![];
440 let mut protected = vec![];
441
442 let mut cursor = self.window.cursor();
443 loop {
444 cursor.move_next();
445 match cursor.clone_pointer() {
446 Some(record) => window.push(record),
447 None => break,
448 }
449 }
450
451 let mut cursor = self.probation.cursor();
452 loop {
453 cursor.move_next();
454 match cursor.clone_pointer() {
455 Some(record) => probation.push(record),
456 None => break,
457 }
458 }
459
460 let mut cursor = self.protected.cursor();
461 loop {
462 cursor.move_next();
463 match cursor.clone_pointer() {
464 Some(record) => protected.push(record),
465 None => break,
466 }
467 }
468
469 vec![window, probation, protected]
470 }
471 }
472
473 type TestLfu = Lfu<u64, u64>;
474
475 #[test]
476 fn test_lfu() {
477 let rs = (0..100)
478 .map(|i| {
479 Arc::new(Record::new(Data {
480 key: i,
481 value: i,
482 hint: LfuHint,
483 hash: i,
484 weight: 1,
485 }))
486 })
487 .collect_vec();
488 let r = |i: usize| rs[i].clone();
489
490 let config = LfuConfig {
492 window_capacity_ratio: 0.2,
493 protected_capacity_ratio: 0.6,
494 cmsketch_eps: 0.01,
495 cmsketch_confidence: 0.95,
496 };
497 let mut lfu = TestLfu::new(10, &config);
498
499 assert_eq!(lfu.window_weight_capacity, 2);
500 assert_eq!(lfu.protected_weight_capacity, 6);
501
502 lfu.push(r(0));
503 lfu.push(r(1));
504 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(0), r(1)], vec![], vec![]]);
505
506 lfu.push(r(2));
507 lfu.push(r(3));
508 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(2), r(3)], vec![r(0), r(1)], vec![]]);
509
510 (4..10).for_each(|i| lfu.push(r(i)));
511 assert_ptr_vec_vec_eq(
512 lfu.dump(),
513 vec![
514 vec![r(8), r(9)],
515 vec![r(0), r(1), r(2), r(3), r(4), r(5), r(6), r(7)],
516 vec![],
517 ],
518 );
519
520 let r0 = lfu.pop().unwrap();
522 assert_ptr_eq(&rs[0], &r0);
523
524 lfu.push(r(0));
526 assert_ptr_vec_vec_eq(
527 lfu.dump(),
528 vec![
529 vec![r(9), r(0)],
530 vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
531 vec![],
532 ],
533 );
534
535 lfu.acquire_mutable(&rs[9]);
537 assert_ptr_vec_vec_eq(
538 lfu.dump(),
539 vec![
540 vec![r(0), r(9)],
541 vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
542 vec![],
543 ],
544 );
545
546 (3..7).for_each(|i| lfu.acquire_mutable(&rs[i]));
548 assert_ptr_vec_vec_eq(
549 lfu.dump(),
550 vec![
551 vec![r(0), r(9)],
552 vec![r(1), r(2), r(7), r(8)],
553 vec![r(3), r(4), r(5), r(6)],
554 ],
555 );
556
557 (3..5).for_each(|i| lfu.acquire_mutable(&rs[i]));
559 assert_ptr_vec_vec_eq(
560 lfu.dump(),
561 vec![
562 vec![r(0), r(9)],
563 vec![r(1), r(2), r(7), r(8)],
564 vec![r(5), r(6), r(3), r(4)],
565 ],
566 );
567
568 [1, 2, 7, 8].into_iter().for_each(|i| lfu.acquire_mutable(&rs[i]));
570 assert_ptr_vec_vec_eq(
571 lfu.dump(),
572 vec![
573 vec![r(0), r(9)],
574 vec![r(5), r(6)],
575 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
576 ],
577 );
578
579 let r5 = lfu.pop().unwrap();
581 assert_ptr_eq(&rs[5], &r5);
582 assert_ptr_vec_vec_eq(
583 lfu.dump(),
584 vec![vec![r(0), r(9)], vec![r(6)], vec![r(3), r(4), r(1), r(2), r(7), r(8)]],
585 );
586
587 (10..13).for_each(|i| lfu.push(r(i)));
589 assert_ptr_vec_vec_eq(
590 lfu.dump(),
591 vec![
592 vec![r(11), r(12)],
593 vec![r(6), r(0), r(9), r(10)],
594 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
595 ],
596 );
597
598 (0..10).for_each(|_| lfu.acquire_mutable(&rs[0]));
601 assert_ptr_vec_vec_eq(
602 lfu.dump(),
603 vec![
604 vec![r(11), r(12)],
605 vec![r(6), r(9), r(10), r(3)],
606 vec![r(4), r(1), r(2), r(7), r(8), r(0)],
607 ],
608 );
609
610 lfu.acquire_mutable(&rs[6]);
613 lfu.acquire_mutable(&rs[9]);
614 lfu.acquire_mutable(&rs[10]);
615 lfu.acquire_mutable(&rs[3]);
616 lfu.acquire_mutable(&rs[4]);
617 lfu.acquire_mutable(&rs[1]);
618 lfu.acquire_mutable(&rs[2]);
619 lfu.acquire_mutable(&rs[7]);
620 lfu.acquire_mutable(&rs[8]);
621 assert_ptr_vec_vec_eq(
622 lfu.dump(),
623 vec![
624 vec![r(11), r(12)],
625 vec![r(0), r(6), r(9), r(10)],
626 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
627 ],
628 );
629
630 assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(11));
633 assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(12));
634 let r11 = lfu.pop().unwrap();
635 let r12 = lfu.pop().unwrap();
636 assert_ptr_eq(&rs[11], &r11);
637 assert_ptr_eq(&rs[12], &r12);
638 assert_ptr_vec_vec_eq(
639 lfu.dump(),
640 vec![
641 vec![],
642 vec![r(0), r(6), r(9), r(10)],
643 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
644 ],
645 );
646
647 let r0 = lfu.pop().unwrap();
650 assert_ptr_eq(&rs[0], &r0);
651 assert_ptr_vec_vec_eq(
652 lfu.dump(),
653 vec![
654 vec![],
655 vec![r(6), r(9), r(10)],
656 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
657 ],
658 );
659
660 lfu.clear();
661 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![], vec![], vec![]]);
662 }
663}