1use std::{mem::offset_of, sync::Arc};
16
17use cmsketch::CMSketchU16;
18use foyer_common::{
19 code::{Key, Value},
20 error::{Error, ErrorKind, Result},
21 properties::Properties,
22 strict_assert, strict_assert_eq, strict_assert_ne,
23};
24use intrusive_collections::{intrusive_adapter, LinkedList, LinkedListAtomicLink};
25use serde::{Deserialize, Serialize};
26
27use super::{Eviction, Op};
28use crate::record::Record;
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct LfuConfig {
33 pub window_capacity_ratio: f64,
39 pub protected_capacity_ratio: f64,
45
46 pub cmsketch_eps: f64,
50
51 pub cmsketch_confidence: f64,
55}
56
57impl Default for LfuConfig {
58 fn default() -> Self {
59 Self {
60 window_capacity_ratio: 0.1,
61 protected_capacity_ratio: 0.8,
62 cmsketch_eps: 0.001,
63 cmsketch_confidence: 0.9,
64 }
65 }
66}
67
68#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)]
69enum Queue {
70 #[default]
71 None,
72 Window,
73 Probation,
74 Protected,
75}
76
77#[derive(Debug, Default)]
79pub struct LfuState {
80 link: LinkedListAtomicLink,
81 queue: Queue,
82}
83
84intrusive_adapter! { Adapter<K, V, P> = Arc<Record<Lfu<K, V, P>>>: Record<Lfu<K, V, P>> { ?offset = Record::<Lfu<K, V, P>>::STATE_OFFSET + offset_of!(LfuState, link) => LinkedListAtomicLink } where K: Key, V: Value, P: Properties }
85
86pub struct Lfu<K, V, P>
99where
100 K: Key,
101 V: Value,
102 P: Properties,
103{
104 window: LinkedList<Adapter<K, V, P>>,
105 probation: LinkedList<Adapter<K, V, P>>,
106 protected: LinkedList<Adapter<K, V, P>>,
107
108 window_weight: usize,
109 probation_weight: usize,
110 protected_weight: usize,
111
112 window_weight_capacity: usize,
113 protected_weight_capacity: usize,
114
115 frequencies: CMSketchU16,
117
118 step: usize,
119 decay: usize,
120
121 config: LfuConfig,
122}
123
124impl<K, V, P> Lfu<K, V, P>
125where
126 K: Key,
127 V: Value,
128 P: Properties,
129{
130 fn increase_queue_weight(&mut self, queue: Queue, weight: usize) {
131 match queue {
132 Queue::None => unreachable!(),
133 Queue::Window => self.window_weight += weight,
134 Queue::Probation => self.probation_weight += weight,
135 Queue::Protected => self.protected_weight += weight,
136 }
137 }
138
139 fn decrease_queue_weight(&mut self, queue: Queue, weight: usize) {
140 match queue {
141 Queue::None => unreachable!(),
142 Queue::Window => self.window_weight -= weight,
143 Queue::Probation => self.probation_weight -= weight,
144 Queue::Protected => self.protected_weight -= weight,
145 }
146 }
147
148 fn update_frequencies(&mut self, hash: u64) {
149 self.frequencies.inc(hash);
150 self.step += 1;
151 if self.step >= self.decay {
152 self.step >>= 1;
153 self.frequencies.halve();
154 }
155 }
156}
157
158impl<K, V, P> Eviction for Lfu<K, V, P>
159where
160 K: Key,
161 V: Value,
162 P: Properties,
163{
164 type Config = LfuConfig;
165 type Key = K;
166 type Value = V;
167 type Properties = P;
168 type State = LfuState;
169
170 fn new(capacity: usize, config: &Self::Config) -> Self
171 where
172 Self: Sized,
173 {
174 assert!(
175 config.window_capacity_ratio > 0.0 && config.window_capacity_ratio < 1.0,
176 "window_capacity_ratio must be in (0, 1), given: {}",
177 config.window_capacity_ratio
178 );
179
180 assert!(
181 config.protected_capacity_ratio > 0.0 && config.protected_capacity_ratio < 1.0,
182 "protected_capacity_ratio must be in (0, 1), given: {}",
183 config.protected_capacity_ratio
184 );
185
186 assert!(
187 config.window_capacity_ratio + config.protected_capacity_ratio < 1.0,
188 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}",
189 config.window_capacity_ratio + config.protected_capacity_ratio
190 );
191
192 let config = config.clone();
193
194 let window_weight_capacity = (capacity as f64 * config.window_capacity_ratio) as usize;
195 let protected_weight_capacity = (capacity as f64 * config.protected_capacity_ratio) as usize;
196 let frequencies = CMSketchU16::new(config.cmsketch_eps, config.cmsketch_confidence);
197 let decay = frequencies.width();
198
199 Self {
200 window: LinkedList::new(Adapter::new()),
201 probation: LinkedList::new(Adapter::new()),
202 protected: LinkedList::new(Adapter::new()),
203 window_weight: 0,
204 probation_weight: 0,
205 protected_weight: 0,
206 window_weight_capacity,
207 protected_weight_capacity,
208 frequencies,
209 step: 0,
210 decay,
211 config,
212 }
213 }
214
215 fn update(&mut self, capacity: usize, config: Option<&Self::Config>) -> Result<()> {
216 if let Some(config) = config {
217 let mut reasons = vec![];
218 if config.window_capacity_ratio <= 0.0 || config.window_capacity_ratio >= 1.0 {
219 reasons.push(format!(
220 "window_capacity_ratio must be in (0, 1), given: {}, new config ignored",
221 config.window_capacity_ratio
222 ));
223 }
224 if config.protected_capacity_ratio <= 0.0 || config.protected_capacity_ratio >= 1.0 {
225 reasons.push(format!(
226 "protected_capacity_ratio must be in (0, 1), given: {}, new config ignored",
227 config.protected_capacity_ratio
228 ));
229 }
230 if config.window_capacity_ratio + config.protected_capacity_ratio >= 1.0 {
231 reasons.push(format!(
232 "must guarantee: window_capacity_ratio + protected_capacity_ratio < 1, given: {}, new config ignored",
233 config.window_capacity_ratio + config.protected_capacity_ratio
234 ));
235 }
236
237 if !reasons.is_empty() {
238 let mut e = Error::new(ErrorKind::Config, "update LFU config failed");
239 for reason in reasons.iter() {
240 e = e.with_context("reason", reason);
241 }
242 return Err(e);
243 }
244
245 self.config = config.clone();
246 }
247
248 let window_weight_capacity = (capacity as f64 * self.config.window_capacity_ratio) as usize;
251 let protected_weight_capacity = (capacity as f64 * self.config.protected_capacity_ratio) as usize;
252
253 self.window_weight_capacity = window_weight_capacity;
254 self.protected_weight_capacity = protected_weight_capacity;
255
256 Ok(())
257 }
258
259 fn push(&mut self, record: Arc<Record<Self>>) {
263 let state = unsafe { &mut *record.state().get() };
264
265 strict_assert!(!state.link.is_linked());
266 strict_assert!(!record.is_in_eviction());
267 strict_assert_eq!(state.queue, Queue::None);
268
269 record.set_in_eviction(true);
270 state.queue = Queue::Window;
271 self.increase_queue_weight(Queue::Window, record.weight());
272 self.update_frequencies(record.hash());
273 self.window.push_back(record);
274
275 while self.window_weight > self.window_weight_capacity {
277 strict_assert!(!self.window.is_empty());
278 let r = self.window.pop_front().unwrap();
279 let s = unsafe { &mut *r.state().get() };
280 self.decrease_queue_weight(Queue::Window, r.weight());
281 s.queue = Queue::Probation;
282 self.increase_queue_weight(Queue::Probation, r.weight());
283 self.probation.push_back(r);
284 }
285 }
286
287 fn pop(&mut self) -> Option<Arc<Record<Self>>> {
288 let mut cw = self.window.front_mut();
291 let mut cp = self.probation.front_mut();
292 let record = match (cw.get(), cp.get()) {
293 (None, None) => None,
294 (None, Some(_)) => cp.remove(),
295 (Some(_), None) => cw.remove(),
296 (Some(w), Some(p)) => {
297 if self.frequencies.estimate(w.hash()) < self.frequencies.estimate(p.hash()) {
298 cw.remove()
299
300 } else {
303 cp.remove()
304 }
305 }
306 }
307 .or_else(|| self.protected.pop_front())?;
308
309 let state = unsafe { &mut *record.state().get() };
310
311 strict_assert!(!state.link.is_linked());
312 strict_assert!(record.is_in_eviction());
313 strict_assert_ne!(state.queue, Queue::None);
314
315 self.decrease_queue_weight(state.queue, record.weight());
316 state.queue = Queue::None;
317 record.set_in_eviction(false);
318
319 Some(record)
320 }
321
322 fn remove(&mut self, record: &Arc<Record<Self>>) {
323 let state = unsafe { &mut *record.state().get() };
324
325 strict_assert!(state.link.is_linked());
326 strict_assert!(record.is_in_eviction());
327 strict_assert_ne!(state.queue, Queue::None);
328
329 match state.queue {
330 Queue::None => unreachable!(),
331 Queue::Window => unsafe { self.window.remove_from_ptr(Arc::as_ptr(record)) },
332 Queue::Probation => unsafe { self.probation.remove_from_ptr(Arc::as_ptr(record)) },
333 Queue::Protected => unsafe { self.protected.remove_from_ptr(Arc::as_ptr(record)) },
334 };
335
336 strict_assert!(!state.link.is_linked());
337
338 self.decrease_queue_weight(state.queue, record.weight());
339 state.queue = Queue::None;
340 record.set_in_eviction(false);
341 }
342
343 fn clear(&mut self) {
344 while let Some(record) = self.pop() {
345 let state = unsafe { &*record.state().get() };
346 strict_assert!(!record.is_in_eviction());
347 strict_assert!(!state.link.is_linked());
348 strict_assert_eq!(state.queue, Queue::None);
349 }
350 }
351
352 fn acquire() -> Op<Self> {
353 Op::mutable(|this: &mut Self, record| {
354 this.update_frequencies(record.hash());
356
357 if !record.is_in_eviction() {
358 return;
359 }
360
361 let state = unsafe { &mut *record.state().get() };
362
363 strict_assert!(state.link.is_linked());
364
365 match state.queue {
366 Queue::None => unreachable!(),
367 Queue::Window => {
368 let r = unsafe { this.window.remove_from_ptr(Arc::as_ptr(record)) };
370 this.window.push_back(r);
371 }
372 Queue::Probation => {
373 let r = unsafe { this.probation.remove_from_ptr(Arc::as_ptr(record)) };
375 this.decrease_queue_weight(Queue::Probation, record.weight());
376 state.queue = Queue::Protected;
377 this.increase_queue_weight(Queue::Protected, record.weight());
378 this.protected.push_back(r);
379
380 while this.protected_weight > this.protected_weight_capacity {
382 strict_assert!(!this.protected.is_empty());
383 let r = this.protected.pop_front().unwrap();
384 let s = unsafe { &mut *r.state().get() };
385 this.decrease_queue_weight(Queue::Protected, r.weight());
386 s.queue = Queue::Probation;
387 this.increase_queue_weight(Queue::Probation, r.weight());
388 this.probation.push_back(r);
389 }
390 }
391 Queue::Protected => {
392 let r = unsafe { this.protected.remove_from_ptr(Arc::as_ptr(record)) };
394 this.protected.push_back(r);
395 }
396 }
397 })
398 }
399
400 fn release() -> Op<Self> {
401 Op::noop()
402 }
403}
404
405#[cfg(test)]
406mod tests {
407
408 use itertools::Itertools;
409
410 use super::*;
411 use crate::{
412 eviction::test_utils::{assert_ptr_eq, assert_ptr_vec_vec_eq, Dump, OpExt, TestProperties},
413 record::Data,
414 };
415
416 impl<K, V> Dump for Lfu<K, V, TestProperties>
417 where
418 K: Key + Clone,
419 V: Value + Clone,
420 {
421 type Output = Vec<Vec<Arc<Record<Self>>>>;
422 fn dump(&self) -> Self::Output {
423 let mut window = vec![];
424 let mut probation = vec![];
425 let mut protected = vec![];
426
427 let mut cursor = self.window.cursor();
428 loop {
429 cursor.move_next();
430 match cursor.clone_pointer() {
431 Some(record) => window.push(record),
432 None => break,
433 }
434 }
435
436 let mut cursor = self.probation.cursor();
437 loop {
438 cursor.move_next();
439 match cursor.clone_pointer() {
440 Some(record) => probation.push(record),
441 None => break,
442 }
443 }
444
445 let mut cursor = self.protected.cursor();
446 loop {
447 cursor.move_next();
448 match cursor.clone_pointer() {
449 Some(record) => protected.push(record),
450 None => break,
451 }
452 }
453
454 vec![window, probation, protected]
455 }
456 }
457
458 type TestLfu = Lfu<u64, u64, TestProperties>;
459
460 #[test]
461 fn test_lfu() {
462 let rs = (0..100)
463 .map(|i| {
464 Arc::new(Record::new(Data {
465 key: i,
466 value: i,
467 properties: TestProperties::default(),
468 hash: i,
469 weight: 1,
470 }))
471 })
472 .collect_vec();
473 let r = |i: usize| rs[i].clone();
474
475 let config = LfuConfig {
477 window_capacity_ratio: 0.2,
478 protected_capacity_ratio: 0.6,
479 cmsketch_eps: 0.01,
480 cmsketch_confidence: 0.95,
481 };
482 let mut lfu = TestLfu::new(10, &config);
483
484 assert_eq!(lfu.window_weight_capacity, 2);
485 assert_eq!(lfu.protected_weight_capacity, 6);
486
487 lfu.push(r(0));
488 lfu.push(r(1));
489 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(0), r(1)], vec![], vec![]]);
490
491 lfu.push(r(2));
492 lfu.push(r(3));
493 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![r(2), r(3)], vec![r(0), r(1)], vec![]]);
494
495 (4..10).for_each(|i| lfu.push(r(i)));
496 assert_ptr_vec_vec_eq(
497 lfu.dump(),
498 vec![
499 vec![r(8), r(9)],
500 vec![r(0), r(1), r(2), r(3), r(4), r(5), r(6), r(7)],
501 vec![],
502 ],
503 );
504
505 let r0 = lfu.pop().unwrap();
507 assert_ptr_eq(&rs[0], &r0);
508
509 lfu.push(r(0));
511 assert_ptr_vec_vec_eq(
512 lfu.dump(),
513 vec![
514 vec![r(9), r(0)],
515 vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
516 vec![],
517 ],
518 );
519
520 lfu.acquire_mutable(&rs[9]);
522 assert_ptr_vec_vec_eq(
523 lfu.dump(),
524 vec![
525 vec![r(0), r(9)],
526 vec![r(1), r(2), r(3), r(4), r(5), r(6), r(7), r(8)],
527 vec![],
528 ],
529 );
530
531 (3..7).for_each(|i| lfu.acquire_mutable(&rs[i]));
533 assert_ptr_vec_vec_eq(
534 lfu.dump(),
535 vec![
536 vec![r(0), r(9)],
537 vec![r(1), r(2), r(7), r(8)],
538 vec![r(3), r(4), r(5), r(6)],
539 ],
540 );
541
542 (3..5).for_each(|i| lfu.acquire_mutable(&rs[i]));
544 assert_ptr_vec_vec_eq(
545 lfu.dump(),
546 vec![
547 vec![r(0), r(9)],
548 vec![r(1), r(2), r(7), r(8)],
549 vec![r(5), r(6), r(3), r(4)],
550 ],
551 );
552
553 [1, 2, 7, 8].into_iter().for_each(|i| lfu.acquire_mutable(&rs[i]));
555 assert_ptr_vec_vec_eq(
556 lfu.dump(),
557 vec![
558 vec![r(0), r(9)],
559 vec![r(5), r(6)],
560 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
561 ],
562 );
563
564 let r5 = lfu.pop().unwrap();
566 assert_ptr_eq(&rs[5], &r5);
567 assert_ptr_vec_vec_eq(
568 lfu.dump(),
569 vec![vec![r(0), r(9)], vec![r(6)], vec![r(3), r(4), r(1), r(2), r(7), r(8)]],
570 );
571
572 (10..13).for_each(|i| lfu.push(r(i)));
574 assert_ptr_vec_vec_eq(
575 lfu.dump(),
576 vec![
577 vec![r(11), r(12)],
578 vec![r(6), r(0), r(9), r(10)],
579 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
580 ],
581 );
582
583 (0..10).for_each(|_| lfu.acquire_mutable(&rs[0]));
586 assert_ptr_vec_vec_eq(
587 lfu.dump(),
588 vec![
589 vec![r(11), r(12)],
590 vec![r(6), r(9), r(10), r(3)],
591 vec![r(4), r(1), r(2), r(7), r(8), r(0)],
592 ],
593 );
594
595 lfu.acquire_mutable(&rs[6]);
598 lfu.acquire_mutable(&rs[9]);
599 lfu.acquire_mutable(&rs[10]);
600 lfu.acquire_mutable(&rs[3]);
601 lfu.acquire_mutable(&rs[4]);
602 lfu.acquire_mutable(&rs[1]);
603 lfu.acquire_mutable(&rs[2]);
604 lfu.acquire_mutable(&rs[7]);
605 lfu.acquire_mutable(&rs[8]);
606 assert_ptr_vec_vec_eq(
607 lfu.dump(),
608 vec![
609 vec![r(11), r(12)],
610 vec![r(0), r(6), r(9), r(10)],
611 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
612 ],
613 );
614
615 assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(11));
618 assert!(lfu.frequencies.estimate(0) > lfu.frequencies.estimate(12));
619 let r11 = lfu.pop().unwrap();
620 let r12 = lfu.pop().unwrap();
621 assert_ptr_eq(&rs[11], &r11);
622 assert_ptr_eq(&rs[12], &r12);
623 assert_ptr_vec_vec_eq(
624 lfu.dump(),
625 vec![
626 vec![],
627 vec![r(0), r(6), r(9), r(10)],
628 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
629 ],
630 );
631
632 let r0 = lfu.pop().unwrap();
635 assert_ptr_eq(&rs[0], &r0);
636 assert_ptr_vec_vec_eq(
637 lfu.dump(),
638 vec![
639 vec![],
640 vec![r(6), r(9), r(10)],
641 vec![r(3), r(4), r(1), r(2), r(7), r(8)],
642 ],
643 );
644
645 lfu.clear();
646 assert_ptr_vec_vec_eq(lfu.dump(), vec![vec![], vec![], vec![]]);
647 }
648}