tinylfu_cached/cache/policy/
cache_weight.rs

1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashSet};
3use std::hash::Hash;
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use dashmap::mapref::multiple::RefMulti;
8use log::info;
9use parking_lot::RwLock;
10
11use crate::cache::key_description::KeyDescription;
12use crate::cache::policy::config::CacheWeightConfig;
13use crate::cache::stats::ConcurrentStatsCounter;
14use crate::cache::types::{FrequencyEstimate, KeyHash, KeyId, Weight};
15
16/// WeightedKey maintains the key, its hash and its weight. It is used as a value type in the DashMap used inside `CacheWeight`
17pub(crate) struct WeightedKey<Key> {
18    key: Key,
19    pub(crate) key_hash: KeyHash,
20    weight: Weight,
21}
22
23impl<Key> WeightedKey<Key> {
24    fn new(key: Key, key_hash: KeyHash, weight: Weight) -> Self {
25        WeightedKey {
26            key,
27            key_hash,
28            weight,
29        }
30    }
31}
32
33/// SampledKey represents a key with its id, weight and estimated frequency.
34/// A collection of `SampledKey` is returned by `FrequencyCounterBasedMinHeapSamples` when a sample is requested during the admission of a key
35#[derive(Copy, Clone, Debug)]
36pub(crate) struct SampledKey {
37    pub(crate) id: KeyId,
38    pub(crate) weight: Weight,
39    pub(crate) estimated_frequency: FrequencyEstimate,
40}
41
42impl Ord for SampledKey {
43    fn cmp(&self, other: &Self) -> Ordering {
44        (other.estimated_frequency, self.weight).cmp(&(self.estimated_frequency, other.weight))
45    }
46}
47
48impl PartialOrd for SampledKey {
49    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
50        Some(self.cmp(other))
51    }
52}
53
54impl PartialEq for SampledKey {
55    fn eq(&self, other: &Self) -> bool {
56        self.id == other.id
57    }
58}
59
60impl Eq for SampledKey {}
61
62impl SampledKey {
63    pub(crate) fn new<Key>(frequency: FrequencyEstimate, pair: RefMulti<KeyId, WeightedKey<Key>>) -> Self <> {
64        Self::using(*pair.key(), pair.weight, frequency)
65    }
66
67    fn using(id: KeyId, key_weight: Weight, frequency: FrequencyEstimate) -> Self <> {
68        SampledKey {
69            id,
70            weight: key_weight,
71            estimated_frequency: frequency,
72        }
73    }
74}
75
76/// FrequencyCounterBasedMinHeapSamples returns a sample to the `create_space` method of `crate::cache::policy::admission_policy::AdmissionPolicy`
77/// The idea is to return a sample and allow getting the key with the smallest access frequency.
78/// Internally, `FrequencyCounterBasedMinHeapSamples` uses [`std::collections::BinaryHeap`] and returns `SampledKey` that contains the key_id, its weight and its access frequency.
79pub(crate) struct FrequencyCounterBasedMinHeapSamples<'a, Key, Freq>
80    where Freq: Fn(KeyHash) -> FrequencyEstimate {
81    source: &'a DashMap<KeyId, WeightedKey<Key>>,
82    sample: BinaryHeap<SampledKey>,
83    current_sample_key_ids: HashSet<KeyId>,
84    sample_size: usize,
85    frequency_counter: Freq,
86}
87
88impl<'a, Key, Freq> FrequencyCounterBasedMinHeapSamples<'a, Key, Freq>
89    where Freq: Fn(KeyHash) -> FrequencyEstimate {
90    fn new(
91        source: &'a DashMap<KeyId, WeightedKey<Key>>,
92        sample_size: usize,
93        frequency_counter: Freq) -> Self <> {
94        let (sample, current_sample_key_ids) = Self::initial_sample(source, sample_size, &frequency_counter);
95        FrequencyCounterBasedMinHeapSamples {
96            source,
97            sample,
98            current_sample_key_ids,
99            sample_size,
100            frequency_counter,
101        }
102    }
103
104    /// There should never be a case where `min_frequency_key` is not able to return a key because sample size is always greater than 1.
105    /// However, the current implementation has a sort of a race condition.
106    /// Consider there is a `put` operation in progress through `AdmissionPolicy`.
107    /// At the same time, `shutdown` is invoked on the instance of `Cached`. The current implementation of `shutdown` sends a `Shutdown` command to the
108    /// `crate::cache::command::command_executor::CommandExecutor` and then calls `clear` on the `AdmissionPolicy`.
109    /// The `clear` method of `AdmissionPolicy` clears all the entries in the `CacheWeight`.
110    /// Consider that the `clear` operation is done on  `CacheWeight` and now `maybe_fill_in` is called but it would not be able to fill in the new keys
111    /// There would be a stage where `self.sample` might become empty and `pop` does not return anything.
112    /// Hence, at this stage, `min_frequency_key` returns an Option<SampledKey>
113    pub(crate) fn min_frequency_key(&mut self) -> Option<SampledKey> {
114        if let Some(key) = self.sample.pop() {
115            self.current_sample_key_ids.remove(&key.id);
116            return Some(key);
117        }
118        None
119    }
120
121    /// Everytime a key is used from the sample, an attempt is made to fill_in the sample
122    /// In order to fill_in, we need to ensure that we do not add the key which is already present in the `BinaryHeap` again
123    /// Rust's `BinaryHeap` does not provide a `contains` method, so we use a `HashSet` to determine the key_ids that are a part of the current sample.
124    pub(crate) fn maybe_fill_in(&mut self) -> bool {
125        let mut filled_in: bool = false;
126        let mut iterator = self.source.iter();
127
128        while self.sample.len() < self.sample_size {
129            match iterator.next() {
130                Some(pair) => {
131                    if !self.current_sample_key_ids.contains(pair.key()) {
132                        let frequency = (self.frequency_counter)(pair.key_hash);
133                        self.current_sample_key_ids.insert(*pair.key());
134                        self.sample.push(SampledKey::new(frequency, pair));
135                        filled_in = true;
136                    }
137                }
138                None => {
139                    break;
140                }
141            }
142        }
143        filled_in
144    }
145
146    pub(crate) fn size(&self) -> usize {
147        self.sample.len()
148    }
149
150    /// Return an initial sample with size = `sample_size`
151    fn initial_sample(
152        source: &DashMap<KeyId, WeightedKey<Key>>,
153        sample_size: usize,
154        frequency_counter: &Freq) -> (BinaryHeap<SampledKey>, HashSet<KeyId>) {
155        let mut counter = 0;
156        let mut sample = BinaryHeap::new();
157        let mut current_sample_key_ids = HashSet::new();
158
159        for pair in source.iter().by_ref() {
160            current_sample_key_ids.insert(*pair.key());
161            sample.push(SampledKey::new(frequency_counter(pair.value().key_hash), pair));
162            counter += 1;
163
164            if counter >= sample_size {
165                break;
166            }
167        }
168        (sample, current_sample_key_ids)
169    }
170}
171
172/// CacheWeight maintains the weight of each key in the Cache and also manages the weight that is used in the cache.
173/// Weight of each key is maintained via `WeightedKey` abstraction that contains the key, key_hash and its weight.
174/// Every time a key is added in the cache, it is also added in `CacheWeight`, there by increasing the total weight of the cache.
175/// Every time a key is updated, an attempt is made to update its weight, there by changing the total weight of the cache.
176/// Every time a key is deleted, it is also deleted from `CacheWeight`, there by decreasing the total weight of the cache.
177pub(crate) struct CacheWeight<Key>
178    where Key: Hash + Eq + Send + Sync + Clone + 'static, {
179    max_weight: Weight,
180    weight_used: RwLock<Weight>,
181    key_weights: DashMap<KeyId, WeightedKey<Key>>,
182    stats_counter: Arc<ConcurrentStatsCounter>,
183}
184
185impl<Key> CacheWeight<Key>
186    where Key: Hash + Eq + Send + Sync + Clone + 'static, {
187    pub(crate) fn new(cache_weight_config: CacheWeightConfig, stats_counter: Arc<ConcurrentStatsCounter>) -> Self <> {
188        info!("Initializing CacheWeight with a total weight {}", cache_weight_config.total_cache_weight());
189        CacheWeight {
190            max_weight: cache_weight_config.total_cache_weight(),
191            weight_used: RwLock::new(0),
192            key_weights: DashMap::with_capacity_and_shard_amount(cache_weight_config.capacity(), cache_weight_config.shards()),
193            stats_counter,
194        }
195    }
196
197    pub(crate) fn get_max_weight(&self) -> Weight {
198        self.max_weight
199    }
200
201    pub(crate) fn get_weight_used(&self) -> Weight {
202        *self.weight_used.read()
203    }
204
205    pub(crate) fn is_space_available_for(&self, weight: Weight) -> (Weight, bool) {
206        let available = self.max_weight - (*self.weight_used.read());
207        (available, available >= weight)
208    }
209
210    pub(crate) fn add(&self, key_description: &KeyDescription<Key>) {
211        self.key_weights.insert(key_description.id, WeightedKey::new(key_description.clone_key(), key_description.hash, key_description.weight));
212        let mut guard = self.weight_used.write();
213        *guard += key_description.weight;
214
215        self.stats_counter.add_weight(key_description.weight as u64);
216    }
217
218    pub(crate) fn update(&self, key_id: &KeyId, weight: Weight) -> bool {
219        if let Some(mut existing) = self.key_weights.get_mut(key_id) {
220            {
221                let mut guard = self.weight_used.write();
222                *guard += weight - existing.weight;
223            }
224
225            self.stats_counter.update_key();
226            self.update_weight_stats(weight, existing.weight);
227
228            let weighted_key = existing.value_mut();
229            weighted_key.weight = weight;
230
231            return true;
232        }
233        false
234    }
235
236    pub(crate) fn delete<DeleteHook>(&self, key_id: &KeyId, delete_hook: &DeleteHook)
237        where DeleteHook: Fn(Key) {
238        if let Some(weight_by_key_hash) = self.key_weights.remove(key_id) {
239            let mut guard = self.weight_used.write();
240            *guard -= weight_by_key_hash.1.weight;
241            delete_hook(weight_by_key_hash.1.key);
242
243            self.stats_counter.remove_weight(weight_by_key_hash.1.weight as u64);
244        }
245    }
246
247    pub(crate) fn contains(&self, key_id: &KeyId) -> bool {
248        self.key_weights.contains_key(key_id)
249    }
250
251    pub(crate) fn weight_of(&self, key_id: &KeyId) -> Option<Weight> {
252        self.key_weights.get(key_id).map(|pair| pair.weight)
253    }
254
255    pub(crate) fn sample<Freq>(&self, size: usize, frequency_counter: Freq)
256                               -> FrequencyCounterBasedMinHeapSamples<'_, Key, Freq>
257        where Freq: Fn(KeyHash) -> FrequencyEstimate {
258        FrequencyCounterBasedMinHeapSamples::new(&self.key_weights, size, frequency_counter)
259    }
260
261    pub(crate) fn clear(&self) {
262        self.key_weights.clear();
263        let mut guard = self.weight_used.write();
264        *guard = 0;
265    }
266
267    fn update_weight_stats(&self, new_weight: Weight, existing_weight: Weight) {
268        if new_weight > existing_weight {
269            let difference = new_weight - existing_weight;
270            self.stats_counter.add_weight(difference as u64);
271        } else {
272            let difference = existing_weight - new_weight;
273            self.stats_counter.add_weight(!(difference - 1) as u64);
274        }
275    }
276}
277
278#[cfg(test)]
279mod tests {
280    use std::sync::Arc;
281
282    use parking_lot::RwLock;
283
284    use crate::cache::key_description::KeyDescription;
285    use crate::cache::policy::cache_weight::CacheWeight;
286    use crate::cache::policy::config::CacheWeightConfig;
287    use crate::cache::stats::ConcurrentStatsCounter;
288
289    struct DeletedKeys<Key> {
290        keys: RwLock<Vec<Key>>,
291    }
292
293    fn test_cache_weight_config() -> CacheWeightConfig {
294        CacheWeightConfig::new(100, 4, 10)
295    }
296
297    #[test]
298    fn maximum_cache_weight() {
299        let cache_weight: CacheWeight<&str> = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
300        assert_eq!(10, cache_weight.get_max_weight());
301    }
302
303    #[test]
304    fn space_is_available_for_new_key() {
305        let cache_weight: CacheWeight<&str> = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
306        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
307
308        assert!(cache_weight.is_space_available_for(7).1);
309    }
310
311    #[test]
312    fn space_is_not_available_for_new_key() {
313        let cache_weight: CacheWeight<&str> = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
314        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
315
316        assert!(!cache_weight.is_space_available_for(8).1);
317    }
318
319    #[test]
320    fn add_key_weight() {
321        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
322        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
323
324        assert_eq!(3, cache_weight.get_weight_used());
325    }
326
327    #[test]
328    fn add_key_weight_and_increase_stats() {
329        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
330        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
331
332        assert_eq!(3, cache_weight.stats_counter.weight_added());
333    }
334
335    #[test]
336    fn update_non_existing_key() {
337        let cache_weight: CacheWeight<&str> = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
338
339        let result = cache_weight.update(&1, 2);
340        assert!(!result);
341    }
342
343    #[test]
344    fn update_an_existing_key() {
345        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
346
347        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
348        let result = cache_weight.update(&1, 3);
349
350        assert!(result);
351    }
352
353    #[test]
354    fn update_key_weight_given_the_updated_weight_is_less() {
355        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
356
357        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
358        assert_eq!(3, cache_weight.get_weight_used());
359
360        cache_weight.update(&1, 2);
361        assert_eq!(2, cache_weight.get_weight_used());
362    }
363
364    #[test]
365    fn update_key_weight_given_the_updated_weight_is_less_and_increase_stats() {
366        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
367
368        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
369        assert_eq!(3, cache_weight.stats_counter.weight_added());
370
371        cache_weight.update(&1, 2);
372        assert_eq!(2, cache_weight.stats_counter.weight_added());
373        assert_eq!(1, cache_weight.stats_counter.keys_updated());
374    }
375
376    #[test]
377    fn update_key_weight_given_the_updated_weight_is_more() {
378        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
379
380        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
381        assert_eq!(4, cache_weight.get_weight_used());
382
383        cache_weight.update(&1, 8);
384        assert_eq!(8, cache_weight.get_weight_used());
385    }
386
387    #[test]
388    fn update_key_weight_given_the_updated_weight_is_more_and_increase_stats() {
389        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
390
391        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
392        assert_eq!(4, cache_weight.stats_counter.weight_added());
393
394        cache_weight.update(&1, 8);
395        assert_eq!(8, cache_weight.stats_counter.weight_added());
396    }
397
398    #[test]
399    fn update_key_weight_given_the_updated_weight_is_same() {
400        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
401
402        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
403        assert_eq!(4, cache_weight.get_weight_used());
404
405        cache_weight.update(&1, 4);
406        assert_eq!(4, cache_weight.get_weight_used());
407    }
408
409    #[test]
410    fn update_key_weight_given_the_updated_weight_is_same_and_make_no_changes_in_stats() {
411        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
412
413        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
414        assert_eq!(4, cache_weight.stats_counter.weight_added());
415
416        cache_weight.update(&1, 4);
417        assert_eq!(4, cache_weight.stats_counter.weight_added());
418    }
419
420    #[test]
421    fn delete_key_weight() {
422        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
423
424        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
425        assert_eq!(3, cache_weight.get_weight_used());
426
427        let deleted_keys = DeletedKeys { keys: RwLock::new(Vec::new()) };
428        let delete_hook = |key| { deleted_keys.keys.write().push(key) };
429        cache_weight.delete(&1, &delete_hook);
430
431        assert_eq!(vec!["disk"], *deleted_keys.keys.read());
432        assert_eq!(0, cache_weight.get_weight_used());
433        assert!(!cache_weight.contains(&1));
434    }
435
436    #[test]
437    fn delete_key_weight_increase_stats() {
438        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
439
440        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
441        assert_eq!(3, cache_weight.get_weight_used());
442
443        let delete_hook = |_| {};
444        cache_weight.delete(&1, &delete_hook);
445
446        assert_eq!(3, cache_weight.stats_counter.weight_removed())
447    }
448
449    #[test]
450    fn clear() {
451        let cache_weight = CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()));
452        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
453
454        assert_eq!(3, cache_weight.get_weight_used());
455        assert!(cache_weight.contains(&1));
456
457        cache_weight.clear();
458
459        assert_eq!(0, cache_weight.get_weight_used());
460        assert!(!cache_weight.contains(&1));
461    }
462}
463
464#[cfg(test)]
465mod frequency_counter_based_min_heap_samples_tests {
466    use dashmap::DashMap;
467
468    use crate::cache::policy::cache_weight::{FrequencyCounterBasedMinHeapSamples, SampledKey, WeightedKey};
469    use crate::cache::types::KeyId;
470
471    #[test]
472    fn equality_of_sampled_keys() {
473        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
474        cache.insert(1, WeightedKey::new("disk", 3040, 3));
475
476        let mut sampled_keys = Vec::new();
477        for pair in cache.iter().by_ref() {
478            sampled_keys.push(SampledKey::new(10, pair));
479        }
480
481        assert_eq!(sampled_keys[0], sampled_keys[0]);
482    }
483
484    #[test]
485    fn sample_size() {
486        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
487        cache.insert(1, WeightedKey::new("disk", 3040, 3));
488        cache.insert(2, WeightedKey::new("topic", 1090, 4));
489        cache.insert(3, WeightedKey::new("SSD", 1290, 3));
490
491        let sample = FrequencyCounterBasedMinHeapSamples::new(
492            &cache,
493            2,
494            |_hash| { 1 },
495        );
496
497        assert_eq!(2, sample.size());
498    }
499
500    #[test]
501    fn maybe_fill_in_with_empty_source() {
502        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
503        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
504            &cache,
505            2,
506            |_hash| { 1 },
507        );
508
509        let sample_key = sample.min_frequency_key();
510        assert_eq!(None, sample_key);
511    }
512
513    #[test]
514    fn maybe_fill_in_with_source_having_keys_to_fill() {
515        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
516        cache.insert(1, WeightedKey::new("disk", 3040, 3));
517        cache.insert(2, WeightedKey::new("topic", 1090, 4));
518        cache.insert(3, WeightedKey::new("SSD", 1290, 3));
519
520        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
521            &cache,
522            2,
523            |_hash| { 1 },
524        );
525
526        assert_eq!(2, sample.size());
527
528        let _ = sample.min_frequency_key();
529        let _ = sample.maybe_fill_in();
530
531        assert_eq!(2, sample.size());
532    }
533
534    #[test]
535    fn maybe_fill_in_with_source_not_having_keys_to_fill() {
536        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
537        cache.insert(1, WeightedKey::new("disk", 3040, 3));
538        cache.insert(2, WeightedKey::new("topic", 1090, 4));
539
540        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
541            &cache,
542            2,
543            |_hash| { 1 },
544        );
545
546        assert_eq!(2, sample.size());
547        let _ = sample.min_frequency_key();
548
549        cache.remove(&1);
550        cache.remove(&2);
551        let _ = sample.maybe_fill_in();
552
553        assert_eq!(1, sample.size());
554    }
555
556    #[test]
557    fn maybe_fill_in_with_source_having_an_existing_sample_key_to_fill() {
558        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
559        cache.insert(1, WeightedKey::new("disk", 3040, 3));
560        cache.insert(2, WeightedKey::new("topic", 1090, 4));
561
562        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
563            &cache,
564            2,
565            |hash| match hash {
566                3040 => 1,
567                1090 => 2,
568                _ => 0
569            },
570        );
571
572        assert_eq!(2, sample.size());
573        let _ = sample.min_frequency_key();
574
575        cache.remove(&1);
576        let _ = sample.maybe_fill_in();
577
578        assert_eq!(1, sample.size());
579    }
580
581    #[test]
582    fn maybe_fill_in_with_the_sample_already_containing_the_source_keys() {
583        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
584        cache.insert(1, WeightedKey::new("disk", 3040, 3));
585        cache.insert(2, WeightedKey::new("topic", 1090, 4));
586
587        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
588            &cache,
589            2,
590            |_hash| { 1 },
591        );
592
593        assert_eq!(2, sample.size());
594        let _ = sample.maybe_fill_in();
595        assert_eq!(2, sample.size());
596    }
597
598    #[test]
599    fn sample_keys_with_distinct_frequencies() {
600        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
601        cache.insert(1, WeightedKey::new("disk", 3040, 3));
602        cache.insert(2, WeightedKey::new("topic", 1090, 4));
603        cache.insert(3, WeightedKey::new("SSD", 1290, 3));
604
605        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
606            &cache,
607            3,
608            |hash| {
609                match hash {
610                    3040 => 1,
611                    1090 => 2,
612                    1290 => 3,
613                    _ => 0
614                }
615            },
616        );
617
618        assert_eq!(1, sample.min_frequency_key().unwrap().estimated_frequency);
619        assert_eq!(2, sample.min_frequency_key().unwrap().estimated_frequency);
620        assert_eq!(3, sample.min_frequency_key().unwrap().estimated_frequency);
621    }
622
623    #[test]
624    fn sample_keys_with_same_frequencies() {
625        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
626        cache.insert(10, WeightedKey::new("disk", 3040, 5));
627        cache.insert(20, WeightedKey::new("topic", 1090, 2));
628        cache.insert(30, WeightedKey::new("SSD", 1290, 3));
629
630        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
631            &cache,
632            3,
633            |hash| {
634                match hash {
635                    3040 => 1,
636                    1090 => 2,
637                    1290 => 1,
638                    _ => 0
639                }
640            },
641        );
642
643        let sampled_key = sample.min_frequency_key().unwrap();
644        assert_eq!(1, sampled_key.estimated_frequency);
645        assert_eq!(5, sampled_key.weight);
646        assert_eq!(10, sampled_key.id);
647
648        let sampled_key = sample.min_frequency_key().unwrap();
649        assert_eq!(1, sampled_key.estimated_frequency);
650        assert_eq!(3, sampled_key.weight);
651        assert_eq!(30, sampled_key.id);
652
653        let sampled_key = sample.min_frequency_key().unwrap();
654        assert_eq!(2, sampled_key.estimated_frequency);
655        assert_eq!(2, sampled_key.weight);
656        assert_eq!(20, sampled_key.id);
657    }
658}