Skip to main content

bloom_lib/
topk.rs

1//! A Top-K tracker for the most frequent items in a stream.
2
3use core::hash::{BuildHasher, Hash};
4
5use alloc::vec::Vec;
6
7use crate::{count_min::CountMinSketch, hash::DefaultHashBuilder, Error};
8
/// A single slot of the monitored list: a key paired with the frequency
/// estimate that was recorded for it the last time the slot was written.
#[derive(Clone, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
struct Entry<T> {
    /// The tracked item itself.
    key: T,
    /// The frequency estimate stored for `key` when this slot was last written.
    count: u64,
}
16
17/// Tracks the `k` most frequent items in a stream using bounded memory.
18///
19/// A Top-K tracker combines a [`CountMinSketch`](crate::CountMinSketch) — which
20/// estimates how often *any* item has been seen — with a small list of the `k`
21/// highest-frequency items observed so far. Every insertion updates the sketch
22/// and, when the item's estimated count is high enough, promotes it into the
23/// monitored list, evicting the current minimum. Memory is bounded by the sketch
24/// size plus `k` stored keys, regardless of how many distinct items flow
25/// through.
26///
27/// The estimate of *which* items are in the top set is approximate: under heavy
28/// churn a true heavy hitter can be missed if it never accumulates a high enough
29/// estimate while resident, but frequent, stable items are reported reliably.
30///
31/// Unlike the other structures, `TopK` stores the keys themselves, so the item
32/// type must be `Eq + Hash` and sized. A key is moved into the monitored list
33/// only when it is promoted, so non-promoted insertions store nothing.
34///
35/// # Examples
36///
37/// ```
38/// use bloom_lib::TopK;
39///
40/// let mut top = TopK::new(3, 0.001, 0.001).unwrap();
41///
42/// for _ in 0..100 { top.insert("apple"); }
43/// for _ in 0..50  { top.insert("banana"); }
44/// for _ in 0..10  { top.insert("cherry"); }
45/// for _ in 0..1   { top.insert("date"); }
46///
47/// let ranked = top.top();
48/// assert_eq!(ranked[0].0, &"apple");
49/// assert_eq!(ranked[1].0, &"banana");
50/// assert_eq!(ranked.len(), 3); // only k items are kept
51/// ```
52#[derive(Debug, Clone)]
53#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
54#[cfg_attr(
55    feature = "serde",
56    serde(bound(
57        serialize = "CountMinSketch<T, S>: serde::Serialize, T: serde::Serialize",
58        deserialize = "CountMinSketch<T, S>: serde::Deserialize<'de>, T: serde::Deserialize<'de>"
59    ))
60)]
61pub struct TopK<T, S = DefaultHashBuilder> {
62    sketch: CountMinSketch<T, S>,
63    entries: Vec<Entry<T>>,
64    k: usize,
65}
66
67impl<T> TopK<T, DefaultHashBuilder>
68where
69    T: Hash + Eq,
70{
71    /// Creates a tracker for the top `k` items, with an underlying sketch sized
72    /// for error `epsilon` at confidence `1 - delta`, using the default hasher.
73    ///
74    /// # Parameters
75    ///
76    /// - `k`: how many top items to keep. Must be non-zero.
77    /// - `epsilon`: the sketch error factor, in `(0.0, 1.0)`. Smaller is more
78    ///   accurate and uses more memory.
79    /// - `delta`: the sketch failure probability, in `(0.0, 1.0)`.
80    ///
81    /// # Errors
82    ///
83    /// Returns [`Error::InvalidParameter`] if `k` is zero, or if `epsilon` or
84    /// `delta` is not a finite value in `(0.0, 1.0)`.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use bloom_lib::TopK;
90    ///
91    /// let top = TopK::<&str>::new(10, 0.001, 0.001).unwrap();
92    /// assert_eq!(top.k(), 10);
93    /// assert!(top.is_empty());
94    /// ```
95    pub fn new(k: usize, epsilon: f64, delta: f64) -> Result<Self, Error> {
96        Self::with_hasher(k, epsilon, delta, DefaultHashBuilder)
97    }
98}
99
100impl<T, S> TopK<T, S>
101where
102    T: Hash + Eq,
103    S: BuildHasher,
104{
105    /// Creates a tracker with a caller-supplied hasher.
106    ///
107    /// # Errors
108    ///
109    /// Returns [`Error::InvalidParameter`] if `k` is zero, or if `epsilon` or
110    /// `delta` is not a finite value in `(0.0, 1.0)`.
111    ///
112    /// # Examples
113    ///
114    /// ```
115    /// # #[cfg(feature = "std")] {
116    /// use std::collections::hash_map::RandomState;
117    /// use bloom_lib::TopK;
118    ///
119    /// let top: TopK<&str, RandomState> =
120    ///     TopK::with_hasher(10, 0.001, 0.001, RandomState::new()).unwrap();
121    /// # }
122    /// ```
123    pub fn with_hasher(k: usize, epsilon: f64, delta: f64, hasher: S) -> Result<Self, Error> {
124        if k == 0 {
125            return Err(Error::InvalidParameter {
126                param: "k",
127                reason: "must be greater than zero",
128            });
129        }
130        let sketch = CountMinSketch::with_hasher(epsilon, delta, hasher)?;
131        Ok(Self {
132            sketch,
133            entries: Vec::with_capacity(k),
134            k,
135        })
136    }
137
138    /// Records one occurrence of `item`, updating the sketch and the top set.
139    ///
140    /// The key is moved into the monitored list only if it is promoted into the
141    /// top `k`; a non-promoted insertion stores nothing beyond the sketch
142    /// update.
143    ///
144    /// # Examples
145    ///
146    /// ```
147    /// use bloom_lib::TopK;
148    ///
149    /// let mut top = TopK::new(2, 0.01, 0.01).unwrap();
150    /// top.insert("x");
151    /// top.insert("x");
152    /// assert_eq!(top.estimate(&"x"), 2);
153    /// ```
154    pub fn insert(&mut self, item: T) {
155        self.sketch.increment(&item);
156        let estimate = self.sketch.estimate(&item);
157
158        // Already monitored: refresh its count.
159        if let Some(entry) = self.entries.iter_mut().find(|entry| entry.key == item) {
160            entry.count = estimate;
161            return;
162        }
163
164        // Room to spare: admit unconditionally.
165        if self.entries.len() < self.k {
166            self.entries.push(Entry {
167                key: item,
168                count: estimate,
169            });
170            return;
171        }
172
173        // Full: replace the current minimum if this item now outranks it.
174        if let Some((min_index, min_count)) = self
175            .entries
176            .iter()
177            .enumerate()
178            .map(|(index, entry)| (index, entry.count))
179            .min_by_key(|&(_, count)| count)
180        {
181            if estimate > min_count {
182                self.entries[min_index] = Entry {
183                    key: item,
184                    count: estimate,
185                };
186            }
187        }
188    }
189
190    /// Returns the estimated frequency of `item` from the underlying sketch.
191    ///
192    /// This works for any item, whether or not it is in the top set.
193    ///
194    /// # Examples
195    ///
196    /// ```
197    /// use bloom_lib::TopK;
198    ///
199    /// let mut top = TopK::new(5, 0.001, 0.001).unwrap();
200    /// for _ in 0..7 { top.insert("seven"); }
201    /// assert!(top.estimate(&"seven") >= 7);
202    /// assert_eq!(top.estimate(&"absent"), 0);
203    /// ```
204    #[must_use]
205    pub fn estimate(&self, item: &T) -> u64 {
206        self.sketch.estimate(item)
207    }
208
209    /// Returns the monitored items paired with their estimated counts, ordered
210    /// from most to least frequent.
211    ///
212    /// The returned vector borrows the keys, so no key cloning occurs. Its
213    /// length is at most [`k`](Self::k).
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// use bloom_lib::TopK;
219    ///
220    /// let mut top = TopK::new(2, 0.01, 0.01).unwrap();
221    /// for _ in 0..5 { top.insert("a"); }
222    /// for _ in 0..3 { top.insert("b"); }
223    ///
224    /// let ranked = top.top();
225    /// assert_eq!(ranked[0].0, &"a");
226    /// assert_eq!(ranked[1].0, &"b");
227    /// ```
228    #[must_use]
229    pub fn top(&self) -> Vec<(&T, u64)> {
230        let mut ranked: Vec<(&T, u64)> = self
231            .entries
232            .iter()
233            .map(|entry| (&entry.key, entry.count))
234            .collect();
235        // Highest count first; `sort_unstable` avoids an allocation.
236        ranked.sort_unstable_by_key(|&(_, count)| core::cmp::Reverse(count));
237        ranked
238    }
239
240    /// The configured number of top items to track.
241    #[inline]
242    #[must_use]
243    pub fn k(&self) -> usize {
244        self.k
245    }
246
247    /// The number of items currently monitored (at most [`k`](Self::k)).
248    #[inline]
249    #[must_use]
250    pub fn len(&self) -> usize {
251        self.entries.len()
252    }
253
254    /// Returns `true` if no items are monitored yet.
255    #[inline]
256    #[must_use]
257    pub fn is_empty(&self) -> bool {
258        self.entries.is_empty()
259    }
260
261    /// Clears the sketch and the monitored set.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// use bloom_lib::TopK;
267    ///
268    /// let mut top = TopK::new(3, 0.01, 0.01).unwrap();
269    /// top.insert("x");
270    /// top.clear();
271    /// assert!(top.is_empty());
272    /// assert_eq!(top.estimate(&"x"), 0);
273    /// ```
274    pub fn clear(&mut self) {
275        self.sketch.clear();
276        self.entries.clear();
277    }
278}
279
#[cfg(test)]
mod tests {
    // Tests unwrap freely: a failed construction should fail the test loudly.
    #![allow(clippy::unwrap_used)]

    use super::*;

    /// Inserts `item` into `top` exactly `times` times.
    fn feed<T: Hash + Eq + Copy>(top: &mut TopK<T>, item: T, times: usize) {
        for _ in 0..times {
            top.insert(item);
        }
    }

    #[test]
    fn test_new_rejects_zero_k() {
        let result = TopK::<&str>::new(0, 0.01, 0.01);
        assert!(matches!(result, Err(Error::InvalidParameter { .. })));
    }

    #[test]
    fn test_new_rejects_bad_sketch_params() {
        let result = TopK::<&str>::new(5, 0.0, 0.01);
        assert!(matches!(result, Err(Error::InvalidParameter { .. })));
    }

    #[test]
    fn test_tracks_most_frequent() {
        let mut top = TopK::new(3, 0.001, 0.001).unwrap();
        feed(&mut top, "apple", 100);
        feed(&mut top, "banana", 50);
        feed(&mut top, "cherry", 10);
        feed(&mut top, "date", 1);

        let ranked = top.top();
        assert_eq!(ranked.len(), 3);
        assert_eq!(ranked[0].0, &"apple");
        assert_eq!(ranked[1].0, &"banana");
        assert_eq!(ranked[2].0, &"cherry");
    }

    #[test]
    fn test_counts_are_estimated() {
        let mut top = TopK::new(5, 0.0001, 0.0001).unwrap();
        feed(&mut top, "x", 42);
        assert!(top.estimate(&"x") >= 42);
        assert_eq!(top.estimate(&"never-seen"), 0);
    }

    #[test]
    fn test_len_capped_at_k() {
        let mut top = TopK::new(2, 0.01, 0.01).unwrap();
        // One hundred distinct keys, each seen once.
        (0..100u32).for_each(|value| top.insert(value));
        assert_eq!(top.len(), 2);
        assert!(top.len() <= top.k());
    }

    #[test]
    fn test_eviction_replaces_minimum() {
        let mut top = TopK::new(2, 0.0001, 0.0001).unwrap();
        feed(&mut top, "a", 10);
        feed(&mut top, "b", 5);
        // "c" arrives with a higher count than the minimum ("b").
        feed(&mut top, "c", 8);

        let ranked = top.top();
        let keys: Vec<&str> = ranked.iter().map(|pair| *pair.0).collect();
        assert!(keys.contains(&"a"));
        assert!(keys.contains(&"c"));
        assert!(!keys.contains(&"b"), "low-frequency item should be evicted");
    }

    #[test]
    fn test_clear() {
        let mut top = TopK::new(3, 0.01, 0.01).unwrap();
        top.insert("x");
        assert!(!top.is_empty());
        top.clear();
        assert!(top.is_empty());
        assert_eq!(top.estimate(&"x"), 0);
    }

    #[test]
    fn test_top_is_sorted_descending() {
        let mut top = TopK::new(4, 0.0001, 0.0001).unwrap();
        feed(&mut top, "low", 3);
        feed(&mut top, "high", 30);
        feed(&mut top, "mid", 15);

        let ranked = top.top();
        assert!(
            ranked.windows(2).all(|pair| pair[0].1 >= pair[1].1),
            "top() not sorted descending"
        );
    }
}