1use std::cmp::Ordering;
2use std::collections::{BinaryHeap, HashSet};
3use std::hash::Hash;
4use std::sync::Arc;
5
6use dashmap::DashMap;
7use dashmap::mapref::multiple::RefMulti;
8use log::info;
9use parking_lot::RwLock;
10
11use crate::cache::key_description::KeyDescription;
12use crate::cache::policy::config::CacheWeightConfig;
13use crate::cache::stats::ConcurrentStatsCounter;
14use crate::cache::types::{FrequencyEstimate, KeyHash, KeyId, Weight};
15
16pub(crate) struct WeightedKey<Key> {
18 key: Key,
19 pub(crate) key_hash: KeyHash,
20 weight: Weight,
21}
22
23impl<Key> WeightedKey<Key> {
24 fn new(key: Key, key_hash: KeyHash, weight: Weight) -> Self {
25 WeightedKey {
26 key,
27 key_hash,
28 weight,
29 }
30 }
31}
32
33#[derive(Copy, Clone, Debug)]
36pub(crate) struct SampledKey {
37 pub(crate) id: KeyId,
38 pub(crate) weight: Weight,
39 pub(crate) estimated_frequency: FrequencyEstimate,
40}
41
42impl Ord for SampledKey {
43 fn cmp(&self, other: &Self) -> Ordering {
44 (other.estimated_frequency, self.weight).cmp(&(self.estimated_frequency, other.weight))
45 }
46}
47
48impl PartialOrd for SampledKey {
49 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
50 Some(self.cmp(other))
51 }
52}
53
54impl PartialEq for SampledKey {
55 fn eq(&self, other: &Self) -> bool {
56 self.id == other.id
57 }
58}
59
60impl Eq for SampledKey {}
61
62impl SampledKey {
63 pub(crate) fn new<Key>(frequency: FrequencyEstimate, pair: RefMulti<KeyId, WeightedKey<Key>>) -> Self <> {
64 Self::using(*pair.key(), pair.weight, frequency)
65 }
66
67 fn using(id: KeyId, key_weight: Weight, frequency: FrequencyEstimate) -> Self <> {
68 SampledKey {
69 id,
70 weight: key_weight,
71 estimated_frequency: frequency,
72 }
73 }
74}
75
76pub(crate) struct FrequencyCounterBasedMinHeapSamples<'a, Key, Freq>
80 where Freq: Fn(KeyHash) -> FrequencyEstimate {
81 source: &'a DashMap<KeyId, WeightedKey<Key>>,
82 sample: BinaryHeap<SampledKey>,
83 current_sample_key_ids: HashSet<KeyId>,
84 sample_size: usize,
85 frequency_counter: Freq,
86}
87
88impl<'a, Key, Freq> FrequencyCounterBasedMinHeapSamples<'a, Key, Freq>
89 where Freq: Fn(KeyHash) -> FrequencyEstimate {
90 fn new(
91 source: &'a DashMap<KeyId, WeightedKey<Key>>,
92 sample_size: usize,
93 frequency_counter: Freq) -> Self <> {
94 let (sample, current_sample_key_ids) = Self::initial_sample(source, sample_size, &frequency_counter);
95 FrequencyCounterBasedMinHeapSamples {
96 source,
97 sample,
98 current_sample_key_ids,
99 sample_size,
100 frequency_counter,
101 }
102 }
103
104 pub(crate) fn min_frequency_key(&mut self) -> Option<SampledKey> {
114 if let Some(key) = self.sample.pop() {
115 self.current_sample_key_ids.remove(&key.id);
116 return Some(key);
117 }
118 None
119 }
120
121 pub(crate) fn maybe_fill_in(&mut self) -> bool {
125 let mut filled_in: bool = false;
126 let mut iterator = self.source.iter();
127
128 while self.sample.len() < self.sample_size {
129 match iterator.next() {
130 Some(pair) => {
131 if !self.current_sample_key_ids.contains(pair.key()) {
132 let frequency = (self.frequency_counter)(pair.key_hash);
133 self.current_sample_key_ids.insert(*pair.key());
134 self.sample.push(SampledKey::new(frequency, pair));
135 filled_in = true;
136 }
137 }
138 None => {
139 break;
140 }
141 }
142 }
143 filled_in
144 }
145
146 pub(crate) fn size(&self) -> usize {
147 self.sample.len()
148 }
149
150 fn initial_sample(
152 source: &DashMap<KeyId, WeightedKey<Key>>,
153 sample_size: usize,
154 frequency_counter: &Freq) -> (BinaryHeap<SampledKey>, HashSet<KeyId>) {
155 let mut counter = 0;
156 let mut sample = BinaryHeap::new();
157 let mut current_sample_key_ids = HashSet::new();
158
159 for pair in source.iter().by_ref() {
160 current_sample_key_ids.insert(*pair.key());
161 sample.push(SampledKey::new(frequency_counter(pair.value().key_hash), pair));
162 counter += 1;
163
164 if counter >= sample_size {
165 break;
166 }
167 }
168 (sample, current_sample_key_ids)
169 }
170}
171
172pub(crate) struct CacheWeight<Key>
178 where Key: Hash + Eq + Send + Sync + Clone + 'static, {
179 max_weight: Weight,
180 weight_used: RwLock<Weight>,
181 key_weights: DashMap<KeyId, WeightedKey<Key>>,
182 stats_counter: Arc<ConcurrentStatsCounter>,
183}
184
185impl<Key> CacheWeight<Key>
186 where Key: Hash + Eq + Send + Sync + Clone + 'static, {
187 pub(crate) fn new(cache_weight_config: CacheWeightConfig, stats_counter: Arc<ConcurrentStatsCounter>) -> Self <> {
188 info!("Initializing CacheWeight with a total weight {}", cache_weight_config.total_cache_weight());
189 CacheWeight {
190 max_weight: cache_weight_config.total_cache_weight(),
191 weight_used: RwLock::new(0),
192 key_weights: DashMap::with_capacity_and_shard_amount(cache_weight_config.capacity(), cache_weight_config.shards()),
193 stats_counter,
194 }
195 }
196
197 pub(crate) fn get_max_weight(&self) -> Weight {
198 self.max_weight
199 }
200
201 pub(crate) fn get_weight_used(&self) -> Weight {
202 *self.weight_used.read()
203 }
204
205 pub(crate) fn is_space_available_for(&self, weight: Weight) -> (Weight, bool) {
206 let available = self.max_weight - (*self.weight_used.read());
207 (available, available >= weight)
208 }
209
210 pub(crate) fn add(&self, key_description: &KeyDescription<Key>) {
211 self.key_weights.insert(key_description.id, WeightedKey::new(key_description.clone_key(), key_description.hash, key_description.weight));
212 let mut guard = self.weight_used.write();
213 *guard += key_description.weight;
214
215 self.stats_counter.add_weight(key_description.weight as u64);
216 }
217
218 pub(crate) fn update(&self, key_id: &KeyId, weight: Weight) -> bool {
219 if let Some(mut existing) = self.key_weights.get_mut(key_id) {
220 {
221 let mut guard = self.weight_used.write();
222 *guard += weight - existing.weight;
223 }
224
225 self.stats_counter.update_key();
226 self.update_weight_stats(weight, existing.weight);
227
228 let weighted_key = existing.value_mut();
229 weighted_key.weight = weight;
230
231 return true;
232 }
233 false
234 }
235
236 pub(crate) fn delete<DeleteHook>(&self, key_id: &KeyId, delete_hook: &DeleteHook)
237 where DeleteHook: Fn(Key) {
238 if let Some(weight_by_key_hash) = self.key_weights.remove(key_id) {
239 let mut guard = self.weight_used.write();
240 *guard -= weight_by_key_hash.1.weight;
241 delete_hook(weight_by_key_hash.1.key);
242
243 self.stats_counter.remove_weight(weight_by_key_hash.1.weight as u64);
244 }
245 }
246
247 pub(crate) fn contains(&self, key_id: &KeyId) -> bool {
248 self.key_weights.contains_key(key_id)
249 }
250
251 pub(crate) fn weight_of(&self, key_id: &KeyId) -> Option<Weight> {
252 self.key_weights.get(key_id).map(|pair| pair.weight)
253 }
254
255 pub(crate) fn sample<Freq>(&self, size: usize, frequency_counter: Freq)
256 -> FrequencyCounterBasedMinHeapSamples<'_, Key, Freq>
257 where Freq: Fn(KeyHash) -> FrequencyEstimate {
258 FrequencyCounterBasedMinHeapSamples::new(&self.key_weights, size, frequency_counter)
259 }
260
261 pub(crate) fn clear(&self) {
262 self.key_weights.clear();
263 let mut guard = self.weight_used.write();
264 *guard = 0;
265 }
266
267 fn update_weight_stats(&self, new_weight: Weight, existing_weight: Weight) {
268 if new_weight > existing_weight {
269 let difference = new_weight - existing_weight;
270 self.stats_counter.add_weight(difference as u64);
271 } else {
272 let difference = existing_weight - new_weight;
273 self.stats_counter.add_weight(!(difference - 1) as u64);
274 }
275 }
276}
277
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use parking_lot::RwLock;

    use crate::cache::key_description::KeyDescription;
    use crate::cache::policy::cache_weight::CacheWeight;
    use crate::cache::policy::config::CacheWeightConfig;
    use crate::cache::stats::ConcurrentStatsCounter;

    /// Records the keys that were handed to a delete hook.
    struct DeletedKeys<Key> {
        keys: RwLock<Vec<Key>>,
    }

    fn test_cache_weight_config() -> CacheWeightConfig {
        CacheWeightConfig::new(100, 4, 10)
    }

    /// An empty `CacheWeight` over string keys, built from the test configuration and a
    /// fresh stats counter.
    fn empty_cache_weight() -> CacheWeight<&'static str> {
        CacheWeight::new(test_cache_weight_config(), Arc::new(ConcurrentStatsCounter::new()))
    }

    #[test]
    fn maximum_cache_weight() {
        let cache_weight = empty_cache_weight();

        assert_eq!(10, cache_weight.get_max_weight());
    }

    #[test]
    fn space_is_available_for_new_key() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        let (_, is_available) = cache_weight.is_space_available_for(7);
        assert!(is_available);
    }

    #[test]
    fn space_is_not_available_for_new_key() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        let (_, is_available) = cache_weight.is_space_available_for(8);
        assert!(!is_available);
    }

    #[test]
    fn add_key_weight() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        assert_eq!(3, cache_weight.get_weight_used());
    }

    #[test]
    fn add_key_weight_and_increase_stats() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        assert_eq!(3, cache_weight.stats_counter.weight_added());
    }

    #[test]
    fn update_non_existing_key() {
        let cache_weight = empty_cache_weight();

        assert!(!cache_weight.update(&1, 2));
    }

    #[test]
    fn update_an_existing_key() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        assert!(cache_weight.update(&1, 3));
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_less() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
        assert_eq!(3, cache_weight.get_weight_used());

        cache_weight.update(&1, 2);
        assert_eq!(2, cache_weight.get_weight_used());
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_less_and_increase_stats() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
        assert_eq!(3, cache_weight.stats_counter.weight_added());

        cache_weight.update(&1, 2);
        assert_eq!(2, cache_weight.stats_counter.weight_added());
        assert_eq!(1, cache_weight.stats_counter.keys_updated());
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_more() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
        assert_eq!(4, cache_weight.get_weight_used());

        cache_weight.update(&1, 8);
        assert_eq!(8, cache_weight.get_weight_used());
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_more_and_increase_stats() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
        assert_eq!(4, cache_weight.stats_counter.weight_added());

        cache_weight.update(&1, 8);
        assert_eq!(8, cache_weight.stats_counter.weight_added());
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_same() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
        assert_eq!(4, cache_weight.get_weight_used());

        cache_weight.update(&1, 4);
        assert_eq!(4, cache_weight.get_weight_used());
    }

    #[test]
    fn update_key_weight_given_the_updated_weight_is_same_and_make_no_changes_in_stats() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 4));
        assert_eq!(4, cache_weight.stats_counter.weight_added());

        cache_weight.update(&1, 4);
        assert_eq!(4, cache_weight.stats_counter.weight_added());
    }

    #[test]
    fn delete_key_weight() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
        assert_eq!(3, cache_weight.get_weight_used());

        // The hook records every key it receives so the test can inspect it afterwards.
        let deleted_keys = DeletedKeys { keys: RwLock::new(Vec::new()) };
        let delete_hook = |key| { deleted_keys.keys.write().push(key) };
        cache_weight.delete(&1, &delete_hook);

        assert_eq!(vec!["disk"], *deleted_keys.keys.read());
        assert_eq!(0, cache_weight.get_weight_used());
        assert!(!cache_weight.contains(&1));
    }

    #[test]
    fn delete_key_weight_increase_stats() {
        let cache_weight = empty_cache_weight();

        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));
        assert_eq!(3, cache_weight.get_weight_used());

        let delete_hook = |_| {};
        cache_weight.delete(&1, &delete_hook);

        assert_eq!(3, cache_weight.stats_counter.weight_removed());
    }

    #[test]
    fn clear() {
        let cache_weight = empty_cache_weight();
        cache_weight.add(&KeyDescription::new("disk", 1, 3040, 3));

        assert_eq!(3, cache_weight.get_weight_used());
        assert!(cache_weight.contains(&1));

        cache_weight.clear();

        assert_eq!(0, cache_weight.get_weight_used());
        assert!(!cache_weight.contains(&1));
    }
}
463
#[cfg(test)]
mod frequency_counter_based_min_heap_samples_tests {
    use dashmap::DashMap;

    use crate::cache::policy::cache_weight::{FrequencyCounterBasedMinHeapSamples, SampledKey, WeightedKey};
    use crate::cache::types::KeyId;

    /// A source holding "disk" (id 1, hash 3040, weight 3) and "topic" (id 2, hash 1090, weight 4).
    fn disk_and_topic() -> DashMap<KeyId, WeightedKey<&'static str>> {
        let cache = DashMap::new();
        cache.insert(1, WeightedKey::new("disk", 3040, 3));
        cache.insert(2, WeightedKey::new("topic", 1090, 4));
        cache
    }

    /// The `disk_and_topic` source plus "SSD" (id 3, hash 1290, weight 3).
    fn disk_topic_and_ssd() -> DashMap<KeyId, WeightedKey<&'static str>> {
        let cache = disk_and_topic();
        cache.insert(3, WeightedKey::new("SSD", 1290, 3));
        cache
    }

    #[test]
    fn equality_of_sampled_keys() {
        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
        cache.insert(1, WeightedKey::new("disk", 3040, 3));

        let sampled_keys: Vec<SampledKey> = cache
            .iter()
            .map(|pair| SampledKey::new(10, pair))
            .collect();

        assert_eq!(sampled_keys[0], sampled_keys[0]);
    }

    #[test]
    fn sample_size() {
        let cache = disk_topic_and_ssd();

        let sample = FrequencyCounterBasedMinHeapSamples::new(&cache, 2, |_hash| 1);

        assert_eq!(2, sample.size());
    }

    #[test]
    fn maybe_fill_in_with_empty_source() {
        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(&cache, 2, |_hash| 1);

        assert_eq!(None, sample.min_frequency_key());
    }

    #[test]
    fn maybe_fill_in_with_source_having_keys_to_fill() {
        let cache = disk_topic_and_ssd();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(&cache, 2, |_hash| 1);

        assert_eq!(2, sample.size());

        // Popping leaves a free slot that the third key of the source can take.
        let _ = sample.min_frequency_key();
        let _ = sample.maybe_fill_in();

        assert_eq!(2, sample.size());
    }

    #[test]
    fn maybe_fill_in_with_source_not_having_keys_to_fill() {
        let cache = disk_and_topic();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(&cache, 2, |_hash| 1);

        assert_eq!(2, sample.size());
        let _ = sample.min_frequency_key();

        // With the source emptied there is nothing left to fill the free slot with.
        cache.remove(&1);
        cache.remove(&2);
        let _ = sample.maybe_fill_in();

        assert_eq!(1, sample.size());
    }

    #[test]
    fn maybe_fill_in_with_source_having_an_existing_sample_key_to_fill() {
        let cache = disk_and_topic();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
            &cache,
            2,
            |hash| match hash {
                3040 => 1,
                1090 => 2,
                _ => 0,
            },
        );

        assert_eq!(2, sample.size());
        let _ = sample.min_frequency_key();

        // The only key left in the source is already part of the sample.
        cache.remove(&1);
        let _ = sample.maybe_fill_in();

        assert_eq!(1, sample.size());
    }

    #[test]
    fn maybe_fill_in_with_the_sample_already_containing_the_source_keys() {
        let cache = disk_and_topic();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(&cache, 2, |_hash| 1);

        assert_eq!(2, sample.size());
        let _ = sample.maybe_fill_in();
        assert_eq!(2, sample.size());
    }

    #[test]
    fn sample_keys_with_distinct_frequencies() {
        let cache = disk_topic_and_ssd();
        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
            &cache,
            3,
            |hash| match hash {
                3040 => 1,
                1090 => 2,
                1290 => 3,
                _ => 0,
            },
        );

        assert_eq!(1, sample.min_frequency_key().unwrap().estimated_frequency);
        assert_eq!(2, sample.min_frequency_key().unwrap().estimated_frequency);
        assert_eq!(3, sample.min_frequency_key().unwrap().estimated_frequency);
    }

    #[test]
    fn sample_keys_with_same_frequencies() {
        let cache: DashMap<KeyId, WeightedKey<&str>> = DashMap::new();
        cache.insert(10, WeightedKey::new("disk", 3040, 5));
        cache.insert(20, WeightedKey::new("topic", 1090, 2));
        cache.insert(30, WeightedKey::new("SSD", 1290, 3));

        let mut sample = FrequencyCounterBasedMinHeapSamples::new(
            &cache,
            3,
            |hash| match hash {
                3040 => 1,
                1090 => 2,
                1290 => 1,
                _ => 0,
            },
        );

        // Two keys share the lowest frequency; the heavier of them comes out first.
        let heaviest_of_least_frequent = sample.min_frequency_key().unwrap();
        assert_eq!(1, heaviest_of_least_frequent.estimated_frequency);
        assert_eq!(5, heaviest_of_least_frequent.weight);
        assert_eq!(10, heaviest_of_least_frequent.id);

        let lighter_of_least_frequent = sample.min_frequency_key().unwrap();
        assert_eq!(1, lighter_of_least_frequent.estimated_frequency);
        assert_eq!(3, lighter_of_least_frequent.weight);
        assert_eq!(30, lighter_of_least_frequent.id);

        let most_frequent = sample.min_frequency_key().unwrap();
        assert_eq!(2, most_frequent.estimated_frequency);
        assert_eq!(2, most_frequent.weight);
        assert_eq!(20, most_frequent.id);
    }
}