Skip to main content

anomstream_core/
space_saving.rs

1//! Space-Saving — deterministic top-K heavy hitters in `O(K)`
2//! memory.
3//!
4//! Maintains at most `K` monitored keys with `(estimate, error)`
5//! pairs. On each observation `x`:
6//!
7//! - `x` already tracked → increment its estimate.
8//! - `x` new and the table has fewer than `K` entries → insert
9//!   with `estimate = 1`, `error = 0`.
//! - `x` new and the table is full → evict the current minimum
//!   entry `m` and insert `x` with `estimate = m.estimate + 1`
//!   (`m.estimate + w` for a weighted observation of weight `w`),
//!   `error = m.estimate`. The error tracks the worst-case
//!   overestimate so callers can bound uncertainty.
14//!
15//! Guarantees (Metwally et al. 2005):
16//!
17//! - Every key with true frequency `> N/K` is retained.
18//! - `estimate(x) − error(x) ≤ true_count(x) ≤ estimate(x)`.
19//!
//! Memory is `O(K)` — with the typical `K = 128` and 16-byte keys
//! (IPv6 addresses / flow-hash tuples) the key + entry payload is
//! ≈ 4 KiB (128 × 32 bytes), plus the hash table's own bucket and
//! control-byte overhead. Complements [`crate::CountMinSketch`]:
//! CMS gives probabilistic per-key frequency estimates, whereas
//! Space-Saving gives deterministic top-K under a fixed memory cap.
25//!
26//! Per-observe cost: `O(1)` on the tracked-key path, `O(K)` on
27//! the evict path (linear scan for the current minimum). For
28//! typical SOC heavy-hitter tables (`K` in the low hundreds) this
29//! is sub-microsecond. If per-packet observe is required at
30//! 10 Gbps ingress, prefer a two-stage pipeline: pre-filter via
31//! [`crate::CountMinSketch`] (saturating inserts at fixed cost),
32//! refresh Space-Saving on an aggregation cadence.
33//!
34//! Gated behind the `std` feature because the inner store uses
35//! [`std::collections::HashMap`].
36//!
37//! # Reference
38//!
39//! A. Metwally, D. Agrawal, A. El Abbadi, "Efficient Computation
40//! of Frequent and Top-k Elements in Data Streams", ICDT 2005.
41
42use alloc::vec::Vec;
43use core::hash::Hash;
44use std::collections::HashMap;
45
46use crate::error::{RcfError, RcfResult};
47
/// Default capacity — tracks up to 128 heavy hitters. With 16-byte
/// keys the key + entry payload is ≈ 4 KiB (128 × 32 bytes); the
/// backing `HashMap` allocates extra buckets and control bytes on top
/// of that.
pub const DEFAULT_CAPACITY: usize = 128;
51
/// One entry in the monitored table: a frequency estimate plus the
/// worst-case amount by which it may overstate the true count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HeavyHitterEntry {
    /// Caller-facing frequency estimate. Never below the true count;
    /// overstates it by at most `error`.
    pub estimate: u64,
    /// Worst-case overestimate, so
    /// `estimate − error ≤ true_count ≤ estimate` (an upper bound on
    /// the overcount, not the exact overcount). Zero for keys inserted
    /// before capacity was reached (their count is exact); for a key
    /// that took over an evicted minimum `m`, it is `m.estimate`.
    pub error: u64,
}
63
/// One ranked row of a [`SpaceSaving::top_k`] report.
///
/// Generic over the caller's own key type, so the report hands back
/// the original identifier (IP address, flow tuple, …) rather than a
/// hash of it — distinct keys are never conflated in the report.
#[derive(Clone, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HeavyHitter<Key> {
    /// Zero-based position in the report; `0` carries the largest
    /// estimate.
    pub rank: u32,
    /// A clone of the key the caller observed.
    pub key: Key,
    /// Estimated frequency — never below the true count.
    pub estimate: u64,
    /// Upper bound on how far `estimate` may exceed the true count
    /// (`estimate − true_count ≤ error`).
    pub error: u64,
}
81
82/// Streaming top-K tracker with `O(K)` memory.
83///
84/// # Examples
85///
86/// ```
87/// use anomstream_core::SpaceSaving;
88///
89/// let mut ss: SpaceSaving<u32> = SpaceSaving::with_default_capacity().unwrap();
90/// // 1 000 observations of "10" versus 5 of "99".
91/// for _ in 0..1_000 {
92///     ss.observe(10_u32);
93/// }
94/// for _ in 0..5 {
95///     ss.observe(99_u32);
96/// }
97/// let top = ss.top_k(1);
98/// assert_eq!(top[0].key, 10);
99/// assert_eq!(top[0].estimate, 1_000);
100/// ```
101#[derive(Debug, Clone)]
102#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
103pub struct SpaceSaving<K>
104where
105    K: Hash + Eq + Clone,
106{
107    /// Bounded table — at most `capacity` entries.
108    counts: HashMap<K, HeavyHitterEntry>,
109    /// Maximum `counts` length.
110    capacity: usize,
111    /// Total observations — ops signal, also the divisor of the
112    /// `N/K` frequency guarantee.
113    total: u64,
114}
115
116impl<K> SpaceSaving<K>
117where
118    K: Hash + Eq + Clone,
119{
120    /// Build a tracker with caller-chosen `capacity`.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`RcfError::InvalidConfig`] on `capacity == 0`.
125    pub fn new(capacity: usize) -> RcfResult<Self> {
126        if capacity == 0 {
127            return Err(RcfError::InvalidConfig(
128                alloc::string::ToString::to_string("SpaceSaving: capacity must be > 0").into(),
129            ));
130        }
131        Ok(Self {
132            counts: HashMap::with_capacity(capacity),
133            capacity,
134            total: 0,
135        })
136    }
137
138    /// Default tracker — `capacity = 128`.
139    ///
140    /// # Errors
141    ///
142    /// Never in practice — [`DEFAULT_CAPACITY`] is a positive
143    /// compile-time constant.
144    pub fn with_default_capacity() -> RcfResult<Self> {
145        Self::new(DEFAULT_CAPACITY)
146    }
147
148    /// Configured capacity (max tracked keys).
149    #[must_use]
150    pub fn capacity(&self) -> usize {
151        self.capacity
152    }
153
154    /// Current number of tracked keys (`≤ capacity`).
155    #[must_use]
156    pub fn len(&self) -> usize {
157        self.counts.len()
158    }
159
160    /// `true` when the table holds no keys.
161    #[must_use]
162    pub fn is_empty(&self) -> bool {
163        self.counts.is_empty()
164    }
165
166    /// Total observations — sum of every [`Self::observe`] weight.
167    #[must_use]
168    pub fn total(&self) -> u64 {
169        self.total
170    }
171
172    /// Worst-case per-estimate error bound (`N/K`). Keys whose
173    /// true frequency exceeds this are guaranteed to be tracked.
174    #[must_use]
175    pub fn error_bound(&self) -> u64 {
176        if self.capacity == 0 {
177            return 0;
178        }
179        self.total / (self.capacity as u64)
180    }
181
182    /// Ingest one occurrence of `key` with unit weight.
183    #[inline]
184    pub fn observe(&mut self, key: K) {
185        self.observe_weighted(key, 1);
186    }
187
188    /// Ingest `key` with caller-supplied `weight` — byte-count
189    /// heavy hitters in NDR workloads (per-packet bytes, not
190    /// just packet counts).
191    #[inline]
192    pub fn observe_weighted(&mut self, key: K, weight: u64) {
193        if weight == 0 {
194            return;
195        }
196        self.total = self.total.saturating_add(weight);
197
198        if let Some(entry) = self.counts.get_mut(&key) {
199            entry.estimate = entry.estimate.saturating_add(weight);
200            return;
201        }
202
203        if self.counts.len() < self.capacity {
204            self.counts.insert(
205                key,
206                HeavyHitterEntry {
207                    estimate: weight,
208                    error: 0,
209                },
210            );
211            return;
212        }
213
214        // Table full — evict current minimum, reinsert `key` with
215        // `estimate = min.estimate + weight` and `error = min.estimate`.
216        if let Some((min_key, min_entry)) = self.find_min() {
217            self.counts.remove(&min_key);
218            let boosted = HeavyHitterEntry {
219                estimate: min_entry.estimate.saturating_add(weight),
220                error: min_entry.estimate,
221            };
222            self.counts.insert(key, boosted);
223        }
224    }
225
226    /// Frequency estimate for `key`. Returns `None` when the key
227    /// is not tracked (its true count may still be up to
228    /// [`Self::error_bound`]).
229    #[must_use]
230    pub fn estimate(&self, key: &K) -> Option<HeavyHitterEntry> {
231        self.counts.get(key).copied()
232    }
233
234    /// Ranked top-`n` snapshot — sorted by descending estimate.
235    /// `n` is clamped to [`Self::len`].
236    #[must_use]
237    pub fn top_k(&self, n: usize) -> Vec<HeavyHitter<K>> {
238        let mut entries: Vec<(K, HeavyHitterEntry)> =
239            self.counts.iter().map(|(k, e)| (k.clone(), *e)).collect();
240        entries.sort_by_key(|(_, e)| core::cmp::Reverse(e.estimate));
241        entries.truncate(n);
242        entries
243            .into_iter()
244            .enumerate()
245            .map(|(idx, (k, e))| HeavyHitter {
246                rank: u32::try_from(idx).unwrap_or(u32::MAX),
247                key: k,
248                estimate: e.estimate,
249                error: e.error,
250            })
251            .collect()
252    }
253
254    /// Iterate every tracked `(&K, HeavyHitterEntry)` in
255    /// insertion order (not ranked). Useful for ad-hoc scans /
256    /// serialisation sinks.
257    pub fn iter(&self) -> impl Iterator<Item = (&K, &HeavyHitterEntry)> {
258        self.counts.iter()
259    }
260
261    /// Drop every tracked key. Allocation is preserved.
262    pub fn reset(&mut self) {
263        self.counts.clear();
264        self.total = 0;
265    }
266
267    /// `O(K)` linear scan for the minimum-estimate entry. Returns
268    /// `None` when the table is empty — callers on the evict path
269    /// have already confirmed non-emptiness, so `None` is treated
270    /// as a no-op insert.
271    fn find_min(&self) -> Option<(K, HeavyHitterEntry)> {
272        self.counts
273            .iter()
274            .min_by_key(|(_, e)| e.estimate)
275            .map(|(k, e)| (k.clone(), *e))
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn new_rejects_zero_capacity() {
        assert!(SpaceSaving::<u32>::new(0).is_err());
    }

    #[test]
    fn exact_counts_within_capacity() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        for i in 0..5_u32 {
            for _ in 0..=u64::from(i) {
                ss.observe(i);
            }
        }
        // 5 distinct keys, capacity 8 → every count is exact,
        // zero error.
        let top = ss.top_k(5);
        assert_eq!(top.len(), 5);
        for hh in &top {
            assert_eq!(hh.error, 0);
            // Key `i` was observed `i + 1` times.
            assert_eq!(hh.estimate, u64::from(hh.key) + 1);
        }
        assert_eq!(top[0].key, 4);
        assert_eq!(top[0].estimate, 5);
    }

    #[test]
    fn heavy_hitter_always_retained() {
        // A key with true frequency > N/K must never be evicted.
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        // 1 000 observations of key 0 followed by 2 000 unique noise
        // keys: N = 3 000, K = 8, so N/K = 375 < 1 000.
        for _ in 0..1_000 {
            ss.observe(0_u32);
        }
        for i in 1..2_001_u32 {
            ss.observe(i);
        }
        assert_eq!(ss.total(), 3_000);
        assert_eq!(ss.error_bound(), 375);
        let h = ss
            .top_k(8)
            .into_iter()
            .find(|hh| hh.key == 0)
            .expect("heavy hitter retained");
        // Key 0 was inserted before the table filled and never
        // evicted, so its count is exact.
        assert_eq!(h.estimate, 1_000);
        assert_eq!(h.error, 0);
    }

    #[test]
    fn error_bound_sandwiches_true_count() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(16).unwrap();
        for i in 0..100_u32 {
            for _ in 0..10 {
                ss.observe(i);
            }
        }
        assert_eq!(ss.len(), 16);
        for hh in ss.top_k(16) {
            // estimate − error ≤ true_count ≤ estimate, true_count = 10.
            assert!(hh.estimate >= hh.error);
            let lower = hh.estimate - hh.error;
            assert!(lower <= 10, "lower={lower}");
            assert!(hh.estimate >= 10, "estimate={}", hh.estimate);
        }
    }

    #[test]
    fn estimate_returns_none_for_untracked() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(2).unwrap();
        ss.observe(1);
        ss.observe(2);
        for _ in 0..5 {
            ss.observe(3);
        }
        // Capacity 2, 3 becomes tracked via eviction of min.
        assert!(ss.estimate(&3).is_some());
        // 100 is never observed.
        assert!(ss.estimate(&100).is_none());
    }

    #[test]
    fn eviction_inherits_min_estimate_as_error() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(2).unwrap();
        for _ in 0..3 {
            ss.observe(1);
        }
        ss.observe(2);
        // Table full; key 2 (estimate 1) is the unique minimum.
        ss.observe(3);
        assert!(ss.estimate(&2).is_none());
        assert_eq!(
            ss.estimate(&3),
            Some(HeavyHitterEntry {
                estimate: 2,
                error: 1
            })
        );
        assert_eq!(
            ss.estimate(&1),
            Some(HeavyHitterEntry {
                estimate: 3,
                error: 0
            })
        );
        assert_eq!(ss.len(), 2);
    }

    #[test]
    fn weighted_observe_accumulates() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(4).unwrap();
        ss.observe_weighted(7, 1_000);
        ss.observe_weighted(7, 500);
        let h = ss.estimate(&7).expect("tracked");
        assert_eq!(h.estimate, 1_500);
        assert_eq!(ss.total(), 1_500);
    }

    #[test]
    fn weighted_eviction_adds_weight_to_min() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(1).unwrap();
        ss.observe_weighted(1, 4);
        ss.observe_weighted(2, 10);
        assert!(ss.estimate(&1).is_none());
        assert_eq!(
            ss.estimate(&2),
            Some(HeavyHitterEntry {
                estimate: 14,
                error: 4
            })
        );
        assert_eq!(ss.total(), 14);
    }

    #[test]
    fn zero_weight_is_noop() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(4).unwrap();
        ss.observe_weighted(1, 0);
        assert!(ss.is_empty());
        assert_eq!(ss.total(), 0);
    }

    #[test]
    fn error_bound_grows_linearly() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(10).unwrap();
        assert_eq!(ss.error_bound(), 0);
        for i in 0..1_000_u32 {
            ss.observe(i);
        }
        // N/K = 1000/10 = 100.
        assert_eq!(ss.error_bound(), 100);
    }

    #[test]
    fn top_k_ranks_descending() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        for (key, count) in [(1_u32, 100_u64), (2, 50), (3, 25), (4, 10)] {
            for _ in 0..count {
                ss.observe(key);
            }
        }
        let top = ss.top_k(4);
        assert_eq!(top[0].key, 1);
        assert_eq!(top[1].key, 2);
        assert_eq!(top[2].key, 3);
        assert_eq!(top[3].key, 4);
        assert_eq!(top[0].rank, 0);
        assert_eq!(top[3].rank, 3);
    }

    #[test]
    fn top_k_clamps_to_len() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        ss.observe(1);
        assert_eq!(ss.top_k(10).len(), 1);
        assert_eq!(ss.top_k(0).len(), 0);
    }

    #[test]
    fn iter_visits_every_tracked_key() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        for k in [3_u32, 1, 2, 1] {
            ss.observe(k);
        }
        // Iteration order is unspecified — sort before comparing.
        let mut keys: Vec<u32> = ss.iter().map(|(k, _)| *k).collect();
        keys.sort_unstable();
        assert_eq!(keys, [1, 2, 3]);
    }

    #[test]
    fn reset_clears_everything() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(4).unwrap();
        for i in 0..100_u32 {
            ss.observe(i);
        }
        ss.reset();
        assert!(ss.is_empty());
        assert_eq!(ss.total(), 0);
        assert_eq!(ss.top_k(4).len(), 0);
    }

    #[test]
    fn byte_key_roundtrip() {
        let mut ss: SpaceSaving<[u8; 16]> = SpaceSaving::new(4).unwrap();
        let k = [0x01_u8; 16];
        for _ in 0..10 {
            ss.observe(k);
        }
        assert_eq!(ss.estimate(&k).unwrap().estimate, 10);
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn postcard_roundtrip_preserves_top_k() {
        let mut ss: SpaceSaving<u32> = SpaceSaving::new(8).unwrap();
        for i in 0..20_u32 {
            for _ in 0..=u64::from(i) {
                ss.observe(i);
            }
        }
        let bytes = postcard::to_allocvec(&ss).expect("serde ok");
        let back: SpaceSaving<u32> = postcard::from_bytes(&bytes).expect("serde ok");
        let a = ss.top_k(8);
        let b = back.top_k(8);
        for (x, y) in a.iter().zip(b.iter()) {
            assert_eq!(x.key, y.key);
            assert_eq!(x.estimate, y.estimate);
            assert_eq!(x.error, y.error);
        }
    }
}