Skip to main content

oxirs_vec/
approximate_counter.rs

1//! Approximate cardinality counting using the HyperLogLog algorithm.
2//!
3//! HyperLogLog (HLL) is a probabilistic data structure for estimating the
4//! cardinality of large multisets using sub-linear memory.  This implementation
5//! follows the original Flajolet et al. 2007 paper with small/large range
6//! corrections and 64-bit FNV-1a hashing.
7
8use std::collections::HashMap;
9
10// ---------------------------------------------------------------------------
11// Error type
12// ---------------------------------------------------------------------------
13
14/// Errors returned by HyperLogLog and cardinality-estimator operations.
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub enum CounterError {
17    /// Two HLL sketches have different register counts and cannot be merged.
18    PrecisionMismatch,
19    /// No sketch exists for the given stream name.
20    StreamNotFound(String),
21}
22
23impl std::fmt::Display for CounterError {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        match self {
26            CounterError::PrecisionMismatch => write!(f, "HyperLogLog precision mismatch"),
27            CounterError::StreamNotFound(s) => write!(f, "Stream not found: {}", s),
28        }
29    }
30}
31
32impl std::error::Error for CounterError {}
33
34// ---------------------------------------------------------------------------
35// FNV-1a 64-bit hash
36// ---------------------------------------------------------------------------
37
38const FNV_OFFSET_BASIS_64: u64 = 14_695_981_039_346_656_037;
39const FNV_PRIME_64: u64 = 1_099_511_628_211;
40
41fn fnv1a_64(data: &[u8]) -> u64 {
42    let mut hash = FNV_OFFSET_BASIS_64;
43    for &byte in data {
44        hash ^= byte as u64;
45        hash = hash.wrapping_mul(FNV_PRIME_64);
46    }
47    // Apply a Murmur3-style 64-bit finalizer for better bit avalanche.
48    // Without this, FNV-1a has weak diffusion across the upper bits.
49    hash ^= hash >> 33;
50    hash = hash.wrapping_mul(0xff51afd7ed558ccd);
51    hash ^= hash >> 33;
52    hash = hash.wrapping_mul(0xc4ceb9fe1a85ec53);
53    hash ^= hash >> 33;
54    hash
55}
56
57// ---------------------------------------------------------------------------
58// HyperLogLog
59// ---------------------------------------------------------------------------
60
61/// A single HyperLogLog sketch for estimating cardinality.
62///
63/// # Parameters
64/// * `precision` (b) — controls the number of registers m = 2^b.  Precision
65///   must be in the range [4, 16].  Higher precision → less error but more
66///   memory.  Typical error rate: ~1.04 / sqrt(m).
67#[derive(Debug, Clone)]
68pub struct HyperLogLog {
69    /// m = 2^precision registers, each storing the maximum leading-zero count.
70    pub registers: Vec<u8>,
71    /// Number of registers (m = 2^precision).
72    pub m: usize,
73    /// Bias-correction constant α_m.
74    pub alpha: f64,
75    /// Precision parameter b where m = 2^b.
76    precision: u8,
77}
78
79impl HyperLogLog {
80    /// Create a new HyperLogLog sketch.
81    ///
82    /// # Panics
83    /// Panics if `precision < 4` or `precision > 16`.
84    pub fn new(precision: u8) -> Self {
85        assert!(
86            (4..=16).contains(&precision),
87            "HyperLogLog precision must be in [4, 16]"
88        );
89        let m = 1usize << precision;
90        let alpha = compute_alpha(m);
91        Self {
92            registers: vec![0u8; m],
93            m,
94            alpha,
95            precision,
96        }
97    }
98
99    /// Add an item to the sketch.
100    pub fn add(&mut self, item: &str) {
101        let hash = fnv1a_64(item.as_bytes());
102        // Use the top `precision` bits as the register index.
103        let idx = (hash >> (64 - self.precision)) as usize;
104        // Shift the hash left by `precision` bits to get the "w" bits (the
105        // lower 64-precision bits, shifted to the most-significant positions).
106        // Count the number of leading zeros in those bits, then add 1.
107        // If all w bits are zero (hash << precision == 0) leading_zeros returns
108        // 64; clamp to 64-precision to avoid overflows in very small sketches.
109        let w = hash << self.precision;
110        let rho = if w == 0 {
111            (64u32 - self.precision as u32) as u8 + 1
112        } else {
113            w.leading_zeros() as u8 + 1
114        };
115        if rho > self.registers[idx] {
116            self.registers[idx] = rho;
117        }
118    }
119
120    /// Estimate the number of distinct items added.
121    pub fn count(&self) -> u64 {
122        let m = self.m as f64;
123
124        // Harmonic mean of 2^(-M[j])
125        let sum: f64 = self
126            .registers
127            .iter()
128            .map(|&r| 2.0_f64.powi(-(r as i32)))
129            .sum();
130
131        let raw_estimate = self.alpha * m * m / sum;
132
133        // Small-range correction
134        if raw_estimate <= 2.5 * m {
135            let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
136            if zeros > 0.0 {
137                return (m * (m / zeros).ln()).round() as u64;
138            }
139        }
140
141        // Large-range correction (2^32)
142        if raw_estimate > (1u64 << 32) as f64 / 30.0 {
143            let correction = -(2.0_f64.powi(32)) * (1.0 - raw_estimate / 2.0_f64.powi(32)).ln();
144            return correction.round() as u64;
145        }
146
147        raw_estimate.round() as u64
148    }
149
150    /// Merge another HLL sketch into this one by taking the per-register max.
151    ///
152    /// Both sketches must have the same precision.
153    pub fn merge(&mut self, other: &HyperLogLog) -> Result<(), CounterError> {
154        if self.m != other.m {
155            return Err(CounterError::PrecisionMismatch);
156        }
157        for (a, &b) in self.registers.iter_mut().zip(other.registers.iter()) {
158            if b > *a {
159                *a = b;
160            }
161        }
162        Ok(())
163    }
164
165    /// Reset all registers to zero.
166    pub fn clear(&mut self) {
167        for r in &mut self.registers {
168            *r = 0;
169        }
170    }
171
172    /// Return the number of registers in this sketch.
173    pub fn register_count(&self) -> usize {
174        self.m
175    }
176}
177
178/// Compute the bias-correction constant α_m.
179fn compute_alpha(m: usize) -> f64 {
180    match m {
181        16 => 0.673,
182        32 => 0.697,
183        64 => 0.709,
184        _ => 0.7213 / (1.0 + 1.079 / m as f64),
185    }
186}
187
188// ---------------------------------------------------------------------------
189// CardinalityEstimator
190// ---------------------------------------------------------------------------
191
192/// Multi-stream cardinality estimator backed by one HLL sketch per stream.
193pub struct CardinalityEstimator {
194    streams: HashMap<String, HyperLogLog>,
195    precision: u8,
196}
197
198impl CardinalityEstimator {
199    /// Create a new estimator; all per-stream sketches use `precision`.
200    pub fn new(precision: u8) -> Self {
201        Self {
202            streams: HashMap::new(),
203            precision,
204        }
205    }
206
207    /// Add `item` to the sketch for `stream` (creates the stream if absent).
208    pub fn add(&mut self, stream: &str, item: &str) {
209        self.streams
210            .entry(stream.to_string())
211            .or_insert_with(|| HyperLogLog::new(self.precision))
212            .add(item);
213    }
214
215    /// Estimate the cardinality of a single `stream`.
216    ///
217    /// Returns 0 if the stream has never been seen.
218    pub fn estimate(&self, stream: &str) -> u64 {
219        self.streams.get(stream).map(|h| h.count()).unwrap_or(0)
220    }
221
222    /// Estimate the cardinality of the union of several `streams` by merging
223    /// their sketches.
224    ///
225    /// Returns 0 when none of the named streams exist.
226    pub fn union_estimate(&self, streams: &[&str]) -> u64 {
227        let mut merged: Option<HyperLogLog> = None;
228        for &name in streams {
229            if let Some(hll) = self.streams.get(name) {
230                match &mut merged {
231                    None => merged = Some(hll.clone()),
232                    Some(m) => {
233                        // Ignore precision mismatch — shouldn't happen since
234                        // all streams use the same precision.
235                        let _ = m.merge(hll);
236                    }
237                }
238            }
239        }
240        merged.map(|h| h.count()).unwrap_or(0)
241    }
242
243    /// Return the number of tracked streams.
244    pub fn stream_count(&self) -> usize {
245        self.streams.len()
246    }
247}
248
249// ---------------------------------------------------------------------------
250// Tests
251// ---------------------------------------------------------------------------
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256
257    // HLL tolerance — HyperLogLog is approximate.
258    const TOLERANCE: f64 = 0.30; // 30% is generous but safe
259
260    fn within_tolerance(estimate: u64, expected: u64, tol: f64) -> bool {
261        if expected == 0 {
262            return estimate == 0;
263        }
264        let ratio = estimate as f64 / expected as f64;
265        ratio >= (1.0 - tol) && ratio <= (1.0 + tol)
266    }
267
268    // -----------------------------------------------------------------------
269    // Basic construction
270    // -----------------------------------------------------------------------
271
272    #[test]
273    fn test_new_precision_4() {
274        let hll = HyperLogLog::new(4);
275        assert_eq!(hll.m, 16);
276        assert_eq!(hll.registers.len(), 16);
277    }
278
279    #[test]
280    fn test_new_precision_8() {
281        let hll = HyperLogLog::new(8);
282        assert_eq!(hll.m, 256);
283    }
284
285    #[test]
286    fn test_new_precision_16() {
287        let hll = HyperLogLog::new(16);
288        assert_eq!(hll.m, 65536);
289    }
290
291    #[test]
292    fn test_register_count() {
293        let hll = HyperLogLog::new(6);
294        assert_eq!(hll.register_count(), 64);
295    }
296
297    // -----------------------------------------------------------------------
298    // Empty cardinality
299    // -----------------------------------------------------------------------
300
301    #[test]
302    fn test_empty_count_is_zero() {
303        let hll = HyperLogLog::new(8);
304        assert_eq!(hll.count(), 0);
305    }
306
307    // -----------------------------------------------------------------------
308    // Single-item cardinality
309    // -----------------------------------------------------------------------
310
311    #[test]
312    fn test_single_item_count() {
313        let mut hll = HyperLogLog::new(10);
314        hll.add("hello");
315        let c = hll.count();
316        assert!(c >= 1, "Expected at least 1, got {}", c);
317    }
318
319    // -----------------------------------------------------------------------
320    // Monotone increase
321    // -----------------------------------------------------------------------
322
323    #[test]
324    fn test_count_monotone_increases() {
325        let mut hll = HyperLogLog::new(10);
326        let mut prev = 0u64;
327        for i in 0..100 {
328            hll.add(&format!("item_{}", i));
329            let curr = hll.count();
330            assert!(
331                curr >= prev,
332                "Count decreased from {} to {} at step {}",
333                prev,
334                curr,
335                i
336            );
337            prev = curr;
338        }
339    }
340
341    // -----------------------------------------------------------------------
342    // Cardinality accuracy
343    // -----------------------------------------------------------------------
344
345    #[test]
346    fn test_cardinality_100_items() {
347        let mut hll = HyperLogLog::new(10);
348        for i in 0..100u64 {
349            hll.add(&format!("item_{}", i));
350        }
351        let est = hll.count();
352        assert!(
353            within_tolerance(est, 100, TOLERANCE),
354            "Estimate {} not within {}% of 100",
355            est,
356            (TOLERANCE * 100.0) as u32
357        );
358    }
359
360    #[test]
361    fn test_cardinality_1000_items() {
362        let mut hll = HyperLogLog::new(10);
363        for i in 0..1000u64 {
364            hll.add(&format!("element_{}", i));
365        }
366        let est = hll.count();
367        assert!(
368            within_tolerance(est, 1000, TOLERANCE),
369            "Estimate {} not within {}% of 1000",
370            est,
371            (TOLERANCE * 100.0) as u32
372        );
373    }
374
375    #[test]
376    fn test_duplicate_items_not_counted() {
377        let mut hll = HyperLogLog::new(10);
378        for _ in 0..100 {
379            hll.add("same_item");
380        }
381        let est = hll.count();
382        // Should be close to 1, not 100
383        assert!(est < 10, "Duplicates inflated count to {}", est);
384    }
385
386    // -----------------------------------------------------------------------
387    // Clear
388    // -----------------------------------------------------------------------
389
390    #[test]
391    fn test_clear_resets_to_zero() {
392        let mut hll = HyperLogLog::new(8);
393        for i in 0..50 {
394            hll.add(&format!("x{}", i));
395        }
396        hll.clear();
397        assert_eq!(hll.count(), 0);
398    }
399
400    #[test]
401    fn test_clear_then_reuse() {
402        let mut hll = HyperLogLog::new(8);
403        for i in 0..50 {
404            hll.add(&format!("x{}", i));
405        }
406        hll.clear();
407        hll.add("hello");
408        let c = hll.count();
409        assert!(c >= 1);
410    }
411
412    // -----------------------------------------------------------------------
413    // Merge
414    // -----------------------------------------------------------------------
415
416    #[test]
417    fn test_merge_compatible() {
418        let mut hll1 = HyperLogLog::new(10);
419        let mut hll2 = HyperLogLog::new(10);
420        for i in 0..100 {
421            hll1.add(&format!("a{}", i));
422        }
423        for i in 0..100 {
424            hll2.add(&format!("b{}", i));
425        }
426        let result = hll1.merge(&hll2);
427        assert!(result.is_ok());
428        let est = hll1.count();
429        // Should be roughly 200 unique items
430        assert!(
431            within_tolerance(est, 200, TOLERANCE),
432            "Merged estimate {} not within tolerance of 200",
433            est
434        );
435    }
436
437    #[test]
438    fn test_merge_overlapping() {
439        let mut hll1 = HyperLogLog::new(10);
440        let mut hll2 = HyperLogLog::new(10);
441        for i in 0..100 {
442            hll1.add(&format!("item{}", i));
443            hll2.add(&format!("item{}", i));
444        }
445        let _ = hll1.merge(&hll2);
446        let est = hll1.count();
447        // Same items — union cardinality ≈ 100
448        assert!(
449            within_tolerance(est, 100, TOLERANCE),
450            "Overlapping merge estimate {} not within tolerance of 100",
451            est
452        );
453    }
454
455    #[test]
456    fn test_merge_precision_mismatch_error() {
457        let mut hll1 = HyperLogLog::new(8);
458        let hll2 = HyperLogLog::new(10);
459        let result = hll1.merge(&hll2);
460        assert_eq!(result, Err(CounterError::PrecisionMismatch));
461    }
462
463    // -----------------------------------------------------------------------
464    // Precision variants
465    // -----------------------------------------------------------------------
466
467    #[test]
468    fn test_precision_5_accuracy() {
469        let mut hll = HyperLogLog::new(5);
470        for i in 0..50 {
471            hll.add(&format!("p5_{}", i));
472        }
473        let est = hll.count();
474        assert!(within_tolerance(est, 50, TOLERANCE));
475    }
476
477    #[test]
478    fn test_precision_12_accuracy() {
479        let mut hll = HyperLogLog::new(12);
480        for i in 0..500 {
481            hll.add(&format!("p12_{}", i));
482        }
483        let est = hll.count();
484        assert!(within_tolerance(est, 500, TOLERANCE));
485    }
486
487    // -----------------------------------------------------------------------
488    // CardinalityEstimator
489    // -----------------------------------------------------------------------
490
491    #[test]
492    fn test_estimator_single_stream() {
493        let mut est = CardinalityEstimator::new(10);
494        for i in 0..100 {
495            est.add("users", &format!("user_{}", i));
496        }
497        let c = est.estimate("users");
498        assert!(within_tolerance(c, 100, TOLERANCE));
499    }
500
501    #[test]
502    fn test_estimator_unknown_stream_returns_zero() {
503        let est = CardinalityEstimator::new(10);
504        assert_eq!(est.estimate("nonexistent"), 0);
505    }
506
507    #[test]
508    fn test_estimator_multiple_streams() {
509        let mut est = CardinalityEstimator::new(10);
510        for i in 0..100 {
511            est.add("stream_a", &format!("a{}", i));
512            est.add("stream_b", &format!("b{}", i));
513        }
514        assert_eq!(est.stream_count(), 2);
515        assert!(within_tolerance(est.estimate("stream_a"), 100, TOLERANCE));
516        assert!(within_tolerance(est.estimate("stream_b"), 100, TOLERANCE));
517    }
518
519    #[test]
520    fn test_estimator_stream_count() {
521        let mut est = CardinalityEstimator::new(8);
522        assert_eq!(est.stream_count(), 0);
523        est.add("x", "item1");
524        assert_eq!(est.stream_count(), 1);
525        est.add("y", "item2");
526        assert_eq!(est.stream_count(), 2);
527        est.add("x", "item3"); // same stream
528        assert_eq!(est.stream_count(), 2);
529    }
530
531    #[test]
532    fn test_estimator_union_estimate_two_streams() {
533        let mut est = CardinalityEstimator::new(10);
534        for i in 0..100 {
535            est.add("a", &format!("a{}", i));
536        }
537        for i in 0..100 {
538            est.add("b", &format!("b{}", i));
539        }
540        let union = est.union_estimate(&["a", "b"]);
541        // ~200 unique items
542        assert!(within_tolerance(union, 200, TOLERANCE));
543    }
544
545    #[test]
546    fn test_estimator_union_estimate_nonexistent_streams() {
547        let est = CardinalityEstimator::new(10);
548        assert_eq!(est.union_estimate(&["no1", "no2"]), 0);
549    }
550
551    #[test]
552    fn test_estimator_union_estimate_single_stream() {
553        let mut est = CardinalityEstimator::new(10);
554        for i in 0..50 {
555            est.add("only", &format!("i{}", i));
556        }
557        let union = est.union_estimate(&["only"]);
558        assert!(within_tolerance(union, 50, TOLERANCE));
559    }
560
561    #[test]
562    fn test_estimator_union_mixed_existing_and_missing() {
563        let mut est = CardinalityEstimator::new(10);
564        for i in 0..80 {
565            est.add("real", &format!("r{}", i));
566        }
567        let union = est.union_estimate(&["real", "missing"]);
568        assert!(within_tolerance(union, 80, TOLERANCE));
569    }
570
571    #[test]
572    fn test_counter_error_display() {
573        let e1 = CounterError::PrecisionMismatch;
574        assert!(!e1.to_string().is_empty());
575        let e2 = CounterError::StreamNotFound("foo".to_string());
576        assert!(e2.to_string().contains("foo"));
577    }
578
579    #[test]
580    fn test_fnv1a_deterministic() {
581        assert_eq!(fnv1a_64(b"hello"), fnv1a_64(b"hello"));
582        assert_ne!(fnv1a_64(b"hello"), fnv1a_64(b"world"));
583    }
584}