memory_cache_rust/
policy.rs

1use std::collections::HashMap;
2use std::marker::PhantomData;
3
4use parking_lot::Mutex;
5use seize::Guard;
6
7use crate::bloom::bbloom::Bloom;
8use crate::cache::{COST_ADD, Item, KEEP_GETS, KEY_UPDATE, Metrics, REJECT_SETS};
9use crate::cache::ItemFlag::ItemNew;
10use crate::cmsketch::CmSketch;
11use crate::reclaim::Atomic;
12use crate::store::Node;
13
14const LFU_SAMPLE: usize = 5;
15
16pub trait Policy {
17    fn push(&self, key: [u64]) -> bool;
18    // add attempts to add the key-cost pair to the Policy. It returns a slice
19    // of evicted keys and a bool denoting whether or not the key-cost pair
20    // was added. If it returns true, the key should be stored in cache.
21    fn add<T>(&self, key: u64, cost: i64) -> (Vec<Node<T>>, bool);
22    // Has returns true if the key exists in the Policy.
23    fn has(&self, key: u64) -> bool;
24    // Del deletes the key from the Policy.
25    fn del(&self, key: u64);
26    // Cap returns the available capacity.
27    fn cap(&self) -> i64;
28    // Close stops all goroutines and closes all channels.
29    fn close(&self);
30    // Update updates the cost value for the key.
31    fn update(&self, key: u64, cost: i64);
32    // Cost returns the cost value of a key or -1 if missing.
33    fn cost(&self, key: u64) -> i64;
34    // Optionally, set stats object to track how policy is performing.
35    fn collect_metrics(&self, metrics: &mut Metrics);
36    // Clear zeroes out all counters and clears hashmaps.
37    fn clear(&self);
38}
39
40pub struct DefaultPolicy<T> {
41    pub(crate) admit: TinyLFU,
42
43    pub(crate) evict: SampledLFU,
44    pub metrics: *const Metrics,
45    // pub(crate) flag: AtomicIsize,
46    number_counters: i64,
47    lock: Mutex<()>,
48    max_cost: i64,
49    _merker: PhantomData<T>,
50}
51
52
53impl<T> DefaultPolicy<T> {
54    pub(crate) fn new(number_counters: i64, max_cost: i64, metrics: *const Metrics) -> Self {
55     DefaultPolicy {
56            admit: TinyLFU::new(number_counters),
57
58            evict: SampledLFU::new(max_cost, metrics),
59            metrics: metrics,
60            // flag: AtomicIsize::new(0),
61            number_counters,
62            lock: Default::default(),
63            max_cost,
64            _merker: PhantomData,
65        }
66    }
67
68    pub fn push<'g>(&mut self, keys: Vec<u64>, guard: &'g Guard) -> bool {
69        if keys.len() == 0 {
70            return true;
71        }
72
73
74        // if self.flag.load(Ordering::SeqCst) == 0 {
75        //     self.flag.store(1, Ordering::SeqCst);
76            self.process_items(keys.clone(), guard);
77            let metrics = self.metrics;
78            if !metrics.is_null() {
79                unsafe {
80                    metrics.as_ref().unwrap().add(KEEP_GETS, keys[0], keys.len() as u64, guard)
81                };
82            }
83       /* } else {
84            let metrics = self.metrics;
85            if metrics.is_null() {
86                unsafe {
87                    metrics.as_ref().unwrap().add(DROP_GETS, keys[0], keys.len() as u64, guard)
88                };
89            }
90        }*/
91
92        /*select! {
93            send(self.item_ch.0,keys.clone())->res =>{
94                if !self.metrics.is_null() {
95                    unsafe{self.metrics.as_mut().unwrap().add(KEEP_GETS,keys[0],keys.len() as u64)};
96                    return true;
97                }
98
99            },
100            default=>{
101              if !self.metrics.is_null() {
102                    unsafe {self.metrics.as_mut().unwrap().add(KEEP_GETS,keys[0],keys.len() as u64)};
103                    return false;
104                }
105
106            }
107        }*/
108        return true;
109        // unsafe {
110        //     if !self.metrics.is_null() {
111        //         self.metrics.as_mut().unwrap().add(KEEP_GETS, keys[0], keys.len() as u64)
112        //     }
113        // };
114        // true
115    }
116    // pub fn collect_metrics(&mut self, metrics: *mut Metrics, guard: &Guard) {
117    //     self.metrics = self.metrics;
118    //
119    //     let mut evict = self.evict.load(Ordering::SeqCst, guard);
120    //     if evict.is_null() {
121    //         evict = self.init_evict(self.max_cost, &guard)
122    //     }
123    //
124    //
125    //
126    //     /* let new_table = Owned::new(SampledLFU::new(evict.max_cost));
127    //
128    //      self.evict.store(new_table, Ordering::SeqCst)*/
129    // }
130    pub fn add<'g>(&'g mut self, key: u64, cost: i64, guard: &'g Guard<'_>) -> (Vec<Item<T>>, bool) {
131        let l = self.lock.lock();
132
133
134        if key > 200000 {
135            println!("")
136        }
137        // can't add an item bigger than entire cache
138        if cost > self.evict.max_cost {
139            drop(l);
140            return (vec![], false);
141        }
142        // we don't need to go any further if the item is already in the cache
143        if self.evict.update_if_has(key, cost, guard) {
144            drop(l);
145            // An update does not count as an addition, so return false.
146            return (vec![], false);
147        }
148        let mut room = self.evict.room_left(cost);
149        // if we got this far, this key doesn't exist in the cache
150        //
151        // calculate the remaining room in the cache (usually bytes)
152        if room >= 0 {
153            // there's enough room in the cache to store the new item without
154            // overflowing, so we can do that now and stop here
155            self.evict.add(key, cost);
156            drop(l);
157            return (vec![], true);
158        }
159
160
161        let inc_hits = self.admit.estimate(key);
162        // sample is the eviction candidate pool to be filled via random sampling
163        //
164        // TODO: perhaps we should use a min heap here. Right now our time
165        // complexity is N for finding the min. Min heap should bring it down to
166        // O(lg N).
167
168        let mut sample = Vec::new();
169        let mut victims = Vec::new();
170        room = self.evict.room_left(cost);
171        while room < 0 {
172            room = self.evict.room_left(cost);
173            // fill up empty slots in sample
174            self.evict.fill_sample(&mut sample);
175            let mut min_key: u64 = 0;
176            let mut min_hits: i64 = i64::MAX;
177            let mut min_id: i64 = 0;
178            let mut min_cost: i64 = 0;
179
180
181            for i in 0..sample.len() {
182                let hits = self.admit.estimate(sample[i].key);
183                if hits < min_hits {
184                    min_key = sample[i].key;
185                    min_hits = hits;
186                    min_id = i as i64;
187                    min_cost = sample[i].cost;
188                }
189            }
190            if inc_hits < min_hits {
191                unsafe {
192                    let metrics = self.metrics;
193                    if metrics.is_null() {
194                        unsafe {
195                            metrics.as_ref().unwrap().add(REJECT_SETS, key, 1, guard)
196                        };
197                    }
198                }
199                return (victims, false);
200            }
201            self.evict.del(&min_key);
202            sample[min_id as usize] = sample[sample.len() - 1];
203            victims.push(Item {
204                flag: ItemNew,
205                key: min_key,
206                conflict: 0,
207                value: Atomic::null(),
208                cost: min_cost,
209                expiration: None,
210            })
211        };
212        self.evict.add(key, cost);
213        drop(l);
214        return (victims, true);
215    }
216
217    //TODO lock
218    pub fn has(&self, key: u64, _guard: &Guard) -> bool {
219        self.evict.key_costs.contains_key(&key)
220    }
221
222    pub fn del<'g>(&'g mut self, key: &u64, _guard: &'g Guard) {
223        self.evict.del(key);
224    }
225
226
227    pub fn update<'g>(&'g mut self, key: u64, cost: i64, guard: &'g Guard) {
228        self.evict.update_if_has(key, cost, guard);
229    }
230
231    pub fn clear<'g>(&'g mut self, _guard: &'g Guard) {
232        self.admit.clear();
233        self.evict.clear();
234    }
235
236    pub fn close(&mut self) {
237        //self.stop.0.send(true).expect("Chanla close");
238    }
239    pub fn cost(&self, key: &u64, _guard: &Guard) -> i64 {
240        match self.evict.key_costs.get(&key) {
241            None => -1,
242            Some(v) => *v
243        }
244    }
245
246    pub fn cap(&self) -> i64 {
247        self.evict.max_cost - self.evict.used
248    }
249
250    fn process_items<'g>(&'g mut self, item: Vec<u64>, _guard: &'g Guard) {
251        self.admit.push(item);
252        // self.flag.store(0, Ordering::SeqCst)
253        /*        loop {
254                    select! {
255                       recv(self.item_ch.1) -> item => {
256                            if let Ok(item) = item {
257                                let mut admit = self.admit.load(Ordering::SeqCst,guard);
258                                if admit.is_null() {
259                                    return;
260                                }
261                                let admit = unsafe{admit.deref_mut()};
262                                admit.push(item)
263                            }
264                       },
265                          recv(self.stop.1) -> item => {
266                            return;
267                        }
268                    }
269                }*/
270
271        /*   let msg = self.item_ch.1.try_recv();
272           {
273               match msg {
274                   Ok(r) => {
275                       let mut admit = self.admit.load(Ordering::SeqCst, guard);
276                       if admit.is_null() {
277                           return;
278                       }
279                       let admit = unsafe { admit.deref_mut() };
280
281                       admit.push(r);
282                   }
283                   Err(_) => {}
284               }
285           }*/
286    }
287}
288
289pub struct TinyLFU {
290    pub freq: CmSketch,
291    pub door: Bloom,
292    pub incrs: i64,
293    pub reset_at: i64,
294}
295
296impl TinyLFU {
297    pub fn new(num_counter: i64) -> Self {
298        TinyLFU {
299            freq: CmSketch::new(num_counter),
300            door: Bloom::new(num_counter as f64, 0.01),
301            incrs: 0,
302            reset_at: num_counter,
303        }
304    }
305
306    pub fn push(&mut self, keys: Vec<u64>) {
307        for (_i, key) in keys.iter().enumerate() {
308            self.increment(*key)
309        }
310    }
311
312    pub fn estimate(&mut self, key: u64) -> i64 {
313        let mut hits = self.freq.estimate(key);
314        if self.door.has(key) {
315            hits += 1;
316        }
317        hits
318    }
319
320    pub fn increment(&mut self, key: u64) {
321        // flip doorkeeper bit if not already
322        if self.door.add_if_not_has(key) {
323            // increment count-min counter if doorkeeper bit is already set.
324            self.freq.increment(key);
325        }
326        self.incrs += 1;
327        if self.incrs >= self.reset_at {
328            self.reset()
329        }
330    }
331
332    fn clear(&mut self) {
333        // Zero out incrs.
334        self.incrs = 0;
335        // clears doorkeeper bits
336        self.door.clear();
337        // halves count-min counters
338        self.freq.clear();
339    }
340    fn reset(&mut self) {
341        // Zero out incrs.
342        self.incrs = 0;
343        // clears doorkeeper bits
344        self.door.clear();
345        // halves count-min counters
346        self.freq.clear();
347    }
348}
349
350pub struct SampledLFU {
351    pub key_costs: HashMap<u64, i64>,
352    pub max_cost: i64,
353    pub used: i64,
354    pub(crate) metrics: *const Metrics,
355}
356
357
358
359impl SampledLFU {
360    fn new(max_cost: i64, shared: *const Metrics) -> Self {
361        SampledLFU {
362            key_costs: HashMap::new(),
363            max_cost,
364            used: 0,
365            metrics: shared
366        }
367    }
368
369    fn room_left(&self, cost: i64) -> i64 {
370        self.max_cost - (self.used + cost)
371    }
372
373    fn fill_sample(&self, input: &mut Vec<PolicyPair>) {
374        if input.len() >= LFU_SAMPLE {
375            return;
376        }
377        for (key, cost) in self.key_costs.iter() {
378            input.push(PolicyPair { key: *key, cost: *cost });
379            if input.len() >= LFU_SAMPLE {
380                return;
381            }
382        }
383        return;
384    }
385
386    fn del(&mut self, key: &u64) {
387        match self.key_costs.get(key) {
388            None => {}
389            Some(v) => {
390                self.used -= v;
391                self.key_costs.remove(key);
392            }
393        }
394    }
395
396    fn add(&mut self, key: u64, cost: i64) {
397        //eprintln!("{}", cost);
398        self.key_costs.insert(key, cost);
399        self.used += cost;
400    }
401    fn update_if_has(&mut self, key: u64, cost: i64, guard: &Guard) -> bool {
402        match self.key_costs.get(&key) {
403            None => false,
404            Some(v) => {
405                let metrics = self.metrics;
406                unsafe {
407                    if !metrics.is_null() {
408                        metrics.as_ref().unwrap().add(KEY_UPDATE, key, 1, guard)
409                    }
410                }
411
412                if *v > cost {
413                    let diff = *v - cost;
414                    if !metrics.is_null() {
415                        unsafe { metrics.as_ref().unwrap().add(COST_ADD, key, (diff - 1) as u64, guard) }
416                    }
417                } else if cost > *v {
418                    let diff = *v - cost;
419                    if !metrics.is_null() {
420                        unsafe { metrics.as_ref().unwrap().add(COST_ADD, key, diff as u64, guard) }
421                    }
422                }
423                self.used += cost - v;
424                self.key_costs.insert(key, cost);
425                true
426            }
427        }
428    }
429
430    fn clear(&mut self) {
431        self.used = 0;
432        self.key_costs = HashMap::default();
433    }
434}
435
436#[derive(Clone, Copy)]
437struct PolicyPair {
438    key: u64,
439    cost: i64,
440}
441
442
443#[cfg(test)]
444mod tests {
445    use seize::Collector;
446
447    use crate::cache::{DO_NOT_USE, Metrics};
448    use crate::policy::{DefaultPolicy, SampledLFU};
449
450    #[test]
451    fn test_policy_policy_push() {
452        let collector = Collector::new();
453
454        let _guard = collector.enter();
455        let shard_metric = Box::new(Metrics::new(DO_NOT_USE, &collector));
456
457        let guard = collector.enter();
458
459        let mut p = DefaultPolicy::<i32>::new(100, 10, &*shard_metric);
460        let mut keep_count = 0;
461        for _i in 0..10 {
462            if p.push(vec![1, 2, 3, 4, 5], &guard) {
463                keep_count += 1;
464            }
465        }
466        assert_ne!(keep_count, 0);
467        drop(Box::into_raw(shard_metric))
468    }
469
470    #[test]
471    fn test_policy_policy_add() {
472        let collector = Collector::new();
473
474        let guard = collector.enter();
475        let shard_metric = Box::into_raw(Box::new(Metrics::new(DO_NOT_USE, &collector)));
476        let mut p = DefaultPolicy::<i32>::new(1000, 100, shard_metric);
477        let v = p.add(1, 101, &guard);
478        assert!(v.0.len() == 0 || v.1, "can't add an item bigger than entire cache");
479
480        p.add(1, 1, &guard);
481        p.admit.increment(1);
482        p.admit.increment(2);
483        p.admit.increment(3);
484
485        let v = p.add(1, 1, &guard);
486        assert_eq!(v.0.len(), 0);
487        assert_eq!(v.1, false);
488
489        let v = p.add(2, 20, &guard);
490        assert_eq!(v.0.len(), 0);
491        assert_eq!(v.1, true);
492
493        let v = p.add(3, 90, &guard);
494        assert!(v.0.len()>0);
495        assert_eq!(v.1, true);
496
497        let v = p.add(4, 20, &guard);
498        assert_eq!(v.0.len(), 0);
499        assert_eq!(v.1, false);
500    }
501
502
503    #[test]
504    fn test_policy_del() {
505        let collector = Collector::new();
506
507        let guard = collector.enter();
508        let shard_metric = Box::new(Metrics::new(DO_NOT_USE, &collector));
509
510
511        let mut p = DefaultPolicy::<i32>::new(1000, 100, &*shard_metric);
512
513        p.add(1, 1, &guard);
514        p.del(&1,&guard);
515        p.del(&2,&guard);
516
517        assert_eq!(p.has(1,&guard),false);
518        assert_eq!(p.has(2,&guard),false);
519        drop(Box::into_raw(shard_metric))
520    }
521    #[test]
522    fn test_policy_cap() {
523        let collector = Collector::new();
524
525        let guard = collector.enter();
526        let shard_metric =Box::new(Metrics::new(DO_NOT_USE, &collector));
527
528
529        let mut p = DefaultPolicy::<i32>::new(100, 10, &*shard_metric);
530
531        p.add(1, 1, &guard);
532
533        assert_eq!(p.cap(),9);
534        drop(Box::into_raw(shard_metric))
535    }
536
537
538    #[test]
539    fn test_policy_update() {
540        let collector = Collector::new();
541
542        let guard = collector.enter();
543        let shard_metric = Box::new(Metrics::new(DO_NOT_USE, &collector));
544
545
546        let mut p = DefaultPolicy::<i32>::new(100, 10, &*shard_metric);
547
548        p.add(1, 1, &guard);
549        p.add(1, 2, &guard);
550
551        assert_eq!(p.evict.key_costs.get(&1),Some(&2));
552        drop(Box::into_raw(shard_metric))
553    }
554
555    #[test]
556    fn test_policy_cost() {
557        let collector = Collector::new();
558
559        let guard = collector.enter();
560        let shard_metric = Box::new(Metrics::new(DO_NOT_USE, &collector));
561
562
563        let mut p = DefaultPolicy::<i32>::new(100, 10, &*shard_metric);
564
565        p.add(1, 1, &guard);
566
567
568        assert_eq!(p.cost(&1,&guard),1);
569        assert_eq!(p.cost(&2,&guard),-1);
570        drop(Box::into_raw(shard_metric))
571    }
572
573
574    #[test]
575    fn test_policy_clear() {
576        let collector = Collector::new();
577
578        let guard = collector.enter();
579        let shard_metric= Box::new(Metrics::new(DO_NOT_USE, &collector));
580
581
582        let mut p = DefaultPolicy::<i32>::new(100, 10, &*shard_metric);
583
584        p.add(1, 1, &guard);
585        p.add(2, 2, &guard);
586        p.add(3, 3, &guard);
587        p.clear(&guard);
588
589
590        assert_eq!(p.has(1,&guard),false);
591        assert_eq!(p.has(2,&guard),false);
592        assert_eq!(p.has(2,&guard),false);
593
594        drop(Box::into_raw(shard_metric))
595    }
596    #[test]
597    fn test_lfu_add(){
598
599        let collector = Collector::new();
600
601        let _guard = collector.enter();
602        let shard_metric =Box::new(Metrics::new(DO_NOT_USE, &collector));
603
604
605
606        let mut lfu = SampledLFU::new(4,&*shard_metric);
607        lfu.add(1, 1);
608        lfu.add(2, 2);
609        lfu.add(3, 1);
610        assert_eq!(lfu.used,4);
611        assert_eq!(lfu.key_costs.get(&2),Some(&2));
612        drop(Box::into_raw(shard_metric))
613    }
614
615    #[test]
616    fn test_lfu_del(){
617
618        let collector = Collector::new();
619
620        let _guard = collector.enter();
621        let shard_metric = Box::new(Metrics::new(DO_NOT_USE, &collector));
622
623
624
625        let mut lfu = SampledLFU::new(4,&*shard_metric);
626        lfu.add(1, 1);
627        lfu.add(2, 2);
628        lfu.del(&2);
629        assert_eq!(lfu.used,1);
630        assert_eq!(lfu.key_costs.get(&2),None);
631        drop(Box::into_raw(shard_metric))
632    }
633
634
635    #[test]
636    fn test_lfu_update(){
637
638        let collector = Collector::new();
639
640        let guard = collector.enter();
641        let shard_metric =Box::new(Metrics::new(DO_NOT_USE, &collector));
642
643
644        let mut lfu = SampledLFU::new(4,&*shard_metric);
645        lfu.add(1, 1);
646
647        assert_eq!( lfu.update_if_has(1,2,&guard),true);
648        assert_eq!(lfu.used,2);
649        assert_eq!( lfu.update_if_has(2,2,&guard),false);
650        drop(Box::into_raw(shard_metric))
651    }
652
653    #[test]
654    fn test_lfu_clear(){
655
656        let collector = Collector::new();
657
658        let _guard = collector.enter();
659        let shard_metric =Box::new(Metrics::new(DO_NOT_USE, &collector));
660
661
662        let mut lfu = SampledLFU::new(4,&*shard_metric);
663        lfu.add(1, 1);
664        lfu.add(2, 2);
665        lfu.add(3, 3);
666        lfu.clear();
667
668        assert_eq!(lfu.used,0);
669        assert_eq!(lfu.key_costs.len(),0);
670        drop(Box::into_raw(shard_metric))
671
672    }
673}