//! omega_cache/core/workload.rs — Zipfian workload generation and access
//! statistics for cache benchmarking.
1use crate::core::utils::random_string;
2use bytes::Bytes;
3use dashmap::DashMap;
4use rand::Rng;
5use rand::prelude::*;
6use rand_distr::Zipf;
7use std::cmp::Reverse;
8use std::collections::{BinaryHeap, VecDeque};
9
/// A Zipfian workload generator for cache benchmarking.
///
/// This structure generates a fixed set of random keys and samples them
/// according to a Power Law distribution. This mimics real-world scenarios
/// where a small subset of "hot" keys receives the majority of traffic.
pub struct WorkloadGenerator {
    /// The pool of pre-generated random strings.
    keys: Vec<Bytes>,
    /// The mathematical distribution used to pick indices from `keys`.
    /// NOTE: `rand_distr::Zipf` yields ranks in `1..=n`, so samples are
    /// shifted to 0-based before indexing `keys` (see `key`).
    distribution: Zipf<f64>,
}
21
22impl WorkloadGenerator {
23 /// Creates a new workload with a specified number of unique keys and skew.
24 ///
25 /// # Arguments
26 /// * `count` - The number of unique logical keys in the workload universe.
27 /// * `skew` - The Zipfian exponent ($s$).
28 /// * `0.5`: Approaching uniform (flatter distribution).
29 /// * `1.0`: Standard Zipfian (classic "Power Law").
30 /// * `1.5`: Highly skewed (very few keys receive almost all traffic).
31 ///
32 /// # Panics
33 /// Panics if `skew` is not within the statistically significant range of `0.5` to `1.5`.
34 #[inline(always)]
35 pub fn new(count: usize, skew: f64) -> Self {
36 assert!(
37 (0.5..=1.5).contains(&skew),
38 "skew must be in range from 0.5 to 1.5"
39 );
40
41 let keys = (0..count).map(|_| Bytes::from(random_string())).collect();
42
43 let distribution = Zipf::new(count as f64, skew).expect("incorrect args");
44
45 Self { keys, distribution }
46 }
47
48 /// Samples a key from the workload using the provided random number generator.
49 ///
50 /// This method is designed for high-concurrency environments; by passing a
51 /// mutable reference to a thread-local `Rng`, it avoids global lock contention.
52 ///
53 /// # Performance
54 /// This operation is $O(1)$ relative to the size of the key strings but follows
55 /// the complexity of the `rand_distr::Zipf` sampling algorithm (typically rejection inversion).
56 #[inline(always)]
57 pub fn key<D: Rng>(&self, distribution: &mut D) -> Bytes {
58 let index = self.distribution.sample(distribution) as usize - 1;
59 self.keys[index].clone()
60 }
61}
62
/// A thread-safe statistical observer used to validate cache efficiency.
///
/// It captures the "Ground Truth" of a workload by counting every key access.
/// This allows tests to compare the cache's internal state against the
/// mathematically ideal set of frequent items.
pub struct WorkloadStatistics {
    /// Access count per key; `DashMap` provides sharded, lock-striped
    /// concurrent updates so many recording threads can write at once.
    counts: DashMap<Bytes, usize>,
}
71
72impl Default for WorkloadStatistics {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78impl WorkloadStatistics {
79 /// Creates a new, empty statistics tracker.
80 #[inline(always)]
81 pub fn new() -> Self {
82 Self {
83 counts: Default::default(),
84 }
85 }
86
87 /// Records access to a specific key.
88 ///
89 /// In a concurrent test, multiple threads call this to build a global
90 /// view of key popularity.
91 pub fn record(&self, key: Bytes) {
92 let mut entry = self.counts.entry(key).or_default();
93 *entry += 1;
94 }
95
96 /// Retrieves the `count` most frequently accessed keys, ordered from
97 /// hottest to coldest.
98 ///
99 /// This uses a Min-Heap (via `Reverse`) to maintain a sliding window of the
100 /// top elements, ensuring $O(N \log K)$ time complexity where $N$ is total
101 /// unique keys and $K$ is the requested count.
102 ///
103 /// # Performance
104 /// This method is intended for use at the *end* of a test run, as it
105 /// iterates over the entire frequency map.
106 pub fn frequent_keys(&self, count: usize) -> Vec<Bytes> {
107 let mut top_frequent = BinaryHeap::new();
108
109 for entry in &self.counts {
110 let key = entry.key();
111 let frequency = *entry.value();
112
113 if top_frequent.len() < count {
114 top_frequent.push(Reverse((frequency, key.clone())));
115 } else if let Some(&Reverse((other_frequency, _))) = top_frequent.peek()
116 && other_frequency < frequency
117 {
118 top_frequent.pop();
119 top_frequent.push(Reverse((frequency, key.clone())))
120 }
121 }
122
123 let mut deque = VecDeque::with_capacity(top_frequent.len());
124
125 while let Some(Reverse((_, key))) = top_frequent.pop() {
126 deque.push_front(key);
127 }
128
129 deque.into_iter().collect()
130 }
131}