//! Zipfian workload generation and access-frequency statistics for cache
//! benchmarking (`omega_cache/core/workload.rs`).
1use crate::core::utils::random_string;
2use bytes::Bytes;
3use dashmap::DashMap;
4use rand::Rng;
5use rand::prelude::*;
6use rand_distr::Zipf;
7use std::cmp::Reverse;
8use std::collections::{BinaryHeap, VecDeque};
9
/// A Zipfian workload generator for cache benchmarking.
///
/// This structure generates a fixed set of random keys and samples them
/// according to a Power Law distribution. This mimics real-world scenarios
/// where a small subset of "hot" keys receives the majority of traffic.
pub struct WorkloadGenerator {
    /// The pool of pre-generated random strings.
    keys: Vec<Bytes>,
    /// The mathematical distribution used to pick indices from `keys`.
    /// Samples fall in `1..=keys.len()`, so consumers must convert them
    /// to 0-based indices (see `WorkloadGenerator::key`).
    distribution: Zipf<f64>,
}
21
22impl WorkloadGenerator {
23    /// Creates a new workload with a specified number of unique keys and skew.
24    ///
25    /// # Arguments
26    /// * `count` - The number of unique logical keys in the workload universe.
27    /// * `skew` - The Zipfian exponent ($s$).
28    ///   * `0.5`: Approaching uniform (flatter distribution).
29    ///   * `1.0`: Standard Zipfian (classic "Power Law").
30    ///   * `1.5`: Highly skewed (very few keys receive almost all traffic).
31    ///
32    /// # Panics
33    /// Panics if `skew` is not within the statistically significant range of `0.5` to `1.5`.
34    #[inline(always)]
35    pub fn new(count: usize, skew: f64) -> Self {
36        assert!(
37            (0.5..=1.5).contains(&skew),
38            "skew must be in range from 0.5 to 1.5"
39        );
40
41        let keys = (0..count).map(|_| Bytes::from(random_string())).collect();
42
43        let distribution = Zipf::new(count as f64, skew).expect("incorrect args");
44
45        Self { keys, distribution }
46    }
47
48    /// Samples a key from the workload using the provided random number generator.
49    ///
50    /// This method is designed for high-concurrency environments; by passing a
51    /// mutable reference to a thread-local `Rng`, it avoids global lock contention.
52    ///
53    /// # Performance
54    /// This operation is $O(1)$ relative to the size of the key strings but follows
55    /// the complexity of the `rand_distr::Zipf` sampling algorithm (typically rejection inversion).
56    #[inline(always)]
57    pub fn key<D: Rng>(&self, distribution: &mut D) -> Bytes {
58        let index = self.distribution.sample(distribution) as usize - 1;
59        self.keys[index].clone()
60    }
61}
62
/// A thread-safe statistical observer used to validate cache efficiency.
///
/// It captures the "Ground Truth" of a workload by counting every key access.
/// This allows tests to compare the cache's internal state against the
/// mathematically ideal set of frequent items.
pub struct WorkloadStatistics {
    /// Concurrent map from each observed key to its total access count.
    counts: DashMap<Bytes, usize>,
}
71
impl Default for WorkloadStatistics {
    /// Equivalent to [`WorkloadStatistics::new`]: an empty tracker.
    fn default() -> Self {
        Self::new()
    }
}
77
78impl WorkloadStatistics {
79    /// Creates a new, empty statistics tracker.
80    #[inline(always)]
81    pub fn new() -> Self {
82        Self {
83            counts: Default::default(),
84        }
85    }
86
87    /// Records access to a specific key.
88    ///
89    /// In a concurrent test, multiple threads call this to build a global
90    /// view of key popularity.
91    pub fn record(&self, key: Bytes) {
92        let mut entry = self.counts.entry(key).or_default();
93        *entry += 1;
94    }
95
96    /// Retrieves the `count` most frequently accessed keys, ordered from
97    /// hottest to coldest.
98    ///
99    /// This uses a Min-Heap (via `Reverse`) to maintain a sliding window of the
100    /// top elements, ensuring $O(N \log K)$ time complexity where $N$ is total
101    /// unique keys and $K$ is the requested count.
102    ///
103    /// # Performance
104    /// This method is intended for use at the *end* of a test run, as it
105    /// iterates over the entire frequency map.
106    pub fn frequent_keys(&self, count: usize) -> Vec<Bytes> {
107        let mut top_frequent = BinaryHeap::new();
108
109        for entry in &self.counts {
110            let key = entry.key();
111            let frequency = *entry.value();
112
113            if top_frequent.len() < count {
114                top_frequent.push(Reverse((frequency, key.clone())));
115            } else if let Some(&Reverse((other_frequency, _))) = top_frequent.peek()
116                && other_frequency < frequency
117            {
118                top_frequent.pop();
119                top_frequent.push(Reverse((frequency, key.clone())))
120            }
121        }
122
123        let mut deque = VecDeque::with_capacity(top_frequent.len());
124
125        while let Some(Reverse((_, key))) = top_frequent.pop() {
126            deque.push_front(key);
127        }
128
129        deque.into_iter().collect()
130    }
131}