1use std::{mem::offset_of, sync::Arc};
16
17use cmsketch::CMSketchU16;
18use foyer_common::{
19 code::{Key, Value},
20 properties::Properties,
21 strict_assert, strict_assert_eq, strict_assert_ne,
22};
23use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
24use serde::{Deserialize, Serialize};
25
26use super::{Eviction, Op};
27use crate::{
28 error::{Error, Result},
29 record::Record,
30};
31
/// Configuration for the [`Lfu`] eviction policy.
///
/// The two ratios decide how much of the total capacity handed to `Lfu::new` / `Lfu::update`
/// is given to the `window` and `protected` queues; the two sketch parameters are forwarded to
/// `CMSketchU16::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LfuConfig {
    /// Fraction of the total capacity assigned to the `window` queue.
    ///
    /// Must be in `(0, 1)`, and `window_capacity_ratio + protected_capacity_ratio` must be
    /// strictly less than `1`.
    pub window_capacity_ratio: f64,
    /// Fraction of the total capacity assigned to the `protected` queue.
    ///
    /// Must be in `(0, 1)`, and `window_capacity_ratio + protected_capacity_ratio` must be
    /// strictly less than `1`.
    pub protected_capacity_ratio: f64,

    /// First argument passed to `CMSketchU16::new`.
    ///
    /// NOTE(review): presumably the error bound of the count-min sketch — confirm against the
    /// `cmsketch` documentation.
    pub cmsketch_eps: f64,

    /// Second argument passed to `CMSketchU16::new`.
    ///
    /// NOTE(review): presumably the confidence of the count-min sketch — confirm against the
    /// `cmsketch` documentation.
    pub cmsketch_confidence: f64,
}
58
59impl Default for LfuConfig {
60 fn default() -> Self {
61 Self {
62 window_capacity_ratio: 0.1,
63 protected_capacity_ratio: 0.8,
64 cmsketch_eps: 0.001,
65 cmsketch_confidence: 0.9,
66 }
67 }
68}
69
/// Identifies which of the policy's queues a record is currently linked into.
///
/// The default is [`Queue::None`], i.e. "not linked into any queue".
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
enum Queue {
    /// The record is not held by any queue.
    #[default]
    None,
    /// The record sits in the `window` queue, where newly pushed records start.
    Window,
    /// The record sits in the `probation` queue.
    Probation,
    /// The record sits in the `protected` queue.
    Protected,
}
83
/// Per-record bookkeeping owned by the [`Lfu`] policy.
///
/// A default-constructed state is unlinked and tagged with `Queue::None`.
#[derive(Debug, Default)]
pub struct LfuState {
    /// Intrusive list hook; `Adapter` locates it via `offset_of!(LfuState, link)`.
    link: LinkedListAtomicLink,
    /// The queue the record is currently linked into, or `Queue::None` when it is in none.
    queue: Queue,
}
90
// Intrusive-list adapter for `Arc<Record<Lfu<K, V, P>>>` elements. The link used by the lists is
// the `link` field of the record's `LfuState`; its byte offset inside the record is computed as
// `Record::STATE_OFFSET + offset_of!(LfuState, link)`.
intrusive_adapter! { Adapter<K, V, P> = Arc<Record<Lfu<K, V, P>>>: Record<Lfu<K, V, P>> { ?offset = Record::<Lfu<K, V, P>>::STATE_OFFSET + offset_of!(LfuState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
92
/// Frequency-aware eviction policy built from three intrusive queues and a frequency sketch.
///
/// Records enter through `window`; overflow of `window` spills into `probation`; a record that
/// is accessed while in `probation` is promoted to `protected`, and overflow of `protected`
/// falls back into `probation`. On eviction the fronts of `window` and `probation` are compared
/// by their estimated access frequency and the less frequent one is evicted; `protected` is only
/// evicted from when both other queues are empty.
pub struct Lfu<K, V, P>
where
    K: Key,
    V: Value,
    P: Properties,
{
    /// Queue that newly pushed records are appended to.
    window: LinkedList<Adapter<K, V, P>>,
    /// Queue holding records spilled from `window` or demoted from `protected`.
    probation: LinkedList<Adapter<K, V, P>>,
    /// Queue holding records that were accessed while in `probation`.
    protected: LinkedList<Adapter<K, V, P>>,

    /// Sum of the weights of the records currently in `window`.
    window_weight: usize,
    /// Sum of the weights of the records currently in `probation`.
    probation_weight: usize,
    /// Sum of the weights of the records currently in `protected`.
    protected_weight: usize,

    /// Weight limit of `window`: `capacity * window_capacity_ratio`, truncated to `usize`.
    window_weight_capacity: usize,
    /// Weight limit of `protected`: `capacity * protected_capacity_ratio`, truncated to `usize`.
    protected_weight_capacity: usize,

    /// Access-frequency sketch, keyed by record hash.
    frequencies: CMSketchU16,

    /// Counter bumped on every frequency update and halved whenever it reaches `decay`.
    step: usize,
    /// Threshold for `step` at which the sketch is halved; initialised to the sketch width.
    decay: usize,

    /// The configuration currently in effect.
    config: LfuConfig,
}
130
131impl<K, V, P> Lfu<K, V, P>
132where
133 K: Key,
134 V: Value,
135 P: Properties,
136{
137 fn increase_queue_weight(&mut self, queue: Queue, weight: usize) {
138 match queue {
139 Queue::None => unreachable!(),
140 Queue::Window => self.window_weight += weight,
141 Queue::Probation => self.probation_weight += weight,
142 Queue::Protected => self.protected_weight += weight,
143 }
144 }
145
146 fn decrease_queue_weight(&mut self, queue: Queue, weight: usize) {
147 match queue {
148 Queue::None => unreachable!(),
149 Queue::Window => self.window_weight -= weight,
150 Queue::Probation => self.probation_weight -= weight,
151 Queue::Protected => self.protected_weight -= weight,
152 }
153 }
154
155 fn update_frequencies(&mut self, hash: u64) {
156 self.frequencies.inc(hash);
157 self.step += 1;
158 if self.step >= self.decay {
159 self.step >>= 1;
160 self.frequencies.halve();
161 }
162 }
163}
164
165impl<K, V, P> Eviction for Lfu<K, V, P>
166where
167 K: Key,
168 V: Value,
169 P: Properties,
170{
171 type Config = LfuConfig;
172 type Key = K;
173 type Value = V;
174 type Properties = P;
175 type State = LfuState;
176
177 fn new(capacity: usize, config: &Self::Config) -> Self
178 where
179 Self: Sized,
180 {
181 assert!(
182 config.window_capacity_ratio > 0.0 && config.window_capacity_ratio < 1.0,
183 "window_capacity_ratio must be in (0, 1), given: {}",
184 config.window_capacity_ratio
185 );
186
187 assert!(
188 config.protected_capacity_ratio > 0.0 && config.protected_capacity_ratio < 1.0,
189 "protected_capacity_ratio must be in (0, 1), given: {}",
190 config.protected_capacity_ratio
191 );
192
193 assert!(
194 config.window_capacity_ratio + config.protected_capacity_ratio < 1.0,
195 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}",
196 config.window_capacity_ratio + config.protected_capacity_ratio
197 );
198
199 let config = config.clone();
200
201 let window_weight_capacity = (capacity as f64 * config.window_capacity_ratio) as usize;
202 let protected_weight_capacity = (capacity as f64 * config.protected_capacity_ratio) as usize;
203 let frequencies = CMSketchU16::new(config.cmsketch_eps, config.cmsketch_confidence);
204 let decay = frequencies.width();
205
206 Self {
207 window: LinkedList::new(Adapter::new()),
208 probation: LinkedList::new(Adapter::new()),
209 protected: LinkedList::new(Adapter::new()),
210 window_weight: 0,
211 probation_weight: 0,
212 protected_weight: 0,
213 window_weight_capacity,
214 protected_weight_capacity,
215 frequencies,
216 step: 0,
217 decay,
218 config,
219 }
220 }
221
222 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
223 if let Some(config) = config {
224 let mut msgs = vec![];
225 if config.window_capacity_ratio <= 0.0 || config.window_capacity_ratio >= 1.0 {
226 msgs.push(format!(
227 "window_capacity_ratio must be in (0, 1), given: {}, new config ignored",
228 config.window_capacity_ratio
229 ));
230 }
231 if config.protected_capacity_ratio <= 0.0 || config.protected_capacity_ratio >= 1.0 {
232 msgs.push(format!(
233 "protected_capacity_ratio must be in (0, 1), given: {}, new config ignored",
234 config.protected_capacity_ratio
235 ));
236 }
237 if config.window_capacity_ratio + config.protected_capacity_ratio >= 1.0 {
238 msgs.push(format!(
239 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}, new config ignored",
240 config.window_capacity_ratio + config.protected_capacity_ratio
241 ));
242 }
243
244 if !msgs.is_empty() {
245 return Err(Error::ConfigError(msgs.join(" | ")));
246 }
247
248 self.config = config.clone();
249 }
250
251 let window_weight_capacity = (capacity as f64 * self.config.window_capacity_ratio) as usize;
254 let protected_weight_capacity = (capacity as f64 * self.config.protected_capacity_ratio) as usize;
255
256 self.window_weight_capacity = window_weight_capacity;
257 self.protected_weight_capacity = protected_weight_capacity;
258
259 Ok(())
260 }
261
262 fn push(&mut self, record: Arc<Record<Self>>) {
266 let state = unsafe { &mut *record.state().get() };
267
268 strict_assert!(!state.link.is_linked());
269 strict_assert!(!record.is_in_eviction());
270 strict_assert_eq!(state.queue, Queue::None);
271
272 record.set_in_eviction(true);
273 state.queue = Queue::Window;
274 self.increase_queue_weight(Queue::Window, record.weight());
275 self.update_frequencies(record.hash());
276 self.window.push_back(record);
277
278 while self.window_weight > self.window_weight_capacity {
280 strict_assert!(!self.window.is_empty());
281 let r = self.window.pop_front().unwrap();
282 let s = unsafe { &mut *r.state().get() };
283 self.decrease_queue_weight(Queue::Window, r.weight());
284 s.queue = Queue::Probation;
285 self.increase_queue_weight(Queue::Probation, r.weight());
286 self.probation.push_back(r);
287 }
288 }
289
290 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
291 let mut cw = self.window.front_mut();
294 let mut cp = self.probation.front_mut();
295 let record = match (cw.get(), cp.get()) {
296 (None, None) => None,
297 (None, Some(_)) => cp.remove(),
298 (Some(_), None) => cw.remove(),
299 (Some(w), Some(p)) => {
300 if self.frequencies.estimate(w.hash()) < self.frequencies.estimate(p.hash()) {
301 cw.remove()
302
303 } else {
306 cp.remove()
307 }
308 }
309 }
310 .or_else(|| self.protected.pop_front())?;
311
312 let state = unsafe { &mut *record.state().get() };
313
314 strict_assert!(!state.link.is_linked());
315 strict_assert!(record.is_in_eviction());
316 strict_assert_ne!(state.queue, Queue::None);
317
318 self.decrease_queue_weight(state.queue, record.weight());
319 state.queue = Queue::None;
320 record.set_in_eviction(false);
321
322 Some(record)
323 }
324
325 fn remove(&mut self, record: &Arc<Record<Self>>) {
326 let state = unsafe { &mut *record.state().get() };
327
328 strict_assert!(state.link.is_linked());
329 strict_assert!(record.is_in_eviction());
330 strict_assert_ne!(state.queue, Queue::None);
331
332 match state.queue {
333 Queue::None => unreachable!(),
334 Queue::Window => unsafe { self.window.remove_from_ptr(Arc::as_ptr(record)) },
335 Queue::Probation => unsafe { self.probation.remove_from_ptr(Arc::as_ptr(record)) },
336 Queue::Protected => unsafe { self.protected.remove_from_ptr(Arc::as_ptr(record)) },
337 };
338
339 strict_assert!(!state.link.is_linked());
340
341 self.decrease_queue_weight(state.queue, record.weight());
342 state.queue = Queue::None;
343 record.set_in_eviction(false);
344 }
345
346 fn clear(&mut self) {
347 while let Some(record) = self.pop() {
348 let state = unsafe { &*record.state().get() };
349 strict_assert!(!record.is_in_eviction());
350 strict_assert!(!state.link.is_linked());
351 strict_assert_eq!(state.queue, Queue::None);
352 }
353 }
354
355 fn acquire() -> Op<Self> {
356 Op::mutable(|this: &mut Self, record| {
357 this.update_frequencies(record.hash());
359
360 if !record.is_in_eviction() {
361 return;
362 }
363
364 let state = unsafe { &mut *record.state().get() };
365
366 strict_assert!(state.link.is_linked());
367
368 match state.queue {
369 Queue::None => unreachable!(),
370 Queue::Window => {
371 let r = unsafe { this.window.remove_from_ptr(Arc::as_ptr(record)) };
373 this.window.push_back(r);
374 }
375 Queue::Probation => {
376 let r = unsafe { this.probation.remove_from_ptr(Arc::as_ptr(record)) };
378 this.decrease_queue_weight(Queue::Probation, record.weight());
379 state.queue = Queue::Protected;
380 this.increase_queue_weight(Queue::Protected, record.weight());
381 this.protected.push_back(r);
382
383 while this.protected_weight > this.protected_weight_capacity {
385 strict_assert!(!this.protected.is_empty());
386 let r = this.protected.pop_front().unwrap();
387 let s = unsafe { &mut *r.state().get() };
388 this.decrease_queue_weight(Queue::Protected, r.weight());
389 s.queue = Queue::Probation;
390 this.increase_queue_weight(Queue::Probation, r.weight());
391 this.probation.push_back(r);
392 }
393 }
394 Queue::Protected => {
395 let r = unsafe { this.protected.remove_from_ptr(Arc::as_ptr(record)) };
397 this.protected.push_back(r);
398 }
399 }
400 })
401 }
402
403 fn release() -> Op<Self> {
404 Op::noop()
405 }
406}
407
#[cfg(test)]
mod tests {

    use itertools::Itertools;

    use super::*;
    use crate::{
        eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
        record::Data,
    };

    /// Test-only snapshot of the three queues, in the order `[window, probation, protected]`,
    /// each listed front to back.
    impl<K, V> Dump for Lfu<K, V, TestProperties>
    where
        K: Key + Clone,
        V: Value + Clone,
    {
        type Output = Vec<Vec<Arc<Record<Self>>>>;
        fn dump(&self) -> Self::Output {
            let mut window = vec![];
            let mut probation = vec![];
            let mut protected = vec![];

            // Each loop advances the cursor first and then reads it, stopping as soon as
            // `clone_pointer` yields `None`.
            // NOTE(review): this presumes `cursor()` starts before the first element — confirm
            // against the `intrusive_collections` documentation.
            let mut cursor = self.window.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => window.push(record),
                    None => break,
                }
            }

            let mut cursor = self.probation.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => probation.push(record),
                    None => break,
                }
            }

            let mut cursor = self.protected.cursor();
            loop {
                cursor.move_next();
                match cursor.clone_pointer() {
                    Some(record) => protected.push(record),
                    None => break,
                }
            }

            vec![window, probation, protected]
        }
    }

    type TestLfu = Lfu<u64, u64, TestProperties>;

    /// Walks the policy through insertion, spill, promotion, demotion and eviction, checking the
    /// exact content of all three queues after every step.
    #[test]
    fn test_lfu() {
        // 100 records with key == value == hash == index and weight 1.
        let rs = (0..100)
            .map(|i| {
                Arc::new(Record::new(Data {
                    key: i,
                    value: i,
                    properties: TestProperties::default(),
                    hash: i,
                    weight: 1,
                }))
            })
            .collect_vec();
        let r = |i: usize| rs[i].clone();

        // Capacity 10 => window: 2, protected: 6.
        let config = LfuConfig {
            window_capacity_ratio: 0.2,
            protected_capacity_ratio: 0.6,
            cmsketch_eps: 0.01,
            cmsketch_confidence: 0.95,
        };
        let mut lfu = TestLfu::new(10, &config);

        assert_eq!(lfu.window_weight_capacity, 2);
        assert_eq!(lfu.protected_weight_capacity, 6);

        // Two records fit into the window.
        lfu.push(r(0));
        lfu.push(r(1));
        assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(0), r(1)], vec![], vec![]]);

        // Further pushes overflow the window; its oldest records spill into probation.
        lfu.push(r(2));
        lfu.push(r(3));
        assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(2), r(3)], vec![r(0), r(1)], vec![]]);

        (4..10).for_each(|i| lfu.push(r(i)));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(8), r(9)],
                vec![r(0), r(1), r(2), r(3), r(4), r(5), r(6), r(7)],
                vec![],
            ],
        );

        // Candidates are 8 (window front) and 0 (probation front); 0 is the one evicted.
        let r0 = lfu.pop().unwrap();
        assert_ptr_eq(&rs[0], &r0);

        // Re-inserting 0 puts it at the window back and spills 8 into probation.
        lfu.push(r(0));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(9), r(0)],
                vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
                vec![],
            ],
        );

        // Accessing a window record moves it to the back of the window.
        lfu.acquire_mutable(&rs[9]);
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(0), r(9)],
                vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
                vec![],
            ],
        );

        // Accessing probation records promotes them to protected.
        (3..7).for_each(|i| lfu.acquire_mutable(&rs[i]));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(0), r(9)],
                vec![r(1), r(2), r(7), r(8)],
                vec![r(3), r(4), r(5), r(6)],
            ],
        );

        // Accessing protected records moves them to the back of protected.
        (3..5).for_each(|i| lfu.acquire_mutable(&rs[i]));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(0), r(9)],
                vec![r(1), r(2), r(7), r(8)],
                vec![r(5), r(6), r(3), r(4)],
            ],
        );

        // Promoting four more overflows protected (limit 6): 5 and 6 are demoted to probation.
        [1, 2, 7, 8].into_iter().for_each(|i| lfu.acquire_mutable(&rs[i]));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(0), r(9)],
                vec![r(5), r(6)],
                vec![r(3), r(4), r(1), r(2), r(7), r(8)],
            ],
        );

        // Candidates are 0 (window front) and 5 (probation front); 5 is the one evicted.
        let r5 = lfu.pop().unwrap();
        assert_ptr_eq(&rs[5], &r5);
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![vec![r(0), r(9)], vec![r(6)], vec![r(3), r(4), r(1), r(2), r(7), r(8)]],
        );

        // Three pushes spill 0, 9 and 10 from the window into probation.
        (10..13).for_each(|i| lfu.push(r(i)));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(11), r(12)],
                vec![r(6), r(0), r(9), r(10)],
                vec![r(3), r(4), r(1), r(2), r(7), r(8)],
            ],
        );

        // Make 0 a high-frequency record: the first access promotes it (demoting 3), the rest
        // keep it at the back of protected.
        (0..10).for_each(|_| lfu.acquire_mutable(&rs[0]));
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(11), r(12)],
                vec![r(6), r(9), r(10), r(3)],
                vec![r(4), r(1), r(2), r(7), r(8), r(0)],
            ],
        );

        // Each access below promotes a probation record and demotes the protected front, so 0
        // ends up at the front of probation although it has the highest frequency.
        lfu.acquire_mutable(&rs[6]);
        lfu.acquire_mutable(&rs[9]);
        lfu.acquire_mutable(&rs[10]);
        lfu.acquire_mutable(&rs[3]);
        lfu.acquire_mutable(&rs[4]);
        lfu.acquire_mutable(&rs[1]);
        lfu.acquire_mutable(&rs[2]);
        lfu.acquire_mutable(&rs[7]);
        lfu.acquire_mutable(&rs[8]);
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![r(11), r(12)],
                vec![r(0), r(6), r(9), r(10)],
                vec![r(3), r(4), r(1), r(2), r(7), r(8)],
            ],
        );

        // 11 and 12 are evicted before 0 because their estimated frequency is lower.
        assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(11));
        assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(12));
        let r11 = lfu.pop().unwrap();
        let r12 = lfu.pop().unwrap();
        assert_ptr_eq(&rs[11], &r11);
        assert_ptr_eq(&rs[12], &r12);
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![],
                vec![r(0), r(6), r(9), r(10)],
                vec![r(3), r(4), r(1), r(2), r(7), r(8)],
            ],
        );

        // With the window empty, the probation front (0) is evicted despite its high frequency.
        let r0 = lfu.pop().unwrap();
        assert_ptr_eq(&rs[0], &r0);
        assert_ptr_vec_vec_eq(
            lfu.dump(),
            vec![
                vec![],
                vec![r(6), r(9), r(10)],
                vec![r(3), r(4), r(1), r(2), r(7), r(8)],
            ],
        );

        // `clear` drains every queue.
        lfu.clear();
        assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![], vec![], vec![]]);
    }
}