anomstream_core/
hyperloglog.rs1use alloc::vec;
34use alloc::vec::Vec;
35use core::hash::{Hash, Hasher};
36use std::hash::DefaultHasher;
37
38#[cfg(not(feature = "std"))]
39#[allow(unused_imports)]
40use num_traits::Float;
41
42use crate::error::{RcfError, RcfResult};
43
44pub const MIN_PRECISION: u8 = 4;
46pub const MAX_PRECISION: u8 = 16;
48pub const DEFAULT_PRECISION: u8 = 12;
51
52#[derive(Clone, Debug)]
68#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
69#[cfg_attr(feature = "serde", serde(try_from = "HyperLogLogShadow"))]
70pub struct HyperLogLog {
71 precision: u8,
73 registers: Vec<u8>,
75 total_added: u64,
78}
79
80#[cfg(feature = "serde")]
86#[derive(serde::Serialize, serde::Deserialize)]
87#[allow(clippy::missing_docs_in_private_items)]
88struct HyperLogLogShadow {
89 precision: u8,
90 registers: Vec<u8>,
91 total_added: u64,
92}
93
94#[cfg(feature = "serde")]
95impl TryFrom<HyperLogLogShadow> for HyperLogLog {
96 type Error = RcfError;
97
98 fn try_from(raw: HyperLogLogShadow) -> Result<Self, Self::Error> {
99 if !(MIN_PRECISION..=MAX_PRECISION).contains(&raw.precision) {
100 return Err(RcfError::InvalidConfig(
101 alloc::format!(
102 "HyperLogLog: precision {} out of [{MIN_PRECISION}, {MAX_PRECISION}]",
103 raw.precision
104 )
105 .into(),
106 ));
107 }
108 let expected = 1_usize << raw.precision;
109 if raw.registers.len() != expected {
110 return Err(RcfError::InvalidConfig(
111 alloc::format!(
112 "HyperLogLog: register bank length {} != expected {expected} for precision {}",
113 raw.registers.len(),
114 raw.precision
115 )
116 .into(),
117 ));
118 }
119 Ok(Self {
120 precision: raw.precision,
121 registers: raw.registers,
122 total_added: raw.total_added,
123 })
124 }
125}
126
127impl HyperLogLog {
128 pub fn new(precision: u8) -> RcfResult<Self> {
135 if !(MIN_PRECISION..=MAX_PRECISION).contains(&precision) {
136 return Err(RcfError::InvalidConfig(
137 alloc::format!(
138 "HyperLogLog: precision {precision} out of [{MIN_PRECISION}, {MAX_PRECISION}]"
139 )
140 .into(),
141 ));
142 }
143 let m = 1_usize << precision;
144 Ok(Self {
145 precision,
146 registers: vec![0_u8; m],
147 total_added: 0,
148 })
149 }
150
151 #[must_use]
159 pub fn with_default_precision() -> Self {
160 Self::new(DEFAULT_PRECISION).expect("DEFAULT_PRECISION is in range")
161 }
162
163 #[must_use]
165 pub fn register_count(&self) -> usize {
166 1_usize << self.precision
167 }
168
169 #[must_use]
171 pub fn precision(&self) -> u8 {
172 self.precision
173 }
174
175 #[must_use]
177 pub fn total_added(&self) -> u64 {
178 self.total_added
179 }
180
181 #[must_use]
183 pub fn memory_bytes(&self) -> usize {
184 self.registers.len()
185 }
186
187 pub fn add<T: Hash + ?Sized>(&mut self, value: &T) {
192 let mut h = DefaultHasher::new();
193 value.hash(&mut h);
194 self.add_hash(h.finish());
195 }
196
197 #[inline]
201 pub fn add_bytes(&mut self, key: &[u8]) {
202 let mut h = DefaultHasher::new();
203 key.hash(&mut h);
204 self.add_hash(h.finish());
205 }
206
207 #[allow(clippy::cast_possible_truncation)]
212 #[inline]
213 pub fn add_hash(&mut self, hash: u64) {
214 self.total_added = self.total_added.saturating_add(1);
215 let p = self.precision;
216 let idx = (hash >> (64 - p)) as usize;
220 let tail = hash << p;
224 let rho = if tail == 0 {
225 64 - p + 1
226 } else {
227 (tail.leading_zeros() as u8) + 1
228 };
229 let slot = &mut self.registers[idx];
230 if rho > *slot {
231 *slot = rho;
232 }
233 }
234
235 #[must_use]
237 #[allow(
238 clippy::cast_precision_loss,
239 clippy::cast_possible_truncation,
240 clippy::cast_sign_loss
241 )]
242 pub fn estimate(&self) -> u64 {
243 let m = self.register_count();
244 let m_f = m as f64;
245
246 let mut sum = 0.0_f64;
248 let mut zeros: usize = 0;
249 for &r in &self.registers {
250 if r == 0 {
251 zeros += 1;
252 }
253 sum += 2.0_f64.powi(-(i32::from(r)));
254 }
255
256 let alpha = alpha_m(m);
257 let raw = alpha * m_f * m_f / sum;
258
259 if raw <= 2.5 * m_f && zeros > 0 {
262 let v = zeros as f64;
263 return (m_f * (m_f / v).ln()).round().max(0.0) as u64;
264 }
265
266 raw.round().max(0.0) as u64
269 }
270
271 pub fn merge(&mut self, other: &Self) -> RcfResult<()> {
281 if self.precision != other.precision {
282 return Err(RcfError::InvalidConfig(
283 alloc::format!(
284 "HyperLogLog::merge: precision mismatch ({} vs {})",
285 self.precision,
286 other.precision
287 )
288 .into(),
289 ));
290 }
291 for (slot, other_r) in self.registers.iter_mut().zip(other.registers.iter()) {
292 if *other_r > *slot {
293 *slot = *other_r;
294 }
295 }
296 self.total_added = self.total_added.saturating_add(other.total_added);
297 Ok(())
298 }
299
300 pub fn reset(&mut self) {
302 self.registers.iter_mut().for_each(|r| *r = 0);
303 self.total_added = 0;
304 }
305}
306
307#[must_use]
309#[allow(clippy::cast_precision_loss)]
310fn alpha_m(m: usize) -> f64 {
311 match m {
312 16 => 0.673,
313 32 => 0.697,
314 64 => 0.709,
315 _ => 0.7213 / (1.0 + 1.079 / m as f64),
316 }
317}
318
319#[cfg(test)]
320#[allow(
321 clippy::cast_precision_loss,
322 clippy::cast_possible_truncation,
323 clippy::cast_sign_loss,
324 clippy::cast_possible_wrap,
325 clippy::items_after_statements,
326 clippy::manual_range_contains
327)]
328mod tests {
329 use super::*;
330
331 #[test]
332 fn new_rejects_out_of_range_precision() {
333 assert!(HyperLogLog::new(3).is_err());
334 assert!(HyperLogLog::new(17).is_err());
335 assert!(HyperLogLog::new(MIN_PRECISION).is_ok());
336 assert!(HyperLogLog::new(MAX_PRECISION).is_ok());
337 }
338
339 #[test]
340 fn empty_sketch_estimates_zero() {
341 let hll = HyperLogLog::with_default_precision();
342 assert_eq!(hll.estimate(), 0);
343 }
344
345 #[test]
346 fn exact_cardinality_at_tiny_scale() {
347 let mut hll = HyperLogLog::with_default_precision();
350 for i in 0..10_u32 {
351 hll.add(&i.to_le_bytes());
352 }
353 let est = hll.estimate();
354 assert!(est >= 9 && est <= 11, "est {est}");
355 }
356
357 #[test]
358 fn estimates_within_5pct_at_10k_distinct() {
359 let mut hll = HyperLogLog::with_default_precision();
360 for i in 0..10_000_u32 {
361 hll.add(&i.to_le_bytes());
362 }
363 let est = hll.estimate();
364 let err = (est as i64 - 10_000).unsigned_abs() as f64 / 10_000.0;
365 assert!(err < 0.05, "err {err:.4}, est {est}");
366 }
367
368 #[test]
369 fn estimates_within_3pct_at_100k_distinct_p14() {
370 let mut hll = HyperLogLog::new(14).expect("p=14 in range");
371 for i in 0..100_000_u32 {
372 hll.add(&i.to_le_bytes());
373 }
374 let est = hll.estimate();
375 let err = (est as i64 - 100_000).unsigned_abs() as f64 / 100_000.0;
376 assert!(err < 0.03, "err {err:.4}, est {est}");
377 }
378
379 #[test]
380 fn duplicate_inserts_do_not_inflate_estimate() {
381 let mut hll = HyperLogLog::with_default_precision();
382 for _ in 0..1_000 {
383 for i in 0..100_u32 {
384 hll.add(&i.to_le_bytes());
385 }
386 }
387 let est = hll.estimate();
388 assert!(est >= 90 && est <= 110, "est {est}");
390 assert_eq!(hll.total_added(), 100_000);
391 }
392
393 #[test]
394 fn merge_agrees_with_single_sketch() {
395 let mut a = HyperLogLog::with_default_precision();
396 let mut b = HyperLogLog::with_default_precision();
397 let mut full = HyperLogLog::with_default_precision();
398 for i in 0..5_000_u32 {
399 a.add(&i.to_le_bytes());
400 full.add(&i.to_le_bytes());
401 }
402 for i in 5_000..10_000_u32 {
403 b.add(&i.to_le_bytes());
404 full.add(&i.to_le_bytes());
405 }
406 a.merge(&b).expect("same precision");
407 let merged_est = a.estimate();
410 let full_est = full.estimate();
411 let delta = (merged_est as i64 - full_est as i64).unsigned_abs();
412 assert!(
413 delta < 200,
414 "delta {delta}, merged {merged_est}, full {full_est}"
415 );
416 }
417
418 #[test]
419 fn merge_rejects_precision_mismatch() {
420 let mut a = HyperLogLog::new(10).unwrap();
421 let b = HyperLogLog::new(12).unwrap();
422 assert!(a.merge(&b).is_err());
423 }
424
425 #[test]
426 fn reset_clears_all_registers_and_counter() {
427 let mut hll = HyperLogLog::with_default_precision();
428 for i in 0..1_000_u32 {
429 hll.add(&i.to_le_bytes());
430 }
431 assert!(hll.estimate() > 0);
432 hll.reset();
433 assert_eq!(hll.estimate(), 0);
434 assert_eq!(hll.total_added(), 0);
435 }
436
437 #[test]
438 fn add_hash_path_matches_add_bytes_path() {
439 let mut a = HyperLogLog::with_default_precision();
440 let mut b = HyperLogLog::with_default_precision();
441 for i in 0..1_000_u32 {
442 let bytes = i.to_le_bytes();
443 a.add_bytes(&bytes);
444 use core::hash::Hash;
446 use std::hash::DefaultHasher;
447 let mut h = DefaultHasher::new();
448 bytes.hash(&mut h);
449 b.add_hash(h.finish());
450 }
451 assert_eq!(a.estimate(), b.estimate());
453 }
454
455 #[test]
456 fn memory_bytes_matches_register_count() {
457 let hll = HyperLogLog::new(12).unwrap();
458 assert_eq!(hll.memory_bytes(), 4096);
459 let hll16 = HyperLogLog::new(16).unwrap();
460 assert_eq!(hll16.memory_bytes(), 65_536);
461 }
462
463 #[cfg(all(feature = "serde", feature = "postcard"))]
464 #[test]
465 fn postcard_roundtrip_preserves_estimate() {
466 let mut hll = HyperLogLog::with_default_precision();
467 for i in 0..5_000_u32 {
468 hll.add(&i.to_le_bytes());
469 }
470 let before = hll.estimate();
471 let bytes = postcard::to_allocvec(&hll).expect("serde ok");
472 let back: HyperLogLog = postcard::from_bytes(&bytes).expect("serde ok");
473 assert_eq!(back.estimate(), before);
474 assert_eq!(back.total_added(), hll.total_added());
475 }
476
477 #[cfg(all(feature = "serde", feature = "postcard"))]
478 #[test]
479 fn deserialize_rejects_out_of_range_precision() {
480 let bad = HyperLogLogShadow {
481 precision: MAX_PRECISION + 1,
482 registers: alloc::vec![0_u8; 1 << MAX_PRECISION],
483 total_added: 0,
484 };
485 let bytes = postcard::to_allocvec(&bad).unwrap();
486 let back: Result<HyperLogLog, _> = postcard::from_bytes(&bytes);
487 assert!(back.is_err());
488 }
489
490 #[cfg(all(feature = "serde", feature = "postcard"))]
491 #[test]
492 fn deserialize_rejects_register_length_mismatch() {
493 let bad = HyperLogLogShadow {
494 precision: DEFAULT_PRECISION,
495 registers: alloc::vec![0_u8; 10], total_added: 0,
497 };
498 let bytes = postcard::to_allocvec(&bad).unwrap();
499 let back: Result<HyperLogLog, _> = postcard::from_bytes(&bytes);
500 assert!(back.is_err());
501 }
502}