1use std::collections::HashMap;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
16pub enum CounterError {
17 PrecisionMismatch,
19 StreamNotFound(String),
21}
22
23impl std::fmt::Display for CounterError {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 match self {
26 CounterError::PrecisionMismatch => write!(f, "HyperLogLog precision mismatch"),
27 CounterError::StreamNotFound(s) => write!(f, "Stream not found: {}", s),
28 }
29 }
30}
31
32impl std::error::Error for CounterError {}
33
34const FNV_OFFSET_BASIS_64: u64 = 14_695_981_039_346_656_037;
39const FNV_PRIME_64: u64 = 1_099_511_628_211;
40
41fn fnv1a_64(data: &[u8]) -> u64 {
42 let mut hash = FNV_OFFSET_BASIS_64;
43 for &byte in data {
44 hash ^= byte as u64;
45 hash = hash.wrapping_mul(FNV_PRIME_64);
46 }
47 hash ^= hash >> 33;
50 hash = hash.wrapping_mul(0xff51afd7ed558ccd);
51 hash ^= hash >> 33;
52 hash = hash.wrapping_mul(0xc4ceb9fe1a85ec53);
53 hash ^= hash >> 33;
54 hash
55}
56
57#[derive(Debug, Clone)]
68pub struct HyperLogLog {
69 pub registers: Vec<u8>,
71 pub m: usize,
73 pub alpha: f64,
75 precision: u8,
77}
78
79impl HyperLogLog {
80 pub fn new(precision: u8) -> Self {
85 assert!(
86 (4..=16).contains(&precision),
87 "HyperLogLog precision must be in [4, 16]"
88 );
89 let m = 1usize << precision;
90 let alpha = compute_alpha(m);
91 Self {
92 registers: vec![0u8; m],
93 m,
94 alpha,
95 precision,
96 }
97 }
98
99 pub fn add(&mut self, item: &str) {
101 let hash = fnv1a_64(item.as_bytes());
102 let idx = (hash >> (64 - self.precision)) as usize;
104 let w = hash << self.precision;
110 let rho = if w == 0 {
111 (64u32 - self.precision as u32) as u8 + 1
112 } else {
113 w.leading_zeros() as u8 + 1
114 };
115 if rho > self.registers[idx] {
116 self.registers[idx] = rho;
117 }
118 }
119
120 pub fn count(&self) -> u64 {
122 let m = self.m as f64;
123
124 let sum: f64 = self
126 .registers
127 .iter()
128 .map(|&r| 2.0_f64.powi(-(r as i32)))
129 .sum();
130
131 let raw_estimate = self.alpha * m * m / sum;
132
133 if raw_estimate <= 2.5 * m {
135 let zeros = self.registers.iter().filter(|&&r| r == 0).count() as f64;
136 if zeros > 0.0 {
137 return (m * (m / zeros).ln()).round() as u64;
138 }
139 }
140
141 if raw_estimate > (1u64 << 32) as f64 / 30.0 {
143 let correction = -(2.0_f64.powi(32)) * (1.0 - raw_estimate / 2.0_f64.powi(32)).ln();
144 return correction.round() as u64;
145 }
146
147 raw_estimate.round() as u64
148 }
149
150 pub fn merge(&mut self, other: &HyperLogLog) -> Result<(), CounterError> {
154 if self.m != other.m {
155 return Err(CounterError::PrecisionMismatch);
156 }
157 for (a, &b) in self.registers.iter_mut().zip(other.registers.iter()) {
158 if b > *a {
159 *a = b;
160 }
161 }
162 Ok(())
163 }
164
165 pub fn clear(&mut self) {
167 for r in &mut self.registers {
168 *r = 0;
169 }
170 }
171
172 pub fn register_count(&self) -> usize {
174 self.m
175 }
176}
177
178fn compute_alpha(m: usize) -> f64 {
180 match m {
181 16 => 0.673,
182 32 => 0.697,
183 64 => 0.709,
184 _ => 0.7213 / (1.0 + 1.079 / m as f64),
185 }
186}
187
188pub struct CardinalityEstimator {
194 streams: HashMap<String, HyperLogLog>,
195 precision: u8,
196}
197
198impl CardinalityEstimator {
199 pub fn new(precision: u8) -> Self {
201 Self {
202 streams: HashMap::new(),
203 precision,
204 }
205 }
206
207 pub fn add(&mut self, stream: &str, item: &str) {
209 self.streams
210 .entry(stream.to_string())
211 .or_insert_with(|| HyperLogLog::new(self.precision))
212 .add(item);
213 }
214
215 pub fn estimate(&self, stream: &str) -> u64 {
219 self.streams.get(stream).map(|h| h.count()).unwrap_or(0)
220 }
221
222 pub fn union_estimate(&self, streams: &[&str]) -> u64 {
227 let mut merged: Option<HyperLogLog> = None;
228 for &name in streams {
229 if let Some(hll) = self.streams.get(name) {
230 match &mut merged {
231 None => merged = Some(hll.clone()),
232 Some(m) => {
233 let _ = m.merge(hll);
236 }
237 }
238 }
239 }
240 merged.map(|h| h.count()).unwrap_or(0)
241 }
242
243 pub fn stream_count(&self) -> usize {
245 self.streams.len()
246 }
247}
248
249#[cfg(test)]
254mod tests {
255 use super::*;
256
257 const TOLERANCE: f64 = 0.30; fn within_tolerance(estimate: u64, expected: u64, tol: f64) -> bool {
261 if expected == 0 {
262 return estimate == 0;
263 }
264 let ratio = estimate as f64 / expected as f64;
265 ratio >= (1.0 - tol) && ratio <= (1.0 + tol)
266 }
267
268 #[test]
273 fn test_new_precision_4() {
274 let hll = HyperLogLog::new(4);
275 assert_eq!(hll.m, 16);
276 assert_eq!(hll.registers.len(), 16);
277 }
278
279 #[test]
280 fn test_new_precision_8() {
281 let hll = HyperLogLog::new(8);
282 assert_eq!(hll.m, 256);
283 }
284
285 #[test]
286 fn test_new_precision_16() {
287 let hll = HyperLogLog::new(16);
288 assert_eq!(hll.m, 65536);
289 }
290
291 #[test]
292 fn test_register_count() {
293 let hll = HyperLogLog::new(6);
294 assert_eq!(hll.register_count(), 64);
295 }
296
297 #[test]
302 fn test_empty_count_is_zero() {
303 let hll = HyperLogLog::new(8);
304 assert_eq!(hll.count(), 0);
305 }
306
307 #[test]
312 fn test_single_item_count() {
313 let mut hll = HyperLogLog::new(10);
314 hll.add("hello");
315 let c = hll.count();
316 assert!(c >= 1, "Expected at least 1, got {}", c);
317 }
318
319 #[test]
324 fn test_count_monotone_increases() {
325 let mut hll = HyperLogLog::new(10);
326 let mut prev = 0u64;
327 for i in 0..100 {
328 hll.add(&format!("item_{}", i));
329 let curr = hll.count();
330 assert!(
331 curr >= prev,
332 "Count decreased from {} to {} at step {}",
333 prev,
334 curr,
335 i
336 );
337 prev = curr;
338 }
339 }
340
341 #[test]
346 fn test_cardinality_100_items() {
347 let mut hll = HyperLogLog::new(10);
348 for i in 0..100u64 {
349 hll.add(&format!("item_{}", i));
350 }
351 let est = hll.count();
352 assert!(
353 within_tolerance(est, 100, TOLERANCE),
354 "Estimate {} not within {}% of 100",
355 est,
356 (TOLERANCE * 100.0) as u32
357 );
358 }
359
360 #[test]
361 fn test_cardinality_1000_items() {
362 let mut hll = HyperLogLog::new(10);
363 for i in 0..1000u64 {
364 hll.add(&format!("element_{}", i));
365 }
366 let est = hll.count();
367 assert!(
368 within_tolerance(est, 1000, TOLERANCE),
369 "Estimate {} not within {}% of 1000",
370 est,
371 (TOLERANCE * 100.0) as u32
372 );
373 }
374
375 #[test]
376 fn test_duplicate_items_not_counted() {
377 let mut hll = HyperLogLog::new(10);
378 for _ in 0..100 {
379 hll.add("same_item");
380 }
381 let est = hll.count();
382 assert!(est < 10, "Duplicates inflated count to {}", est);
384 }
385
386 #[test]
391 fn test_clear_resets_to_zero() {
392 let mut hll = HyperLogLog::new(8);
393 for i in 0..50 {
394 hll.add(&format!("x{}", i));
395 }
396 hll.clear();
397 assert_eq!(hll.count(), 0);
398 }
399
400 #[test]
401 fn test_clear_then_reuse() {
402 let mut hll = HyperLogLog::new(8);
403 for i in 0..50 {
404 hll.add(&format!("x{}", i));
405 }
406 hll.clear();
407 hll.add("hello");
408 let c = hll.count();
409 assert!(c >= 1);
410 }
411
412 #[test]
417 fn test_merge_compatible() {
418 let mut hll1 = HyperLogLog::new(10);
419 let mut hll2 = HyperLogLog::new(10);
420 for i in 0..100 {
421 hll1.add(&format!("a{}", i));
422 }
423 for i in 0..100 {
424 hll2.add(&format!("b{}", i));
425 }
426 let result = hll1.merge(&hll2);
427 assert!(result.is_ok());
428 let est = hll1.count();
429 assert!(
431 within_tolerance(est, 200, TOLERANCE),
432 "Merged estimate {} not within tolerance of 200",
433 est
434 );
435 }
436
437 #[test]
438 fn test_merge_overlapping() {
439 let mut hll1 = HyperLogLog::new(10);
440 let mut hll2 = HyperLogLog::new(10);
441 for i in 0..100 {
442 hll1.add(&format!("item{}", i));
443 hll2.add(&format!("item{}", i));
444 }
445 let _ = hll1.merge(&hll2);
446 let est = hll1.count();
447 assert!(
449 within_tolerance(est, 100, TOLERANCE),
450 "Overlapping merge estimate {} not within tolerance of 100",
451 est
452 );
453 }
454
455 #[test]
456 fn test_merge_precision_mismatch_error() {
457 let mut hll1 = HyperLogLog::new(8);
458 let hll2 = HyperLogLog::new(10);
459 let result = hll1.merge(&hll2);
460 assert_eq!(result, Err(CounterError::PrecisionMismatch));
461 }
462
463 #[test]
468 fn test_precision_5_accuracy() {
469 let mut hll = HyperLogLog::new(5);
470 for i in 0..50 {
471 hll.add(&format!("p5_{}", i));
472 }
473 let est = hll.count();
474 assert!(within_tolerance(est, 50, TOLERANCE));
475 }
476
477 #[test]
478 fn test_precision_12_accuracy() {
479 let mut hll = HyperLogLog::new(12);
480 for i in 0..500 {
481 hll.add(&format!("p12_{}", i));
482 }
483 let est = hll.count();
484 assert!(within_tolerance(est, 500, TOLERANCE));
485 }
486
487 #[test]
492 fn test_estimator_single_stream() {
493 let mut est = CardinalityEstimator::new(10);
494 for i in 0..100 {
495 est.add("users", &format!("user_{}", i));
496 }
497 let c = est.estimate("users");
498 assert!(within_tolerance(c, 100, TOLERANCE));
499 }
500
501 #[test]
502 fn test_estimator_unknown_stream_returns_zero() {
503 let est = CardinalityEstimator::new(10);
504 assert_eq!(est.estimate("nonexistent"), 0);
505 }
506
507 #[test]
508 fn test_estimator_multiple_streams() {
509 let mut est = CardinalityEstimator::new(10);
510 for i in 0..100 {
511 est.add("stream_a", &format!("a{}", i));
512 est.add("stream_b", &format!("b{}", i));
513 }
514 assert_eq!(est.stream_count(), 2);
515 assert!(within_tolerance(est.estimate("stream_a"), 100, TOLERANCE));
516 assert!(within_tolerance(est.estimate("stream_b"), 100, TOLERANCE));
517 }
518
519 #[test]
520 fn test_estimator_stream_count() {
521 let mut est = CardinalityEstimator::new(8);
522 assert_eq!(est.stream_count(), 0);
523 est.add("x", "item1");
524 assert_eq!(est.stream_count(), 1);
525 est.add("y", "item2");
526 assert_eq!(est.stream_count(), 2);
527 est.add("x", "item3"); assert_eq!(est.stream_count(), 2);
529 }
530
531 #[test]
532 fn test_estimator_union_estimate_two_streams() {
533 let mut est = CardinalityEstimator::new(10);
534 for i in 0..100 {
535 est.add("a", &format!("a{}", i));
536 }
537 for i in 0..100 {
538 est.add("b", &format!("b{}", i));
539 }
540 let union = est.union_estimate(&["a", "b"]);
541 assert!(within_tolerance(union, 200, TOLERANCE));
543 }
544
545 #[test]
546 fn test_estimator_union_estimate_nonexistent_streams() {
547 let est = CardinalityEstimator::new(10);
548 assert_eq!(est.union_estimate(&["no1", "no2"]), 0);
549 }
550
551 #[test]
552 fn test_estimator_union_estimate_single_stream() {
553 let mut est = CardinalityEstimator::new(10);
554 for i in 0..50 {
555 est.add("only", &format!("i{}", i));
556 }
557 let union = est.union_estimate(&["only"]);
558 assert!(within_tolerance(union, 50, TOLERANCE));
559 }
560
561 #[test]
562 fn test_estimator_union_mixed_existing_and_missing() {
563 let mut est = CardinalityEstimator::new(10);
564 for i in 0..80 {
565 est.add("real", &format!("r{}", i));
566 }
567 let union = est.union_estimate(&["real", "missing"]);
568 assert!(within_tolerance(union, 80, TOLERANCE));
569 }
570
571 #[test]
572 fn test_counter_error_display() {
573 let e1 = CounterError::PrecisionMismatch;
574 assert!(!e1.to_string().is_empty());
575 let e2 = CounterError::StreamNotFound("foo".to_string());
576 assert!(e2.to_string().contains("foo"));
577 }
578
579 #[test]
580 fn test_fnv1a_deterministic() {
581 assert_eq!(fnv1a_64(b"hello"), fnv1a_64(b"hello"));
582 assert_ne!(fnv1a_64(b"hello"), fnv1a_64(b"world"));
583 }
584}