Skip to main content

anomstream_core/
hyperloglog.rs

1//! `HyperLogLog` — probabilistic distinct-count (cardinality)
2//! estimator.
3//!
4//! `m = 2^p` register bank; each `add(x)` hashes `x` into 64 bits,
5//! uses the top `p` bits as a register index and counts the
6//! leading zeros of the remaining `64 − p` bits. Per-register
7//! state is `max(zeros + 1)` across every element routed there.
8//! Cardinality is recovered by a harmonic mean of `2^(-register)`
9//! with an `α_m` bias correction, plus small-range linear
10//! counting when many registers are still empty.
11//!
12//! Memory: `m` bytes (one `u8` per register). Typical configs:
13//!
14//! | `p` | `m` | memory | standard error |
15//! |---|---|---|---|
16//! | 10 | 1 024  | 1 KiB   | 3.25 %  |
17//! | 12 | 4 096  | 4 KiB   | 1.625 % |
18//! | 14 | 16 384 | 16 KiB  | 0.81 %  |
19//! | 16 | 65 536 | 64 KiB  | 0.40 %  |
20//!
21//! Gated behind the `std` feature because the hash path relies on
22//! [`std::hash::DefaultHasher`] (`SipHash` 1-3).
23//!
24//! # References
25//!
26//! 1. P. Flajolet, É. Fusy, O. Gandouet, F. Meunier,
27//!    "`HyperLogLog`: the analysis of a near-optimal cardinality
28//!    estimation algorithm", `AofA` 2007.
29//! 2. S. Heule, M. Nunkesser, A. Hall, "`HyperLogLog` in Practice:
30//!    Algorithmic Engineering of a State of the Art Cardinality
31//!    Estimation Algorithm", EDBT 2013.
32
33use alloc::vec;
34use alloc::vec::Vec;
35use core::hash::{Hash, Hasher};
36use std::hash::DefaultHasher;
37
38#[cfg(not(feature = "std"))]
39#[allow(unused_imports)]
40use num_traits::Float;
41
42use crate::error::{RcfError, RcfResult};
43
/// Smallest accepted precision: `p = 4`, i.e. `2^4 = 16` registers.
pub const MIN_PRECISION: u8 = 4;
/// Largest accepted precision: `p = 16`, i.e. `2^16 = 65 536`
/// registers.
pub const MAX_PRECISION: u8 = 16;
/// Precision used by the default sketch: `p = 12` gives 4 096
/// one-byte registers (~4 KiB) and a standard error of roughly
/// 1.625 %.
pub const DEFAULT_PRECISION: u8 = 12;
51
/// `HyperLogLog` sketch: an approximate counter of distinct values
/// backed by a bank of `2^precision` one-byte registers.
///
/// # Examples
///
/// ```
/// use anomstream_core::HyperLogLog;
///
/// let mut hll = HyperLogLog::with_default_precision();
/// for ip in 0..10_000_u32 {
///     hll.add(&ip.to_le_bytes());
/// }
/// let est = hll.estimate();
/// let err = (est as i64 - 10_000).unsigned_abs() as f64 / 10_000.0;
/// assert!(err < 0.05); // 3× the theoretical 1.625 % — conservative
/// ```
// Field order is part of the serialized layout for positional
// formats (e.g. postcard) — do not reorder the fields.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(try_from = "HyperLogLogShadow"))]
pub struct HyperLogLog {
    /// Number of index bits `p`; the bank holds `2^p` registers.
    precision: u8,
    /// Register bank. Each entry keeps the largest
    /// `leading_zeros + 1` rank seen for its index, `0` meaning
    /// "never touched". Invariant: `len == 2^precision`.
    registers: Vec<u8>,
    /// Count of ingest calls (duplicates included). Operational
    /// signal only — the estimator never reads it.
    total_added: u64,
}
79
/// Serialized form of [`HyperLogLog`], one field per field of the
/// public type and in the same order. Deserialization produces
/// this unchecked shadow first; the [`TryFrom`] conversion into
/// [`HyperLogLog`] then validates it, so an invalid payload never
/// becomes a live sketch.
#[cfg(feature = "serde")]
#[allow(clippy::missing_docs_in_private_items)]
#[derive(serde::Deserialize, serde::Serialize)]
struct HyperLogLogShadow {
    precision: u8,
    registers: Vec<u8>,
    total_added: u64,
}
93
#[cfg(feature = "serde")]
impl TryFrom<HyperLogLogShadow> for HyperLogLog {
    type Error = RcfError;

    /// Validate a freshly deserialized payload and promote it to a
    /// live sketch.
    ///
    /// Checks, in order:
    ///
    /// 1. `precision ∈ [MIN_PRECISION, MAX_PRECISION]`;
    /// 2. the register bank holds exactly `2^precision` entries;
    /// 3. no register exceeds `64 − precision + 1`, the largest
    ///    rank `add_hash` can ever store (reached when every bit
    ///    after the index is zero).
    ///
    /// # Errors
    ///
    /// Returns [`RcfError::InvalidConfig`] describing the first
    /// check that failed.
    fn try_from(raw: HyperLogLogShadow) -> Result<Self, Self::Error> {
        let HyperLogLogShadow {
            precision,
            registers,
            total_added,
        } = raw;

        if !(MIN_PRECISION..=MAX_PRECISION).contains(&precision) {
            return Err(RcfError::InvalidConfig(
                alloc::format!(
                    "HyperLogLog: precision {precision} out of [{MIN_PRECISION}, {MAX_PRECISION}]"
                )
                .into(),
            ));
        }

        let expected = 1_usize << precision;
        if registers.len() != expected {
            return Err(RcfError::InvalidConfig(
                alloc::format!(
                    "HyperLogLog: register bank length {} != expected {expected} for precision {precision}",
                    registers.len()
                )
                .into(),
            ));
        }

        // A register above this ceiling cannot come from a real
        // ingest; accepting it would skew `estimate` and, because
        // `merge` keeps per-register maxima, every sketch the
        // payload is later merged into. `precision ≤ MAX_PRECISION`
        // here, so the `u8` subtraction cannot underflow.
        let max_rank = 64 - precision + 1;
        if let Some(&bad) = registers.iter().find(|&&rank| rank > max_rank) {
            return Err(RcfError::InvalidConfig(
                alloc::format!(
                    "HyperLogLog: register value {bad} exceeds maximum {max_rank} for precision {precision}"
                )
                .into(),
            ));
        }

        Ok(Self {
            precision,
            registers,
            total_added,
        })
    }
}
126
127impl HyperLogLog {
128    /// Build a sketch with caller-chosen precision.
129    ///
130    /// # Errors
131    ///
132    /// Returns [`RcfError::InvalidConfig`] when `precision` is
133    /// outside `[MIN_PRECISION, MAX_PRECISION]`.
134    pub fn new(precision: u8) -> RcfResult<Self> {
135        if !(MIN_PRECISION..=MAX_PRECISION).contains(&precision) {
136            return Err(RcfError::InvalidConfig(
137                alloc::format!(
138                    "HyperLogLog: precision {precision} out of [{MIN_PRECISION}, {MAX_PRECISION}]"
139                )
140                .into(),
141            ));
142        }
143        let m = 1_usize << precision;
144        Ok(Self {
145            precision,
146            registers: vec![0_u8; m],
147            total_added: 0,
148        })
149    }
150
151    /// Default sketch — `p = 12`, 4 096 registers, ≈ 1.625 % std
152    /// error, ~4 KiB memory.
153    ///
154    /// # Panics
155    ///
156    /// Never in practice — [`DEFAULT_PRECISION`] is a compile-time
157    /// constant validated against the `[MIN, MAX]` range.
158    #[must_use]
159    pub fn with_default_precision() -> Self {
160        Self::new(DEFAULT_PRECISION).expect("DEFAULT_PRECISION is in range")
161    }
162
163    /// Register count `m = 2^p`.
164    #[must_use]
165    pub fn register_count(&self) -> usize {
166        1_usize << self.precision
167    }
168
169    /// Precision bit count.
170    #[must_use]
171    pub fn precision(&self) -> u8 {
172        self.precision
173    }
174
175    /// Total values passed through [`Self::add`].
176    #[must_use]
177    pub fn total_added(&self) -> u64 {
178        self.total_added
179    }
180
181    /// Memory footprint in bytes (register bank only).
182    #[must_use]
183    pub fn memory_bytes(&self) -> usize {
184        self.registers.len()
185    }
186
187    /// Ingest a `Hash`-able value. Pre-hashes through
188    /// [`DefaultHasher`] (`SipHash`) so per-key distribution is
189    /// uniform irrespective of the user type's own `Hash` impl
190    /// quality.
191    pub fn add<T: Hash + ?Sized>(&mut self, value: &T) {
192        let mut h = DefaultHasher::new();
193        value.hash(&mut h);
194        self.add_hash(h.finish());
195    }
196
197    /// Ingest a raw byte key. Cheaper when the caller already has
198    /// a fixed-size fingerprint (e.g. `[u8; 16]` IP, flow-hash
199    /// tuple) — skips the generic `Hash` dispatch.
200    #[inline]
201    pub fn add_bytes(&mut self, key: &[u8]) {
202        let mut h = DefaultHasher::new();
203        key.hash(&mut h);
204        self.add_hash(h.finish());
205    }
206
207    /// Ingest a caller-supplied 64-bit hash. Escape hatch for
208    /// callers with a stronger hasher (e.g. xxhash, siphash with
209    /// a keyed seed) — the sketch's accuracy depends on
210    /// `hash % 2^p` being uniform.
211    #[allow(clippy::cast_possible_truncation)]
212    #[inline]
213    pub fn add_hash(&mut self, hash: u64) {
214        self.total_added = self.total_added.saturating_add(1);
215        let p = self.precision;
216        // `hash >> (64 - p)` yields a value in `[0, 2^p)`; the
217        // `as usize` cast is infallible on 32-bit+ targets since
218        // `p ≤ 16` bounds the result to ≤ 65 535.
219        let idx = (hash >> (64 - p)) as usize;
220        // Retained bits after the index — shifted so `leading_zeros`
221        // counts within the full 64-bit lane. Result fits `u8`
222        // because `leading_zeros` is in `[0, 64]` and `p ≥ 4`.
223        let tail = hash << p;
224        let rho = if tail == 0 {
225            64 - p + 1
226        } else {
227            (tail.leading_zeros() as u8) + 1
228        };
229        let slot = &mut self.registers[idx];
230        if rho > *slot {
231            *slot = rho;
232        }
233    }
234
235    /// Cardinality estimate — number of distinct values ingested.
236    #[must_use]
237    #[allow(
238        clippy::cast_precision_loss,
239        clippy::cast_possible_truncation,
240        clippy::cast_sign_loss
241    )]
242    pub fn estimate(&self) -> u64 {
243        let m = self.register_count();
244        let m_f = m as f64;
245
246        // Harmonic mean of 2^(-register).
247        let mut sum = 0.0_f64;
248        let mut zeros: usize = 0;
249        for &r in &self.registers {
250            if r == 0 {
251                zeros += 1;
252            }
253            sum += 2.0_f64.powi(-(i32::from(r)));
254        }
255
256        let alpha = alpha_m(m);
257        let raw = alpha * m_f * m_f / sum;
258
259        // Small-range correction — switch to linear counting
260        // when many registers still hold the initial zero.
261        if raw <= 2.5 * m_f && zeros > 0 {
262            let v = zeros as f64;
263            return (m_f * (m_f / v).ln()).round().max(0.0) as u64;
264        }
265
266        // No large-range correction needed for 64-bit hashes —
267        // the usual `2^32` ceiling only matters for 32-bit output.
268        raw.round().max(0.0) as u64
269    }
270
271    /// Fold `other` into `self` by taking a per-register maximum.
272    /// Two sketches must share the same precision — HLL merge is
273    /// the whole reason the sketch is decomposable across shards
274    /// / time windows.
275    ///
276    /// # Errors
277    ///
278    /// Returns [`RcfError::InvalidConfig`] when the two sketches
279    /// disagree on `precision`.
280    pub fn merge(&mut self, other: &Self) -> RcfResult<()> {
281        if self.precision != other.precision {
282            return Err(RcfError::InvalidConfig(
283                alloc::format!(
284                    "HyperLogLog::merge: precision mismatch ({} vs {})",
285                    self.precision,
286                    other.precision
287                )
288                .into(),
289            ));
290        }
291        for (slot, other_r) in self.registers.iter_mut().zip(other.registers.iter()) {
292            if *other_r > *slot {
293                *slot = *other_r;
294            }
295        }
296        self.total_added = self.total_added.saturating_add(other.total_added);
297        Ok(())
298    }
299
300    /// Zero every register. Allocation is preserved.
301    pub fn reset(&mut self) {
302        self.registers.iter_mut().for_each(|r| *r = 0);
303        self.total_added = 0;
304    }
305}
306
/// Bias-correction coefficient `α_m` for a bank of `m` registers
/// (Flajolet 2007, Figure 3): fixed constants for the three
/// smallest banks, the closed-form expression
/// `0.7213 / (1 + 1.079 / m)` for every other `m`.
#[must_use]
#[allow(clippy::cast_precision_loss)]
fn alpha_m(m: usize) -> f64 {
    /// Tabulated coefficients, as `(register count, α_m)` pairs.
    const TABULATED: [(usize, f64); 3] = [(16, 0.673), (32, 0.697), (64, 0.709)];

    TABULATED
        .iter()
        .find(|&&(size, _)| size == m)
        .map_or_else(|| 0.7213 / (1.0 + 1.079 / m as f64), |&(_, alpha)| alpha)
}
318
#[cfg(test)]
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::cast_sign_loss,
    clippy::cast_possible_wrap,
    clippy::items_after_statements,
    clippy::manual_range_contains
)]
mod tests {
    use super::*;

    /// Feed the little-endian bytes of every `u32` in `values`
    /// into `sketch`, in ascending order.
    fn ingest(sketch: &mut HyperLogLog, values: core::ops::Range<u32>) {
        for value in values {
            sketch.add(&value.to_le_bytes());
        }
    }

    /// Default-precision sketch pre-loaded with `values`.
    fn default_sketch_with(values: core::ops::Range<u32>) -> HyperLogLog {
        let mut sketch = HyperLogLog::with_default_precision();
        ingest(&mut sketch, values);
        sketch
    }

    /// Relative error of `est` against the true cardinality.
    fn relative_error(est: u64, truth: i64) -> f64 {
        (est as i64 - truth).unsigned_abs() as f64 / truth as f64
    }

    #[test]
    fn new_rejects_out_of_range_precision() {
        for bad in [3_u8, 17] {
            assert!(HyperLogLog::new(bad).is_err());
        }
        for good in [MIN_PRECISION, MAX_PRECISION] {
            assert!(HyperLogLog::new(good).is_ok());
        }
    }

    #[test]
    fn empty_sketch_estimates_zero() {
        assert_eq!(HyperLogLog::with_default_precision().estimate(), 0);
    }

    #[test]
    fn exact_cardinality_at_tiny_scale() {
        // Ten distinct keys in a 4 096-register bank: the estimate
        // is expected to be essentially exact.
        let est = default_sketch_with(0..10).estimate();
        assert!((9..=11).contains(&est), "est {est}");
    }

    #[test]
    fn estimates_within_5pct_at_10k_distinct() {
        let est = default_sketch_with(0..10_000).estimate();
        let err = relative_error(est, 10_000);
        assert!(err < 0.05, "err {err:.4}, est {est}");
    }

    #[test]
    fn estimates_within_3pct_at_100k_distinct_p14() {
        let mut hll = HyperLogLog::new(14).expect("p=14 in range");
        ingest(&mut hll, 0..100_000);
        let est = hll.estimate();
        let err = relative_error(est, 100_000);
        assert!(err < 0.03, "err {err:.4}, est {est}");
    }

    #[test]
    fn duplicate_inserts_do_not_inflate_estimate() {
        // 1 000 passes over the same 100 keys: 100k ingests, but
        // only 100 distinct values.
        let mut hll = HyperLogLog::with_default_precision();
        for _ in 0..1_000 {
            ingest(&mut hll, 0..100);
        }
        let est = hll.estimate();
        assert!((90..=110).contains(&est), "est {est}");
        assert_eq!(hll.total_added(), 100_000);
    }

    #[test]
    fn merge_agrees_with_single_sketch() {
        let mut a = default_sketch_with(0..5_000);
        let b = default_sketch_with(5_000..10_000);
        let full = default_sketch_with(0..10_000);

        a.merge(&b).expect("same precision");

        // The merged halves should land within a small window of
        // the sketch that saw every key directly.
        let merged_est = a.estimate();
        let full_est = full.estimate();
        let delta = (merged_est as i64 - full_est as i64).unsigned_abs();
        assert!(
            delta < 200,
            "delta {delta}, merged {merged_est}, full {full_est}"
        );
    }

    #[test]
    fn merge_rejects_precision_mismatch() {
        let mut a = HyperLogLog::new(10).unwrap();
        let b = HyperLogLog::new(12).unwrap();
        assert!(a.merge(&b).is_err());
    }

    #[test]
    fn reset_clears_all_registers_and_counter() {
        let mut hll = default_sketch_with(0..1_000);
        assert!(hll.estimate() > 0);
        hll.reset();
        assert_eq!(hll.estimate(), 0);
        assert_eq!(hll.total_added(), 0);
    }

    #[test]
    fn add_hash_path_matches_add_bytes_path() {
        use core::hash::Hash;
        use std::hash::DefaultHasher;

        let mut via_bytes = HyperLogLog::with_default_precision();
        let mut via_hash = HyperLogLog::with_default_precision();
        for i in 0..1_000_u32 {
            let bytes = i.to_le_bytes();
            via_bytes.add_bytes(&bytes);
            // Reproduce the digest `DefaultHasher` yields for the
            // same bytes and feed it in directly.
            let mut hasher = DefaultHasher::new();
            bytes.hash(&mut hasher);
            via_hash.add_hash(hasher.finish());
        }
        // Same hash → same register update → same estimate.
        assert_eq!(via_bytes.estimate(), via_hash.estimate());
    }

    #[test]
    fn memory_bytes_matches_register_count() {
        let hll = HyperLogLog::new(12).unwrap();
        assert_eq!(hll.memory_bytes(), 4096);
        let hll16 = HyperLogLog::new(16).unwrap();
        assert_eq!(hll16.memory_bytes(), 65_536);
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn postcard_roundtrip_preserves_estimate() {
        let hll = default_sketch_with(0..5_000);
        let before = hll.estimate();
        let bytes = postcard::to_allocvec(&hll).expect("serde ok");
        let back: HyperLogLog = postcard::from_bytes(&bytes).expect("serde ok");
        assert_eq!(back.estimate(), before);
        assert_eq!(back.total_added(), hll.total_added());
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_out_of_range_precision() {
        let bad = HyperLogLogShadow {
            precision: MAX_PRECISION + 1,
            registers: alloc::vec![0_u8; 1 << MAX_PRECISION],
            total_added: 0,
        };
        let bytes = postcard::to_allocvec(&bad).unwrap();
        let back: Result<HyperLogLog, _> = postcard::from_bytes(&bytes);
        assert!(back.is_err());
    }

    #[cfg(all(feature = "serde", feature = "postcard"))]
    #[test]
    fn deserialize_rejects_register_length_mismatch() {
        // Ten registers where precision 12 requires 4 096.
        let bad = HyperLogLogShadow {
            precision: DEFAULT_PRECISION,
            registers: alloc::vec![0_u8; 10],
            total_added: 0,
        };
        let bytes = postcard::to_allocvec(&bad).unwrap();
        let back: Result<HyperLogLog, _> = postcard::from_bytes(&bytes);
        assert!(back.is_err());
    }
}