bloom_lib/topk.rs
1//! A Top-K tracker for the most frequent items in a stream.
2
3use core::hash::{BuildHasher, Hash};
4
5use alloc::vec::Vec;
6
7use crate::{count_min::CountMinSketch, hash::DefaultHashBuilder, Error};
8
/// One monitored item and its estimated frequency.
///
/// Stored in [`TopK`]'s monitored list; created when an item is admitted and
/// overwritten in place when it is evicted by a newcomer.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
struct Entry<T> {
    /// The tracked item itself, moved in when it was admitted.
    key: T,
    /// The sketch estimate recorded the last time this key was inserted.
    /// Insertions of other items do not refresh it.
    count: u64,
}
16
/// Tracks the `k` most frequent items in a stream using bounded memory.
///
/// A Top-K tracker combines a [`CountMinSketch`](crate::CountMinSketch) — which
/// estimates how often *any* item has been seen — with a small list of at most
/// `k` monitored items. Every insertion updates the sketch. An item that is not
/// yet monitored is admitted directly while fewer than `k` items are monitored;
/// once the list is full, it replaces the monitored item with the lowest
/// recorded count, but only if its own estimate is strictly greater. Memory is
/// bounded by the sketch size plus `k` stored keys, regardless of how many
/// distinct items flow through.
///
/// The set of items reported as the top is approximate. Each monitored item's
/// count is the sketch estimate recorded when that item was last inserted, and
/// admission compares a newcomer's current estimate against those recorded
/// counts. If the sketch overestimates some item (estimates are only promised
/// to be *at least* the true count), that item can displace — or keep out — a
/// genuinely more frequent one. Frequent, stable items are reported reliably.
///
/// Unlike the other structures, `TopK` stores the keys themselves, so the item
/// type must be `Eq + Hash` and sized. A key is moved into the monitored list
/// only when it is admitted; an insertion that does not admit its item stores
/// no key and only updates the sketch.
///
/// # Examples
///
/// ```
/// use bloom_lib::TopK;
///
/// let mut top = TopK::new(3, 0.001, 0.001).unwrap();
///
/// for _ in 0..100 { top.insert("apple"); }
/// for _ in 0..50 { top.insert("banana"); }
/// for _ in 0..10 { top.insert("cherry"); }
/// for _ in 0..1 { top.insert("date"); }
///
/// let ranked = top.top();
/// assert_eq!(ranked[0].0, &"apple");
/// assert_eq!(ranked[1].0, &"banana");
/// assert_eq!(ranked.len(), 3); // only k items are kept
/// ```
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
// The explicit serde bounds below replace the ones serde would infer from the
// type parameters; presumably this avoids demanding `S: Serialize` /
// `S: Deserialize` directly and defers to `CountMinSketch`'s own impls —
// NOTE(review): confirm against `CountMinSketch`'s serde support.
#[cfg_attr(
    feature = "serde",
    serde(bound(
        serialize = "CountMinSketch<T, S>: serde::Serialize, T: serde::Serialize",
        deserialize = "CountMinSketch<T, S>: serde::Deserialize<'de>, T: serde::Deserialize<'de>"
    ))
)]
pub struct TopK<T, S = DefaultHashBuilder> {
    /// Frequency estimates for every inserted item, monitored or not.
    sketch: CountMinSketch<T, S>,
    /// The monitored items, in no particular order. `insert` never grows this
    /// past `k`; NOTE(review): a deserialized value is not re-checked against
    /// that bound.
    entries: Vec<Entry<T>>,
    /// Maximum number of monitored items; the constructors reject zero.
    k: usize,
}
66
67impl<T> TopK<T, DefaultHashBuilder>
68where
69 T: Hash + Eq,
70{
71 /// Creates a tracker for the top `k` items, with an underlying sketch sized
72 /// for error `epsilon` at confidence `1 - delta`, using the default hasher.
73 ///
74 /// # Parameters
75 ///
76 /// - `k`: how many top items to keep. Must be non-zero.
77 /// - `epsilon`: the sketch error factor, in `(0.0, 1.0)`. Smaller is more
78 /// accurate and uses more memory.
79 /// - `delta`: the sketch failure probability, in `(0.0, 1.0)`.
80 ///
81 /// # Errors
82 ///
83 /// Returns [`Error::InvalidParameter`] if `k` is zero, or if `epsilon` or
84 /// `delta` is not a finite value in `(0.0, 1.0)`.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use bloom_lib::TopK;
90 ///
91 /// let top = TopK::<&str>::new(10, 0.001, 0.001).unwrap();
92 /// assert_eq!(top.k(), 10);
93 /// assert!(top.is_empty());
94 /// ```
95 pub fn new(k: usize, epsilon: f64, delta: f64) -> Result<Self, Error> {
96 Self::with_hasher(k, epsilon, delta, DefaultHashBuilder)
97 }
98}
99
100impl<T, S> TopK<T, S>
101where
102 T: Hash + Eq,
103 S: BuildHasher,
104{
105 /// Creates a tracker with a caller-supplied hasher.
106 ///
107 /// # Errors
108 ///
109 /// Returns [`Error::InvalidParameter`] if `k` is zero, or if `epsilon` or
110 /// `delta` is not a finite value in `(0.0, 1.0)`.
111 ///
112 /// # Examples
113 ///
114 /// ```
115 /// # #[cfg(feature = "std")] {
116 /// use std::collections::hash_map::RandomState;
117 /// use bloom_lib::TopK;
118 ///
119 /// let top: TopK<&str, RandomState> =
120 /// TopK::with_hasher(10, 0.001, 0.001, RandomState::new()).unwrap();
121 /// # }
122 /// ```
123 pub fn with_hasher(k: usize, epsilon: f64, delta: f64, hasher: S) -> Result<Self, Error> {
124 if k == 0 {
125 return Err(Error::InvalidParameter {
126 param: "k",
127 reason: "must be greater than zero",
128 });
129 }
130 let sketch = CountMinSketch::with_hasher(epsilon, delta, hasher)?;
131 Ok(Self {
132 sketch,
133 entries: Vec::with_capacity(k),
134 k,
135 })
136 }
137
138 /// Records one occurrence of `item`, updating the sketch and the top set.
139 ///
140 /// The key is moved into the monitored list only if it is promoted into the
141 /// top `k`; a non-promoted insertion stores nothing beyond the sketch
142 /// update.
143 ///
144 /// # Examples
145 ///
146 /// ```
147 /// use bloom_lib::TopK;
148 ///
149 /// let mut top = TopK::new(2, 0.01, 0.01).unwrap();
150 /// top.insert("x");
151 /// top.insert("x");
152 /// assert_eq!(top.estimate(&"x"), 2);
153 /// ```
154 pub fn insert(&mut self, item: T) {
155 self.sketch.increment(&item);
156 let estimate = self.sketch.estimate(&item);
157
158 // Already monitored: refresh its count.
159 if let Some(entry) = self.entries.iter_mut().find(|entry| entry.key == item) {
160 entry.count = estimate;
161 return;
162 }
163
164 // Room to spare: admit unconditionally.
165 if self.entries.len() < self.k {
166 self.entries.push(Entry {
167 key: item,
168 count: estimate,
169 });
170 return;
171 }
172
173 // Full: replace the current minimum if this item now outranks it.
174 if let Some((min_index, min_count)) = self
175 .entries
176 .iter()
177 .enumerate()
178 .map(|(index, entry)| (index, entry.count))
179 .min_by_key(|&(_, count)| count)
180 {
181 if estimate > min_count {
182 self.entries[min_index] = Entry {
183 key: item,
184 count: estimate,
185 };
186 }
187 }
188 }
189
190 /// Returns the estimated frequency of `item` from the underlying sketch.
191 ///
192 /// This works for any item, whether or not it is in the top set.
193 ///
194 /// # Examples
195 ///
196 /// ```
197 /// use bloom_lib::TopK;
198 ///
199 /// let mut top = TopK::new(5, 0.001, 0.001).unwrap();
200 /// for _ in 0..7 { top.insert("seven"); }
201 /// assert!(top.estimate(&"seven") >= 7);
202 /// assert_eq!(top.estimate(&"absent"), 0);
203 /// ```
204 #[must_use]
205 pub fn estimate(&self, item: &T) -> u64 {
206 self.sketch.estimate(item)
207 }
208
209 /// Returns the monitored items paired with their estimated counts, ordered
210 /// from most to least frequent.
211 ///
212 /// The returned vector borrows the keys, so no key cloning occurs. Its
213 /// length is at most [`k`](Self::k).
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// use bloom_lib::TopK;
219 ///
220 /// let mut top = TopK::new(2, 0.01, 0.01).unwrap();
221 /// for _ in 0..5 { top.insert("a"); }
222 /// for _ in 0..3 { top.insert("b"); }
223 ///
224 /// let ranked = top.top();
225 /// assert_eq!(ranked[0].0, &"a");
226 /// assert_eq!(ranked[1].0, &"b");
227 /// ```
228 #[must_use]
229 pub fn top(&self) -> Vec<(&T, u64)> {
230 let mut ranked: Vec<(&T, u64)> = self
231 .entries
232 .iter()
233 .map(|entry| (&entry.key, entry.count))
234 .collect();
235 // Highest count first; `sort_unstable` avoids an allocation.
236 ranked.sort_unstable_by_key(|&(_, count)| core::cmp::Reverse(count));
237 ranked
238 }
239
240 /// The configured number of top items to track.
241 #[inline]
242 #[must_use]
243 pub fn k(&self) -> usize {
244 self.k
245 }
246
247 /// The number of items currently monitored (at most [`k`](Self::k)).
248 #[inline]
249 #[must_use]
250 pub fn len(&self) -> usize {
251 self.entries.len()
252 }
253
254 /// Returns `true` if no items are monitored yet.
255 #[inline]
256 #[must_use]
257 pub fn is_empty(&self) -> bool {
258 self.entries.is_empty()
259 }
260
261 /// Clears the sketch and the monitored set.
262 ///
263 /// # Examples
264 ///
265 /// ```
266 /// use bloom_lib::TopK;
267 ///
268 /// let mut top = TopK::new(3, 0.01, 0.01).unwrap();
269 /// top.insert("x");
270 /// top.clear();
271 /// assert!(top.is_empty());
272 /// assert_eq!(top.estimate(&"x"), 0);
273 /// ```
274 pub fn clear(&mut self) {
275 self.sketch.clear();
276 self.entries.clear();
277 }
278}
279
#[cfg(test)]
mod tests {
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Inserts `item` into `top` exactly `times` times.
    fn feed<T>(top: &mut TopK<T>, item: T, times: usize)
    where
        T: Hash + Eq + Copy,
    {
        (0..times).for_each(|_| top.insert(item));
    }

    /// Keys reported by `top()`, most frequent first.
    fn ranked_keys(top: &TopK<&'static str>) -> Vec<&'static str> {
        top.top().into_iter().map(|(key, _)| *key).collect()
    }

    #[test]
    fn test_new_rejects_zero_k() {
        let built = TopK::<&str>::new(0, 0.01, 0.01);
        assert!(matches!(built, Err(Error::InvalidParameter { .. })));
    }

    #[test]
    fn test_new_rejects_bad_sketch_params() {
        let built = TopK::<&str>::new(5, 0.0, 0.01);
        assert!(matches!(built, Err(Error::InvalidParameter { .. })));
    }

    #[test]
    fn test_tracks_most_frequent() {
        let mut top = TopK::new(3, 0.001, 0.001).unwrap();
        let stream: &[(&'static str, usize)] =
            &[("apple", 100), ("banana", 50), ("cherry", 10), ("date", 1)];
        for &(fruit, times) in stream {
            feed(&mut top, fruit, times);
        }

        assert_eq!(ranked_keys(&top), ["apple", "banana", "cherry"]);
    }

    #[test]
    fn test_counts_are_estimated() {
        let mut top = TopK::new(5, 0.0001, 0.0001).unwrap();
        feed(&mut top, "x", 42);
        assert!(top.estimate(&"x") >= 42);
        assert_eq!(top.estimate(&"never-seen"), 0);
    }

    #[test]
    fn test_len_capped_at_k() {
        let mut top = TopK::new(2, 0.01, 0.01).unwrap();
        (0..100u32).for_each(|n| top.insert(n));
        assert_eq!(top.len(), 2);
        assert!(top.len() <= top.k());
    }

    #[test]
    fn test_eviction_replaces_minimum() {
        let mut top = TopK::new(2, 0.0001, 0.0001).unwrap();
        feed(&mut top, "a", 10);
        feed(&mut top, "b", 5);
        // "c" arrives with a higher count than the minimum ("b").
        feed(&mut top, "c", 8);

        let keys = ranked_keys(&top);
        assert!(keys.contains(&"a"));
        assert!(keys.contains(&"c"));
        assert!(!keys.contains(&"b"), "low-frequency item should be evicted");
    }

    #[test]
    fn test_clear() {
        let mut top = TopK::new(3, 0.01, 0.01).unwrap();
        top.insert("x");
        assert!(!top.is_empty());
        top.clear();
        assert!(top.is_empty());
        assert_eq!(top.estimate(&"x"), 0);
    }

    #[test]
    fn test_top_is_sorted_descending() {
        let mut top = TopK::new(4, 0.0001, 0.0001).unwrap();
        feed(&mut top, "low", 3);
        feed(&mut top, "high", 30);
        feed(&mut top, "mid", 15);

        let ranked = top.top();
        let descending = ranked.windows(2).all(|pair| pair[0].1 >= pair[1].1);
        assert!(descending, "top() not sorted descending");
    }
}