#![allow(rustdoc::bare_urls)]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(feature = "std"), no_std)]

extern crate alloc;
use alloc::{boxed::Box, vec::Vec};
use core::hash::{BuildHasher, Hash, Hasher};
use core::iter::repeat;
use core::sync::atomic::Ordering::Relaxed;

#[cfg(feature = "loom")]
pub(crate) use loom::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize};

#[cfg(not(feature = "loom"))]
pub(crate) use core::sync::atomic::{AtomicU64, AtomicU8, AtomicUsize};

#[cfg(all(feature = "loom", feature = "serde"))]
compile_error!("features `loom` and `serde` are mutually exclusive");

mod atomic_f64;
use atomic_f64::AtomicF64;
mod beta;
use beta::beta_horner;
mod hasher;
pub use hasher::DefaultHasher;
mod error;
pub use error::Error;
mod math;
use math::*;

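/// A [HyperLogLog](https://en.wikipedia.org/wiki/HyperLogLog) cardinality estimator: it
/// approximates the number of distinct items inserted using `2^precision` one-byte registers
/// instead of storing the items themselves.
///
/// A minimal usage sketch (the import path is an assumption; adjust it to this crate's name,
/// which is why the example is marked `ignore`):
///
/// ```ignore
/// // use my_hll_crate::HyperLogLog; // hypothetical import path
/// let mut hll = HyperLogLog::new(12);
/// hll.insert("foo");
/// hll.insert("bar");
/// hll.insert("foo"); // re-inserting an item does not grow the estimate
/// println!("~ distinct items: {}", hll.count());
/// ```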
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct HyperLogLog<S = DefaultHasher> {
    /// One register per bucket; each stores the maximum rank observed for that bucket.
    registers: Box<[u8]>,
    /// Number of index bits; `registers.len() == 1 << precision`.
    precision: u32,
    /// Hash builder used to hash inserted values.
    hasher: S,
    /// Number of registers still equal to zero.
    zeros: usize,
    /// Running sum of `2^-register` over all registers.
    sum: f64,
    /// Bias-correction constant for this register count.
    correction: f64,
}

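/// A thread-safe variant of [`HyperLogLog`] whose mutating methods take `&self`, so it can be
/// shared between threads (for example behind an `Arc`) and updated concurrently without locks.
///
/// A minimal concurrency sketch (the import path, thread count, and data are illustrative
/// assumptions, hence the `ignore` fence):
///
/// ```ignore
/// use std::sync::Arc;
/// use std::thread;
///
/// let hll = Arc::new(AtomicHyperLogLog::new(12));
/// let handles: Vec<_> = (0..4)
///     .map(|t| {
///         let hll = Arc::clone(&hll);
///         thread::spawn(move || hll.insert_all((0..10_000).map(|x| (t, x))))
///     })
///     .collect();
/// for h in handles {
///     h.join().unwrap();
/// }
/// println!("~ distinct items: {}", hll.count());
/// ```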
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct AtomicHyperLogLog<S = DefaultHasher> {
    registers: Box<[AtomicU8]>,
    precision: u32,
    hasher: S,
    zeros: AtomicUsize,
    sum: AtomicF64,
    correction: f64,
}

impl<S: BuildHasher> HyperLogLog<S> {
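    /// Creates an empty `HyperLogLog` with `2^precision` registers, hashing values with the
    /// supplied [`BuildHasher`].
    ///
    /// # Panics
    ///
    /// Panics if `precision` is outside `4..=18`.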
    pub fn with_hasher(precision: u8, hasher: S) -> Self {
        validate_precision(precision);
        let num_registers = 1 << precision;
        let data: Vec<_> = repeat(0).take(num_registers).collect();
        Self {
            hasher,
            precision: precision as u32,
            zeros: data.len(),
            correction: correction(data.len()),
            registers: data.into(),
            sum: f64::from(num_registers as u32),
        }
    }
}

impl<S: BuildHasher> AtomicHyperLogLog<S> {
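    /// Creates an empty `AtomicHyperLogLog` with `2^precision` registers, hashing values with
    /// the supplied [`BuildHasher`].
    ///
    /// # Panics
    ///
    /// Panics if `precision` is outside `4..=18`.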
    pub fn with_hasher(precision: u8, hasher: S) -> Self {
        validate_precision(precision);
        let num_registers = 1 << precision;
        let data: Vec<_> = repeat(0).take(num_registers).map(AtomicU8::new).collect();
        Self {
            hasher,
            precision: precision as u32,
            zeros: AtomicUsize::new(data.len()),
            correction: correction(data.len()),
            registers: data.into(),
            sum: AtomicF64::new(f64::from(num_registers as u32)),
        }
    }
}

macro_rules! impl_new {
    ($name:ident) => {
        impl $name {
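            /// Creates an empty estimator with `2^precision` registers and the default hasher.
            ///
            /// Panics if `precision` is outside `4..=18`.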
            pub fn new(precision: u8) -> $name<DefaultHasher> {
                $name::with_hasher(precision, DefaultHasher::default())
            }

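            /// Creates an empty estimator with `2^precision` registers and a default hasher
            /// seeded from `seed`, so the same seed and inputs produce the same sketch.
            ///
            /// Panics if `precision` is outside `4..=18`.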
            pub fn seeded(precision: u8, seed: u128) -> $name<DefaultHasher> {
                $name::with_hasher(precision, DefaultHasher::seeded(&seed.to_be_bytes()))
            }
        }
    };
}

impl_new!(HyperLogLog);
impl_new!(AtomicHyperLogLog);

macro_rules! impl_common {
    ($name:ident) => {
        impl<S: BuildHasher> $name<S> {
            /// Returns the number of registers, i.e. `2^precision`.
            #[inline(always)]
            pub fn len(&self) -> usize {
                self.registers.len()
            }

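            /// Returns the estimated number of distinct items inserted so far, truncated to an
            /// integer; see `raw_count` for the unrounded value.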
            #[inline]
            pub fn count(&self) -> usize {
                self.raw_count() as usize
            }

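            // Shared estimate: correction * m * (m - zeros) / (sum + beta(zeros)), where m is
            // the register count, `sum` the harmonic sum over registers, and `beta` a polynomial
            // bias correction (cf. Qin et al., "LogLog-Beta and More").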
            #[inline(always)]
            fn raw_count_inner(&self, zeros: usize, sum: f64) -> f64 {
                let d = sum + beta_horner(zeros, self.precision);
                self.correction * (self.len() * (self.len() - zeros)) as f64 / d
            }
        }

        impl<T: Hash, S: BuildHasher> Extend<T> for $name<S> {
            #[inline]
            fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
                self.insert_all(iter)
            }
        }

        impl<S: BuildHasher> PartialEq for $name<S> {
            fn eq(&self, other: &Self) -> bool {
                if self.len() != other.len() {
                    return false;
                }
                core::iter::zip(self.iter(), other.iter()).all(|(l, r)| l == r)
            }
        }
        impl<S: BuildHasher> Eq for $name<S> {}
    };
}

impl_common!(HyperLogLog);
impl_common!(AtomicHyperLogLog);

impl<S: BuildHasher> HyperLogLog<S> {
    /// Returns an iterator over the raw register values.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = u8> + '_ {
        self.registers.iter().copied()
    }

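    /// Hashes `value` with this sketch's hasher and records it in the estimator.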
    #[inline(always)]
    pub fn insert<T: Hash + ?Sized>(&mut self, value: &T) {
        let mut hasher = self.hasher.build_hasher();
        value.hash(&mut hasher);
        self.insert_hash(hasher.finish());
    }

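    /// Records a precomputed 64-bit hash: the top `precision` bits pick a register, and the
    /// rank `1 + trailing_zeros` of the hash is written if it exceeds the current value.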
    #[inline(always)]
    pub fn insert_hash(&mut self, hash: u64) {
        let index = hash >> (64 - self.precision);
        let new = 1 + hash.trailing_zeros() as u8;
        self.update(new, index as usize);
    }

    #[inline(always)]
    fn update(&mut self, new: u8, index: usize) {
        let old = self.registers[index];
        self.registers[index] = old.max(new);
        // `merge` may pass `new == 0`; a zero register only stops counting as zero
        // when a non-zero rank is written.
        if old == 0 && new != 0 {
            self.zeros -= 1;
        }
        if old < new {
            let diff = harmonic_term(old) - harmonic_term(new);
            self.sum -= diff;
        }
    }

    #[inline]
    pub fn insert_all<T: Hash, I: IntoIterator<Item = T>>(&mut self, iter: I) {
        for val in iter {
            self.insert(&val);
        }
    }

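    /// Folds `other` into `self` register-by-register (taking the max of each pair), so the
    /// result estimates the cardinality of the union of both input streams.
    ///
    /// # Errors
    ///
    /// Returns [`Error::IncompatibleLength`] if the two sketches have different precisions.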
    #[inline(always)]
    pub fn merge(&mut self, other: &Self) -> Result<(), Error> {
        if self.len() != other.len() {
            return Err(Error::IncompatibleLength);
        }

        for (i, x) in other.iter().enumerate() {
            self.update(x, i);
        }

        Ok(())
    }

    /// Returns the cardinality estimate as an unrounded `f64`.
    #[inline]
    pub fn raw_count(&self) -> f64 {
        let zeros = self.zeros;
        let sum = self.sum;
        self.raw_count_inner(zeros, sum)
    }
}

impl<S: BuildHasher> AtomicHyperLogLog<S> {
    /// Returns an iterator over snapshots of the raw register values.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = u8> + '_ {
        self.registers.iter().map(|x| x.load(Relaxed))
    }

    /// Hashes `value` with this sketch's hasher and records it; safe to call concurrently.
    #[inline(always)]
    pub fn insert<T: Hash + ?Sized>(&self, value: &T) {
        let mut hasher = self.hasher.build_hasher();
        value.hash(&mut hasher);
        self.insert_hash(hasher.finish());
    }

    /// Records a precomputed 64-bit hash, as in [`HyperLogLog::insert_hash`].
    #[inline(always)]
    pub fn insert_hash(&self, hash: u64) {
        let index = hash >> (64 - self.precision);
        let new = 1 + hash.trailing_zeros() as u8;
        self.update(new, index as usize);
    }

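    // Lock-free update: `fetch_max` serializes concurrent writers on the same register, so each
    // old value is observed by exactly one writer and the `zeros`/`sum` adjustments telescope to
    // the same totals as a sequential update, even with relaxed ordering.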
    #[inline(always)]
    fn update(&self, new: u8, index: usize) {
        let old = self.registers[index].fetch_max(new, Relaxed);
        // As in the non-atomic version, `merge` may pass `new == 0`, which must not touch `zeros`.
        if old == 0 && new != 0 {
            self.zeros.fetch_sub(1, Relaxed);
        }
        if old < new {
            let diff = harmonic_term(old) - harmonic_term(new);
            self.sum.fetch_sub(diff, Relaxed);
        }
    }

    #[inline]
    pub fn insert_all<T: Hash, I: IntoIterator<Item = T>>(&self, iter: I) {
        for val in iter {
            self.insert(&val);
        }
    }

    /// Folds `other` into `self` register-by-register; see [`HyperLogLog::merge`].
    ///
    /// # Errors
    ///
    /// Returns [`Error::IncompatibleLength`] if the two sketches have different precisions.
    #[inline(always)]
    pub fn merge(&self, other: &Self) -> Result<(), Error> {
        if self.len() != other.len() {
            return Err(Error::IncompatibleLength);
        }

        for (i, x) in other.iter().enumerate() {
            self.update(x, i);
        }

        Ok(())
    }

    /// Returns the cardinality estimate as an unrounded `f64`, from relaxed snapshots of the counters.
    #[inline]
    pub fn raw_count(&self) -> f64 {
        let zeros = self.zeros.load(Relaxed);
        let sum = self.sum.load(Relaxed);
        self.raw_count_inner(zeros, sum)
    }
}

impl<S: BuildHasher + Clone> Clone for AtomicHyperLogLog<S> {
    // Cloning takes relaxed snapshots of each counter; it is not an atomic snapshot of the
    // whole sketch if other threads are inserting concurrently.
    fn clone(&self) -> Self {
        Self {
            hasher: self.hasher.clone(),
            precision: self.precision,
            zeros: AtomicUsize::new(self.zeros.load(Relaxed)),
            correction: self.correction,
            registers: self.iter().map(AtomicU8::new).collect::<Vec<_>>().into(),
            sum: AtomicF64::new(self.sum.load(Relaxed)),
        }
    }
}

#[inline]
fn validate_precision(precision: u8) {
    assert!(
        (4..=18).contains(&precision),
        "Precisions 4..=18 supported only."
    );
}

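// Computes 2^-x by building the IEEE-754 bit pattern directly: biased exponent (1023 - x) with a
// zero mantissa. Used to maintain the harmonic sum incrementally as registers grow.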
#[inline(always)]
fn harmonic_term(x: u8) -> f64 {
    f64::from_bits(u64::MAX.wrapping_sub(u64::from(x)) << 54 >> 2)
}

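/// Returns the smallest precision whose expected relative error is at or below `error`, using
/// the HyperLogLog relationship `error ~ 1.04 / sqrt(2^precision)`.
///
/// Panics if `error` is not strictly between 0 and 1.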
#[inline]
pub fn precision_for_error(error: f64) -> u8 {
    assert!(0.0 < error && error < 1.0);
    let bias_constant = 1.0389617614136892;
    ceil(log2(pow(bias_constant / error, 2.0))) as u8
}

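/// Returns the expected relative standard error for `precision`, i.e. `1.04 / sqrt(2^precision)`.
///
/// Panics if `precision` is outside `4..=18`.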
#[inline]
pub fn error_for_precision(precision: u8) -> f64 {
    validate_precision(precision);
    let bias_constant = 1.0389617614136892;
    bias_constant / sqrt((1u64 << precision) as f64)
}

#[inline(always)]
fn correction(count: usize) -> f64 {
    // Bias-correction constant alpha_m from the HyperLogLog paper: empirical values for the
    // smallest register counts, and alpha_m = 0.7213 / (1 + 1.079 / m) otherwise.
    let base = 0.7213475204444817;
    let approx = 1.0794415416798357;
    match count {
        16 => 0.673,
        32 => 0.697,
        64 => 0.709,
        _ => base / (1.0 + approx / count as f64),
    }
}

macro_rules! impl_tests {
    ($modname:ident, $name:ident) => {
        #[allow(unused_mut)]
        #[cfg(not(feature = "loom"))]
        #[cfg(test)]
        mod $modname {
            use super::*;
            #[test]
            fn test_clone() {
                let mut hll = $name::seeded(4, 42);
                hll.insert_all(1..10);
                let mut cloned = hll.clone();
                assert_eq!(hll, cloned);
                cloned.insert(&42);
                assert!(hll != cloned);
            }

            #[test]
            fn test_low_error() {
                for (p, thresh) in [(8, 0.15), (9, 0.15), (10, 0.15)] {
                    low_error(p, thresh);
                }
            }

            fn low_error(precision: u8, thresh: f64) {
                let mut hll = $name::seeded(precision, 42);
                let mut counted = 0;
                let mut total_err = 0f64;
                let mut total_diff = 0f64;

                for x in 1..10_000_000 {
                    hll.insert(&x);
                    if x % 100_000 == 0 {
                        let real = x as f64;
                        let diff = hll.raw_count() - real;
                        let err = diff.abs() / real;
                        assert!(err < thresh, "{}", err);

                        counted += 1;
                        total_err += err;
                        total_diff += diff / real;
                    }
                }

                assert!((total_err - total_diff).abs() / counted as f64 > 0.01);
            }

            #[test]
            fn test_merge() {
                let mut left = $name::seeded(8, 42);
                let mut right = $name::seeded(8, 42);

                for x in 1..2000 {
                    left.insert(&x);
                }
                for x in 1000..3000 {
                    right.insert(&x);
                }

                left.merge(&right).unwrap();

                let real = 3000.0;
                let my_acc = (real - (left.count() as f64 - real).abs()) / real;
                assert!(my_acc > 0.75, "{}", my_acc);
            }

            #[cfg(feature = "serde")]
            #[test]
            fn test_serde() {
                for precision in 4..=18 {
                    let mut before = $name::seeded(precision, 42);
                    before.extend(0..=1000);

                    let s = serde_cbor::to_vec(&before).unwrap();
                    let mut after: $name = serde_cbor::from_slice(&s).unwrap();
                    assert_eq!(before, after);

                    before.extend(1000..=2000);
                    after.extend(1000..=2000);
                    assert_eq!(before, after);
                }
            }

            #[test]
            fn test_error_helpers() {
                for precision in 4..=18 {
                    let err = error_for_precision(precision);
                    let prec = precision_for_error(err);
                    assert_eq!(prec, precision);
                }
            }
        }
    };
}

impl_tests!(non_atomic, HyperLogLog);
impl_tests!(atomic, AtomicHyperLogLog);

#[cfg(feature = "loom")]
#[cfg(test)]
mod loom_tests {
    use super::*;
    use core::sync::atomic::Ordering;

    #[test]
    fn test_loom() {
        loom::model(|| {
            let hll = loom::sync::Arc::new(AtomicHyperLogLog::seeded(4, 42));
            let expected = AtomicHyperLogLog::seeded(4, 42);
            expected.insert_all(1..=4);
            let handles: Vec<_> = [(1..=2), (2..=4)]
                .into_iter()
                .map(|data| {
                    let v = hll.clone();
                    loom::thread::spawn(move || v.insert_all(data))
                })
                .collect();

            for handle in handles {
                handle.join().unwrap();
            }
            let res = hll.iter().collect::<Vec<_>>();
            assert_eq!(res, expected.iter().collect::<Vec<_>>());
            assert_eq!(
                hll.zeros.load(Ordering::SeqCst),
                expected.zeros.load(Ordering::SeqCst)
            );
            assert_eq!(
                hll.sum.load(Ordering::SeqCst),
                expected.sum.load(Ordering::SeqCst)
            );
        });
    }
}